0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何在同步的Rust方法中调用异步代码呢?

jf_wN0SrCdH 来源:GreptimeDB 2023-03-17 09:18 次阅读

在同步的 Rust 方法中调用异步代码经常会导致一些问题,特别是对于不熟悉异步 Rust runtime 底层原理的初学者。在本文中,我们将讨论我们遇到的一个特殊问题,并分享我们采取的解决方法的经验。

背景和问题

在做GreptimeDB项目的时候,我们遇到一个关于在同步 Rust 方法中调用异步代码的问题。经过一系列故障排查后,我们弄清了问题的原委,这大大加深了对异步 Rust 的理解,因此在这篇文章中分享给大家,希望能给被相似问题困扰的 Rust 开发者一些启发。

我们的整个项目是基于Tokio这个异步 Rust runtime 的,它将协作式的任务运行和调度方便地封装在.await调用中,非常简洁优雅。但是这样也让不熟悉 Tokio 底层原理的用户一不小心就掉入到坑里。

我们遇到的问题是,需要在一个第三方库的 trait 实现中执行一些异步代码,而这个 trait 是同步的,我们无法修改这个 trait 的定义。

traitSequencer{
fngenerate(&self)->Vec;
}

我们用一个PlainSequencer来实现这个 trait ,而在实现generate方法的时候依赖一些异步的调用(比如这里的PlainSequencer::generate_async):

implPlainSequencer{
asyncfngenerate_async(&self)->Vec{
letmutres=vec![];
foriin0..self.bound{
res.push(i);
tokio::sleep(Duration::from_millis(100)).await;
}
res
}
}

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
self.generate_async().await
}
}

这样就会出现问题,因为generate是一个同步方法,里面是不能直接 await 的。

error[E0728]:`await`isonlyallowedinside`async`functionsandblocks
-->src/common/tt.rs30
|
31|/fngenerate(&self)->Vec{
32||self.generate_async().await
||^^^^^^onlyallowedinside`async`functionsandblocks
33||}
||_____-thisisnot`async`

我们首先想到的是,Tokio 的 runtime 有一个Runtime::block_on方法,可以同步地等待一个 future 完成。

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
RUNTIME.block_on(async{
self.generate_async().await
})
}
}

#[cfg(test)]
modtests{
#[tokio::test]
asyncfntest_sync_method(){
letsequencer=PlainSequencer{
bound:3
};
letvec=sequencer.generate();
println!("vec:{:?}",vec);
}
}

编译可以通过,但是运行时直接报错:

Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.
thread'tests::test_sync_method'panickedat'Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.',/Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs9

提示不能从一个执行中的 runtime 直接启动另一个异步 runtime。看来 Tokio 为了避免这种情况特地在Runtime::block_on入口做了检查。既然不行那我们就再看看其他的异步库是否有类似的异步转同步的方法。

果然找到一个futures::block_on。

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
futures::block_on(async{
self.generate_async().await
})
}
}

编译同样没问题,但是运行时代码直接直接 hang 住不返回了。


cargotest--color=always--packagetokio-demo

--bintttests::test_sync_method

--no-fail-fast----format=json

--exact-Zunstable-options--show-output

Compilingtokio-demov0.1.0(/Users/lei/Workspace/Rust/learning/tokio-demo) Finishedtest[unoptimized+debuginfo]target(s)in0.39s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) {"type":"suite","event":"started","test_count":1} {"type":"test","event":"started","name":"tests::test_sync_method"} #theexecutionjusthangshere:(

明明generate_async方法里面只有一个简单的sleep()调用,但是为什么 future 一直没完成呢?

并且吊诡的是,同样的代码,在tokio::test里面会 hang 住,但是在tokio::main中则可以正常执行完毕:

#[tokio::main]
pubasyncfnmain(){
letsequencer=PlainSequencer{
bound:3
};
letvec=sequencer.generate();
println!("vec:{:?}",vec);
}

执行结果:

cargorun--color=always--packagetokio-demo--bintt
Finisheddev[unoptimized+debuginfo]target(s)in0.05s
Running`target/debug/tt`
vec:[0,1,2]

其实当初真正遇到这个问题的时候定位到具体在哪里 hang 住并没有那么容易。真实代码中 async 执行的是一个远程的 gRPC 调用,当初怀疑过是否是 gRPC server 的问题,动用了网络抓包等等手段最终发现是 client 侧的问题。

这也提醒了我们在出现 bug 的时候,抽象出问题代码的执行模式并且做出一个最小可复现的样例(Minimal Reproducible Example)是非常重要的。

Catchup

在 Rust 中,一个异步的代码块会被make_async_expr编译为一个实现了std::Future的 generator。

