使用pyspark计算一点和周围距离最近的前100点位置
业务场景
有一批地理位置存储在postgres数据库表中为表1,有另一张基础表覆盖了地区所有的地理位置为表2,现在需要计算出表1中所有位置最近的前100个地点,并进行排序,最终在数据库中生成一张新表,表中包含目标地点的所有topN最近点的hash
使用工具
- pyspark
- postgres
实现过程
由于数据量过大所以需要将数据放在spark上运行,使用spark的dataframe数据格式进行数据处理比较便捷(spark2.0已经使用dataframe替代rdd)
通过spark jdbc连接数据库,执行SQL生成dataframe格式数据,其中关键点在于如何通过SQL计算出topN距离(一开始我准备通过for循环,在代码中遍历求值,后来发现效率太低,而且需要通过spark udf将方法注册进spark),通过SQL进行计算范围两个步骤(1)计算出距离(2)对距离进行排序,排序时需要显性表示降序还是升序(3)对数据进行筛选
以下是SQL,其中txpop是基础地理信息表,test_by_zhangrui是目标地理信息表,由于不能使用group by聚合函数, —– 所以换了一种方式:
1 | SELECT a.choose_site_hash,a.filtrate_site_hash,a.distance FROM |
- 注册spark (master参数表明spark是运行在本机还是集群上,以及配置多少个节点),通过spark jdbc驱动连接数据库,并执行SQL(详情可看官网:http://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)
1 | def get_dataframe_jdbc(a): |
- 执行SQL后,spark返回一个spark dataframe格式数据,我们需要将dataframe并入数据库做持久化
1 | def write_dataframe_jdbc(table_name, result): |
- 最后我们可以在数据库中得到我们想要的表(目前spark所有配置都是默认配置没有优化,包括SQL也没有优化,待日后进行优化)