写点什么

Redis - Cluster - 源码阅读(一)

发布于: 3 小时前
Redis - Cluster - 源码阅读(一)

如果以集群模式启动,需要和其他节点交换信息,怎么实现呢?主要在两方面:

1、节点定时地、主动地向其他节点发送本节点所拥有的信息

2、其他节点收到本节点的信息时,再将其所拥有的信息作为响应返回给本节点

看 redis 这一块的源码,包括 sentinel 的源码,最大的感受是其将状态的维护和状态的使用分离开来,解耦,各司其职。

主入口

下面就是节点的定时任务,其中就会向其他节点发送信息

// 每间隔 100 毫秒执行一次void clusterCron(void) {    dictIterator *di;    dictEntry *de;    int update_state = 0;    int orphaned_masters; /* How many masters there are without ok slaves. */    int max_slaves; /* Max number of ok slaves for a single master. */    int this_slaves; /* Number of ok slaves for our master (if we are slave). */    mstime_t min_pong = 0, now = mstime();    clusterNode *min_pong_node = NULL;    static unsigned long long iteration = 0;    mstime_t handshake_timeout;
//记录迭代次数,下面的部分逻辑会对该值取模,作为当前迭代是否执行的判断条件 iteration++;
//设置handshake超时时间,如果超时时间设置小于1秒,则设置为1秒 //本节点发现有新节点(meet或者其他)时,会创建新节点实例,此时不会立即与新节点通信,而是等到迭代周期 //到了才通信,所以新节点实例的初始状态未handshake,当本节点向新节点发送ping,新节点响应pong时, //则状态变为nohandshake,从handshake到nohandshake是有一段时间的,这个超时就是管控这里的时间 handshake_timeout = server.cluster_node_timeout; if (handshake_timeout < 1000) handshake_timeout = 1000;
//遍历本节点所持有的集群的所有节点(包括master/slave) di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de);
// 跳过当前节点以及没有地址的节点 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
// 如果 handshake 节点已超时,则释放该节点,这里的释放包括: // 1、将该节点从本节点维护的集群节点中移除 // 2、若该节点是从节点,则让其主节点将该从节点移除 // 总得来说,就是删除该节点 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { freeClusterNode(node); continue; }
// 为未创建连接的节点创建连接。 if (node->link == NULL) { int fd; mstime_t old_ping_sent; clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+REDIS_CLUSTER_PORT_INCR, server.bindaddr_count ? server.bindaddr[0] : NULL); if (fd == -1) { redisLog(REDIS_DEBUG, "Unable to connect to " "Cluster Node [%s]:%d -> %s", node->ip, node->port+REDIS_CLUSTER_PORT_INCR, server.neterr); continue; } link = createClusterLink(node); link->fd = fd; node->link = link; aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link); //创建好连接后,马上向目标节点发送ping/meet信息,以避免将目标节点误判为失败 //ping和meet本质上是相同的,只是发送ping的时候会记录ping发送时间,而meet不记录 //当目标节点是通过meet命令加入的,则目标节点的初始状态是handshake、meet //发送ping时记录ping发送时间,代表已发送ping,要求对方一定要回复pong,不回复则不再发送 //任何命令,当pong回复且回复时间大于ping,则设置pong接收时间,认为一次ping、pong完成,此时会设置ping发送 //时间为0,准备开始下一周期的ping、pong //meet是没有这种要求的(应该要回复,不回复也可以),即便没有回复,下一周期也可以再执行一次ping,所以这里不设置ping发送 //时间 old_ping_sent = node->ping_sent; clusterSendPing(link, node->flags & REDIS_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
if (old_ping_sent) { /* If there was an active ping before the link was * disconnected, we want to restore the ping time, otherwise * replaced by the clusterSendPing() call. */ node->ping_sent = old_ping_sent; }
node->flags &= ~REDIS_NODE_MEET;
redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR); } } dictReleaseIterator(di);
//每迭代10次执行一次,即1s执行一次 //随机挑5个节点出来,找到回复pong最老旧的节点,向其发送ping命令。 //pong老旧说明很长时间没有交互了,需要尽快确认下信息是否正确,状态是否正常 if (!(iteration % 10)) { int j;
/* Check a few random nodes and ping the one with the oldest * pong_received time. */ // 随机 5 个节点,选出其中一个 for (j = 0; j < 5; j++) {
// 随机在集群中挑选节点 de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */ // 不要 PING 连接断开的节点,也不要 PING 最近已经 PING 过的节点 if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
// 选出 5 个随机节点中最近一次接收 PONG 回复距离现在最旧的节点 if (min_pong_node == NULL || min_pong > this->pong_received) { min_pong_node = this; min_pong = this->pong_received; } }
// 向最久没有收到 PONG 回复的节点发送 PING 命令 if (min_pong_node) { redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name); clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING); } }
// 遍历所有节点,检查是否需要将某个节点标记为下线 /* Iterate nodes to check if we need to flag something as failing. * This loop is also responsible to: * 1) Check if there are orphaned masters (masters without non failing * slaves). * 2) Count the max number of non failing slaves for a single master. * 3) Count the number of slaves for our master, if we are a slave. */ orphaned_masters = 0; max_slaves = 0; this_slaves = 0; di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); now = mstime(); /* Use an updated time at every iteration. */ mstime_t delay;
// 跳过节点本身、无地址节点、HANDSHAKE 状态的节点 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE)) continue;
/* Orphaned master check, useful only if the current instance * is a slave that may migrate to another master. */ if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) { int okslaves = clusterCountNonFailingSlaves(node);
if (okslaves == 0 && node->numslots > 0) orphaned_masters++; if (okslaves > max_slaves) max_slaves = okslaves; if (nodeIsSlave(myself) && myself->slaveof == node) this_slaves = okslaves; }
//释放掉可能存在问题的连接,下一次迭代重连 if (node->link && /* is connected */ now - node->link->ctime > server.cluster_node_timeout && /* was not already reconnected */ node->ping_sent && /* we already sent a ping */ node->pong_received < node->ping_sent && /* still waiting pong */ /* and we are waiting for the pong more than timeout/2 */ now - node->ping_sent > server.cluster_node_timeout/2) { /* Disconnect the link, it will be reconnected automatically. */ // 释放连接,下次 clusterCron() 会自动重连 freeClusterLink(node->link); } //选择还没有开始新一轮ping、pong且已完成最后一轮ping、pong的时间和当前时间间隔超过阈值的节点 //发送ping,让本节点维护的信息得到及时更新 if (node->link && node->ping_sent == 0 && (now - node->pong_received) > server.cluster_node_timeout/2) { clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); continue; }
// 如果这是一个主节点,并且有一个从服务器请求进行手动故障转移 // 那么向从服务器发送 PING ,不断监测从节点的状态 if (server.cluster->mf_end && nodeIsMaster(myself) && server.cluster->mf_slave == node && node->link) { clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); continue; }
// 以下代码只在节点发送了 PING 命令的情况下执行 if (node->ping_sent == 0) continue;
// 计算等待 PONG 回复的时长 delay = now - node->ping_sent;
// 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线) if (delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) { redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing", node->name); // 打开疑似下线标记 node->flags |= REDIS_NODE_PFAIL; update_state = 1; } } } dictReleaseIterator(di);
// 如果从节点没有在复制主节点,那么对从节点进行设置 if (nodeIsSlave(myself) && server.masterhost == NULL && myself->slaveof && nodeHasAddr(myself->slaveof)) { replicationSetMaster(myself->slaveof->ip, myself->slaveof->port); }
/* Abourt a manual failover if the timeout is reached. */ manualFailoverCheckTimeout();
if (nodeIsSlave(myself)) { clusterHandleManualFailover(); clusterHandleSlaveFailover(); /* If there are orphaned slaves, and we are a slave among the masters * with the max number of non-failing slaves, consider migrating to * the orphaned masters. Note that it does not make sense to try * a migration if there is no master with at least *two* working * slaves. */ if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) clusterHandleSlaveMigration(max_slaves); }
// 更新集群状态 if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL) clusterUpdateState();}
复制代码


定时任务里面主要做了几件事情:

1、超时节点/连接的维护以及节点初始化通信、主从建立等

2、每 1 秒执行一次:随机抽取 5 个节点,向其中最久没有收到 pong 回复的那个节点发送 ping 命令

3、每 100 毫秒执行一次:向所有“当前没有未完成的 ping,且距离上次收到 pong 已超过 node_timeout/2”的节点发送 ping 命令

4、“疑似下线”判断

5、更新集群状态信息


下面再看下几个核心的部分。

消息发送

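从主入口我们已经可以知道,每秒大约需要向 1 + 10 * N 个节点发送消息:其中 1 来自每秒一次的随机抽样 ping,N 为每次迭代(100 毫秒)中满足“ping_sent 为 0 且 当前时间 - pong 接收时间 > node_timeout/2”的节点数量。由于发送 ping 之后 ping_sent 不再为 0,同一节点在 ping_sent 被清零之前不会被重复 ping,所以这只是一个上限估计。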

那么发送什么消息呢,主要有两部分:

1、本节点的信息

2、本节点维护的其他一定数量的节点信息

// 向指定节点发送一条 MEET 、 PING 或者 PONG 消息// 主要包含节点的名称、纪元、槽位、通信地址、flags(主从、下线状态等)void clusterSendPing(clusterLink *link, int type) {    unsigned char buf[sizeof(clusterMsg)];    clusterMsg *hdr = (clusterMsg*) buf;    int gossipcount = 0, totlen;      //根据gossip协议,当前节点向目标节点发送消息时,消息中除了包含当前节点的信息,还应该带上当前节点维护    //的其他节点信息,那么要带上多少节点的信息呢,这是个取舍。带得越多,信息交换越及时,但带宽占用越大    //freshnodes是理论可以带得节点数 = 集群节点总数 - 2,即减去当前节点以及目标节点,其他都可以带。    //不带当前节点是因为消息中已经有了,不带目标节点,是因为消息给目标节点后,其会回复pong,没必要带    int freshnodes = dictSize(server.cluster->nodes)-2;
// 如果发送的信息是 PING ,那么更新最后一次发送 PING 命令的时间戳 // 这里更新ping_sent很关键,很多逻辑依赖该值,具体见主入口中的逻辑 if (link->node && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime();
// 将当前节点的信息(比如名字、地址、端口号、负责处理的槽)记录到消息里面 clusterBuildMessageHdr(hdr,type);
//从集群节点中随机跳出附带节点,gossipcount<3,代表实际最多只能带2个。 //后续redis版本已经更新,最少带2个,最多带freshnodes的十分之一 while(freshnodes > 0 && gossipcount < 3) { // 从 nodes 字典中随机选出一个节点(被选中节点) dictEntry *de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip; int j;
/* In the gossip section don't include: * 以下节点不能作为被选中节点: * 1) Myself. * 节点本身。 * 2) Nodes in HANDSHAKE state. * 处于 HANDSHAKE 状态的节点。 * 3) Nodes with the NOADDR flag set. * 带有 NOADDR 标识的节点 * 4) Disconnected nodes if they don't have configured slots. * 因为不处理任何槽而被断开连接的节点 */ if (this == myself || this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) || (this->link == NULL && this->numslots == 0)) { freshnodes--; /* otherwise we may loop forever. */ continue; }
/* Check if we already added this node */ // 检查被选中节点是否已经在 hdr->data.ping.gossip 数组里面 // 如果是的话说明这个节点之前已经被选中了 // 不要再选中它(否则就会出现重复) for (j = 0; j < gossipcount; j++) { if (memcmp(hdr->data.ping.gossip[j].nodename,this->name, REDIS_CLUSTER_NAMELEN) == 0) break; } if (j != gossipcount) continue;
/* Add it */
// 这个被选中节点有效,计数器减一 freshnodes--;
// 指向 gossip 信息结构 gossip = &(hdr->data.ping.gossip[gossipcount]);
// 将被选中节点的名字记录到 gossip 信息 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN); // 将被选中节点的 PING 命令发送时间戳记录到 gossip 信息 gossip->ping_sent = htonl(this->ping_sent); // 将被选中节点的 PING 命令回复的时间戳记录到 gossip 信息 gossip->pong_received = htonl(this->pong_received); // 将被选中节点的 IP 记录到 gossip 信息 memcpy(gossip->ip,this->ip,sizeof(this->ip)); // 将被选中节点的端口号记录到 gossip 信息 gossip->port = htons(this->port); // 将被选中节点的标识值记录到 gossip 信息 gossip->flags = htons(this->flags);
// 这个被选中节点有效,计数器增一 gossipcount++; }
// 计算信息长度 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += (sizeof(clusterMsgDataGossip)*gossipcount); // 将被选中节点的数量(gossip 信息中包含了多少个节点的信息) // 记录在 count 属性里面 hdr->count = htons(gossipcount); // 将信息的长度记录到信息里面 hdr->totlen = htonl(totlen);
// 发送信息 clusterSendMessage(link,buf,totlen);}
复制代码


集群状态更新

随着消息的交换,集群各节点的状态以及集群的状态都在发生变化,所以需要定时更新集群状态。

// 更新节点状态void clusterUpdateState(void) {    int j, new_state;    int unreachable_masters = 0;    static mstime_t among_minority_time;    static mstime_t first_call_time = 0;
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
/* If this is a master node, wait some time before turning the state * into OK, since it is not a good idea to rejoin the cluster as a writable * master, after a reboot, without giving the cluster a chance to * reconfigure this node. Note that the delay is calculated starting from * the first call to this function and not since the server start, in order * to don't count the DB loading time. */ if (first_call_time == 0) first_call_time = mstime(); if (nodeIsMaster(myself) && mstime() - first_call_time < REDIS_CLUSTER_WRITABLE_DELAY) return;
/* Start assuming the state is OK. We'll turn it into FAIL if there * are the right conditions. */ new_state = REDIS_CLUSTER_OK;
// 这个逻辑比较重要,可以看到,如果任意一个槽没有分配到节点或者分配了但节点fail,会判定为集群失败 for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { if (server.cluster->slots[j] == NULL || server.cluster->slots[j]->flags & (REDIS_NODE_FAIL)) { new_state = REDIS_CLUSTER_FAIL; break; } }
/* Compute the cluster size, that is the number of master nodes * serving at least a single slot. * * At the same time count the number of unreachable masters with * at least one node. */ // 统计在线并且正在处理至少一个槽的 master 的数量, // 以及下线 master 的数量 { dictIterator *di; dictEntry *de;
server.cluster->size = 0; di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de);
if (nodeIsMaster(node) && node->numslots) { server.cluster->size++; if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) unreachable_masters++; } } dictReleaseIterator(di); }
/* If we can't reach at least half the masters, change the cluster state * to FAIL, as we are not even able to mark nodes as FAIL in this side * of the netsplit because of lack of majority. */ { int needed_quorum = (server.cluster->size / 2) + 1;
if (unreachable_masters >= needed_quorum) { new_state = REDIS_CLUSTER_FAIL; among_minority_time = mstime(); } }
/* Log a state change */ // 记录状态变更 if (new_state != server.cluster->state) { mstime_t rejoin_delay = server.cluster_node_timeout;
/* If the instance is a master and was partitioned away with the * minority, don't let it accept queries for some time after the * partition heals, to make sure there is enough time to receive * a configuration update. */ if (rejoin_delay > REDIS_CLUSTER_MAX_REJOIN_DELAY) rejoin_delay = REDIS_CLUSTER_MAX_REJOIN_DELAY; if (rejoin_delay < REDIS_CLUSTER_MIN_REJOIN_DELAY) rejoin_delay = REDIS_CLUSTER_MIN_REJOIN_DELAY;
if (new_state == REDIS_CLUSTER_OK && nodeIsMaster(myself) && mstime() - among_minority_time < rejoin_delay) { return; }
/* Change the state and log the event. */ redisLog(REDIS_WARNING,"Cluster state changed: %s", new_state == REDIS_CLUSTER_OK ? "ok" : "fail");
// 设置新状态 server.cluster->state = new_state; }}
复制代码


用户头像

还未添加个人签名 2020.04.30 加入

还未添加个人简介

评论

发布
暂无评论
Redis - Cluster - 源码阅读(一)