تبدیل سرور Single-Threaded به یک سرور Multithreaded

در حال حاضر، سرور هر درخواست را به نوبت پردازش می‌کند، به این معنی که تا زمانی که پردازش اولین اتصال تمام نشده باشد، اتصال دوم پردازش نمی‌شود. اگر سرور درخواست‌های بیشتری دریافت کند، این اجرای سریال کمتر و کمتر بهینه خواهد بود. اگر سرور درخواستی دریافت کند که پردازش آن زمان زیادی می‌برد، درخواست‌های بعدی باید منتظر بمانند تا درخواست طولانی تمام شود، حتی اگر بتوان درخواست‌های جدید را به سرعت پردازش کرد. ما باید این مشکل را رفع کنیم، اما ابتدا به این مشکل در عمل نگاه می‌کنیم.

شبیه‌سازی یک درخواست کند در پیاده‌سازی فعلی سرور

ما بررسی می‌کنیم که چگونه یک درخواست با پردازش کند می‌تواند بر سایر درخواست‌های ارسال‌شده به پیاده‌سازی فعلی سرور تأثیر بگذارد. لیست ۲۱-۱۰ پیاده‌سازی مدیریت یک درخواست به /sleep را نشان می‌دهد که یک پاسخ کند شبیه‌سازی‌شده است و باعث می‌شود سرور قبل از پاسخ دادن به مدت ۵ ثانیه بخوابد.

Filename: src/main.rs
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-10: شبیه‌سازی یک درخواست کند با خوابیدن سرور به مدت ۵ ثانیه

ما از if به match تغییر داده‌ایم زیرا اکنون سه حالت داریم. باید به‌طور صریح روی یک برش از request_line الگو تطابق ایجاد کنیم تا مقادیر رشته‌ای را تطابق دهیم؛ match به طور خودکار مرجع‌دهی و عدم مرجع‌دهی را مانند متد برابری انجام نمی‌دهد.

بازوی اول همان بلوک if از لیست ۲۱-۹ است. بازوی دوم یک درخواست به /sleep را تطابق می‌دهد. وقتی آن درخواست دریافت شود، سرور به مدت ۵ ثانیه می‌خوابد قبل از اینکه صفحه HTML موفقیت‌آمیز را نمایش دهد. بازوی سوم همان بلوک else از لیست ۲۱-۹ است.

می‌توانید ببینید که سرور ما چقدر ابتدایی است: کتابخانه‌های واقعی مدیریت تشخیص درخواست‌های متعدد را به روشی بسیار کمتر پرحرف انجام می‌دهند!

سرور را با استفاده از cargo run اجرا کنید. سپس دو پنجره مرورگر باز کنید: یکی برای آدرس http://127.0.0.1:7878/ و دیگری برای آدرس http://127.0.0.1:7878/sleep. اگر چند بار آدرس / را وارد کنید، می‌بینید که سریع پاسخ می‌دهد. اما اگر آدرس /sleep را وارد کنید و سپس / را بارگذاری کنید، خواهید دید که / منتظر می‌ماند تا درخواست /sleep برای ۵ ثانیه کامل بخوابد و سپس بارگذاری شود.

بهبود توان عملیاتی با یک Thread Pool

یک Thread Pool گروهی از Threadهای ایجادشده است که منتظر و آماده برای مدیریت یک وظیفه هستند. وقتی برنامه یک وظیفه جدید دریافت می‌کند، یکی از Threadهای موجود در Pool به وظیفه اختصاص داده می‌شود و آن Thread وظیفه را پردازش می‌کند. Threadهای باقی‌مانده در Pool در دسترس هستند تا هر وظیفه دیگری که وارد شود را در حالی که Thread اول وظیفه خود را پردازش می‌کند، مدیریت کنند. وقتی Thread اول پردازش وظیفه خود را به پایان می‌رساند، به Pool Threadهای بیکار بازمی‌گردد و آماده برای مدیریت یک وظیفه جدید است. یک Thread Pool به شما امکان می‌دهد اتصالات را به صورت همزمان پردازش کنید و توان عملیاتی سرور خود را افزایش دهید.

ما تعداد Threadهای موجود در Pool را به یک عدد کوچک محدود خواهیم کرد تا از حملات Denial of Service (DoS) محافظت کنیم؛ اگر برنامه ما برای هر درخواست جدید یک Thread ایجاد کند، کسی که ۱۰ میلیون درخواست به سرور ما ارسال کند می‌تواند با استفاده از تمام منابع سرور، پردازش درخواست‌ها را متوقف کند.

به جای ایجاد تعداد نامحدودی از Threadها، تعداد ثابتی از Threadها را در Pool خواهیم داشت که منتظر پردازش وظایف هستند. درخواست‌هایی که وارد می‌شوند به Pool ارسال می‌شوند. Pool یک صف از درخواست‌های ورودی را مدیریت خواهد کرد. هر یک از Threadها در Pool یک درخواست از صف برداشته، درخواست را پردازش می‌کند و سپس درخواست دیگری از صف درخواست می‌کند. با این طراحی، می‌توانیم حداکثر تا N درخواست را به صورت همزمان پردازش کنیم، جایی که N تعداد Threadها است. اگر هر Thread به یک درخواست طولانی پاسخ دهد، درخواست‌های بعدی ممکن است در صف پشتیبانی شوند، اما تعداد درخواست‌های طولانی که می‌توانیم قبل از رسیدن به این نقطه مدیریت کنیم افزایش یافته است.

