写点什么

Zookeeper 通信协议详解

用户头像
tunsuy
关注
发布于: 2020 年 06 月 25 日
Zookeeper通信协议详解

通信协议


基于 TCP/IP 协议,zk 实现了自己的通信协议来完成客户端与服务端,服务端与服务端之间的网络通信,zk 的通信协议整体上的设计非常简单。


客户端发起连接,发送握手包进行 timeout 协商,协商成功后会返回一个 session id 和 timeout 值。随后就可以进行正常通信,通信过程中要在 timeout 范围内发送 ping 包。

zookeeper client 和 server 之间的通信协议基本规则就是发送请求 获取响应,并根据响应做不同的动作。


1、请求数据格式

消息长度+xid+request

  • xid 每次请求必须是唯一的,消息长度和 xid 同为 4 字节,命令长度为 4 字节且必须为 request 的开始 4 字节。

  • 命令是从 1 到 11 的数字表示,close 的命令为-11,不同类型请求 request 有些差异。

  • 特殊请求具有固定的 xid,比如 watch_xid 固定为-1,ping_xid 为-2,auth_xid 固定为-4,普通请求一般从 0 开始每次请求累加一次 xid。


2、响应数据格式

消息长度+header+response

  • 消息长度为 4 字节,表明 header+response 的总长度。

  • header 为 xid、zxid、err,对应长度为 4、8、4。response 根据请求类型不同具有差别

  • 根据 header 里 xid 的区别分为 watch、ping、auth、data 这四种类型

  • 根据这四种类型来区分返回消息是事件、心跳、认证、请求数据,client 并以此作出不同响应。


消息结构

下面从几种不同的请求类型分别来介绍

1、握手消息

1.1、request 消息体

protocol_version+zxid+timeout+session_id+passwd_len+passwd+read_only

  • 对应的字节长度为 4,8,4,8,4,16,1

  • 取值除 timeout 外其他几个皆可为 0,password 可以为任意 16 个字符,read_only 为 0 或 1(是布尔值).

注:握手包没有 xid 和命令

1.2、response 消息体

protocol_version+timeout+session_id+passwd_len+passwd+read_only.

注:握手响应包没有 header.

1.3、效果展示

2020-04-21 19:26:53.990 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [connection.py:646 _connect]  Connecting to 30.3.3.60:9888, use_ssl: False2020-04-21 19:26:53.990 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [connection.py:650 _connect]      Using session_id: 144131667822575626 session_passwd: b'41f366ef7005bc5c859b7fc56fa40872'2020-04-21 19:26:53.990 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [connection.py:299 _submit]  Sending request(xid=None): Connect(protocol_version=0, last_zxid_seen=12884901993, time_out=30000, session_id=144131667822575626, passwd=b'A\xf3f\xefp\x05\xbc\\\x85\x9b\x7f\xc5o\xa4\x08r', read_only=None)2020-04-21 19:26:53.991 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [connection.py:285 _invoke]  Read response Connect(protocol_version=0, last_zxid_seen=0, time_out=30000, session_id=144131667822575626, passwd=b'A\xf3f\xefp\x05\xbc\\\x85\x9b\x7f\xc5o\xa4\x08r', read_only=False)2020-04-21 19:26:53.991 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [connection.py:694 _connect]  Session created, session_id: 144131667822575626 session_passwd: b'41f366ef7005bc5c859b7fc56fa40872'    negotiated session timeout: 30000    connect timeout: 10000.0    read timeout: 20000.02020-04-21 19:26:53.991 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [client.py:463 _session_callback]  test: cur state CONNECTED, old state CONNECTING
复制代码


2、ping 消息

该消息就是客户端与服务端之前的心跳请求,通过在握手消息中协商的 timeout 内,定时发送心跳包。

2.1、request 消息体

type (ping 包只有一个字段就是命令值是 11,它完整的发送包是 4 字节长度,4 字节 xid,4 字节命令.)

2.2、response 消息体

res_len+header+res (ping 响应包一般只拆到 header 即可通过 xid 确认)

2.3、效果展示


2020-04-21 20:05:03.971 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [connection.py:603 _connect_attempt]  test: send ping2020-04-21 20:05:03.971 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [connection.py:490 _send_ping]  test: send ping2020-04-21 20:05:03.971 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [connection.py:299 _submit]  Sending request(xid=-2): Ping()2020-04-21 20:05:03.973 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [connection.py:606 _connect_attempt]  test: read socket2020-04-21 20:05:03.973 localhost cms_watcher info  INFO cms_watcher [pid:240] [Thread-4] [connection.py:415 _read_socket]  test: Received Ping
复制代码


3、getdata 消息

业务消息有很多种,这里只讲解请求数据的消息过程。其他类似,请自行查询官方文档。

3.1、request 消息体

