01、Curp Crate 的源碼組織
現在,讓我們把目光集中在 curp 共識模塊上。在 Xline 中,curp 模塊是一個獨立的 crate,其所有的源代碼都保存在 curp 目錄下,curp 目錄中有以下組織:
- curp/proto: 保存 curp 協議相關的 rpc 接口與 message 的定義
- curp/tla+: 存放 curp 協議的 tla+ specification 相關內容
- curp/tests:集成測試
- curp/src:CURP 協議的主體實現代碼,可分為:
- client.rs:CURP client 端實現
- cmd.rs:定義了與外部 mod 交互的關鍵 trait
- log_entry.rs:狀態機日志條目
- rpc/:CURP server 相關 rpc 方法的實現
- server/:CURP server 端的具體實現,包括了
- 關鍵數據結構定義:cmd_board.rs、spec_pool.rs
- 關鍵后臺任務:cmd_worker/、gc.rs
- CURP server 前后端實現:curp_node.rs、raw_curp/
- storage/:實現了持久化層的接口定義,用來和存儲層交互
02、Curp Server 的架構與任務切分
在了解了 CurpServer 所提供的 rpc 服務和相應的 trait 之后,我們來看看 Curp Server 的架構與啟動流程。整個 Curp Server 架構如下圖示:
從架構上分,我們可以將整個 Curp Server 劃分為前端 CurpNode 以及后端 RawCurp。之所以是這樣的設計,其核心原因在于對同步異步代碼的分割。對于 Rust 的異步代碼實現而言,有兩個通電:
- 對于 tokio 所 spawn 出來的 task 而言,由于編譯器本身并不知道這個 task 究竟會運行多久,因此需要強制要求所有 task 其生命周期參數都必須滿足 ‘static,這種限制往往需要我們對其某些引用數據結構執行 clone 操作
- Non-Send 的變量,例如 MutexGuard, 不允許跨越 .await 語句。在這種情況下,如果同步代碼與異步代碼混合在一起,就會導致我們需要在 await 之前,手動地去以某種形式(代碼塊或者顯示的 drop )來釋放這個 MutexGuard,這就造成了代碼結構上的一些嵌套,增加閱讀負擔。而通過 CurpNode 和 RawCurp 的前后分離,由 CurpNode 來處理同步請求,而讓 RawCurp 處理異步請求,能夠更加清晰地劃分代碼邊界,提高代碼的可讀性。
關于如何優雅組織 Rust 項目中的異步代碼,可以參考另一篇文章:如何優雅地組織Rust項目中的異步代碼在 CurpNode 結構當中,包含了 spec_pool 和 cmd_board 兩個結構。其中 spec_pool 對應了 CURP 論文中的 witness 結構,其作用是保存那些在 fast path 當中執行的命令,而 cmd_board 則是用來保存命令的執行結果。CurpNode 可以向 cmd_board 中注冊一個監聽器,當后端的 RawCurp 異步執行完命令后,便會將命令的執行結果插入到 cmd_board 中,并通過監聽器通知 CurpNode 向用戶返回命令執行結果。而當 RawCurp 接收到 Curp Node 的請求時,會將命令插入到沖突檢測隊列 —— conflict_checked_channel 當中。顧名思義,conflict_checked_channel 本質上是一個 mpmc channel,能夠并發地接受來自 CurpNode 的 cmd,并動態地維護不同的 cmd 之間的沖突關系,以確保 command worker 從 channel 拿到的命令永遠都會與當前執行的命令沖突。而 command worker 則是命令的執行實體。conflict_checked_channel 只關心不同命令之間的關系,而 command worker 則只關心命令如何執行。當 command worker 執行完命令并獲得執行結果后,它會將執行的結果插入到 cmd_board 中,并觸發監聽器,通知 CurpNode 命令已經執行完畢。
03、Curp Server 如何與業務 Server 交互
從前面的架構圖當中我們可以看出, Curp 共識模塊提供了一個 CurpServer,用于對外提供 rpc 服務。在 Xline 中,業務 Server 會通過 rpc 調用向 CurpServer 發起請求,而 CurpServer 則會在請求處理完畢后,通過相應的 trait 通知業務 Server。
服務的定義
那么,讓我們先來看看 CurpServer 都定義了哪些的服務:
service Protocol { rpc Propose (ProposeRequest) returns (ProposeResponse); rpc WaitSynced (WaitSyncedRequest) returns (WaitSyncedResponse); rpc AppendEntries (AppendEntriesRequest) returns (AppendEntriesResponse); rpc Vote (VoteRequest) returns (VoteResponse); rpc FetchLeader (FetchLeaderRequest) returns (FetchLeaderResponse); rpc InstallSnapshot (stream InstallSnapshotRequest) returns (InstallSnapshotResponse);}
這些服務的作用如下:
- Propose:用來向 Curp 集群發起一次提案請求
- WaitSynced:等待 Curp 集群完成 after sync 階段的請求
- AppendEntries:向 Curp 集群發起追加狀態機日志的請求
- Vote:當一個 Curp Server 發起選舉后,它會將自身的角色轉變為 Candidate,并調用此接口向其他的 Curp Server 發送投票請求,當獲得過半數成功響應后則變成 leader,否則退回 follower
- FetchLeader:獲取當前 Curp 集群的 leader 節點
- InstallSnapshot:當一個 Curp Server 的狀態機日志落后 leader 太多時,可以通過此接口向 leader 請求一個快照,用來追趕 leader 的狀態機日志
其中,除了 Propose
和 FetchLeader
以外的 Service 則主要用于 CurpServer 內部,而業務 Server 則通過 FetchLeader
獲取集群當前的 leader 信息,通過 Propose
向 Curp 集群發起提案。
接口的定義
接下來,讓我們來看看 Curp 模塊都定義了哪些 trait 呢?從設計目的來看,Curp 中定義的 trait 一共可以分為兩類:
- 與命令相關:當 Curp 模塊針對特定的命令達成共識后,需要通過相關的 trait 來通知業務 Server 執行相應的命令
- 與角色相關:當 Curp 節點的 Role 發生變化時,需要通過相關的 trait 來通知對應的業務組件,例如 Lessor,Compactor 等。
命令相關的 Trait
Curp 模塊中定義的命令相關的 trait 主要包括了 Command
, ConflictCheck
和 CommandExecutor
三個。讓我們先來看看 Command
和 ConflictCheck
這兩個 trait,它們的定義如下:
pub trait ConflictCheck { fn is_conflict(&self, other: &Self) - > bool;}#[async_trait]pub trait Command{ /// omit some code... #[inline] fn prepare E >(&self, e: &E, index: LogIndex) - > Result Self::PR, E::Error > where E: CommandExecutor Self > + Send + Sync, { E as CommandExecutor Self >>::prepare(e, self, index) } #[inline] async fn execute E >(&self, e: &E, index: LogIndex) - > Result Self::ER, E::Error > where E: CommandExecutor Self > + Send + Sync, { E as CommandExecutor Self >>::execute(e, self, index).await } #[inline] async fn after_sync E >( &self, e: &E, index: LogIndex, prepare_res: Self::PR, ) - > Result Self::ASR, E::Error > where E: CommandExecutor Self > + Send + Sync, { E as CommandExecutor Self >>::after_sync(e, self, index, prepare_res).await }}
Command
trait 描述了一個可以被業務 Server 執行的命令實體,而ConflictCheck
用來檢測多個命令之間是否存在沖突,沖突的判定條件則為兩個不同的命令的 key 之間是否存在交集。Command
定義了 4 個關聯類型(K
, PR
, ER
和 ASR
),其中,K
代表了命令所對應的 Key,而 ER
和 ASR
則對應了 Curp 協議中命令在 execute 階段和 after_sync 階段的執行結果。那么 PR
又代表了什么呢?這里,PR
代表的是命令 在 prepare 階段的執行結果。讀者可能會好奇,curp 協議中只定義了命令的兩個階段,并沒有關于 prepare 階段的說明,這里又為什么需要 prepare 階段呢?為了說明這個問題,讓我們來看一個例子。由于 Xline 采用了 MVCC 機制對保存其中的 key-value pair 做了多版本管理,因此會為每個 key 分配相應的 revision。假設一個 client 向 Xline 先后發送了兩個命令: PUT A=1
和 PUT B=1
,記為 cmd1 和 cmd2,預期對應的 revision 分別為 3 和 4。由于這兩個命令之間不存在沖突,Xline 可以對這兩個命令并發亂序執行。命令的開始執行順序雖然為先 PUT A = 1
再執行 PUT B=1
,但是它們之間結束的順序是不確定的,因此 after_sync 執行順序可能先執行 cmd2 再執行 cmd1。又因為 revision 需要等到 after sync 階段才能確定,因此就會出現 cmd2 的 revision 為3,cmd1 的 revision 為 4 的情況,這就產生了錯誤。為了解決這個問題,我們為 Command 引入了一個 prepare 階段。Curp 保證了 prepare 階段的執行是串行化的,并且總是執行在 execute 階段之前。Xline 將 revision 的計算從 after_sync 階段提前到了 prepare 階段,既保證了 revision 的產生順序與用戶請求到達 Xline 的順序一致,同時又保證了互不沖突的命令之間能夠并發亂序執行,不影響性能。接下來,我們再來看看 CommandExecutor
trait 的定義:
#[async_trait]pub trait CommandExecutor< C: Command >{ fn prepare(&self, cmd: &C, index: LogIndex) - > Result< C::PR >; async fn execute(&self, cmd: &C, index: LogIndex) - > Result< C::ER >; async fn after_sync( &self, cmd: &C, index: LogIndex, prepare_res: C::PR, ) - > Result< C::ASR >; /// omit some code ...}
CommandExecutor
則描述了命令的執行實體,Curp 模塊通過它通知業務 Server 執行相關的命令。這三個 trait 之間的關系是:ConflictCheck
描述的是不同命令之間的關系,Curp Server 并不關心命令如何執行,它只關心命令之間是否沖突,而 CommandExecutor
則描述了命令怎么執行,并不關心命令之間的關系,它們分離了各自的關注點。它們之間的依賴關系為 CommandExecutor
<--> Command
--> ConflictCheck
角色變化的Trait
Curp 中定義的角色相關的 trait 便是 RoleChange
, 其定義如下:
/// Callback when the leadership changespub trait RoleChange { /// The `on_election_win` will be invoked when the current server win the election. /// It means that the current server's role will change from Candidate to Leader. fn on_election_win(&self); /// The `on_calibrate` will be invoked when the current server has been calibrated. /// It means that the current server's role will change from Leader to Follower. fn on_calibrate(&self);}
因為,Xline 的 Curp 模塊中,協議的后端采用的是 Raft 協議。這里所謂的后端,是指當沖突發生時,Curp 模塊會自動 fallback 到 Raft 協議,此情況下達成共識所產生的 latency 和 Raft 協議相同,均為 2 RTT。我們知道,在原始的 Raft 論文中,為 Raft 集群中的節點定義了以下三種角色:Leader、Follower 和 Candidate,他們之間的轉換關系如下:
初始時刻,一個節點為 Follwer,在 election_timeout
時間內沒有收到來自當前集群 Leader 的任何消息,包括 heartbeat 或者 AppendEntries 請求,則會發起 election 操作,將自身轉變為 Candidate。當贏得選舉時,則該節點由 Candidate 轉變為 Leader,若失敗則回到 Follower。現在假設集群出現了網絡分區,則可能會出現兩個 Leader,當網絡分區消失后,term 較小的 Leader 在接收到來自 term 較大的 Leader 的任何消息時,會自動進行 calibrate,更新自己的 term 后轉變自身角色為 Follower。那為什么 Xline 需要定義 RoleChange
這樣的 trait 呢?這是在 Xline 的一些業務場景中,一些組件,例如 LeaseServer 和 LeaseStore 在 Leader 節點和非 Leader 節點上所執行的操作是不同的,因此需要在節點角色發生變化時通知到對應的組件。目前,由于 Lease 相關組件只區分 Leader 和非 Leader 節點,因此,定義的 callback 只覆蓋了 election_win 和 calibrate 兩種事件。未來如果業務存在需要更細粒度的區分節點角色的需求,則會增加更多的 callback 方法來進行需求覆蓋。
04、Curp Server 是如何處理請求的
現在讓我們假設有兩個 PutRequest ,分別為 PUT A=1 和 PUT A = 2,我們來看看 curp server 是如何處理這兩個沖突請求的。正如前面所提到的,用戶需要先通過 Curp Client 的 propose
方法,向 Curp Server 發起一個提案,我們先來看看 propose
的偽代碼實現。
/// Propose the request to servers#[inline]pub async fn propose(&self, cmd: C) - > Result< C::ER, ProposeError > { // create two futures let fast_round = self.fast_round(cmd); let slow_round = self.slow_round(cmd); // Wait for the fast and slow round at the same time match which_one_complete_first(fast_round, slow_round).await { fast_round returns fast_result = > { let (fast_er, success) = fast_result?; if success { Ok(fast_er.unwrap()) } else { let (_asr, er) = wait for slow_round to finish; Ok(er) } } slow_round returns slow_result = > match slow_result { Ok((_asr, er)) = > Ok(er), Err(e) = > { if let Ok((Some(er), true)) = wait for fast_round to finish { return Ok(er); } Err(e) } }, }}
如代碼所示,當 Client 調用 propose 時,會同時啟動兩個不同的 future,分別是 fast_round
和 slow_round
,對應了 Curp 協議中的 fast path 和 slow path,并等待其中的某一個 future 完成。顯然,當第一個請求到來時是不會和其他請求沖突的,因此可以想象,這個請求能夠在 fast round 過程當中被處理完畢。我們先來看看 fast_round 的實現。
Curp 共識流程 —— Fast Round
fast_round
的代碼定義在 curp/src/client 中,對應了 Curp 協議中的前端過程。
/// The fast round of Curp protocol/// It broadcasts the requests to all the curp servers.async fn fast_round( &self, cmd_arc: Arc< C >,) - > Result< (Option
總體上來講,fast_round
邏輯可以分為三步:
- 將 Command 包裝成對應的 ProposeRequest;
- 將 ProposeRequest 廣播給 Curp 集群中的所有節點;
統計結果,如果當前 Command 與其他命令沒有發生沖突,則會得到執行成功的結果。當接收到的成功的ProposeResponse 的數量超過 super quorum (約集群節點數的 3/4 左右)的數量時,這認為命令執行成功,否則失敗。
有些讀者可能就要發問了,同樣一個節點個數為 2f + 1 的分布式集群中, 為什么像 Raft 或者 Paxos 中達成共識只需要f+1個節點返回成功響應, 而 Curp 在 fast path 路徑中卻需要超過
個節點返回成功響應呢?其中 f 為集群的故障容忍度。為了說明這個問題,不妨讓我們來看看,如果 Curp 協議也采用f+1為 fast path 成功的判斷依據,那么會發生什么樣的問題?假設現在 Client_A 向 2f + 1 個節點廣播了 ProposeRequest(標記為 A) 并接收到了 f + 1 個成功響應,則其中必定有一個節點為 leader,而剩下的 f 個節點為 follower。假設,現在包含 leader 在內的 f 個節點 crash 了,并且這 f 個節點中都包含了 A 請求,則剩下的 f+1 個節點中僅有一個 follower 在內存的 spec_pool 保存了 A 請求。接著,Client 向集群中的所有節點廣播了 ProposeRequest(標記為 B,并且 B 與 A 沖突)。由于每個節點均通過自身內存中的 spec_pool 來判斷新來的請求是否與舊的 speculatively executed 請求沖突,因此,盡管 Client_B 不會獲得請求執行成功的結果,但是 B 請求仍然會保存在剩下的 節點的 f 個節點中。假設此時 leader 節點恢復,在加載完持久化的狀態機日志后,需要 replay 所有節點上保存在 spec_pool 上的全部請求來恢復狀態機,我們將這個過程稱為 recover。顯然,原來先被執行的 A 請求在當前的集群中反而成為了少數派(1 個節點),而沒有被成功執行的 B 請求卻搖身一變成了多數派(f 個節點),這會導致 leader recover 過程出錯。而當 Curp 協議也采用
為 fast path 成功的判斷依據時,即便包含 A 請求的 f 個節點全部 crash,則至少在剩下的 f + 1 個節點中依然有超過半數
個節點保留了該請求,因此保證了不會出現后來的沖突請求在數量上壓過了原來成功執行的請求的情況。現在,讓我們回到前面的例子中,由于 CurpServer 在處理請求 PUT A=1 時并未發生沖突,因此在 fast_round 能夠順利完成。對于 Leader 階段而言,請求會經過 CurpNode,到 RawCurp,到沖突檢測隊列 Conflict_Checked_MPMC 中,并最后交由 cmd_worker 來執行。當 cmd_worker 執行完 PUT A=1 后,會將對應的結果插入到 cmd_board 中,并通知 CurpNode 向 Client 返回命令已執行的響應。下圖為 fast_round 過程所對應的時序圖:
Curp 共識流程 —— Slow Round
當 Client 向 Curp Server 發送 PUT A=2 的請求時,由前面的 propose
方法可知,Client 會同時啟動 fast_round
和 slow_round
兩個 future。顯然,由于 PUT A=2 和前一個請求 PUT A=1 明顯沖突,因此 fast_round
無法成功執行,進而會等待 slow_round
的執行完畢。slow_round
的代碼定義在 curp/src/client 中,對應了 Curp 協議中的后端過程,即 wait_synced 過程。
/// The slow round of Curp protocolasync fn slow_round( &self, cmd: Arc< C >,) - > Result< (< C as Command >::ASR, < C as Command >::ER), ProposeError > { loop { let leader_id = self.get_leader_id().await; let resp = match call wait_synced from leader node { Ok(resp) = > resp.into_inner(), Err(e) = > { wait for retry_timeout to retry propose again; continue; } }; match resp? { SyncResult::Success { er, asr } = > { return Ok((asr, er)); } SyncResult::Error(Redirect(new_leader, term)) = > { let new_leader = new_leader.and_then(|id| { update_state(new_leader, term) }) }); self.resend_propose(Arc::clone(&cmd), new_leader).await?; // resend the propose to the new leader } SyncResult::Error(Timeout) = > { return Err(ProposeError::SyncedError("wait sync timeout".to_owned())); } SyncResult::Error(e) = > { return Err(ProposeError::SyncedError(format!("{e:?}"))); } } }}
總體上來講,slow_round
邏輯也可以分為兩步:
- 獲得集群當前的 leader,并向其發送一個
WaitSyncedRequest
- 等待 leader 返回
WaitSyncedRequest
的執行結果,失敗則等待retry_timeout
后重試。
由于在 Client 的 propose
方法中,由于 fast_round
在判斷新到的請求與之前 speculatively execute 的請求存在沖突,因此 RawCurp 會先將該 Command 保存到狀態機日志當中,并向集群發起 AppendEntries 請求,完成后會向 Client 返回 ProposeError::KeyConflict
錯誤,進而等待 slow_round
的結束。當 Leader 向集群中的 Follower 完成了 AppendEntries 的操作后,便會執行 apply
操作,將日志應用到狀態機上。也正是在這個過程中,leader 會將 Command 發送到沖突檢測隊列 Conflict_Checked_MPMC 中。只有當 cmd_worker 執行完所有和 PUT A=2 相沖突的命令后,才能從沖突檢測隊列中彈出 PUT A=2 這個命令來執行。與 fast_round 不同的是,在 slow_round 過程中,命令在執行完畢,并將執行結果保存到 cmd_board 中后,并不會直接返回,而是會將命令重新放回到沖突檢測隊列中,直到命令執行完 after_sync 操作后,并將對應的結果保存到 cmd_board 中后,才會向 CurpNode 返回,最后向 Client 返回對應的 ProposeResponse。整個 slow_round 操作的時序圖如下:
05、Summary
在今天的文章中,我們討論了 Xline 中 Curp Server 是如何與業務 Server 是如何進行交互的。其中業務 Server 通過 Curp Server 所定義好的 RPC 接口來向 Curp Server 發起請求。而 CurpServer 則通過 2 類不同的 trait 來通知業務 Server。其中命令相關的 Trait,如 Command
,ConflictCheck
和 CommandExecutor
等,主要負責在命令達成共識后通知業務 Server,而角色變化的 Trait,如 RoleChange
, 則主要是負責在集群節點角色變化后通知業務 Server。 Xline 中將 Curp Server 拆分成了前端 CurpNode 和后端 RawCurp 兩個部分,其中 CurpNode 主要負責接收同步的 RPC 調用請求,并將請求轉發給 RawCurp,由 RawCurp 來異步執行。RawCurp 將命令投遞到 conflict_checked_channel 當中,并由 command worker 來負責執行,執行完畢后將執行結果插入到 cmd_board 中,并通知 CurpNode 返回執行結果。
-
RPC
+關注
關注
0文章
111瀏覽量
11747 -
ASR
+關注
關注
2文章
44瀏覽量
18997 -
RTT
+關注
關注
0文章
66瀏覽量
17468 -
PUT
+關注
關注
0文章
5瀏覽量
6354 -
MVCC
+關注
關注
0文章
13瀏覽量
1529
發布評論請先 登錄
相關推薦
Faster Transformer v2.1版本源碼解讀

OneFlow Softmax算子源碼解讀之WarpSoftmax

OneFlow Softmax算子源碼解讀之BlockSoftmax

Modbus TCP Server程序開發與Yocto系統構建

評論