0%

Sedona是一个基于spark的集群计算框架,可以大规模处理地理空间数据。Sedona扩展了Spark中数据结构RDD,使之可以兼容集群中地理空间数据,Sedona的SpatialRDD可以由普通RDD转换而来或者通过读取数据源而生成。
Sedona提供了开箱即用的空间SQL API和RDD API,SQL API 标准与PostgreSQL一致,RDD API支持Scala,Java,Python,R语言开发。

Sedona 支持读取的数据源

”spark 支持读取的数据源,Sedona都支持“

1、CSV
2、HDFS
3、Amazon S3
4、ODPS

Sedona 支持解析的地理数据格式

1、WKT
2、WKB
3、GeoJSON
4、Shapefile

Sedona 内部的 Spatial RDD兼容7种空间类型:

  • 多点
  • 多边形
  • 多个多边形
  • 线条
  • 多个线条
  • 圆形
  • Geometry集合

Sedona 处理大规模数据集的方式

空间数据分区

Sedona 中的Spatial RDD中的数据根据空间数据分布进行分区,附近的空间的对象会被放在同一分区内。空间分区具有两种效果:

(1)当执行针对特定空间区域的空间查询时,可以避免对空间不接近的分区进行不必要的计算,从而加快查询速度。

(2)将Spatial RDD切分为多个数据分区,每个分区的数据量接近,这样在集群计算时可以避免出现数据倾斜。

Sedona目前支持的空间分区模式有三种:

  • KDB-Tree,
  • Quad-Tree
  • R-Tree

1_wdriTIbaf1b6EME7O0QbKg.png

空间数据索引

Sedona 使用分布式空间索引对集群中的Spatial RDD进行索引,Sedona的分布式索引由两部分组成

(1)global index 全局索引,存储在master机器上,在spatial rdd 进行空间分区阶段生成,global index定位Spatial RDD中的空间分区边界框,global index 的目的是在空间查询时快速去除确定没有有效空间对象的空间分区。

(2)local index 本地索引,建立在Spatial RDD 的每一个空间分区里面,每一个local index只对自己空间分区的数据有作用,在空间查询中local index可以并行计算用于加速空间查询。

Spatial RDD定制化的序列化方式

Sedona针对空间对象和空间索引提供了定制化的序列化方式,Sedona序列化器可以将空间对象和空间索引序列化为压缩的字节数组,该序列化器比spark中常用的kyro序列化更快,在进行复杂空间操作(例如空间连接查询)时,占用内存较小。
序列化器还可以序列化和反序列化local index ,使用DFS(深度优先遍历)先父节点再写对应的子节点。

Sedona地理空间计算和JDBC连接PostgresSQL对比

计算效率对比

  • 两种方案统一的Spark 集群配置都是60 instance , 2g instance memory, 2 instance core
  • Sedona使用KDB-Tree作为空间分区算法,使用Quad-Tree作为空间索引算法
  • PostgresSQL基于一台120核机器计算
方案名称 计算数据量 计算内容 耗时 TPS
分布式JDBC连接PostgresSQL地理计算 449655129条数据 计算5月份全量数据经纬度对应的路况 20h40m 6044
Spark+Sedona地理计算 449655129条数据 计算5月份全量数据经纬度对应的路况 2h20m2s 53518

计算精准度对比

Spark+Sedona地理计算方案的计算结果有87%的结果数据可以和PostgresSQL计算结果匹配上

使用Sedona RDD API进行查询

配置依赖

Sedona版本和spark版本保持一致

初始化spark上下文

1
2
3
4
5
val conf = new SparkConf()
conf.setAppName(“GeoSparkExample”)
conf.set(“spark.serializer”, classOf[KryoSerializer].getName)
conf.set(“spark.kryo.registrator”, classOf[GeoSparkKryoRegistrator].getName)
val sc = new SparkContext(conf)

创建SpatialRdd

将spark dataframe转为SpatialRdd,并指定Geometry类型字段

1
val roadRDD = Adapter.toSpatialRdd(roadWktDF, "geom_wkt", Seq("ref", "fclass"))

构建空间索引

可以在SpatialRDD上构建分布式空间索引。目前,该系统提供两种类型的空间索引,QUADTREE和RTREE,作为每个分区的本地索引

1
spatialRDD.buildIndex(IndexType.QUADTREE, false) // 在进行空间join查询时设置为true

编写空间范围查询

空间范围查询返回位于地理区域内的所有空间对象。
例如,范围查询可能会在余杭找到所有公园,或在用户当前位置的一英里范围内返回所有公园。在入参方面,空间范围查询以一组空间对象和一个多边形查询窗口作为输入,并返回位于查询区域的所有空间对象。

1
2
3
4
5
val rangeQueryWindow = new Envelope(-90.01, -80.01, 30.01, 40.01) //定义矩形空间窗口
/* considerIntersect=true, 只返回被窗口完全覆盖的Geometry; considerIntersect=false,返回和窗口发生相交的Geometry */
val considerIntersect = false
val usingIndex = false
var queryResult = RangeQuery.SpatialRangeQuery(spatialRDD, rangeQueryWindow, considerIntersect, usingIndex)

