Redis - Cluster - Source Code Reading (Part 1)
When a node starts in cluster mode, it needs to exchange information with the other nodes. How is that implemented? It mainly comes down to two things:
1. The node periodically and proactively sends the information it holds to the other nodes.
2. When another node receives that information, it replies with the information it holds (a minimal sketch of this exchange follows the list).
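To make this concrete, here is a minimal, self-contained sketch of the push/reply pattern described above. It is plain C with invented names and structures (node_view, handle_ping), not Redis code: node A pushes its view in a "ping", and B answers with its own view as the "pong".
#include <stdio.h>

typedef struct {
    char name[8];
    long config_epoch;   /* stands in for whatever state a node advertises */
} node_view;

/* The receiver absorbs the sender's view and answers with its own. */
static node_view handle_ping(node_view *self, const node_view *remote) {
    printf("%s received view of %s (epoch %ld)\n",
           self->name, remote->name, remote->config_epoch);
    return *self;   /* the "pong" carries the receiver's own information */
}

int main(void) {
    node_view a = {"A", 5}, b = {"B", 7};
    /* One exchange round: A -> ping -> B, then B -> pong -> A. */
    node_view pong = handle_ping(&b, &a);
    printf("%s received view of %s (epoch %ld) in the pong\n",
           a.name, pong.name, pong.config_epoch);
    return 0;
}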
Reading this part of the Redis source, and the Sentinel source as well, my strongest impression is how it separates maintaining state from using state: the two concerns are decoupled, each doing its own job.
Main entry point
Below is the node's periodic task; this is where messages get sent to the other nodes.
// Executed once every 100 milliseconds
void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; /* How many masters there are without ok slaves. */
int max_slaves; /* Max number of ok slaves for a single master. */
int this_slaves; /* Number of ok slaves for our master (if we are slave). */
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
static unsigned long long iteration = 0;
mstime_t handshake_timeout;
// Track the number of iterations; some of the logic below takes this value modulo N to decide whether to run in the current iteration
iteration++;
// Set the handshake timeout; if the configured value is below 1 second, use 1 second.
// When this node learns about a new node (via MEET or otherwise) it creates a node instance but does not
// talk to it right away; communication only starts on a later cron iteration, so the new instance starts in
// the handshake state. Once this node sends it a PING and gets a PONG back, the handshake flag is cleared.
// Going from handshake to non-handshake takes some time, and this timeout bounds that window.
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
// Iterate over all cluster nodes this node knows about (both masters and slaves)
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
// Skip this node itself and nodes without an address
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
// If a node in handshake state has timed out, free it. Freeing here means:
// 1) removing the node from the cluster node table maintained by this node;
// 2) if it is a slave, detaching it from its master.
// In short, the node is deleted.
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
freeClusterNode(node);
continue;
}
// Create a connection for nodes that do not have one yet.
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.bindaddr_count ? server.bindaddr[0] : NULL);
if (fd == -1) {
redisLog(REDIS_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.neterr);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
// As soon as the connection is up, send a PING/MEET to the target node so that it is not wrongly judged as failed.
// PING and MEET are essentially the same message; the difference is that sending a PING records the ping send
// time, while MEET does not.
// If the target node was added via the MEET command, its initial flags are handshake and meet.
// Recording ping_sent when sending a PING means a PING is in flight and the peer is expected to reply with a
// PONG; until it does, no further PING is sent. When the PONG arrives (with a receive time later than the ping),
// pong_received is updated, the ping/pong round is considered complete, and ping_sent is reset to 0, ready for
// the next round.
// MEET has no such requirement (a reply is expected but optional); even without one, a PING can be sent in the
// next cycle, which is why ping_sent is not set here.
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}
node->flags &= ~REDIS_NODE_MEET;
redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}
dictReleaseIterator(di);
// Run once every 10 iterations, i.e. once per second.
// Sample 5 random nodes, find the one with the oldest PONG reply, and send it a PING.
// A stale PONG means we have not exchanged messages with that node for a long time, so we should confirm its information and state as soon as possible.
if (!(iteration % 10)) {
int j;
/* Check a few random nodes and ping the one with the oldest
* pong_received time. */
// Sample 5 random nodes and pick one of them
for (j = 0; j < 5; j++) {
// Pick a random node from the cluster
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */
// Do not PING disconnected nodes, nor nodes that already have a PING in flight
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
continue;
// Among the 5 random nodes, select the one whose last PONG was received longest ago
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
// Send a PING to the node we have gone the longest without a PONG from
if (min_pong_node) {
redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
// Iterate over all nodes to check whether any should be flagged as failing
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;
// Skip this node itself, nodes without an address, and nodes in HANDSHAKE state
if (node->flags &
(REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
continue;
/* Orphaned master check, useful only if the current instance
* is a slave that may migrate to another master. */
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
int okslaves = clusterCountNonFailingSlaves(node);
if (okslaves == 0 && node->numslots > 0) orphaned_masters++;
if (okslaves > max_slaves) max_slaves = okslaves;
if (nodeIsSlave(myself) && myself->slaveof == node)
this_slaves = okslaves;
}
// Free links that look problematic; the next iteration will reconnect
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
now - node->ping_sent > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
// Free the link; the next clusterCron() call will reconnect automatically
freeClusterLink(node->link);
}
// For nodes with no PING in flight whose last completed ping/pong round is older than the threshold,
// send a PING so that the information this node maintains stays fresh
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
// If this is a master node and a slave has requested a manual failover,
// keep sending PING to that slave to continuously monitor its state
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
// The code below only runs if we have a PING pending for this node
if (node->ping_sent == 0) continue;
// Compute how long we have been waiting for the PONG reply
delay = now - node->ping_sent;
// If the wait for the PONG exceeds the limit, flag the node as PFAIL (possibly failing)
if (delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
node->name);
// Set the possibly-failing flag
node->flags |= REDIS_NODE_PFAIL;
update_state = 1;
}
}
}
dictReleaseIterator(di);
// If this node is a slave but replication to its master is not configured yet, set it up now
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
}
/* Abort a manual failover if the timeout is reached. */
manualFailoverCheckTimeout();
if (nodeIsSlave(myself)) {
clusterHandleManualFailover();
clusterHandleSlaveFailover();
/* If there are orphaned slaves, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
clusterHandleSlaveMigration(max_slaves);
}
// Update the cluster state
if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
clusterUpdateState();
}
The periodic task mainly does the following:
1. Maintenance of timed-out nodes/links, first contact with newly discovered nodes, master-slave setup, and so on.
2. Once per second, pick one stale node (the oldest PONG among 5 random samples) and send it a PING.
3. Every 100 milliseconds, pick the nodes whose last PONG is older than half the node timeout and send them a PING (a minimal sketch of this 100 ms / 1 s cadence follows this list).
4. "Possibly failing" (PFAIL) detection.
5. Updating the cluster state.
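The 100 ms / 1 s cadence in items 2 and 3 comes from the static iteration counter and the iteration % 10 check at the top of clusterCron(). A minimal standalone sketch of that pattern (a hypothetical example, not Redis code), assuming a 100 ms driver loop:
#include <stdio.h>
#include <unistd.h>

/* Mimics how clusterCron() gates once-per-second work with a counter. */
static void cron_tick(void) {
    static unsigned long long iteration = 0;
    iteration++;

    printf("tick %llu: work done on every 100 ms pass\n", iteration);

    /* Every 10th tick, i.e. once per second. */
    if (!(iteration % 10))
        printf("tick %llu: once-per-second work (ping the stalest of 5 random nodes)\n",
               iteration);
}

int main(void) {
    for (int i = 0; i < 25; i++) {
        cron_tick();
        usleep(100 * 1000);  /* 100 ms, mirroring the cron period */
    }
    return 0;
}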
Next, let's look at a few of the core parts.
Message sending
From the main entry point we already know that, each second, the node sends messages to roughly 1 + 10 * N nodes (where N is the number of nodes whose last PONG was received more than node_timeout/2 ago).
So what goes into such a message? Mainly two parts:
1. Information about this node itself.
2. Information about a limited number of other nodes this node knows about (a simplified layout sketch follows this list).
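Before the real code, here is a simplified, hypothetical sketch of the message shape: a fixed header describing the sender, followed by count gossip entries, with the total length computed as header plus entries. The struct names and field layout below are illustrative only; they are not the real clusterMsg/clusterMsgDataGossip definitions.
#include <stdint.h>
#include <stdio.h>

#define NAMELEN 40                 /* mirrors REDIS_CLUSTER_NAMELEN */

typedef struct {                   /* plays the role of clusterMsgDataGossip */
    char     nodename[NAMELEN];
    uint32_t ping_sent;
    uint32_t pong_received;
    char     ip[16];
    uint16_t port;
    uint16_t flags;
} gossip_entry;

typedef struct {                   /* stands in for the clusterMsg header */
    char     sender[NAMELEN];      /* the node that is speaking */
    uint16_t count;                /* how many gossip entries follow */
    uint32_t totlen;               /* total bytes actually sent */
} msg_header;

int main(void) {
    int gossipcount = 3;           /* this Redis version attaches at most 3 entries */
    size_t totlen = sizeof(msg_header) + gossipcount * sizeof(gossip_entry);
    printf("header=%zu, entry=%zu, total=%zu bytes\n",
           sizeof(msg_header), sizeof(gossip_entry), totlen);
    return 0;
}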
// Send a MEET, PING or PONG message to the specified node.
// The message mainly carries the node's name, epoch, slots, address, and flags (master/slave role, failure state, etc.)
void clusterSendPing(clusterLink *link, int type) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
int gossipcount = 0, totlen;
// Per the gossip protocol, when this node sends a message to the target, besides its own information the message
// also carries information about other nodes it knows of. How many nodes to attach is a trade-off: the more we
// attach, the faster information spreads, but the more bandwidth is used.
// freshnodes is the theoretical number of nodes we could attach = total cluster nodes - 2, i.e. everything except
// this node and the target node.
// This node is excluded because its information is already in the header; the target node is excluded because it
// will reply with a PONG anyway, so there is no need to include it.
int freshnodes = dictSize(server.cluster->nodes)-2;
// If the message being sent is a PING, update the timestamp of the last PING sent.
// Updating ping_sent here is critical; a lot of logic depends on this value (see the main entry point above)
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
// Record this node's own information (name, address, port, slots served, etc.) in the message header
clusterBuildMessageHdr(hdr,type);
// Randomly pick the nodes to attach from the cluster; gossipcount < 3 means at most 3 entries are attached here.
// Later Redis versions changed this to roughly one tenth of the total node count, with a lower bound of 3.
while(freshnodes > 0 && gossipcount < 3) {
// Pick a random node from the nodes dict (the candidate node)
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip;
int j;
/* In the gossip section don't include:
 * 1) Myself.
 * 2) Nodes in HANDSHAKE state.
 * 3) Nodes with the NOADDR flag set.
 * 4) Disconnected nodes if they don't have configured slots.
 */
if (this == myself ||
this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--; /* otherwise we may loop forever. */
continue;
}
/* Check if we already added this node */
// If the candidate is already in the hdr->data.ping.gossip array, it was picked
// earlier and must not be picked again (that would create a duplicate)
for (j = 0; j < gossipcount; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
REDIS_CLUSTER_NAMELEN) == 0) break;
}
if (j != gossipcount) continue;
/* Add it */
// The candidate is valid; decrement the number of fresh nodes left to pick from
freshnodes--;
// Point to the gossip entry to fill
gossip = &(hdr->data.ping.gossip[gossipcount]);
// Record the candidate node's name in the gossip entry
memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
// Record the candidate's PING send timestamp in the gossip entry
gossip->ping_sent = htonl(this->ping_sent);
// Record the candidate's PONG receive timestamp in the gossip entry
gossip->pong_received = htonl(this->pong_received);
// Record the candidate's IP in the gossip entry
memcpy(gossip->ip,this->ip,sizeof(this->ip));
// Record the candidate's port in the gossip entry
gossip->port = htons(this->port);
// Record the candidate's flags in the gossip entry
gossip->flags = htons(this->flags);
// One more valid gossip entry added; increment the counter
gossipcount++;
}
// Compute the message length
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
// Record the number of selected nodes (how many nodes the gossip
// section describes) in the count field
hdr->count = htons(gossipcount);
// Record the total length in the message
hdr->totlen = htonl(totlen);
// Send the message
clusterSendMessage(link,buf,totlen);
}
Cluster state update
As messages are exchanged, the state of individual nodes and of the cluster as a whole keeps changing, so the cluster state needs to be refreshed periodically.
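The update boils down to two FAIL conditions, visible in the code below: every hash slot must be assigned to a non-failed node, and this side of the cluster must still be able to reach a majority of the slot-owning masters. A simplified decision function capturing just those two checks (a sketch, not the real clusterUpdateState):
#include <stdio.h>

typedef enum { CLUSTER_OK, CLUSTER_FAIL } cluster_state;

/* all_slots_covered: every slot is assigned to a non-failed node.
 * masters: number of masters serving at least one slot.
 * unreachable_masters: how many of them look FAIL/PFAIL from here. */
static cluster_state compute_state(int all_slots_covered,
                                   int masters, int unreachable_masters) {
    if (!all_slots_covered) return CLUSTER_FAIL;
    int needed_quorum = masters / 2 + 1;
    if (unreachable_masters >= needed_quorum) return CLUSTER_FAIL;
    return CLUSTER_OK;
}

int main(void) {
    /* With 5 slot-owning masters the quorum is 3: 2 unreachable is still OK,
     * 3 unreachable flips the state to FAIL. */
    printf("%s\n", compute_state(1, 5, 2) == CLUSTER_OK ? "ok" : "fail");
    printf("%s\n", compute_state(1, 5, 3) == CLUSTER_OK ? "ok" : "fail");
    return 0;
}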
// Update the cluster state
void clusterUpdateState(void) {
int j, new_state;
int unreachable_masters = 0;
static mstime_t among_minority_time;
static mstime_t first_call_time = 0;
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
/* If this is a master node, wait some time before turning the state
* into OK, since it is not a good idea to rejoin the cluster as a writable
* master, after a reboot, without giving the cluster a chance to
* reconfigure this node. Note that the delay is calculated starting from
* the first call to this function and not since the server start, in order
* not to count the DB loading time. */
if (first_call_time == 0) first_call_time = mstime();
if (nodeIsMaster(myself) &&
mstime() - first_call_time < REDIS_CLUSTER_WRITABLE_DELAY) return;
/* Start assuming the state is OK. We'll turn it into FAIL if there
* are the right conditions. */
new_state = REDIS_CLUSTER_OK;
// This check is important: if any slot is unassigned, or assigned to a node that has failed, the cluster is judged FAIL
for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->flags & (REDIS_NODE_FAIL))
{
new_state = REDIS_CLUSTER_FAIL;
break;
}
}
/* Compute the cluster size, that is the number of master nodes
* serving at least a single slot.
*
* At the same time count the number of unreachable masters with
* at least one node. */
// Count the masters that are serving at least one slot,
// and among them how many are unreachable
{
dictIterator *di;
dictEntry *de;
server.cluster->size = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (nodeIsMaster(node) && node->numslots) {
server.cluster->size++;
if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL))
unreachable_masters++;
}
}
dictReleaseIterator(di);
}
/* If we can't reach at least half the masters, change the cluster state
* to FAIL, as we are not even able to mark nodes as FAIL in this side
* of the netsplit because of lack of majority.
*/
{
int needed_quorum = (server.cluster->size / 2) + 1;
if (unreachable_masters >= needed_quorum) {
new_state = REDIS_CLUSTER_FAIL;
among_minority_time = mstime();
}
}
/* Log a state change */
if (new_state != server.cluster->state) {
mstime_t rejoin_delay = server.cluster_node_timeout;
/* If the instance is a master and was partitioned away with the
* minority, don't let it accept queries for some time after the
* partition heals, to make sure there is enough time to receive
* a configuration update. */
if (rejoin_delay > REDIS_CLUSTER_MAX_REJOIN_DELAY)
rejoin_delay = REDIS_CLUSTER_MAX_REJOIN_DELAY;
if (rejoin_delay < REDIS_CLUSTER_MIN_REJOIN_DELAY)
rejoin_delay = REDIS_CLUSTER_MIN_REJOIN_DELAY;
if (new_state == REDIS_CLUSTER_OK &&
nodeIsMaster(myself) &&
mstime() - among_minority_time < rejoin_delay)
{
return;
}
/* Change the state and log the event. */
redisLog(REDIS_WARNING,"Cluster state changed: %s",
new_state == REDIS_CLUSTER_OK ? "ok" : "fail");
// Set the new state
server.cluster->state = new_state;
}
}