写点什么

Cassandra Gossip 协议的二三事儿

发布于: 2020 年 09 月 01 日

摘要:Gossip协议是Cassandra维护各节点状态的一个重要组件,下面我们以Gossip协议三次握手为线索逐步分析Gossip协议源码。



Gossip协议是Cassandra维护各节点状态的一个重要组件,下面我们以Gossip协议三次握手为线索逐步分析Gossip协议源码。



Gossip协议通过判断节点的generationversion 来确认节点状态信息新旧,如果节点重启,则generation加一,version每次从零开始计算。所以 generation是大版本号,version为小版本号,理解这个概念对后面的握手逻辑有很大帮助。



Gossip协议最重要的一个属性是endpointStateMap ,这个map以address为key,以EndpointState为value维护节点自身状态信息。EndopointState 包含了节点 net_version,host_id,rpc_address,release_version,dc,rack,load,status,tokens 信息。总体来说,所有节点维护的endpointStateMap应该是一致的,如果出现不一致信息或者新增,替换,删除节点 ,这中间的状态维护就要靠Gossip来实现了。



另外一个重要属性subscribers ,当节点状态变更时候,gossip 会通知各个subscribers。



Gossip启动时候,会每隔一秒会在集群中随机选择一个节点发送一条GossipDigestSyn消息,开始和其他节点的通信,如下图:





接下来我们根据上面的了流程图一步步分析gossip代码,GossipDigestSyn 消息是在GossipTask构造的。



1 // syn消息包含 集群名字,分区器,和gDigests消息
2 GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),DatabaseDescriptor.getPartitionerName(),gDigests);
3
4 MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,digestSynMessage,GossipDigestSyn.serializer);



GossipDigestSyn 消息的主要部分在gDigests里面,gDigests是通过方法Gossiper.instance.makeRandomGossipDigest(gDigests) 生成的。



private void makeRandomGossipDigest(List<GossipDigest> gDigests)
02 {
03 EndpointState epState;
04 int generation = 0 ;
05 int maxVersion = 0 ;
06
07 // local epstate will be part of endpointStateMap
08 //当前节点维护的节点列表
09 List<InetAddress> endpoints = new ArrayList<InetAddress>(endpointStateMap.keySet());
10 //乱序处理
11 Collections.shuffle(endpoints, random);
12 for (InetAddress endpoint : endpoints)
13 {
14 epState = endpointStateMap.get(endpoint);
15 if (epState != null )
16 {
17 //获取generation版本号
18 generation = epState.getHeartBeatState().getGeneration();
19 //EndpointState包含了tokens,hostid,status,load等信息,所以冒泡排序获取其中最大的maxVersion
20 maxVersion = getMaxEndpointStateVersion(epState);
21 }
22 gDigests.add( new GossipDigest(endpoint, generation, maxVersion));
23 }
24
25 if (logger.isTraceEnabled())
26 {
27 StringBuilder sb = new StringBuilder();
28 for (GossipDigest gDigest : gDigests)
29 {
30 sb.append(gDigest);
31 sb.append( " " );
32 }
33 logger.trace( "Gossip Digests are : {}" , sb);
34 }
35 }



A节点发出GossipDigestSyn后,B节点会通过GossipDigestSynVerbHandler 来处理GossipDigestSyn 消息,具体处理逻辑在Gossiper.instance.examineGossiper中,







上面方法对比版本号以后,主要处理逻辑在senall方法和requestAll方法,继续跟进:



1 private void requestAll(GossipDigest gDigest, List<GossipDigest> deltaGossipDigestList, int remoteGeneration)
2 {
3 /* We are here since we have no data for this endpoint locally so request everthing. */
4 //生成一个Digest,等待对方节点发送消息
5 deltaGossipDigestList.add( new GossipDigest(gDigest.getEndpoint(), remoteGeneration, 0 ));
6 if (logger.isTraceEnabled())
7 logger.trace( "requestAll for {}" , gDigest.getEndpoint());
8 }
1 private void sendAll(GossipDigest gDigest, Map<InetAddress, EndpointState> deltaEpStateMap, int maxRemoteVersion)
2 {
3 EndpointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndpoint(), maxRemoteVersion);
4 if (localEpStatePtr != null )
5 //将endpintState信息通过ack 消息发送给对方
6 deltaEpStateMap.put(gDigest.getEndpoint(), localEpStatePtr);
7 }



到这里我们发现向对方节点发送的ack消息已经构造完成了,包含了deltaGossipDigestList(对方节点信息最新,我们需要对方节点给我们发endpointState) 和 deltaEpStateMap(当前节点新,我们发送给对方节点) 。



Gossip 通过GossipDigestAckVerbHandler 处理ack消息,主要逻辑有两块:



1.如果deltaEpStateMap有数据,则说明需要更新本地applicationState,执行Gossiper.instance.applyStateLocally方法



2.如果deltaGossipDigestList 有数据,则说明对方节点需要更新,构造EndpointState,并发送ack2消息给对方



GossipDigestAck2VerbHandler 用来处理 ack2消息,主要逻辑也在Gossiper.instance.applyStateLocally中,我们看一下Gossiper.instance.applyStateLocally的逻辑:







到这里Gossip 三次握手的全过程就分析完了(由于平台字数限制,部分代码以图片形式展示,可点击放大查看哦)。



点击关注,第一时间了解华为云新鲜技术~



发布于: 2020 年 09 月 01 日阅读数: 58
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
Cassandra Gossip协议的二三事儿