写点什么

(转)大数据开发之 Hive 中 UDTF 函数

  • 2021 年 12 月 21 日
  • 本文字数:4504 字

    阅读完需:约 15 分钟

​动作日志解析思路:


动作日志表中每行数据对应用户的一个动作记录,一个动作记录应当包含公共信息、页面信息以及动作信息。先将包含 action 字段的大数据培训日志过滤出来,然后通过 UDTF 函数,将 action 数组“炸开”(类似于 explode 函数的效果),然后使用 get_json_object 函数解析每个字段。



真实情况下,一条日志是存在多个动作的. 和下面这条日志很类似:


{"common":{"ar":"110000","ba":"Xiaomi","ch":"oppo","md":"Xiaomi 9","mid":"mid_128","os":"Android 10.0","uid":"160","vc":"v2.1.134"},"displays":[ 这是一个 json 数组,一对多的关系{"displayType":"activity","item":"1","item_type":"activity_id","order":1},{"displayType":"activity","item":"2","item_type":"activity_id","order":2,{"displayType":"promotion","item":"10","item_type":"sku_id","order":3},{"displayType":"query","item":"4","item_type":"sku_id","order":4},{"displayType":"query","item":"3","item_type":"sku_id","order":5},{"displayType":"recommend","item":"8","item_type":"sku_id","order":6},{"displayType":"promotion","item":"6","item_type":"sku_id","order":7},{"displayType":"promotion","item":"5","item_type":"sku_id","order":8},{"displayType":"query","item":"9","item_type":"sku_id","order":9},{"displayType":"query","item":"2","item_type":"sku_id","order":10},{"displayType":"query","item":"3","item_type":"sku_id","order":11}],"page":{"during_time":17802,"page_id":"home"},"ts":1592125099885}


UDTF 思想:


ods 层 ods_log 那张表,动作信息的日志中包含 3 大块:


普通信息 页面信息 动作信息

c1(用这个字段表示) p1(用这个字段表示) actionsa1,a2,a3

c2(用这个字段表示) p2(用这个字段表示) actionsa4,a5


上面这种表示是 ods 层 log 那张表表示的数据粒度。


但是 dwd 层的表,我们期望的数据粒度呢:


普通信息 页面信息 炸开动作信息

c1 p1 a1

c1 p1 a2

c1 p1 a3

c2 p2 a4

c2 p2 a5


select

common,

page,

action

from ods_log

lateral view explode(actions) tmp as action; tmp 是别名的意思,action 是列别名的意思。explode 是 hive 中的炸裂函数,只能炸 hive 中的数组。


但是此时 actions[a1,a2,a3]是 json 字符串数组,里面每一个元素都是对象。


写函数,首先想,输入参数是什么类型;输出参数是什么类型。



hive udtf 官网:


https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF


1)建表语句


2)创建 UDTF 函数——设计思路


3)创建 UDTF 函数——编写代码


(1)创建一个 maven 工程:hivefunction


(2)创建包名:com.alibaba.hive.udtf


(3)引入如下依赖


public class ExplodeJSONArray extends GenericUDTF {


@Overridepublic void process(Object[] args) {

}

@Overridepublic void close() throws HiveException {

}
复制代码


}


点进去可以看见初始化方法 initialize(),这个方法不是抽象方法,所以当我们实现上面 GenericUDTF 接口时就只有 2 个方法.


public StructObjectInspector initialize(StructObjectInspector argOIs)throws UDFArgumentException {List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();


ObjectInspector[] udtfInputOIs = new ObjectInspector[inputFields.size()];

for (int i = 0; i < inputFields.size(); i++) { udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();}return initialize(udtfInputOIs);
复制代码


}


所以我们要重写父类的方法: idea 快捷键,ctrl+ o


public class ExplodeJSONArray extends GenericUDTF {/*** 初始化方法,里面要做三件* 1.约束函数传入参数的个数* 2.约束函数传入参数的类型* 3.约束函数返回值的类型*/@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { Struct 结构体,一个结构体,里面有多个属性名:属性值。结构体,类似 java 中的对象。return super.initialize(argOIs);}


@Overridepublic void process(Object[] args) {

}

@Overridepublic void close() throws HiveException {

}
复制代码


}


由于我们引入了 hive 依赖,我们打开依赖看下,



我们这里用这个:


完整的代码:


package com.alibaba.hive.udtf;


import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;import org.apache.hadoop.hive.ql.exec.UDFArgumentException;import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.StructField;import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import org.json.JSONArray;


import java.util.ArrayList;import java.util.List;


