Applying Concurrency with Async

در این بخش، async را به برخی از همان چالش‌های همزمانی که با نخ‌ها در فصل 16 انجام دادیم اعمال می‌کنیم. از آنجا که قبلاً درباره بسیاری از ایده‌های کلیدی در آنجا صحبت کرده‌ایم، در این بخش تمرکز بر تفاوت‌های بین نخ‌ها و آینده‌ها (futures) خواهیم داشت.

در بسیاری از موارد، APIها برای کار با همزمانی (concurrency) با استفاده از async بسیار شبیه به APIهایی هستند که برای استفاده از Threadها استفاده می‌شوند. در موارد دیگر، این APIها کاملاً متفاوت هستند. حتی زمانی که APIها بین Threadها و async شبیه به نظر می‌رسند، اغلب رفتار متفاوتی دارند—و تقریباً همیشه ویژگی‌های عملکردی متفاوتی دارند.

ایجاد یک Task جدید با spawn_task

اولین عملیاتی که در ایجاد یک Thread جدید با Spawn انجام دادیم، شمارش افزایشی در دو Thread جداگانه بود. بیایید همان کار را با استفاده از async انجام دهیم. crate trpl یک تابع spawn_task فراهم می‌کند که بسیار شبیه به API thread::spawn است، و یک تابع sleep که نسخه async از API thread::sleep است. می‌توانیم از این دو با هم استفاده کنیم تا مثال شمارش را پیاده‌سازی کنیم، همان‌طور که در لیست ۱۷-۶ نشان داده شده است.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}
Listing 17-6: ایجاد یک Task جدید برای چاپ یک چیز در حالی که Task اصلی چیز دیگری را چاپ می‌کند

به‌عنوان نقطه شروع، تابع main خود را با استفاده از trpl::run تنظیم می‌کنیم تا تابع سطح بالای ما بتواند async باشد.

نکته: از این نقطه به بعد در فصل، هر مثال این کد بسته‌بندی یکسان را با trpl::run در main شامل خواهد شد، بنابراین اغلب آن را مانند main نادیده می‌گیریم. فراموش نکنید که آن را در کد خود بگنجانید!

سپس دو حلقه درون آن بلوک می‌نویسیم که هر کدام شامل یک فراخوانی به trpl::sleep هستند، که قبل از ارسال پیام بعدی به مدت نیم ثانیه (۵۰۰ میلی‌ثانیه) منتظر می‌مانند. یکی از حلقه‌ها را در بدنه یک trpl::spawn_task قرار می‌دهیم و دیگری را در یک حلقه for در سطح بالا. همچنین پس از فراخوانی‌های sleep یک await اضافه می‌کنیم.

این کد رفتاری مشابه با پیاده‌سازی مبتنی بر Thread دارد—از جمله اینکه ممکن است پیام‌ها را در ترتیبی متفاوت در ترمینال خود هنگام اجرا مشاهده کنید:s

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

این نسخه به محض اینکه حلقه for در بدنه بلوک async اصلی به پایان می‌رسد، متوقف می‌شود، زیرا taskی که توسط spawn_task ایجاد شده است با پایان یافتن تابع main متوقف می‌شود. اگر بخواهید تا اتمام کامل task اجرا شود، باید از یک handle join استفاده کنید تا منتظر بمانید اولین task به پایان برسد. با Threadها، از متد join برای “مسدود کردن” تا زمانی که Thread اجرا می‌شد، استفاده می‌کردیم. در لیست ۱۷-۷، می‌توانیم از await برای انجام همین کار استفاده کنیم، زیرا handle task خودش یک future است. نوع Output آن یک Result است، بنابراین پس از منتظر ماندن آن را unwrap می‌کنیم.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}
Listing 17-7: استفاده از await با یک handle الحاقی برای اجرای تسک تا تکمیل

نسخه به‌روزرسانی‌شده تا زمانی که هر دو حلقه تمام شوند اجرا می‌شود.

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

تا اینجا، به نظر می‌رسد async و نخ‌ها نتایج اصلی یکسانی به ما می‌دهند، فقط با سینتکس متفاوت: استفاده از await به جای فراخوانی join روی handle الحاقی و انتظار برای فراخوانی‌های sleep.

