0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何在同步的 Rust 方法中调用异步代码 | Tokio 使用中的几点教训

OSC开源社区 来源:GreptimeDB 2023-12-24 16:23 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

在同步的 Rust 方法中调用异步代码经常会导致一些问题,特别是对于不熟悉异步 Rust runtime 底层原理的初学者。在本文中,我们将讨论我们遇到的一个特殊问题,并分享我们采取的解决方法的经验。

背景和问题

在做GreptimeDB项目的时候,我们遇到一个关于在同步 Rust 方法中调用异步代码的问题。经过一系列故障排查后,我们弄清了问题的原委,这大大加深了对异步 Rust 的理解,因此在这篇文章中分享给大家,希望能给被相似问题困扰的 Rust 开发者一些启发。

我们的整个项目是基于Tokio这个异步 Rust runtime 的,它将协作式的任务运行和调度方便地封装在.await调用中,非常简洁优雅。但是这样也让不熟悉 Tokio 底层原理的用户一不小心就掉入到坑里。

我们遇到的问题是,需要在一个第三方库的 trait 实现中执行一些异步代码,而这个 trait 是同步的,我们无法修改这个 trait 的定义。

traitSequencer{
fngenerate(&self)->Vec;
}

我们用一个PlainSequencer来实现这个 trait ,而在实现generate方法的时候依赖一些异步的调用(比如这里的PlainSequencer::generate_async):

implPlainSequencer{
asyncfngenerate_async(&self)->Vec{
letmutres=vec![];
foriin0..self.bound{
res.push(i);
tokio::sleep(Duration::from_millis(100)).await;
}
res
}
}

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
self.generate_async().await
}
}

这样就会出现问题,因为generate是一个同步方法,里面是不能直接 await 的。

error[E0728]:`await`isonlyallowedinside`async`functionsandblocks
-->src/common/tt.rs30
|
31|/fngenerate(&self)->Vec{
32||self.generate_async().await
||^^^^^^onlyallowedinside`async`functionsandblocks
33||}
||_____-thisisnot`async`

我们首先想到的是,Tokio 的 runtime 有一个Runtime::block_on方法,可以同步地等待一个 future 完成。

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
RUNTIME.block_on(async{
self.generate_async().await
})
}
}

#[cfg(test)]
modtests{
#[tokio::test]
asyncfntest_sync_method(){
letsequencer=PlainSequencer{
bound:3
};
letvec=sequencer.generate();
println!("vec:{:?}",vec);
}
}

编译可以通过,但是运行时直接报错:

Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.
thread'tests::test_sync_method'panickedat'Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.',/Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs9

提示不能从一个执行中的 runtime 直接启动另一个异步 runtime。看来 Tokio 为了避免这种情况特地在Runtime::block_on入口做了检查。既然不行那我们就再看看其他的异步库是否有类似的异步转同步的方法。

果然找到一个futures::block_on。

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
futures::block_on(async{
self.generate_async().await
})
}
}

编译同样没问题,但是运行时代码直接直接 hang 住不返回了。


cargotest--color=always--packagetokio-demo

--bintttests::test_sync_method

--no-fail-fast----format=json

--exact-Zunstable-options--show-output

Compilingtokio-demov0.1.0(/Users/lei/Workspace/Rust/learning/tokio-demo) Finishedtest[unoptimized+debuginfo]target(s)in0.39s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) {"type":"suite","event":"started","test_count":1} {"type":"test","event":"started","name":"tests::test_sync_method"} #theexecutionjusthangshere:(

明明generate_async方法里面只有一个简单的sleep()调用,但是为什么 future 一直没完成呢?

并且吊诡的是,同样的代码,在tokio::test里面会 hang 住,但是在tokio::main中则可以正常执行完毕:

#[tokio::main]
pubasyncfnmain(){
letsequencer=PlainSequencer{
bound:3
};
letvec=sequencer.generate();
println!("vec:{:?}",vec);
}

执行结果:

cargorun--color=always--packagetokio-demo--bintt
Finisheddev[unoptimized+debuginfo]target(s)in0.05s
Running`target/debug/tt`
vec:[0,1,2]

其实当初真正遇到这个问题的时候定位到具体在哪里 hang 住并没有那么容易。真实代码中 async 执行的是一个远程的 gRPC 调用,当初怀疑过是否是 gRPC server 的问题,动用了网络抓包等等手段最终发现是 client 侧的问题。

这也提醒了我们在出现 bug 的时候,抽象出问题代码的执行模式并且做出一个最小可复现的样例(Minimal Reproducible Example)是非常重要的。

