点一下关注吧!!!非常感谢!!持续更新!!!
🚀 AI 篇持续更新中!(长期更新)
AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 08 月 18 日更新到:Java-100 深入浅出 MySQL 事务隔离级别:读未提交、已提交、可重复读与串行化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解
章节内容
上节我们完成了如下内容:
Kafka 事务配置
Kafka 事务语义
消息定义
事务概览
事务组
事务协调器等等
事务相关配置
BrokerConfigs
transactional.id.timeout.ms:在 ms 中,事务协调器在生产者 TranscationalId 提前过期之前等待的最长时间,并且没有从该生产者 TransactionalId 接收到任何任务状态更新,默认是 604800000(7 天),这允许每周一次的生产者作业维护它们的 ID。
max.transaction.timeout.ms:事务允许的最大超时,如果客户端请求的事务时间超过此时间,broker 将在 InitPidRequest 中返回 InvalidTransactionTimeout 错误,这可以防止客户端超时过大,从而导致用户无法从事务中包含的主题读取内容默认值为 900000(15 分钟),这是消息事务需要发送的事件的保守上限
transaction.state.log.replication.factor:事务状态 topic 的副本数量,默认值 3
transaction.state.log.num.partitions:事务状态主题的分区数,默认值 50
transaction.state.log.min.isr:事务状态主题每个分区 ISR 的最小数量 默认是 2
transaction.state.log.segement.bytes:事务状态主题的 Segment 大小,默认 104857600 字节
ProducerConfigs
enbale.idempotence:开启幂等
transaction.timeout.ms:事务超时时间,事务协调器在主动中止正在进行的事务之前等待生产者更新事务状态的最长时间,这个配置值将于 InitPidRequest 一起发送到事务协调器,如果该值大于 max.transaction.timeout,在 Broker 中设置 ms 时,请求将失败,并出现 InvalidTransactionTimeout 错误。默认是 60000,这使得交易不会阻塞下游消费超过一分钟,这在实时应用程序中通常是允许的。
transactional.id:用于事务性交付的 TransactionalId,这支持跨多个生产者会话的可靠性语义,因为它允许客户端确保使用相同 Transaction 的事务在启动任何新事务之前已经完成。如果没有提供 TransactionalId,则生产者仅限于幂等交付。
ConsumerConfigs
幂等性
基本流程
Kafka 在引入幂等性之前,Producer 向 Broker 发送消息,然后 Broker 将消息追加到消息流中后给 Producer 返回 ACK 信号值,实现流程如下:
生产中,会出现各种不确定的因素,比如在 Producer 在发送给 Broker 的时候出现网络异常。比如以下这种异常情况的出现:
上图这种情况,当 Producer 第一次发送消息给 Broker 时,Broker 消息(x2,y2)追加到消息中,但是在返回 ACK 信号给 Producer 时失败了(比如网络异常)。此时,Producer 端触发重试机制,将消息(x2,y2)重新发送给 Broker,Broker 接收到消息后,再次将该消息追加到消息流中,然后成功返回 ACK 信号给 Producer。这样下来,消息流中就被重复追加两条相同的(x2,y2)的消息。
幂等性
保证咋消息重发的时候,消费者不会重复处理,即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。所谓幂等性,数学概念就是:f(f(x)) = f(x),f 函数表示对消费的处理比如,银行转账,如果失败,需要重试。不管重试多少次,都要保证结果一定一致的。
幂等性实现
添加唯一 Id,类似于数据库的主键,用于标记一个消息:Kafka 为了实现幂等性,它在底层设计架构中引入了 Producer 和 SequenceNumber
同样的,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,比如 Broker 在发送 ACK 信号给 Producer 时出现了网络异常,导致发送失败。异常情况如下图所示:
当 Producer 发送消息(x2,y2)给 Broker 时,Broker 接收到消息并将其追加到消息流中。此时,Broker 返回 ACK 信号给 Producer 时,发生异常导致 Producer 接收 ACK 信号失败。对于 Producer 来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入幂等性,在每条消息中附带了 PID(ProducerID)和 SequenceNumber。相同的 PID 和 SequenceNumber 发送给 Broker,而之前 Broker 缓存之前发送过的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。
// 实例化⼀个Producer对象
Producer<String, String> producer = new KafkaProducer<>(props);
复制代码
在 org.apache.kafka.clients.producer.iinternals.Sender 类中,在 run()中有一个 maybeWaitForPid()方法,用来生成一个 ProducerID,实现代码如下:
private void maybeWaitForPid() {
if (transactionState == null) {
return;
}
while (!transactionState.hasPid()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeout);
if (node != null) {
ClientResponse response = sendAndAwaitInitPidRequest(node);
if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
} else {
log.error("Received an unexpected response type for an InitPidRequest from {}. We will back off and try again.", node);
}
} else {
log.debug("Could not find an available broker to send InitPidRequest to. We will back off and try again.");
}
} catch (Exception e) {
log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
}
log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
}
复制代码
事务操作
基本介绍
在 Kafka 事务中,一个原子性操作,根据操作类型可以分为 3 中情况,情况如下:
只要 Producer 生产消息,这种场景需要事务的介入
消费消息和生产消息并存,比如 Consumer&Producer 模式,这种场景是一般 Kafka 项目中比较常见的模式,需要事务介入。
只有 Consumer 消费消息,这种操作在实际项目中意义不大,和手动 CommitOffets 的结果一样,而且这种场景不是事务的引入的目的。
// 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();
// 开启事务
void beginTransaction() throws ProducerFencedException;
// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 提交事务
void commitTransaction() throws ProducerFencedException;
// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;
复制代码
案例 1:单 Producer 保证仅一次发送
编写代码
public class MyTransactionalProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 提供客户端ID
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");
// 事务ID
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
// 要求ISR确认
configs.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();
try {
// 发送消息
producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_02"));
// 可以在这里设置一些异常来测试 比如:
// int n = 1 / 0;
} catch (Exception e) {
// 中止事务
producer.abortTransaction();
} finally {
producer.close();
}
}
}
复制代码
测试运行
运行之后,控制台输出结果如下:
案例 2:消费-转换-生产 事务保证仅一次发送
编写代码
public class MyTransactional {
public static void main(String[] args) {
KafkaProducer<String, String> producer = getProducer();
KafkaConsumer<String, String> consumer = getConsumer();
// 事务的初始化
producer.initTransactions();
// 订阅主题
consumer.subscribe(Collections.singleton("tp_tx_01"));
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 开启事务
producer.beginTransaction();
try {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
producer.send(new ProducerRecord<>("tp_tx_out_01", record.key(), record.value()));
offsets.put(
new TopicPartition(record.topic(), record.partition()),
// 偏移量表示下一条要消费的消息
new OffsetAndMetadata(record.offset() + 1)
);
}
// 将该消息的偏移量提交作为事务的一部分,随事务提交和回滚(不提交消费偏移量)
producer.sendOffsetsToTransaction(offsets, "consumer_grp_02");
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
e.printStackTrace();
// 中止事务
producer.abortTransaction();
} finally {
producer.close();
consumer.close();
}
}
public static KafkaProducer<String, String> getProducer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 设置ClientID
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");
// 设置事务ID
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");
// 要求ISR确认
configs.put(ProducerConfig.ACKS_CONFIG, "all");
// 启用幂等性
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
return producer;
}
public static KafkaConsumer<String, String> getConsumer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 设置消费组ID
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");
// 不启⽤消费者偏移量的⾃动确认,也不要⼿动确认
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
return consumer;
}
}
复制代码
测试运行
(由于我测试的云服务器的 Kafka 掉线了,我又启动了一次,重新执行一次案例 1。)下面是案例 2 直接的结果如下图:
评论