写点什么

结合源码讲解:Kafka 消费者参数配置 (解释、定义、引用、注意事项)

发布于: 2021 年 08 月 02 日
结合源码讲解:Kafka消费者参数配置(解释、定义、引用、注意事项)

原创:石头哥 @大数据架构师 2021 年 6 月 20 日 微信:nevian668899

个人背景

  • 电商大数据存储工程师

  • 6K+存储集群调优实战者

  • 百 P 存储数据最佳实践

  • 精读 HDFS 源码 60%、glusterfs 70%,熟悉存储生态技术 Ceph ZFS lustre 等

参数配置源码解读

bootstrap.servers

  • broker 集群地址,格式:ip1:port,ip2:port...,不需要设定全部的集群地址,设置两个或者两个以上即可

  • 源码走读


// 定义参数public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";// 参数解释public static final String BOOTSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).";
//创建KafkaConsumer对象consumer = new KafkaConsumer<>(props);//从客户端配置参数里获取服务端地址列表List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));//调用metadata的方法bootstrap时,指定服务端地址列表this.metadata.bootstrap(addresses);
复制代码

group.id

消费者隶属的消费者组名称,如果为空会报异常,一般而言,这个参数要有一定的业务意义。


参数定义的代码:


public static final String GROUP_ID_CONFIG = "group.id";public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
复制代码


参数使用地方


创建 KafkaConsumer 对象时,根据配置项解析出组标识,见行 1 代码


GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONSUMER);this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);//1
复制代码


而具体的解析逻辑在组负载均衡配置类内部进行,见行 2 代码


