خاموشی و پاکسازی منظم
کدی که در لیستینگ 21-20 وجود دارد، همانطور که انتظار داشتیم، با استفاده از یک مجموعه نخ (thread pool) به درخواستها به صورت غیرهمزمان پاسخ میدهد. ما هشدارهایی در مورد فیلدهای workers
، id
و thread
دریافت میکنیم که به طور مستقیم از آنها استفاده نمیکنیم و به ما یادآوری میکنند که هیچ چیزی را پاکسازی نمیکنیم. وقتی از روش کمظرافت ctrl-c برای متوقف کردن نخ اصلی استفاده میکنیم، تمام نخهای دیگر نیز بلافاصله متوقف میشوند، حتی اگر در میانه ارائه یک درخواست باشند.
سپس، ما Drop
trait را پیادهسازی خواهیم کرد تا join
را روی هر یک از نخهای موجود در مجموعه نخ فراخوانی کنیم تا بتوانند درخواستهایی که در حال کار روی آنها هستند را قبل از بستهشدن تکمیل کنند. سپس روشی برای اطلاع به نخها که نباید درخواستهای جدید بپذیرند و باید خاموش شوند، پیادهسازی خواهیم کرد. برای مشاهده عملکرد این کد، سرور خود را تغییر میدهیم تا فقط دو درخواست را قبل از خاموشی منظم مجموعه نخها بپذیرد.
چیزی که باید توجه داشته باشید این است که هیچکدام از این موارد بخشهایی از کد را که مدیریت اجرای closureها را بر عهده دارند، تحت تأثیر قرار نمیدهند، بنابراین همه چیز در اینجا همانطور باقی میماند اگر از یک مجموعه نخ برای یک runtime غیرهمزمان استفاده میکردیم.
پیادهسازی Drop
Trait روی ThreadPool
بیایید با پیادهسازی Drop
روی مجموعه نخ شروع کنیم. وقتی مجموعه نخ حذف میشود، تمام نخهای ما باید به یکدیگر ملحق شوند تا مطمئن شویم کار خود را تکمیل میکنند. لیستینگ 21-22 اولین تلاش برای پیادهسازی Drop
را نشان میدهد؛ این کد هنوز به درستی کار نخواهد کرد.
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
ابتدا، ما از میان هر یک از workers
موجود در مجموعه نخ حلقه میزنیم. ما برای این کار از &mut
استفاده میکنیم زیرا self
یک ارجاع قابل تغییر است و ما همچنین نیاز داریم که بتوانیم worker
را تغییر دهیم. برای هر worker
، پیامی چاپ میکنیم که نشان میدهد این worker
خاص در حال خاموششدن است، و سپس join
را روی نخ آن worker
فراخوانی میکنیم. اگر فراخوانی join
شکست بخورد، از unwrap
استفاده میکنیم تا باعث panic شود و خاموشی غیرمنظم اتفاق بیفتد.
اینجا خطایی که هنگام کامپایل این کد دریافت میکنیم آمده است:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/std/src/thread/mod.rs:1763:17
|
1763 | pub fn join(self) -> Result<T> {
| ^^^^
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
این خطا به ما میگوید که نمیتوانیم join
را فراخوانی کنیم زیرا فقط یک قرض قابل تغییر از هر worker
داریم و join
مالکیت آرگومان خود را میگیرد. برای حل این مشکل، باید نخ را از نمونه Worker
که مالک thread
است خارج کنیم تا join
بتواند نخ را مصرف کند. یک راه برای انجام این کار استفاده از همان رویکردی است که در لیستینگ 18-15 استفاده کردیم. اگر Worker
یک Option<thread::JoinHandle<()>>
نگه میداشت، میتوانستیم با استفاده از متد take
مقدار را از نوع Some
به نوع None
منتقل کنیم.
با این حال، تنها زمانی که این مسئله مطرح میشود زمانی است که Worker
حذف میشود. در عوض، باید با یک Option<thread::JoinHandle<()>>
در همه جا سر و کار داشته باشیم. Rust ایدئوماتیک اغلب از Option
استفاده میکند، اما زمانی که متوجه شوید چیزی را در Option
قرار میدهید به عنوان یک راهحل موقت، حتی اگر بدانید آن مورد همیشه حضور دارد، ایده خوبی است که به دنبال روشهای جایگزین باشید.
در این حالت، یک جایگزین بهتر استفاده از متد Vec::drain
است. این متد یک پارامتر محدوده میگیرد تا مشخص کند کدام آیتمها باید از Vec
حذف شوند و یک تکرارگر از آن آیتمها بازمیگرداند. استفاده از ..
برای محدوده تمام مقادیر را از Vec
حذف خواهد کرد.
بنابراین باید پیادهسازی drop
در ThreadPool
را به این صورت بهروزرسانی کنیم:
#![allow(unused)] fn main() { use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); }); Worker { id, thread } } } }
این تغییر خطای کامپایلر را برطرف میکند و نیازی به تغییرات دیگر در کد ما ندارد.
علامتدهی به نخها برای توقف گوش دادن به وظایف
با تمام تغییراتی که اعمال کردهایم، کد ما بدون هیچ هشداری کامپایل میشود. با این حال، خبر بد این است که این کد هنوز به درستی کار نمیکند. کلید مشکل در منطق موجود در closureهایی است که توسط نخهای نمونههای Worker
اجرا میشوند: در حال حاضر، ما join
را فراخوانی میکنیم، اما این کار نخها را خاموش نمیکند زیرا آنها برای همیشه در جستجوی وظایف loop
میزنند. اگر با پیادهسازی فعلی drop
، ThreadPool
خود را حذف کنیم، نخ اصلی برای همیشه منتظر میماند تا اولین نخ تکمیل شود.
برای حل این مشکل، باید تغییری در پیادهسازی drop
در ThreadPool
و سپس تغییری در حلقه Worker
ایجاد کنیم.
ابتدا، پیادهسازی drop
در ThreadPool
را تغییر میدهیم تا sender
را قبل از منتظر ماندن برای تکمیل نخها به صورت صریح حذف کنیم. لیستینگ 21-23 تغییرات در ThreadPool
برای حذف صریح sender
را نشان میدهد. برخلاف workers
، اینجا ما باید از یک Option
استفاده کنیم تا بتوانیم sender
را با Option::take
از ThreadPool
منتقل کنیم.
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
sender
قبل از ملحق کردن نخهای workerحذف sender
کانال را میبندد، که نشان میدهد دیگر هیچ پیامی ارسال نخواهد شد. وقتی این اتفاق میافتد، تمام فراخوانیهای recv
که workers در حلقه بینهایت انجام میدهند یک خطا برمیگرداند. در لیستینگ 21-24، حلقه Worker
را تغییر میدهیم تا در چنین حالتی به صورت منظم از حلقه خارج شود، که به این معناست که نخها وقتی پیادهسازی drop
در ThreadPool
روی آنها join
را فراخوانی میکند تکمیل خواهند شد.
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
});
Worker { id, thread }
}
}
recv
یک خطا برمیگرداندبرای دیدن این کد در عمل، بیایید main
را تغییر دهیم تا فقط دو درخواست را قبل از خاموششدن منظم سرور بپذیرد، همانطور که در لیستینگ 21-25 نشان داده شده است.
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
شما نمیخواهید یک سرور وب واقعی پس از فقط دو درخواست خاموش شود. این کد فقط نشان میدهد که خاموشی منظم و پاکسازی به درستی کار میکند.
متد take
که در trait Iterator
تعریف شده است، تکرار را به حداکثر دو آیتم محدود میکند. ThreadPool
در انتهای main
از محدوده خارج میشود و پیادهسازی drop
اجرا خواهد شد.
سرور را با دستور cargo run
راهاندازی کنید و سه درخواست ارسال کنید. درخواست سوم باید با خطا مواجه شود و در ترمینال خود باید خروجی مشابه زیر را ببینید:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
ممکن است ترتیب متفاوتی از کارگران و پیامهای چاپشده را مشاهده کنید. از پیامها میتوان فهمید که این کد چگونه کار میکند: کارگران 0 و 3 اولین دو درخواست را دریافت کردند. سرور پس از اتصال دوم دیگر اتصالها را نمیپذیرد و پیادهسازی Drop
روی ThreadPool
شروع به اجرا میکند قبل از اینکه کارگر 3 حتی کار خود را شروع کند. حذف sender
تمام کارگران را قطع کرده و به آنها میگوید که خاموش شوند. هر کارگر هنگام قطع شدن یک پیام چاپ میکند و سپس مجموعه نخ (thread pool) join
را فراخوانی میکند تا منتظر تکمیل هر نخ کارگر بماند.
به یک جنبه جالب از این اجرای خاص توجه کنید: ThreadPool
فرستنده را حذف کرد، و قبل از اینکه هر کارگری خطایی دریافت کند، ما سعی کردیم به کارگر 0 ملحق شویم. کارگر 0 هنوز از recv
خطایی دریافت نکرده بود، بنابراین نخ اصلی منتظر ماند تا کارگر 0 کار خود را به پایان برساند. در همین حال، کارگر 3 یک کار دریافت کرد و سپس تمام نخها خطا دریافت کردند. وقتی کارگر 0 تمام شد، نخ اصلی منتظر ماند تا بقیه کارگران کار خود را تمام کنند. در آن زمان، همه آنها از حلقههای خود خارج شده و متوقف شده بودند.
تبریک میگویم! پروژه خود را کامل کردید؛ ما یک سرور وب ساده داریم که از یک مجموعه نخ برای پاسخدهی غیرهمزمان استفاده میکند. ما توانستیم سرور را به صورت منظم خاموش کنیم و تمام نخها در مجموعه را پاکسازی کنیم.
در اینجا کد کامل برای مرجع آورده شده است:
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
میتوانستیم بیشتر اینجا انجام دهیم! اگر میخواهید این پروژه را بیشتر گسترش دهید، اینجا چند ایده آمده است:
- مستندات بیشتری به
ThreadPool
و متدهای عمومی آن اضافه کنید. - تستهایی برای عملکرد کتابخانه اضافه کنید.
- فراخوانیهای
unwrap
را به مدیریت خطای قویتر تغییر دهید. - از
ThreadPool
برای انجام برخی کارها به غیر از ارائه درخواستهای وب استفاده کنید. - یک crate مجموعه نخ از crates.io پیدا کنید و یک سرور وب مشابه با استفاده از آن crate پیادهسازی کنید. سپس API و مقاومت آن را با مجموعه نخی که ما پیادهسازی کردیم مقایسه کنید.
خلاصه
آفرین! شما به انتهای این کتاب رسیدید! از شما بابت پیوستن به ما در این سفر با Rust سپاسگزاریم. اکنون آمادهاید که پروژههای Rust خود را پیادهسازی کنید و به پروژههای دیگران کمک کنید. به یاد داشته باشید که جامعهای خوشآمدگوی از Rustaceans وجود دارد که مشتاقانه منتظر کمک به شما در هر چالشی هستند که در مسیر Rust خود با آن مواجه میشوید.