OUT 了吧,Kafka 能实现消息延时了

本文分享自华为云社区《Kafka也能实现消息延时了?》,作者:HuaweiCloudDeveloper 。
1、背景
Kafka 是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka 它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka 就不具备这种能力,因此希望能在保存 Kafka 特有能力的情况下给 Kafka 扩充一个具有能处理延时消息场景的能力。
2、开发环境
3、云服务介绍
分布式消息服务 Kafka 版:MySQL 是目前最受欢迎的开源数据库之一,其性能卓越,搭配 LAMP(Linux + Apache + MySQL + Perl/PHP/Python),成为 WEB 开发的高效解决方案。云数据库 RDS for MySQL 拥有稳定可靠、安全运行、弹性伸缩、轻松管理、经济实用等特点。
4、方案设计
i、方案简述
此方案实现,需要借助两个 Topic 来进行实现,一个 Topic 用于及时接收生产者们所产生的消息,另一个 Topic 则用于消费者拉取消息进行消费。另外在这两个 Topic 之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的 Topic 中。
ii、方案架构图
Kafka 消息延时方案架构图
Kafka 消息延时实现思路
生产者将生产消息存入 topic_delay 主题中进行存储。
将 topic_delay 主题中的所有消息拉取至 ConcurrentLinkedQueue 队列中。
取值判断是否满足延时要求。
a. 如果满足延时要求,则将消息生产至 topic_out 主题中,并将 queue 队列中的值移除。
b. 如果不满足延时要求,则等待自定义时间后重试判断。
消费者最终从 topic_out 主题中拉取消息进行消费。
iii、方案时序图
Kafka 消息延时方案时序图
5、代码参数指南
本项目中起到延时作用的类 Delay.java 其余类为官方提供用于测试生产和消费消息, 如需使用官方测试的使用的生产消费代码相关配置介绍可以参考 https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。如需使用自己配置的生产者消费者,只配置 Delay.java 中的参数即可。
延迟.java 参数详情
delay:自定义延时时间,单位。
topic_delay 变量:用于临时存储消息的 topic 名称。
topic_out 变量:用于消费者拉取消息消费的 topic 名称。
关于消费者和生产者配置可按需配置,可参考 Kafka 官方文档:https://kafka.apache.org/documentation/#producerconfigs
6、代码实现
实现代码可参考Kafka消息延时
7、结果反馈
版权声明: 本文为 InfoQ 作者【华为云开发者联盟】的原创文章。
原文链接:【http://xie.infoq.cn/article/b6b8f7a14bbdb38add6228e0f】。文章转载请联系作者。










评论