大家好,我是历小冰,今天来讲一下 Reids Cluster 的 Gossip 协议和集群操作,文章的思维导图如下所示。
集群模式和 Gossip 简介
对于数据存储领域,当数据量或者请求流量大到一定程度后,就必然会引入分布式。比如 Redis,虽然其单机性能十分优秀,但是因为下列原因时,也不得不引入集群。
单机无法保证高可用,需要引入多实例来提供高可用性
单机能够提供高达 8W 左右的QPS,再高的QPS则需要引入多实例
单机能够支持的数据量有限,处理更多的数据需要引入多实例;
单机所处理的网络流量已经超过服务器的网卡的上限值,需要引入多实例来分流。
有集群,集群往往需要维护一定的元数据,比如实例的ip地址,缓存分片的 slots 信息等,所以需要一套分布式机制来维护元数据的一致性。这类机制一般有两个模式:分散式和集中式
分散式机制将元数据存储在部分或者所有节点上,不同节点之间进行不断的通信来维护元数据的变更和一致性。Redis Cluster,Consul 等都是该模式。
而集中式是将集群元数据集中存储在外部节点或者中间件上,比如 zookeeper。旧版本的 kafka 和 storm 等都是使用该模式。
两种模式各有优劣,具体如下表所示:
分散式的元数据模式有多种可选的算法进行元数据的同步,比如说 Paxos、Raft 和 Gossip。Paxos 和 Raft 等都需要全部节点或者大多数节点(超过一半)正常运行,整个集群才能稳定运行,而 Gossip 则不需要半数以上的节点运行。
Gossip 协议,顾名思义,就像流言蜚语一样,利用一种随机、带有传染性的方式,将信息传播到整个网络中,并在一定时间内,使得系统内的所有节点数据一致。对你来说,掌握这个协议不仅能很好地理解这种最常用的,实现最终一致性的算法,也能在后续工作中得心应手地实现数据的最终一致性。
Gossip 协议又称 epidemic 协议(epidemic protocol),是基于流行病传播方式的节点或者进程之间信息交换的协议,在P2P网络和分布式系统中应用广泛,它的方法论也特别简单:
在一个处于有界网络的集群里,如果每个节点都随机与其他节点交换特定信息,经过足够长的时间后,集群各个节点对该份信息的认知终将收敛到一致。
这里的“特定信息”一般就是指集群状态、各节点的状态以及其他元数据等。Gossip协议是完全符合 BASE 原则,可以用在任何要求最终一致性的领域,比如分布式存储和注册中心。另外,它可以很方便地实现弹性集群,允许节点随时上下线,提供快捷的失败检测和动态负载均衡等。
此外,Gossip 协议的最大的好处是,即使集群节点的数量增加,每个节点的负载也不会增加很多,几乎是恒定的。这就允许 Redis Cluster 或者 Consul 集群管理的节点规模能横向扩展到数千个。
Redis Cluster 的 Gossip 通信机制
Redis Cluster 是在 3.0 版本引入集群功能。为了让让集群中的每个实例都知道其他所有实例的状态信息,Redis 集群规定各个实例之间按照 Gossip 协议来通信传递信息。
上图展示了主从架构的 Redis Cluster 示意图,其中实线表示节点间的主从复制关系,而虚线表示各个节点之间的 Gossip 通信。
Redis Cluster 中的每个节点都维护一份自己视角下的当前整个集群的状态,主要包括:
当前集群状态
集群中各节点所负责的 slots信息,及其migrate状态
集群中各节点的master-slave状态
集群中各节点的存活状态及怀疑Fail状态
也就是说上面的信息,就是集群中Node相互八卦传播流言蜚语的内容主题,而且比较全面,既有自己的更有别人的,这么一来大家都相互传,最终信息就全面而且一致了。
Redis Cluster 的节点之间会相互发送多种消息,较为重要的如下所示:
MEET:通过「cluster meet ip port」命令,已有集群的节点会向新的节点发送邀请,加入现有集群,然后新节点就会开始与其他节点进行通信;
PING:节点按照配置的时间间隔向集群中其他节点发送 ping 消息,消息中带有自己的状态,还有自己维护的集群元数据,和部分其他节点的元数据;
PONG: 节点用于回应 PING 和 MEET 的消息,结构和 PING 消息类似,也包含自己的状态和其他信息,也可以用于信息广播和更新;
FAIL: 节点 PING 不通某节点后,会向集群所有节点广播该节点挂掉的消息。其他节点收到消息后标记已下线。
Redis 的源码中 cluster.h 文件定义了全部的消息类型,代码为 redis 4.0版本。
#define CLUSTERMSG_TYPE_PING 0
#define CLUSTERMSG_TYPE_PONG 1
#define CLUSTERMSG_TYPE_MEET 2
#define CLUSTERMSG_TYPE_FAIL 3
#define CLUSTERMSG_TYPE_PUBLISH 4
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6
#define CLUSTERMSG_TYPE_UPDATE 7
#define CLUSTERMSG_TYPE_MFSTART 8
#define CLUSTERMSG_TYPE_COUNT 9
通过上述这些消息,集群中的每一个实例都能获得其它所有实例的状态信息。这样一来,即使有新节点加入、节点故障、Slot 变更等事件发生,实例间也可以通过 PING、PONG 消息的传递,完成集群状态在每个实例上的同步。下面,我们依次来看看几种常见的场景。
定时 PING/PONG 消息
Redis Cluster 中的节点都会定时地向其他节点发送 PING 消息,来交换各个节点状态信息,检查各个节点状态,包括在线状态、疑似下线状态 PFAIL 和已下线状态 FAIL。
Redis 集群的定时 PING/PONG 的工作原理可以概括成两点:
下图显示了两个实例间进行 PING、PONG 消息传递的情况,其中实例一为发送节点,实例二是接收节点
新节点上线
Redis Cluster 加入新节点时,客户端需要执行 CLUSTER MEET 命令,如下图所示。
节点一在执行 CLUSTER MEET 命令时会首先为新节点创建一个 clusterNode 数据,并将其添加到自己维护的 clusterState 的 nodes 字典中。有关 clusterState 和 clusterNode 关系,我们在最后一节会有详尽的示意图和源码来讲解。
然后节点一会根据据 CLUSTER MEET 命令中的 IP 地址和端口号,向新节点发送一条 MEET 消息。新节点接收到节点一发送的MEET消息后,新节点也会为节点一创建一个 clusterNode 结构,并将该结构添加到自己维护的 clusterState 的 nodes 字典中。
接着,新节点向节点一返回一条PONG消息。节点一接收到节点B返回的PONG消息后,得知新节点已经成功的接收了自己发送的MEET消息。
最后,节点一还会向新节点发送一条 PING 消息。新节点接收到该条 PING 消息后,可以知道节点A已经成功的接收到了自己返回的P ONG消息,从而完成了新节点接入的握手操作。
MEET 操作成功之后,节点一会通过稍早时讲的定时 PING 机制将新节点的信息发送给集群中的其他节点,让其他节点也与新节点进行握手,最终,经过一段时间后,新节点会被集群中的所有节点认识。
节点疑似下线和真正下线
Redis Cluster 中的节点会定期检查已经发送 PING 消息的接收方节点是否在规定时间 ( cluster-node-timeout ) 内返回了 PONG 消息,如果没有则会将其标记为疑似下线状态,也就是 PFAIL 状态,如下图所示。
然后,节点一会通过 PING 消息,将节点二处于疑似下线状态的信息传递给其他节点,例如节点三。节点三接收到节点一的 PING 消息得知节点二进入 PFAIL 状态后,会在自己维护的 clusterState 的 nodes 字典中找到节点二所对应的 clusterNode 结构,并将主节点一的下线报告添加到 clusterNode 结构的 fail_reports 链表中。
随着时间的推移,如果节点十 (举个例子) 也因为 PONG 超时而认为节点二疑似下线了,并且发现自己维护的节点二的 clusterNode 的 fail_reports 中有半数以上的主节点数量的未过时的将节点二标记为 PFAIL 状态报告日志,那么节点十将会把节点二将被标记为已下线 FAIL 状态,并且节点十会立刻向集群其他节点广播主节点二已经下线的 FAIL 消息,所有收到 FAIL 消息的节点都会立即将节点二状态标记为已下线。如下图所示。
需要注意的是,报告疑似下线记录是由时效性的,如果超过 cluster-node-timeout *2 的时间,这个报告就会被忽略掉,让节点二又恢复成正常状态。
Redis Cluster 通信源码实现
综上,我们了解了 Redis Cluster 在定时 PING/PONG、新节点上线、节点疑似下线和真正下线等环节的原理和操作流程,下面我们来真正看一下 Redis 在这些环节的源码实现和具体操作。
涉及的数据结构体
首先,我们先来讲解一下其中涉及的数据结构,也就是上文提到的 ClusterNode 等结构。
每个节点都会维护一个 clusterState 结构,表示当前集群的整体状态,它的定义如下所示。
typedef struct clusterState {
clusterNode *myself;
....
dict *nodes;
....
clusterNode *slots[CLUSTER_SLOTS];
....
} clusterState;
它有三个比较关键的字段,具体示意图如下所示:
myself 字段,是一个 clusterNode 结构,用来记录自己的状态;
nodes 字典,记录一个 name 到 clusterNode 结构的映射,以此来记录其他节点的状态;
slot 数组,记录slot 对应的节点 clusterNode结构。
clusterNode 结构保存了一个节点的当前状态,比如节点的创建时间、节点的名字、节点 当前的配置纪元、节点的IP地址和端口号等等。除此之外,clusterNode结构的 link 属性是一个clusterLink结构,该结构保存了连接节点所需的有关信息,比如套接字描述符,输入缓冲区和输出缓冲区。clusterNode 还有一个 fail_report 的列表,用来记录疑似下线报告。具体定义如下所示。
typedef struct clusterNode {
mstime_t ctime;
char name[CLUSTER_NAMELEN];
int flags;
uint64_t configEpoch;
unsigned char slots[CLUSTER_SLOTS/8];
int numslots;
int numslaves;
struct clusterNode **slaves;
struct clusterNode *slaveof;
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
mstime_t ping_sent;
mstime_t pong_received;
mstime_t fail_time;
mstime_t voted_time;
mstime_t repl_offset_time;
mstime_t orphaned_time;
long long repl_offset;
char ip[NET_IP_STR_LEN];
int port;
int cport;
clusterLink *link;
list *fail_reports;
} clusterNode;
clusterNodeFailReport 是记录节点下线报告的结构体, node 是报告节点的信息,而 time 则代表着报告时间。
typedef struct clusterNodeFailReport {
struct clusterNode *node;
mstime_t time;
} clusterNodeFailReport;
消息结构体
了解了 Reids 节点维护的数据结构体后,我们再来看节点进行通信的消息结构体。 通信消息最外侧的结构体为 clusterMsg,它包括了很多消息记录信息,包括 RCmb 标志位,消息总长度,消息协议版本,消息类型;它还包括了发送该消息节点的记录信息,比如节点名称,节点负责的slot信息,节点ip和端口等;最后它包含了一个 clusterMsgData 来携带具体类型的消息。
typedef struct {
char sig[4];
uint32_t totlen;
uint16_t ver;
uint16_t port;
uint16_t type;
uint16_t count;
uint64_t currentEpoch;
uint64_t configEpoch;
uint64_t offset;
char sender[CLUSTER_NAMELEN];
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN];
char notused1[34];
uint16_t cport;
uint16_t flags;
unsigned char state;
unsigned char mflags[3];
union clusterMsgData data;
} clusterMsg;
clusterMsgData 是一个 union 结构体,它可以为 PING,MEET,PONG 或者 FAIL 等消息体。其中当消息为 PING、MEET 和 PONG 类型时,ping 字段是被赋值的,而是 FAIL 类型时,fail 字段是被赋值的。
union clusterMsgData {
struct {
clusterMsgDataGossip gossip[1];
} ping;
struct {
clusterMsgDataFail about;
} fail;
};
clusterMsgDataGossip 是 PING、PONG 和 MEET 消息的结构体,它会包括发送消息节点维护的其他节点信息,也就是上文中 clusterState 中 nodes 字段包含的信息,具体代码如下所示,你也会发现二者的字段是类似的。
typedef struct {
char nodename[CLUSTER_NAMELEN];
uint32_t ping_sent;
uint32_t pong_received;
char ip[NET_IP_STR_LEN];
uint16_t port;
uint16_t cport;
uint16_t flags;
uint32_t notused1;
} clusterMsgDataGossip;
typedef struct {
char nodename[CLUSTER_NAMELEN];
} clusterMsgDataFail;
看完了节点维护的数据结构体和发送的消息结构体后,我们就来看看 Redis 的具体行为源码了。
随机周期性发送PING消息
Redis 的 clusterCron 函数会被定时调用,每被执行10次,就会准备向随机的一个节点发送 PING 消息。
它会先随机的选出 5 个节点,然后从中选择最久没有与之通信的节点,调用 clusterSendPing 函数发送类型为 CLUSTERMSG_TYPE_PING 的消息
if (!(iteration % 10)) {
int j;
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
clusterSendPing 函数的具体行为我们后续再了解,因为该函数在其他环节也会经常用到
节点加入集群
当节点执行 CLUSTER MEET 命令后,会在自身给新节点维护一个 clusterNode 结构体,该结构体的 link 也就是TCP连接字段是 null,表示是新节点尚未建立连接。
clusterCron 函数中也会处理这些未建立连接的新节点,调用 createClusterLink 创立连接,然后调用 clusterSendPing 函数来发送 MEET 消息
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->cport, NET_FIRST_BIND_ADDR);
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
node->flags &= ~CLUSTER_NODE_MEET;
}
防止节点假超时及状态过期
防止节点假超时和标记疑似下线标记也是在 clusterCron 函数中,具体如下所示。它会检查当前所有的 nodes 节点列表,如果发现某个节点与自己的最后一个 PONG 通信时间超过了预定的阈值的一半时,为了防止节点是假超时,会主动释放掉与之的 link 连接,然后会主动向它发送一个 PING 消息。
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime();
mstime_t delay;
if (node->link &&
now - node->link->ctime >
server.cluster_node_timeout &&
node->ping_sent &&
node->pong_received < node->ping_sent &&
now - node->ping_sent > server.cluster_node_timeout/2)
{
freeClusterLink(node->link);
}
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
}
处理failover和标记疑似下线
如果防止节点假超时处理后,节点依旧未收到目标节点的 PONG 消息,并且时间已经超过了 cluster_node_timeout,那么就将该节点标记为疑似下线状态。
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
if (node->ping_sent == 0) continue;
delay = now - node->ping_sent;
if (delay > server.cluster_node_timeout) {
if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= REDIS_NODE_PFAIL;
update_state = 1;
}
}
实际发送Gossip消息
以下是前方多次调用过的clusterSendPing()方法的源码,代码中有详细的注释,大家可以自行阅读。主要的操作就是将节点自身维护的 clusterState 转换为对应的消息结构体。
void clusterSendPing(clusterLink *link, int type) {
unsigned char *buf;
clusterMsg *hdr;
int gossipcount = 0;
int wanted;
int totlen;
int freshnodes = dictSize(server.cluster->nodes)-2;
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
clusterBuildMessageHdr(hdr,type);
int maxiterations = wanted*3;
每次向目标节点发送 2 个被选中节点的 gossip 信息(gossipcount 计数) */
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
* Myself:节点本身。
* PFAIL状态的节点
* 处于 HANDSHAKE 状态的节点。
* 带有 NOADDR 标识的节点
* 因为不处理任何 Slot 而被断开连接的节点
*/
if (this == myself) continue;
if (this->flags & CLUSTER_NODE_PFAIL) continue;
if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--;
continue;
}
if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
clusterSetGossipEntry(hdr,gossipcount,this);
freshnodes--;
gossipcount++;
}
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
hdr->count = htons(gossipcount);
hdr->totlen = htonl(totlen);
clusterSendMessage(link,buf,totlen);
zfree(buf);
}
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
clusterMsgDataGossip *gossip;
gossip = &(hdr->data.ping.gossip[i]);
memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
gossip->ping_sent = htonl(n->ping_sent/1000);
gossip->pong_received = htonl(n->pong_received/1000);
memcpy(gossip->ip,n->ip,sizeof(n->ip));
gossip->port = htons(n->port);
gossip->cport = htons(n->cport);
gossip->flags = htons(n->flags);
gossip->notused1 = 0;
}
下面是 clusterBuildMessageHdr 函数,它主要负责填充消息结构体中的基础信息和当前节点的状态信息。
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
int totlen = 0;
uint64_t offset;
clusterNode *master;
master = (nodeIsSlave(myself) && myself->slaveof) ?
myself->slaveof : myself;
memset(hdr,0,sizeof(*hdr));
hdr->ver = htons(CLUSTER_PROTO_VER);
hdr->sig[0] = 'R';
hdr->sig[1] = 'C';
hdr->sig[2] = 'm';
hdr->sig[3] = 'b';
hdr->type = htons(type);
memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
memset(hdr->myip,0,NET_IP_STR_LEN);
if (server.cluster_announce_ip) {
strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
hdr->myip[NET_IP_STR_LEN-1] = '\0';
}
int announced_port = server.cluster_announce_port ?
server.cluster_announce_port : server.port;
int announced_cport = server.cluster_announce_bus_port ?
server.cluster_announce_bus_port :
(server.port + CLUSTER_PORT_INCR);
memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
memset(hdr->slaveof,0,CLUSTER_NAMELEN);
if (myself->slaveof != NULL)
memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
hdr->port = htons(announced_port);
hdr->cport = htons(announced_cport);
hdr->flags = htons(myself->flags);
hdr->state = server.cluster->state;
hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
hdr->configEpoch = htonu64(master->configEpoch);
if (nodeIsSlave(myself))
offset = replicationGetSlaveOffset();
else
offset = server.master_repl_offset;
hdr->offset = htonu64(offset);
if (nodeIsMaster(myself) && server.cluster->mf_end)
hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
if (type == CLUSTERMSG_TYPE_FAIL) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataFail);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataUpdate);
}
hdr->totlen = htonl(totlen);
}
后记
本来只想写一下 Redis Cluster 的 Gossip 协议,没想到文章越写,内容越多,最后源码分析也是有点虎头蛇尾,大家就凑合看一下,也希望大家继续关注我后续的问题。
评论