Redis - Cluster - 源码阅读(一)
发布于: 3 小时前

如果以集群模式启动,需要和其他节点交换信息,怎么实现呢?主要在两方面:
1、节点定时的主动的向其他节点发送本节点所拥有的信息
2、其他节点收到本节点的信息时,再将其所拥有的信息作为响应返回给本节点
看 redis 这一块的源码,包括 sentinel 的源码,最大的感受是其将状态的维护和状态的使用分离开来,解耦,各司其职。
主入口
下面就是节点的定时任务,其中就会向其他节点发送信息
// 每间隔 100 毫秒执行一次void clusterCron(void) {    dictIterator *di;    dictEntry *de;    int update_state = 0;    int orphaned_masters; /* How many masters there are without ok slaves. */    int max_slaves; /* Max number of ok slaves for a single master. */    int this_slaves; /* Number of ok slaves for our master (if we are slave). */    mstime_t min_pong = 0, now = mstime();    clusterNode *min_pong_node = NULL;    static unsigned long long iteration = 0;    mstime_t handshake_timeout;
		//记录迭代次数,下面的部分逻辑会对该值取模,作为当前迭代是否执行的判断条件    iteration++; 
		//设置handshake超时时间,如果超时时间设置小于1秒,则设置为1秒    //本节点发现有新节点(meet或者其他)时,会创建新节点实例,此时不会立即与新节点通信,而是等到迭代周期    //到了才通信,所以新节点实例的初始状态未handshake,当本节点向新节点发送ping,新节点响应pong时,    //则状态变为nohandshake,从handshake到nohandshake是有一段时间的,这个超时就是管控这里的时间    handshake_timeout = server.cluster_node_timeout;    if (handshake_timeout < 1000) handshake_timeout = 1000;
    //遍历本节点所持有的集群的所有节点(包括master/slave)    di = dictGetSafeIterator(server.cluster->nodes);    while((de = dictNext(di)) != NULL) {        clusterNode *node = dictGetVal(de);
        // 跳过当前节点以及没有地址的节点        if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
        // 如果 handshake 节点已超时,则释放该节点,这里的释放包括:        // 1、将该节点从本节点维护的集群节点中移除        // 2、若该节点是从节点,则让其主节点将该从节点移除        // 总得来说,就是删除该节点        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {            freeClusterNode(node);            continue;        }
        // 为未创建连接的节点创建连接。        if (node->link == NULL) {            int fd;            mstime_t old_ping_sent;            clusterLink *link;
            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,                node->port+REDIS_CLUSTER_PORT_INCR,                    server.bindaddr_count ? server.bindaddr[0] : NULL);            if (fd == -1) {                redisLog(REDIS_DEBUG, "Unable to connect to "                    "Cluster Node [%s]:%d -> %s", node->ip,                    node->port+REDIS_CLUSTER_PORT_INCR,                    server.neterr);                continue;            }            link = createClusterLink(node);            link->fd = fd;            node->link = link;            aeCreateFileEvent(server.el,link->fd,AE_READABLE,                    clusterReadHandler,link);                                //创建好连接后,马上向目标节点发送ping/meet信息,以避免将目标节点误判为失败            //ping和meet本质上是相同的,只是发送ping的时候会记录ping发送时间,而meet不记录            //当目标节点是通过meet命令加入的,则目标节点的初始状态是handshake、meet            //发送ping时记录ping发送时间,代表已发送ping,要求对方一定要回复pong,不回复则不再发送            //任何命令,当pong回复且回复时间大于ping,则设置pong接收时间,认为一次ping、pong完成,此时会设置ping发送            //时间为0,准备开始下一周期的ping、pong            //meet是没有这种要求的(应该要回复,不回复也可以),即便没有回复,下一周期也可以再执行一次ping,所以这里不设置ping发送            //时间            old_ping_sent = node->ping_sent;            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
            if (old_ping_sent) {                /* If there was an active ping before the link was                 * disconnected, we want to restore the ping time, otherwise                 * replaced by the clusterSendPing() call. */                node->ping_sent = old_ping_sent;            }
            node->flags &= ~REDIS_NODE_MEET;
            redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",                    node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);        }    }    dictReleaseIterator(di);
    //每迭代10次执行一次,即1s执行一次    //随机挑5个节点出来,找到回复pong最老旧的节点,向其发送ping命令。    //pong老旧说明很长时间没有交互了,需要尽快确认下信息是否正确,状态是否正常    if (!(iteration % 10)) {        int j;
        /* Check a few random nodes and ping the one with the oldest         * pong_received time. */        // 随机 5 个节点,选出其中一个        for (j = 0; j < 5; j++) {
            // 随机在集群中挑选节点            de = dictGetRandomKey(server.cluster->nodes);            clusterNode *this = dictGetVal(de);
            /* Don't ping nodes disconnected or with a ping currently active. */            // 不要 PING 连接断开的节点,也不要 PING 最近已经 PING 过的节点            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))                continue;
            // 选出 5 个随机节点中最近一次接收 PONG 回复距离现在最旧的节点            if (min_pong_node == NULL || min_pong > this->pong_received) {                min_pong_node = this;                min_pong = this->pong_received;            }        }
        // 向最久没有收到 PONG 回复的节点发送 PING 命令        if (min_pong_node) {            redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);        }    }
    // 遍历所有节点,检查是否需要将某个节点标记为下线    /* Iterate nodes to check if we need to flag something as failing.     * This loop is also responsible to:     * 1) Check if there are orphaned masters (masters without non failing     *    slaves).     * 2) Count the max number of non failing slaves for a single master.     * 3) Count the number of slaves for our master, if we are a slave. */    orphaned_masters = 0;    max_slaves = 0;    this_slaves = 0;    di = dictGetSafeIterator(server.cluster->nodes);    while((de = dictNext(di)) != NULL) {        clusterNode *node = dictGetVal(de);        now = mstime(); /* Use an updated time at every iteration. */        mstime_t delay;
        // 跳过节点本身、无地址节点、HANDSHAKE 状态的节点        if (node->flags &            (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))                continue;
        /* Orphaned master check, useful only if the current instance         * is a slave that may migrate to another master. */        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {            int okslaves = clusterCountNonFailingSlaves(node);
            if (okslaves == 0 && node->numslots > 0) orphaned_masters++;            if (okslaves > max_slaves) max_slaves = okslaves;            if (nodeIsSlave(myself) && myself->slaveof == node)                this_slaves = okslaves;        }
				//释放掉可能存在问题的连接,下一次迭代重连        if (node->link && /* is connected */            now - node->link->ctime >            server.cluster_node_timeout && /* was not already reconnected */            node->ping_sent && /* we already sent a ping */            node->pong_received < node->ping_sent && /* still waiting pong */            /* and we are waiting for the pong more than timeout/2 */            now - node->ping_sent > server.cluster_node_timeout/2)        {            /* Disconnect the link, it will be reconnected automatically. */            // 释放连接,下次 clusterCron() 会自动重连            freeClusterLink(node->link);        }       	//选择还没有开始新一轮ping、pong且已完成最后一轮ping、pong的时间和当前时间间隔超过阈值的节点        //发送ping,让本节点维护的信息得到及时更新        if (node->link &&            node->ping_sent == 0 &&            (now - node->pong_received) > server.cluster_node_timeout/2)        {            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);            continue;        }
        // 如果这是一个主节点,并且有一个从服务器请求进行手动故障转移        // 那么向从服务器发送 PING ,不断监测从节点的状态        if (server.cluster->mf_end &&            nodeIsMaster(myself) &&            server.cluster->mf_slave == node &&            node->link)        {            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);            continue;        }
        // 以下代码只在节点发送了 PING 命令的情况下执行        if (node->ping_sent == 0) continue;
        // 计算等待 PONG 回复的时长        delay = now - node->ping_sent;
        // 等待 PONG 回复的时长超过了限制值,将目标节点标记为 PFAIL (疑似下线)        if (delay > server.cluster_node_timeout) {            /* Timeout reached. Set the node as possibly failing if it is             * not already in this state. */            if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {                redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",                    node->name);                // 打开疑似下线标记                node->flags |= REDIS_NODE_PFAIL;                update_state = 1;            }        }    }    dictReleaseIterator(di);
      // 如果从节点没有在复制主节点,那么对从节点进行设置    if (nodeIsSlave(myself) &&        server.masterhost == NULL &&        myself->slaveof &&        nodeHasAddr(myself->slaveof))    {        replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);    }
    /* Abourt a manual failover if the timeout is reached. */    manualFailoverCheckTimeout();
    if (nodeIsSlave(myself)) {        clusterHandleManualFailover();        clusterHandleSlaveFailover();        /* If there are orphaned slaves, and we are a slave among the masters         * with the max number of non-failing slaves, consider migrating to         * the orphaned masters. Note that it does not make sense to try         * a migration if there is no master with at least *two* working         * slaves. */        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)            clusterHandleSlaveMigration(max_slaves);    }
    // 更新集群状态    if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)        clusterUpdateState();}复制代码
 定时任务里面主要做了几件事情:
1、超时节点/连接的维护以及节点初始化通信、主从建立等
2、1 秒执行一次,选取一个老节点向其发送 ping 命令
3、100 毫秒执行一次,挑选出一批老节点向其发送 ping 命令
4、“疑似下线”判断
5、更新集群状态信息
下面再看下几个核心的部分。
消息发送
从主入口我们已经可以知道,每秒需要向 1 + 10 * N(N 为当前时间 - pong 接收时间 > node_timeout/2 的节点数量)个节点发送消息。
那么发送什么消息呢,主要有两部分:
1、本节点的信息
2、本节点维护的其他一定数量的节点信息
// 向指定节点发送一条 MEET 、 PING 或者 PONG 消息// 主要包含节点的名称、纪元、槽位、通信地址、flags(主从、下线状态等)void clusterSendPing(clusterLink *link, int type) {    unsigned char buf[sizeof(clusterMsg)];    clusterMsg *hdr = (clusterMsg*) buf;    int gossipcount = 0, totlen;      //根据gossip协议,当前节点向目标节点发送消息时,消息中除了包含当前节点的信息,还应该带上当前节点维护    //的其他节点信息,那么要带上多少节点的信息呢,这是个取舍。带得越多,信息交换越及时,但带宽占用越大    //freshnodes是理论可以带得节点数 = 集群节点总数 - 2,即减去当前节点以及目标节点,其他都可以带。    //不带当前节点是因为消息中已经有了,不带目标节点,是因为消息给目标节点后,其会回复pong,没必要带    int freshnodes = dictSize(server.cluster->nodes)-2;
    // 如果发送的信息是 PING ,那么更新最后一次发送 PING 命令的时间戳    // 这里更新ping_sent很关键,很多逻辑依赖该值,具体见主入口中的逻辑    if (link->node && type == CLUSTERMSG_TYPE_PING)        link->node->ping_sent = mstime();
    // 将当前节点的信息(比如名字、地址、端口号、负责处理的槽)记录到消息里面    clusterBuildMessageHdr(hdr,type);
		//从集群节点中随机跳出附带节点,gossipcount<3,代表实际最多只能带2个。    //后续redis版本已经更新,最少带2个,最多带freshnodes的十分之一    while(freshnodes > 0 && gossipcount < 3) {        // 从 nodes 字典中随机选出一个节点(被选中节点)        dictEntry *de = dictGetRandomKey(server.cluster->nodes);        clusterNode *this = dictGetVal(de);
        clusterMsgDataGossip *gossip;        int j;
        /* In the gossip section don't include:         * 以下节点不能作为被选中节点:         * 1) Myself.         *    节点本身。         * 2) Nodes in HANDSHAKE state.         *    处于 HANDSHAKE 状态的节点。         * 3) Nodes with the NOADDR flag set.         *    带有 NOADDR 标识的节点         * 4) Disconnected nodes if they don't have configured slots.         *    因为不处理任何槽而被断开连接的节点          */        if (this == myself ||            this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||            (this->link == NULL && this->numslots == 0))        {                freshnodes--; /* otherwise we may loop forever. */                continue;        }
        /* Check if we already added this node */        // 检查被选中节点是否已经在 hdr->data.ping.gossip 数组里面        // 如果是的话说明这个节点之前已经被选中了        // 不要再选中它(否则就会出现重复)        for (j = 0; j < gossipcount; j++) {            if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,                    REDIS_CLUSTER_NAMELEN) == 0) break;        }        if (j != gossipcount) continue;
        /* Add it */
        // 这个被选中节点有效,计数器减一        freshnodes--;
        // 指向 gossip 信息结构        gossip = &(hdr->data.ping.gossip[gossipcount]);
        // 将被选中节点的名字记录到 gossip 信息        memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);        // 将被选中节点的 PING 命令发送时间戳记录到 gossip 信息        gossip->ping_sent = htonl(this->ping_sent);        // 将被选中节点的 PING 命令回复的时间戳记录到 gossip 信息        gossip->pong_received = htonl(this->pong_received);        // 将被选中节点的 IP 记录到 gossip 信息        memcpy(gossip->ip,this->ip,sizeof(this->ip));        // 将被选中节点的端口号记录到 gossip 信息        gossip->port = htons(this->port);        // 将被选中节点的标识值记录到 gossip 信息        gossip->flags = htons(this->flags);
        // 这个被选中节点有效,计数器增一        gossipcount++;    }
    // 计算信息长度    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);    // 将被选中节点的数量(gossip 信息中包含了多少个节点的信息)    // 记录在 count 属性里面    hdr->count = htons(gossipcount);    // 将信息的长度记录到信息里面    hdr->totlen = htonl(totlen);
    // 发送信息    clusterSendMessage(link,buf,totlen);}复制代码
 集群状态更新
