Programming languages have different ways of representing asynchronous operations. If you've ever used async/await in JavaScript, the way Rust handles concurrency should feel familiar. The keywords are the same and the fundamental model is similar, except that in Rust, deferred computations are called futures instead of promises. What is less familiar is that you need to pick a runtime to actually run your asynchronous code.
Rust targets everything from bare-metal, embedded devices to programs that run on advanced operating systems and, like C++, has a strong focus on zero-cost abstractions. This impacts what is and isn't included in the standard library.
Once you get the hang of it, you'll find that getting started with async in Rust really isn't hard. If you're writing an async program in Rust for the first time, or just need to use an async library and don't know where to start, this guide is for you. I'll try to get you started as quickly as possible while introducing you to all the essentials you should know.
An async application should pull in at least two crates from the Rust ecosystem:
- futures, an official Rust crate that lives in the rust-lang repository
- A runtime of your choice, such as tokio
Some people don't want to pull in more dependencies than they need to, but these are as essential as the chrono or log crates. The only difference is that these focus on async.
In this tutorial, we'll use tokio. You should get to know at least one runtime and focus on that first; you can check out other runtimes later. You'll likely notice that they have a lot in common in terms of functionality, even if the APIs or implementations differ.
For the best experience when using async in Rust, you should enable some features. The dependencies section of your Cargo.toml should look like this:
[dependencies]
futures = { version = "0.3.*" }
tokio = { version = "0.2.*", features = ["full"] }
Your main.rs should look like this:
use futures::prelude::*;
use tokio::prelude::*;
fn main() {
    todo!();
}
Contrary to what you may be used to from other languages, Rust doesn't have a built-in runtime. We won't discuss the pros and cons of that here, but you'll need to make a choice and pull it in as a dependency.
Some libraries require you to use a specific runtime because they rely on runtime internals to provide a solid API for you to use, or they wrap their own API around an existing runtime. One example is the actix_web web framework, which wraps its own API around tokio.
Most of the time, you can choose whichever runtime you want. But whichever runtime you pick, there are three basic operations you should figure out how to perform before you start coding:
1. Starting the runtime
2. Spawning a Future on the runtime
3. Spawning blocking or CPU-intensive tasks
If you know these basic operations, you can accomplish most tasks. Let's cover all three, using Tokio as the example.
1. Starting the runtime
You can instantiate the runtime explicitly and spawn a future onto it. The future you spawn will be the main entry point for your program, so think of it as an async main function.
async fn app() {
    todo!()
}

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    let future = app();
    rt.block_on(future);
}
You can also use the shorter version below, which does essentially the same thing.
#[tokio::main]
async fn main() {
    todo!();
}
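Under the hood, the attribute macro just builds a runtime and hands the body of your main function to block_on. Below is a simplified sketch of roughly what it generates for tokio 0.2, based on the cargo expand output quoted in the comments at the end of this article; the real expansion differs in detail:

fn main() {
    // Build the default multithreaded runtime and drive the body of our
    // async main to completion on it, just like the explicit version above
    tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            todo!();
        })
}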
2. Spawning a Future on the runtime
This comes in handy when you want to run futures concurrently, i.e., as tasks that make progress at the same time.
use tokio::task;

async fn our_async_program() {
    todo!();
}

async fn app() {
    let concurrent_future = task::spawn(our_async_program());
    todo!()
}
3. Spawning blocking or CPU-intensive tasks
This is a common concern when writing async code. If you want to take advantage of a runtime that runs your code concurrently, you should avoid blocking inside your Futures or running CPU-intensive code in them. It's important to realize that most of the async code you write in Rust will actually be executed inside a Future.
Most runtimes provide a way to offload this kind of work to a different thread, which helps you avoid blocking the thread that is actually driving your futures to completion. In tokio, you can do this via task::spawn_blocking.
Using our example, we can do the following.
use tokio::task;

fn fib_cpu_intensive(n: u32) -> u32 {
    match n {
        0 => 0,
        1 => 1,
        n => fib_cpu_intensive(n - 1) + fib_cpu_intensive(n - 2),
    }
}

async fn app() {
    let threadpool_future = task::spawn_blocking(|| fib_cpu_intensive(30));
    todo!()
}
Every runtime has a slightly different API for accomplishing these tasks, but they all support them. If you know what to look for, you'll have an easier time getting started.
At this point, we can work with async code almost as easily as we write normal synchronous Rust, but let's go a bit further and cover some things that may come in handy later.
An async project starter template
I'll provide a template you can use to start applications where you know you'll need to write async code. I like to instantiate the runtime explicitly, which is what we'll do in the template below.
use futures::prelude::*;
use tokio::prelude::*;
use tokio::task;
use log::*;

// Just a generic Result type to ease error handling for us. Errors in multithreaded
// async contexts need some extra restrictions.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

async fn app() -> Result<()> {
    // I treat this as the `main` function of the async part of our program.
    todo!()
}

fn main() {
    env_logger::init();
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    match rt.block_on(app()) {
        Ok(_) => info!("Done"),
        Err(e) => error!("An error occurred: {}", e),
    };
}
I've pulled in a few more crates (in addition to futures and tokio), including:
- log (the logging facade we log through)
- env_logger (the logger implementation that handles the output)
Our Cargo.toml now looks like this:
[dependencies]
futures = { version = "0.3.*" }
tokio = { version = "0.2.*", features = ["full"] }
log = "0.4.*"
env_logger = "0.7.*"
Remember that env_logger relies on the RUST_LOG environment variable to determine the log level.
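If you'd rather not depend on the shell environment while experimenting, you can also set the variable from code before initializing the logger. A minimal sketch; defaulting to info here is my own choice, not something env_logger mandates:

fn main() {
    // Fall back to `info` when RUST_LOG isn't set in the environment
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "info");
    }
    env_logger::init();
}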
Most of my async application projects start with a main.rs and a Cargo.toml like the ones presented above. I can add better error handling or logging as the project evolves. A popular crate for handling errors in applications is Anyhow; async-log is useful for improving logging in an async context.
Throughout this tutorial, we'll use this basic template for all our code. However, you might notice that I've adapted the log output slightly to better suit what I wanted to show. If you want to follow along, change the logging initialization in your main.rs to the following.
use std::io::Write; // brings `write_fmt` into scope for the `writeln!` macro below

let start = std::time::Instant::now();
env_logger::Builder::from_default_env()
    .format(move |buf, rec| {
        let t = start.elapsed().as_secs_f32();
        writeln!(buf, "{:.03} [{}] - {}", t, rec.level(), rec.args())
    })
    .init();
Async functions in Rust differ somewhat from what you may be used to. When you learned Rust, you probably noticed how precise it is about the types a function's arguments have and the type a function returns.
Async functions differ in one important way: all your return types are "wrapped" in a Future.
You might read the documentation about Futures in Rust and think your async function needs to look like this:
async fn our_async_program() -> impl Future<Output = Result<String>> {
    future::ok("Hello world".to_string()).await
}
This is wrong! If you're doing this, you're overthinking it. An async function already wraps the return type, so you can write functions the way you're used to. This is what you actually want:
async fn our_async_program() -> Result<String> {
    future::ok("Hello world".to_string()).await
}
future::ok is one of the convenience methods we get from the futures crate. It wraps a value in a future that resolves immediately.
This might seem a bit strange, since Rust is usually extremely rigorous when it comes to declaring the correct types, but it's actually a huge ergonomic boost: the compiler automatically wraps the return types of our async functions.
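If it helps to see what that wrapping amounts to, the two declarations below are roughly equivalent. This is only an illustrative sketch with made-up function names, not the exact code the compiler generates:

use std::future::Future;

// What you write:
async fn greet() -> String {
    "Hello world".to_string()
}

// Roughly what the compiler turns it into: a plain function returning an
// anonymous type that implements Future<Output = String>
fn greet_desugared() -> impl Future<Output = String> {
    async { "Hello world".to_string() }
}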
You'll often see examples using async blocks, such as async { ... }. These are similar to async functions in that they return a special kind of future that wraps whatever the block returns. One drawback of these blocks is that you have to jump through some hoops to return errors from them via ?. The return types can be difficult to reason about, which can cause some unneeded confusion when you're starting out writing async Rust, as the sketch below shows.
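To make that drawback concrete, here's a sketch of the kind of annotation a bare async block can force on you. Since there's no function signature for ? to infer an error type from, you end up spelling it out with a turbofish:

use futures::future;

async fn app() {
    let fut = async {
        // No enclosing signature, so we annotate the error type by hand
        let greeting = future::ok::<_, std::io::Error>("Hello".to_string()).await?;
        Ok::<_, std::io::Error>(greeting)
    };
    let _ = fut.await;
}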
My suggestion is to use async functions if you can, especially if you intend to return anything from the future — at least until you’re comfortable with the different return types and how async in Rust works.
Futures in Rust are lazy. By default, they won't do anything before they're polled for the first time, and a future is polled the first time you await it.
For example, if you call a function that returns a future at the start of your program but don't await it until the end of the program, the actual request won't be made until you reach that await point.
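Here's a minimal sketch of that laziness, assuming the imports from the logging template earlier are in scope; request_data is a hypothetical stand-in for any async function:

async fn request_data() -> String {
    info!("The request is actually being made now");
    "data".to_string()
}

async fn app() {
    // Nothing happens yet: the future is just an inert value at this point
    let fut = request_data();
    info!("Future created, but no work has been done");

    // Only now is the future polled and the body above actually run
    let data = fut.await;
    info!("Got: {}", data);
}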
Let’s put what we’ve learned so far into practice.
Reqwest is a popular client library for making web requests. We'll use it together with the Slowwly endpoint, which lets us define a delay for the server response. That gives us a little more determinism in our concurrency, making it easier to reason about.
Let's add reqwest to our Cargo.toml by adding reqwest = "0.10.*" to the [dependencies] section. Then let's create a few requests and see what happens.
fn slowwly(delay_ms: u32) -> reqwest::Url {
    let url = format!(
        "http://slowwly.robertomurray.co.uk/delay/{}/url/http://www.google.co.uk",
        delay_ms,
    );
    reqwest::Url::parse(&url).unwrap()
}
async fn app() -> Result<()> {
    info!("Starting program!");
    let _resp1 = reqwest::get(slowwly(1000)).await?;
    info!("Got response 1");
    let _resp2 = reqwest::get(slowwly(1000)).await?;
    info!("Got response 2");
    Ok(())
}
Running this gives us the following output.
1.264 [INFO] - Got response 1
2.467 [INFO] - Got response 2
2.468 [INFO] - Done
The times are in seconds and milliseconds. At 1.264, we got the first response from our endpoint (remember, we asked for a delay of one second on the first request). Roughly one second later, at 2.467, we got the second response. The whole program took 2.468 seconds to run.
So, our program is working, but this is not really concurrent, is it? Honestly, it’s not much better than a complicated synchronous program.
Let’s actually take advantage of our async runtime and run the requests concurrently.
async fn request(n: usize) -> Result<()> {
    reqwest::get(slowwly(1000)).await?;
    info!("Got response {}", n);
    Ok(())
}

async fn app() -> Result<()> {
    let resp1 = task::spawn(request(1));
    let resp2 = task::spawn(request(2));

    let _ = resp1.await??;
    let _ = resp2.await??;

    Ok(())
}
You'll notice we refactored the request out into a separate function, for two reasons:
- We want to tag each request with an identifier so we can see in the logs which response belongs to which request
- We want to use ? to return early on errors, which requires a function returning our Result<()>. We could use an async { } block instead, but although it's possible to specify a return type for one, it's pretty awkward, so we'll avoid that for the purposes of this tutorial
If we run our code, we should get this:
1.247 [INFO] - Got response 2
1.256 [INFO] - Got response 1
1.257 [INFO] - Done
That looks better. Our second request finishes at 1.247 and our first at 1.256. The whole program takes 1.257 seconds, roughly half the time of the first example.
Using spawn enables us to run our requests concurrently. Since Tokio defaults to a multithreaded runtime, tasks spawned this way can also run in parallel on different cores.
Now that we've got our program running concurrently, we can combine some CPU-intensive tasks with some I/O-bound tasks and create a more complex scenario.
Let's expand our example slightly by making 10 requests and analyzing each response as it arrives. We're super excited to see the ratio of ones versus zeros in the bytes we get from the responses, so we'll return a count of ones and zeros and report the ratio at the end.
use futures::future::join_all;

// Now we want to both fetch some data and do some CPU-intensive analysis on it
async fn get_and_analyze(n: usize) -> Result<(u64, u64)> {
    let response: reqwest::Response = reqwest::get(slowwly(1000)).await?;
    info!("Dataset {}", n);

    // We get the body of the request
    let txt = response.text().await?;

    // We send our analysis work to a thread where there is no runtime running,
    // so we don't block the runtime by analyzing the data
    let res = task::spawn_blocking(move || analyze(&txt)).await?;
    info!("Processed {}", n);
    Ok(res)
}
// Counting the number of ones and zeros in the bytes we get
fn analyze(txt: &str) -> (u64, u64) {
    let txt = txt.as_bytes();
    // Let's spend as much time as we can and count them in two passes
    let ones = txt.iter().fold(0u64, |acc, b: &u8| acc + b.count_ones() as u64);
    let zeros = txt.iter().fold(0u64, |acc, b: &u8| acc + b.count_zeros() as u64);
    (ones, zeros)
}
async fn app() -> Result<()> {
    // This is new. We can collect futures in a collection. Nice to know!
    let mut futures = vec![];

    for i in 1..=10 {
        let fut = task::spawn(get_and_analyze(i));
        futures.push(fut);
    }

    let results = join_all(futures).await;

    let mut total_ones = 0;
    let mut total_zeros = 0;

    // Returning errors using `?` in iterators can be a bit difficult. Using a
    // simple for loop to inspect and work with our results can often be more
    // ergonomic.
    for result in results {
        // `spawn` returns a `Result<T, JoinError>` that we need to unwrap first
        let ones_res: Result<(u64, u64)> = result?;
        let (ones, zeros) = ones_res?;
        total_ones += ones;
        total_zeros += zeros;
    }

    info!("Ratio of ones/zeros: {:.02}", total_ones as f64 / total_zeros as f64);
    Ok(())
}
A few things to note:
- The futures crate has many convenient tools, including join_all, which treats a collection of futures as a single future and drives them all to completion, and the FuturesUnordered API from the same crate
- Collecting spawned tasks in a Vec means dealing with nested Result types, which can be hard to keep track of; rust-analyzer can help you see what errors are returned, and an error handling crate such as Anyhow can also help here
Let's take a look at what our program outputs.
1.270 [INFO] - Dataset 7
1.275 [INFO] - Dataset 3
1.285 [INFO] - Dataset 2
1.285 [INFO] - Dataset 4
1.291 [INFO] - Dataset 9
1.297 [INFO] - Dataset 1
1.301 [INFO] - Dataset 5
1.308 [INFO] - Dataset 6
1.312 [INFO] - Dataset 8
1.322 [INFO] - Dataset 10
1.374 [INFO] - Processed 7
1.377 [INFO] - Processed 3
1.378 [INFO] - Processed 4
1.378 [INFO] - Processed 2
1.380 [INFO] - Processed 9
1.384 [INFO] - Processed 1
1.385 [INFO] - Processed 5
1.391 [INFO] - Processed 8
1.391 [INFO] - Processed 6
1.397 [INFO] - Processed 10
1.397 [INFO] - Ratio of ones/zeros: 0.95
1.397 [INFO] - Done
Since we fire off all our requests immediately and each one takes roughly a second to return, all our responses come back almost simultaneously. Each response is then sent off for analysis on the thread pool.
We can see that the datasets don't necessarily finish processing in the order they came in, since they're processed on separate CPU cores.
So if futures should be spawned onto the runtime, and CPU-bound or blocking work should be sent to spawn_blocking, what can we actually write directly in our async functions?
Normally, you can write most of your code without worrying too much about this, but blocking and CPU-intensive work should make you stop and consider whether you should refactor that part of your code so it can be spawned onto the thread pool designed for handling it.
If you find yourself reaching for one of the following modules, you should check whether your runtime has an async alternative for the task you want to perform (see the sketch after this list):
std::sync
std::thread
std::fs
std::net
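For example, tokio 0.2 ships async counterparts for much of std::fs and std::net under the same names. A small sketch of reading a file without parking the executor thread; the file name is made up, and Result is the alias from our template:

use tokio::fs;

async fn read_config() -> Result<String> {
    // Looks just like std::fs::read_to_string, but yields to the runtime
    // while the OS performs the read
    let contents = fs::read_to_string("config.toml").await?;
    Ok(contents)
}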
If your runtime doesn't have an equivalent, you can use spawn_blocking to perform the operation on the thread pool, just as you would with CPU-intensive tasks, and await the result.
In general, functions that call in to your OS and might result in the OS parking the thread you’re calling from are especially harmful to concurrency since you’ll park the executor as well.
Using thread::sleep is a prime example of a function you should avoid in an async context for exactly this reason. It's tempting to use sleep to delay an operation, but it's a bad idea because the OS parks the calling thread for the whole duration, thereby stalling every other task running on that thread as well. Most runtimes provide a way to delay or sleep a task without blocking the executor.
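In tokio 0.2, that non-blocking alternative is tokio::time::delay_for (renamed to sleep in tokio 1.x, as one of the comments below notes). A short sketch of the difference:

use std::time::Duration;
use tokio::time;

async fn wait_a_second() {
    // DON'T: this parks the whole OS thread and stalls every task scheduled on it
    // std::thread::sleep(Duration::from_secs(1));

    // DO: this suspends only the current task; the executor keeps running others
    time::delay_for(Duration::from_secs(1)).await;
}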
For CPU-bound tasks, the lines are much blurrier. In general, I would encourage you not to be paranoid about this. Just remember that it's easier to refactor back if you decide to reduce the number of calls to spawn_blocking than the other way around.
When in doubt, spawn.
By now you should be prepared to write async Rust, and I hope you’ll find it easier to get started on your next async project.
Debugging Rust applications can be difficult, especially when users experience issues that are hard to reproduce. If you’re interested in monitoring and tracking the performance of your Rust apps, automatically surfacing errors, and tracking slow network requests and load time, try LogRocket.
LogRocket is like a DVR for web and mobile apps, recording literally everything that happens on your Rust application. Instead of guessing why problems happen, you can aggregate and report on what state your application was in when an issue occurred. LogRocket also monitors your app’s performance, reporting metrics like client CPU load, client memory usage, and more.
Modernize how you debug your Rust apps — start monitoring for free.
23 Replies to "A practical guide to async in Rust"
3. Spawning blocking or CPU-intensive tasks
uses `use tokio::runtime::task;` which causes an error. The example before that uses `use tokio::task;`.
Is the former a mistake?
1. Starting the runtime
with `#[tokio::main]` is there implicitly `app()` called?
This doesn’t seem to work for me.
first!
well explained for a new starter, nice examples, thank you
You’re right. It should be `use tokio::task;` as in the earlier example. I’ll check if I can get that corrected. Thanks for catching.
with `#[tokio::main]` is there implicitly `app()` called?
No, in the first example `app` becomes your “asynchronous main”, in the second example the macro turns you “main” into your “asynchronous main”. What happens behind the scenes is pretty much the same though but you lose access to your “synchronous main” in the latter example.
Take a look at the example here for #1
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=4ef678f287235988d3cf7d5865a43bb9
Looks like the "shorter version" should be
```
#[tokio::main]
async fn main() {
    app().await;
}
```
“An async project starter template” needs `use std::io::Write;`
I had to change
```
let res = future::ok("Hello world".to_string()).await?;
```
to
```
let res = future::ok::<String, Box>("Hello world".to_string()).await.unwrap();
```
to get rid of compile errors
“CPU-intensive tasks”
`let res= task::spawn_blocking(move ||analyze(txt)).await?;`
is missing a `&`
`let res= task::spawn_blocking(move ||analyze(&txt)).await?;`
I guess the issues were meant as an exercise for the reader 😉
Thanks for posting. Very helpful.
You could write that but if you `cargo expand` (cargo install cargo-expand) the example I wrote you get something like this:
```
fn main() {
    tokio::runtime::Builder::new()
        .basic_scheduler()
        .threaded_scheduler()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            {
                {
                    todo!();
                }
            }
        })
}
```
As you see the “app” part is wrapped in an async block and passed to the runtime so “main” would in essence function like the “app” in the example above.
That’s strange if you get that as an error. There’s no need to pull in Write in that template? It works without problems for me and builds fine. See: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=2a3562d21a808d5b62a49db250cb565e
Yeah, you’re right. I was trying to avoid these in this article but it seems I inadvertently introduced it in those examples. I think it’s better to change it to:
```
async fn our_async_program() -> Result<String> {
    future::ok("Hello world".to_string()).await
}
```
Since the compiler can infer the rest in this case. Thanks for pointing it out.
Yes, you’re right. analyze took a “String” in an earlier draft but I changed that without catching this one. Should be fixed soon.
Glad you enjoyed it. Well, I couldn’t make it too easy 🙂 Seriously, thanks for posting. The next person testing all the code should have a slightly easier time, though.
Hi Philip. I see where the confusion lies now. You see, `#[tokio::main]` is a macro that rewrites `fn main()` in a way that the code ends up looking like the first example.
The difference is that the code you write in a main function with `#[tokio::main]` is wrapped in an async block instead of put in a function called `app` but the end result is pretty similar.
I posted the output from the macro in another answer below but it doesn’t look pretty in the comments section here. If you want to check it out for yourself install `cargo install cargo-expand` and run `cargo expand` in the root of a project with the example code in `main.rs`. You’ll see what it expands into.
> // Returning errors using `?` in iterators can be a bit difficult. Using a
> // simple for loop to inspect and work with our results can often be more
> // ergonomic
What’s wrong with `try_for_each`?
Hi Daniel.
That’s a good suggestion, but I feel the example gets harder to understand using Iterators since we can’t simply unwrap using `?` as we do in the rest of the examples.
`results` is of type `Vec<Result<Result<(u64, u64), Box<dyn Error + Send + Sync>>, JoinError>>`, which is a nested result, making the iterator version pretty difficult to understand for people not intimately familiar with functional-style programming.
We might as well use `try_fold` instead of `try_for_each` if we were to do this operation using a functional style so the code would look something like this:
```
let (total_ones, total_zeros) = results
    .into_iter()
    .flatten()
    .try_fold((0, 0), |(total_ones, total_zeros), res| {
        res.map(|(ones, zeros)| (total_ones + ones, total_zeros + zeros))
    })?;
```
I feel it makes things more complicated than it needs to be in an example where the main focus is on keeping the code easy to read for a wide audience without really giving a lot of benefit in return 🙂
Thank you soo much Carl for this amazing article, it has helped me a lot to polish my understanding of async Rust.
Thanks for letting me know Taimoor. Glad to hear you found it useful!
Very cool, Carl Fredrik! Clear and well written.
Keep ’em coming!
Thanks for the feedback, Aksel!
very informative and helpful !!! thanks a lot Carl 🙂
The slowwly service seems broken or more “sloww” than intended, so the examples do not really work right now.
You can work around that by requesting e.g. `http://example.com` and sleeping in the appropriate place using `tokio::time::sleep(Duration::from_secs(1)).await;`. Otherwise good introduction!