写点什么

Flink 计算 PV UV

用户头像
shengjk1
关注
发布于: 2021 年 03 月 23 日

前言

使用 flink 很长一段时间了，突然发现竟然没有计算过 pv uv。这可是 flink 常见的计算场景，也是面试时常被问到的问题之一，故自己想了一个场景来计算一下。

基于 Flink 1.12


场景

外卖员听单的信息会发送到一个单独的 topic 中，需要计算每天有多少个外卖员听单，以及总共的听单次数。


kafka 中消息类型


{"locTime":"2020-12-28 12:32:23","courierId":12,"other":"aaa"}
复制代码

locTime：事件发生的时间；courierId：外卖员 id


计算一天有多少个外卖员听单( UV ),总共听单多少次( PV )


代码

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topics, new SimpleStringSchema(), properties);
FlinkHelp.setOffset(parameter, consumer);
// Event time is taken from the "locTime" field of each JSON message, e.g.
// {"locTime":"2020-12-28 12:32:23","courierId":12,...}. locTime carries no zone, so it is
// interpreted as UTC+8 explicitly. This matches the UTC+8 day boundaries of the daily window
// (TumblingEventTimeWindows offset of -8 hours) and the day key taken verbatim from locTime,
// independent of the machine's default time zone and of today's DST offset.
consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        Map<String, Object> map;
                        try {
                            map = Json2Others.json2map(element);
                        } catch (IOException e) {
                            // Previously swallowed, which only resurfaced later as an opaque
                            // parse error on an empty string; fail with the real cause instead.
                            throw new java.io.UncheckedIOException("Malformed JSON message: " + element, e);
                        }
                        Object locTime = map.get("locTime");
                        if (locTime == null) {
                            throw new IllegalArgumentException("Message has no locTime field: " + element);
                        }
                        LocalDateTime eventTime = LocalDateTime.parse(
                                locTime.toString(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                        return eventTime.toInstant(java.time.ZoneOffset.ofHours(8)).toEpochMilli();
                    }
                })
                // Keep watermarks advancing when some Kafka partitions receive no data.
                .withIdleness(Duration.ofSeconds(1)));
