استفاده از پیام‌رسانی برای انتقال داده بین نخ‌ها

یکی از رویکردهای محبوب و فزاینده برای اطمینان از همزمانی ایمن، پیام‌رسانی است، جایی که نخ‌ها یا بازیگران با ارسال پیام‌های حاوی داده به یکدیگر ارتباط برقرار می‌کنند. ایده این رویکرد در یک شعار از مستندات زبان Go آمده است:
«با به اشتراک گذاشتن حافظه ارتباط برقرار نکنید؛ بلکه حافظه را با ارتباط برقرار کردن به اشتراک بگذارید.»

برای رسیدن به همزمانی مبتنی بر ارسال پیام، کتابخانه استاندارد Rust یک پیاده‌سازی از کانال‌ها ارائه می‌دهد. کانال یک مفهوم عمومی برنامه‌نویسی است که داده‌ها را از یک نخ به نخ دیگر ارسال می‌کند.

می‌توانید یک کانال در برنامه‌نویسی را مانند یک کانال آبی جهت‌دار، مانند یک جریان یا رودخانه تصور کنید. اگر چیزی مانند یک اردک پلاستیکی را به داخل رودخانه بیندازید، آن اردک به پایین‌دست رودخانه سفر می‌کند و به انتهای آن می‌رسد.

یک کانال دو نیمه دارد: یک فرستنده و یک گیرنده. نیمه فرستنده محل بالادستی است که اردک‌های پلاستیکی را به داخل رودخانه می‌اندازید، و نیمه گیرنده جایی است که اردک پلاستیکی در پایین‌دست پایان می‌یابد. یک بخش از کد شما متدهایی روی فرستنده با داده‌ای که می‌خواهید ارسال کنید فراخوانی می‌کند، و بخش دیگری انتهای گیرنده را برای پیام‌های واردشده بررسی می‌کند. اگر هر یک از نیمه‌های فرستنده یا گیرنده حذف شوند، کانال به عنوان بسته‌شده در نظر گرفته می‌شود.

اینجا، برنامه‌ای ایجاد می‌کنیم که یک نخ برای تولید مقادیر و ارسال آن‌ها از طریق یک کانال دارد، و نخ دیگری مقادیر را دریافت کرده و چاپ می‌کند. برای نمایش این ویژگی، مقادیر ساده‌ای بین نخ‌ها از طریق یک کانال ارسال خواهیم کرد. پس از آشنایی با این تکنیک، می‌توانید از کانال‌ها برای هر نخ‌هایی که نیاز به ارتباط با یکدیگر دارند استفاده کنید، مانند یک سیستم چت یا سیستمی که بسیاری از نخ‌ها بخش‌هایی از یک محاسبه را انجام می‌دهند و آن بخش‌ها را به یک نخ ارسال می‌کنند که نتایج را تجمیع می‌کند.

ابتدا، در لیستینگ 16-6، یک کانال ایجاد می‌کنیم اما هنوز کاری با آن انجام نمی‌دهیم. توجه داشته باشید که این کد هنوز کامپایل نمی‌شود زیرا Rust نمی‌تواند نوع مقادیری که می‌خواهیم از طریق کانال ارسال کنیم را تعیین کند.

Filename: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

لیستینگ 16-6: ایجاد یک کانال و اختصاص دو نیمه آن به tx و rx

ما یک کانال جدید با استفاده از تابع mpsc::channel ایجاد می‌کنیم؛ mpsc مخفف تولیدکننده‌های چندگانه، مصرف‌کننده تک‌گانه است. به طور خلاصه، نحوه پیاده‌سازی کانال‌ها توسط کتابخانه استاندارد Rust به این معناست که یک کانال می‌تواند چندین انتهای ارسال‌کننده داشته باشد که مقادیر تولید می‌کنند، اما فقط یک انتهای گیرنده که آن مقادیر را مصرف می‌کند. تصور کنید چندین جریان کوچک به یک رودخانه بزرگ می‌ریزند: هر چیزی که در هر یک از جریان‌ها ارسال شود، در نهایت به رودخانه بزرگ در انتها می‌رسد. فعلاً با یک تولیدکننده شروع می‌کنیم، اما وقتی این مثال کار کرد، چندین تولیدکننده اضافه خواهیم کرد.

تابع mpsc::channel یک جفت را برمی‌گرداند که عنصر اول آن انتهای ارسال‌کننده (فرستنده) و عنصر دوم آن انتهای گیرنده (گیرنده) است. اختصارات tx و rx در بسیاری از حوزه‌ها به ترتیب برای فرستنده و گیرنده استفاده می‌شوند، بنابراین متغیرهای خود را به این نام‌ها می‌نامیم تا هر انتها را نشان دهیم. ما از یک دستور let با یک الگو که جفت را تخریب می‌کند استفاده می‌کنیم؛ در فصل 19 درباره استفاده از الگوها در دستورات let و تخریب بحث خواهیم کرد. فعلاً بدانید که استفاده از یک دستور let به این روش یک رویکرد مناسب برای استخراج قطعات جفت بازگشتی توسط mpsc::channel است.

