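Hive UDF: That's All There Is to It
Abstract: What is a Hive UDF? What is it for, how do you use it, and how does it work? Starting from how UDFs are used, this article walks briefly through the relevant source code and covers UDFs from scratch.
This article is shared from the Huawei Cloud community post 《Hive UDF,就這》, by 湯忒撒.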
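UDF (User-Defined Function): a user-defined function; one row in, one value out.
UDTF (User-Defined Table-Generating Function): a user-defined table-generating function; one input row produces multiple output rows.
UDAF (User-Defined Aggregation Function): a user-defined aggregate function; multiple input rows produce one output row.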
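The three shapes can be illustrated without any Hive dependency. The following is a plain-Java sketch only: the class and method names are hypothetical and are not Hive APIs. It shows one value in and one value out, one row in and several rows out, and several rows in and one value out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Plain-Java illustration only; none of these names are Hive APIs.
class FunctionShapes {

    // UDF shape: one input value produces one output value.
    static String udf(String value) {
        return value.toUpperCase(Locale.ROOT);
    }

    // UDTF shape: one input row produces several output rows.
    static List<String> udtf(String row) {
        return new ArrayList<>(Arrays.asList(row.split(",")));
    }

    // UDAF shape: several input rows collapse into one output value.
    static long udaf(List<Integer> rows) {
        long sum = 0;
        for (int value : rows) {
            sum += value;
        }
        return sum;
    }
}
```

In real Hive code each shape has its own base class and lifecycle; the sketch only captures the input/output cardinality.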
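1. Introduction to UDFs
A UDF can be written against either of two base classes:
- org.apache.hadoop.hive.ql.exec.UDF: handles and returns basic data types such as int, string, boolean, and double;
- org.apache.hadoop.hive.ql.udf.generic.GenericUDF: can also handle and return complex data types such as Map, List, and Array, including nested ones.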
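2. UDF-Related Syntax
2.1. Resource operations
Hive supports adding resources (files, jars, and archives) to a session. Once added, a resource can be referenced directly in SQL, and it is valid only for the current session. Paths are read from the local file system by default, other file systems such as HDFS are also supported, and paths are written without quotes. Example: add jar /opt/ht/AddUDF.jar;
Add resources:
    ADD { FILE[S] | JAR[S] | ARCHIVE[S] } <filepath1> [<filepath2>]*
List resources:
    LIST { FILE[S] | JAR[S] | ARCHIVE[S] } [<filepath1> <filepath2> ..]
Delete resources:
    DELETE { FILE[S] | JAR[S] | ARCHIVE[S] } [<filepath1> <filepath2> ..]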
2.2. Temporary functions
    CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
    DROP TEMPORARY FUNCTION [IF EXISTS] function_name;
2.3. Permanent functions
    CREATE FUNCTION [db_name.]function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
    DROP FUNCTION [IF EXISTS] function_name;
    RELOAD (FUNCTIONS|FUNCTION);
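2.4. Viewing functions
List all functions, without distinguishing temporary from permanent ones:
    show functions;
Fuzzy function search, here for functions whose names start with x:
    show functions like 'x*';
Show a function's description:
    desc function function_name;
Show a function's detailed description:
    desc function extended function_name;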
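3. The Description annotation
Hive defines the annotation type org.apache.hadoop.hive.ql.exec.Description, which supplies the text shown when desc function [extended] function_name is run. Built-in and user-defined functions use it in the same way.
    public @interface Description {
        // Short description of the function
        String value() default "_FUNC_ is undocumented";
        // Detailed usage notes for the function
        String extended() default "";
        // Function name
        String name() default "";
    }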
desc function ceil;
desc function extended ceil;
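To make the mechanism concrete, here is a minimal plain-Java sketch of how an annotation like this can be read back through reflection. FunctionDoc, UpperFunction, and DescribeSketch are hypothetical stand-ins, not Hive classes, and the "no documentation" message is invented for the sketch. The substitution of the _FUNC_ placeholder with the registered name mirrors what the built-in descriptions rely on.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in with the same three members as Description.
// RUNTIME retention is required for the annotation to be visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface FunctionDoc {
    String value() default "_FUNC_ is undocumented";
    String extended() default "";
    String name() default "";
}

@FunctionDoc(
    name = "my_upper",
    value = "_FUNC_(str) - returns str in upper case",
    extended = "Example:\n  > SELECT _FUNC_('hive');\n  HIVE")
class UpperFunction {}

class PlainFunction {}

class DescribeSketch {
    // Builds the text a "desc function [extended]" style command could print.
    static String describe(Class<?> functionClass, String registeredName, boolean extended) {
        FunctionDoc doc = functionClass.getAnnotation(FunctionDoc.class);
        if (doc == null) {
            // Message text is an assumption made for this sketch.
            return "No documentation found for " + registeredName;
        }
        String text = doc.value();
        if (extended && !doc.extended().isEmpty()) {
            text = text + "\n" + doc.extended();
        }
        // The placeholder is replaced by the name the function was registered under.
        return text.replace("_FUNC_", registeredName);
    }
}
```

Calling DescribeSketch.describe(UpperFunction.class, "my_upper", false) yields the short description with the placeholder filled in; passing true appends the extended text.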
4. UDF
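A class that extends UDF implements one or more evaluate methods, and Hive picks the overload that matches the argument types, for example:
    public Text evaluate(Text s)
    public int evaluate(Integer s)
    …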
4.1. UDF example
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.apache.hadoop.io.IntWritable;

    @Description(
        name = "my_plus",
        value = "my_plus() - if string, do concat; if integer, do plus",
        extended = "Example : \n >select my_plus('a', 'b');\n >ab\n >select my_plus(3, 5);\n >8"
    )
    public class AddUDF extends UDF {

        // String arguments: concatenate all of them.
        public String evaluate(String... parameters) {
            if (parameters == null || parameters.length == 0) {
                return null;
            }
            StringBuilder sb = new StringBuilder();
            for (String param : parameters) {
                sb.append(param);
            }
            return sb.toString();
        }

        // Integer arguments: add them up.
        public int evaluate(IntWritable... parameters) {
            if (parameters == null || parameters.length == 0) {
                return 0;
            }
            long sum = 0;
            for (IntWritable currentNum : parameters) {
                sum = Math.addExact(sum, currentNum.get());
            }
            // Math.toIntExact throws ArithmeticException on int overflow;
            // a plain (int) cast would truncate silently.
            return Math.toIntExact(sum);
        }
    }
hdfs dfs -put AddUDF.jar /tmp/ht/
create function my_plus as '' using jar 'hdfs:///tmp/ht/AddUDF.jar';
desc function my_plus;
desc function extended my_plus;
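The integer overload of AddUDF accumulates into a long and then narrows the total to an int. As a plain-Java aside (no Hive classes involved, and NarrowingSketch is a hypothetical name), the sketch below contrasts a plain cast, which wraps around silently, with Math.toIntExact, which throws when the total does not fit.

```java
// Plain-Java illustration of narrowing a long total back to int.
class NarrowingSketch {

    // Accumulates in a long, then narrows with a cast: overflow wraps silently.
    static int sumWithCast(int... values) {
        long sum = 0;
        for (int value : values) {
            sum += value;
        }
        return (int) sum;
    }

    // Same accumulation, but Math.toIntExact throws ArithmeticException
    // when the total is outside the int range.
    static int sumChecked(int... values) {
        long sum = 0;
        for (int value : values) {
            sum = Math.addExact(sum, value);
        }
        return Math.toIntExact(sum);
    }

    // Convenience wrapper so callers can test for overflow without try/catch.
    static boolean overflows(int... values) {
        try {
            sumChecked(values);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }
}
```

Whether a UDF should fail the query or return NULL on overflow is a design choice; the sketch only shows how the two narrowing styles differ.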
4.2. A brief look at the source
    public class UDF {
        // Resolver for the UDF's methods
        private UDFMethodResolver rslv;

        // The default constructor uses DefaultUDFMethodResolver
        public UDF() {
            rslv = new DefaultUDFMethodResolver(this.getClass());
        }

        protected UDF(UDFMethodResolver rslv) {
            this.rslv = rslv;
        }

        public void setResolver(UDFMethodResolver rslv) {
            this.rslv = rslv;
        }

        public UDFMethodResolver getResolver() {
            return rslv;
        }

        public String[] getRequiredJars() {
            return null;
        }

        public String[] getRequiredFiles() {
            return null;
        }
    }

    public class DefaultUDFMethodResolver implements UDFMethodResolver {
        // The class of the UDF.
        private final Class<? extends UDF> udfClass;

        public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
            this.udfClass = udfClass;
        }

        @Override
        public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
            return FunctionRegistry.getMethodInternal(udfClass, "evaluate", false, argClasses);
        }
    }
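The resolver above looks up a method named evaluate that fits the argument types. The idea can be sketched in plain Java with reflection. This is a simplified illustration under stated assumptions: PlusFunction and EvaluateResolverSketch are hypothetical names, and the sketch matches parameter types exactly, whereas the real lookup in FunctionRegistry works on Hive TypeInfo objects and is more lenient.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical function class with two evaluate overloads.
class PlusFunction {
    public String evaluate(String a, String b) { return a + b; }
    public Integer evaluate(Integer a, Integer b) { return a + b; }
}

class EvaluateResolverSketch {

    // Finds the public method named "evaluate" whose parameter types
    // equal argTypes exactly (no implicit conversions in this sketch).
    static Method resolve(Class<?> functionClass, Class<?>... argTypes) {
        for (Method method : functionClass.getMethods()) {
            if (method.getName().equals("evaluate")
                    && Arrays.equals(method.getParameterTypes(), argTypes)) {
                method.setAccessible(true);
                return method;
            }
        }
        throw new IllegalArgumentException(
            "No evaluate method for " + Arrays.toString(argTypes));
    }

    // Resolves by the runtime classes of the arguments, then invokes reflectively.
    static Object call(Object function, Object... args) {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        try {
            return resolve(function.getClass(), types).invoke(function, args);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

For example, EvaluateResolverSketch.call(new PlusFunction(), "a", "b") dispatches to the String overload, and call(new PlusFunction(), 3, 5) to the Integer one.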
5. GenericUDF
    // Initialization. An ObjectInspector wraps a data type and carries no actual
    // argument value; the method returns the result type.
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        return null;
    }

    // Each DeferredObject wraps the Writable object that holds an actual argument.
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        return null;
    }

    // Display string for the function
    public String getDisplayString(String[] strings) {
        return null;
    }
5.1. GenericUDF example
This custom function, my_count, supports the int and long types; despite its name, the code sums its arguments, as the examples in its description show. Hive has no long type; the corresponding type is bigint. The create function statement and the way the function is stored in the database are the same as for a UDF, so they are not repeated here.
    import com.google.common.base.Joiner;
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;

    @Description(
        name = "my_count",
        value = "my_count(...) - count int or long type numbers",
        extended = "Example :\n >select my_count(3, 5);\n >8\n >select my_count(3, 5, 25);\n >33"
    )
    public class MyCountUDF extends GenericUDF {

        // Primitive category of each argument, recorded in initialize()
        private PrimitiveObjectInspector.PrimitiveCategory[] inputType;
        private transient ObjectInspectorConverters.Converter intConverter;
        private transient ObjectInspectorConverters.Converter longConverter;

        @Override
        public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
            int length = objectInspectors.length;
            inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];
            for (int i = 0; i < length; i++) {
                ObjectInspector currentOI = objectInspectors[i];
                ObjectInspector.Category type = currentOI.getCategory();
                if (type != ObjectInspector.Category.PRIMITIVE) {
                    throw new UDFArgumentException(
                        "The function my_count need PRIMITIVE Category, but get " + type);
                }
                PrimitiveObjectInspector.PrimitiveCategory primitiveType =
                    ((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
                inputType[i] = primitiveType;
                // Create one converter per supported type, on first use
                switch (primitiveType) {
                    case INT:
                        if (intConverter == null) {
                            ObjectInspector intOI =
                                PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
                            intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
                        }
                        break;
                    case LONG:
                        if (longConverter == null) {
                            ObjectInspector longOI =
                                PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
                            longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
                        }
                        break;
                    default:
                        throw new UDFArgumentException(
                            "The function my_count need INT OR BIGINT, but get " + primitiveType);
                }
            }
            // The result is always a bigint
            return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
        }

        @Override
        public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
            LongWritable out = new LongWritable();
            for (int i = 0; i < deferredObjects.length; i++) {
                PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];
                Object param = deferredObjects[i].get();
                switch (type) {
                    case INT:
                        Object intObject = intConverter.convert(param);
                        out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));
                        break;
                    case LONG:
                        Object longObject = longConverter.convert(param);
                        out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));
                        break;
                    default:
                        throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);
                }
            }
            return out;
        }

        @Override
        public String getDisplayString(String[] strings) {
            return "my_count(" + Joiner.on(", ").join(strings) + ")";
        }
    }
create function my_count as '' using jar 'hdfs:///tmp/countUDF.jar';
create table test_numeric(i1 int, b1 bigint, b2 bigint, i2 int, i3 int);
insert into table test_numeric values(0, -10, 25, 300, 15), (11, 22, 33, 44, 55);
select *, my_count(*) from test_numeric;
5.2. A brief look at the source
    public interface ObjectInspector extends Cloneable {
        // Returns the type name
        String getTypeName();

        // Returns the category of the field type wrapped by this ObjectInspector
        ObjectInspector.Category getCategory();

        public static enum Category {
            PRIMITIVE, LIST, MAP, STRUCT, UNION;

            private Category() {
            }
        }
    }

    public static enum PrimitiveCategory {
        VOID, BOOLEAN, BYTE, SHORT, INT, LONG, …
    }
GenericUDF.initializeAndFoldConstants
    public ObjectInspector initializeAndFoldConstants(ObjectInspector[] arguments)
            throws UDFArgumentException {
        ObjectInspector oi = this.initialize(arguments);
        if (this.getRequiredFiles() == null && this.getRequiredJars() == null) {
            boolean allConstant = true;
            for (int ii = 0; ii < arguments.length; ++ii) {
                if (!ObjectInspectorUtils.isConstantObjectInspector(arguments[ii])) {
                    allConstant = false;
                    break;
                }
            }
            if (allConstant
                    && !ObjectInspectorUtils.isConstantObjectInspector(oi)
                    && FunctionRegistry.isConsistentWithinQuery(this)
                    && ObjectInspectorUtils.supportsConstantObjectInspector(oi)) {
                GenericUDF.DeferredObject[] argumentValues =
                    new GenericUDF.DeferredJavaObject[arguments.length];
                for (int ii = 0; ii < arguments.length; ++ii) {
                    argumentValues[ii] = new GenericUDF.DeferredJavaObject(
                        ((ConstantObjectInspector) arguments[ii]).getWritableConstantValue());
                }
                try {
                    Object constantValue = this.evaluate(argumentValues);
                    oi = ObjectInspectorUtils.getConstantObjectInspector(oi, constantValue);
                } catch (HiveException e) {
                    throw new UDFArgumentException(e);
                }
            }
            return oi;
        } else {
            return oi;
        }
    }
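The method above evaluates the function once at initialization time when every argument is a constant. A minimal plain-Java sketch of that idea follows. All names (FoldExpr, ConstantExpr, RowValueExpr, FoldingCall) are hypothetical, and the sketch leaves out the additional checks the real method performs, such as whether the function is consistent within a query.

```java
import java.util.function.Function;

// Hypothetical expression node; not a Hive interface.
interface FoldExpr {
    boolean isConstant();
    Object eval(Object row);
}

class ConstantExpr implements FoldExpr {
    private final Object value;
    ConstantExpr(Object value) { this.value = value; }
    public boolean isConstant() { return true; }
    public Object eval(Object row) { return value; }
}

// Stands in for a column reference; the "row" is a single value in this sketch.
class RowValueExpr implements FoldExpr {
    public boolean isConstant() { return false; }
    public Object eval(Object row) { return row; }
}

class FoldingCall implements FoldExpr {
    private final Function<Object[], Object> body;
    private final FoldExpr[] args;
    private final boolean folded;
    private Object foldedValue;
    int bodyInvocations = 0; // exposed so the effect of folding can be observed

    FoldingCall(Function<Object[], Object> body, FoldExpr... args) {
        this.body = body;
        this.args = args;
        boolean allConstant = true;
        for (FoldExpr arg : args) {
            if (!arg.isConstant()) {
                allConstant = false;
                break;
            }
        }
        this.folded = allConstant;
        if (allConstant) {
            // All arguments are constants: evaluate once, up front.
            this.foldedValue = invoke(null);
        }
    }

    // Example body: adds two Integer arguments.
    static Object plus(Object[] values) {
        return (Integer) values[0] + (Integer) values[1];
    }

    private Object invoke(Object row) {
        Object[] values = new Object[args.length];
        for (int i = 0; i < args.length; i++) {
            values[i] = args[i].eval(row);
        }
        bodyInvocations++;
        return body.apply(values);
    }

    public boolean isConstant() { return folded; }

    // Folded calls return the cached value; others run the body for each row.
    public Object eval(Object row) {
        return folded ? foldedValue : invoke(row);
    }
}
```

With two ConstantExpr arguments the body runs exactly once no matter how many rows are evaluated; replacing one of them with a RowValueExpr makes the body run once per row.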
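6. UDF-Related Source Code
6.1. Operators
In Hive SQL, operators such as +, -, *, /, and = are all UDFs, declared in FunctionRegistry. Every UDF is resolved at compile time, when the operator tree is generated from the AST. Calls whose arguments are all constants are evaluated immediately; other calls are only initialized, to obtain the output type needed to build the operator tree, and their result values are computed later, when the operator actually executes.
    static {
        HIVE_OPERATORS.addAll(Arrays.asList(
            "+", "-", "*", "/", "%", "div", "&", "|", "^", "~",
            "and", "or", "not", "!",
            "=", "==", "<=>", "!=", "<>", "<", "<=", ">", ">=",
            "index"));
    }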
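The point that operators are ordinary registered functions can be illustrated with a toy registry. This is a plain-Java sketch with hypothetical names (MiniRegistry and its methods); the real FunctionRegistry stores FunctionInfo objects and supports far more than binary long functions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BinaryOperator;

// Hypothetical miniature registry: operators and named functions share one map.
class MiniRegistry {
    private static final Map<String, BinaryOperator<Long>> FUNCTIONS = new HashMap<>();
    // Only marks which registered names are written as operators in SQL.
    private static final Set<String> OPERATORS = new HashSet<>(Arrays.asList("+", "-", "*"));

    static {
        FUNCTIONS.put("+", Math::addExact);
        FUNCTIONS.put("-", Math::subtractExact);
        FUNCTIONS.put("*", Math::multiplyExact);
        FUNCTIONS.put("greatest", Long::max);
    }

    static boolean isOperator(String name) {
        return OPERATORS.contains(name);
    }

    // Operators and named functions are looked up and invoked the same way.
    static long call(String name, long left, long right) {
        BinaryOperator<Long> function = FUNCTIONS.get(name);
        if (function == null) {
            throw new IllegalArgumentException("Unknown function: " + name);
        }
        return function.apply(left, right);
    }
}
```

Here MiniRegistry.call("+", 2, 3) and MiniRegistry.call("greatest", 2, 3) go through the same lookup path; only isOperator tells the two kinds of names apart.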
6.2. Function types
public static enum FunctionType { BUILTIN, PERSISTENT, TEMPORARY; }
6.3. FunctionRegistry
    public final class FunctionRegistry {
        …
        private static final Registry system = new Registry(true);

        static {
            system.registerGenericUDF("concat", GenericUDFConcat.class);
            system.registerUDF("substr", UDFSubstr.class, false);
            …
        }
        …
        public static void registerTemporaryMacro(
                String macroName, ExprNodeDesc body, List<String> colNames, List<TypeInfo> colTypes) {
            SessionState.getRegistryForWrite().registerMacro(macroName, body, colNames, colTypes);
        }

        public static FunctionInfo registerPermanentFunction(String functionName,
                String className, boolean registerToSession, FunctionResource[] resources) {
            return system.registerPermanentFunction(functionName, className, registerToSession, resources);
        }
        …
    }
6.4. GenericUDFBridge
    private FunctionInfo registerUDF(String functionName, FunctionType functionType,
            Class<? extends UDF> UDFClass, boolean isOperator, String displayName,
            FunctionResource... resources) {
        validateClass(UDFClass, UDF.class);
        FunctionInfo fI = new FunctionInfo(functionType, displayName,
            new GenericUDFBridge(displayName, isOperator, UDFClass.getName()), resources);
        addFunction(functionName, fI);
        return fI;
    }
    public GenericUDFBridge(String udfName, boolean isOperator, String udfClassName) {
        this.udfName = udfName;
        this.isOperator = isOperator;
        this.udfClassName = udfClassName;
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Instantiate the UDF object
        try {
            udf = (UDF) getUdfClassInternal().newInstance();
        } catch (Exception e) {
            throw new UDFArgumentException(
                "Unable to instantiate UDF implementation class " + udfClassName + ": " + e);
        }

        // Resolve for the method based on argument types
        ArrayList<TypeInfo> argumentTypeInfos = new ArrayList<TypeInfo>(arguments.length);
        for (ObjectInspector argument : arguments) {
            argumentTypeInfos.add(TypeInfoUtils.getTypeInfoFromObjectInspector(argument));
        }
        udfMethod = udf.getResolver().getEvalMethod(argumentTypeInfos);
        udfMethod.setAccessible(true);

        // Create parameter converters
        conversionHelper = new ConversionHelper(udfMethod, arguments);

        // Create the non-deferred realArgument
        realArguments = new Object[arguments.length];

        // Get the return ObjectInspector.
        ObjectInspector returnOI = ObjectInspectorFactory.getReflectionObjectInspector(
            udfMethod.getGenericReturnType(), ObjectInspectorOptions.JAVA);
        return returnOI;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        assert (arguments.length == realArguments.length);

        // Calculate all the arguments
        for (int i = 0; i < realArguments.length; i++) {
            realArguments[i] = arguments[i].get();
        }

        // Call the function: invoke the UDF class's evaluate method via reflection
        Object result = FunctionRegistry.invoke(udfMethod, udf,
            conversionHelper.convertIfNecessary(realArguments));

        // For non-generic UDF, type info isn't available. This poses a problem for Hive Decimal.
        // If the returned value is HiveDecimal, we assume maximum precision/scale.
        if (result != null && result instanceof HiveDecimalWritable) {
            result = HiveDecimalWritable.enforcePrecisionScale(
                (HiveDecimalWritable) result,
                HiveDecimal.SYSTEM_DEFAULT_PRECISION,
                HiveDecimal.SYSTEM_DEFAULT_SCALE);
        }
        return result;
    }
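The bridge pattern above, resolving the evaluate method once during initialization and then invoking it reflectively for each row, can be sketched in plain Java. The names TwoPhaseFunction, ConcatOrAdd, and ReflectionBridge are hypothetical, Java classes stand in for ObjectInspectors, and the argument conversion and decimal handling of the real class are omitted.

```java
import java.lang.reflect.Method;

// Hypothetical two-phase contract, loosely modeled on initialize/evaluate.
interface TwoPhaseFunction {
    Class<?> initialize(Class<?>[] argTypes); // returns the result type
    Object evaluate(Object[] args);
}

// Hypothetical simple function that only exposes evaluate overloads.
class ConcatOrAdd {
    public String evaluate(String a, String b) { return a + b; }
    public Long evaluate(Long a, Long b) { return Math.addExact(a, b); }
}

// Adapts a simple evaluate-style class to the two-phase contract.
class ReflectionBridge implements TwoPhaseFunction {
    private final Class<?> simpleClass;
    private Object instance;
    private Method method;

    ReflectionBridge(Class<?> simpleClass) {
        this.simpleClass = simpleClass;
    }

    // Phase 1: instantiate the wrapped class and pick the overload once.
    public Class<?> initialize(Class<?>[] argTypes) {
        try {
            instance = simpleClass.getDeclaredConstructor().newInstance();
            method = simpleClass.getMethod("evaluate", argTypes);
            method.setAccessible(true);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot bridge " + simpleClass.getName(), e);
        }
        return method.getReturnType();
    }

    // Phase 2: called per row; only the reflective invocation remains.
    public Object evaluate(Object[] args) {
        try {
            return method.invoke(instance, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Splitting the work this way keeps the comparatively expensive method lookup out of the per-row path, which is the same reason the real bridge stores udfMethod as a field.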
6.5. Function call entry points
1. At compile time, while the syntax tree is traversed and converted into operators.
2. When the constant propagation optimizer is enabled, during the tree traversal in ConstantPropagate.
3. When the UDF's arguments are not constants, at the point where the operator actually executes as the SQL runs according to plan.
When the operator actually executes, ExprNodeGenericFuncEvaluator._evaluate processes each row and computes the UDF result.
    @Override
    protected Object _evaluate(Object row, int version) throws HiveException {
        if (isConstant) {
            // The output of this UDF is constant, so don't even bother evaluating.
            return ((ConstantObjectInspector) outputOI).getWritableConstantValue();
        }
        rowObject = row;
        for (GenericUDF.DeferredObject deferredObject : childrenNeedingPrepare) {
            deferredObject.prepare(version);
        }
        return genericUDF.evaluate(deferredChildren);
    }
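In _evaluate, each argument is prepared for the current row and only computed when the function asks for it. A minimal plain-Java sketch of such a lazy argument follows. LazyArg and ShortCircuitSketch are hypothetical names, and the caching shown (compute at most once after each prepare) is an assumption about the general pattern rather than a copy of Hive's implementation.

```java
import java.util.function.Supplier;

// Hypothetical lazy argument: the value is computed only when get() is called,
// and at most once after each prepare().
class LazyArg {
    private final Supplier<Object> compute;
    private boolean evaluated = false;
    private int version = -1;
    private Object cached;
    int computations = 0; // exposed so laziness can be observed

    LazyArg(Supplier<Object> compute) {
        this.compute = compute;
    }

    // Called once per row: invalidates the value cached for the previous row.
    void prepare(int version) {
        this.version = version;
        this.evaluated = false;
    }

    int preparedVersion() {
        return version;
    }

    Object get() {
        if (!evaluated) {
            cached = compute.get();
            computations++;
            evaluated = true;
        }
        return cached;
    }
}

class ShortCircuitSketch {
    // Returns the first argument unless it is null;
    // the second argument is only computed when it is actually needed.
    static Object firstNonNull(LazyArg first, LazyArg second) {
        Object value = first.get();
        return value != null ? value : second.get();
    }
}
```

A function with short-circuit behavior benefits directly: when the first argument is non-null, the second argument's expression is never evaluated for that row.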