تفاوت بزرگ‌تر این است که نیازی به ایجاد یک نخ سیستم‌عامل جداگانه برای این کار نداشتیم. در واقع، حتی نیازی به ایجاد یک تسک هم در اینجا نداریم. زیرا بلوک‌های async به آینده‌های ناشناس کامپایل می‌شوند، می‌توانیم هر حلقه را در یک بلوک async قرار دهیم و اجازه دهیم runtime هر دو را با استفاده از تابع trpl::join تا تکمیل اجرا کند.

در بخش انتظار برای اتمام تمام Threadها با استفاده از Handles join، نشان دادیم که چگونه می‌توان از متد join در نوع JoinHandle که هنگام فراخوانی std::thread::spawn بازگردانده می‌شود، استفاده کرد. تابع trpl::join مشابه است، اما برای futures طراحی شده است. وقتی دو future به آن می‌دهید، یک future جدید ایجاد می‌کند که خروجی آن یک tuple شامل خروجی هر یک از futureهایی است که به آن ارسال کرده‌اید، به شرطی که هر دو کامل شوند. بنابراین، در لیست ۱۷-۸، از trpl::join استفاده می‌کنیم تا منتظر بمانیم fut1 و fut2 به پایان برسند. ما نه برای fut1 و fut2، بلکه برای future جدیدی که توسط trpl::join تولید می‌شود، منتظر می‌مانیم. خروجی را نادیده می‌گیریم، زیرا فقط یک tuple شامل دو مقدار unit است.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
Listing 17-8: استفاده از trpl::join برای منتظر ماندن دو آینده ناشناس

وقتی این کد را اجرا می‌کنیم، می‌بینیم هر دو futures تا تکمیل اجرا می‌شوند:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

اکنون، هر بار دقیقاً همان ترتیب را مشاهده خواهید کرد، که بسیار متفاوت از چیزی است که با Threadها دیدیم. دلیل این امر این است که تابع trpl::join منصفانه است، به این معنی که هر future را به یک اندازه بررسی می‌کند، بین آن‌ها تناوب می‌گذارد و هرگز اجازه نمی‌دهد یکی از آن‌ها جلو بیفتد اگر دیگری آماده باشد. با Threadها، سیستم‌عامل تصمیم می‌گیرد که کدام Thread بررسی شود و چه مدت به آن اجازه اجرا بدهد. با Rust async، runtime تصمیم می‌گیرد که کدام task بررسی شود. (در عمل، جزئیات پیچیده می‌شوند زیرا یک runtime async ممکن است از Threadهای سیستم‌عامل در پشت صحنه به‌عنوان بخشی از نحوه مدیریت همزمانی استفاده کند، بنابراین تضمین منصفانه بودن می‌تواند برای runtime بیشتر کار ببرد—اما همچنان ممکن است!) runtimeها نیازی به تضمین منصفانه بودن برای هر عملیات خاصی ندارند، و اغلب APIهای مختلفی ارائه می‌دهند که به شما اجازه می‌دهند انتخاب کنید آیا می‌خواهید منصفانه بودن را اعمال کنید یا خیر.

برخی از این تغییرات در انتظار برای futures را امتحان کنید و ببینید چه می‌کنند:

  • بلوک async را از اطراف یکی یا هر دو حلقه حذف کنید.
  • هر بلوک async را بلافاصله پس از تعریف آن منتظر بمانید.
  • فقط حلقه اول را در یک بلوک async قرار دهید و آینده حاصل را پس از بدنه حلقه دوم منتظر بمانید.

برای یک چالش اضافی، ببینید آیا می‌توانید پیش از اجرای کد پیش‌بینی کنید که خروجی چه خواهد بود!

شمارش افزایشی در دو Task با استفاده از ارسال پیام

اشتراک داده‌ها بین futures نیز آشنا خواهد بود: دوباره از ارسال پیام استفاده خواهیم کرد، اما این بار با نسخه‌های async از انواع و توابع. ما مسیری کمی متفاوت از استفاده از ارسال پیام برای انتقال داده‌ها بین Threadها خواهیم پیمود تا برخی از تفاوت‌های کلیدی بین همزمانی مبتنی بر Thread و همزمانی مبتنی بر futures را نشان دهیم. در لیست ۱۷-۹، فقط با یک بلوک async شروع می‌کنیم—و نه ایجاد یک task جداگانه، همان‌طور که یک Thread جداگانه ایجاد کردیم.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("Got: {received}");
    });
}
Listing 17-9: ایجاد یک کانال async و اختصاص دو نیمه به tx و rx

