01、Curp Crate 的源碼組織
現(xiàn)在,讓我們把目光集中在 curp 共識(shí)模塊上。在 Xline 中,curp 模塊是一個(gè)獨(dú)立的 crate,其所有的源代碼都保存在 curp 目錄下,curp 目錄中有以下組織:
- curp/proto: 保存 curp 協(xié)議相關(guān)的 rpc 接口與 message 的定義
- curp/tla+: 存放 curp 協(xié)議的 tla+ specification 相關(guān)內(nèi)容
- curp/tests:集成測(cè)試
- curp/src:CURP 協(xié)議的主體實(shí)現(xiàn)代碼,可分為:
- client.rs:CURP client 端實(shí)現(xiàn)
- cmd.rs:定義了與外部 mod 交互的關(guān)鍵 trait
- log_entry.rs:狀態(tài)機(jī)日志條目
- rpc/:CURP server 相關(guān) rpc 方法的實(shí)現(xiàn)
- server/:CURP server 端的具體實(shí)現(xiàn),包括了
- 關(guān)鍵數(shù)據(jù)結(jié)構(gòu)定義:cmd_board.rs、spec_pool.rs
- 關(guān)鍵后臺(tái)任務(wù):cmd_worker/、gc.rs
- CURP server 前后端實(shí)現(xiàn):curp_node.rs、raw_curp/
- storage/:實(shí)現(xiàn)了持久化層的接口定義,用來(lái)和存儲(chǔ)層交互
02、Curp Server 的架構(gòu)與任務(wù)切分
在了解了 CurpServer 所提供的 rpc 服務(wù)和相應(yīng)的 trait 之后,我們來(lái)看看 Curp Server 的架構(gòu)與啟動(dòng)流程。整個(gè) Curp Server 架構(gòu)如下圖示:
從架構(gòu)上分,我們可以將整個(gè) Curp Server 劃分為前端 CurpNode 以及后端 RawCurp。之所以是這樣的設(shè)計(jì),其核心原因在于對(duì)同步異步代碼的分割。對(duì)于 Rust 的異步代碼實(shí)現(xiàn)而言,有兩個(gè)通電:
- 對(duì)于 tokio 所 spawn 出來(lái)的 task 而言,由于編譯器本身并不知道這個(gè) task 究竟會(huì)運(yùn)行多久,因此需要強(qiáng)制要求所有 task 其生命周期參數(shù)都必須滿足 ‘static,這種限制往往需要我們對(duì)其某些引用數(shù)據(jù)結(jié)構(gòu)執(zhí)行 clone 操作
- Non-Send 的變量,例如 MutexGuard, 不允許跨越 .await 語(yǔ)句。在這種情況下,如果同步代碼與異步代碼混合在一起,就會(huì)導(dǎo)致我們需要在 await 之前,手動(dòng)地去以某種形式(代碼塊或者顯示的 drop )來(lái)釋放這個(gè) MutexGuard,這就造成了代碼結(jié)構(gòu)上的一些嵌套,增加閱讀負(fù)擔(dān)。而通過(guò) CurpNode 和 RawCurp 的前后分離,由 CurpNode 來(lái)處理同步請(qǐng)求,而讓 RawCurp 處理異步請(qǐng)求,能夠更加清晰地劃分代碼邊界,提高代碼的可讀性。
關(guān)于如何優(yōu)雅組織 Rust 項(xiàng)目中的異步代碼,可以參考另一篇文章:如何優(yōu)雅地組織Rust項(xiàng)目中的異步代碼在 CurpNode 結(jié)構(gòu)當(dāng)中,包含了 spec_pool 和 cmd_board 兩個(gè)結(jié)構(gòu)。其中 spec_pool 對(duì)應(yīng)了 CURP 論文中的 witness 結(jié)構(gòu),其作用是保存那些在 fast path 當(dāng)中執(zhí)行的命令,而 cmd_board 則是用來(lái)保存命令的執(zhí)行結(jié)果。CurpNode 可以向 cmd_board 中注冊(cè)一個(gè)監(jiān)聽(tīng)器,當(dāng)后端的 RawCurp 異步執(zhí)行完命令后,便會(huì)將命令的執(zhí)行結(jié)果插入到 cmd_board 中,并通過(guò)監(jiān)聽(tīng)器通知 CurpNode 向用戶返回命令執(zhí)行結(jié)果。而當(dāng) RawCurp 接收到 Curp Node 的請(qǐng)求時(shí),會(huì)將命令插入到?jīng)_突檢測(cè)隊(duì)列 —— conflict_checked_channel 當(dāng)中。顧名思義,conflict_checked_channel 本質(zhì)上是一個(gè) mpmc channel,能夠并發(fā)地接受來(lái)自 CurpNode 的 cmd,并動(dòng)態(tài)地維護(hù)不同的 cmd 之間的沖突關(guān)系,以確保 command worker 從 channel 拿到的命令永遠(yuǎn)都會(huì)與當(dāng)前執(zhí)行的命令沖突。而 command worker 則是命令的執(zhí)行實(shí)體。conflict_checked_channel 只關(guān)心不同命令之間的關(guān)系,而 command worker 則只關(guān)心命令如何執(zhí)行。當(dāng) command worker 執(zhí)行完命令并獲得執(zhí)行結(jié)果后,它會(huì)將執(zhí)行的結(jié)果插入到 cmd_board 中,并觸發(fā)監(jiān)聽(tīng)器,通知 CurpNode 命令已經(jīng)執(zhí)行完畢。
03、Curp Server 如何與業(yè)務(wù) Server 交互
從前面的架構(gòu)圖當(dāng)中我們可以看出, Curp 共識(shí)模塊提供了一個(gè) CurpServer,用于對(duì)外提供 rpc 服務(wù)。在 Xline 中,業(yè)務(wù) Server 會(huì)通過(guò) rpc 調(diào)用向 CurpServer 發(fā)起請(qǐng)求,而 CurpServer 則會(huì)在請(qǐng)求處理完畢后,通過(guò)相應(yīng)的 trait 通知業(yè)務(wù) Server。
服務(wù)的定義
那么,讓我們先來(lái)看看 CurpServer 都定義了哪些的服務(wù):
service Protocol { rpc Propose (ProposeRequest) returns (ProposeResponse); rpc WaitSynced (WaitSyncedRequest) returns (WaitSyncedResponse); rpc AppendEntries (AppendEntriesRequest) returns (AppendEntriesResponse); rpc Vote (VoteRequest) returns (VoteResponse); rpc FetchLeader (FetchLeaderRequest) returns (FetchLeaderResponse); rpc InstallSnapshot (stream InstallSnapshotRequest) returns (InstallSnapshotResponse);}
這些服務(wù)的作用如下:
- Propose:用來(lái)向 Curp 集群發(fā)起一次提案請(qǐng)求
- WaitSynced:等待 Curp 集群完成 after sync 階段的請(qǐng)求
- AppendEntries:向 Curp 集群發(fā)起追加狀態(tài)機(jī)日志的請(qǐng)求
- Vote:當(dāng)一個(gè) Curp Server 發(fā)起選舉后,它會(huì)將自身的角色轉(zhuǎn)變?yōu)?Candidate,并調(diào)用此接口向其他的 Curp Server 發(fā)送投票請(qǐng)求,當(dāng)獲得過(guò)半數(shù)成功響應(yīng)后則變成 leader,否則退回 follower
- FetchLeader:獲取當(dāng)前 Curp 集群的 leader 節(jié)點(diǎn)
- InstallSnapshot:當(dāng)一個(gè) Curp Server 的狀態(tài)機(jī)日志落后 leader 太多時(shí),可以通過(guò)此接口向 leader 請(qǐng)求一個(gè)快照,用來(lái)追趕 leader 的狀態(tài)機(jī)日志
其中,除了 Propose
和 FetchLeader
以外的 Service 則主要用于 CurpServer 內(nèi)部,而業(yè)務(wù) Server 則通過(guò) FetchLeader
獲取集群當(dāng)前的 leader 信息,通過(guò) Propose
向 Curp 集群發(fā)起提案。
接口的定義
接下來(lái),讓我們來(lái)看看 Curp 模塊都定義了哪些 trait 呢?從設(shè)計(jì)目的來(lái)看,Curp 中定義的 trait 一共可以分為兩類:
- 與命令相關(guān):當(dāng) Curp 模塊針對(duì)特定的命令達(dá)成共識(shí)后,需要通過(guò)相關(guān)的 trait 來(lái)通知業(yè)務(wù) Server 執(zhí)行相應(yīng)的命令
- 與角色相關(guān):當(dāng) Curp 節(jié)點(diǎn)的 Role 發(fā)生變化時(shí),需要通過(guò)相關(guān)的 trait 來(lái)通知對(duì)應(yīng)的業(yè)務(wù)組件,例如 Lessor,Compactor 等。
命令相關(guān)的 Trait
Curp 模塊中定義的命令相關(guān)的 trait 主要包括了 Command
, ConflictCheck
和 CommandExecutor
三個(gè)。讓我們先來(lái)看看 Command
和 ConflictCheck
這兩個(gè) trait,它們的定義如下:
pub trait ConflictCheck { fn is_conflict(&self, other: &Self) - > bool;}#[async_trait]pub trait Command{ /// omit some code... #[inline] fn prepare E >(&self, e: &E, index: LogIndex) - > Result Self::PR, E::Error > where E: CommandExecutor Self > + Send + Sync, { E as CommandExecutor Self >>::prepare(e, self, index) } #[inline] async fn execute E >(&self, e: &E, index: LogIndex) - > Result Self::ER, E::Error > where E: CommandExecutor Self > + Send + Sync, { E as CommandExecutor Self >>::execute(e, self, index).await } #[inline] async fn after_sync E >( &self, e: &E, index: LogIndex, prepare_res: Self::PR, ) - > Result Self::ASR, E::Error > where E: CommandExecutor Self > + Send + Sync, { E as CommandExecutor Self >>::after_sync(e, self, index, prepare_res).await }}
Command
trait 描述了一個(gè)可以被業(yè)務(wù) Server 執(zhí)行的命令實(shí)體,而ConflictCheck
用來(lái)檢測(cè)多個(gè)命令之間是否存在沖突,沖突的判定條件則為兩個(gè)不同的命令的 key 之間是否存在交集。Command
定義了 4 個(gè)關(guān)聯(lián)類型(K
, PR
, ER
和 ASR
),其中,K
代表了命令所對(duì)應(yīng)的 Key,而 ER
和 ASR
則對(duì)應(yīng)了 Curp 協(xié)議中命令在 execute 階段和 after_sync 階段的執(zhí)行結(jié)果。那么 PR
又代表了什么呢?這里,PR
代表的是命令 在 prepare 階段的執(zhí)行結(jié)果。讀者可能會(huì)好奇,curp 協(xié)議中只定義了命令的兩個(gè)階段,并沒(méi)有關(guān)于 prepare 階段的說(shuō)明,這里又為什么需要 prepare 階段呢?為了說(shuō)明這個(gè)問(wèn)題,讓我們來(lái)看一個(gè)例子。由于 Xline 采用了 MVCC 機(jī)制對(duì)保存其中的 key-value pair 做了多版本管理,因此會(huì)為每個(gè) key 分配相應(yīng)的 revision。假設(shè)一個(gè) client 向 Xline 先后發(fā)送了兩個(gè)命令: PUT A=1
和 PUT B=1
,記為 cmd1 和 cmd2,預(yù)期對(duì)應(yīng)的 revision 分別為 3 和 4。由于這兩個(gè)命令之間不存在沖突,Xline 可以對(duì)這兩個(gè)命令并發(fā)亂序執(zhí)行。命令的開(kāi)始執(zhí)行順序雖然為先 PUT A = 1
再執(zhí)行 PUT B=1
,但是它們之間結(jié)束的順序是不確定的,因此 after_sync 執(zhí)行順序可能先執(zhí)行 cmd2 再執(zhí)行 cmd1。又因?yàn)?revision 需要等到 after sync 階段才能確定,因此就會(huì)出現(xiàn) cmd2 的 revision 為3,cmd1 的 revision 為 4 的情況,這就產(chǎn)生了錯(cuò)誤。為了解決這個(gè)問(wèn)題,我們?yōu)?Command 引入了一個(gè) prepare 階段。Curp 保證了 prepare 階段的執(zhí)行是串行化的,并且總是執(zhí)行在 execute 階段之前。Xline 將 revision 的計(jì)算從 after_sync 階段提前到了 prepare 階段,既保證了 revision 的產(chǎn)生順序與用戶請(qǐng)求到達(dá) Xline 的順序一致,同時(shí)又保證了互不沖突的命令之間能夠并發(fā)亂序執(zhí)行,不影響性能。接下來(lái),我們?cè)賮?lái)看看 CommandExecutor
trait 的定義:
#[async_trait]pub trait CommandExecutor< C: Command >{ fn prepare(&self, cmd: &C, index: LogIndex) - > Result< C::PR >; async fn execute(&self, cmd: &C, index: LogIndex) - > Result< C::ER >; async fn after_sync( &self, cmd: &C, index: LogIndex, prepare_res: C::PR, ) - > Result< C::ASR >; /// omit some code ...}
CommandExecutor
則描述了命令的執(zhí)行實(shí)體,Curp 模塊通過(guò)它通知業(yè)務(wù) Server 執(zhí)行相關(guān)的命令。這三個(gè) trait 之間的關(guān)系是:ConflictCheck
描述的是不同命令之間的關(guān)系,Curp Server 并不關(guān)心命令如何執(zhí)行,它只關(guān)心命令之間是否沖突,而 CommandExecutor
則描述了命令怎么執(zhí)行,并不關(guān)心命令之間的關(guān)系,它們分離了各自的關(guān)注點(diǎn)。它們之間的依賴關(guān)系為 CommandExecutor
<--> Command
--> ConflictCheck
角色變化的Trait
Curp 中定義的角色相關(guān)的 trait 便是 RoleChange
, 其定義如下:
/// Callback when the leadership changespub trait RoleChange { /// The `on_election_win` will be invoked when the current server win the election. /// It means that the current server's role will change from Candidate to Leader. fn on_election_win(&self); /// The `on_calibrate` will be invoked when the current server has been calibrated. /// It means that the current server's role will change from Leader to Follower. fn on_calibrate(&self);}
因?yàn)椋琗line 的 Curp 模塊中,協(xié)議的后端采用的是 Raft 協(xié)議。這里所謂的后端,是指當(dāng)沖突發(fā)生時(shí),Curp 模塊會(huì)自動(dòng) fallback 到 Raft 協(xié)議,此情況下達(dá)成共識(shí)所產(chǎn)生的 latency 和 Raft 協(xié)議相同,均為 2 RTT。我們知道,在原始的 Raft 論文中,為 Raft 集群中的節(jié)點(diǎn)定義了以下三種角色:Leader、Follower 和 Candidate,他們之間的轉(zhuǎn)換關(guān)系如下:
初始時(shí)刻,一個(gè)節(jié)點(diǎn)為 Follwer,在 election_timeout
時(shí)間內(nèi)沒(méi)有收到來(lái)自當(dāng)前集群 Leader 的任何消息,包括 heartbeat 或者 AppendEntries 請(qǐng)求,則會(huì)發(fā)起 election 操作,將自身轉(zhuǎn)變?yōu)?Candidate。當(dāng)贏得選舉時(shí),則該節(jié)點(diǎn)由 Candidate 轉(zhuǎn)變?yōu)?Leader,若失敗則回到 Follower。現(xiàn)在假設(shè)集群出現(xiàn)了網(wǎng)絡(luò)分區(qū),則可能會(huì)出現(xiàn)兩個(gè) Leader,當(dāng)網(wǎng)絡(luò)分區(qū)消失后,term 較小的 Leader 在接收到來(lái)自 term 較大的 Leader 的任何消息時(shí),會(huì)自動(dòng)進(jìn)行 calibrate,更新自己的 term 后轉(zhuǎn)變自身角色為 Follower。那為什么 Xline 需要定義 RoleChange
這樣的 trait 呢?這是在 Xline 的一些業(yè)務(wù)場(chǎng)景中,一些組件,例如 LeaseServer 和 LeaseStore 在 Leader 節(jié)點(diǎn)和非 Leader 節(jié)點(diǎn)上所執(zhí)行的操作是不同的,因此需要在節(jié)點(diǎn)角色發(fā)生變化時(shí)通知到對(duì)應(yīng)的組件。目前,由于 Lease 相關(guān)組件只區(qū)分 Leader 和非 Leader 節(jié)點(diǎn),因此,定義的 callback 只覆蓋了 election_win 和 calibrate 兩種事件。未來(lái)如果業(yè)務(wù)存在需要更細(xì)粒度的區(qū)分節(jié)點(diǎn)角色的需求,則會(huì)增加更多的 callback 方法來(lái)進(jìn)行需求覆蓋。
04、Curp Server 是如何處理請(qǐng)求的
現(xiàn)在讓我們假設(shè)有兩個(gè) PutRequest ,分別為 PUT A=1 和 PUT A = 2,我們來(lái)看看 curp server 是如何處理這兩個(gè)沖突請(qǐng)求的。正如前面所提到的,用戶需要先通過(guò) Curp Client 的 propose
方法,向 Curp Server 發(fā)起一個(gè)提案,我們先來(lái)看看 propose
的偽代碼實(shí)現(xiàn)。
/// Propose the request to servers#[inline]pub async fn propose(&self, cmd: C) - > Result< C::ER, ProposeError > { // create two futures let fast_round = self.fast_round(cmd); let slow_round = self.slow_round(cmd); // Wait for the fast and slow round at the same time match which_one_complete_first(fast_round, slow_round).await { fast_round returns fast_result = > { let (fast_er, success) = fast_result?; if success { Ok(fast_er.unwrap()) } else { let (_asr, er) = wait for slow_round to finish; Ok(er) } } slow_round returns slow_result = > match slow_result { Ok((_asr, er)) = > Ok(er), Err(e) = > { if let Ok((Some(er), true)) = wait for fast_round to finish { return Ok(er); } Err(e) } }, }}
如代碼所示,當(dāng) Client 調(diào)用 propose 時(shí),會(huì)同時(shí)啟動(dòng)兩個(gè)不同的 future,分別是 fast_round
和 slow_round
,對(duì)應(yīng)了 Curp 協(xié)議中的 fast path 和 slow path,并等待其中的某一個(gè) future 完成。顯然,當(dāng)?shù)谝粋€(gè)請(qǐng)求到來(lái)時(shí)是不會(huì)和其他請(qǐng)求沖突的,因此可以想象,這個(gè)請(qǐng)求能夠在 fast round 過(guò)程當(dāng)中被處理完畢。我們先來(lái)看看 fast_round 的實(shí)現(xiàn)。
Curp 共識(shí)流程 —— Fast Round
fast_round
的代碼定義在 curp/src/client 中,對(duì)應(yīng)了 Curp 協(xié)議中的前端過(guò)程。
/// The fast round of Curp protocol/// It broadcasts the requests to all the curp servers.async fn fast_round( &self, cmd_arc: Arc< C >,) - > Result< (Option
總體上來(lái)講,fast_round
邏輯可以分為三步:
- 將 Command 包裝成對(duì)應(yīng)的 ProposeRequest;
- 將 ProposeRequest 廣播給 Curp 集群中的所有節(jié)點(diǎn);
統(tǒng)計(jì)結(jié)果,如果當(dāng)前 Command 與其他命令沒(méi)有發(fā)生沖突,則會(huì)得到執(zhí)行成功的結(jié)果。當(dāng)接收到的成功的ProposeResponse 的數(shù)量超過(guò) super quorum (約集群節(jié)點(diǎn)數(shù)的 3/4 左右)的數(shù)量時(shí),這認(rèn)為命令執(zhí)行成功,否則失敗。
有些讀者可能就要發(fā)問(wèn)了,同樣一個(gè)節(jié)點(diǎn)個(gè)數(shù)為 2f + 1 的分布式集群中, 為什么像 Raft 或者 Paxos 中達(dá)成共識(shí)只需要f+1個(gè)節(jié)點(diǎn)返回成功響應(yīng), 而 Curp 在 fast path 路徑中卻需要超過(guò)
個(gè)節(jié)點(diǎn)返回成功響應(yīng)呢?其中 f 為集群的故障容忍度。為了說(shuō)明這個(gè)問(wèn)題,不妨讓我們來(lái)看看,如果 Curp 協(xié)議也采用f+1為 fast path 成功的判斷依據(jù),那么會(huì)發(fā)生什么樣的問(wèn)題?假設(shè)現(xiàn)在 Client_A 向 2f + 1 個(gè)節(jié)點(diǎn)廣播了 ProposeRequest(標(biāo)記為 A) 并接收到了 f + 1 個(gè)成功響應(yīng),則其中必定有一個(gè)節(jié)點(diǎn)為 leader,而剩下的 f 個(gè)節(jié)點(diǎn)為 follower。假設(shè),現(xiàn)在包含 leader 在內(nèi)的 f 個(gè)節(jié)點(diǎn) crash 了,并且這 f 個(gè)節(jié)點(diǎn)中都包含了 A 請(qǐng)求,則剩下的 f+1 個(gè)節(jié)點(diǎn)中僅有一個(gè) follower 在內(nèi)存的 spec_pool 保存了 A 請(qǐng)求。接著,Client 向集群中的所有節(jié)點(diǎn)廣播了 ProposeRequest(標(biāo)記為 B,并且 B 與 A 沖突)。由于每個(gè)節(jié)點(diǎn)均通過(guò)自身內(nèi)存中的 spec_pool 來(lái)判斷新來(lái)的請(qǐng)求是否與舊的 speculatively executed 請(qǐng)求沖突,因此,盡管 Client_B 不會(huì)獲得請(qǐng)求執(zhí)行成功的結(jié)果,但是 B 請(qǐng)求仍然會(huì)保存在剩下的 節(jié)點(diǎn)的 f 個(gè)節(jié)點(diǎn)中。假設(shè)此時(shí) leader 節(jié)點(diǎn)恢復(fù),在加載完持久化的狀態(tài)機(jī)日志后,需要 replay 所有節(jié)點(diǎn)上保存在 spec_pool 上的全部請(qǐng)求來(lái)恢復(fù)狀態(tài)機(jī),我們將這個(gè)過(guò)程稱為 recover。顯然,原來(lái)先被執(zhí)行的 A 請(qǐng)求在當(dāng)前的集群中反而成為了少數(shù)派(1 個(gè)節(jié)點(diǎn)),而沒(méi)有被成功執(zhí)行的 B 請(qǐng)求卻搖身一變成了多數(shù)派(f 個(gè)節(jié)點(diǎn)),這會(huì)導(dǎo)致 leader recover 過(guò)程出錯(cuò)。而當(dāng) Curp 協(xié)議也采用
為 fast path 成功的判斷依據(jù)時(shí),即便包含 A 請(qǐng)求的 f 個(gè)節(jié)點(diǎn)全部 crash,則至少在剩下的 f + 1 個(gè)節(jié)點(diǎn)中依然有超過(guò)半數(shù)
個(gè)節(jié)點(diǎn)保留了該請(qǐng)求,因此保證了不會(huì)出現(xiàn)后來(lái)的沖突請(qǐng)求在數(shù)量上壓過(guò)了原來(lái)成功執(zhí)行的請(qǐng)求的情況。現(xiàn)在,讓我們回到前面的例子中,由于 CurpServer 在處理請(qǐng)求 PUT A=1 時(shí)并未發(fā)生沖突,因此在 fast_round 能夠順利完成。對(duì)于 Leader 階段而言,請(qǐng)求會(huì)經(jīng)過(guò) CurpNode,到 RawCurp,到?jīng)_突檢測(cè)隊(duì)列 Conflict_Checked_MPMC 中,并最后交由 cmd_worker 來(lái)執(zhí)行。當(dāng) cmd_worker 執(zhí)行完 PUT A=1 后,會(huì)將對(duì)應(yīng)的結(jié)果插入到 cmd_board 中,并通知 CurpNode 向 Client 返回命令已執(zhí)行的響應(yīng)。下圖為 fast_round 過(guò)程所對(duì)應(yīng)的時(shí)序圖:
Curp 共識(shí)流程 —— Slow Round
當(dāng) Client 向 Curp Server 發(fā)送 PUT A=2 的請(qǐng)求時(shí),由前面的 propose
方法可知,Client 會(huì)同時(shí)啟動(dòng) fast_round
和 slow_round
兩個(gè) future。顯然,由于 PUT A=2 和前一個(gè)請(qǐng)求 PUT A=1 明顯沖突,因此 fast_round
無(wú)法成功執(zhí)行,進(jìn)而會(huì)等待 slow_round
的執(zhí)行完畢。slow_round
的代碼定義在 curp/src/client 中,對(duì)應(yīng)了 Curp 協(xié)議中的后端過(guò)程,即 wait_synced 過(guò)程。
/// The slow round of Curp protocolasync fn slow_round( &self, cmd: Arc< C >,) - > Result< (< C as Command >::ASR, < C as Command >::ER), ProposeError > { loop { let leader_id = self.get_leader_id().await; let resp = match call wait_synced from leader node { Ok(resp) = > resp.into_inner(), Err(e) = > { wait for retry_timeout to retry propose again; continue; } }; match resp? { SyncResult::Success { er, asr } = > { return Ok((asr, er)); } SyncResult::Error(Redirect(new_leader, term)) = > { let new_leader = new_leader.and_then(|id| { update_state(new_leader, term) }) }); self.resend_propose(Arc::clone(&cmd), new_leader).await?; // resend the propose to the new leader } SyncResult::Error(Timeout) = > { return Err(ProposeError::SyncedError("wait sync timeout".to_owned())); } SyncResult::Error(e) = > { return Err(ProposeError::SyncedError(format!("{e:?}"))); } } }}
總體上來(lái)講,slow_round
邏輯也可以分為兩步:
- 獲得集群當(dāng)前的 leader,并向其發(fā)送一個(gè)
WaitSyncedRequest
- 等待 leader 返回
WaitSyncedRequest
的執(zhí)行結(jié)果,失敗則等待retry_timeout
后重試。
由于在 Client 的 propose
方法中,由于 fast_round
在判斷新到的請(qǐng)求與之前 speculatively execute 的請(qǐng)求存在沖突,因此 RawCurp 會(huì)先將該 Command 保存到狀態(tài)機(jī)日志當(dāng)中,并向集群發(fā)起 AppendEntries 請(qǐng)求,完成后會(huì)向 Client 返回 ProposeError::KeyConflict
錯(cuò)誤,進(jìn)而等待 slow_round
的結(jié)束。當(dāng) Leader 向集群中的 Follower 完成了 AppendEntries 的操作后,便會(huì)執(zhí)行 apply
操作,將日志應(yīng)用到狀態(tài)機(jī)上。也正是在這個(gè)過(guò)程中,leader 會(huì)將 Command 發(fā)送到?jīng)_突檢測(cè)隊(duì)列 Conflict_Checked_MPMC 中。只有當(dāng) cmd_worker 執(zhí)行完所有和 PUT A=2 相沖突的命令后,才能從沖突檢測(cè)隊(duì)列中彈出 PUT A=2 這個(gè)命令來(lái)執(zhí)行。與 fast_round 不同的是,在 slow_round 過(guò)程中,命令在執(zhí)行完畢,并將執(zhí)行結(jié)果保存到 cmd_board 中后,并不會(huì)直接返回,而是會(huì)將命令重新放回到?jīng)_突檢測(cè)隊(duì)列中,直到命令執(zhí)行完 after_sync 操作后,并將對(duì)應(yīng)的結(jié)果保存到 cmd_board 中后,才會(huì)向 CurpNode 返回,最后向 Client 返回對(duì)應(yīng)的 ProposeResponse。整個(gè) slow_round 操作的時(shí)序圖如下:
05、Summary
在今天的文章中,我們討論了 Xline 中 Curp Server 是如何與業(yè)務(wù) Server 是如何進(jìn)行交互的。其中業(yè)務(wù) Server 通過(guò) Curp Server 所定義好的 RPC 接口來(lái)向 Curp Server 發(fā)起請(qǐng)求。而 CurpServer 則通過(guò) 2 類不同的 trait 來(lái)通知業(yè)務(wù) Server。其中命令相關(guān)的 Trait,如 Command
,ConflictCheck
和 CommandExecutor
等,主要負(fù)責(zé)在命令達(dá)成共識(shí)后通知業(yè)務(wù) Server,而角色變化的 Trait,如 RoleChange
, 則主要是負(fù)責(zé)在集群節(jié)點(diǎn)角色變化后通知業(yè)務(wù) Server。 Xline 中將 Curp Server 拆分成了前端 CurpNode 和后端 RawCurp 兩個(gè)部分,其中 CurpNode 主要負(fù)責(zé)接收同步的 RPC 調(diào)用請(qǐng)求,并將請(qǐng)求轉(zhuǎn)發(fā)給 RawCurp,由 RawCurp 來(lái)異步執(zhí)行。RawCurp 將命令投遞到 conflict_checked_channel 當(dāng)中,并由 command worker 來(lái)負(fù)責(zé)執(zhí)行,執(zhí)行完畢后將執(zhí)行結(jié)果插入到 cmd_board 中,并通知 CurpNode 返回執(zhí)行結(jié)果。
-
RPC
+關(guān)注
關(guān)注
0文章
111瀏覽量
11581 -
ASR
+關(guān)注
關(guān)注
2文章
43瀏覽量
18808 -
RTT
+關(guān)注
關(guān)注
0文章
65瀏覽量
17222 -
PUT
+關(guān)注
關(guān)注
0文章
5瀏覽量
6324 -
MVCC
+關(guān)注
關(guān)注
0文章
13瀏覽量
1490
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
Faster Transformer v2.1版本源碼解讀
![Faster Transformer v2.1版本<b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>](https://file1.elecfans.com/web2/M00/A3/B5/wKgZomUJF9eAXWDQAAA5V2i2udk464.png)
OneFlow Softmax算子源碼解讀之WarpSoftmax
![OneFlow Softmax算子<b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>之WarpSoftmax](https://file1.elecfans.com/web2/M00/BB/4D/wKgZomWbT1CAWm4KAAA2IsyxqO4463.png)
OneFlow Softmax算子源碼解讀之BlockSoftmax
![OneFlow Softmax算子<b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>之BlockSoftmax](https://file1.elecfans.com/web2/M00/BC/38/wKgaomWbT5GAZdkPAAAcgecZoiU624.png)
AP側(cè)中網(wǎng)相關(guān)的PLMN業(yè)務(wù)源碼流程解讀
能不能使用lwip-dhcpd的源碼實(shí)現(xiàn)dhcpd server的功能呢?
SQL Server 2000菜鳥(niǎo)入門(mén)
基于規(guī)則的RADIUS Server設(shè)計(jì)與實(shí)現(xiàn)
ArcGIS Runtime和ArcGIS Engine、ArcGIS Server的比較_arcgis desktop、arcgis engine和arcgis server三者之間有什么區(qū)別
基于EAIDK的人臉?biāo)惴☉?yīng)用-源碼解讀(2)
openharmony源碼解讀
Xline源碼解讀(一)—初識(shí)CURP協(xié)議
![<b class='flag-5'>Xline</b><b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>(一)—初識(shí)<b class='flag-5'>CURP</b>協(xié)議](https://file1.elecfans.com/web2/M00/A8/F5/wKgZomUiSVGAMeKbAAB5Fgkf3mg69.jpeg)
Xline源碼解讀(二)—Lease的機(jī)制與實(shí)現(xiàn)
![<b class='flag-5'>Xline</b><b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>(二)—Lease的機(jī)制與<b class='flag-5'>實(shí)現(xiàn)</b>](https://file1.elecfans.com/web2/M00/A7/30/wKgaomUiSiuABvkjAABmwwhrH7A49.jpeg)
分布式系統(tǒng)中Membership Change 源碼解讀
![分布式系統(tǒng)中Membership Change <b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>](https://file1.elecfans.com/web2/M00/C4/42/wKgaomXqr16AORuqAAAl7hbcU5Q712.png)
評(píng)論