این تکنیک تنها یکی از راه‌های بهبود توان عملیاتی یک وب سرور است. گزینه‌های دیگری که ممکن است بررسی کنید شامل مدل fork/join، مدل I/O async تک‌Threaded، یا مدل I/O async چندThreaded هستند. اگر به این موضوع علاقه دارید، می‌توانید بیشتر در مورد راه‌حل‌های دیگر بخوانید و آن‌ها را پیاده‌سازی کنید؛ با یک زبان سطح پایین مانند Rust، همه این گزینه‌ها ممکن هستند.

پیش از آنکه پیاده‌سازی یک Thread Pool را شروع کنیم، بیایید در مورد نحوه استفاده از Pool صحبت کنیم. وقتی قصد طراحی کدی را دارید، ابتدا نوشتن رابط کاربری (client interface) می‌تواند به طراحی شما کمک کند. API کد را به گونه‌ای بنویسید که ساختاری برای نحوه فراخوانی آن داشته باشد؛ سپس قابلیت‌ها را در آن ساختار پیاده‌سازی کنید به جای اینکه ابتدا قابلیت‌ها را پیاده‌سازی کنید و سپس API عمومی را طراحی کنید.

مشابه روش توسعه مبتنی بر تست که در پروژه فصل ۱۲ استفاده کردیم، اینجا از توسعه مبتنی بر کامپایلر استفاده می‌کنیم. کدی را که توابع مورد نظرمان را فراخوانی می‌کند، می‌نویسیم و سپس به خطاهای کامپایلر نگاه می‌کنیم تا مشخص کنیم چه تغییراتی باید انجام دهیم تا کد کار کند. با این حال، پیش از انجام این کار، روش دیگری را که قرار نیست استفاده کنیم، به عنوان نقطه شروع بررسی خواهیم کرد.

ایجاد یک Thread جدید برای هر درخواست

ابتدا، بیایید بررسی کنیم که اگر کد ما برای هر اتصال یک Thread جدید ایجاد کند، چگونه به نظر می‌رسد. همان‌طور که قبلاً ذکر شد، این طرح نهایی ما نیست به دلیل مشکلاتی که ممکن است با ایجاد تعداد نامحدودی از Threadها پیش بیاید، اما این یک نقطه شروع برای ایجاد یک سرور Multithreaded کارا است. سپس Thread Pool را به عنوان یک بهبود اضافه خواهیم کرد، و مقایسه این دو راه‌حل آسان‌تر خواهد بود. لیست ۲۱-۱۱ تغییراتی را که باید در main انجام دهیم تا برای هر جریان در حلقه for یک Thread جدید ایجاد کنیم، نشان می‌دهد.

Filename: src/main.rs
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-11: ایجاد یک Thread جدید برای هر جریان

همان‌طور که در فصل ۱۶ یاد گرفتید، thread::spawn یک Thread جدید ایجاد کرده و سپس کد موجود در کلوزر را در Thread جدید اجرا می‌کند. اگر این کد را اجرا کنید و در مرورگر خود /sleep را باز کنید، سپس / را در دو تب دیگر باز کنید، خواهید دید که درخواست‌های / لازم نیست منتظر پایان درخواست /sleep باشند. با این حال، همان‌طور که ذکر شد، این روش در نهایت سیستم را تحت فشار قرار می‌دهد زیرا شما تعداد نامحدودی Thread بدون محدودیت ایجاد می‌کنید.

ممکن است به یاد بیاورید که این دقیقاً همان شرایطی است که async و await در آن می‌درخشند! این نکته را در ذهن داشته باشید در حالی که Thread Pool را می‌سازیم و به این فکر کنید که چگونه این شرایط با async متفاوت یا مشابه خواهد بود.

Creating a Finite Number of Threads

ما می‌خواهیم Thread Pool ما به روشی مشابه و آشنا کار کند، به طوری که تغییر از استفاده از Threadها به Thread Pool نیاز به تغییرات زیادی در کدی که از API ما استفاده می‌کند نداشته باشد. لیست ۲۱-۱۲ رابط فرضی برای یک ساختار ThreadPool را نشان می‌دهد که می‌خواهیم به جای thread::spawn استفاده کنیم.

Filename: src/main.rs
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-12: رابط ایده‌آل برای ThreadPool

ما از ThreadPool::new برای ایجاد یک Thread Pool جدید با تعداد قابل تنظیم Threadها استفاده می‌کنیم، که در اینجا چهار است. سپس، در حلقه for، متد pool.execute رابطی مشابه با thread::spawn دارد، به طوری که یک Closure را می‌گیرد که Pool باید برای هر جریان اجرا کند. ما نیاز داریم pool.execute را پیاده‌سازی کنیم تا Closure را بگیرد و به یکی از Threadهای موجود در Pool برای اجرا بدهد. این کد هنوز کامپایل نمی‌شود، اما آن را امتحان می‌کنیم تا کامپایلر راهنمایی کند که چگونه آن را اصلاح کنیم.