#[tokio::test]
asyncfntest_future(){
letfuture=async{
println!("hello");
};

//theaboveasyncblockwon'tgetexecuteduntilweawaitit.
future.await;
}

而.await本质上是一个语法糖,则会被lower_expr_await编译成类似于下面的一个语法结构:

//pseudo-rustcode
match::into_future(){
mut__awaitee=>loop{
matchunsafe{::poll(
<::Pin>::new_unchecked(&mut__awaitee),
::get_context(task_context),
)}{
::Ready(result)=>breakresult,
::Pending=>{}
}
task_context=yield();
}
}

在上面这个去掉了语法糖的伪代码中,可以看到有一个循环不停地检查 generator 的状态是否为已完成(std::poll)。

自然地,必然存在一个组件来做这件事,这里就是 Tokio 和async-std这类异步运行时发挥作用的地方了。Rust 在设计之初就特意将异步的语法(async/await)和异步运行时的实现分开,在上述的示例代码中,poll 的操作是由 Tokio 的 executor 执行的。

问题分析

回顾完背景知识,我们再看一眼方法的实现:

fngenerate(&self)->Vec{
futures::block_on(async{
self.generate_async().await
})
}

调用generate方法的肯定是 Tokio 的 executor,那么 block_on 里面的self.generate_async().await这个 future 又是谁在 poll 呢?

一开始我以为,futures::block_on会有一个内部的 runtime 去负责generate_async的 poll。于是查看了代码(主要是futures_executor::run_executor这个方法):

fnrun_executor)->Poll>(mutf:F)->T{
let_enter=enter().expect(
"cannotexecute`LocalPool`executorfromwithin
anotherexecutor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify|{
letwaker=waker_ref(thread_notify);
letmutcx=Context::from_waker(&waker);
loop{
ifletPoll::Ready(t)=f(&mutcx){
returnt;
}
letunparked=thread_notify.unparked.swap(false,Ordering::Acquire);
if!unparked{
thread::park();
thread_notify.unparked.store(false,Ordering::Release);
}
}
})
}

立刻嗅到了一丝不对的味道,虽然这个方法名为run_executor,但是整个方法里面貌似没有任何 spawn 的操作,只是在当前线程不停的循环判断用户提交的 future 的状态是否为 ready 啊!

这意味着,当 Tokio 的 runtime 线程执行到这里的时候,会立刻进入一个循环,在循环中不停地判断用户的的 future 是否 ready。如果还是 pending 状态,则将当前线程 park 住。

假设,用户 future 的异步任务也是交给了当前线程去执行,futures::block_on等待用户的 future ready,而用户 future 等待futures::block_on释放当前的线程资源,那么不就死锁了?

这个推论听起来很有道理,让我们来验证一下。既然不能在当前 runtime 线程 block,那就重新开一个 runtime block:

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
letbound=self.bound;
futures::block_on(asyncmove{
RUNTIME.spawn(asyncmove{
letmutres=vec![];
foriin0..bound{
res.push(i);
tokio::sleep(Duration::from_millis(100)).await;
}
res
}).await.unwrap()
})
}
}

果然可以了。


cargotest--color=always--packagetokio-demo

--bintttests::test_sync_method

--no-fail-fast----format=json

--exact-Zunstable-options--show-output

Finishedtest[unoptimized+debuginfo]target(s)in0.04s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) vec:[0,1,2]

值得注意的是,在futures::block_on里面,额外使用了一个RUNTIME来 spawn 我们的异步代码。其原因还是刚刚所说的,这个异步任务需要一个 runtime 来驱动状态的变化。

如果我们删除 RUNTIME,而为 futures::block_on 生成一个新的线程,虽然死锁问题得到了解决,但tokio::sleep 方法的调用会报错"no reactor is running",这是因为 Tokio 的功能运作需要一个 runtime:

called`Result::unwrap()`onan`Err`value:Any{..}
thread''panickedat'thereisnoreactorrunning,mustbecalledfromthecontextofaTokio1.xruntime',
...

tokio::main和tokio::test

在分析完上面的原因之后,“为什么tokio::main中不会 hang 住而tokio::test会 hang 住?“ 这个问题也很清楚了,他们两者所使用的的 runtime 并不一样。tokio::main使用的是多线程的 runtime,而tokio::test使用的是单线程的 runtime,而在单线程的 runtime 下,当前线程被futures::block_on卡死,那么用户提交的异步代码是一定没机会执行的,从而必然形成上面所说的死锁。

Best practice

经过上面的分析,结合 Rust 基于 generator 的协作式异步特性,我们可以总结出 Rust 下桥接异步代码和同步代码的一些注意事项:

•将异步代码与同步代码结合使用可能会导致阻塞,因此不是一个明智的选择。

