写点什么

轻松解决 Kafka 数据流丢失:提升数据完整性和可靠性

作者:xfgg
  • 2023-06-16
    福建
  • 本文字数:4170 字

    阅读完需:约 14 分钟

轻松解决Kafka数据流丢失:提升数据完整性和可靠性

引言

随着数据流量的不断增长,Kafka 已经成为了许多企业首选的数据传输解决方案。然而,尽管 Kafka 运行稳定,在高吞吐量的读写负载下,仍会导致数据流丢失的问题,这对于大多数业务场景来说是无法接受的。如果你正好遇到这样的问题,那么你不用担心,因为本文将向你介绍如何在 Kafka 中轻松提升数据完整性和可靠性。

Kafka 基本概念和架构

Kafka 是一种高吞吐量、低延迟、分布式的消息传递及处理系统。主要组件包括:

  • Producer:负责将来自应用程序的数据发送到 Kafka 中。

  • Topic:用于将消息分组和分类,便于消费者订阅所需数据。

  • Broker:Kafka 集群中的单个服务器,负责存储和处理 Topic 的一个或多个分区。

  • Consumer:从 Kafka 订阅并消费数据的客户端组件。

Kafka 将消息保存在日志文件中,按照时间顺序存储。消费者通过消费位移(offset)进行消息读取。

发现数据流丢失潜在原因

数据丢失可能出现在以下原因:

  • 消息发送失败:生产者发送消息时可能因为网络错误或其他问题导致失败。

  • 服务器宕机:Kafka Broker 因为硬件故障或者宕机影响数据存储和传输。

  • 网络问题:Kafka 集群和消费者之间的低延迟,不稳定的网络导致数据流丢失。

  • 消费端消费不及时:如果消费者不能及时接收和处理数据,也会导致数据流丢失,因为当前 Partitions 已经满足了缓存调度,后续到达的数据只能顺延丢失调度。

  • 消息轮替方式:当 consumer 使用手动提交 offset 并采用同步提交方式时,如果在 commit()之前 consumer 挂点,或 commit()返回错误,消息会发生重复消费或数据丢失。

  • 重试不当:消费者产生的异常、故障等问题可能会导致重试机制进入持续异常状态,从而导致数据丢失或重复消费的问题。

  • 高并发问题:如果 Kafka 监听的 Topic 特别受欢迎,一个 partition 有很多 kafak consumer 并发消费,消费者的并发导致 Kafka 压力增大,并增大出现丢失或消息堆积等风险。

提高数据完整性关键策略

为了确保数据的可靠传输和存储,我们可以采取以下优化策略:

  • 正确配置 Kafka Producer 参数以确保数据可靠性 

  1. 设置消息的持久性级别:Producer 可以设置消息的持久性级别(如仅写内存、同步写磁盘等),以调整在不同故障场景下数据的可靠性。

  2. 设置 acks 参数:通过 acks 参数可以设置消息发送的确认策略,如无确认、只确认 leader 写入或等待所有副本写入。

  3. 设置重试策略:生产者遇到临时错误时可以进行自动重试,以提高消息传输成功的概率。

mport org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ReliableProducer {
public static void main(String[] args) {
// Kafka broker 地址 String bootstrapServers = "localhost:9092";
// 设置 Producer 的配置信息 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 设置消息可靠性配置参数 props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本写入确认 props.put(ProducerConfig.RETRIES_CONFIG, 3); // 自动重试 3 次 props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等性 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// 设置消息序列化:使用 StringSerializer 将键和值序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建一个 KafkaProducer 实例,并指定 key 和 value 的类型为 String KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 创建一个消息 String topic = "test-topic"; String key = "key"; String value = "value";
// 发送消息,并阻塞直到发送完成 producer.send(new ProducerRecord<>(topic, key, value)).get();
// 关闭 producer producer.close();
}}
复制代码

在示例代码中,我们将 Kafka Producer 的消息序列化器设置为 StringSerializer,使用 acks=all 并且开启了幂等性,并使用 snappy 压缩技术。不同的场景下所需的参数可能会有所变化,详细参数说明请参考 Kafka 官方文档。


  • 配置 Kafka 的数据持久化存储策略 

  1. 选择合适的日志文件存储策略:Kafka 可以配置消息的存储方式,如基于时间或基于文件大小的滚动策略。

  2. 合理设置日志文件大小、分区和副本数量:设置较大的日志文件大小可以提高分区的磁盘利用率,而分区和副本数量可以提高系统的吞吐量和冗余保护。