ساخت ThreadPool با استفاده از توسعه مبتنی بر کامپایلر

تغییرات لیست ۲۱-۱۲ را در فایل src/main.rs اعمال کنید و سپس از خطاهای کامپایلر که توسط cargo check ارائه می‌شود برای هدایت توسعه استفاده کنید. اولین خطایی که دریافت می‌کنیم به صورت زیر است:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

عالی! این خطا به ما می‌گوید که به یک نوع یا ماژول ThreadPool نیاز داریم، بنابراین اکنون آن را خواهیم ساخت. پیاده‌سازی ThreadPool ما مستقل از نوع کاری است که وب سرور ما انجام می‌دهد. بنابراین، بیایید crate hello را از یک crate باینری به یک crate کتابخانه‌ای تغییر دهیم تا پیاده‌سازی ThreadPool خود را در آن قرار دهیم. پس از تغییر به یک crate کتابخانه‌ای، می‌توانیم از کتابخانه Thread Pool جداگانه برای هر کاری که می‌خواهیم با استفاده از Thread Pool انجام دهیم استفاده کنیم، نه فقط برای سرویس‌دهی به درخواست‌های وب.

فایلی به نام src/lib.rs ایجاد کنید که شامل تعریف زیر باشد، که ساده‌ترین تعریف ممکن برای یک ساختار ThreadPool است:

Filename: src/lib.rs
pub struct ThreadPool;

سپس فایل main.rs را ویرایش کنید تا ThreadPool را از crate کتابخانه‌ای وارد دامنه کنید. برای این کار کد زیر را به بالای فایل src/main.rs اضافه کنید:

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

این کد همچنان کار نخواهد کرد، اما بیایید دوباره آن را بررسی کنیم تا خطای بعدی که باید برطرف کنیم را ببینیم:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

این خطا نشان می‌دهد که باید تابع وابسته‌ای به نام new برای ThreadPool ایجاد کنیم. همچنین می‌دانیم که new باید یک پارامتر داشته باشد که بتواند مقدار 4 را به عنوان آرگومان بپذیرد و یک نمونه از ThreadPool بازگرداند. بیایید ساده‌ترین تابع new که این خصوصیات را دارد پیاده‌سازی کنیم:

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

ما نوع usize را برای پارامتر size انتخاب کردیم، زیرا می‌دانیم که تعداد منفی Threadها منطقی نیست. همچنین می‌دانیم که این مقدار 4 را به عنوان تعداد عناصر در یک مجموعه از Threadها استفاده خواهیم کرد، که نوع usize برای آن مناسب است، همان‌طور که در بخش “نوع‌های عدد صحیح” از فصل ۳ توضیح داده شد.

بیایید دوباره کد را بررسی کنیم:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

اکنون خطا به این دلیل است که متد execute روی ThreadPool تعریف نشده است. به یاد بیاورید که در بخش “ایجاد تعداد محدودی از Threadها” تصمیم گرفتیم که Thread Pool ما باید رابطی مشابه thread::spawn داشته باشد. علاوه بر این، متد execute را طوری پیاده‌سازی خواهیم کرد که Closure داده شده را بگیرد و آن را به یک Thread بیکار در Pool برای اجرا بدهد.

ما متد execute را روی ThreadPool تعریف می‌کنیم تا یک Closure را به عنوان پارامتر بپذیرد. به یاد بیاورید که در بخش “انتقال مقادیر گرفته‌شده از Closure و ویژگی‌های Fn از فصل ۱۳ توضیح داده شد که می‌توانیم Closureها را با سه ویژگی مختلف به عنوان پارامتر بپذیریم: Fn، FnMut، و FnOnce. باید تصمیم بگیریم که در اینجا از کدام نوع Closure استفاده کنیم. می‌دانیم که چیزی مشابه با پیاده‌سازی thread::spawn در کتابخانه استاندارد انجام خواهیم داد، بنابراین می‌توانیم به محدودیت‌های امضای thread::spawn روی پارامترش نگاه کنیم. مستندات به ما موارد زیر را نشان می‌دهد:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

نوع پارامتر F همان چیزی است که در اینجا به آن توجه داریم؛ پارامتر نوع T مربوط به مقدار بازگشتی است و ما به آن توجه نداریم. می‌توانیم ببینیم که spawn از FnOnce به عنوان محدودیت ویژگی روی F استفاده می‌کند. این احتمالاً چیزی است که ما نیز می‌خواهیم، زیرا در نهایت آرگومان دریافتی در execute را به spawn پاس می‌دهیم. ما اطمینان بیشتری داریم که FnOnce همان ویژگی مورد نظر ما است، زیرا Thread برای اجرای یک درخواست فقط Closure مربوط به آن درخواست را یک بار اجرا می‌کند، که با “Once” در FnOnce مطابقت دارد.

