写点什么

spark 调优(二):UDF 减少 JOIN 和判断

  • 2022 年 7 月 06 日
  • 本文字数:1255 字

    阅读完需:约 4 分钟

大家好,我是怀瑾握瑜,一只大数据萌新,家有两只吞金兽,嘉与嘉,上能 code 下能 teach 的全能奶爸

如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~



1. 起因

平时写 sql 语句的时候经常会有大表与小标做关联查询,然后再进行 group by 等逻辑分组处理,或者是有很多判断条件,sql 里有很多 if 语句,一些区间类的结构查询,这种 sql 语句直接放到 spark 上执行,会有大量的 shuffle,而且执行时间巨慢


尤其是大表和小标数据差距特别大,大表作为主要处理对象,进行 shuffle 和 map 的时候花费大量时间

2. 优化开始

2.1 改成 java 代码编写程序

首先的一个方法是用 java 代码编写 spark 程序,把所有的条件全部打散,或者小表做广播变量,然后每次处理数据时候在进行取值和判断


但这么会让代码可读性比较差,而且如果是用一些工具直接跑 sql 出计算结果,破坏程序整体性

2.2 使用 UDF

UDF(User-Defined Functions)即是用户定义的 hive 函数。hive 自带的函数并不能完全满足业务需求,这时就需要我们自定义函数了


我们这里只做最简单的 UDF,就是制作一个 hive 函数,然后在大表中查询的时候,直接去调用方法把当初需要关联才能获得数据直接返回


首先可以定义一个 udf 类


public class UDF implements UDF2<Long, Long, Long> {
Map<Long, TreeMap<Long, Long>> map;
public TripUDF(Broadcast<Map<Long, TreeMap<Long, Long>>> bmap) { this.map = bmap.getValue(); }
@Override public Long call(Long id, Long time) throws Exception { if (map.containsKey(terminalId)) { Map.Entry<Long, Long> a = map.get(id).floorEntry(time); Map.Entry<Long, Long> b = map.get(id).ceilingEntry(time); if (null != a && null != b) { if (a.getValue().equals(b.getValue())) { return a.getValue(); } } } return -1L; }}
复制代码


这个 UDF 方法就是先把小表的数据查询出来,做成 TreeMap,然后把范围都放进去,广播出去,再每次查询的时候,都用大表到这里去用 id 和 time 进行匹配,匹配成功就是要获得的结果


如果用 sql 去表达,大概就是,大表的 time 需要去匹配小表的时间段


tablea join tableb on tablea.id=tableb.id and tablea.time >= tableb.timeStart and tablea.time <= tableb.timeEnd
复制代码


然后 spark 去注册 UDF 方法


String udfMethod = "structureMap";spark.udf().register(udfMethod, new UDF(broadcast1), DataTypes.StringType);
复制代码


这样直接去查询大表,然后在特定字段使用 udf 方法,就可以直接获取相应的结果


select id,time,structureMap(id,time) as tag from tablea
复制代码


这样 tag 的最终结果就和直接关联 tableb 然后再获取其中的值是一样的结果,但具体执行的内容都交给 spark 去优化



结束语

如果您喜欢我的文章,可以[关注⭐]+[点赞👍]+[评论📃],您的三连是我前进的动力,期待与您共同成长~

可关注公众号【怀瑾握瑜的嘉与嘉】,获取资源下载方式

发布于: 刚刚阅读数: 3
用户头像

还未添加个人签名 2022.07.01 加入

还未添加个人简介

评论

发布
暂无评论
spark调优(二):UDF减少JOIN和判断_spark_怀瑾握瑜的嘉与嘉_InfoQ写作社区