public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {        this.sessionTimeoutMs = config.getInt(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG);
// Consumer and Connect use different config names for defining rebalance timeout if (protocolType == ProtocolType.CONSUMER) { this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); } else { this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG); }
this.heartbeatIntervalMs = config.getInt(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG); this.groupId = config.getString(CommonClientConfigs.GROUP_ID_CONFIG);//2 //省略.....
复制代码

fetch.min.bytes

消费者从服务器获取记录的最小字节数,broker 收到消费者拉取数据的请求的时候,如果可用数据量小于设置的值,那么 broker 将会等待有足够可用的数据的时候才返回给消费者,这样可以降低消费者和 broker 的工作负载。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。


参数定义的代码:


public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";//0 定义参数 private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";
[省略..]//1 构建ConfigDef(该类用于指定一组配置)对象时,引用 FETCH_MIN_BYTES_CONFIGdefine(FETCH_MIN_BYTES_CONFIG,//1 Type.INT, 1,//默认值1B字节 atLeast(0), Importance.HIGH, FETCH_MIN_BYTES_DOC)
复制代码


创建 KafkaConsumer 对象的内部属性 fetcher 时,用到该参数,见行 1 代码


//KafkaConsumer构造函数体,省略...this.fetcher = new Fetcher<>(                    logContext,                    this.client,                    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),//1                    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),                    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),                    config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),                    config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),                    config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),                    this.keyDeserializer,                    this.valueDeserializer,                    this.metadata,                    this.subscriptions,                    metrics,                    metricsRegistry,                    this.time,                    this.retryBackoffMs,                    this.requestTimeoutMs,                    isolationLevel,                    apiVersions);
复制代码


而 Fetcher 的主要工作就是发送 FetchRequest 请求,获取指定的消息集合,处理 FetchResponse,更新消费位置,其中定义属性 minBytes,见行 2 代码


public class Fetcher<K, V> implements Closeable {    private final Logger log;    private final LogContext logContext;    private final ConsumerNetworkClient client;    private final Time time;    private final int minBytes;//2
复制代码

fetch.max.bytes

该参数与 fetch.min.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从 Kafka 中拉取的最大数据量,默认值为 52428800(B),也就是 50MB。如果这个参数设置的值比任何一条写入 Kafka 中的消息要小,那么会不会造成消费者无法拉取消息呢?该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该值,那么该消息将仍然返回,以确保消费者继续工作。因此,这不是一个绝对最大值。被 Broker 接收的批量记录最大大小是通过 broker 端参数 message.max.bytes 或主题配置参数 max.message.bytes 来定义的。


参数定义的代码:


public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of data the server should return for a fetch request. " +            "Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or <code>max.message.bytes</code> (topic config). Note that the consumer performs multiple fetches in parallel.";public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;//1 默认值50M
复制代码


引用到获取最大字节数参数的代码如下:


this.fetcher = new Fetcher<>(    logContext,    this.client,    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),//2 控制拉取消息最大大小    省略...
复制代码

fetch.max.wait.ms

如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会因为获取不到足够大小的消息而一直阻塞等待,从而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 等待 FetchResponse 的最长时间,服务端根据此时间决定何时进行响应,默认值为 500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500ms 再响应消费者请求。这个参数的设定需要参考 Consumer 与 Kafka 之间的延迟大小,如果业务应用对延迟敏感,那么可以适当调小这个参数。


参数定义的代码:


public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";//0 定义参数private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
//[省略..] [org.apache.kafka.clients.consumer.ConsumerConfig.java line:411].define(FETCH_MAX_WAIT_MS_CONFIG, Type.INT, 500, //默认值(ms) atLeast(0), Importance.LOW, FETCH_MAX_WAIT_MS_DOC)
复制代码


引用参数的代码:


this.fetcher = new Fetcher<>(                    logContext,                    this.client,                    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),                    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),//1 设置拉取消息时等待最长时间                    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),                    省略...
复制代码


而 Fetcher 类用于管理从 brokers 获取消息的过程,其中定义属性 maxWaitMs,见行 3 代码,具体控制由 Server 端控制。


public class Fetcher<K, V> implements Closeable {    private final Logger log;    private final LogContext logContext;    private final ConsumerNetworkClient client;    private final Time time;    private final int minBytes;    private final int maxBytes;    private final int maxWaitMs;//3 最大等待时间
复制代码

max.partition.fetch.bytes

这个参数用来配置从每个分区里返回给 Consumer 的最大数据量,默认值为 1048576(B),即 1MB。这个参数与 fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。


参数定义的代码和解释如下:


public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";//0 定义参数private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or <code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size.";public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;//默认值1M
复制代码


引用到的地方,行 1 代码:


this.fetcher = new Fetcher<>(    logContext,    this.client,    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),//1 从每个分区能够拉取的最大消息字节大小    省略...
复制代码

max.poll.records

这个参数用来配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为 500(条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。


参数定义的代码:


public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";//0 定义参数private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll(). Note, that <code>" + MAX_POLL_RECORDS_CONFIG + "</code> does not impact the underlying fetching behavior.The consumer will cache the records from each fetch request and returns them incrementally from each poll.";
复制代码


同样,应用参数的代码是 Fetcher 构造函数:


this.fetcher = new Fetcher<>(    logContext,    this.client,    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),    config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),//1一次拉取的最大消息数    省略...
复制代码

connections.max.idle.ms

这个参数用来指定在多久之后关闭闲置的连接,其中源码注释有提到:


default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time。


也就是默认值设置为略低于服务器默认值(10 分钟),以避免客户端和服务器同时关闭连接,所以默认值是 540000(ms),即 9 分钟。具体根据各自应用系统资源的实际情况自行调整。


public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;//0 定义参数//引用的CommonClientConfigs类,变量名定义及解释public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";
复制代码


构建 ConfigDef 对象时,引用到该参数以及设定默认值如下:


define(CONNECTIONS_MAX_IDLE_MS_CONFIG,        Type.LONG,        9 * 60 * 1000, //默认值9 分钟        Importance.MEDIUM,        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
复制代码


NIO 多路复用器,使用到该参数:


new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG)/*1*/, metrics, time, metricGrpPrefix, channelBuilder, logContext)//1 防止空闲的Socket连接占用资源
复制代码

exclude.internal.topics

Kafka 中有两个内部的主题: _consumer_offsets 和 _transaction_state。exclude.internal.topics 用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没有这个限制。


参数定义的代码如下:


public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";//0 定义参数private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether internal topics matching a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
复制代码

receive.buffer.bytes

这个参数用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 65536(B),即 64KB。如果设置为-1,则使用操作系统的默认值:87380B。适当调大该值,可以增加消息吞吐量。尤其 Client 和 Broker 之间是跨 IDC 通信,建议适当调大值。


参数定义代码:


public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;//0 定义参数//引用的CommonClientConfigs类,变量名定义及解释public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.";public static final int RECEIVE_BUFFER_LOWER_BOUND = -1;
复制代码


构建 ConfigDef(该类用于指定一组配置)对象时,引用 RECEIVE_BUFFER_CONFIG,默认值 64K。


define(RECEIVE_BUFFER_CONFIG,//1        Type.INT,        64 * 1024,//默认值        atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),        Importance.MEDIUM,        CommonClientConfigs.RECEIVE_BUFFER_DOC)
复制代码


构建网络客户端,它是用于异步请求/响应网络 i/o,其中构造参数引用到:


new NetworkClient(      selector,      new ManualMetadataUpdater(),      clientId,      1,      0,      0,      Selectable.USE_DEFAULT_BUFFER_SIZE,      consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),//2 Socket接收消息缓冲区   省略...
复制代码

send.buffer.bytes

这个参数用来设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 131072(B),即 128KB。与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。理论上发送缓冲区越大,吞吐量越高。具体还要考虑系统内存大小、延迟容忍性、并发量等因素,综合考虑后再调整。


引用的 CommonClientConfigs 类,变量名定义及解释代码:


public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.";public static final int SEND_BUFFER_LOWER_BOUND = -1;
复制代码


构建 ConfigDef(该类用于指定一组配置)对象时,引用该参数:


define(SEND_BUFFER_CONFIG,//1        Type.INT,        128 * 1024,//默认值        atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),        Importance.MEDIUM,        CommonClientConfigs.SEND_BUFFER_DOC)
复制代码


构建网络客户端,它是用于异步请求/响应网络 i/o,其中构造参数发送缓冲区引用到该参数:


NetworkClient netClient = new NetworkClient(                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix,    channelBuilder, logContext),                    this.metadata,                    clientId,                    100, // a fixed large enough value will suffice for max in-flight requests                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),                    config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),//2 设置发送缓冲区大小   
复制代码

request.timeout.ms

这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为 30000(ms),具体由客户端控制。


引用的 CommonClientConfigs 类,变量名定义及解释:


public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.";
复制代码


构建 ConfigDef(该类用于指定一组配置)对象时,引用到,默认值为 30s:


define(REQUEST_TIMEOUT_MS_CONFIG,//1        Type.INT,        30000,//默认值(ms)        atLeast(0),        Importance.MEDIUM,        REQUEST_TIMEOUT_MS_DOC)
复制代码


构建网络客户端,用于异步请求/响应网络 i/o,如果网络请求一直得不到响应,则客户端报超时异常 org.apache.kafka.common.errors.TimeoutException:


 new NetworkClient(      selector,      new ManualMetadataUpdater(),      clientId,      1,      0,      0,      Selectable.USE_DEFAULT_BUFFER_SIZE,      consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),      consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),//2 设置网络请求超时多久
复制代码

metadata.max.age.ms

这个参数用来配置元数据的过期时间,默认值为 300000(ms),即 5 分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的 broker 加入。有时无法消费到新创建的 topic 消息,这时我们要修改这个参数的时间,让元数据更新更快些。


引用的 CommonClientConfigs 类,变量名定义及解释代码:


public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
复制代码


引用的代码,构建 ConfigDef(该类用于指定一组配置)对象时,引用该参数:


define(METADATA_MAX_AGE_CONFIG, //1       Type.LONG,       5 * 60 * 1000, //默认值 5min       atLeast(0),       Importance.LOW,       CommonClientConfigs.METADATA_MAX_AGE_DOC)
复制代码

reconnect.backoff.ms

这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为 50(ms)。建议配置不宜过小,否则加大 broker 机器的性能开销;如果过大,会影响服务恢复的效率,所以需要权衡一个合适的值。


引用的 CommonClientConfigs 类,变量名定义及解释:


public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.";
复制代码


引用的代码,构建 ConfigDef 对象时,引用该参数:


define(RECONNECT_BACKOFF_MS_CONFIG,//1       Type.LONG,       50L,//默认是 50ms       atLeast(0L),       Importance.LOW,       CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
复制代码


构建网络客户端,限制重建 socket 的间隔时间:


NetworkClient netClient = new NetworkClient(        new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),        this.metadata,        clientId,        100, // a fixed large enough value will suffice for max in-flight requests        config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),省略...
复制代码

auto.offset.reset

如果 Consumer 在 zk 中发现没有初始的 offset 时(例如,因为该数据已被删除),该怎么办?Kafka 提供了三种选项,用于控制开始的消费位移。


  • earliest:从最早的 offset,即 partition 的起始位置开始消费

  • latest:从最近的 offset 开始消费,就是新发送 partition 的消息才会被消费

  • none:只要分区不存在已提交的 offset,则抛出异常

  • anything else: 抛出异常到消费者


public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul>";
//在KafkaConsumer的构造函数中,构建偏移量复位策略对象时,引用到该变量OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));this.subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
//OffsetResetStrategy枚举定义了三种值public enum OffsetResetStrategy { LATEST, EARLIEST, NONE}
复制代码

enable.auto.commit

配置是否开启自动提交消费位移的功能,默认 True,代表开启。在实践中,不建议开启,建议由消费者程序自行控制位移提交。


参数定义和解释代码:


public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";
复制代码


引用的代码,构建 ConfigDef 对象时引用:


define(ENABLE_AUTO_COMMIT_CONFIG,//1       Type.BOOLEAN,       true,//默认值true       Importance.MEDIUM,       ENABLE_AUTO_COMMIT_DOC)
复制代码

auto.commit.interval.ms

当 enbale.auto.commit 参数设置为 true 时才生效,表示开启自动提交消费位移功能时,自动提交消费位移的时间间隔。


参数定义和解释如下:


public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";private static final String AUTO_COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if <code>enable.auto.commit</code> is set to <code>true</code>.";
复制代码


构建消费协调者对象时,引用了该变量,用于设置 Timer 对象 nextAutoCommitTimer,周期性控制提交位移操作:


new ConsumerCoordinator(groupRebalanceConfig,                        logContext,                        this.client,                        assignors,                        this.metadata,                        this.subscriptions,                        metrics,                        metricGrpPrefix,                        this.time,                        enableAutoCommit,                        config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),//1 设置周期                        this.interceptors,                        config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
复制代码

partition.assignment.strategy

消费者的分区分配策略,通过实现消费者分配接口 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor,允许你定制自定义分配策略。默认的分区分配器是轮询分配器:org.apache.kafka.clients.consumer.RoundRobinAssignor,RoundRobinAssignor 策略的原理是将消费组内所有消费者以及消费者所订阅的所有 topic 的 partition 按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。如果有不满足业务的分配策略,可以自定义。


参数定义和解释代码:


public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +            "ordered by preference, of supported partition assignment " +            "strategies that the client will use to distribute partition " +            "ownership amongst consumer instances when group management is " +            "used.<p>In addition to the default class specified below, " +            "you can use the " +            "<code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>" + //默认:轮询分区分配器            "class for round robin assignments of partitions to consumers. " +            "</p><p>Implementing the " +            "<code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor" + //实现接口            "</code> interface allows you to plug in a custom assignment" +            "strategy.";
复制代码


在 KafkaConsumer 构造函数中,获取客户端分区分配器集合时引用该参数,用于指定分配策略,其中 getAssignorInstances 函数是:根据 ConsumerConfig 指定的类名/类型获取已配置的 ConsumerPartitionAssignor 实例列表,其中实现旧版 PartitionAssignor 接口的任何实例都封装在新 ConsumerPartitionAssignor 接口的适配器中。


private List<ConsumerPartitionAssignor> assignors;this.assignors = getAssignorInstances(config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), config.originals());
复制代码

interceptor.class

用来配置消费者客户端的拦截器。可以配置多个拦截器类,中间以逗号分隔开。实现 ConsumerInterceptor 的拦截器类,支持截取、修改消费者收到的消息记录,默认情况下,是没有拦截器的。


参数定义和解释如下:


public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. Implementing the <code>org.apache.kafka.clients.consumer.ConsumerInterceptor</code> interface allows you to intercept (and possibly mutate) records received by the consumer. By default, there are no interceptors.";
复制代码


在 KafkaConsumer 构造函数中,根据 ConsumerConfig 配置的 interceptor.classes 名称,加载接口拦截器实例列表。


List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
复制代码

总结

Kafka 参数众多,涉及生产者、服务端、消费者,三端均有不同的参数定义。这些参数的调整影响整个 Kafka 的处理延迟、吞吐量、容错性、资源管理以及实现消息的个性化处理。当然影响整个集群运行的稳定性还包括:服务器配置、网络带宽、磁盘容量、内核参数、业务流量大小等等。Kafka 预留的参数给我们预留了调整、干预系统运行的入口,我们要好好掌握它,从而提高运维能力,让集群稳定运行。如果满足不了我们的需求,可能需要修改 Kafka 内核代码,以满足适应复杂业务的场景。

用户头像

如切如磋;如琢如磨 2019.10.21 加入

还未添加个人简介

评论

发布
暂无评论
结合源码讲解:Kafka消费者参数配置(解释、定义、引用、注意事项)