Advanced Hive – Generic Functions

  • December 25, 2019
  • Notes

Topics of this post: 1 – Why generic functions (Generic Functions) are necessary; 2 – A simple generic function example; 3 – Global functions

1 – Why Generic Functions Are Necessary

A generic function (GenericUDF) exists to handle the case where argument types vary at runtime, something a standard function with fixed signatures cannot match one by one. Take the example of checking whether a value is NULL and substituting a default: a program cannot realistically repeat that check for every possible type, because it would have to rewrite the same method over and over. Generic functions solve this problem cleanly.
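To make the limitation concrete, here is a minimal sketch, not taken from this post, of what an NVL-style helper looks like with the older plain UDF API (the PlainNvl class and its package are made up for illustration): every supported type needs its own evaluate() overload.

package hive.function.plain;    // hypothetical package, for illustration only

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// With the plain UDF API, each argument type needs its own evaluate() overload,
// so an NVL-style helper has to be rewritten for every type it should support.
public class PlainNvl extends UDF {

    public Text evaluate(Text value, Text defaultValue) {
        return value == null ? defaultValue : value;
    }

    public IntWritable evaluate(IntWritable value, IntWritable defaultValue) {
        return value == null ? defaultValue : value;
    }

    // ...and there is still no overload for DOUBLE, DECIMAL, ARRAY, MAP, STRUCT, and so on.
}

A GenericUDF instead works with ObjectInspectors supplied at query compile time, so a single implementation can serve any argument type, which is exactly what the example below does.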

2 – A Simple Generic Function Example

package hive.function.generic;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

@Description(
        name     = "nvl",
        value    = "_FUNC_(value,default_value) - Returns default value " +
                   "if value is null else returns value",
        extended = "Example:\n" +
                   "  > SELECT _FUNC_(null,'bla') FROM src LIMIT 1;\n")
public class genericNvl extends GenericUDF {

    // Resolves a common return type that both arguments can be converted to.
    private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
    private ObjectInspector[] argumentOIs;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        argumentOIs = arguments;
        if (arguments.length != 2) {
            throw new UDFArgumentLengthException(
                    "The operator 'NVL' accepts 2 arguments.");
        }

        returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
        if (!(returnOIResolver.update(arguments[0]) && returnOIResolver.update(arguments[1]))) {
            throw new UDFArgumentTypeException(
                    2, "The 1st and 2nd args of function NVL should have the same type, " +
                    "but they are different: \"" + arguments[0].getTypeName() +
                    "\" and \"" + arguments[1].getTypeName() + "\"");
        }

        return returnOIResolver.get();
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object retVal = returnOIResolver.convertIfNecessary(arguments[0].get(), argumentOIs[0]);
        if (retVal == null) {
            retVal = returnOIResolver.convertIfNecessary(arguments[1].get(), argumentOIs[1]);
        }
        return retVal;
    }

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("if ");
        sb.append(children[0]);
        sb.append(" is null ");
        sb.append(" returns ");
        sb.append(children[1]);
        return sb.toString();
    }
}

The job of returnOIResolver.update is to determine whether the two arguments can be converted to a common type:

/**
 * Update returnObjectInspector and valueInspectorsAreTheSame based on the
 * ObjectInspector seen.
 *
 * @return false if there is a type mismatch
 */
private boolean update(ObjectInspector oi, boolean isUnionAll) throws UDFArgumentTypeException {
    if (oi instanceof VoidObjectInspector) {
        return true;
    }

    if (returnObjectInspector == null) {
        // The first argument, just set the return to be the standard
        // writable version of this OI.
        returnObjectInspector = ObjectInspectorUtils.getStandardObjectInspector(oi,
                ObjectInspectorCopyOption.WRITABLE);
        return true;
    }

    if (returnObjectInspector == oi) {
        // The new ObjectInspector is the same as the old one, directly return true
        return true;
    }

    TypeInfo oiTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(oi);
    TypeInfo rTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(returnObjectInspector);
    if (oiTypeInfo == rTypeInfo) {
        // Convert everything to writable, if types of arguments are the same,
        // but ObjectInspectors are different.
        returnObjectInspector = ObjectInspectorUtils.getStandardObjectInspector(returnObjectInspector,
                ObjectInspectorCopyOption.WRITABLE);
        return true;
    }

    if (!allowTypeConversion) {
        return false;
    }

    // Types are different, we need to check whether we can convert them to
    // a common base class or not.
    TypeInfo commonTypeInfo = null;
    if (isUnionAll) {
        commonTypeInfo = FunctionRegistry.getCommonClassForUnionAll(rTypeInfo, oiTypeInfo);
    } else {
        commonTypeInfo = FunctionRegistry.getCommonClass(oiTypeInfo, rTypeInfo);
    }
    if (commonTypeInfo == null) {
        return false;
    }

    commonTypeInfo = updateCommonTypeForDecimal(commonTypeInfo, oiTypeInfo, rTypeInfo);

    returnObjectInspector = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(commonTypeInfo);

    return true;
}
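As a rough illustration of what the resolver does, here is a hedged standalone sketch, not part of the original code (the ResolverDemo class, its package, and the main method are invented for illustration). Feeding it a DOUBLE inspector and then a STRING inspector mirrors the nvl(salary, "end") call shown later, and the resolved common type should be string, matching that example's output:

package hive.function.generic.demo;    // hypothetical package, for illustration only

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class ResolverDemo {
    public static void main(String[] args) throws UDFArgumentTypeException {
        // true = allow implicit type conversion between the arguments
        GenericUDFUtils.ReturnObjectInspectorResolver resolver =
                new GenericUDFUtils.ReturnObjectInspectorResolver(true);

        // First argument: DOUBLE (like the salary column).
        resolver.update(PrimitiveObjectInspectorFactory.javaDoubleObjectInspector);
        // Second argument: STRING (like the literal "end").
        resolver.update(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        // The resolver settles on a return type both arguments can be converted to.
        ObjectInspector common = resolver.get();
        System.out.println(common.getTypeName());   // expected: string
    }
}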

Besides initialize, a GenericUDF subclass must also override two other methods, evaluate and getDisplayString.

3 – Global Functions

When adding a temporary custom function, reference the fully qualified name of the class defined inside the jar, not the name of the jar itself, as the session below shows:

hive> add jar /home/SparkAdmin/HiveFunctions/Nvl.jar
    > ;
Added [/home/SparkAdmin/HiveFunctions/Nvl.jar] to class path
Added resources: [/home/SparkAdmin/HiveFunctions/Nvl.jar]
hive> create temporary function NullReplace as 'hive.function.generic.Nvl' ;
FAILED: Class hive.function.generic.Nvl not found
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.FunctionTask
hive> create temporary function NullReplace as 'hive.function.generic.genericNvl' ;
OK

3.1 – Using the generic function

Initialize some rows containing NULL values:

hive> insert into default.employee(name,salary,subordinates,deductions,address)
    > select null,null,subordinates,deductions,address from default.employee
    > limit 10 ;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = SparkAdmin_20181124142056_7af103f3-95de-4d42-9b64-77337ad06734
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
2018-11-24 14:20:59,351 Stage-1 map = 100%,  reduce = 0%
2018-11-24 14:21:00,368 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_local362424371_0001
Loading data to table default.employee
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 50910 HDFS Write: 298 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 4.982 seconds
hive> select * from default.employee ;
OK
ali    320.0   ["ali","acai","ayun"]   {"ali":1,"acai":2,"ayun":7} {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
liton    345.0   ["liton","acai","ayun"] {"liton":1,"acai":2,"ayun":7}   {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
tencent    543.0   ["tencent","acai","ayun"]   {"tencent":1,"acai":2,"ayun":7} {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
NULL    NULL    ["tencent","acai","ayun"]   {"tencent":1,"acai":2,"ayun":7} {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
NULL    NULL    ["liton","acai","ayun"] {"liton":1,"acai":2,"ayun":7}   {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
NULL    NULL    ["ali","acai","ayun"]   {"ali":1,"acai":2,"ayun":7} {"street":"zhejiang","city":"hangzhou","state":"hubin","zip":"201210"}
Time taken: 0.115 seconds, Fetched: 6 row(s)
hive>

Replacing the NULLs:

hive> select nullreplace(salary,0) as salary from default.employee ;
OK
320.0
345.0
543.0
0.0
0.0
0.0
Time taken: 0.109 seconds, Fetched: 6 row(s)

Even when the two arguments are nominally of different types, they are still converted to a common one:

hive> select nullreplace(salary,"end") as salary from default.employee ;
OK
320.0
345.0
543.0
end
end
end
Time taken: 0.1 seconds, Fetched: 6 row(s)
hive>

But if no implicit conversion exists between the two types, as it does between numbers and strings, the call fails:

hive> select nullreplace(salary,array("em","bm","fm")) as salary from default.employee ;
FAILED: NullPointerException null
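Judging from the update() source shown earlier, this rejection is what one would expect: FunctionRegistry.getCommonClass finds no common type for DOUBLE and ARRAY<STRING>, so update() returns false and initialize() refuses the arguments (the exact NullPointerException reported here may come from elsewhere in the call path). A hedged sketch of that check follows; the demo class is invented for illustration and the outcome is inferred from the code above, not re-verified:

package hive.function.generic.demo;    // hypothetical package, for illustration only

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class IncompatibleTypesDemo {
    public static void main(String[] args) throws UDFArgumentTypeException {
        GenericUDFUtils.ReturnObjectInspectorResolver resolver =
                new GenericUDFUtils.ReturnObjectInspectorResolver(true);

        // First argument: DOUBLE (the salary column).
        resolver.update(PrimitiveObjectInspectorFactory.javaDoubleObjectInspector);

        // Second argument: ARRAY<STRING> (like array("em","bm","fm")).
        boolean compatible = resolver.update(
                ObjectInspectorFactory.getStandardListObjectInspector(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector));

        // Expected: false -- no common type exists, so genericNvl.initialize()
        // would reject the call with a UDFArgumentTypeException.
        System.out.println(compatible);
    }
}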

3.2 – Making the function available in all sessions

A custom function registered this way is temporary: once the current session is closed or a new one is opened, the function can no longer be called.

hive> select nullreplace(name,"end") as name from default.name ;
FAILED: SemanticException [Error 10011]: Invalid function nullreplace

The simplest, most direct way to make a custom function callable from every session is to configure the ~/.hiverc (runtime configuration) file, so that the function is defined the moment a session starts.

Edit the ~/.hiverc file:
[SparkAdmin@centos00 bin]$ vi ~/.hiverc
add jar /home/SparkAdmin/HiveFunctions/Nvl.jar;
create temporary function NullReplace as 'hive.function.generic.genericNvl';
Creating a global function with CREATE FUNCTION

In a large project the .hiverc approach complicates deployment, so newer versions of Hive let you use CREATE FUNCTION directly to give a custom function a global lifetime. Under the hood, the function definition is stored in the metastore.

hive> create function nullreplace2 as 'hive.function.generic.genericNvl' using jar '/home/SparkAdmin/HiveFunctions/Nvl.jar' ;
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.FunctionTask. Hive warehouse is non-local, but /home/SparkAdmin/HiveFunctions/Nvl.jar specifies file on local filesystem. Resources on non-local warehouse should specify a non-local scheme/path
hive>

The solution is to copy the jar onto HDFS first:

[SparkAdmin@centos00 conf]$ hdfs dfs -copyFromLocal /home/SparkAdmin/HiveFunctions/Nvl.jar /user/hive/warehouse
[SparkAdmin@centos00 conf]$ hdfs dfs -ls /user/hive/warehouse
Found 5 items
-rw-r--r--   3 SparkAdmin supergroup       1798 2018-11-24 20:41 /user/hive/warehouse/Nvl.jar
drwxr-xr-x   - SparkAdmin supergroup          0 2018-11-05 22:04 /user/hive/warehouse/account
drwxr-xr-x   - SparkAdmin supergroup          0 2018-11-09 23:03 /user/hive/warehouse/crm.db
drwxr-xr-x   - SparkAdmin supergroup          0 2018-11-24 14:21 /user/hive/warehouse/employee
drwxr-xr-x   - SparkAdmin supergroup          0 2018-10-31 16:17 /user/hive/warehouse/student
[SparkAdmin@centos00 conf]$

Then create the function again, this time pointing at the HDFS path:

hive> create function nullreplace2 as 'hive.function.generic.genericNvl' using jar 'hdfs:///user/hive/warehouse/Nvl.jar' ;
Added [/tmp/06ebd574-bcbc-4146-bc39-f195b8d0c9c2_resources/Nvl.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/Nvl.jar]
OK
Time taken: 0.814 seconds
hive> select nullreplace2(name,"end") as name from default.employee ;
OK
ali
liton
tencent
end
end
end
Time taken: 1.93 seconds, Fetched: 6 row(s)
hive>

If part of the development team uses the Hive CLI while others use Oracle SQL Developer, how can a custom function be shared across the whole team?

The answer is to create a global function.

Just as we called the function from the jar on HDFS above, create a global function from inside Oracle SQL Developer:

create function nullReplace_osd as 'hive.function.generic.genericNvl' using jar 'hdfs:///user/hive/warehouse/Nvl.jar'  

Open the Hive CLI and simply call nullReplace_osd, the function created in Oracle SQL Developer:

hive> select default.nullReplace_osd(name,"end") as name from default.employee ;
Added [/tmp/8526a964-ef87-4924-a331-73013b31f553_resources/Nvl.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/Nvl.jar]
OK
ali
liton
tencent
end
end
end
Time taken: 1.747 seconds, Fetched: 6 row(s)
hive>

Likewise, a global custom function created in the Hive CLI can be called from Oracle SQL Developer:

hive> create function NullReplace_hcmd  as 'hive.function.generic.genericNvl' using jar 'hdfs:///user/hive/warehouse/Nvl.jar' ;
Added [/tmp/8526a964-ef87-4924-a331-73013b31f553_resources/Nvl.jar] to class path
Added resources: [hdfs:///user/hive/warehouse/Nvl.jar]
OK
Time taken: 0.047 seconds
hive> select NullReplace_hcmd(name,"end") as name from default.employee;
OK
ali
liton
tencent
end
end
end
Time taken: 0.146 seconds, Fetched: 6 row(s)
hive>

Now restart Oracle SQL Developer if it is already open, and call a global custom function created from the Hive CLI:

Run the call:

select default.NullReplace_hcmd2(name,"end") as name from default.employee;

Error starting at line : 6 in command -
select default.NullReplace_hcmd2(name,"end") as name from default.employee
Error at Command Line : 6 Column : 1
Error report -
SQL Error: [Cloudera][HiveJDBCDriver](500051) ERROR processing query/statement. Error Code: 10011, SQL state: TStatus(statusCode:ERROR_STATUS, infoMessages:[*org.apache.hive.service.cli.HiveSQLException:Error while compiling statement: FAILED: SemanticException [Error 10011]: Invalid function default.NullReplace_hcmd2:17:16, org.apache.hive.service.cli.operation.Operation:toSQLException:Operation.java:380, org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:206, org.apache.hive.service.cli.operation.SQLOperation:runInternal:SQLOperation.java:290, org.apache.hive.service.cli.operation.Operation:run:Operation.java:320, org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementInternal:HiveSessionImpl.java:530, org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementAsync:HiveSessionImpl.java:517, org.apache.hive.service.cli.CLIService:executeStatementAsync:CLIService.java:310, org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:ThriftCLIService.java:530, org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1437, org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1422, org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39, org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39, org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56, org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286, java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1142, java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:617, java.lang.Thread:run:Thread.java:745, *org.apache.hadoop.hive.ql.parse.SemanticException:Invalid function default.NullReplace_hcmd2:28:12, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1GetAllAggregations:SemanticAnalyzer.java:636, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1GetAggregationsFromSelect:SemanticAnalyzer.java:558, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1:SemanticAnalyzer.java:1464, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1:SemanticAnalyzer.java:1768, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:doPhase1:SemanticAnalyzer.java:1768, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:genResolvedParseTree:SemanticAnalyzer.java:11072, org.apache.hadoop.hive.ql.parse.SemanticAnalyzer:analyzeInternal:SemanticAnalyzer.java:11133, org.apache.hadoop.hive.ql.parse.CalcitePlanner:analyzeInternal:CalcitePlanner.java:286, org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer:analyze:BaseSemanticAnalyzer.java:258, org.apache.hadoop.hive.ql.Driver:compile:Driver.java:512, org.apache.hadoop.hive.ql.Driver:compileInternal:Driver.java:1317, org.apache.hadoop.hive.ql.Driver:compileAndRespond:Driver.java:1295, org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:204], sqlState:42000, errorCode:10011, errorMessage:Error while compiling statement: FAILED: SemanticException [Error 10011]: Invalid function default.NullReplace_hcmd2), Query: select default.NullReplace_hcmd2(name,"end") as name from default.employee.

Querying the metastore database, it is easy to see that the functions were all created successfully; what blocks the call is a permissions issue that isolates this user's access:

SELECT TOP (1000) [FUNC_ID]
      ,[CLASS_NAME]
      ,[CREATE_TIME]
      ,[DB_ID]
      ,[FUNC_NAME]
      ,[FUNC_TYPE]
      ,[OWNER_NAME]
      ,[OWNER_TYPE]
  FROM [metadata].[dbo].[FUNCS]

[Screenshot: rows of the FUNCS table in the metastore showing the newly created functions]

Hive's permission model will be covered in a separate chapter.

Recompiling Hive

When a custom function is mature and certain to be reused, contributing it and recompiling Hive is the traditional way to get full coverage and immediate availability. The drawback is equally obvious: it can easily destabilize the system. That is why the Hive developers added CREATE FUNCTION, which makes a function globally usable on its own, as the remedy.
