کار با تعداد دلخواهی از Futures
وقتی در بخش قبلی از استفاده از دو future به سه future تغییر دادیم، مجبور شدیم به جای استفاده از join
از join3
استفاده کنیم. این مسئله آزاردهنده خواهد بود اگر هر بار که تعداد futuresی که میخواهیم join کنیم تغییر میکند، مجبور به فراخوانی یک تابع متفاوت باشیم. خوشبختانه، یک فرم ماکروی join
داریم که میتوانیم به آن تعداد دلخواهی از آرگومانها را ارسال کنیم. این ماکرو همچنین خودش مدیریت انتظار برای futures را انجام میدهد. بنابراین، میتوانیم کد لیست ۱۷-۱۳ را بازنویسی کنیم تا به جای join3
از join!
استفاده کنیم، همانطور که در لیست ۱۷-۱۴ نشان داده شده است.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; trpl::join!(tx1_fut, tx_fut, rx_fut); }); }
join!
برای منتظر ماندن چندین آیندهاین قطعاً نسبت به جابجایی بین join
، join3
، join4
و موارد دیگر بهبود یافته است! با این حال، حتی این فرم ماکرو نیز فقط زمانی کار میکند که تعداد futures را از قبل بدانیم. اما در دنیای واقعی Rust، اضافه کردن futures به یک مجموعه و سپس انتظار برای کامل شدن برخی یا تمام آنها یک الگوی رایج است.
برای بررسی تمام futures در یک مجموعه، باید روی همه آنها حلقه بزنیم و آنها را join کنیم. تابع trpl::join_all
هر نوعی را که ویژگی Iterator
را پیادهسازی میکند قبول میکند، که در فصل ۱۳ در بخش ویژگی Iterator و متد next
درباره آن یاد گرفتید، بنابراین به نظر میرسد دقیقاً همان چیزی است که نیاز داریم. بیایید سعی کنیم futures خود را در یک وکتور قرار دهیم و join!
را با join_all
جایگزین کنیم، همانطور که در لیست ۱۷-۱۵ نشان داده شده است.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures = vec![tx1_fut, rx_fut, tx_fut];
trpl::join_all(futures).await;
});
}
join_all
متأسفانه، این کد کامپایل نمیشود. در عوض، با این خطا مواجه میشویم:
error[E0308]: mismatched types
--> src/main.rs:45:37
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
45 | let futures = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a
different `async` block
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
این ممکن است شگفتآور باشد. بالاخره، هیچیک از بلوکهای async چیزی بازنمیگردانند، بنابراین هر کدام یک Future<Output = ()>
تولید میکنند. اما به یاد داشته باشید که Future
یک ویژگی (trait) است و کامپایلر برای هر بلوک async یک enum منحصربهفرد ایجاد میکند. نمیتوانید دو struct مختلف را که دستی نوشته شدهاند در یک Vec
قرار دهید، و همین قانون برای enumهای مختلفی که توسط کامپایلر تولید میشوند اعمال میشود.
برای اینکه این کار انجام شود، باید از اشیاء ویژگی (trait objects) استفاده کنیم، همانطور که در “بازگرداندن خطاها از تابع run” در فصل ۱۲ انجام دادیم. (ما اشیاء ویژگی را در فصل ۱۸ بهطور مفصل پوشش خواهیم داد.) استفاده از اشیاء ویژگی به ما اجازه میدهد هر یک از futureهای ناشناس تولیدشده توسط این انواع را بهعنوان یک نوع یکسان در نظر بگیریم، زیرا همه آنها ویژگی Future
را پیادهسازی میکنند.
نکته: در بخش فصل ۸ استفاده از یک Enum برای ذخیره مقادیر متعدد، درباره یک روش دیگر برای شامل کردن چندین نوع در یک
Vec
صحبت کردیم: استفاده از یک enum برای نمایش هر نوعی که میتواند در وکتور ظاهر شود. اما نمیتوانیم اینجا از آن استفاده کنیم. از یک طرف، هیچ راهی برای نامگذاری انواع مختلف نداریم، زیرا آنها ناشناس هستند. از طرف دیگر، دلیلی که ما در وهله اول به دنبال یک وکتور وjoin_all
رفتیم، این بود که بتوانیم با یک مجموعه پویا از futures کار کنیم، جایی که فقط به این اهمیت میدهیم که همه آنها خروجی یکسانی دارند.
ابتدا هر future درون vec!
را در یک Box::new
بستهبندی میکنیم، همانطور که در لیست ۱۷-۱۶ نشان داده شده است.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
Box::new
برای تطبیق انواع futures در یک Vec
متأسفانه، این کد هنوز هم کامپایل نمیشود. در واقع، همان خطای پایهای که قبلاً دریافت کردیم، برای فراخوانیهای دوم و سوم Box::new
نیز رخ میدهد، به همراه خطاهای جدیدی که به ویژگی Unpin
اشاره دارند. به زودی به خطاهای مرتبط با Unpin
بازمیگردیم. ابتدا، بیایید خطاهای نوع در فراخوانیهای Box::new
را با مشخص کردن صریح نوع متغیر futures
رفع کنیم (نگاه کنید به لیست ۱۷-۱۷).
extern crate trpl; // required for mdbook test
use std::{future::Future, time::Duration};
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures: Vec<Box<dyn Future<Output = ()>>> =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
این type declaration کمی پیچیده است، بنابراین بیایید آن را مرحله به مرحله بررسی کنیم:
- نوع داخلیترین، خود future است. بهطور صریح اعلام میکنیم که خروجی future نوع واحد
()
است، با نوشتنFuture<Output = ()>
. - سپس ویژگی را با
dyn
علامتگذاری میکنیم تا بهصورت دینامیک باشد. - کل مرجع ویژگی در یک
Box
بستهبندی میشود. - در نهایت، بهطور صریح بیان میکنیم که
futures
یکVec
است که شامل این آیتمها است.
این تغییر تأثیر قابلتوجهی داشت. اکنون وقتی کامپایلر را اجرا میکنیم، فقط خطاهایی که به Unpin
اشاره دارند باقی میمانند. اگرچه سه خطا وجود دارد، اما محتوای آنها بسیار مشابه است.
error[E0308]: mismatched types
--> src/main.rs:46:46
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
| -------- ^^^^^^ expected `async` block, found a different `async` block
| |
| arguments to this function are incorrect
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
note: associated function defined here
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
|
255 | pub fn new(x: T) -> Self {
| ^^^
error[E0308]: mismatched types
--> src/main.rs:46:64
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
30 | let tx_fut = async move {
| ---------- the found `async` block
...
46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
| -------- ^^^^^^ expected `async` block, found a different `async` block
| |
| arguments to this function are incorrect
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:30:22: 30:32}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
note: associated function defined here
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
|
255 | pub fn new(x: T) -> Self {
| ^^^
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:24
|
48 | trpl::join_all(futures).await;
| -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `join_all`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:9
|
48 | trpl::join_all(futures).await;
| ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
این پیام اطلاعات زیادی برای هضم کردن دارد، بنابراین بیایید آن را تجزیه کنیم. بخش اول پیام به ما میگوید که اولین بلوک async (src/main.rs:8:23: 20:10
) ویژگی Unpin
را پیادهسازی نمیکند و پیشنهاد میدهد از pin!
یا Box::pin
برای حل آن استفاده کنیم. در ادامه این فصل، جزئیات بیشتری درباره Pin
و Unpin
بررسی خواهیم کرد. با این حال، فعلاً میتوانیم فقط از توصیه کامپایلر پیروی کنیم تا از این مشکل عبور کنیم. در لیست ۱۷-۱۸، ابتدا با بهروزرسانی اعلان نوع برای futures
شروع میکنیم، به طوری که هر Box
درون یک Pin
قرار بگیرد. دوم، از Box::pin
برای pin کردن خود futures استفاده میکنیم.
extern crate trpl; // required for mdbook test use std::{ future::Future, pin::{pin, Pin}, time::Duration, }; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); let tx_fut = pin!(async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)]; trpl::join_all(futures).await; }); }
Pin
و Box::pin
برای برطرف کردن نوع Vec
اگر این کد را کامپایل و اجرا کنیم، در نهایت خروجی موردنظر خود را دریافت میکنیم:
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
آه!
اینجا چیزهای بیشتری برای بررسی وجود دارد. برای یک مورد، استفاده از Pin<Box<T>>
یک مقدار کمی سربار اضافه میکند، زیرا این futures را با Box
روی heap قرار میدهیم—و ما فقط این کار را برای همتراز کردن انواع انجام میدهیم. بعد از همه اینها، ما واقعاً نیازی به تخصیص heap نداریم: این futures به این تابع خاص محدود هستند. همانطور که قبلاً ذکر شد، Pin
خودش یک نوع wrapper است، بنابراین میتوانیم از مزیت داشتن یک نوع واحد در Vec
بهرهمند شویم—دلیل اصلی که به دنبال Box
رفتیم—بدون انجام تخصیص heap. میتوانیم مستقیماً از Pin
با هر future استفاده کنیم، با استفاده از ماکروی std::pin::pin
.
با این حال، هنوز باید نوع مرجع pin شده را بهصراحت مشخص کنیم؛ در غیر این صورت، Rust هنوز نمیداند که اینها را بهعنوان اشیاء ویژگی دینامیک تفسیر کند، که دقیقاً همان چیزی است که ما در Vec
به آن نیاز داریم. بنابراین، هر future را زمانی که تعریف میکنیم با pin!
pin میکنیم، و futures
را بهعنوان یک Vec
که شامل مراجع متغیر pin شده به نوع future دینامیک است تعریف میکنیم، همانطور که در لیست ۱۷-۱۹ نشان داده شده است.
با این حال، باید بهصراحت نوع مرجع pinned را مشخص کنیم؛ در غیر این صورت، راست همچنان نمیداند که اینها را بهعنوان شیءهای ویژگی دینامیک تفسیر کند، که همان چیزی است که برای قرار گرفتن در Vec
نیاز داریم. بنابراین، هر آینده را وقتی تعریف میکنیم pin!
میکنیم و futures
را بهعنوان یک Vec
که شامل مراجع متغیر pinned به نوع ویژگی دینامیک Future
است تعریف میکنیم، همانطور که در فهرست 17-19 نشان داده شده است.
extern crate trpl; // required for mdbook test use std::{ future::Future, pin::{pin, Pin}, time::Duration, }; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { // --snip-- let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { // --snip-- while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); let tx_fut = pin!(async move { // --snip-- let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let futures: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![tx1_fut, rx_fut, tx_fut]; trpl::join_all(futures).await; }); }
Pin
با ماکروی pin!
برای اجتناب از تخصیصهای غیرضروری heapتا اینجا با نادیده گرفتن این واقعیت که ممکن است نوعهای Output
مختلفی داشته باشیم، پیش رفتیم. برای مثال، در فهرست 17-20، آینده ناشناس برای a
ویژگی Future<Output = u32>
را پیادهسازی میکند، آینده ناشناس برای b
ویژگی Future<Output = &str>
را پیادهسازی میکند، و آینده ناشناس برای c
ویژگی Future<Output = bool>
را پیادهسازی میکند.
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let a = async { 1u32 }; let b = async { "Hello!" }; let c = async { true }; let (a_result, b_result, c_result) = trpl::join!(a, b, c); println!("{a_result}, {b_result}, {c_result}"); }); }
میتوانیم از trpl::join!
برای منتظر ماندن استفاده کنیم، زیرا به ما اجازه میدهد چندین نوع future را ارسال کنیم و یک tuple از آن انواع تولید میکند. اما نمیتوانیم از trpl::join_all
استفاده کنیم، زیرا این تابع نیاز دارد که همه futures ارسالشده نوع یکسانی داشته باشند. به یاد داشته باشید، همین خطا بود که ما را به این ماجراجویی با Pin
کشاند!
این یک معاوضه بنیادی است: میتوانیم با تعداد پویایی از futures با استفاده از join_all
کار کنیم، به شرطی که همه آنها نوع یکسانی داشته باشند، یا میتوانیم با تعداد مشخصی از futures با توابع join
یا ماکروی join!
کار کنیم، حتی اگر آنها انواع مختلفی داشته باشند. این همان شرایطی است که هنگام کار با هر نوع دیگری در Rust با آن مواجه میشویم. Futures خاص نیستند، حتی اگر سینتکس مناسبی برای کار با آنها داشته باشیم، و این یک نکته مثبت است.
Racing Futures
وقتی آیندهها را با خانواده توابع و ماکروهای join
“منتظر میمانیم”، نیاز داریم همه آنها تمام شوند قبل از اینکه به مرحله بعدی برویم. گاهی اوقات، اما، فقط نیاز داریم یکی از آیندهها از مجموعهای تمام شود قبل از اینکه به مرحله بعدی برویم—کمی شبیه به مسابقه دادن یک آینده در برابر دیگری.
در لیست ۱۷-۲۱، ما دوباره از trpl::race
استفاده میکنیم تا دو future، یعنی slow
و fast
، را در برابر یکدیگر اجرا کنیم.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let slow = async { println!("'slow' started."); trpl::sleep(Duration::from_millis(100)).await; println!("'slow' finished."); }; let fast = async { println!("'fast' started."); trpl::sleep(Duration::from_millis(50)).await; println!("'fast' finished."); }; trpl::race(slow, fast).await; }); }
race
برای دریافت نتیجه اولین آیندهای که تمام میشودهر future یک پیام هنگام شروع اجرا چاپ میکند، با فراخوانی و انتظار برای sleep
به مدت مشخصی مکث میکند، و سپس یک پیام دیگر هنگام اتمام چاپ میکند. سپس، هر دو future یعنی slow
و fast
را به trpl::race
ارسال میکنیم و منتظر میمانیم تا یکی از آنها به پایان برسد. (نتیجه اینجا چندان شگفتآور نیست: fast
برنده میشود.) برخلاف زمانی که در “اولین برنامه Async ما” از race
استفاده کردیم، اینجا به نمونه Either
که بازمیگرداند توجه نمیکنیم، زیرا تمام رفتار جالب در بدنه بلوکهای async رخ میدهد.
توجه کنید که اگر ترتیب آرگومانها به race
را جابهجا کنید، ترتیب پیامهای “started” تغییر میکند، حتی اگر future fast
همیشه زودتر به پایان برسد. دلیل این است که پیادهسازی این تابع خاص race
منصفانه نیست. این تابع همیشه futures ارسالشده را به ترتیب آرگومانها اجرا میکند. سایر پیادهسازیها منصفانه هستند و به صورت تصادفی انتخاب میکنند که کدام future را ابتدا poll کنند. با این حال، صرفنظر از اینکه پیادهسازی race
ما منصفانه باشد یا نه، یکی از futures تا اولین await
در بدنهاش اجرا میشود قبل از اینکه task دیگری بتواند شروع شود.
به یاد بیاورید از اولین برنامه Async ما که در هر نقطه await
، Rust به runtime اجازه میدهد تا task را متوقف کند و به task دیگری سوئیچ کند اگر future در حال انتظار آماده نباشد. عکس این موضوع هم صادق است: Rust فقط بلوکهای async را متوقف میکند و کنترل را به runtime بازمیگرداند در یک نقطه await
.
این بدان معناست که اگر در یک بلوک async بدون نقطه await
مقدار زیادی کار انجام دهید، آن future دیگر futures را از پیشرفت باز میدارد. گاهی اوقات ممکن است به این موضوع اشاره شود که یک future دیگر futures را گرسنه میکند. در برخی موارد، این ممکن است مشکل بزرگی نباشد. با این حال، اگر در حال انجام برخی تنظیمات پرهزینه یا کار طولانیمدت هستید، یا اگر futureای دارید که به طور نامحدود یک کار خاص را انجام میدهد، باید به این فکر کنید که چه زمانی و کجا کنترل را به runtime بازگردانید.
به همان اندازه، اگر عملیاتهای مسدودکننده طولانیمدت دارید، async میتواند ابزاری مفید برای ارائه راههایی باشد که بخشهای مختلف برنامه بتوانند با یکدیگر تعامل داشته باشند.
اما در این موارد چگونه کنترل را به runtime بازمیگردانید؟
Yielding Control to the Runtime
بیایید یک عملیات طولانیمدت را شبیهسازی کنیم. لیست ۱۷-۲۲ یک تابع به نام slow
معرفی میکند.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { // We will call `slow` here later }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
thread::sleep
برای شبیهسازی عملیات کنداین کد از std::thread::sleep
به جای trpl::sleep
استفاده میکند، به طوری که فراخوانی slow
، Thread فعلی را برای مدت مشخصی از میلیثانیهها مسدود میکند. میتوانیم از slow
به عنوان جایگزینی برای عملیاتهای واقعی که هم طولانیمدت هستند و هم مسدودکننده، استفاده کنیم.
در لیست ۱۷-۲۳، از slow
برای شبیهسازی انجام این نوع کارهای CPU-bound در یک جفت future استفاده میکنیم.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); slow("a", 10); slow("a", 20); trpl::sleep(Duration::from_millis(50)).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); slow("b", 10); slow("b", 15); slow("b", 350); trpl::sleep(Duration::from_millis(50)).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
thread::sleep
برای شبیهسازی عملیات کندبرای شروع، هر future فقط پس از انجام یک سری عملیات کند، کنترل را به runtime بازمیگرداند. اگر این کد را اجرا کنید، این خروجی را مشاهده خواهید کرد:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
همانطور که در مثال قبلی دیدیم، race
همچنان به محض اینکه a
تمام شود، کار را تمام میکند. اما بین دو future هیچ تداخل یا جابهجایی وجود ندارد. future a
تمام کار خود را انجام میدهد تا زمانی که فراخوانی trpl::sleep
منتظر بماند، سپس future b
تمام کار خود را انجام میدهد تا زمانی که فراخوانی trpl::sleep
خودش منتظر بماند، و در نهایت future a
کامل میشود. برای اینکه هر دو future بتوانند بین taskهای کند خود پیشرفت کنند، به نقاط await
نیاز داریم تا بتوانیم کنترل را به runtime بازگردانیم. این به این معناست که به چیزی نیاز داریم که بتوانیم برای آن منتظر بمانیم!
هماکنون میتوانیم این نوع انتقال کنترل را در لیست ۱۷-۲۳ مشاهده کنیم: اگر trpl::sleep
در انتهای future a
را حذف کنیم، این future بدون اجرای future b
به طور کامل به پایان میرسد. بیایید از تابع sleep
بهعنوان نقطه شروعی برای اجازه دادن به عملیاتها برای جابهجا شدن و پیشرفت استفاده کنیم، همانطور که در لیست ۱۷-۲۴ نشان داده شده است.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let one_ms = Duration::from_millis(1); let a = async { println!("'a' started."); slow("a", 30); trpl::sleep(one_ms).await; slow("a", 10); trpl::sleep(one_ms).await; slow("a", 20); trpl::sleep(one_ms).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::sleep(one_ms).await; slow("b", 10); trpl::sleep(one_ms).await; slow("b", 15); trpl::sleep(one_ms).await; slow("b", 35); trpl::sleep(one_ms).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
sleep
برای اجازه دادن به عملیاتها برای پیشرفت متناوبدر فهرست 17-24، فراخوانیهای trpl::sleep
با نقاط انتظار بین هر فراخوانی به slow
اضافه میکنیم. اکنون کار دو آینده درهمتنیده شده است:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
future a
هنوز برای مدتی اجرا میشود قبل از اینکه کنترل را به b
منتقل کند، زیرا ابتدا slow
را فراخوانی میکند قبل از اینکه trpl::sleep
را فراخوانی کند. اما پس از آن، futures هر بار که یکی از آنها به یک نقطه await
میرسد، به صورت متناوب جابهجا میشوند. در این مورد، ما این کار را پس از هر فراخوانی به slow
انجام دادهایم، اما میتوانستیم کار را به هر شکلی که برای ما منطقیتر است تقسیم کنیم.
با این حال، واقعاً نمیخواهیم اینجا sleep کنیم؛ میخواهیم به سریعترین شکلی که میتوانیم پیشرفت کنیم. فقط نیاز داریم کنترل را به runtime بازگردانیم. میتوانیم این کار را بهطور مستقیم با استفاده از تابع yield_now
انجام دهیم. در فهرست 17-25، تمام این فراخوانیهای sleep
را با yield_now
جایگزین میکنیم.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); trpl::yield_now().await; slow("a", 10); trpl::yield_now().await; slow("a", 20); trpl::yield_now().await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::yield_now().await; slow("b", 10); trpl::yield_now().await; slow("b", 15); trpl::yield_now().await; slow("b", 35); trpl::yield_now().await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
yield_now
برای اجازه دادن به عملیاتها برای پیشرفت متناوباین کد هم از نظر بیان هدف واقعی واضحتر است و هم میتواند به طور قابلتوجهی سریعتر از استفاده از sleep
باشد، زیرا تایمرهایی مانند آنچه که توسط sleep
استفاده میشود اغلب محدودیتهایی در دقت خود دارند. نسخهای از sleep
که ما استفاده میکنیم، برای مثال، همیشه حداقل به مدت یک میلیثانیه میخوابد، حتی اگر یک Duration
یک نانوثانیهای به آن بدهیم. دوباره، کامپیوترهای مدرن سریع هستند: آنها میتوانند در یک میلیثانیه کارهای زیادی انجام دهند!
میتوانید خودتان این را ببینید با راهاندازی یک بنچمارک کوچک، مانند آنچه در لیست ۱۷-۲۶ نشان داده شده است. (این روش بهویژه دقیقی برای انجام تست عملکرد نیست، اما برای نشان دادن تفاوت در اینجا کافی است.)
extern crate trpl; // required for mdbook test use std::time::{Duration, Instant}; fn main() { trpl::run(async { let one_ns = Duration::from_nanos(1); let start = Instant::now(); async { for _ in 1..1000 { trpl::sleep(one_ns).await; } } .await; let time = Instant::now() - start; println!( "'sleep' version finished after {} seconds.", time.as_secs_f32() ); let start = Instant::now(); async { for _ in 1..1000 { trpl::yield_now().await; } } .await; let time = Instant::now() - start; println!( "'yield' version finished after {} seconds.", time.as_secs_f32() ); }); }
sleep
و yield_now
در اینجا، تمام چاپ وضعیت را کنار میگذاریم، یک Duration
یک نانوثانیهای به trpl::sleep
میدهیم و اجازه میدهیم هر future بهصورت مستقل اجرا شود، بدون هیچ جابهجایی بین futures. سپس ۱,۰۰۰ بار این عملیات را تکرار میکنیم و میبینیم که futureی که از trpl::sleep
استفاده میکند در مقایسه با futureی که از trpl::yield_now
استفاده میکند چقدر زمان میبرد.
نسخهای که از yield_now
استفاده میکند، بسیار سریعتر است!
این بدان معناست که async حتی برای وظایف وابسته به CPU میتواند مفید باشد، بسته به اینکه برنامه شما چه کار دیگری انجام میدهد، زیرا ابزاری مفید برای ساختاردهی روابط بین بخشهای مختلف برنامه فراهم میکند. این نوعی از چندوظیفهگی مشارکتی است، جایی که هر آینده قدرت تصمیمگیری درباره زمان واگذاری کنترل از طریق نقاط انتظار را دارد. بنابراین، هر آینده نیز مسئولیت دارد که از مسدود کردن بیش از حد طولانی اجتناب کند. در برخی سیستمعاملهای مبتنی بر راست برای سیستمهای تعبیهشده، این تنها نوع چندوظیفهگی است!
در کد واقعی، معمولاً فراخوانی توابع را با نقاط await
در هر خط متناوب نمیکنید، البته. در حالی که واگذاری کنترل به این روش نسبتاً کمهزینه است، اما رایگان نیست. در بسیاری از موارد، تلاش برای تقسیم یک task که CPU-bound است ممکن است آن را بهطور قابل توجهی کندتر کند، بنابراین گاهی اوقات برای عملکرد کلی بهتر است که اجازه دهید یک عملیات بهطور مختصر مسدود شود. همیشه اندازهگیری کنید تا ببینید تنگناهای عملکرد واقعی کد شما کجا هستند. اما، این دینامیک اساسی را باید در ذهن داشته باشید، بهویژه اگر واقعاً شاهد انجام مقدار زیادی کار بهصورت ترتیبی باشید، در حالی که انتظار داشتید بهطور همزمان انجام شود!
ساخت انتزاعات Async خودمان
ما همچنین میتوانیم futures را با هم ترکیب کنیم تا الگوهای جدیدی ایجاد کنیم. برای مثال، میتوانیم یک تابع timeout
با استفاده از بلوکهای سازنده async که از قبل داریم، بسازیم. هنگامی که کارمان تمام شد، نتیجه یک بلوک سازنده دیگر خواهد بود که میتوانیم برای ایجاد انتزاعات async بیشتری از آن استفاده کنیم.
فهرست 17-27 نشان میدهد که چگونه انتظار داریم این timeout
با یک آینده کند کار کند.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let slow = async { trpl::sleep(Duration::from_millis(100)).await; "I finished!" }; match timeout(slow, Duration::from_millis(10)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); }
timeout
با یک آینده کندبیایید این را پیادهسازی کنیم! برای شروع، بیایید به API مورد نیاز برای timeout
فکر کنیم:
- باید خودش یک تابع async باشد تا بتوانیم منتظر آن بمانیم.
- پارامتر اول آن باید یک آینده برای اجرا باشد. میتوانیم آن را عمومی کنیم تا بتواند با هر آیندهای کار کند.
- پارامتر دوم آن مدتزمان حداکثری برای انتظار خواهد بود. اگر از یک
Duration
استفاده کنیم، این کار ارسال آن بهtrpl::sleep
را آسان میکند. - باید یک
Result
بازگرداند. اگر آینده با موفقیت کامل شود،Result
شاملOk
با مقدار تولیدشده توسط آینده خواهد بود. اگر زمان محدودیت زودتر سپری شود،Result
شاملErr
با مدتزمانی که زمان محدودیت برای آن منتظر ماند خواهد بود.
فهرست 17-28 این اعلان را نشان میدهد.
extern crate trpl; // required for mdbook test
use std::{future::Future, time::Duration};
fn main() {
trpl::run(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_millis(10)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
// Here is where our implementation will go!
}
timeout
این اهداف ما برای نوعها را برآورده میکند. حالا بیایید به رفتاری که نیاز داریم فکر کنیم: میخواهیم آینده ارسالشده به آن را در برابر مدتزمان محدودیت مسابقه دهیم. میتوانیم از trpl::sleep
برای ساختن یک آینده تایمر از مدتزمان استفاده کنیم و از trpl::race
برای اجرای آن تایمر با آیندهای که کاربر ارسال میکند استفاده کنیم.
ما همچنین میدانیم که race
منصفانه نیست و آرگومانها را به ترتیب ارسالشده poll میکند. بنابراین، ابتدا future_to_try
را به race
ارسال میکنیم تا حتی اگر max_time
مدت زمان بسیار کوتاهی باشد، فرصتی برای تکمیل شدن داشته باشد. اگر future_to_try
زودتر تمام شود، race
مقدار Left
را با خروجی future_to_try
بازمیگرداند. اگر timer
زودتر تمام شود، race
مقدار Right
را با خروجی ()
تایمر بازمیگرداند.
در لیست ۱۷-۲۹، نتیجه انتظار برای trpl::race
را match میکنیم.
extern crate trpl; // required for mdbook test use std::{future::Future, time::Duration}; use trpl::Either; // --snip-- fn main() { trpl::run(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); } async fn timeout<F: Future>( future_to_try: F, max_time: Duration, ) -> Result<F::Output, Duration> { match trpl::race(future_to_try, trpl::sleep(max_time)).await { Either::Left(output) => Ok(output), Either::Right(_) => Err(max_time), } }
timeout
با استفاده از race
و sleep
اگر future_to_try
موفق شود و مقدار Left(output)
دریافت کنیم، مقدار Ok(output)
را بازمیگردانیم. اگر به جای آن تایمر خواب منقضی شود و مقدار Right(())
دریافت کنیم، ()
را با _
نادیده گرفته و به جای آن مقدار Err(max_time)
را بازمیگردانیم.
با این کار، یک timeout
عملیاتی داریم که از دو ابزار کمکی async دیگر ساخته شده است. اگر کد خود را اجرا کنیم، پس از انقضای timeout، حالت شکست را چاپ خواهد کرد:
Failed after 2 seconds
از آنجا که futures میتوانند با دیگر futures ترکیب شوند، میتوانید ابزارهای بسیار قدرتمندی با استفاده از بلوکهای سازنده کوچکتر async بسازید. برای مثال، میتوانید از همین رویکرد برای ترکیب timeoutها با retries استفاده کنید و به نوبه خود از آنها با عملیاتی مانند تماسهای شبکه (یکی از مثالهای ابتدای فصل) استفاده کنید.
در عمل، معمولاً مستقیماً با async
و await
کار میکنید و به طور ثانویه از توابع و ماکروهایی مانند join
، join_all
، race
و غیره استفاده میکنید. فقط گاهی نیاز خواهید داشت از pin
برای استفاده از futures با آن APIها استفاده کنید.
اکنون روشهای متعددی برای کار با چندین future به طور همزمان دیدهایم. در ادامه، بررسی خواهیم کرد که چگونه میتوانیم با چندین future به صورت متوالی در طول زمان با streams کار کنیم. با این حال، در ابتدا ممکن است بخواهید به چند نکته دیگر توجه کنید:
-
ما از یک
Vec
همراه باjoin_all
استفاده کردیم تا منتظر بمانیم تمام futures در یک گروه به پایان برسند. چگونه میتوانید از یکVec
برای پردازش یک گروه از futures به صورت متوالی استفاده کنید؟ معاوضههای انجام این کار چیست؟ -
به نوع
futures::stream::FuturesUnordered
از cratefutures
نگاهی بیندازید. استفاده از آن چگونه میتواند با استفاده از یکVec
متفاوت باشد؟ (نگران این نباشید که این نوع از بخشstream
crate آمده است؛ با هر مجموعهای از futures به خوبی کار میکند.)