اینجا، از trpl::channel استفاده می‌کنیم، نسخه async از API کانال چندتولیدی، یک‌مصرفی که در فصل 16 با نخ‌ها استفاده کردیم. نسخه async از API فقط کمی با نسخه مبتنی بر نخ متفاوت است: به جای استفاده از یک گیرنده غیرقابل‌تغییر (immutable)، از یک گیرنده قابل‌تغییر (mutable) rx استفاده می‌کند، و متد recv آن یک آینده تولید می‌کند که باید منتظر آن بمانیم، به جای تولید مقدار به‌طور مستقیم. اکنون می‌توانیم پیام‌ها را از فرستنده به گیرنده ارسال کنیم. توجه کنید که نیازی به ایجاد یک نخ جداگانه یا حتی یک تسک نداریم؛ فقط باید فراخوانی rx.recv را منتظر بمانیم.

متد همگام Receiver::recv در std::mpsc::channel تا زمانی که پیامی دریافت شود مسدود می‌شود. متد trpl::Receiver::recv این کار را نمی‌کند، زیرا async است. به جای مسدود شدن، کنترل را به runtime بازمی‌گرداند تا زمانی که یا پیامی دریافت شود یا سمت ارسال کانال بسته شود. در مقابل، ما فراخوانی send را منتظر نمی‌مانیم، زیرا مسدود نمی‌شود. نیازی به این کار ندارد، زیرا کانالی که پیام را به آن ارسال می‌کنیم بدون حد است.

نکته: از آنجا که تمام این کد async در یک بلوک async درون یک فراخوانی trpl::run اجرا می‌شود، همه چیز درون آن می‌تواند از مسدود شدن اجتناب کند. با این حال، کد خارج از آن روی بازگشت تابع run مسدود می‌شود. این همان هدف اصلی تابع trpl::run است: به شما اجازه می‌دهد انتخاب کنید که کجا روی مجموعه‌ای از کد async مسدود شوید و بنابراین کجا بین کدهای sync و async انتقال دهید. در بیشتر runtimeهای async، run در واقع به همین دلیل block_on نامیده می‌شود.

دو نکته در مورد این مثال توجه کنید. اول، پیام بلافاصله خواهد رسید. دوم، اگرچه ما اینجا از یک future استفاده می‌کنیم، هنوز هم هیچ همزمانی (concurrency) وجود ندارد. همه چیز در این لیست به ترتیب انجام می‌شود، درست مانند اینکه هیچ futureای در کار نباشد.

بیایید به قسمت اول بپردازیم، با ارسال یک سری پیام و خوابیدن بین آن‌ها، همان‌طور که در لیست ۱۷-۱۰ نشان داده شده است.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}
Listing 17-10: ارسال و دریافت چندین پیام از طریق کانال async و استفاده از await بین هر پیام

علاوه بر ارسال پیام‌ها، باید آن‌ها را دریافت کنیم. در این مورد، چون می‌دانیم چند پیام قرار است دریافت شوند، می‌توانستیم این کار را به‌صورت دستی با چهار بار فراخوانی rx.recv().await انجام دهیم. اما در دنیای واقعی، معمولاً در انتظار یک تعداد نامعلوم از پیام‌ها خواهیم بود، بنابراین نیاز داریم تا زمانی که مشخص کنیم پیام دیگری وجود ندارد، به انتظار ادامه دهیم.

در لیست ۱۶-۱۰، از یک حلقه for برای پردازش تمام آیتم‌های دریافت‌شده از یک کانال همزمان استفاده کردیم. با این حال، Rust هنوز راهی برای نوشتن یک حلقه for روی یک سری آیتم ناهمزمان ندارد، بنابراین باید از حلقه‌ای استفاده کنیم که قبلاً ندیده‌ایم: حلقه شرطی while let. این حلقه نسخه حلقه‌ای از ساختار if let است که در بخش کنترل جریان مختصر با if let و let else دیدیم. این حلقه تا زمانی که الگوی مشخص‌شده آن همچنان با مقدار مطابقت داشته باشد، به اجرا ادامه می‌دهد.

فراخوانی rx.recv یک future تولید می‌کند که منتظر آن می‌مانیم. runtime تا زمانی که future آماده شود، آن را متوقف می‌کند. وقتی پیامی برسد، future به Some(message) حل می‌شود، به ازای هر باری که پیام برسد. وقتی کانال بسته شود، صرف‌نظر از اینکه آیا پیام‌هایی رسیده‌اند یا خیر، future به None حل می‌شود تا نشان دهد دیگر مقادیری وجود ندارد و بنابراین باید polling را متوقف کنیم—یعنی منتظر ماندن را متوقف کنیم.

