0%

使用pyspark计算一点和周围距离最近的前100点位置

使用pyspark计算一点和周围距离最近的前100点位置

业务场景

有一批地理位置存储在postgres数据库表中为表1,有另一张基础表覆盖了地区所有的地理位置为表2,现在需要计算出表1中所有位置最近的前100个地点,并进行排序,最终在数据库中生成一张新表,表中包含目标地点的所有topN最近点的hash

使用工具

  • pyspark
  • postgres

实现过程

由于数据量过大所以需要将数据放在spark上运行,使用spark的dataframe数据格式进行数据处理比较便捷(spark2.0已经使用dataframe替代rdd)

  • 通过spark jdbc连接数据库,执行SQL生成dataframe格式数据,其中关键点在于如何通过SQL计算出topN距离(一开始我准备通过for循环,在代码中遍历求值,后来发现效率太低,而且需要通过spark udf将方法注册进spark),通过SQL进行计算范围两个步骤(1)计算出距离(2)对距离进行排序,排序时需要显性表示降序还是升序(3)对数据进行筛选


​ 以下是SQL,其中txpop是基础地理信息表,test_by_zhangrui是目标地理信息表,由于不能使用group by聚合函数, —– 所以换了一种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
SELECT a.choose_site_hash,a.filtrate_site_hash,a.distance FROM
(SELECT
tmp2.hash1 as choose_site_hash,
txpop.grid_hash as filtrate__site_hash,
st_distance (tmp2.geo1 :: geography, txpop.geo :: geography) as distance,
ROW_NUMBER() OVER(PARTITION by tmp2.hash1 ORDER BY (st_distance (tmp2.geo1 :: geography, txpop.geo :: geography)) ASC) as n
FROM
(
SELECT
grid_hash AS hash1,
geo AS geo1
FROM
test_by_zhangrui
) tmp2,
txpop
) a
WHERE n<=10


1
2
3
4
5
6
7
8
9
10
11
12
13
14
def get_dataframe_jdbc(a):
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.master("local[*]") \
.getOrCreate()
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://*****") \
.option("dbtable", a) \
.option("user", "*****") \
.option("password", "*****") \
.load()
return jdbcDF

  • 执行SQL后,spark返回一个spark dataframe格式数据,我们需要将dataframe并入数据库做持久化
1
2
3
4
5
6
7
8
9
def write_dataframe_jdbc(table_name, result):
result.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://******") \
.option("dbtable", table_name) \
.option("user", "*******") \
.option("password", "*******") \
.save()
print("表生成完毕")
  • 最后我们可以在数据库中得到我们想要的表(目前spark所有配置都是默认配置没有优化,包括SQL也没有优化,待日后进行优化)