جمع‌بندی: Futures، Tasks، و Threads

همان‌طور که در فصل ۱۶ دیدیم، Threads یکی از روش‌های همزمانی را فراهم می‌کنند. در این فصل با روش دیگری آشنا شدیم: استفاده از async با Futures و Streams. اگر برایتان سؤال پیش آمده که چه زمانی باید یکی از این روش‌ها را انتخاب کنید، پاسخ این است: بستگی دارد! و در بسیاری از موارد، انتخاب فقط بین Threads یا async نیست، بلکه ترکیبی از Threads و async است.

بسیاری از سیستم‌عامل‌ها مدل‌های همزمانی مبتنی بر Threads را دهه‌هاست که فراهم کرده‌اند و بسیاری از زبان‌های برنامه‌نویسی از این مدل‌ها پشتیبانی می‌کنند. با این حال، این مدل‌ها بدون نقاط ضعف نیستند. در بسیاری از سیستم‌عامل‌ها، هر Thread مقدار زیادی حافظه استفاده می‌کند و راه‌اندازی و خاموش کردن آن‌ها نیز هزینه‌ای به همراه دارد. Threads همچنین فقط زمانی قابل استفاده هستند که سیستم‌عامل و سخت‌افزار شما از آن‌ها پشتیبانی کنند. برخلاف کامپیوترهای دسکتاپ و موبایل اصلی، برخی از سیستم‌های تعبیه‌شده (embedded systems) هیچ سیستم‌عاملی ندارند و بنابراین Threads هم ندارند.

مدل async مجموعه‌ای متفاوت و در نهایت مکمل از مصالحه‌ها را فراهم می‌کند. در مدل async، عملیات همزمان نیازی به Threadهای جداگانه ندارند. در عوض، می‌توانند بر روی Tasks اجرا شوند، همان‌طور که در بخش Streams از trpl::spawn_task برای شروع کار از یک تابع همزمان استفاده کردیم. یک Task مشابه یک Thread است، اما به جای اینکه توسط سیستم‌عامل مدیریت شود، توسط کد سطح کتابخانه‌ای یعنی Runtime مدیریت می‌شود.

