استفاده از پیامرسانی برای انتقال داده بین نخها
یکی از رویکردهای محبوب و فزاینده برای اطمینان از همزمانی ایمن، پیامرسانی است، جایی که نخها یا بازیگران با ارسال پیامهای حاوی داده به یکدیگر ارتباط برقرار میکنند. ایده این رویکرد در یک شعار از مستندات زبان Go آمده است:
«با به اشتراک گذاشتن حافظه ارتباط برقرار نکنید؛ بلکه حافظه را با ارتباط برقرار کردن به اشتراک بگذارید.»
برای رسیدن به همزمانی مبتنی بر ارسال پیام، کتابخانه استاندارد Rust یک پیادهسازی از کانالها ارائه میدهد. کانال یک مفهوم عمومی برنامهنویسی است که دادهها را از یک نخ به نخ دیگر ارسال میکند.
میتوانید یک کانال در برنامهنویسی را مانند یک کانال آبی جهتدار، مانند یک جریان یا رودخانه تصور کنید. اگر چیزی مانند یک اردک پلاستیکی را به داخل رودخانه بیندازید، آن اردک به پاییندست رودخانه سفر میکند و به انتهای آن میرسد.
یک کانال دو نیمه دارد: یک فرستنده و یک گیرنده. نیمه فرستنده محل بالادستی است که اردکهای پلاستیکی را به داخل رودخانه میاندازید، و نیمه گیرنده جایی است که اردک پلاستیکی در پاییندست پایان مییابد. یک بخش از کد شما متدهایی روی فرستنده با دادهای که میخواهید ارسال کنید فراخوانی میکند، و بخش دیگری انتهای گیرنده را برای پیامهای واردشده بررسی میکند. اگر هر یک از نیمههای فرستنده یا گیرنده حذف شوند، کانال به عنوان بستهشده در نظر گرفته میشود.
اینجا، برنامهای ایجاد میکنیم که یک نخ برای تولید مقادیر و ارسال آنها از طریق یک کانال دارد، و نخ دیگری مقادیر را دریافت کرده و چاپ میکند. برای نمایش این ویژگی، مقادیر سادهای بین نخها از طریق یک کانال ارسال خواهیم کرد. پس از آشنایی با این تکنیک، میتوانید از کانالها برای هر نخهایی که نیاز به ارتباط با یکدیگر دارند استفاده کنید، مانند یک سیستم چت یا سیستمی که بسیاری از نخها بخشهایی از یک محاسبه را انجام میدهند و آن بخشها را به یک نخ ارسال میکنند که نتایج را تجمیع میکند.
ابتدا، در لیستینگ 16-6، یک کانال ایجاد میکنیم اما هنوز کاری با آن انجام نمیدهیم. توجه داشته باشید که این کد هنوز کامپایل نمیشود زیرا Rust نمیتواند نوع مقادیری که میخواهیم از طریق کانال ارسال کنیم را تعیین کند.
Filename: src/main.rs
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
لیستینگ 16-6: ایجاد یک کانال و اختصاص دو نیمه آن به tx
و rx
ما یک کانال جدید با استفاده از تابع mpsc::channel
ایجاد میکنیم؛ mpsc
مخفف تولیدکنندههای چندگانه، مصرفکننده تکگانه است. به طور خلاصه، نحوه پیادهسازی کانالها توسط کتابخانه استاندارد Rust به این معناست که یک کانال میتواند چندین انتهای ارسالکننده داشته باشد که مقادیر تولید میکنند، اما فقط یک انتهای گیرنده که آن مقادیر را مصرف میکند. تصور کنید چندین جریان کوچک به یک رودخانه بزرگ میریزند: هر چیزی که در هر یک از جریانها ارسال شود، در نهایت به رودخانه بزرگ در انتها میرسد. فعلاً با یک تولیدکننده شروع میکنیم، اما وقتی این مثال کار کرد، چندین تولیدکننده اضافه خواهیم کرد.
تابع mpsc::channel
یک جفت را برمیگرداند که عنصر اول آن انتهای ارسالکننده (فرستنده) و عنصر دوم آن انتهای گیرنده (گیرنده) است. اختصارات tx
و rx
در بسیاری از حوزهها به ترتیب برای فرستنده و گیرنده استفاده میشوند، بنابراین متغیرهای خود را به این نامها مینامیم تا هر انتها را نشان دهیم. ما از یک دستور let
با یک الگو که جفت را تخریب میکند استفاده میکنیم؛ در فصل 19 درباره استفاده از الگوها در دستورات let
و تخریب بحث خواهیم کرد. فعلاً بدانید که استفاده از یک دستور let
به این روش یک رویکرد مناسب برای استخراج قطعات جفت بازگشتی توسط mpsc::channel
است.
بیایید انتهای ارسالکننده را به یک نخ ایجادشده منتقل کنیم و یک رشته ارسال کنیم تا نخ ایجادشده با نخ اصلی ارتباط برقرار کند، همانطور که در لیستینگ 16-7 نشان داده شده است. این شبیه به انداختن یک اردک پلاستیکی در رودخانه در بالادست یا ارسال یک پیام چت از یک نخ به نخ دیگر است.
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
tx
به یک نخ ایجادشده و ارسال ‘hi’دوباره از thread::spawn
برای ایجاد یک نخ جدید استفاده میکنیم و سپس از move
برای انتقال tx
به closure استفاده میکنیم تا نخ ایجادشده مالک tx
شود. نخ ایجادشده باید مالک فرستنده باشد تا بتواند پیامها را از طریق کانال ارسال کند. فرستنده یک متد send
دارد که مقداری که میخواهیم ارسال کنیم را میگیرد. متد send
نوع Result<T, E>
را برمیگرداند، بنابراین اگر گیرنده قبلاً حذف شده باشد و جایی برای ارسال مقدار وجود نداشته باشد، عملیات ارسال یک خطا برمیگرداند. در این مثال، ما unwrap
را برای panic در صورت خطا فراخوانی میکنیم. اما در یک برنامه واقعی، باید آن را به درستی مدیریت کنیم: برای مرور استراتژیهای مدیریت خطای مناسب به فصل 9 بازگردید.
در لیستینگ 16-8، مقداری را از گیرنده در نخ اصلی دریافت میکنیم. این شبیه به گرفتن اردک پلاستیکی از آب در انتهای رودخانه یا دریافت یک پیام چت است.
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {received}"); }
گیرنده دو متد مفید دارد: recv
و try_recv
. ما از recv
، که مخفف receive است، استفاده میکنیم. این متد اجرای نخ اصلی را مسدود کرده و منتظر میماند تا مقداری از طریق کانال ارسال شود. هنگامی که مقداری ارسال شد، recv
آن را در یک مقدار Result<T, E>
بازمیگرداند. وقتی فرستنده بسته میشود، recv
یک خطا برمیگرداند تا نشان دهد که هیچ مقدار دیگری نمیآید.
متد try_recv
مسدود نمیکند، بلکه بلافاصله یک مقدار Result<T, E>
بازمیگرداند: یک مقدار Ok
حاوی یک پیام اگر موجود باشد، و یک مقدار Err
اگر این بار هیچ پیامی موجود نباشد. استفاده از try_recv
زمانی مفید است که این نخ کار دیگری برای انجام دارد در حالی که منتظر پیامها است: میتوانیم یک حلقه بنویسیم که هر چند وقت یک بار try_recv
را فراخوانی کند، یک پیام را اگر موجود باشد پردازش کند، و در غیر این صورت کار دیگری را برای مدتی انجام دهد تا دوباره بررسی کند.
ما در این مثال برای سادگی از recv
استفاده کردهایم؛ نخ اصلی کار دیگری جز منتظر ماندن برای پیامها ندارد، بنابراین مسدود کردن نخ اصلی مناسب است.
وقتی کد موجود در لیستینگ 16-8 را اجرا کنیم، مقدار چاپشده از نخ اصلی را خواهیم دید:
Got: hi
عالی!
کانالها و انتقال مالکیت
قوانین مالکیت نقش حیاتی در ارسال پیام دارند زیرا به شما کمک میکنند کد ایمن و همزمان بنویسید. جلوگیری از خطاها در برنامهنویسی همزمان مزیت فکر کردن به مالکیت در سراسر برنامههای Rust شما است. بیایید یک آزمایش انجام دهیم تا نشان دهیم کانالها و مالکیت چگونه با هم کار میکنند تا از مشکلات جلوگیری کنند: ما سعی خواهیم کرد مقدار val
را در نخ ایجادشده پس از ارسال آن از طریق کانال استفاده کنیم. کد موجود در لیستینگ 16-9 را کامپایل کنید تا ببینید چرا این کد مجاز نیست:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
val
پس از ارسال آن از طریق کانالدر اینجا، ما سعی میکنیم val
را پس از ارسال آن از طریق tx.send
چاپ کنیم. اجازه دادن به این کار ایده بدی خواهد بود: هنگامی که مقدار به نخ دیگری ارسال شده است، آن نخ میتواند قبل از اینکه سعی کنیم دوباره از مقدار استفاده کنیم، آن را تغییر دهد یا حذف کند. به طور بالقوه، تغییرات نخ دیگر میتواند باعث خطاها یا نتایج غیرمنتظره به دلیل دادههای ناسازگار یا غیرموجود شود. با این حال، Rust اگر سعی کنیم کد موجود در لیستینگ 16-9 را کامپایل کنیم، به ما خطا میدهد:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:26
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
اشتباه ما در همزمانی باعث ایجاد یک خطای زمان کامپایل شده است. تابع send
مالکیت پارامتر خود را میگیرد و وقتی مقدار منتقل میشود، گیرنده مالکیت آن را میگیرد. این از استفاده تصادفی مجدد مقدار پس از ارسال آن جلوگیری میکند؛ سیستم مالکیت بررسی میکند که همه چیز درست است.
ارسال مقادیر متعدد و مشاهده انتظار گیرنده
کد موجود در لیستینگ 16-8 کامپایل و اجرا شد، اما به وضوح نشان نمیداد که دو نخ جداگانه از طریق کانال با یکدیگر صحبت میکنند. در لیستینگ 16-10 تغییراتی اعمال کردهایم که ثابت میکند کد موجود در لیستینگ 16-8 به صورت همزمان اجرا میشود: نخ ایجادشده اکنون چندین پیام ارسال میکند و بین هر پیام یک ثانیه مکث میکند.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
این بار، نخ ایجادشده یک بردار از رشتههایی دارد که میخواهیم به نخ اصلی ارسال کنیم. ما روی آنها پیمایش میکنیم، هر کدام را به صورت جداگانه ارسال میکنیم و بین هر پیام با فراخوانی تابع thread::sleep
با یک مقدار Duration
برابر با 1 ثانیه مکث میکنیم.
در نخ اصلی، دیگر تابع recv
را به طور صریح فراخوانی نمیکنیم: در عوض، با rx
به عنوان یک تکرارگر رفتار میکنیم. برای هر مقداری که دریافت میشود، آن را چاپ میکنیم. هنگامی که کانال بسته میشود، تکرار متوقف خواهد شد.
وقتی کد موجود در لیستینگ 16-10 را اجرا میکنید، باید خروجی زیر را ببینید، با یک مکث 1 ثانیهای بین هر خط:
Got: hi
Got: from
Got: the
Got: thread
از آنجا که هیچ کدی در حلقه for
نخ اصلی نداریم که مکث یا تأخیری ایجاد کند، میتوانیم بگوییم که نخ اصلی منتظر دریافت مقادیر از نخ ایجادشده است.
ایجاد تولیدکنندههای متعدد با کلون کردن فرستنده
قبلاً اشاره کردیم که mpsc
مخفف چندین تولیدکننده، یک مصرفکننده است. بیایید از mpsc
استفاده کنیم و کد موجود در لیستینگ 16-10 را گسترش دهیم تا چندین نخ ایجاد کنیم که همگی مقادیر را به همان گیرنده ارسال میکنند. میتوانیم این کار را با کلون کردن فرستنده انجام دهیم، همانطور که در لیستینگ 16-11 نشان داده شده است:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
این بار، قبل از اینکه نخ ایجادشده اول را ایجاد کنیم، روی فرستنده clone
فراخوانی میکنیم. این کار به ما یک فرستنده جدید میدهد که میتوانیم به نخ ایجادشده اول ارسال کنیم. فرستنده اصلی را به نخ ایجادشده دوم ارسال میکنیم. این کار به ما دو نخ میدهد که هر کدام پیامهای مختلفی را به یک گیرنده ارسال میکنند.
وقتی کد را اجرا میکنید، خروجی شما باید چیزی شبیه به این باشد:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
ممکن است مقادیر را به ترتیب دیگری ببینید، بسته به سیستم شما. این همان چیزی است که همزمانی را هم جالب و هم دشوار میکند. اگر با thread::sleep
آزمایش کنید و مقادیر مختلفی را در نخهای مختلف به آن بدهید، هر اجرا غیرقطعیتر خواهد شد و هر بار خروجی متفاوتی ایجاد میکند.
اکنون که دیدیم کانالها چگونه کار میکنند، بیایید به یک روش دیگر همزمانی نگاهی بیندازیم.