خاموشی و پاک‌سازی منظم

کدی که در لیستینگ 21-20 وجود دارد، همان‌طور که انتظار داشتیم، با استفاده از یک مجموعه نخ (thread pool) به درخواست‌ها به صورت غیرهمزمان پاسخ می‌دهد. ما هشدارهایی در مورد فیلدهای workers، id و thread دریافت می‌کنیم که به طور مستقیم از آن‌ها استفاده نمی‌کنیم و به ما یادآوری می‌کنند که هیچ چیزی را پاک‌سازی نمی‌کنیم. وقتی از روش کم‌ظرافت ctrl-c برای متوقف کردن نخ اصلی استفاده می‌کنیم، تمام نخ‌های دیگر نیز بلافاصله متوقف می‌شوند، حتی اگر در میانه ارائه یک درخواست باشند.

سپس، ما Drop trait را پیاده‌سازی خواهیم کرد تا join را روی هر یک از نخ‌های موجود در مجموعه نخ فراخوانی کنیم تا بتوانند درخواست‌هایی که در حال کار روی آن‌ها هستند را قبل از بسته‌شدن تکمیل کنند. سپس روشی برای اطلاع به نخ‌ها که نباید درخواست‌های جدید بپذیرند و باید خاموش شوند، پیاده‌سازی خواهیم کرد. برای مشاهده عملکرد این کد، سرور خود را تغییر می‌دهیم تا فقط دو درخواست را قبل از خاموشی منظم مجموعه نخ‌ها بپذیرد.

چیزی که باید توجه داشته باشید این است که هیچ‌کدام از این موارد بخش‌هایی از کد را که مدیریت اجرای closureها را بر عهده دارند، تحت تأثیر قرار نمی‌دهند، بنابراین همه چیز در اینجا همان‌طور باقی می‌ماند اگر از یک مجموعه نخ برای یک runtime غیرهمزمان استفاده می‌کردیم.

پیاده‌سازی Drop Trait روی ThreadPool

بیایید با پیاده‌سازی Drop روی مجموعه نخ شروع کنیم. وقتی مجموعه نخ حذف می‌شود، تمام نخ‌های ما باید به یکدیگر ملحق شوند تا مطمئن شویم کار خود را تکمیل می‌کنند. لیستینگ 21-22 اولین تلاش برای پیاده‌سازی Drop را نشان می‌دهد؛ این کد هنوز به درستی کار نخواهد کرد.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}
Listing 21-22: ملحق کردن هر نخ وقتی مجموعه نخ از محدوده خارج می‌شود

ابتدا، ما از میان هر یک از workers موجود در مجموعه نخ حلقه می‌زنیم. ما برای این کار از &mut استفاده می‌کنیم زیرا self یک ارجاع قابل تغییر است و ما همچنین نیاز داریم که بتوانیم worker را تغییر دهیم. برای هر worker، پیامی چاپ می‌کنیم که نشان می‌دهد این worker خاص در حال خاموش‌شدن است، و سپس join را روی نخ آن worker فراخوانی می‌کنیم. اگر فراخوانی join شکست بخورد، از unwrap استفاده می‌کنیم تا باعث panic شود و خاموشی غیرمنظم اتفاق بیفتد.

اینجا خطایی که هنگام کامپایل این کد دریافت می‌کنیم آمده است:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
    --> src/lib.rs:52:13
     |
52   |             worker.thread.join().unwrap();
     |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
     |             |
     |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
     |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
    --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/std/src/thread/mod.rs:1763:17
     |