Catchup

在 Rust 中,一个异步的代码块会被make_async_expr编译为一个实现了std::Future的 generator。

#[tokio::test]
asyncfntest_future(){
letfuture=async{
println!("hello");
};

//theaboveasyncblockwon'tgetexecuteduntilweawaitit.
future.await;
}

而.await本质上是一个语法糖,则会被lower_expr_await编译成类似于下面的一个语法结构:

//pseudo-rustcode
match::into_future(){
mut__awaitee=>loop{
matchunsafe{::poll(
<::Pin>::new_unchecked(&mut__awaitee),
::get_context(task_context),
)}{
::Ready(result)=>breakresult,
::Pending=>{}
}
task_context=yield();
}
}

在上面这个去掉了语法糖的伪代码中,可以看到有一个循环不停地检查 generator 的状态是否为已完成(std::poll)。

自然地,必然存在一个组件来做这件事,这里就是 Tokio 和async-std这类异步运行时发挥作用的地方了。Rust 在设计之初就特意将异步的语法(async/await)和异步运行时的实现分开,在上述的示例代码中,poll 的操作是由 Tokio 的 executor 执行的。

问题分析

回顾完背景知识,我们再看一眼方法的实现:

fngenerate(&self)->Vec{
futures::block_on(async{
self.generate_async().await
})
}

调用generate方法的肯定是 Tokio 的 executor,那么 block_on 里面的self.generate_async().await这个 future 又是谁在 poll 呢?

一开始我以为,futures::block_on会有一个内部的 runtime 去负责generate_async的 poll。于是查看了代码(主要是futures_executor::run_executor这个方法):

fnrun_executor,>

立刻嗅到了一丝不对的味道,虽然这个方法名为run_executor,但是整个方法里面貌似没有任何 spawn 的操作,只是在当前线程不停的循环判断用户提交的 future 的状态是否为 ready 啊!

这意味着,当 Tokio 的 runtime 线程执行到这里的时候,会立刻进入一个循环,在循环中不停地判断用户的的 future 是否 ready。如果还是 pending 状态,则将当前线程 park 住。

假设,用户 future 的异步任务也是交给了当前线程去执行,futures::block_on等待用户的 future ready,而用户 future 等待futures::block_on释放当前的线程资源,那么不就死锁了?

这个推论听起来很有道理,让我们来验证一下。既然不能在当前 runtime 线程 block,那就重新开一个 runtime block:

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
letbound=self.bound;
futures::block_on(asyncmove{
RUNTIME.spawn(asyncmove{
letmutres=vec![];
foriin0..bound{
res.push(i);
tokio::sleep(Duration::from_millis(100)).await;
}
res
}).await.unwrap()
})
}
}

果然可以了。


cargotest--color=always--packagetokio-demo

--bintttests::test_sync_method

--no-fail-fast----format=json

--exact-Zunstable-options--show-output

Finishedtest[unoptimized+debuginfo]target(s)in0.04s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) vec:[0,1,2]

值得注意的是,在futures::block_on里面,额外使用了一个RUNTIME来 spawn 我们的异步代码。其原因还是刚刚所说的,这个异步任务需要一个 runtime 来驱动状态的变化。

如果我们删除 RUNTIME,而为 futures::block_on 生成一个新的线程,虽然死锁问题得到了解决,但tokio::sleep 方法的调用会报错"no reactor is running",这是因为 Tokio 的功能运作需要一个 runtime:

called`Result::unwrap()`onan`Err`value:Any{..}
thread''panickedat'thereisnoreactorrunning,mustbecalledfromthecontextofaTokio1.xruntime',
...

tokio::main和tokio::test

在分析完上面的原因之后,“为什么tokio::main中不会 hang 住而tokio::test会 hang 住?“ 这个问题也很清楚了,他们两者所使用的的 runtime 并不一样。tokio::main使用的是多线程的 runtime,而tokio::test使用的是单线程的 runtime,而在单线程的 runtime 下,当前线程被futures::block_on卡死,那么用户提交的异步代码是一定没机会执行的,从而必然形成上面所说的死锁。

Best practice

经过上面的分析,结合 Rust 基于 generator 的协作式异步特性,我们可以总结出 Rust 下桥接异步代码和同步代码的一些注意事项:

•将异步代码与同步代码结合使用可能会导致阻塞,因此不是一个明智的选择。

•在同步的上下文中调用异步代码时,请使用 futures::block_on 并将异步代码 spawn 到另一个专用的 runtime 中执行 ,因为前者会阻塞当前线程。

