写点什么

大数据 -77 Kafka 延时队列与消息重试机制全解析:从原理到实战落地 Java

作者:武子康
  • 2025-08-25
    山东
  • 本文字数:6781 字

    阅读完需:约 22 分钟

大数据-77 Kafka 延时队列与消息重试机制全解析:从原理到实战落地 Java

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 08 月 18 日更新到:Java-100 深入浅出 MySQL 事务隔离级别:读未提交、已提交、可重复读与串行化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下内容:


  • 生产者阶段丢失

  • 生产者 Broker 阶段丢失

  • 消费者丢失消息


延时队列

基本概念

两个 Follower 副本都已经拉取到了 Leader 副本的最新位置,此时又向 Leader 副本发送拉取请求,而 Leader 副本并没有新的消息写入,那么此时 Leader 副本应该如何处理问题呢?可以直接返回空的拉取结果给 Follower 副本,不过在 Leader 副本一直没有新消息写入的情况下,Follower 副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。



Kafka 在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes),由参数(fetch.min.bytes 配置,默认为 1),那么就会创建一个延时拉取的操作(DelyedFetch)以等待拉取足够数量的消息。当延时拉取操作执行时,会再读取一次日志,然后将拉去结果返回给 Follower 副本。


延时操作不只是拉取消息时特有的操作,在 Kafka 中也有多种演示操作,比如延时数据删除、延时生产等。对于延时生产而言,如果在使用生产者客户端发送消息的时候将 acks 设置为-1,那么意味着需要等待 ISR 集合中所有副本都确认收到消息之后才能正确的响应结果,或者捕获超时异常。



假设某个分区有 3 个副本,Leader、Follower1 和 Follower2,他们都在分区的 ISR 集合中。不考虑 ISR 变动的情况,Kafka 在收到客户端的生产请求之后,将消息 3 和合消息 4 写入 Leader 副本的本地日志文件。


由于客户端设置了 acks=-1,那么需要等到 Follower1 和 Follower2 两个副本都收到消息 3 和消息 4 才能告知客户端正确的接收了所发送的消息。如果在一定时间内,Follower1 副本和 Follower2 副本没有能够完全拉取到消息 3 和消息 4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数:request.timeout.ms 配置,默认值为 3000,即 30 秒。



那么这里等待消息 3 和消息 4 写入 Follower1 副本和 Follower2 副本,并返回相应的响应结果给客户端的动作是由谁来执行的?在将消息写入 Leader 副本的本地日志文件之后,Kafka 会创建一个延时的生产操作 DelayedProduce,用来处理消息正常写入所有副本或超时情况,以返回相应的响应结果给客户端。

延时操作

延时操作需要延时反应响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同与定时操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。

延时生产操作

它的外部事件是所要写入消息的某个分区 HW(HighWatermark)发生增长。也就是说,随着 Follower 副本不断的与 Leader 副本进行消息同步,进而促使 HW 进一步增长,HW 每增长一次都会检测是否能够完成次延时生产操作,如果可以就执行以此返回响应结果给客户端,如果在超时时间内始终无法完成,则强制执行。

延迟拉取操作

是由超时触发或外部事件触发而被执行的,超时触发很好理解,就要等到超时时间之后触发第二次读取日志文件的操作。外部事件触发就稍微复杂了一些,因为拉取请求不单单由 Follower 副本发起,也可以由消费者客户端发起,两种情况所对应的外部事件也不同的。如果是 Follower 副本的延时拉取,它的外部事件就是消息追加到了 Leader 副本的本地日志文件中,如果是消费者客户端的延时拉取,它的外部事件可以简单的理解为 HW 的增长。

实现方式

  • 时间轮实现延时队列:TimeWheel,size,每个单元格的事件,每个单元格都代表一个时间,size*每个单元格的时间就是一个周期。

重试队列

基本概念

Kafka 没有重试机制不支持消息重试,也没有死信队列,因此使用 Kafka 做消息队列时,需要自己实现消息重试的功能。

如何实现

创建新的 Kafka 主题作为重试队列:


  • 创建一个 topic 作为重试 topic,用于接收等待重试的消息

  • 普通 topic 消费者设置等待重试消息的下一个重试 topic

  • 从重试 topic 获取等待重试消息存储到 redis 的 zset 中,并以下一次消费时间排序

  • 定时任务从 Redis 获取到达消费事件的消息,并把消息发送到对应的 topic

  • 同一个消息重试次数过多则不再重试

代码实现

新建项目

由于重复了很多次,且没什么技术难度,这里跳过。我们新建一个 Maven 项目。

POM.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId> <artifactId>springboot-kafka</artifactId> <version>1.0-SNAPSHOT</version>
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
</project>
复制代码

修改配置

spring:  application:    name: kafka-test  kafka:    bootstrap-servers: h121.wzk.icu:9092    producer:      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer      auto-offset-reset: earliest  redis:    database: 0    host: h121.wzk.icu    port: 6379    password:    lettuce:      pool:        min-idle: 8        max-idle: 500        max-active: 2000        max-wait: 10000    timeout: 5000
server: port: 8085
复制代码

启动类

@SpringBootApplicationpublic class StartApp {
public static void main(String[] args) { SpringApplication.run(StartApp.class, args); }
}
复制代码

AppConfig

@Configurationpublic class AppConfig {
@Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); // 配置连接⼯⼚ template.setConnectionFactory(factory); return template; }
}
复制代码