پارامتر نوع F همچنین دارای محدودیت ویژگی Send و محدودیت طول عمر 'static است، که در وضعیت ما مفید هستند: ما به Send نیاز داریم تا Closure را از یک Thread به Thread دیگر منتقل کنیم و به 'static نیاز داریم زیرا نمی‌دانیم اجرای Thread چه مدت طول می‌کشد. بیایید یک متد execute روی ThreadPool ایجاد کنیم که یک پارامتر عمومی از نوع F با این محدودیت‌ها بپذیرد:

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

ما همچنان از () پس از FnOnce استفاده می‌کنیم زیرا این FnOnce نشان‌دهنده یک Closure است که هیچ پارامتری نمی‌گیرد و نوع () را بازمی‌گرداند. درست مانند تعریف توابع، می‌توان نوع بازگشتی را از امضا حذف کرد، اما حتی اگر هیچ پارامتری نداشته باشیم، همچنان به پرانتزها نیاز داریم.

دوباره، این ساده‌ترین پیاده‌سازی ممکن برای متد execute است: این متد هیچ کاری انجام نمی‌دهد، اما ما فقط تلاش می‌کنیم کد خود را کامپایل کنیم. بیایید دوباره کد را بررسی کنیم:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

کد کامپایل می‌شود! اما توجه داشته باشید که اگر cargo run را اجرا کنید و در مرورگر یک درخواست ارسال کنید، خطاهایی را در مرورگر خواهید دید که در ابتدای فصل دیده بودیم. کتابخانه ما هنوز Closure پاس‌داده‌شده به execute را فراخوانی نمی‌کند!

نکته: یک ضرب‌المثل درباره زبان‌هایی با کامپایلرهای سخت‌گیر، مانند Haskell و Rust، این است که “اگر کد کامپایل شود، کار می‌کند.” اما این ضرب‌المثل همیشه درست نیست. پروژه ما کامپایل می‌شود، اما هیچ کاری انجام نمی‌دهد! اگر در حال ساخت یک پروژه واقعی و کامل بودیم، اکنون زمان خوبی برای شروع نوشتن تست‌های واحد بود تا بررسی کنیم که کد هم کامپایل می‌شود و رفتار مورد نظر ما را دارد.

توجه: اگر قصد داشتیم به جای یک Closure، یک future اجرا کنیم، چه تفاوتی در اینجا وجود داشت؟

اعتبارسنجی تعداد Threadها در new

در حال حاضر، ما هیچ کاری با پارامترهای new و execute انجام نمی‌دهیم. بیایید بدنه این توابع را با رفتار مورد نظر خود پیاده‌سازی کنیم. ابتدا، به تابع new فکر کنیم. قبلاً یک نوع عدد صحیح بدون علامت برای پارامتر size انتخاب کردیم، زیرا یک Pool با تعداد منفی Thread منطقی نیست. با این حال، یک Pool با صفر Thread نیز منطقی نیست، اما صفر یک مقدار معتبر برای usize است. کدی اضافه خواهیم کرد تا بررسی کند که مقدار size بیشتر از صفر باشد قبل از اینکه یک نمونه از ThreadPool بازگردانیم و در صورت دریافت مقدار صفر، برنامه با استفاده از ماکروی assert! متوقف شود، همان‌طور که در لیست ۲۱-۱۳ نشان داده شده است.

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-13: پیاده‌سازی ThreadPool::new برای توقف برنامه در صورت صفر بودن size

ما همچنین برخی مستندات برای ThreadPool خود با استفاده از نظرات داکیومنت (doc comments) اضافه کرده‌ایم. توجه داشته باشید که ما از اصول خوب مستندسازی پیروی کرده‌ایم و بخشی را اضافه کرده‌ایم که شرایطی که تابع ما ممکن است به وحشت بیفتد (panic) را توضیح می‌دهد، همان‌طور که در فصل ۱۴ مورد بحث قرار گرفت. دستور cargo doc --open را اجرا کنید و روی ساختار ThreadPool کلیک کنید تا ببینید مستندات تولیدشده برای new چگونه به نظر می‌رسند!

به جای اضافه کردن ماکروی assert! همان‌طور که اینجا انجام دادیم، می‌توانستیم new را به build تغییر دهیم و یک Result بازگردانیم، مانند آنچه با Config::build در پروژه I/O در لیست ۱۲-۹ انجام دادیم. اما در این مورد تصمیم گرفته‌ایم که تلاش برای ایجاد یک Thread Pool بدون هیچ Threadی باید یک خطای غیرقابل بازیابی باشد. اگر احساس جاه‌طلبی می‌کنید، سعی کنید تابعی به نام build با امضای زیر بنویسید تا با تابع new مقایسه کنید:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

ایجاد فضایی برای ذخیره Threadها

اکنون که روشی برای اطمینان از تعداد معتبر Threadهایی که در Pool ذخیره می‌شوند داریم، می‌توانیم این Threadها را ایجاد کرده و آن‌ها را در ساختار ThreadPool قبل از بازگرداندن ساختار ذخیره کنیم. اما چگونه می‌توانیم یک Thread را “ذخیره” کنیم؟ بیایید دوباره به امضای thread::spawn نگاه کنیم:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

