Streamها: Futures به صورت متوالی
تا اینجا در این فصل، بیشتر به آیندههای فردی (individual futures) پایبند بودهایم. یک استثنای بزرگ استفاده از کانال async بود. به یاد بیاورید چگونه در ابتدای این فصل در بخش “ارسال پیام” از گیرنده کانال async استفاده کردیم. متد async به نام recv
یک دنباله از آیتمها را در طول زمان تولید میکند. این یک نمونه از یک الگوی کلیتر به نام stream است.
ما در فصل ۱۳ یک دنباله از آیتمها را دیدیم، زمانی که ویژگی Iterator
را در بخش ویژگی Iterator و متد next
بررسی کردیم، اما بین iteratorها و گیرنده کانال async دو تفاوت وجود دارد. تفاوت اول در زمان است: iteratorها همزمان (synchronous) هستند، در حالی که گیرنده کانال async است. تفاوت دوم در API است. هنگام کار مستقیم با Iterator
، ما متد همزمان next
را فراخوانی میکنیم. به طور خاص، با stream trpl::Receiver
، به جای آن، یک متد async به نام recv
را فراخوانی کردیم. در غیر این صورت، این APIها احساس بسیار مشابهی دارند و این شباهت تصادفی نیست. یک stream مانند یک شکل ناهمزمان از iteration است. در حالی که trpl::Receiver
به طور خاص منتظر دریافت پیامها است، API عمومی stream بسیار گستردهتر است: این API آیتم بعدی را همانطور که Iterator
انجام میدهد ارائه میدهد، اما به صورت ناهمزمان.
شباهت بین iteratorها و streamها در Rust به این معناست که ما در واقع میتوانیم از هر iterator یک stream ایجاد کنیم. مانند یک iterator، میتوانیم با فراخوانی متد next
یک stream کار کنیم و سپس خروجی را انتظار بکشیم، همانطور که در لیست ۱۷-۳۰ نشان داده شده است.
extern crate trpl; // required for mdbook test
fn main() {
trpl::run(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
ما با یک آرایه از اعداد شروع میکنیم، آن را به یک iterator تبدیل کرده و سپس متد map
را فراخوانی میکنیم تا تمام مقادیر را دو برابر کنیم. سپس با استفاده از تابع trpl::stream_from_iter
، این iterator را به یک stream تبدیل میکنیم. در ادامه، با استفاده از حلقه while let
، بر روی آیتمهای موجود در stream که به مرور میرسند، حلقه میزنیم.
متأسفانه، وقتی سعی میکنیم این کد را اجرا کنیم، کامپایل نمیشود و به جای آن گزارش میدهد که متد next
در دسترس نیست:
error[E0599]: no method named `next` found for struct `Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= note: the full type name has been written to 'file:///projects/async_await/target/debug/deps/async_await-9de943556a6001b8.long-type-1281356139287206597.txt'
= note: consider using `--verbose` to print the full type name to the console
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
همانطور که این خروجی توضیح میدهد، دلیل خطای کامپایلر این است که برای استفاده از متد next
باید ویژگی مناسب در دامنه باشد. با توجه به بحثهایی که تاکنون داشتهایم، ممکن است منطقی باشد که انتظار داشته باشید این ویژگی Stream
باشد، اما در واقع StreamExt
است. Ext
که مخفف extension است، یک الگوی رایج در جامعه Rust برای گسترش یک ویژگی با ویژگی دیگر است.
ما در انتهای این فصل ویژگیهای Stream
و StreamExt
را با جزئیات بیشتری توضیح خواهیم داد، اما فعلاً تنها چیزی که باید بدانید این است که ویژگی Stream
یک رابط سطح پایین تعریف میکند که به طور مؤثری ویژگیهای Iterator
و Future
را ترکیب میکند. StreamExt
مجموعهای از APIهای سطح بالاتر را روی Stream
ارائه میدهد، از جمله متد next
و همچنین متدهای کاربردی دیگر مشابه آنچه ویژگی Iterator
ارائه میدهد. Stream
و StreamExt
هنوز بخشی از کتابخانه استاندارد Rust نیستند، اما بیشتر crateهای اکوسیستم از همین تعریف استفاده میکنند.
برای رفع خطای کامپایل، باید یک دستور use
برای trpl::StreamExt
اضافه کنیم، همانطور که در فهرست 17-31 آمده است.
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let iter = values.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(value) = stream.next().await { println!("The value was: {value}"); } }); }
با قرار دادن همه این قطعات در کنار هم، این کد به همان روشی که میخواهیم کار میکند! مهمتر از همه، اکنون که StreamExt
در دامنه داریم، میتوانیم از تمام متدهای کاربردی آن استفاده کنیم، درست مانند iteratorها. برای مثال، در فهرست 17-32، از متد filter
برای فیلتر کردن همه چیز به جز مضربهای سه و پنج استفاده میکنیم.
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = 1..101; let iter = values.map(|n| n * 2); let stream = trpl::stream_from_iter(iter); let mut filtered = stream.filter(|value| value % 3 == 0 || value % 5 == 0); while let Some(value) = filtered.next().await { println!("The value was: {value}"); } }); }
Stream
با استفاده از متد StreamExt::filter
البته این خیلی جالب نیست، چون میتوانستیم همین کار را با iteratorهای معمولی و بدون هیچ async انجام دهیم. بیایید ببینیم چه کاری میتوانیم انجام دهیم که منحصربهفرد برای streamها باشد.
ترکیب Streamها
بسیاری از مفاهیم به طور طبیعی بهعنوان streamها نمایش داده میشوند: آیتمهایی که در یک صف در دسترس میشوند، بخشهایی از داده که به صورت تدریجی از سیستم فایل خوانده میشوند وقتی مجموعه داده کامل برای حافظه کامپیوتر بیش از حد بزرگ است، یا دادههایی که به مرور زمان از طریق شبکه میرسند. چون streamها نیز futures هستند، میتوانیم از آنها با هر نوع دیگر future استفاده کنیم و آنها را به روشهای جالبی ترکیب کنیم. برای مثال، میتوانیم رویدادها را به صورت دستهای جمع کنیم تا از ایجاد تعداد زیادی فراخوانی شبکه جلوگیری کنیم، تایماوتهایی روی دنبالهای از عملیاتهای طولانی تنظیم کنیم، یا رویدادهای رابط کاربری را کنترل کنیم تا از انجام کارهای غیرضروری اجتناب کنیم.
بیایید با ساخت یک stream کوچک از پیامها شروع کنیم که بهعنوان یک جایگزین برای یک stream از دادههایی که ممکن است از یک WebSocket یا یک پروتکل ارتباطی بلادرنگ دیگر ببینیم، همانطور که در لیست ۱۷-۳۳ نشان داده شده است.
extern crate trpl; // required for mdbook test use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = get_messages(); while let Some(message) = messages.next().await { println!("{message}"); } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
rx
بهعنوان یک ReceiverStream
ابتدا یک تابع به نام get_messages
ایجاد میکنیم که impl Stream<Item = String>
را بازمیگرداند. برای پیادهسازی آن، یک کانال async ایجاد میکنیم، بر روی ۱۰ حرف اول الفبای انگلیسی حلقه میزنیم، و آنها را از طریق کانال ارسال میکنیم.
همچنین از یک نوع جدید به نام ReceiverStream
استفاده میکنیم، که rx
گیرنده از trpl::channel
را به یک Stream
با متد next
تبدیل میکند. دوباره در main
، از یک حلقه while let
برای چاپ تمام پیامها از stream استفاده میکنیم.
وقتی این کد را اجرا میکنیم، دقیقاً نتایجی را که انتظار داریم دریافت میکنیم:
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
دوباره، میتوانستیم این کار را با API معمولی Receiver
یا حتی API معمولی Iterator
انجام دهیم، اما بیایید ویژگیای اضافه کنیم که نیاز به streams داشته باشد: اضافه کردن یک تایماوت که برای هر آیتم در stream اعمال شود، و یک تأخیر روی آیتمهایی که ارسال میکنیم، همانطور که در لیست ۱۷-۳۴ نشان داده شده است.
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
StreamExt::timeout
برای تعیین یک محدودیت زمانی برای آیتمهای موجود در یک streamابتدا یک تایماوت به stream با استفاده از متد timeout
اضافه میکنیم، که از ویژگی StreamExt
میآید. سپس بدنه حلقه while let
را بهروزرسانی میکنیم، زیرا اکنون stream یک Result
بازمیگرداند. حالت Ok
نشاندهنده این است که یک پیام بهموقع رسیده است؛ حالت Err
نشان میدهد که تایماوت قبل از رسیدن هر پیامی منقضی شده است. روی این نتیجه یک match
انجام میدهیم و یا پیام را وقتی با موفقیت دریافت میکنیم چاپ میکنیم، یا اخطاری درباره تایماوت چاپ میکنیم. در نهایت، توجه کنید که پس از اعمال تایماوت به پیامها، آنها را pin میکنیم، زیرا ابزار تایماوت یک stream تولید میکند که باید pin شود تا بتوان آن را poll کرد.
با این حال، چون بین پیامها تأخیری وجود ندارد، این تایماوت رفتار برنامه را تغییر نمیدهد. بیایید یک تأخیر متغیر به پیامهایی که ارسال میکنیم اضافه کنیم، همانطور که در لیست ۱۷-۳۵ نشان داده شده است.
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) }
tx
با یک تأخیر async بدون تبدیل get_messages
به یک تابع asyncبرای خوابیدن بین پیامها در تابع get_messages
بدون مسدود کردن، باید از async استفاده کنیم. با این حال، نمیتوانیم خود get_messages
را به یک تابع async تبدیل کنیم، زیرا در این صورت یک Future<Output = Stream<Item = String>>
به جای یک Stream<Item = String>
بازمیگرداند. کاربر باید خود get_messages
را منتظر بماند تا به stream دسترسی پیدا کند. اما به یاد داشته باشید: هر چیزی در یک آینده مشخص بهصورت خطی اتفاق میافتد؛ همزمانی بین آیندهها اتفاق میافتد. انتظار برای get_messages
نیاز دارد که تمام پیامها را ارسال کند، از جمله خوابیدن بین ارسال هر پیام، قبل از بازگرداندن stream گیرنده. در نتیجه، زمان محدود بیفایده میشود. هیچ تأخیری در خود stream وجود نخواهد داشت: تمام تأخیرها قبل از در دسترس قرار گرفتن stream اتفاق میافتد.
در عوض، get_messages
را بهعنوان یک تابع معمولی که یک stream بازمیگرداند باقی میگذاریم و یک تسک برای مدیریت فراخوانیهای async sleep
ایجاد میکنیم.
نکته: فراخوانی
spawn_task
به این روش کار میکند زیرا ما از قبل runtime خود را تنظیم کردهایم. فراخوانی این پیادهسازی خاص ازspawn_task
بدون تنظیم اولیه یک runtime باعث panic میشود. پیادهسازیهای دیگر معاملات متفاوتی انتخاب میکنند: ممکن است یک runtime جدید ایجاد کنند و بنابراین از panic اجتناب کنند، اما با کمی سربار اضافی مواجه شوند، یا به سادگی راهی مستقل برای ایجاد تسکها بدون ارجاع به یک runtime ارائه ندهند. باید مطمئن شوید که میدانید runtime شما چه معاملهای انتخاب کرده است و کد خود را بر این اساس بنویسید!
اکنون کد ما نتیجه بسیار جالبتری دارد! بین هر جفت پیام، یک خطا گزارش میشود: Problem: Elapsed(())
.
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
تایماوت از رسیدن پیامها در نهایت جلوگیری نمیکند. ما همچنان تمام پیامهای اصلی را دریافت میکنیم، زیرا کانال ما بدون محدودیت است: میتواند به اندازهای که در حافظه جا شود پیامها را نگه دارد. اگر پیام قبل از تایماوت نرسد، handler stream ما آن را مدیریت میکند، اما وقتی دوباره stream را poll کند، ممکن است پیام اکنون رسیده باشد.
اگر به رفتار متفاوتی نیاز دارید، میتوانید از انواع دیگر کانالها یا به طور کلی انواع دیگر streamها استفاده کنید. بیایید یکی از این موارد را در عمل ببینیم، با ترکیب یک stream از فواصل زمانی با این stream از پیامها.
ترکیب Streamها
ابتدا، یک stream دیگر ایجاد میکنیم که اگر به طور مستقیم اجرا شود، هر میلیثانیه یک آیتم ارسال میکند. برای سادگی، میتوانیم از تابع sleep
برای ارسال یک پیام با تأخیر استفاده کنیم و آن را با همان روشی که در get_messages
استفاده کردیم—ایجاد یک stream از یک کانال—ترکیب کنیم. تفاوت این است که این بار، میخواهیم تعداد فواصل زمانی که گذشتهاند را بازگردانیم، بنابراین نوع بازگشتی impl Stream<Item = u32>
خواهد بود، و میتوانیم تابع را get_intervals
بنامیم (نگاه کنید به لیست ۱۷-۳۶).
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
ابتدا یک متغیر count
را درون task تعریف میکنیم. (میتوانستیم آن را خارج از task نیز تعریف کنیم، اما محدود کردن دامنه هر متغیر دادهشده واضحتر است.) سپس یک حلقه بینهایت ایجاد میکنیم. در هر تکرار حلقه، به صورت ناهمزمان به مدت یک میلیثانیه میخوابد، مقدار count
را افزایش میدهد و سپس آن را از طریق کانال ارسال میکند. از آنجا که همه اینها درون taskی که توسط spawn_task
ایجاد شده است قرار دارد، همه آن—از جمله حلقه بینهایت—همراه با runtime پاکسازی میشود.
این نوع حلقه بینهایت، که تنها زمانی به پایان میرسد که کل runtime از بین برود، در async Rust نسبتاً رایج است: بسیاری از برنامهها نیاز دارند که به طور نامحدود اجرا شوند. با async، این کار چیزی دیگر را مسدود نمیکند، تا زمانی که حداقل یک نقطه انتظار (await point) در هر تکرار از حلقه وجود داشته باشد.
حالا، درون بلوک async تابع اصلی ما، میتوانیم تلاش کنیم که streamهای messages
و intervals
را با هم ترکیب کنیم، همانطور که در لیست ۱۷-۳۷ نشان داده شده است.
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
while let Some(result) = merged.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
messages
و intervals
ابتدا get_intervals
را فراخوانی میکنیم. سپس streamهای messages
و intervals
را با استفاده از متد merge
ترکیب میکنیم. این متد چندین stream را به یک stream ترکیب میکند که آیتمها را از هر یک از streamهای منبع، به محض در دسترس بودن، تولید میکند، بدون اینکه ترتیب خاصی را اعمال کند. در نهایت، به جای اینکه روی messages
حلقه بزنیم، روی این stream ترکیبی حلقه میزنیم.
در این مرحله، نه messages
و نه intervals
نیازی به pin یا mutable بودن ندارند، زیرا هر دو در یک stream واحد به نام merged
ترکیب میشوند. با این حال، این فراخوانی به merge
کامپایل نمیشود! (فراخوانی next
در حلقه while let
هم کامپایل نمیشود، اما به آن برمیگردیم.) دلیل آن این است که این دو stream انواع مختلفی دارند. stream messages
نوع Timeout<impl Stream<Item = String>>
دارد، جایی که Timeout
نوعی است که ویژگی Stream
را برای فراخوانی timeout
پیادهسازی میکند. stream intervals
نوع impl Stream<Item = u32>
دارد. برای ترکیب این دو stream، باید یکی از آنها را به نوع دیگری تبدیل کنیم. ما stream intervals
را بازبینی میکنیم، زیرا messages
قبلاً در قالب اصلی مورد نظر ما است و باید خطاهای timeout را مدیریت کند (نگاه کنید به لیست ۱۷-۳۸).
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
while let Some(result) = stream.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
intervals
با نوع stream messages
ابتدا میتوانیم از متد کمکی map
برای تبدیل intervals
به یک رشته استفاده کنیم. دوم، نیاز داریم که Timeout
از messages
را مدیریت کنیم. با این حال، چون واقعاً نمیخواهیم تایماوتی برای intervals
داشته باشیم، میتوانیم یک تایماوت ایجاد کنیم که طولانیتر از مدتهای دیگر مورد استفاده ما باشد. در اینجا، یک تایماوت ۱۰ ثانیهای با استفاده از Duration::from_secs(10)
ایجاد میکنیم. در نهایت، نیاز داریم که stream
را متغیر (mutable
) کنیم تا فراخوانیهای next
در حلقه while let
بتوانند روی stream تکرار کنند و آن را pin کنیم تا این کار ایمن باشد. این ما را تقریباً به جایی که باید برسیم میرساند. همه چیز از نظر نوع بررسی میشود. اما اگر این کد را اجرا کنید، دو مشکل وجود خواهد داشت. اول، هیچگاه متوقف نمیشود! باید با زدن ctrl-c آن را متوقف کنید. دوم، پیامهای الفبای انگلیسی در میان تمام پیامهای شمارنده interval دفن خواهند شد:
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
لیست ۱۷-۳۹ یک روش برای حل این دو مشکل آخر را نشان میدهد.
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval: {count}")) .throttle(Duration::from_millis(100)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
throttle
و take
برای مدیریت streams ترکیبشدهابتدا از متد throttle
روی stream intervals
استفاده میکنیم تا این stream باعث غرق شدن stream messages
نشود. Throttling روشی برای محدود کردن نرخ فراخوانی یک تابع است—یا در این مورد، محدود کردن نرخ poll کردن یک stream. یک بار در هر ۱۰۰ میلیثانیه کافی خواهد بود، زیرا تقریباً به همان اندازه پیامهای ما میرسند.
برای محدود کردن تعداد آیتمهایی که از یک stream قبول میکنیم، متد take
را روی stream merged
اعمال میکنیم، زیرا میخواهیم خروجی نهایی را محدود کنیم، نه فقط یکی از streamها را.
اکنون وقتی برنامه را اجرا میکنیم، پس از دریافت ۲۰ آیتم از stream متوقف میشود و intervals باعث غرق شدن messages نمیشود. همچنین، ما دیگر Interval: 100
یا Interval: 200
و موارد مشابه را نمیبینیم، بلکه به جای آن Interval: 1
، Interval: 2
و به همین ترتیب دریافت میکنیم—حتی اگر یک stream منبع داریم که میتواند هر میلیثانیه یک رویداد تولید کند. دلیل این است که فراخوانی throttle
یک stream جدید تولید میکند که stream اصلی را بستهبندی میکند تا stream اصلی فقط با نرخ throttle و نه با نرخ “ذاتی” خود poll شود. ما یک سری پیام interval غیرقابل پردازش نداریم که انتخاب کرده باشیم آنها را نادیده بگیریم. بلکه، ما هرگز آن پیامهای interval را در وهله اول تولید نمیکنیم! این همان “تنبلی” ذاتی futures در Rust است که دوباره به کار گرفته میشود و به ما اجازه میدهد ویژگیهای عملکردی خود را انتخاب کنیم.
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
تنها یک مورد باقی مانده که باید مدیریت کنیم: خطاها! با هر دو stream مبتنی بر کانال، فراخوانیهای send
ممکن است در صورتی که طرف دیگر کانال بسته شود، با شکست مواجه شوند—و این به نحوه اجرای runtime برای futures که stream را تشکیل میدهند بستگی دارد. تاکنون این احتمال را با فراخوانی unwrap
نادیده گرفتهایم، اما در یک برنامه با رفتار مناسب، باید بهطور صریح خطا را مدیریت کنیم، حداقل با پایان دادن به حلقه تا دیگر پیام ارسال نکنیم. لیست ۱۷-۴۰ یک استراتژی ساده برای مدیریت خطا را نشان میدهد: چاپ مشکل و سپس break
از حلقهها.
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval #{count}")) .throttle(Duration::from_millis(500)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(item) => println!("{item}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; if let Err(send_error) = tx.send(format!("Message: '{message}'")) { eprintln!("Cannot send message '{message}': {send_error}"); break; } } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; if let Err(send_error) = tx.send(count) { eprintln!("Could not send interval {count}: {send_error}"); break; }; } }); ReceiverStream::new(rx) }
همانطور که معمول است، روش درست برای مدیریت یک خطای ارسال پیام میتواند متفاوت باشد؛ فقط مطمئن شوید که یک استراتژی دارید.
اکنون که مقدار زیادی از کد async را در عمل مشاهده کردیم، بیایید کمی به عقب برگردیم و به جزئیات نحوه کارکرد Future
، Stream
و ویژگیهای کلیدی دیگر که Rust برای اجرای async استفاده میکند، بپردازیم.