0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何在同步的 Rust 方法中调用异步代码 | Tokio 使用中的几点教训

OSC开源社区 来源:GreptimeDB 2023-12-24 16:23 次阅读

在同步的 Rust 方法中调用异步代码经常会导致一些问题,特别是对于不熟悉异步 Rust runtime 底层原理的初学者。在本文中,我们将讨论我们遇到的一个特殊问题,并分享我们采取的解决方法的经验。

背景和问题

在做GreptimeDB项目的时候,我们遇到一个关于在同步 Rust 方法中调用异步代码的问题。经过一系列故障排查后,我们弄清了问题的原委,这大大加深了对异步 Rust 的理解,因此在这篇文章中分享给大家,希望能给被相似问题困扰的 Rust 开发者一些启发。

我们的整个项目是基于Tokio这个异步 Rust runtime 的,它将协作式的任务运行和调度方便地封装在.await调用中,非常简洁优雅。但是这样也让不熟悉 Tokio 底层原理的用户一不小心就掉入到坑里。

我们遇到的问题是,需要在一个第三方库的 trait 实现中执行一些异步代码,而这个 trait 是同步的,我们无法修改这个 trait 的定义。

traitSequencer{
fngenerate(&self)->Vec;
}

我们用一个PlainSequencer来实现这个 trait ,而在实现generate方法的时候依赖一些异步的调用(比如这里的PlainSequencer::generate_async):

implPlainSequencer{
asyncfngenerate_async(&self)->Vec{
letmutres=vec![];
foriin0..self.bound{
res.push(i);
tokio::sleep(Duration::from_millis(100)).await;
}
res
}
}

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
self.generate_async().await
}
}

这样就会出现问题,因为generate是一个同步方法,里面是不能直接 await 的。

error[E0728]:`await`isonlyallowedinside`async`functionsandblocks
-->src/common/tt.rs30
|
31|/fngenerate(&self)->Vec{
32||self.generate_async().await
||^^^^^^onlyallowedinside`async`functionsandblocks
33||}
||_____-thisisnot`async`

我们首先想到的是,Tokio 的 runtime 有一个Runtime::block_on方法,可以同步地等待一个 future 完成。

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
RUNTIME.block_on(async{
self.generate_async().await
})
}
}

#[cfg(test)]
modtests{
#[tokio::test]
asyncfntest_sync_method(){
letsequencer=PlainSequencer{
bound:3
};
letvec=sequencer.generate();
println!("vec:{:?}",vec);
}
}

编译可以通过,但是运行时直接报错:

Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.
thread'tests::test_sync_method'panickedat'Cannotstartaruntimefromwithinaruntime.Thishappensbecauseafunction(like`block_on`)attemptedtoblockthecurrentthreadwhilethethreadisbeingusedtodriveasynchronoustasks.',/Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs9

提示不能从一个执行中的 runtime 直接启动另一个异步 runtime。看来 Tokio 为了避免这种情况特地在Runtime::block_on入口做了检查。既然不行那我们就再看看其他的异步库是否有类似的异步转同步的方法。

果然找到一个futures::block_on。

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
futures::block_on(async{
self.generate_async().await
})
}
}

编译同样没问题,但是运行时代码直接直接 hang 住不返回了。


cargotest--color=always--packagetokio-demo

--bintttests::test_sync_method

--no-fail-fast----format=json

--exact-Zunstable-options--show-output

Compilingtokio-demov0.1.0(/Users/lei/Workspace/Rust/learning/tokio-demo) Finishedtest[unoptimized+debuginfo]target(s)in0.39s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) {"type":"suite","event":"started","test_count":1} {"type":"test","event":"started","name":"tests::test_sync_method"} #theexecutionjusthangshere:(

明明generate_async方法里面只有一个简单的sleep()调用,但是为什么 future 一直没完成呢?

并且吊诡的是,同样的代码,在tokio::test里面会 hang 住,但是在tokio::main中则可以正常执行完毕:

#[tokio::main]
pubasyncfnmain(){
letsequencer=PlainSequencer{
bound:3
};
letvec=sequencer.generate();
println!("vec:{:?}",vec);
}

执行结果:

cargorun--color=always--packagetokio-demo--bintt
Finisheddev[unoptimized+debuginfo]target(s)in0.05s
Running`target/debug/tt`
vec:[0,1,2]

其实当初真正遇到这个问题的时候定位到具体在哪里 hang 住并没有那么容易。真实代码中 async 执行的是一个远程的 gRPC 调用,当初怀疑过是否是 gRPC server 的问题,动用了网络抓包等等手段最终发现是 client 侧的问题。

这也提醒了我们在出现 bug 的时候,抽象出问题代码的执行模式并且做出一个最小可复现的样例(Minimal Reproducible Example)是非常重要的。

Catchup

