کار با تعداد دلخواهی از Futures

وقتی در بخش قبلی از استفاده از دو future به سه future تغییر دادیم، مجبور شدیم به جای استفاده از join از join3 استفاده کنیم. این مسئله آزاردهنده خواهد بود اگر هر بار که تعداد futuresی که می‌خواهیم join کنیم تغییر می‌کند، مجبور به فراخوانی یک تابع متفاوت باشیم. خوشبختانه، یک فرم ماکروی join داریم که می‌توانیم به آن تعداد دلخواهی از آرگومان‌ها را ارسال کنیم. این ماکرو همچنین خودش مدیریت انتظار برای futures را انجام می‌دهد. بنابراین، می‌توانیم کد لیست ۱۷-۱۳ را بازنویسی کنیم تا به جای join3 از join! استفاده کنیم، همان‌طور که در لیست ۱۷-۱۴ نشان داده شده است.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}
Listing 17-14: استفاده از join! برای منتظر ماندن چندین آینده

این قطعاً نسبت به جابجایی بین join، join3، join4 و موارد دیگر بهبود یافته است! با این حال، حتی این فرم ماکرو نیز فقط زمانی کار می‌کند که تعداد futures را از قبل بدانیم. اما در دنیای واقعی Rust، اضافه کردن futures به یک مجموعه و سپس انتظار برای کامل شدن برخی یا تمام آن‌ها یک الگوی رایج است.

برای بررسی تمام futures در یک مجموعه، باید روی همه آن‌ها حلقه بزنیم و آن‌ها را join کنیم. تابع trpl::join_all هر نوعی را که ویژگی Iterator را پیاده‌سازی می‌کند قبول می‌کند، که در فصل ۱۳ در بخش ویژگی Iterator و متد next درباره آن یاد گرفتید، بنابراین به نظر می‌رسد دقیقاً همان چیزی است که نیاز داریم. بیایید سعی کنیم futures خود را در یک وکتور قرار دهیم و join! را با join_all جایگزین کنیم، همان‌طور که در لیست ۱۷-۱۵ نشان داده شده است.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures = vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}
Listing 17-15: ذخیره آینده‌های ناشناس در یک بردار و فراخوانی join_all

متأسفانه، این کد کامپایل نمی‌شود. در عوض، با این خطا مواجه می‌شویم:

error[E0308]: mismatched types
  --> src/main.rs:45:37
   |
