Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

استفاده از پیام‌رسانی برای انتقال داده بین نخ‌ها

یکی از رویکردهای محبوب و فزاینده برای اطمینان از همزمانی ایمن، پیام‌رسانی است، جایی که نخ‌ها یا بازیگران با ارسال پیام‌های حاوی داده به یکدیگر ارتباط برقرار می‌کنند. ایده این رویکرد در یک شعار از مستندات زبان Go آمده است:
«با به اشتراک گذاشتن حافظه ارتباط برقرار نکنید؛ بلکه حافظه را با ارتباط برقرار کردن به اشتراک بگذارید.»

برای دستیابی به هم‌زمانی مبتنی بر ارسال پیام، کتابخانه‌ی استاندارد Rust پیاده‌سازی‌ای از مفهوم channel را فراهم کرده است. یک channel مفهومی کلی در برنامه‌نویسی است که از طریق آن داده‌ها از یک thread به thread دیگر ارسال می‌شوند.

می‌توانید یک کانال در برنامه‌نویسی را مانند یک کانال آبی جهت‌دار، مانند یک جریان یا رودخانه تصور کنید. اگر چیزی مانند یک اردک پلاستیکی را به داخل رودخانه بیندازید، آن اردک به پایین‌دست رودخانه سفر می‌کند و به انتهای آن می‌رسد.

یک channel دو بخش دارد: یک فرستنده (transmitter) و یک گیرنده (receiver). بخش فرستنده مانند نقطه‌ی بالادستی رودخانه‌ای است که در آن اردک پلاستیکی را داخل آب می‌اندازید، و بخش گیرنده جایی‌ست که اردک پلاستیکی در پایین‌دست به آن‌جا می‌رسد. بخشی از کد شما با متدهایی بر روی فرستنده، داده‌هایی را که می‌خواهید ارسال کنید قرار می‌دهد، و بخش دیگر کد بررسی می‌کند که آیا پیامی در سمت گیرنده دریافت شده است یا نه. وقتی که یا فرستنده یا گیرنده (یا هر دو) از بین بروند (drop شوند)، گفته می‌شود که channel بسته شده است.

در این‌جا، قصد داریم برنامه‌ای بسازیم که در آن یک thread وظیفه تولید مقادیر و ارسال آن‌ها از طریق یک channel را دارد، و thread دیگری این مقادیر را دریافت کرده و چاپ می‌کند. برای نمایش این قابلیت، مقادیر ساده‌ای را بین threadها ارسال خواهیم کرد. پس از آشنایی با این تکنیک، می‌توانید از channelها برای هر نوع ارتباط میان threadها استفاده کنید؛ مثلاً در یک سامانه‌ی چت یا سیستمی که در آن چند thread قسمت‌هایی از یک محاسبه را انجام می‌دهند و نتیجه‌ها را به یک thread مرکزی برای تجمیع ارسال می‌کنند.

ابتدا، در لیستینگ 16-6، یک کانال ایجاد می‌کنیم اما هنوز کاری با آن انجام نمی‌دهیم. توجه داشته باشید که این کد هنوز کامپایل نمی‌شود زیرا Rust نمی‌تواند نوع مقادیری که می‌خواهیم از طریق کانال ارسال کنیم را تعیین کند.

Filename: src/main.rs
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}
Listing 16-6: Creating a channel and assigning the two halves to tx and rx

ما یک کانال جدید با استفاده از تابع mpsc::channel ایجاد می‌کنیم؛ mpsc مخفف تولیدکننده‌های چندگانه، مصرف‌کننده تک‌گانه است. به طور خلاصه، نحوه پیاده‌سازی کانال‌ها توسط کتابخانه استاندارد Rust به این معناست که یک کانال می‌تواند چندین انتهای ارسال‌کننده داشته باشد که مقادیر تولید می‌کنند، اما فقط یک انتهای گیرنده که آن مقادیر را مصرف می‌کند. تصور کنید چندین جریان کوچک به یک رودخانه بزرگ می‌ریزند: هر چیزی که در هر یک از جریان‌ها ارسال شود، در نهایت به رودخانه بزرگ در انتها می‌رسد. فعلاً با یک تولیدکننده شروع می‌کنیم، اما وقتی این مثال کار کرد، چندین تولیدکننده اضافه خواهیم کرد.

تابع mpsc::channel یک تاپل برمی‌گرداند که عنصر اول آن، بخش ارسال‌کننده یا همان فرستنده (transmitter)، و عنصر دوم آن، بخش دریافت‌کننده یا همان گیرنده (receiver) است. در بسیاری از حوزه‌ها، اختصارات tx و rx به ترتیب برای transmitter و receiver به کار می‌روند، بنابراین ما نیز نام متغیرها را به همین صورت انتخاب می‌کنیم تا نقش هر بخش را مشخص کنیم. در این‌جا از یک دستور let همراه با یک الگو استفاده کرده‌ایم که تاپل را تجزیه (destructure) می‌کند؛ در فصل ۱۹ درباره‌ی استفاده از الگوها در دستورات let و تجزیه بیشتر صحبت خواهیم کرد. در حال حاضر، تنها کافی‌ست بدانید که استفاده از let به این صورت، روشی مناسب برای استخراج اجزای تاپلی است که تابع mpsc::channel بازمی‌گرداند.

