
Redis - Cluster - 源码阅读(二)

发布于: 1 小时前
Redis - Cluster - 源码阅读(二)



/* Process one cluster bus packet held in link->rcvbuf.
 *
 * The function validates the packet length against its type, refreshes what
 * we know about the sender (epochs, replication offset), and then dispatches
 * on the message type: PING / PONG / MEET, FAIL, PUBLISH,
 * FAILOVER_AUTH_REQUEST, FAILOVER_AUTH_ACK, MFSTART and UPDATE.
 *
 * Return value: 0 on the two paths that release the link while processing
 * (freeClusterNode(link->node) during a handshake with an already known
 * node, and freeClusterLink(link) on a sender ID mismatch); 1 on every other
 * path, including packets that are silently dropped by the sanity checks.
 * NOTE(review): presumably the caller uses 0 to mean "do not touch the link
 * any more" -- confirm against the read handler. */
int clusterProcessPacket(clusterLink *link) {
    /* The message starts at the beginning of the receive buffer. */
    clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
    /* Total length announced by the packet (header + payload). */
    uint32_t totlen = ntohl(hdr->totlen);
    /* Message type. */
    uint16_t type = ntohs(hdr->type);
    /* Flags carried in the header; used below to pick up the sender role. */
    uint16_t flags = ntohs(hdr->flags);
    uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
    clusterNode *sender;

    /* Account for the received message. */
    server.cluster->stats_bus_messages_received++;
    redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
        type, (unsigned long) totlen);

    /* Perform sanity checks */
    if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
    if (ntohs(hdr->ver) != 0) return 1; /* Can't handle versions other than 0.*/
    if (totlen > sdslen(link->rcvbuf)) return 1;
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        uint16_t count = ntohs(hdr->count);
        uint32_t explen; /* expected length of this packet */

        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        explen += (sizeof(clusterMsgDataGossip)*count);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataFail);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataPublish) +
                ntohl(hdr->data.publish.msg.channel_len) +
                ntohl(hdr->data.publish.msg.message_len);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
               type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
               type == CLUSTERMSG_TYPE_MFSTART)
    {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataUpdate);
        if (totlen != explen) return 1;
    }

    /* Look the sender up among the nodes we know; NULL if it is unknown. */
    sender = clusterLookupNode(hdr->sender);

    /* For a known sender that is not in handshake, refresh epoch and
     * replication offset information from the header. */
    if (sender && !nodeInHandshake(sender)) {
        senderCurrentEpoch = ntohu64(hdr->currentEpoch);
        senderConfigEpoch = ntohu64(hdr->configEpoch);

        /* Update the cluster currentEpoch if the sender's is greater. */
        if (senderCurrentEpoch > server.cluster->currentEpoch)
            server.cluster->currentEpoch = senderCurrentEpoch;

        /* Update the sender configEpoch if it advertises a greater one. */
        if (senderConfigEpoch > sender->configEpoch) {
            sender->configEpoch = senderConfigEpoch;
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_FSYNC_CONFIG);
        }

        /* Update the replication offset info for this node. */
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = mstime();

        /* If we are a slave performing a manual failover and our master
         * sent its offset while already paused, populate the MF state. */
        if (server.cluster->mf_end &&
            nodeIsSlave(myself) &&
            myself->slaveof == sender &&
            hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
            server.cluster->mf_master_offset == 0)
        {
            server.cluster->mf_master_offset = sender->repl_offset;
            redisLog(REDIS_WARNING,
                "Received replication offset for paused "
                "master manual failover: %lld",
                server.cluster->mf_master_offset);
        }
    }

    /* Process packets by type. */

    /* PING or MEET: possibly register the node, then reply with a PONG. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);

        /* Add this node if it is new for us and the msg type is MEET.
         * In this stage we don't try to add the node with the right
         * flags, slaveof pointer, and so forth, as this details will be
         * resolved when we'll receive PONGs from the node. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;

            /* Create the new node in HANDSHAKE state. */
            node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);

            /* Fill in IP (taken from the link) and port (from the header). */
            nodeIp2String(node->ip,link);
            node->port = ntohs(hdr->port);

            /* Add the new node to the cluster. */
            clusterAddNode(node);

            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }

        /* Get info from the gossip section. */
        clusterProcessGossipSection(hdr,link);

        /* Anyway reply with a PONG. */
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }

    /* PING, PONG or MEET: process config information. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        /* This branch also handles MEET, so label it explicitly instead of
         * reporting every non-PING packet as "pong". */
        redisLog(REDIS_DEBUG,"%s packet received: %p",
            type == CLUSTERMSG_TYPE_PING ? "ping" :
            type == CLUSTERMSG_TYPE_MEET ? "meet" : "pong",
            (void*)link->node);

        /* The origin of the message is described by two things: link->node
         * and sender (looked up from hdr->sender). They are expected to
         * describe the same node, but may disagree; the code below resolves
         * the conflict. */
        if (link->node) {
            /* NOTE(review): annotator's reading -- a link->node still in
             * handshake carries the freshest address, so it is used to
             * update sender and is then released, since only one instance
             * per node is needed. */
            if (nodeInHandshake(link->node)) {
                /* If we already have this node, try to change the
                 * IP/port of the node with the new one. */
                if (sender) {
                    redisLog(REDIS_VERBOSE,
                        "Handshake: we already know node %.40s, "
                        "updating the address if needed.", sender->name);
                    /* Update the node address if it changed. */
                    if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
                    {
                        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                             CLUSTER_TODO_UPDATE_STATE);
                    }
                    /* Free this node as we already have it. This will
                     * cause the link to be freed as well. */
                    freeClusterNode(link->node);
                    return 0;
                }

                /* First thing to do is replacing the random name with the
                 * right node name if this was a handshake stage. */
                clusterRenameNode(link->node, hdr->sender);
                redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
                    link->node->name);

                /* Leave the HANDSHAKE state. */
                link->node->flags &= ~REDIS_NODE_HANDSHAKE;

                /* Take the master/slave role bits from the header flags. */
                link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE);

            /* NOTE(review): annotator's open question -- from this code
             * alone link->node is only known to be non-NULL before the
             * handshake completes, so it is unclear when the non-handshake
             * mismatch case below is reached. */
            } else if (memcmp(link->node->name,hdr->sender,
                        REDIS_CLUSTER_NAMELEN) != 0)
            {
                /* If the reply has a non matching node ID we
                 * disconnect this node and set it as not having an associated
                 * address. */
                redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
                link->node->flags |= REDIS_NODE_NOADDR;
                link->node->ip[0] = '\0';
                link->node->port = 0;

                /* Drop the connection. */
                freeClusterLink(link);

                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                return 0;
            }
        }

        /* On PING from a known sender that is not in handshake, update its
         * address if needed. */
        if (sender && type == CLUSTERMSG_TYPE_PING &&
            !nodeInHandshake(sender) &&
            nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
        {
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_UPDATE_STATE);
        }

        /* Update our info about the node */
        if (link->node && type == CLUSTERMSG_TYPE_PONG) {
            /* Time of the last PONG received from this node. */
            link->node->pong_received = mstime();

            /* Reset the "PING sent" marker. */
            link->node->ping_sent = 0;

            /* The PFAIL condition can be reversed without external
             * help if it is momentary (that is, if it does not
             * turn into a FAIL state).
             *
             * The FAIL condition is also reversible under specific
             * conditions detected by clearNodeFailureIfNeeded(). */
            if (nodeTimedOut(link->node)) {
                /* Clear PFAIL. */
                link->node->flags &= ~REDIS_NODE_PFAIL;

                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            } else if (nodeFailed(link->node)) {
                /* See whether FAIL can be cleared. */
                clearNodeFailureIfNeeded(link->node);
            }
        }

        /* Check for role switch: slave -> master or master -> slave. */
        if (sender) {
            /* A slaveof field equal to REDIS_NODE_NULL_NAME means the
             * sender is a master. */
            if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
                sizeof(hdr->slaveof)))
            {
                /* Node is a master. */
                clusterSetNodeAsMaster(sender);
            } else {
                /* Node is a slave. */
                clusterNode *master = clusterLookupNode(hdr->slaveof);

                if (nodeIsMaster(sender)) {
                    /* Master turned into a slave! Reconfigure the node. */

                    /* Drop every slot assigned to this node. */
                    clusterDelNodeSlots(sender);

                    /* Switch the role flags. */
                    sender->flags &= ~REDIS_NODE_MASTER;
                    sender->flags |= REDIS_NODE_SLAVE;

                    /* Remove the list of slaves from the node. */
                    if (sender->numslaves) clusterNodeResetSlaves(sender);

                    /* Update config and state. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                         CLUSTER_TODO_UPDATE_STATE);
                }

                /* Master node changed for this slave? */
                if (master && sender->slaveof != master) {
                    /* Detach the sender from its previous master, if any. */
                    if (sender->slaveof)
                        clusterNodeRemoveSlave(sender->slaveof,sender);

                    /* Attach it to the new master. */
                    clusterNodeAddSlave(master,sender);

                    /* Record the new master. */
                    sender->slaveof = master;

                    /* Update config. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                }
            }
        }

        /* Update our info about served slots.
         *
         * Note: this MUST happen after we update the master/slave state
         * so that REDIS_NODE_MASTER flag will be set. */

        /* Many checks are only needed if the set of served slots this
         * instance claims is different compared to the set of slots we have
         * for it. Check this ASAP to avoid other computational expansive
         * checks later. */
        clusterNode *sender_master = NULL; /* Sender or its master if slave. */
        int dirty_slots = 0; /* Sender claimed slots don't match my view? */

        if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }

        /* 1) If the sender of the message is a master, and we detected that
         *    the set of slots it claims changed, scan the slots to see if we
         *    need to update our configuration. */
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);

        /* 2) We also check for the reverse condition, that is, the sender
         *    claims to serve slots we know are served by a master with a
         *    greater configEpoch. If this happens we inform the sender.
         *
         * This is useful because sometimes after a partition heals, a
         * reappearing master may be the last one to claim a given set of
         * hash slots, but with a configuration that other instances know to
         * be deprecated. Example:
         *
         * A and B are master and slave for slots 1,2,3.
         * A is partitioned away, B gets promoted.
         * B is partitioned away, and A returns available.
         *
         * Usually B would PING A publishing its set of served slots and its
         * configEpoch, but because of the partition B can't inform A of the
         * new configuration, so other nodes that have an updated table must
         * do it. In this way A will stop to act as a master (or can try to
         * failover if there are the conditions to win the election). */
        if (sender && dirty_slots) {
            int j;

            for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
                /* Only look at slots the sender claims. */
                if (bitmapTestBit(hdr->myslots,j)) {
                    /* Skip slots we already map to the sender, and slots
                     * that are unassigned in our table. */
                    if (server.cluster->slots[j] == sender ||
                        server.cluster->slots[j] == NULL) continue;

                    /* Our owner for slot j has a greater configEpoch than
                     * the one the sender advertises. */
                    if (server.cluster->slots[j]->configEpoch >
                        senderConfigEpoch)
                    {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s has old slots configuration, sending "
                            "an UPDATE message about %.40s",
                                sender->name, server.cluster->slots[j]->name);

                        /* Tell the sender about the owner of slot j. */
                        clusterSendUpdate(sender->link,
                            server.cluster->slots[j]);

                        /* TODO: instead of exiting the loop send every other
                         * UPDATE packet for other nodes that are the new owner
                         * of sender's slots. */
                        break;
                    }
                }
            }
        }

        /* If our config epoch collides with the sender's try to fix
         * the problem. */
        if (sender &&
            nodeIsMaster(myself) && nodeIsMaster(sender) &&
            senderConfigEpoch == myself->configEpoch)
        {
            clusterHandleConfigEpochCollision(sender);
        }

        /* Get info from the gossip section */
        clusterProcessGossipSection(hdr,link);

    /* FAIL: the sender reports that some node is in FAIL state. */
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        clusterNode *failing;

        if (sender) {
            /* Look up the node the report is about. */
            failing = clusterLookupNode(hdr->data.fail.about.nodename);
            /* Act only if it is known, is not ourselves, and is not already
             * flagged FAIL. */
            if (failing &&
                !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
            {
                redisLog(REDIS_NOTICE,
                    "FAIL message received from %.40s about %.40s",
                    hdr->sender, hdr->data.fail.about.nodename);

                /* Set FAIL, record the time, and clear PFAIL. */
                failing->flags |= REDIS_NODE_FAIL;
                failing->fail_time = mstime();
                failing->flags &= ~REDIS_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            }
        } else {
            redisLog(REDIS_NOTICE,
                "Ignoring FAIL message from unknonw node %.40s about %.40s",
                hdr->sender, hdr->data.fail.about.nodename);
        }

    /* PUBLISH: deliver a Pub/Sub message propagated over the bus. */
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        robj *channel, *message;
        uint32_t channel_len, message_len;

        /* Don't bother creating useless objects if there are no
         * Pub/Sub subscribers. */
        if (dictSize(server.pubsub_channels) ||
           listLength(server.pubsub_patterns))
        {
            channel_len = ntohl(hdr->data.publish.msg.channel_len);
            message_len = ntohl(hdr->data.publish.msg.message_len);

            /* bulk_data holds the channel bytes followed by the message
             * bytes. */
            channel = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data,channel_len);
            message = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data+channel_len,
                        message_len);

            pubsubPublishMessage(channel,message);

            decrRefCount(channel);
            decrRefCount(message);
        }

    /* FAILOVER_AUTH_REQUEST: the sender asks for our failover vote. */
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
        if (!sender) return 1;  /* We don't know that node. */
        /* Grant the vote if the conditions checked by the helper hold. */
        clusterSendFailoverAuthIfNeeded(sender,hdr);

    /* FAILOVER_AUTH_ACK: the sender votes for our failover. */
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        if (!sender) return 1;  /* We don't know that node. */

        /* We consider this vote only if the sender is a master serving
         * a non zero number of slots, and its currentEpoch is greater or
         * equal to epoch where this node started the election. */
        if (nodeIsMaster(sender) && sender->numslots > 0 &&
            senderCurrentEpoch >= server.cluster->failover_auth_epoch)
        {
            /* Count the vote. */
            server.cluster->failover_auth_count++;

            /* Maybe we reached a quorum here, set a flag to make sure
             * we check ASAP. */
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }

    } else if (type == CLUSTERMSG_TYPE_MFSTART) {
        /* This message is acceptable only if I'm a master and the sender
         * is one of my slaves. */
        if (!sender || sender->slaveof != myself) return 1;
        /* Manual failover requested from slaves. Initialize the state
         * accordingly. */
        resetManualFailover();
        server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
        server.cluster->mf_slave = sender;
        pauseClients(mstime()+(REDIS_CLUSTER_MF_TIMEOUT*2));
        redisLog(REDIS_WARNING,"Manual failover requested by slave %.40s.",
            sender->name);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        clusterNode *n; /* The node the update is about. */
        uint64_t reportedConfigEpoch =
                    ntohu64(hdr->data.update.nodecfg.configEpoch);

        if (!sender) return 1;  /* We don't know the sender. */

        /* Look up the node the update is about. */
        n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
        if (!n) return 1;   /* We don't know the reported node. */

        /* The reported epoch is not newer than what we already have. */
        if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */

        /* If in our current config the node is a slave, set it as a master. */
        if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);

        /* Update the node's configEpoch. */
        n->configEpoch = reportedConfigEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);

        /* Check the bitmap of served slots and update our
         * config accordingly. */
        clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
            hdr->data.update.nodecfg.slots);
    } else {
        redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
    }
    return 1;
}


还未添加个人签名 2020.04.30 加入



Redis - Cluster - 源码阅读(二)