KafkaController

@RestControllerpublic class KafkaController {
@Resource private KafkaService kafkaService;
private static final String TOPIC = "tp_demo_retry_01";
@RequestMapping("/send/{message}") public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message); String result = kafkaService.sendMessage(record); return result; }
}
复制代码

KafkaService

@Servicepublic class KafkaService {
@Resource private KafkaTemplate<String, String> kafkaTemplate;
public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException { SendResult<String, String> result = kafkaTemplate.send(record).get(); RecordMetadata metadata = result.getRecordMetadata(); String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset(); System.out.println("发送消息: " + returnResult); return returnResult; }
}
复制代码

ConsumerService

@Componentpublic class ConsumerListener {
@Resource private KafkaRetryService kafkaRetryService;
private static int index = 0;
@KafkaListener(topics = "tp_demo_retry_01", groupId = "wzkicu") public void consumer(ConsumerRecord<String, String> record) { try { // 业务处理 System.out.println("消费的消息: " + record); index ++; if (index % 2 == 0) { throw new Exception("重发消息"); } } catch (Exception e) { // 消息重试 kafkaRetryService.consumerLater(record); } }
}
复制代码

KafkaRetryService

@Servicepublic class KafkaRetryService {
/** * 消费失败后下一次消息的延迟时间(秒) */ private static final int[] RETRY_INTERVAL_SECONDS = { 10, 30, 1 * 60, 2 * 60, 5 * 60, 10 * 60, 30 * 60, 1* 60 * 60, 2 * 60 * 60 };
private static final String RETRY_TOPIC = "tp_demo_retry_02";
@Resource private KafkaTemplate<String, String> kafkaTemplate;
public void consumerLater(ConsumerRecord<String, String> record) { // 获取消息的已重试次数 int retryTimes = getRetryTimes(record); Date nextConsumerTime = getNextConsumerTime(retryTimes); if (null == nextConsumerTime) { return; } // 组织消息 RetryRecord retryRecord = new RetryRecord(); retryRecord.setNextTime(nextConsumerTime.getTime()); retryRecord.setTopic(record.topic()); retryRecord.setRetryTimes(retryTimes); retryRecord.setKey(record.key()); retryRecord.setValue(record.value()); // 转换字符串 String value = JSON.toJSONString(retryRecord); // 发到重试队列 kafkaTemplate.send(RETRY_TOPIC, null, value); }
/** * 获取消息已经重试的次数 */ private int getRetryTimes(ConsumerRecord record) { int retryTimes = -1; for (Header header :record.headers()) { if (RetryRecord.KEY_RETRY_TIMES.equals(header.key())) { ByteBuffer byteBuffer = ByteBuffer.wrap(header.value()); retryTimes = byteBuffer.getInt(); } } retryTimes ++; return retryTimes; }
/** * 获取等待重试的下一次消息时间 */ private Date getNextConsumerTime(int retryTimes) { // 重试次数超过上限 不再重试 if (RETRY_INTERVAL_SECONDS.length < retryTimes) { return null; } Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]); return calendar.getTime(); }
}
复制代码

RetryListener

@Component@EnableSchedulingpublic class RetryListener {
private static final String RETRY_KEY_ZSET = "_retry_key"; private static final String RETRY_VALUE_MAP = "_retry_value";
@Resource private RedisTemplate<String, Object> redisTemplate;
@Resource private KafkaTemplate<String, String> kafkaTemplate;
private String topic = "tp_demo_retry_01";
@KafkaListener(topics = "tp_demo_retry_02", groupId = "wzkicu") public void consumer(ConsumerRecord<String, String> record) { System.out.println("需要重试的消息: " + record); RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class); // 防止待重试消息太多撑爆redis 可以将重试消息按下一次重试时间分开存储到不同介质中 String key = UUID.randomUUID().toString(); redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value()); redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime()); }
@Scheduled(fixedDelay = 2000) public void retryFromRedis() { System.out.println("retry redis"); long currentTime = System.currentTimeMillis(); // 时间倒序获取 Set<ZSetOperations.TypedTuple<Object>> typeTuples = redisTemplate .opsForZSet() .reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime); // 移除取出的消息 redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime); for (ZSetOperations.TypedTuple<Object> tuple : typeTuples) { String key = tuple.getValue().toString(); String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString(); redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key); RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class); ProducerRecord record = retryRecord.parse(); ProducerRecord recordReal = new ProducerRecord( topic, record.partition(), record.timestamp(), record.key(), record.value(), record.headers() ); kafkaTemplate.send(recordReal); } }
}
复制代码

RetryRecord

@Datapublic class RetryRecord {
public static final String KEY_RETRY_TIMES = "retryTimes";
private String key; private String value; private Integer retryTimes; private String topic; private Long nextTime;
public ProducerRecord parse() { Integer partition = null; Long timestamp = System.currentTimeMillis(); List<Header> headers = new ArrayList<>(); ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4); retryTimesBuffer.putInt(retryTimes); retryTimesBuffer.flip(); headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer)); ProducerRecord sendRecord = new ProducerRecord(topic, partition, timestamp, key, value, headers); return sendRecord; }}
复制代码

测试结果

Postman

控制台


发布于: 刚刚阅读数: 3
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-77 Kafka 延时队列与消息重试机制全解析:从原理到实战落地 Java_Java_武子康_InfoQ写作社区