写点什么

OUT 了吧,Kafka 能实现消息延时了

  • 2022 年 6 月 28 日
  • 本文字数:2663 字

    阅读完需:约 9 分钟

OUT了吧,Kafka能实现消息延时了

本文分享自华为云社区《Kafka也能实现消息延时了?》,作者:HuaweiCloudDeveloper 。

1、背景


Kafka 是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka 它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka 就不具备这种能力,因此希望能在保存 Kafka 特有能力的情况下给 Kafka 扩充一个具有能处理延时消息场景的能力。

2、开发环境


3、云服务介绍


分布式消息服务 Kafka 版:MySQL 是目前最受欢迎的开源数据库之一,其性能卓越,搭配 LAMP(Linux + Apache + MySQL + Perl/PHP/Python),成为 WEB 开发的高效解决方案。云数据库 RDS for MySQL 拥有稳定可靠、安全运行、弹性伸缩、轻松管理、经济实用等特点。

4、方案设计

i、方案简述


此方案实现,需要借助两个 Topic 来进行实现,一个 Topic 用于及时接收生产者们所产生的消息,另一个 Topic 则用于消费者拉取消息进行消费。另外在这两个 Topic 之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的 Topic 中。

ii、方案架构图

Kafka 消息延时方案架构图



Kafka 消息延时实现思路


  1. 生产者将生产消息存入 topic_delay 主题中进行存储。

  2. 将 topic_delay 主题中的所有消息拉取至 ConcurrentLinkedQueue 队列中。

  3. 取值判断是否满足延时要求。

    a. 如果满足延时要求,则将消息生产至 topic_out 主题中,并将 queue 队列中的值移除。

    b. 如果不满足延时要求,则等待自定义时间后重试判断。

  4. 消费者最终从 topic_out 主题中拉取消息进行消费。

iii、方案时序图


Kafka 消息延时方案时序图


5、代码参数指南


本项目中起到延时作用的类 Delay.java 其余类为官方提供用于测试生产和消费消息, 如需使用官方测试的使用的生产消费代码相关配置介绍可以参考 https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。如需使用自己配置的生产者消费者,只配置 Delay.java 中的参数即可。

延迟.java 参数详情


  1. delay:自定义延时时间,单位。

  2. topic_delay 变量:用于临时存储消息的 topic 名称。

  3. topic_out 变量:用于消费者拉取消息消费的 topic 名称。

  4. 关于消费者和生产者配置可按需配置,可参考 Kafka 官方文档:https://kafka.apache.org/documentation/#producerconfigs

6、代码实现


实现代码可参考Kafka消息延时



package com.dms.delay;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Hello world!
*
*/
public class Delay {
//缓存队列
public static ConcurrentLinkedQueue<ConsumerRecord<String, String>> link = new ConcurrentLinkedQueue();
//延迟时间(20秒),可根据需要设置延迟大小
public static long delay = 20000L;
/**
*入口
* @param args
*/
public static void main( String[] args )
{
//延时主题(用于控制延时缓冲)
String topic_delay = "topic_delay";
//输出主题(直接供消费者消费)
String topic_out = "topic_out";
/*
消费线程
*/
new Thread(new Runnable() {
@Override
public void run() {
//消费者配置。请根据需要自行设置Kafka配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//指定消费主题
consumer.subscribe(Arrays.asList(topic_delay));
while (true) {
//轮询消费
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
//遍历当前轮询批次拉取到的消息
for (ConsumerRecord<String, String> record : records){
System.out.println(record);
//将消息添加到缓存队列
link.add(record);
}
}
}
}).start();
/*
生产线程
*/
new Thread(new Runnable() {
@Override
public void run() {
//生产者配置(请根据需求自行配置)
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//创建生产者
Producer<String, String> producer = new KafkaProducer<>(props);
//持续从缓存队列中获取消息
while(true){
//如果缓存队列为空则放缓取值速度
if(link.isEmpty()){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
//获取缓存队列栈顶消息
ConsumerRecord<String, String> record = link.peek();
//获取该消息时间戳
long timestamp = record.timestamp();
Date now = new Date();
long nowTime = now.getTime();
if(timestamp+ Delay.delay <nowTime){
//获取消息值
String value = record.value();
//生产者发送消息到输出主题
producer.send(new ProducerRecord<String, String>(topic_out, "",value));
//从缓存队列中移除该消息
link.poll();
}else {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}).start();
}
}
复制代码

7、结果反馈



点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚阅读数: 3
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
OUT了吧,Kafka能实现消息延时了_云计算_华为云开发者联盟_InfoQ写作社区