在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

如何在同步的Rust方法中調(diào)用異步代碼呢?

jf_wN0SrCdH ? 來(lái)源:GreptimeDB ? 2023-03-17 09:18 ? 次閱讀

在同步的 Rust 方法中調(diào)用異步代碼經(jīng)常會(huì)導(dǎo)致一些問(wèn)題,特別是對(duì)于不熟悉異步 Rust runtime 底層原理的初學(xué)者。在本文中,我們將討論我們遇到的一個(gè)特殊問(wèn)題,并分享我們采取的解決方法的經(jīng)驗(yàn)。

背景和問(wèn)題

在做GreptimeDB項(xiàng)目的時(shí)候,我們遇到一個(gè)關(guān)于在同步 Rust 方法中調(diào)用異步代碼的問(wèn)題。經(jīng)過(guò)一系列故障排查后,我們弄清了問(wèn)題的原委,這大大加深了對(duì)異步 Rust 的理解,因此在這篇文章中分享給大家,希望能給被相似問(wèn)題困擾的 Rust 開(kāi)發(fā)者一些啟發(fā)。

我們的整個(gè)項(xiàng)目是基于Tokio這個(gè)異步 Rust runtime 的,它將協(xié)作式的任務(wù)運(yùn)行和調(diào)度方便地封裝在.await調(diào)用中,非常簡(jiǎn)潔優(yōu)雅。但是這樣也讓不熟悉 Tokio 底層原理的用戶一不小心就掉入到坑里。

我們遇到的問(wèn)題是,需要在一個(gè)第三方庫(kù)的 trait 實(shí)現(xiàn)中執(zhí)行一些異步代碼,而這個(gè) trait 是同步的,我們無(wú)法修改這個(gè) trait 的定義。

traitSequencer{
fngenerate(&self)->Vec;
}

我們用一個(gè)PlainSequencer來(lái)實(shí)現(xiàn)這個(gè) trait ,而在實(shí)現(xiàn)generate方法的時(shí)候依賴一些異步的調(diào)用(比如這里的PlainSequencer::generate_async):

implPlainSequencer{
asyncfngenerate_async(&self)->Vec{
letmutres=vec![];
foriin0..self.bound{
res.push(i);
tokio::sleep(Duration::from_millis(100)).await;
}
res
}
}

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
self.generate_async().await
}
}

這樣就會(huì)出現(xiàn)問(wèn)題,因?yàn)間enerate是一個(gè)同步方法,里面是不能直接 await 的。

error[E0728]:`await`isonlyallowedinside`async`functionsandblocks
-->src/common/tt.rs30
|
31|/fngenerate(&self)->Vec{
32||self.generate_async().await
||^^^^^^onlyallowedinside`async`functionsandblocks
33||}
||_____-thisisnot`async`

我們首先想到的是,Tokio 的 runtime 有一個(gè)Runtime::block_on方法,可以同步地等待一個(gè) future 完成。

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
RUNTIME.block_on(async{
self.generate_async().await
})
}
}

#[cfg(test)]
modtests{
#[tokio::test]
asyncfntest_sync_method(){
letsequencer=PlainSequencer{
bound:3
};
letvec=sequencer.generate();
println!("vec:{:?}",vec);
}
}

編譯可以通過(guò),但是運(yùn)行時(shí)直接報(bào)錯(cuò):

Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.
thread'tests::test_sync_method'panickedat'Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.',/Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs9

提示不能從一個(gè)執(zhí)行中的 runtime 直接啟動(dòng)另一個(gè)異步 runtime??磥?lái) Tokio 為了避免這種情況特地在Runtime::block_on入口做了檢查。既然不行那我們就再看看其他的異步庫(kù)是否有類似的異步轉(zhuǎn)同步的方法。

果然找到一個(gè)futures::block_on。

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
futures::block_on(async{
self.generate_async().await
})
}
}

編譯同樣沒(méi)問(wèn)題,但是運(yùn)行時(shí)代碼直接直接 hang 住不返回了。


cargotest--color=always--packagetokio-demo

--bintttests::test_sync_method

--no-fail-fast----format=json

--exact-Zunstable-options--show-output

