(转)大数据开发之 Hive 中 UDTF 函数
动作日志解析思路:
动作日志表中每行数据对应用户的一个动作记录,一个动作记录应当包含公共信息、页面信息以及动作信息。先将包含 action 字段的大数据培训日志过滤出来,然后通过 UDTF 函数,将 action 数组“炸开”(类似于 explode 函数的效果),然后使用 get_json_object 函数解析每个字段。
真实情况下,一条日志是存在多个动作的. 和下面这条日志很类似:
{"common":{"ar":"110000","ba":"Xiaomi","ch":"oppo","md":"Xiaomi 9","mid":"mid_128","os":"Android 10.0","uid":"160","vc":"v2.1.134"},"displays":[ 这是一个 json 数组,一对多的关系{"displayType":"activity","item":"1","item_type":"activity_id","order":1},{"displayType":"activity","item":"2","item_type":"activity_id","order":2,{"displayType":"promotion","item":"10","item_type":"sku_id","order":3},{"displayType":"query","item":"4","item_type":"sku_id","order":4},{"displayType":"query","item":"3","item_type":"sku_id","order":5},{"displayType":"recommend","item":"8","item_type":"sku_id","order":6},{"displayType":"promotion","item":"6","item_type":"sku_id","order":7},{"displayType":"promotion","item":"5","item_type":"sku_id","order":8},{"displayType":"query","item":"9","item_type":"sku_id","order":9},{"displayType":"query","item":"2","item_type":"sku_id","order":10},{"displayType":"query","item":"3","item_type":"sku_id","order":11}],"page":{"during_time":17802,"page_id":"home"},"ts":1592125099885}
UDTF 思想:
ods 层 ods_log 那张表,动作信息的日志中包含 3 大块:
普通信息 页面信息 动作信息
c1(用这个字段表示) p1(用这个字段表示) actionsa1,a2,a3
c2(用这个字段表示) p2(用这个字段表示) actionsa4,a5
上面这种表示是 ods 层 log 那张表表示的数据粒度。
但是 dwd 层的表,我们期望的数据粒度呢:
普通信息 页面信息 炸开动作信息
c1 p1 a1
c1 p1 a2
c1 p1 a3
c2 p2 a4
c2 p2 a5
select
common,
page,
action
from ods_log
lateral view explode(actions) tmp as action; tmp 是别名的意思,action 是列别名的意思。explode 是 hive 中的炸裂函数,只能炸 hive 中的数组。
但是此时 actions[a1,a2,a3]是 json 字符串数组,里面每一个元素都是对象。
写函数,首先想,输入参数是什么类型;输出参数是什么类型。
hive udtf 官网:
https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
1)建表语句
2)创建 UDTF 函数——设计思路
3)创建 UDTF 函数——编写代码
(1)创建一个 maven 工程:hivefunction
(2)创建包名:com.alibaba.hive.udtf
(3)引入如下依赖
public class ExplodeJSONArray extends GenericUDTF {
}
点进去可以看见初始化方法 initialize(),这个方法不是抽象方法,所以当我们实现上面 GenericUDTF 接口时就只有 2 个方法.
public StructObjectInspector initialize(StructObjectInspector argOIs)throws UDFArgumentException {List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
}
所以我们要重写父类的方法: idea 快捷键,ctrl+ o
public class ExplodeJSONArray extends GenericUDTF {/*** 初始化方法,里面要做三件* 1.约束函数传入参数的个数* 2.约束函数传入参数的类型* 3.约束函数返回值的类型*/@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { Struct 结构体,一个结构体,里面有多个属性名:属性值。结构体,类似 java 中的对象。return super.initialize(argOIs);}
}
由于我们引入了 hive 依赖,我们打开依赖看下,
我们这里用这个:
完整的代码:
package com.alibaba.hive.udtf;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;import org.apache.hadoop.hive.ql.exec.UDFArgumentException;import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.StructField;import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import org.json.JSONArray;
import java.util.ArrayList;import java.util.List;
/**
@author zhouyanjun
@create 2021-06-21 22:02/public class ExplodeJSONArray extends GenericUDTF {/*
初始化方法,里面要做三件事
1.约束函数传入参数的个数
2.约束函数传入参数的类型
3.约束函数返回值的类型*/@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { //返回结构体类型。udtf 函数,有可能炸开之后形成多列。所以用返回结构体来封装。属性名:属性值。属性名就是列名;属性值就是列的类型。//用结构体,来 1.约束函数传入参数的个数// List<? extends StructField> allStructFieldRefs = argOIs.getAllStructFieldRefs();//见名知意,获取结构体所有属性的引用 可以看见返回值是个 list 类型.
if(argOIs.getAllStructFieldRefs().size()!=1){ //只要个数不等于 1,就抛出异常 throw new UDFArgumentLengthException("explode_json_array()函数的参数个数只能为 1");}
//2.约束函数传入参数的类型// StructField structField = argOIs.getAllStructFieldRefs().get(0);//只能有一个参数,所以 index 给 0 可以看见,是获得结构体的属性// ObjectInspector fieldObjectInspector = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector();//获得属性的对象检测器 。通过检查器我们才能知道是什么类型.String typeName = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector().getTypeName();//我们要确保传入的类型是 stringif(!"string".equals(typeName)){throw new UDFArgumentTypeException(0,"explode_json_array 函数的第 1 个参数的类型只能为 String."); //抛出异常}
//3.约束函数返回值的类型 List<String> fieldNames = new ArrayList<>(); //② 表示我建立了一个 String 类型的集合。表示存储的列名
List<ObjectInspector> fieldOIs = new ArrayList<>(); //②fieldNames.add("item"); //炸裂之后有个列名,如果不重新 as,那这个 item 就是列名//不知道怎么写的时候,上官网去抄示例代码 fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); //表示 item 这一列是什么类型.基本数据类型工厂类,获取了个 string 类型的检查器
//用一个工厂类获取 StructObjectInspector 类型。return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);//①获取标准结构体检查器。fieldNames,fieldOI 是两个变量名
}
@Override //这里是实现主逻辑的方法。首先分析下需求:把 json array 字符串变成一个 json 字符串 public void process(Object[] args) throws HiveException { //形参类型是 object 数组//1 获取函数传入的 jsonarray 字符串 String jsonArrayStr = args[0].toString(); //我要把 jsonArrayStr 字符串划分为一个一个的 json,通过字符串这种类型是不好划分的。不知道如何 split 切分//2 将 jsonArray 字符串转换成 jsonArray 数组。正常情况下我们要引入依赖,比如 fastjson 啥的。JSONArray jsonArray = new JSONArray(jsonArrayStr); //通过 JSONArray 这种类型,我们就比较容易获得一条条的 json 字符串//3 得到 jsonArray 里面的一个个 json,并把他们写出。将 actions 里面的一个个 action 写出 for (int i = 0; i < jsonArray.length(); i++) { //普通 for 循环进行遍历 String jsonStr = jsonArray.getString(i);//前面定义了,要返回 String//因为初始化方法里面限定了返回值类型是 struct 结构体//所以在这个地方不能直接输出 action,这里 action 对应了我们要的字符串。需要用个字符串数组包装下
}
@Overridepublic void close() throws HiveException {
}}
接下来打包上传集群,然后在 hive 里创建函数,用这个包。
hive 里面函数有两种,一种是临时函数;一种是永久函数。因为我每次炸裂都要用它,所以我把它设置为永久函数。
同时注意下,hive 的永久函数和数据库名有关系。
4)创建函数
(1)打包
(2)将 hivefunction-1.0-SNAPSHOT.jar 上传到集群,然后再将该 jar 包上传到 HDFS 的/user/hive/jars 路径下
[myself@hadoop1 module] hadoop fs -put hivefunction-1.0-SNAPSHOT.jar /user/hive/jars(3)创建永久函数与开发好的 java class 关联 hive (gmall)>create function explode_json_array as 'com.alibaba.hive.udtf.ExplodeJSONArray' using jar 'hdfs://hadoop102:8020/user/hive/jars/hive0621-1.0-SNAPSHOT.jar'; as 全类名 hdfs 协议+存放 jar 包的 hdfs 路径
创建完之后,查看下:永久函数要带上库名,通过库名来找,因为我建立的库是 gmall,所以从 g 开头来找。show functions ;可以看到如下结果:gmall.explode_json_array
进行测试查询:select explode_json_array('[{"action_id":"cart_add","item":"7","item_type":"sku_id","ts":1592270774653},{"action_id":"cart_add","item":"7","item_type":"sku_id","ts":1592270774653}]');
输出结果,成功:"{""action_id"":""cart_add"",""item"":""7"",""item_type"":""sku_id"",""ts"":1592270774653}""{""action_id"":""cart_add"",""item"":""7"",""item_type"":""sku_id"",""ts"":1592270774653}"
select explode_json_array('[{"action_id":"cart_add","item":"7","item_type":"sku_id","ts":1592270774653},{"action_id":"cart_add","item":"7","item_type":"sku_id","ts":1592270774653}]') as aaaa;
(4)注意:如果修改了自定义函数重新生成 jar 包怎么处理?只需要替换 HDFS 路径上的旧 jar 包,然后重启 Hive 客户端即可。
转载于进击的大数据初学者
评论