0%

使用spark ml构建回归模型

使用Spark MLlib模块进行机器学习模型训练


使用模块介绍

Spark MLlib是基于dataframe数据格式的机器学习模块,Spark ML支持基于Pipeline的数据流式模型训练,我们可以通过构建一个一个的stage模块组成Pipeline模块,进行模型训练。

建模流程:

数据加载 ==> 特征预处理 ==> 算法初始化 ==> 构建Pipeline处理器 ==> 模型评估

完整代码位于https://github.com/zhangruipython/ETLPlatform/blob/master/SparkPlatform/src/main/java/com/application/ml/RegressionPipeline.java


数据加载

由于旧的基于RDD数据格式进行模型训练的Spark MLlib已经弃用,所以统一将数据加载为dataframe数据格式

由于数据源是CSV文件,所以需要注明head和分隔符,因为数据源数据并不大所以可以全部缓存在内存中,便于加速训练。

1
2
Dataset<Row> dataDf = spark.read().format("csv").option("header","true").option("sep", ",")
.option("inferSchema", "true").load("D:/rongze/notebook/SCT___a___v4_prepared.csv").cache();

特征预处理

数据特征预处理步骤如下:

缺失值补充 ==> 区分字符型和数值型变量 ==> 字符型变量转为哑变量 ==> 数值类型变量归一化 => 数值型变量进行共线性检验 ==> 特征变量合并

区分字符型和数值型变量:

通过遍历找出字符类型变量和数值类型变量,同时去除label变量

1
2
3
4
5
6
7
8
9
10
11
String str = "StringType";
List<String> strColumns = new ArrayList<String>();
dataDf.schema().foreach(s->{
// 判断列属性
if(str.equals(s.dataType().toString())){
strColumns.add(s.name());
}
return strColumns;
});
List<String> numberColumns = Arrays.stream(dataDf.columns()).filter(item->!strColumns.contains(item)).collect(Collectors.toList());
numberColumns.remove("SCT"); //去除label变量

字符型变量进行OneHot编码转换:

字符类型变量转为哑变量的过程由两部构成:

(1)字符变量索引为对应的数值index,这里调用了Spark ML的String Indexer模块对字符变量进行索引,由于只能对单个元素进行索引,所以只能放在了循环中迭代处理,代码如下:

1
2
3
4
5
6
7
8
9
10
List<String> indexColumns = new ArrayList<>();
List<String> vocColumns = new ArrayList<>();
for (String strColumn : strColumns) {
String columnIndex = strColumn + "_index";
String columnVoc = strColumn + "_voc";
indexColumns.add(columnIndex);
vocColumns.add(columnVoc);
StringIndexer indexer = new StringIndexer().setInputCol(strColumn).setOutputCol(columnIndex);
dataDf = indexer.fit(dataDf).transform(dataDf);
}

(2)字符索引数值类型变量OneHot处理,这里是调用了Spark ML中的 OneHotEncoderEstimator 模块,与字符变量索引为数值变量不同的是,这次操作没有将dataframe数据落地,而是通过生成Model构建Pipeline中的stage,代码如下

1
OneHotEncoderModel oneHotModel = new OneHotEncoderEstimator().setInputCols(indexColumns.toArray(new String[0])).setOutputCols(vocColumns.toArray(new String[0])).fit(dataDf);

特征变量合并

将所有特征变量合并为feature变量,生成stage

1
VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(numberColumns.toArray(new String[0])).setOutputCol("features");

注:由于元数据的特殊性,所以这次回归模型构建没有进行共线性检验,同时所有的缺失数据预处理已经在Dataiku平台上完成。

算法初始化

Spark MLlib模块支持大部分的回归算法,这次预测模型训练选用了梯度提升回归树算法

将算法进行初始化,设定label变量和feature变量以及迭代次数,构建Pipeline的stage

1
GBTRegressor gbt  = new GBTRegressor().setLabelCol("SCT").setFeaturesCol("features").setMaxIter(10);

构建Pipeline处理器

将各个模块stage传入Pipeline中

1
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{oneHotModel,vectorAssembler,gbt});

Pipeline模型训练

将dataframe格式的元数据split为train data和test data进行模型训练

1
2
Dataset<Row>[] splitDf = dataDf.randomSplit(new double[]{0.8,0.2});
PipelineModel pipelineModel = pipeline.fit(splitDf[0]);

模型评估

计算模型R2 Score

1
2
3
4
Dataset<Row> prediction = pipelineModel.transform(splitDf[1]);
prediction.select("SCT","prediction","features").show(100);
RegressionEvaluator evaluator = new RegressionEvaluator().setLabelCol("SCT").setMetricName("r2").setPredictionCol("prediction");
System.out.println(evaluator.evaluate(prediction));

对使用Spark ML中Pipeline进行模型训练的一些总结

1、我一开始并没有使用Pipeline流进行模型训练,而是基于训练数据,不断处理dataframe,每一步都将dataframe数据落地,这样导致的后果就是代码复杂,而且模型训练时间很长,整个流程耗时达到了1分多钟,这对比Sklearn实在是糟心,在使用Pipeline流式处理后,流程耗时只有2s左右,速度提升很大,使用Pipeline的关键在于不要去手动操作dataframe数据,整个Pipeline中应该只有一个数据入口,所有操作都要写成stage,这样要求我们必须熟悉Spark MLlib模块的API,所有在写代码之前需要仔细研究API入口,没有文档就需要我们去源码查阅。

2、Pipeline一个好处在于,整个Pipeline Model是可以持久化的,这样我们训练好模型后,只要load模型,传入数据就可以获取预测值,不需要再次进行数据预处理,方便了模型的应用。

3最后是谈一下,Spark ML模块的一个缺点,Spark ML模块在Maven中的主要依赖包是

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>2.4.5</version>
</dependency>

但是当我在window环境中load持久化到本地的Pipeline模型时,会报包缺失的异常,当我直接submit jar包到Spark中是没问题的,这样带来的后果就是如果想将代码打包给别人用会有包缺失的异常,如果将所有依赖包都添加进来,这样jar会特别大,总而言之Spark ML不适合轻量级的单机版机器学习模型应用,如果使用JVM语言进行机器学习,可以使用smile平台。