Compilingtokio-demov0.1.0(/Users/lei/Workspace/Rust/learning/tokio-demo) Finishedtest[unoptimized+debuginfo]target(s)in0.39s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) {"type":"suite","event":"started","test_count":1} {"type":"test","event":"started","name":"tests::test_sync_method"} #theexecutionjusthangshere:(

明明generate_async方法里面只有一個(gè)簡(jiǎn)單的sleep()調(diào)用,但是為什么 future 一直沒(méi)完成呢?

并且吊詭的是,同樣的代碼,在tokio::test里面會(huì) hang 住,但是在tokio::main中則可以正常執(zhí)行完畢:

#[tokio::main]
pubasyncfnmain(){
letsequencer=PlainSequencer{
bound:3
};
letvec=sequencer.generate();
println!("vec:{:?}",vec);
}

執(zhí)行結(jié)果:

cargorun--color=always--packagetokio-demo--bintt
Finisheddev[unoptimized+debuginfo]target(s)in0.05s
Running`target/debug/tt`
vec:[0,1,2]

其實(shí)當(dāng)初真正遇到這個(gè)問(wèn)題的時(shí)候定位到具體在哪里 hang 住并沒(méi)有那么容易。真實(shí)代碼中 async 執(zhí)行的是一個(gè)遠(yuǎn)程的 gRPC 調(diào)用,當(dāng)初懷疑過(guò)是否是 gRPC server 的問(wèn)題,動(dòng)用了網(wǎng)絡(luò)抓包等等手段最終發(fā)現(xiàn)是 client 側(cè)的問(wèn)題。

這也提醒了我們在出現(xiàn) bug 的時(shí)候,抽象出問(wèn)題代碼的執(zhí)行模式并且做出一個(gè)最小可復(fù)現(xiàn)的樣例(Minimal Reproducible Example)是非常重要的。

Catchup

在 Rust 中,一個(gè)異步的代碼塊會(huì)被make_async_expr編譯為一個(gè)實(shí)現(xiàn)了std::Future的 generator。

#[tokio::test]
asyncfntest_future(){
letfuture=async{
println!("hello");
};

//theaboveasyncblockwon'tgetexecuteduntilweawaitit.
future.await;
}

而.await本質(zhì)上是一個(gè)語(yǔ)法糖,則會(huì)被lower_expr_await編譯成類似于下面的一個(gè)語(yǔ)法結(jié)構(gòu):

//pseudo-rustcode
match::into_future(){
mut__awaitee=>loop{
matchunsafe{::poll(
<::Pin>::new_unchecked(&mut__awaitee),
::get_context(task_context),
)}{
::Ready(result)=>breakresult,
::Pending=>{}
}
task_context=yield();
}
}

在上面這個(gè)去掉了語(yǔ)法糖的偽代碼中,可以看到有一個(gè)循環(huán)不停地檢查 generator 的狀態(tài)是否為已完成(std::poll)。

自然地,必然存在一個(gè)組件來(lái)做這件事,這里就是 Tokio 和async-std這類異步運(yùn)行時(shí)發(fā)揮作用的地方了。Rust 在設(shè)計(jì)之初就特意將異步的語(yǔ)法(async/await)和異步運(yùn)行時(shí)的實(shí)現(xiàn)分開(kāi),在上述的示例代碼中,poll 的操作是由 Tokio 的 executor 執(zhí)行的。

問(wèn)題分析

回顧完背景知識(shí),我們?cè)倏匆谎鄯椒ǖ膶?shí)現(xiàn):

fngenerate(&self)->Vec{
futures::block_on(async{
self.generate_async().await
})
}

調(diào)用generate方法的肯定是 Tokio 的 executor,那么 block_on 里面的self.generate_async().await這個(gè) future 又是誰(shuí)在 poll 呢?

一開(kāi)始我以為,futures::block_on會(huì)有一個(gè)內(nèi)部的 runtime 去負(fù)責(zé)generate_async的 poll。于是查看了代碼(主要是futures_executor::run_executor這個(gè)方法):

