logo头像
Snippet 博客主题

Spark+Python ML库进行分布式预测

很多机器学习库往往都只有python的版本,但是由于都是本地单机版的,所以并不适用于线上大规模的例行离线预测。Spark虽然也有mllib,但是支持度并不够,很多有用的算法并没有集成。所以需要考虑如何整合Spark以及开源python ml库来同时获得分布式执行的优点以及开源库的丰富性。


一种方法是spark读出DataFrame数据后直接调用toPandas()方法将其转化为pandas的DataFrame,然后就可以调用python库了。但是这种方法是将数据都加载到了driver端,对于线上真实大数据的场景可能并不适用,单机内存可能不太够。


还有一种方法是利用rdd的mappartitions的功能在每个executor上调用python库,具体来说整体思路如下:

  • 离线使用python库(sklearn等都可)训练模型
  • 将训练好的模型保存为本地文件并存储到分布式对象存储系统中(能从中取得二进制流即可)
  • 启动spark任务
    • 从读取的二进制流加载模型
    • 从hive读取所需要的特征数据
    • 并行使用模型预测结果
    • 将预测结果保存到hive表


核心代码框架如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import pickle
import pandas as pd
import xgboost as xgb
from pyspark.sql.types import *

def execute(data):
features = ['f1', 'f2', 'f3']
model = pickle.loads('') # 从网络中获得的二进制流中加载模型
df = pd.DataFrame(list(data), columns=features) # 将数据转成pandas的DataFrame
preds = model.predict(xgb.DMatrix(df[features].values, feature_names=features)) # xgb
preds = model.predict(df[features]) # 如sklearn的rf模型
for i, prediction in enumerate(preds):
yield df[i]['id'], str(prediction) # 将类似id等必要信息及预测值存储到hive

result = df.rdd.mapPartitions(execute)
schema = StructType([
StructField('id', StringType(), True),
StructField('prediction', StringType(), True)
])
result = spark.createDataFrame(result, schema) # 获得Spark DataFrame结果

spark相关文档

  1. rdd mapPartition:https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.RDD.mapPartitions.html
  2. pyspark user_guide:https://spark.apache.org/docs/3.2.1/api/python/user_guide/index.html

评论系统未开启,无法评论!