env.addSource(consumer)
        // Pass-through filter (keeps every record). Kept as-is: removing an operator would change
        // the auto-generated operator IDs used when restoring from savepoints.
        .filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return true;
            }
        })
        // One window per calendar day in UTC+8: a 1-day tumbling event-time window shifted by -8 h.
        .windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .allowedLateness(Time.minutes(1))
        // Trigger notes:
        // - Calling trigger(...) more than once just lets the later trigger override the earlier
        //   one (an earlier experiment used CountTrigger.of(5)).
        // - Event-time triggers can fire late. The watermark never moves backwards, so when
        //   out-of-order messages arrive early, firing is delayed; in the extreme case the window
        //   does not fire until a message with timestamp >= watermark + trigger interval arrives.
        //   ContinuousProcessingTimeTrigger has the same problem. A better approach would be a
        //   processing-time based trigger that additionally checks the window.
        // - ContinuousProcessingTimeTrigger.of(Time.seconds(10)) misbehaves when catching up on
        //   historical data: the backlog may be fully consumed in under 10 s, so the window is
        //   never fired and gets skipped.
        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
        // After each firing, evict all buffered elements (keep time 0, evicted after the window
        // function runs), so every firing only sees elements that arrived since the previous one.
        // Running totals live in the MapStates below. A custom purging trigger could do the same.
        .evictor(TimeEvictor.of(Time.seconds(0), true))
        .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
            /*
             * Maintains per-day PV (message count) and UV (distinct courier count) in MapState
             * obtained from the runtime context (not window-scoped, so it survives across
             * windows) and writes the latest totals to the Redis hash "test_courier_puv:" on
             * every firing. Nothing is emitted to the collector.
             */

            // Runtime handles are created in open(); transient because they are not serializable.
            private transient JedisCluster jedisCluster;
            // Set of "yyyyMMdd-courierId" keys already counted toward that day's UV (values unused).
            private transient MapState<String, String> courierInfoMapState;
            // yyyyMMdd -> number of distinct couriers seen that day.
            private transient MapState<String, Long> courierInfoUVMapState;
            // yyyyMMdd -> number of messages seen that day.
            private transient MapState<String, Long> courierInfoPVMapState;
            // Day of the previous / current firing. Plain fields (not Flink state), so they reset
            // to "" after a restart. Deliberately NOT transient: their "" initializers must survive
            // serialization of this function.
            private String beforeDay = "";
            private String currentDay = "";

            @Override
            public void open(Configuration parameters) throws Exception {
                // Entries expire 25 h (processing time) after creation/last write; once a day's
                // counters expire, that day's totals can no longer be reported.
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(org.apache.flink.api.common.time.Time.hours(25))
                        // Default; Flink 1.12.0 does not support event-time TTL.
                        .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                        .cleanupInRocksdbCompactFilter(1000)
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();

                MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<String, String>(
                        "courierInfos", TypeInformation.of(String.class), TypeInformation.of(String.class));
                mapStateDescriptor.enableTimeToLive(ttlConfig);
                courierInfoMapState = getRuntimeContext().getMapState(mapStateDescriptor);

                MapStateDescriptor<String, Long> mapStateUVDescriptor = new MapStateDescriptor<String, Long>(
                        "courierUVStateDesc", TypeInformation.of(String.class), TypeInformation.of(Long.class));
                mapStateUVDescriptor.enableTimeToLive(ttlConfig);
                courierInfoUVMapState = getRuntimeContext().getMapState(mapStateUVDescriptor);

                MapStateDescriptor<String, Long> mapStatePVDescriptor = new MapStateDescriptor<String, Long>(
                        "courierPVStateDesc", TypeInformation.of(String.class), TypeInformation.of(Long.class));
                mapStatePVDescriptor.enableTimeToLive(ttlConfig);
                courierInfoPVMapState = getRuntimeContext().getMapState(mapStatePVDescriptor);

                jedisCluster = RedisUtil.getJedisCluster(redisHp);
            }

            @Override
            public void close() throws Exception {
                // open() may have failed before the connection was created.
                if (jedisCluster != null) {
                    RedisUtil.closeConn(jedisCluster);
                }
            }

            @Override
            public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                TimeWindow window = context.window();
                System.out.println(" window = " + DateUtils.millisecondsToDateStr(window.getStart(), "yyyy-MM-dd HH:mm:ss") + "-" + DateUtils.millisecondsToDateStr(window.getEnd(), "yyyy-MM-dd HH:mm:ss"));

                for (String element : elements) {
                    Map<String, Object> message = Json2Others.json2map(element);
                    String courierId = message.get("courierId").toString();
                    // "2020-12-28 12:32:23" -> "20201228"
                    String day = message.get("locTime").toString().split(" ")[0].replace("-", "");

                    increment(courierInfoPVMapState, day);
                    String courierDayKey = day + "-" + courierId;
                    if (!courierInfoMapState.contains(courierDayKey)) {
                        increment(courierInfoUVMapState, day);
                        courierInfoMapState.put(courierDayKey, "");
                    }
                    currentDay = day;
                }

                // No element has ever been processed: nothing to report (previously an NPE).
                if (currentDay.isEmpty()) {
                    return;
                }

                HashMap<String, String> map = new HashMap<String, String>();
                putCounts(map, currentDay);
                // On a day change, also publish the previous day's totals once more. They may
                // already have expired (25 h TTL), in which case they are skipped, not an NPE.
                if (!beforeDay.isEmpty() && !beforeDay.equals(currentDay)) {
                    putCounts(map, beforeDay);
                }

                // HMSET with no fields is rejected by Redis.
                if (!map.isEmpty()) {
                    map.forEach((k, v) -> {
                        System.out.println(k + ":" + v);
                    });
                    jedisCluster.hmset("test_courier_puv:", map);
                    jedisCluster.expire("test_courier_puv:", 3 * 24 * 60 * 60);
                }

                beforeDay = currentDay;
            }

            // Adds one to the counter stored under day, starting from 1 when absent or expired.
            private void increment(MapState<String, Long> counters, String day) throws Exception {
                Long count = counters.get(day);
                counters.put(day, count == null ? 1L : count + 1);
            }

            // Copies day's PV/UV totals (if still present in state) into target as "<day>-pv"/"<day>-uv".
            private void putCounts(Map<String, String> target, String day) throws Exception {
                Long pv = courierInfoPVMapState.get(day);
                if (pv != null) {
                    target.put(day + "-pv", pv.toString());
                }
                Long uv = courierInfoUVMapState.get(day);
                if (uv != null) {
                    target.put(day + "-uv", uv.toString());
                }
            }
        });
复制代码


结果样例


20201227-pv:1111111

20201227-uv:111


发布于: 2021 年 03 月 23 日阅读数: 9
用户头像

shengjk1

关注

还未添加个人签名 2018.04.26 加入

博客 https://blog.csdn.net/jsjsjs1789

评论

发布
暂无评论
Flink 计算 PV UV