很多机器学习库往往都只有python的版本,但是由于都是本地单机版的,所以并不适用于线上大规模的例行离线预测。Spark虽然也有mllib,但是支持度并不够,很多有用的算法并没有集成。所以需要考虑如何整合Spark以及开源python ml库来同时获得分布式执行的优点以及开源库的丰富性。
一种方法是spark读出DataFrame数据后直接调用toPandas()方法将其转化为pandas的DataFrame,然后就可以调用python库了。但是这种方法是将数据都加载到了driver端,对于线上真实大数据的场景可能并不适用,单机内存可能不太够。
还有一种方法是利用rdd的mappartitions的功能在每个executor上调用python库,具体来说整体思路如下:
- 离线使用python库(sklearn等都可)训练模型
- 将训练好的模型保存为本地文件并存储到分布式对象存储系统中(能从中取得二进制流即可)
- 启动spark任务
- 从读取的二进制流加载模型
- 从hive读取所需要的特征数据
- 并行使用模型预测结果
- 将预测结果保存到hive表
核心代码框架如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import pickle import pandas as pd import xgboost as xgb from pyspark.sql.types import *
def execute(data): features = ['f1', 'f2', 'f3'] model = pickle.loads('') # 从网络中获得的二进制流中加载模型 df = pd.DataFrame(list(data), columns=features) # 将数据转成pandas的DataFrame preds = model.predict(xgb.DMatrix(df[features].values, feature_names=features)) # xgb preds = model.predict(df[features]) # 如sklearn的rf模型 for i, prediction in enumerate(preds): yield df[i]['id'], str(prediction) # 将类似id等必要信息及预测值存储到hive
result = df.rdd.mapPartitions(execute) schema = StructType([ StructField('id', StringType(), True), StructField('prediction', StringType(), True) ]) result = spark.createDataFrame(result, schema) # 获得Spark DataFrame结果
|
spark相关文档
- rdd mapPartition:https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.RDD.mapPartitions.html
- pyspark user_guide:https://spark.apache.org/docs/3.2.1/api/python/user_guide/index.html
评论系统未开启,无法评论!