Building Hive UDFs
What Is a Hive UDF
A UDF (User-Defined Function) is a function written by the user to extend Hive's built-in functions.
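Types of Hive UDFs
- UDF: one to one. Operates on a single row and produces a single corresponding row.
- UDAF: many to one. Operates on multiple rows and produces a single row.
- UDTF: one to many. Operates on a single row and produces multiple rows.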
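The three shapes differ only in how many input rows map to how many output rows. The plain-Java analogy below makes that concrete without any Hive classes; `UdfKinds`, `udf`, `udaf` and `udtf` are hypothetical names used for illustration, not Hive's API (real implementations extend Hive classes such as GenericUDF).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java analogy (no Hive classes) of the three function shapes:
 * UDF = one row -> one value, UDAF = many rows -> one value,
 * UDTF = one row -> many rows. All names here are hypothetical.
 */
public class UdfKinds {

    // UDF-like: each input value yields exactly one output value
    static String udf(String row) {
        return row.toUpperCase();
    }

    // UDAF-like: all input rows are folded into a single result
    static long udaf(List<String> rows) {
        long total = 0;
        for (String row : rows) {
            total += row.length();
        }
        return total;
    }

    // UDTF-like: one input row expands into zero or more output rows
    static List<String> udtf(String row) {
        return Arrays.asList(row.split(","));
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a,b", "c");

        List<String> mapped = new ArrayList<>();
        for (String row : rows) {
            mapped.add(udf(row));          // 2 rows in, 2 values out
        }
        long aggregated = udaf(rows);      // 2 rows in, 1 value out
        List<String> exploded = new ArrayList<>();
        for (String row : rows) {
            exploded.addAll(udtf(row));    // 2 rows in, 3 rows out
        }

        System.out.println(mapped);        // [A,B, C]
        System.out.println(aggregated);    // 4
        System.out.println(exploded);      // [a, b, c]
    }
}
```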
Implementing a UDF
Maven dependencies
```xml
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.2.1</version>
</dependency>
```
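Key points
Implementation steps
- A class that extends GenericUDF must override three methods: initialize, evaluate and getDisplayString.
- initialize is called first, once per UDF instance and before any row is processed. It is responsible for:
  - Argument validation: checking that the input arguments have the expected number and types.
  - Declaring the return type: returning an ObjectInspector that matches the expected output type.
  - Storing state: saving values in fields (typically the argument ObjectInspectors) for later use by evaluate.
- evaluate implements the actual logic; it is called once per row and returns the result for that row.
- getDisplayString works like toString: it returns the text Hive shows for the function call, for example in EXPLAIN output.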
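The call order described above can be modeled without any Hive dependency. The following is a minimal, Hive-free sketch: `Deferred`, `MiniGenericUdf`, `StrLen` and `run` are hypothetical stand-ins for `GenericUDF.DeferredObject`, `GenericUDF` and Hive's execution engine, and argument types are plain strings instead of ObjectInspectors. It only illustrates the contract: initialize runs once and validates arguments, evaluate runs once per row, and getDisplayString renders the call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hive-free model of the GenericUDF call order. Deferred, MiniGenericUdf,
 * StrLen and run() are hypothetical names for illustration, not Hive's API.
 */
public class UdfLifecycle {

    // Hypothetical stand-in for GenericUDF.DeferredObject: value fetched on demand
    interface Deferred {
        Object get();
    }

    // Hypothetical stand-in for GenericUDF; argument types are plain strings here
    abstract static class MiniGenericUdf {
        abstract String initialize(String[] argumentTypes); // called once
        abstract Object evaluate(Deferred[] arguments);     // called once per row
        abstract String getDisplayString(String[] children);
    }

    // Example function: length of a string argument
    static class StrLen extends MiniGenericUdf {
        int initCalls = 0;

        @Override
        String initialize(String[] argumentTypes) {
            initCalls++;
            // Validate arguments before any row is processed
            if (argumentTypes.length != 1 || !"string".equals(argumentTypes[0])) {
                throw new IllegalArgumentException("strlen expects one string argument");
            }
            return "int"; // declared return type
        }

        @Override
        Object evaluate(Deferred[] arguments) {
            Object value = arguments[0].get();
            return value == null ? null : ((String) value).length();
        }

        @Override
        String getDisplayString(String[] children) {
            return "strlen(" + children[0] + ")";
        }
    }

    // Hypothetical driver: initialize once, then evaluate row by row
    static List<Object> run(MiniGenericUdf udf, String[] argumentTypes, List<String> rows) {
        udf.initialize(argumentTypes);
        List<Object> results = new ArrayList<>();
        for (String row : rows) {
            results.add(udf.evaluate(new Deferred[] {() -> row}));
        }
        return results;
    }

    public static void main(String[] args) {
        StrLen udf = new StrLen();
        List<Object> results = run(udf, new String[] {"string"}, Arrays.asList("hive", null, "udf"));
        System.out.println(results);                                     // [4, null, 3]
        System.out.println(udf.initCalls);                               // 1
        System.out.println(udf.getDisplayString(new String[] {"col1"})); // strlen(col1)
    }
}
```

Keeping validation in initialize means a bad call fails once, at query compile time in Hive's case, instead of on every row.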
Implementation code
```java
package org.example.util;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.NDV;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.BooleanWritable;

/**
 * Hive UDF example: returns TRUE if an array of strings contains a given string.
 * @author 张睿
 * @create 2020-07-15 10:16
 **/
@Description(name = "judge",
        value = "_FUNC_(array,value) - Returns TRUE if the array contains value",
        extended = "Example: SELECT _FUNC_(array('a','b'),'a')")
@NDV(maxNdv = 2)
public class ComplexUdf extends GenericUDF {

    // Set up in initialize(); transient so they are not serialized with the query plan
    private transient ListObjectInspector listOI;
    private transient StringObjectInspector listElementOI;
    private transient StringObjectInspector elementOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Check the number of arguments
        if (arguments.length != 2) {
            throw new UDFArgumentLengthException("the operator accepts two arguments");
        }
        // Check the argument types
        ObjectInspector a = arguments[0];
        ObjectInspector b = arguments[1];
        if (!(a instanceof ListObjectInspector) || !(b instanceof StringObjectInspector)) {
            throw new UDFArgumentException("first argument must be a list / array, second argument must be a string");
        }
        this.listOI = (ListObjectInspector) a;
        this.elementOI = (StringObjectInspector) b;
        // Check that the list is made of strings
        if (!(listOI.getListElementObjectInspector() instanceof StringObjectInspector)) {
            throw new UDFArgumentException("first argument must be a list of strings");
        }
        this.listElementOI = (StringObjectInspector) listOI.getListElementObjectInspector();
        // The function returns a BOOLEAN
        return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        BooleanWritable result = new BooleanWritable(false);
        // Use the ObjectInspectors to read the list and the string from the deferred objects
        Object list = deferredObjects[0].get();
        Object arg = deferredObjects[1].get();
        if (list == null || arg == null) {
            return result; // NULL array or NULL value: treated as "not contained"
        }
        String value = elementOI.getPrimitiveJavaObject(arg);
        int elemNum = listOI.getListLength(list);
        for (int i = 0; i < elemNum; i++) {
            Object listElement = listOI.getListElement(list, i);
            // Compare as Java Strings; the raw objects may be Text, LazyString, etc.
            String element = listElementOI.getPrimitiveJavaObject(listElement);
            if (value.equals(element)) {
                result.set(true);
                break;
            }
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] args) {
        return "judge(" + args[0] + ", " + args[1] + ")";
    }
}
```
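Registering the UDF
- Add the local jar to the session: add jar xxx; (xxx is the local path of the jar)
- Register a temporary function: create temporary function args01 as 'args02'; (args01: the function name used in SQL, args02: the fully qualified name of the implementing class)
- Register a permanent function: create function args01 as 'args02'; (add jar only affects the current session, so a permanent function usually also names its jar with USING JAR '<jar path>', typically a path on HDFS)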
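The same statements can also be issued from a Java program through HiveServer2's JDBC interface. This is a minimal sketch under assumptions: the Hive JDBC driver (hive-jdbc) must be on the classpath when `register` actually runs, you supply a JDBC URL such as `jdbc:hive2://<host>:10000/default`, and `RegisterUdf`, `registrationStatements` and the jar path `/path/to/udf.jar` are hypothetical placeholders. The jar path in ADD JAR is generally resolved on the Hive server side, not on the client machine.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch: run the UDF registration statements over JDBC.
 * RegisterUdf, registrationStatements and the jar path are hypothetical.
 */
public class RegisterUdf {

    // Builds the registration SQL; 'temporary' selects session-scoped registration
    static List<String> registrationStatements(String jarPath, String functionName,
                                               String className, boolean temporary) {
        List<String> sql = new ArrayList<>();
        if (temporary) {
            sql.add("ADD JAR " + jarPath);
            sql.add("CREATE TEMPORARY FUNCTION " + functionName + " AS '" + className + "'");
        } else {
            // A permanent function names its jar directly (typically on HDFS)
            sql.add("CREATE FUNCTION " + functionName + " AS '" + className
                    + "' USING JAR '" + jarPath + "'");
        }
        return sql;
    }

    // Executes each statement on an open connection
    static void register(Connection conn, List<String> statements) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            for (String s : statements) {
                stmt.execute(s);
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        List<String> sql = registrationStatements(
                "/path/to/udf.jar", "judge", "org.example.util.ComplexUdf", true);
        sql.forEach(System.out::println);
        if (args.length > 0) { // args[0]: a JDBC URL, e.g. jdbc:hive2://<host>:10000/default
            try (Connection conn = DriverManager.getConnection(args[0])) {
                register(conn, sql);
            }
        }
    }
}
```

Run it without arguments to print the statements only; pass a JDBC URL as the first argument to execute them. After a temporary registration, the function can be called in the same session, e.g. SELECT judge(array('a','b'), 'a');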