在 Rust 中,一个异步的代码块会被make_async_expr编译为一个实现了std::Future的 generator。

#[tokio::test]
asyncfntest_future(){
letfuture=async{
println!("hello");
};

//theaboveasyncblockwon'tgetexecuteduntilweawaitit.
future.await;
}

而.await本质上是一个语法糖,则会被lower_expr_await编译成类似于下面的一个语法结构:

//pseudo-rustcode
match::into_future(){
mut__awaitee=>loop{
matchunsafe{::poll(
<::Pin>::new_unchecked(&mut__awaitee),
::get_context(task_context),
)}{
::Ready(result)=>breakresult,
::Pending=>{}
}
task_context=yield();
}
}

在上面这个去掉了语法糖的伪代码中,可以看到有一个循环不停地检查 generator 的状态是否为已完成(std::poll)。

自然地,必然存在一个组件来做这件事,这里就是 Tokio 和async-std这类异步运行时发挥作用的地方了。Rust 在设计之初就特意将异步的语法(async/await)和异步运行时的实现分开,在上述的示例代码中,poll 的操作是由 Tokio 的 executor 执行的。

问题分析

回顾完背景知识,我们再看一眼方法的实现:

fngenerate(&self)->Vec{
futures::block_on(async{
self.generate_async().await
})
}

调用generate方法的肯定是 Tokio 的 executor,那么 block_on 里面的self.generate_async().await这个 future 又是谁在 poll 呢?

一开始我以为,futures::block_on会有一个内部的 runtime 去负责generate_async的 poll。于是查看了代码(主要是futures_executor::run_executor这个方法):

fnrun_executor,>

立刻嗅到了一丝不对的味道,虽然这个方法名为run_executor,但是整个方法里面貌似没有任何 spawn 的操作,只是在当前线程不停的循环判断用户提交的 future 的状态是否为 ready 啊!

这意味着,当 Tokio 的 runtime 线程执行到这里的时候,会立刻进入一个循环,在循环中不停地判断用户的的 future 是否 ready。如果还是 pending 状态,则将当前线程 park 住。

假设,用户 future 的异步任务也是交给了当前线程去执行,futures::block_on等待用户的 future ready,而用户 future 等待futures::block_on释放当前的线程资源,那么不就死锁了?

这个推论听起来很有道理,让我们来验证一下。既然不能在当前 runtime 线程 block,那就重新开一个 runtime block:

implSequencerforPlainSequencer{
fngenerate(&self)->Vec{
letbound=self.bound;
futures::block_on(asyncmove{
RUNTIME.spawn(asyncmove{
letmutres=vec![];
foriin0..bound{
res.push(i);
tokio::sleep(Duration::from_millis(100)).await;
}
res
}).await.unwrap()
})
}
}

果然可以了。


cargotest--color=always--packagetokio-demo

--bintttests::test_sync_method

--no-fail-fast----format=json

--exact-Zunstable-options--show-output

Finishedtest[unoptimized+debuginfo]target(s)in0.04s Runningunittestssrc/common/tt.rs(target/debug/deps/tt-adb10abca6625c07) vec:[0,1,2]

值得注意的是,在futures::block_on里面,额外使用了一个RUNTIME来 spawn 我们的异步代码。其原因还是刚刚所说的,这个异步任务需要一个 runtime 来驱动状态的变化。

如果我们删除 RUNTIME,而为 futures::block_on 生成一个新的线程,虽然死锁问题得到了解决,但tokio::sleep 方法的调用会报错"no reactor is running",这是因为 Tokio 的功能运作需要一个 runtime:

called`Result::unwrap()`onan`Err`value:Any{..}
thread''panickedat'thereisnoreactorrunning,mustbecalledfromthecontextofaTokio1.xruntime',
...

tokio::main和tokio::test

在分析完上面的原因之后,“为什么tokio::main中不会 hang 住而tokio::test会 hang 住?“ 这个问题也很清楚了,他们两者所使用的的 runtime 并不一样。tokio::main使用的是多线程的 runtime,而tokio::test使用的是单线程的 runtime,而在单线程的 runtime 下,当前线程被futures::block_on卡死,那么用户提交的异步代码是一定没机会执行的,从而必然形成上面所说的死锁。

Best practice

经过上面的分析,结合 Rust 基于 generator 的协作式异步特性,我们可以总结出 Rust 下桥接异步代码和同步代码的一些注意事项:

•将异步代码与同步代码结合使用可能会导致阻塞,因此不是一个明智的选择。

•在同步的上下文中调用异步代码时,请使用 futures::block_on 并将异步代码 spawn 到另一个专用的 runtime 中执行 ,因为前者会阻塞当前线程。

•如果必须从异步的上下文中调用有可能阻塞的同步代码(比如文件 IO 等),则建议使用 tokio::spawn_blocking 在专门处理阻塞操作的 executor 上执行相应的代码。








