استفاده از پیامرسانی برای انتقال داده بین نخها
یکی از رویکردهای محبوب و فزاینده برای اطمینان از همزمانی ایمن، پیامرسانی است، جایی که نخها یا بازیگران با ارسال پیامهای حاوی داده به یکدیگر ارتباط برقرار میکنند. ایده این رویکرد در یک شعار از مستندات زبان Go آمده است:
«با به اشتراک گذاشتن حافظه ارتباط برقرار نکنید؛ بلکه حافظه را با ارتباط برقرار کردن به اشتراک بگذارید.»
برای دستیابی به همزمانی مبتنی بر ارسال پیام، کتابخانهی استاندارد Rust پیادهسازیای از مفهوم channel را فراهم کرده است. یک channel مفهومی کلی در برنامهنویسی است که از طریق آن دادهها از یک thread به thread دیگر ارسال میشوند.
میتوانید یک کانال در برنامهنویسی را مانند یک کانال آبی جهتدار، مانند یک جریان یا رودخانه تصور کنید. اگر چیزی مانند یک اردک پلاستیکی را به داخل رودخانه بیندازید، آن اردک به پاییندست رودخانه سفر میکند و به انتهای آن میرسد.
یک channel دو بخش دارد: یک فرستنده (transmitter) و یک گیرنده (receiver). بخش فرستنده مانند نقطهی بالادستی رودخانهای است که در آن اردک پلاستیکی را داخل آب میاندازید، و بخش گیرنده جاییست که اردک پلاستیکی در پاییندست به آنجا میرسد. بخشی از کد شما با متدهایی بر روی فرستنده، دادههایی را که میخواهید ارسال کنید قرار میدهد، و بخش دیگر کد بررسی میکند که آیا پیامی در سمت گیرنده دریافت شده است یا نه. وقتی که یا فرستنده یا گیرنده (یا هر دو) از بین بروند (drop شوند)، گفته میشود که channel بسته شده است.
در اینجا، قصد داریم برنامهای بسازیم که در آن یک thread وظیفه تولید مقادیر و ارسال آنها از طریق یک channel را دارد، و thread دیگری این مقادیر را دریافت کرده و چاپ میکند. برای نمایش این قابلیت، مقادیر سادهای را بین threadها ارسال خواهیم کرد. پس از آشنایی با این تکنیک، میتوانید از channelها برای هر نوع ارتباط میان threadها استفاده کنید؛ مثلاً در یک سامانهی چت یا سیستمی که در آن چند thread قسمتهایی از یک محاسبه را انجام میدهند و نتیجهها را به یک thread مرکزی برای تجمیع ارسال میکنند.
ابتدا، در لیستینگ 16-6، یک کانال ایجاد میکنیم اما هنوز کاری با آن انجام نمیدهیم. توجه داشته باشید که این کد هنوز کامپایل نمیشود زیرا Rust نمیتواند نوع مقادیری که میخواهیم از طریق کانال ارسال کنیم را تعیین کند.
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
tx
and rx
ما یک کانال جدید با استفاده از تابع mpsc::channel
ایجاد میکنیم؛ mpsc
مخفف تولیدکنندههای چندگانه، مصرفکننده تکگانه است. به طور خلاصه، نحوه پیادهسازی کانالها توسط کتابخانه استاندارد Rust به این معناست که یک کانال میتواند چندین انتهای ارسالکننده داشته باشد که مقادیر تولید میکنند، اما فقط یک انتهای گیرنده که آن مقادیر را مصرف میکند. تصور کنید چندین جریان کوچک به یک رودخانه بزرگ میریزند: هر چیزی که در هر یک از جریانها ارسال شود، در نهایت به رودخانه بزرگ در انتها میرسد. فعلاً با یک تولیدکننده شروع میکنیم، اما وقتی این مثال کار کرد، چندین تولیدکننده اضافه خواهیم کرد.
تابع mpsc::channel
یک تاپل برمیگرداند که عنصر اول آن، بخش ارسالکننده یا همان فرستنده (transmitter)، و عنصر دوم آن، بخش دریافتکننده یا همان گیرنده (receiver) است. در بسیاری از حوزهها، اختصارات tx
و rx
به ترتیب برای transmitter و receiver به کار میروند، بنابراین ما نیز نام متغیرها را به همین صورت انتخاب میکنیم تا نقش هر بخش را مشخص کنیم. در اینجا از یک دستور let
همراه با یک الگو استفاده کردهایم که تاپل را تجزیه (destructure) میکند؛ در فصل ۱۹ دربارهی استفاده از الگوها در دستورات let
و تجزیه بیشتر صحبت خواهیم کرد. در حال حاضر، تنها کافیست بدانید که استفاده از let
به این صورت، روشی مناسب برای استخراج اجزای تاپلی است که تابع mpsc::channel
بازمیگرداند.
بیایید انتهای ارسالکننده را به یک نخ ایجادشده منتقل کنیم و یک رشته ارسال کنیم تا نخ ایجادشده با نخ اصلی ارتباط برقرار کند، همانطور که در لیستینگ 16-7 نشان داده شده است. این شبیه به انداختن یک اردک پلاستیکی در رودخانه در بالادست یا ارسال یک پیام چت از یک نخ به نخ دیگر است.
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
tx
به یک نخ ایجادشده و ارسال ‘hi’باز هم از thread::spawn
برای ایجاد یک نخ جدید استفاده میکنیم و سپس با استفاده از move
، مالکیت tx
را به داخل closure منتقل میکنیم تا نخ ایجادشده مالک tx
شود. نخ ایجادشده باید مالک فرستنده باشد تا بتواند از طریق کانال پیام ارسال کند.
فرستنده دارای متدی به نام send
است که مقداری را که میخواهیم ارسال کنیم دریافت میکند. متد send
یک نوع Result<T, E>
بازمیگرداند؛ بنابراین اگر گیرنده قبلاً حذف شده باشد و دیگر جایی برای ارسال مقدار وجود نداشته باشد، عملیات ارسال منجر به خطا خواهد شد. در این مثال، ما با فراخوانی unwrap
در صورت بروز خطا باعث panick شدن برنامه میشویم. اما در یک برنامه واقعی، باید این خطا را بهدرستی مدیریت کنیم؛ برای مرور استراتژیهای مدیریت خطا به فصل ۹ بازگردید.
در لیستینگ 16-8، مقداری را از گیرنده در نخ اصلی دریافت میکنیم. این شبیه به گرفتن اردک پلاستیکی از آب در انتهای رودخانه یا دریافت یک پیام چت است.
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {received}"); }
"hi"
in the main thread and printing itگیرنده دو متد مفید دارد: recv
و try_recv
. ما از recv
، که مخفف receive است، استفاده میکنیم. این متد اجرای نخ اصلی را مسدود کرده و منتظر میماند تا مقداری از طریق کانال ارسال شود. هنگامی که مقداری ارسال شد، recv
آن را در یک مقدار Result<T, E>
بازمیگرداند. وقتی فرستنده بسته میشود، recv
یک خطا برمیگرداند تا نشان دهد که هیچ مقدار دیگری نمیآید.
متد try_recv
مسدود نمیکند، بلکه بلافاصله یک مقدار Result<T, E>
بازمیگرداند: یک مقدار Ok
حاوی یک پیام اگر موجود باشد، و یک مقدار Err
اگر این بار هیچ پیامی موجود نباشد. استفاده از try_recv
زمانی مفید است که این نخ کار دیگری برای انجام دارد در حالی که منتظر پیامها است: میتوانیم یک حلقه بنویسیم که هر چند وقت یک بار try_recv
را فراخوانی کند، یک پیام را اگر موجود باشد پردازش کند، و در غیر این صورت کار دیگری را برای مدتی انجام دهد تا دوباره بررسی کند.
ما در این مثال برای سادگی از recv
استفاده کردهایم؛ نخ اصلی کار دیگری جز منتظر ماندن برای پیامها ندارد، بنابراین مسدود کردن نخ اصلی مناسب است.
وقتی کد موجود در لیستینگ 16-8 را اجرا کنیم، مقدار چاپشده از نخ اصلی را خواهیم دید:
Got: hi
عالی!
کانالها و انتقال مالکیت
قوانین مالکیت نقش حیاتیای در ارسال پیام ایفا میکنند، چرا که به شما کمک میکنند تا کدی ایمن و همزمان (concurrent) بنویسید. جلوگیری از بروز خطا در برنامهنویسی همزمان یکی از مزایای تفکر بر مبنای مالکیت در سراسر برنامههای Rust است. بیایید یک آزمایش انجام دهیم تا ببینیم چگونه کانالها و مالکیت با هم همکاری میکنند تا از بروز مشکل جلوگیری شود: در این آزمایش، سعی میکنیم از متغیر val
در نخ ایجادشده بعد از اینکه آن را از طریق کانال ارسال کردهایم، استفاده کنیم. سعی کنید کدی که در فهرست 16-9 آمده را کامپایل کنید تا ببینید چرا این کد مجاز نیست.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
val
پس از ارسال آن از طریق کانالدر اینجا، ما سعی میکنیم val
را پس از ارسال آن از طریق tx.send
چاپ کنیم. اجازه دادن به این کار ایده بدی خواهد بود: هنگامی که مقدار به نخ دیگری ارسال شده است، آن نخ میتواند قبل از اینکه سعی کنیم دوباره از مقدار استفاده کنیم، آن را تغییر دهد یا حذف کند. به طور بالقوه، تغییرات نخ دیگر میتواند باعث خطاها یا نتایج غیرمنتظره به دلیل دادههای ناسازگار یا غیرموجود شود. با این حال، Rust اگر سعی کنیم کد موجود در لیستینگ 16-9 را کامپایل کنیم، به ما خطا میدهد:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:26
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
اشتباه ما در همزمانی منجر به بروز یک خطای زمان کامپایل شده است. تابع send
مالکیت پارامتر خود را میگیرد، و زمانی که مقدار منتقل میشود، دریافتکننده مالکیت آن را در اختیار میگیرد. این موضوع باعث میشود که بهصورت تصادفی پس از ارسال، دوباره از آن مقدار استفاده نکنیم؛ سیستم مالکیت بررسی میکند که همه چیز در وضعیت درستی قرار دارد.
ارسال مقادیر متعدد و مشاهده انتظار گیرنده
کدی که در لیستینگ 16-8 آمده بود کامپایل و اجرا شد، اما بهصورت واضح به ما نشان نداد که دو نخ مجزا از طریق یک channel با یکدیگر در حال ارتباط هستند.
In Listing 16-10 we’ve made some modifications that will prove the code in Listing 16-8 is running concurrently: the spawned thread will now send multiple messages and pause for a second between each message.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
این بار، نخی که ایجاد شده (spawned thread) یک وکتور از رشتهها (vector of strings
) دارد که میخواهیم آنها را به نخ اصلی ارسال کنیم. روی این رشتهها پیمایش میکنیم، هر کدام را بهصورت جداگانه ارسال میکنیم، و بین ارسال هر کدام، با فراخوانی تابع thread::sleep
و دادن یک مقدار Duration
برابر با یک ثانیه، مکث میکنیم.
در نخ اصلی، دیگر تابع recv
را به طور صریح فراخوانی نمیکنیم: در عوض، با rx
به عنوان یک تکرارگر رفتار میکنیم. برای هر مقداری که دریافت میشود، آن را چاپ میکنیم. هنگامی که کانال بسته میشود، تکرار متوقف خواهد شد.
هنگام اجرای کد موجود در لیست 16-10، باید خروجی زیر را مشاهده کنید با یک ثانیه توقف بین هر خط:
Got: hi
Got: from
Got: the
Got: thread
از آنجا که هیچ کدی در حلقه for
نخ اصلی نداریم که مکث یا تأخیری ایجاد کند، میتوانیم بگوییم که نخ اصلی منتظر دریافت مقادیر از نخ ایجادشده است.
ایجاد تولیدکنندههای متعدد با کلون کردن فرستنده
پیشتر اشاره کردیم که mpsc
مخفف چند تولیدکننده، یک مصرفکننده است.
بیایید از mpsc
استفاده کنیم و کد موجود در لیست 16-10 را گسترش دهیم تا چندین ترد ایجاد کنیم
که همگی مقادیر را به یک دریافتکننده ارسال میکنند.
برای این کار میتوانیم فرستنده را clone کنیم، همانطور که در لیست 16-11 نشان داده شده است.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
این بار، قبل از اینکه نخ ایجادشده اول را ایجاد کنیم، روی فرستنده clone
فراخوانی میکنیم. این کار به ما یک فرستنده جدید میدهد که میتوانیم به نخ ایجادشده اول ارسال کنیم. فرستنده اصلی را به نخ ایجادشده دوم ارسال میکنیم. این کار به ما دو نخ میدهد که هر کدام پیامهای مختلفی را به یک گیرنده ارسال میکنند.
وقتی کد را اجرا میکنید، خروجی شما باید چیزی شبیه به این باشد:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
ممکن است مقادیر را به ترتیب دیگری ببینید، بسته به سیستم شما. این همان چیزی است که همزمانی را هم جالب و هم دشوار میکند. اگر با thread::sleep
آزمایش کنید و مقادیر مختلفی را در نخهای مختلف به آن بدهید، هر اجرا غیرقطعیتر خواهد شد و هر بار خروجی متفاوتی ایجاد میکند.
اکنون که دیدیم کانالها چگونه کار میکنند، بیایید به یک روش دیگر همزمانی نگاهی بیندازیم.