10 |         let tx1_fut = async move {
   |                       ---------- the expected `async` block
...
24 |         let rx_fut = async {
   |                      ----- the found `async` block
...
45 |         let futures = vec![tx1_fut, rx_fut, tx_fut];
   |                                     ^^^^^^ expected `async` block, found a 
different `async` block
   |
   = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
              found `async` block `{async block@src/main.rs:24:22: 24:27}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object

این ممکن است شگفت‌آور باشد. بالاخره، هیچ‌یک از بلوک‌های async چیزی بازنمی‌گردانند، بنابراین هر کدام یک Future<Output = ()> تولید می‌کنند. اما به یاد داشته باشید که Future یک ویژگی (trait) است و کامپایلر برای هر بلوک async یک enum منحصربه‌فرد ایجاد می‌کند. نمی‌توانید دو struct مختلف را که دستی نوشته شده‌اند در یک Vec قرار دهید، و همین قانون برای enumهای مختلفی که توسط کامپایلر تولید می‌شوند اعمال می‌شود.

برای اینکه این کار انجام شود، باید از اشیاء ویژگی (trait objects) استفاده کنیم، همان‌طور که در “بازگرداندن خطاها از تابع run” در فصل ۱۲ انجام دادیم. (ما اشیاء ویژگی را در فصل ۱۸ به‌طور مفصل پوشش خواهیم داد.) استفاده از اشیاء ویژگی به ما اجازه می‌دهد هر یک از futureهای ناشناس تولیدشده توسط این انواع را به‌عنوان یک نوع یکسان در نظر بگیریم، زیرا همه آن‌ها ویژگی Future را پیاده‌سازی می‌کنند.

نکته: در بخش فصل ۸ استفاده از یک Enum برای ذخیره مقادیر متعدد، درباره یک روش دیگر برای شامل کردن چندین نوع در یک Vec صحبت کردیم: استفاده از یک enum برای نمایش هر نوعی که می‌تواند در وکتور ظاهر شود. اما نمی‌توانیم اینجا از آن استفاده کنیم. از یک طرف، هیچ راهی برای نام‌گذاری انواع مختلف نداریم، زیرا آن‌ها ناشناس هستند. از طرف دیگر، دلیلی که ما در وهله اول به دنبال یک وکتور و join_all رفتیم، این بود که بتوانیم با یک مجموعه پویا از futures کار کنیم، جایی که فقط به این اهمیت می‌دهیم که همه آن‌ها خروجی یکسانی دارند.

ابتدا هر future درون vec! را در یک Box::new بسته‌بندی می‌کنیم، همان‌طور که در لیست ۱۷-۱۶ نشان داده شده است.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;
    });
}
Listing 17-16: استفاده از Box::new برای تطبیق انواع futures در یک Vec

متأسفانه، این کد هنوز هم کامپایل نمی‌شود. در واقع، همان خطای پایه‌ای که قبلاً دریافت کردیم، برای فراخوانی‌های دوم و سوم Box::new نیز رخ می‌دهد، به همراه خطاهای جدیدی که به ویژگی Unpin اشاره دارند. به زودی به خطاهای مرتبط با Unpin بازمی‌گردیم. ابتدا، بیایید خطاهای نوع در فراخوانی‌های Box::new را با مشخص کردن صریح نوع متغیر futures رفع کنیم (نگاه کنید به لیست ۱۷-۱۷).

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{future::Future, time::Duration};

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures: Vec<Box<dyn Future<Output = ()>>> =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;
    });
}
Listing 17-17: برطرف کردن بقیه خطاهای ناسازگاری نوع با استفاده از اعلان صریح نوع

این type declaration کمی پیچیده است، بنابراین بیایید آن را مرحله به مرحله بررسی کنیم:

  1. نوع داخلی‌ترین، خود future است. به‌طور صریح اعلام می‌کنیم که خروجی future نوع واحد () است، با نوشتن Future<Output = ()>.
  2. سپس ویژگی را با dyn علامت‌گذاری می‌کنیم تا به‌صورت دینامیک باشد.
  3. کل مرجع ویژگی در یک Box بسته‌بندی می‌شود.
  4. در نهایت، به‌طور صریح بیان می‌کنیم که futures یک Vec است که شامل این آیتم‌ها است.

این تغییر تأثیر قابل‌توجهی داشت. اکنون وقتی کامپایلر را اجرا می‌کنیم، فقط خطاهایی که به Unpin اشاره دارند باقی می‌مانند. اگرچه سه خطا وجود دارد، اما محتوای آن‌ها بسیار مشابه است.

error[E0308]: mismatched types
   --> src/main.rs:46:46
    |
10  |         let tx1_fut = async move {
    |                       ---------- the expected `async` block
...
24  |         let rx_fut = async {
    |                      ----- the found `async` block
...
46  |             vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
    |                                     -------- ^^^^^^ expected `async` block, found a different `async` block
    |                                     |
    |                                     arguments to this function are incorrect
    |
    = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
               found `async` block `{async block@src/main.rs:24:22: 24:27}`
    = note: no two async blocks, even if identical, have the same type
    = help: consider pinning your async block and casting it to a trait object
note: associated function defined here
   --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
    |
255 |     pub fn new(x: T) -> Self {
    |            ^^^

error[E0308]: mismatched types
   --> src/main.rs:46:64
    |
10  |         let tx1_fut = async move {
    |                       ---------- the expected `async` block
...
30  |         let tx_fut = async move {
    |                      ---------- the found `async` block
...
46  |             vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
    |                                                       -------- ^^^^^^ expected `async` block, found a different `async` block
    |                                                       |
    |                                                       arguments to this function are incorrect
    |
    = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
               found `async` block `{async block@src/main.rs:30:22: 30:32}`
    = note: no two async blocks, even if identical, have the same type
    = help: consider pinning your async block and casting it to a trait object
note: associated function defined here
   --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
    |
255 |     pub fn new(x: T) -> Self {
    |            ^^^

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
   --> src/main.rs:48:24
    |
48  |         trpl::join_all(futures).await;
    |         -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
    |         |
    |         required by a bound introduced by this call
    |
    = note: consider using the `pin!` macro
            consider using `Box::pin` if you need to access the pinned value outside of the current scope
    = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `join_all`
   --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
    |
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
    |        -------- required by a bound in this function
...
105 |     I::Item: Future,
    |              ^^^^^^ required by this bound in `join_all`

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
  --> src/main.rs:48:9
   |
48 |         trpl::join_all(futures).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
  --> src/main.rs:48:33
   |
48 |         trpl::join_all(futures).await;
   |                                 ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

این پیام اطلاعات زیادی برای هضم کردن دارد، بنابراین بیایید آن را تجزیه کنیم. بخش اول پیام به ما می‌گوید که اولین بلوک async (src/main.rs:8:23: 20:10) ویژگی Unpin را پیاده‌سازی نمی‌کند و پیشنهاد می‌دهد از pin! یا Box::pin برای حل آن استفاده کنیم. در ادامه این فصل، جزئیات بیشتری درباره Pin و Unpin بررسی خواهیم کرد. با این حال، فعلاً می‌توانیم فقط از توصیه کامپایلر پیروی کنیم تا از این مشکل عبور کنیم. در لیست ۱۷-۱۸، ابتدا با به‌روزرسانی اعلان نوع برای futures شروع می‌کنیم، به طوری که هر Box درون یک Pin قرار بگیرد. دوم، از Box::pin برای pin کردن خود futures استفاده می‌کنیم.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{
    future::Future,
    pin::{pin, Pin},
    time::Duration,
};

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = pin!(async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let rx_fut = pin!(async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        });

        let tx_fut = pin!(async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
            vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)];

        trpl::join_all(futures).await;
    });
}
Listing 17-18: استفاده از Pin و Box::pin برای برطرف کردن نوع Vec

اگر این کد را کامپایل و اجرا کنیم، در نهایت خروجی موردنظر خود را دریافت می‌کنیم:

received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'

آه!

اینجا چیزهای بیشتری برای بررسی وجود دارد. برای یک مورد، استفاده از Pin<Box<T>> یک مقدار کمی سربار اضافه می‌کند، زیرا این futures را با Box روی heap قرار می‌دهیم—و ما فقط این کار را برای هم‌تراز کردن انواع انجام می‌دهیم. بعد از همه این‌ها، ما واقعاً نیازی به تخصیص heap نداریم: این futures به این تابع خاص محدود هستند. همان‌طور که قبلاً ذکر شد، Pin خودش یک نوع wrapper است، بنابراین می‌توانیم از مزیت داشتن یک نوع واحد در Vec بهره‌مند شویم—دلیل اصلی که به دنبال Box رفتیم—بدون انجام تخصیص heap. می‌توانیم مستقیماً از Pin با هر future استفاده کنیم، با استفاده از ماکروی std::pin::pin.

با این حال، هنوز باید نوع مرجع pin شده را به‌صراحت مشخص کنیم؛ در غیر این صورت، Rust هنوز نمی‌داند که این‌ها را به‌عنوان اشیاء ویژگی دینامیک تفسیر کند، که دقیقاً همان چیزی است که ما در Vec به آن نیاز داریم. بنابراین، هر future را زمانی که تعریف می‌کنیم با pin! pin می‌کنیم، و futures را به‌عنوان یک Vec که شامل مراجع متغیر pin شده به نوع future دینامیک است تعریف می‌کنیم، همان‌طور که در لیست ۱۷-۱۹ نشان داده شده است.

با این حال، باید به‌صراحت نوع مرجع pinned را مشخص کنیم؛ در غیر این صورت، راست همچنان نمی‌داند که این‌ها را به‌عنوان شیءهای ویژگی دینامیک تفسیر کند، که همان چیزی است که برای قرار گرفتن در Vec نیاز داریم. بنابراین، هر آینده را وقتی تعریف می‌کنیم pin! می‌کنیم و futures را به‌عنوان یک Vec که شامل مراجع متغیر pinned به نوع ویژگی دینامیک Future است تعریف می‌کنیم، همانطور که در فهرست 17-19 نشان داده شده است.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{
    future::Future,
    pin::{pin, Pin},
    time::Duration,
};

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let rx_fut = pin!(async {
            // --snip--
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        });

        let tx_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
            vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}
Listing 17-19: استفاده مستقیم از Pin با ماکروی pin! برای اجتناب از تخصیص‌های غیرضروری heap

تا اینجا با نادیده گرفتن این واقعیت که ممکن است نوع‌های Output مختلفی داشته باشیم، پیش رفتیم. برای مثال، در فهرست 17-20، آینده ناشناس برای a ویژگی Future<Output = u32> را پیاده‌سازی می‌کند، آینده ناشناس برای b ویژگی Future<Output = &str> را پیاده‌سازی می‌کند، و آینده ناشناس برای c ویژگی Future<Output = bool> را پیاده‌سازی می‌کند.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let a = async { 1u32 };
        let b = async { "Hello!" };
        let c = async { true };

        let (a_result, b_result, c_result) = trpl::join!(a, b, c);
        println!("{a_result}, {b_result}, {c_result}");
    });
}
Listing 17-20: سه آینده با نوع‌های متفاوت

می‌توانیم از trpl::join! برای منتظر ماندن استفاده کنیم، زیرا به ما اجازه می‌دهد چندین نوع future را ارسال کنیم و یک tuple از آن انواع تولید می‌کند. اما نمی‌توانیم از trpl::join_all استفاده کنیم، زیرا این تابع نیاز دارد که همه futures ارسال‌شده نوع یکسانی داشته باشند. به یاد داشته باشید، همین خطا بود که ما را به این ماجراجویی با Pin کشاند!

این یک معاوضه بنیادی است: می‌توانیم با تعداد پویایی از futures با استفاده از join_all کار کنیم، به شرطی که همه آن‌ها نوع یکسانی داشته باشند، یا می‌توانیم با تعداد مشخصی از futures با توابع join یا ماکروی join! کار کنیم، حتی اگر آن‌ها انواع مختلفی داشته باشند. این همان شرایطی است که هنگام کار با هر نوع دیگری در Rust با آن مواجه می‌شویم. Futures خاص نیستند، حتی اگر سینتکس مناسبی برای کار با آن‌ها داشته باشیم، و این یک نکته مثبت است.

Racing Futures

وقتی آینده‌ها را با خانواده توابع و ماکروهای join “منتظر می‌مانیم”، نیاز داریم همه آن‌ها تمام شوند قبل از اینکه به مرحله بعدی برویم. گاهی اوقات، اما، فقط نیاز داریم یکی از آینده‌ها از مجموعه‌ای تمام شود قبل از اینکه به مرحله بعدی برویم—کمی شبیه به مسابقه دادن یک آینده در برابر دیگری.

در لیست ۱۷-۲۱، ما دوباره از trpl::race استفاده می‌کنیم تا دو future، یعنی slow و fast، را در برابر یکدیگر اجرا کنیم.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            println!("'slow' started.");
            trpl::sleep(Duration::from_millis(100)).await;
            println!("'slow' finished.");
        };

        let fast = async {
            println!("'fast' started.");
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'fast' finished.");
        };

        trpl::race(slow, fast).await;
    });
}
Listing 17-21: استفاده از race برای دریافت نتیجه اولین آینده‌ای که تمام می‌شود

هر future یک پیام هنگام شروع اجرا چاپ می‌کند، با فراخوانی و انتظار برای sleep به مدت مشخصی مکث می‌کند، و سپس یک پیام دیگر هنگام اتمام چاپ می‌کند. سپس، هر دو future یعنی slow و fast را به trpl::race ارسال می‌کنیم و منتظر می‌مانیم تا یکی از آن‌ها به پایان برسد. (نتیجه اینجا چندان شگفت‌آور نیست: fast برنده می‌شود.) برخلاف زمانی که در “اولین برنامه Async ما” از race استفاده کردیم، اینجا به نمونه Either که بازمی‌گرداند توجه نمی‌کنیم، زیرا تمام رفتار جالب در بدنه بلوک‌های async رخ می‌دهد.

توجه کنید که اگر ترتیب آرگومان‌ها به race را جابه‌جا کنید، ترتیب پیام‌های “started” تغییر می‌کند، حتی اگر future fast همیشه زودتر به پایان برسد. دلیل این است که پیاده‌سازی این تابع خاص race منصفانه نیست. این تابع همیشه futures ارسال‌شده را به ترتیب آرگومان‌ها اجرا می‌کند. سایر پیاده‌سازی‌ها منصفانه هستند و به صورت تصادفی انتخاب می‌کنند که کدام future را ابتدا poll کنند. با این حال، صرف‌نظر از اینکه پیاده‌سازی race ما منصفانه باشد یا نه، یکی از futures تا اولین await در بدنه‌اش اجرا می‌شود قبل از اینکه task دیگری بتواند شروع شود.

به یاد بیاورید از اولین برنامه Async ما که در هر نقطه await، Rust به runtime اجازه می‌دهد تا task را متوقف کند و به task دیگری سوئیچ کند اگر future در حال انتظار آماده نباشد. عکس این موضوع هم صادق است: Rust فقط بلوک‌های async را متوقف می‌کند و کنترل را به runtime بازمی‌گرداند در یک نقطه await.

این بدان معناست که اگر در یک بلوک async بدون نقطه await مقدار زیادی کار انجام دهید، آن future دیگر futures را از پیشرفت باز می‌دارد. گاهی اوقات ممکن است به این موضوع اشاره شود که یک future دیگر futures را گرسنه می‌کند. در برخی موارد، این ممکن است مشکل بزرگی نباشد. با این حال، اگر در حال انجام برخی تنظیمات پرهزینه یا کار طولانی‌مدت هستید، یا اگر futureای دارید که به طور نامحدود یک کار خاص را انجام می‌دهد، باید به این فکر کنید که چه زمانی و کجا کنترل را به runtime بازگردانید.

به همان اندازه، اگر عملیات‌های مسدودکننده طولانی‌مدت دارید، async می‌تواند ابزاری مفید برای ارائه راه‌هایی باشد که بخش‌های مختلف برنامه بتوانند با یکدیگر تعامل داشته باشند.

اما در این موارد چگونه کنترل را به runtime بازمی‌گردانید؟

Yielding Control to the Runtime

بیایید یک عملیات طولانی‌مدت را شبیه‌سازی کنیم. لیست ۱۷-۲۲ یک تابع به نام slow معرفی می‌کند.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        // We will call `slow` here later
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-22: استفاده از thread::sleep برای شبیه‌سازی عملیات کند

این کد از std::thread::sleep به جای trpl::sleep استفاده می‌کند، به طوری که فراخوانی slow، Thread فعلی را برای مدت مشخصی از میلی‌ثانیه‌ها مسدود می‌کند. می‌توانیم از slow به عنوان جایگزینی برای عملیات‌های واقعی که هم طولانی‌مدت هستند و هم مسدودکننده، استفاده کنیم.

در لیست ۱۷-۲۳، از slow برای شبیه‌سازی انجام این نوع کارهای CPU-bound در یک جفت future استفاده می‌کنیم.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-23: استفاده از thread::sleep برای شبیه‌سازی عملیات کند

برای شروع، هر future فقط پس از انجام یک سری عملیات کند، کنترل را به runtime بازمی‌گرداند. اگر این کد را اجرا کنید، این خروجی را مشاهده خواهید کرد:

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

همان‌طور که در مثال قبلی دیدیم، race همچنان به محض اینکه a تمام شود، کار را تمام می‌کند. اما بین دو future هیچ تداخل یا جابه‌جایی وجود ندارد. future a تمام کار خود را انجام می‌دهد تا زمانی که فراخوانی trpl::sleep منتظر بماند، سپس future b تمام کار خود را انجام می‌دهد تا زمانی که فراخوانی trpl::sleep خودش منتظر بماند، و در نهایت future a کامل می‌شود. برای اینکه هر دو future بتوانند بین taskهای کند خود پیشرفت کنند، به نقاط await نیاز داریم تا بتوانیم کنترل را به runtime بازگردانیم. این به این معناست که به چیزی نیاز داریم که بتوانیم برای آن منتظر بمانیم!

هم‌اکنون می‌توانیم این نوع انتقال کنترل را در لیست ۱۷-۲۳ مشاهده کنیم: اگر trpl::sleep در انتهای future a را حذف کنیم، این future بدون اجرای future b به طور کامل به پایان می‌رسد. بیایید از تابع sleep به‌عنوان نقطه شروعی برای اجازه دادن به عملیات‌ها برای جابه‌جا شدن و پیشرفت استفاده کنیم، همان‌طور که در لیست ۱۷-۲۴ نشان داده شده است.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 35);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-24: استفاده از sleep برای اجازه دادن به عملیات‌ها برای پیشرفت متناوب

در فهرست 17-24، فراخوانی‌های trpl::sleep با نقاط انتظار بین هر فراخوانی به slow اضافه می‌کنیم. اکنون کار دو آینده درهم‌تنیده شده است:

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

future a هنوز برای مدتی اجرا می‌شود قبل از اینکه کنترل را به b منتقل کند، زیرا ابتدا slow را فراخوانی می‌کند قبل از اینکه trpl::sleep را فراخوانی کند. اما پس از آن، futures هر بار که یکی از آن‌ها به یک نقطه await می‌رسد، به صورت متناوب جابه‌جا می‌شوند. در این مورد، ما این کار را پس از هر فراخوانی به slow انجام داده‌ایم، اما می‌توانستیم کار را به هر شکلی که برای ما منطقی‌تر است تقسیم کنیم.

با این حال، واقعاً نمی‌خواهیم اینجا sleep کنیم؛ می‌خواهیم به سریع‌ترین شکلی که می‌توانیم پیشرفت کنیم. فقط نیاز داریم کنترل را به runtime بازگردانیم. می‌توانیم این کار را به‌طور مستقیم با استفاده از تابع yield_now انجام دهیم. در فهرست 17-25، تمام این فراخوانی‌های sleep را با yield_now جایگزین می‌کنیم.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 35);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-25: استفاده از yield_now برای اجازه دادن به عملیات‌ها برای پیشرفت متناوب

این کد هم از نظر بیان هدف واقعی واضح‌تر است و هم می‌تواند به طور قابل‌توجهی سریع‌تر از استفاده از sleep باشد، زیرا تایمرهایی مانند آنچه که توسط sleep استفاده می‌شود اغلب محدودیت‌هایی در دقت خود دارند. نسخه‌ای از sleep که ما استفاده می‌کنیم، برای مثال، همیشه حداقل به مدت یک میلی‌ثانیه می‌خوابد، حتی اگر یک Duration یک نانوثانیه‌ای به آن بدهیم. دوباره، کامپیوترهای مدرن سریع هستند: آن‌ها می‌توانند در یک میلی‌ثانیه کارهای زیادی انجام دهند!

می‌توانید خودتان این را ببینید با راه‌اندازی یک بنچمارک کوچک، مانند آنچه در لیست ۱۷-۲۶ نشان داده شده است. (این روش به‌ویژه دقیقی برای انجام تست عملکرد نیست، اما برای نشان دادن تفاوت در اینجا کافی است.)

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::{Duration, Instant};

fn main() {
    trpl::run(async {
        let one_ns = Duration::from_nanos(1);
        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::sleep(one_ns).await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'sleep' version finished after {} seconds.",
            time.as_secs_f32()
        );

        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::yield_now().await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'yield' version finished after {} seconds.",
            time.as_secs_f32()
        );
    });
}
Listing 17-26: مقایسه عملکرد sleep و yield_now

در اینجا، تمام چاپ وضعیت را کنار می‌گذاریم، یک Duration یک نانوثانیه‌ای به trpl::sleep می‌دهیم و اجازه می‌دهیم هر future به‌صورت مستقل اجرا شود، بدون هیچ جابه‌جایی بین futures. سپس ۱,۰۰۰ بار این عملیات را تکرار می‌کنیم و می‌بینیم که futureی که از trpl::sleep استفاده می‌کند در مقایسه با futureی که از trpl::yield_now استفاده می‌کند چقدر زمان می‌برد.

نسخه‌ای که از yield_now استفاده می‌کند، بسیار سریع‌تر است!

این بدان معناست که async حتی برای وظایف وابسته به CPU می‌تواند مفید باشد، بسته به اینکه برنامه شما چه کار دیگری انجام می‌دهد، زیرا ابزاری مفید برای ساختاردهی روابط بین بخش‌های مختلف برنامه فراهم می‌کند. این نوعی از چندوظیفه‌گی مشارکتی است، جایی که هر آینده قدرت تصمیم‌گیری درباره زمان واگذاری کنترل از طریق نقاط انتظار را دارد. بنابراین، هر آینده نیز مسئولیت دارد که از مسدود کردن بیش از حد طولانی اجتناب کند. در برخی سیستم‌عامل‌های مبتنی بر راست برای سیستم‌های تعبیه‌شده، این تنها نوع چندوظیفه‌گی است!

در کد واقعی، معمولاً فراخوانی توابع را با نقاط await در هر خط متناوب نمی‌کنید، البته. در حالی که واگذاری کنترل به این روش نسبتاً کم‌هزینه است، اما رایگان نیست. در بسیاری از موارد، تلاش برای تقسیم یک task که CPU-bound است ممکن است آن را به‌طور قابل توجهی کندتر کند، بنابراین گاهی اوقات برای عملکرد کلی بهتر است که اجازه دهید یک عملیات به‌طور مختصر مسدود شود. همیشه اندازه‌گیری کنید تا ببینید تنگناهای عملکرد واقعی کد شما کجا هستند. اما، این دینامیک اساسی را باید در ذهن داشته باشید، به‌ویژه اگر واقعاً شاهد انجام مقدار زیادی کار به‌صورت ترتیبی باشید، در حالی که انتظار داشتید به‌طور همزمان انجام شود!

ساخت انتزاعات Async خودمان

ما همچنین می‌توانیم futures را با هم ترکیب کنیم تا الگوهای جدیدی ایجاد کنیم. برای مثال، می‌توانیم یک تابع timeout با استفاده از بلوک‌های سازنده async که از قبل داریم، بسازیم. هنگامی که کارمان تمام شد، نتیجه یک بلوک سازنده دیگر خواهد بود که می‌توانیم برای ایجاد انتزاعات async بیشتری از آن استفاده کنیم.

فهرست 17-27 نشان می‌دهد که چگونه انتظار داریم این timeout با یک آینده کند کار کند.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_millis(100)).await;
            "I finished!"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}
Listing 17-27: تعریف نحوه کار timeout با یک آینده کند

بیایید این را پیاده‌سازی کنیم! برای شروع، بیایید به API مورد نیاز برای timeout فکر کنیم:

  • باید خودش یک تابع async باشد تا بتوانیم منتظر آن بمانیم.
  • پارامتر اول آن باید یک آینده برای اجرا باشد. می‌توانیم آن را عمومی کنیم تا بتواند با هر آینده‌ای کار کند.
  • پارامتر دوم آن مدت‌زمان حداکثری برای انتظار خواهد بود. اگر از یک Duration استفاده کنیم، این کار ارسال آن به trpl::sleep را آسان می‌کند.
  • باید یک Result بازگرداند. اگر آینده با موفقیت کامل شود، Result شامل Ok با مقدار تولیدشده توسط آینده خواهد بود. اگر زمان محدودیت زودتر سپری شود، Result شامل Err با مدت‌زمانی که زمان محدودیت برای آن منتظر ماند خواهد بود.

فهرست 17-28 این اعلان را نشان می‌دهد.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{future::Future, time::Duration};

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}
Listing 17-28: تعریف امضای timeout

این اهداف ما برای نوع‌ها را برآورده می‌کند. حالا بیایید به رفتاری که نیاز داریم فکر کنیم: می‌خواهیم آینده ارسال‌شده به آن را در برابر مدت‌زمان محدودیت مسابقه دهیم. می‌توانیم از trpl::sleep برای ساختن یک آینده تایمر از مدت‌زمان استفاده کنیم و از trpl::race برای اجرای آن تایمر با آینده‌ای که کاربر ارسال می‌کند استفاده کنیم.

ما همچنین می‌دانیم که race منصفانه نیست و آرگومان‌ها را به ترتیب ارسال‌شده poll می‌کند. بنابراین، ابتدا future_to_try را به race ارسال می‌کنیم تا حتی اگر max_time مدت زمان بسیار کوتاهی باشد، فرصتی برای تکمیل شدن داشته باشد. اگر future_to_try زودتر تمام شود، race مقدار Left را با خروجی future_to_try بازمی‌گرداند. اگر timer زودتر تمام شود، race مقدار Right را با خروجی () تایمر بازمی‌گرداند.

در لیست ۱۷-۲۹، نتیجه انتظار برای trpl::race را match می‌کنیم.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{future::Future, time::Duration};

use trpl::Either;

// --snip--

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::race(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}
Listing 17-29: تعریف timeout با استفاده از race و sleep

اگر future_to_try موفق شود و مقدار Left(output) دریافت کنیم، مقدار Ok(output) را بازمی‌گردانیم. اگر به جای آن تایمر خواب منقضی شود و مقدار Right(()) دریافت کنیم، () را با _ نادیده گرفته و به جای آن مقدار Err(max_time) را بازمی‌گردانیم.

با این کار، یک timeout عملیاتی داریم که از دو ابزار کمکی async دیگر ساخته شده است. اگر کد خود را اجرا کنیم، پس از انقضای timeout، حالت شکست را چاپ خواهد کرد:

Failed after 2 seconds

از آنجا که futures می‌توانند با دیگر futures ترکیب شوند، می‌توانید ابزارهای بسیار قدرتمندی با استفاده از بلوک‌های سازنده کوچک‌تر async بسازید. برای مثال، می‌توانید از همین رویکرد برای ترکیب timeoutها با retries استفاده کنید و به نوبه خود از آن‌ها با عملیاتی مانند تماس‌های شبکه (یکی از مثال‌های ابتدای فصل) استفاده کنید.

در عمل، معمولاً مستقیماً با async و await کار می‌کنید و به طور ثانویه از توابع و ماکروهایی مانند join، join_all، race و غیره استفاده می‌کنید. فقط گاهی نیاز خواهید داشت از pin برای استفاده از futures با آن APIها استفاده کنید.

اکنون روش‌های متعددی برای کار با چندین future به طور همزمان دیده‌ایم. در ادامه، بررسی خواهیم کرد که چگونه می‌توانیم با چندین future به صورت متوالی در طول زمان با streams کار کنیم. با این حال، در ابتدا ممکن است بخواهید به چند نکته دیگر توجه کنید:

  • ما از یک Vec همراه با join_all استفاده کردیم تا منتظر بمانیم تمام futures در یک گروه به پایان برسند. چگونه می‌توانید از یک Vec برای پردازش یک گروه از futures به صورت متوالی استفاده کنید؟ معاوضه‌های انجام این کار چیست؟

  • به نوع futures::stream::FuturesUnordered از crate futures نگاهی بیندازید. استفاده از آن چگونه می‌تواند با استفاده از یک Vec متفاوت باشد؟ (نگران این نباشید که این نوع از بخش stream crate آمده است؛ با هر مجموعه‌ای از futures به خوبی کار می‌کند.)