Applying Concurrency with Async
در این بخش، async را به برخی از همان چالشهای همزمانی که با نخها در فصل 16 انجام دادیم اعمال میکنیم. از آنجا که قبلاً درباره بسیاری از ایدههای کلیدی در آنجا صحبت کردهایم، در این بخش تمرکز بر تفاوتهای بین نخها و آیندهها (futures) خواهیم داشت.
در بسیاری از موارد، APIها برای کار با همزمانی (concurrency) با استفاده از async بسیار شبیه به APIهایی هستند که برای استفاده از Threadها استفاده میشوند. در موارد دیگر، این APIها کاملاً متفاوت هستند. حتی زمانی که APIها بین Threadها و async شبیه به نظر میرسند، اغلب رفتار متفاوتی دارند—و تقریباً همیشه ویژگیهای عملکردی متفاوتی دارند.
ایجاد یک Task جدید با spawn_task
اولین عملیاتی که در ایجاد یک Thread جدید با Spawn انجام دادیم، شمارش افزایشی در دو Thread جداگانه بود. بیایید همان کار را با استفاده از async انجام دهیم. crate trpl
یک تابع spawn_task
فراهم میکند که بسیار شبیه به API thread::spawn
است، و یک تابع sleep
که نسخه async از API thread::sleep
است. میتوانیم از این دو با هم استفاده کنیم تا مثال شمارش را پیادهسازی کنیم، همانطور که در لیست ۱۷-۶ نشان داده شده است.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }); }
بهعنوان نقطه شروع، تابع main
خود را با استفاده از trpl::run
تنظیم میکنیم تا تابع سطح بالای ما بتواند async باشد.
نکته: از این نقطه به بعد در فصل، هر مثال این کد بستهبندی یکسان را با
trpl::run
درmain
شامل خواهد شد، بنابراین اغلب آن را مانندmain
نادیده میگیریم. فراموش نکنید که آن را در کد خود بگنجانید!
سپس دو حلقه درون آن بلوک مینویسیم که هر کدام شامل یک فراخوانی به trpl::sleep هستند، که قبل از ارسال پیام بعدی به مدت نیم ثانیه (۵۰۰ میلیثانیه) منتظر میمانند. یکی از حلقهها را در بدنه یک trpl::spawn_task قرار میدهیم و دیگری را در یک حلقه for در سطح بالا. همچنین پس از فراخوانیهای sleep یک await اضافه میکنیم.
این کد رفتاری مشابه با پیادهسازی مبتنی بر Thread دارد—از جمله اینکه ممکن است پیامها را در ترتیبی متفاوت در ترمینال خود هنگام اجرا مشاهده کنید:s
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
این نسخه به محض اینکه حلقه for
در بدنه بلوک async اصلی به پایان میرسد، متوقف میشود، زیرا taskی که توسط spawn_task
ایجاد شده است با پایان یافتن تابع main
متوقف میشود. اگر بخواهید تا اتمام کامل task اجرا شود، باید از یک handle join استفاده کنید تا منتظر بمانید اولین task به پایان برسد. با Threadها، از متد join
برای “مسدود کردن” تا زمانی که Thread اجرا میشد، استفاده میکردیم. در لیست ۱۷-۷، میتوانیم از await
برای انجام همین کار استفاده کنیم، زیرا handle task خودش یک future است. نوع Output
آن یک Result
است، بنابراین پس از منتظر ماندن آن را unwrap میکنیم.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let handle = trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } handle.await.unwrap(); }); }
await
با یک handle الحاقی برای اجرای تسک تا تکمیلنسخه بهروزرسانیشده تا زمانی که هر دو حلقه تمام شوند اجرا میشود.
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
تا اینجا، به نظر میرسد async و نخها نتایج اصلی یکسانی به ما میدهند، فقط با سینتکس متفاوت: استفاده از await
به جای فراخوانی join
روی handle الحاقی و انتظار برای فراخوانیهای sleep
.
تفاوت بزرگتر این است که نیازی به ایجاد یک نخ سیستمعامل جداگانه برای این کار نداشتیم. در واقع، حتی نیازی به ایجاد یک تسک هم در اینجا نداریم. زیرا بلوکهای async به آیندههای ناشناس کامپایل میشوند، میتوانیم هر حلقه را در یک بلوک async قرار دهیم و اجازه دهیم runtime هر دو را با استفاده از تابع trpl::join
تا تکمیل اجرا کند.
در بخش انتظار برای اتمام تمام Threadها با استفاده از Handles join
، نشان دادیم که چگونه میتوان از متد join
در نوع JoinHandle
که هنگام فراخوانی std::thread::spawn
بازگردانده میشود، استفاده کرد. تابع trpl::join
مشابه است، اما برای futures طراحی شده است. وقتی دو future به آن میدهید، یک future جدید ایجاد میکند که خروجی آن یک tuple شامل خروجی هر یک از futureهایی است که به آن ارسال کردهاید، به شرطی که هر دو کامل شوند. بنابراین، در لیست ۱۷-۸، از trpl::join
استفاده میکنیم تا منتظر بمانیم fut1
و fut2
به پایان برسند. ما نه برای fut1
و fut2
، بلکه برای future جدیدی که توسط trpl::join
تولید میشود، منتظر میمانیم. خروجی را نادیده میگیریم، زیرا فقط یک tuple شامل دو مقدار unit است.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let fut1 = async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }; let fut2 = async { for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }; trpl::join(fut1, fut2).await; }); }
trpl::join
برای منتظر ماندن دو آینده ناشناسوقتی این کد را اجرا میکنیم، میبینیم هر دو futures تا تکمیل اجرا میشوند:
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
اکنون، هر بار دقیقاً همان ترتیب را مشاهده خواهید کرد، که بسیار متفاوت از چیزی است که با Threadها دیدیم. دلیل این امر این است که تابع trpl::join
منصفانه است، به این معنی که هر future را به یک اندازه بررسی میکند، بین آنها تناوب میگذارد و هرگز اجازه نمیدهد یکی از آنها جلو بیفتد اگر دیگری آماده باشد. با Threadها، سیستمعامل تصمیم میگیرد که کدام Thread بررسی شود و چه مدت به آن اجازه اجرا بدهد. با Rust async، runtime تصمیم میگیرد که کدام task بررسی شود. (در عمل، جزئیات پیچیده میشوند زیرا یک runtime async ممکن است از Threadهای سیستمعامل در پشت صحنه بهعنوان بخشی از نحوه مدیریت همزمانی استفاده کند، بنابراین تضمین منصفانه بودن میتواند برای runtime بیشتر کار ببرد—اما همچنان ممکن است!) runtimeها نیازی به تضمین منصفانه بودن برای هر عملیات خاصی ندارند، و اغلب APIهای مختلفی ارائه میدهند که به شما اجازه میدهند انتخاب کنید آیا میخواهید منصفانه بودن را اعمال کنید یا خیر.
برخی از این تغییرات در انتظار برای futures را امتحان کنید و ببینید چه میکنند:
- بلوک async را از اطراف یکی یا هر دو حلقه حذف کنید.
- هر بلوک async را بلافاصله پس از تعریف آن منتظر بمانید.
- فقط حلقه اول را در یک بلوک async قرار دهید و آینده حاصل را پس از بدنه حلقه دوم منتظر بمانید.
برای یک چالش اضافی، ببینید آیا میتوانید پیش از اجرای کد پیشبینی کنید که خروجی چه خواهد بود!
شمارش افزایشی در دو Task با استفاده از ارسال پیام
اشتراک دادهها بین futures نیز آشنا خواهد بود: دوباره از ارسال پیام استفاده خواهیم کرد، اما این بار با نسخههای async از انواع و توابع. ما مسیری کمی متفاوت از استفاده از ارسال پیام برای انتقال دادهها بین Threadها خواهیم پیمود تا برخی از تفاوتهای کلیدی بین همزمانی مبتنی بر Thread و همزمانی مبتنی بر futures را نشان دهیم. در لیست ۱۷-۹، فقط با یک بلوک async شروع میکنیم—و نه ایجاد یک task جداگانه، همانطور که یک Thread جداگانه ایجاد کردیم.
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let val = String::from("hi"); tx.send(val).unwrap(); let received = rx.recv().await.unwrap(); println!("Got: {received}"); }); }
tx
و rx
اینجا، از trpl::channel
استفاده میکنیم، نسخه async از API کانال چندتولیدی، یکمصرفی که در فصل 16 با نخها استفاده کردیم. نسخه async از API فقط کمی با نسخه مبتنی بر نخ متفاوت است: به جای استفاده از یک گیرنده غیرقابلتغییر (immutable)، از یک گیرنده قابلتغییر (mutable) rx
استفاده میکند، و متد recv
آن یک آینده تولید میکند که باید منتظر آن بمانیم، به جای تولید مقدار بهطور مستقیم. اکنون میتوانیم پیامها را از فرستنده به گیرنده ارسال کنیم. توجه کنید که نیازی به ایجاد یک نخ جداگانه یا حتی یک تسک نداریم؛ فقط باید فراخوانی rx.recv
را منتظر بمانیم.
متد همگام Receiver::recv
در std::mpsc::channel
تا زمانی که پیامی دریافت شود مسدود میشود. متد trpl::Receiver::recv
این کار را نمیکند، زیرا async است. به جای مسدود شدن، کنترل را به runtime بازمیگرداند تا زمانی که یا پیامی دریافت شود یا سمت ارسال کانال بسته شود. در مقابل، ما فراخوانی send
را منتظر نمیمانیم، زیرا مسدود نمیشود. نیازی به این کار ندارد، زیرا کانالی که پیام را به آن ارسال میکنیم بدون حد است.
نکته: از آنجا که تمام این کد async در یک بلوک async درون یک فراخوانی
trpl::run
اجرا میشود، همه چیز درون آن میتواند از مسدود شدن اجتناب کند. با این حال، کد خارج از آن روی بازگشت تابعrun
مسدود میشود. این همان هدف اصلی تابعtrpl::run
است: به شما اجازه میدهد انتخاب کنید که کجا روی مجموعهای از کد async مسدود شوید و بنابراین کجا بین کدهای sync و async انتقال دهید. در بیشتر runtimeهای async،run
در واقع به همین دلیلblock_on
نامیده میشود.
دو نکته در مورد این مثال توجه کنید. اول، پیام بلافاصله خواهد رسید. دوم، اگرچه ما اینجا از یک future استفاده میکنیم، هنوز هم هیچ همزمانی (concurrency) وجود ندارد. همه چیز در این لیست به ترتیب انجام میشود، درست مانند اینکه هیچ futureای در کار نباشد.
بیایید به قسمت اول بپردازیم، با ارسال یک سری پیام و خوابیدن بین آنها، همانطور که در لیست ۱۷-۱۰ نشان داده شده است.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); }
await
بین هر پیامعلاوه بر ارسال پیامها، باید آنها را دریافت کنیم. در این مورد، چون میدانیم چند پیام قرار است دریافت شوند، میتوانستیم این کار را بهصورت دستی با چهار بار فراخوانی rx.recv().await
انجام دهیم. اما در دنیای واقعی، معمولاً در انتظار یک تعداد نامعلوم از پیامها خواهیم بود، بنابراین نیاز داریم تا زمانی که مشخص کنیم پیام دیگری وجود ندارد، به انتظار ادامه دهیم.
در لیست ۱۶-۱۰، از یک حلقه for
برای پردازش تمام آیتمهای دریافتشده از یک کانال همزمان استفاده کردیم. با این حال، Rust هنوز راهی برای نوشتن یک حلقه for
روی یک سری آیتم ناهمزمان ندارد، بنابراین باید از حلقهای استفاده کنیم که قبلاً ندیدهایم: حلقه شرطی while let
. این حلقه نسخه حلقهای از ساختار if let
است که در بخش کنترل جریان مختصر با if let
و let else
دیدیم. این حلقه تا زمانی که الگوی مشخصشده آن همچنان با مقدار مطابقت داشته باشد، به اجرا ادامه میدهد.
فراخوانی rx.recv
یک future تولید میکند که منتظر آن میمانیم. runtime تا زمانی که future آماده شود، آن را متوقف میکند. وقتی پیامی برسد، future به Some(message)
حل میشود، به ازای هر باری که پیام برسد. وقتی کانال بسته شود، صرفنظر از اینکه آیا پیامهایی رسیدهاند یا خیر، future به None
حل میشود تا نشان دهد دیگر مقادیری وجود ندارد و بنابراین باید polling را متوقف کنیم—یعنی منتظر ماندن را متوقف کنیم.
حلقه while let
همه اینها را کنار هم قرار میدهد. اگر نتیجه فراخوانی rx.recv().await
برابر با Some(message)
باشد، به پیام دسترسی پیدا میکنیم و میتوانیم از آن در بدنه حلقه استفاده کنیم، همانطور که با if let
میتوانستیم. اگر نتیجه None
باشد، حلقه متوقف میشود. هر بار که حلقه کامل میشود، به نقطه انتظار بازمیگردد، بنابراین runtime دوباره آن را متوقف میکند تا زمانی که پیام دیگری برسد.
کد اکنون تمام پیامها را با موفقیت ارسال و دریافت میکند. متأسفانه، هنوز چند مشکل وجود دارد. برای یک مورد، پیامها با فواصل نیمثانیهای نمیرسند. همه آنها بهیکباره و ۲ ثانیه (۲۰۰۰ میلیثانیه) پس از شروع برنامه میرسند. برای مورد دیگر، این برنامه هرگز بهطور خودکار پایان نمییابد! در عوض، برای همیشه منتظر پیامهای جدید میماند. برای متوقف کردن آن باید از ctrl-c استفاده کنید.
بیایید با بررسی دلیل اینکه چرا پیامها پس از تأخیر کامل بهیکباره میآیند، شروع کنیم، بهجای اینکه با تأخیر بین هرکدام ظاهر شوند. در یک بلوک async خاص، ترتیب ظاهر شدن کلمات کلیدی await
در کد، همان ترتیبی است که هنگام اجرای برنامه اجرا میشوند.
در فهرست 17-10 فقط یک بلوک async وجود دارد، بنابراین همه چیز در آن بهصورت خطی اجرا میشود. هنوز هم هیچ همزمانی وجود ندارد. تمام فراخوانیهای tx.send
انجام میشوند، در میان تمام فراخوانیهای trpl::sleep
و نقاط انتظار مرتبط با آنها. فقط پس از آن، حلقه while let
به نقاط انتظار روی فراخوانیهای recv
میرسد.
برای به دست آوردن رفتار مورد نظر، که در آن تأخیر خواب بین هر پیام رخ میدهد، باید عملیاتهای tx
و rx
را در بلوکهای async جداگانه قرار دهیم، همانطور که در لیست ۱۷-۱۱ نشان داده شده است. سپس runtime میتواند هر یک از آنها را جداگانه با استفاده از trpl::join
اجرا کند، دقیقاً مانند مثال شمارش. بار دیگر، منتظر نتیجه فراخوانی trpl::join
میمانیم، نه futures فردی. اگر به صورت ترتیبی برای futures فردی منتظر میماندیم، دوباره به جریان ترتیبی بازمیگشتیم—دقیقاً چیزی که تلاش میکنیم انجام ندهیم.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
send
و recv
در بلوکهای async
جداگانه و منتظر ماندن برای آیندههای این بلوکهابا کد بهروزرسانیشده در لیست ۱۷-۱۱، پیامها با فواصل ۵۰۰ میلیثانیه چاپ میشوند، بهجای اینکه همه با عجله پس از ۲ ثانیه ظاهر شوند.
برنامه هنوز هم هرگز خارج نمیشود، به دلیل نحوه تعامل حلقه while let
با trpl::join
:
- Future بازگرداندهشده از
trpl::join
تنها زمانی تکمیل میشود که هر دو future ارسالشده به آن تکمیل شده باشند. - Future مربوط به
tx
زمانی تکمیل میشود که پس از ارسال آخرین پیام درvals
خوابیدن آن به پایان برسد. - Future مربوط به
rx
تا زمانی که حلقهwhile let
به پایان نرسد تکمیل نخواهد شد. - حلقه
while let
تا زمانی که منتظرrx.recv
باشد و مقدارNone
تولید شود، پایان نمییابد. - منتظر شدن برای
rx.recv
تنها زمانی مقدارNone
بازمیگرداند که طرف دیگر کانال بسته شود. - کانال تنها در صورتی بسته میشود که
rx.close
را فراخوانی کنیم یا طرف فرستنده، یعنیtx
، حذف شود. - ما هیچجا
rx.close
را فراخوانی نمیکنیم، وtx
تا زمانی که بیرونیترین بلوک async ارسالشده بهtrpl::run
به پایان نرسد، حذف نمیشود. - این بلوک نمیتواند به پایان برسد زیرا منتظر تکمیل شدن
trpl::join
است، که ما را دوباره به بالای این لیست بازمیگرداند.
ما میتوانیم بهصورت دستی با فراخوانی rx.close
کانال را ببندیم، اما این کار چندان منطقی نیست. توقف پس از پردازش تعداد دلخواهی از پیامها باعث میشود برنامه خاموش شود، اما ممکن است پیامها را از دست بدهیم. ما به راه دیگری نیاز داریم تا مطمئن شویم که tx
قبل از پایان تابع حذف میشود.
در حال حاضر، بلوک async که پیامها را ارسال میکند فقط tx
را قرض میگیرد زیرا ارسال پیام نیاز به مالکیت ندارد، اما اگر میتوانستیم tx
را به داخل آن بلوک async منتقل کنیم، پس از پایان آن بلوک حذف میشد. در بخش فصل ۱۳ گرفتن مراجع یا جابهجایی مالکیت یاد گرفتید چگونه از کلمه کلیدی move
با closures استفاده کنید، و همانطور که در بخش فصل ۱۶ استفاده از closures move
با Threadها بحث شد، اغلب هنگام کار با Threadها نیاز داریم دادهها را به داخل closures منتقل کنیم. همان دینامیکهای اساسی برای بلوکهای async اعمال میشود، بنابراین کلمه کلیدی move
با بلوکهای async همانطور کار میکند که با closures کار میکند.
در لیست ۱۷-۱۲، بلوک مورد استفاده برای ارسال پیامها را از async
به async move
تغییر میدهیم. وقتی این نسخه از کد را اجرا میکنیم، برنامه پس از ارسال و دریافت آخرین پیام بهطور مرتب خاتمه مییابد.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
این کانال async همچنین یک کانال چند-تولیدی (multiple-producer) است، بنابراین اگر بخواهیم پیامها را از چندین future ارسال کنیم، میتوانیم clone
را روی tx
فراخوانی کنیم، همانطور که در لیست ۱۷-۱۳ نشان داده شده است.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(1500)).await; } }; trpl::join3(tx1_fut, tx_fut, rx_fut).await; }); }
ابتدا، tx
را clone کرده و tx1
را خارج از بلوک async اول ایجاد میکنیم. tx1
را همانند قبل با tx
به داخل آن بلوک منتقل میکنیم. سپس، در ادامه، tx
اصلی را به یک بلوک جدید async منتقل میکنیم، جایی که پیامهای بیشتری با یک تأخیر کمی کندتر ارسال میکنیم. ما این بلوک async جدید را بعد از بلوک async برای دریافت پیامها قرار میدهیم، اما میتوانستیم به همان اندازه آن را قبل از آن قرار دهیم. نکته کلیدی ترتیب منتظر ماندن برای futures است، نه ترتیب ایجاد آنها.
هر دو بلوک async برای ارسال پیامها باید بلوکهای async move
باشند تا tx
و tx1
هر دو پس از پایان آن بلوکها حذف شوند. در غیر این صورت، دوباره به همان حلقه بینهایت اولیه بازمیگردیم. در نهایت، از trpl::join
به trpl::join3
تغییر میدهیم تا future اضافی را مدیریت کنیم.
اکنون تمام پیامهای هر دو future ارسال را میبینیم، و چون futures ارسال از تأخیرهای کمی متفاوت پس از ارسال استفاده میکنند، پیامها نیز در این فواصل مختلف دریافت میشوند.
received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'
این یک شروع خوب است، اما ما را به تعداد محدودی از futures محدود میکند: دو عدد با join
یا سه عدد با join3
. بیایید ببینیم چگونه میتوانیم با تعداد بیشتری از futures کار کنیم.