type+path_len+path+watcher type=4.

  • path_len:是 4 字节,为 path 的长度

  • path 为需要查询的路径,支持 utf8

  • watcher 为布尔值:判断是否有事件注册,为 1 或 0,长度为 1 字节

3.2、response 消息体

data_len+data+stat

  • data_len 为 data 长度,4 字节.

  • stat 由 8,8,8,8,4,4,4,8,4,4,8 字节顺序组成.

3.3、效果展示


2020-04-21 20:25:13.460 localhost cms_watcher info  INFO cms_watcher [pid:9078] [Thread-4] [connection.py:610 _connect_attempt]  test: send request2020-04-21 20:25:13.460 localhost cms_watcher info  INFO cms_watcher [pid:9078] [Thread-4] [connection.py:482 _send_request]  test: send request xid 42020-04-21 20:25:13.460 localhost cms_watcher info  INFO cms_watcher [pid:9078] [Thread-4] [connection.py:299 _submit]  Sending request(xid=4): GetData(path='/cms/config/items/item.cts_cfg', watcher=<bound method DataWatch._watcher of <kazoo.recipe.watchers.DataWatch object at 0x7ff860ba3438>>)2020-04-21 20:25:13.461 localhost cms_watcher info  INFO cms_watcher [pid:9078] [Thread-4] [connection.py:606 _connect_attempt]  test: read socket2020-04-21 20:25:13.461 localhost cms_watcher info  INFO cms_watcher [pid:9078] [Thread-4] [connection.py:448 _read_socket]  test: Reading for header ReplyHeader(xid=4, zxid=17179869239, err=0)
复制代码


序列化和反序列化

为了有一个感性的认识:客户端根服务端到底是怎么通过这些协议来交互的呢?

下面就看下 kazoo 这个库是怎样根据 zk 的这个协议来组装数据和解析数据的

注:kazoo是python下的一个zookeeper客户端库。


1、request 字节流


下面的代码展示了将请求对象序列化成 socket 字节流的过程


def _submit(self, request, timeout, xid=None):	"""Submit a request object with a timeout value and optional	xid"""	b = bytearray()	if xid:		b.extend(int_struct.pack(xid))	if request.type:		b.extend(int_struct.pack(request.type))	b += request.serialize()	self.logger.log(		(BLATHER if isinstance(request, Ping) else logging.DEBUG),		"Sending request(xid=%s): %s", xid, request)	self._write(int_struct.pack(len(b)) + b, timeout)
复制代码


从上面的代码可以看出:

首先根据不同的请求,决定是否发送 xid 字段、type 字段(也就是上面协议所说的),

最后根据 request 对象序列化成字节流。

这里的 request 就是 kazoo/protocol/serialization.py 定义的各个类实例

比如连接类:


class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen'                         ' time_out session_id passwd read_only')):    type = None
def serialize(self): b = bytearray() b.extend(int_long_int_long_struct.pack( self.protocol_version, self.last_zxid_seen, self.time_out, self.session_id)) b.extend(write_buffer(self.passwd)) b.extend([1 if self.read_only else 0]) return b
@classmethod def deserialize(cls, bytes, offset): proto_version, timeout, session_id = int_int_long_struct.unpack_from( bytes, offset) offset += int_int_long_struct.size password, offset = read_buffer(bytes, offset)
try: read_only = bool_struct.unpack_from(bytes, offset)[0] is 1 offset += bool_struct.size except struct.error: read_only = False return cls(proto_version, 0, timeout, session_id, password, read_only), offset
复制代码


2、response 字节流


下面的代码展示了将 socket 字节流反序列化成对象的过程


def _read_header(self, timeout):	b = self._read(4, timeout)	length = int_struct.unpack(b)[0]	b = self._read(length, timeout)	header, offset = ReplyHeader.deserialize(b, 0)	return header, b, offset
复制代码


从上面的代码可以看出:

首先从 socket 中读取 4 个字节,根据上面的协议,我们知道,这 4 个字节是 data_len,即包的大小

然后在根据 len 继续读取该大小的字节流,最后解析成具体的对象。


class ReplyHeader(namedtuple('ReplyHeader', 'xid, zxid, err')):    @classmethod    def deserialize(cls, bytes, offset):        """Given bytes and the current bytes offset, return a        :class:`ReplyHeader` instance and the new offset"""        new_offset = offset + reply_header_struct.size        return cls._make(            reply_header_struct.unpack_from(bytes, offset)), new_offset
复制代码


发布于: 2020 年 06 月 25 日阅读数: 128
用户头像

tunsuy

关注

公众号:有文化的技术人 2020.04.27 加入

专注于软件基础平台建设,中间件,架构设计,分布式系统,微服务等领域

评论

发布
暂无评论
Zookeeper通信协议详解