تابع spawn یک JoinHandle<T> بازمی‌گرداند، جایی که T نوعی است که Closure بازمی‌گرداند. بیایید ما هم از JoinHandle استفاده کنیم و ببینیم چه اتفاقی می‌افتد. در مورد ما، Closureهایی که به Thread Pool ارسال می‌کنیم اتصال را مدیریت کرده و چیزی بازنمی‌گردانند، بنابراین T برابر با نوع واحد () خواهد بود.

کد موجود در لیست ۲۱-۱۴ کامپایل می‌شود اما هنوز هیچ Threadی ایجاد نمی‌کند. ما تعریف ThreadPool را تغییر داده‌ایم تا یک بردار از نمونه‌های thread::JoinHandle<()> را نگه دارد، بردار را با ظرفیتی برابر با size مقداردهی اولیه کرده‌ایم، یک حلقه for تنظیم کرده‌ایم که کدی برای ایجاد Threadها اجرا می‌کند، و یک نمونه از ThreadPool که آن‌ها را در خود دارد بازمی‌گرداند.

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-14: ایجاد یک بردار برای ThreadPool برای نگهداری Threadها

ما std::thread را در crate کتابخانه‌ای وارد دامنه کرده‌ایم، زیرا از thread::JoinHandle به عنوان نوع آیتم‌های موجود در بردار در ThreadPool استفاده می‌کنیم.

هنگامی که یک مقدار معتبر برای size دریافت شود، ThreadPool ما یک بردار جدید ایجاد می‌کند که می‌تواند size آیتم را در خود جای دهد. تابع with_capacity همان کار Vec::new را انجام می‌دهد اما با یک تفاوت مهم: فضای لازم را از قبل در بردار تخصیص می‌دهد. چون می‌دانیم که باید size عنصر را در بردار ذخیره کنیم، انجام این تخصیص از ابتدا کمی کارآمدتر از استفاده از Vec::new است که خودش در حین اضافه شدن عناصر تغییر اندازه می‌دهد.

وقتی دوباره cargo check را اجرا کنید، باید با موفقیت انجام شود.

ساختار Worker مسئول ارسال کد از ThreadPool به یک Thread

در حلقه for در لیست ۲۱-۱۴، نظری در مورد ایجاد Threadها گذاشتیم. در اینجا بررسی خواهیم کرد که چگونه واقعاً Threadها را ایجاد می‌کنیم. کتابخانه استاندارد thread::spawn را به عنوان روشی برای ایجاد Threadها ارائه می‌دهد، و thread::spawn انتظار دارد کدی دریافت کند که Thread بلافاصله پس از ایجاد اجرا کند. با این حال، در مورد ما، می‌خواهیم Threadها را ایجاد کنیم و آن‌ها را منتظر نگه داریم تا کدی که بعداً ارسال می‌کنیم را اجرا کنند. پیاده‌سازی Threadها در کتابخانه استاندارد هیچ راهی برای انجام این کار ارائه نمی‌دهد؛ بنابراین باید آن را به صورت دستی پیاده‌سازی کنیم.

ما این رفتار را با معرفی یک ساختار داده جدید بین ThreadPool و Threadها که این رفتار جدید را مدیریت می‌کند، پیاده‌سازی خواهیم کرد. این ساختار داده جدید را Worker می‌نامیم که یک اصطلاح رایج در پیاده‌سازی‌های Pool است. Worker کدی را که باید اجرا شود دریافت می‌کند و آن را در Thread مربوط به Worker اجرا می‌کند. می‌توانید به افرادی که در آشپزخانه یک رستوران کار می‌کنند فکر کنید: Workerها منتظر می‌مانند تا سفارش‌هایی از مشتریان دریافت کنند، و سپس مسئول گرفتن این سفارش‌ها و انجام آن‌ها هستند.

به جای ذخیره یک بردار از نمونه‌های JoinHandle<()> در Thread Pool، ما نمونه‌هایی از ساختار Worker را ذخیره خواهیم کرد. هر Worker یک نمونه JoinHandle<()> را نگه می‌دارد. سپس یک متد روی Worker پیاده‌سازی خواهیم کرد که یک Closure از کد برای اجرا بگیرد و آن را به Thread در حال اجرای Worker برای اجرا ارسال کند. همچنین به هر Worker یک id اختصاص می‌دهیم تا هنگام ثبت لاگ یا اشکال‌زدایی بتوانیم بین Workerهای مختلف در Pool تمایز قائل شویم.

این فرآیند جدیدی است که هنگام ایجاد یک ThreadPool اتفاق می‌افتد. کدی که Closure را به Thread ارسال می‌کند، پس از تنظیم Worker به این شکل پیاده‌سازی خواهد شد:

  1. تعریف یک ساختار Worker که یک id و یک JoinHandle<()> نگه می‌دارد.
  2. تغییر ThreadPool به طوری که یک بردار از نمونه‌های Worker را ذخیره کند.
  3. تعریف یک تابع Worker::new که یک عدد id می‌گیرد و یک نمونه Worker بازمی‌گرداند که شامل id و یک Thread ایجادشده با یک Closure خالی است.
  4. در ThreadPool::new، از شمارنده حلقه for برای تولید یک id استفاده کرده، یک Worker جدید با آن id ایجاد کرده و Worker را در بردار ذخیره می‌کنیم.

