Building a Hive UDF

What is a Hive UDF

A UDF (User-Defined Function) is a custom function that users write to extend Hive.

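Types of Hive UDFs

  • UDF: one to one. Operates on a single row and produces a single corresponding row.
  • UDAF: many to one. Operates on multiple rows and produces a single row.
  • UDTF: one to many. Operates on a single row and produces multiple rows.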
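
The three shapes above can be pictured with a small plain-Java analogy. This is only a sketch of the input/output cardinality and does not use any Hive API; the class name `UdfShapes` and its methods are hypothetical. The built-in Hive functions named in the comments (`upper`, `sum`, `explode`) are given as familiar examples of each shape.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Plain-Java analogy only: none of this is Hive API.
class UdfShapes {
    // UDF shape: one row in, one value out (compare Hive's upper(col)).
    static String udf(String row) {
        return row.toUpperCase(Locale.ROOT);
    }

    // UDAF shape: many rows in, one value out (compare Hive's sum(col)).
    static int udaf(List<Integer> rows) {
        int total = 0;
        for (int v : rows) {
            total += v;
        }
        return total;
    }

    // UDTF shape: one row in, many rows out (compare explode(split(col, ','))).
    static List<String> udtf(String row) {
        return Arrays.asList(row.split(","));
    }

    public static void main(String[] args) {
        System.out.println(udf("hive"));                  // HIVE
        System.out.println(udaf(Arrays.asList(1, 2, 3))); // 6
        System.out.println(udtf("a,b,c"));                // [a, b, c]
    }
}
```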

Implementing a UDF

Maven dependencies

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.2.1</version>
</dependency>

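Notes

  • The Hive version and the Hadoop version must be compatible with each other; otherwise JAR conflicts will occur.

  • The UDF implementation class should extend GenericUDF. Many tutorials extend UDF instead, but in Hive 3.1.2 the UDF class is already marked @Deprecated and is not recommended.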

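Implementation process

  • After extending GenericUDF, three methods must be overridden: initialize, evaluate, and getDisplayString.
  • initialize is the first method called on the UDF implementation class. It is mainly responsible for:
    • Argument validation: checking that the input argument types are as expected
    • Declaring the return type: returning an ObjectInspector that matches the expected output type
    • Storing state: assigning the fields that evaluate will use later
  • evaluate contains the actual logic and returns the expected result.
  • getDisplayString: similar to a toString method; it returns the text that describes the function call.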
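
The division of labor between the three methods can be sketched without any Hive dependency. The class below is a hypothetical stand-in (`ContainsSketch` is not a Hive class, and its method signatures are simplified assumptions, not the GenericUDF signatures); it only illustrates that validation and setup happen once in initialize, while evaluate runs the per-row logic.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, dependency-free stand-in for the GenericUDF contract.
class ContainsSketch {
    private boolean initialized = false;

    // Mirrors initialize(): validate the argument types once and
    // report the return type (here just a descriptive string).
    String initialize(Class<?>[] argTypes) {
        if (argTypes.length != 2) {
            throw new IllegalArgumentException("expects exactly two arguments");
        }
        if (!List.class.isAssignableFrom(argTypes[0]) || argTypes[1] != String.class) {
            throw new IllegalArgumentException("expects (list, string)");
        }
        initialized = true;
        return "boolean";
    }

    // Mirrors evaluate(): the per-row logic, relying on state set up earlier.
    boolean evaluate(List<String> list, String value) {
        if (!initialized) {
            throw new IllegalStateException("initialize must be called first");
        }
        return list.contains(value);
    }

    // Mirrors getDisplayString(): a readable description of the call.
    String getDisplayString(String[] args) {
        return "judge(" + String.join(", ", args) + ")";
    }

    public static void main(String[] args) {
        ContainsSketch udf = new ContainsSketch();
        System.out.println(udf.initialize(new Class<?>[]{List.class, String.class})); // boolean
        System.out.println(udf.evaluate(Arrays.asList("a", "b"), "a"));               // true
        System.out.println(udf.getDisplayString(new String[]{"arr", "'a'"}));         // judge(arr, 'a')
    }
}
```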

Implementation code

package org.example.util;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.NDV;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.BooleanWritable;

/**
 * A sample Hive UDF.
 * @author 张睿
 * @create 2020-07-15 10:16
 **/
@Description(name = "judge", value = "_FUNC_(array,value) - Returns TRUE if the array contains value",
        extended = "Example: SELECT _FUNC_(array('a','b'),'a')")
@NDV(maxNdv = 2)
public class ComplexUdf extends GenericUDF {
    // Inspector for the first argument (the list)
    private ListObjectInspector listOI;
    // Inspector for the elements of the list
    private StringObjectInspector listElementOI;
    // Inspector for the second argument (the value to look for)
    private StringObjectInspector elementOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Validate the number of arguments
        if (arguments.length != 2) {
            throw new UDFArgumentLengthException("the operator accepts two arguments");
        }
        // Validate the argument types
        ObjectInspector a = arguments[0];
        ObjectInspector b = arguments[1];
        if (!(a instanceof ListObjectInspector) || !(b instanceof StringObjectInspector)) {
            throw new UDFArgumentException("first argument must be a list / array, second argument must be a string");
        }
        this.listOI = (ListObjectInspector) a;
        this.elementOI = (StringObjectInspector) b;
        // Verify that the list is made up of strings
        if (!(listOI.getListElementObjectInspector() instanceof StringObjectInspector)) {
            throw new UDFArgumentException("first argument must be a list of strings");
        }
        this.listElementOI = (StringObjectInspector) listOI.getListElementObjectInspector();
        return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // Use the object inspectors to extract the list and the string from the deferred objects
        BooleanWritable result = new BooleanWritable(false);
        Object list = deferredObjects[0].get();
        Object arg = deferredObjects[1].get();
        // A NULL list or NULL value cannot match anything
        if (list == null || arg == null) {
            return result;
        }
        // Convert to Java strings so the comparison does not depend on the
        // underlying representation (Text, lazy string, ...)
        String target = elementOI.getPrimitiveJavaObject(arg);
        int elemNum = listOI.getListLength(list);
        for (int i = 0; i < elemNum; i++) {
            Object listElement = listOI.getListElement(list, i);
            String element = listElementOI.getPrimitiveJavaObject(listElement);
            if (target.equals(element)) {
                result.set(true);
                break;
            }
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] args) {
        return "if " + args[0] + " include " + args[1] + " return true";
    }
}

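Registering the UDF

  • Add the local JAR: add jar xxx; (xxx is the local path of the JAR)
  • Register a temporary function: create temporary function args01 as 'args02'; (args01: the function name, args02: the fully qualified name of the implementation class, for example create temporary function judge as 'org.example.util.ComplexUdf';)
  • Register a permanent function: create function args01 as 'args02'; (for a permanent function, Hive also accepts a using jar clause, which is typically pointed at a shared location such as HDFS so that other sessions can load the class)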