/**


  • @author zhouyanjun

  • @create 2021-06-21 22:02/public class ExplodeJSONArray extends GenericUDTF {/*

  • 初始化方法,里面要做三件事

  • 1.约束函数传入参数的个数

  • 2.约束函数传入参数的类型

  • 3.约束函数返回值的类型*/@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { //返回结构体类型。udtf 函数,有可能炸开之后形成多列。所以用返回结构体来封装。属性名:属性值。属性名就是列名;属性值就是列的类型。//用结构体,来 1.约束函数传入参数的个数// List<? extends StructField> allStructFieldRefs = argOIs.getAllStructFieldRefs();//见名知意,获取结构体所有属性的引用 可以看见返回值是个 list 类型.

  • if(argOIs.getAllStructFieldRefs().size()!=1){ //只要个数不等于 1,就抛出异常 throw new UDFArgumentLengthException("explode_json_array()函数的参数个数只能为 1");}

  • //2.约束函数传入参数的类型// StructField structField = argOIs.getAllStructFieldRefs().get(0);//只能有一个参数,所以 index 给 0 可以看见,是获得结构体的属性// ObjectInspector fieldObjectInspector = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector();//获得属性的对象检测器 。通过检查器我们才能知道是什么类型.String typeName = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector().getTypeName();//我们要确保传入的类型是 stringif(!"string".equals(typeName)){throw new UDFArgumentTypeException(0,"explode_json_array 函数的第 1 个参数的类型只能为 String."); //抛出异常}

  • //3.约束函数返回值的类型 List<String> fieldNames = new ArrayList<>(); //② 表示我建立了一个 String 类型的集合。表示存储的列名

  • List<ObjectInspector> fieldOIs = new ArrayList<>(); //②fieldNames.add("item"); //炸裂之后有个列名,如果不重新 as,那这个 item 就是列名//不知道怎么写的时候,上官网去抄示例代码 fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); //表示 item 这一列是什么类型.基本数据类型工厂类,获取了个 string 类型的检查器

  • //用一个工厂类获取 StructObjectInspector 类型。return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);//①获取标准结构体检查器。fieldNames,fieldOI 是两个变量名

  • }

  • @Override //这里是实现主逻辑的方法。首先分析下需求:把 json array 字符串变成一个 json 字符串 public void process(Object[] args) throws HiveException { //形参类型是 object 数组//1 获取函数传入的 jsonarray 字符串 String jsonArrayStr = args[0].toString(); //我要把 jsonArrayStr 字符串划分为一个一个的 json,通过字符串这种类型是不好划分的。不知道如何 split 切分//2 将 jsonArray 字符串转换成 jsonArray 数组。正常情况下我们要引入依赖,比如 fastjson 啥的。JSONArray jsonArray = new JSONArray(jsonArrayStr); //通过 JSONArray 这种类型,我们就比较容易获得一条条的 json 字符串//3 得到 jsonArray 里面的一个个 json,并把他们写出。将 actions 里面的一个个 action 写出 for (int i = 0; i < jsonArray.length(); i++) { //普通 for 循环进行遍历 String jsonStr = jsonArray.getString(i);//前面定义了,要返回 String//因为初始化方法里面限定了返回值类型是 struct 结构体//所以在这个地方不能直接输出 action,这里 action 对应了我们要的字符串。需要用个字符串数组包装下

  • }

  • @Overridepublic void close() throws HiveException {

  • }}


接下来打包上传集群,然后在 hive 里创建函数,用这个包。


hive 里面函数有两种,一种是临时函数;一种是永久函数。因为我每次炸裂都要用它,所以我把它设置为永久函数。


同时注意下,hive 的永久函数和数据库名有关系。


4)创建函数


(1)打包


(2)将 hivefunction-1.0-SNAPSHOT.jar 上传到集群,然后再将该 jar 包上传到 HDFS 的/user/hive/jars 路径下


[myself@hadoop1 module] hadoop fs -put hivefunction-1.0-SNAPSHOT.jar /user/hive/jars(3)创建永久函数与开发好的 java class 关联 hive (gmall)>create function explode_json_array as 'com.alibaba.hive.udtf.ExplodeJSONArray' using jar 'hdfs://hadoop102:8020/user/hive/jars/hive0621-1.0-SNAPSHOT.jar'; as 全类名 hdfs 协议+存放 jar 包的 hdfs 路径


创建完之后,查看下:永久函数要带上库名,通过库名来找,因为我建立的库是 gmall,所以从 g 开头来找。show functions ;可以看到如下结果:gmall.explode_json_array


进行测试查询:select explode_json_array('[{"action_id":"cart_add","item":"7","item_type":"sku_id","ts":1592270774653},{"action_id":"cart_add","item":"7","item_type":"sku_id","ts":1592270774653}]');


输出结果,成功:"{""action_id"":""cart_add"",""item"":""7"",""item_type"":""sku_id"",""ts"":1592270774653}""{""action_id"":""cart_add"",""item"":""7"",""item_type"":""sku_id"",""ts"":1592270774653}"


select explode_json_array('[{"action_id":"cart_add","item":"7","item_type":"sku_id","ts":1592270774653},{"action_id":"cart_add","item":"7","item_type":"sku_id","ts":1592270774653}]') as aaaa;


(4)注意:如果修改了自定义函数重新生成 jar 包怎么处理?只需要替换 HDFS 路径上的旧 jar 包,然后重启 Hive 客户端即可。


转载于进击的大数据初学者


用户头像

关注尚硅谷,轻松学IT 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
(转)大数据开发之Hive中UDTF函数