提示:本文可能已过期,请点击原文查看:【kafka源码】/log_dir_event_notification的LogDir脱机事件通知
前言
我们会看到 zk 的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点这个节点在 kafka 中承担的作用是: 当某个 Broker 上的 LogDir 出现异常时(比如磁盘损坏,文件读写失败,等等异常): 向 zk 中谢增一个子节点/log_dir_event_notification/log_dir_event_序列号 ;Controller 监听到这个节点的变更之后,会向 Brokers 们发送LeaderAndIsrRequest请求; 然后做一些副本脱机的善后操作
1 源码分析
这里说的 dirLog 是 server.properties 中配置的log.dir 例如
副本异常处理
首先我们找到有使用这个节点的源码;kafka 启动之初有调用
ReplicaManager.startup()
def startup(): Unit = { // 省略... //当inter-broker protocol (IBP) < 1.0的时候,如果存在logDir的一些异常则直接让整个Broker启动失败; val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0 logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure) logDirFailureHandler.start() } private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) { override def doWork(): Unit = { //从队列 offlineLogDirQueue 取数据 val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir() if (haltBrokerOnDirFailure) { fatal(s"Halting broker because dir $newOfflineLogDir is offline") Exit.halt(1) } handleLogDirFailure(newOfflineLogDir) } }
复制代码
// logDir should be an absolute path // sendZkNotification is needed for unit test def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = { // 省略... logManager.handleLogDirFailure(dir)
if (sendZkNotification) zkClient.propagateLogDirEvent(localBrokerId) warn(s"Stopped serving replicas in dir $dir") }
复制代码
代码比较长,就直接概况一下好了:主要是当读取或操作 LogDir 的时候出现了异常就会执行到这里,有可能是磁盘脱机了,或者文件突然没有读取写入权限等等之类的一些IOException异常;那么 Broker 就需要做一些处理;如下
做个判断inter.broker.protocol.version 协议版本 < 1.0 的时候 时候直接退出;那个时候还不支持单 Broker 上存在多个 logDir;
副本停止 fetche 数据
标记分区下线
可能移除一些监控信息
如果当前的log_dir 都脱机(或者异常了), 那么久可以直接 shutdown 这台机器了
如果还有其他的log_dir 还有在线的, 那么继续做一些其他的清理操作;
创建持久序列节点/log_dir_event_notification/log_dir_event_+序列号;数据是 BrokerID;例如:/log_dir_event_notification/log_dir_event_0000000003{"version":1,"broker":20003,"event":1}
PS: log_dir 是可以在一台 Broker 配置多个路径的 ,用逗号隔开
LogDir 发生异常
比如说在 给文件加锁的时候 lockLogDirs,磁盘损坏了就抛出异常IOException
/** * Lock all the given directories */ private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = { dirs.flatMap { dir => try { val lock = new FileLock(new File(dir, LockFile)) if (!lock.tryLock()) throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent + ". A Kafka instance in another process or thread is using this directory.") Some(lock) } catch { case e: IOException => logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while locking directory $dir", e) None } } } def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = { error(msg, e) if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) offlineLogDirQueue.add(logDir) }
复制代码
offlineLogDirQueue添加了一个异常队列之后就回到上面的副本异常处理代码了, 上面可是一致在queue.take()的
Controller 监听 zk 节点变更
KafkaController.processLogDirEventNotification
private def processLogDirEventNotification(): Unit = { if (!isActive) return val sequenceNumbers = zkClient.getAllLogDirEventNotifications try { val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers) //尝试将这台Broker上的所有副本 走一下状态流转 到 OnlineReplica onBrokerLogDirFailure(brokerIds) } finally { // delete processed children zkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion) } }
复制代码
主要将从 zk 节点 /log_dir_event_notification/log_dir_event_序列号 中获取到的数据的 Broker 上的所有副本进行一个副本状态流转 ->OnlineReplica ;关于状态机的流转请看 【kafka 源码】Controller 中的状态机
给所有 broker 发送LeaderAndIsrRequest请求,让 brokers 们去查询他们的副本的状态,如果副本 logDir 已经离线则返回KAFKA_STORAGE_ERROR异常;
完事之后会删除节点
2 源码总结
3Q&A
评论