1763 |     pub fn join(self) -> Result<T> {
     |                 ^^^^

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error

این خطا به ما می‌گوید که نمی‌توانیم join را فراخوانی کنیم زیرا فقط یک قرض قابل تغییر از هر worker داریم و join مالکیت آرگومان خود را می‌گیرد. برای حل این مشکل، باید نخ را از نمونه Worker که مالک thread است خارج کنیم تا join بتواند نخ را مصرف کند. یک راه برای انجام این کار استفاده از همان رویکردی است که در لیستینگ 18-15 استفاده کردیم. اگر Worker یک Option<thread::JoinHandle<()>> نگه می‌داشت، می‌توانستیم با استفاده از متد take مقدار را از نوع Some به نوع None منتقل کنیم.

با این حال، تنها زمانی که این مسئله مطرح می‌شود زمانی است که Worker حذف می‌شود. در عوض، باید با یک Option<thread::JoinHandle<()>> در همه جا سر و کار داشته باشیم. Rust ایدئوماتیک اغلب از Option استفاده می‌کند، اما زمانی که متوجه شوید چیزی را در Option قرار می‌دهید به عنوان یک راه‌حل موقت، حتی اگر بدانید آن مورد همیشه حضور دارد، ایده خوبی است که به دنبال روش‌های جایگزین باشید.

در این حالت، یک جایگزین بهتر استفاده از متد Vec::drain است. این متد یک پارامتر محدوده می‌گیرد تا مشخص کند کدام آیتم‌ها باید از Vec حذف شوند و یک تکرارگر از آن آیتم‌ها بازمی‌گرداند. استفاده از .. برای محدوده تمام مقادیر را از Vec حذف خواهد کرد.

بنابراین باید پیاده‌سازی drop در ThreadPool را به این صورت به‌روزرسانی کنیم:

Filename: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}
}

این تغییر خطای کامپایلر را برطرف می‌کند و نیازی به تغییرات دیگر در کد ما ندارد.

علامت‌دهی به نخ‌ها برای توقف گوش دادن به وظایف

با تمام تغییراتی که اعمال کرده‌ایم، کد ما بدون هیچ هشداری کامپایل می‌شود. با این حال، خبر بد این است که این کد هنوز به درستی کار نمی‌کند. کلید مشکل در منطق موجود در closureهایی است که توسط نخ‌های نمونه‌های Worker اجرا می‌شوند: در حال حاضر، ما join را فراخوانی می‌کنیم، اما این کار نخ‌ها را خاموش نمی‌کند زیرا آن‌ها برای همیشه در جستجوی وظایف loop می‌زنند. اگر با پیاده‌سازی فعلی drop، ThreadPool خود را حذف کنیم، نخ اصلی برای همیشه منتظر می‌ماند تا اولین نخ تکمیل شود.

برای حل این مشکل، باید تغییری در پیاده‌سازی drop در ThreadPool و سپس تغییری در حلقه Worker ایجاد کنیم.

ابتدا، پیاده‌سازی drop در ThreadPool را تغییر می‌دهیم تا sender را قبل از منتظر ماندن برای تکمیل نخ‌ها به صورت صریح حذف کنیم. لیستینگ 21-23 تغییرات در ThreadPool برای حذف صریح sender را نشان می‌دهد. برخلاف workers، اینجا ما باید از یک Option استفاده کنیم تا بتوانیم sender را با Option::take از ThreadPool منتقل کنیم.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}
Listing 21-23: حذف صریح sender قبل از ملحق کردن نخ‌های worker

حذف sender کانال را می‌بندد، که نشان می‌دهد دیگر هیچ پیامی ارسال نخواهد شد. وقتی این اتفاق می‌افتد، تمام فراخوانی‌های recv که workers در حلقه بی‌نهایت انجام می‌دهند یک خطا برمی‌گرداند. در لیستینگ 21-24، حلقه Worker را تغییر می‌دهیم تا در چنین حالتی به صورت منظم از حلقه خارج شود، که به این معناست که نخ‌ها وقتی پیاده‌سازی drop در ThreadPool روی آن‌ها join را فراخوانی می‌کند تکمیل خواهند شد.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");

                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker { id, thread }
    }
}
Listing 21-24: خروج صریح از حلقه وقتی recv یک خطا برمی‌گرداند

برای دیدن این کد در عمل، بیایید main را تغییر دهیم تا فقط دو درخواست را قبل از خاموش‌شدن منظم سرور بپذیرد، همان‌طور که در لیستینگ 21-25 نشان داده شده است.

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-25: خاموش‌کردن سرور پس از ارائه دو درخواست با خروج از حلقه