اگر آماده یک چالش هستید، سعی کنید این تغییرات را خودتان پیاده‌سازی کنید قبل از اینکه به کد موجود در لیست ۲۱-۱۵ نگاه کنید.

آماده‌اید؟ در اینجا لیست ۲۱-۱۵ با یک روش برای انجام اصلاحات قبلی آورده شده است.

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-15: تغییر ThreadPool برای نگهداری نمونه‌های Worker به جای نگهداری مستقیم Threadها

ما نام فیلد موجود در ThreadPool را از threads به workers تغییر داده‌ایم زیرا اکنون نمونه‌های Worker را نگه می‌دارد، نه نمونه‌های JoinHandle<()>. از شمارنده حلقه for به عنوان آرگومان برای Worker::new استفاده می‌کنیم و هر Worker جدید را در بردار به نام workers ذخیره می‌کنیم.

کد خارجی (مانند سرور ما در src/main.rs) نیازی ندارد جزئیات پیاده‌سازی مربوط به استفاده از ساختار Worker در داخل ThreadPool را بداند، بنابراین ساختار Worker و تابع new آن را خصوصی می‌کنیم. تابع Worker::new از id داده‌شده استفاده کرده و یک نمونه JoinHandle<()> ایجاد می‌کند که با ایجاد یک Thread جدید با یک Closure خالی تولید می‌شود.

نکته: اگر سیستم‌عامل نتواند به دلیل کمبود منابع سیستم، یک Thread ایجاد کند، thread::spawn به وحشت خواهد افتاد (panic). این باعث می‌شود کل سرور ما به وحشت بیفتد، حتی اگر ایجاد برخی Threadها موفق باشد. برای سادگی، این رفتار مشکلی ندارد، اما در یک پیاده‌سازی تولیدی برای Thread Pool، احتمالاً از std::thread::Builder و متد spawn که یک Result بازمی‌گرداند، استفاده می‌کنید.

این کد کامپایل خواهد شد و تعداد نمونه‌های Worker را که به عنوان آرگومان به ThreadPool::new مشخص کرده‌ایم ذخیره می‌کند. اما ما هنوز Closureی که در execute دریافت می‌کنیم را پردازش نمی‌کنیم. بیایید بررسی کنیم چگونه این کار را انجام دهیم.

ارسال درخواست‌ها به Threadها از طریق Channelها

مشکل بعدی که به آن می‌پردازیم این است که Closureهایی که به thread::spawn داده شده‌اند، هیچ کاری انجام نمی‌دهند. در حال حاضر، Closureی که می‌خواهیم اجرا کنیم را در متد execute دریافت می‌کنیم. اما نیاز داریم که یک Closure به thread::spawn بدهیم تا در هنگام ایجاد هر Worker در حین ایجاد ThreadPool اجرا شود.

می‌خواهیم ساختارهای Worker که به تازگی ایجاد کرده‌ایم، کدی را که باید اجرا شود از یک صف که در ThreadPool نگهداری می‌شود دریافت کرده و آن کد را به Thread خود برای اجرا ارسال کنند.

Channelهایی که در فصل ۱۶ یاد گرفتیم—راهی ساده برای ارتباط بین دو Thread—برای این مورد استفاده مناسب هستند. ما از یک Channel به عنوان صف کارها استفاده خواهیم کرد و execute یک کار را از ThreadPool به نمونه‌های Worker ارسال می‌کند، که این کار را به Thread خود ارسال می‌کنند. برنامه به شرح زیر خواهد بود:

  1. ThreadPool یک Channel ایجاد کرده و نگهدارنده sender آن خواهد بود.
  2. هر Worker نگهدارنده receiver خواهد بود.
  3. یک ساختار Job جدید ایجاد خواهیم کرد که Closureهایی که می‌خواهیم از طریق Channel ارسال کنیم را نگه می‌دارد.
  4. متد execute کاری که می‌خواهد اجرا کند را از طریق sender ارسال خواهد کرد.
  5. در Thread خود، Worker بر receiver خود حلقه زده و Closureهای هر کاری که دریافت می‌کند را اجرا خواهد کرد.

بیایید با ایجاد یک Channel در ThreadPool::new و نگهداری sender در نمونه ThreadPool شروع کنیم، همان‌طور که در لیست ۲۱-۱۶ نشان داده شده است. ساختار Job در حال حاضر چیزی نگه نمی‌دارد، اما نوع آیتمی خواهد بود که از طریق Channel ارسال می‌کنیم.

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-16: تغییر ThreadPool برای ذخیره sender یک Channel که نمونه‌های Job را منتقل می‌کند

در ThreadPool::new، یک Channel جدید ایجاد می‌کنیم و Pool نگهدارنده sender خواهد بود. این کد با موفقیت کامپایل می‌شود.

