本文是一个 Kafka 使用过程中的常见错误的总结。希望对你有帮助。
1、UnknownTopicOrPartitionException
org.apache.kafka.common.errors.UnknownTopicOrPartitionException:
This server does not host this topic-partition
复制代码
报错内容:分区数据不在
原因分析:producer 向不存在的 topic 发送消息,用户可以检查 topic 是否存在 或者设置 auto.create.topics.enable 参数
2、LEADER_NOT_AVAILABLE
WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClien
复制代码
报错内容:leader 不可用
原因分析:原因很多 topic 正在被删除 正在进行 leader 选举 使用 kafka-topics 脚本检查 leader 信息
进而检查 broker 的存活情况 尝试重启解决。
3、NotLeaderForPartitionException
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition
复制代码
报错内容:broker 已经不是对应分区的 leader 了
原因分析:发生在 leader 变更时 当 leader 从一个 broker 切换到另一个 broker 时,要分析什么原因引起了 leader 的切换。
4、TimeoutException
org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-0: 30040 ms has passe
复制代码
报错内容:请求超时
原因分析:观察哪里抛出的 观察网络是否能通 如果可以通 可以考虑增加 request.timeout.ms 的值
5、RecordTooLargeException
WARN async.DefaultEventHandler: Produce request with correlation id 92548048 failed due to [TopicName,1]: org.apache.kafka.common.errors.RecordTooLargeException
复制代码
报错内容:消息过大
原因分析:生产者端 消息处理不过来了 可以增加 request.timeout.ms 减少 batch.size
6、Closing socket connection
Closing socket connection to/127,0,0,1.(kafka.network.Processor)
复制代码
报错内容:连接关闭
原因分析:如果 javaApi producer 版本高,想在客户端 consumer 启动低版本验证,会不停的报错
无法识别客户端消息。
7、ConcurrentModificationException
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
复制代码
报错内容:线程不安全
原因分析:Kafka consumer 是非线程安全的
8、NetWorkException
[kafka-producer-network-thread | producer-1] o.apache.kafka.common.network.Selector : [Producer clientId=producer-1] Connection with / disconnected
复制代码
报错内容:网络异常
原因分析:网络连接中断 检查 broker 的网络情况
八千里路云和月 | 从零到大数据专家学习路径指南
我们在学习Flink的时候,到底在学习什么?
193篇文章暴揍Flink,这个合集你需要关注一下
Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS
Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点
我们在学习Spark的时候,到底在学习什么?
在所有Spark模块中,我愿称SparkSQL为最强!
硬刚Hive | 4万字基础调优面试小总结
9、ILLEGAL_GENERATION
ILLEGAL_GENERATION occurred while committing offsets for group
复制代码
报错内容:无效的“代”
原因分析:consumer 错过了 rebalance 原因是 consumer 花了大量时间处理数据。
需要适当减少 max.poll.records 值 增加 max.poll.interval.ms 或者想办法增加消息处理的速度
10、启动 advertised.listeners 配置异常
java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
at scala.Predef$.require(Predef.scala:277)
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
复制代码
解决方法:修改 server.properties
advertised.listeners=PLAINTEXT://{ip}:9092 # ip可以内网、外网ip、127.0.0.1 或域名
复制代码
解析:
server.properties 中有两个 listeners。listeners:启动 kafka 服务监听的 ip 和端口,可以监听内网 ip 和 0.0.0.0(不能为外网 ip),默认为 java.net.InetAddress.getCanonicalHostName()获取的 ip。advertised.listeners:生产者和消费者连接的地址,kafka 会把该地址注册到 zookeeper 中,所以只能为除 0.0.0.0 之外的合法 ip 或域名 ,默认和 listeners 的配置一致。
11、启动 PrintGCDateStamps 异常
[0.004s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/data/service/kafka_2.11-0.11.0.2/bin/../logs/kafkaServer-gc.log instead.
Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
复制代码
解决方法: 更换 jdk1.8.x 版本或者使用>=kafka1.0.x 的版本。
解析:
只有在 jdk1.9 并且 kafka 版本在 1.0.x 之前的版本才会出现。
12、生成者发送 message 失败或消费者不能消费(kafka1.0.1)
#(java)org.apache.kafka警告
Connection to node 0 could not be established. Broker may not be available.
# (nodejs) kafka-node异常 (执行producer.send后的异常)
{ TimeoutError: Request timed out after 30000ms
at new TimeoutError (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
at Timeout.setTimeout [as _onTimeout] (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:737:14)
at ontimeout (timers.js:466:11)
at tryOnTimeout (timers.js:304:5)
at Timer.listOnTimeout (timers.js:264:5) message: 'Request timed out after 30000ms' }
复制代码
解决方法: 检查 advertised.listeners 的配置(如果有多个 Broker 可根据 java 版本的对应的 node 号检查配置),判断当前的网络是否可以连接到地址(telnet 等)
13、partitions 配置的值过小造成错误(kafka1.0.1)
#(java)org.apache.kafka(执行producer.send)
Exception in thread "main" org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:908)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:778)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
at com.wenshao.dal.TestProducer.main(TestProducer.java:36)
# (nodejs) kafka-node异常 (执行producer.send后的异常)
{ BrokerNotAvailableError: Could not find the leader
at new BrokerNotAvailableError (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\errors\BrokerNotAvailableError.js:11:9)
at refreshMetadata.error (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:831:16)
at D:\project\node\kafka-test\src\node_modules\kafka-node\lib\client.js:514:9
at KafkaClient.wrappedFn (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:379:14)
at KafkaClient.Client.handleReceivedData (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\client.js:770:60)
at Socket.<anonymous> (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:618:10)
at Socket.emit (events.js:159:13)
at addChunk (_stream_readable.js:265:12)
at readableAddChunk (_stream_readable.js:252:11)
at Socket.Readable.push (_stream_readable.js:209:10) message: 'Could not find the leader' }
复制代码
解决方法: 修改 num.partitions 的值,partitions 在是在创建 topic 的时候默认创建的 partitions 节点的个数,只对新创建的 topic 生效,所有尽量在项目规划时候定一个合理的值。也可以通过命令行动态扩容()
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 2 --topic foo
复制代码
14、Kafka-Topic 操作
cd /usr/kafka/bin
./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave3:2181/kafka --replication-factor 1 --partitions 1 --topic mobilePhone
复制代码
如果创建时,报以下错误:Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
解决方法:很可能是之前在 server.properties 配置文件夹里面和执行命令的 zookeeper 目录不一致。--zookeeper 的值需要带上根目录,否则就会报这样的错误。例如配置文件里面写的连接目录是 zookeeper.connect=master:2181,slave1:2181,slave3:2181/kafka,但是在执行命令时少写了 kafka 目录,写成一下
--zookeeper master:2181,slave1:2181,slave3:2181。就会报上述的错误,因此,务必要保证 zookeeper 的目录一致。
当 Topic 成功创建时,会输出 Created topic “mobilePhone”,如上图。
注意:replication-factor 不能大于 broker 数。
cd /usr/kafka/bin
./kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave3:2181/kafka --topic mobilePhone
复制代码
可以不指定具体的 topic 名称,即不带--topic 参数的执行命令,即可查询所有的 topic 及其信息。
cd /usr/kafka/bin
./kafka-topics.sh --alter --zookeeper master:2181,slave1:2181,slave3:2181/kafka --partitions 5 --topic mobilePhone
复制代码
注意:刚刚在输入命令时报了以下的错误:
Error while executing topic command : Topic mobilePhone does not exist on ZK path master:2181,slave1:2181,slave3:2181 [2018-11-29 16:14:02,100] ERROR java.lang.IllegalArgumentException: Topic mobilePhone does not exist on ZK path master:2181,slave1:2181,slave3:2181 at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:124) at kafka.admin.TopicCommand$.main(TopicCommand.scala:65) at kafka.admin.TopicCommand.main(TopicCommand.scala) (kafka.admin.TopicCommand$)
造成这个错误的原因也是 在执行命令时,忘记输入配置 zookeeper 时的根目录 hostname:port/kafak,直接写成了主机名加端口号,从而 zookeeper 找不到 topic 的路径。
cd /usr/kafka/bin
./kafka-topics.sh --delete --zookeeper master:2181,slave1:2181,slave3:2181/kafka --topic mobilePhone
复制代码
在执行删除 topic 命令时,会提示无法删除,这是因为在 server.properties 的配置文件中,kafka 默认为无法删除即 false,因此需要去各个节点的配置文件中修改 delete.topic.enable=true。之后便可正常删除。如下图所示标记
但是,这样删除只是将刚刚的 topic 标记为删除状态,并没有真正意义上的删除,当重新创建一个同名的 topic 时,依然会报错,该 topic 已存在。因此,为了能够彻底的删除 topic,我们进入 zookeeper 的 bin 目录下,输入./zkCli.sh 进入 zookeeper 的命令行,删除三个目录:1、rmr /kafka/brokers/topics/mobilePhone; 2、rmr /kafka/admin/delete_topics/mobliePhone; 3、rmr /kafka/config/topics/mobilePhone
此时,便可以完全删除该 topic,如果重新创建同名 topic 依然报已存在错误,需要重启 kafka 服务即可解决。
15、Kafka-Producer 操作
在执行生产者和消费者命令之前,我们按照上面的创建方法,创建一个 topic 为 newPhone,并更改它的分区为 2。此处我们设置的 zookeeper 地址都为 localhost:2181/kafak,这与上面并无不同,只不过是在当前机器上创建。推荐此种做法,不仅简洁,而且节约空间。
cd /usr/kafka/config
./kafka-console-producer.sh --broker-list localhost:9092 --topic newPhone
复制代码
broker-list:kafka 的服务地址(用多个逗号隔开),端口号默认为 9092,如果不想使用该端口号,可以更改 config 下的 server.properties 配置文件进行修改,如下图:
--topic newPhone:代表生产者绑定了这个 topic,并且会向此主题里面生产数据,正确执行命令之后,可得如下图所示,并可以开始输入数据。
16、kafka-consumer 操作
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newPhone [--from-beginning]
复制代码
--bootstrap-server:kafka 的服务地址, --topic newPhone:绑定主题,开始从指定 topic 里面消费(取出)数据,[--from-beginning]:从头开始读数据,并不是从 consumer 连上之后开始读。
整个运行流程,首先我们在使用 producer 生产几条数据:
。
此时,我们在 ssh 工具上(小厨用的是 SecureCRT,蛮好用的嘞),clone 一个 Session。执行./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newPhone,可以看到如下图一样,如果不加 from-beginning 参数选项,consumer 则读不到 Topic 里面之前 Producer 生产的四条数据。
添加 from-beginning 参数选项之后,consumer 便可以从头开始消费 Topic 里面的数据。如下图所示:
提示:如果在生产者生产数据时,输入 message 出现以下错误:
[root@master bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic newPhone
>hisdhodsa
[2018-11-29 17:28:16,926] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {newPhone=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2018-11-29 17:28:17,080] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {newPhone=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
复制代码
解决方法:由于 9092 端口没有开,所以在 server.properties 配置文件里,将 listeners=PLAINTEXT://:9092 的注释删除,如下图所示
17、kafka-启动报错
第一种错误
2017-02-17 17:25:29,224] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.KafkaException: Failed to acquire lock on file .lock in /var/log/kafka-logs. A Kafka instance in another process or thread is using this directory.
at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:100)
at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:97)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.log.LogManager.lockLogDirs(LogManager.scala:97)
at kafka.log.LogManager.<init>(LogManager.scala:59)
at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:609)
at kafka.server.KafkaServer.startup(KafkaServer.scala:183)
at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:100)
at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:49)
复制代码
解决方法:Failed to acquire lock on file .lock in /var/log/kafka-logs.--问题原因是有其他的进程在使用 kafka,ps -ef|grep kafka,杀掉使用该目录的进程即可;
第二种错误:对 index 文件无权限
把文件的权限更改为正确的用户名和用户组即可;
目录/var/log/kafka-logs/,其中__consumer_offsets-29 是偏移量;
第三种生产消费报错:jaas 连接有问题
kafka_client_jaas.conf文件配置有问题
16环境上
/opt/dataload/filesource_wangjuan/conf下kafka_client_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/home/client/keytabs/client.keytab"
serviceName="kafka"
principal="client/dcp@DCP.COM";
};
复制代码
18、kafka-生产报错
第一种:生产者向 topic 发送消息失败:
[2017-03-09 09:16:00,982] [ERROR] [startJob_Worker-10] [DCPKafkaProducer.java line:62] produceR向topicdf02211发送信息出现异常
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
复制代码
原因是配置文件:kafka_client_jaas.conf 中配置有问题,keyTab 的路径不对,导致的;
第二种:生产消费报错:Failed to construct kafka producer
报错关键信息:Failed to construct kafka producer
解决方法:配置文件问题:KafkaClient 中 serviceName 应该是 kafka,之前配置成了 zookeeper;重启后,就好了;
配置文件如下:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName=kafka
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/dcp16@DCP.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName=kafka
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/dcp16@DCP.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName=zookeeper
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/dcp16@DCP.COM";
};
复制代码
问题描述:
[kafka@DCP16 bin]$ ./kafka-console-producer --broker-list DCP16:9092 --topic topicin050511 --producer.config ../etc/kafka/producer.properties
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file zookeeper, value in Kafka config kafka
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:277)
... 4 more
Caused by: java.lang.IllegalArgumentException: Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file zookeeper, value in Kafka config kafka
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:305)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
... 7 more
[kafka@DCP16 bin]$ ./kafka-console-producer --broker-list DCP16:9092 --topic topicin050511 --producer.config ../etc/kafka/producer.properties
消费时报错: ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
[root@DCP16 bin]# ./kafka-console-consumer --zookeeper dcp18:2181,dcp16:2181,dcp19:2181/kafkakerberos --from-beginning --topic topicout050511 --new-consumer --consumer.config ../etc/kafka/consumer.properties --bootstrap-server DCP16:9092
[2017-05-07 22:24:37,479] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)
at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:53)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:64)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:51)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
... 6 more
Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)
at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:110)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
复制代码
衍生问题:
kafka 生产消息就会报错:
[2017-05-07 23:17:16,240] ERROR Error when sending message to topic topicin050511 with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
复制代码
把 KafkaClient 更改为如下的配置,就可以 了:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
复制代码
19、kafka-消费报错
第一种错误:
replication factor: 1 larger than available brokers: 0
复制代码
消费时报错:
Error while executing topic command : replication factor: 1 larger than available brokers: 0
复制代码
解决办法:
/confluent-3.0.0/bin 下重启daemon
./kafka-server-stop -daemon ../etc/kafka/server.properties
./kafka-server-start -daemon ../etc/kafka/server.properties
复制代码
然后 zk 重启;sh zkCli.sh -server ai186;
/usr/hdp/2.4.2.0-258/zookeeper/bin/zkCli.sh --脚本的目录
如果还报错,可以查看配置文件中下面的配置:
zookeeper.connect=dcp18:2181/kafkakerberos;--是 group 名称
第二种错误:TOPIC_AUTHORIZATION_FAILED
./bin/kafka-console-consumer --zookeeper DCP185:2181,DCP186:2181,DCP187:2181/kafka --from-beginning --topic wangjuan_topic1 --new-consumer --consumer.config ./etc/kafka/consumer.properties --bootstrap-server DCP187:9092
[2017-03-02 13:44:38,398] WARN The configuration zookeeper.connection.timeout.ms = 6000 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2017-03-02 13:44:38,575] WARN Error while fetching metadata with correlation id 1 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2017-03-02 13:44:38,677] WARN Error while fetching metadata with correlation id 2 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2017-03-02 13:44:38,780] WARN Error while fetching metadata with correlation id 3 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
复制代码
解决方法:配置文件中下面的参数中的 User 的 U 必须是大写;
super.users=User:kafka
或者有可能是 server.properties 中的 adver.listen 的 IP 是不对的,有可能是代码中写死的 IP;
第三种错误的可能的解决方法:
无法消费,则查看 kafka 的启动日志中的报错信息:日志文件的所属组不对,应该是 hadoop;
或者,查看 kafka 对应的 zookeeper 的配置后缀,是否已经更改,如果更改了,则 topic 需要重新生成才行;
第三种错误:消费的 tomcat 报错:
[2017-04-01 06:37:21,823] [INFO] [Thread-5] [AbstractCoordinator.java line:542] Marking the coordinator DCP187:9092 (id: 2147483647 rack: null) dead for group test-consumer-group
[2017-04-01 06:37:21,825] [WARN] [Thread-5] [ConsumerCoordinator.java line:476] Auto offset commit failed for group test-consumer-group: Commit offsets failed with retriable exception. You should retry committing offsets.
复制代码
更改代码中,tomcat 的心跳超时时间如下:
没有改之前的:;
./webapps/web/WEB-INF/classes/com/ai/bdx/dcp/hadoop/service/impl/DCPKafkaConsumer.class;
复制代码
重启后,日志中显示:
[2017-04-01 10:14:56,167] [INFO] [Thread-5] [AbstractCoordinator.java line:542] Marking the coordinator DCP187:9092 (id: 2147483647 rack: null) dead for group test-consumer-group
[2017-04-01 10:14:56,286] [INFO] [Thread-5] [AbstractCoordinator.java line:505] Discovered coordinator DCP187:9092 (id: 2147483647 rack: null) for group test-consumer-group.
复制代码
20、kafka-创建 topic 时错误
创建 topic 时报错:
[2017-04-10 10:32:23,776] WARN SASL configuration failed: javax.security.auth.login.LoginException: Checksum failed Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
Exception in thread "main" org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure
at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:946)
at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:923)
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1230)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:75)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:57)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:54)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
复制代码
问题定位:是 jaas 文件有问题:
解决方法:server.properties 文件中的 super.user 要和 jaas 文件中的 keytab 的 principle 一致;
server.properties:super.users=User:client
kafka_server_jaas.conf 文件改为:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName=kafka
keyTab="/data/data1/confluent-3.0.0/kafka.keytab"
principal="kafka@DCP.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/home/client/client.keytab"
principal="client/DCP187@DCP.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName=zookeeper
keyTab="/home/client/client.keytab"
principal="client/DCP187@DCP.COM";
};
复制代码
21、DataCaptain--->报错(kafka)组件原因
报错提示
Not has broker can connection metadataBrokerList
java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
at scala.Predef$.require(Predef.scala:277)
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
复制代码
解决方法:修改 server.properties
advertised.listeners=PLAINTEXT://{ip}:9092 # ip可以内网、外网ip、127.0.0.1 或域名
复制代码
解析:
server.properties 中有两个 listeners。
listeners:启动 kafka 服务监听的 ip 和端口,可以监听内网 ip 和 0.0.0.0(不能为外网 ip),默认为 java.net.InetAddress.getCanonicalHostName()获取的 ip。
advertised.listeners:生产者和消费者连接的地址,kafka 会把该地址注册到 zookeeper 中,所以只能为除 0.0.0.0 之外的合法 ip 或域名 ,默认和 listeners 的配置一致。
22、报错“TimeoutException(Java)” 或“run out of brokers(Go)” 或 “Authentication failed for user(Python)”
首先,请确保 servers 配置正确,然后通过 ping 以及 telnet 排除网络问题。假设网络运行正常,云上 Kafka 在建立连接时,会对客户端进行鉴权。鉴权方式(sasl_mechanism)有两种:
如果鉴权失败,云上 Kafka 会掐掉连接。另外,请仔细参考各个 demo 的 readme 以配置正确。
23、报错“leader is not available”或“leader is in election”
首先,检查 Topic 是否有创建;其次,检查 Topic 类型是否为“Kafka 消息”。
24、报错“TOPIC_AUTHORIZATION_FAILED”或“Topic or group not authorized” 的类似字眼
此类报错通常代表权限不对,即您的 AccessKey 没有访问对应 Topic 或者 Consumer ID(又称 group 或 consumer group)的权限。
Topic 和 Consumer ID 的权限规则如下:
注意:请仔细检查 AccessKey、SecretKey 来自哪个账号,避免用错。
25、Java 客户端(包括 Spring 框架)报错“Failed to send SSL close message”
该错误后面通常还会跟“connection reset by peer”或“broken pipe”。该错误主要是因为,服务端是 VIP 网络环境,会主动掐掉空闲连接。建议在遇到此类错误时,重试发送一次。Java 客户端内部有重试机制,可以参考 Producer 最佳实践 进行配置。其它语言客户端,请参考相关文档。你可以通过修改日志级别来避免该错误,以 log4j 为例,加上下面这行配置:
log4j.logger.org.apache.kafka.common.network.SslTransportLayer=ERROR
26、Spring Cloud Stream 消费信息时报错"arrayindexoutofboundexception"
该错误的产生是因为 Spring Cloud 会按自己的格式解析消息内容。如果您同时使用 Spring Cloud 发送和消费,则不会有问题,这也是推荐的使用方式。
如果您使用其他方式发送,例如,调用 Kafka 原生的 Java 客户端发送,那么用 Spring Cloud 消费时,则需要设置 headerMode 为 raw,即禁用解析消息内容。具体信息参见 Spring Cloud 官网。
27、 报错“No worthy mechs found”
C++客户端或者包装 C++的客户端会报这个错。这个错说明缺少一个系统库:cyrus-sasl-plain。针对 yum 管理的系统的安装方法为:yum install cyrus-sasl{,-plain}。
28、什么是 CID, Consumer ID, Consumer Group 和 Group ID
这几个名称指代的都是同一个概念:Kafka 的消费组 (Consumer Group)。CID 是 Consumer ID 的缩写,也等同于 Group ID (Consumer Group ID 的简称,指代一个特定的消费组)。每个消费组可以包含多个消费实例,一起负载均衡消费订阅的 topic,所以 CID 与 topic 的关系可以总结为:每个 CID 可以订阅多个 topic,每个 topic 也可以被多个 CID 订阅;各个 CID 之间相对独立,互不影响。
假设 CID 1 订阅了 Topic 1,则 Topic 1 的每条消息会发给 CID 1 的某个实例,且只会发给其中一个实例。如果 CID 2 也订阅了 Topic 1,Topic 1 的每条消息也会发给 CID 2 里的某个实例,且只发给其中一个实例。
[backcolor=transparent]注意:控制台里看到的 Producer ID 是 Aliware MQ 里的概念,Kafka 不会用到, 后续会改进优化。
29、如何查看消费进度
如需查看某个特定订阅消费者的消费进度,请按照如下步骤操作:
在 ONS 控制台左侧点击[backcolor=transparent]发布订阅管理-订阅管理。
在搜索框中输入 topic 或者 Cosumer ID,点击[backcolor=transparent]搜索,查找你想查看消费进度的 Consumer ID。
找到该 Consumer ID 后,点击操作列中的[backcolor=transparent]消费者状态,在跳出的页面中可查看[backcolor=transparent]堆积总量。
堆积总量 = 所有的消息数 - 已经消费的消息数[backcolor=transparent]注意:目前消费者状态都会显示不在线,未来会进行优化。除了堆积总量,其它信息仅供参考。
30、消息堆积了怎么办
消息堆积,一般都是消费速度过慢或者消费线程阻塞造成的。建议打印出消费线程的堆栈情况查看线程执行情况。
注意:Java 进程可以用 jstack。
31、java.lang.OutOfMemoryError:Map failed
发生上述问题,原因是发生 OOM 啦,会导致 kafka 进程直接崩溃掉!因此只能重新启动 broker 节点了,但是为了让 broker 节点启动成功快一点的话,可以将一个参数的之调大:“num.recovery.threads.per.data.dir=30”,没错就是他,将他的值越调大越好。这个线程数主要是负责停止和启动 broker 的。因为是 32core 的服务器,给他分配了 30 个,可以尽量的把这个参数调大,便于该 broker 节点更快的加入到 ISR 列表当中。
首先,根据上面的提示恢复服务是第一件要做的事情,接下来,得分析分析为什么会出这个事情,给 kafka 集群分配了 20G 内存,如下图:
查看了近 2 个星期的监控图,发现可用内存在持续减少,初步怀疑可能发生了内存泄漏。
这也只是怀疑,因为出错之前没有监控 JVM 的情况,吃一堑,长一智,赶紧用 zabbix 将 kafka 的 jvm 监控起来。
之后,调整了下面的参数,先观察一段时间。
sysctl vm.max_map_count=262144 #进程中内存影视区域的最大数量。在调用 malloc,直接调用 mmap 和 mprotect 和加载共享库时产生内存映射区域。虽然大多数程序需要的内存映射区域不超过 1000 个,但是特定的程序,特别是 malloc 调试器,可能需要很多,例如每次分破都会产生一道两个内存映射区域,默认值是 65536。
解决方案:
第一:kafka 的 heap 内存分配不要大于 6G,我们知道 kafka 并不吃堆内存,如果设置默认的 1G 的话也并不太合理。推荐设置配置为 6G 即可。服务器是 32G 内存,然后给 kafka 就分配了 22G 的 heap 内存。经过参考《kafka 权威指南》和《Apache kafka 实战》两位大佬的笔记,他们推荐设置 kafka 的 heap 大小为 5G 或者 6G。最后我将 kafka 的 heap 内存配置成了 6G。
第二:调整“vm.max_map_count”参数,没错,就是上面提到的那个参数。在实际生产环境中,如果单台 broker 上 topic 数过多,用户可能碰到“java.lang.OutOfMemoryError:Map failed”的严重错误以导致 kafka 节点崩溃。这是因为大量创建 topic 将极大的消耗操作系统内存,用于内存映射操作。在这种情况下,用户需要调整“vm.max_map_count”参数。具体方法可以使用命令:“sysctl vm.max_map_count=N”来设置。该参数默认值是 65535,可以考虑线上环境设置更大的值,比如 262144 甚至更大。
数据治理方法论和实践小百科全书
标签体系下的用户画像建设小指南
4万字长文 | ClickHouse基础&实践&调优全视角解析
【面试&个人成长】2021年过半,社招和校招的经验之谈
大数据方向另一个十年开启 |《硬刚系列》第一版完结
我写过的关于成长/面试/职场进阶的文章
当我们在学习Hive的时候在学习什么?「硬刚Hive续集」
你好,我是王知无,一个大数据领域的硬核原创作者。
做过后端架构、数据中间件、数据平台 &架构、算法工程化。
专注大数据领域实时动态 &技术提升 &个人成长 &职场进阶,欢迎关注。
评论