ZAB 与 Paxos:分布式一致性算法的工程实践与深度对比
- 2025-06-11 吉林
本文字数:91667 字
阅读完需:约 301 分钟

本文的示例代码基于 Java 11+ 实现。
构建可靠的分布式系统时,一致性问题是核心挑战之一。ZooKeeper 的 ZAB 协议和 Paxos 算法作为两种主流解决方案,在理论基础和工程实现上各有特点。本文深入分析它们的实现机制、性能特性和最佳实践。
一、基本概念
ZAB 协议
ZAB (ZooKeeper Atomic Broadcast) 是专为 ZooKeeper 设计的分布式一致性协议,核心目标是保证分布式系统中数据更新的原子性和顺序一致性。
Paxos 算法
Paxos 是 Leslie Lamport 提出的通用分布式一致性算法,是众多分布式系统的理论基础,解决的是在不可靠网络中如何达成共识的问题。
二、ZAB 协议实现
ZAB 协议工作在两种模式下:
恢复模式:系统启动或 Leader 崩溃时触发
广播模式:正常运行时处理写请求
核心接口定义
public interface ZabProcessor { // 恢复模式接口 boolean startRecovery() throws RecoveryException;
// 广播模式接口 CompletableFuture<Boolean> processWrite(Request request); CompletableFuture<Result> processRead(String key, ConsistencyLevel level);
// 状态查询接口 boolean isLeader(); long getCurrentEpoch();}
public interface NetworkClient { // 基础网络通信接口 void connect(String serverId, String address, int port) throws IOException; void disconnect(String serverId);
// ZAB协议消息 ACK sendProposal(String serverId, ProposalPacket proposal) throws IOException; void sendCommit(String serverId, CommitPacket commit) throws IOException; LastZxidResponse sendEpochRequest(String serverId, EpochPacket epochPkt) throws IOException; boolean sendTruncate(String serverId, TruncatePacket truncPkt) throws IOException; boolean sendTransactions(String serverId, List<Transaction> txns) throws IOException; boolean sendNewLeader(String serverId, NewLeaderPacket newLeaderPkt) throws IOException; void sendHeartbeat(String serverId, long zxid) throws IOException; void sendSnapshot(String serverId, byte[] snapshot, long zxid) throws IOException;}
/**
 * Replicated state machine driven by the protocol: applies commands in zxid order
 * and supports snapshot/restore.
 */
public interface StateMachine {

    /** Applies the command associated with the given zxid. */
    void apply(long zxid, byte[] command) throws Exception;

    /** @return the zxid of the last applied command */
    long getLastAppliedZxid();

    /** @return a serialized snapshot of the current state */
    byte[] takeSnapshot() throws Exception;