بیایید تلاش کنیم یک receiver از Channel را به هر Worker در هنگام ایجاد Channel توسط Thread Pool ارسال کنیم. می‌دانیم که می‌خواهیم receiver را در Threadی که Workerها ایجاد می‌کنند استفاده کنیم، بنابراین به پارامتر receiver در Closure ارجاع می‌دهیم. کد موجود در لیست ۲۱-۱۷ هنوز کاملاً کامپایل نخواهد شد.

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-17: ارسال receiver به Workerها

ما تغییرات کوچک و واضحی ایجاد کرده‌ایم: receiver را به Worker::new ارسال کرده‌ایم و سپس از آن در داخل Closure استفاده کرده‌ایم.

هنگامی که تلاش می‌کنیم این کد را بررسی کنیم، با این خطا مواجه می‌شویم:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

کد در تلاش است receiver را به چندین نمونه Worker منتقل کند. این کار امکان‌پذیر نیست، همان‌طور که در فصل ۱۶ بحث شد: پیاده‌سازی کانال (channel) که Rust ارائه می‌دهد، از نوع چند تولیدکننده (multiple producer) و یک مصرف‌کننده (single consumer) است. این به این معنی است که نمی‌توانیم به سادگی بخش مصرف‌کننده کانال را برای رفع این کد کپی کنیم. همچنین نمی‌خواهیم یک پیام را چندین بار به چند مصرف‌کننده ارسال کنیم؛ بلکه می‌خواهیم یک لیست از پیام‌ها داشته باشیم که چندین Worker آن را پردازش کنند به‌گونه‌ای که هر پیام فقط یک بار پردازش شود.

علاوه بر این، برداشتن یک کار از صف کانال شامل تغییر receiver می‌شود، بنابراین Threadها به یک روش امن برای اشتراک و تغییر receiver نیاز دارند؛ در غیر این صورت، ممکن است با شرایط رقابتی (race conditions) مواجه شویم (همان‌طور که در فصل ۱۶ توضیح داده شد).

با یادآوری اشاره‌گر (Pointer)های هوشمند ایمن برای Threadها که در فصل ۱۶ معرفی شدند: برای اشتراک مالکیت میان چندین Thread و اجازه تغییر مقدار، نیاز به استفاده از Arc<Mutex<T>> داریم. نوع Arc به چندین Worker اجازه می‌دهد مالکیت receiver را به اشتراک بگذارند و Mutex تضمین می‌کند که فقط یک Worker در هر لحظه یک کار را از receiver دریافت کند. لیست ۲۱-۱۸ تغییراتی را که باید اعمال کنیم نشان می‌دهد.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-18: اشتراک‌گذاری receiver بین Workerها با استفاده از Arc و Mutex

در ThreadPool::new، receiver را در یک Arc و یک Mutex قرار می‌دهیم. برای هر Worker جدید، Arc را کپی می‌کنیم تا شمارنده مرجع افزایش یابد و Workerها بتوانند مالکیت receiver را به اشتراک بگذارند.

با این تغییرات، کد کامپایل می‌شود! به نتیجه نزدیک‌تر می‌شویم!

پیاده‌سازی متد execute

در نهایت، بیایید متد execute را روی ThreadPool پیاده‌سازی کنیم. همچنین Job را از یک ساختار به یک نام مستعار نوع (type alias) برای یک شیء ویژگی تغییر خواهیم داد که نوع Closureی که execute دریافت می‌کند را نگه می‌دارد. همان‌طور که در بخش “ایجاد مترادف‌های نوع با نام مستعار” از فصل ۲۰ بحث شد، نام‌های مستعار نوع به ما امکان می‌دهند تایپ‌های طولانی را برای استفاده آسان‌تر کوتاه کنیم. به لیست ۲۱-۱۹ نگاه کنید.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-19: ایجاد یک نام مستعار Job برای یک Box که هر Closure را نگه می‌دارد و سپس ارسال کار از طریق کانال

پس از ایجاد یک نمونه جدید Job با استفاده از Closureی که در execute دریافت می‌کنیم، آن کار را از طریق بخش ارسال‌کننده کانال ارسال می‌کنیم. ما برای حالتی که ارسال شکست بخورد، روی send از unwrap استفاده می‌کنیم. این حالت ممکن است رخ دهد، اگر مثلاً همه Threadهای ما از اجرا متوقف شوند، به این معنی که بخش دریافت‌کننده دیگر پیام‌های جدید را دریافت نمی‌کند. در حال حاضر، نمی‌توانیم Threadهای خود را از اجرا متوقف کنیم: Threadهای ما تا زمانی که Pool وجود دارد اجرا می‌شوند. دلیل استفاده از unwrap این است که می‌دانیم حالت شکست رخ نخواهد داد، اما کامپایلر این موضوع را نمی‌داند.

اما هنوز کاملاً کار تمام نشده است! در Worker، Closureی که به thread::spawn ارسال می‌شود همچنان فقط به بخش دریافت‌کننده کانال اشاره می‌کند. در عوض، باید Closure به طور مداوم حلقه بزند، از بخش دریافت‌کننده کانال درخواست یک کار کند و کار را هنگام دریافت اجرا کند. بیایید تغییرات نشان داده‌شده در لیست ۲۱-۲۰ را به Worker::new اعمال کنیم.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}
Listing 21-20: دریافت و اجرای کارها در Thread مربوط به Worker

