خاموشی و پاکسازی منظم
کدی که در فهرست 21-20 آمده است، همانطور که انتظار داشتیم، با استفاده از یک thread pool بهصورت asynchronous به درخواستها پاسخ میدهد. در این میان، هشدارهایی در مورد فیلدهای workers
، id
و thread
دریافت میکنیم که بهطور مستقیم از آنها استفاده نمیشود و این موضوع به ما یادآوری میکند که عملیات پاکسازی یا مدیریت پایانی انجام نشده است. زمانی که از روش نهچندان ظریف Ctrl+C برای متوقف کردن thread اصلی استفاده میکنیم، تمام threadهای دیگر نیز بلافاصله متوقف میشوند، حتی اگر در حال پردازش یک درخواست باشند.
سپس، ما Drop
trait را پیادهسازی خواهیم کرد تا join
را روی هر یک از نخهای موجود در مجموعه نخ فراخوانی کنیم تا بتوانند درخواستهایی که در حال کار روی آنها هستند را قبل از بستهشدن تکمیل کنند. سپس روشی برای اطلاع به نخها که نباید درخواستهای جدید بپذیرند و باید خاموش شوند، پیادهسازی خواهیم کرد. برای مشاهده عملکرد این کد، سرور خود را تغییر میدهیم تا فقط دو درخواست را قبل از خاموشی منظم مجموعه نخها بپذیرد.
چیزی که باید توجه داشته باشید این است که هیچکدام از این موارد بخشهایی از کد را که مدیریت اجرای closureها را بر عهده دارند، تحت تأثیر قرار نمیدهند، بنابراین همه چیز در اینجا همانطور باقی میماند اگر از یک مجموعه نخ برای یک runtime غیرهمزمان استفاده میکردیم.
پیادهسازی Drop
Trait روی ThreadPool
بیایید با پیادهسازی Drop
روی مجموعه نخ شروع کنیم. وقتی مجموعه نخ حذف میشود، تمام نخهای ما باید به یکدیگر ملحق شوند تا مطمئن شویم کار خود را تکمیل میکنند. لیستینگ 21-22 اولین تلاش برای پیادهسازی Drop
را نشان میدهد؛ این کد هنوز به درستی کار نخواهد کرد.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
ابتدا، از میان تمام worker
های موجود در thread pool یک حلقه اجرا میکنیم. از &mut
استفاده میکنیم، زیرا self
یک ارجاع قابلتغییر است و همچنین باید بتوانیم worker
را نیز تغییر دهیم. برای هر worker
، پیامی چاپ میکنیم که نشان دهد این نمونهی خاص از Worker
در حال خاموش شدن است، و سپس روی thread مربوط به همان Worker
تابع join
را فراخوانی میکنیم. اگر فراخوانی join
با شکست مواجه شود، از unwrap
استفاده میکنیم تا باعث panic در برنامه شود و خاموش شدن برنامه بهصورت نامناسب انجام گیرد.
اینجا خطایی که هنگام کامپایل این کد دریافت میکنیم آمده است:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/4eb161250e340c8f48f66e2b929ef4a5bed7c181/library/std/src/thread/mod.rs:1876:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
این خطا به ما میگوید که نمیتوانیم join
را فراخوانی کنیم زیرا فقط یک قرض قابل تغییر از هر worker
داریم و join
مالکیت آرگومان خود را میگیرد. برای حل این مشکل، باید نخ را از نمونه Worker
که مالک thread
است خارج کنیم تا join
بتواند نخ را مصرف کند. یک راه برای انجام این کار استفاده از همان رویکردی است که در لیستینگ 18-15 استفاده کردیم. اگر Worker
یک Option<thread::JoinHandle<()>>
نگه میداشت، میتوانستیم با استفاده از متد take
مقدار را از نوع Some
به نوع None
منتقل کنیم.
با این حال، تنها زمانی که این مسئله مطرح میشود زمانی است که Worker
حذف میشود. در عوض، باید با یک Option<thread::JoinHandle<()>>
در همه جا سر و کار داشته باشیم. Rust ایدئوماتیک اغلب از Option
استفاده میکند، اما زمانی که متوجه شوید چیزی را در Option
قرار میدهید به عنوان یک راهحل موقت، حتی اگر بدانید آن مورد همیشه حضور دارد، ایده خوبی است که به دنبال روشهای جایگزین باشید.
در این حالت، یک جایگزین بهتر استفاده از متد Vec::drain
است. این متد یک پارامتر محدوده میگیرد تا مشخص کند کدام آیتمها باید از Vec
حذف شوند و یک تکرارگر از آن آیتمها بازمیگرداند. استفاده از ..
برای محدوده تمام مقادیر را از Vec
حذف خواهد کرد.
بنابراین باید پیادهسازی drop
در ThreadPool
را به این صورت بهروزرسانی کنیم:
#![allow(unused)] fn main() { use std::{ sync::{Arc, Mutex, mpsc}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } } }
این کار خطای کامپایلر را برطرف میکند و نیازی به هیچ تغییر دیگری در کد ما ندارد. توجه داشته باشید که از آنجا که drop میتواند هنگام panic فراخوانی شود، تابع unwrap نیز ممکن است panic ایجاد کند و باعث double panic شود که در نتیجه، برنامه بلافاصله crash میکند و هرگونه عملیات پاکسازی در حال انجام را متوقف میسازد. این موضوع برای یک برنامهی نمونه قابل قبول است، اما برای کدهای محصولی توصیه نمیشود.
علامتدهی به نخها برای توقف گوش دادن به وظایف
با تمام تغییراتی که اعمال کردهایم، کد ما بدون هیچ
هشداری کامپایل میشود. اما خبر بد این است که
این کد هنوز آنطور که میخواهیم عمل نمیکند. نکتهی
کلیدی در منطق closureهایی است که توسط threadهای
نمونههای Worker
اجرا میشوند: در حال حاضر
ما تابع join
را فراخوانی میکنیم، اما این باعث
خاموش شدن threadها نمیشود، چون آنها
بهصورت بیپایان در حال loop
برای یافتن job هستند.
اگر سعی کنیم ThreadPool
را با پیادهسازی فعلی
تابع drop
حذف کنیم، thread اصلی برای همیشه
در حالت انتظار باقی میماند تا اولین thread به پایان برسد.
برای حل این مشکل، باید تغییری در پیادهسازی drop
در ThreadPool
و سپس تغییری در حلقه Worker
ایجاد کنیم.
ابتدا پیادهسازی تابع drop
برای ThreadPool
را
تغییر میدهیم تا پیش از منتظر ماندن برای پایان یافتن
threadها، بهصورت صریح sender
را حذف کند.
فهرست 21-23 تغییرات اعمالشده روی ThreadPool
را
نشان میدهد که در آن sender
بهطور صریح
حذف میشود. برخلاف thread، در اینجا نیاز داریم
که از یک Option
استفاده کنیم تا بتوانیم
sender
را با استفاده از Option::take
از
ساختار ThreadPool
بیرون بکشیم.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
sender
before joining the Worker
threadsحذف کردن sender
باعث بسته شدن channel میشود،
که این موضوع نشان میدهد دیگر هیچ پیامی ارسال
نخواهد شد. در این حالت، تمام فراخوانیهای recv
که نمونههای Worker
درون حلقهی بینهایت انجام
میدهند با خطا بازمیگردند. در فهرست 21-24،
حلقهی Worker
را طوری تغییر میدهیم که
در چنین حالتی بهصورت مناسب از حلقه خارج شود،
که به این معناست threadها زمانی پایان مییابند که
تابع drop
مربوط به ThreadPool
تابع join
را
روی آنها فراخوانی کند.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker { id, thread }
}
}
recv
با خطا بازمیگرددبرای دیدن این کد در عمل، بیایید main
را تغییر دهیم تا فقط دو درخواست را قبل از خاموششدن منظم سرور بپذیرد، همانطور که در لیستینگ 21-25 نشان داده شده است.
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
شما نمیخواهید یک سرور وب واقعی پس از فقط دو درخواست خاموش شود. این کد فقط نشان میدهد که خاموشی منظم و پاکسازی به درستی کار میکند.
متد take
که در trait Iterator
تعریف شده است، تکرار را به حداکثر دو آیتم محدود میکند. ThreadPool
در انتهای main
از محدوده خارج میشود و پیادهسازی drop
اجرا خواهد شد.
سرور را با دستور cargo run
راهاندازی کنید و سه درخواست ارسال کنید. درخواست سوم باید با خطا مواجه شود و در ترمینال خود باید خروجی مشابه زیر را ببینید:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
ممکن است ترتیب نمایش شناسههای Worker
و
پیامهای چاپشده متفاوت باشد. از طریق این پیامها
میتوانیم بفهمیم کد چگونه کار میکند: نمونههای
Worker
با شناسههای 0 و 3 اولین دو درخواست را
دریافت کردهاند. سرور پس از اتصال دوم،
پذیرفتن ارتباطهای جدید را متوقف کرده و پیادهسازی
Drop
برای ThreadPool
پیش از آنکه Worker
3
کار خود را آغاز کند اجرا شده است. حذف کردن
sender
باعث قطع ارتباط تمامی نمونههای
Worker
میشود و به آنها اطلاع میدهد که
باید خاموش شوند. هر Worker
هنگام قطع اتصال،
پیامی چاپ میکند و سپس thread pool تابع join
را فراخوانی میکند تا منتظر پایان thread مربوط به
هر Worker
بماند.
به نکتهای جالب در این اجرای خاص توجه کنید:
ThreadPool
ابتدا sender
را حذف کرده و پیش از
آنکه هیچکدام از Worker
ها خطایی دریافت کنند،
تلاش کردهایم تا Worker
0 را join کنیم. در آن لحظه،
Worker
0 هنوز خطایی از recv
دریافت نکرده
بود، بنابراین thread اصلی منتظر ماند تا Worker
0
به کار خود پایان دهد. در این فاصله، Worker
3
یک job دریافت کرد و سپس همهی threadها
خطا دریافت کردند. وقتی Worker
0 به پایان رسید،
thread اصلی منتظر پایان سایر Worker
ها ماند.
در آن لحظه، همهی آنها از حلقهی خود خارج شده
و متوقف شده بودند.
تبریک میگویم! پروژه خود را کامل کردید؛ ما یک سرور وب ساده داریم که از یک مجموعه نخ برای پاسخدهی غیرهمزمان استفاده میکند. ما توانستیم سرور را به صورت منظم خاموش کنیم و تمام نخها در مجموعه را پاکسازی کنیم.
در اینجا کد کامل برای مرجع آورده شده است:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
میتوانستیم بیشتر اینجا انجام دهیم! اگر میخواهید این پروژه را بیشتر گسترش دهید، اینجا چند ایده آمده است:
- مستندات بیشتری به
ThreadPool
و متدهای عمومی آن اضافه کنید. - تستهایی برای عملکرد کتابخانه اضافه کنید.
- فراخوانیهای
unwrap
را به مدیریت خطای قویتر تغییر دهید. - از
ThreadPool
برای انجام برخی کارها به غیر از ارائه درخواستهای وب استفاده کنید. - یک crate مجموعه نخ از crates.io پیدا کنید و یک سرور وب مشابه با استفاده از آن crate پیادهسازی کنید. سپس API و مقاومت آن را با مجموعه نخی که ما پیادهسازی کردیم مقایسه کنید.
خلاصه
آفرین! شما تا پایان این کتاب پیش آمدهاید! از اینکه در این سفر با ما همراه بودید صمیمانه سپاسگزاریم. اکنون آمادهاید تا پروژههای Rust خودتان را پیادهسازی کنید و در پروژههای دیگران نیز مشارکت داشته باشید. فراموش نکنید که جامعهای گرم و صمیمی از دیگر Rustaceanها وجود دارد که با آغوش باز آمادهاند در مسیر یادگیری Rust به شما کمک کنند.