•如果必须从异步的上下文中调用有可能阻塞的同步代码(比如文件 IO 等),则建议使用 tokio::spawn_blocking 在专门处理阻塞操作的 executor 上执行相应的代码。








审核编辑:刘清

,>
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • Rust
    +关注

    关注

    1

    文章

    240

    浏览量

    7477

原文标题:如何在同步的Rust方法中调用异步代码 | Tokio使用中的几点教训

文章出处:【微信号:OSC开源社区,微信公众号:OSC开源社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    什么是Tokio模块 Channel?

    的一个重要组成部分,它可以用于在异步任务之间传递数据。在本教程,我们将介绍 Rust 语言中的 Tokio 模块 channel,并提供几个示例,以帮助您更好地理解它的使用
    的头像 发表于 09-19 15:57 1686次阅读

    何在Rust连接和使用MySQL数据库

    何在Rust连接和使用MySQL数据库。 安装 mysql 模块 这里我们假设你已经安装了Rust编程语言工具链,在本教程,我们将使用
    的头像 发表于 09-30 17:05 2910次阅读

    何在Rust读写文件

    见的内存安全问题和数据竞争问题。 在Rust,读写文件是一项非常常见的任务。本教程将介绍如何在Rust读写文件,包括基础用法和进阶用法。
    的头像 发表于 09-20 10:57 2970次阅读

    Rust的多线程编程概念和使用方法

    Rust是一种强类型、高性能的系统编程语言,其官方文档强调了Rust的标准库具有良好的并发编程支持。Thread是Rust的一种并发编程
    的头像 发表于 09-20 11:15 1747次阅读

    LabVIEW如何调试异步调用的VI?

    大佬们,我想在异步调用的VI插入探针,查看数据,应该怎么操作,LabVIEW如何调试异步调用的VI呀?网上都没查到啥结果。
    发表于 07-28 11:19

    FPGA设计异步复位同步释放问题

    异步复位同步释放 首先要说一下同步复位与异步复位的区别。 同步复位是指复位信号在时钟的上升沿或者下降沿才能起作用,而
    发表于 06-07 02:46 2518次阅读

    何在函数库调用指令?

    函数是一段可复用的代码。我们通常把重复的代码放进函数并且在不同的地方去调用它。库是函数的集合。我们可以在库定义经常使用的函数,这样其它脚
    的头像 发表于 08-31 15:51 4295次阅读

    何在同步Rust方法调用异步代码呢?

    同步Rust 方法调用异步代码经常会导致一些
    的头像 发表于 03-17 09:18 2882次阅读

    Tokio 模块的优雅停机机制

    在进行高并发、网络编程时,优雅停机是一个非常重要的问题。在 Rust 语言中,Tokio 是一个非常流行的异步编程框架,它提供了一些优雅停机的机制,本文将围绕 Tokio 模块的优雅停
    的头像 发表于 09-19 15:26 1277次阅读

    如何使用Tokio 和 Tracing模块构建异步的网络应用程序

    ,并在调试和故障排除时提供有用的信息。 在本教程,我们将介绍如何使用 Tokio 和 Tracing 模块来构建一个异步的网络应用程序,并使用 Tracing 来记录应用程序的行为和性能。我们将从安装和配置开始,然后介绍如何使
    的头像 发表于 09-19 15:29 1397次阅读

    如何使用 Tokio 模块的Channel

    便地进行消息传递和数据共享。 在本教程是 Channel 的下篇,我们将介绍如何使用 Tokio 模块的 Channel,包括如何使用异步 Channel 和如何使用标准库同步 C
    的头像 发表于 09-19 15:38 1513次阅读

    tokio模块channel的使用场景和优缺点

    Rust 语言的 tokio 模块提供了一种高效的异步编程方式,其中的 channel 模块是其核心组件之一。本教程将介绍 tokio 模块 channel 的除了上文提到的 mspc
    的头像 发表于 09-19 15:54 1702次阅读

    Tokio 的基本用法

    Tokio 是一个异步 I/O 框架,它提供了一种高效的方式来编写异步代码。它使用 Rust 语言的 Futures 库来管理
    的头像 发表于 09-19 16:05 1570次阅读

    Channel模块的使用方法示例

    Rust 语言中的 Tokio 模块是一个异步编程库,它提供了一种高效的方式来处理异步任务。其中,channel 是 Tokio 模块
    的头像 发表于 09-20 11:47 2033次阅读

    异步电路的时钟同步处理方法

    异步电路的时钟同步处理方法  时钟同步异步电路
    的头像 发表于 01-16 14:42 2126次阅读