fnrun_executor)->Poll>(mutf:F)->T{
let_enter=enter().expect(
"cannotexecute`LocalPool`executorfromwithin
anotherexecutor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify|{
letwaker=waker_ref(thread_notify);
letmutcx=Context::from_waker(&waker);
loop{
ifletPoll::Ready(t)=f(&mutcx){
returnt;
}
letunparked=thread_notify.unparked.swap(false,Ordering::Acquire);
if!unparked{
thread::park();
thread_notify.unparked.store(false,Ordering::Release);
}
}
})
}

立刻嗅到了一絲不對(duì)的味道,雖然這個(gè)方法名為run_executor,但是整個(gè)方法里面貌似沒(méi)有任何 spawn 的操作,只是在當(dāng)前線程不停的循環(huán)判斷用戶提交的 future 的狀態(tài)是否為 ready ??!

這意味著,當(dāng) Tokio 的 runtime 線程執(zhí)行到這里的時(shí)候,會(huì)立刻進(jìn)入一個(gè)循環(huán),在循環(huán)中不停地判斷用戶的的 future 是否 ready。如果還是 pending 狀態(tài),則將當(dāng)前線程 park 住。

假設(shè),用戶 future 的異步任務(wù)也是交給了當(dāng)前線程去執(zhí)行,futures::block_on等待用戶的 future ready,而用戶 future 等待futures::block_on釋放當(dāng)前的線程資源,那么不就死鎖了?

這個(gè)推論聽(tīng)起來(lái)很有道理,讓我們來(lái)驗(yàn)證一下。既然不能在當(dāng)前 runtime 線程 block,那就重新開(kāi)一個(gè) runtime block:

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
letbound=self.bound;
futures::block_on(asyncmove{
RUNTIME.spawn(asyncmove{
letmutres=vec![];
foriin0..bound{
res.push(i);
tokio::sleep(Duration::from_millis(100)).await;
}
res
}).await.unwrap()
})
}
}

果然可以了。


cargotest--color=always--packagetokio-demo

--bintttests::test_sync_method

--no-fail-fast----format=json

--exact-Zunstable-options--show-output

Finishedtest[unoptimized+debuginfo]target(s)in0.04s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) vec:[0,1,2]

值得注意的是,在futures::block_on里面,額外使用了一個(gè)RUNTIME來(lái) spawn 我們的異步代碼。其原因還是剛剛所說(shuō)的,這個(gè)異步任務(wù)需要一個(gè) runtime 來(lái)驅(qū)動(dòng)狀態(tài)的變化。

如果我們刪除 RUNTIME,而為 futures::block_on 生成一個(gè)新的線程,雖然死鎖問(wèn)題得到了解決,但tokio::sleep 方法的調(diào)用會(huì)報(bào)錯(cuò)"no reactor is running",這是因?yàn)?Tokio 的功能運(yùn)作需要一個(gè) runtime:

called`Result::unwrap()`onan`Err`value:Any{..}
thread''panickedat'thereisnoreactorrunning,mustbecalledfromthecontextofaTokio1.xruntime',
...

tokio::main和tokio::test

在分析完上面的原因之后,“為什么tokio::main中不會(huì) hang 住而tokio::test會(huì) hang ???“ 這個(gè)問(wèn)題也很清楚了,他們兩者所使用的的 runtime 并不一樣。tokio::main使用的是多線程的 runtime,而tokio::test使用的是單線程的 runtime,而在單線程的 runtime 下,當(dāng)前線程被futures::block_on卡死,那么用戶提交的異步代碼是一定沒(méi)機(jī)會(huì)執(zhí)行的,從而必然形成上面所說(shuō)的死鎖。

Best practice

經(jīng)過(guò)上面的分析,結(jié)合 Rust 基于 generator 的協(xié)作式異步特性,我們可以總結(jié)出 Rust 下橋接異步代碼和同步代碼的一些注意事項(xiàng):

?將異步代碼與同步代碼結(jié)合使用可能會(huì)導(dǎo)致阻塞,因此不是一個(gè)明智的選擇。

?在同步的上下文中調(diào)用異步代碼時(shí),請(qǐng)使用 futures::block_on 并將異步代碼 spawn 到另一個(gè)專用的 runtime 中執(zhí)行 ,因?yàn)榍罢邥?huì)阻塞當(dāng)前線程。