编写K近邻查询

输入 K、查询点和空间RDD集合,查询距离查询点最近的K个空间RDD

编写空间连接查询

空间连接查询是将两个或多个数据集与空间距离相结合的查询,例如查询在500KM范围内有杂货店的加油站。

1
2
3
val considerBoundaryIntersection = true
val usingIndex = true
val resultRdd = DistanceJoinQueryFlat(roadRDD, pointCircleRDD, usingIndex, considerBoundaryIntersection)

Hive UDF 构建

什么是Hive UDF

UDF(User-Defined-Functions)用户自定义的Hive函数。

Hive UDF 种类

  • UDF:one to one ,操作单个数据行,产生对应的单行数据
  • UDAF:many to one,操作多行数据,产生一行数据
  • UDTF:one to many,操作一行数据,产生多行数据

实现UDF

maven 依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.2.1</version>
</dependency>

注意点

  • hive版本和Hadoop版本一定要对应,不然会产生jar包冲突

  • UDF实现类需要继承 GenericUDF,许多教程案列中写的是继承UDF,在hive 3.1.2版本中 UDF类已经被标记为@Deprecated,不建议使用

实现过程

  • 继承GenericUDF后需要复写三个方法,分别是initialize,evaluate,getDisplayString
  • initialize方法在UDF实现类中首先被调用,主要负责:
    • 参数校验,验证输入参数类型是否符合预期
    • 设置返回值,设置返回一个与预期输出类型相符合的对象
    • 存储全局变量,为全局变量赋值
  • evaluate方法处理具体逻辑,返回预期执行结果
  • getDisplayString:类似toString方法

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package org.example.util;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.NDV;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.BooleanWritable;

/**
* 编写hive udf
* @author 张睿
* @create 2020-07-15 10:16
**/
@Description(name = "judge",value = "_FUNC_(array,value) - Returns TRUE if the array contains value"
,extended = "Example: SELECT _FUNC_(array('a','b'),'a')")
@NDV(maxNdv = 2)
public class ComplexUdf extends GenericUDF{
ListObjectInspector listOI;
ObjectInspector elementOI;

@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// 参数数量校验
if(arguments.length!=2){
throw new UDFArgumentLengthException("the operator accept two arguments");
}
// 参数类型校验
ObjectInspector a = arguments[0];
ObjectInspector b = arguments[1];
if(!(a instanceof ListObjectInspector)||!(b instanceof StringObjectInspector)){
throw new UDFArgumentException("first argument must be a list / array, second argument must be a string");
}
this.listOI = (ListObjectInspector) a;
this.elementOI = b;
// 校验list是否由string组成
if(!(listOI.getListElementObjectInspector() instanceof StringObjectInspector)){
throw new UDFArgumentException("first argument must be a list of strings");
}
return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
}

@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
// 使用对象检查器从延迟对象中获取列表和字符串
BooleanWritable result = new BooleanWritable(false);
int elemNum = this.listOI.getListLength(deferredObjects[0].get());
Object arg = deferredObjects[1].get();


for(int i=0;i<elemNum;i++){
this.listOI.getListElement(deferredObjects[0].get(),i);
Object listElement = listOI.getListElement(deferredObjects[0].get(),i);
// String element = elementOI.getPrimitiveJavaObject(lazyString);
if (arg.equals(listElement)){
result.set(true);
break;
}
}
return result;


@Override
public String getDisplayString(String[] args) {
return "if"+args[0]+" include "+args[1]+" return true";
}
}

注册UDF函数

  • 上传本地jar包:add jar xxx; (xxx对应jar包本地路径)
  • 注册临时函数:create temporary function args01 as “args02”; (args01:UDF方法名称,args02:具体实现类路径)
  • 注册永久函数:create function args01 as “args02”;

通过使用presto结合Redis实现RocksDB数据SQL结构化

为什么使用presto结合redis的方式实现?

  • 一开始我的目标是:因为presto支持自定义数据源插件开发,所以自己实现以RocksDB 作为数据源的插件,但是在开发过程中发现,因为RocksDB 数据源与业务耦合太紧,许多操作需要定制化开发,例如数据解密、数据反序列化、metadata映射等问题,这一步很难通过读取外部的配置文件或者通过presto客户端传递参数实现。所以我想借助第三方数据中间件进行业务与功能的去耦合。

实现以redis作为数据源,presto作为即席查询分析

什么是即席查询分析

  • 即席查询分析(Ad Hoc):用户根据自身需求灵活选择查询条件,即席查询与普通应用查询最大的不同是普通的应用查询是定制开发的,而即席查询是由用户自定义查询条件的。
  • 即席查询和分析的计算模式兼具了良好的时效性与灵活性,是对批处理,流计算两大计算模式有力补充。

开发步骤

  • 在presto中配置redis有关参数,在etc/catalog/目录下新建redis.properties 配置如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 连接名称
