写点什么

zk 源码—会话的实现原理

  • 2025-04-18
    福建
  • 本文字数:24898 字

    阅读完需:约 82 分钟

1.创建会话

 

会话是 zk 中最核心的概念之一,客户端与服务端的交互都离不开会话的相关操作。其中包括临时节点的生命周期、客户端请求的顺序、Watcher 通知机制等。比如会话关闭时,服务端会自动删除该会话所创建的临时节点。当客户端会话退出,通过 Watcher 机制可向订阅该事件的客户端发送通知。

 

(1)客户端的会话状态


当 zk 客户端与服务端成功建立连接后,就会创建一个会话。在 zk 客户端的运行过程(会话生命周期)中,会话会经历不同的状态变化。

 

这些不同的会话状态包括:正在连接(CONNECTING)、已经连接(CONNECTED)、会话关闭(CLOSE)、正在重新连接(RECONNECTING)、已经重新连接(RECONNECTED)等。

 

如果 zk 客户端需要与服务端建立连接创建一个会话,那么客户端就必须提供一个使用字符串表示的 zk 服务端地址列表。

 

当客户端刚开始创建 ZooKeeper 对象时,其会话状态就是 CONNECTING,之后客户端会根据服务端地址列表中的 IP 地址分别尝试进行网络连接。如果成功连接上 zk 服务端,那么客户端的会话状态就会变为 CONNECTED。

 

如果因为网络闪断或者其他原因造成客户端与服务端之间的连接断开,那么 zk 客户端会自动进行重连操作,同时其会话状态变为 CONNECTING,直到重新连接上 zk 服务端后,客户端的会话状态才变回 CONNECTED。

 

通常 总是在 CONNECTING 或 CONNECTED 间切换。如果出现会话超时、权限检查失败、客户端主动退出程序等情况,那么客户端的会话状态就会直接变为 CLOSE。