حلقه while let همه این‌ها را کنار هم قرار می‌دهد. اگر نتیجه فراخوانی rx.recv().await برابر با Some(message) باشد، به پیام دسترسی پیدا می‌کنیم و می‌توانیم از آن در بدنه حلقه استفاده کنیم، همانطور که با if let می‌توانستیم. اگر نتیجه None باشد، حلقه متوقف می‌شود. هر بار که حلقه کامل می‌شود، به نقطه انتظار بازمی‌گردد، بنابراین runtime دوباره آن را متوقف می‌کند تا زمانی که پیام دیگری برسد.

کد اکنون تمام پیام‌ها را با موفقیت ارسال و دریافت می‌کند. متأسفانه، هنوز چند مشکل وجود دارد. برای یک مورد، پیام‌ها با فواصل نیم‌ثانیه‌ای نمی‌رسند. همه آن‌ها به‌یک‌باره و ۲ ثانیه (۲۰۰۰ میلی‌ثانیه) پس از شروع برنامه می‌رسند. برای مورد دیگر، این برنامه هرگز به‌طور خودکار پایان نمی‌یابد! در عوض، برای همیشه منتظر پیام‌های جدید می‌ماند. برای متوقف کردن آن باید از ctrl-c استفاده کنید.

بیایید با بررسی دلیل اینکه چرا پیام‌ها پس از تأخیر کامل به‌یک‌باره می‌آیند، شروع کنیم، به‌جای اینکه با تأخیر بین هرکدام ظاهر شوند. در یک بلوک async خاص، ترتیب ظاهر شدن کلمات کلیدی await در کد، همان ترتیبی است که هنگام اجرای برنامه اجرا می‌شوند.

در فهرست 17-10 فقط یک بلوک async وجود دارد، بنابراین همه چیز در آن به‌صورت خطی اجرا می‌شود. هنوز هم هیچ همزمانی وجود ندارد. تمام فراخوانی‌های tx.send انجام می‌شوند، در میان تمام فراخوانی‌های trpl::sleep و نقاط انتظار مرتبط با آن‌ها. فقط پس از آن، حلقه while let به نقاط انتظار روی فراخوانی‌های recv می‌رسد.

برای به دست آوردن رفتار مورد نظر، که در آن تأخیر خواب بین هر پیام رخ می‌دهد، باید عملیات‌های tx و rx را در بلوک‌های async جداگانه قرار دهیم، همان‌طور که در لیست ۱۷-۱۱ نشان داده شده است. سپس runtime می‌تواند هر یک از آن‌ها را جداگانه با استفاده از trpl::join اجرا کند، دقیقاً مانند مثال شمارش. بار دیگر، منتظر نتیجه فراخوانی trpl::join می‌مانیم، نه futures فردی. اگر به صورت ترتیبی برای futures فردی منتظر می‌ماندیم، دوباره به جریان ترتیبی بازمی‌گشتیم—دقیقاً چیزی که تلاش می‌کنیم انجام ندهیم.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-11: جدا کردن send و recv در بلوک‌های async جداگانه و منتظر ماندن برای آینده‌های این بلوک‌ها

با کد به‌روزرسانی‌شده در لیست ۱۷-۱۱، پیام‌ها با فواصل ۵۰۰ میلی‌ثانیه چاپ می‌شوند، به‌جای اینکه همه با عجله پس از ۲ ثانیه ظاهر شوند.

برنامه هنوز هم هرگز خارج نمی‌شود، به دلیل نحوه تعامل حلقه while let با trpl::join:

  • Future بازگردانده‌شده از trpl::join تنها زمانی تکمیل می‌شود که هر دو future ارسال‌شده به آن تکمیل شده باشند.
  • Future مربوط به tx زمانی تکمیل می‌شود که پس از ارسال آخرین پیام در vals خوابیدن آن به پایان برسد.
  • Future مربوط به rx تا زمانی که حلقه while let به پایان نرسد تکمیل نخواهد شد.
  • حلقه while let تا زمانی که منتظر rx.recv باشد و مقدار None تولید شود، پایان نمی‌یابد.
  • منتظر شدن برای rx.recv تنها زمانی مقدار None بازمی‌گرداند که طرف دیگر کانال بسته شود.
  • کانال تنها در صورتی بسته می‌شود که rx.close را فراخوانی کنیم یا طرف فرستنده، یعنی tx، حذف شود.
  • ما هیچ‌جا rx.close را فراخوانی نمی‌کنیم، و tx تا زمانی که بیرونی‌ترین بلوک async ارسال‌شده به trpl::run به پایان نرسد، حذف نمی‌شود.
  • این بلوک نمی‌تواند به پایان برسد زیرا منتظر تکمیل شدن trpl::join است، که ما را دوباره به بالای این لیست بازمی‌گرداند.

