本文介紹了協(xié)程的概念,并討論了 Tars Cpp 協(xié)程的實(shí)現(xiàn)原理和源碼分析。
一、前言
Tars 是 Linux 基金會(huì)的開源項(xiàng)目,它是基于名字服務(wù)使用 Tars 協(xié)議的高性能 RPC 開發(fā)框架,配套一體化的運(yùn)營(yíng)管理平臺(tái),并通過(guò)伸縮調(diào)度,實(shí)現(xiàn)運(yùn)維半托管服務(wù)。Tars 集可擴(kuò)展協(xié)議編解碼、高性能 RPC 通信框架、名字路由與發(fā)現(xiàn)、發(fā)布監(jiān)控、日志統(tǒng)計(jì)、配置管理等于一體,通過(guò)它可以快速用微服務(wù)的方式構(gòu)建自己的穩(wěn)定可靠的分布式應(yīng)用,并實(shí)現(xiàn)完整有效的服務(wù)治理。
Tars 目前支持 C++,Java,PHP,Nodejs,Go 語(yǔ)言,其中 TarsCpp 3.x 全面啟用對(duì)協(xié)程的支持,服務(wù)框架全面融合協(xié)程。本文基于TarsCpp-v3.0.0版本,討論了協(xié)程在TarsCpp服務(wù)框架的實(shí)現(xiàn)。
二、協(xié)程的介紹
2.1 什么是協(xié)程
協(xié)程的概念最早出現(xiàn)在Melvin Conway在1963年的論文("Design of a separable transition-diagram compiler"),協(xié)程認(rèn)為是“可以暫停和恢復(fù)執(zhí)行”的函數(shù)。
協(xié)程可以看成一種特殊的函數(shù),相比于函數(shù),協(xié)程最大的特點(diǎn)就是支持掛起(yield)和恢復(fù)(resume)的能力。如上圖所示:函數(shù)不能主動(dòng)中斷執(zhí)行流;而協(xié)程支持主動(dòng)掛起,中斷執(zhí)行流,并在一定時(shí)機(jī)恢復(fù)執(zhí)行。
協(xié)程的作用:
降低并發(fā)編碼的復(fù)雜度,尤其是異步編程(callback hell)。
協(xié)程在用戶態(tài)中實(shí)現(xiàn)調(diào)度,避免了陷入內(nèi)核,上下文切換開銷小。
2.2 進(jìn)程、線程和協(xié)程
我們可以簡(jiǎn)單的認(rèn)為協(xié)程是用戶態(tài)的線程。協(xié)程和線程主要異同:
相同點(diǎn):都可以實(shí)現(xiàn)上下文切換(保存和恢復(fù)執(zhí)行流)
不同點(diǎn):線程的上下文切換在內(nèi)核實(shí)現(xiàn),切換的時(shí)機(jī)由內(nèi)核調(diào)度器控制。協(xié)程的上下文切換在用戶態(tài)實(shí)現(xiàn),切換的時(shí)機(jī)由調(diào)用方自身控制。
進(jìn)程、線程和協(xié)程的比較:
2.3 協(xié)程的分類
按控制傳遞(Control-transfer)機(jī)制分為:對(duì)稱(Symmetric)協(xié)程和非對(duì)稱(Asymmetric)協(xié)程。
對(duì)稱協(xié)程:協(xié)程之間相互獨(dú)立,調(diào)度權(quán)(CPU)可以在任意協(xié)程之間轉(zhuǎn)移。協(xié)程只有一種控制傳遞操作(yield)。對(duì)稱協(xié)程一般需要調(diào)度器支持,通過(guò)調(diào)度算法選擇下一個(gè)目標(biāo)協(xié)程。
非對(duì)稱協(xié)程:協(xié)程之間存在調(diào)用關(guān)系,協(xié)程讓出的調(diào)度權(quán)只能返回給調(diào)用者。協(xié)程有兩種控制操作:恢復(fù)(resume)和掛起(yield)。
下圖演示了對(duì)稱協(xié)程的調(diào)度權(quán)轉(zhuǎn)移流程,協(xié)程只有一個(gè)操作yield,表示讓出CPU,返回給調(diào)度器。
對(duì)稱協(xié)程示意圖
下圖演示了非對(duì)稱協(xié)程的調(diào)度權(quán)轉(zhuǎn)移流程。協(xié)程可以有兩個(gè)操作,即resume和yield。resume表示轉(zhuǎn)移CPU給被調(diào)用者,yield表示被調(diào)用者返回CPU給調(diào)用者。
非對(duì)稱協(xié)程示意圖
根據(jù)協(xié)程是否有獨(dú)立的棧空間,協(xié)程分為有棧協(xié)程(stackful)和無(wú)棧協(xié)程(stackless)兩種。
有棧協(xié)程:每個(gè)協(xié)程有獨(dú)立的棧空間,保存獨(dú)立的上下文(執(zhí)行棧、寄存器等),協(xié)程的喚醒和掛起就是拷貝和切換上下文。優(yōu)點(diǎn):協(xié)程調(diào)度可以嵌套,在內(nèi)存中的任意位置、任意時(shí)刻進(jìn)行。局限:協(xié)程數(shù)目增大,內(nèi)存開銷增大。
無(wú)棧協(xié)程:單個(gè)線程內(nèi)所有協(xié)程都共享同一個(gè)棧空間(共享?xiàng)#瑓f(xié)程的切換就是簡(jiǎn)單的函數(shù)調(diào)用和返回,無(wú)棧協(xié)程通常是基于狀態(tài)機(jī)或閉包來(lái)實(shí)現(xiàn)。優(yōu)點(diǎn):減小內(nèi)存開銷。局限:協(xié)程調(diào)度產(chǎn)生的局部變量都在共享?xiàng)I? 一旦新的協(xié)程運(yùn)行后共享?xiàng)V械臄?shù)據(jù)就會(huì)被覆蓋, 先前協(xié)程的局部變量也就不再有效, 進(jìn)而無(wú)法實(shí)現(xiàn)參數(shù)傳遞、嵌套調(diào)用等高級(jí)協(xié)程交互。
Golang 中的 goroutine、Lua 中的協(xié)程都是有棧協(xié)程;ES6的 await/async、Python 的 Generator、C++20 中的 cooroutine 都是無(wú)棧協(xié)程。
三、Tars 協(xié)程實(shí)現(xiàn)
實(shí)現(xiàn)協(xié)程的核心有兩點(diǎn):
實(shí)現(xiàn)用戶態(tài)的上下文切換。
實(shí)現(xiàn)協(xié)程的調(diào)度。
Tars 協(xié)程的由下面幾個(gè)類實(shí)現(xiàn):
TC_CoroutineInfo 協(xié)程信息類:實(shí)現(xiàn)協(xié)程的上下文切換。每個(gè)協(xié)程對(duì)應(yīng)一個(gè) TC_CoroutineInfo 對(duì)象,上下文切換基于boost.context實(shí)現(xiàn)。
TC_CoroutineScheduler 協(xié)程調(diào)度器類:實(shí)現(xiàn)了協(xié)程的管理和調(diào)度。
TC_Coroutine 協(xié)程類:繼承于線程類(TC_Thread),方便業(yè)務(wù)快速使用協(xié)程。
Tars 協(xié)程有幾個(gè)特點(diǎn):
有棧協(xié)程。每個(gè)協(xié)程都分配了獨(dú)立的棧空間。
對(duì)稱協(xié)程。協(xié)程之間相互獨(dú)立,由調(diào)度器負(fù)責(zé)調(diào)度。
基于 epoll 實(shí)現(xiàn)協(xié)程調(diào)度,和網(wǎng)絡(luò)IO無(wú)縫結(jié)合。
3.1 用戶態(tài)上下文切換的實(shí)現(xiàn)方式
協(xié)程可以看成一種特殊的函數(shù),和普通函數(shù)不同,協(xié)程函數(shù)有掛起(yield)和恢復(fù)(resume)的能力,即可以中斷自己的執(zhí)行流,并且在合適的時(shí)候恢復(fù)執(zhí)行流,這也稱為上下文切換的能力。
協(xié)程執(zhí)行的過(guò)程,依賴兩個(gè)關(guān)鍵要素:協(xié)程棧和寄存器,協(xié)程的上下文環(huán)境其實(shí)就是寄存器和棧的狀態(tài)。實(shí)現(xiàn)上下文切換的核心就是實(shí)現(xiàn)保存并恢復(fù)當(dāng)前執(zhí)行環(huán)境的寄存器狀態(tài)的能力。
實(shí)現(xiàn)用戶態(tài)上下文切換一般有以下方式:
3.2 基于boost.context實(shí)現(xiàn)上下文切換
Tars 協(xié)程是基于 boost.context 實(shí)現(xiàn),boost.context 提供了兩個(gè)接口(make_fcontext, jump_fcontext)實(shí)現(xiàn)協(xié)程的上下文切換。
代碼1:
/** * @biref 執(zhí)行環(huán)境上下文 */ typedef void* fcontext_t; /** * @biref 事件參數(shù)包裝 */ struct transfer_t { fcontext_t fctx; // 來(lái)源的執(zhí)行上下文。來(lái)源的上下文指的是從什么位置跳轉(zhuǎn)過(guò)來(lái)的 void* data; // 接口傳入的自定義的指針 }; /** * @biref 初始化執(zhí)行環(huán)境上下文 * @param sp 棧空間地址 * @param size 棧空間的大小 * @param fn 入口函數(shù) * @return 返回初始化完成后的執(zhí)行環(huán)境上下文 */ extern "C" fcontext_t make_fcontext(void * stack, std::size_t stack_size, void (* fn)( transfer_t)); /** * @biref 跳轉(zhuǎn)到目標(biāo)上下文 * @param to 目標(biāo)上下文 * @param vp 目標(biāo)上下文的附加參數(shù),會(huì)設(shè)置為transfer_t里的data成員 * @return 跳轉(zhuǎn)來(lái)源 */ extern "C" transfer_t jump_fcontext(fcontext_t const to, void * vp);
(1)make_fcontext創(chuàng)建協(xié)程
接受三個(gè)參數(shù),stack是為協(xié)程分配的棧底,stack_size是棧的大小,fn是協(xié)程的入口函數(shù)
返回初始化完成后的執(zhí)行環(huán)境上下文
(2)jump_fcontext切換協(xié)程
接受兩個(gè)參數(shù),目標(biāo)上下文地址和參數(shù)指針
返回一個(gè)上下文,指向當(dāng)前上下文從哪個(gè)上下文跳轉(zhuǎn)過(guò)來(lái)
fcontext的結(jié)構(gòu)
boost context 是通過(guò) fcontext_t結(jié)構(gòu)體來(lái)保存協(xié)程狀態(tài)。相對(duì)于其它匯編實(shí)現(xiàn)的協(xié)程庫(kù),boost的context和stack是一起的,棧底指針就是context,切換context就是切換stack。
3.3 Tars協(xié)程信息類
TC_CoroutineInfo 協(xié)程信息類,包裝了 boost.context 提供的接口,表示一個(gè) TARS 協(xié)程。
其中,TC_CoroutineInfo::registerFunc 定義了協(xié)程的創(chuàng)建。
代碼2:
void TC_CoroutineInfo::registerFunc(const std::function& callback) { _callback = callback; _init_func.coroFunc = TC_CoroutineInfo::corotineProc; _init_func.args = this; fcontext_t ctx = make_fcontext(_stack_ctx.sp, _stack_ctx.size, TC_CoroutineInfo::corotineEntry); // 創(chuàng)建協(xié)程 transfer_t tf = jump_fcontext(ctx, this); // context 切換 //實(shí)際的ctx this->setCtx(tf.fctx); } void TC_CoroutineInfo::corotineEntry(transfer_t tf) { TC_CoroutineInfo * coro = static_cast< TC_CoroutineInfo * >(tf.data); // this auto func = coro->_init_func.coroFunc; void* args = coro->_init_func.args; transfer_t t = jump_fcontext(tf.fctx, NULL); //拿到自己的協(xié)程堆棧, 當(dāng)前協(xié)程結(jié)束以后, 好跳轉(zhuǎn)到main coro->_scheduler->setMainCtx(t.fctx); //再跳轉(zhuǎn)到具體函數(shù) func(args, t); }
TC_CoroutineInfo::switchCoro 定義了協(xié)程切換。
代碼3
void TC_CoroutineScheduler::switchCoro(TC_CoroutineInfo *to) { //跳轉(zhuǎn)到to協(xié)程 _currentCoro = to; transfer_t t = jump_fcontext(to->getCtx(), NULL); //并保存協(xié)程堆棧 to->setCtx(t.fctx); }
四、Tars 協(xié)程調(diào)度器
基于 boost.context 的 TC_CoroutineInfo 類實(shí)現(xiàn)了協(xié)程的上下文切換,協(xié)程的管理和調(diào)度,則是由 TC_CoroutineScheduler 協(xié)程調(diào)度器類來(lái)負(fù)責(zé),分管理和調(diào)度兩個(gè)方面來(lái)說(shuō)明 TC_CoroutineScheduler 調(diào)度類。
協(xié)程管理:目的是需要合理的數(shù)據(jù)結(jié)構(gòu)來(lái)組織協(xié)程(TC_CoroutineInfo),方便調(diào)度的實(shí)現(xiàn)。
協(xié)程調(diào)度:目的是控制協(xié)程的啟動(dòng)、休眠和喚醒,實(shí)現(xiàn)了 yield, sleep 等功能,本質(zhì)就是實(shí)現(xiàn)協(xié)程的狀態(tài)機(jī),完成協(xié)程的狀態(tài)切換。Tars 協(xié)程分為 5 個(gè)狀態(tài):FREE, ACTIVE, AVAIL, INACTIVE, TIMEOUT
代碼4:
/** * 協(xié)程的狀態(tài)信息 */ enum CORO_STATUS { CORO_FREE = 0, CORO_ACTIVE = 1, CORO_AVAIL = 2, CORO_INACTIVE = 3, CORO_TIMEOUT = 4 };
4.1 Tars 協(xié)程的管理
TC_CoroutineScheduler 主要通過(guò)以下方法管理協(xié)程:
TC_CoroutineScheduler::create()
創(chuàng)建TC_CoroutineScheduler對(duì)象
TC_CoroutineScheduler::init()初始化,分配協(xié)程棧內(nèi)存
TC_CoroutineScheduler::run()啟動(dòng)調(diào)度
TC_CoroutineScheduler::terminate()停止調(diào)度
TC_CoroutineScheduler::destroy()資源銷毀,釋放協(xié)程棧內(nèi)存
我們可以通過(guò) TC_CoroutineScheduler::init()看到數(shù)據(jù)結(jié)構(gòu)的初始化過(guò)程。
代碼5:
void TC_CoroutineScheduler::init() { ... .... createCoroutineInfo(_poolSize); // _all_coro = new TC_CoroutineInfo*[_poolSize+1]; TC_CoroutineInfo::CoroutineHeadInit(&_active); TC_CoroutineInfo::CoroutineHeadInit(&_avail); TC_CoroutineInfo::CoroutineHeadInit(&_inactive); TC_CoroutineInfo::CoroutineHeadInit(&_timeout); TC_CoroutineInfo::CoroutineHeadInit(&_free); int iSucc = 0; for(size_t i = 0; i < _currentSize; ++i) { //iId=0不使用, 給mainCoro使用!!!! uint32_t iId = generateId(); stack_context s_ctx = stack_traits::allocate(_stackSize); // 分配協(xié)程棧內(nèi)存 TC_CoroutineInfo *coro = new TC_CoroutineInfo(this, iId, s_ctx); _all_coro[iId] = coro; TC_CoroutineInfo::CoroutineAddTail(coro, &_free); ++iSucc; } _currentSize = iSucc; _mainCoro.setUid(0); _mainCoro.setStatus(TC_CoroutineInfo::CORO_FREE); _currentCoro = &_mainCoro; }
通過(guò)下面的 TC_CoroutineScheduler 調(diào)度類數(shù)據(jù)結(jié)構(gòu)圖,可以更清楚的看到協(xié)程的組織方式:
Tars調(diào)度類數(shù)據(jù)結(jié)構(gòu)
使用協(xié)程之前,需要在協(xié)程數(shù)組(_all_coro),創(chuàng)建指定數(shù)量的協(xié)程對(duì)象,并為每個(gè)協(xié)程分配協(xié)程棧內(nèi)存。
通過(guò)鏈表的方式管理協(xié)程,每個(gè)狀態(tài)都有一個(gè)鏈表。協(xié)程狀態(tài)切換,對(duì)應(yīng)協(xié)程在不同狀態(tài)鏈表的轉(zhuǎn)移。
4.2Tars 協(xié)程的調(diào)度
Tars 調(diào)度是基于epoll實(shí)現(xiàn),在 epoll 循環(huán)里檢查是否有需要執(zhí)行的協(xié)程, 有則執(zhí)行之, 沒有則等待在epoll對(duì)象上, 直到有喚醒或者超時(shí)。使用 epoll 實(shí)現(xiàn)的好處是可以和網(wǎng)絡(luò)IO無(wú)縫粘合, 當(dāng)有數(shù)據(jù)發(fā)送/接收時(shí), 喚醒epoll對(duì)象, 從而完成協(xié)程的切換。
Tars協(xié)程調(diào)度的核心邏輯是
TC_CoroutineScheduler::run()
代碼6:
void TC_CoroutineScheduler::run() { ... ... while(!_epoller->isTerminate()) { if(_activeCoroQueue.empty() && TC_CoroutineInfo::CoroutineHeadEmpty(&_avail) && TC_CoroutineInfo::CoroutineHeadEmpty(&_active)) { _epoller->done(1000); // epoll_wait(..., 1000ms) 先處理epoll的網(wǎng)絡(luò)事件 } //喚醒需要激活的協(xié)程 wakeup(); //喚醒sleep的協(xié)程 wakeupbytimeout(); //喚醒yield的協(xié)程 wakeupbyself(); int iLoop = 100; //執(zhí)行active協(xié)程, 每次執(zhí)行100個(gè), 避免占滿cpu while(iLoop > 0 && !TC_CoroutineInfo::CoroutineHeadEmpty(&_active)) { TC_CoroutineInfo *coro = _active._next; switchCoro(coro); --iLoop; } //執(zhí)行available協(xié)程, 每次執(zhí)行1個(gè) if(!TC_CoroutineInfo::CoroutineHeadEmpty(&_avail)) { TC_CoroutineInfo *coro = _avail._next; switchCoro(coro); } } ... ... }
下圖可以更清楚得看到協(xié)程調(diào)度和狀態(tài)轉(zhuǎn)移的過(guò)程。
Tars協(xié)程調(diào)度狀態(tài)轉(zhuǎn)移圖
TC_CoroutineScheduler 提供了下面四種方法實(shí)現(xiàn)協(xié)程的調(diào)度:
(1) TC_CoroutineScheduler:啟動(dòng)協(xié)程。
(2)TC_CoroutineScheduler:當(dāng)前協(xié)程放棄繼續(xù)執(zhí)行。并提供了兩種方式,支持不同的喚醒策略。
yield(true):會(huì)自動(dòng)喚醒(等到下次協(xié)程調(diào)度,都會(huì)再激活當(dāng)前線程)
yield(false):不再自動(dòng)喚醒,除非自己調(diào)度該協(xié)程(比如put到調(diào)度器中)
(3)TC_CoroutineScheduler:當(dāng)前協(xié)程休眠iSleepTime時(shí)間(單位:毫秒),然后會(huì)被喚醒繼續(xù)執(zhí)行。
(4)TC_CoroutineScheduler:放入需要喚醒的協(xié)程, 將協(xié)程放入到調(diào)度器中, 馬上會(huì)被調(diào)度器調(diào)度。
五、總結(jié)
本文介紹了協(xié)程的概念,并討論了 Tars Cpp 協(xié)程的實(shí)現(xiàn)原理和源碼分析。
審核編輯:劉清
-
RPC
+關(guān)注
關(guān)注
0文章
111瀏覽量
11703 -
C++語(yǔ)言
+關(guān)注
關(guān)注
0文章
147瀏覽量
7164 -
Lua語(yǔ)言
+關(guān)注
關(guān)注
0文章
9瀏覽量
1551 -
調(diào)度器
+關(guān)注
關(guān)注
0文章
98瀏覽量
5410
原文標(biāo)題:Tars-Cpp協(xié)程實(shí)現(xiàn)分析
文章出處:【微信號(hào):OSC開源社區(qū),微信公眾號(hào):OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
談?wù)?b class='flag-5'>協(xié)程的那些事兒

協(xié)程和線程有什么區(qū)別
怎樣使用C語(yǔ)言去實(shí)現(xiàn)Linux系統(tǒng)協(xié)程呢
Tars在ARM平臺(tái)上的移植是如何去實(shí)現(xiàn)的
Tars移植到ARM64平臺(tái)上的過(guò)程實(shí)現(xiàn)
Python后端項(xiàng)目的協(xié)程是什么
Python協(xié)程與JavaScript協(xié)程的對(duì)比及經(jīng)驗(yàn)技巧
使用channel控制協(xié)程數(shù)量
詳解Linux線程、線程與異步編程、協(xié)程與異步
協(xié)程的概念及協(xié)程的掛起函數(shù)介紹
FreeRTOS任務(wù)與協(xié)程介紹
協(xié)程的作用、結(jié)構(gòu)及原理

評(píng)論