connector.name=redis
# redis服务端节点地址
redis.nodes=192.168.1.54:6379
# redis中表名称
redis.table-names=user_info
# redis中库名称
redis.default-schema=user_profile
# redis metadata配置文件路径
redis.table-description-dir=/home/Documents/presto-server-0.235.1/etc/catalog/redis
# presto查找表名称与库名称分隔符
redis.key-delimiter=:
redis.key-prefix-schema-table=true
redis.hide-internal-columns=false
redis.hide-internal-columns=false

注意点:因为在redis中存储数据结构为 hash,所以presto通过解析 key值生成结构化数据中的表名称和库名称,解析的分隔符默认为:

  • 创建redis metadata文件,在redis.properties 中写定的metadata配置路径下新建redis.json文件,文件如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"tableName": "user_info",
"schemaName": "user_profile",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "redis_key",
"type": "varchar",
"hidden": "false"
}
]
},
"value": {
"dataFormat": "hash",
"fields": [
{
"name": "hospitalizedNum",
"type": "varchar",
"mapping": "hospitalizedNum"
},{
"name": "hid",
"type": "varchar",
"mapping": "hid"
}
]
}
}
  • 在redis客户端中写入数据,数据格式为HASH,进行测试:
1

  • 在presto客户端进行查询:
1
./presto --server localhost:8080 --catalog redis --schema user_profile(user_profile即为redis key:分隔符第一位)
  • 结果展示:

批注 2020-07-07 170252.png

RocksDB中大批次数据写入Redis中

使用redis pipeline提高写入速度

Redis是使用客户端-服务端模型和请求/响应协议的TCP服务器,通常情况下一次请求需要以下步骤

1、客户端向服务端发送查询,以阻塞的方式从套接字中读取服务器响应

2、服务器处理命令并将响应发送给客户端

这种模式依赖于RTT(往返时间),如果有1000条数据插入,则会耗时1000*RTT

pipeline是将所有命令打包,通过一次网络参数发送至服务端,所以可以大大减少网络通信时间。

使用多线程处理模式提高写入速度

将对数据集进行拆分,规定每一个线程处理N条数据,并行写入Redis提高速度。

执行时间表

数据量 方案说明 耗时(MS)
320000 不使用redis pipeline模式 291070
320000 使用redis pipeline 25337
320000 使用 pipeline+多线程 9780

通过开发Plugin插件方式开发Presto UDF

Presto 插件机制为开发者提供了以下功能:

  • 对接自定义存储系统
  • 添加自定义数据类型
  • 添加自定义处理函数
  • 自定义权限控制

以下是开发Presto UDF函数的步骤

1、逻辑代码开发

1
2
3
4
5
6
7
8
9
10
11
public class UdfDemo
{
private UdfDemo(){}
@Description("两值相除")
@ScalarFunction(value = "divide")
@SqlType(StandardTypes.DOUBLE)
public static double divide(@SqlType(StandardTypes.DOUBLE) double num01,@SqlType(StandardTypes.DOUBLE) double num02){
double result = num01/num02;
return result;
}
}

注意点

  • @Description 是自定义方法注释
  • @ScalarFunction 是函数名称
  • @SqlType 是函数出参
  • @ScalarFunction 中函数名称需要和具体方法名称保持一致
  • 方法入参需要添加 @SqlType 控制Java类型与Presto类型对应

2、插件编写

1
2
3
4
5
6
7
8
public class DemoPlugin implements Plugin {
@Override
public Set<Class<?>> getFunctions() {
return ImmutableSet.<Class<?>>builder()
.add(UdfDemo.class)
.build();
}
}

注意点

  • add所添加类即为对应的UDF方法类

3、服务注册

  • 在src/main/ 目录下创建以下文件夹resource/META-INF/services
  • 在services文件夹中添加名称为 com.facebook.presto.spi.Plugin的文件
  • 在 com.facebook.presto.spi.Plugin中添加实现的插件类名称,如图所示

批注 2020-07-01 192624.png

4、发布jar包

  • 用maven 将项目打包(添加所有依赖)
  • 在presto 安装目录下的plugin 文件夹中新建文件夹名称为UDF名称,将jar包放置于该文件夹中
  • 重启presto服务

5、在cli端使用UDF

如图所示

批注 2020-07-01 193431.png

过程中可能出现的异常

presto 启动日志中出现 localfile already exist,原因maven打包中添加了 以下依赖

1
2
3
4
5
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-local-file</artifactId>
<version>0.235.1</version>
</dependency>

使用Spark MLlib模块进行机器学习模型训练


使用模块介绍

Spark MLlib是基于dataframe数据格式的机器学习模块,Spark ML支持基于Pipeline的数据流式模型训练,我们可以通过构建一个一个的stage模块组成Pipeline模块,进行模型训练。

建模流程:

数据加载 ==> 特征预处理 ==> 算法初始化 ==> 构建Pipeline处理器 ==> 模型评估

完整代码位于https://github.com/zhangruipython/ETLPlatform/blob/master/SparkPlatform/src/main/java/com/application/ml/RegressionPipeline.java


数据加载