随着消息的交换,集群各节点的状态以及集群的状态都在发生变化,所以需要定时更新集群状态。
// 更新节点状态void clusterUpdateState(void) {    int j, new_state;    int unreachable_masters = 0;    static mstime_t among_minority_time;    static mstime_t first_call_time = 0;
    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
    /* If this is a master node, wait some time before turning the state     * into OK, since it is not a good idea to rejoin the cluster as a writable     * master, after a reboot, without giving the cluster a chance to     * reconfigure this node. Note that the delay is calculated starting from     * the first call to this function and not since the server start, in order     * to don't count the DB loading time. */    if (first_call_time == 0) first_call_time = mstime();    if (nodeIsMaster(myself) &&        mstime() - first_call_time < REDIS_CLUSTER_WRITABLE_DELAY) return;
    /* Start assuming the state is OK. We'll turn it into FAIL if there     * are the right conditions. */    new_state = REDIS_CLUSTER_OK;
    // 这个逻辑比较重要,可以看到,如果任意一个槽没有分配到节点或者分配了但节点fail,会判定为集群失败    for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {        if (server.cluster->slots[j] == NULL ||            server.cluster->slots[j]->flags & (REDIS_NODE_FAIL))        {            new_state = REDIS_CLUSTER_FAIL;            break;        }    }
    /* Compute the cluster size, that is the number of master nodes     * serving at least a single slot.     *     * At the same time count the number of unreachable masters with     * at least one node. */    // 统计在线并且正在处理至少一个槽的 master 的数量,    // 以及下线 master 的数量    {        dictIterator *di;        dictEntry *de;
        server.cluster->size = 0;        di = dictGetSafeIterator(server.cluster->nodes);        while((de = dictNext(di)) != NULL) {            clusterNode *node = dictGetVal(de);
            if (nodeIsMaster(node) && node->numslots) {                server.cluster->size++;                if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL))                    unreachable_masters++;            }        }        dictReleaseIterator(di);    }
    /* If we can't reach at least half the masters, change the cluster state     * to FAIL, as we are not even able to mark nodes as FAIL in this side     * of the netsplit because of lack of majority.     */    {        int needed_quorum = (server.cluster->size / 2) + 1;
        if (unreachable_masters >= needed_quorum) {            new_state = REDIS_CLUSTER_FAIL;            among_minority_time = mstime();        }    }
    /* Log a state change */    // 记录状态变更    if (new_state != server.cluster->state) {        mstime_t rejoin_delay = server.cluster_node_timeout;
        /* If the instance is a master and was partitioned away with the         * minority, don't let it accept queries for some time after the         * partition heals, to make sure there is enough time to receive         * a configuration update. */        if (rejoin_delay > REDIS_CLUSTER_MAX_REJOIN_DELAY)            rejoin_delay = REDIS_CLUSTER_MAX_REJOIN_DELAY;        if (rejoin_delay < REDIS_CLUSTER_MIN_REJOIN_DELAY)            rejoin_delay = REDIS_CLUSTER_MIN_REJOIN_DELAY;
        if (new_state == REDIS_CLUSTER_OK &&            nodeIsMaster(myself) &&            mstime() - among_minority_time < rejoin_delay)        {            return;        }
        /* Change the state and log the event. */        redisLog(REDIS_WARNING,"Cluster state changed: %s",            new_state == REDIS_CLUSTER_OK ? "ok" : "fail");
        // 设置新状态        server.cluster->state = new_state;    }}复制代码
 划线
评论
复制
发布于: 3 小时前阅读数: 4

旺仔大菜包
关注
还未添加个人签名 2020.04.30 加入
还未添加个人简介











 
    
评论