شما نمی‌خواهید یک سرور وب واقعی پس از فقط دو درخواست خاموش شود. این کد فقط نشان می‌دهد که خاموشی منظم و پاک‌سازی به درستی کار می‌کند.

متد take که در trait Iterator تعریف شده است، تکرار را به حداکثر دو آیتم محدود می‌کند. ThreadPool در انتهای main از محدوده خارج می‌شود و پیاده‌سازی drop اجرا خواهد شد.

سرور را با دستور cargo run راه‌اندازی کنید و سه درخواست ارسال کنید. درخواست سوم باید با خطا مواجه شود و در ترمینال خود باید خروجی مشابه زیر را ببینید:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

ممکن است ترتیب متفاوتی از کارگران و پیام‌های چاپ‌شده را مشاهده کنید. از پیام‌ها می‌توان فهمید که این کد چگونه کار می‌کند: کارگران 0 و 3 اولین دو درخواست را دریافت کردند. سرور پس از اتصال دوم دیگر اتصال‌ها را نمی‌پذیرد و پیاده‌سازی Drop روی ThreadPool شروع به اجرا می‌کند قبل از اینکه کارگر 3 حتی کار خود را شروع کند. حذف sender تمام کارگران را قطع کرده و به آن‌ها می‌گوید که خاموش شوند. هر کارگر هنگام قطع شدن یک پیام چاپ می‌کند و سپس مجموعه نخ (thread pool) join را فراخوانی می‌کند تا منتظر تکمیل هر نخ کارگر بماند.

به یک جنبه جالب از این اجرای خاص توجه کنید: ThreadPool فرستنده را حذف کرد، و قبل از اینکه هر کارگری خطایی دریافت کند، ما سعی کردیم به کارگر 0 ملحق شویم. کارگر 0 هنوز از recv خطایی دریافت نکرده بود، بنابراین نخ اصلی منتظر ماند تا کارگر 0 کار خود را به پایان برساند. در همین حال، کارگر 3 یک کار دریافت کرد و سپس تمام نخ‌ها خطا دریافت کردند. وقتی کارگر 0 تمام شد، نخ اصلی منتظر ماند تا بقیه کارگران کار خود را تمام کنند. در آن زمان، همه آن‌ها از حلقه‌های خود خارج شده و متوقف شده بودند.

تبریک می‌گویم! پروژه خود را کامل کردید؛ ما یک سرور وب ساده داریم که از یک مجموعه نخ برای پاسخ‌دهی غیرهمزمان استفاده می‌کند. ما توانستیم سرور را به صورت منظم خاموش کنیم و تمام نخ‌ها در مجموعه را پاک‌سازی کنیم.

در اینجا کد کامل برای مرجع آورده شده است:

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");

                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

می‌توانستیم بیشتر اینجا انجام دهیم! اگر می‌خواهید این پروژه را بیشتر گسترش دهید، اینجا چند ایده آمده است:

  • مستندات بیشتری به ThreadPool و متدهای عمومی آن اضافه کنید.
  • تست‌هایی برای عملکرد کتابخانه اضافه کنید.
  • فراخوانی‌های unwrap را به مدیریت خطای قوی‌تر تغییر دهید.
  • از ThreadPool برای انجام برخی کارها به غیر از ارائه درخواست‌های وب استفاده کنید.
  • یک crate مجموعه نخ از crates.io پیدا کنید و یک سرور وب مشابه با استفاده از آن crate پیاده‌سازی کنید. سپس API و مقاومت آن را با مجموعه نخی که ما پیاده‌سازی کردیم مقایسه کنید.

خلاصه

آفرین! شما به انتهای این کتاب رسیدید! از شما بابت پیوستن به ما در این سفر با Rust سپاسگزاریم. اکنون آماده‌اید که پروژه‌های Rust خود را پیاده‌سازی کنید و به پروژه‌های دیگران کمک کنید. به یاد داشته باشید که جامعه‌ای خوش‌آمدگوی از Rustaceans وجود دارد که مشتاقانه منتظر کمک به شما در هر چالشی هستند که در مسیر Rust خود با آن مواجه می‌شوید.