由于旧的基于RDD数据格式进行模型训练的Spark MLlib已经弃用,所以统一将数据加载为dataframe数据格式

由于数据源是CSV文件,所以需要注明head和分隔符,因为数据源数据并不大所以可以全部缓存在内存中,便于加速训练。

1
2
Dataset<Row> dataDf = spark.read().format("csv").option("header","true").option("sep", ",")
.option("inferSchema", "true").load("D:/rongze/notebook/SCT___a___v4_prepared.csv").cache();

特征预处理

数据特征预处理步骤如下:

缺失值补充 ==> 区分字符型和数值型变量 ==> 字符型变量转为哑变量 ==> 数值类型变量归一化 => 数值型变量进行共线性检验 ==> 特征变量合并

区分字符型和数值型变量:

通过遍历找出字符类型变量和数值类型变量,同时去除label变量

1
2
3
4
5
6
7
8
9
10
11
String str = "StringType";
List<String> strColumns = new ArrayList<String>();
dataDf.schema().foreach(s->{
// 判断列属性
if(str.equals(s.dataType().toString())){
strColumns.add(s.name());
}
return strColumns;
});
List<String> numberColumns = Arrays.stream(dataDf.columns()).filter(item->!strColumns.contains(item)).collect(Collectors.toList());
numberColumns.remove("SCT"); //去除label变量

字符型变量进行OneHot编码转换:

字符类型变量转为哑变量的过程由两部构成:

(1)字符变量索引为对应的数值index,这里调用了Spark ML的String Indexer模块对字符变量进行索引,由于只能对单个元素进行索引,所以只能放在了循环中迭代处理,代码如下:

1
2
3
4
5
6
7
8
9
10
List<String> indexColumns = new ArrayList<>();
List<String> vocColumns = new ArrayList<>();
for (String strColumn : strColumns) {
String columnIndex = strColumn + "_index";
String columnVoc = strColumn + "_voc";
indexColumns.add(columnIndex);
vocColumns.add(columnVoc);
StringIndexer indexer = new StringIndexer().setInputCol(strColumn).setOutputCol(columnIndex);
dataDf = indexer.fit(dataDf).transform(dataDf);
}

(2)字符索引数值类型变量OneHot处理,这里是调用了Spark ML中的 OneHotEncoderEstimator 模块,与字符变量索引为数值变量不同的是,这次操作没有将dataframe数据落地,而是通过生成Model构建Pipeline中的stage,代码如下

1
OneHotEncoderModel oneHotModel = new OneHotEncoderEstimator().setInputCols(indexColumns.toArray(new String[0])).setOutputCols(vocColumns.toArray(new String[0])).fit(dataDf);

特征变量合并

将所有特征变量合并为feature变量,生成stage

1
VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(numberColumns.toArray(new String[0])).setOutputCol("features");

注:由于元数据的特殊性,所以这次回归模型构建没有进行共线性检验,同时所有的缺失数据预处理已经在Dataiku平台上完成。

算法初始化

Spark MLlib模块支持大部分的回归算法,这次预测模型训练选用了梯度提升回归树算法

将算法进行初始化,设定label变量和feature变量以及迭代次数,构建Pipeline的stage

1
GBTRegressor gbt  = new GBTRegressor().setLabelCol("SCT").setFeaturesCol("features").setMaxIter(10);

构建Pipeline处理器

将各个模块stage传入Pipeline中

1
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{oneHotModel,vectorAssembler,gbt});

Pipeline模型训练

将dataframe格式的元数据split为train data和test data进行模型训练

1
2
Dataset<Row>[] splitDf = dataDf.randomSplit(new double[]{0.8,0.2});
PipelineModel pipelineModel = pipeline.fit(splitDf[0]);

模型评估

计算模型R2 Score

1
2
3
4
Dataset<Row> prediction = pipelineModel.transform(splitDf[1]);
prediction.select("SCT","prediction","features").show(100);
RegressionEvaluator evaluator = new RegressionEvaluator().setLabelCol("SCT").setMetricName("r2").setPredictionCol("prediction");
System.out.println(evaluator.evaluate(prediction));

对使用Spark ML中Pipeline进行模型训练的一些总结

1、我一开始并没有使用Pipeline流进行模型训练,而是基于训练数据,不断处理dataframe,每一步都将dataframe数据落地,这样导致的后果就是代码复杂,而且模型训练时间很长,整个流程耗时达到了1分多钟,这对比Sklearn实在是糟心,在使用Pipeline流式处理后,流程耗时只有2s左右,速度提升很大,使用Pipeline的关键在于不要去手动操作dataframe数据,整个Pipeline中应该只有一个数据入口,所有操作都要写成stage,这样要求我们必须熟悉Spark MLlib模块的API,所有在写代码之前需要仔细研究API入口,没有文档就需要我们去源码查阅。

2、Pipeline一个好处在于,整个Pipeline Model是可以持久化的,这样我们训练好模型后,只要load模型,传入数据就可以获取预测值,不需要再次进行数据预处理,方便了模型的应用。