بیایید انتهای ارسال‌کننده را به یک نخ ایجادشده منتقل کنیم و یک رشته ارسال کنیم تا نخ ایجادشده با نخ اصلی ارتباط برقرار کند، همان‌طور که در لیستینگ 16-7 نشان داده شده است. این شبیه به انداختن یک اردک پلاستیکی در رودخانه در بالادست یا ارسال یک پیام چت از یک نخ به نخ دیگر است.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}
Listing 16-7: انتقال tx به یک نخ ایجادشده و ارسال ‘hi’

باز هم از thread::spawn برای ایجاد یک نخ جدید استفاده می‌کنیم و سپس با استفاده از move، مالکیت tx را به داخل closure منتقل می‌کنیم تا نخ ایجادشده مالک tx شود. نخ ایجادشده باید مالک فرستنده باشد تا بتواند از طریق کانال پیام ارسال کند.

فرستنده دارای متدی به نام send است که مقداری را که می‌خواهیم ارسال کنیم دریافت می‌کند. متد send یک نوع Result<T, E> بازمی‌گرداند؛ بنابراین اگر گیرنده قبلاً حذف شده باشد و دیگر جایی برای ارسال مقدار وجود نداشته باشد، عملیات ارسال منجر به خطا خواهد شد. در این مثال، ما با فراخوانی unwrap در صورت بروز خطا باعث panick شدن برنامه می‌شویم. اما در یک برنامه واقعی، باید این خطا را به‌درستی مدیریت کنیم؛ برای مرور استراتژی‌های مدیریت خطا به فصل ۹ بازگردید.

در لیستینگ 16-8، مقداری را از گیرنده در نخ اصلی دریافت می‌کنیم. این شبیه به گرفتن اردک پلاستیکی از آب در انتهای رودخانه یا دریافت یک پیام چت است.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Listing 16-8: Receiving the value "hi" in the main thread and printing it

گیرنده دو متد مفید دارد: recv و try_recv. ما از recv، که مخفف receive است، استفاده می‌کنیم. این متد اجرای نخ اصلی را مسدود کرده و منتظر می‌ماند تا مقداری از طریق کانال ارسال شود. هنگامی که مقداری ارسال شد، recv آن را در یک مقدار Result<T, E> بازمی‌گرداند. وقتی فرستنده بسته می‌شود، recv یک خطا برمی‌گرداند تا نشان دهد که هیچ مقدار دیگری نمی‌آید.

متد try_recv مسدود نمی‌کند، بلکه بلافاصله یک مقدار Result<T, E> بازمی‌گرداند: یک مقدار Ok حاوی یک پیام اگر موجود باشد، و یک مقدار Err اگر این بار هیچ پیامی موجود نباشد. استفاده از try_recv زمانی مفید است که این نخ کار دیگری برای انجام دارد در حالی که منتظر پیام‌ها است: می‌توانیم یک حلقه بنویسیم که هر چند وقت یک بار try_recv را فراخوانی کند، یک پیام را اگر موجود باشد پردازش کند، و در غیر این صورت کار دیگری را برای مدتی انجام دهد تا دوباره بررسی کند.

ما در این مثال برای سادگی از recv استفاده کرده‌ایم؛ نخ اصلی کار دیگری جز منتظر ماندن برای پیام‌ها ندارد، بنابراین مسدود کردن نخ اصلی مناسب است.

وقتی کد موجود در لیستینگ 16-8 را اجرا کنیم، مقدار چاپ‌شده از نخ اصلی را خواهیم دید:

Got: hi

عالی!

کانال‌ها و انتقال مالکیت

قوانین مالکیت نقش حیاتی‌ای در ارسال پیام ایفا می‌کنند، چرا که به شما کمک می‌کنند تا کدی ایمن و هم‌زمان (concurrent) بنویسید. جلوگیری از بروز خطا در برنامه‌نویسی هم‌زمان یکی از مزایای تفکر بر مبنای مالکیت در سراسر برنامه‌های Rust است. بیایید یک آزمایش انجام دهیم تا ببینیم چگونه کانال‌ها و مالکیت با هم همکاری می‌کنند تا از بروز مشکل جلوگیری شود: در این آزمایش، سعی می‌کنیم از متغیر val در نخ ایجاد‌شده بعد از این‌که آن را از طریق کانال ارسال کرده‌ایم، استفاده کنیم. سعی کنید کدی که در فهرست 16-9 آمده را کامپایل کنید تا ببینید چرا این کد مجاز نیست.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Listing 16-9: تلاش برای استفاده از val پس از ارسال آن از طریق کانال