    /** Replaces the current state with the given snapshot, tagged with {@code zxid}. */
    void restoreSnapshot(byte[] snapshot, long zxid) throws Exception;
}
ZAB 恢复模式实现
public class ZABRecovery { private final AtomicLong zxid = new AtomicLong(0); private final AtomicInteger epoch = new AtomicInteger(0); private volatile ServerState state = ServerState.LOOKING; private final Logger logger = LoggerFactory.getLogger(ZABRecovery.class); private final ConcurrentMap<String, ServerData> serverDataMap; private final int quorumSize; private final NetworkClient networkClient; private final StateMachine stateMachine; private final String serverId;
// 构造函数 public ZABRecovery(String serverId, int quorumSize, NetworkClient networkClient, StateMachine stateMachine) { this.serverId = serverId; this.quorumSize = quorumSize; this.networkClient = networkClient; this.stateMachine = stateMachine; this.serverDataMap = new ConcurrentHashMap<>(); }
// Leader恢复流程 public boolean startRecovery() throws RecoveryException { MDC.put("component", "zab-recovery"); MDC.put("serverId", serverId); try { // 1. 更新选举轮次 int newEpoch = epoch.incrementAndGet(); logger.info("Starting recovery with epoch: {}", newEpoch);
// 2. 发现阶段:收集所有Follower状态 Map<Long, Set<String>> commitMap = discoverFollowerStates();
// 3. 确定截断点和提交点 long truncateZxid = determineMaxCommittedZxid(commitMap); logger.info("Determined truncate zxid: {}", Long.toHexString(truncateZxid));
// 4. 解决可能的冲突(脑裂后) resolveConflictsAfterPartition(truncateZxid, commitMap);
// 5. 同步阶段:将历史事务同步给Follower syncFollowers(truncateZxid);
// 6. 切换到广播模式 state = ServerState.LEADING; logger.info("Recovery completed, switching to broadcast mode"); return true; } catch (IOException e) { logger.error("Recovery failed due to I/O error", e); state = ServerState.LOOKING; throw new RecoveryException("I/O error during recovery", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Recovery interrupted", e); state = ServerState.LOOKING; throw new RecoveryException("Recovery process interrupted", e); } catch (Exception e) { logger.error("Unexpected error during recovery", e); state = ServerState.LOOKING; throw new RecoveryException("Unexpected error during recovery", e); } finally { MDC.remove("component"); MDC.remove("serverId"); } }
// 发现阶段:收集所有Follower的最新事务信息 private Map<Long, Set<String>> discoverFollowerStates() throws IOException, InterruptedException { Map<Long, Set<String>> acceptedZxids = new ConcurrentHashMap<>(); CountDownLatch latch = new CountDownLatch(serverDataMap.size()); List<CompletableFuture<?>> futures = new ArrayList<>();
// 向所有Follower发送CEPOCH消息 for (var entry : serverDataMap.entrySet()) { final String targetServerId = entry.getKey(); final ServerData serverData = entry.getValue();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { MDC.put("targetServerId", targetServerId); try { // 发送新的epoch EpochPacket epochPkt = new EpochPacket(epoch.get()); LastZxidResponse response = networkClient.sendEpochRequest( targetServerId, epochPkt);
// 记录该服务器的最新zxid synchronized (acceptedZxids) { acceptedZxids.computeIfAbsent(response.getLastZxid(), k -> new HashSet<>()) .add(targetServerId); }
logger.info("Server {} last zxid: {}", targetServerId, Long.toHexString(response.getLastZxid())); } catch (IOException e) { logger.error("Failed to discover state from server: {}", targetServerId, e); } finally { MDC.remove("targetServerId"); latch.countDown(); } });
futures.add(future); }
// 等待大多数响应或超时 if (!latch.await(10, TimeUnit.SECONDS)) { logger.warn("Discovery phase timed out, proceeding with available responses"); }
// 取消未完成的任务 for (CompletableFuture<?> future : futures) { if (!future.isDone()) { future.cancel(true); } }
return acceptedZxids; }
// 确定需要保留的最大已提交事务ID private long determineMaxCommittedZxid(Map<Long, Set<String>> commitMap) { // 寻找被多数派确认的最大ZXID long maxZxid = 0; int quorum = getQuorum();
for (var entry : commitMap.entrySet()) { if (entry.getValue().size() >= quorum && entry.getKey() > maxZxid) { maxZxid = entry.getKey(); } } return maxZxid; }
// 解决网络分区后可能的数据冲突 private void resolveConflictsAfterPartition(long truncateZxid, Map<Long, Set<String>> commitMap) { logger.info("Checking for potential conflicts after network partition");
// 1. 识别潜在冲突事务 - 那些不在多数派中的更高zxid List<ConflictingTransaction> conflicts = new ArrayList<>();
for (var entry : commitMap.entrySet()) { long txnZxid = entry.getKey(); Set<String> servers = entry.getValue();
// 如果zxid大于已确定的截断点,但不是多数派确认的 if (txnZxid > truncateZxid && servers.size() < getQuorum()) { // 获取事务的epoch int txnEpoch = ZxidUtils.getEpochFromZxid(txnZxid); int truncateEpoch = ZxidUtils.getEpochFromZxid(truncateZxid);
conflicts.add(new ConflictingTransaction(txnZxid, truncateZxid, txnEpoch, truncateEpoch, servers)); } }
// 2. 处理冲突 if (!conflicts.isEmpty()) { logger.warn("Found {} potential conflicting transactions after partition", conflicts.size());
for (ConflictingTransaction conflict : conflicts) { if (conflict.isFromHigherEpoch()) { logger.warn("Conflict: transaction with zxid {} from higher epoch {} " + "found but not in majority. Will be discarded.", Long.toHexString(conflict.getConflictZxid()), conflict.getConflictEpoch()); } else { logger.warn("Conflict: transaction with zxid {} from same epoch {} " + "found but not in majority. Will be discarded.", Long.toHexString(conflict.getConflictZxid()), conflict.getConflictEpoch()); }
// 通知这些服务器截断这些事务 notifyServersToTruncate(conflict.getServers(), truncateZxid); } } else { logger.info("No conflicting transactions found"); } }
// 通知服务器截断超出安全点的事务 private void notifyServersToTruncate(Set<String> servers, long truncateZxid) { for (String serverId : servers) { CompletableFuture.runAsync(() -> { try { TruncatePacket truncPkt = new TruncatePacket(truncateZxid); boolean success = networkClient.sendTruncate(serverId, truncPkt); if (success) { logger.info("Successfully notified server {} to truncate to zxid {}", serverId, Long.toHexString(truncateZxid)); } else { logger.warn("Failed to notify server {} to truncate", serverId); } } catch (IOException e) { logger.error("Error notifying server {} to truncate", serverId, e); } }); } }
// 同步阶段:将历史事务同步给Follower private void syncFollowers(long truncateZxid) throws IOException, InterruptedException { // 获取从truncateZxid开始的所有事务 List<Transaction> txns = loadTransactionsFromLog(truncateZxid); logger.info("Syncing {} transactions to followers", txns.size());
// 并行同步给所有Follower CountDownLatch syncLatch = new CountDownLatch(serverDataMap.size()); AtomicInteger successCount = new AtomicInteger(0); List<CompletableFuture<?>> futures = new ArrayList<>();
for (var entry : serverDataMap.entrySet()) { final String targetServerId = entry.getKey(); final ServerData serverData = entry.getValue();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { MDC.put("targetServerId", targetServerId); try { // 检查Follower是否需要使用快照追赶 long followerZxid = serverData.getLastZxid(); if (truncateZxid - followerZxid > SNAPSHOT_THRESHOLD) { syncFollowerWithSnapshot(targetServerId, followerZxid); } else { // 1. 发送TRUNC命令,通知Follower截断日志 TruncatePacket truncPkt = new TruncatePacket(truncateZxid); if (networkClient.sendTruncate(targetServerId, truncPkt)) { // 2. 发送DIFF命令,同步缺失的事务 if (networkClient.sendTransactions(targetServerId, txns)) { // 3. 发送NEWLEADER命令,确认同步完成 NewLeaderPacket newLeaderPkt = new NewLeaderPacket(epoch.get()); if (networkClient.sendNewLeader(targetServerId, newLeaderPkt)) { // 同步成功 successCount.incrementAndGet(); logger.info("Successfully synced server: {}", targetServerId); } } } } } catch (IOException e) { logger.error("Failed to sync server {} with {} transactions, last zxid: {}", targetServerId, txns.size(), Long.toHexString(truncateZxid), e); } finally { MDC.remove("targetServerId"); syncLatch.countDown(); } });
futures.add(future); }
// 等待同步完成或超时 if (!syncLatch.await(30, TimeUnit.SECONDS)) { logger.warn("Sync phase timed out"); }
// 取消未完成的任务 for (CompletableFuture<?> future : futures) { if (!future.isDone()) { future.cancel(true); } }
// 检查是否有足够的服务器同步成功 if (successCount.get() < quorumSize) { throw new QuorumNotFoundException("Failed to sync with quorum of followers", successCount.get(), quorumSize); } }
// 使用快照同步落后太多的Follower private void syncFollowerWithSnapshot(String followerId, long followerZxid) throws IOException { try { logger.info("Follower {} is too far behind (zxid: {}), syncing with snapshot", followerId, Long.toHexString(followerZxid));
// 1. 获取当前状态快照 byte[] snapshot = stateMachine.takeSnapshot();
// 2. 发送快照给Follower networkClient.sendSnapshot(followerId, snapshot, zxid.get());
logger.info("Successfully sent snapshot to follower: {}", followerId); } catch (Exception e) { logger.error("Failed to sync follower {} with snapshot", followerId, e); throw new IOException("Snapshot sync failed", e); } }
// 从事务日志加载事务 private List<Transaction> loadTransactionsFromLog(long fromZxid) throws IOException { List<Transaction> result = new ArrayList<>(); // 实际实现会从持久化存储读取事务记录 logger.info("Loading transactions starting from zxid: {}", Long.toHexString(fromZxid)); return result; }
private int getQuorum() { return quorumSize / 2 + 1; }
// 常量定义 private static final long SNAPSHOT_THRESHOLD = 100000; // 事务差距超过10万时使用快照
// 冲突事务数据结构 static class ConflictingTransaction { private final long conflictZxid; private final long truncateZxid; private final int conflictEpoch; private final int truncateEpoch; private final Set<String> servers;
public ConflictingTransaction(long conflictZxid, long truncateZxid, int conflictEpoch, int truncateEpoch, Set<String> servers) { this.conflictZxid = conflictZxid; this.truncateZxid = truncateZxid; this.conflictEpoch = conflictEpoch; this.truncateEpoch = truncateEpoch; this.servers = new HashSet<>(servers); }
public boolean isFromHigherEpoch() { return conflictEpoch > truncateEpoch; }
public long getConflictZxid() { return conflictZxid; }
public int getConflictEpoch() { return conflictEpoch; }
public Set<String> getServers() { return Collections.unmodifiableSet(servers); } }
// 其他内部类定义...
enum ServerState { LOOKING, // 寻找Leader FOLLOWING, // Follower角色 LEADING // Leader角色 }}
ZAB 广播模式实现
public class ZABBroadcast implements AutoCloseable { private final AtomicLong zxid; private final AtomicInteger epoch; private final ConcurrentMap<String, ServerData> followers; private final Logger logger = LoggerFactory.getLogger(ZABBroadcast.class); private final CircuitBreaker circuitBreaker; private final NetworkClient networkClient; private final StateMachine stateMachine; private final String serverId; private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final ScheduledExecutorService scheduler; private final MetricsCollector metrics; private final RateLimiter heartbeatLogLimiter = RateLimiter.create(0.1); // 每10秒最多一条日志
/**
 * Wires the broadcaster's collaborators and starts the periodic heartbeat task
 * (first run after 500 ms, then 500 ms after each run completes).
 *
 * @param serverId      identifier of this server; also used in scheduler thread names
 * @param zxid          zxid counter held by this instance
 * @param epoch         epoch counter held by this instance
 * @param networkClient transport used to reach followers
 * @param stateMachine  local state machine
 */
public ZABBroadcast(String serverId, AtomicLong zxid, AtomicInteger epoch,
                    NetworkClient networkClient, StateMachine stateMachine) {
    this.serverId = serverId;
    this.networkClient = networkClient;
    this.stateMachine = stateMachine;
    this.zxid = zxid;
    this.epoch = epoch;

    this.followers = new ConcurrentHashMap<>();
    // Opens after 5 failures, 10 s reset timeout.
    this.circuitBreaker = new CircuitBreaker(5, 10000);
    this.metrics = new MetricsCollector("zab_broadcast");

    this.scheduler = Executors.newScheduledThreadPool(2, task -> {
        Thread worker = new Thread(task, "zab-scheduler-" + serverId);
        worker.setDaemon(true);
        return worker;
    });

    // Kick off the heartbeat loop.
    scheduler.scheduleWithFixedDelay(this::sendHeartbeats, 500, 500, TimeUnit.MILLISECONDS);
}
// 添加Follower public void addFollower(ServerData follower) { followers.put(follower.getId(), follower); }
// Leader处理写请求 public CompletableFuture<Boolean> processWrite(Request request) { Stopwatch stopwatch = Stopwatch.createStarted(); MDC.put("component", "zab-broadcast"); MDC.put("serverId", serverId); MDC.put("requestId", request.getId());
try { return GlobalExceptionHandler.withExceptionHandling( circuitBreaker.execute(() -> { try { // 1. 为请求生成zxid (高32位是epoch,低32位是计数器) long newZxid = createNewZxid(); MDC.put("zxid", Long.toHexString(newZxid)); logger.info("Processing write request: {} with zxid: {}", request.getId(), Long.toHexString(newZxid));
// 2. 将请求发送给所有Follower List<Future<ACK>> futures = sendToFollowers(request, newZxid);
// 3. 等待过半Follower的ACK if (waitForMajority(futures)) { // 4. 通知所有Follower提交事务 commit(newZxid); logger.info("Request {} committed successfully", request.getId());
// 5. 记录指标 metrics.recordSuccessfulWrite(stopwatch.elapsed(TimeUnit.MILLISECONDS)); return CompletableFuture.completedFuture(true); } else { logger.warn("Failed to get majority ACKs for request {}", request.getId()); metrics.recordFailedWrite(); return CompletableFuture.completedFuture(false); } } catch (IOException e) { logger.error("Failed to process write request: {}", request.getId(), e); metrics.recordFailedWrite(); return CompletableFuture.failedFuture( new ProcessingException("Failed to process write request", e)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted while processing write request: {}", request.getId(), e); metrics.recordFailedWrite(); return CompletableFuture.failedFuture( new ProcessingException("Interrupted during write processing", e)); } }) ); } catch (CircuitBreakerOpenException e) { logger.error("Circuit breaker is open, rejecting request: {}", request.getId()); metrics.recordRejectedWrite(); return CompletableFuture.failedFuture( new ProcessingException("Circuit breaker open, system overloaded", e)); } finally { MDC.remove("component"); MDC.remove("serverId"); MDC.remove("requestId"); MDC.remove("zxid"); } }
// 处理批量写请求,提高吞吐量 public CompletableFuture<Map<String, Boolean>> processBatchWrite(List<Request> requests) { if (requests.isEmpty()) { return CompletableFuture.completedFuture(Collections.emptyMap()); }
Stopwatch stopwatch = Stopwatch.createStarted(); MDC.put("component", "zab-broadcast"); MDC.put("serverId", serverId); MDC.put("batchSize", String.valueOf(requests.size()));
try { return GlobalExceptionHandler.withExceptionHandling( circuitBreaker.execute(() -> { Map<String, Boolean> results = new HashMap<>(); try { // 创建批处理包 BatchRequest batch = new BatchRequest(); for (Request req : requests) { batch.addRequest(req); results.put(req.getId(), false); // 默认失败 }
// 为批次生成一个zxid long batchZxid = createNewZxid(); MDC.put("zxid", Long.toHexString(batchZxid)); logger.info("Processing batch of {} requests with zxid: {}", requests.size(), Long.toHexString(batchZxid));
// 发送批处理请求给所有Follower List<Future<ACK>> futures = sendBatchToFollowers(batch, batchZxid);
// 等待多数派确认 if (waitForMajority(futures)) { // 提交批次 commitBatch(batchZxid); logger.info("Batch with {} requests committed successfully", requests.size());
// 设置所有请求结果为成功 for (Request req : requests) { results.put(req.getId(), true); }
metrics.recordSuccessfulBatchWrite( requests.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); } else { logger.warn("Failed to get majority ACKs for batch"); metrics.recordFailedBatchWrite(requests.size()); } } catch (Exception e) { logger.error("Error processing batch write of {} requests", requests.size(), e); metrics.recordFailedBatchWrite(requests.size()); } return CompletableFuture.completedFuture(results); }) ); } catch (CircuitBreakerOpenException e) { logger.error("Circuit breaker is open, rejecting batch of {} requests", requests.size()); metrics.recordRejectedBatchWrite(requests.size());
Map<String, Boolean> results = new HashMap<>(); for (Request req : requests) { results.put(req.getId(), false); } return CompletableFuture.failedFuture( new ProcessingException("Circuit breaker open, system overloaded", e)); } finally { MDC.remove("component"); MDC.remove("serverId"); MDC.remove("batchSize"); MDC.remove("zxid"); } }
// 读取操作的一致性保证 public CompletableFuture<Result> readWithConsistency(String key, ConsistencyLevel level) { Stopwatch stopwatch = Stopwatch.createStarted(); MDC.put("component", "zab-broadcast"); MDC.put("serverId", serverId); MDC.put("key", key); MDC.put("consistency", level.name());
try { ReadStrategy strategy = readStrategies.getOrDefault( level, readStrategies.get(ConsistencyLevel.EVENTUAL));
CompletableFuture<Result> result = strategy.execute(key, this::readLocal);
result.thenAccept(r -> metrics.recordRead(level, stopwatch.elapsed(TimeUnit.MILLISECONDS)));
return result; } catch (Exception e) { logger.error("Error performing {} read for key: {}", level, key, e); metrics.recordFailedRead(level); return CompletableFuture.failedFuture( new ProcessingException("Read operation failed", e)); } finally { MDC.remove("component"); MDC.remove("serverId"); MDC.remove("key"); MDC.remove("consistency"); } }
// 本地读取数据 private Result readLocal(String key) { rwLock.readLock().lock(); try { // 实际实现会从本地数据库读取 return new Result(key, "value", true); } finally { rwLock.readLock().unlock(); } }
// 生成新的zxid,处理溢出情况 private long createNewZxid() { rwLock.writeLock().lock(); try { long currentCounter = zxid.get() & 0xFFFFFFFFL; // 检测溢出并处理 if (currentCounter >= 0xFFFFFFFFL) { // 计数器即将溢出,增加epoch int newEpoch = epoch.incrementAndGet(); logger.warn("ZXID counter overflow, incrementing epoch to {}", newEpoch); long newZxid = ((long)newEpoch << 32); // 重置计数器 zxid.set(newZxid); return newZxid; } return zxid.incrementAndGet(); } finally { rwLock.writeLock().unlock(); } }
// 发送提案给所有Follower private List<Future<ACK>> sendToFollowers(Request request, long newZxid) throws IOException { List<Future<ACK>> futures = new ArrayList<>(); ProposalPacket proposal = new ProposalPacket(newZxid, request);
ExecutorService executor = Executors.newFixedThreadPool(followers.size(), r -> { Thread t = new Thread(r, "proposal-sender-" + serverId); t.setDaemon(true); return t; });
try { for (var entry : followers.entrySet()) { final String targetServerId = entry.getKey();
futures.add(executor.submit(() -> { MDC.put("targetServerId", targetServerId); try { ACK ack = networkClient.sendProposal(targetServerId, proposal); logger.debug("Received ACK from {} for zxid {}", targetServerId, Long.toHexString(newZxid)); return ack; } catch (IOException e) { logger.error("Failed to send proposal to follower {}, zxid: {}", targetServerId, Long.toHexString(newZxid), e); return null; } finally { MDC.remove("targetServerId"); } })); } } finally { executor.shutdown(); try { if (!executor.awaitTermination(200, TimeUnit.MILLISECONDS)) { List<Runnable> pendingTasks = executor.shutdownNow(); logger.warn("Force shutdown executor with {} pending tasks", pendingTasks.size()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted while waiting for executor to terminate"); } }
return futures; }
// 等待多数派响应 private boolean waitForMajority(List<Future<ACK>> futures) throws InterruptedException { int ackCount = 0; int majority = (followers.size() / 2) + 1;
for (Future<ACK> future : futures) { try { ACK ack = future.get(5, TimeUnit.SECONDS); if (ack != null && ack.isSuccess()) { ackCount++; if (ackCount >= majority) { // 已获得多数派确认,可以提前返回 return true; } } } catch (ExecutionException e) { logger.warn("Error getting ACK", e.getCause()); } catch (TimeoutException e) { logger.warn("Timeout waiting for ACK"); } }
return ackCount >= majority; }
// 通知所有Follower提交事务 private void commit(long zxid) throws IOException { CommitPacket commit = new CommitPacket(zxid);
for (var entry : followers.entrySet()) { final String targetServerId = entry.getKey();
CompletableFuture.runAsync(() -> { MDC.put("targetServerId", targetServerId); try { networkClient.sendCommit(targetServerId, commit); logger.debug("Sent commit to {} for zxid {}", targetServerId, Long.toHexString(zxid)); } catch (IOException e) { logger.error("Failed to send commit to follower {}, zxid: {}", targetServerId, Long.toHexString(zxid), e); } finally { MDC.remove("targetServerId"); } }); } }
// 发送批处理请求 private List<Future<ACK>> sendBatchToFollowers(BatchRequest batch, long batchZxid) throws IOException { ProposalPacket proposal = new ProposalPacket(batchZxid, batch); return sendProposalToFollowers(proposal, batchZxid); }
// 提交批处理请求 private void commitBatch(long batchZxid) throws IOException { commit(batchZxid); }
// 发送心跳给所有Follower private void sendHeartbeats() { long currentZxid = zxid.get();
for (var entry : followers.entrySet()) { final String targetServerId = entry.getKey();
CompletableFuture.runAsync(() -> { try { networkClient.sendHeartbeat(targetServerId, currentZxid); } catch (IOException e) { // 心跳失败,使用限流器避免日志泛滥 if (heartbeatLogLimiter.tryAcquire()) { logger.debug("Failed to send heartbeat to {}", targetServerId, e); } } }); } }
// 发送提案给所有Follower(通用方法) private List<Future<ACK>> sendProposalToFollowers(ProposalPacket proposal, long zxid) throws IOException { List<Future<ACK>> futures = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(followers.size(), r -> { Thread t = new Thread(r, "proposal-sender-" + serverId); t.setDaemon(true); return t; });
try { for (var entry : followers.entrySet()) { final String targetServerId = entry.getKey();
futures.add(executor.submit(() -> { MDC.put("targetServerId", targetServerId); try { ACK ack = networkClient.sendProposal(targetServerId, proposal); logger.debug("Received ACK from {} for zxid {}", targetServerId, Long.toHexString(zxid)); return ack; } catch (IOException e) { logger.error("Failed to send proposal to follower {}, zxid: {}", targetServerId, Long.toHexString(zxid), e); return null; } finally { MDC.remove("targetServerId"); } })); } } finally { executor.shutdown(); try { if (!executor.awaitTermination(200, TimeUnit.MILLISECONDS)) { List<Runnable> pendingTasks = executor.shutdownNow(); logger.warn("Force shutdown executor with {} pending tasks", pendingTasks.size()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted while waiting for executor to terminate"); } }
return futures; }
// 定义读取策略接口和实现 private interface ReadStrategy { CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal); }
/** Read strategy registered for each consistency level. */
private final Map<ConsistencyLevel, ReadStrategy> readStrategies =
        new EnumMap<>(ConsistencyLevel.class);

// Instance initializer: registers one strategy per consistency level.
{
    readStrategies.put(ConsistencyLevel.EVENTUAL, new EventualReadStrategy());
    readStrategies.put(ConsistencyLevel.BOUNDED_STALENESS, new BoundedStalenessStrategy());
    readStrategies.put(ConsistencyLevel.READ_YOUR_WRITES, new ReadYourWritesStrategy());
    readStrategies.put(ConsistencyLevel.SEQUENTIAL, new SequentialReadStrategy());
    readStrategies.put(ConsistencyLevel.LINEARIZABLE, new LinearizableReadStrategy());
}
// 线性一致性读取策略 private class LinearizableReadStrategy implements ReadStrategy { private final AtomicLong leaseExpirationTime = new AtomicLong(0); private final long leaderLeaseMs = 5000; // 5秒租约
@Override public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) { // Leader需要确认自己仍然是Leader (租约机制) if (System.currentTimeMillis() < leaseExpirationTime.get()) { // 租约有效,可以安全读取 return CompletableFuture.completedFuture(readFromLocal.get()); } else { // 租约过期,需要重新获取多数派确认 return renewLease().thenApply(renewed -> { if (renewed) { return readFromLocal.get(); } else { throw new ConsistencyException("Cannot guarantee linearizable read"); } }); } }
private CompletableFuture<Boolean> renewLease() { // 实际实现中,需要获取多数派确认 leaseExpirationTime.set(System.currentTimeMillis() + leaderLeaseMs); logger.info("Renewed leader lease until {}", leaseExpirationTime.get()); return CompletableFuture.completedFuture(true); } }
// 顺序一致性读取策略 private class SequentialReadStrategy implements ReadStrategy { @Override public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) { // 确保应用了所有已提交的事务 return ensureAppliedUpToDate() .thenApply(v -> readFromLocal.get()); }
private CompletableFuture<Void> ensureAppliedUpToDate() { // 实际实现会确保所有已提交的事务都已应用 logger.debug("Ensuring all committed transactions are applied"); return CompletableFuture.completedFuture(null); } }
// 读己所写策略 private class ReadYourWritesStrategy implements ReadStrategy { private final ConcurrentMap<String, Long> writeTimestamps = new ConcurrentHashMap<>();
@Override public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) { // 检查是否有该key的写入记录 Long writeTime = writeTimestamps.get(key); if (writeTime != null) { // 确保经过足够时间,写入已经完成 long elapsed = System.currentTimeMillis() - writeTime; if (elapsed < 100) { // 假设100ms足够写入完成 try { Thread.sleep(100 - elapsed); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
return CompletableFuture.completedFuture(readFromLocal.get()); }
// 记录写入操作 public void recordWrite(String key) { writeTimestamps.put(key, System.currentTimeMillis()); } }
// 有界陈旧性策略 private class BoundedStalenessStrategy implements ReadStrategy { private final ConcurrentMap<String, CacheEntry> cache = new ConcurrentHashMap<>(); private final long maxStalenessMs = 1000; // 最大陈旧时间1秒
@Override public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) { // 检查缓存 CacheEntry entry = cache.get(key); if (entry != null) { long age = System.currentTimeMillis() - entry.getTimestamp(); if (age <= maxStalenessMs) { // 缓存未过期,直接返回 return CompletableFuture.completedFuture(entry.getResult()); } }
// 缓存过期或不存在,从本地读取并更新缓存 Result result = readFromLocal.get(); cache.put(key, new CacheEntry(result, System.currentTimeMillis())); return CompletableFuture.completedFuture(result); }
// 定期清理过期缓存 public void cleanup() { long now = System.currentTimeMillis(); cache.entrySet().removeIf(entry -> now - entry.getValue().getTimestamp() > maxStalenessMs); } }
// 最终一致性策略 private class EventualReadStrategy implements ReadStrategy { @Override public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) { // 直接从本地读取,不保证看到最新写入 return CompletableFuture.completedFuture(readFromLocal.get()); } }
// 缓存条目 private static class CacheEntry { private final Result result; private final long timestamp;
public CacheEntry(Result result, long timestamp) { this.result = result; this.timestamp = timestamp; }
public Result getResult() { return result; }
public long getTimestamp() { return timestamp; } }
/**
 * Stops the scheduler immediately and waits up to 5 seconds for it to terminate.
 * If interrupted while waiting, the interrupt flag is restored.
 */
@Override
public void close() {
    try {
        int abandoned = scheduler.shutdownNow().size();
        if (abandoned > 0) {
            logger.warn("Scheduler shutdown with {} pending tasks", abandoned);
        }

        boolean terminated = scheduler.awaitTermination(5, TimeUnit.SECONDS);
        if (!terminated) {
            logger.warn("Scheduler did not terminate in time");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.warn("Interrupted while waiting for scheduler termination");
    }
}
// 断路器实现(更安全的版本) static class CircuitBreaker { private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED); private final AtomicLong failureCount = new AtomicLong(0); private final AtomicLong lastFailureTime = new AtomicLong(0); private final int threshold; private final long resetTimeoutMs; private final StampedLock stateLock = new StampedLock(); private final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);
public enum State { CLOSED, OPEN, HALF_OPEN }
public CircuitBreaker(int threshold, long resetTimeoutMs) { this.threshold = threshold; this.resetTimeoutMs = resetTimeoutMs; }
public <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> action) throws CircuitBreakerOpenException { State currentState = getCurrentState();
if (currentState == State.OPEN) { // 检查是否应该尝试半开状态 if (System.currentTimeMillis() - lastFailureTime.get() > resetTimeoutMs) { boolean transitioned = tryTransitionState(State.OPEN, State.HALF_OPEN); if (!transitioned) { throw new CircuitBreakerOpenException("Circuit breaker is open"); } currentState = State.HALF_OPEN; } else { throw new CircuitBreakerOpenException("Circuit breaker is open"); } }
final State executionState = currentState;
try { CompletableFuture<T> future = action.get(); return future.handle((result, ex) -> { if (ex != null) { recordFailure(); throw new CompletionException(ex); } else { // 成功执行,重置失败计数 if (executionState == State.HALF_OPEN) { tryTransitionState(State.HALF_OPEN, State.CLOSED); } failureCount.set(0); return result; } }); } catch (Exception e) { recordFailure(); throw e; } }
private void recordFailure() { long stamp = stateLock.writeLock(); try { long failures = failureCount.incrementAndGet(); lastFailureTime.set(System.currentTimeMillis());
if (failures >= threshold && state.get() == State.CLOSED) { logger.warn("Circuit breaker opening after {} failures", failures); state.set(State.OPEN); } } finally { stateLock.unlockWrite(stamp); } }
private boolean tryTransitionState(State fromState, State toState) { long stamp = stateLock.writeLock(); try { if (state.get() == fromState) { state.set(toState); logger.info("Circuit breaker state changed from {} to {}", fromState, toState); return true; } return false; } finally { stateLock.unlockWrite(stamp); } }
// 使用乐观读获取当前状态 public State getCurrentState() { long stamp = stateLock.tryOptimisticRead(); State result = state.get();
if (!stateLock.validate(stamp)) { stamp = stateLock.readLock(); try { result = state.get(); } finally { stateLock.unlockRead(stamp); } }
return result; } }
// 全局异常处理器 static class GlobalExceptionHandler { private static final Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class);
public static <T> CompletableFuture<T> withExceptionHandling(CompletableFuture<T> future) { return future.exceptionally(e -> { Throwable cause = e instanceof CompletionException ? e.getCause() : e;
if (cause instanceof ConsistencyException) { logger.error("Consistency error: {}", cause.getMessage()); } else if (cause instanceof IOException) { logger.error("I/O error: {}", cause.getMessage()); } else if (cause instanceof InterruptedException) { Thread.currentThread().interrupt(); logger.warn("Operation interrupted"); } else { logger.error("Unexpected error: {}", cause.getClass().getName(), cause); }
throw new CompletionException(cause); }); } }
// 指标收集类 private static class MetricsCollector { private final Counter writeRequests; private final Counter writeSuccess; private final Counter writeFailed; private final Counter writeRejected; private final Counter batchWrites; private final Counter batchWriteRequests; private final Counter readRequests; private final Map<ConsistencyLevel, Counter> readsByLevel = new EnumMap<>(ConsistencyLevel.class); private final Histogram writeLatency; private final Histogram batchWriteLatency; private final Map<ConsistencyLevel, Histogram> readLatency = new EnumMap<>(ConsistencyLevel.class);
public MetricsCollector(String prefix) { this.writeRequests = Counter.build() .name(prefix + "_write_requests_total") .help("Total number of write requests").register();
this.writeSuccess = Counter.build() .name(prefix + "_write_success_total") .help("Total number of successful writes").register();
this.writeFailed = Counter.build() .name(prefix + "_write_failed_total") .help("Total number of failed writes").register();
this.writeRejected = Counter.build() .name(prefix + "_write_rejected_total") .help("Total number of rejected writes").register();
this.batchWrites = Counter.build() .name(prefix + "_batch_writes_total") .help("Total number of batch writes").register();
this.batchWriteRequests = Counter.build() .name(prefix + "_batch_write_requests_total") .help("Total number of requests in batch writes").register();
this.readRequests = Counter.build() .name(prefix + "_read_requests_total") .help("Total number of read requests").register();
this.writeLatency = Histogram.build() .name(prefix + "_write_latency_ms") .help("Write latency in milliseconds").register();
this.batchWriteLatency = Histogram.build() .name(prefix + "_batch_write_latency_ms") .help("Batch write latency in milliseconds").register();
// 初始化各一致性级别的计数器和直方图 for (ConsistencyLevel level : ConsistencyLevel.values()) { readsByLevel.put(level, Counter.build() .name(prefix + "_reads_" + level.name().toLowerCase() + "_total") .help("Total " + level + " reads").register());
readLatency.put(level, Histogram.build() .name(prefix + "_read_" + level.name().toLowerCase() + "_latency_ms") .help(level + " read latency in milliseconds").register()); } }
public void recordSuccessfulWrite(long latencyMs) { writeRequests.inc(); writeSuccess.inc(); writeLatency.observe(latencyMs); }
public void recordFailedWrite() { writeRequests.inc(); writeFailed.inc(); }
public void recordRejectedWrite() { writeRequests.inc(); writeRejected.inc(); }
public void recordSuccessfulBatchWrite(int batchSize, long latencyMs) { batchWrites.inc(); batchWriteRequests.inc(batchSize); writeRequests.inc(batchSize); writeSuccess.inc(batchSize); batchWriteLatency.observe(latencyMs); }
public void recordFailedBatchWrite(int batchSize) { batchWrites.inc(); batchWriteRequests.inc(batchSize); writeRequests.inc(batchSize); writeFailed.inc(batchSize); }
public void recordRejectedBatchWrite(int batchSize) { batchWrites.inc(); batchWriteRequests.inc(batchSize); writeRequests.inc(batchSize); writeRejected.inc(batchSize); }
public void recordRead(ConsistencyLevel level, long latencyMs) { readRequests.inc(); readsByLevel.get(level).inc(); readLatency.get(level).observe(latencyMs); }
public void recordFailedRead(ConsistencyLevel level) { readRequests.inc(); // 可以添加失败计数器 } }
// 异常类 public static class CircuitBreakerOpenException extends Exception { public CircuitBreakerOpenException(String message) { super(message); } }
/**
 * Unchecked exception carrying only a message. {@code GlobalExceptionHandler} logs it as a
 * "Consistency error".
 */
public static class ConsistencyException extends RuntimeException {
    /** @param message detail message */
    public ConsistencyException(String message) {
        super(message);
    }
}
/** Unchecked exception that wraps an underlying cause together with a message. */
public static class ProcessingException extends RuntimeException {
    /**
     * @param message detail message
     * @param cause   underlying failure
     */
    public ProcessingException(String message, Throwable cause) {
        super(message, cause);
    }
}
// 其他内部类和常量定义...
/** Read consistency levels, listed from strongest to weakest. */
enum ConsistencyLevel {
    LINEARIZABLE,      // linearizable consistency (strongest)
    SEQUENTIAL,        // sequential consistency
    READ_YOUR_WRITES,  // read-your-writes
    BOUNDED_STALENESS, // bounded staleness
    EVENTUAL           // eventual consistency (weakest)
}
}
Fast Leader Election 算法
public class FastLeaderElection { private final AtomicLong logicalClock = new AtomicLong(0); private final ConcurrentMap<String, Vote> receivedVotes = new ConcurrentHashMap<>(); private final String serverId; private final NetworkManager networkManager; private final int quorumSize; private final AtomicInteger electionAttempts = new AtomicInteger(0); private final Logger logger = LoggerFactory.getLogger(FastLeaderElection.class); private final ZxidUtils zxidUtils;
/**
 * Creates an election participant.
 *
 * @param serverId       identifier of this server; also used for its initial self-vote
 * @param quorumSize     number of votes a candidate needs to be declared leader
 * @param networkManager transport used to broadcast and receive votes
 * @param zxidUtils      source of this server's last zxid
 */
public FastLeaderElection(String serverId,
                          int quorumSize,
                          NetworkManager networkManager,
                          ZxidUtils zxidUtils) {
    // Collaborators first, then identity and quorum settings; the assignments are independent.
    this.networkManager = networkManager;
    this.zxidUtils = zxidUtils;
    this.serverId = serverId;
    this.quorumSize = quorumSize;
}
public String lookForLeader() throws InterruptedException { MDC.put("component", "fast-leader-election"); MDC.put("serverId", serverId); try { // 递增逻辑时钟 long newLogicalClock = logicalClock.incrementAndGet(); logger.info("Starting leader election with logical clock: {}", newLogicalClock);
// 初始化选票,投给自己 Vote vote = new Vote(serverId, zxidUtils.getLastZxid(), newLogicalClock); receivedVotes.clear(); receivedVotes.put(serverId, vote);
// 向所有其他服务器发送选票 networkManager.broadcastVote(vote);
// 选举超时时间 long startTime = System.currentTimeMillis(); long maxTimeout = 60000; // 60秒最大超时
// 选举循环 Map<String, Integer> voteCounter = new HashMap<>(); String currentLeader = null;
while (System.currentTimeMillis() - startTime < maxTimeout) { // 接收选票 Vote receivedVote = networkManager.receiveVote(200); // 200ms超时 if (receivedVote != null) { MDC.put("candidateId", receivedVote.getServerId()); logger.debug("Received vote from {}: zxid={}, logicalClock={}", receivedVote.getServerId(), Long.toHexString(receivedVote.getZxid()), receivedVote.getLogicalClock());
// 验证逻辑时钟 if (receivedVote.getLogicalClock() > newLogicalClock) { // 发现更高的逻辑时钟,需要更新自己的时钟并重新开始选举 logicalClock.set(receivedVote.getLogicalClock()); logger.info("Found higher logical clock: {}, restarting election", receivedVote.getLogicalClock()); MDC.remove("candidateId"); electionAttempts.set(0); // 重置尝试计数 return lookForLeader(); // 重新开始选举 } else if (receivedVote.getLogicalClock() < newLogicalClock) { // 忽略旧的逻辑时钟选票 logger.debug("Ignoring vote with older logical clock: {}", receivedVote.getLogicalClock()); MDC.remove("candidateId"); continue; }
// 比较选票 int comparison = compareVotes(vote, receivedVote); if (comparison < 0) { // 收到更好的选票,更新自己的选票 vote = new Vote(receivedVote.getServerId(), receivedVote.getZxid(), newLogicalClock); // 重新广播更新后的选票 networkManager.broadcastVote(vote); logger.info("Updated vote to server: {}", vote.getServerId()); }
// 记录收到的选票 receivedVotes.put(receivedVote.getServerId(), receivedVote); MDC.remove("candidateId");
// 统计票数 voteCounter.clear(); for (Vote v : receivedVotes.values()) { String candidate = v.getServerId(); voteCounter.put(candidate, voteCounter.getOrDefault(candidate, 0) + 1);
// 检查是否有候选人获得多数派支持 if (voteCounter.get(candidate) >= quorumSize) { currentLeader = candidate; logger.info("Elected leader: {} with {} votes of {} required", candidate, voteCounter.get(candidate), quorumSize); break; } }
if (currentLeader != null) { break; // 选出了Leader } } }
if (currentLeader == null) { // 处理选举失败,使用指数退避避免活锁 handleElectionFailure(); logger.warn("Failed to elect a leader, retrying..."); return lookForLeader(); // 重试 }
electionAttempts.set(0); // 重置尝试计数 return currentLeader;
} catch (Exception e) { logger.error("Error during leader election", e); // 增加选举尝试计数并退避 handleElectionFailure(); throw new LeaderElectionException("Leader election failed", e); } finally { MDC.remove("component"); MDC.remove("serverId"); } }
// 处理选举失败,使用指数退避避免活锁 private void handleElectionFailure() { int attempts = electionAttempts.incrementAndGet(); // 指数退避 int backoffMs = Math.min(1000 * (1 << Math.min(attempts, 10)), 30000); // 添加随机抖动避免同步 backoffMs += ThreadLocalRandom.current().nextInt(backoffMs / 2); logger.info("Election attempt {} failed, backing off for {}ms", attempts, backoffMs); try { Thread.sleep(backoffMs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted during election backoff"); } }
// 比较两个选票,返回负数表示v2更好,0表示相等,正数表示v1更好 private int compareVotes(Vote v1, Vote v2) { // 首先比较zxid,更大的zxid具有更高优先级 long zxidDiff = ZxidUtils.compareZxid(v1.getZxid(), v2.getZxid()); if (zxidDiff != 0) { return (int) Math.signum(zxidDiff); }
// zxid相等,比较serverId return v1.getServerId().compareTo(v2.getServerId()); }
// 内部类和工具方法...
/** Immutable ballot exchanged during leader election. */
static class Vote {
    private final String serverId;
    private final long zxid;
    private final long logicalClock;

    /**
     * @param serverId     server id carried by the vote
     * @param zxid         zxid carried by the vote
     * @param logicalClock election round the vote belongs to
     */
    public Vote(String serverId, long zxid, long logicalClock) {
        this.serverId = serverId;
        this.zxid = zxid;
        this.logicalClock = logicalClock;
    }

    public String getServerId() {
        return this.serverId;
    }

    public long getZxid() {
        return this.zxid;
    }

    public long getLogicalClock() {
        return this.logicalClock;
    }

    /** Renders the vote with the zxid in hexadecimal. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Vote{serverId='");
        text.append(serverId);
        text.append("', zxid=").append(Long.toHexString(zxid));
        text.append(", logicalClock=").append(logicalClock);
        text.append('}');
        return text.toString();
    }
}
// 自定义异常类 public static class LeaderElectionException extends RuntimeException { public LeaderElectionException(String message, Throwable cause) { super(message, cause); } }}
网络客户端实现示例
public class NettyNetworkClient implements NetworkClient { private final EventLoopGroup workerGroup; private final Bootstrap bootstrap; private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>(); private final int connectionTimeoutMs; private final Logger logger = LoggerFactory.getLogger(NettyNetworkClient.class);
/**
 * Creates the client and prepares (but does not open) the Netty bootstrap.
 *
 * <p>Every channel gets keep-alive, TCP_NODELAY and the given connect timeout. The pipeline
 * is, in order: length-field frame decoder (max 1048576 bytes, 4-byte length, stripped),
 * 4-byte length prepender, packet encoder, packet decoder, client handler.
 *
 * @param connectionTimeoutMs connect timeout in milliseconds
 */
public NettyNetworkClient(int connectionTimeoutMs) {
    this.connectionTimeoutMs = connectionTimeoutMs;
    this.workerGroup = new NioEventLoopGroup();

    // Pipeline layout shared by every outgoing connection.
    ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline()
                .addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
                .addLast(new LengthFieldPrepender(4))
                .addLast(new PacketEncoder())
                .addLast(new PacketDecoder())
                .addLast(new ClientHandler());
        }
    };

    Bootstrap client = new Bootstrap();
    client.group(workerGroup);
    client.channel(NioSocketChannel.class);
    client.option(ChannelOption.SO_KEEPALIVE, true);
    client.option(ChannelOption.TCP_NODELAY, true);
    client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMs);
    client.handler(pipelineSetup);
    this.bootstrap = client;
}
/**
 * Opens a connection to {@code address:port} and registers it under {@code serverId}.
 *
 * <p>Waits at most {@code connectionTimeoutMs}. A connect attempt that does not finish in
 * time is cancelled, and a channel previously registered under the same id is closed when
 * it is replaced, so neither leaks.
 *
 * @param serverId logical id the channel is stored under
 * @param address  remote host
 * @param port     remote port
 * @throws IOException if the connection cannot be established in time, fails, or the
 *                     calling thread is interrupted (the interrupt flag is restored)
 */
@Override
public void connect(String serverId, String address, int port) throws IOException {
    ChannelFuture future = null;
    try {
        future = bootstrap.connect(address, port);
        boolean finished = future.await(connectionTimeoutMs, TimeUnit.MILLISECONDS);

        if (!finished) {
            // Give up on the attempt. If it completed successfully in the meantime,
            // cancel() fails and the channel must be closed explicitly.
            if (!future.cancel(true) && future.isSuccess()) {
                future.channel().close();
            }
            throw new IOException("Failed to connect to " + serverId + " at " + address + ":" + port);
        }
        if (!future.isSuccess()) {
            // Keep the transport-level reason as the cause.
            throw new IOException(
                "Failed to connect to " + serverId + " at " + address + ":" + port, future.cause());
        }

        Channel previous = channels.put(serverId, future.channel());
        if (previous != null && previous != future.channel()) {
            previous.close(); // do not leak the connection being replaced
        }
        logger.info("Connected to server: {} at {}:{}", serverId, address, port);
    } catch (InterruptedException e) {
        if (future != null) {
            future.cancel(true);
        }
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while connecting to " + serverId, e);
    } catch (RuntimeException e) {
        // Only unexpected runtime failures are wrapped here; the IOExceptions thrown above
        // propagate unchanged instead of being wrapped a second time.
        throw new IOException("Failed to connect to " + serverId, e);
    }
}
/**
 * Closes and forgets the channel registered under {@code serverId}; does nothing when no
 * channel is registered.
 *
 * @param serverId id of the server to disconnect from
 */
@Override
public void disconnect(String serverId) {
    final Channel removed = channels.remove(serverId);
    if (removed == null) {
        return;
    }
    removed.close();
    logger.info("Disconnected from server: {}", serverId);
}
/**
 * Sends a proposal to {@code serverId} and blocks until its ACK arrives.
 *
 * <p>The pending request is registered in {@link RequestRegistry} before the write. It is
 * always deregistered again when no answer was consumed (timeout, write failure,
 * interrupt), so abandoned requests do not accumulate in the registry.
 *
 * @param serverId target server
 * @param proposal proposal payload
 * @return the ACK returned by the server, never {@code null}
 * @throws IOException if there is no active channel, the write fails, no ACK arrives
 *                     within 5 seconds, the ACK is {@code null}, or the thread is
 *                     interrupted (the interrupt flag is restored)
 */
@Override
public ACK sendProposal(String serverId, ProposalPacket proposal) throws IOException {
    MDC.put("targetServerId", serverId);
    Long requestId = null;
    boolean answered = false;
    try {
        Channel channel = getChannel(serverId);
        RequestFuture<ACK> future = new RequestFuture<>();

        // Remember the request so the inbound handler can match the response to it.
        requestId = generateRequestId();
        RequestRegistry.register(requestId, future);

        // Wrap and send; sync() waits for the write itself to finish.
        Request request = new Request(requestId, RequestType.PROPOSAL, proposal);
        channel.writeAndFlush(request).sync();

        // Wait for the response.
        ACK ack = future.get(5, TimeUnit.SECONDS);
        answered = true;
        if (ack == null) {
            throw new IOException("Received null ACK from " + serverId);
        }
        return ack;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while sending proposal to " + serverId, e);
    } catch (TimeoutException e) {
        throw new IOException("Timed out waiting for ACK from " + serverId, e);
    } catch (ExecutionException e) {
        throw new IOException("Error sending proposal to " + serverId, e.getCause());
    } finally {
        if (requestId != null && !answered) {
            // Removes the registry entry; a no-op if the handler already completed it.
            RequestRegistry.completeExceptionally(requestId,
                new IOException("Request " + requestId + " to " + serverId + " was abandoned"));
        }
        MDC.remove("targetServerId");
    }
}
@Override public void sendCommit(String serverId, CommitPacket commit) throws IOException { MDC.put("targetServerId", serverId); try { Channel channel = getChannel(serverId);
// 包装请求 Request request = new Request(generateRequestId(), RequestType.COMMIT, commit);
// 发送请求 - 不等待响应 channel.writeAndFlush(request); } catch (Exception e) { throw new IOException("Error sending commit to " + serverId, e); } finally { MDC.remove("targetServerId"); } }
/**
 * Sends an epoch request to {@code serverId} and blocks until its last-zxid response arrives.
 *
 * <p>The pending request is registered in {@link RequestRegistry} before the write. It is
 * always deregistered again when no answer was consumed (timeout, write failure,
 * interrupt), so abandoned requests do not accumulate in the registry.
 *
 * @param serverId target server
 * @param epochPkt epoch payload
 * @return the server's response, never {@code null}
 * @throws IOException if there is no active channel, the write fails, no response arrives
 *                     within 5 seconds, the response is {@code null}, or the thread is
 *                     interrupted (the interrupt flag is restored)
 */
@Override
public LastZxidResponse sendEpochRequest(String serverId, EpochPacket epochPkt) throws IOException {
    MDC.put("targetServerId", serverId);
    Long requestId = null;
    boolean answered = false;
    try {
        Channel channel = getChannel(serverId);
        RequestFuture<LastZxidResponse> future = new RequestFuture<>();

        // Remember the request so the inbound handler can match the response to it.
        requestId = generateRequestId();
        RequestRegistry.register(requestId, future);

        // Wrap and send; sync() waits for the write itself to finish.
        Request request = new Request(requestId, RequestType.EPOCH, epochPkt);
        channel.writeAndFlush(request).sync();

        // Wait for the response.
        LastZxidResponse response = future.get(5, TimeUnit.SECONDS);
        answered = true;
        if (response == null) {
            throw new IOException("Received null LastZxidResponse from " + serverId);
        }
        return response;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while sending epoch request to " + serverId, e);
    } catch (TimeoutException e) {
        throw new IOException("Timed out waiting for LastZxidResponse from " + serverId, e);
    } catch (ExecutionException e) {
        throw new IOException("Error sending epoch request to " + serverId, e.getCause());
    } finally {
        if (requestId != null && !answered) {
            // Removes the registry entry; a no-op if the handler already completed it.
            RequestRegistry.completeExceptionally(requestId,
                new IOException("Request " + requestId + " to " + serverId + " was abandoned"));
        }
        MDC.remove("targetServerId");
    }
}
// 实现其他接口方法...
@Override public void sendSnapshot(String serverId, byte[] snapshot, long zxid) throws IOException { MDC.put("targetServerId", serverId); try { Channel channel = getChannel(serverId);
// 由于快照可能很大,按块发送 int chunkSize = 1024 * 1024; // 1MB块 int totalChunks = (snapshot.length + chunkSize - 1) / chunkSize;
logger.info("Sending snapshot to {}, size: {} bytes, chunks: {}", serverId, snapshot.length, totalChunks);
// 发送快照元数据 SnapshotMetadata metadata = new SnapshotMetadata(zxid, snapshot.length, totalChunks); Request metadataRequest = new Request(generateRequestId(), RequestType.SNAPSHOT_META, metadata); channel.writeAndFlush(metadataRequest).sync();
// 分块发送快照数据 for (int i = 0; i < totalChunks; i++) { int offset = i * chunkSize; int length = Math.min(chunkSize, snapshot.length - offset); byte[] chunk = new byte[length]; System.arraycopy(snapshot, offset, chunk, 0, length);
SnapshotChunk snapshotChunk = new SnapshotChunk(i, totalChunks, chunk); Request chunkRequest = new Request(generateRequestId(), RequestType.SNAPSHOT_CHUNK, snapshotChunk);
channel.writeAndFlush(chunkRequest).sync();
if (i % 10 == 0 || i == totalChunks - 1) { logger.debug("Sent snapshot chunk {}/{} to {}", i + 1, totalChunks, serverId); } }
logger.info("Snapshot sent successfully to {}", serverId); } catch (Exception e) { throw new IOException("Error sending snapshot to " + serverId, e); } finally { MDC.remove("targetServerId"); } }
// 获取连接到指定服务器的通道 private Channel getChannel(String serverId) throws IOException { Channel channel = channels.get(serverId); if (channel == null || !channel.isActive()) { throw new IOException("No active connection to server: " + serverId); } return channel; }
/** Process-wide counter behind {@link #generateRequestId()}; starts at 0. */
private static final AtomicLong requestIdGenerator = new AtomicLong(0);

/**
 * Generates a unique request id.
 *
 * @return the next value of the shared counter (the first id handed out is 1)
 */
private static Long generateRequestId() {
    final long nextId = requestIdGenerator.incrementAndGet();
    return nextId;
}
// 关闭客户端 public void shutdown() { // 关闭所有连接 for (Channel channel : channels.values()) { channel.close(); } channels.clear();
// 关闭事件循环组 workerGroup.shutdownGracefully(); }
// 请求类型 enum RequestType { PROPOSAL, COMMIT, EPOCH, TRUNCATE, TRANSACTION, NEWLEADER, HEARTBEAT, SNAPSHOT_META, SNAPSHOT_CHUNK }
// 请求对象 static class Request { private final Long id; private final RequestType type; private final Object payload;
public Request(Long id, RequestType type, Object payload) { this.id = id; this.type = type; this.payload = payload; }
public Long getId() { return id; }
public RequestType getType() { return type; }
public Object getPayload() { return payload; } }
// 快照元数据 static class SnapshotMetadata { private final long zxid; private final int totalSize; private final int totalChunks;
public SnapshotMetadata(long zxid, int totalSize, int totalChunks) { this.zxid = zxid; this.totalSize = totalSize; this.totalChunks = totalChunks; }
public long getZxid() { return zxid; }
public int getTotalSize() { return totalSize; }
public int getTotalChunks() { return totalChunks; } }
// 快照数据块 static class SnapshotChunk { private final int chunkIndex; private final int totalChunks; private final byte[] data;
public SnapshotChunk(int chunkIndex, int totalChunks, byte[] data) { this.chunkIndex = chunkIndex; this.totalChunks = totalChunks; this.data = data.clone(); // 防御性复制 }
public int getChunkIndex() { return chunkIndex; }
public int getTotalChunks() { return totalChunks; }
public byte[] getData() { return data.clone(); // 防御性复制 } }
// 请求-响应映射注册表 static class RequestRegistry { private static final ConcurrentMap<Long, RequestFuture<?>> futures = new ConcurrentHashMap<>();
public static <T> void register(Long requestId, RequestFuture<T> future) { futures.put(requestId, future); }
@SuppressWarnings("unchecked") public static <T> void complete(Long requestId, T response) { RequestFuture<T> future = (RequestFuture<T>) futures.remove(requestId); if (future != null) { future.complete(response); } }
public static void completeExceptionally(Long requestId, Throwable exception) { RequestFuture<?> future = futures.remove(requestId); if (future != null) { future.completeExceptionally(exception); } } }
// 请求Future static class RequestFuture<T> extends CompletableFuture<T> { // 继承CompletableFuture,无需额外实现 }
// 客户端处理器 private class ClientHandler extends SimpleChannelInboundHandler<Response> { @Override protected void channelRead0(ChannelHandlerContext ctx, Response response) { Long requestId = response.getRequestId(); if (response.isSuccess()) { RequestRegistry.complete(requestId, response.getPayload()); } else { RequestRegistry.completeExceptionally(requestId, new IOException("Request failed: " + response.getErrorMessage())); } }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error("Network client exception", cause); ctx.close(); } }
// 响应对象 static class Response { private final Long requestId; private final boolean success; private final Object payload; private final String errorMessage;
public Response(Long requestId, boolean success, Object payload, String errorMessage) { this.requestId = requestId; this.success = success; this.payload = payload; this.errorMessage = errorMessage; }
public Long getRequestId() { return requestId; }
public boolean isSuccess() { return success; }
public Object getPayload() { return payload; }
public String getErrorMessage() { return errorMessage; } }
// 编码器 static class PacketEncoder extends MessageToByteEncoder<Request> { @Override protected void encode(ChannelHandlerContext ctx, Request msg, ByteBuf out) throws Exception { // 使用协议缓冲区或自定义序列化 // 这里简化为示例 byte[] bytes = serializeRequest(msg); out.writeBytes(bytes); }
private byte[] serializeRequest(Request request) { // 实际实现应使用正式的序列化机制 // 这里简化为示例 return new byte[0]; } }
// 解码器 static class PacketDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 使用协议缓冲区或自定义反序列化 // 这里简化为示例 if (in.readableBytes() >= 4) { // 至少包含长度字段 in.markReaderIndex(); int length = in.readInt();
if (in.readableBytes() < length) { in.resetReaderIndex(); return; }
byte[] data = new byte[length]; in.readBytes(data);
Response response = deserializeResponse(data); out.add(response); } }
private Response deserializeResponse(byte[] data) { // 实际实现应使用正式的反序列化机制 // 这里简化为示例 return null; } }}
三、Paxos 算法实现
核心接口定义
// 角色接口定义public interface Proposer { CompletableFuture<Boolean> prepare(int ballot); CompletableFuture<Boolean> propose(int ballot, Object value);}
/** Acceptor role of Paxos: answers prepare and accept requests. */
public interface Acceptor {
    /** Handles a prepare request for {@code ballot} and returns the resulting promise. */
    CompletableFuture<Promise> handlePrepare(int ballot);

    /** Handles an accept request for {@code ballot} carrying {@code value}. */
    CompletableFuture<Accepted> handleAccept(int ballot, Object value);
}
public interface Learner { void learn(long instanceId, int ballot, Object value);}
/**
 * Transport used by the Paxos nodes to reach remote peers, addressed by integer node id.
 * The request/response calls are asynchronous and return futures; {@code sendLearn}
 * returns nothing.
 */
public interface NetworkClient {
    /** Sends a prepare request for {@code ballot} to {@code nodeId}. */
    CompletableFuture<Promise> sendPrepare(int nodeId, int ballot);

    /** Sends an accept request for {@code ballot} and {@code value} to {@code nodeId}. */
    CompletableFuture<Accepted> sendAccept(int nodeId, int ballot, Object value);

    /** Sends a learn notification to {@code nodeId}. */
    void sendLearn(int nodeId, long instanceId, int ballot, Object value);

    /**
     * Sends a prepare covering all instances.
     * NOTE(review): the result is presumably keyed by instance id — confirm.
     */
    CompletableFuture<Map<Long, PrepareResponse>> sendPrepareAllInstances(int nodeId, int ballot);

    /** Sends a snapshot together with the last instance id it is associated with. */
    CompletableFuture<Void> sendSnapshot(int nodeId, byte[] snapshot, long lastInstanceId);
}
/** Replicated state machine driven by the Paxos log; mutating operations are asynchronous. */
public interface StateMachine {
    /** Applies {@code command} for the given log instance. */
    CompletableFuture<Void> apply(long instanceId, byte[] command);

    /** NOTE(review): presumably the id of the last applied instance — confirm. */
    long getLastApplied();

    /** Produces a snapshot of the state machine as a byte array. */
    CompletableFuture<byte[]> takeSnapshot();

    /** Restores the state from {@code snapshot} with its associated {@code instanceId}. */
    CompletableFuture<Void> restoreSnapshot(byte[] snapshot, long instanceId);
}
Basic Paxos 实现
public class BasicPaxosNode implements Proposer, Acceptor, Learner, AutoCloseable { private final int nodeId; private final AtomicInteger ballot = new AtomicInteger(0); private volatile Object proposalValue = null; private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private volatile int acceptedBallot = 0; private volatile Object acceptedValue = null; private final int totalNodes; private final NetworkClient networkClient; private final Logger logger = LoggerFactory.getLogger(BasicPaxosNode.class); private final RetryStrategy retryStrategy; private final MetricsCollector metrics;
public BasicPaxosNode(int nodeId, int totalNodes, NetworkClient networkClient) { this.nodeId = nodeId; this.totalNodes = totalNodes; this.networkClient = networkClient; this.retryStrategy = new ExponentialBackoffRetry(100, 5000, 3); this.metrics = new MetricsCollector("paxos_basic", nodeId); }
// Proposer: 准备阶段 @Override public CompletableFuture<Boolean> prepare(int suggestedBallot) { final int newBallot = suggestedBallot > 0 ? suggestedBallot : generateNewBallot(); final Stopwatch stopwatch = Stopwatch.createStarted();
MDC.put("component", "paxos-proposer"); MDC.put("nodeId", String.valueOf(nodeId)); MDC.put("ballot", String.valueOf(newBallot)); logger.info("Starting prepare phase with ballot {}", newBallot);
CompletableFuture<Boolean> result = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { try { // 向所有Acceptor发送Prepare请求 List<CompletableFuture<Promise>> futures = sendPrepare(newBallot);
// 收集结果 List<Promise> promises = new ArrayList<>(); for (CompletableFuture<Promise> future : futures) { try { Promise promise = future.get(3, TimeUnit.SECONDS); if (promise != null) { promises.add(promise); } } catch (Exception e) { logger.warn("Error getting prepare response", e); } }
// 如果获得多数派响应 int quorum = getQuorum(); int okCount = (int) promises.stream().filter(Promise::isOk).count();
if (okCount >= quorum) { // 更新ballot ballot.updateAndGet(current -> Math.max(current, newBallot));
// 选择已接受的最高编号提案的值 Promise highestPromise = selectHighestBallotPromise(promises); rwLock.writeLock().lock(); try { if (highestPromise != null && highestPromise.getAcceptedValue() != null) { proposalValue = highestPromise.getAcceptedValue(); logger.info("Using previously accepted value: {}", proposalValue); } } finally { rwLock.writeLock().unlock(); }
metrics.recordPrepareSuccess(stopwatch.elapsed(TimeUnit.MILLISECONDS)); result.complete(true); } else { logger.info("Failed to get quorum in prepare phase: {} of {} responses ok", okCount, promises.size()); metrics.recordPrepareFailed(); result.complete(false); } } catch (Exception e) { logger.error("Error in prepare phase", e); metrics.recordPrepareFailed(); result.completeExceptionally(e); } finally { MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("ballot"); } });
return result; }
// Proposer: 接受阶段 @Override public CompletableFuture<Boolean> propose(int ballot, Object value) { final Stopwatch stopwatch = Stopwatch.createStarted();
MDC.put("component", "paxos-proposer"); MDC.put("nodeId", String.valueOf(nodeId)); MDC.put("ballot", String.valueOf(ballot));
return prepare(ballot).thenCompose(prepared -> { if (!prepared) { logger.info("Prepare phase failed, cannot proceed to propose"); metrics.recordProposeFailed(); return CompletableFuture.completedFuture(false); }
// 获取当前要提议的值 final Object valueToPropose; rwLock.readLock().lock(); try { // 如果准备阶段没有发现已接受的值,使用提议者的值 valueToPropose = proposalValue != null ? proposalValue : value; logger.info("Starting accept phase with ballot {} and value {}", ballot, valueToPropose); } finally { rwLock.readLock().unlock(); }
return CompletableFuture.supplyAsync(() -> { try { // 向所有Acceptor发送Accept请求 List<CompletableFuture<Accepted>> futures = sendAccept(ballot, valueToPropose);
// 收集结果 List<Accepted> responses = new ArrayList<>(); for (CompletableFuture<Accepted> future : futures) { try { Accepted accepted = future.get(3, TimeUnit.SECONDS); if (accepted != null) { responses.add(accepted); } } catch (Exception e) { logger.warn("Error getting accept response", e); } }
// 如果获得多数派接受 int quorum = getQuorum(); int accepted = (int) responses.stream().filter(Accepted::isOk).count(); boolean success = accepted >= quorum;
if (success) { logger.info("Value {} has been accepted by the majority ({} of {})", valueToPropose, accepted, responses.size());
// 通知所有Learner broadcastToLearners(1, ballot, valueToPropose); metrics.recordProposeSuccess(stopwatch.elapsed(TimeUnit.MILLISECONDS)); } else { logger.info("Failed to get quorum in accept phase: {} of {} responses ok", accepted, responses.size()); metrics.recordProposeFailed(); }
return success; } catch (Exception e) { logger.error("Error in propose phase", e); metrics.recordProposeFailed(); throw new CompletionException(e); } finally { MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("ballot"); } }); }).exceptionally(e -> { logger.error("Failed to propose value", e); metrics.recordProposeFailed(); return false; }); }
// Acceptor: 处理Prepare请求 @Override public CompletableFuture<Promise> handlePrepare(int proposalBallot) { MDC.put("component", "paxos-acceptor"); MDC.put("nodeId", String.valueOf(nodeId)); MDC.put("ballot", String.valueOf(proposalBallot));
return CompletableFuture.supplyAsync(() -> { Promise promise = new Promise();
rwLock.writeLock().lock(); try { if (proposalBallot > acceptedBallot) { // 承诺不再接受编号小于等于proposalBallot的提案 acceptedBallot = proposalBallot; promise.setOk(true); promise.setAcceptedBallot(this.acceptedBallot); promise.setAcceptedValue(this.acceptedValue); logger.info("Acceptor {} promised ballot {}", nodeId, proposalBallot); metrics.recordPromiseMade(); } else { promise.setOk(false); logger.info("Acceptor {} rejected ballot {}, current ballot: {}", nodeId, proposalBallot, acceptedBallot); metrics.recordPromiseRejected(); } return promise; } finally { rwLock.writeLock().unlock(); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("ballot"); } }); }
// Acceptor: 处理Accept请求 @Override public CompletableFuture<Accepted> handleAccept(int proposalBallot, Object proposalValue) { MDC.put("component", "paxos-acceptor"); MDC.put("nodeId", String.valueOf(nodeId)); MDC.put("ballot", String.valueOf(proposalBallot));
return CompletableFuture.supplyAsync(() -> { Accepted accepted = new Accepted();
rwLock.writeLock().lock(); try { if (proposalBallot >= acceptedBallot) { acceptedBallot = proposalBallot; acceptedValue = proposalValue; accepted.setOk(true); logger.info("Acceptor {} accepted ballot {} with value {}", nodeId, proposalBallot, proposalValue); metrics.recordAcceptMade(); } else { accepted.setOk(false); logger.info("Acceptor {} rejected accept for ballot {}, current ballot: {}", nodeId, proposalBallot, acceptedBallot); metrics.recordAcceptRejected(); } return accepted; } finally { rwLock.writeLock().unlock(); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("ballot"); } }); }
// Learner: 学习已决议的值 @Override public void learn(long instanceId, int ballot, Object value) { MDC.put("component", "paxos-learner"); MDC.put("nodeId", String.valueOf(nodeId)); MDC.put("instanceId", String.valueOf(instanceId)); MDC.put("ballot", String.valueOf(ballot));
try { logger.info("Learner {} learned value {} for instance {} with ballot {}", nodeId, value, instanceId, ballot); metrics.recordLearnReceived();
// 实际实现中,这里会将学习到的值应用到状态机 // applyToStateMachine(instanceId, value); } finally { MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("instanceId"); MDC.remove("ballot"); } }
// 发送Prepare请求给所有Acceptor private List<CompletableFuture<Promise>> sendPrepare(int newBallot) { List<CompletableFuture<Promise>> futures = new ArrayList<>();
for (int i = 0; i < totalNodes; i++) { final int targetNodeId = i; if (targetNodeId == this.nodeId) { // 处理本地请求 futures.add(handlePrepare(newBallot)); } else { // 发送远程请求 futures.add(networkClient.sendPrepare(targetNodeId, newBallot) .exceptionally(e -> { logger.error("Failed to send prepare to node {}", targetNodeId, e); return null; })); } }
return futures; }
// 发送Accept请求给所有Acceptor private List<CompletableFuture<Accepted>> sendAccept(int ballot, Object value) { List<CompletableFuture<Accepted>> futures = new ArrayList<>();
for (int i = 0; i < totalNodes; i++) { final int targetNodeId = i; if (targetNodeId == this.nodeId) { // 处理本地请求 futures.add(handleAccept(ballot, value)); } else { // 发送远程请求 futures.add(networkClient.sendAccept(targetNodeId, ballot, value) .exceptionally(e -> { logger.error("Failed to send accept to node {}", targetNodeId, e); return null; })); } }
return futures; }
// 通知所有Learner已决议的值 private void broadcastToLearners(long instanceId, int ballot, Object value) { for (int i = 0; i < totalNodes; i++) { final int targetNodeId = i; if (targetNodeId == this.nodeId) { // 本地学习 learn(instanceId, ballot, value); } else { // 异步通知其他Learner CompletableFuture.runAsync(() -> { try { networkClient.sendLearn(targetNodeId, instanceId, ballot, value); } catch (Exception e) { logger.error("Failed to notify learner {}", targetNodeId, e); } }); } } }
// 选择最高ballot的Promise private Promise selectHighestBallotPromise(List<Promise> promises) { return promises.stream() .filter(p -> p.isOk() && p.getAcceptedValue() != null) .max(Comparator.comparingInt(Promise::getAcceptedBallot)) .orElse(null); }
// 生成比当前更大的提案编号 (加入节点ID保证唯一性) private int generateNewBallot() { // 确保新ballot大于之前的,并且保证不同节点的ballot唯一 return ballot.incrementAndGet() * totalNodes + nodeId; }
// 获取多数派数量 private int getQuorum() { return totalNodes / 2 + 1; }
@Override public void close() { // 释放资源 metrics.close(); }
// Promise类 public static class Promise { private boolean ok; private int acceptedBallot; private Object acceptedValue;
public boolean isOk() { return ok; }
public void setOk(boolean ok) { this.ok = ok; }
public int getAcceptedBallot() { return acceptedBallot; }
public void setAcceptedBallot(int acceptedBallot) { this.acceptedBallot = acceptedBallot; }
public Object getAcceptedValue() { return acceptedValue; }
public void setAcceptedValue(Object acceptedValue) { this.acceptedValue = acceptedValue; } }
// Accepted类 public static class Accepted { private boolean ok;
public boolean isOk() { return ok; }
public void setOk(boolean ok) { this.ok = ok; } }
// 指标收集类 private static class MetricsCollector implements AutoCloseable { // 指标定义...
public MetricsCollector(String prefix, int nodeId) { // 初始化指标... }
public void recordPrepareSuccess(long latencyMs) { // 记录准备阶段成功 }
public void recordPrepareFailed() { // 记录准备阶段失败 }
public void recordProposeSuccess(long latencyMs) { // 记录提议阶段成功 }
public void recordProposeFailed() { // 记录提议阶段失败 }
public void recordPromiseMade() { // 记录承诺次数 }
public void recordPromiseRejected() { // 记录拒绝承诺次数 }
public void recordAcceptMade() { // 记录接受次数 }
public void recordAcceptRejected() { // 记录拒绝接受次数 }
public void recordLearnReceived() { // 记录学习次数 }
@Override public void close() { // 清理资源 } }
// 异常处理与重试策略 interface RetryStrategy { <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action); }
// 指数退避重试策略 static class ExponentialBackoffRetry implements RetryStrategy { private final long initialBackoffMs; private final long maxBackoffMs; private final int maxRetries; private final Logger logger = LoggerFactory.getLogger(ExponentialBackoffRetry.class);
public ExponentialBackoffRetry(long initialBackoffMs, long maxBackoffMs, int maxRetries) { this.initialBackoffMs = initialBackoffMs; this.maxBackoffMs = maxBackoffMs; this.maxRetries = maxRetries; }
@Override public <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action) { return retryInternal(action, 0); }
private <T> CompletableFuture<T> retryInternal(Supplier<CompletableFuture<T>> action, int attempt) { return action.get().exceptionally(e -> { if (attempt >= maxRetries) { throw new CompletionException( new RetryExhaustedException("Max retries exceeded", e)); }
long backoff = Math.min(initialBackoffMs * (long)Math.pow(2, attempt), maxBackoffMs); backoff += ThreadLocalRandom.current().nextInt((int)(backoff / 5));
logger.info("Retry attempt {} after {}ms due to: {}", attempt + 1, backoff, e.getMessage());
return CompletableFuture.delayedExecutor(backoff, TimeUnit.MILLISECONDS) .execute(() -> retryInternal(action, attempt + 1)) .join(); }); } }
// 自定义异常类 public static class RetryExhaustedException extends RuntimeException { public RetryExhaustedException(String message, Throwable cause) { super(message, cause); } }}
Multi-Paxos 实现
下面实现了 Multi-Paxos 的组件化架构,通过分离关注点提高代码的可维护性:
/**
 * Multi-Paxos node composed of a log, a state machine, a networking layer and
 * a role manager, driven by a two-thread daemon scheduler.
 */
public class MultiPaxosSystem {
    private final int nodeId;
    private final Configuration config;
    private final MultiPaxosLog log;
    private final MultiPaxosStateMachine stateMachine;
    private final MultiPaxosNetworking networking;
    private final RoleManager roleManager;
    private final ScheduledExecutorService scheduler;
    private final Logger logger = LoggerFactory.getLogger(MultiPaxosSystem.class);

    /**
     * Wires the components together and starts the two periodic tasks.
     *
     * @param nodeId identifier of this node
     * @param config cluster configuration; its node map is handed to the networking layer
     */
    public MultiPaxosSystem(int nodeId, Configuration config) {
        this.nodeId = nodeId;
        this.config = config;
        this.log = new MultiPaxosLog();
        this.stateMachine = new MultiPaxosStateMachine();
        this.networking = new MultiPaxosNetworking(nodeId, config.getNodes());
        this.roleManager = new RoleManager(this);

        this.scheduler = Executors.newScheduledThreadPool(2, task -> {
            Thread worker = new Thread(task, "multi-paxos-scheduler-" + nodeId);
            worker.setDaemon(true);
            return worker;
        });

        // Apply committed log entries every 100 ms.
        final long applyIntervalMs = 100;
        scheduler.scheduleWithFixedDelay(
                this::applyCommittedLogs, applyIntervalMs, applyIntervalMs, TimeUnit.MILLISECONDS);

        // Check the leader lease every second.
        final long leaseCheckIntervalMs = 1000;
        scheduler.scheduleWithFixedDelay(
                this::checkLeaderLease, leaseCheckIntervalMs, leaseCheckIntervalMs, TimeUnit.MILLISECONDS);
    }
// 客户端API
// 追加新日志(写操作) public CompletableFuture<Boolean> appendLog(byte[] command) { if (!roleManager.isLeader()) { return CompletableFuture.failedFuture( new NotLeaderException("Not the leader", roleManager.getLeaderHint())); }
return roleManager.getLeaderRole().appendLog(command); }
// 读取操作 public CompletableFuture<byte[]> read(String key, ConsistencyLevel level) { switch (level) { case LINEARIZABLE: if (!roleManager.isLeader()) { return CompletableFuture.failedFuture( new NotLeaderException("Not the leader", roleManager.getLeaderHint())); } return roleManager.getLeaderRole().linearizableRead(key);
case SEQUENTIAL: return roleManager.getFollowerRole().sequentialRead(key);
case EVENTUAL: default: return roleManager.getFollowerRole().eventualRead(key); } }
// 尝试成为Leader public CompletableFuture<Boolean> electSelf() { return roleManager.electSelf(); }
// 日志应用 private void applyCommittedLogs() { try { long applied = stateMachine.getLastApplied(); long toApply = log.getCommitIndex();
if (applied >= toApply) { return; // 已全部应用 }
List<CompletableFuture<Void>> applyFutures = new ArrayList<>();
// 应用从applied+1到toApply的所有日志 for (long i = applied + 1; i <= toApply; i++) { final long instanceId = i; LogEntry entry = log.getEntry(instanceId);
if (entry != null && entry.isCommitted()) { applyFutures.add( stateMachine.apply(instanceId, entry.getCommand()) .thenRun(() -> { logger.debug("Applied log entry at instance {} to state machine", instanceId); }) .exceptionally(e -> { logger.error("Failed to apply log at instance {}", instanceId, e); return null; }) ); } }
// 等待所有应用完成 CompletableFuture.allOf(applyFutures.toArray(new CompletableFuture[0])) .thenRun(() -> { // 日志压缩 if (toApply - applied > 1000) { // 如果应用了大量日志,考虑压缩 log.compactLogs(stateMachine.getLastApplied()); } }) .exceptionally(e -> { logger.error("Error during log application", e); return null; }); } catch (Exception e) { logger.error("Error applying committed logs", e); } }
// 检查Leader租约 private void checkLeaderLease() { if (roleManager.isLeader()) { roleManager.getLeaderRole().checkLease(); } }
// 关闭系统 public void shutdown() { try { List<Runnable> pendingTasks = scheduler.shutdownNow(); if (!pendingTasks.isEmpty()) { logger.warn("Scheduler shutdown with {} pending tasks", pendingTasks.size()); }
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { logger.warn("Scheduler did not terminate in time"); }
networking.close(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted while waiting for scheduler termination"); } }
// 角色管理 public class RoleManager { private final MultiPaxosSystem system; private final AtomicBoolean isLeader = new AtomicBoolean(false); private final AtomicInteger currentBallot = new AtomicInteger(0); private volatile int leaderNodeId = -1; // -1表示未知
private final LeaderRole leaderRole; private final FollowerRole followerRole;
public RoleManager(MultiPaxosSystem system) { this.system = system; this.leaderRole = new LeaderRole(system); this.followerRole = new FollowerRole(system); }
public boolean isLeader() { return isLeader.get(); }
public int getLeaderHint() { return leaderNodeId; }
public LeaderRole getLeaderRole() { return leaderRole; }
public FollowerRole getFollowerRole() { return followerRole; }
public int getCurrentBallot() { return currentBallot.get(); }
public void setCurrentBallot(int ballot) { currentBallot.set(ballot); }
public CompletableFuture<Boolean> electSelf() { return leaderRole.electSelf().thenApply(elected -> { if (elected) { isLeader.set(true); leaderNodeId = nodeId; } return elected; }); }
public void stepDown() { if (isLeader.compareAndSet(true, false)) { logger.info("Node {} stepping down from leader", nodeId); } }
public void recognizeLeader(int leaderId, int ballot) { leaderNodeId = leaderId; currentBallot.set(ballot); if (leaderId != nodeId) { isLeader.set(false); } } }
// Leader角色实现 public class LeaderRole { private final MultiPaxosSystem system; private final AtomicLong leaseExpirationTime = new AtomicLong(0); private final long leaderLeaseMs = 5000; // 5秒租约
public LeaderRole(MultiPaxosSystem system) { this.system = system; }
// Leader选举 public CompletableFuture<Boolean> electSelf() { MDC.put("component", "multi-paxos-leader"); MDC.put("nodeId", String.valueOf(nodeId)); logger.info("Node {} attempting to become leader", nodeId);
try { int newBallot = generateNewBallot(); MDC.put("ballot", String.valueOf(newBallot));
return CompletableFuture.supplyAsync(() -> { try { // 执行Prepare阶段 Map<Long, PrepareResponse> responseMap = networking.sendPrepareForAllInstances(newBallot) .get(10, TimeUnit.SECONDS);
// 检查是否获得多数派支持 if (hasQuorumPromises(responseMap)) { // 根据收集到的信息,更新本地日志 updateLogFromPromises(responseMap);
// 成为Leader system.roleManager.setCurrentBallot(newBallot); system.roleManager.recognizeLeader(nodeId, newBallot);
logger.info("Node {} became leader with ballot {}", nodeId, newBallot); renewLease();
// 执行接受阶段,确保之前的日志得到多数派接受 confirmPendingLogs();
return true; } else { logger.info("Failed to become leader - did not get quorum promises"); return false; } } catch (Exception e) { logger.error("Error in become leader process", e); return false; } finally { MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("ballot"); } }); } catch (Exception e) { logger.error("Error initiating leader election", e); MDC.remove("component"); MDC.remove("nodeId"); return CompletableFuture.failedFuture(e); } }
// Leader: 追加新日志 public CompletableFuture<Boolean> appendLog(byte[] command) { Stopwatch stopwatch = Stopwatch.createStarted(); MDC.put("component", "multi-paxos-leader"); MDC.put("nodeId", String.valueOf(nodeId));
if (!system.roleManager.isLeader()) { MDC.remove("component"); MDC.remove("nodeId"); return CompletableFuture.failedFuture( new NotLeaderException("Node is not the leader", system.roleManager.getLeaderHint())); }
try { long nextInstance = system.log.getNextInstanceId(); MDC.put("instanceId", String.valueOf(nextInstance)); logger.info("Leader {} appending log at instance {}", nodeId, nextInstance);
// 创建日志条目 int currentBallot = system.roleManager.getCurrentBallot(); LogEntry entry = new LogEntry(currentBallot, command.clone()); // 防御性复制
// 存储日志条目 system.log.setEntry(nextInstance, entry);
// 对于已有Leader,可以跳过Prepare阶段,直接进入Accept阶段 return CompletableFuture.supplyAsync(() -> { try { List<AcceptResponse> responses = networking.sendAcceptRequests( nextInstance, currentBallot, command) .get(5, TimeUnit.SECONDS);
// 如果多数派接受 int quorum = getQuorum(); if (countAccepts(responses) >= quorum) { // 提交日志 entry.setCommitted(true); system.log.updateCommitIndex(nextInstance);
// 通知所有节点提交 networking.sendCommitNotifications(nextInstance, currentBallot);
logger.info("Log entry at instance {} has been committed", nextInstance); return true; } else { logger.warn("Failed to get quorum for instance {}", nextInstance); // 可能失去了领导权,尝试重新选举 system.roleManager.stepDown(); return false; } } catch (Exception e) { logger.error("Error in append log", e); throw new CompletionException(e); } finally { MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("instanceId"); } }); } catch (Exception e) { logger.error("Error initiating append log", e); MDC.remove("component"); MDC.remove("nodeId"); return CompletableFuture.failedFuture(e); } }
// 线性一致性读取(通过Leader确认) public CompletableFuture<byte[]> linearizableRead(String key) { if (!system.roleManager.isLeader()) { return CompletableFuture.failedFuture( new NotLeaderException("Not the leader", system.roleManager.getLeaderHint())); }
// 检查租约 if (System.currentTimeMillis() >= leaseExpirationTime.get()) { // 租约过期,需要重新确认Leader身份 return renewLease().thenCompose(renewed -> { if (!renewed) { return CompletableFuture.failedFuture( new ConsistencyException("Could not renew leadership lease")); } return system.stateMachine.read(key); }); }
// 租约有效,直接读取 return system.stateMachine.read(key); }
// 更新Leader租约 private CompletableFuture<Boolean> renewLease() { if (!system.roleManager.isLeader()) { return CompletableFuture.completedFuture(false); }
return CompletableFuture.supplyAsync(() -> { try { // 向多数派发送心跳以确认仍是Leader int currentBallot = system.roleManager.getCurrentBallot(); int responses = networking.sendLeadershipHeartbeats(currentBallot) .get(3, TimeUnit.SECONDS);
if (responses >= getQuorum()) { leaseExpirationTime.set(System.currentTimeMillis() + leaderLeaseMs); logger.debug("Renewed leader lease until {}", leaseExpirationTime.get()); return true; } else { logger.warn("Failed to renew leadership lease"); return false; } } catch (Exception e) { logger.error("Error renewing leadership lease", e); return false; } }); }
// 检查租约状态 public void checkLease() { if (!system.roleManager.isLeader()) { return; }
// 如果租约即将过期,尝试续期 long now = System.currentTimeMillis(); long expiration = leaseExpirationTime.get();
// 如果租约将在1秒内过期,提前续期 if (now + 1000 > expiration) { renewLease().thenAccept(renewed -> { if (!renewed) { logger.warn("Lease renewal failed, stepping down as leader"); system.roleManager.stepDown(); } }); } }
// 确保之前的日志条目被多数派接受 private void confirmPendingLogs() { // 实现逻辑... }
// 根据prepare响应更新日志 private void updateLogFromPromises(Map<Long, PrepareResponse> responseMap) { // 实现逻辑... }
// 检查是否获得多数派promise private boolean hasQuorumPromises(Map<Long, PrepareResponse> responseMap) { // 实现逻辑... return true; // 简化 }
// 统计accept响应 private int countAccepts(List<AcceptResponse> responses) { return (int) responses.stream() .filter(r -> r != null && r.isAccepted()) .count(); } }
// Follower角色实现 public class FollowerRole { private final MultiPaxosSystem system; private final Map<String, CacheEntry> readCache = new ConcurrentHashMap<>(); private final long maxCacheAgeMs = 5000; // 5秒缓存过期
public FollowerRole(MultiPaxosSystem system) { this.system = system; }
// 处理心跳消息 public void handleHeartbeat(int leaderBallot, int leaderNodeId, long leaderCommitIndex) { // 更新本地commit index system.log.updateCommitIndex(leaderCommitIndex);
// 如果自己认为自己是Leader但收到更高ballot的心跳,则退位 if (system.roleManager.isLeader() && leaderBallot > system.roleManager.getCurrentBallot()) { logger.info("Stepping down as leader due to heartbeat with higher ballot: {}", leaderBallot); system.roleManager.stepDown(); }
// 记录当前Leader system.roleManager.recognizeLeader(leaderNodeId, leaderBallot); }
// 顺序一致性读(确保看到所有之前的写入) public CompletableFuture<byte[]> sequentialRead(String key) { // 确保应用了所有已提交的事务 return ensureAppliedUpToCommitIndex() .thenCompose(v -> system.stateMachine.read(key)); }
// 最终一致性读(直接从本地读取) public CompletableFuture<byte[]> eventualRead(String key) { return system.stateMachine.read(key); }
// 确保应用到当前commitIndex private CompletableFuture<Void> ensureAppliedUpToCommitIndex() { long current = system.log.getCommitIndex(); long applied = system.stateMachine.getLastApplied();
if (applied >= current) { return CompletableFuture.completedFuture(null); // 已全部应用 }
// 等待应用完成 CompletableFuture<Void> result = new CompletableFuture<>(); scheduler.execute(() -> { try { // 触发应用 system.applyCommittedLogs();
// 检查是否应用完成 if (system.stateMachine.getLastApplied() >= current) { result.complete(null); } else { // 可能有一些延迟,再次检查 scheduler.schedule(() -> { system.applyCommittedLogs(); result.complete(null); }, 50, TimeUnit.MILLISECONDS); } } catch (Exception e) { result.completeExceptionally(e); } });
return result; }
// 清理过期缓存 public void cleanupReadCache() { long now = System.currentTimeMillis(); // 移除过期条目 readCache.entrySet().removeIf(entry -> now - entry.getValue().getTimestamp() > maxCacheAgeMs);
// 如果缓存过大,移除最旧的条目 if (readCache.size() > config.getMaxCacheSize()) { List<String> oldestKeys = readCache.entrySet().stream() .sorted(Comparator.comparingLong(e -> e.getValue().getTimestamp())) .limit(readCache.size() - config.getMaxCacheSize()) .map(Map.Entry::getKey) .collect(Collectors.toList());
for (String key : oldestKeys) { readCache.remove(key); } logger.info("Cache cleanup: removed {} old entries", oldestKeys.size()); } } }
// 内部组件类
// 日志管理 public static class MultiPaxosLog { private final ReadWriteLock logLock = new ReentrantReadWriteLock(); private final ConcurrentNavigableMap<Long, LogEntry> log = new ConcurrentSkipListMap<>(); private final AtomicLong nextInstanceId = new AtomicLong(1); private final AtomicLong commitIndex = new AtomicLong(0); private final Logger logger = LoggerFactory.getLogger(MultiPaxosLog.class);
public LogEntry getEntry(long index) { logLock.readLock().lock(); try { return log.get(index); } finally { logLock.readLock().unlock(); } }
public void setEntry(long index, LogEntry entry) { logLock.writeLock().lock(); try { log.put(index, entry); nextInstanceId.updateAndGet(current -> Math.max(current, index + 1)); } finally { logLock.writeLock().unlock(); } }
public long getNextInstanceId() { return nextInstanceId.getAndIncrement(); }
public long getCommitIndex() { return commitIndex.get(); }
public void updateCommitIndex(long newCommitIndex) { // 原子更新提交索引,确保只增不减 commitIndex.updateAndGet(current -> Math.max(current, newCommitIndex)); }
// 日志压缩 public void compactLogs(long appliedIndex) { // 保留最近的日志,删除旧日志 final int retentionWindow = 1000; // 保留最近1000条 long truncatePoint = appliedIndex - retentionWindow;
if (truncatePoint <= 0) { return; // 不需要压缩 }
logLock.writeLock().lock(); try { List<Long> toRemove = log.keySet().stream() .filter(idx -> idx < truncatePoint) .collect(Collectors.toList());
for (Long idx : toRemove) { log.remove(idx); }
logger.info("Compacted {} log entries before index {}", toRemove.size(), truncatePoint); } finally { logLock.writeLock().unlock(); } } }
// 状态机实现 public static class MultiPaxosStateMachine { private final AtomicLong lastApplied = new AtomicLong(0); private final Map<String, byte[]> keyValueStore = new ConcurrentHashMap<>(); private final Logger logger = LoggerFactory.getLogger(MultiPaxosStateMachine.class);
public CompletableFuture<Void> apply(long instanceId, byte[] command) { return CompletableFuture.runAsync(() -> { try { // 解析命令 Command cmd = deserializeCommand(command);
// 应用到状态机 if (cmd.getType() == CommandType.PUT) { keyValueStore.put(cmd.getKey(), cmd.getValue()); } else if (cmd.getType() == CommandType.DELETE) { keyValueStore.remove(cmd.getKey()); }
// 更新已应用索引 lastApplied.updateAndGet(current -> Math.max(current, instanceId)); } catch (Exception e) { logger.error("Error applying command at instance {}", instanceId, e); throw new CompletionException(e); } }); }
public CompletableFuture<byte[]> read(String key) { return CompletableFuture.supplyAsync(() -> { byte[] value = keyValueStore.get(key); return value != null ? value.clone() : null; // 防御性复制 }); }
public long getLastApplied() { return lastApplied.get(); }
public CompletableFuture<byte[]> takeSnapshot() { return CompletableFuture.supplyAsync(() -> { try { // 创建状态机快照 return serializeState(); } catch (Exception e) { logger.error("Error taking snapshot", e); throw new CompletionException(e); } }); }
public CompletableFuture<Void> restoreSnapshot(byte[] snapshot, long instanceId) { return CompletableFuture.runAsync(() -> { try { // 从快照恢复状态 deserializeState(snapshot);
// 更新已应用索引 lastApplied.set(instanceId); } catch (Exception e) { logger.error("Error restoring snapshot", e); throw new CompletionException(e); } }); }
// 序列化和反序列化辅助方法 private Command deserializeCommand(byte[] data) { // 实际实现应使用正式的序列化机制 return new Command(CommandType.PUT, "key", data); // 简化示例 }
private byte[] serializeState() { // 实际实现应使用正式的序列化机制 return new byte[0]; // 简化示例 }
private void deserializeState(byte[] data) { // 实际实现应使用正式的序列化机制 // 简化示例 } }
// 网络层 public static class MultiPaxosNetworking implements AutoCloseable { private final int nodeId; private final Map<Integer, NodeInfo> nodes; private final NetworkClient client; private final Logger logger = LoggerFactory.getLogger(MultiPaxosNetworking.class);
public MultiPaxosNetworking(int nodeId, Map<Integer, NodeInfo> nodes) { this.nodeId = nodeId; this.nodes = new HashMap<>(nodes); this.client = createNetworkClient(); }
private NetworkClient createNetworkClient() { // 实际实现应创建合适的网络客户端 return new NetworkClientImpl(); }
public CompletableFuture<Map<Long, PrepareResponse>> sendPrepareForAllInstances(int ballot) { // 实现逻辑... return CompletableFuture.completedFuture(new HashMap<>()); }
public CompletableFuture<List<AcceptResponse>> sendAcceptRequests( long instanceId, int ballot, byte[] command) { // 实现逻辑... return CompletableFuture.completedFuture(new ArrayList<>()); }
public CompletableFuture<Integer> sendLeadershipHeartbeats(int ballot) { // 实现逻辑... return CompletableFuture.completedFuture(0); }
public void sendCommitNotifications(long instanceId, int ballot) { // 实现逻辑... }
@Override public void close() { // 关闭网络客户端 } }
// 生成新的ballot,确保全局唯一性 private int generateNewBallot() { // 确保新ballot大于之前的,并且不同节点生成的ballot唯一 int currentBallot = roleManager.getCurrentBallot(); return (currentBallot / config.getTotalNodes() + 1) * config.getTotalNodes() + nodeId; }
// 获取多数派数量 private int getQuorum() { return config.getTotalNodes() / 2 + 1; }
// 日志条目 public static class LogEntry { private int ballot; private final byte[] command; private volatile boolean committed;
LogEntry(int ballot, byte[] command) { this.ballot = ballot; this.command = command.clone(); // 防御性复制 this.committed = false; }
public int getBallot() { return ballot; }
public void setBallot(int ballot) { this.ballot = ballot; }
public byte[] getCommand() { return command.clone(); // 防御性复制 }
public boolean isCommitted() { return committed; }
public void setCommitted(boolean committed) { this.committed = committed; } }
// 配置类 public static class Configuration { private final int totalNodes; private final Map<Integer, NodeInfo> nodes; private final int maxCacheSize;
public Configuration(int totalNodes, Map<Integer, NodeInfo> nodes, int maxCacheSize) { this.totalNodes = totalNodes; this.nodes = new HashMap<>(nodes); this.maxCacheSize = maxCacheSize; }
public int getTotalNodes() { return totalNodes; }
public Map<Integer, NodeInfo> getNodes() { return Collections.unmodifiableMap(nodes); }
public int getMaxCacheSize() { return maxCacheSize; } }
// 节点信息 public static class NodeInfo { private final int id; private final String host; private final int port;
public NodeInfo(int id, String host, int port) { this.id = id; this.host = host; this.port = port; }
public int getId() { return id; }
public String getHost() { return host; }
public int getPort() { return port; } }
/** Kinds of command the state machine distinguishes. */
enum CommandType {
    PUT,
    DELETE
}
// 命令对象 static class Command { private final CommandType type; private final String key; private final byte[] value;
public Command(CommandType type, String key, byte[] value) { this.type = type; this.key = key; this.value = value != null ? value.clone() : null; // 防御性复制 }
public CommandType getType() { return type; }
public String getKey() { return key; }
public byte[] getValue() { return value != null ? value.clone() : null; // 防御性复制 } }
// 响应类 public static class PrepareResponse { // 实现... }
/** Immutable response to an accept request, reduced to an accepted flag. */
public static class AcceptResponse {
    private final boolean accepted;

    /** @param accepted whether the request was accepted */
    public AcceptResponse(boolean accepted) {
        this.accepted = accepted;
    }

    public boolean isAccepted() {
        return this.accepted;
    }
}
/** Consistency levels a read can request. */
public enum ConsistencyLevel {
    /** Linearizable consistency. */
    LINEARIZABLE,
    /** Sequential consistency. */
    SEQUENTIAL,
    /** Eventual consistency. */
    EVENTUAL
}
// 异常类 public static class NotLeaderException extends RuntimeException { private final int leaderHint;
public NotLeaderException(String message, int leaderHint) { super(message); this.leaderHint = leaderHint; }
public int getLeaderHint() { return leaderHint; } }
/** Unchecked exception signalling that a consistency requirement could not be met. */
public static class ConsistencyException extends RuntimeException {

    /** @param message description of the failure */
    public ConsistencyException(String message) {
        super(message);
    }
}
// 简化的网络客户端实现 private static class NetworkClientImpl implements NetworkClient { // 实现网络接口...
@Override public CompletableFuture<Promise> sendPrepare(int nodeId, int ballot) { return null; }
@Override public CompletableFuture<Accepted> sendAccept(int nodeId, int ballot, Object value) { return null; }
@Override public void sendLearn(int nodeId, long instanceId, int ballot, Object value) {
}
@Override public CompletableFuture<Map<Long, PrepareResponse>> sendPrepareAllInstances(int nodeId, int ballot) { return null; }
@Override public CompletableFuture<Void> sendSnapshot(int nodeId, byte[] snapshot, long lastInstanceId) { return null; } }}
四、网络分区处理与成员变更
网络分区检测
public class PartitionHandler implements AutoCloseable { private final String nodeId; private final AtomicLong lastHeartbeatTime = new AtomicLong(0); private final AtomicBoolean suspectPartition = new AtomicBoolean(false); private final ScheduledExecutorService scheduler; private final long heartbeatTimeoutMs; private final Consumer<PartitionEvent> partitionCallback; private final Logger logger = LoggerFactory.getLogger(PartitionHandler.class);
public PartitionHandler(String nodeId, long heartbeatTimeoutMs, Consumer<PartitionEvent> partitionCallback) { this.nodeId = nodeId; this.heartbeatTimeoutMs = heartbeatTimeoutMs; this.partitionCallback = partitionCallback; this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "partition-detector-" + nodeId); t.setDaemon(true); return t; });
// 启动心跳检测任务 scheduler.scheduleAtFixedRate( this::checkHeartbeat, heartbeatTimeoutMs / 2, heartbeatTimeoutMs / 2, TimeUnit.MILLISECONDS ); }
// 记录收到心跳 public void recordHeartbeat() { lastHeartbeatTime.set(System.currentTimeMillis()); if (suspectPartition.compareAndSet(true, false)) { logger.info("Node {} no longer suspects network partition", nodeId); partitionCallback.accept(new PartitionEvent(PartitionStatus.RECOVERED, nodeId)); } }
// 检查心跳超时 private void checkHeartbeat() { try { long now = System.currentTimeMillis(); long last = lastHeartbeatTime.get();
if (last > 0 && now - last > heartbeatTimeoutMs) { // 可能存在网络分区 if (suspectPartition.compareAndSet(false, true)) { logger.warn("Node {} suspects network partition, last heartbeat: {}ms ago", nodeId, now - last);
// 执行分区检测回调 partitionCallback.accept(new PartitionEvent(PartitionStatus.SUSPECTED, nodeId)); } } } catch (Exception e) { logger.error("Error checking heartbeat", e); } }
@Override public void close() { scheduler.shutdownNow(); try { if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) { logger.warn("Partition detector scheduler did not terminate in time"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted while shutting down partition detector"); } }
// 分区状态枚举 public enum PartitionStatus { SUSPECTED, // 怀疑发生分区 CONFIRMED, // 确认发生分区 RECOVERED // 分区已恢复 }
// 分区事件类 public static class PartitionEvent { private final PartitionStatus status; private final String nodeId;
public PartitionEvent(PartitionStatus status, String nodeId) { this.status = status; this.nodeId = nodeId; }
public PartitionStatus getStatus() { return status; }
public String getNodeId() { return nodeId; } }}
成员变更实现
/**
 * Manages cluster membership: holds the current configuration, persists it
 * through a {@code ConfigurationStore} and performs two-phase add/remove changes.
 */
public class MembershipManager implements AutoCloseable {
    private final ConcurrentMap<String, ServerInfo> servers = new ConcurrentHashMap<>();
    private volatile Configuration currentConfig;
    private final AtomicLong configVersion = new AtomicLong(0);
    private final String nodeId;
    private final AtomicBoolean isLeader = new AtomicBoolean(false);
    private final Logger logger = LoggerFactory.getLogger(MembershipManager.class);
    private final ConfigurationStore configStore;
    private final NetworkClient networkClient;
    private final StampedLock configLock = new StampedLock();

    /**
     * Loads the persisted configuration, falling back to an empty one when
     * none is stored or loading fails.
     *
     * <p>The version counter is seeded from the loaded configuration. Without
     * that, the first change after a restart would be numbered 1, and peers
     * already holding a higher version would ignore it as outdated.
     *
     * @param nodeId id of the local node
     * @param isLeader whether this node starts as leader
     * @param configStore persistence for configurations
     * @param networkClient client used to reach other servers
     */
    public MembershipManager(String nodeId, boolean isLeader,
                             ConfigurationStore configStore, NetworkClient networkClient) {
        this.nodeId = nodeId;
        this.isLeader.set(isLeader);
        this.configStore = configStore;
        this.networkClient = networkClient;

        try {
            Configuration loaded = configStore.loadConfiguration();
            if (loaded == null) {
                loaded = new Configuration(configVersion.get(), new HashMap<>());
            } else {
                configVersion.set(loaded.getVersion());
            }
            this.currentConfig = loaded;
            servers.putAll(loaded.getServers());
        } catch (IOException e) {
            logger.error("Failed to load configuration", e);
            this.currentConfig = new Configuration(configVersion.get(), new HashMap<>());
        }
    }
// 两阶段成员变更 - 安全添加节点 public CompletableFuture<Boolean> addServer(String serverId, String address, int port) { MDC.put("component", "membership-manager"); MDC.put("nodeId", nodeId); MDC.put("targetServerId", serverId);
if (!isLeader.get()) { logger.warn("Only leader can change membership"); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("targetServerId"); return CompletableFuture.failedFuture( new IllegalStateException("Only leader can change membership")); }
CompletableFuture<Boolean> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> { long stamp = configLock.writeLock(); try { logger.info("Starting server addition: {}", serverId);
// 第一阶段:创建过渡配置(包含新旧所有节点) Configuration oldConfig = currentConfig; Configuration jointConfig = createJointConfig(oldConfig, serverId, address, port);
// 将过渡配置提交给集群 commitConfiguration(jointConfig).thenAccept(committed -> { if (!committed) { logger.warn("Failed to commit joint configuration for server {}", serverId); result.complete(false); return; }
logger.info("Joint configuration committed, proceeding to second phase");
// 第二阶段:创建新配置(确认包含新节点) Configuration newConfig = createNewConfig(jointConfig);
// 将新配置提交给集群 commitConfiguration(newConfig).thenAccept(finalCommitted -> { if (finalCommitted) { logger.info("Server {} successfully added to cluster", serverId); } else { logger.warn("Failed to commit final configuration for server {}", serverId); } result.complete(finalCommitted); }).exceptionally(e -> { logger.error("Error committing final configuration for server {}", serverId, e); result.completeExceptionally(e); return null; }); }).exceptionally(e -> { logger.error("Error committing joint configuration for server {}", serverId, e); result.completeExceptionally(e); return null; }); } catch (Exception e) { logger.error("Error adding server {}", serverId, e); result.completeExceptionally(e); } finally { configLock.unlockWrite(stamp); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("targetServerId"); } });
return result; }
// 两阶段成员变更 - 安全移除节点 public CompletableFuture<Boolean> removeServer(String serverId) { MDC.put("component", "membership-manager"); MDC.put("nodeId", nodeId); MDC.put("targetServerId", serverId);
if (!isLeader.get()) { logger.warn("Only leader can change membership"); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("targetServerId"); return CompletableFuture.failedFuture( new IllegalStateException("Only leader can change membership")); }
if (!servers.containsKey(serverId)) { logger.warn("Server {} not found in configuration", serverId); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("targetServerId"); return CompletableFuture.completedFuture(false); }
CompletableFuture<Boolean> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> { long stamp = configLock.writeLock(); try { logger.info("Starting server removal: {}", serverId);
// 第一阶段:创建过渡配置(标记要移除的节点) Configuration oldConfig = currentConfig; Configuration jointConfig = createJointConfig(oldConfig, serverId);
// 将过渡配置提交给集群 commitConfiguration(jointConfig).thenAccept(committed -> { if (!committed) { logger.warn("Failed to commit joint configuration for removing server {}", serverId); result.complete(false); return; }
logger.info("Joint configuration committed, proceeding to second phase");
// 第二阶段:创建新配置(移除目标节点) Configuration newConfig = createNewConfigWithout(jointConfig, serverId);
// 将新配置提交给集群 commitConfiguration(newConfig).thenAccept(finalCommitted -> { if (finalCommitted) { logger.info("Server {} successfully removed from cluster", serverId); } else { logger.warn("Failed to commit final configuration for removing server {}", serverId); } result.complete(finalCommitted); }).exceptionally(e -> { logger.error("Error committing final configuration for removing server {}", serverId, e); result.completeExceptionally(e); return null; }); }).exceptionally(e -> { logger.error("Error committing joint configuration for removing server {}", serverId, e); result.completeExceptionally(e); return null; }); } catch (Exception e) { logger.error("Error removing server {}", serverId, e); result.completeExceptionally(e); } finally { configLock.unlockWrite(stamp); MDC.remove("component"); MDC.remove("nodeId"); MDC.remove("targetServerId"); } });
return result; }
// 创建过渡配置(添加节点) private Configuration createJointConfig(Configuration oldConfig, String newServerId, String address, int port) { Map<String, ServerInfo> newServers = new HashMap<>(oldConfig.getServers()); newServers.put(newServerId, new ServerInfo(newServerId, address, port));
return new Configuration(configVersion.incrementAndGet(), newServers); }
// 创建过渡配置(删除节点) private Configuration createJointConfig(Configuration oldConfig, String serverId) { // 标记要删除的节点(在过渡配置中仍存在,但标记为待移除) Map<String, ServerInfo> jointServers = new HashMap<>(oldConfig.getServers()); ServerInfo serverInfo = jointServers.get(serverId); if (serverInfo != null) { ServerInfo markedServer = new ServerInfo( serverId, serverInfo.getAddress(), serverInfo.getPort(), true); jointServers.put(serverId, markedServer); }
return new Configuration(configVersion.incrementAndGet(), jointServers); }
// 创建新配置(确认添加节点) private Configuration createNewConfig(Configuration jointConfig) { // 最终配置,清除所有标记 Map<String, ServerInfo> newServers = new HashMap<>(); for (var entry : jointConfig.getServers().entrySet()) { if (!entry.getValue().isMarkedForRemoval()) { newServers.put(entry.getKey(), new ServerInfo( entry.getValue().getId(), entry.getValue().getAddress(), entry.getValue().getPort(), false )); } }
return new Configuration(configVersion.incrementAndGet(), newServers); }
// 创建新配置(确认删除节点) private Configuration createNewConfigWithout(Configuration jointConfig, String serverId) { Map<String, ServerInfo> newServers = new HashMap<>(); for (var entry : jointConfig.getServers().entrySet()) { if (!entry.getKey().equals(serverId) && !entry.getValue().isMarkedForRemoval()) { newServers.put(entry.getKey(), new ServerInfo( entry.getValue().getId(), entry.getValue().getAddress(), entry.getValue().getPort(), false )); } }
return new Configuration(configVersion.incrementAndGet(), newServers); }
// 提交配置变更 private CompletableFuture<Boolean> commitConfiguration(Configuration config) { return CompletableFuture.supplyAsync(() -> { try { // 实际实现会通过共识算法提交配置变更 logger.info("Committing configuration version {}", config.getVersion());
// 持久化配置 configStore.saveConfiguration(config);
// 更新本地配置 synchronized (this) { currentConfig = config; servers.clear(); servers.putAll(config.getServers()); }
// 广播配置变更 broadcastConfigChange(config);
return true; } catch (Exception e) { logger.error("Error committing configuration", e); return false; } }); }
// 广播配置变更 private void broadcastConfigChange(Configuration config) { // 向所有节点广播配置变更 for (String serverId : servers.keySet()) { if (!serverId.equals(nodeId)) { CompletableFuture.runAsync(() -> { try { // 实际实现中调用网络客户端发送配置 notifyConfigChange(serverId, config); } catch (Exception e) { logger.error("Failed to notify server {} of config change", serverId, e); } }); } } }
// 通知节点配置变更 private void notifyConfigChange(String serverId, Configuration config) { // 实际实现会发送配置给指定节点 logger.debug("Notifying server {} of configuration change to version {}", serverId, config.getVersion()); }
// 处理接收到的配置变更 public void handleConfigChange(Configuration newConfig) { long stamp = configLock.writeLock(); try { if (newConfig.getVersion() > currentConfig.getVersion()) { try { // 持久化新配置 configStore.saveConfiguration(newConfig);
// 更新本地配置 currentConfig = newConfig; servers.clear(); servers.putAll(newConfig.getServers());
logger.info("Updated to new configuration version {}", newConfig.getVersion()); } catch (IOException e) { logger.error("Failed to persist new configuration", e); } } else { logger.debug("Ignoring old configuration version {} (current is {})", newConfig.getVersion(), currentConfig.getVersion()); } } finally { configLock.unlockWrite(stamp); } }
// 获取当前配置 public Configuration getCurrentConfig() { long stamp = configLock.tryOptimisticRead(); Configuration config = currentConfig;
if (!configLock.validate(stamp)) { stamp = configLock.readLock(); try { config = currentConfig; } finally { configLock.unlockRead(stamp); } }
return config; }
// 检查节点是否在配置中(不包括标记为移除的节点) public boolean isServerInConfig(String serverId) { ServerInfo info = servers.get(serverId); return info != null && !info.isMarkedForRemoval(); }
// 获取有效服务器数量(不包括标记为移除的节点) public int getActiveServerCount() { return (int) servers.values().stream() .filter(s -> !s.isMarkedForRemoval()) .count(); }
// 设置Leader状态 public void setLeader(boolean isLeader) { this.isLeader.set(isLeader); }
@Override public void close() { // 释放资源 }
// 配置类 public static class Configuration implements Serializable { private static final long serialVersionUID = 1L;
private final long version; private final Map<String, ServerInfo> servers;
public Configuration(long version, Map<String, ServerInfo> servers) { this.version = version; this.servers = new HashMap<>(servers); }
public long getVersion() { return version; }
public Map<String, ServerInfo> getServers() { return Collections.unmodifiableMap(servers); } }
// 服务器信息 public static class ServerInfo implements Serializable { private static final long serialVersionUID = 1L;
private final String id; private final String address; private final int port; private final boolean markedForRemoval;
public ServerInfo(String id, String address, int port) { this(id, address, port, false); }
public ServerInfo(String id, String address, int port, boolean markedForRemoval) { this.id = id; this.address = address; this.port = port; this.markedForRemoval = markedForRemoval; }
public String getId() { return id; }
public String getAddress() { return address; }
public int getPort() { return port; }
public boolean isMarkedForRemoval() { return markedForRemoval; } }}
配置存储实现
public class FileBasedConfigurationStore implements ConfigurationStore { private final Path configPath; private final Path snapshotDir; private final Logger logger = LoggerFactory.getLogger(FileBasedConfigurationStore.class);
public FileBasedConfigurationStore(Path configPath, Path snapshotDir) { this.configPath = configPath; this.snapshotDir = snapshotDir;
try { Files.createDirectories(configPath.getParent()); Files.createDirectories(snapshotDir); } catch (IOException e) { logger.error("Failed to create directories", e); throw new UncheckedIOException("Failed to create directories", e); } }
@Override public void saveConfiguration(MembershipManager.Configuration config) throws IOException { // 使用原子写入保证一致性 Path tempPath = configPath.resolveSibling(configPath.getFileName() + ".tmp"); try (ObjectOutputStream oos = new ObjectOutputStream( new BufferedOutputStream(Files.newOutputStream(tempPath)))) { oos.writeObject(config); oos.flush(); Files.move(tempPath, configPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
logger.info("Configuration version {} saved successfully", config.getVersion()); } catch (IOException e) { logger.error("Failed to save configuration", e); throw e; } }
@Override public MembershipManager.Configuration loadConfiguration() throws IOException { if (!Files.exists(configPath)) { logger.info("Configuration file does not exist: {}", configPath); return null; }
try (ObjectInputStream ois = new ObjectInputStream( new BufferedInputStream(Files.newInputStream(configPath)))) { MembershipManager.Configuration config = (MembershipManager.Configuration) ois.readObject(); logger.info("Loaded configuration version {}", config.getVersion()); return config; } catch (ClassNotFoundException e) { logger.error("Failed to deserialize configuration", e); throw new IOException("Failed to deserialize configuration", e); } }
@Override public void saveSnapshot(long index, byte[] data) throws IOException { // 创建快照文件名,包含索引 String snapshotFileName = String.format("snapshot-%020d.bin", index); Path snapshotPath = snapshotDir.resolve(snapshotFileName); Path tempPath = snapshotDir.resolve(snapshotFileName + ".tmp");
try { // 写入临时文件 Files.write(tempPath, data);
// 原子移动 Files.move(tempPath, snapshotPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
logger.info("Snapshot at index {} saved successfully, size: {} bytes", index, data.length);
// 清理旧快照,保留最近的5个 cleanupOldSnapshots(5); } catch (IOException e) { logger.error("Failed to save snapshot at index {}", index, e); throw e; } }
@Override public SnapshotInfo loadLatestSnapshot() throws IOException { try { // 查找最新的快照文件 Optional<Path> latestSnapshot = Files.list(snapshotDir) .filter(p -> p.getFileName().toString().startsWith("snapshot-") && p.getFileName().toString().endsWith(".bin")) .max(Comparator.comparing(p -> p.getFileName().toString()));
if (latestSnapshot.isPresent()) { Path snapshotPath = latestSnapshot.get(); String fileName = snapshotPath.getFileName().toString();
// 从文件名中提取索引 long index = Long.parseLong(fileName.substring(9, 29));
// 读取快照数据 byte[] data = Files.readAllBytes(snapshotPath);
logger.info("Loaded snapshot at index {}, size: {} bytes", index, data.length); return new SnapshotInfo(index, data); } else { logger.info("No snapshot found in directory: {}", snapshotDir); return null; } } catch (IOException e) { logger.error("Failed to load latest snapshot", e); throw e; } }
// 清理旧快照,只保留最新的n个 private void cleanupOldSnapshots(int keepCount) throws IOException { try { List<Path> snapshots = Files.list(snapshotDir) .filter(p -> p.getFileName().toString().startsWith("snapshot-") && p.getFileName().toString().endsWith(".bin")) .sorted(Comparator.comparing(p -> p.getFileName().toString())) .collect(Collectors.toList());
// 如果快照数量超过保留数量,删除旧的 if (snapshots.size() > keepCount) { int toDelete = snapshots.size() - keepCount; for (int i = 0; i < toDelete; i++) { Files.delete(snapshots.get(i)); logger.info("Deleted old snapshot: {}", snapshots.get(i).getFileName()); } } } catch (IOException e) { logger.error("Failed to cleanup old snapshots", e); throw e; } }
// 快照信息类 public static class SnapshotInfo { private final long index; private final byte[] data;
public SnapshotInfo(long index, byte[] data) { this.index = index; this.data = data.clone(); // 防御性复制 }
public long getIndex() { return index; }
public byte[] getData() { return data.clone(); // 防御性复制 } }}
// 配置存储接口public interface ConfigurationStore { void saveConfiguration(MembershipManager.Configuration config) throws IOException; MembershipManager.Configuration loadConfiguration() throws IOException; void saveSnapshot(long index, byte[] data) throws IOException; SnapshotInfo loadLatestSnapshot() throws IOException;
// 快照信息内部类定义 class SnapshotInfo { private final long index; private final byte[] data;
public SnapshotInfo(long index, byte[] data) { this.index = index; this.data = data.clone(); }
public long getIndex() { return index; }
public byte[] getData() { return data.clone(); } }}
跨数据中心复制支持
public class CrossDCReplication implements AutoCloseable { private final String localDC; private final List<String> allDCs; private final Map<String, DCConnection> dcConnections; private final ConsensusSystem localSystem; private final Logger logger = LoggerFactory.getLogger(CrossDCReplication.class); private final ScheduledExecutorService scheduler; private final AtomicLong replicationIndex = new AtomicLong(0); private final ConcurrentMap<String, AtomicLong> dcReplicationProgress = new ConcurrentHashMap<>();
public CrossDCReplication(String localDC, List<String> allDCs, ConsensusSystem localSystem, Map<String, DCConnectionConfig> dcConfigs) { this.localDC = localDC; this.allDCs = new ArrayList<>(allDCs); this.localSystem = localSystem; this.dcConnections = new HashMap<>();
// 初始化数据中心连接 for (String dc : allDCs) { if (!dc.equals(localDC)) { DCConnectionConfig config = dcConfigs.get(dc); if (config != null) { dcConnections.put(dc, new DCConnection(dc, config)); dcReplicationProgress.put(dc, new AtomicLong(0)); } } }
this.scheduler = Executors.newScheduledThreadPool(2, r -> { Thread t = new Thread(r, "dc-replication-scheduler"); t.setDaemon(true); return t; });
// 启动定期复制任务 scheduler.scheduleWithFixedDelay( this::replicateChanges, 1000, 1000, TimeUnit.MILLISECONDS );
// 启动健康检查任务 scheduler.scheduleWithFixedDelay( this::checkDCHealth, 5000, 5000, TimeUnit.MILLISECONDS ); }
// 复制请求到其他数据中心 public CompletableFuture<Boolean> replicateRequest(Request request) { MDC.put("component", "cross-dc-replication"); MDC.put("requestId", request.getId());
try { // 1. 首先在本地DC处理请求 return localSystem.processWrite(request) .thenCompose(localSuccess -> { if (!localSuccess) { logger.warn("Request {} failed in local DC", request.getId()); return CompletableFuture.completedFuture(false); }
// 2. 如果本地成功,更新复制索引 long index = replicationIndex.incrementAndGet();
// 3. 异步复制到其他数据中心 List<CompletableFuture<Boolean>> dcFutures = new ArrayList<>();
for (var entry : dcConnections.entrySet()) { String dc = entry.getKey(); DCConnection connection = entry.getValue();
dcFutures.add(connection.replicateRequest(request, index) .thenApply(success -> { if (success) { // 更新复制进度 dcReplicationProgress.get(dc).updateAndGet( current -> Math.max(current, index)); logger.info("Request {} successfully replicated to DC {}", request.getId(), dc); } else { logger.warn("Failed to replicate request {} to DC {}", request.getId(), dc); } return success; }) .exceptionally(e -> { logger.error("Error replicating request {} to DC {}", request.getId(), dc, e); return false; })); }
// 4. 等待所有DC的响应,基于配置的复制策略 return handleDCReplications(dcFutures); }); } finally { MDC.remove("component"); MDC.remove("requestId"); } }
// 根据复制策略处理跨DC复制结果 private CompletableFuture<Boolean> handleDCReplications( List<CompletableFuture<Boolean>> dcFutures) {
ReplicationStrategy strategy = ReplicationStrategy.QUORUM; // 可配置
switch (strategy) { case ALL: // 所有DC都必须成功 return CompletableFuture.allOf( dcFutures.toArray(new CompletableFuture[0])) .thenApply(v -> dcFutures.stream() .allMatch(f -> { try { return f.get(); } catch (Exception e) { return false; } }));
case QUORUM: // 多数DC必须成功 return CompletableFuture.supplyAsync(() -> { int successCount = 0; int requiredSuccesses = (dcFutures.size() / 2) + 1;
for (CompletableFuture<Boolean> future : dcFutures) { try { if (future.get(5, TimeUnit.SECONDS)) { successCount++; if (successCount >= requiredSuccesses) { return true; } } } catch (Exception e) { logger.warn("Error waiting for DC replication", e); } }
return successCount >= requiredSuccesses; });
case ANY: // 至少一个DC成功 return CompletableFuture.supplyAsync(() -> { for (CompletableFuture<Boolean> future : dcFutures) { try { if (future.get(5, TimeUnit.SECONDS)) { return true; } } catch (Exception e) { logger.warn("Error waiting for DC replication", e); } }
return false; });
case ASYNC: // 异步复制,不等待结果 return CompletableFuture.completedFuture(true);
default: logger.warn("Unknown replication strategy: {}, using QUORUM", strategy); return CompletableFuture.supplyAsync(() -> { int successCount = 0; int requiredSuccesses = (dcFutures.size() / 2) + 1;
for (CompletableFuture<Boolean> future : dcFutures) { try { if (future.get(5, TimeUnit.SECONDS)) { successCount++; if (successCount >= requiredSuccesses) { return true; } } } catch (Exception e) { logger.warn("Error waiting for DC replication", e); } }
return successCount >= requiredSuccesses; }); } }
// 定期同步数据中心之间的变更 private void replicateChanges() { try { // 获取当前复制进度 Map<String, Long> progress = new HashMap<>(); for (var entry : dcReplicationProgress.entrySet()) { progress.put(entry.getKey(), entry.getValue().get()); }
// 对每个DC,复制尚未同步的变更 for (var entry : dcConnections.entrySet()) { String dc = entry.getKey(); DCConnection connection = entry.getValue(); long currentProgress = progress.get(dc);
if (currentProgress < replicationIndex.get()) { // 查找需要复制的变更 List<ReplicationEntry> changes = getChangesSince(currentProgress, replicationIndex.get());
if (!changes.isEmpty()) { connection.replicateChanges(changes) .thenAccept(lastIndex -> { if (lastIndex > currentProgress) { // 更新复制进度 dcReplicationProgress.get(dc).updateAndGet( current -> Math.max(current, lastIndex)); logger.info("Replicated changes to DC {} up to index {}", dc, lastIndex); } }) .exceptionally(e -> { logger.error("Failed to replicate changes to DC {}", dc, e); return null; }); } } } } catch (Exception e) { logger.error("Error in replication task", e); } }
// 检查数据中心健康状态 private void checkDCHealth() { for (var entry : dcConnections.entrySet()) { String dc = entry.getKey(); DCConnection connection = entry.getValue();
connection.checkHealth() .thenAccept(healthy -> { if (healthy) { if (connection.markHealthy()) { logger.info("DC {} is now healthy", dc); } } else { if (connection.markUnhealthy()) { logger.warn("DC {} is now unhealthy", dc); } } }) .exceptionally(e -> { logger.error("Error checking health of DC {}", dc, e); connection.markUnhealthy(); return null; }); } }
// 获取指定范围内的变更 private List<ReplicationEntry> getChangesSince(long fromIndex, long toIndex) { // 实际实现应从日志存储中检索变更 List<ReplicationEntry> changes = new ArrayList<>();
// 简化示例 for (long i = fromIndex + 1; i <= toIndex; i++) { // 模拟获取变更 changes.add(new ReplicationEntry(i, null)); }
return changes; }
@Override public void close() { // 关闭调度器 scheduler.shutdownNow(); try { if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { logger.warn("Scheduler did not terminate in time"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Interrupted while waiting for scheduler termination"); }
// 关闭所有DC连接 for (DCConnection connection : dcConnections.values()) { connection.close(); } }
// 数据中心连接类 private class DCConnection implements AutoCloseable { private final String dcId; private final DCConnectionConfig config; private final AtomicBoolean healthy = new AtomicBoolean(true); private final NetworkClient networkClient;
public DCConnection(String dcId, DCConnectionConfig config) { this.dcId = dcId; this.config = config; this.networkClient = createNetworkClient(); }
private NetworkClient createNetworkClient() { // 创建用于跨DC通信的网络客户端 // 简化示例 return null; }
public CompletableFuture<Boolean> replicateRequest(Request request, long index) { if (!healthy.get()) { return CompletableFuture.completedFuture(false); }
// 实际实现中,将请求发送到目标DC return CompletableFuture.completedFuture(true); }
public CompletableFuture<Long> replicateChanges(List<ReplicationEntry> changes) { if (!healthy.get() || changes.isEmpty()) { return CompletableFuture.completedFuture(0L); }
// 实际实现中,将变更批量发送到目标DC long lastIndex = changes.get(changes.size() - 1).getIndex(); return CompletableFuture.completedFuture(lastIndex); }
public CompletableFuture<Boolean> checkHealth() { // 实际实现中,执行健康检查 return CompletableFuture.completedFuture(true); }
public boolean markHealthy() { return healthy.compareAndSet(false, true); }
public boolean markUnhealthy() { return healthy.compareAndSet(true, false); }
@Override public void close() { // 关闭网络客户端 } }
// 复制条目 private static class ReplicationEntry { private final long index; private final byte[] data;
public ReplicationEntry(long index, byte[] data) { this.index = index; this.data = data != null ? data.clone() : null; }
public long getIndex() { return index; }
public byte[] getData() { return data != null ? data.clone() : null; } }
// 数据中心连接配置 public static class DCConnectionConfig { private final String primaryEndpoint; private final List<String> backupEndpoints; private final int connectTimeoutMs; private final int readTimeoutMs;
public DCConnectionConfig(String primaryEndpoint, List<String> backupEndpoints, int connectTimeoutMs, int readTimeoutMs) { this.primaryEndpoint = primaryEndpoint; this.backupEndpoints = new ArrayList<>(backupEndpoints); this.connectTimeoutMs = connectTimeoutMs; this.readTimeoutMs = readTimeoutMs; }
// Getters... }
// 复制策略 public enum ReplicationStrategy { ALL, // 所有DC必须成功 QUORUM, // 多数DC必须成功 ANY, // 至少一个DC成功 ASYNC // 异步复制,不等待结果 }}
五、ZAB 与 Paxos 的联系与区别
联系
两者共同点:
多数派机制:都需要超过半数节点的确认以保证安全性,防止脑裂
阶段性操作:都通过两个主要阶段达成共识(Paxos 为 Prepare/Accept,ZAB 广播模式为 Proposal/Commit)
安全性保证:在非拜占庭故障模型下(节点可能崩溃、消息可能丢失或延迟,但不会被篡改),不会出现数据不一致的状态
容错能力:都能在部分节点失败的情况下继续工作
对网络分区的处理:在网络分区情况下保证安全性,宁可停止服务也不破坏一致性
区别
关键区别:
设计目标:
ZAB:专为 ZooKeeper 设计的状态机复制协议,强调系统整体的复制和顺序性
Paxos:通用的分布式共识算法,关注对单一值的决议过程
主从关系:
ZAB:明确的 Leader-Follower 架构,强调中心化处理
Basic Paxos:原始设计中角色对称,没有固定 Leader
Multi-Paxos:引入了 Leader 优化,但在理论上保持角色对称性
消息顺序:
ZAB:保证 FIFO 严格顺序处理,使用 ZXID(epoch + counter)保证全局顺序
Basic Paxos:不保证顺序,只关注单值共识
Multi-Paxos:可以通过实例 ID 保证顺序,但需要额外机制
恢复机制:
ZAB:有专门的崩溃恢复模式,包括选举、发现、同步和激活等阶段
Paxos:通过常规算法流程处理崩溃恢复,没有特殊的恢复模式
事务标识:
ZAB:使用 ZXID(epoch + counter)作为全局唯一标识
Paxos:使用提案编号(ballot number)和实例 ID 分别标识提案和位置
六、性能对比与工程实践
性能对比
横向可扩展性
随着集群规模增加,性能变化情况:
JVM 调优建议
/** * 推荐的JVM参数设置: * -Xms4g -Xmx4g // 固定堆大小避免动态调整 * -XX:+UseG1GC // 使用G1垃圾收集器 * -XX:MaxGCPauseMillis=200 // 最大GC暂停时间 * -XX:InitiatingHeapOccupancyPercent=45 // GC启动阈值 * -XX:+AlwaysPreTouch // 预分配内存页 * -XX:+DisableExplicitGC // 禁用显式GC调用 * -XX:+HeapDumpOnOutOfMemoryError // OOM时生成堆转储 * -XX:HeapDumpPath=/path/to/dumps // 堆转储路径 * -XX:+UseCompressedOops // 使用压缩指针 * -XX:+UseCompressedClassPointers // 使用压缩类指针 * -Djava.net.preferIPv4Stack=true // 优先使用IPv4 */
选型决策
工程实践最佳建议
合理设置超时参数:
过短的超时会导致不必要的选举
过长的超时会增加故障恢复时间
建议根据网络环境动态调整
批处理请求:
合并多个写请求为一个批次
减少网络往返次数
提高整体吞吐量
读写分离:
写请求经过 Leader
读请求可以在本地处理(根据一致性需求)
使用读取缓存减少磁盘 IO
监控关键指标:
提交延迟
Leader 切换频率
请求排队深度
网络延迟和带宽使用
预防性维护:
定期压缩日志
创建快照
监控磁盘空间
进行故障演练测试恢复流程
七、单元测试示例
@RunWith(MockitoJUnitRunner.class)public class ZABBroadcastTest { private ZABBroadcast zabBroadcast; private AtomicLong zxid; private AtomicInteger epoch;
@Mock private NetworkClient mockNetworkClient;
@Mock private StateMachine mockStateMachine;
@Before public void setUp() { zxid = new AtomicLong(0); epoch = new AtomicInteger(0);
zabBroadcast = new ZABBroadcast("server1", zxid, epoch, mockNetworkClient, mockStateMachine);
// 添加follower ServerData follower1 = new ServerData("server2", "localhost", 8001); ServerData follower2 = new ServerData("server3", "localhost", 8002); zabBroadcast.addFollower(follower1); zabBroadcast.addFollower(follower2); }
@After public void tearDown() { zabBroadcast.close(); }
@Test public void testProcessWriteSuccess() throws Exception { // 准备模拟对象行为 ACK successAck = new ACK(true, zxid.get() + 1); when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class))) .thenReturn(successAck);
// 执行测试 Request request = new Request("req1", "test data".getBytes()); CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);
// 验证结果 assertTrue(result.get(1, TimeUnit.SECONDS));
// 验证交互 verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class)); verify(mockNetworkClient, times(2)).sendCommit(anyString(), any(CommitPacket.class)); }
@Test public void testProcessWriteFailure() throws Exception { // 准备模拟对象行为 - 一个成功,一个失败 when(mockNetworkClient.sendProposal(eq("server2"), any(ProposalPacket.class))) .thenReturn(new ACK(true, zxid.get() + 1)); when(mockNetworkClient.sendProposal(eq("server3"), any(ProposalPacket.class))) .thenReturn(new ACK(false, zxid.get()));
// 执行测试 Request request = new Request("req1", "test data".getBytes()); CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);
// 验证结果 - 应该失败,因为没有多数派确认 assertFalse(result.get(1, TimeUnit.SECONDS));
// 验证交互 - 不应该发送commit verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class)); verify(mockNetworkClient, never()).sendCommit(anyString(), any(CommitPacket.class)); }
@Test public void testBatchWritePerformance() throws Exception { // 准备模拟对象行为 ACK successAck = new ACK(true, zxid.get() + 1); when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class))) .thenReturn(successAck);
// 准备批处理请求 List<Request> requests = new ArrayList<>(); for (int i = 0; i < 100; i++) { requests.add(new Request("req" + i, ("data" + i).getBytes())); }
// 执行测试 Stopwatch stopwatch = Stopwatch.createStarted(); CompletableFuture<Map<String, Boolean>> result = zabBroadcast.processBatchWrite(requests); Map<String, Boolean> results = result.get(5, TimeUnit.SECONDS); stopwatch.stop();
// 验证结果 assertEquals(100, results.size()); assertTrue(results.values().stream().allMatch(v -> v));
// 打印性能数据 System.out.println("Batch write of 100 requests took " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
// 验证交互 - 只应该有一次网络往返 verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class)); verify(mockNetworkClient, times(2)).sendCommit(anyString(), any(CommitPacket.class)); }
@Test public void testCircuitBreakerTrip() throws Exception { // 准备模拟对象行为 - 总是失败 when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class))) .thenReturn(new ACK(false, zxid.get()));
// 执行多次请求,触发断路器 Request request = new Request("req1", "test data".getBytes()); for (int i = 0; i < 5; i++) { try { CompletableFuture<Boolean> result = zabBroadcast.processWrite(request); result.get(1, TimeUnit.SECONDS); } catch (Exception e) { // 忽略预期中的异常 } }
// 执行第6次请求,应该直接被断路器拒绝 try { CompletableFuture<Boolean> result = zabBroadcast.processWrite(request); result.get(1, TimeUnit.SECONDS); fail("Should have thrown CircuitBreakerOpenException"); } catch (ExecutionException e) { assertTrue(e.getCause() instanceof ProcessingException); assertTrue(e.getCause().getCause() instanceof ZABBroadcast.CircuitBreakerOpenException); } }
@Test public void testReadWithConsistencyLevels() throws Exception { // 测试不同一致性级别的读取 when(mockNetworkClient.sendHeartbeat(anyString(), anyLong())) .thenReturn();
// 执行线性一致性读取 CompletableFuture<Result> linearResult = zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.LINEARIZABLE);
// 执行顺序一致性读取 CompletableFuture<Result> sequentialResult = zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.SEQUENTIAL);
// 执行最终一致性读取 CompletableFuture<Result> eventualResult = zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.EVENTUAL);
// 验证所有请求都成功完成 assertNotNull(linearResult.get(1, TimeUnit.SECONDS)); assertNotNull(sequentialResult.get(1, TimeUnit.SECONDS)); assertNotNull(eventualResult.get(1, TimeUnit.SECONDS)); }}
八、客户端 API 示例
public class DistributedSystemClient implements AutoCloseable { private final ZabClient zabClient; private final PaxosClient paxosClient; private final Logger logger = LoggerFactory.getLogger(DistributedSystemClient.class);
public DistributedSystemClient(String zkConnectString, String paxosConnectString) { this.zabClient = new ZabClient(zkConnectString); this.paxosClient = new PaxosClient(paxosConnectString); }
// ZAB客户端示例 - 配置服务 public class ZabClient implements AutoCloseable { private final String connectString; private final CuratorFramework client;
public ZabClient(String connectString) { this.connectString = connectString; this.client = CuratorFramework.builder() .connectString(connectString) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); this.client.start(); }
// 存储配置 public void storeConfig(String path, String data) throws Exception { try { // 检查路径是否存在 if (client.checkExists().forPath(path) == null) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath(path, data.getBytes(StandardCharsets.UTF_8)); logger.info("Created config at path: {}", path); } else { client.setData() .forPath(path, data.getBytes(StandardCharsets.UTF_8)); logger.info("Updated config at path: {}", path); } } catch (Exception e) { logger.error("Failed to store config at path: {}", path, e); throw e; } }
// 读取配置 public String getConfig(String path) throws Exception { try { byte[] data = client.getData().forPath(path); return new String(data, StandardCharsets.UTF_8); } catch (Exception e) { logger.error("Failed to read config from path: {}", path, e); throw e; } }
// 监听配置变化 public void watchConfig(String path, Consumer<String> changeCallback) throws Exception { try { // 设置监听器 client.getData().usingWatcher(new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { if (event.getType() == EventType.NodeDataChanged) { String newData = getConfig(path); changeCallback.accept(newData); // 重新设置监听 watchConfig(path, changeCallback); } } }).forPath(path);
logger.info("Set watch on path: {}", path); } catch (Exception e) { logger.error("Failed to set watch on path: {}", path, e); throw e; } }
// 分布式锁 public DistributedLock getLock(String lockPath) { return new DistributedLock(client, lockPath); }
@Override public void close() { client.close(); }
// 分布式锁实现 public class DistributedLock { private final InterProcessMutex mutex; private final String lockPath;
public DistributedLock(CuratorFramework client, String lockPath) { this.lockPath = lockPath; this.mutex = new InterProcessMutex(client, lockPath); }
public void lock(long timeout, TimeUnit unit) throws Exception { if (mutex.acquire(timeout, unit)) { logger.info("Acquired lock: {}", lockPath); } else { logger.warn("Failed to acquire lock: {} within timeout", lockPath); throw new TimeoutException("Failed to acquire lock: " + lockPath); } }
public void unlock() { try { mutex.release(); logger.info("Released lock: {}", lockPath); } catch (Exception e) { logger.error("Error releasing lock: {}", lockPath, e); } } } }
// Paxos客户端示例 - 分布式KV存储 public class PaxosClient implements AutoCloseable { private final String connectString; private final PaxosKVStore kvStore;
public PaxosClient(String connectString) { this.connectString = connectString; this.kvStore = new PaxosKVStore(connectString); }
// 写入键值对 public CompletableFuture<Boolean> put(String key, String value, ConsistencyLevel consistencyLevel) { return kvStore.put(key, value, consistencyLevel); }
// 读取键值 public CompletableFuture<String> get(String key, ConsistencyLevel consistencyLevel) { return kvStore.get(key, consistencyLevel); }
// 删除键 public CompletableFuture<Boolean> delete(String key) { return kvStore.delete(key); }
@Override public void close() { kvStore.close(); }
// Paxos KV存储实现 private class PaxosKVStore implements AutoCloseable { private final PaxosClient client;
public PaxosKVStore(String connectString) { // 实际实现会连接到Paxos集群 this.client = null; // 简化示例 }
public CompletableFuture<Boolean> put(String key, String value, ConsistencyLevel consistencyLevel) { // 实际实现会通过Paxos协议提交写请求 logger.info("Putting key: {} with consistency: {}", key, consistencyLevel); return CompletableFuture.completedFuture(true); }
public CompletableFuture<String> get(String key, ConsistencyLevel consistencyLevel) { // 实际实现会根据一致性级别选择读取策略 logger.info("Getting key: {} with consistency: {}", key, consistencyLevel); return CompletableFuture.completedFuture("value"); }
public CompletableFuture<Boolean> delete(String key) { // 删除操作也是写操作,通过Paxos协议提交 logger.info("Deleting key: {}", key); return CompletableFuture.completedFuture(true); }
@Override public void close() { // 释放资源 } } }
// 使用示例 public void runExample() throws Exception { // ZAB客户端使用示例 try (ZabClient zab = new ZabClient("localhost:2181")) { // 存储配置 zab.storeConfig("/app/config", "{\"timeout\": 30, \"maxRetries\": 3}");
// 读取配置 String config = zab.getConfig("/app/config"); System.out.println("Config: " + config);
// 监听配置变化 zab.watchConfig("/app/config", newConfig -> { System.out.println("Config changed: " + newConfig); });
// 使用分布式锁 ZabClient.DistributedLock lock = zab.getLock("/app/locks/resource1"); try { lock.lock(10, TimeUnit.SECONDS); // 临界区操作 System.out.println("Performing critical operation..."); Thread.sleep(1000); } finally { lock.unlock(); } }
// Paxos客户端使用示例 try (PaxosClient paxos = new PaxosClient("localhost:8000,localhost:8001,localhost:8002")) { // 写入数据 paxos.put("user:1001", "{\"name\":\"John\",\"email\":\"john@example.com\"}", ConsistencyLevel.LINEARIZABLE) .thenAccept(success -> { System.out.println("Write success: " + success); }) .join();
// 读取数据 paxos.get("user:1001", ConsistencyLevel.SEQUENTIAL) .thenAccept(value -> { System.out.println("User data: " + value); }) .join();
// 删除数据 paxos.delete("user:1001") .thenAccept(success -> { System.out.println("Delete success: " + success); }) .join(); } }
@Override public void close() throws Exception { zabClient.close(); paxosClient.close(); }}
九、总结
ZAB 和 Paxos 都是优秀的分布式一致性算法,在现代分布式系统设计中占据核心地位。理解它们的工作原理、实现细节和适用场景,对构建可靠的分布式系统至关重要。
无论选择哪种算法,都需要根据具体应用场景、一致性需求和性能要求进行权衡。通过本文展示的工程实践和优化技术,开发者可以构建出高性能、高可靠的分布式系统,满足各种复杂业务场景的需求。
版权声明: 本文为 InfoQ 作者【异常君】的原创文章。
原文链接:【http://xie.infoq.cn/article/250a02bcf840bbc8ea208ebb3】。文章转载请联系作者。
异常君
还未添加个人签名 2025-06-06 加入
Java、Python、Go









评论