3最后是谈一下,Spark ML模块的一个缺点,Spark ML模块在Maven中的主要依赖包是

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>2.4.5</version>
</dependency>

但是当我在window环境中load持久化到本地的Pipeline模型时,会报包缺失的异常,当我直接submit jar包到Spark中是没问题的,这样带来的后果就是如果想将代码打包给别人用会有包缺失的异常,如果将所有依赖包都添加进来,这样jar会特别大,总而言之Spark ML不适合轻量级的单机版机器学习模型应用,如果使用JVM语言进行机器学习,可以使用smile平台。

nvidia docker配置GPU 深度学习镜像


使用目标

通过nvidia docker搭建在宿主机上搭建装有cuda,cudnn,opencv,darkent的基础镜像,从而便于后续基于GPU目标识别应用的容器化部署和管理

安装流程

安装nvidia docker

  • 卸载nvidia-docker和其他GPU容器

    1
    2
    docker volume ls -q -f driver=nvidia-docker | xargs -r -I{} -n1 docker ps -q -a -f volume={} | xargs -r docker rm -f
    sudo apt-get purge -y nvidia-docker
  • 添加package repositories

    1
    2
    3
    4
    5
    6
    curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | \
    sudo apt-key add -
    distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
    curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | \
    sudo tee /etc/apt/sources.list.d/nvidia-docker.list
    sudo apt-get update
  • 安装 nvidia-docker2

    1
    2
    sudo apt-get install -y nvidia-docker2
    sudo pkill -SIGHUP dockerd

安装cuda基础镜像

  • 安装指定版本cuda

    1
    nvidia-docker run --rm -ti nvidia/cuda:{cuda版本} nvcc --version

更新基础cuda镜像

更新cuda镜像的方式有很多,例如:通过DockerFile文件更新镜像,通过bash编辑镜像然后commit,我这里使用的方法是,先通过cuda镜像生成容器,通过编辑容器,将容器保存为新的镜像文件,这样的原因在于可以很方便的通过docker cap命令拷贝文件进入容器中

预先准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1、更换apt安装镜像源
sed -i 's#http://archive.ubuntu.com/#http://mirrors.tuna.tsinghua.edu.cn/#' /etc/apt/sources.list
2、更新apt镜像源,修复包,安装python3 pip
apt-get update -y --fix-missing && apt-get install -y python3-pip python3-dev libsm6 libxext6 libxrender-dev --fix-missing
3、安装依赖包
apt-get install -y \
wget \
unzip \
ffmpeg \
git

# Install cmake
apt-get install -y build-essential
apt-get install -y cmake

安装OpenCV

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1、下载opencv源码包,opencv源码包有1个多G,可以在gitee上clone源码,速度很快
git clone -b 3.4 https://gitee.com/mirrors/opencv.git
2、安装依赖包
sudo apt-get install libpng-dev
sudo apt-get install libjpeg-dev
sudo apt-get install libopenexr-dev
sudo apt-get install libtiff-dev
sudo apt-get install libwebp-dev
3、在OpenCV中创建build文件夹
mkdir build
cd build
4、配置和编译
cmake ../
make install

安装cudnn

1
2
3
4
5
6
1、下载与cuda版本对应的cudnn
2、将cudnn拷贝至容器指定目录下 docker cp {cudnn文件目录} {容器目录}
3、解压cudnn文件
4、sudo cp cuda/include/cudnn.h /usr/local/cuda/include
sudo cp cuda/lib64/libcudnn* /usr/local/cuda/lib64
sudo chmod a+r /usr/local/cuda/include/cudnn.h /usr/local/cuda/lib64/libcudnn*

编译darknet

1
2
3
4
5
6
7
8
1、下载ABdarknet源码
git clone https://github.com/AlexeyAB/darknet
2、修改Makefile文件
GPU=1
CUDNN=1
OPENCV=1
3、编译
make

根据基础镜像创建容器

1
nvidia-docker run -it name {容器名称} {镜像名称} {容器位置}

注意:1、一定要用nvidia-docker创建容器,否者会出现cudnn not found的问题