در اینجا، ما سعی می‌کنیم val را پس از ارسال آن از طریق tx.send چاپ کنیم. اجازه دادن به این کار ایده بدی خواهد بود: هنگامی که مقدار به نخ دیگری ارسال شده است، آن نخ می‌تواند قبل از اینکه سعی کنیم دوباره از مقدار استفاده کنیم، آن را تغییر دهد یا حذف کند. به طور بالقوه، تغییرات نخ دیگر می‌تواند باعث خطاها یا نتایج غیرمنتظره به دلیل داده‌های ناسازگار یا غیرموجود شود. با این حال، Rust اگر سعی کنیم کد موجود در لیستینگ 16-9 را کامپایل کنیم، به ما خطا می‌دهد:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:26
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                          ^^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

اشتباه ما در هم‌زمانی منجر به بروز یک خطای زمان کامپایل شده است. تابع send مالکیت پارامتر خود را می‌گیرد، و زمانی که مقدار منتقل می‌شود، دریافت‌کننده مالکیت آن را در اختیار می‌گیرد. این موضوع باعث می‌شود که به‌صورت تصادفی پس از ارسال، دوباره از آن مقدار استفاده نکنیم؛ سیستم مالکیت بررسی می‌کند که همه چیز در وضعیت درستی قرار دارد.

ارسال مقادیر متعدد و مشاهده انتظار گیرنده

کدی که در لیستینگ 16-8 آمده بود کامپایل و اجرا شد، اما به‌صورت واضح به ما نشان نداد که دو نخ مجزا از طریق یک channel با یکدیگر در حال ارتباط هستند.

In Listing 16-10 we’ve made some modifications that will prove the code in Listing 16-8 is running concurrently: the spawned thread will now send multiple messages and pause for a second between each message.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}
Listing 16-10: Sending multiple messages and pausing between each one

این بار، نخی که ایجاد شده (spawned thread) یک وکتور از رشته‌ها (vector of strings) دارد که می‌خواهیم آن‌ها را به نخ اصلی ارسال کنیم. روی این رشته‌ها پیمایش می‌کنیم، هر کدام را به‌صورت جداگانه ارسال می‌کنیم، و بین ارسال هر کدام، با فراخوانی تابع thread::sleep و دادن یک مقدار Duration برابر با یک ثانیه، مکث می‌کنیم.

در نخ اصلی، دیگر تابع recv را به طور صریح فراخوانی نمی‌کنیم: در عوض، با rx به عنوان یک تکرارگر رفتار می‌کنیم. برای هر مقداری که دریافت می‌شود، آن را چاپ می‌کنیم. هنگامی که کانال بسته می‌شود، تکرار متوقف خواهد شد.

هنگام اجرای کد موجود در لیست 16-10، باید خروجی زیر را مشاهده کنید با یک ثانیه توقف بین هر خط:

Got: hi
Got: from
Got: the
Got: thread

از آنجا که هیچ کدی در حلقه for نخ اصلی نداریم که مکث یا تأخیری ایجاد کند، می‌توانیم بگوییم که نخ اصلی منتظر دریافت مقادیر از نخ ایجادشده است.

ایجاد تولیدکننده‌های متعدد با کلون کردن فرستنده

پیش‌تر اشاره کردیم که mpsc مخفف چند تولیدکننده، یک مصرف‌کننده است. بیایید از mpsc استفاده کنیم و کد موجود در لیست 16-10 را گسترش دهیم تا چندین ترد ایجاد کنیم که همگی مقادیر را به یک دریافت‌کننده ارسال می‌کنند. برای این کار می‌توانیم فرستنده را clone کنیم، همان‌طور که در لیست 16-11 نشان داده شده است.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}
Listing 16-11: ارسال چندین پیام از چندین تولیدکننده

این بار، قبل از اینکه نخ ایجادشده اول را ایجاد کنیم، روی فرستنده clone فراخوانی می‌کنیم. این کار به ما یک فرستنده جدید می‌دهد که می‌توانیم به نخ ایجادشده اول ارسال کنیم. فرستنده اصلی را به نخ ایجادشده دوم ارسال می‌کنیم. این کار به ما دو نخ می‌دهد که هر کدام پیام‌های مختلفی را به یک گیرنده ارسال می‌کنند.

وقتی کد را اجرا می‌کنید، خروجی شما باید چیزی شبیه به این باشد:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

ممکن است مقادیر را به ترتیب دیگری ببینید، بسته به سیستم شما. این همان چیزی است که همزمانی را هم جالب و هم دشوار می‌کند. اگر با thread::sleep آزمایش کنید و مقادیر مختلفی را در نخ‌های مختلف به آن بدهید، هر اجرا غیرقطعی‌تر خواهد شد و هر بار خروجی متفاوتی ایجاد می‌کند.

اکنون که دیدیم کانال‌ها چگونه کار می‌کنند، بیایید به یک روش دیگر همزمانی نگاهی بیندازیم.