import org.apache.kafka.clients.admin.AdminClient;import org.apache.kafka.clients.admin.AlterConfigOp;import org.apache.kafka.clients.admin.Config;import org.apache.kafka.clients.admin.ConfigEntry;import org.apache.kafka.clients.admin.NewTopic;import org.apache.kafka.clients.admin.TopicDescription;import org.apache.kafka.clients.admin.TopicListing;import org.apache.kafka.common.config.ConfigResource;
import java.util.Collections;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutionException;
public class KafkaConfiguration {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Kafka broker 地址 String bootstrapServers = "localhost:9092";
Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers);
// 创建 AdminClient try (AdminClient adminClient = AdminClient.create(props)) {
// 新建 Topic int numPartitions = 3; short replicationFactor = 2; String topicName = "test-topic"; NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor); adminClient.createTopics(Collections.singleton(newTopic)).all().get();
// 查看 Topic 列表 ListTopicsResult listTopicsResult = adminClient.listTopics(); for (TopicListing topicListing : listTopicsResult.listings().get()) { System.out.println(topicListing.name()); }
// 查看 Topic 描述 DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName)); Map<String, TopicDescription> topicDescriptionMap = describeTopicsResult.all().get(); System.out.println(topicName + " has " + topicDescriptionMap.get(topicName).partitions().size() + " partitions.");
// 设置 Topic 配置 Map<String, String> configs = new HashMap<>(); configs.put("segment.ms", "1000"); // 日志切片时间为1s configs.put("segment.bytes", "10485760"); // 每个日志切片的大小为10MB configs.put("retention.ms", "3600000"); // 数据保存时间为1h configs.put("message.max.bytes", "1048576"); // 单条消息最大大小为1MB
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); Config config = new Config(Collections.singletonList(new ConfigEntry("segment.ms", "1000"))); List<AlterConfigOp> alterConfigOps = Collections.singletonList(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterConfigOps)).all().get(); }
}}
复制代码

在此示例中,我们使用 AdminClient 创建 Topic,查看 Topic 列表和描述,以及设置 Topic 的持久化存储策略。我们设置了日志切片的时间为 1s,并且每个日志切片的大小为 10MB。数据保存时间为 1h,单条消息最大大小为 1MB。Kafka 使用了消息传递责任链的方式传递消息,并在消息存储之前应用不透明的细节处理逻辑,从而实现高效和灵活的数据持久化存储。

提升系统可靠性必备策略

通过以下措施,可以降低 Kafka 集群出现故障的可能性:

  • 部署多节点的高可用 Kafka 集群 

  1. 单点故障及其影响:为了避免单点故障,可以设置多个 Kafka Broker。

  2. 配置进行故障切换的多副本集:通过配置多个 Topic 副本,可以在某个 Broker 出现故障时实现无缝切换。

  • 实现消费者端的实时监控和故障追踪 

  1. 使用监控工具及其指标:选用 JMX、Kafka 附带工具等工具可以实时监控 Kafka 集群的性能和状态。

  2. 对异常情况进行报警及快速响应:设定阈值实现自动报警,并采取措施进行问题排查和故障恢复。

  • 优化 Kafka Broker 的配置

  1. 配置主题分区和副本数量:设置合适的分区和副本数量可以提供更高的吞吐量和冗余保护。

  2. 为 Broker 分配足够的资源:确保 Broker 有足够的内存、CPU 和磁盘资源,以处理大量的读写请求。

  3. 配置底层操作系统和硬件优化:根据部署环境可以调整内核参数、I/O 调度策略等,提高系统性能。

  • 实现自动负载均衡

  1. 使用 Cruise Control 进行实时负载监测,并基于预测分析自动执行分区再分配,以确保资源的均衡使用。

  2. 定期检测和验证数据完整性

  3. 使用 Kafka Streams 或 KSQL 进行数据审计和校验,确保数据的一致性。

  • 备份和恢复策略

  1. 通过 Confluent Replicator 或其他备份工具实施异地备份,以应对灾难性故障。

  2. 制定完善的数据恢复流程和预案,确保在发生故障时能尽快进行恢复。

总结

通过对 Kafka 的优化配置以及持续监控,我们可以有效地降低数据流丢失的风险,提高数据完整性和系统可靠性。实践中,我们需要根据应用场景和需求来选择并应用这些策略,从而充分发挥 Kafka 的潜力。

发布于: 2023-06-16阅读数: 23
用户头像

xfgg

关注

THINK TWICE! CODE ONCE! 2022-11-03 加入

目前:全栈工程师(前端+后端+大数据) 目标:架构师

评论

发布
暂无评论
轻松解决Kafka数据流丢失:提升数据完整性和可靠性_Java_xfgg_InfoQ写作社区