در بخش قبلی، دیدیم که می‌توانیم یک Stream با استفاده از یک کانال async و ایجاد یک Task async که می‌توانیم از کد همزمان فراخوانی کنیم، بسازیم. می‌توانیم همین کار را با یک Thread انجام دهیم. در لیست ۱۷-۴۰ از trpl::spawn_task و trpl::sleep استفاده کردیم. در لیست ۱۷-۴۱، این موارد را با APIهای thread::spawn و thread::sleep از کتابخانه استاندارد در تابع get_intervals جایگزین می‌کنیم.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, thread, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval #{count}"))
            .throttle(Duration::from_millis(500))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    // This is *not* `trpl::spawn` but `std::thread::spawn`!
    thread::spawn(move || {
        let mut count = 0;
        loop {
            // Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`!
            thread::sleep(Duration::from_millis(1));
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}
Listing 17-41: استفاده از APIهای std::thread به جای APIهای async trpl برای تابع get_intervals

اگر این کد را اجرا کنید، خروجی آن دقیقاً مشابه لیست ۱۷-۴۰ خواهد بود. و توجه کنید که از دید کدی که فراخوانی انجام می‌دهد، تغییرات بسیار کمی وجود دارد. علاوه بر این، حتی اگر یکی از توابع ما یک Task async را روی Runtime ایجاد کرده و دیگری یک Thread سیستم‌عامل را ایجاد کرده باشد، Streamهای حاصل از این تفاوت‌ها تأثیری نمی‌گیرند.

با وجود شباهت‌هایشان، این دو رویکرد رفتارهای بسیار متفاوتی دارند، اگرچه ممکن است در این مثال بسیار ساده سخت باشد این تفاوت‌ها را اندازه‌گیری کنیم. می‌توانیم میلیون‌ها Task async را روی هر کامپیوتر شخصی مدرن ایجاد کنیم. اما اگر بخواهیم همین کار را با Threads انجام دهیم، واقعاً از حافظه خارج خواهیم شد!

اما دلیلی وجود دارد که این APIها این‌قدر مشابه هستند. نخ‌ها به عنوان مرزی برای مجموعه‌ای از عملیات همزمان عمل می‌کنند؛ همزمانی بین نخ‌ها ممکن است. tasks به عنوان مرزی برای مجموعه‌ای از عملیات غیرهمزمان عمل می‌کنند؛ همزمانی هم بین و هم درون tasks ممکن است، زیرا یک task می‌تواند بین futures در بدنه خود جابه‌جا شود. در نهایت، futures کوچک‌ترین واحد همزمانی در Rust هستند و هر future ممکن است یک درخت از futures دیگر را نمایندگی کند. runtime—به‌ویژه، executor آن—tasks را مدیریت می‌کند و tasks futures را مدیریت می‌کنند. از این نظر، tasks شبیه نخ‌های سبک و مدیریت‌شده توسط runtime هستند که قابلیت‌های بیشتری دارند زیرا توسط runtime به جای سیستم‌عامل مدیریت می‌شوند.

این بدان معنا نیست که Taskهای async همیشه بهتر از Threads هستند (یا برعکس). همزمانی با Threads از برخی جهات مدل برنامه‌نویسی ساده‌تری نسبت به همزمانی با async است. این می‌تواند یک نقطه قوت یا ضعف باشد. Threads تا حدودی “آتش و فراموشی” (fire and forget) هستند؛ آن‌ها معادل ذاتی برای یک Future ندارند، بنابراین بدون اینکه جز توسط خود سیستم‌عامل متوقف شوند، تا انتها اجرا می‌شوند. به عبارت دیگر، آن‌ها پشتیبانی داخلی برای همزمانی درون وظیفه‌ای (intratask concurrency) مانند Futures ندارند. همچنین، Threads در Rust هیچ مکانیزمی برای لغو ندارند—موضوعی که به‌طور صریح در این فصل به آن پرداخته نشده است، اما از این واقعیت که هر زمان یک Future به پایان می‌رسید، وضعیت آن به درستی پاک‌سازی می‌شد، به‌طور ضمنی بیان شده است.

این محدودیت‌ها همچنین باعث می‌شوند Threads سخت‌تر از Futures ترکیب شوند. برای مثال، استفاده از Threads برای ساخت ابزارهایی مانند متدهای timeout و throttle که قبلاً در این فصل ساخته‌ایم، بسیار دشوارتر است. این واقعیت که Futures ساختار داده غنی‌تری هستند به این معناست که آن‌ها می‌توانند به‌طور طبیعی‌تر با هم ترکیب شوند، همان‌طور که دیده‌ایم.

Tasks، در نتیجه، کنترل اضافه‌ای بر روی Futures به ما می‌دهند و به ما اجازه می‌دهند که انتخاب کنیم کجا و چگونه آن‌ها را گروه‌بندی کنیم. و معلوم می‌شود که Threads و Tasks اغلب به خوبی با هم کار می‌کنند، زیرا Tasks می‌توانند (حداقل در برخی Runtimeها) بین Threads جابه‌جا شوند. در واقع، در پس‌زمینه، Runtimeی که استفاده کرده‌ایم—از جمله توابع spawn_blocking و spawn_task—به طور پیش‌فرض چند Threadی (multithreaded) است! بسیاری از Runtimeها از رویکردی به نام دزدیدن کار (work stealing) استفاده می‌کنند تا Tasks را به‌طور شفاف بین Threads جابه‌جا کنند، بر اساس اینکه چگونه Threads در حال حاضر استفاده می‌شوند، تا عملکرد کلی سیستم را بهبود بخشند. این رویکرد در واقع به Threads و Tasks، و بنابراین Futures نیاز دارد.

وقتی در مورد استفاده از روش‌های مختلف فکر می‌کنید، این قوانین کلی را در نظر بگیرید:

  • اگر کار به شدت قابل موازی‌سازی است، مانند پردازش مقدار زیادی داده که هر بخش می‌تواند جداگانه پردازش شود، Threads انتخاب بهتری هستند.
  • اگر کار به شدت همزمان است، مانند مدیریت پیام‌ها از منابع مختلفی که ممکن است در فواصل یا نرخ‌های مختلف وارد شوند، async انتخاب بهتری است.

و اگر به هر دو موازی‌سازی و همزمانی نیاز دارید، لازم نیست بین Threads و async یکی را انتخاب کنید. می‌توانید از هر دو به طور آزادانه استفاده کنید و اجازه دهید هر کدام نقشی که در آن بهتر هستند را بازی کنند. برای مثال، لیست ۱۷-۴۲ یک نمونه نسبتاً رایج از این نوع ترکیب در کد Rust دنیای واقعی را نشان می‌دهد.

Filename: src/main.rs
extern crate trpl; // for mdbook test

use std::{thread, time::Duration};

fn main() {
    let (tx, mut rx) = trpl::channel();

    thread::spawn(move || {
        for i in 1..11 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    trpl::run(async {
        while let Some(message) = rx.recv().await {
            println!("{message}");
        }
    });
}
Listing 17-42: ارسال پیام‌ها با کد مسدودکننده در یک نخ و انتظار برای پیام‌ها در یک بلوک async

ما با ایجاد یک کانال async شروع می‌کنیم، سپس یک Thread ایجاد می‌کنیم که مالکیت بخش ارسال‌کننده کانال را به دست می‌گیرد. درون Thread، اعداد ۱ تا ۱۰ را ارسال می‌کنیم و بین هر ارسال یک ثانیه می‌خوابیم. در نهایت، یک Future که با یک بلوک async ایجاد شده و به trpl::run ارسال شده است را اجرا می‌کنیم، درست همان‌طور که در طول این فصل انجام داده‌ایم. در آن Future، منتظر دریافت پیام‌ها می‌مانیم، دقیقاً مانند سایر مثال‌های ارسال پیام که دیده‌ایم.

برای بازگشت به سناریویی که فصل را با آن آغاز کردیم، تصور کنید که مجموعه‌ای از وظایف کدگذاری ویدئو را با استفاده از یک Thread اختصاصی (زیرا کدگذاری ویدئو به شدت وابسته به پردازش است) اجرا می‌کنید، اما با استفاده از یک کانال async به رابط کاربری اطلاع می‌دهید که آن عملیات به پایان رسیده‌اند. در موارد استفاده واقعی، بی‌شمار نمونه از این نوع ترکیب‌ها وجود دارد.

خلاصه

این آخرین باری نیست که در این کتاب با همزمانی مواجه می‌شوید. پروژه موجود در فصل ۲۱ این مفاهیم را در یک موقعیت واقعی‌تر از مثال‌های ساده‌ای که در اینجا بحث شد، به کار خواهد گرفت و حل مسئله با استفاده از Threadها در مقابل Tasks را به طور مستقیم‌تر مقایسه خواهد کرد.

صرف‌نظر از اینکه کدام یک از این رویکردها را انتخاب می‌کنید، Rust ابزارهای لازم برای نوشتن کدی ایمن، سریع و همزمان را در اختیار شما قرار می‌دهد—چه برای یک وب سرور با توان عملیاتی بالا و چه برای یک سیستم‌عامل تعبیه‌شده.

در ادامه، درباره روش‌های ایدئوماتیک برای مدل‌سازی مشکلات و ساختاردهی راه‌حل‌ها به‌عنوان برنامه‌های Rust شما بزرگ‌تر می‌شوند صحبت خواهیم کرد. علاوه بر این، درباره اینکه ایدئوم‌های Rust چگونه با آن‌هایی که ممکن است از برنامه‌نویسی شی‌گرا با آن‌ها آشنا باشید مرتبط هستند بحث خواهیم کرد.