?如果必須從異步的上下文中調(diào)用有可能阻塞的同步代碼(比如文件 IO 等),則建議使用 tokio::spawn_blocking 在專門處理阻塞操作的 executor 上執(zhí)行相應(yīng)的代碼。





審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • RPC
    RPC
    +關(guān)注

    關(guān)注

    0

    文章

    111

    瀏覽量

    11725
  • Asynchrono
    +關(guān)注

    關(guān)注

    0

    文章

    4

    瀏覽量

    6576
  • Rust
    +關(guān)注

    關(guān)注

    1

    文章

    233

    瀏覽量

    6873

原文標(biāo)題:如何在同步的 Rust 方法中調(diào)用異步代碼 | Tokio 使用中的幾點(diǎn)教訓(xùn)

文章出處:【微信號(hào):Rust語(yǔ)言中文社區(qū),微信公眾號(hào):Rust語(yǔ)言中文社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    何在Rust中高效地操作文件

    ,本教程將介紹如何在Rust中高效地操作文件,并提供多個(gè)實(shí)際應(yīng)用示例。 文件讀取 Rust語(yǔ)言中操作文件的第一步就是文件讀取,使用Rust內(nèi)置的 std::fs::File 類型即可。
    的頭像 發(fā)表于 09-19 11:51 ?2751次閱讀

    如何使用Rust語(yǔ)言和rumqttc模塊實(shí)現(xiàn)MQTT協(xié)議的異步API

    的系統(tǒng)編程語(yǔ)言,非常適合開(kāi)發(fā)物聯(lián)網(wǎng)設(shè)備和后端服務(wù)。本教程將介紹如何使用Rust語(yǔ)言和rumqttc模塊實(shí)現(xiàn)MQTT協(xié)議的異步API,并提供幾個(gè)相關(guān)的代碼示例,最佳實(shí)踐和教程總結(jié)。 本篇內(nèi)容主要圍繞
    的頭像 發(fā)表于 09-19 14:45 ?2779次閱讀

    何在Rust連接和使用MySQL數(shù)據(jù)庫(kù)

    何在Rust連接和使用MySQL數(shù)據(jù)庫(kù)。 安裝 mysql 模塊 這里我們假設(shè)你已經(jīng)安裝了Rust編程語(yǔ)言工具鏈,在本教程,我們將使用
    的頭像 發(fā)表于 09-30 17:05 ?1943次閱讀

    何在Rust讀寫文件

    見(jiàn)的內(nèi)存安全問(wèn)題和數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題。 在Rust,讀寫文件是一項(xiàng)非常常見(jiàn)的任務(wù)。本教程將介紹如何在Rust讀寫文件,包括基礎(chǔ)用法和進(jìn)階用法。
    的頭像 發(fā)表于 09-20 10:57 ?2309次閱讀

    Rust的多線程編程概念和使用方法

    Rust是一種強(qiáng)類型、高性能的系統(tǒng)編程語(yǔ)言,其官方文檔強(qiáng)調(diào)了Rust的標(biāo)準(zhǔn)庫(kù)具有良好的并發(fā)編程支持。Thread是Rust的一種并發(fā)編程
    的頭像 發(fā)表于 09-20 11:15 ?1249次閱讀

    異步復(fù)位同步撤離是什么意思?如何做到異步復(fù)位同步撤離?

    復(fù)位消抖之后的下一件事,[異步復(fù)位]()同步撤離。這句話什么意思?
    的頭像 發(fā)表于 12-04 13:57 ?5840次閱讀
    <b class='flag-5'>異步</b>復(fù)位<b class='flag-5'>同步</b>撤離是什么意思?如何做到<b class='flag-5'>異步</b>復(fù)位<b class='flag-5'>同步</b>撤離<b class='flag-5'>呢</b>?

    USART異步通信同步異步有什么區(qū)別

    USART異步通信同步異步有什么區(qū)別?異步通信怎樣連線?
    發(fā)表于 12-10 07:34

    同步復(fù)位和異步復(fù)位到底孰優(yōu)孰劣

    異步復(fù)位,同步釋放的理解目錄目錄同步復(fù)位和異步復(fù)位異步復(fù)位 同步復(fù)位 那么
    發(fā)表于 01-17 07:01

    如何利用C語(yǔ)言去調(diào)用rust靜態(tài)庫(kù)

    這篇文章: c語(yǔ)言調(diào)用rust庫(kù)函數(shù)按步驟做完,倒是挺順利,增強(qiáng)了信心。編譯arm版靜態(tài)庫(kù)上面測(cè)試都是在x86上面進(jìn)行的,嵌入式基本是使用arm和riscv等芯片。考慮到上手門檻,我這里選擇了
    發(fā)表于 06-21 10:27

    Rust代碼中加載靜態(tài)庫(kù)時(shí),出現(xiàn)錯(cuò)誤 ` rust-lld: error: undefined symbol: malloc `怎么解決?

    我正在 MCUXpresso IDE 創(chuàng)建一個(gè)靜態(tài)庫(kù)。我正在使用 redlib 在我的代碼中導(dǎo)入 ` [i]stdlib.h`。它成功地構(gòu)建了一個(gè)靜態(tài)庫(kù)。但是,靜態(tài)庫(kù)未定義一些標(biāo)準(zhǔn)庫(kù)函數(shù),例如
    發(fā)表于 06-09 08:44

    FPGA設(shè)計(jì)異步復(fù)位同步釋放問(wèn)題

    異步復(fù)位同步釋放 首先要說(shuō)一下同步復(fù)位與異步復(fù)位的區(qū)別。 同步復(fù)位是指復(fù)位信號(hào)在時(shí)鐘的上升沿或者下降沿才能起作用,而
    發(fā)表于 06-07 02:46 ?2266次閱讀

    【FPGA】異步復(fù)位,同步釋放的理解

    異步復(fù)位,同步釋放的理解目錄目錄 同步復(fù)位和異步復(fù)位 異步復(fù)位 同步復(fù)位 那么
    發(fā)表于 01-17 12:53 ?4次下載
    【FPGA】<b class='flag-5'>異步</b>復(fù)位,<b class='flag-5'>同步</b>釋放的理解

    異步信號(hào)與同步電路交互的問(wèn)題及其解決方法

    異步信號(hào)與同步電路交互的問(wèn)題及其解決方法? 異步信號(hào)和同步電路的交互問(wèn)題是指在使用異步信號(hào)與
    的頭像 發(fā)表于 12-07 10:53 ?1006次閱讀

    何在同步Rust 方法調(diào)用異步代碼 | Tokio 使用的幾點(diǎn)教訓(xùn)

    同步Rust 方法調(diào)用異步代碼經(jīng)常會(huì)導(dǎo)致一些
    的頭像 發(fā)表于 12-24 16:23 ?1670次閱讀

    異步電路的時(shí)鐘同步處理方法

    異步電路的時(shí)鐘同步處理方法? 時(shí)鐘同步異步電路
    的頭像 發(fā)表于 01-16 14:42 ?1501次閱讀
    主站蜘蛛池模板: 欧美午夜视频在线 | 手机在线免费观看视频 | 天天色综合1 | aaaaa毛片| 国产精品美女自在线观看免费 | 一级毛片在线不卡直接观看 | 亚洲国产人成在线观看 | 额去鲁97在线观看视频 | 欧美高清激情毛片 | 三级黄色网 | 天天做人人爱夜夜爽2020毛片 | 亚洲天天综合网 | 男人的天堂色偷偷之色偷偷 | 国产成人精品视频一区二区不卡 | 精品女视频在线观看免费 | 老师你好大好白好紧好硬 | 天天干天天上 | 国产aaaaa一级毛片 | 日韩a毛片免费全部播放完整 | 久久精品国产2020观看福利色 | 4虎 影视 免费 | 亚洲不卡视频在线 | 亚洲天堂免费观看 | 国产高清在线精品一区 | 天天操天天摸天天射 | 在线另类 | 国产在线精品观看一区 | 视频一区二区不卡 | 黄色成人在线 | 四虎精品影院永久在线播放 | 人人干在线 | jiucao视频在线观看 | 国产精品亚洲一区二区三区在线播放 | 日本色色图 | 国产吧在线视频 | 色综久久 | 免费a在线看 | 狠狠躁夜夜躁人人爽天天天天 | 加勒比在线免费视频 | 国产美女亚洲精品久久久久久 | 拍拍拍成人免费高清视频 |