بیایید انتهای ارسال‌کننده را به یک نخ ایجادشده منتقل کنیم و یک رشته ارسال کنیم تا نخ ایجادشده با نخ اصلی ارتباط برقرار کند، همان‌طور که در لیستینگ 16-7 نشان داده شده است. این شبیه به انداختن یک اردک پلاستیکی در رودخانه در بالادست یا ارسال یک پیام چت از یک نخ به نخ دیگر است.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}
Listing 16-7: انتقال tx به یک نخ ایجادشده و ارسال ‘hi’

دوباره از thread::spawn برای ایجاد یک نخ جدید استفاده می‌کنیم و سپس از move برای انتقال tx به closure استفاده می‌کنیم تا نخ ایجادشده مالک tx شود. نخ ایجادشده باید مالک فرستنده باشد تا بتواند پیام‌ها را از طریق کانال ارسال کند. فرستنده یک متد send دارد که مقداری که می‌خواهیم ارسال کنیم را می‌گیرد. متد send نوع Result<T, E> را برمی‌گرداند، بنابراین اگر گیرنده قبلاً حذف شده باشد و جایی برای ارسال مقدار وجود نداشته باشد، عملیات ارسال یک خطا برمی‌گرداند. در این مثال، ما unwrap را برای panic در صورت خطا فراخوانی می‌کنیم. اما در یک برنامه واقعی، باید آن را به درستی مدیریت کنیم: برای مرور استراتژی‌های مدیریت خطای مناسب به فصل 9 بازگردید.

در لیستینگ 16-8، مقداری را از گیرنده در نخ اصلی دریافت می‌کنیم. این شبیه به گرفتن اردک پلاستیکی از آب در انتهای رودخانه یا دریافت یک پیام چت است.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Listing 16-8: Receiving the value “hi” in the main thread and printing it

گیرنده دو متد مفید دارد: recv و try_recv. ما از recv، که مخفف receive است، استفاده می‌کنیم. این متد اجرای نخ اصلی را مسدود کرده و منتظر می‌ماند تا مقداری از طریق کانال ارسال شود. هنگامی که مقداری ارسال شد، recv آن را در یک مقدار Result<T, E> بازمی‌گرداند. وقتی فرستنده بسته می‌شود، recv یک خطا برمی‌گرداند تا نشان دهد که هیچ مقدار دیگری نمی‌آید.

متد try_recv مسدود نمی‌کند، بلکه بلافاصله یک مقدار Result<T, E> بازمی‌گرداند: یک مقدار Ok حاوی یک پیام اگر موجود باشد، و یک مقدار Err اگر این بار هیچ پیامی موجود نباشد. استفاده از try_recv زمانی مفید است که این نخ کار دیگری برای انجام دارد در حالی که منتظر پیام‌ها است: می‌توانیم یک حلقه بنویسیم که هر چند وقت یک بار try_recv را فراخوانی کند، یک پیام را اگر موجود باشد پردازش کند، و در غیر این صورت کار دیگری را برای مدتی انجام دهد تا دوباره بررسی کند.

ما در این مثال برای سادگی از recv استفاده کرده‌ایم؛ نخ اصلی کار دیگری جز منتظر ماندن برای پیام‌ها ندارد، بنابراین مسدود کردن نخ اصلی مناسب است.

وقتی کد موجود در لیستینگ 16-8 را اجرا کنیم، مقدار چاپ‌شده از نخ اصلی را خواهیم دید:

Got: hi

عالی!

کانال‌ها و انتقال مالکیت

قوانین مالکیت نقش حیاتی در ارسال پیام دارند زیرا به شما کمک می‌کنند کد ایمن و همزمان بنویسید. جلوگیری از خطاها در برنامه‌نویسی همزمان مزیت فکر کردن به مالکیت در سراسر برنامه‌های Rust شما است. بیایید یک آزمایش انجام دهیم تا نشان دهیم کانال‌ها و مالکیت چگونه با هم کار می‌کنند تا از مشکلات جلوگیری کنند: ما سعی خواهیم کرد مقدار val را در نخ ایجادشده پس از ارسال آن از طریق کانال استفاده کنیم. کد موجود در لیستینگ 16-9 را کامپایل کنید تا ببینید چرا این کد مجاز نیست:

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Listing 16-9: تلاش برای استفاده از val پس از ارسال آن از طریق کانال