2、调用darknet时如果出现 error while loading shared libraries:libopencv_highgui.so.3.4: cannot open shared object file的问题,编辑 /etc/ld.so.conf文件,加上include /etc/ld.so.conf.d/*.con,然后执行ldconfig,在darknet目录下make clean后重新make

spacy+doccano 中文NLP流程

使用工具介绍

spacy是一个工业级python自然语言处理包,支持自然语言文本分析、命名实体识别、词性标注、依存句法分析等功能。spacy2.0之后通过引入结巴分词,添加了对中文NLP的支持,不过在使用spacy进行中文自然语言处理时有许多需要注意的地方。

doccano是一个在GitHub上开源的可视化实体标注工具,支持自定义实体标签,文本实体标注,导出标注数据为jsonl(JSON Lines文件,结构化数据,用于管道文件传输)。在中文NPL处理中doccano主要作用是提供中文NER训练参数。


doccano使用过程

在doccano上对语句进行命名实体标签标注

  • 自定义实体标签
  • 选择实体设定实体标签

将doccano上数据导出为json数据

  • export data为json格式
1
2
{"id": 1, "text": "The Hitchhiker's Guide to the Galaxy (sometimes referred to as HG2G, HHGTTGor H2G2) is a comedy science fiction series created by Douglas Adams. Originally a radio comedy broadcast on BBC Radio 4 in 1978, it was later adapted to other formats, including stage shows, novels, comic books, a 1981 TV series, a 1984 video game, and 2005 feature film.", "annotations": [], "meta": {}, "annotation_approver": null}

解析json数据为TRAIN_DATA 数据格式

  • 解析json格式数据
  • 抽取数据中的labels

在现有模型中训练TRAIN_DATA,添加实体类别,更新模型

  • 在模型中添加实体label
  • 循环训练model

对模型测试,查看模型训练效果,对于当前模型未能准确识别的实体,在doccano上更新标注

  • 与训练语句相同句法语句使用模型划分出实体label和实体名称
  • 对照找出当前模型划分错误之处
  • 在doccano调整训练语句

spacy的使用及安装过程

spacy在centos7环境下的安装

在使用pip安装spacy时,因为面对的是中文环境,spacy2.0以后通过引入jieba分词添加了对中文的支持,但是还是有较多的坑。

如果要使用中文模型,目前的安装环境是python3.6+spacy2.0.1,如果安装了最高版本的spacy2.1,在使用中文模型时会出现python re正则包出错的情况。

中文模型的下载地址:https://github.com/howl-anderson/Chinese_models_for_SpaCy/releases

  • 在使用中文模型时需要控制 msgpack-numpy==0.4.4.2 不然会出现 (TypeError:encoding)编码的错误

安装好spacy和下载好中文模型之后,就是在spacy中配置中文环境,如何配置中文环境参考https://www.jianshu.com/p/0dab70cb540e

配置好中文环境后,由于spacy使用的jieba分词模式是最普通的模式,遇到行业内专业词汇很难准确分词,这就需要我们使用jieba的自定义词典分词功能。

  • 使用自定义分词字典

先自定义一个TXT文档,用于标注自定义词块、词性、词频

1
2
3
4
5
6
7
8
9
拿铁 3 i
摩卡 3 i
全自动咖啡机 3 i
半自动咖啡机 3 i
美式(传统滴滤式)咖啡机 100 i
意式咖啡机 19 i
胶囊咖啡机 3 i
月房租 4 i
卡布奇洛 4 i

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import jieba
import jieba.posseg as pseg
import jieba.analyse as anls

doc = """我在江北新区研创园喝咖啡,喝的是卡布奇洛"""
doc01 = """有一台全自动咖啡机出现故障了"""
doc02 = """江北新区研创园的月房租是10000人民币"""
jieba.load_userdict("demo.txt")

# 全模式
seg_list = jieba.cut(doc)
seg_list01 = jieba.cut(doc01)
seg_list02 = jieba.cut(doc02)
print(type(seg_list))
print("/".join(seg_list))
print("/".join(seg_list01))
print("/".join(seg_list02))

效果如下:

1
2
3
我/在/江北/新区/研创园/喝咖啡/,/喝/的/是/卡布奇洛
有/一台/全自动咖啡机/出现/故障/了
江北/新区/研创园/的/月房租/是/10000/人民币
  • 因为当前目标是在文档中找出指定实体类型的实体,如找出实体类型为”法定代表人”的所有实体,在spacy中现有的实体类型并不能满足需求,所以这里需要使用命名实体标注和NER(命名实体识别),自定义实体标注的工作由doccano完成,在获取到训练数据后,需要将其训练至spacy的中文模型中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""
训练一个新的命名实体识别
"""
from __future__ import unicode_literals, print_function
# from nlp_app.spacy_demo.data_conversion import conversion
import plac
import random
from pathlib import Path
import spacy
from spacy.util import minibatch, compounding
import json

# 解析json数据
def conversion(path):
doc = open(path, "r")
train_data = []
try:
# all_text = doc.read()
lines = doc.readlines()
if len(lines) != 1:
for all_text in lines:
print(type(json.loads(all_text)))
all_text = json.loads(all_text)
text_content = all_text["text"]
text_entities = all_text["labels"]
entity_list = []
for entity in text_entities:
entity = tuple(entity)
entity_list.append(entity)
data = (text_content, {"entities": entity_list})
train_data.append(data)
finally:
doc.close()
return train_data


TRAIN_DATA = conversion("/home/hadoop/Documents/train_data.json1")
# 获取TRAIN_DATA 中的label
label_list = []
for data in TRAIN_DATA:
if len(data) == 2:
entities = data[1]["entities"]
for entity in entities:
label_list.append(entity[2])


@plac.annotations(
model=("Model name. Defaults to blank 'en' model.", "option", "m", str),
output_dir=("Optional output directory", "option", "o", Path),
n_iter=("Number of training iterations", "option", "n", int),
)
def main(model="zh_core_2.0.3", output_dir="/usr/local/lib/python3.6/dist-packages/spacy/data/zh_core_2.0.3",
n_iter=100):
random.seed(0)
if model is not None:
nlp = spacy.load(model)
print("Loaded model '%s'" % model)
else:
nlp = spacy.blank("en") # 选择语言创建模型
print("Created blank 'en' model")
if "ner" not in nlp.pipe_names:
ner = nlp.create_pipe("ner")
nlp.add_pipe(ner)
else:
ner = nlp.get_pipe("ner")
# 命名实体识别添加标签
for label in label_list:
ner.add_label(label)

move_names = list(ner.move_names)
other_pipes = [pipe for pipe in nlp.pipe_names if pipe != "ner"]
with nlp.disable_pipes(*other_pipes): # 训练模型
sizes = compounding(1.0, 4.0, 1.001)
for itn in range(n_iter):
random.shuffle(TRAIN_DATA)
batches = minibatch(TRAIN_DATA, size=sizes)
losses = {} # 训练损失
for batch in batches:
texts, annotations = zip(*batch)
nlp.update(texts, annotations, drop=0.35, losses=losses)
print("Losses", losses)
test_text = TRAIN_DATA[0][0]
# doc = nlp(test_text)
print("Entities in '%s'" % test_text)
# for ent in doc.ents:
# print(ent.label_, ent.text)

# 保存模型至路径
if output_dir is not None:
output_dir = Path(output_dir)
if not output_dir.exists():
output_dir.mkdir()
# nlp.meta["name"] = new_model_name
nlp.to_disk(output_dir)
print("Saved model to", output_dir)

# 测试模型效果
print("Loading from", output_dir)
nlp2 = spacy.load(output_dir)
assert nlp2.get_pipe("ner").move_names == move_names
doc2 = nlp2(test_text)
for ent in doc2.ents:
print(ent.label_, ent.text)


if __name__ == "__main__":
plac.call(main)

通过不断强化训练,将自定义实体与文本间句法关系写入中文模型中。在模型训练中有一个值得重视的问题就是深度学习的灾难性遗忘问题,而这个问题在中文模型中尤为明显,模型训练好之后,可以对与训练数据类似句法模板的文档进行命名实体识别,而且准确率较高,但是一旦在该模型基础上再一次训练数据,那么之前所训练的内容将被遗忘,也就是说所有模型训练都是一次性的,不可重复。

最近接触了知识图谱,并开始学习了Neo4j作为知识图谱的工具入门

什么是知识图谱?

从一个简单的例子入手:在Google搜索引擎里输入“Who is the wife of Bill Gates?”,我们直接可以得到答案-“Melinda Gates”。这是因为我们在系统层面上已经创建好了一个包含“Bill Gates”和“Melinda Gates”的实体以及他俩之间关系的知识库。所以,当我们执行搜索的时候,就可以通过关键词提取(”Bill Gates”, “Melinda Gates”, “wife”)以及知识库上的匹配可以直接获得最终的答案。这种搜索方式跟传统的搜索引擎是不一样的,一个传统的搜索引擎它返回的是网页、而不是最终的答案,所以就多了一层用户自己筛选并过滤信息的过程。

这就是一种KBQA模式的知识图谱应用,所以知识图谱本质上就是语义网络的知识库,从实际应用的角度出发可以认为知识图谱就是多关系图。在知识图谱中,我们通常使用”实体”来表示图里的节点、用”关系”表示图中边。

如何建立知识图谱

在建立知识图谱中我们选取的工具是Neo4j图形数据库,在建立知识图谱中最为关键的点在于:设定关系。有别于传统的关系型数据库,图数据库中一切基于关系。

图数据库和传统关系型数据库的区别

传统的关系型数据库在设计时都应该满足三范式


回顾一下三范式

第一范式:确保每列的原子性

第二范式:确保表中每个字段都和主键有关

第三范式:确保每列都和主键直接相关而不是间接相关


而在图数据库中每一个节点有自己的key-value属性,一群相同属性的节点就构成了label(标签),节点和节点之间通过关系相连接,relationship(关系)也有自己的key-value属性,同时在图数据中所有关系都是有向的。

将图数据库与传统关系型数据库比较可以认为,关系型数据库中的实体表就是图数据库中label,实体表中的每一条记录对应着图数据库中的一个节点,关系型数据库表与表之间的join关系就是图数据库中的relationship。

Neo4j的使用

将关系型数据库数据转移到Neo4j中

转移数据,我们使用的是Neo4j ETL 工具,它支持通过jdbc连接不同类型的数据库,将数据库中表导入Neo4j中,同时它可以自动判别关系型数据库中表与表的join关系,并在Neo4j中将其转化为relationship。

但是在使用Neo4j ETL 工具时有几点需要注意:

  • 千万别在Windows上操作!!! 由于Neo4j只支持以csv格式大批次导入数据,所以Neo4j ETL在本质上还是将关系型数据库中的表转为csv缓存在本地,然后在将csv导入Neo4j中,问题就出现在缓存这一步上,数据一旦在Windows上落地,Windows就会将UTF-8格式的数据转为UTF-8+BOM格式数据,也就是加了三个空字符,UTF-8+BOM的编码格式数据在导入Neo4j时会出现乱码。
  • Neo4j ETL在做表与表之间关系处理上,是不会辨别关系方向的,很多时候关系方向是乱的,而在图数据中关系方向很重要,所以不建议使用表与表之间的join在图数据库中建立关系,还是要手动建立关系,指明方向。

一些常用的cypher语句

删除关系,删除节点

MATCH (n:BC_Person)-[r]-() DELETE n,r

创建关系

match (a:LastSiteHangjiang),(b:PersonMessage) where a.lastType = “写字楼” create (a)-[r:is_contain]->(b) return r

查询关系

match p=(PersonMessage)-[r:is_contain]->() return p

使用pyspark计算一点和周围距离最近的前100点位置

业务场景

有一批地理位置存储在postgres数据库表中为表1,有另一张基础表覆盖了地区所有的地理位置为表2,现在需要计算出表1中所有位置最近的前100个地点,并进行排序,最终在数据库中生成一张新表,表中包含目标地点的所有topN最近点的hash

使用工具

  • pyspark
  • postgres

实现过程

由于数据量过大所以需要将数据放在spark上运行,使用spark的dataframe数据格式进行数据处理比较便捷(spark2.0已经使用dataframe替代rdd)

  • 通过spark jdbc连接数据库,执行SQL生成dataframe格式数据,其中关键点在于如何通过SQL计算出topN距离(一开始我准备通过for循环,在代码中遍历求值,后来发现效率太低,而且需要通过spark udf将方法注册进spark),通过SQL进行计算范围两个步骤(1)计算出距离(2)对距离进行排序,排序时需要显性表示降序还是升序(3)对数据进行筛选


​ 以下是SQL,其中txpop是基础地理信息表,test_by_zhangrui是目标地理信息表,由于不能使用group by聚合函数, —– 所以换了一种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
SELECT a.choose_site_hash,a.filtrate_site_hash,a.distance FROM
(SELECT
tmp2.hash1 as choose_site_hash,
txpop.grid_hash as filtrate__site_hash,
st_distance (tmp2.geo1 :: geography, txpop.geo :: geography) as distance,
ROW_NUMBER() OVER(PARTITION by tmp2.hash1 ORDER BY (st_distance (tmp2.geo1 :: geography, txpop.geo :: geography)) ASC) as n
FROM
(
SELECT
grid_hash AS hash1,
geo AS geo1
FROM
test_by_zhangrui
) tmp2,
txpop
) a
WHERE n<=10


1
2
3
4
5
6
7
8
9
10
11
12
13
14
def get_dataframe_jdbc(a):
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.master("local[*]") \
.getOrCreate()
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://*****") \
.option("dbtable", a) \
.option("user", "*****") \
.option("password", "*****") \
.load()
return jdbcDF

  • 执行SQL后,spark返回一个spark dataframe格式数据,我们需要将dataframe并入数据库做持久化
1
2
3
4
5
6
7
8
9
def write_dataframe_jdbc(table_name, result):
result.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://******") \
.option("dbtable", table_name) \
.option("user", "*******") \
.option("password", "*******") \
.save()
print("表生成完毕")
  • 最后我们可以在数据库中得到我们想要的表(目前spark所有配置都是默认配置没有优化,包括SQL也没有优化,待日后进行优化)

pyspark udf自定义函数

spark udf的定义:

UDF(User-defined functions, UDFs),即用户自定义函数,在Spark Sql的开发中十分常用,UDF对表中的每一行进行函数处理,返回新的值。

实际应用:

  • 定义一个函数

这是一个普通的判断函数

1
2
3
4
5
def get_bad_point(grid01_order_index, grid02_order_index, grid01_hash, grid02_hash):
if grid01_order_index < grid02_order_index:
return grid02_hash
elif grid02_order_index < grid01_order_index:
return grid01_hash
  • 将函数注册为spark udf

这里需要表明方法的返回值类型

1
get_bad_udf = udf(get_bad_point, StringType())
  • 结合spark SQL自带方法使用

向udf函数传入spark dataframe中列名作为参数

alias为udf返回的结果生成别名

distinct()是对dataframe中数据进行去重

1
2
3
unmatched_point_df = unmatched_distance_point.select(
get_bad_udf("grid01_order_index", "grid02_order_index", "grid01_hash",
"grid02_hash").alias("bad_point_hash")).distinct()

总结

pyspark的使用可以大大降低编写spark查询的代码量,结合udf函数的使用使我们操作spark SQL时不需要写大量的SQL脚本,由于spark SQL为我们封装了许多的操作函数,比如:过滤filter,两个dataframe之间取差集exceptAll 等等

所以我们编写spark 程序代码量不应该很多,如果代码量很多出现了大量的for循环,if判断等那么我们的代码就需要优化,原因就是所有的SQL都是循环+判断,所以可以用spark SQL就用spark SQL,最好不要手写循环判断。