审核编辑:刘清

,>
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • Rust
    +关注

    关注

    1

    文章

    223

    浏览量

    6387

原文标题:如何在同步的Rust方法中调用异步代码 | Tokio使用中的几点教训

文章出处:【微信号:OSC开源社区,微信公众号:OSC开源社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    什么是Tokio模块 Channel?

    中的一个重要组成部分,它可以用于在异步任务之间传递数据。在本教程中,我们将介绍 Rust 语言中的 Tokio 模块 channel,并提供几个示例,以帮助您更好地理解它的使用方法
    的头像 发表于 09-19 15:57 681次阅读

    FPGA异步时钟设计同步策略

    摘要:FPGA异步时钟设计如何避免亚稳态的产生是一个必须考虑的问题。本文介绍了FPGA异步时钟设计容易产生的亚稳态现象及其可能造成的危害,同时根据实践经验给出了解决这些问题的几种
    发表于 04-21 16:52

    何在vi调用子vi时,使用队列完成数据的同步

    何在vi调用子vi时,使用队列完成主vi和子vi数据的同步
    发表于 01-14 10:24

    简谈异步电路的时钟同步处理方法

    大家好,又到了每日学习的时候了。今天我们来聊一聊异步电路的时钟同步处理方法。既然说到了时钟的同步处理,那么什么是时钟的
    发表于 02-09 11:21

    Rust代码中加载静态库时,出现错误 ` rust-lld: error: undefined symbol: malloc `怎么解决?

    我正在 MCUXpresso IDE 创建一个静态库。我正在使用 redlib 在我的代码中导入 ` [i]stdlib.h`。它成功地构建了一个静态库。但是,静态库未定义一些标准库函数,例如
    发表于 06-09 08:44

    LabVIEW如何调试异步调用的VI?

    大佬们,我想在异步调用的VI插入探针,查看数据,应该怎么操作,LabVIEW如何调试异步调用的VI呀?网上都没查到啥结果。
    发表于 07-28 11:19

    使用tokio实现一个简单的Client和Server通讯模型

    本系列是关于用Rust构建一个KV Server的系列文章,内容包括用tokio做底层异步网络通讯、使用toml文件做配置、protobuf做传输协议、内存/RockDB做数据存储、事件通知、优雅关机、并发连接限制及测量监控等。
    的头像 发表于 09-09 09:45 1885次阅读

    何在同步Rust方法调用异步代码呢?

    同步Rust 方法调用异步代码经常会导致一些问题,特别是对于不熟悉
    的头像 发表于 03-17 09:18 1547次阅读

    文盘Rust -- 用Tokio实现简易任务池

    59执行完后面就没有输出了,如果把max_task设置为2,情况会好一点,但是也没有执行完所有的异步操作,也就是说在资源不足的情况下,Tokio会抛弃某些任务,这不符合我们的预期。
    的头像 发表于 04-09 10:24 1092次阅读

    Tokio 模块的优雅停机机制

    在进行高并发、网络编程时,优雅停机是一个非常重要的问题。在 Rust 语言中,Tokio 是一个非常流行的异步编程框架,它提供了一些优雅停机的机制,本文将围绕 Tokio 模块的优雅停
    的头像 发表于 09-19 15:26 308次阅读

    如何使用Tokio 和 Tracing模块构建异步的网络应用程序

    Rust 语言中,Tokio 是一个非常流行的异步运行时,它提供了高效的异步 I/O 操作和任务调度。而 Tracing 则是一个用于应用程序跟踪的框架,它可以帮助我们理解应用程序
    的头像 发表于 09-19 15:29 348次阅读

    如何使用 Tokio 模块的Channel

    Channel 是一种在多线程环境下进行通信的机制,可以让线程之间互相发送消息和共享数据。Rust 语言中的 Tokio 模块提供了一种异步的 Channel 实现,使得我们可以在异步
    的头像 发表于 09-19 15:38 334次阅读

    tokio模块channel中的使用场景和优缺点

    Rust 语言的 tokio 模块提供了一种高效的异步编程方式,其中的 channel 模块是其核心组件之一。本教程将介绍 tokio 模块 channel 的除了上文提到的 mspc
    的头像 发表于 09-19 15:54 399次阅读

    Tokio 的基本用法

    Tokio 是一个异步 I/O 框架,它提供了一种高效的方式来编写异步代码。它使用 Rust 语言的 Futures 库来管理
    的头像 发表于 09-19 16:05 424次阅读

    Channel模块的使用方法示例

    Rust 语言中的 Tokio 模块是一个异步编程库,它提供了一种高效的方式来处理异步任务。其中,channel 是 Tokio 模块中的一
    的头像 发表于 09-20 11:47 509次阅读