در اینجا، ما سعی می‌کنیم val را پس از ارسال آن از طریق tx.send چاپ کنیم. اجازه دادن به این کار ایده بدی خواهد بود: هنگامی که مقدار به نخ دیگری ارسال شده است، آن نخ می‌تواند قبل از اینکه سعی کنیم دوباره از مقدار استفاده کنیم، آن را تغییر دهد یا حذف کند. به طور بالقوه، تغییرات نخ دیگر می‌تواند باعث خطاها یا نتایج غیرمنتظره به دلیل داده‌های ناسازگار یا غیرموجود شود. با این حال، Rust اگر سعی کنیم کد موجود در لیستینگ 16-9 را کامپایل کنیم، به ما خطا می‌دهد:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:26
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                          ^^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

اشتباه ما در همزمانی باعث ایجاد یک خطای زمان کامپایل شده است. تابع send مالکیت پارامتر خود را می‌گیرد و وقتی مقدار منتقل می‌شود، گیرنده مالکیت آن را می‌گیرد. این از استفاده تصادفی مجدد مقدار پس از ارسال آن جلوگیری می‌کند؛ سیستم مالکیت بررسی می‌کند که همه چیز درست است.

ارسال مقادیر متعدد و مشاهده انتظار گیرنده

کد موجود در لیستینگ 16-8 کامپایل و اجرا شد، اما به وضوح نشان نمی‌داد که دو نخ جداگانه از طریق کانال با یکدیگر صحبت می‌کنند. در لیستینگ 16-10 تغییراتی اعمال کرده‌ایم که ثابت می‌کند کد موجود در لیستینگ 16-8 به صورت همزمان اجرا می‌شود: نخ ایجادشده اکنون چندین پیام ارسال می‌کند و بین هر پیام یک ثانیه مکث می‌کند.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}
Listing 16-10: Sending multiple messages and pausing between each

این بار، نخ ایجادشده یک بردار از رشته‌هایی دارد که می‌خواهیم به نخ اصلی ارسال کنیم. ما روی آن‌ها پیمایش می‌کنیم، هر کدام را به صورت جداگانه ارسال می‌کنیم و بین هر پیام با فراخوانی تابع thread::sleep با یک مقدار Duration برابر با 1 ثانیه مکث می‌کنیم.

در نخ اصلی، دیگر تابع recv را به طور صریح فراخوانی نمی‌کنیم: در عوض، با rx به عنوان یک تکرارگر رفتار می‌کنیم. برای هر مقداری که دریافت می‌شود، آن را چاپ می‌کنیم. هنگامی که کانال بسته می‌شود، تکرار متوقف خواهد شد.

وقتی کد موجود در لیستینگ 16-10 را اجرا می‌کنید، باید خروجی زیر را ببینید، با یک مکث 1 ثانیه‌ای بین هر خط:

Got: hi
Got: from
Got: the
Got: thread

از آنجا که هیچ کدی در حلقه for نخ اصلی نداریم که مکث یا تأخیری ایجاد کند، می‌توانیم بگوییم که نخ اصلی منتظر دریافت مقادیر از نخ ایجادشده است.

ایجاد تولیدکننده‌های متعدد با کلون کردن فرستنده

قبلاً اشاره کردیم که mpsc مخفف چندین تولیدکننده، یک مصرف‌کننده است. بیایید از mpsc استفاده کنیم و کد موجود در لیستینگ 16-10 را گسترش دهیم تا چندین نخ ایجاد کنیم که همگی مقادیر را به همان گیرنده ارسال می‌کنند. می‌توانیم این کار را با کلون کردن فرستنده انجام دهیم، همان‌طور که در لیستینگ 16-11 نشان داده شده است:

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}
Listing 16-11: ارسال چندین پیام از چندین تولیدکننده

این بار، قبل از اینکه نخ ایجادشده اول را ایجاد کنیم، روی فرستنده clone فراخوانی می‌کنیم. این کار به ما یک فرستنده جدید می‌دهد که می‌توانیم به نخ ایجادشده اول ارسال کنیم. فرستنده اصلی را به نخ ایجادشده دوم ارسال می‌کنیم. این کار به ما دو نخ می‌دهد که هر کدام پیام‌های مختلفی را به یک گیرنده ارسال می‌کنند.

وقتی کد را اجرا می‌کنید، خروجی شما باید چیزی شبیه به این باشد:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

ممکن است مقادیر را به ترتیب دیگری ببینید، بسته به سیستم شما. این همان چیزی است که همزمانی را هم جالب و هم دشوار می‌کند. اگر با thread::sleep آزمایش کنید و مقادیر مختلفی را در نخ‌های مختلف به آن بدهید، هر اجرا غیرقطعی‌تر خواهد شد و هر بار خروجی متفاوتی ایجاد می‌کند.

اکنون که دیدیم کانال‌ها چگونه کار می‌کنند، بیایید به یک روش دیگر همزمانی نگاهی بیندازیم.