
ZAB and Paxos: Engineering Practice and an In-Depth Comparison of Distributed Consensus Algorithms

By 异常君 | 2025-06-11 | Jilin


The implementations in this article target Java 11+.


When building reliable distributed systems, consensus is one of the core challenges. ZooKeeper's ZAB protocol and the Paxos algorithm are two mainstream solutions, each with its own theoretical grounding and engineering trade-offs. This article takes a deep look at their implementation mechanics, performance characteristics, and best practices.

1. Basic Concepts

The ZAB Protocol

ZAB (ZooKeeper Atomic Broadcast) is a distributed consistency protocol designed specifically for ZooKeeper. Its core goal is to guarantee the atomicity and ordering of data updates across the distributed system.

The Paxos Algorithm

Paxos is a general-purpose distributed consensus algorithm proposed by Leslie Lamport. It is the theoretical foundation of many distributed systems, and it solves the problem of reaching agreement over an unreliable network.
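Both protocols derive their fault tolerance from majority quorums: any two majorities of the same ensemble overlap in at least one server, so a decision acknowledged by one majority is always visible to any later one. A quick check of the arithmetic (plain Java, for illustration only):

public class QuorumMath {
    // Smallest majority of an ensemble of n servers
    static int majority(int n) {
        return n / 2 + 1;
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 7; n += 2) {
            int q = majority(n);
            // Two quorums of size q overlap in at least 2q - n >= 1 servers,
            // which is what carries decisions across leader changes.
            System.out.printf("n=%d quorum=%d minimum overlap=%d%n", n, q, 2 * q - n);
        }
    }
}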

2. ZAB Protocol Implementation

The ZAB protocol operates in two modes:


  1. Recovery mode: entered at system startup or when the Leader crashes

  2. Broadcast mode: handles write requests during normal operation
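Both modes order transactions by the 64-bit transaction id (zxid): in the standard ZooKeeper layout the leader epoch lives in the high 32 bits and a per-epoch counter in the low 32 bits, so comparing zxids numerically compares epochs first. The recovery and election code below also calls a ZxidUtils helper that this article never defines; the following is a minimal sketch consistent with those call sites (an assumption for readability, not ZooKeeper's actual class):

import java.util.concurrent.atomic.AtomicLong;

public class ZxidUtils {
    // Hypothetical holder for this server's most recent zxid,
    // matching the instance call zxidUtils.getLastZxid() used later
    private final AtomicLong lastZxid = new AtomicLong(0);

    // High 32 bits carry the leader epoch
    public static int getEpochFromZxid(long zxid) {
        return (int) (zxid >>> 32);
    }

    // Low 32 bits carry the per-epoch counter
    public static long getCounterFromZxid(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }

    // Pack an epoch and counter into a zxid
    public static long makeZxid(int epoch, long counter) {
        return ((long) epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    // Numeric comparison orders by epoch first, then counter;
    // the int result widens to long at the election call site
    public static long compareZxid(long a, long b) {
        return Long.compare(a, b);
    }

    public long getLastZxid() {
        return lastZxid.get();
    }

    public void updateLastZxid(long zxid) {
        lastZxid.accumulateAndGet(zxid, Math::max);
    }
}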

Core Interface Definitions

public interface ZabProcessor {
    // Recovery-mode entry point
    boolean startRecovery() throws RecoveryException;

    // Broadcast-mode operations
    CompletableFuture<Boolean> processWrite(Request request);
    CompletableFuture<Result> processRead(String key, ConsistencyLevel level);

    // State queries
    boolean isLeader();
    long getCurrentEpoch();
}

public interface NetworkClient {
    // Basic connection management
    void connect(String serverId, String address, int port) throws IOException;
    void disconnect(String serverId);

    // ZAB protocol messages
    ACK sendProposal(String serverId, ProposalPacket proposal) throws IOException;
    void sendCommit(String serverId, CommitPacket commit) throws IOException;
    LastZxidResponse sendEpochRequest(String serverId, EpochPacket epochPkt) throws IOException;
    boolean sendTruncate(String serverId, TruncatePacket truncPkt) throws IOException;
    boolean sendTransactions(String serverId, List<Transaction> txns) throws IOException;
    boolean sendNewLeader(String serverId, NewLeaderPacket newLeaderPkt) throws IOException;
    void sendHeartbeat(String serverId, long zxid) throws IOException;
    void sendSnapshot(String serverId, byte[] snapshot, long zxid) throws IOException;
}

public interface StateMachine {
    void apply(long zxid, byte[] command) throws Exception;
    long getLastAppliedZxid();
    byte[] takeSnapshot() throws Exception;
    void restoreSnapshot(byte[] snapshot, long zxid) throws Exception;
}
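The StateMachine interface deliberately leaves the storage engine open. For illustration, here is a minimal in-memory key-value implementation, assuming a hypothetical command encoding of UTF-8 "key=value" pairs (not a format the article specifies):

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryStateMachine implements StateMachine {
    private final ConcurrentHashMap<String, String> data = new ConcurrentHashMap<>();
    private volatile long lastAppliedZxid = 0;

    @Override
    public synchronized void apply(long zxid, byte[] command) {
        // Commands at or below the last applied zxid are re-sync duplicates; skip them.
        if (zxid <= lastAppliedZxid) {
            return;
        }
        String[] kv = new String(command, StandardCharsets.UTF_8).split("=", 2);
        data.put(kv[0], kv.length > 1 ? kv[1] : "");
        lastAppliedZxid = zxid;
    }

    @Override
    public long getLastAppliedZxid() {
        return lastAppliedZxid;
    }

    @Override
    public synchronized byte[] takeSnapshot() throws Exception {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new ConcurrentHashMap<>(data));  // copy for a consistent view
            return bos.toByteArray();
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public synchronized void restoreSnapshot(byte[] snapshot, long zxid) throws Exception {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            data.clear();
            data.putAll((Map<String, String>) ois.readObject());
            lastAppliedZxid = zxid;
        }
    }
}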

ZAB Recovery Mode Implementation

public class ZABRecovery {
    private final AtomicLong zxid = new AtomicLong(0);
    private final AtomicInteger epoch = new AtomicInteger(0);
    private volatile ServerState state = ServerState.LOOKING;
    private final Logger logger = LoggerFactory.getLogger(ZABRecovery.class);
    private final ConcurrentMap<String, ServerData> serverDataMap;
    private final int quorumSize;
    private final NetworkClient networkClient;
    private final StateMachine stateMachine;
    private final String serverId;

    // quorumSize is the majority threshold (e.g. 2 for a 3-node ensemble)
    public ZABRecovery(String serverId, int quorumSize,
                       NetworkClient networkClient, StateMachine stateMachine) {
        this.serverId = serverId;
        this.quorumSize = quorumSize;
        this.networkClient = networkClient;
        this.stateMachine = stateMachine;
        this.serverDataMap = new ConcurrentHashMap<>();
    }

    // Leader recovery flow
    public boolean startRecovery() throws RecoveryException {
        MDC.put("component", "zab-recovery");
        MDC.put("serverId", serverId);
        try {
            // 1. Bump the election epoch
            int newEpoch = epoch.incrementAndGet();
            logger.info("Starting recovery with epoch: {}", newEpoch);

            // 2. Discovery phase: collect the state of every follower
            Map<Long, Set<String>> commitMap = discoverFollowerStates();

            // 3. Determine the truncation/commit point
            long truncateZxid = determineMaxCommittedZxid(commitMap);
            logger.info("Determined truncate zxid: {}", Long.toHexString(truncateZxid));

            // 4. Resolve conflicts that a split brain may have left behind
            resolveConflictsAfterPartition(truncateZxid, commitMap);

            // 5. Synchronization phase: replay history to the followers
            syncFollowers(truncateZxid);

            // 6. Switch to broadcast mode
            state = ServerState.LEADING;
            logger.info("Recovery completed, switching to broadcast mode");
            return true;
        } catch (IOException e) {
            logger.error("Recovery failed due to I/O error", e);
            state = ServerState.LOOKING;
            throw new RecoveryException("I/O error during recovery", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Recovery interrupted", e);
            state = ServerState.LOOKING;
            throw new RecoveryException("Recovery process interrupted", e);
        } catch (Exception e) {
            logger.error("Unexpected error during recovery", e);
            state = ServerState.LOOKING;
            throw new RecoveryException("Unexpected error during recovery", e);
        } finally {
            MDC.remove("component");
            MDC.remove("serverId");
        }
    }

    // Discovery phase: collect every follower's latest transaction info
    private Map<Long, Set<String>> discoverFollowerStates()
            throws IOException, InterruptedException {
        Map<Long, Set<String>> acceptedZxids = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(serverDataMap.size());
        List<CompletableFuture<?>> futures = new ArrayList<>();

        // Send a CEPOCH message to every follower
        for (var entry : serverDataMap.entrySet()) {
            final String targetServerId = entry.getKey();

            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                MDC.put("targetServerId", targetServerId);
                try {
                    // Announce the new epoch
                    EpochPacket epochPkt = new EpochPacket(epoch.get());
                    LastZxidResponse response =
                        networkClient.sendEpochRequest(targetServerId, epochPkt);

                    // Record this server's latest zxid (concurrent key set is thread-safe)
                    acceptedZxids.computeIfAbsent(response.getLastZxid(),
                            k -> ConcurrentHashMap.newKeySet())
                        .add(targetServerId);

                    logger.info("Server {} last zxid: {}", targetServerId,
                        Long.toHexString(response.getLastZxid()));
                } catch (IOException e) {
                    logger.error("Failed to discover state from server: {}", targetServerId, e);
                } finally {
                    MDC.remove("targetServerId");
                    latch.countDown();
                }
            });

            futures.add(future);
        }

        // Wait for a majority of responses or time out
        if (!latch.await(10, TimeUnit.SECONDS)) {
            logger.warn("Discovery phase timed out, proceeding with available responses");
        }

        // Cancel anything still in flight
        for (CompletableFuture<?> future : futures) {
            if (!future.isDone()) {
                future.cancel(true);
            }
        }

        return acceptedZxids;
    }

    // Determine the largest transaction id confirmed by a majority
    private long determineMaxCommittedZxid(Map<Long, Set<String>> commitMap) {
        long maxZxid = 0;
        int quorum = getQuorum();

        for (var entry : commitMap.entrySet()) {
            if (entry.getValue().size() >= quorum && entry.getKey() > maxZxid) {
                maxZxid = entry.getKey();
            }
        }
        return maxZxid;
    }

    // Resolve data conflicts that a network partition may have left behind
    private void resolveConflictsAfterPartition(long truncateZxid,
                                                Map<Long, Set<String>> commitMap) {
        logger.info("Checking for potential conflicts after network partition");

        // 1. Identify potentially conflicting transactions: higher zxids without majority support
        List<ConflictingTransaction> conflicts = new ArrayList<>();

        for (var entry : commitMap.entrySet()) {
            long txnZxid = entry.getKey();
            Set<String> servers = entry.getValue();

            // The zxid is beyond the agreed truncation point but lacks majority confirmation
            if (txnZxid > truncateZxid && servers.size() < getQuorum()) {
                int txnEpoch = ZxidUtils.getEpochFromZxid(txnZxid);
                int truncateEpoch = ZxidUtils.getEpochFromZxid(truncateZxid);

                conflicts.add(new ConflictingTransaction(txnZxid, truncateZxid,
                    txnEpoch, truncateEpoch, servers));
            }
        }

        // 2. Handle the conflicts
        if (!conflicts.isEmpty()) {
            logger.warn("Found {} potential conflicting transactions after partition",
                conflicts.size());

            for (ConflictingTransaction conflict : conflicts) {
                logger.warn("Conflict: transaction with zxid {} from {} epoch {} "
                        + "found but not in majority. Will be discarded.",
                    Long.toHexString(conflict.getConflictZxid()),
                    conflict.isFromHigherEpoch() ? "higher" : "same",
                    conflict.getConflictEpoch());

                // Tell the affected servers to truncate these transactions
                notifyServersToTruncate(conflict.getServers(), truncateZxid);
            }
        } else {
            logger.info("No conflicting transactions found");
        }
    }

    // Ask servers to truncate transactions beyond the safe point
    private void notifyServersToTruncate(Set<String> servers, long truncateZxid) {
        for (String serverId : servers) {
            CompletableFuture.runAsync(() -> {
                try {
                    TruncatePacket truncPkt = new TruncatePacket(truncateZxid);
                    boolean success = networkClient.sendTruncate(serverId, truncPkt);
                    if (success) {
                        logger.info("Successfully notified server {} to truncate to zxid {}",
                            serverId, Long.toHexString(truncateZxid));
                    } else {
                        logger.warn("Failed to notify server {} to truncate", serverId);
                    }
                } catch (IOException e) {
                    logger.error("Error notifying server {} to truncate", serverId, e);
                }
            });
        }
    }

    // Synchronization phase: replay historical transactions to the followers
    private void syncFollowers(long truncateZxid) throws IOException, InterruptedException {
        // Load every transaction from truncateZxid onwards
        List<Transaction> txns = loadTransactionsFromLog(truncateZxid);
        logger.info("Syncing {} transactions to followers", txns.size());

        // Sync all followers in parallel
        CountDownLatch syncLatch = new CountDownLatch(serverDataMap.size());
        AtomicInteger successCount = new AtomicInteger(0);
        List<CompletableFuture<?>> futures = new ArrayList<>();

        for (var entry : serverDataMap.entrySet()) {
            final String targetServerId = entry.getKey();
            final ServerData serverData = entry.getValue();

            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                MDC.put("targetServerId", targetServerId);
                try {
                    // Followers that are too far behind catch up via snapshot
                    long followerZxid = serverData.getLastZxid();
                    if (truncateZxid - followerZxid > SNAPSHOT_THRESHOLD) {
                        syncFollowerWithSnapshot(targetServerId, followerZxid);
                    } else {
                        // 1. TRUNC: tell the follower to truncate its log
                        // 2. DIFF: ship the missing transactions
                        // 3. NEWLEADER: confirm synchronization is complete
                        TruncatePacket truncPkt = new TruncatePacket(truncateZxid);
                        if (networkClient.sendTruncate(targetServerId, truncPkt)
                                && networkClient.sendTransactions(targetServerId, txns)
                                && networkClient.sendNewLeader(targetServerId,
                                       new NewLeaderPacket(epoch.get()))) {
                            successCount.incrementAndGet();
                            logger.info("Successfully synced server: {}", targetServerId);
                        }
                    }
                } catch (IOException e) {
                    logger.error("Failed to sync server {} with {} transactions, last zxid: {}",
                        targetServerId, txns.size(), Long.toHexString(truncateZxid), e);
                } finally {
                    MDC.remove("targetServerId");
                    syncLatch.countDown();
                }
            });

            futures.add(future);
        }

        // Wait for synchronization to finish or time out
        if (!syncLatch.await(30, TimeUnit.SECONDS)) {
            logger.warn("Sync phase timed out");
        }

        // Cancel anything still in flight
        for (CompletableFuture<?> future : futures) {
            if (!future.isDone()) {
                future.cancel(true);
            }
        }

        // Verify that enough servers synced successfully
        if (successCount.get() < quorumSize) {
            throw new QuorumNotFoundException("Failed to sync with quorum of followers",
                successCount.get(), quorumSize);
        }
    }

    // Bring a badly lagging follower up to date with a snapshot
    private void syncFollowerWithSnapshot(String followerId, long followerZxid)
            throws IOException {
        try {
            logger.info("Follower {} is too far behind (zxid: {}), syncing with snapshot",
                followerId, Long.toHexString(followerZxid));

            // 1. Capture the current state machine snapshot
            byte[] snapshot = stateMachine.takeSnapshot();

            // 2. Ship the snapshot to the follower
            networkClient.sendSnapshot(followerId, snapshot, zxid.get());

            logger.info("Successfully sent snapshot to follower: {}", followerId);
        } catch (Exception e) {
            logger.error("Failed to sync follower {} with snapshot", followerId, e);
            throw new IOException("Snapshot sync failed", e);
        }
    }

    // Load transactions from the transaction log
    private List<Transaction> loadTransactionsFromLog(long fromZxid) throws IOException {
        List<Transaction> result = new ArrayList<>();
        // A real implementation would read transaction records from persistent storage
        logger.info("Loading transactions starting from zxid: {}", Long.toHexString(fromZxid));
        return result;
    }

    // quorumSize already holds the majority threshold supplied at construction
    private int getQuorum() {
        return quorumSize;
    }

    // Use a snapshot once a follower is more than 100,000 transactions behind
    private static final long SNAPSHOT_THRESHOLD = 100_000;

    // Conflicting-transaction record
    static class ConflictingTransaction {
        private final long conflictZxid;
        private final long truncateZxid;
        private final int conflictEpoch;
        private final int truncateEpoch;
        private final Set<String> servers;

        public ConflictingTransaction(long conflictZxid, long truncateZxid,
                                      int conflictEpoch, int truncateEpoch,
                                      Set<String> servers) {
            this.conflictZxid = conflictZxid;
            this.truncateZxid = truncateZxid;
            this.conflictEpoch = conflictEpoch;
            this.truncateEpoch = truncateEpoch;
            this.servers = new HashSet<>(servers);
        }

        public boolean isFromHigherEpoch() {
            return conflictEpoch > truncateEpoch;
        }

        public long getConflictZxid() {
            return conflictZxid;
        }

        public int getConflictEpoch() {
            return conflictEpoch;
        }

        public Set<String> getServers() {
            return Collections.unmodifiableSet(servers);
        }
    }

    // Other inner class definitions elided...

    enum ServerState {
        LOOKING,    // searching for a leader
        FOLLOWING,  // follower role
        LEADING     // leader role
    }
}
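How a newly elected leader might drive this class: a small wiring sketch, reusing the hypothetical InMemoryStateMachine from earlier (ids, timeout, and quorum threshold are illustrative values, not anything the article prescribes):

public class LeaderStartup {
    public static void main(String[] args) {
        NetworkClient client = new NettyNetworkClient(3000);  // 3s connect timeout
        StateMachine sm = new InMemoryStateMachine();
        // Majority of 2 in a hypothetical 3-node ensemble
        ZABRecovery recovery = new ZABRecovery("server-1", 2, client, sm);

        try {
            if (recovery.startRecovery()) {
                // State is now LEADING: hand off to ZABBroadcast for normal writes.
            }
        } catch (RecoveryException e) {
            // Fall back to LOOKING and re-enter leader election.
        }
    }
}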

ZAB Broadcast Mode Implementation

public class ZABBroadcast implements AutoCloseable {
    private final AtomicLong zxid;
    private final AtomicInteger epoch;
    private final ConcurrentMap<String, ServerData> followers;
    private final Logger logger = LoggerFactory.getLogger(ZABBroadcast.class);
    private final CircuitBreaker circuitBreaker;
    private final NetworkClient networkClient;
    private final StateMachine stateMachine;
    private final String serverId;
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final ScheduledExecutorService scheduler;
    private final MetricsCollector metrics;
    // At most one heartbeat-failure log entry every 10 seconds
    private final RateLimiter heartbeatLogLimiter = RateLimiter.create(0.1);

    public ZABBroadcast(String serverId, AtomicLong zxid, AtomicInteger epoch,
                        NetworkClient networkClient, StateMachine stateMachine) {
        this.serverId = serverId;
        this.zxid = zxid;
        this.epoch = epoch;
        this.networkClient = networkClient;
        this.stateMachine = stateMachine;
        this.followers = new ConcurrentHashMap<>();
        this.circuitBreaker = new CircuitBreaker(5, 10000);  // open after 5 failures, reset after 10s
        this.scheduler = Executors.newScheduledThreadPool(2, r -> {
            Thread t = new Thread(r, "zab-scheduler-" + serverId);
            t.setDaemon(true);
            return t;
        });
        this.metrics = new MetricsCollector("zab_broadcast");

        // Start the heartbeat task
        scheduler.scheduleWithFixedDelay(this::sendHeartbeats, 500, 500, TimeUnit.MILLISECONDS);
    }

    // Register a follower
    public void addFollower(ServerData follower) {
        followers.put(follower.getId(), follower);
    }

    // Leader-side write handling
    public CompletableFuture<Boolean> processWrite(Request request) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        MDC.put("component", "zab-broadcast");
        MDC.put("serverId", serverId);
        MDC.put("requestId", request.getId());

        try {
            return GlobalExceptionHandler.withExceptionHandling(
                circuitBreaker.execute(() -> {
                    try {
                        // 1. Assign a zxid (epoch in the high 32 bits, counter in the low 32)
                        long newZxid = createNewZxid();
                        MDC.put("zxid", Long.toHexString(newZxid));
                        logger.info("Processing write request: {} with zxid: {}",
                            request.getId(), Long.toHexString(newZxid));

                        // 2. Send the proposal to every follower
                        List<Future<ACK>> futures = sendToFollowers(request, newZxid);

                        // 3. Wait for ACKs from a majority of followers
                        if (waitForMajority(futures)) {
                            // 4. Tell every follower to commit the transaction
                            commit(newZxid);
                            logger.info("Request {} committed successfully", request.getId());

                            // 5. Record metrics
                            metrics.recordSuccessfulWrite(stopwatch.elapsed(TimeUnit.MILLISECONDS));
                            return CompletableFuture.completedFuture(true);
                        } else {
                            logger.warn("Failed to get majority ACKs for request {}",
                                request.getId());
                            metrics.recordFailedWrite();
                            return CompletableFuture.completedFuture(false);
                        }
                    } catch (IOException e) {
                        logger.error("Failed to process write request: {}", request.getId(), e);
                        metrics.recordFailedWrite();
                        return CompletableFuture.failedFuture(
                            new ProcessingException("Failed to process write request", e));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.warn("Interrupted while processing write request: {}",
                            request.getId(), e);
                        metrics.recordFailedWrite();
                        return CompletableFuture.failedFuture(
                            new ProcessingException("Interrupted during write processing", e));
                    }
                })
            );
        } catch (CircuitBreakerOpenException e) {
            logger.error("Circuit breaker is open, rejecting request: {}", request.getId());
            metrics.recordRejectedWrite();
            return CompletableFuture.failedFuture(
                new ProcessingException("Circuit breaker open, system overloaded", e));
        } finally {
            MDC.remove("component");
            MDC.remove("serverId");
            MDC.remove("requestId");
            MDC.remove("zxid");
        }
    }

    // Batch writes to improve throughput
    public CompletableFuture<Map<String, Boolean>> processBatchWrite(List<Request> requests) {
        if (requests.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }

        Stopwatch stopwatch = Stopwatch.createStarted();
        MDC.put("component", "zab-broadcast");
        MDC.put("serverId", serverId);
        MDC.put("batchSize", String.valueOf(requests.size()));

        try {
            return GlobalExceptionHandler.withExceptionHandling(
                circuitBreaker.execute(() -> {
                    Map<String, Boolean> results = new HashMap<>();
                    try {
                        // Build the batch packet
                        BatchRequest batch = new BatchRequest();
                        for (Request req : requests) {
                            batch.addRequest(req);
                            results.put(req.getId(), false);  // default to failed
                        }

                        // One zxid for the whole batch
                        long batchZxid = createNewZxid();
                        MDC.put("zxid", Long.toHexString(batchZxid));
                        logger.info("Processing batch of {} requests with zxid: {}",
                            requests.size(), Long.toHexString(batchZxid));

                        // Send the batch to every follower
                        List<Future<ACK>> futures = sendBatchToFollowers(batch, batchZxid);

                        // Wait for majority acknowledgment
                        if (waitForMajority(futures)) {
                            // Commit the batch
                            commitBatch(batchZxid);
                            logger.info("Batch with {} requests committed successfully",
                                requests.size());

                            // Mark every request in the batch as successful
                            for (Request req : requests) {
                                results.put(req.getId(), true);
                            }

                            metrics.recordSuccessfulBatchWrite(
                                requests.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
                        } else {
                            logger.warn("Failed to get majority ACKs for batch");
                            metrics.recordFailedBatchWrite(requests.size());
                        }
                    } catch (Exception e) {
                        logger.error("Error processing batch write of {} requests",
                            requests.size(), e);
                        metrics.recordFailedBatchWrite(requests.size());
                    }
                    return CompletableFuture.completedFuture(results);
                })
            );
        } catch (CircuitBreakerOpenException e) {
            logger.error("Circuit breaker is open, rejecting batch of {} requests",
                requests.size());
            metrics.recordRejectedBatchWrite(requests.size());
            return CompletableFuture.failedFuture(
                new ProcessingException("Circuit breaker open, system overloaded", e));
        } finally {
            MDC.remove("component");
            MDC.remove("serverId");
            MDC.remove("batchSize");
            MDC.remove("zxid");
        }
    }
    // Reads with an explicit consistency guarantee
    public CompletableFuture<Result> readWithConsistency(String key, ConsistencyLevel level) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        MDC.put("component", "zab-broadcast");
        MDC.put("serverId", serverId);
        MDC.put("key", key);
        MDC.put("consistency", level.name());

        try {
            ReadStrategy strategy = readStrategies.getOrDefault(
                level, readStrategies.get(ConsistencyLevel.EVENTUAL));

            // Adapt readLocal(key) to the Supplier the strategy expects
            CompletableFuture<Result> result = strategy.execute(key, () -> readLocal(key));

            result.thenAccept(r ->
                metrics.recordRead(level, stopwatch.elapsed(TimeUnit.MILLISECONDS)));

            return result;
        } catch (Exception e) {
            logger.error("Error performing {} read for key: {}", level, key, e);
            metrics.recordFailedRead(level);
            return CompletableFuture.failedFuture(
                new ProcessingException("Read operation failed", e));
        } finally {
            MDC.remove("component");
            MDC.remove("serverId");
            MDC.remove("key");
            MDC.remove("consistency");
        }
    }

    // Read from local storage
    private Result readLocal(String key) {
        rwLock.readLock().lock();
        try {
            // A real implementation would read from the local database
            return new Result(key, "value", true);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // Generate a new zxid, handling counter overflow
    private long createNewZxid() {
        rwLock.writeLock().lock();
        try {
            long currentCounter = zxid.get() & 0xFFFFFFFFL;
            if (currentCounter >= 0xFFFFFFFFL) {
                // Counter about to overflow: bump the epoch and reset the counter
                int newEpoch = epoch.incrementAndGet();
                logger.warn("ZXID counter overflow, incrementing epoch to {}", newEpoch);
                long newZxid = ((long) newEpoch << 32);
                zxid.set(newZxid);
                return newZxid;
            }
            return zxid.incrementAndGet();
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    // Send a single request's proposal to every follower
    private List<Future<ACK>> sendToFollowers(Request request, long newZxid) throws IOException {
        return sendProposalToFollowers(new ProposalPacket(newZxid, request), newZxid);
    }

    // Wait for a majority of ACKs
    private boolean waitForMajority(List<Future<ACK>> futures) throws InterruptedException {
        int ackCount = 0;
        int majority = (followers.size() / 2) + 1;

        for (Future<ACK> future : futures) {
            try {
                ACK ack = future.get(5, TimeUnit.SECONDS);
                if (ack != null && ack.isSuccess()) {
                    ackCount++;
                    if (ackCount >= majority) {
                        return true;  // majority reached, return early
                    }
                }
            } catch (ExecutionException e) {
                logger.warn("Error getting ACK", e.getCause());
            } catch (TimeoutException e) {
                logger.warn("Timeout waiting for ACK");
            }
        }

        return ackCount >= majority;
    }

    // Tell every follower to commit a transaction
    private void commit(long zxid) throws IOException {
        CommitPacket commit = new CommitPacket(zxid);

        for (var entry : followers.entrySet()) {
            final String targetServerId = entry.getKey();

            CompletableFuture.runAsync(() -> {
                MDC.put("targetServerId", targetServerId);
                try {
                    networkClient.sendCommit(targetServerId, commit);
                    logger.debug("Sent commit to {} for zxid {}",
                        targetServerId, Long.toHexString(zxid));
                } catch (IOException e) {
                    logger.error("Failed to send commit to follower {}, zxid: {}",
                        targetServerId, Long.toHexString(zxid), e);
                } finally {
                    MDC.remove("targetServerId");
                }
            });
        }
    }

    // Send a batch proposal
    private List<Future<ACK>> sendBatchToFollowers(BatchRequest batch, long batchZxid)
            throws IOException {
        return sendProposalToFollowers(new ProposalPacket(batchZxid, batch), batchZxid);
    }

    // Commit a batch
    private void commitBatch(long batchZxid) throws IOException {
        commit(batchZxid);
    }

    // Heartbeat every follower
    private void sendHeartbeats() {
        long currentZxid = zxid.get();

        for (var entry : followers.entrySet()) {
            final String targetServerId = entry.getKey();

            CompletableFuture.runAsync(() -> {
                try {
                    networkClient.sendHeartbeat(targetServerId, currentZxid);
                } catch (IOException e) {
                    // Heartbeat failed; rate-limit the log line to avoid flooding
                    if (heartbeatLogLimiter.tryAcquire()) {
                        logger.debug("Failed to send heartbeat to {}", targetServerId, e);
                    }
                }
            });
        }
    }

    // Send a proposal packet to every follower (shared by single and batch writes)
    private List<Future<ACK>> sendProposalToFollowers(ProposalPacket proposal, long zxid)
            throws IOException {
        List<Future<ACK>> futures = new ArrayList<>();

        ExecutorService executor = Executors.newFixedThreadPool(followers.size(), r -> {
            Thread t = new Thread(r, "proposal-sender-" + serverId);
            t.setDaemon(true);
            return t;
        });

        try {
            for (var entry : followers.entrySet()) {
                final String targetServerId = entry.getKey();

                futures.add(executor.submit(() -> {
                    MDC.put("targetServerId", targetServerId);
                    try {
                        ACK ack = networkClient.sendProposal(targetServerId, proposal);
                        logger.debug("Received ACK from {} for zxid {}",
                            targetServerId, Long.toHexString(zxid));
                        return ack;
                    } catch (IOException e) {
                        logger.error("Failed to send proposal to follower {}, zxid: {}",
                            targetServerId, Long.toHexString(zxid), e);
                        return null;
                    } finally {
                        MDC.remove("targetServerId");
                    }
                }));
            }
        } finally {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(200, TimeUnit.MILLISECONDS)) {
                    List<Runnable> pendingTasks = executor.shutdownNow();
                    logger.warn("Force shutdown executor with {} pending tasks",
                        pendingTasks.size());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while waiting for executor to terminate");
            }
        }

        return futures;
    }
    // Read-strategy interface and implementations
    private interface ReadStrategy {
        CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal);
    }

    private final Map<ConsistencyLevel, ReadStrategy> readStrategies =
        new EnumMap<>(ConsistencyLevel.class);

    {
        // Register the strategy for each consistency level
        readStrategies.put(ConsistencyLevel.LINEARIZABLE, new LinearizableReadStrategy());
        readStrategies.put(ConsistencyLevel.SEQUENTIAL, new SequentialReadStrategy());
        readStrategies.put(ConsistencyLevel.READ_YOUR_WRITES, new ReadYourWritesStrategy());
        readStrategies.put(ConsistencyLevel.BOUNDED_STALENESS, new BoundedStalenessStrategy());
        readStrategies.put(ConsistencyLevel.EVENTUAL, new EventualReadStrategy());
    }

    // Linearizable reads
    private class LinearizableReadStrategy implements ReadStrategy {
        private final AtomicLong leaseExpirationTime = new AtomicLong(0);
        private final long leaderLeaseMs = 5000;  // 5-second lease

        @Override
        public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
            // The leader must confirm it is still the leader (lease mechanism)
            if (System.currentTimeMillis() < leaseExpirationTime.get()) {
                // Lease still valid: safe to read locally
                return CompletableFuture.completedFuture(readFromLocal.get());
            } else {
                // Lease expired: re-confirm leadership with a majority first
                return renewLease().thenApply(renewed -> {
                    if (renewed) {
                        return readFromLocal.get();
                    } else {
                        throw new ConsistencyException("Cannot guarantee linearizable read");
                    }
                });
            }
        }

        private CompletableFuture<Boolean> renewLease() {
            // A real implementation must obtain majority confirmation here
            leaseExpirationTime.set(System.currentTimeMillis() + leaderLeaseMs);
            logger.info("Renewed leader lease until {}", leaseExpirationTime.get());
            return CompletableFuture.completedFuture(true);
        }
    }

    // Sequential consistency
    private class SequentialReadStrategy implements ReadStrategy {
        @Override
        public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
            // Make sure every committed transaction has been applied first
            return ensureAppliedUpToDate()
                .thenApply(v -> readFromLocal.get());
        }

        private CompletableFuture<Void> ensureAppliedUpToDate() {
            // A real implementation would wait until all committed transactions are applied
            logger.debug("Ensuring all committed transactions are applied");
            return CompletableFuture.completedFuture(null);
        }
    }

    // Read-your-writes
    private class ReadYourWritesStrategy implements ReadStrategy {
        private final ConcurrentMap<String, Long> writeTimestamps = new ConcurrentHashMap<>();

        @Override
        public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
            // Has this key been written recently?
            Long writeTime = writeTimestamps.get(key);
            if (writeTime != null) {
                // Give the write time to land before serving the read
                long elapsed = System.currentTimeMillis() - writeTime;
                if (elapsed < 100) {  // assume 100ms is enough for the write to complete
                    try {
                        Thread.sleep(100 - elapsed);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }

            return CompletableFuture.completedFuture(readFromLocal.get());
        }

        // Record a write for later reads of the same key
        public void recordWrite(String key) {
            writeTimestamps.put(key, System.currentTimeMillis());
        }
    }

    // Bounded staleness
    private class BoundedStalenessStrategy implements ReadStrategy {
        private final ConcurrentMap<String, CacheEntry> cache = new ConcurrentHashMap<>();
        private final long maxStalenessMs = 1000;  // at most 1 second stale

        @Override
        public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
            // Serve from cache if the entry is fresh enough
            CacheEntry entry = cache.get(key);
            if (entry != null) {
                long age = System.currentTimeMillis() - entry.getTimestamp();
                if (age <= maxStalenessMs) {
                    return CompletableFuture.completedFuture(entry.getResult());
                }
            }

            // Cache miss or stale entry: read locally and refresh the cache
            Result result = readFromLocal.get();
            cache.put(key, new CacheEntry(result, System.currentTimeMillis()));
            return CompletableFuture.completedFuture(result);
        }

        // Periodically evict expired entries
        public void cleanup() {
            long now = System.currentTimeMillis();
            cache.entrySet().removeIf(entry ->
                now - entry.getValue().getTimestamp() > maxStalenessMs);
        }
    }

    // Eventual consistency
    private class EventualReadStrategy implements ReadStrategy {
        @Override
        public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
            // Read locally with no guarantee of seeing the latest write
            return CompletableFuture.completedFuture(readFromLocal.get());
        }
    }

    // Cache entry
    private static class CacheEntry {
        private final Result result;
        private final long timestamp;

        public CacheEntry(Result result, long timestamp) {
            this.result = result;
            this.timestamp = timestamp;
        }

        public Result getResult() {
            return result;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }

    @Override
    public void close() {
        try {
            List<Runnable> pendingTasks = scheduler.shutdownNow();
            if (!pendingTasks.isEmpty()) {
                logger.warn("Scheduler shutdown with {} pending tasks", pendingTasks.size());
            }

            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                logger.warn("Scheduler did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for scheduler termination");
        }
    }
    // Circuit breaker (thread-safe version)
    static class CircuitBreaker {
        private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
        private final AtomicLong failureCount = new AtomicLong(0);
        private final AtomicLong lastFailureTime = new AtomicLong(0);
        private final int threshold;
        private final long resetTimeoutMs;
        private final StampedLock stateLock = new StampedLock();
        private final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

        public enum State { CLOSED, OPEN, HALF_OPEN }

        public CircuitBreaker(int threshold, long resetTimeoutMs) {
            this.threshold = threshold;
            this.resetTimeoutMs = resetTimeoutMs;
        }

        public <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> action)
                throws CircuitBreakerOpenException {
            State currentState = getCurrentState();

            if (currentState == State.OPEN) {
                // If the reset timeout has elapsed, probe via HALF_OPEN
                if (System.currentTimeMillis() - lastFailureTime.get() > resetTimeoutMs) {
                    boolean transitioned = tryTransitionState(State.OPEN, State.HALF_OPEN);
                    if (!transitioned) {
                        throw new CircuitBreakerOpenException("Circuit breaker is open");
                    }
                    currentState = State.HALF_OPEN;
                } else {
                    throw new CircuitBreakerOpenException("Circuit breaker is open");
                }
            }

            final State executionState = currentState;

            try {
                CompletableFuture<T> future = action.get();
                return future.handle((result, ex) -> {
                    if (ex != null) {
                        recordFailure();
                        throw new CompletionException(ex);
                    } else {
                        // Success: close the breaker if probing, and reset the failure count
                        if (executionState == State.HALF_OPEN) {
                            tryTransitionState(State.HALF_OPEN, State.CLOSED);
                        }
                        failureCount.set(0);
                        return result;
                    }
                });
            } catch (Exception e) {
                recordFailure();
                throw e;
            }
        }

        private void recordFailure() {
            long stamp = stateLock.writeLock();
            try {
                long failures = failureCount.incrementAndGet();
                lastFailureTime.set(System.currentTimeMillis());

                if (failures >= threshold && state.get() == State.CLOSED) {
                    logger.warn("Circuit breaker opening after {} failures", failures);
                    state.set(State.OPEN);
                }
            } finally {
                stateLock.unlockWrite(stamp);
            }
        }

        private boolean tryTransitionState(State fromState, State toState) {
            long stamp = stateLock.writeLock();
            try {
                if (state.get() == fromState) {
                    state.set(toState);
                    logger.info("Circuit breaker state changed from {} to {}",
                        fromState, toState);
                    return true;
                }
                return false;
            } finally {
                stateLock.unlockWrite(stamp);
            }
        }

        // Optimistic read of the current state
        public State getCurrentState() {
            long stamp = stateLock.tryOptimisticRead();
            State result = state.get();

            if (!stateLock.validate(stamp)) {
                stamp = stateLock.readLock();
                try {
                    result = state.get();
                } finally {
                    stateLock.unlockRead(stamp);
                }
            }

            return result;
        }
    }

    // Global exception handler
    static class GlobalExceptionHandler {
        private static final Logger logger =
            LoggerFactory.getLogger(GlobalExceptionHandler.class);

        public static <T> CompletableFuture<T> withExceptionHandling(CompletableFuture<T> future) {
            return future.exceptionally(e -> {
                Throwable cause = e instanceof CompletionException ? e.getCause() : e;

                if (cause instanceof ConsistencyException) {
                    logger.error("Consistency error: {}", cause.getMessage());
                } else if (cause instanceof IOException) {
                    logger.error("I/O error: {}", cause.getMessage());
                } else if (cause instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    logger.warn("Operation interrupted");
                } else {
                    logger.error("Unexpected error: {}", cause.getClass().getName(), cause);
                }

                throw new CompletionException(cause);
            });
        }
    }
    // Metrics collection
    private static class MetricsCollector {
        private final Counter writeRequests;
        private final Counter writeSuccess;
        private final Counter writeFailed;
        private final Counter writeRejected;
        private final Counter batchWrites;
        private final Counter batchWriteRequests;
        private final Counter readRequests;
        private final Map<ConsistencyLevel, Counter> readsByLevel =
            new EnumMap<>(ConsistencyLevel.class);
        private final Histogram writeLatency;
        private final Histogram batchWriteLatency;
        private final Map<ConsistencyLevel, Histogram> readLatency =
            new EnumMap<>(ConsistencyLevel.class);

        public MetricsCollector(String prefix) {
            this.writeRequests = Counter.build()
                .name(prefix + "_write_requests_total")
                .help("Total number of write requests").register();

            this.writeSuccess = Counter.build()
                .name(prefix + "_write_success_total")
                .help("Total number of successful writes").register();

            this.writeFailed = Counter.build()
                .name(prefix + "_write_failed_total")
                .help("Total number of failed writes").register();

            this.writeRejected = Counter.build()
                .name(prefix + "_write_rejected_total")
                .help("Total number of rejected writes").register();

            this.batchWrites = Counter.build()
                .name(prefix + "_batch_writes_total")
                .help("Total number of batch writes").register();

            this.batchWriteRequests = Counter.build()
                .name(prefix + "_batch_write_requests_total")
                .help("Total number of requests in batch writes").register();

            this.readRequests = Counter.build()
                .name(prefix + "_read_requests_total")
                .help("Total number of read requests").register();

            this.writeLatency = Histogram.build()
                .name(prefix + "_write_latency_ms")
                .help("Write latency in milliseconds").register();

            this.batchWriteLatency = Histogram.build()
                .name(prefix + "_batch_write_latency_ms")
                .help("Batch write latency in milliseconds").register();

            // One counter and one histogram per consistency level
            for (ConsistencyLevel level : ConsistencyLevel.values()) {
                readsByLevel.put(level, Counter.build()
                    .name(prefix + "_reads_" + level.name().toLowerCase() + "_total")
                    .help("Total " + level + " reads").register());

                readLatency.put(level, Histogram.build()
                    .name(prefix + "_read_" + level.name().toLowerCase() + "_latency_ms")
                    .help(level + " read latency in milliseconds").register());
            }
        }

        public void recordSuccessfulWrite(long latencyMs) {
            writeRequests.inc();
            writeSuccess.inc();
            writeLatency.observe(latencyMs);
        }

        public void recordFailedWrite() {
            writeRequests.inc();
            writeFailed.inc();
        }

        public void recordRejectedWrite() {
            writeRequests.inc();
            writeRejected.inc();
        }

        public void recordSuccessfulBatchWrite(int batchSize, long latencyMs) {
            batchWrites.inc();
            batchWriteRequests.inc(batchSize);
            writeRequests.inc(batchSize);
            writeSuccess.inc(batchSize);
            batchWriteLatency.observe(latencyMs);
        }

        public void recordFailedBatchWrite(int batchSize) {
            batchWrites.inc();
            batchWriteRequests.inc(batchSize);
            writeRequests.inc(batchSize);
            writeFailed.inc(batchSize);
        }

        public void recordRejectedBatchWrite(int batchSize) {
            batchWrites.inc();
            batchWriteRequests.inc(batchSize);
            writeRequests.inc(batchSize);
            writeRejected.inc(batchSize);
        }

        public void recordRead(ConsistencyLevel level, long latencyMs) {
            readRequests.inc();
            readsByLevel.get(level).inc();
            readLatency.get(level).observe(latencyMs);
        }

        public void recordFailedRead(ConsistencyLevel level) {
            readRequests.inc();
            // a failure counter could be added here
        }
    }

    // Exception types
    public static class CircuitBreakerOpenException extends Exception {
        public CircuitBreakerOpenException(String message) {
            super(message);
        }
    }

    public static class ConsistencyException extends RuntimeException {
        public ConsistencyException(String message) {
            super(message);
        }
    }

    public static class ProcessingException extends RuntimeException {
        public ProcessingException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Other inner classes and constants elided...

    enum ConsistencyLevel {
        LINEARIZABLE,       // linearizability (strongest)
        SEQUENTIAL,         // sequential consistency
        READ_YOUR_WRITES,   // read your own writes
        BOUNDED_STALENESS,  // bounded staleness
        EVENTUAL            // eventual consistency (weakest)
    }
}
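A sketch of leader-side usage. Note that ServerData's constructor and the Request payload shape are never shown in the article, so the calls below assume plausible but hypothetical signatures:

public class BroadcastDemo {
    public static void main(String[] args) {
        AtomicLong zxid = new AtomicLong(ZxidUtils.makeZxid(1, 0));  // epoch 1, counter 0
        AtomicInteger epoch = new AtomicInteger(1);
        NetworkClient client = new NettyNetworkClient(3000);
        StateMachine sm = new InMemoryStateMachine();  // sketch from earlier

        try (ZABBroadcast broadcast = new ZABBroadcast("server-1", zxid, epoch, client, sm)) {
            // Hypothetical ServerData(id, address, port) constructor
            broadcast.addFollower(new ServerData("server-2", "10.0.0.2", 2888));
            broadcast.addFollower(new ServerData("server-3", "10.0.0.3", 2888));

            // Hypothetical Request(id, payload) constructor
            Request req = new Request("req-42", "create /config v1".getBytes());
            boolean committed = broadcast.processWrite(req).join();

            Result r = broadcast.readWithConsistency("/config",
                ConsistencyLevel.LINEARIZABLE).join();
        }
    }
}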

The Fast Leader Election Algorithm

public class FastLeaderElection {
    private final AtomicLong logicalClock = new AtomicLong(0);
    private final ConcurrentMap<String, Vote> receivedVotes = new ConcurrentHashMap<>();
    private final String serverId;
    private final NetworkManager networkManager;
    private final int quorumSize;
    private final AtomicInteger electionAttempts = new AtomicInteger(0);
    private final Logger logger = LoggerFactory.getLogger(FastLeaderElection.class);
    private final ZxidUtils zxidUtils;

    public FastLeaderElection(String serverId, int quorumSize,
                              NetworkManager networkManager, ZxidUtils zxidUtils) {
        this.serverId = serverId;
        this.quorumSize = quorumSize;
        this.networkManager = networkManager;
        this.zxidUtils = zxidUtils;
    }

    public String lookForLeader() throws InterruptedException {
        MDC.put("component", "fast-leader-election");
        MDC.put("serverId", serverId);
        try {
            // Advance the logical clock
            long newLogicalClock = logicalClock.incrementAndGet();
            logger.info("Starting leader election with logical clock: {}", newLogicalClock);

            // Initial vote: vote for ourselves
            Vote vote = new Vote(serverId, zxidUtils.getLastZxid(), newLogicalClock);
            receivedVotes.clear();
            receivedVotes.put(serverId, vote);

            // Broadcast the vote to every other server
            networkManager.broadcastVote(vote);

            // Election timeout bookkeeping
            long startTime = System.currentTimeMillis();
            long maxTimeout = 60000;  // 60-second cap

            // Election loop
            Map<String, Integer> voteCounter = new HashMap<>();
            String currentLeader = null;

            while (System.currentTimeMillis() - startTime < maxTimeout) {
                // Receive a vote (200ms poll timeout)
                Vote receivedVote = networkManager.receiveVote(200);
                if (receivedVote != null) {
                    MDC.put("candidateId", receivedVote.getServerId());
                    logger.debug("Received vote from {}: zxid={}, logicalClock={}",
                        receivedVote.getServerId(),
                        Long.toHexString(receivedVote.getZxid()),
                        receivedVote.getLogicalClock());

                    // Validate the logical clock
                    if (receivedVote.getLogicalClock() > newLogicalClock) {
                        // A higher logical clock exists: adopt it and restart the election
                        logicalClock.set(receivedVote.getLogicalClock());
                        logger.info("Found higher logical clock: {}, restarting election",
                            receivedVote.getLogicalClock());
                        MDC.remove("candidateId");
                        electionAttempts.set(0);  // reset the attempt counter
                        return lookForLeader();
                    } else if (receivedVote.getLogicalClock() < newLogicalClock) {
                        // Ignore votes from an older round
                        logger.debug("Ignoring vote with older logical clock: {}",
                            receivedVote.getLogicalClock());
                        MDC.remove("candidateId");
                        continue;
                    }

                    // Compare votes
                    int comparison = compareVotes(vote, receivedVote);
                    if (comparison < 0) {
                        // The incoming vote is better: adopt it and re-broadcast
                        vote = new Vote(receivedVote.getServerId(),
                            receivedVote.getZxid(), newLogicalClock);
                        networkManager.broadcastVote(vote);
                        logger.info("Updated vote to server: {}", vote.getServerId());
                    }

                    // Record the incoming vote
                    receivedVotes.put(receivedVote.getServerId(), receivedVote);
                    MDC.remove("candidateId");

                    // Tally the votes
                    voteCounter.clear();
                    for (Vote v : receivedVotes.values()) {
                        String candidate = v.getServerId();
                        voteCounter.put(candidate, voteCounter.getOrDefault(candidate, 0) + 1);

                        // Does any candidate have majority support?
                        if (voteCounter.get(candidate) >= quorumSize) {
                            currentLeader = candidate;
                            logger.info("Elected leader: {} with {} votes of {} required",
                                candidate, voteCounter.get(candidate), quorumSize);
                            break;
                        }
                    }

                    if (currentLeader != null) {
                        break;  // a leader has been elected
                    }
                }
            }

            if (currentLeader == null) {
                // Election failed: back off exponentially to avoid livelock, then retry
                handleElectionFailure();
                logger.warn("Failed to elect a leader, retrying...");
                return lookForLeader();
            }

            electionAttempts.set(0);  // reset the attempt counter
            return currentLeader;

        } catch (Exception e) {
            logger.error("Error during leader election", e);
            // Count the failed attempt and back off
            handleElectionFailure();
            throw new LeaderElectionException("Leader election failed", e);
        } finally {
            MDC.remove("component");
            MDC.remove("serverId");
        }
    }

    // Election failed: use exponential backoff to avoid livelock
    private void handleElectionFailure() {
        int attempts = electionAttempts.incrementAndGet();
        // Exponential backoff, capped at 30 seconds
        int backoffMs = Math.min(1000 * (1 << Math.min(attempts, 10)), 30000);
        // Random jitter so restarted nodes do not stay in lockstep
        backoffMs += ThreadLocalRandom.current().nextInt(backoffMs / 2);
        logger.info("Election attempt {} failed, backing off for {}ms", attempts, backoffMs);
        try {
            Thread.sleep(backoffMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted during election backoff");
        }
    }

    // Compare two votes: negative means v2 is better, 0 equal, positive means v1 is better
    private int compareVotes(Vote v1, Vote v2) {
        // A larger zxid wins first
        long zxidDiff = ZxidUtils.compareZxid(v1.getZxid(), v2.getZxid());
        if (zxidDiff != 0) {
            return (int) Math.signum(zxidDiff);
        }

        // Equal zxids: fall back to the server id
        return v1.getServerId().compareTo(v2.getServerId());
    }

    // Inner classes and helper methods...

    static class Vote {
        private final String serverId;
        private final long zxid;
        private final long logicalClock;

        public Vote(String serverId, long zxid, long logicalClock) {
            this.serverId = serverId;
            this.zxid = zxid;
            this.logicalClock = logicalClock;
        }

        public String getServerId() {
            return serverId;
        }

        public long getZxid() {
            return zxid;
        }

        public long getLogicalClock() {
            return logicalClock;
        }

        @Override
        public String toString() {
            return "Vote{serverId='" + serverId + "', zxid=" + Long.toHexString(zxid)
                + ", logicalClock=" + logicalClock + '}';
        }
    }

    // Custom exception
    public static class LeaderElectionException extends RuntimeException {
        public LeaderElectionException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}
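The backoff in handleElectionFailure doubles the wait on each attempt and then adds up to 50% random jitter so that simultaneously restarted nodes stop colliding. The base schedule it produces (a quick check of the arithmetic above, before jitter):

public class BackoffDemo {
    public static void main(String[] args) {
        for (int attempts = 1; attempts <= 8; attempts++) {
            // Mirrors handleElectionFailure: cap the shift at 10, cap the delay at 30s
            int backoffMs = Math.min(1000 * (1 << Math.min(attempts, 10)), 30000);
            System.out.printf("attempt %d -> base backoff %d ms%n", attempts, backoffMs);
        }
        // Prints 2000, 4000, 8000, 16000, 30000, 30000, ... (the 30s cap kicks in at attempt 5)
    }
}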

Network Client Implementation Example

public class NettyNetworkClient implements NetworkClient {
    private final EventLoopGroup workerGroup;
    private final Bootstrap bootstrap;
    private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();
    private final int connectionTimeoutMs;
    private final Logger logger = LoggerFactory.getLogger(NettyNetworkClient.class);

    public NettyNetworkClient(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.workerGroup = new NioEventLoopGroup();
        this.bootstrap = new Bootstrap()
            .group(workerGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMs)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline()
                        .addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
                        .addLast(new LengthFieldPrepender(4))
                        .addLast(new PacketEncoder())
                        .addLast(new PacketDecoder())
                        .addLast(new ClientHandler());
                }
            });
    }

    @Override
    public void connect(String serverId, String address, int port) throws IOException {
        try {
            ChannelFuture future = bootstrap.connect(address, port);
            boolean connected = future.await(connectionTimeoutMs, TimeUnit.MILLISECONDS);

            if (!connected || !future.isSuccess()) {
                throw new IOException("Failed to connect to " + serverId
                    + " at " + address + ":" + port);
            }

            channels.put(serverId, future.channel());
            logger.info("Connected to server: {} at {}:{}", serverId, address, port);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while connecting to " + serverId, e);
        } catch (Exception e) {
            throw new IOException("Failed to connect to " + serverId, e);
        }
    }

    @Override
    public void disconnect(String serverId) {
        Channel channel = channels.remove(serverId);
        if (channel != null) {
            channel.close();
            logger.info("Disconnected from server: {}", serverId);
        }
    }

    @Override
    public ACK sendProposal(String serverId, ProposalPacket proposal) throws IOException {
        MDC.put("targetServerId", serverId);
        try {
            Channel channel = getChannel(serverId);
            RequestFuture<ACK> future = new RequestFuture<>();

            // Register the request so the response can be matched to it later
            Long requestId = generateRequestId();
            RequestRegistry.register(requestId, future);

            // Wrap and send the request
            Request request = new Request(requestId, RequestType.PROPOSAL, proposal);
            channel.writeAndFlush(request).sync();

            // Wait for the response
            ACK ack = future.get(5, TimeUnit.SECONDS);
            if (ack == null) {
                throw new IOException("Received null ACK from " + serverId);
            }

            return ack;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while sending proposal to " + serverId, e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for ACK from " + serverId, e);
        } catch (ExecutionException e) {
            throw new IOException("Error sending proposal to " + serverId, e.getCause());
        } finally {
            MDC.remove("targetServerId");
        }
    }

    @Override
    public void sendCommit(String serverId, CommitPacket commit) throws IOException {
        MDC.put("targetServerId", serverId);
        try {
            Channel channel = getChannel(serverId);

            // Wrap the request
            Request request = new Request(generateRequestId(), RequestType.COMMIT, commit);

            // Fire and forget: commits carry no response
            channel.writeAndFlush(request);
        } catch (Exception e) {
            throw new IOException("Error sending commit to " + serverId, e);
        } finally {
            MDC.remove("targetServerId");
        }
    }

    @Override
    public LastZxidResponse sendEpochRequest(String serverId, EpochPacket epochPkt)
            throws IOException {
        MDC.put("targetServerId", serverId);
        try {
            Channel channel = getChannel(serverId);
            RequestFuture<LastZxidResponse> future = new RequestFuture<>();

            // Register the request so the response can be matched to it later
            Long requestId = generateRequestId();
            RequestRegistry.register(requestId, future);

            // Wrap and send the request
            Request request = new Request(requestId, RequestType.EPOCH, epochPkt);
            channel.writeAndFlush(request).sync();

            // Wait for the response
            LastZxidResponse response = future.get(5, TimeUnit.SECONDS);
            if (response == null) {
                throw new IOException("Received null LastZxidResponse from " + serverId);
            }

            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while sending epoch request to " + serverId, e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for LastZxidResponse from " + serverId, e);
        } catch (ExecutionException e) {
            throw new IOException("Error sending epoch request to " + serverId, e.getCause());
        } finally {
            MDC.remove("targetServerId");
        }
    }
    // Remaining interface methods elided...

    @Override
    public void sendSnapshot(String serverId, byte[] snapshot, long zxid) throws IOException {
        MDC.put("targetServerId", serverId);
        try {
            Channel channel = getChannel(serverId);

            // Snapshots can be large, so send them in chunks
            int chunkSize = 1024 * 1024;  // 1MB chunks
            int totalChunks = (snapshot.length + chunkSize - 1) / chunkSize;

            logger.info("Sending snapshot to {}, size: {} bytes, chunks: {}",
                serverId, snapshot.length, totalChunks);

            // Send the snapshot metadata first
            SnapshotMetadata metadata = new SnapshotMetadata(zxid, snapshot.length, totalChunks);
            Request metadataRequest = new Request(generateRequestId(),
                RequestType.SNAPSHOT_META, metadata);
            channel.writeAndFlush(metadataRequest).sync();

            // Then stream the snapshot data chunk by chunk
            for (int i = 0; i < totalChunks; i++) {
                int offset = i * chunkSize;
                int length = Math.min(chunkSize, snapshot.length - offset);
                byte[] chunk = new byte[length];
                System.arraycopy(snapshot, offset, chunk, 0, length);

                SnapshotChunk snapshotChunk = new SnapshotChunk(i, totalChunks, chunk);
                Request chunkRequest = new Request(generateRequestId(),
                    RequestType.SNAPSHOT_CHUNK, snapshotChunk);

                channel.writeAndFlush(chunkRequest).sync();

                if (i % 10 == 0 || i == totalChunks - 1) {
                    logger.debug("Sent snapshot chunk {}/{} to {}", i + 1, totalChunks, serverId);
                }
            }

            logger.info("Snapshot sent successfully to {}", serverId);
        } catch (Exception e) {
            throw new IOException("Error sending snapshot to " + serverId, e);
        } finally {
            MDC.remove("targetServerId");
        }
    }

    // Get the active channel to a given server
    private Channel getChannel(String serverId) throws IOException {
        Channel channel = channels.get(serverId);
        if (channel == null || !channel.isActive()) {
            throw new IOException("No active connection to server: " + serverId);
        }
        return channel;
    }

    // Unique request id generation
    private static final AtomicLong requestIdGenerator = new AtomicLong(0);

    private static Long generateRequestId() {
        return requestIdGenerator.incrementAndGet();
    }

    // Shut the client down
    public void shutdown() {
        // Close every connection
        for (Channel channel : channels.values()) {
            channel.close();
        }
        channels.clear();

        // Shut down the event loop group
        workerGroup.shutdownGracefully();
    }
    // Request types
    enum RequestType {
        PROPOSAL, COMMIT, EPOCH, TRUNCATE, TRANSACTION,
        NEWLEADER, HEARTBEAT, SNAPSHOT_META, SNAPSHOT_CHUNK
    }

    // Request envelope
    static class Request {
        private final Long id;
        private final RequestType type;
        private final Object payload;

        public Request(Long id, RequestType type, Object payload) {
            this.id = id;
            this.type = type;
            this.payload = payload;
        }

        public Long getId() {
            return id;
        }

        public RequestType getType() {
            return type;
        }

        public Object getPayload() {
            return payload;
        }
    }

    // Snapshot metadata
    static class SnapshotMetadata {
        private final long zxid;
        private final int totalSize;
        private final int totalChunks;

        public SnapshotMetadata(long zxid, int totalSize, int totalChunks) {
            this.zxid = zxid;
            this.totalSize = totalSize;
            this.totalChunks = totalChunks;
        }

        public long getZxid() {
            return zxid;
        }

        public int getTotalSize() {
            return totalSize;
        }

        public int getTotalChunks() {
            return totalChunks;
        }
    }

    // One chunk of snapshot data
    static class SnapshotChunk {
        private final int chunkIndex;
        private final int totalChunks;
        private final byte[] data;

        public SnapshotChunk(int chunkIndex, int totalChunks, byte[] data) {
            this.chunkIndex = chunkIndex;
            this.totalChunks = totalChunks;
            this.data = data.clone();  // defensive copy
        }

        public int getChunkIndex() {
            return chunkIndex;
        }

        public int getTotalChunks() {
            return totalChunks;
        }

        public byte[] getData() {
            return data.clone();  // defensive copy
        }
    }

    // Maps in-flight request ids to their pending futures
    static class RequestRegistry {
        private static final ConcurrentMap<Long, RequestFuture<?>> futures =
            new ConcurrentHashMap<>();

        public static <T> void register(Long requestId, RequestFuture<T> future) {
            futures.put(requestId, future);
        }

        @SuppressWarnings("unchecked")
        public static <T> void complete(Long requestId, T response) {
            RequestFuture<T> future = (RequestFuture<T>) futures.remove(requestId);
            if (future != null) {
                future.complete(response);
            }
        }

        public static void completeExceptionally(Long requestId, Throwable exception) {
            RequestFuture<?> future = futures.remove(requestId);
            if (future != null) {
                future.completeExceptionally(exception);
            }
        }
    }

    // Pending-request future
    static class RequestFuture<T> extends CompletableFuture<T> {
        // CompletableFuture already provides everything we need
    }

    // Inbound handler: routes responses back to their pending futures
    private class ClientHandler extends SimpleChannelInboundHandler<Response> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Response response) {
            Long requestId = response.getRequestId();
            if (response.isSuccess()) {
                RequestRegistry.complete(requestId, response.getPayload());
            } else {
                RequestRegistry.completeExceptionally(requestId,
                    new IOException("Request failed: " + response.getErrorMessage()));
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.error("Network client exception", cause);
            ctx.close();
        }
    }

    // Response envelope
    static class Response {
        private final Long requestId;
        private final boolean success;
        private final Object payload;
        private final String errorMessage;

        public Response(Long requestId, boolean success, Object payload, String errorMessage) {
            this.requestId = requestId;
            this.success = success;
            this.payload = payload;
            this.errorMessage = errorMessage;
        }

        public Long getRequestId() {
            return requestId;
        }

        public boolean isSuccess() {
            return success;
        }

        public Object getPayload() {
            return payload;
        }

        public String getErrorMessage() {
            return errorMessage;
        }
    }

    // Encoder: serializes a Request; LengthFieldPrepender adds the 4-byte length prefix
    static class PacketEncoder extends MessageToByteEncoder<Request> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Request msg, ByteBuf out)
                throws Exception {
            // Use protocol buffers or a custom serializer in practice;
            // simplified here for the example
            byte[] bytes = serializeRequest(msg);
            out.writeBytes(bytes);
        }

        private byte[] serializeRequest(Request request) {
            // A real implementation would use a proper serialization mechanism;
            // simplified here for the example
            return new byte[0];
        }
    }

    // Decoder: LengthFieldBasedFrameDecoder has already stripped the 4-byte length
    // prefix and delivers exactly one complete frame, so the whole buffer is the payload
    static class PacketDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                throws Exception {
            byte[] data = new byte[in.readableBytes()];
            in.readBytes(data);

            Response response = deserializeResponse(data);
            if (response != null) {
                out.add(response);
            }
        }

        private Response deserializeResponse(byte[] data) {
            // A real implementation would use a proper deserialization mechanism;
            // simplified here for the example
            return null;
        }
    }
}
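The code above stubs out serializeRequest/deserializeResponse. One simple way to fill them in for experiments, assuming the Request and Response envelopes are made Serializable (production code would use protobuf or a hand-rolled format, as the comments suggest):

import java.io.*;

public final class JdkCodec {
    private JdkCodec() {}

    // Object -> bytes via JDK serialization (demo-grade; not cross-language safe)
    public static byte[] serialize(Object msg) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(msg);
        }
        return bos.toByteArray();
    }

    // bytes -> Object; the caller casts to Request or Response
    public static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
    }
}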

3. Paxos Algorithm Implementation

Core Interface Definitions

// Role interface definitions
public interface Proposer {
    CompletableFuture<Boolean> prepare(int ballot);
    CompletableFuture<Boolean> propose(int ballot, Object value);
}

public interface Acceptor {
    CompletableFuture<Promise> handlePrepare(int ballot);
    CompletableFuture<Accepted> handleAccept(int ballot, Object value);
}

public interface Learner {
    void learn(long instanceId, int ballot, Object value);
}

public interface NetworkClient {
    CompletableFuture<Promise> sendPrepare(int nodeId, int ballot);
    CompletableFuture<Accepted> sendAccept(int nodeId, int ballot, Object value);
    void sendLearn(int nodeId, long instanceId, int ballot, Object value);
    CompletableFuture<Map<Long, PrepareResponse>> sendPrepareAllInstances(int nodeId, int ballot);
    CompletableFuture<Void> sendSnapshot(int nodeId, byte[] snapshot, long lastInstanceId);
}

public interface StateMachine {
    CompletableFuture<Void> apply(long instanceId, byte[] command);
    long getLastApplied();
    CompletableFuture<byte[]> takeSnapshot();
    CompletableFuture<Void> restoreSnapshot(byte[] snapshot, long instanceId);
}

Basic Paxos Implementation
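BasicPaxosNode below calls a generateNewBallot() helper that this excerpt never defines. A common scheme, assumed here, is to interleave a monotonically increasing round number with the node id, so ballots are globally unique and a node can always outbid any ballot it has seen:

import java.util.concurrent.atomic.AtomicInteger;

public class BallotGenerator {
    private final int nodeId;       // 0 <= nodeId < totalNodes
    private final int totalNodes;
    private final AtomicInteger round = new AtomicInteger(0);

    public BallotGenerator(int nodeId, int totalNodes) {
        this.nodeId = nodeId;
        this.totalNodes = totalNodes;
    }

    // ballot = round * totalNodes + nodeId: distinct nodes never produce equal ballots
    public int next() {
        return round.incrementAndGet() * totalNodes + nodeId;
    }

    // After seeing a higher ballot, advance the round far enough to outbid it next time
    public void observe(int seenBallot) {
        round.accumulateAndGet(seenBallot / totalNodes, Math::max);
    }
}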

public class BasicPaxosNode implements Proposer, Acceptor, Learner, AutoCloseable {    private final int nodeId;    private final AtomicInteger ballot = new AtomicInteger(0);    private volatile Object proposalValue = null;    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();    private volatile int acceptedBallot = 0;    private volatile Object acceptedValue = null;    private final int totalNodes;    private final NetworkClient networkClient;    private final Logger logger = LoggerFactory.getLogger(BasicPaxosNode.class);    private final RetryStrategy retryStrategy;    private final MetricsCollector metrics;
public BasicPaxosNode(int nodeId, int totalNodes, NetworkClient networkClient) { this.nodeId = nodeId; this.totalNodes = totalNodes; this.networkClient = networkClient; this.retryStrategy = new ExponentialBackoffRetry(100, 5000, 3); this.metrics = new MetricsCollector("paxos_basic", nodeId); }
// Proposer: 准备阶段 @Override public CompletableFuture<Boolean> prepare(int suggestedBallot) { final int newBallot = suggestedBallot > 0 ? suggestedBallot : generateNewBallot(); final Stopwatch stopwatch = Stopwatch.createStarted();
MDC.put("component", "paxos-proposer"); MDC.put("nodeId", String.valueOf(nodeId)); MDC.put("ballot", String.valueOf(newBallot)); logger.info("Starting prepare phase with ballot {}", newBallot);
CompletableFuture<Boolean> result = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { try { // 向所有Acceptor发送Prepare请求 List<CompletableFuture<Promise>> futures = sendPrepare(newBallot);
// 收集结果 List<Promise> promises = new ArrayList<>(); for (CompletableFuture<Promise> future : futures) { try { Promise promise = future.get(3, TimeUnit.SECONDS); if (promise != null) { promises.add(promise); } } catch (Exception e) { logger.warn("Error getting prepare response", e); } }
// 如果获得多数派响应 int quorum = getQuorum(); int okCount = (int) promises.stream().filter(Promise::isOk).count();
if (okCount >= quorum) { // 更新ballot ballot.updateAndGet(current -> Math.max(current, newBallot));
// 选择已接受的最高编号提案的值 Promise highestPromise = selectHighestBallotPromise(promises); rwLock.writeLock().lock(); try { if (highestPromise != null && highestPromise.getAcceptedValue() != null) { proposalValue = highestPromise.getAcceptedValue(); logger.info("Using previously accepted value: {}", proposalValue); } } finally { rwLock.writeLock().unlock(); }
metrics.recordPrepareSuccess(stopwatch.elapsed(TimeUnit.MILLISECONDS)); result.complete(true); } else { logger.info("Failed to get quorum in prepare phase: {} of {} responses ok", okCount, promises.size()); metrics.recordPrepareFailed(); result.complete(false); } } catch (Exception e) { logger.error("Error in prepare phase", e); metrics.recordPrepareFailed(); result.completeExceptionally(e); } finally { MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("ballot"); } });
return result; }
// Proposer: 接受阶段 @Override public CompletableFuture<Boolean> propose(int ballot, Object value) { final Stopwatch stopwatch = Stopwatch.createStarted();
MDC.put("component", "paxos-proposer"); MDC.put("nodeId", String.valueOf(nodeId)); MDC.put("ballot", String.valueOf(ballot));
return prepare(ballot).thenCompose(prepared -> { if (!prepared) { logger.info("Prepare phase failed, cannot proceed to propose"); metrics.recordProposeFailed(); return CompletableFuture.completedFuture(false); }
// 获取当前要提议的值 final Object valueToPropose; rwLock.readLock().lock(); try { // 如果准备阶段没有发现已接受的值,使用提议者的值 valueToPropose = proposalValue != null ? proposalValue : value; logger.info("Starting accept phase with ballot {} and value {}", ballot, valueToPropose); } finally { rwLock.readLock().unlock(); }
return CompletableFuture.supplyAsync(() -> { try { // 向所有Acceptor发送Accept请求 List<CompletableFuture<Accepted>> futures = sendAccept(ballot, valueToPropose);
// 收集结果 List<Accepted> responses = new ArrayList<>(); for (CompletableFuture<Accepted> future : futures) { try { Accepted accepted = future.get(3, TimeUnit.SECONDS); if (accepted != null) { responses.add(accepted); } } catch (Exception e) { logger.warn("Error getting accept response", e); } }
// 如果获得多数派接受 int quorum = getQuorum(); int accepted = (int) responses.stream().filter(Accepted::isOk).count(); boolean success = accepted >= quorum;
if (success) { logger.info("Value {} has been accepted by the majority ({} of {})", valueToPropose, accepted, responses.size());
                    // 通知所有Learner(Basic Paxos 只决议单个值,这里固定 instanceId 为 1)
                    broadcastToLearners(1, ballot, valueToPropose);
                    metrics.recordProposeSuccess(stopwatch.elapsed(TimeUnit.MILLISECONDS));
                } else {
                    logger.info("Failed to get quorum in accept phase: {} of {} responses ok",
                            accepted, responses.size());
                    metrics.recordProposeFailed();
                }
return success; } catch (Exception e) { logger.error("Error in propose phase", e); metrics.recordProposeFailed(); throw new CompletionException(e); } finally { MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("ballot"); } }); }).exceptionally(e -> { logger.error("Failed to propose value", e); metrics.recordProposeFailed(); return false; }); }
// Acceptor: 处理Prepare请求 @Override public CompletableFuture<Promise> handlePrepare(int proposalBallot) { MDC.put("component", "paxos-acceptor"); MDC.put("nodeId", String.valueOf(nodeId)); MDC.put("ballot", String.valueOf(proposalBallot));
return CompletableFuture.supplyAsync(() -> { Promise promise = new Promise();
        rwLock.writeLock().lock();
        try {
            if (proposalBallot > promisedBallot) {
                // 承诺不再接受编号小于 proposalBallot 的提案;
                // 注意返回给 Proposer 的是"此前已接受"的提案编号与值,而不是本次承诺的编号
                promisedBallot = proposalBallot;
                promise.setOk(true);
                promise.setAcceptedBallot(this.acceptedBallot);
                promise.setAcceptedValue(this.acceptedValue);
                logger.info("Acceptor {} promised ballot {}", nodeId, proposalBallot);
                metrics.recordPromiseMade();
            } else {
                promise.setOk(false);
                logger.info("Acceptor {} rejected ballot {}, promised ballot: {}",
                        nodeId, proposalBallot, promisedBallot);
                metrics.recordPromiseRejected();
            }
            return promise;
        } finally {
            rwLock.writeLock().unlock();
            MDC.remove("component");
            MDC.remove("nodeId");
            MDC.remove("ballot");
        }
    });
}
// Acceptor: 处理Accept请求 @Override public CompletableFuture<Accepted> handleAccept(int proposalBallot, Object proposalValue) { MDC.put("component", "paxos-acceptor"); MDC.put("nodeId", String.valueOf(nodeId)); MDC.put("ballot", String.valueOf(proposalBallot));
return CompletableFuture.supplyAsync(() -> { Accepted accepted = new Accepted();
        rwLock.writeLock().lock();
        try {
            if (proposalBallot >= promisedBallot) {
                promisedBallot = proposalBallot;
                acceptedBallot = proposalBallot;
                acceptedValue = proposalValue;
                accepted.setOk(true);
                logger.info("Acceptor {} accepted ballot {} with value {}",
                        nodeId, proposalBallot, proposalValue);
                metrics.recordAcceptMade();
            } else {
                accepted.setOk(false);
                logger.info("Acceptor {} rejected accept for ballot {}, promised ballot: {}",
                        nodeId, proposalBallot, promisedBallot);
                metrics.recordAcceptRejected();
            }
            return accepted;
        } finally {
            rwLock.writeLock().unlock();
            MDC.remove("component");
            MDC.remove("nodeId");
            MDC.remove("ballot");
        }
    });
}
// Learner: 学习已决议的值 @Override public void learn(long instanceId, int ballot, Object value) { MDC.put("component", "paxos-learner"); MDC.put("nodeId", String.valueOf(nodeId)); MDC.put("instanceId", String.valueOf(instanceId)); MDC.put("ballot", String.valueOf(ballot));
try { logger.info("Learner {} learned value {} for instance {} with ballot {}", nodeId, value, instanceId, ballot); metrics.recordLearnReceived();
// 实际实现中,这里会将学习到的值应用到状态机 // applyToStateMachine(instanceId, value); } finally { MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("instanceId"); MDC.remove("ballot"); } }
// 发送Prepare请求给所有Acceptor private List<CompletableFuture<Promise>> sendPrepare(int newBallot) { List<CompletableFuture<Promise>> futures = new ArrayList<>();
for (int i = 0; i < totalNodes; i++) { final int targetNodeId = i; if (targetNodeId == this.nodeId) { // 处理本地请求 futures.add(handlePrepare(newBallot)); } else { // 发送远程请求 futures.add(networkClient.sendPrepare(targetNodeId, newBallot) .exceptionally(e -> { logger.error("Failed to send prepare to node {}", targetNodeId, e); return null; })); } }
return futures; }
// 发送Accept请求给所有Acceptor private List<CompletableFuture<Accepted>> sendAccept(int ballot, Object value) { List<CompletableFuture<Accepted>> futures = new ArrayList<>();
for (int i = 0; i < totalNodes; i++) { final int targetNodeId = i; if (targetNodeId == this.nodeId) { // 处理本地请求 futures.add(handleAccept(ballot, value)); } else { // 发送远程请求 futures.add(networkClient.sendAccept(targetNodeId, ballot, value) .exceptionally(e -> { logger.error("Failed to send accept to node {}", targetNodeId, e); return null; })); } }
return futures; }
// 通知所有Learner已决议的值 private void broadcastToLearners(long instanceId, int ballot, Object value) { for (int i = 0; i < totalNodes; i++) { final int targetNodeId = i; if (targetNodeId == this.nodeId) { // 本地学习 learn(instanceId, ballot, value); } else { // 异步通知其他Learner CompletableFuture.runAsync(() -> { try { networkClient.sendLearn(targetNodeId, instanceId, ballot, value); } catch (Exception e) { logger.error("Failed to notify learner {}", targetNodeId, e); } }); } } }
// 选择最高ballot的Promise private Promise selectHighestBallotPromise(List<Promise> promises) { return promises.stream() .filter(p -> p.isOk() && p.getAcceptedValue() != null) .max(Comparator.comparingInt(Promise::getAcceptedBallot)) .orElse(null); }
// 生成比当前更大的提案编号 (加入节点ID保证唯一性) private int generateNewBallot() { // 确保新ballot大于之前的,并且保证不同节点的ballot唯一 return ballot.incrementAndGet() * totalNodes + nodeId; }
// 获取多数派数量 private int getQuorum() { return totalNodes / 2 + 1; }
@Override public void close() { // 释放资源 metrics.close(); }
// Promise类 public static class Promise { private boolean ok; private int acceptedBallot; private Object acceptedValue;
public boolean isOk() { return ok; }
public void setOk(boolean ok) { this.ok = ok; }
public int getAcceptedBallot() { return acceptedBallot; }
public void setAcceptedBallot(int acceptedBallot) { this.acceptedBallot = acceptedBallot; }
public Object getAcceptedValue() { return acceptedValue; }
public void setAcceptedValue(Object acceptedValue) { this.acceptedValue = acceptedValue; } }
// Accepted类 public static class Accepted { private boolean ok;
public boolean isOk() { return ok; }
public void setOk(boolean ok) { this.ok = ok; } }
// 指标收集类 private static class MetricsCollector implements AutoCloseable { // 指标定义...
public MetricsCollector(String prefix, int nodeId) { // 初始化指标... }
public void recordPrepareSuccess(long latencyMs) { // 记录准备阶段成功 }
public void recordPrepareFailed() { // 记录准备阶段失败 }
public void recordProposeSuccess(long latencyMs) { // 记录提议阶段成功 }
public void recordProposeFailed() { // 记录提议阶段失败 }
public void recordPromiseMade() { // 记录承诺次数 }
public void recordPromiseRejected() { // 记录拒绝承诺次数 }
public void recordAcceptMade() { // 记录接受次数 }
public void recordAcceptRejected() { // 记录拒绝接受次数 }
public void recordLearnReceived() { // 记录学习次数 }
@Override public void close() { // 清理资源 } }
// 异常处理与重试策略 interface RetryStrategy { <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action); }
// 指数退避重试策略 static class ExponentialBackoffRetry implements RetryStrategy { private final long initialBackoffMs; private final long maxBackoffMs; private final int maxRetries; private final Logger logger = LoggerFactory.getLogger(ExponentialBackoffRetry.class);
public ExponentialBackoffRetry(long initialBackoffMs, long maxBackoffMs, int maxRetries) { this.initialBackoffMs = initialBackoffMs; this.maxBackoffMs = maxBackoffMs; this.maxRetries = maxRetries; }
@Override public <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action) { return retryInternal(action, 0); }
        private <T> CompletableFuture<T> retryInternal(Supplier<CompletableFuture<T>> action, int attempt) {
            CompletableFuture<T> result = new CompletableFuture<>();
            action.get().whenComplete((value, e) -> {
                if (e == null) {
                    result.complete(value);
                    return;
                }
                if (attempt >= maxRetries) {
                    result.completeExceptionally(
                            new RetryExhaustedException("Max retries exceeded", e));
                    return;
                }

                // 指数退避并加入随机抖动,避免多个节点同步重试
                long base = Math.min(initialBackoffMs * (long) Math.pow(2, attempt), maxBackoffMs);
                long backoff = base + ThreadLocalRandom.current().nextLong(base / 5 + 1);

                logger.info("Retry attempt {} after {}ms due to: {}",
                        attempt + 1, backoff, e.getMessage());

                // 在延迟 backoff 毫秒后发起下一次尝试,并把结果转发给外层 future
                CompletableFuture.delayedExecutor(backoff, TimeUnit.MILLISECONDS).execute(() ->
                        retryInternal(action, attempt + 1).whenComplete((v, err) -> {
                            if (err == null) {
                                result.complete(v);
                            } else {
                                result.completeExceptionally(err);
                            }
                        }));
            });
            return result;
        }
    }
// 自定义异常类 public static class RetryExhaustedException extends RuntimeException { public RetryExhaustedException(String message, Throwable cause) { super(message, cause); } }}
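以下是一个最小的调用示意(LoopbackNetworkClient 为假设的演示用实现,异常处理从简;如前所述,ballot 需为正数且全局唯一):

// 最小调用示意:三节点集群中的节点 0 发起一轮 Basic Paxos
NetworkClient client = new LoopbackNetworkClient();   // 假设的演示实现
try (BasicPaxosNode node = new BasicPaxosNode(0, 3, client)) {
    // 编码方式与 generateNewBallot 一致:seq * totalNodes + nodeId
    int myBallot = 1 * 3 + 0;
    boolean chosen = node.propose(myBallot, "config-v2").get(5, TimeUnit.SECONDS);
    System.out.println("value chosen by majority: " + chosen);
}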

Multi-Paxos 实现

下面实现了 Multi-Paxos 的组件化架构,通过分离关注点提高代码的可维护性:


public class MultiPaxosSystem {    private final int nodeId;    private final Configuration config;    private final MultiPaxosLog log;    private final MultiPaxosStateMachine stateMachine;    private final MultiPaxosNetworking networking;    private final RoleManager roleManager;    private final ScheduledExecutorService scheduler;    private final Logger logger = LoggerFactory.getLogger(MultiPaxosSystem.class);
public MultiPaxosSystem(int nodeId, Configuration config) { this.nodeId = nodeId; this.config = config; this.log = new MultiPaxosLog(); this.stateMachine = new MultiPaxosStateMachine(); this.networking = new MultiPaxosNetworking(nodeId, config.getNodes()); this.roleManager = new RoleManager(this);
this.scheduler = Executors.newScheduledThreadPool(2, r -> { Thread t = new Thread(r, "multi-paxos-scheduler-" + nodeId); t.setDaemon(true); return t; });
// 启动日志应用线程 scheduler.scheduleWithFixedDelay(this::applyCommittedLogs, 100, 100, TimeUnit.MILLISECONDS);
// 启动Leader租约检查 scheduler.scheduleWithFixedDelay(this::checkLeaderLease, 1000, 1000, TimeUnit.MILLISECONDS); }
// 客户端API
// 追加新日志(写操作) public CompletableFuture<Boolean> appendLog(byte[] command) { if (!roleManager.isLeader()) { return CompletableFuture.failedFuture( new NotLeaderException("Not the leader", roleManager.getLeaderHint())); }
return roleManager.getLeaderRole().appendLog(command); }
// 读取操作 public CompletableFuture<byte[]> read(String key, ConsistencyLevel level) { switch (level) { case LINEARIZABLE: if (!roleManager.isLeader()) { return CompletableFuture.failedFuture( new NotLeaderException("Not the leader", roleManager.getLeaderHint())); } return roleManager.getLeaderRole().linearizableRead(key);
case SEQUENTIAL: return roleManager.getFollowerRole().sequentialRead(key);
case EVENTUAL: default: return roleManager.getFollowerRole().eventualRead(key); } }
// 尝试成为Leader public CompletableFuture<Boolean> electSelf() { return roleManager.electSelf(); }
// 日志应用 private void applyCommittedLogs() { try { long applied = stateMachine.getLastApplied(); long toApply = log.getCommitIndex();
if (applied >= toApply) { return; // 已全部应用 }
List<CompletableFuture<Void>> applyFutures = new ArrayList<>();
// 应用从applied+1到toApply的所有日志 for (long i = applied + 1; i <= toApply; i++) { final long instanceId = i; LogEntry entry = log.getEntry(instanceId);
if (entry != null && entry.isCommitted()) { applyFutures.add( stateMachine.apply(instanceId, entry.getCommand()) .thenRun(() -> { logger.debug("Applied log entry at instance {} to state machine", instanceId); }) .exceptionally(e -> { logger.error("Failed to apply log at instance {}", instanceId, e); return null; }) ); } }
// 等待所有应用完成 CompletableFuture.allOf(applyFutures.toArray(new CompletableFuture[0])) .thenRun(() -> { // 日志压缩 if (toApply - applied > 1000) { // 如果应用了大量日志,考虑压缩 log.compactLogs(stateMachine.getLastApplied()); } }) .exceptionally(e -> { logger.error("Error during log application", e); return null; }); } catch (Exception e) { logger.error("Error applying committed logs", e); } }
// 检查Leader租约 private void checkLeaderLease() { if (roleManager.isLeader()) { roleManager.getLeaderRole().checkLease(); } }
// 关闭系统 public void shutdown() { try { List<Runnable> pendingTasks = scheduler.shutdownNow(); if (!pendingTasks.isEmpty()) { logger.warn("Scheduler shutdown with {} pending tasks", pendingTasks.size()); }
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { logger.warn("Scheduler did not terminate in time"); }
networking.close(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted while waiting for scheduler termination"); } }
// 角色管理 public class RoleManager { private final MultiPaxosSystem system; private final AtomicBoolean isLeader = new AtomicBoolean(false); private final AtomicInteger currentBallot = new AtomicInteger(0); private volatile int leaderNodeId = -1; // -1表示未知
private final LeaderRole leaderRole; private final FollowerRole followerRole;
public RoleManager(MultiPaxosSystem system) { this.system = system; this.leaderRole = new LeaderRole(system); this.followerRole = new FollowerRole(system); }
public boolean isLeader() { return isLeader.get(); }
public int getLeaderHint() { return leaderNodeId; }
public LeaderRole getLeaderRole() { return leaderRole; }
public FollowerRole getFollowerRole() { return followerRole; }
public int getCurrentBallot() { return currentBallot.get(); }
public void setCurrentBallot(int ballot) { currentBallot.set(ballot); }
public CompletableFuture<Boolean> electSelf() { return leaderRole.electSelf().thenApply(elected -> { if (elected) { isLeader.set(true); leaderNodeId = nodeId; } return elected; }); }
public void stepDown() { if (isLeader.compareAndSet(true, false)) { logger.info("Node {} stepping down from leader", nodeId); } }
public void recognizeLeader(int leaderId, int ballot) { leaderNodeId = leaderId; currentBallot.set(ballot); if (leaderId != nodeId) { isLeader.set(false); } } }
// Leader角色实现 public class LeaderRole { private final MultiPaxosSystem system; private final AtomicLong leaseExpirationTime = new AtomicLong(0); private final long leaderLeaseMs = 5000; // 5秒租约
public LeaderRole(MultiPaxosSystem system) { this.system = system; }
// Leader选举 public CompletableFuture<Boolean> electSelf() { MDC.put("component", "multi-paxos-leader"); MDC.put("nodeId", String.valueOf(nodeId)); logger.info("Node {} attempting to become leader", nodeId);
try { int newBallot = generateNewBallot(); MDC.put("ballot", String.valueOf(newBallot));
return CompletableFuture.supplyAsync(() -> { try { // 执行Prepare阶段 Map<Long, PrepareResponse> responseMap = networking.sendPrepareForAllInstances(newBallot) .get(10, TimeUnit.SECONDS);
// 检查是否获得多数派支持 if (hasQuorumPromises(responseMap)) { // 根据收集到的信息,更新本地日志 updateLogFromPromises(responseMap);
// 成为Leader system.roleManager.setCurrentBallot(newBallot); system.roleManager.recognizeLeader(nodeId, newBallot);
logger.info("Node {} became leader with ballot {}", nodeId, newBallot); renewLease();
// 执行接受阶段,确保之前的日志得到多数派接受 confirmPendingLogs();
return true; } else { logger.info("Failed to become leader - did not get quorum promises"); return false; } } catch (Exception e) { logger.error("Error in become leader process", e); return false; } finally { MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("ballot"); } }); } catch (Exception e) { logger.error("Error initiating leader election", e); MDC.remove("component"); MDC.remove("nodeId"); return CompletableFuture.failedFuture(e); } }
// Leader: 追加新日志 public CompletableFuture<Boolean> appendLog(byte[] command) { Stopwatch stopwatch = Stopwatch.createStarted(); MDC.put("component", "multi-paxos-leader"); MDC.put("nodeId", String.valueOf(nodeId));
if (!system.roleManager.isLeader()) { MDC.remove("component"); MDC.remove("nodeId"); return CompletableFuture.failedFuture( new NotLeaderException("Node is not the leader", system.roleManager.getLeaderHint())); }
try { long nextInstance = system.log.getNextInstanceId(); MDC.put("instanceId", String.valueOf(nextInstance)); logger.info("Leader {} appending log at instance {}", nodeId, nextInstance);
// 创建日志条目 int currentBallot = system.roleManager.getCurrentBallot(); LogEntry entry = new LogEntry(currentBallot, command.clone()); // 防御性复制
// 存储日志条目 system.log.setEntry(nextInstance, entry);
// 对于已有Leader,可以跳过Prepare阶段,直接进入Accept阶段 return CompletableFuture.supplyAsync(() -> { try { List<AcceptResponse> responses = networking.sendAcceptRequests( nextInstance, currentBallot, command) .get(5, TimeUnit.SECONDS);
// 如果多数派接受 int quorum = getQuorum(); if (countAccepts(responses) >= quorum) { // 提交日志 entry.setCommitted(true); system.log.updateCommitIndex(nextInstance);
// 通知所有节点提交 networking.sendCommitNotifications(nextInstance, currentBallot);
logger.info("Log entry at instance {} has been committed", nextInstance); return true; } else { logger.warn("Failed to get quorum for instance {}", nextInstance); // 可能失去了领导权,尝试重新选举 system.roleManager.stepDown(); return false; } } catch (Exception e) { logger.error("Error in append log", e); throw new CompletionException(e); } finally { MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("instanceId"); } }); } catch (Exception e) { logger.error("Error initiating append log", e); MDC.remove("component"); MDC.remove("nodeId"); return CompletableFuture.failedFuture(e); } }
// 线性一致性读取(通过Leader确认) public CompletableFuture<byte[]> linearizableRead(String key) { if (!system.roleManager.isLeader()) { return CompletableFuture.failedFuture( new NotLeaderException("Not the leader", system.roleManager.getLeaderHint())); }
// 检查租约 if (System.currentTimeMillis() >= leaseExpirationTime.get()) { // 租约过期,需要重新确认Leader身份 return renewLease().thenCompose(renewed -> { if (!renewed) { return CompletableFuture.failedFuture( new ConsistencyException("Could not renew leadership lease")); } return system.stateMachine.read(key); }); }
// 租约有效,直接读取 return system.stateMachine.read(key); }
// 更新Leader租约 private CompletableFuture<Boolean> renewLease() { if (!system.roleManager.isLeader()) { return CompletableFuture.completedFuture(false); }
return CompletableFuture.supplyAsync(() -> { try { // 向多数派发送心跳以确认仍是Leader int currentBallot = system.roleManager.getCurrentBallot(); int responses = networking.sendLeadershipHeartbeats(currentBallot) .get(3, TimeUnit.SECONDS);
if (responses >= getQuorum()) { leaseExpirationTime.set(System.currentTimeMillis() + leaderLeaseMs); logger.debug("Renewed leader lease until {}", leaseExpirationTime.get()); return true; } else { logger.warn("Failed to renew leadership lease"); return false; } } catch (Exception e) { logger.error("Error renewing leadership lease", e); return false; } }); }
// 检查租约状态 public void checkLease() { if (!system.roleManager.isLeader()) { return; }
// 如果租约即将过期,尝试续期 long now = System.currentTimeMillis(); long expiration = leaseExpirationTime.get();
// 如果租约将在1秒内过期,提前续期 if (now + 1000 > expiration) { renewLease().thenAccept(renewed -> { if (!renewed) { logger.warn("Lease renewal failed, stepping down as leader"); system.roleManager.stepDown(); } }); } }
// 确保之前的日志条目被多数派接受 private void confirmPendingLogs() { // 实现逻辑... }
// 根据prepare响应更新日志 private void updateLogFromPromises(Map<Long, PrepareResponse> responseMap) { // 实现逻辑... }
// 检查是否获得多数派promise private boolean hasQuorumPromises(Map<Long, PrepareResponse> responseMap) { // 实现逻辑... return true; // 简化 }
// 统计accept响应 private int countAccepts(List<AcceptResponse> responses) { return (int) responses.stream() .filter(r -> r != null && r.isAccepted()) .count(); } }
// Follower角色实现 public class FollowerRole { private final MultiPaxosSystem system; private final Map<String, CacheEntry> readCache = new ConcurrentHashMap<>(); private final long maxCacheAgeMs = 5000; // 5秒缓存过期
public FollowerRole(MultiPaxosSystem system) { this.system = system; }
// 处理心跳消息 public void handleHeartbeat(int leaderBallot, int leaderNodeId, long leaderCommitIndex) { // 更新本地commit index system.log.updateCommitIndex(leaderCommitIndex);
// 如果自己认为自己是Leader但收到更高ballot的心跳,则退位 if (system.roleManager.isLeader() && leaderBallot > system.roleManager.getCurrentBallot()) { logger.info("Stepping down as leader due to heartbeat with higher ballot: {}", leaderBallot); system.roleManager.stepDown(); }
// 记录当前Leader system.roleManager.recognizeLeader(leaderNodeId, leaderBallot); }
// 顺序一致性读(确保看到所有之前的写入) public CompletableFuture<byte[]> sequentialRead(String key) { // 确保应用了所有已提交的事务 return ensureAppliedUpToCommitIndex() .thenCompose(v -> system.stateMachine.read(key)); }
// 最终一致性读(直接从本地读取) public CompletableFuture<byte[]> eventualRead(String key) { return system.stateMachine.read(key); }
// 确保应用到当前commitIndex private CompletableFuture<Void> ensureAppliedUpToCommitIndex() { long current = system.log.getCommitIndex(); long applied = system.stateMachine.getLastApplied();
if (applied >= current) { return CompletableFuture.completedFuture(null); // 已全部应用 }
// 等待应用完成 CompletableFuture<Void> result = new CompletableFuture<>(); scheduler.execute(() -> { try { // 触发应用 system.applyCommittedLogs();
// 检查是否应用完成 if (system.stateMachine.getLastApplied() >= current) { result.complete(null); } else { // 可能有一些延迟,再次检查 scheduler.schedule(() -> { system.applyCommittedLogs(); result.complete(null); }, 50, TimeUnit.MILLISECONDS); } } catch (Exception e) { result.completeExceptionally(e); } });
return result; }
// 清理过期缓存 public void cleanupReadCache() { long now = System.currentTimeMillis(); // 移除过期条目 readCache.entrySet().removeIf(entry -> now - entry.getValue().getTimestamp() > maxCacheAgeMs);
// 如果缓存过大,移除最旧的条目 if (readCache.size() > config.getMaxCacheSize()) { List<String> oldestKeys = readCache.entrySet().stream() .sorted(Comparator.comparingLong(e -> e.getValue().getTimestamp())) .limit(readCache.size() - config.getMaxCacheSize()) .map(Map.Entry::getKey) .collect(Collectors.toList());
for (String key : oldestKeys) { readCache.remove(key); } logger.info("Cache cleanup: removed {} old entries", oldestKeys.size()); } } }
// 内部组件类
// 日志管理 public static class MultiPaxosLog { private final ReadWriteLock logLock = new ReentrantReadWriteLock(); private final ConcurrentNavigableMap<Long, LogEntry> log = new ConcurrentSkipListMap<>(); private final AtomicLong nextInstanceId = new AtomicLong(1); private final AtomicLong commitIndex = new AtomicLong(0); private final Logger logger = LoggerFactory.getLogger(MultiPaxosLog.class);
public LogEntry getEntry(long index) { logLock.readLock().lock(); try { return log.get(index); } finally { logLock.readLock().unlock(); } }
public void setEntry(long index, LogEntry entry) { logLock.writeLock().lock(); try { log.put(index, entry); nextInstanceId.updateAndGet(current -> Math.max(current, index + 1)); } finally { logLock.writeLock().unlock(); } }
public long getNextInstanceId() { return nextInstanceId.getAndIncrement(); }
public long getCommitIndex() { return commitIndex.get(); }
public void updateCommitIndex(long newCommitIndex) { // 原子更新提交索引,确保只增不减 commitIndex.updateAndGet(current -> Math.max(current, newCommitIndex)); }
// 日志压缩 public void compactLogs(long appliedIndex) { // 保留最近的日志,删除旧日志 final int retentionWindow = 1000; // 保留最近1000条 long truncatePoint = appliedIndex - retentionWindow;
if (truncatePoint <= 0) { return; // 不需要压缩 }
logLock.writeLock().lock(); try { List<Long> toRemove = log.keySet().stream() .filter(idx -> idx < truncatePoint) .collect(Collectors.toList());
for (Long idx : toRemove) { log.remove(idx); }
logger.info("Compacted {} log entries before index {}", toRemove.size(), truncatePoint); } finally { logLock.writeLock().unlock(); } } }
// 状态机实现 public static class MultiPaxosStateMachine { private final AtomicLong lastApplied = new AtomicLong(0); private final Map<String, byte[]> keyValueStore = new ConcurrentHashMap<>(); private final Logger logger = LoggerFactory.getLogger(MultiPaxosStateMachine.class);
public CompletableFuture<Void> apply(long instanceId, byte[] command) { return CompletableFuture.runAsync(() -> { try { // 解析命令 Command cmd = deserializeCommand(command);
// 应用到状态机 if (cmd.getType() == CommandType.PUT) { keyValueStore.put(cmd.getKey(), cmd.getValue()); } else if (cmd.getType() == CommandType.DELETE) { keyValueStore.remove(cmd.getKey()); }
// 更新已应用索引 lastApplied.updateAndGet(current -> Math.max(current, instanceId)); } catch (Exception e) { logger.error("Error applying command at instance {}", instanceId, e); throw new CompletionException(e); } }); }
public CompletableFuture<byte[]> read(String key) { return CompletableFuture.supplyAsync(() -> { byte[] value = keyValueStore.get(key); return value != null ? value.clone() : null; // 防御性复制 }); }
public long getLastApplied() { return lastApplied.get(); }
public CompletableFuture<byte[]> takeSnapshot() { return CompletableFuture.supplyAsync(() -> { try { // 创建状态机快照 return serializeState(); } catch (Exception e) { logger.error("Error taking snapshot", e); throw new CompletionException(e); } }); }
public CompletableFuture<Void> restoreSnapshot(byte[] snapshot, long instanceId) { return CompletableFuture.runAsync(() -> { try { // 从快照恢复状态 deserializeState(snapshot);
// 更新已应用索引 lastApplied.set(instanceId); } catch (Exception e) { logger.error("Error restoring snapshot", e); throw new CompletionException(e); } }); }
// 序列化和反序列化辅助方法 private Command deserializeCommand(byte[] data) { // 实际实现应使用正式的序列化机制 return new Command(CommandType.PUT, "key", data); // 简化示例 }
private byte[] serializeState() { // 实际实现应使用正式的序列化机制 return new byte[0]; // 简化示例 }
private void deserializeState(byte[] data) { // 实际实现应使用正式的序列化机制 // 简化示例 } }
// 网络层 public static class MultiPaxosNetworking implements AutoCloseable { private final int nodeId; private final Map<Integer, NodeInfo> nodes; private final NetworkClient client; private final Logger logger = LoggerFactory.getLogger(MultiPaxosNetworking.class);
public MultiPaxosNetworking(int nodeId, Map<Integer, NodeInfo> nodes) { this.nodeId = nodeId; this.nodes = new HashMap<>(nodes); this.client = createNetworkClient(); }
private NetworkClient createNetworkClient() { // 实际实现应创建合适的网络客户端 return new NetworkClientImpl(); }
public CompletableFuture<Map<Long, PrepareResponse>> sendPrepareForAllInstances(int ballot) { // 实现逻辑... return CompletableFuture.completedFuture(new HashMap<>()); }
public CompletableFuture<List<AcceptResponse>> sendAcceptRequests( long instanceId, int ballot, byte[] command) { // 实现逻辑... return CompletableFuture.completedFuture(new ArrayList<>()); }
public CompletableFuture<Integer> sendLeadershipHeartbeats(int ballot) { // 实现逻辑... return CompletableFuture.completedFuture(0); }
public void sendCommitNotifications(long instanceId, int ballot) { // 实现逻辑... }
@Override public void close() { // 关闭网络客户端 } }
// 生成新的ballot,确保全局唯一性 private int generateNewBallot() { // 确保新ballot大于之前的,并且不同节点生成的ballot唯一 int currentBallot = roleManager.getCurrentBallot(); return (currentBallot / config.getTotalNodes() + 1) * config.getTotalNodes() + nodeId; }
// 获取多数派数量 private int getQuorum() { return config.getTotalNodes() / 2 + 1; }
// 日志条目 public static class LogEntry { private int ballot; private final byte[] command; private volatile boolean committed;
LogEntry(int ballot, byte[] command) { this.ballot = ballot; this.command = command.clone(); // 防御性复制 this.committed = false; }
public int getBallot() { return ballot; }
public void setBallot(int ballot) { this.ballot = ballot; }
public byte[] getCommand() { return command.clone(); // 防御性复制 }
public boolean isCommitted() { return committed; }
public void setCommitted(boolean committed) { this.committed = committed; } }
// 配置类 public static class Configuration { private final int totalNodes; private final Map<Integer, NodeInfo> nodes; private final int maxCacheSize;
public Configuration(int totalNodes, Map<Integer, NodeInfo> nodes, int maxCacheSize) { this.totalNodes = totalNodes; this.nodes = new HashMap<>(nodes); this.maxCacheSize = maxCacheSize; }
public int getTotalNodes() { return totalNodes; }
public Map<Integer, NodeInfo> getNodes() { return Collections.unmodifiableMap(nodes); }
public int getMaxCacheSize() { return maxCacheSize; } }
// 节点信息 public static class NodeInfo { private final int id; private final String host; private final int port;
public NodeInfo(int id, String host, int port) { this.id = id; this.host = host; this.port = port; }
public int getId() { return id; }
public String getHost() { return host; }
public int getPort() { return port; } }
// 命令类型 enum CommandType { PUT, DELETE }
// 命令对象 static class Command { private final CommandType type; private final String key; private final byte[] value;
public Command(CommandType type, String key, byte[] value) { this.type = type; this.key = key; this.value = value != null ? value.clone() : null; // 防御性复制 }
public CommandType getType() { return type; }
public String getKey() { return key; }
public byte[] getValue() { return value != null ? value.clone() : null; // 防御性复制 } }
// 响应类 public static class PrepareResponse { // 实现... }
public static class AcceptResponse { private final boolean accepted;
public AcceptResponse(boolean accepted) { this.accepted = accepted; }
public boolean isAccepted() { return accepted; } }
// 一致性级别 public enum ConsistencyLevel { LINEARIZABLE, // 线性一致性 SEQUENTIAL, // 顺序一致性 EVENTUAL // 最终一致性 }
// 异常类 public static class NotLeaderException extends RuntimeException { private final int leaderHint;
public NotLeaderException(String message, int leaderHint) { super(message); this.leaderHint = leaderHint; }
public int getLeaderHint() { return leaderHint; } }
public static class ConsistencyException extends RuntimeException { public ConsistencyException(String message) { super(message); } }
    // 简化的网络客户端实现(占位:真实实现应通过 RPC 与其他节点通信;
    // 占位方法返回已完成的失败 future,避免调用方拿到 null 后抛出 NPE)
    private static class NetworkClientImpl implements NetworkClient {

        @Override
        public CompletableFuture<Promise> sendPrepare(int nodeId, int ballot) {
            return CompletableFuture.failedFuture(
                    new UnsupportedOperationException("sendPrepare not implemented"));
        }

        @Override
        public CompletableFuture<Accepted> sendAccept(int nodeId, int ballot, Object value) {
            return CompletableFuture.failedFuture(
                    new UnsupportedOperationException("sendAccept not implemented"));
        }

        @Override
        public void sendLearn(int nodeId, long instanceId, int ballot, Object value) {
            // 占位:真实实现应将已决议的值推送给对应节点的 Learner
        }

        @Override
        public CompletableFuture<Map<Long, PrepareResponse>> sendPrepareAllInstances(int nodeId, int ballot) {
            return CompletableFuture.failedFuture(
                    new UnsupportedOperationException("sendPrepareAllInstances not implemented"));
        }

        @Override
        public CompletableFuture<Void> sendSnapshot(int nodeId, byte[] snapshot, long lastInstanceId) {
            return CompletableFuture.failedFuture(
                    new UnsupportedOperationException("sendSnapshot not implemented"));
        }
    }
}
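构建与调用方式大致如下(节点地址与端口为假设值,省略异常处理):

// 使用示意:三节点集群中的节点 0
Map<Integer, MultiPaxosSystem.NodeInfo> nodes = Map.of(
        0, new MultiPaxosSystem.NodeInfo(0, "10.0.0.1", 9000),
        1, new MultiPaxosSystem.NodeInfo(1, "10.0.0.2", 9000),
        2, new MultiPaxosSystem.NodeInfo(2, "10.0.0.3", 9000));
MultiPaxosSystem.Configuration config =
        new MultiPaxosSystem.Configuration(3, nodes, 10_000);

MultiPaxosSystem system = new MultiPaxosSystem(0, config);
system.electSelf()
        .thenCompose(elected -> elected
                ? system.appendLog("PUT k1 v1".getBytes(StandardCharsets.UTF_8))
                : CompletableFuture.completedFuture(false))
        .thenCompose(ok -> system.read("k1", MultiPaxosSystem.ConsistencyLevel.LINEARIZABLE))
        .whenComplete((value, e) -> system.shutdown());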

四、网络分区处理与成员变更

网络分区检测

public class PartitionHandler implements AutoCloseable {    private final String nodeId;    private final AtomicLong lastHeartbeatTime = new AtomicLong(0);    private final AtomicBoolean suspectPartition = new AtomicBoolean(false);    private final ScheduledExecutorService scheduler;    private final long heartbeatTimeoutMs;    private final Consumer<PartitionEvent> partitionCallback;    private final Logger logger = LoggerFactory.getLogger(PartitionHandler.class);
public PartitionHandler(String nodeId, long heartbeatTimeoutMs, Consumer<PartitionEvent> partitionCallback) { this.nodeId = nodeId; this.heartbeatTimeoutMs = heartbeatTimeoutMs; this.partitionCallback = partitionCallback; this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "partition-detector-" + nodeId); t.setDaemon(true); return t; });
// 启动心跳检测任务 scheduler.scheduleAtFixedRate( this::checkHeartbeat, heartbeatTimeoutMs / 2, heartbeatTimeoutMs / 2, TimeUnit.MILLISECONDS ); }
// 记录收到心跳 public void recordHeartbeat() { lastHeartbeatTime.set(System.currentTimeMillis()); if (suspectPartition.compareAndSet(true, false)) { logger.info("Node {} no longer suspects network partition", nodeId); partitionCallback.accept(new PartitionEvent(PartitionStatus.RECOVERED, nodeId)); } }
// 检查心跳超时 private void checkHeartbeat() { try { long now = System.currentTimeMillis(); long last = lastHeartbeatTime.get();
if (last > 0 && now - last > heartbeatTimeoutMs) { // 可能存在网络分区 if (suspectPartition.compareAndSet(false, true)) { logger.warn("Node {} suspects network partition, last heartbeat: {}ms ago", nodeId, now - last);
// 执行分区检测回调 partitionCallback.accept(new PartitionEvent(PartitionStatus.SUSPECTED, nodeId)); } } } catch (Exception e) { logger.error("Error checking heartbeat", e); } }
@Override public void close() { scheduler.shutdownNow(); try { if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) { logger.warn("Partition detector scheduler did not terminate in time"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted while shutting down partition detector"); } }
// 分区状态枚举 public enum PartitionStatus { SUSPECTED, // 怀疑发生分区 CONFIRMED, // 确认发生分区 RECOVERED // 分区已恢复 }
// 分区事件类 public static class PartitionEvent { private final PartitionStatus status; private final String nodeId;
public PartitionEvent(PartitionStatus status, String nodeId) { this.status = status; this.nodeId = nodeId; }
public PartitionStatus getStatus() { return status; }
public String getNodeId() { return nodeId; } }}
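使用示意如下(超时阈值为示例值)。检测到疑似分区时,常见做法是让当前节点主动放弃 Leader 身份,宁可暂停服务也不破坏一致性:

// 使用示意:3 秒未收到心跳即怀疑分区
PartitionHandler handler = new PartitionHandler("server1", 3000, event -> {
    if (event.getStatus() == PartitionHandler.PartitionStatus.SUSPECTED) {
        // 假设存在 roleManager:疑似分区时退位,避免分区两侧同时写入
        // roleManager.stepDown();
    } else if (event.getStatus() == PartitionHandler.PartitionStatus.RECOVERED) {
        // 分区恢复后可重新参与选举
    }
});

// 每次收到 Leader 心跳时调用,刷新最近心跳时间
handler.recordHeartbeat();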

成员变更实现

public class MembershipManager implements AutoCloseable {    private final ConcurrentMap<String, ServerInfo> servers = new ConcurrentHashMap<>();    private volatile Configuration currentConfig;    private final AtomicLong configVersion = new AtomicLong(0);    private final String nodeId;    private final AtomicBoolean isLeader = new AtomicBoolean(false);    private final Logger logger = LoggerFactory.getLogger(MembershipManager.class);    private final ConfigurationStore configStore;    private final NetworkClient networkClient;    private final StampedLock configLock = new StampedLock();
public MembershipManager(String nodeId, boolean isLeader, ConfigurationStore configStore, NetworkClient networkClient) { this.nodeId = nodeId; this.isLeader.set(isLeader); this.configStore = configStore; this.networkClient = networkClient;
// 初始化配置 try { this.currentConfig = configStore.loadConfiguration(); if (this.currentConfig == null) { this.currentConfig = new Configuration(configVersion.get(), new HashMap<>()); } servers.putAll(currentConfig.getServers()); } catch (IOException e) { logger.error("Failed to load configuration", e); this.currentConfig = new Configuration(configVersion.get(), new HashMap<>()); } }
// 两阶段成员变更 - 安全添加节点 public CompletableFuture<Boolean> addServer(String serverId, String address, int port) { MDC.put("component", "membership-manager"); MDC.put("nodeId", nodeId); MDC.put("targetServerId", serverId);
if (!isLeader.get()) { logger.warn("Only leader can change membership"); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("targetServerId"); return CompletableFuture.failedFuture( new IllegalStateException("Only leader can change membership")); }
CompletableFuture<Boolean> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> { long stamp = configLock.writeLock(); try { logger.info("Starting server addition: {}", serverId);
// 第一阶段:创建过渡配置(包含新旧所有节点) Configuration oldConfig = currentConfig; Configuration jointConfig = createJointConfig(oldConfig, serverId, address, port);
// 将过渡配置提交给集群 commitConfiguration(jointConfig).thenAccept(committed -> { if (!committed) { logger.warn("Failed to commit joint configuration for server {}", serverId); result.complete(false); return; }
logger.info("Joint configuration committed, proceeding to second phase");
// 第二阶段:创建新配置(确认包含新节点) Configuration newConfig = createNewConfig(jointConfig);
// 将新配置提交给集群 commitConfiguration(newConfig).thenAccept(finalCommitted -> { if (finalCommitted) { logger.info("Server {} successfully added to cluster", serverId); } else { logger.warn("Failed to commit final configuration for server {}", serverId); } result.complete(finalCommitted); }).exceptionally(e -> { logger.error("Error committing final configuration for server {}", serverId, e); result.completeExceptionally(e); return null; }); }).exceptionally(e -> { logger.error("Error committing joint configuration for server {}", serverId, e); result.completeExceptionally(e); return null; }); } catch (Exception e) { logger.error("Error adding server {}", serverId, e); result.completeExceptionally(e); } finally { configLock.unlockWrite(stamp); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("targetServerId"); } });
return result; }
// 两阶段成员变更 - 安全移除节点 public CompletableFuture<Boolean> removeServer(String serverId) { MDC.put("component", "membership-manager"); MDC.put("nodeId", nodeId); MDC.put("targetServerId", serverId);
if (!isLeader.get()) { logger.warn("Only leader can change membership"); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("targetServerId"); return CompletableFuture.failedFuture( new IllegalStateException("Only leader can change membership")); }
if (!servers.containsKey(serverId)) { logger.warn("Server {} not found in configuration", serverId); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("targetServerId"); return CompletableFuture.completedFuture(false); }
CompletableFuture<Boolean> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> { long stamp = configLock.writeLock(); try { logger.info("Starting server removal: {}", serverId);
// 第一阶段:创建过渡配置(标记要移除的节点) Configuration oldConfig = currentConfig; Configuration jointConfig = createJointConfig(oldConfig, serverId);
// 将过渡配置提交给集群 commitConfiguration(jointConfig).thenAccept(committed -> { if (!committed) { logger.warn("Failed to commit joint configuration for removing server {}", serverId); result.complete(false); return; }
logger.info("Joint configuration committed, proceeding to second phase");
// 第二阶段:创建新配置(移除目标节点) Configuration newConfig = createNewConfigWithout(jointConfig, serverId);
// 将新配置提交给集群 commitConfiguration(newConfig).thenAccept(finalCommitted -> { if (finalCommitted) { logger.info("Server {} successfully removed from cluster", serverId); } else { logger.warn("Failed to commit final configuration for removing server {}", serverId); } result.complete(finalCommitted); }).exceptionally(e -> { logger.error("Error committing final configuration for removing server {}", serverId, e); result.completeExceptionally(e); return null; }); }).exceptionally(e -> { logger.error("Error committing joint configuration for removing server {}", serverId, e); result.completeExceptionally(e); return null; }); } catch (Exception e) { logger.error("Error removing server {}", serverId, e); result.completeExceptionally(e); } finally { configLock.unlockWrite(stamp); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("targetServerId"); } });
return result; }
// 创建过渡配置(添加节点) private Configuration createJointConfig(Configuration oldConfig, String newServerId, String address, int port) { Map<String, ServerInfo> newServers = new HashMap<>(oldConfig.getServers()); newServers.put(newServerId, new ServerInfo(newServerId, address, port));
return new Configuration(configVersion.incrementAndGet(), newServers); }
// 创建过渡配置(删除节点) private Configuration createJointConfig(Configuration oldConfig, String serverId) { // 标记要删除的节点(在过渡配置中仍存在,但标记为待移除) Map<String, ServerInfo> jointServers = new HashMap<>(oldConfig.getServers()); ServerInfo serverInfo = jointServers.get(serverId); if (serverInfo != null) { ServerInfo markedServer = new ServerInfo( serverId, serverInfo.getAddress(), serverInfo.getPort(), true); jointServers.put(serverId, markedServer); }
return new Configuration(configVersion.incrementAndGet(), jointServers); }
// 创建新配置(确认添加节点) private Configuration createNewConfig(Configuration jointConfig) { // 最终配置,清除所有标记 Map<String, ServerInfo> newServers = new HashMap<>(); for (var entry : jointConfig.getServers().entrySet()) { if (!entry.getValue().isMarkedForRemoval()) { newServers.put(entry.getKey(), new ServerInfo( entry.getValue().getId(), entry.getValue().getAddress(), entry.getValue().getPort(), false )); } }
return new Configuration(configVersion.incrementAndGet(), newServers); }
// 创建新配置(确认删除节点) private Configuration createNewConfigWithout(Configuration jointConfig, String serverId) { Map<String, ServerInfo> newServers = new HashMap<>(); for (var entry : jointConfig.getServers().entrySet()) { if (!entry.getKey().equals(serverId) && !entry.getValue().isMarkedForRemoval()) { newServers.put(entry.getKey(), new ServerInfo( entry.getValue().getId(), entry.getValue().getAddress(), entry.getValue().getPort(), false )); } }
return new Configuration(configVersion.incrementAndGet(), newServers); }
// 提交配置变更 private CompletableFuture<Boolean> commitConfiguration(Configuration config) { return CompletableFuture.supplyAsync(() -> { try { // 实际实现会通过共识算法提交配置变更 logger.info("Committing configuration version {}", config.getVersion());
// 持久化配置 configStore.saveConfiguration(config);
// 更新本地配置 synchronized (this) { currentConfig = config; servers.clear(); servers.putAll(config.getServers()); }
// 广播配置变更 broadcastConfigChange(config);
return true; } catch (Exception e) { logger.error("Error committing configuration", e); return false; } }); }
// 广播配置变更 private void broadcastConfigChange(Configuration config) { // 向所有节点广播配置变更 for (String serverId : servers.keySet()) { if (!serverId.equals(nodeId)) { CompletableFuture.runAsync(() -> { try { // 实际实现中调用网络客户端发送配置 notifyConfigChange(serverId, config); } catch (Exception e) { logger.error("Failed to notify server {} of config change", serverId, e); } }); } } }
// 通知节点配置变更 private void notifyConfigChange(String serverId, Configuration config) { // 实际实现会发送配置给指定节点 logger.debug("Notifying server {} of configuration change to version {}", serverId, config.getVersion()); }
// 处理接收到的配置变更 public void handleConfigChange(Configuration newConfig) { long stamp = configLock.writeLock(); try { if (newConfig.getVersion() > currentConfig.getVersion()) { try { // 持久化新配置 configStore.saveConfiguration(newConfig);
// 更新本地配置 currentConfig = newConfig; servers.clear(); servers.putAll(newConfig.getServers());
logger.info("Updated to new configuration version {}", newConfig.getVersion()); } catch (IOException e) { logger.error("Failed to persist new configuration", e); } } else { logger.debug("Ignoring old configuration version {} (current is {})", newConfig.getVersion(), currentConfig.getVersion()); } } finally { configLock.unlockWrite(stamp); } }
// 获取当前配置 public Configuration getCurrentConfig() { long stamp = configLock.tryOptimisticRead(); Configuration config = currentConfig;
if (!configLock.validate(stamp)) { stamp = configLock.readLock(); try { config = currentConfig; } finally { configLock.unlockRead(stamp); } }
return config; }
// 检查节点是否在配置中(不包括标记为移除的节点) public boolean isServerInConfig(String serverId) { ServerInfo info = servers.get(serverId); return info != null && !info.isMarkedForRemoval(); }
// 获取有效服务器数量(不包括标记为移除的节点) public int getActiveServerCount() { return (int) servers.values().stream() .filter(s -> !s.isMarkedForRemoval()) .count(); }
// 设置Leader状态 public void setLeader(boolean isLeader) { this.isLeader.set(isLeader); }
@Override public void close() { // 释放资源 }
// 配置类 public static class Configuration implements Serializable { private static final long serialVersionUID = 1L;
private final long version; private final Map<String, ServerInfo> servers;
public Configuration(long version, Map<String, ServerInfo> servers) { this.version = version; this.servers = new HashMap<>(servers); }
public long getVersion() { return version; }
public Map<String, ServerInfo> getServers() { return Collections.unmodifiableMap(servers); } }
// 服务器信息 public static class ServerInfo implements Serializable { private static final long serialVersionUID = 1L;
private final String id; private final String address; private final int port; private final boolean markedForRemoval;
public ServerInfo(String id, String address, int port) { this(id, address, port, false); }
public ServerInfo(String id, String address, int port, boolean markedForRemoval) { this.id = id; this.address = address; this.port = port; this.markedForRemoval = markedForRemoval; }
public String getId() { return id; }
public String getAddress() { return address; }
public int getPort() { return port; }
public boolean isMarkedForRemoval() { return markedForRemoval; } }}
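上面的实现将过渡配置简化为"标记节点"。更严格的做法(Raft 风格的 joint consensus)要求过渡期内的每个决议同时获得新旧两个配置各自的多数派,其判断逻辑示意如下(方法名与参数名为说明用途虚构):

// 联合配置下的多数派判断示意:必须同时满足旧、新两个成员集合各自的多数派
static boolean jointQuorumReached(Set<String> acks,
                                  Set<String> oldMembers,
                                  Set<String> newMembers) {
    long oldAcks = acks.stream().filter(oldMembers::contains).count();
    long newAcks = acks.stream().filter(newMembers::contains).count();
    return oldAcks > oldMembers.size() / 2 && newAcks > newMembers.size() / 2;
}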

配置存储实现

public class FileBasedConfigurationStore implements ConfigurationStore {    private final Path configPath;    private final Path snapshotDir;    private final Logger logger = LoggerFactory.getLogger(FileBasedConfigurationStore.class);
public FileBasedConfigurationStore(Path configPath, Path snapshotDir) { this.configPath = configPath; this.snapshotDir = snapshotDir;
try { Files.createDirectories(configPath.getParent()); Files.createDirectories(snapshotDir); } catch (IOException e) { logger.error("Failed to create directories", e); throw new UncheckedIOException("Failed to create directories", e); } }
    @Override
    public void saveConfiguration(MembershipManager.Configuration config) throws IOException {
        // 先写入临时文件,流关闭后再原子替换,保证配置文件要么是旧的、要么是完整的新的
        Path tempPath = configPath.resolveSibling(configPath.getFileName() + ".tmp");
        try (ObjectOutputStream oos = new ObjectOutputStream(
                new BufferedOutputStream(Files.newOutputStream(tempPath)))) {
            oos.writeObject(config);
            oos.flush();
        }
        try {
            Files.move(tempPath, configPath,
                    StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
            logger.info("Configuration version {} saved successfully", config.getVersion());
        } catch (IOException e) {
            logger.error("Failed to save configuration", e);
            throw e;
        }
    }
@Override public MembershipManager.Configuration loadConfiguration() throws IOException { if (!Files.exists(configPath)) { logger.info("Configuration file does not exist: {}", configPath); return null; }
try (ObjectInputStream ois = new ObjectInputStream( new BufferedInputStream(Files.newInputStream(configPath)))) { MembershipManager.Configuration config = (MembershipManager.Configuration) ois.readObject(); logger.info("Loaded configuration version {}", config.getVersion()); return config; } catch (ClassNotFoundException e) { logger.error("Failed to deserialize configuration", e); throw new IOException("Failed to deserialize configuration", e); } }
@Override public void saveSnapshot(long index, byte[] data) throws IOException { // 创建快照文件名,包含索引 String snapshotFileName = String.format("snapshot-%020d.bin", index); Path snapshotPath = snapshotDir.resolve(snapshotFileName); Path tempPath = snapshotDir.resolve(snapshotFileName + ".tmp");
try { // 写入临时文件 Files.write(tempPath, data);
// 原子移动 Files.move(tempPath, snapshotPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
logger.info("Snapshot at index {} saved successfully, size: {} bytes", index, data.length);
// 清理旧快照,保留最近的5个 cleanupOldSnapshots(5); } catch (IOException e) { logger.error("Failed to save snapshot at index {}", index, e); throw e; } }
    @Override
    public SnapshotInfo loadLatestSnapshot() throws IOException {
        try {
            // 查找最新的快照文件(try-with-resources 保证目录流被关闭)
            Optional<Path> latestSnapshot;
            try (Stream<Path> paths = Files.list(snapshotDir)) {
                latestSnapshot = paths
                        .filter(p -> p.getFileName().toString().startsWith("snapshot-")
                                && p.getFileName().toString().endsWith(".bin"))
                        .max(Comparator.comparing(p -> p.getFileName().toString()));
            }
if (latestSnapshot.isPresent()) { Path snapshotPath = latestSnapshot.get(); String fileName = snapshotPath.getFileName().toString();
// 从文件名中提取索引 long index = Long.parseLong(fileName.substring(9, 29));
// 读取快照数据 byte[] data = Files.readAllBytes(snapshotPath);
logger.info("Loaded snapshot at index {}, size: {} bytes", index, data.length); return new SnapshotInfo(index, data); } else { logger.info("No snapshot found in directory: {}", snapshotDir); return null; } } catch (IOException e) { logger.error("Failed to load latest snapshot", e); throw e; } }
    // 清理旧快照,只保留最新的 keepCount 个
    private void cleanupOldSnapshots(int keepCount) throws IOException {
        try {
            List<Path> snapshots;
            try (Stream<Path> paths = Files.list(snapshotDir)) {
                snapshots = paths
                        .filter(p -> p.getFileName().toString().startsWith("snapshot-")
                                && p.getFileName().toString().endsWith(".bin"))
                        .sorted(Comparator.comparing(p -> p.getFileName().toString()))
                        .collect(Collectors.toList());
            }
// 如果快照数量超过保留数量,删除旧的 if (snapshots.size() > keepCount) { int toDelete = snapshots.size() - keepCount; for (int i = 0; i < toDelete; i++) { Files.delete(snapshots.get(i)); logger.info("Deleted old snapshot: {}", snapshots.get(i).getFileName()); } } } catch (IOException e) { logger.error("Failed to cleanup old snapshots", e); throw e; } }
    // 快照信息直接使用 ConfigurationStore.SnapshotInfo,
    // 避免在实现类中重复定义同名类型而与接口方法的返回类型不兼容
}
// 配置存储接口
public interface ConfigurationStore {
    void saveConfiguration(MembershipManager.Configuration config) throws IOException;
    MembershipManager.Configuration loadConfiguration() throws IOException;
    void saveSnapshot(long index, byte[] data) throws IOException;
    SnapshotInfo loadLatestSnapshot() throws IOException;
// 快照信息内部类定义 class SnapshotInfo { private final long index; private final byte[] data;
public SnapshotInfo(long index, byte[] data) { this.index = index; this.data = data.clone(); }
public long getIndex() { return index; }
public byte[] getData() { return data.clone(); } }}
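使用示意如下(路径为假设值,异常处理从简),演示配置的原子保存与重新加载:

// 使用示意:保存并重新加载集群配置
ConfigurationStore store = new FileBasedConfigurationStore(
        Path.of("/data/cluster/config.bin"), Path.of("/data/cluster/snapshots"));

Map<String, MembershipManager.ServerInfo> servers = new HashMap<>();
servers.put("server1", new MembershipManager.ServerInfo("server1", "10.0.0.1", 2888));
store.saveConfiguration(new MembershipManager.Configuration(1L, servers));

MembershipManager.Configuration loaded = store.loadConfiguration();
System.out.println("loaded version = " + loaded.getVersion());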

跨数据中心复制支持

public class CrossDCReplication implements AutoCloseable {    private final String localDC;    private final List<String> allDCs;    private final Map<String, DCConnection> dcConnections;    private final ConsensusSystem localSystem;    private final Logger logger = LoggerFactory.getLogger(CrossDCReplication.class);    private final ScheduledExecutorService scheduler;    private final AtomicLong replicationIndex = new AtomicLong(0);    private final ConcurrentMap<String, AtomicLong> dcReplicationProgress = new ConcurrentHashMap<>();
public CrossDCReplication(String localDC, List<String> allDCs, ConsensusSystem localSystem, Map<String, DCConnectionConfig> dcConfigs) { this.localDC = localDC; this.allDCs = new ArrayList<>(allDCs); this.localSystem = localSystem; this.dcConnections = new HashMap<>();
// 初始化数据中心连接 for (String dc : allDCs) { if (!dc.equals(localDC)) { DCConnectionConfig config = dcConfigs.get(dc); if (config != null) { dcConnections.put(dc, new DCConnection(dc, config)); dcReplicationProgress.put(dc, new AtomicLong(0)); } } }
this.scheduler = Executors.newScheduledThreadPool(2, r -> { Thread t = new Thread(r, "dc-replication-scheduler"); t.setDaemon(true); return t; });
// 启动定期复制任务 scheduler.scheduleWithFixedDelay( this::replicateChanges, 1000, 1000, TimeUnit.MILLISECONDS );
// 启动健康检查任务 scheduler.scheduleWithFixedDelay( this::checkDCHealth, 5000, 5000, TimeUnit.MILLISECONDS ); }
// 复制请求到其他数据中心 public CompletableFuture<Boolean> replicateRequest(Request request) { MDC.put("component", "cross-dc-replication"); MDC.put("requestId", request.getId());
try { // 1. 首先在本地DC处理请求 return localSystem.processWrite(request) .thenCompose(localSuccess -> { if (!localSuccess) { logger.warn("Request {} failed in local DC", request.getId()); return CompletableFuture.completedFuture(false); }
// 2. 如果本地成功,更新复制索引 long index = replicationIndex.incrementAndGet();
// 3. 异步复制到其他数据中心 List<CompletableFuture<Boolean>> dcFutures = new ArrayList<>();
for (var entry : dcConnections.entrySet()) { String dc = entry.getKey(); DCConnection connection = entry.getValue();
dcFutures.add(connection.replicateRequest(request, index) .thenApply(success -> { if (success) { // 更新复制进度 dcReplicationProgress.get(dc).updateAndGet( current -> Math.max(current, index)); logger.info("Request {} successfully replicated to DC {}", request.getId(), dc); } else { logger.warn("Failed to replicate request {} to DC {}", request.getId(), dc); } return success; }) .exceptionally(e -> { logger.error("Error replicating request {} to DC {}", request.getId(), dc, e); return false; })); }
// 4. 等待所有DC的响应,基于配置的复制策略 return handleDCReplications(dcFutures); }); } finally { MDC.remove("component"); MDC.remove("requestId"); } }
// 根据复制策略处理跨DC复制结果 private CompletableFuture<Boolean> handleDCReplications( List<CompletableFuture<Boolean>> dcFutures) {
ReplicationStrategy strategy = ReplicationStrategy.QUORUM; // 可配置
switch (strategy) { case ALL: // 所有DC都必须成功 return CompletableFuture.allOf( dcFutures.toArray(new CompletableFuture[0])) .thenApply(v -> dcFutures.stream() .allMatch(f -> { try { return f.get(); } catch (Exception e) { return false; } }));
case QUORUM: // 多数DC必须成功 return CompletableFuture.supplyAsync(() -> { int successCount = 0; int requiredSuccesses = (dcFutures.size() / 2) + 1;
for (CompletableFuture<Boolean> future : dcFutures) { try { if (future.get(5, TimeUnit.SECONDS)) { successCount++; if (successCount >= requiredSuccesses) { return true; } } } catch (Exception e) { logger.warn("Error waiting for DC replication", e); } }
return successCount >= requiredSuccesses; });
case ANY: // 至少一个DC成功 return CompletableFuture.supplyAsync(() -> { for (CompletableFuture<Boolean> future : dcFutures) { try { if (future.get(5, TimeUnit.SECONDS)) { return true; } } catch (Exception e) { logger.warn("Error waiting for DC replication", e); } }
return false; });
case ASYNC: // 异步复制,不等待结果 return CompletableFuture.completedFuture(true);
default: logger.warn("Unknown replication strategy: {}, using QUORUM", strategy); return CompletableFuture.supplyAsync(() -> { int successCount = 0; int requiredSuccesses = (dcFutures.size() / 2) + 1;
for (CompletableFuture<Boolean> future : dcFutures) { try { if (future.get(5, TimeUnit.SECONDS)) { successCount++; if (successCount >= requiredSuccesses) { return true; } } } catch (Exception e) { logger.warn("Error waiting for DC replication", e); } }
return successCount >= requiredSuccesses; }); } }
// 定期同步数据中心之间的变更 private void replicateChanges() { try { // 获取当前复制进度 Map<String, Long> progress = new HashMap<>(); for (var entry : dcReplicationProgress.entrySet()) { progress.put(entry.getKey(), entry.getValue().get()); }
// 对每个DC,复制尚未同步的变更 for (var entry : dcConnections.entrySet()) { String dc = entry.getKey(); DCConnection connection = entry.getValue(); long currentProgress = progress.get(dc);
if (currentProgress < replicationIndex.get()) { // 查找需要复制的变更 List<ReplicationEntry> changes = getChangesSince(currentProgress, replicationIndex.get());
if (!changes.isEmpty()) { connection.replicateChanges(changes) .thenAccept(lastIndex -> { if (lastIndex > currentProgress) { // 更新复制进度 dcReplicationProgress.get(dc).updateAndGet( current -> Math.max(current, lastIndex)); logger.info("Replicated changes to DC {} up to index {}", dc, lastIndex); } }) .exceptionally(e -> { logger.error("Failed to replicate changes to DC {}", dc, e); return null; }); } } } } catch (Exception e) { logger.error("Error in replication task", e); } }
// 检查数据中心健康状态 private void checkDCHealth() { for (var entry : dcConnections.entrySet()) { String dc = entry.getKey(); DCConnection connection = entry.getValue();
connection.checkHealth() .thenAccept(healthy -> { if (healthy) { if (connection.markHealthy()) { logger.info("DC {} is now healthy", dc); } } else { if (connection.markUnhealthy()) { logger.warn("DC {} is now unhealthy", dc); } } }) .exceptionally(e -> { logger.error("Error checking health of DC {}", dc, e); connection.markUnhealthy(); return null; }); } }
// 获取指定范围内的变更 private List<ReplicationEntry> getChangesSince(long fromIndex, long toIndex) { // 实际实现应从日志存储中检索变更 List<ReplicationEntry> changes = new ArrayList<>();
// 简化示例 for (long i = fromIndex + 1; i <= toIndex; i++) { // 模拟获取变更 changes.add(new ReplicationEntry(i, null)); }
return changes; }
@Override public void close() { // 关闭调度器 scheduler.shutdownNow(); try { if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { logger.warn("Scheduler did not terminate in time"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted while waiting for scheduler termination"); }
// 关闭所有DC连接 for (DCConnection connection : dcConnections.values()) { connection.close(); } }
// 数据中心连接类 private class DCConnection implements AutoCloseable { private final String dcId; private final DCConnectionConfig config; private final AtomicBoolean healthy = new AtomicBoolean(true); private final NetworkClient networkClient;
public DCConnection(String dcId, DCConnectionConfig config) { this.dcId = dcId; this.config = config; this.networkClient = createNetworkClient(); }
private NetworkClient createNetworkClient() { // 创建用于跨DC通信的网络客户端 // 简化示例 return null; }
public CompletableFuture<Boolean> replicateRequest(Request request, long index) { if (!healthy.get()) { return CompletableFuture.completedFuture(false); }
// 实际实现中,将请求发送到目标DC return CompletableFuture.completedFuture(true); }
public CompletableFuture<Long> replicateChanges(List<ReplicationEntry> changes) { if (!healthy.get() || changes.isEmpty()) { return CompletableFuture.completedFuture(0L); }
// 实际实现中,将变更批量发送到目标DC long lastIndex = changes.get(changes.size() - 1).getIndex(); return CompletableFuture.completedFuture(lastIndex); }
public CompletableFuture<Boolean> checkHealth() { // 实际实现中,执行健康检查 return CompletableFuture.completedFuture(true); }
public boolean markHealthy() { return healthy.compareAndSet(false, true); }
public boolean markUnhealthy() { return healthy.compareAndSet(true, false); }
@Override public void close() { // 关闭网络客户端 } }
    // A single replicated change
    private static class ReplicationEntry {
        private final long index;
        private final byte[] data;

        public ReplicationEntry(long index, byte[] data) {
            this.index = index;
            this.data = data != null ? data.clone() : null;
        }

        public long getIndex() {
            return index;
        }

        public byte[] getData() {
            return data != null ? data.clone() : null;
        }
    }

    // Data-center connection configuration
    public static class DCConnectionConfig {
        private final String primaryEndpoint;
        private final List<String> backupEndpoints;
        private final int connectTimeoutMs;
        private final int readTimeoutMs;

        public DCConnectionConfig(String primaryEndpoint, List<String> backupEndpoints,
                                  int connectTimeoutMs, int readTimeoutMs) {
            this.primaryEndpoint = primaryEndpoint;
            this.backupEndpoints = new ArrayList<>(backupEndpoints);
            this.connectTimeoutMs = connectTimeoutMs;
            this.readTimeoutMs = readTimeoutMs;
        }

        // Getters...
    }

    // Replication strategy
    public enum ReplicationStrategy {
        ALL,    // every DC must succeed
        QUORUM, // a majority of DCs must succeed
        ANY,    // at least one DC must succeed
        ASYNC   // replicate asynchronously, do not wait for results
    }
}
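The ReplicationStrategy values above only name the policies. As a minimal, hypothetical sketch (the helper below is illustrative and not part of the class itself), a coordinator might translate a strategy into the number of data-center acknowledgments it must wait for before declaring a write replicated:

// Hypothetical helper, written as if it lived next to ReplicationStrategy;
// names and placement are illustrative assumptions, not part of the class above.
static int requiredAcks(ReplicationStrategy strategy, int dcCount) {
    switch (strategy) {
        case ALL:    return dcCount;          // every DC must confirm
        case QUORUM: return dcCount / 2 + 1;  // strict majority of DCs
        case ANY:    return 1;                // first success wins
        case ASYNC:  return 0;                // fire-and-forget
        default:     throw new IllegalArgumentException("Unknown strategy: " + strategy);
    }
}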

5. ZAB and Paxos: Similarities and Differences

Similarities


What the two approaches share:


  1. Majority (quorum) mechanism: both require acknowledgments from more than half of the nodes to guarantee safety and prevent split-brain

  2. Phased operation: both reach agreement through two main phases, a prepare phase and a commit/accept phase

  3. Safety guarantee: neither ever enters a state where data is inconsistent, under any circumstances

  4. Fault tolerance: both keep working when a minority of nodes fail

  5. Handling of network partitions: both preserve safety under partition, preferring to stop serving over breaking consistency
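To make the shared majority rule concrete, here is a minimal, self-contained sketch (the class name and shape are illustrative assumptions, not taken from any particular implementation): a coordinator counts acknowledgments and treats a proposal as safe once a strict majority of the cluster has confirmed it.

import java.util.HashSet;
import java.util.Set;

/** Minimal sketch of the majority (quorum) rule shared by ZAB and Paxos. */
public final class QuorumTracker {
    private final int clusterSize;
    private final Set<String> acks = new HashSet<>();

    public QuorumTracker(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    /** Records an acknowledgment; returns true once a strict majority has acked. */
    public synchronized boolean ack(String serverId) {
        acks.add(serverId);
        return acks.size() > clusterSize / 2;   // e.g. 2 of 3, 3 of 5
    }
}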

Differences


Key differences:


  1. Design goals

  - ZAB: a state-machine replication protocol purpose-built for ZooKeeper, emphasizing replication and ordering of the system as a whole

  - Paxos: a general-purpose distributed consensus algorithm, focused on deciding a single value

  2. Leader roles

  - ZAB: an explicit Leader-Follower architecture with centralized processing

  - Basic Paxos: symmetric roles in the original design, with no fixed leader

  - Multi-Paxos: introduces a leader as an optimization while keeping the roles symmetric in theory

  3. Message ordering

  - ZAB: guarantees strict FIFO processing, using the ZXID (epoch + counter) to impose a global order

  - Basic Paxos: no ordering guarantee; it only settles a single value

  - Multi-Paxos: can order values by instance ID, but needs extra machinery to do so

  4. Recovery mechanism

  - ZAB: a dedicated crash-recovery mode with election, discovery, synchronization, and activation phases

  - Paxos: recovers through the regular algorithm flow, with no special recovery mode

  5. Transaction identifiers

  - ZAB: uses the ZXID (epoch + counter) as the globally unique identifier

  - Paxos: uses a ballot number to identify proposals and an instance ID to identify positions
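The ZXID deserves a closer look, since it underpins both ZAB's ordering and its recovery rules. In ZooKeeper the ZXID is a 64-bit value whose high 32 bits hold the leader epoch and whose low 32 bits hold a per-epoch counter; a small sketch of that layout (the class name is illustrative):

/**
 * Sketch of the ZXID layout used by ZAB: a 64-bit id whose high 32 bits
 * carry the leader epoch and whose low 32 bits carry a per-epoch counter.
 * Comparing two ZXIDs as plain longs therefore orders first by epoch,
 * then by counter, which is exactly the global order ZAB relies on.
 */
public final class Zxid {
    public static long make(int epoch, int counter) {
        return ((long) epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    public static int epochOf(long zxid) {
        return (int) (zxid >>> 32);
    }

    public static int counterOf(long zxid) {
        return (int) (zxid & 0xFFFFFFFFL);
    }
}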

6. Performance Comparison and Engineering Practice

Performance comparison


Horizontal scalability

How performance changes as the cluster grows: write throughput degrades, because every commit must reach a larger quorum and the leader is more likely to wait on a slow follower, while read throughput scales roughly linearly when followers serve reads locally.
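As an illustrative calculation (not a benchmark), the quorum arithmetic alone shows why writes get more expensive as nodes are added:

/**
 * Back-of-the-envelope quorum cost as the cluster grows: with n = 2f + 1
 * nodes the leader needs floor(n / 2) + 1 acknowledgments per commit, so
 * each added pair of nodes raises the per-commit messaging and the chance
 * of waiting on a slow follower.
 */
public class QuorumCost {
    public static void main(String[] args) {
        for (int n : new int[]{3, 5, 7, 9}) {
            int quorum = n / 2 + 1;
            int proposals = n - 1;   // leader sends one proposal per follower
            System.out.printf("n=%d quorum=%d proposals per commit=%d%n",
                    n, quorum, proposals);
        }
    }
}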


JVM tuning recommendations

/**
 * Recommended JVM settings:
 * -Xms4g -Xmx4g                          // fix the heap size to avoid resizing
 * -XX:+UseG1GC                           // use the G1 garbage collector
 * -XX:MaxGCPauseMillis=200               // target maximum GC pause
 * -XX:InitiatingHeapOccupancyPercent=45  // occupancy threshold that starts a GC cycle
 * -XX:+AlwaysPreTouch                    // pre-touch memory pages at startup
 * -XX:+DisableExplicitGC                 // ignore explicit System.gc() calls
 * -XX:+HeapDumpOnOutOfMemoryError        // dump the heap on OOM
 * -XX:HeapDumpPath=/path/to/dumps        // heap dump location
 * -XX:+UseCompressedOops                 // compressed object pointers
 * -XX:+UseCompressedClassPointers        // compressed class pointers
 * -Djava.net.preferIPv4Stack=true        // prefer the IPv4 stack
 */

Selection guidance

As a rule of thumb: pick ZooKeeper (and with it ZAB) when you want a proven coordination service out of the box, and pick a Paxos/Multi-Paxos variant when you are building a custom replicated state machine or log and control the whole stack.

Engineering best practices

  1. Set timeouts sensibly

  - Timeouts that are too short trigger unnecessary elections

  - Timeouts that are too long stretch out failure recovery

  - Adjust them dynamically to match the network environment

  2. Batch requests (see the sketch after this list)

  - Merge multiple write requests into one batch

  - Cut the number of network round trips

  - Raise overall throughput

  3. Separate reads from writes

  - Route write requests through the Leader

  - Serve reads locally where the consistency requirement allows

  - Use a read cache to reduce disk I/O

  4. Monitor the key metrics

  - Commit latency

  - Leader switch frequency

  - Request queue depth

  - Network latency and bandwidth usage

  5. Do preventive maintenance

  - Compact logs regularly

  - Take snapshots

  - Watch disk space

  - Run failure drills to exercise the recovery path
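A minimal sketch of the batching idea from item 2, assuming the caller supplies the function that actually proposes a batch (all names here are illustrative, not from the ZABBroadcast code): writes queue up, and a single flusher drains them into one batch, trading a few milliseconds of latency for far fewer network round trips.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Illustrative write batcher: one proposal round trip per drained batch. */
public class WriteBatcher implements AutoCloseable {
    private final BlockingQueue<byte[]> pending = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService flusher =
            Executors.newSingleThreadScheduledExecutor();

    public WriteBatcher(int flushIntervalMs, Consumer<List<byte[]>> sink) {
        flusher.scheduleAtFixedRate(() -> {
            List<byte[]> batch = new ArrayList<>();
            pending.drainTo(batch, 100);          // cap the batch size at 100
            if (!batch.isEmpty()) {
                sink.accept(batch);               // one round trip per batch
            }
        }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    public void submit(byte[] write) {
        pending.add(write);
    }

    @Override
    public void close() {
        flusher.shutdownNow();
    }
}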

7. Unit Test Examples

@RunWith(MockitoJUnitRunner.class)
public class ZABBroadcastTest {
    private ZABBroadcast zabBroadcast;
    private AtomicLong zxid;
    private AtomicInteger epoch;

    @Mock
    private NetworkClient mockNetworkClient;

    @Mock
    private StateMachine mockStateMachine;

    @Before
    public void setUp() {
        zxid = new AtomicLong(0);
        epoch = new AtomicInteger(0);

        zabBroadcast = new ZABBroadcast("server1", zxid, epoch,
                mockNetworkClient, mockStateMachine);

        // Add followers
        ServerData follower1 = new ServerData("server2", "localhost", 8001);
        ServerData follower2 = new ServerData("server3", "localhost", 8002);
        zabBroadcast.addFollower(follower1);
        zabBroadcast.addFollower(follower2);
    }

    @After
    public void tearDown() {
        zabBroadcast.close();
    }
    @Test
    public void testProcessWriteSuccess() throws Exception {
        // Stub the mock: both followers ack the proposal
        ACK successAck = new ACK(true, zxid.get() + 1);
        when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class)))
            .thenReturn(successAck);

        // Run
        Request request = new Request("req1", "test data".getBytes());
        CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);

        // Verify the result
        assertTrue(result.get(1, TimeUnit.SECONDS));

        // Verify the interactions
        verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class));
        verify(mockNetworkClient, times(2)).sendCommit(anyString(), any(CommitPacket.class));
    }
    @Test
    public void testProcessWriteFailure() throws Exception {
        // Stub the mock: one follower acks, the other rejects
        when(mockNetworkClient.sendProposal(eq("server2"), any(ProposalPacket.class)))
            .thenReturn(new ACK(true, zxid.get() + 1));
        when(mockNetworkClient.sendProposal(eq("server3"), any(ProposalPacket.class)))
            .thenReturn(new ACK(false, zxid.get()));

        // Run
        Request request = new Request("req1", "test data".getBytes());
        CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);

        // Verify the result: the write should fail without a majority of acks
        assertFalse(result.get(1, TimeUnit.SECONDS));

        // Verify the interactions: no commit should be sent
        verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class));
        verify(mockNetworkClient, never()).sendCommit(anyString(), any(CommitPacket.class));
    }
    @Test
    public void testBatchWritePerformance() throws Exception {
        // Stub the mock
        ACK successAck = new ACK(true, zxid.get() + 1);
        when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class)))
            .thenReturn(successAck);

        // Prepare a batch of requests
        List<Request> requests = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            requests.add(new Request("req" + i, ("data" + i).getBytes()));
        }

        // Run
        Stopwatch stopwatch = Stopwatch.createStarted();
        CompletableFuture<Map<String, Boolean>> result = zabBroadcast.processBatchWrite(requests);
        Map<String, Boolean> results = result.get(5, TimeUnit.SECONDS);
        stopwatch.stop();

        // Verify the results
        assertEquals(100, results.size());
        assertTrue(results.values().stream().allMatch(v -> v));

        // Print the timing
        System.out.println("Batch write of 100 requests took "
                + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");

        // Verify the interactions: one round trip per follower for the whole batch
        verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class));
        verify(mockNetworkClient, times(2)).sendCommit(anyString(), any(CommitPacket.class));
    }
    @Test
    public void testCircuitBreakerTrip() throws Exception {
        // Stub the mock: proposals always fail
        when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class)))
            .thenReturn(new ACK(false, zxid.get()));

        // Issue enough failing requests to trip the circuit breaker
        Request request = new Request("req1", "test data".getBytes());
        for (int i = 0; i < 5; i++) {
            try {
                CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);
                result.get(1, TimeUnit.SECONDS);
            } catch (Exception e) {
                // Ignore the expected failures
            }
        }

        // The 6th request should be rejected outright by the open breaker
        try {
            CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);
            result.get(1, TimeUnit.SECONDS);
            fail("Should have thrown CircuitBreakerOpenException");
        } catch (ExecutionException e) {
            assertTrue(e.getCause() instanceof ProcessingException);
            assertTrue(e.getCause().getCause() instanceof ZABBroadcast.CircuitBreakerOpenException);
        }
    }
    @Test
    public void testReadWithConsistencyLevels() throws Exception {
        // Test reads at different consistency levels.
        // sendHeartbeat is void, so stub it with doNothing() rather than thenReturn()
        doNothing().when(mockNetworkClient).sendHeartbeat(anyString(), anyLong());

        // Linearizable read
        CompletableFuture<Result> linearResult =
                zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.LINEARIZABLE);

        // Sequential read
        CompletableFuture<Result> sequentialResult =
                zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.SEQUENTIAL);

        // Eventual read
        CompletableFuture<Result> eventualResult =
                zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.EVENTUAL);

        // All three reads should complete successfully
        assertNotNull(linearResult.get(1, TimeUnit.SECONDS));
        assertNotNull(sequentialResult.get(1, TimeUnit.SECONDS));
        assertNotNull(eventualResult.get(1, TimeUnit.SECONDS));
    }
}

8. Client API Examples

public class DistributedSystemClient implements AutoCloseable {
    private final ZabClient zabClient;
    private final PaxosClient paxosClient;
    private final Logger logger = LoggerFactory.getLogger(DistributedSystemClient.class);

    public DistributedSystemClient(String zkConnectString, String paxosConnectString) {
        this.zabClient = new ZabClient(zkConnectString);
        this.paxosClient = new PaxosClient(paxosConnectString);
    }
    // ZAB client example: a configuration service
    public class ZabClient implements AutoCloseable {
        private final String connectString;
        private final CuratorFramework client;

        public ZabClient(String connectString) {
            this.connectString = connectString;
            this.client = CuratorFramework.builder()
                .connectString(connectString)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
            this.client.start();
        }

        // Store a configuration value
        public void storeConfig(String path, String data) throws Exception {
            try {
                // Check whether the path exists
                if (client.checkExists().forPath(path) == null) {
                    client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(path, data.getBytes(StandardCharsets.UTF_8));
                    logger.info("Created config at path: {}", path);
                } else {
                    client.setData()
                        .forPath(path, data.getBytes(StandardCharsets.UTF_8));
                    logger.info("Updated config at path: {}", path);
                }
            } catch (Exception e) {
                logger.error("Failed to store config at path: {}", path, e);
                throw e;
            }
        }

        // Read a configuration value
        public String getConfig(String path) throws Exception {
            try {
                byte[] data = client.getData().forPath(path);
                return new String(data, StandardCharsets.UTF_8);
            } catch (Exception e) {
                logger.error("Failed to read config from path: {}", path, e);
                throw e;
            }
        }

        // Watch for configuration changes (ZooKeeper watches are one-shot, so re-register)
        public void watchConfig(String path, Consumer<String> changeCallback) throws Exception {
            try {
                // Set the watcher
                client.getData().usingWatcher(new CuratorWatcher() {
                    @Override
                    public void process(WatchedEvent event) throws Exception {
                        if (event.getType() == EventType.NodeDataChanged) {
                            String newData = getConfig(path);
                            changeCallback.accept(newData);
                            // Re-register the watch
                            watchConfig(path, changeCallback);
                        }
                    }
                }).forPath(path);

                logger.info("Set watch on path: {}", path);
            } catch (Exception e) {
                logger.error("Failed to set watch on path: {}", path, e);
                throw e;
            }
        }

        // Distributed lock
        public DistributedLock getLock(String lockPath) {
            return new DistributedLock(client, lockPath);
        }

        @Override
        public void close() {
            client.close();
        }

        // Distributed lock implementation
        public class DistributedLock {
            private final InterProcessMutex mutex;
            private final String lockPath;

            public DistributedLock(CuratorFramework client, String lockPath) {
                this.lockPath = lockPath;
                this.mutex = new InterProcessMutex(client, lockPath);
            }

            public void lock(long timeout, TimeUnit unit) throws Exception {
                if (mutex.acquire(timeout, unit)) {
                    logger.info("Acquired lock: {}", lockPath);
                } else {
                    logger.warn("Failed to acquire lock: {} within timeout", lockPath);
                    throw new TimeoutException("Failed to acquire lock: " + lockPath);
                }
            }

            public void unlock() {
                try {
                    mutex.release();
                    logger.info("Released lock: {}", lockPath);
                } catch (Exception e) {
                    logger.error("Error releasing lock: {}", lockPath, e);
                }
            }
        }
    }
    // Paxos client example: a distributed KV store
    public class PaxosClient implements AutoCloseable {
        private final String connectString;
        private final PaxosKVStore kvStore;

        public PaxosClient(String connectString) {
            this.connectString = connectString;
            this.kvStore = new PaxosKVStore(connectString);
        }

        // Write a key-value pair
        public CompletableFuture<Boolean> put(String key, String value, ConsistencyLevel consistencyLevel) {
            return kvStore.put(key, value, consistencyLevel);
        }

        // Read a value
        public CompletableFuture<String> get(String key, ConsistencyLevel consistencyLevel) {
            return kvStore.get(key, consistencyLevel);
        }

        // Delete a key
        public CompletableFuture<Boolean> delete(String key) {
            return kvStore.delete(key);
        }

        @Override
        public void close() {
            kvStore.close();
        }

        // Paxos KV store implementation
        private class PaxosKVStore implements AutoCloseable {
            private final PaxosClient client;

            public PaxosKVStore(String connectString) {
                // A real implementation would connect to the Paxos cluster
                this.client = null; // simplified example
            }

            public CompletableFuture<Boolean> put(String key, String value, ConsistencyLevel consistencyLevel) {
                // A real implementation would submit the write through the Paxos protocol
                logger.info("Putting key: {} with consistency: {}", key, consistencyLevel);
                return CompletableFuture.completedFuture(true);
            }

            public CompletableFuture<String> get(String key, ConsistencyLevel consistencyLevel) {
                // A real implementation would pick a read strategy per consistency level
                logger.info("Getting key: {} with consistency: {}", key, consistencyLevel);
                return CompletableFuture.completedFuture("value");
            }

            public CompletableFuture<Boolean> delete(String key) {
                // Deletes are writes too, submitted through the Paxos protocol
                logger.info("Deleting key: {}", key);
                return CompletableFuture.completedFuture(true);
            }

            @Override
            public void close() {
                // Release resources
            }
        }
    }
    // Usage example
    public void runExample() throws Exception {
        // ZAB client usage
        try (ZabClient zab = new ZabClient("localhost:2181")) {
            // Store a config value
            zab.storeConfig("/app/config", "{\"timeout\": 30, \"maxRetries\": 3}");

            // Read it back
            String config = zab.getConfig("/app/config");
            System.out.println("Config: " + config);

            // Watch for changes
            zab.watchConfig("/app/config", newConfig -> {
                System.out.println("Config changed: " + newConfig);
            });

            // Use the distributed lock
            ZabClient.DistributedLock lock = zab.getLock("/app/locks/resource1");
            try {
                lock.lock(10, TimeUnit.SECONDS);
                // Critical section
                System.out.println("Performing critical operation...");
                Thread.sleep(1000);
            } finally {
                lock.unlock();
            }
        }

        // Paxos client usage
        try (PaxosClient paxos = new PaxosClient("localhost:8000,localhost:8001,localhost:8002")) {
            // Write
            paxos.put("user:1001", "{\"name\":\"John\",\"email\":\"john@example.com\"}",
                    ConsistencyLevel.LINEARIZABLE)
                .thenAccept(success -> {
                    System.out.println("Write success: " + success);
                })
                .join();

            // Read
            paxos.get("user:1001", ConsistencyLevel.SEQUENTIAL)
                .thenAccept(value -> {
                    System.out.println("User data: " + value);
                })
                .join();

            // Delete
            paxos.delete("user:1001")
                .thenAccept(success -> {
                    System.out.println("Delete success: " + success);
                })
                .join();
        }
    }
    @Override
    public void close() throws Exception {
        zabClient.close();
        paxosClient.close();
    }
}
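One practical note on the watchConfig method above: ZooKeeper watches are one-shot, and the manual re-registration shown there is easy to get wrong under races. Curator ships recipes that keep a node watched for you; a small sketch using NodeCache (deprecated in Curator 5.x in favor of CuratorCache, but illustrative of the pattern; the helper class name is an assumption):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Watch helper using Curator's NodeCache, which re-registers watches itself. */
public final class ConfigWatch {
    public static NodeCache watch(CuratorFramework client, String path,
                                  Consumer<String> callback) throws Exception {
        NodeCache cache = new NodeCache(client, path);
        cache.getListenable().addListener(() -> {
            // Fires on every change to the node; currentData is null after deletion
            if (cache.getCurrentData() != null) {
                callback.accept(new String(cache.getCurrentData().getData(),
                        StandardCharsets.UTF_8));
            }
        });
        cache.start();
        return cache;   // caller should close() it when done
    }
}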

9. Summary

ZAB and Paxos are both excellent distributed consistency algorithms, and they sit at the core of modern distributed system design. Understanding how they work, how they are implemented, and where each one fits is essential for building reliable distributed systems.


Whichever algorithm you choose, the decision must be weighed against the concrete application scenario, the consistency requirements, and the performance targets. With the engineering practices and optimization techniques shown in this article, developers can build high-performance, highly reliable distributed systems that meet the needs of complex business scenarios.