در اینجا، ابتدا lock را روی receiver فراخوانی می‌کنیم تا mutex را به دست آوریم، و سپس unwrap را فراخوانی می‌کنیم تا در صورت بروز هرگونه خطا، برنامه متوقف شود. به دست آوردن یک قفل ممکن است شکست بخورد اگر mutex در یک وضعیت poisoned باشد، که ممکن است اتفاق بیفتد اگر یک Thread دیگر در حالی که قفل را نگه داشته است به جای آزاد کردن آن متوقف شده باشد. در این شرایط، فراخوانی unwrap برای متوقف کردن این Thread اقدام درستی است. می‌توانید این unwrap را به یک expect با یک پیام خطای معنادار برای خود تغییر دهید.

اگر قفل روی mutex را به دست آوریم، recv را فراخوانی می‌کنیم تا یک Job را از کانال دریافت کنیم. یک unwrap نهایی نیز در اینجا هر گونه خطا را برطرف می‌کند، که ممکن است رخ دهد اگر Threadی که sender را نگه داشته است خاموش شود، مشابه نحوه‌ای که متد send در صورت خاموش شدن receiver یک Err بازمی‌گرداند.

فراخوانی recv مسدود می‌شود، بنابراین اگر هنوز هیچ کاری وجود نداشته باشد، Thread فعلی منتظر می‌ماند تا یک کار در دسترس قرار گیرد. Mutex<T> تضمین می‌کند که در هر لحظه فقط یک Thread Worker در تلاش برای درخواست یک کار است.

Thread Pool ما اکنون در وضعیت کاری قرار دارد! دستور cargo run را اجرا کنید و چندین درخواست ارسال کنید:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

موفقیت! اکنون یک Thread Pool داریم که اتصالات را به صورت همزمان اجرا می‌کند. هرگز بیش از چهار Thread ایجاد نمی‌شود، بنابراین اگر سرور درخواست‌های زیادی دریافت کند، سیستم ما بارگذاری بیش از حد نخواهد شد. اگر یک درخواست به /sleep ارسال کنیم، سرور می‌تواند با استفاده از یک Thread دیگر به سایر درخواست‌ها پاسخ دهد.

نکته: اگر /sleep را به طور همزمان در چندین پنجره مرورگر باز کنید، ممکن است یکی پس از دیگری در فواصل ۵ ثانیه‌ای بارگذاری شوند. برخی مرورگرهای وب به دلایل مربوط به کش، چندین نمونه از همان درخواست را به صورت متوالی اجرا می‌کنند. این محدودیت توسط وب سرور ما ایجاد نشده است.

این زمان خوبی است که مکث کنیم و بررسی کنیم چگونه کدهای لیست‌های ۲۱-۱۸، ۲۱-۱۹ و ۲۱-۲۰ اگر به جای Closure از futures برای انجام کار استفاده می‌کردیم، متفاوت می‌بود. چه نوع‌هایی تغییر می‌کردند؟ آیا امضاهای متدها تغییر می‌کردند؟ کدام بخش‌های کد همان‌گونه باقی می‌ماندند؟

پس از یادگیری حلقه while let در فصل‌های ۱۷ و ۱۸، ممکن است تعجب کنید چرا کد Thread Worker را مانند لیست ۲۱-۲۱ ننوشتیم.

Filename: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-21: یک پیاده‌سازی جایگزین برای Worker::new با استفاده از while let

این کد کامپایل می‌شود و اجرا می‌شود، اما منجر به رفتار مورد نظر برای threading نمی‌شود: یک درخواست کند همچنان باعث می‌شود سایر درخواست‌ها برای پردازش منتظر بمانند. دلیل آن کمی ظریف است: ساختار Mutex متد عمومی unlock ندارد، زیرا مالکیت قفل بر اساس طول عمر MutexGuard<T> درون LockResult<MutexGuard<T>> که متد lock بازمی‌گرداند است. در زمان کامپایل، بررسی‌کننده وام می‌تواند این قانون را اعمال کند که منبعی که توسط یک Mutex محافظت می‌شود نمی‌تواند دسترسی پیدا کند مگر اینکه قفل را نگه داشته باشیم. با این حال، این پیاده‌سازی همچنین می‌تواند منجر به نگه‌داشتن قفل بیش از حد انتظار شود اگر به طول عمر MutexGuard<T> توجه نکنیم.

کد موجود در لیست ۲۱-۲۰ که از let job = receiver.lock().unwrap().recv().unwrap(); استفاده می‌کند کار می‌کند زیرا با let، هر مقدار موقتی استفاده‌شده در عبارت سمت راست علامت برابر بلافاصله پس از پایان دستور let حذف می‌شود. با این حال، while let (و همچنین if let و match) مقادیر موقتی را تا پایان بلوک مرتبط حذف نمی‌کند. در لیست ۲۱-۲۱، قفل در طول فراخوانی به job() نگه داشته می‌شود، به این معنی که سایر Workerها نمی‌توانند کار دریافت کنند.