作者: LeoYang90 原文来源:https://tidb.net/blog/c7b79ced
前言
raft-rs 的 5 节点示例程序稍微比较复杂一些,但是看懂的话,就会对 raft 的使用得心应手。
示例程序
Node 结构体
struct Node { // None if the raft is not initialized. raft_group: Option<RawNode<MemStorage>>, my_mailbox: Receiver<Message>, mailboxes: HashMap<u64, Sender<Message>>, // Key-value pairs after applied. `MemStorage` only contains raft logs, // so we need an additional storage engine. kv_pairs: HashMap<u16, String>,}
复制代码
示例程序中,Node 是使用 RAFT 的外部应用,代表 RAFT 的一个节点应用程序,其中 raft_group 就是上一篇文章所说的 RawNode,是 RAFT 对外的接口,也就是 Node 节点内部运行的 RAFT;
my_mailbox 是 Node 接受其他 Node 信息的窗口,mailboxes 是 Node 发送给其他 Node 信息的窗口;
kv_pairs 是 request 最后 apply 的结果。
应用启动
fn main() { const NUM_NODES: u32 = 5; // Create 5 mailboxes to send/receive messages. Every node holds a `Receiver` to receive // messages from others, and uses the respective `Sender` to send messages to others. let (mut tx_vec, mut rx_vec) = (Vec::new(), Vec::new()); for _ in 0..NUM_NODES { let (tx, rx) = mpsc::channel(); tx_vec.push(tx); rx_vec.push(rx); }
// A global pending proposals queue. New proposals will be pushed back into the queue, and // after it's committed by the raft cluster, it will be poped from the queue. let proposals = Arc::new(Mutex::new(VecDeque::<Proposal>::new()));
for (i, rx) in rx_vec.into_iter().enumerate() { // A map[peer_id -> sender]. In the example we create 5 nodes, with ids in [1, 5]. let mailboxes = (1..6u64).zip(tx_vec.iter().cloned()).collect(); let mut node = match i { // Peer 1 is the leader. 0 => Node::create_raft_leader(1, rx, mailboxes, &logger), // Other peers are followers. _ => Node::create_raft_follower(rx, mailboxes), }; }
// Propose some conf changes so that followers can be initialized. add_all_followers(proposals.as_ref()); ...}
复制代码
从上述代码可以看到,示例程序先创建了 5 对 channel,这些 channel 是示例程序模拟真实应用的 transport 接口。
在创建 RAFT 5 个 node 节点的时候,每个 node 节点都会选择 5 对 channel 其中一个接收端作为自己的 my_mailbox,作为接收窗口,接收其他 peer node 节点的 msg。
然后复制全部其他的 5 个 channel 的发送端,作为 Node 节点的发送窗口,每个发送端对应一个 peer node 节点,向这些 channel 发送端发送 message,相应的 peer node 节点的 channel 接收端就会接收到消息。
create_raft
fn create_raft_leader(r id: u64, my_mailbox: Receiver<Message>, mailboxes: HashMap<u64, Sender<Message>>, logger: &slog::Logger, ) -> Self { let mut cfg = example_config(); cfg.id = id; let mut s = Snapshot::default(); let storage = MemStorage::new(); storage.wl().apply_snapshot(s).unwrap(); let raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap()); Node { raft_group, my_mailbox, mailboxes, kv_pairs: Default::default(), } } fn create_raft_follower( my_mailbox: Receiver<Message>, mailboxes: HashMap<u64, Sender<Message>>, ) -> Self { Node { raft_group: None, my_mailbox, mailboxes, kv_pairs: Default::default(), } }
复制代码
示例中 create_raft_follower 并没有创建 RAFT,而是等待新消息进入再创建,这个我们后面再说。
现在我们成功创建了 Leader 节点和 4 个 Follower 节点,但是 Follower 节点上面并没有运行 RAFT 程序,也就是说现在 RAFT 集群现在只有 Leader 节点一个,其他 Follower 节点上面没有运行 RAFT 模块。
接下来,我们需要要求 Leader 节点的 RAFT 程序提出配置变更要求,方法就是调用 propose 接口,并且传入 ConfChange-AddNode 的 Msg 参数:
fn add_all_followers(proposals: &Mutex<VecDeque<Proposal>>) { for i in 2..6u64 { let mut conf_change = ConfChange::default(); conf_change.node_id = i; conf_change.set_change_type(ConfChangeType::AddNode); loop { let (proposal, rx) = Proposal::conf_change(&conf_change); proposals.lock().unwrap().push_back(proposal); if rx.recv().unwrap() { break; } thread::sleep(Duration::from_millis(100)); } }}
复制代码
propose_conf_change
Leader 节点将 AddNode 的请求发送给内部 RAFT 程序,调用了 propose_conf_change 接口:
fn main() { ... if raft_group.raft.state == StateRole::Leader { // Handle new proposals. let mut proposals = proposals.lock().unwrap(); for p in proposals.iter_mut().skip_while(|p| p.proposed > 0) { propose(raft_group, p); } } ...}
fn propose(raft_group: &mut RawNode<MemStorage>, proposal: &mut Proposal) { let last_index1 = raft_group.raft.raft_log.last_index() + 1; ... } else if let Some(ref cc) = proposal.conf_change { let _ = raft_group.propose_conf_change(vec![], cc.clone()); } ...
let last_index2 = raft_group.raft.raft_log.last_index() + 1; if last_index2 == last_index1 { // Propose failed, don't forget to respond to the client. proposal.propose_success.send(false).unwrap(); } else { proposal.proposed = last_index1; }}
复制代码
ready
Leader 节点调用 propose_conf_change 后,就需要调用 ready 函数等待内部 RAFT 程序处理 Msg 完成。
值得注意的是,一般来说,如何处理 ready 函数返回的 Ready 结构体是示例应用程序的关键:
fn main() { ... // Handle readies from the raft. on_ready( raft_group, &mut node.kv_pairs, &node.mailboxes, &proposals, &logger, ); ...}
fn on_ready( raft_group: &mut RawNode<MemStorage>, kv_pairs: &mut HashMap<u16, String>, mailboxes: &HashMap<u64, Sender<Message>>, proposals: &Mutex<VecDeque<Proposal>>, logger: &slog::Logger,) { if !raft_group.has_ready() { return; } let store = raft_group.raft.raft_log.store.clone();
// Get the `Ready` with `RawNode::ready` interface. let mut ready = raft_group.ready();
... if !ready.messages().is_empty() { // Send out the messages come from the node. handle_messages(ready.take_messages()); }
// Apply the snapshot. It's necessary because in `RawNode::advance` we stabilize the snapshot. if *ready.snapshot() != Snapshot::default() { let s = ready.snapshot().clone(); if let Err(e) = store.wl().apply_snapshot(s) { ... } }
... // Apply all committed entries. handle_committed_entries(raft_group, ready.take_committed_entries());
// Persistent raft logs. It's necessary because in `RawNode::advance` we stabilize // raft logs to the latest position. if let Err(e) = store.wl().append(ready.entries()) { ... }
if let Some(hs) = ready.hs() { // Raft HardState changed, and we need to persist it. store.wl().set_hardstate(hs.clone()); }
if !ready.persisted_messages().is_empty() { // Send out the persisted messages come from the node. handle_messages(ready.take_persisted_messages()); }
// Call `RawNode::advance` interface to update position flags in the raft. let mut light_rd = raft_group.advance(ready); // Update commit index. if let Some(commit) = light_rd.commit_index() { store.wl().mut_hard_state().set_commit(commit); } // Send out the messages. handle_messages(light_rd.take_messages()); // Apply all committed entries. handle_committed_entries(raft_group, light_rd.take_committed_entries()); // Advance the apply index. raft_group.advance_apply();}
复制代码
其中,handle_messages 逻辑很简单,就是轮询各个 channel 的发送端,将消息发送到相应的 peer node:
let handle_messages = |msgs: Vec<Message>| { for msg in msgs { let to = msg.to; if mailboxes[&to].send(msg).is_err() { ... } }};
复制代码
其中处理 committed entries 的逻辑也很简单,
如果 entries 的类型是 confChange 的话,就调用 RAFT 的 apply_conf_change 函数,并且落盘到日志磁盘中。因为 raft-rs 的 joint consensus 是需要 conf change entries 在 commit 后才起作用的,必须调用 apply_conf_change 函数才能进行真正的配置变更。
如果 entries 的类型是普通类型的 entries 的话,就存储到 kv_pairs 当中去。
step
当 Leader 调用 handle_messages 函数将 msg 发送给 followers 的 channel 发送端后,followers 的 channel 接收端就会收到消息:
fn main() { ... let handle = thread::spawn(move || loop { thread::sleep(Duration::from_millis(10)); loop { // Step raft messages. match node.my_mailbox.try_recv() { Ok(msg) => node.step(msg, &logger), Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => return, } } ...} fn step(&mut self, msg: Message, logger: &slog::Logger) { if self.raft_group.is_none() { if is_initial_msg(&msg) { self.initialize_raft_from_message(&msg, logger); } else { return; } } let raft_group = self.raft_group.as_mut().unwrap(); let _ = raft_group.step(msg);}
复制代码
由于 follower 在启动的时候并没有创建 RAFT 模块,因此 raft_group 是空的,这时候就会调用 initialize_raft_from_message:
fn initialize_raft_from_message(&mut self, msg: &Message, logger: &slog::Logger) { if !is_initial_msg(msg) { return; } let mut cfg = example_config(); cfg.id = msg.to; let logger = logger.new(o!("tag" => format!("peer_{}", msg.to))); let storage = MemStorage::new(); self.raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap());}
fn is_initial_msg(msg: &Message) -> bool { let msg_type = msg.get_msg_type(); msg_type == MessageType::MsgRequestVote || msg_type == MessageType::MsgRequestPreVote || (msg_type == MessageType::MsgHeartbeat && msg.commit == 0)}
复制代码
直到这个时候,Leader 和 follower 才组成 5 节点的 RAFT 集群。
propose
示例程序造出来了 100 个请求,并且让 Leader node 通过 propose 函数发送给内部的 RAFT 模块。
接下来,Leader node 的 on_ready 函数就会接收到 RAFT 模块的 Ready 结构体,解析后发送相关的 msgAppend 给 followers 的 mailboxs
followers node 通过 my_mailbox 接收到请求后,会调用 step 函数传入 follower node 内部的 RAFT 模块。
followers node 通过 on_ready 函数接收到 RAFT 模块的 Ready 结构体,解析后发送 msgAppendRespone 给 leader node 的 mailbox
Leader node 的 my_mailbox 接收到请求后,继续调用 step 将消息传入 leader 的 RAFT,RAFT 解析 msgAppendRespone 中的 index,并且更新其 committed index
leader node 的 on_ready 函数接收到 RAFT 模块的 Ready 结构体,分析出其中的 committed entries,将其存储到 kv_pairs,并且返回给客户端成功。接着还会发送 message 给 followers 的 mailboxs 最新的 commit index
followers 通过 my_maibox 收到消息后,继续调用 step 函数传入 RAFT,RAFT 模块根据 message 更新自身的 commit index
followers 调用 on_ready 函数解析出 committed entries,将其存储到自身的 kv_pairs。
至此,5 个节点的 kv_pairs 都含有用户请求的 data 数据。
fn main() { let handle = thread::spawn(move || loop { if raft_group.raft.state == StateRole::Leader { // Handle new proposals. let mut proposals = proposals.lock().unwrap(); for p in proposals.iter_mut().skip_while(|p| p.proposed > 0) { propose(raft_group, p); } } } ... (0..100u16) .filter(|i| { let (proposal, rx) = Proposal::normal(*i, "hello, world".to_owned()); proposals.lock().unwrap().push_back(proposal); // After we got a response from `rx`, we can assume the put succeeded and following // `get` operations can find the key-value pair. rx.recv().unwrap() }) .count(); ...} fn propose(raft_group: &mut RawNode<MemStorage>, proposal: &mut Proposal) { ... if let Some((ref key, ref value)) = proposal.normal { let data = format!("put {} {}", key, value).into_bytes(); let _ = raft_group.propose(vec![], data); } ...}
复制代码
Tick
tick 比较简单,直接调用 RAFT 模块的 tick 接口即可。
fn main() { ... for (i, rx) in rx_vec.into_iter().enumerate() { ... let handle = thread::spawn(move || loop { ... let raft_group = match node.raft_group { Some(ref mut r) => r, // When Node::raft_group is `None` it means the node is not initialized. _ => continue, }; ... if t.elapsed() >= Duration::from_millis(100) { // Tick the raft. raft_group.tick(); t = Instant::now(); } ... } }}
复制代码
示例程序到此为止所有逻辑已完成,完整代码链接:
https://github.com/tikv/raft-rs/blob/master/examples/five_mem_node/main.rs
评论