public class CreateSessionDemo {    private final static String CONNECTSTRING = "192.168.1.5:2181";    private static CountDownLatch countDownLatch = new CountDownLatch(1);        public static void main(String[] args) throws Exception {        //创建zk        ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new Watcher() {            public void process(WatchedEvent watchedEvent) {                //如果当前的连接状态是连接成功, 则通过计数器去控制, 否则进行阻塞, 因为连接是需要时间的                //如果已经获得连接了, 那么状态会是SyncConnected                if (watchedEvent.getState() == Event.KeeperState.SyncConnected){                    countDownLatch.countDown();                    System.out.println(watchedEvent.getState());                }                //如果数据发生了变化                if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {                    System.out.println("节点发生了变化, 路径: " + watchedEvent.getPath());                }            }        });        //进行阻塞        countDownLatch.await();        //确定已经获得连接了再进行zk的操作: 增删改查        ...    }}
public class ZooKeeper implements AutoCloseable { protected final ClientCnxn cnxn; protected final ZKWatchManager watchManager;//ZKWatchManager实现了ClientWatchManager ... //1.初始化ZooKeeper对象 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException { ... //创建客户端的Watcher管理器ZKWatchManager watchManager = defaultWatchManager(); //2.设置会话默认的Watcher,保存在客户端的Watcher管理器ZKWatchManager中 watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser(connectString); //3.构造服务器地址列表管理器StaticHostProvider hostProvider = aHostProvider; //4.创建并初始化客户端的网络连接器ClientCnxn + 5.初始化SendThread和EventThread cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); //6.启动SendThread和EventThread cnxn.start(); } protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException { return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket, canBeReadOnly); } //从配置中获取客户端使用的网络连接配置:使用NIO还是Netty,然后通过反射进行实例化客户端Socket private ClientCnxnSocket getClientCnxnSocket() throws IOException { String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET); if (clientCnxnSocketName == null) { clientCnxnSocketName = ClientCnxnSocketNIO.class.getName(); } Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class); ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig()); return clientCxnSocket; } public enum States { //客户端的会话状态包括 CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED; public boolean isAlive() { return this != CLOSED && this != AUTH_FAILED; } public boolean isConnected() { return this == CONNECTED || this == CONNECTEDREADONLY; } } ... static class ZKWatchManager implements ClientWatchManager { private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>(); protected volatile Watcher defaultWatcher; ... } protected ZKWatchManager defaultWatchManager() { //创建客户端的Watcher管理器ZKWatchManager return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)); } ...}
public class ClientCnxn { ... volatile States state = States.NOT_CONNECTED; private final HostProvider hostProvider; public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { ... this.hostProvider = hostProvider; //5.初始化SendThread和EventThread sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); ... } //6.启动SendThread和EventThread public void start() { sendThread.start(); eventThread.start(); } class SendThread extends ZooKeeperThread { private final ClientCnxnSocket clientCnxnSocket; ... SendThread(ClientCnxnSocket clientCnxnSocket) { super(makeThreadName("-SendThread()")); //客户端刚开始创建ZooKeeper对象时,设置其会话状态为CONNECTING state = States.CONNECTING; this.clientCnxnSocket = clientCnxnSocket; //设置为守护线程 setDaemon(true); } @Override public void run() { ... while (state.isAlive()) { ... //7.获取其中一个zk服务端的地址 serverAddress = hostProvider.next(1000); //向zk服务端发起连接请求 startConnect(serverAddress); ... } ... } private void startConnect(InetSocketAddress addr) throws IOException { ... state = States.CONNECTING; //8.创建TCP连接 //接下来以ClientCnxnSocketNetty的connect为例 clientCnxnSocket.connect(addr); } void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException { ... //和服务端建立连接后的处理 state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED; ... } ... }}
public class ClientCnxnSocketNetty extends ClientCnxnSocket { //向zk服务端发起建立连接的请求 @Override void connect(InetSocketAddress addr) throws IOException { ... Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup) .channel(NettyUtils.nioOrEpollSocketChannel()) .option(ChannelOption.SO_LINGER, -1).option(ChannelOption.TCP_NODELAY, true) .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); bootstrap = configureBootstrapAllocator(bootstrap); bootstrap.validate(); connectFuture = bootstrap.connect(addr); ... } private class ZKClientHandler extends SimpleChannelInboundHandler<ByteBuf> { ... @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { ... //与zk服务端建立好连接后的处理,调用父类ClientCnxnSocket的readConnectResult()方法 readConnectResult(); ... } ... } ...}
abstract class ClientCnxnSocket { void readConnectResult() throws IOException { ... sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO); } ...}
复制代码


(2)服务端的会话创建


在 zk 服务端中,使用 SessionImpl 表示客户端与服务器端连接的会话实体。SessionImpl 由三个部分组成:会话 ID(sessionID)、会话超时时间(timeout)、会话关闭状态(isClosing)。

 

一.会话 ID

会话 ID 是一个会话的标识符,当创建一次会话时,zk 服务端会自动为其分配一个唯一的 ID。

 

二.会话超时时间

一个会话的超时时间就是指一次会话从发起后到被服务器关闭的时长。设置会话超时时间后,zk 服务端会参考设置的超时时间,最终计算一个服务端自己的超时时间。这个超时时间才是真正被 zk 服务端用于管理用户会话的超时时间。

 

三.会话关闭状态

会话关闭状态 isClosing 表示一个会话是否已经关闭。如果 zk 服务端检查到一个会话已经因为超时等原因失效时,就会将该会话的 isClosing 标记为关闭,之后就不再对该会话进行操作。


public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {    ...    public static class SessionImpl implements Session {        SessionImpl(long sessionId, int timeout) {            this.sessionId = sessionId;            this.timeout = timeout;            isClosing = false;        }
final long sessionId;//会话ID final int timeout;//会话超时时间 boolean isClosing;//会话关闭状态 ... } ...}
复制代码


服务端收到客户端的创建会话请求后,进行会话创建的过程大概分四步:处理 ConnectRequest 请求、创建会话、请求处理链处理和会话响应。

 

步骤一:处理 ConnectRequest 请求

首先由 NettyServerCnxn 负责接收来自客户端的创建会话请求,然后反序列化出 ConnectRequest 对象,并完成会话超时时间的协商。

 

步骤二:创建会话

SessionTrackerImpl 的 createSession()方法会为该会话分配一个 sessionID,并将该 sessionID 注册到 sessionsById 和 sessionsWithTimeout 中,同时通过 SessionTrackerImpl 的 updateSessionExpiry()方法进行会话激活。

 

步骤三:请求处理链处理

接着调用 ZooKeeperServer.firstProcessor 的 processRequest()方法,让该会话请求会在 zk 服务端的各个请求处理器之间进行顺序流转。

 

步骤四:会话响应

最后在请求处理器 FinalRequestProcessor 的 processRequest()方法中进行会话响应。


//由网络连接工厂类监听到客户端的创建会话请求public class NettyServerCnxnFactory extends ServerCnxnFactory {    class CnxnChannelHandler extends ChannelDuplexHandler {        @Override        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {            ...            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();            cnxn.processMessage((ByteBuf) msg);            ...        }        ...    }    ...}
public class NettyServerCnxn extends ServerCnxn { private volatile ZooKeeperServer zkServer; void processMessage(ByteBuf buf) { ... receiveMessage(buf); ... } private void receiveMessage(ByteBuf message) { ... ZooKeeperServer zks = this.zkServer; //处理会话连接请求 zks.processConnectRequest(this, bb); ... } ...}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { protected SessionTracker sessionTracker; ... public synchronized void startup() { startupWithServerState(State.RUNNING); } private void startupWithServerState(State state) { //创建并启动会话管理器 if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); //初始化请求处理链 setupRequestProcessors(); ... } protected void createSessionTracker() { sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener()); } public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { //步骤一:处理ConnectRequest请求 BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); ConnectRequest connReq = new ConnectRequest(); connReq.deserialize(bia, "connect"); ... //协商会话超时时间 int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout); ... long sessionId = connReq.getSessionId(); if (sessionId == 0) { //步骤二:创建会话 long id = createSession(cnxn, passwd, sessionTimeout); } else { long clientSessionId = connReq.getSessionId(); if (serverCnxnFactory != null) { serverCnxnFactory.closeSession(sessionId); } if (secureServerCnxnFactory != null) { secureServerCnxnFactory.closeSession(sessionId); } cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); } } long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { if (passwd == null) { passwd = new byte[0]; } //通过会话管理器创建会话 long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null); setLocalSessionFlag(si); //激活会话 + 提交请求到请求处理链进行处理 submitRequest(si); return sessionId; } public void submitRequest(Request si) { ... //激活会话 touch(si.cnxn); //步骤三:交给请求处理链进行处理,在FinalRequestProcessor中会进行会话响应 firstProcessor.processRequest(si); ... } ...}
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker { ... private final AtomicLong nextSessionId = new AtomicLong(); private final ExpiryQueue<SessionImpl> sessionExpiryQueue; public SessionTrackerImpl(SessionExpirer expirer, ConcurrentMap<Long, Integer> sessionsWithTimeout, int tickTime, long serverId, ZooKeeperServerListener listener) { super("SessionTracker", listener); this.expirer = expirer; this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime); this.sessionsWithTimeout = sessionsWithTimeout; //初始化SessionId this.nextSessionId.set(initializeNextSession(serverId)); for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) { addSession(e.getKey(), e.getValue()); } EphemeralType.validateServerId(serverId); } ... public long createSession(int sessionTimeout) { //为会话分配一个sessionID long sessionId = nextSessionId.getAndIncrement(); //将sessionID注册到sessionsById和sessionsWithTimeout中 addSession(sessionId, sessionTimeout); return sessionId; } public synchronized boolean addSession(long id, int sessionTimeout) { sessionsWithTimeout.put(id, sessionTimeout); boolean added = false; SessionImpl session = sessionsById.get(id); if (session == null) { session = new SessionImpl(id, sessionTimeout); }
SessionImpl existedSession = sessionsById.putIfAbsent(id, session); if (existedSession != null) { session = existedSession; } else { added = true; LOG.debug("Adding session 0x" + Long.toHexString(id)); } ... updateSessionExpiry(session, sessionTimeout); return added; } private void updateSessionExpiry(SessionImpl s, int timeout) { ... sessionExpiryQueue.update(s, timeout); } ...}
public class FinalRequestProcessor implements RequestProcessor { ... public void processRequest(Request request) { ... ServerCnxn cnxn = request.cnxn; //步骤四:会话响应 cnxn.sendResponse(hdr, rsp, "response"); ... } ...}
public abstract class ServerCnxn implements Stats, Watcher { ... public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); try { baos.write(fourBytes); bos.writeRecord(h, "header"); if (r != null) { bos.writeRecord(r, tag); } baos.close(); } catch (IOException e) { LOG.error("Error serializing response"); } byte b[] = baos.toByteArray(); serverStats().updateClientResponseSize(b.length - 4); ByteBuffer bb = ByteBuffer.wrap(b); bb.putInt(b.length - 4).rewind(); sendBuffer(bb); } ...}
public class NettyServerCnxn extends ServerCnxn { ... @Override public void sendBuffer(ByteBuffer sendBuffer) { if (sendBuffer == ServerCnxnFactory.closeConn) { close(); return; } channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(onSendBufferDoneListener); } ...}
复制代码


(3)会话 ID 的初始化实现


SessionTracker 是 zk 服务端的会话管理器,zk 会话的整个生命周期都离不开 SessionTracker 的参与。SessionTracker 是一个接口类型,规定了会话管理的相关操作行为,具体的会话管理逻辑则由 SessionTrackerImpl 来完成。

 

SessionTrackerImpl 类实现了 SessionTracker 接口,其中有四个关键字段:sessionExpiryQueue 字段表示的是会话过期队列,用于管理会话自动过期。nextSessionId 字段记录了当前生成的会话 ID。sessionsById 字段用于根据会话 ID 来管理具体的会话实体。sessionsWithTimeout 字段用于根据会话 ID 管理会话的超时时间。

 

在 SessionTrackerImpl 类初始化时,会调用 initializeNextSession()方法来生成一个初始化的会话 ID。之后在 zk 的运行过程中,会在该会话 ID 的基础上为每个会话分配 ID。


public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {    ...    private final ExpiryQueue<SessionImpl> sessionExpiryQueue;//过期队列    private final AtomicLong nextSessionId = new AtomicLong();//当前生成的会话ID    ConcurrentHashMap<Long, SessionImpl> sessionsById;//根据会话ID来管理具体的会话实体    ConcurrentMap<Long, Integer> sessionsWithTimeout;//根据不同的会话ID管理每个会话的超时时间        public SessionTrackerImpl(SessionExpirer expirer, ConcurrentMap<Long, Integer> sessionsWithTimeout,             int tickTime, long serverId, ZooKeeperServerListener listener) {        super("SessionTracker", listener);        this.expirer = expirer;        this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);        this.sessionsWithTimeout = sessionsWithTimeout;        //初始化SessionId        this.nextSessionId.set(initializeNextSession(serverId));        for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {            addSession(e.getKey(), e.getValue());        }        EphemeralType.validateServerId(serverId);    }        public static long initializeNextSession(long id) {        long nextSid;        nextSid = (Time.currentElapsedTime() << 24) >>> 8;        nextSid = nextSid | (id <<56);        return nextSid;    }    ...}
复制代码


在 SessionTrackerImpl 的 initializeNextSession()方法中,生成初始化的会话 ID 的过程如下:

步骤一:获取当前时间的毫秒表示

步骤二:将得到的毫秒表示的时间先左移 24 位

步骤三:将左移 24 位后的结果再右移 8 位

步骤四:服务器 SID 左移 56 位

步骤五:将右移 8 位的结果和左移 56 位的结果进行位与运算

 

算法概述:高 8 位确定所在机器,低 56 位使用当前时间的毫秒表示来进行随机。

 

其中左移 24 位的目的是:将毫秒表示的时间的最高位的 1 移出,可以防止出现负数。


//时间的二进制表示,41位10100000110000011110001000100110111110111//左移24位变成,64位0100000110000011110001000100110111110111000000000000000000000000//右移8位变成,64位0000000001000001100000111100010001001101111101110000000000000000//假设服务器SID为2,那么左移56位变成0000001000000000000000000000000000000000000000000000000000000000//位与运算0000001001000001100000111100010001001101111101110000000000000000
复制代码


(4)设置的会话超时时间没生效的原因


在平时的开发工作中,最常遇到的场景就是会话超时异常。zk 的会话超时异常包括:客户端 readTimeout 异常和服务端 sessionTimeout 异常。

 

需要注意的是:可能虽然设置了超时时间,但实际服务运行时 zk 并没有按设置的超时时间来管理会话。

 

这是因为实际起作用的超时时间是由客户端和服务端协商决定的。zk 客户端在和服务端建立连接时,会提交一个客户端设置的会话超时时间,而该超时时间会和服务端设置的最大超时时间和最小超时时间进行比较。如果正好在服务端设置允许的范围内,则采用客户端的超时时间管理会话。如果大于或小于服务端设置的超时时间,则采用服务端的超时时间管理会话。

 

2.分桶策略和会话管理


zk 作为分布式系统的核心组件,经常要处理大量的会话请求。zk 之所以能快速响应大量客户端操作,与它自身的会话管理策略密不可分。

 

(1)分桶策略和过期队列


一.会话管理中的心跳消息和过期时间


在 zk 中为了保持会话的存活状态,客户端要向服务端周期性发送心跳信息。客户端的心跳信息可以是一个 PING 请求,也可以是一个普通的业务请求。

 

zk 服务端收到请求后,便会更新会话的过期时间,来保持会话的存活状态。因此 zk 的会话管理,最主要的工作就是管理会话的过期时间。

 

zk 服务端的会话管理是由 SessionTracker 负责的,会话管理器 SessionTracker 采用了分桶策略来管理会话的过期时间。

 

二.分桶策略的原理


会话管理器 SessionTracker 会按照不同的时间间隔对会话进行划分,超时时间相近的会话将会被放在同一个间隔区间中。

 

具体的划分原则就是:每个会话的最近过期时间点 ExpirationTime,ExpirationTime 是指会话最近的过期时间点。



对于一个新会话创建完毕后,zk 服务端都会计算其 ExpirationTime,会话管理器 SessionTracker 会每隔 ExpirationInterval 进行会话超时检查。


//CurrentTime是指当前时间,单位毫秒//SessionTimeout指会话的超时时间,单位毫秒//SessionTrackerImpl会每隔ExpirationInterval进行会话超时检查ExpirationTime = CurrentTime + SessionTimeoutExpirationTime = (ExpirationTime / ExpirationInterval + 1) * ExpirationInterval
复制代码


这种方式避免了对每一个会话进行检查。采用分批次的方式管理会话,可以降低会话管理的难度。因为每次小批量的处理会话过期可以提高会话处理的效率。

 

三.分桶策略的过期队列和 bucket


zk 服务端所有会话过期的相关操作都是围绕过期队列来进行的,可以说 zk 服务端底层就是通过这个过期队列来管理会话过期的。过期队列就是 ExpiryQueue 类型的 sessionExpiryQueue。


public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {    private final ExpiryQueue<SessionImpl> sessionExpiryQueue;//过期队列    private final AtomicLong nextSessionId = new AtomicLong();//当前生成的会话ID    ConcurrentHashMap<Long, SessionImpl> sessionsById;//根据会话ID来管理具体的会话实体    ConcurrentMap<Long, Integer> sessionsWithTimeout;//根据不同的会话ID管理每个会话的超时时间    ...}
复制代码


什么是 bucket:


SessionTracker 的过期队列是 ExpiryQueue 类型的,ExpiryQueue 类型的过期队列会由若干个 bucket 组成。每个 bucket 是以 expirationInterval 为单位进行时间区间划分的。每个 bucket 中会存放一些在某一时间点内过期的会话。

 

如何实现过期队列:


在 zk 中会使用 ExpiryQueue 类来实现一个会话过期队列。ExpiryQueue 类中有两个 HashMap:elemMap 和一个 expiryMap。elemMap 中存放会话对象 SessionImpl 及其对应的最近过期时间点,expiryMap 中存放的就是过期队列。

 

expiryMap 的 key 就是 bucket 的时间划分,即会话的最近过期时间点。expiryMap 的 value 就是 bucket 中存放的某一时间内过期的会话集合。所以 bucket 可以理解为一个 Set 会话对象集合。expiryMap 是线程安全的 HaspMap,可根据不同的过期时间区间存放会话。expiryMap 过期队列中的一个过期时间点就对应一个 bucket。

 

ExpiryQueue 中也实现了 remove()、update()、poll()等队列的操作方法。超时检查的定时任务一开始会获取最近的会话过期时间点看看当前是否已经到达,然后从过期队列中 poll 出 bucket 时会更新下一次的最近的会话过期时间点。


public class ExpiryQueue<E> {    //存放会话对象SessionImpl及其对应的最近的过期时间点    private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();    //存放过期队列,bucket可以理解为一个Set<SessionImpl>会话对象集合    private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();    //最近的一批会话的过期时间点    private final AtomicLong nextExpirationTime = new AtomicLong();    //将会话划分到一个个bucket的时间间隔,也是超时检查线程定时检查时间间隔    private final int expirationInterval;
public ExpiryQueue(int expirationInterval) { this.expirationInterval = expirationInterval; nextExpirationTime.set(roundToNextInterval(Time.currentElapsedTime())); } private long roundToNextInterval(long time) { return (time / expirationInterval + 1) * expirationInterval; } public long getWaitTime() { long now = Time.currentElapsedTime(); long expirationTime = nextExpirationTime.get(); return now < expirationTime ? (expirationTime - now) : 0L; } public Long update(E elem, int timeout) { Long prevExpiryTime = elemMap.get(elem); long now = Time.currentElapsedTime(); Long newExpiryTime = roundToNextInterval(now + timeout);
if (newExpiryTime.equals(prevExpiryTime)) { // No change, so nothing to update return null; }
// First add the elem to the new expiry time bucket in expiryMap. Set<E> set = expiryMap.get(newExpiryTime); if (set == null) { // Construct a ConcurrentHashSet using a ConcurrentHashMap set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>()); // Put the new set in the map, but only if another thread hasn't beaten us to it Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set); if (existingSet != null) { set = existingSet; } } set.add(elem);
// Map the elem to the new expiry time. If a different previous mapping was present, clean up the previous expiry bucket. prevExpiryTime = elemMap.put(elem, newExpiryTime); if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) { Set<E> prevSet = expiryMap.get(prevExpiryTime); if (prevSet != null) { prevSet.remove(elem); } } return newExpiryTime; } public Set<E> poll() { long now = Time.currentElapsedTime(); long expirationTime = nextExpirationTime.get(); if (now < expirationTime) { return Collections.emptySet(); }
Set<E> set = null; long newExpirationTime = expirationTime + expirationInterval; //设置最近的会话过期时间点 if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) { set = expiryMap.remove(expirationTime); } if (set == null) { return Collections.emptySet(); } return set; } public Long remove(E elem) { Long expiryTime = elemMap.remove(elem); if (expiryTime != null) { Set<E> set = expiryMap.get(expiryTime); if (set != null) { set.remove(elem); } } return expiryTime; } ...}
复制代码


(2)会话激活


为了保持客户端会话的有效性,客户端要不断发送 PING 请求进行心跳检测。服务端要不断接收客户端的这个心跳检测,并重新激活对应的客户端会话。这个重新激活会话的过程由 SessionTracker 的 touchSession()方法实现。

 

服务端处理 PING 请求的主要流程如下:


public class NIOServerCnxnFactory extends ServerCnxnFactory {    private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;    ...    public void start() {        stopped = false;        if (workerPool == null) {            workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);        }        for (SelectorThread thread : selectorThreads) {            if (thread.getState() == Thread.State.NEW) {                thread.start();            }        }        if (acceptThread.getState() == Thread.State.NEW) {            acceptThread.start();        }        if (expirerThread.getState() == Thread.State.NEW) {            expirerThread.start();        }    }        class SelectorThread extends AbstractSelectThread {        @Override        public void run() {            ...            while (!stopped) {                select();                ...            }            ...        }                private void select() {            selector.select();            Set<SelectionKey> selected = selector.selectedKeys();            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);            Collections.shuffle(selectedList);            Iterator<SelectionKey> selectedKeys = selectedList.iterator();            while (!stopped && selectedKeys.hasNext()) {                SelectionKey key = selectedKeys.next();                selected.remove(key);                ...                if (key.isReadable() || key.isWritable()) {                    //服务端从客户端读数据(读取请求) + 服务端向客户端写数据(发送响应)                    handleIO(key);                }                ...            }        }                private void handleIO(SelectionKey key) {            IOWorkRequest workRequest = new IOWorkRequest(this, key);            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();            cnxn.disableSelectable();            key.interestOps(0);            //激活连接:添加连接到连接过期队列            touchCnxn(cnxn);            //通过工作线程池来处理请求            workerPool.schedule(workRequest);        }        ...    }        public void touchCnxn(NIOServerCnxn cnxn) {        cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());    }    ...}
public class WorkerService { ... public void schedule(WorkRequest workRequest) { schedule(workRequest, 0); } public void schedule(WorkRequest workRequest, long id) { ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest); int size = workers.size(); if (size > 0) { int workerNum = ((int) (id % size) + size) % size; ExecutorService worker = workers.get(workerNum); worker.execute(scheduledWorkRequest); } else { scheduledWorkRequest.run(); } } private class ScheduledWorkRequest implements Runnable { private final WorkRequest workRequest; ScheduledWorkRequest(WorkRequest workRequest) { this.workRequest = workRequest; } @Override public void run() { ... workRequest.doWork(); } } ...}
public class NIOServerCnxnFactory extends ServerCnxnFactory { private class IOWorkRequest extends WorkerService.WorkRequest { private final NIOServerCnxn cnxn; public void doWork() throws InterruptedException { ... if (key.isReadable() || key.isWritable()) { cnxn.doIO(key); ... } ... } ... } ...}
public class NIOServerCnxn extends ServerCnxn { private final ZooKeeperServer zkServer; void doIO(SelectionKey k) throws InterruptedException { ... if (k.isReadable()) { ... readPayload(); ... } ... } private void readPayload() throws IOException, InterruptedException { ... readRequest(); ... } private void readRequest() throws IOException { //处理输入流 zkServer.processPacket(this, incomingBuffer); } ...}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { ... public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { InputStream bais = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); RequestHeader h = new RequestHeader(); h.deserialize(bia, "header"); incomingBuffer = incomingBuffer.slice(); ... Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); ... submitRequest(si); ... } public void submitRequest(Request si) { ... //激活会话 touch(si.cnxn); firstProcessor.processRequest(si); ... } void touch(ServerCnxn cnxn) throws MissingSessionException { if (cnxn == null) { return; } long id = cnxn.getSessionId(); int to = cnxn.getSessionTimeout(); //激活会话 if (!sessionTracker.touchSession(id, to)) { throw new MissingSessionException("No session with sessionid 0x" + Long.toHexString(id) + " exists, probably expired and removed"); } } ...}
复制代码


由于 ZooKeeperServer 的 submitRequest()方法会调用 touch()方法激活会话,所以只要客户端有请求发送到服务端,服务端就会进行一次会话激活。

 

执行 SessionTracker 的 touchSession()方法进行会话激活的主要流程如下:

 

一.检查该会话是否已经被关闭


如果该会话已经被关闭,则返回,不用激活会话。

 

二.计算该会话新的过期时间点 newExpiryTime


调用 ExpiryQueue 的 roundToNextInterval()方法计算会话新的过期时间点。通过总时间除以间隔时间然后向上取整再乘以间隔时间来计算新的过期时间点。

 

三.将该会话添加到新的过期时间点对应的 bucket 中


从过期队列 expiryMap 获取新的过期时间点对应的 bucket,然后添加该会话到新的过期时间点对应的 bucket 中。

 

四.将该会话从旧的过期时间点对应的 bucket 中移除


从 elemMap 中获取该会话旧的过期时间点,然后将该会话从旧的过期时间点对应的 bucket 中移除。


public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {    ...    ConcurrentHashMap<Long, SessionImpl> sessionsById;//根据会话ID来管理具体的会话实体        synchronized public boolean touchSession(long sessionId, int timeout) {        SessionImpl s = sessionsById.get(sessionId);        if (s == null) {            logTraceTouchInvalidSession(sessionId, timeout);            return false;        }        //1.检查会话是否已经被关闭        if (s.isClosing()) {            logTraceTouchClosingSession(sessionId, timeout);            return false;        }        //激活会话        updateSessionExpiry(s, timeout);        return true;    }        private void updateSessionExpiry(SessionImpl s, int timeout) {        logTraceTouchSession(s.sessionId, timeout, "");        //激活会话        sessionExpiryQueue.update(s, timeout);    }    ...}
public class ExpiryQueue<E> { //存放会话对象SessionImpl及其对应的最近的过期时间点 private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>(); //存放过期队列,bucket可以理解为一个Set<SessionImpl>会话对象集合 private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>(); //最近的一批会话的过期时间点 private final AtomicLong nextExpirationTime = new AtomicLong(); //将会话划分到一个个bucket的时间间隔,也是超时检查线程的定时检查时间间隔 private final int expirationInterval; ... private long roundToNextInterval(long time) { //通过向上取整来进行计算新的过期时间点 return (time / expirationInterval + 1) * expirationInterval; } ... public Long update(E elem, int timeout) { Long prevExpiryTime = elemMap.get(elem); long now = Time.currentElapsedTime(); //2.计算该会话新的过期时间点newExpiryTime Long newExpiryTime = roundToNextInterval(now + timeout);

if (newExpiryTime.equals(prevExpiryTime)) { // No change, so nothing to update return null; } //3.从过期队列expiryMap获取新的过期时间点对应的bucket //First add the elem to the new expiry time bucket in expiryMap. Set<E> set = expiryMap.get(newExpiryTime); if (set == null) { // Construct a ConcurrentHashSet using a ConcurrentHashMap set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>()); // Put the new set in the map, but only if another thread hasn't beaten us to it Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set); if (existingSet != null) { set = existingSet; } } //将会话添加到新的过期时间点对应的bucket中 set.add(elem); //4.从elemMap中获取该会话旧的过期时间点 //Map the elem to the new expiry time. If a different previous mapping was present, clean up the previous expiry bucket. prevExpiryTime = elemMap.put(elem, newExpiryTime); //然后将该会话从旧的过期时间点对应的bucket中移除 if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) { Set<E> prevSet = expiryMap.get(prevExpiryTime); if (prevSet != null) { prevSet.remove(elem); } } return newExpiryTime; } ...}
复制代码


(3)会话超时检查


SessionTracker 中会有一个线程专门进行会话超时检查,该线程会依次对 bucket 会话桶中剩下的会话进行清理,超时检查线程的定时检查时间间隔其实就是 expirationInterval。

 

当一个会话被激活时,SessionTracker 会将其从上一个 bucket 会话桶迁移到下一个 bucket 会话桶。所以超时检查线程的任务就是检查 bucket 会话桶中没被迁移的会话。

 

超时检查线程是如何进行定时检查的:


由于会话分桶策略会将 expirationInterval 的倍数作为会话最近过期时间点,所以超时检查线程只要在 expirationInterval 倍数的时间点进行检查即可。这样既提高了效率,而且由于是批量清理,因此性能也非常好。这也是 zk 要通过分桶策略来管理客户端会话的最主要原因。一个 zk 集群的客户端会话可能会非常多,逐个依次检查会非常耗费时间。


public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {    private final ExpiryQueue<SessionImpl> sessionExpiryQueue;//会话过期队列    private final SessionExpirer expirer;    ...    //超时检查线程    @Override    public void run() {        try {            while (running) {                //获取会话过期队列中最近的过期时间和当前时间之差                long waitTime = sessionExpiryQueue.getWaitTime();                if (waitTime > 0) {                    //时间未到则进行睡眠                    Thread.sleep(waitTime);                    continue;                }
for (SessionImpl s : sessionExpiryQueue.poll()) { //设置过期的会话状态为已关闭 setSessionClosing(s.sessionId); //对会话进行过期处理 expirer.expire(s); } } } catch (InterruptedException e) { handleException(this.getName(), e); } LOG.info("SessionTrackerImpl exited loop!"); } ...}
public class ExpiryQueue<E> { private final AtomicLong nextExpirationTime = new AtomicLong(); ... public long getWaitTime() { //当前时间 long now = Time.currentElapsedTime(); //获取最近的过期时间点 long expirationTime = nextExpirationTime.get(); return now < expirationTime ? (expirationTime - now) : 0L; } public Set<E> poll() { long now = Time.currentElapsedTime(); //获取最近的过期时间点 long expirationTime = nextExpirationTime.get(); if (now < expirationTime) { return Collections.emptySet(); } Set<E> set = null; //根据expirationInterval计算最新的最近过期时间点 long newExpirationTime = expirationTime + expirationInterval; //重置bucket桶中最近的过期时间点 if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) { //移出过期队列 set = expiryMap.remove(expirationTime); } if (set == null) { return Collections.emptySet(); } return set; } ...}
复制代码


(4)会话清理


当 SessionTracker 的会话超时检查线程遍历出一些已经过期的会话时,就要进行会话清理了,会话清理的步骤如下:


一.标记会话状态为已关闭


SessionTracker 的 setSessionClosing()方法会标记会话状态为已关闭,这是因为整个会话清理过程需要一段时间,为了保证在会话清理期间不再处理来自该会话对应的客户端的请求,SessionTracker 会首先将该会话的 isClosing 属性标记为 true。

 

二.发起关闭会话请求


ZooKeeperServer 的 expire()方法和 close()方法会发起关闭会话请求,为了使对该会话的关闭操作在整个服务端集群中都生效,zk 使用提交"关闭会话"请求的方式,将请求交给 PrepRequestProcessor 处理。

 

三.收集临时节点


PrepRequestProcessor 的 pRequest2Txn()方法会收集需要清理的临时节点。在 zk 中,一旦某个会话失效,那么和该会话相关的临时节点也要被清除掉。因此需要首先将服务器上所有和该会话相关的临时节点找出来。

 

zk 的内存数据库会为每个会话都保存一份由该会话维护的临时节点集合。因此在会话清理阶段,只需根据当前即将关闭的会话的 sessionID,便可以从 zk 的内存数据库中获取到该会话的临时节点列表。

 

四.添加临时节点的删除请求到事务变更队列


将临时节点的删除请求添加到事务变更队列 outstandingChanges 中。完成该会话相关的临时节点收集后,zk 会将这些临时节点逐个转换成节点删除请求,添加到事务变更队列中。

 

五.删除临时节点


FinalRequestProcessor 的 processRequest()方法触发删除临时节点。当收集完所有需要删除的临时节点,以及创建了对应的节点删除请求后,便会在 FinalRequestProcessor 的 processRequest()方法中,通过调用 ZooKeeperServer 的 processTxn()方法,调用到 ZKDatabase 的 processTxn()方法,最后调用 DataTree 的 killSession()方法,从而最终删除内存数据库中该会话的所有临时节点。

 

六.移除会话


在 FinalRequestProcessor 的 processRequest()方法中,会通过调用 ZooKeeperServer 的 processTxn()方法,调用到 SessionTracker 的 removeSession()方法将会话从 SessionTracker 移除。即从 sessionsById、sessionsWithTimeout、sessionExpiryQueue 中移除会话。

 

七.关闭 NIOServerCnxn


在 FinalRequestProcessor 的 processRequest()方法中,最后会调用 FinalRequestProcessor 的 closeSession()方法,从 NIOServerCnxnFactory 的 sessionMap 中将该会话进行移除。


public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {    private final ExpiryQueue<SessionImpl> sessionExpiryQueue;//会话过期队列    private final SessionExpirer expirer;    ...    //超时检查线程    @Override    public void run() {        while (running) {            //获取会话过期队列中最近的过期时间和当前时间之差            long waitTime = sessionExpiryQueue.getWaitTime();            if (waitTime > 0) {                //时间未到则进行睡眠                Thread.sleep(waitTime);                continue;            }            for (SessionImpl s : sessionExpiryQueue.poll()) {                //1.设置过期的会话状态为已关闭                setSessionClosing(s.sessionId);                //2.对会话进行过期处理,ZooKeeperServer实现了SessionExpirer接口                expirer.expire(s);            }        }    }        synchronized public void setSessionClosing(long sessionId) {        SessionImpl s = sessionsById.get(sessionId);        s.isClosing = true;    }        //6.移除会话    synchronized public void removeSession(long sessionId) {        SessionImpl s = sessionsById.remove(sessionId);        sessionsWithTimeout.remove(sessionId);        sessionExpiryQueue.remove(s);    }    ...}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public synchronized void startup() { if (sessionTracker == null) { createSessionTracker();//创建会话管理器 } startSessionTracker();//启动会话管理器的超时检查线程 setupRequestProcessors();//初始化请求处理链 registerJMX(); setState(State.RUNNING); notifyAll(); } protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); } ... public void expire(Session session) { long sessionId = session.getSessionId(); //2.发起关闭会话请求 close(sessionId); } private void close(long sessionId) { Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null); setLocalSessionFlag(si); //2.以提交"关闭会话"请求的方式,发起关闭会话请求 submitRequest(si); } public void submitRequest(Request si) { ... touch(si.cnxn); //2.首先由PrepRequestProcessor请求处理器的processRequest方法进行处理 firstProcessor.processRequest(si); ... } public ProcessTxnResult processTxn(Request request) { return processTxn(request, request.getHdr(), request.getTxn()); } private ProcessTxnResult processTxn(Request request, TxnHeader hdr, Record txn) { ... //5.ZKDatabase.processTxn方法会根据opCode.closeSession来删除临时节点 rc = getZKDatabase().processTxn(hdr, txn); ... if (opCode == OpCode.createSession) { ... } else if (opCode == OpCode.closeSession) { //6.移除会话 sessionTracker.removeSession(sessionId); } return rc; } ...}
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>(); private final RequestProcessor nextProcessor; ZooKeeperServer zks; public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("ProcessThread(sid:" + zks.getServerId() + " cport:" + zks.getClientPort() + "):", zks.getZooKeeperServerListener()); this.nextProcessor = nextProcessor; this.zks = zks; } ... public void processRequest(Request request) { submittedRequests.add(request); } @Override public void run() { while (true) { Request request = submittedRequests.take(); pRequest(request); } } protected void pRequest(Request request) throws RequestProcessorException { ... case OpCode.closeSession: pRequest2Txn(request.type, zks.getNextZxid(), request, null, true); break; ... //交给下一个请求处理器处理 nextProcessor.processRequest(request); } protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) { //将请求标记为事务请求 request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type)); ... case OpCode.closeSession: //3.收集需要清理的临时节点 Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId); synchronized (zks.outstandingChanges) { ... for (String path2Delete : es) { //4.将临时节点的删除请求添加到事务变更队列outstandingChanges中 addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null)); } zks.sessionTracker.setSessionClosing(request.sessionId); } break; ... } private void addChangeRecord(ChangeRecord c) { //4.将临时节点的删除请求添加到事务变更队列outstandingChanges中 synchronized (zks.outstandingChanges) { zks.outstandingChanges.add(c); zks.outstandingChangesForPath.put(c.path, c); } } ...}
public class FinalRequestProcessor implements RequestProcessor { ZooKeeperServer zks; public void processRequest(Request request) { ... //5.删除临时节点 + 6.移除会话 rc = zks.processTxn(request); ... if (request.type == OpCode.closeSession && connClosedByClient(request)) { //7.关闭NIOServerCnxn if (closeSession(zks.serverCnxnFactory, request.sessionId) || closeSession(zks.secureServerCnxnFactory, request.sessionId)) { return; } } ... } private boolean closeSession(ServerCnxnFactory serverCnxnFactory, long sessionId) { if (serverCnxnFactory == null) { return false; } //7.关闭NIOServerCnxn return serverCnxnFactory.closeSession(sessionId); } ...}
public class NIOServerCnxnFactory extends ServerCnxnFactory { private final ConcurrentHashMap<Long, NIOServerCnxn> sessionMap = new ConcurrentHashMap<Long, NIOServerCnxn>(); ... public void addSession(long sessionId, NIOServerCnxn cnxn) { sessionMap.put(sessionId, cnxn); } @Override public boolean closeSession(long sessionId) { NIOServerCnxn cnxn = sessionMap.remove(sessionId); if (cnxn != null) { cnxn.close(); return true; } return false; } ...}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18817631

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
zk源码—会话的实现原理_Java_电子尖叫食人鱼_InfoQ写作社区