•在同步的上下文中调用异步代码时,请使用 futures::block_on 并将异步代码 spawn 到另一个专用的 runtime 中执行 ,因为前者会阻塞当前线程。

•如果必须从异步的上下文中调用有可能阻塞的同步代码(比如文件 IO 等),则建议使用 tokio::spawn_blocking 在专门处理阻塞操作的 executor 上执行相应的代码。





审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • RPC
    RPC
    +关注

    关注

    0

    文章

    102

    浏览量

    11421
  • Asynchrono
    +关注

    关注

    0

    文章

    4

    浏览量

    6494
  • Rust
    +关注

    关注

    1

    文章

    223

    浏览量

    6387

原文标题:如何在同步的 Rust 方法中调用异步代码 | Tokio 使用中的几点教训

文章出处:【微信号:Rust语言中文社区,微信公众号:Rust语言中文社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    何在Rust中使用Memcached

    Memcached是一种高性能、分布式的内存对象缓存系统,可用于加速动态Web应用程序。Rust是一种系统级编程语言,具有内存安全、高性能和并发性等特点。Rust语言的Memcached库提供
    的头像 发表于 09-19 16:30 882次阅读

    何在Rust中读写文件

    见的内存安全问题和数据竞争问题。 在Rust中,读写文件是一项非常常见的任务。本教程将介绍如何在Rust中读写文件,包括基础用法和进阶用法。 基础用法 读取文件内容 使用 std::fs::File 和 std::io::Rea
    的头像 发表于 09-20 10:57 1189次阅读

    Rust的多线程编程概念和使用方法

    Rust是一种强类型、高性能的系统编程语言,其官方文档中强调了Rust的标准库具有良好的并发编程支持。Thread是Rust中的一种并发编程方式,本文将介绍Rust中thread的相关
    的头像 发表于 09-20 11:15 503次阅读

    何在labview调用Matlab程序

    大家好!我是新手,想问下如何在labview调用Matlab程序?谢谢
    发表于 05-05 17:10

    何在电路解决EMC的问题?有什么方法

    何在电路解决EMC的问题?有什么方法
    发表于 09-08 15:28

    何在vi调用子vi时,使用队列完成数据的同步

    何在vi调用子vi时,使用队列完成主vi和子vi数据的同步
    发表于 01-14 10:24

    只会用Python?教你在树莓派上开始使用Rust

    如果您对编程感兴趣,那么您可能听说过Rust。该语言由Mozilla设计,受到开发人员的广泛喜爱,并继续在奉献者成长。Raspberry Pi是小型计算机的瑞士军刀,非常适合学习代码。我们将两者
    发表于 05-20 08:00

    USART异步通信同步异步有什么区别

    USART异步通信同步异步有什么区别异步通信怎样连线?
    发表于 12-10 07:34

    同步复位和异步复位到底孰优孰劣

    异步复位,同步释放的理解目录目录同步复位和异步复位异步复位 同步复位 那么
    发表于 01-17 07:01

    如何利用C语言去调用rust静态库

    这篇文章: c语言调用rust库函数按步骤做完,倒是挺顺利,增强了信心。编译arm版静态库上面测试都是在x86上面进行的,嵌入式基本是使用arm和riscv等芯片。考虑到上手门槛,我这里选择了
    发表于 06-21 10:27

    Rust代码中加载静态库时,出现错误 ` rust-lld: error: undefined symbol: malloc `怎么解决?

    我正在 MCUXpresso IDE 创建一个静态库。我正在使用 redlib 在我的代码中导入 ` [i]stdlib.h`。它成功地构建了一个静态库。但是,静态库未定义一些标准库函数,例如
    发表于 06-09 08:44

    LabVIEW如何调试异步调用的VI?

    大佬们,我想在异步调用的VI插入探针,查看数据,应该怎么操作,LabVIEW如何调试异步调用的VI呀?网上都没查到啥结果。
    发表于 07-28 11:19

    【FPGA】异步复位,同步释放的理解

    异步复位,同步释放的理解目录目录 同步复位和异步复位 异步复位 同步复位 那么
    发表于 01-17 12:53 4次下载
    【FPGA】<b class='flag-5'>异步</b>复位,<b class='flag-5'>同步</b>释放的理解

    何在Rust项目中使用InfluxDB 2.x

    了更好的性能和更好的用户体验。Rust语言提供了InfluxDB 2.x的官方客户端库,可以方便地在Rust项目中使用InfluxDB 2.x。 本教程将介绍如何在Rust项目中使用I
    的头像 发表于 09-19 16:33 353次阅读

    何在同步Rust 方法调用异步代码 | Tokio 使用中的几点教训

    同步Rust 方法调用异步代码经常会导致一些问题,特别是对于不熟悉
    的头像 发表于 12-24 16:23 571次阅读