写点什么

Redis - Cluster - 源码阅读(二)

发布于: 1 小时前
Redis - Cluster - 源码阅读(二)

节点间消息接收后的处理,主要干的事情:



源码如下,主要是受到不同消息后的各种处理。(正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文正文)

int clusterProcessPacket(clusterLink *link) {
// 完整消息 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
// 消息的长度(消息头 + 正文) uint32_t totlen = ntohl(hdr->totlen);
// 消息的类型 uint16_t type = ntohs(hdr->type);
// 消息发送者的标识 uint16_t flags = ntohs(hdr->flags);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
clusterNode *sender;
// 更新接受消息计数器 server.cluster->stats_bus_messages_received++;
redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", type, (unsigned long) totlen);
/* Perform sanity checks */ // 合法性检查 if (totlen < 16) return 1; /* At least signature, version, totlen, count. */ if (ntohs(hdr->ver) != 0) return 1; /* Can't handle versions other than 0.*/ if (totlen > sdslen(link->rcvbuf)) return 1; if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { uint16_t count = ntohs(hdr->count); uint32_t explen; /* expected length of this packet */
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); explen += (sizeof(clusterMsgDataGossip)*count); if (totlen != explen) return 1; } else if (type == CLUSTERMSG_TYPE_FAIL) { uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataFail); if (totlen != explen) return 1; } else if (type == CLUSTERMSG_TYPE_PUBLISH) { uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) + ntohl(hdr->data.publish.msg.channel_len) + ntohl(hdr->data.publish.msg.message_len); if (totlen != explen) return 1; } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST || type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK || type == CLUSTERMSG_TYPE_MFSTART) { uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
if (totlen != explen) return 1; } else if (type == CLUSTERMSG_TYPE_UPDATE) { uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataUpdate); if (totlen != explen) return 1; }
//从当前集群中查找sender,如果是已知,则找出来,否则为null sender = clusterLookupNode(hdr->sender); // sender是已知节点并且不是 HANDSHAKE 节点,那么更新节点的配置纪元信息 if (sender && !nodeInHandshake(sender)) { senderCurrentEpoch = ntohu64(hdr->currentEpoch); senderConfigEpoch = ntohu64(hdr->configEpoch); //更新集群纪元 if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch; //更新节点配置纪元 if (senderConfigEpoch > sender->configEpoch) { sender->configEpoch = senderConfigEpoch; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_FSYNC_CONFIG); }
//更新复制偏移量 sender->repl_offset = ntohu64(hdr->offset); sender->repl_offset_time = mstime(); /* If we are a slave performing a manual failover and our master * sent its offset while already paused, populate the MF state. */ if (server.cluster->mf_end && nodeIsSlave(myself) && myself->slaveof == sender && hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && server.cluster->mf_master_offset == 0) { server.cluster->mf_master_offset = sender->repl_offset; redisLog(REDIS_WARNING, "Received replication offset for paused " "master manual failover: %lld", server.cluster->mf_master_offset); } }
/* Process packets by type. */ // 根据消息的类型,处理节点
// 这是一条 PING 消息或者 MEET 消息 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);
/* Add this node if it is new for us and the msg type is MEET. * * 如果当前节点是第一次遇见这个节点,并且对方发来的是 MEET 信息, * 那么将这个节点添加到集群的节点列表里面。 * * In this stage we don't try to add the node with the right * flags, slaveof pointer, and so forth, as this details will be * resolved when we'll receive PONGs from the node. * * 节点目前的 flag 、 slaveof 等属性的值都是未设置的, * 等当前节点向对方发送 PING 命令之后, * 这些信息可以从对方回复的 PONG 信息中取得。 */ if (!sender && type == CLUSTERMSG_TYPE_MEET) { clusterNode *node;
// 创建 HANDSHAKE 状态的新节点 node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
// 设置 IP 和端口 nodeIp2String(node->ip,link); node->port = ntohs(hdr->port);
// 将新节点添加到集群 clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); }
// 分析并取出消息中的 gossip 节点信息 clusterProcessGossipSection(hdr,link);
// 向目标节点返回一个 PONG clusterSendPing(link,CLUSTERMSG_TYPE_PONG); }

// 这是一条 PING 、 PONG 或者 MEET 消息 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { redisLog(REDIS_DEBUG,"%s packet received: %p", type == CLUSTERMSG_TYPE_PING ? "ping" : "pong", (void*)link->node);
// 当前这条消息的发送源头是可以从两个地方的信息得来:1、link->node 2、sender // 这两个信息理论上都应该指向同一个节点,信息应该相同,但实际是可能不同的,以下主要解决冲突的问题 if (link->node) { // 如果两者地址(IP和端口)不同: // link ->node还没有完成handshake代表信息是最新的,应该用该信息更新sender信息并且释放掉link->node // 因为这两个信息实际都是指向同一个节点,只需要一个即可 if (nodeInHandshake(link->node)) { /* If we already have this node, try to change the * IP/port of the node with the new one. */ if (sender) { redisLog(REDIS_VERBOSE, "Handshake: we already know node %.40s, " "updating the address if needed.", sender->name); // 如果有需要的话,更新节点的地址 if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); } /* Free this node as we alrady have it. This will * cause the link to be freed as well. */ // 释放节点 freeClusterNode(link->node); return 0; }
/* First thing to do is replacing the random name with the * right node name if this was a handshake stage. */ // 用节点的真名替换在 HANDSHAKE 时创建的随机名字 clusterRenameNode(link->node, hdr->sender); redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.", link->node->name);
// 关闭 HANDSHAKE 状态 link->node->flags &= ~REDIS_NODE_HANDSHAKE;
// 设置节点的角色 link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
// 如果两者ID不同 // 断开连接,设置状态为noaddr // 经过上面对于地址冲突的处理,集群中同一个节点只会有一个node实例,所以这里设置link->node即可 // 大部分情况下link->node都应该是空的,通过hdr->sender即可找到集群中的sender,什么情况link->node // 为空呢?看代码只知道还没完成握手时,link->node非空,所以下面link->node非空但不是握手状态有点不理解 } else if (memcmp(link->node->name,hdr->sender, REDIS_CLUSTER_NAMELEN) != 0) { /* If the reply has a non matching node ID we * disconnect this node and set it as not having an associated * address. */ // 那么将这个节点设为 NOADDR // 并断开连接 redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID"); link->node->flags |= REDIS_NODE_NOADDR; link->node->ip[0] = '\0'; link->node->port = 0;
// 断开连接 freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); return 0; } }
// 如果发送的消息为 PING 并且发送者不在 HANDSHAKE 状态,那么更新发送者的信息 if (sender && type == CLUSTERMSG_TYPE_PING && !nodeInHandshake(sender) && nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); }
/* Update our info about the node */ // 如果这是一条 PONG 消息,那么更新我们关于 node 节点的认识 if (link->node && type == CLUSTERMSG_TYPE_PONG) {
// 最后一次接到该节点的 PONG 的时间 link->node->pong_received = mstime();
// 清零最近一次等待 PING 命令的时间 link->node->ping_sent = 0;
/* The PFAIL condition can be reversed without external * help if it is momentary (that is, if it does not * turn into a FAIL state). * * 接到节点的 PONG 回复,我们可以移除节点的 PFAIL 状态。 * * The FAIL condition is also reversible under specific * conditions detected by clearNodeFailureIfNeeded(). * * 如果节点的状态为 FAIL , * 那么是否撤销该状态要根据 clearNodeFailureIfNeeded() 函数来决定。 */ if (nodeTimedOut(link->node)) { // 撤销 PFAIL link->node->flags &= ~REDIS_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); } else if (nodeFailed(link->node)) { // 看是否可以撤销 FAIL clearNodeFailureIfNeeded(link->node); } }
/* Check for role switch: slave -> master or master -> slave. */ // 检测节点的身份信息,并在需要时进行更新 if (sender) {
// 发送消息的节点的 slaveof 为 REDIS_NODE_NULL_NAME // 那么 sender 就是一个主节点 if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME, sizeof(hdr->slaveof))) { /* Node is a master. */ // 设置 sender 为主节点 clusterSetNodeAsMaster(sender);
// sender 的 slaveof 不为空,那么这是一个从节点 } else {
/* Node is a slave. */ // 取出 sender 的主节点 clusterNode *master = clusterLookupNode(hdr->slaveof);
// sender 由主节点变成了从节点,重新配置 sender if (nodeIsMaster(sender)) { /* Master turned into a slave! Reconfigure the node. */
// 删除所有由该节点负责的槽 clusterDelNodeSlots(sender);
// 更新标识 sender->flags &= ~REDIS_NODE_MASTER; sender->flags |= REDIS_NODE_SLAVE;
/* Remove the list of slaves from the node. */ // 移除 sender 的从节点名单 if (sender->numslaves) clusterNodeResetSlaves(sender);
/* Update config and state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); }
/* Master node changed for this slave? */
// 检查 sender 的主节点是否变更 if (master && sender->slaveof != master) { // 如果 sender 之前的主节点不是现在的主节点 // 那么在旧主节点的从节点列表中移除 sender if (sender->slaveof) clusterNodeRemoveSlave(sender->slaveof,sender);
// 并在新主节点的从节点列表中添加 sender clusterNodeAddSlave(master,sender);
// 更新 sender 的主节点 sender->slaveof = master;
/* Update config. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } } }
/* Update our info about served slots. * * 更新当前节点对 sender 所处理槽的认识。 * * Note: this MUST happen after we update the master/slave state * so that REDIS_NODE_MASTER flag will be set. * * 这部分的更新 *必须* 在更新 sender 的主/从节点信息之后, * 因为这里需要用到 REDIS_NODE_MASTER 标识。 */
/* Many checks are only needed if the set of served slots this * instance claims is different compared to the set of slots we have * for it. Check this ASAP to avoid other computational expansive * checks later. */ clusterNode *sender_master = NULL; /* Sender or its master if slave. */ int dirty_slots = 0; /* Sender claimed slots don't match my view? */
if (sender) { sender_master = nodeIsMaster(sender) ? sender : sender->slaveof; if (sender_master) { dirty_slots = memcmp(sender_master->slots, hdr->myslots,sizeof(hdr->myslots)) != 0; } }
/* 1) If the sender of the message is a master, and we detected that * the set of slots it claims changed, scan the slots to see if we * need to update our configuration. */ // 如果 sender 是主节点,并且 sender 的槽布局出现了变动 // 那么检查当前节点对 sender 的槽布局设置,看是否需要进行更新 if (sender && nodeIsMaster(sender) && dirty_slots) clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
/* 2) We also check for the reverse condition, that is, the sender * claims to serve slots we know are served by a master with a * greater configEpoch. If this happens we inform the sender. * * 检测和条件 1 的相反条件,也即是, * sender 处理的槽的配置纪元比当前节点已知的某个节点的配置纪元要低, * 如果是这样的话,通知 sender 。 * * This is useful because sometimes after a partition heals, a * reappearing master may be the last one to claim a given set of * hash slots, but with a configuration that other instances know to * be deprecated. Example: * * 这种情况可能会出现在网络分裂中, * 一个重新上线的主节点可能会带有已经过时的槽布局。 * * 比如说: * * A and B are master and slave for slots 1,2,3. * A 负责槽 1 、 2 、 3 ,而 B 是 A 的从节点。 * * A is partitioned away, B gets promoted. * A 从网络中分裂出去,B 被提升为主节点。 * * B is partitioned away, and A returns available. * B 从网络中分裂出去, A 重新上线(但是它所使用的槽布局是旧的)。 * * Usually B would PING A publishing its set of served slots and its * configEpoch, but because of the partition B can't inform A of the * new configuration, so other nodes that have an updated table must * do it. In this way A will stop to act as a master (or can try to * failover if there are the conditions to win the election). * * 在正常情况下, B 应该向 A 发送 PING 消息,告知 A ,自己(B)已经接替了 * 槽 1、 2、 3 ,并且带有更更的配置纪元,但因为网络分裂的缘故, * 节点 B 没办法通知节点 A , * 所以通知节点 A 它带有的槽布局已经更新的工作就交给其他知道 B 带有更高配置纪元的节点来做。 * 当 A 接到其他节点关于节点 B 的消息时, * 节点 A 就会停止自己的主节点工作,又或者重新进行故障转移。 */ if (sender && dirty_slots) { int j;
for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
// 检测 slots 中的槽 j 是否已经被指派 if (bitmapTestBit(hdr->myslots,j)) {
// 当前节点认为槽 j 由 sender 负责处理, // 或者当前节点认为该槽未指派,那么跳过该槽 if (server.cluster->slots[j] == sender || server.cluster->slots[j] == NULL) continue;
// 当前节点槽 j 的配置纪元比 sender 的配置纪元要大 if (server.cluster->slots[j]->configEpoch > senderConfigEpoch) { redisLog(REDIS_VERBOSE, "Node %.40s has old slots configuration, sending " "an UPDATE message about %.40s", sender->name, server.cluster->slots[j]->name);
// 向 sender 发送关于槽 j 的更新信息 clusterSendUpdate(sender->link, server.cluster->slots[j]);
/* TODO: instead of exiting the loop send every other * UPDATE packet for other nodes that are the new owner * of sender's slots. */ break; } } } }
/* If our config epoch collides with the sender's try to fix * the problem. */ if (sender && nodeIsMaster(myself) && nodeIsMaster(sender) && senderConfigEpoch == myself->configEpoch) { clusterHandleConfigEpochCollision(sender); }
/* Get info from the gossip section */ // 分析并提取出消息 gossip 协议部分的信息 clusterProcessGossipSection(hdr,link);
// 这是一条 FAIL 消息: sender 告知当前节点,某个节点已经进入 FAIL 状态。 } else if (type == CLUSTERMSG_TYPE_FAIL) { clusterNode *failing;
if (sender) {
// 获取下线节点的消息 failing = clusterLookupNode(hdr->data.fail.about.nodename); // 下线的节点既不是当前节点,也没有处于 FAIL 状态 if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF))) { redisLog(REDIS_NOTICE, "FAIL message received from %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename);
// 打开 FAIL 状态 failing->flags |= REDIS_NODE_FAIL; failing->fail_time = mstime(); // 关闭 PFAIL 状态 failing->flags &= ~REDIS_NODE_PFAIL; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); } } else { redisLog(REDIS_NOTICE, "Ignoring FAIL message from unknonw node %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); }
// 这是一条 PUBLISH 消息 } else if (type == CLUSTERMSG_TYPE_PUBLISH) { robj *channel, *message; uint32_t channel_len, message_len;
/* Don't bother creating useless objects if there are no * Pub/Sub subscribers. */ // 只在有订阅者时创建消息对象 if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) { // 频道长度 channel_len = ntohl(hdr->data.publish.msg.channel_len);
// 消息长度 message_len = ntohl(hdr->data.publish.msg.message_len);
// 频道 channel = createStringObject( (char*)hdr->data.publish.msg.bulk_data,channel_len);
// 消息 message = createStringObject( (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len); // 发送消息 pubsubPublishMessage(channel,message);
decrRefCount(channel); decrRefCount(message); }
// 这是一条请求获得故障迁移授权的消息: sender 请求当前节点为它进行故障转移投票 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) { if (!sender) return 1; /* We don't know that node. */ // 如果条件允许的话,向 sender 投票,支持它进行故障转移 clusterSendFailoverAuthIfNeeded(sender,hdr);
// 这是一条故障迁移投票信息: sender 支持当前节点执行故障转移操作 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { if (!sender) return 1; /* We don't know that node. */
/* We consider this vote only if the sender is a master serving * a non zero number of slots, and its currentEpoch is greater or * equal to epoch where this node started the election. */ // 只有正在处理至少一个槽的主节点的投票会被视为是有效投票 // 只有符合以下条件, sender 的投票才算有效: // 1) sender 是主节点 // 2) sender 正在处理至少一个槽 // 3) sender 的配置纪元大于等于当前节点的配置纪元 if (nodeIsMaster(sender) && sender->numslots > 0 && senderCurrentEpoch >= server.cluster->failover_auth_epoch) { // 增加支持票数 server.cluster->failover_auth_count++;
/* Maybe we reached a quorum here, set a flag to make sure * we check ASAP. */ clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); }
} else if (type == CLUSTERMSG_TYPE_MFSTART) { /* This message is acceptable only if I'm a master and the sender * is one of my slaves. */ if (!sender || sender->slaveof != myself) return 1; /* Manual failover requested from slaves. Initialize the state * accordingly. */ resetManualFailover(); server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT; server.cluster->mf_slave = sender; pauseClients(mstime()+(REDIS_CLUSTER_MF_TIMEOUT*2)); redisLog(REDIS_WARNING,"Manual failover requested by slave %.40s.", sender->name); } else if (type == CLUSTERMSG_TYPE_UPDATE) { clusterNode *n; /* The node the update is about. */ uint64_t reportedConfigEpoch = ntohu64(hdr->data.update.nodecfg.configEpoch);
if (!sender) return 1; /* We don't know the sender. */
// 获取需要更新的节点 n = clusterLookupNode(hdr->data.update.nodecfg.nodename); if (!n) return 1; /* We don't know the reported node. */
// 消息的纪元并不大于节点 n 所处的配置纪元 // 无须更新 if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
/* If in our current config the node is a slave, set it as a master. */ // 如果节点 n 为从节点,但它的槽配置更新了 // 那么说明这个节点已经变为主节点,将它设置为主节点 if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
/* Update the node's configEpoch. */ n->configEpoch = reportedConfigEpoch; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_FSYNC_CONFIG);
/* Check the bitmap of served slots and udpate our * config accordingly. */ // 将消息中对 n 的槽布局与当前节点对 n 的槽布局进行对比 // 在有需要时更新当前节点对 n 的槽布局的认识 clusterUpdateSlotsConfigWith(n,reportedConfigEpoch, hdr->data.update.nodecfg.slots); } else { redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); } return 1;}
复制代码


用户头像

还未添加个人签名 2020.04.30 加入

还未添加个人简介

评论

发布
暂无评论
Redis - Cluster - 源码阅读(二)