
Computing TopN with Flink

shengjk1
Published on: March 23, 2021

Preface

I have been using Flink for quite a while, and it suddenly struck me that I had never computed a TopN with it, even though that is one of Flink's most common use cases. So I made up a scenario to compute one.

Based on Flink 1.12.


Scenario

Couriers' order-listening events are sent to a dedicated Kafka topic. The goal is to compute, for each day, how many couriers listened for orders and each courier's total number of listens.


Message format in Kafka


{"locTime":"2020-12-28 12:32:23","courierId":12,"other":"aaa"}


locTime: the time the event occurred; courierId: the courier's id.


Compute the top 5 couriers by listen count within a day.


Code


FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<String>(topics, new SimpleStringSchema(), properties);
FlinkHelp.setOffset(parameter, consumer);
// event time comes from the locTime field of each JSON record
consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String locTime = "";
                        try {
                            Map<String, Object> map = Json2Others.json2map(element);
                            locTime = map.get("locTime").toString();
                        } catch (IOException e) {
                            // malformed records are ignored here; locTime stays empty
                        }
                        LocalDateTime startDateTime =
                                LocalDateTime.parse(locTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                        return startDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
                    }
                }).withIdleness(Duration.ofSeconds(1)));
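The snippet relies on a Json2Others.json2map helper that the post does not show. A minimal sketch of what it might look like, assuming a Jackson-based implementation (the author's utility may differ):

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.Map;

// Hypothetical stand-in for the author's Json2Others utility.
public class Json2Others {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parse a JSON object string into a Map; throws IOException on malformed input.
    public static Map<String, Object> json2map(String json) throws IOException {
        return MAPPER.readValue(json, Map.class);
    }
}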
SingleOutputStreamOperator<CourierListenInfos> process = env.addSource(consumer)
        .filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return true; // pass everything through; filtering logic could go here
            }
        })
        // key by day + courierId so each courier's daily listen count is tracked separately
        .keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                Map<String, Object> map = Json2Others.json2map(value);
                String courierId = map.get("courierId").toString();
                String day = map.get("locTime").toString().split(" ")[0].replace("-", "");
                return day + "-" + courierId;
            }
        })
        // one-day tumbling event-time windows, shifted by -8 hours for UTC+8 day boundaries
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .allowedLateness(Time.minutes(1))
//      .trigger(CountTrigger.of(5)) // with multiple triggers, the later one simply replaces the earlier one
        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30))) // problematic when replaying historical data
//      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
        // clear the window contents once each firing has been processed
        .evictor(TimeEvictor.of(Time.seconds(0), true))
        .process(new ProcessWindowFunction<String, CourierListenInfos, String, TimeWindow>() {
            private JedisCluster jedisCluster;
            private ReducingStateDescriptor<Long> reducingStateDescriptor;
            private ReducingState<Long> listenCount;

            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(org.apache.flink.api.common.time.Time.hours(25))
                        // ProcessingTime is the default; event-time TTL is not supported as of 1.12.0
                        .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                        .cleanupInRocksdbCompactFilter(1000)
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();

                reducingStateDescriptor = new ReducingStateDescriptor<Long>(
                        "listenCount", new Sum(), TypeInformation.of(Long.class));
                reducingStateDescriptor.enableTimeToLive(ttlConfig);
                listenCount = getRuntimeContext().getReducingState(reducingStateDescriptor);

                jedisCluster = RedisUtil.getJedisCluster(redisHp);
            }

            @Override
            public void close() throws Exception {
                RedisUtil.closeConn(jedisCluster);
            }

            @Override
            public void process(String s, Context context, Iterable<String> elements,
                                Collector<CourierListenInfos> out) throws Exception {
                Iterator<String> iterator = elements.iterator();

                long l = context.currentProcessingTime();
                long watermark = context.currentWatermark();
                TimeWindow window = context.window();

                String endDay = DateUtils.millisecondsToDateStr(window.getEnd(), "yyyyMMdd HH:mm:ss");
                String startDay = DateUtils.millisecondsToDateStr(window.getStart(), "yyyyMMdd HH:mm:ss");

                System.out.println("currentProcessingTime:" + l + " watermark:" + watermark
                        + " windowTime:" + startDay + "-" + endDay);

                // the evictor wipes the window contents after each firing, so the running
                // total survives in the reducing state rather than in the window itself
                while (iterator.hasNext()) {
                    iterator.next();
                    listenCount.add(1L);
                }

                iterator = elements.iterator();
                Map<String, Object> map = Json2Others.json2map(iterator.next());
                String courierId = map.get("courierId").toString();
                String day = map.get("locTime").toString().split(" ")[0].replace("-", "");
                out.collect(new CourierListenInfos(day, courierId, listenCount.get()));
            }
        });
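The pipeline also assumes two helper types the post never shows: the CourierListenInfos POJO and the Sum reduce function. A minimal sketch consistent with how they are used above (field and method names are inferred from the call sites; the author's versions may differ):

import org.apache.flink.api.common.functions.ReduceFunction;

// Hypothetical POJO matching the fields the job reads and writes.
public class CourierListenInfos {
    public String day;
    public String courierId;
    public long listenCount;

    public CourierListenInfos(String day, String courierId, long listenCount) {
        this.day = day;
        this.courierId = courierId;
        this.listenCount = listenCount;
    }

    public String getDay() { return day; }
    public String getCourierId() { return courierId; }
    public long getListenCount() { return listenCount; }

    @Override
    public String toString() {
        return "CourierListenInfos{day=" + day + ", courierId=" + courierId
                + ", listenCount=" + listenCount + "}";
    }
}

// Sum used by the ReducingState: adds up the per-element 1L increments.
public class Sum implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long value1, Long value2) {
        return value1 + value2;
    }
}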
process.keyBy(new KeySelector<CourierListenInfos, String>() {
    @Override
    public String getKey(CourierListenInfos value) throws Exception {
        return value.getDay(); // re-key by day so the top 5 is computed per day
    }
}).process(new KeyedProcessFunction<String, CourierListenInfos, String>() {
    private JedisCluster jedisCluster;
    private MapStateDescriptor<String, Long> mapStateCountDescriptor;
    private MapState<String, Long> courierInfoCountMapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(org.apache.flink.api.common.time.Time.hours(25))
                // ProcessingTime is the default; event-time TTL is not supported as of 1.12.0
                .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                .cleanupInRocksdbCompactFilter(1000)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        mapStateCountDescriptor = new MapStateDescriptor<String, Long>(
                "courierInfoCountMapState", TypeInformation.of(String.class), TypeInformation.of(Long.class));
        mapStateCountDescriptor.enableTimeToLive(ttlConfig);
        courierInfoCountMapState = getRuntimeContext().getMapState(mapStateCountDescriptor);

        jedisCluster = RedisUtil.getJedisCluster(redisHp);
    }

    @Override
    public void close() throws Exception {
        RedisUtil.closeConn(jedisCluster);
    }

    @Override
    public void processElement(CourierListenInfos value, Context ctx, Collector<String> out) throws Exception {
        // latest count per day#courierId; later window firings overwrite earlier ones
        courierInfoCountMapState.put(value.getDay() + "#" + value.getCourierId(), value.getListenCount());
        // fire about one second after the current watermark, aligned to whole seconds
        ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() / 1000 * 1000 + 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        String day = ctx.getCurrentKey();
        // min-heap ordered by listen count: the head is the smallest of the current top 5
        PriorityQueue<CourierListenInfos> courierListenInfos =
                new PriorityQueue<>(new Comparator<CourierListenInfos>() {
                    @Override
                    public int compare(CourierListenInfos o1, CourierListenInfos o2) {
                        return Long.compare(o1.listenCount, o2.listenCount);
                    }
                });

        for (Map.Entry<String, Long> entry : courierInfoCountMapState.entries()) {
            String[] split = entry.getKey().split("#", -1);
            courierListenInfos.offer(new CourierListenInfos(split[0], split[1], entry.getValue()));
            if (courierListenInfos.size() > 5) {
                courierListenInfos.poll(); // evict the smallest, keeping only the top 5
            }
        }

        // keep only the surviving top 5 in state so they carry over to the next firing
        courierInfoCountMapState.clear();
        String tops = "";
        int size = courierListenInfos.size();
        for (int i = 0; i < size; i++) {
            CourierListenInfos courierListenInfos1 = courierListenInfos.poll();
            System.out.println("courierListenInfos1 " + courierListenInfos1);
            courierInfoCountMapState.put(
                    courierListenInfos1.getDay() + "#" + courierListenInfos1.getCourierId(),
                    courierListenInfos1.listenCount);
            tops = tops + courierListenInfos1.courierId + "#" + courierListenInfos1.listenCount;
            if (i != size - 1) {
                tops += ",";
            }
        }
        jedisCluster.hset("test_courier_tops", day + "-top5", tops);
        System.out.println("============");
    }
}).setParallelism(1);
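One detail the post leaves out: nothing runs until the job is submitted. Assuming the StreamExecutionEnvironment variable is named env as above, the final call would look like:

// submit the job; the job name is arbitrary
env.execute("courier-listen-top5");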


Sample result


'20201227-top5':'1#1111,2#2222,3#3333'
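To inspect the result, the hash can be read back from Redis. A sketch using plain Jedis (the cluster address is an assumption; the author's RedisUtil helper is not shown):

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

public class ReadTops {
    public static void main(String[] args) {
        // assumed cluster entry point; replace with a real node address
        JedisCluster jedis = new JedisCluster(new HostAndPort("127.0.0.1", 7000));
        // the value is a list of courierId#listenCount pairs, smallest count first
        System.out.println(jedis.hget("test_courier_tops", "20201227-top5"));
    }
}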

