خاموشی و پاکسازی منظم
کدی که در فهرست 21-20 آمده است، همانطور که انتظار داشتیم، با استفاده از یک thread pool بهصورت asynchronous به درخواستها پاسخ میدهد. در این میان، هشدارهایی در مورد فیلدهای workers، id و thread دریافت میکنیم که بهطور مستقیم از آنها استفاده نمیشود و این موضوع به ما یادآوری میکند که عملیات پاکسازی یا مدیریت پایانی انجام نشده است. زمانی که از روش نهچندان ظریف Ctrl+C برای متوقف کردن thread اصلی استفاده میکنیم، تمام threadهای دیگر نیز بلافاصله متوقف میشوند، حتی اگر در حال پردازش یک درخواست باشند.
سپس، ما Drop trait را پیادهسازی خواهیم کرد تا join را روی هر یک از نخهای موجود در مجموعه نخ فراخوانی کنیم تا بتوانند درخواستهایی که در حال کار روی آنها هستند را قبل از بستهشدن تکمیل کنند. سپس روشی برای اطلاع به نخها که نباید درخواستهای جدید بپذیرند و باید خاموش شوند، پیادهسازی خواهیم کرد. برای مشاهده عملکرد این کد، سرور خود را تغییر میدهیم تا فقط دو درخواست را قبل از خاموشی منظم مجموعه نخها بپذیرد.
چیزی که باید توجه داشته باشید این است که هیچکدام از این موارد بخشهایی از کد را که مدیریت اجرای closureها را بر عهده دارند، تحت تأثیر قرار نمیدهند، بنابراین همه چیز در اینجا همانطور باقی میماند اگر از یک مجموعه نخ برای یک runtime غیرهمزمان استفاده میکردیم.
پیادهسازی Drop Trait روی ThreadPool
بیایید با پیادهسازی Drop روی مجموعه نخ شروع کنیم. وقتی مجموعه نخ حذف میشود، تمام نخهای ما باید به یکدیگر ملحق شوند تا مطمئن شویم کار خود را تکمیل میکنند. لیستینگ 21-22 اولین تلاش برای پیادهسازی Drop را نشان میدهد؛ این کد هنوز به درستی کار نخواهد کرد.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
ابتدا، از میان تمام workerهای موجود در thread pool یک حلقه اجرا میکنیم. از &mut استفاده میکنیم، زیرا self یک ارجاع قابلتغییر است و همچنین باید بتوانیم worker را نیز تغییر دهیم. برای هر worker، پیامی چاپ میکنیم که نشان دهد این نمونهی خاص از Worker در حال خاموش شدن است، و سپس روی thread مربوط به همان Worker تابع join را فراخوانی میکنیم. اگر فراخوانی join با شکست مواجه شود، از unwrap استفاده میکنیم تا باعث panic در برنامه شود و خاموش شدن برنامه بهصورت نامناسب انجام گیرد.
اینجا خطایی که هنگام کامپایل این کد دریافت میکنیم آمده است:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/4eb161250e340c8f48f66e2b929ef4a5bed7c181/library/std/src/thread/mod.rs:1876:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
این خطا به ما میگوید که نمیتوانیم join را فراخوانی کنیم زیرا فقط یک قرض قابل تغییر از هر worker داریم و join مالکیت آرگومان خود را میگیرد. برای حل این مشکل، باید نخ را از نمونه Worker که مالک thread است خارج کنیم تا join بتواند نخ را مصرف کند. یک راه برای انجام این کار استفاده از همان رویکردی است که در لیستینگ 18-15 استفاده کردیم. اگر Worker یک Option<thread::JoinHandle<()>> نگه میداشت، میتوانستیم با استفاده از متد take مقدار را از نوع Some به نوع None منتقل کنیم.
با این حال، تنها زمانی که این مسئله مطرح میشود زمانی است که Worker حذف میشود. در عوض، باید با یک Option<thread::JoinHandle<()>> در همه جا سر و کار داشته باشیم. Rust ایدئوماتیک اغلب از Option استفاده میکند، اما زمانی که متوجه شوید چیزی را در Option قرار میدهید به عنوان یک راهحل موقت، حتی اگر بدانید آن مورد همیشه حضور دارد، ایده خوبی است که به دنبال روشهای جایگزین باشید.
در این حالت، یک جایگزین بهتر استفاده از متد Vec::drain است. این متد یک پارامتر محدوده میگیرد تا مشخص کند کدام آیتمها باید از Vec حذف شوند و یک تکرارگر از آن آیتمها بازمیگرداند. استفاده از .. برای محدوده تمام مقادیر را از Vec حذف خواهد کرد.
بنابراین باید پیادهسازی drop در ThreadPool را به این صورت بهروزرسانی کنیم:
#![allow(unused)] fn main() { use std::{ sync::{Arc, Mutex, mpsc}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } } }
این کار خطای کامپایلر را برطرف میکند و نیازی به هیچ تغییر دیگری در کد ما ندارد. توجه داشته باشید که از آنجا که drop میتواند هنگام panic فراخوانی شود، تابع unwrap نیز ممکن است panic ایجاد کند و باعث double panic شود که در نتیجه، برنامه بلافاصله crash میکند و هرگونه عملیات پاکسازی در حال انجام را متوقف میسازد. این موضوع برای یک برنامهی نمونه قابل قبول است، اما برای کدهای محصولی توصیه نمیشود.
علامتدهی به نخها برای توقف گوش دادن به وظایف
با تمام تغییراتی که اعمال کردهایم، کد ما بدون هیچ
هشداری کامپایل میشود. اما خبر بد این است که
این کد هنوز آنطور که میخواهیم عمل نمیکند. نکتهی
کلیدی در منطق closureهایی است که توسط threadهای
نمونههای Worker اجرا میشوند: در حال حاضر
ما تابع join را فراخوانی میکنیم، اما این باعث
خاموش شدن threadها نمیشود، چون آنها
بهصورت بیپایان در حال loop برای یافتن job هستند.
اگر سعی کنیم ThreadPool را با پیادهسازی فعلی
تابع drop حذف کنیم، thread اصلی برای همیشه
در حالت انتظار باقی میماند تا اولین thread به پایان برسد.
برای حل این مشکل، باید تغییری در پیادهسازی drop در ThreadPool و سپس تغییری در حلقه Worker ایجاد کنیم.
ابتدا پیادهسازی تابع drop برای ThreadPool را
تغییر میدهیم تا پیش از منتظر ماندن برای پایان یافتن
threadها، بهصورت صریح sender را حذف کند.
فهرست 21-23 تغییرات اعمالشده روی ThreadPool را
نشان میدهد که در آن sender بهطور صریح
حذف میشود. برخلاف thread، در اینجا نیاز داریم
که از یک Option استفاده کنیم تا بتوانیم
sender را با استفاده از Option::take از
ساختار ThreadPool بیرون بکشیم.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
sender before joining the Worker threadsحذف کردن sender باعث بسته شدن channel میشود،
که این موضوع نشان میدهد دیگر هیچ پیامی ارسال
نخواهد شد. در این حالت، تمام فراخوانیهای recv
که نمونههای Worker درون حلقهی بینهایت انجام
میدهند با خطا بازمیگردند. در فهرست 21-24،
حلقهی Worker را طوری تغییر میدهیم که
در چنین حالتی بهصورت مناسب از حلقه خارج شود،
که به این معناست threadها زمانی پایان مییابند که
تابع drop مربوط به ThreadPool تابع join را
روی آنها فراخوانی کند.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker { id, thread }
}
}
recv با خطا بازمیگرددبرای دیدن این کد در عمل، بیایید main را تغییر دهیم تا فقط دو درخواست را قبل از خاموششدن منظم سرور بپذیرد، همانطور که در لیستینگ 21-25 نشان داده شده است.
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
شما نمیخواهید یک سرور وب واقعی پس از فقط دو درخواست خاموش شود. این کد فقط نشان میدهد که خاموشی منظم و پاکسازی به درستی کار میکند.
متد take که در trait Iterator تعریف شده است، تکرار را به حداکثر دو آیتم محدود میکند. ThreadPool در انتهای main از محدوده خارج میشود و پیادهسازی drop اجرا خواهد شد.
سرور را با دستور cargo run راهاندازی کنید و سه درخواست ارسال کنید. درخواست سوم باید با خطا مواجه شود و در ترمینال خود باید خروجی مشابه زیر را ببینید:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
ممکن است ترتیب نمایش شناسههای Worker و
پیامهای چاپشده متفاوت باشد. از طریق این پیامها
میتوانیم بفهمیم کد چگونه کار میکند: نمونههای
Worker با شناسههای 0 و 3 اولین دو درخواست را
دریافت کردهاند. سرور پس از اتصال دوم،
پذیرفتن ارتباطهای جدید را متوقف کرده و پیادهسازی
Drop برای ThreadPool پیش از آنکه Worker 3
کار خود را آغاز کند اجرا شده است. حذف کردن
sender باعث قطع ارتباط تمامی نمونههای
Worker میشود و به آنها اطلاع میدهد که
باید خاموش شوند. هر Worker هنگام قطع اتصال،
پیامی چاپ میکند و سپس thread pool تابع join
را فراخوانی میکند تا منتظر پایان thread مربوط به
هر Worker بماند.
به نکتهای جالب در این اجرای خاص توجه کنید:
ThreadPool ابتدا sender را حذف کرده و پیش از
آنکه هیچکدام از Workerها خطایی دریافت کنند،
تلاش کردهایم تا Worker 0 را join کنیم. در آن لحظه،
Worker 0 هنوز خطایی از recv دریافت نکرده
بود، بنابراین thread اصلی منتظر ماند تا Worker 0
به کار خود پایان دهد. در این فاصله، Worker 3
یک job دریافت کرد و سپس همهی threadها
خطا دریافت کردند. وقتی Worker 0 به پایان رسید،
thread اصلی منتظر پایان سایر Workerها ماند.
در آن لحظه، همهی آنها از حلقهی خود خارج شده
و متوقف شده بودند.
تبریک میگویم! پروژه خود را کامل کردید؛ ما یک سرور وب ساده داریم که از یک مجموعه نخ برای پاسخدهی غیرهمزمان استفاده میکند. ما توانستیم سرور را به صورت منظم خاموش کنیم و تمام نخها در مجموعه را پاکسازی کنیم.
در اینجا کد کامل برای مرجع آورده شده است:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
میتوانستیم بیشتر اینجا انجام دهیم! اگر میخواهید این پروژه را بیشتر گسترش دهید، اینجا چند ایده آمده است:
- مستندات بیشتری به
ThreadPoolو متدهای عمومی آن اضافه کنید. - تستهایی برای عملکرد کتابخانه اضافه کنید.
- فراخوانیهای
unwrapرا به مدیریت خطای قویتر تغییر دهید. - از
ThreadPoolبرای انجام برخی کارها به غیر از ارائه درخواستهای وب استفاده کنید. - یک crate مجموعه نخ از crates.io پیدا کنید و یک سرور وب مشابه با استفاده از آن crate پیادهسازی کنید. سپس API و مقاومت آن را با مجموعه نخی که ما پیادهسازی کردیم مقایسه کنید.
خلاصه
آفرین! شما تا پایان این کتاب پیش آمدهاید! از اینکه در این سفر با ما همراه بودید صمیمانه سپاسگزاریم. اکنون آمادهاید تا پروژههای Rust خودتان را پیادهسازی کنید و در پروژههای دیگران نیز مشارکت داشته باشید. فراموش نکنید که جامعهای گرم و صمیمی از دیگر Rustaceanها وجود دارد که با آغوش باز آمادهاند در مسیر یادگیری Rust به شما کمک کنند.