使用Spark MLlib模块进行机器学习模型训练
使用模块介绍
Spark MLlib是基于dataframe数据格式的机器学习模块,Spark ML支持基于Pipeline的数据流式模型训练,我们可以通过构建一个一个的stage模块组成Pipeline模块,进行模型训练。
建模流程:
数据加载 ==> 特征预处理 ==> 算法初始化 ==> 构建Pipeline处理器 ==> 模型评估
数据加载
由于旧的基于RDD数据格式进行模型训练的Spark MLlib已经弃用,所以统一将数据加载为dataframe数据格式
由于数据源是CSV文件,所以需要注明head和分隔符,因为数据源数据并不大所以可以全部缓存在内存中,便于加速训练。
1 | Dataset<Row> dataDf = spark.read().format("csv").option("header","true").option("sep", ",") |
特征预处理
数据特征预处理步骤如下:
缺失值补充 ==> 区分字符型和数值型变量 ==> 字符型变量转为哑变量 ==> 数值类型变量归一化 => 数值型变量进行共线性检验 ==> 特征变量合并
区分字符型和数值型变量:
通过遍历找出字符类型变量和数值类型变量,同时去除label变量
1 | String str = "StringType"; |
字符型变量进行OneHot编码转换:
字符类型变量转为哑变量的过程由两部构成:
(1)字符变量索引为对应的数值index,这里调用了Spark ML的String Indexer模块对字符变量进行索引,由于只能对单个元素进行索引,所以只能放在了循环中迭代处理,代码如下:
1 | List<String> indexColumns = new ArrayList<>(); |
(2)字符索引数值类型变量OneHot处理,这里是调用了Spark ML中的 OneHotEncoderEstimator 模块,与字符变量索引为数值变量不同的是,这次操作没有将dataframe数据落地,而是通过生成Model构建Pipeline中的stage,代码如下
1 | OneHotEncoderModel oneHotModel = new OneHotEncoderEstimator().setInputCols(indexColumns.toArray(new String[0])).setOutputCols(vocColumns.toArray(new String[0])).fit(dataDf); |
特征变量合并
将所有特征变量合并为feature变量,生成stage
1 | VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(numberColumns.toArray(new String[0])).setOutputCol("features"); |
注:由于元数据的特殊性,所以这次回归模型构建没有进行共线性检验,同时所有的缺失数据预处理已经在Dataiku平台上完成。
算法初始化
Spark MLlib模块支持大部分的回归算法,这次预测模型训练选用了梯度提升回归树算法
将算法进行初始化,设定label变量和feature变量以及迭代次数,构建Pipeline的stage
1 | GBTRegressor gbt = new GBTRegressor().setLabelCol("SCT").setFeaturesCol("features").setMaxIter(10); |
构建Pipeline处理器
将各个模块stage传入Pipeline中
1 | Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{oneHotModel,vectorAssembler,gbt}); |
Pipeline模型训练
将dataframe格式的元数据split为train data和test data进行模型训练
1 | Dataset<Row>[] splitDf = dataDf.randomSplit(new double[]{0.8,0.2}); |
模型评估
计算模型R2 Score
1 | Dataset<Row> prediction = pipelineModel.transform(splitDf[1]); |
对使用Spark ML中Pipeline进行模型训练的一些总结
1、我一开始并没有使用Pipeline流进行模型训练,而是基于训练数据,不断处理dataframe,每一步都将dataframe数据落地,这样导致的后果就是代码复杂,而且模型训练时间很长,整个流程耗时达到了1分多钟,这对比Sklearn实在是糟心,在使用Pipeline流式处理后,流程耗时只有2s左右,速度提升很大,使用Pipeline的关键在于不要去手动操作dataframe数据,整个Pipeline中应该只有一个数据入口,所有操作都要写成stage,这样要求我们必须熟悉Spark MLlib模块的API,所有在写代码之前需要仔细研究API入口,没有文档就需要我们去源码查阅。
2、Pipeline一个好处在于,整个Pipeline Model是可以持久化的,这样我们训练好模型后,只要load模型,传入数据就可以获取预测值,不需要再次进行数据预处理,方便了模型的应用。
3、最后是谈一下,Spark ML模块的一个缺点,Spark ML模块在Maven中的主要依赖包是
1 | <dependency> |
但是当我在window环境中load持久化到本地的Pipeline模型时,会报包缺失的异常,当我直接submit jar包到Spark中是没问题的,这样带来的后果就是如果想将代码打包给别人用会有包缺失的异常,如果将所有依赖包都添加进来,这样jar会特别大,总而言之Spark ML不适合轻量级的单机版机器学习模型应用,如果使用JVM语言进行机器学习,可以使用smile平台。