0%

pyspark udf自定义函数

pyspark udf自定义函数

spark udf的定义:

UDF(User-defined functions, UDFs),即用户自定义函数,在Spark Sql的开发中十分常用,UDF对表中的每一行进行函数处理,返回新的值。

实际应用:

  • 定义一个函数

这是一个普通的判断函数

1
2
3
4
5
def get_bad_point(grid01_order_index, grid02_order_index, grid01_hash, grid02_hash):
if grid01_order_index < grid02_order_index:
return grid02_hash
elif grid02_order_index < grid01_order_index:
return grid01_hash
  • 将函数注册为spark udf

这里需要表明方法的返回值类型

1
get_bad_udf = udf(get_bad_point, StringType())
  • 结合spark SQL自带方法使用

向udf函数传入spark dataframe中列名作为参数

alias为udf返回的结果生成别名

distinct()是对dataframe中数据进行去重

1
2
3
unmatched_point_df = unmatched_distance_point.select(
get_bad_udf("grid01_order_index", "grid02_order_index", "grid01_hash",
"grid02_hash").alias("bad_point_hash")).distinct()

总结

pyspark的使用可以大大降低编写spark查询的代码量,结合udf函数的使用使我们操作spark SQL时不需要写大量的SQL脚本,由于spark SQL为我们封装了许多的操作函数,比如:过滤filter,两个dataframe之间取差集exceptAll 等等

所以我们编写spark 程序代码量不应该很多,如果代码量很多出现了大量的for循环,if判断等那么我们的代码就需要优化,原因就是所有的SQL都是循环+判断,所以可以用spark SQL就用spark SQL,最好不要手写循环判断。