ما می‌توانیم به‌صورت دستی با فراخوانی rx.close کانال را ببندیم، اما این کار چندان منطقی نیست. توقف پس از پردازش تعداد دلخواهی از پیام‌ها باعث می‌شود برنامه خاموش شود، اما ممکن است پیام‌ها را از دست بدهیم. ما به راه دیگری نیاز داریم تا مطمئن شویم که tx قبل از پایان تابع حذف می‌شود.

در حال حاضر، بلوک async که پیام‌ها را ارسال می‌کند فقط tx را قرض می‌گیرد زیرا ارسال پیام نیاز به مالکیت ندارد، اما اگر می‌توانستیم tx را به داخل آن بلوک async منتقل کنیم، پس از پایان آن بلوک حذف می‌شد. در بخش فصل ۱۳ گرفتن مراجع یا جابه‌جایی مالکیت یاد گرفتید چگونه از کلمه کلیدی move با closures استفاده کنید، و همان‌طور که در بخش فصل ۱۶ استفاده از closures move با Threadها بحث شد، اغلب هنگام کار با Threadها نیاز داریم داده‌ها را به داخل closures منتقل کنیم. همان دینامیک‌های اساسی برای بلوک‌های async اعمال می‌شود، بنابراین کلمه کلیدی move با بلوک‌های async همان‌طور کار می‌کند که با closures کار می‌کند.

در لیست ۱۷-۱۲، بلوک مورد استفاده برای ارسال پیام‌ها را از async به async move تغییر می‌دهیم. وقتی این نسخه از کد را اجرا می‌کنیم، برنامه پس از ارسال و دریافت آخرین پیام به‌طور مرتب خاتمه می‌یابد.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-12: نسخه بازبینی‌شده کد از لیست ۱۷-۱۱ که به‌درستی پس از اتمام خاتمه می‌یابد

این کانال async همچنین یک کانال چند-تولیدی (multiple-producer) است، بنابراین اگر بخواهیم پیام‌ها را از چندین future ارسال کنیم، می‌توانیم clone را روی tx فراخوانی کنیم، همان‌طور که در لیست ۱۷-۱۳ نشان داده شده است.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}
Listing 17-13: استفاده از تولیدکنندگان متعدد با بلوک‌های async

ابتدا، tx را clone کرده و tx1 را خارج از بلوک async اول ایجاد می‌کنیم. tx1 را همانند قبل با tx به داخل آن بلوک منتقل می‌کنیم. سپس، در ادامه، tx اصلی را به یک بلوک جدید async منتقل می‌کنیم، جایی که پیام‌های بیشتری با یک تأخیر کمی کندتر ارسال می‌کنیم. ما این بلوک async جدید را بعد از بلوک async برای دریافت پیام‌ها قرار می‌دهیم، اما می‌توانستیم به همان اندازه آن را قبل از آن قرار دهیم. نکته کلیدی ترتیب منتظر ماندن برای futures است، نه ترتیب ایجاد آن‌ها.

هر دو بلوک async برای ارسال پیام‌ها باید بلوک‌های async move باشند تا tx و tx1 هر دو پس از پایان آن بلوک‌ها حذف شوند. در غیر این صورت، دوباره به همان حلقه بی‌نهایت اولیه بازمی‌گردیم. در نهایت، از trpl::join به trpl::join3 تغییر می‌دهیم تا future اضافی را مدیریت کنیم.

اکنون تمام پیام‌های هر دو future ارسال را می‌بینیم، و چون futures ارسال از تأخیرهای کمی متفاوت پس از ارسال استفاده می‌کنند، پیام‌ها نیز در این فواصل مختلف دریافت می‌شوند.

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

این یک شروع خوب است، اما ما را به تعداد محدودی از futures محدود می‌کند: دو عدد با join یا سه عدد با join3. بیایید ببینیم چگونه می‌توانیم با تعداد بیشتری از futures کار کنیم.