Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

تبدیل سرور Single-Threaded به یک سرور Multithreaded

در حال حاضر، سرور هر درخواست را به‌صورت ترتیبی پردازش می‌کند،
یعنی تا زمانی که پردازش درخواست اول به پایان نرسد، درخواست دوم پردازش نخواهد شد.
اگر سرور درخواست‌های بیشتری دریافت کند، این اجرای سریالی به مرور زمان کمتر بهینه خواهد بود.
اگر سروری درخواستی دریافت کند که پردازش آن زمان زیادی ببرد،
درخواست‌های بعدی باید تا پایان پردازش آن درخواست طولانی صبر کنند،
حتی اگر درخواست‌های جدید بتوانند سریع‌تر پردازش شوند.
ما باید این مشکل را برطرف کنیم، اما ابتدا مشکل را به صورت عملی بررسی می‌کنیم.

شبیه‌سازی یک درخواست کند

می‌خواهیم ببینیم چگونه یک درخواست با پردازش کند می‌تواند بر درخواست‌های دیگر به سرور فعلی ما تأثیر بگذارد.
لیستینگ 21-10 نحوه‌ی رسیدگی به درخواستی به مسیر /sleep را پیاده‌سازی می‌کند
که با یک پاسخ شبیه‌سازی شده‌ی کند، باعث می‌شود سرور پنج ثانیه قبل از پاسخ دادن بخوابد.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-10: شبیه‌سازی یک درخواست کند با خوابیدن به مدت پنج ثانیه

حالا که سه حالت داریم، به جای if از match استفاده کرده‌ایم. باید صریحاً روی یک slice از request_line تطبیق الگو (pattern matching) انجام دهیم تا بتوانیم با مقادیر رشته‌ای literal تطبیق دهیم؛ چون match مانند متد برابری به‌صورت خودکار رفرنس‌گذاری و dereference نمی‌کند.

بازوی اول مشابه بلاک if در لیستینگ 21-9 است. بازوی دوم با درخواستی به مسیر /sleep مطابقت دارد. وقتی این درخواست دریافت شود، سرور به مدت پنج ثانیه می‌خوابد و سپس صفحه‌ی HTML موفقیت‌آمیز را رندر می‌کند. بازوی سوم همانند بلاک else در لیستینگ 21-9 است.

می‌توانید ببینید که سرور ما چقدر ابتدایی است: کتابخانه‌های واقعی مدیریت تشخیص درخواست‌های متعدد را به روشی بسیار کمتر پرحرف انجام می‌دهند!

سرور را با دستور cargo run اجرا کنید.
سپس دو پنجره‌ی مرورگر باز کنید: یکی برای آدرس http://127.0.0.1:7878 و دیگری برای http://127.0.0.1:7878/sleep.
اگر چند بار آدرس / را وارد کنید، مانند قبل پاسخ سریع دریافت خواهید کرد.
اما اگر ابتدا آدرس /sleep را وارد کنید و سپس آدرس / را بارگذاری کنید،
خواهید دید که / تا پایان پنج ثانیه خوابیدن sleep منتظر می‌ماند و سپس بارگذاری می‌شود.

بهبود توان عملیاتی با یک Thread Pool

یک Thread Pool گروهی از Threadهای ایجادشده است که منتظر و آماده برای مدیریت یک وظیفه هستند. وقتی برنامه یک وظیفه جدید دریافت می‌کند، یکی از Threadهای موجود در Pool به وظیفه اختصاص داده می‌شود و آن Thread وظیفه را پردازش می‌کند. Threadهای باقی‌مانده در Pool در دسترس هستند تا هر وظیفه دیگری که وارد شود را در حالی که Thread اول وظیفه خود را پردازش می‌کند، مدیریت کنند. وقتی Thread اول پردازش وظیفه خود را به پایان می‌رساند، به Pool Threadهای بیکار بازمی‌گردد و آماده برای مدیریت یک وظیفه جدید است. یک Thread Pool به شما امکان می‌دهد اتصالات را به صورت همزمان پردازش کنید و توان عملیاتی سرور خود را افزایش دهید.

ما تعداد Threadهای موجود در Pool را به یک عدد کوچک محدود خواهیم کرد تا از حملات Denial of Service (DoS) محافظت کنیم؛ اگر برنامه ما برای هر درخواست جدید یک Thread ایجاد کند، کسی که ۱۰ میلیون درخواست به سرور ما ارسال کند می‌تواند با استفاده از تمام منابع سرور، پردازش درخواست‌ها را متوقف کند.

برای محافظت در برابر حملات DoS، تعداد threadهای موجود در thread pool را محدود می‌کنیم؛ زیرا اگر برنامه‌ی ما برای هر درخواست یک thread جدید بسازد، کسی که ۱۰ میلیون درخواست به سرور ما ارسال کند می‌تواند با مصرف همه‌ی منابع سرور، عملیات پردازش درخواست‌ها را کاملاً متوقف کند.

به جای ایجاد threadهای نامحدود، یک تعداد ثابت thread در pool خواهیم داشت که منتظر می‌مانند. درخواست‌های ورودی به این pool ارسال می‌شوند تا پردازش شوند. pool صفی از درخواست‌های ورودی را نگه می‌دارد. هر یک از threadهای pool یک درخواست را از صف بیرون می‌کشد، آن را پردازش می‌کند، و سپس درخواست بعدی را از صف دریافت می‌کند. با این طراحی، می‌توانیم تا N درخواست را به‌صورت همزمان پردازش کنیم، که N برابر با تعداد threadها است. اگر هر thread در حال پاسخ دادن به یک درخواست طولانی باشد، درخواست‌های بعدی می‌توانند در صف منتظر بمانند، اما ما تعداد درخواست‌های طولانی که می‌توانیم قبل از رسیدن به این نقطه پردازش کنیم را افزایش داده‌ایم.

این تکنیک یکی از روش‌های متعددی است که برای افزایش throughput یک وب سرور وجود دارد. گزینه‌های دیگری که می‌توانید بررسی کنید عبارت‌اند از مدل fork/join، مدل async I/O تک‌نخی، و مدل async I/O چندنخی. اگر به این موضوع علاقه‌مند هستید، می‌توانید درباره‌ی راه‌حل‌های دیگر مطالعه کنید و سعی کنید آن‌ها را پیاده‌سازی کنید؛ با زبانی سطح پایین مانند Rust، تمام این گزینه‌ها ممکن هستند.

مشابه روش توسعه مبتنی بر تست که در پروژه فصل ۱۲ استفاده کردیم، اینجا از توسعه مبتنی بر کامپایلر استفاده می‌کنیم. کدی را که توابع مورد نظرمان را فراخوانی می‌کند، می‌نویسیم و سپس به خطاهای کامپایلر نگاه می‌کنیم تا مشخص کنیم چه تغییراتی باید انجام دهیم تا کد کار کند. با این حال، پیش از انجام این کار، روش دیگری را که قرار نیست استفاده کنیم، به عنوان نقطه شروع بررسی خواهیم کرد.

ایجاد یک thread برای هر درخواست

ابتدا بیایید ببینیم کد ما چگونه خواهد بود اگر برای هر اتصال یک thread جدید ایجاد کند. همان‌طور که قبلاً گفته شد، این برنامه‌ی نهایی ما نیست به دلیل مشکلات احتمالی ایجاد تعداد نامحدود thread، اما نقطه‌ی شروع خوبی برای داشتن یک سرور چندنخی عملی است. سپس به‌عنوان بهبود، thread pool را اضافه خواهیم کرد و مقایسه‌ی این دو راه‌حل ساده‌تر خواهد بود.

لیستینگ 21-11 تغییرات لازم در تابع main را نشان می‌دهد تا برای هر stream در حلقه‌ی for یک thread جدید ایجاد کند و آن را مدیریت نماید.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-11: ایجاد یک Thread جدید برای هر جریان

همان‌طور که در فصل ۱۶ یاد گرفتید، thread::spawn یک Thread جدید ایجاد کرده و سپس کد موجود در کلوزر را در Thread جدید اجرا می‌کند. اگر این کد را اجرا کنید و در مرورگر خود /sleep را باز کنید، سپس / را در دو تب دیگر باز کنید، خواهید دید که درخواست‌های / لازم نیست منتظر پایان درخواست /sleep باشند. با این حال، همان‌طور که ذکر شد، این روش در نهایت سیستم را تحت فشار قرار می‌دهد زیرا شما تعداد نامحدودی Thread بدون محدودیت ایجاد می‌کنید.

ممکن است به یاد بیاورید که این دقیقاً همان شرایطی است که async و await در آن می‌درخشند! این نکته را در ذهن داشته باشید در حالی که Thread Pool را می‌سازیم و به این فکر کنید که چگونه این شرایط با async متفاوت یا مشابه خواهد بود.

Creating a Finite Number of Threads

می‌خواهیم thread pool ما به روشی مشابه و آشنا کار کند،
به‌طوری که تغییر از استفاده‌ی مستقیم از threadها به استفاده از thread pool
نیاز به تغییرات بزرگ در کدی که از API ما استفاده می‌کند نداشته باشد.
لیستینگ 21-12 رابط فرضی ساختار ThreadPool را نشان می‌دهد
که می‌خواهیم به جای thread::spawn از آن استفاده کنیم.

Filename: src/main.rs
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-12: رابط ایده‌آل برای ThreadPool

ما با استفاده از ThreadPool::new یک thread pool جدید با تعداد قابل تنظیم thread ایجاد می‌کنیم، در این مثال تعداد چهار thread است. سپس در حلقه‌ی for، متد pool.execute رابطی مشابه با thread::spawn دارد، که یک closure می‌گیرد و pool باید آن را برای هر stream اجرا کند. ما باید pool.execute را پیاده‌سازی کنیم تا این closure را بگیرد و به یک thread در pool بدهد تا اجرا شود. این کد هنوز کامپایل نخواهد شد، اما این کار را انجام می‌دهیم تا کامپایلر ما را در رفع خطاها راهنمایی کند.

ساخت ThreadPool با استفاده از توسعه‌ی مبتنی بر خطاهای کامپایلر

تغییرات لیستینگ 21-12 را در فایل src/main.rs اعمال کنید، سپس اجازه دهید خطاهای کامپایلر از دستور cargo check روند توسعه‌ی ما را هدایت کنند. در اینجا اولین خطایی که دریافت می‌کنیم آمده است:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

عالی! این خطا به ما می‌گوید که به یک نوع یا ماژول ThreadPool نیاز داریم، پس اکنون یکی می‌سازیم. پیاده‌سازی ThreadPool ما مستقل از نوع کاری است که وب‌سرور انجام می‌دهد. بنابراین، بیایید crate hello را از یک crate دودویی به یک crate کتابخانه‌ای تبدیل کنیم تا پیاده‌سازی ThreadPool را در آن قرار دهیم. پس از تبدیل به crate کتابخانه‌ای، می‌توانیم از این کتابخانه‌ی thread pool جداگانه برای هر کاری که می‌خواهیم با thread pool انجام دهیم استفاده کنیم، نه فقط برای سرویس‌دهی به درخواست‌های وب.

یک فایل src/lib.rs ایجاد کنید که شامل کد زیر باشد، که ساده‌ترین تعریف ممکن برای ThreadPool است که فعلاً می‌توانیم داشته باشیم:

Filename: src/lib.rs
pub struct ThreadPool;

سپس فایل main.rs را ویرایش کنید تا با افزودن کد زیر به بالای فایل src/main.rs، ThreadPool را از crate کتابخانه‌ای وارد حوزه (scope) کنید:

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

این کد همچنان کار نخواهد کرد، اما بیایید دوباره آن را بررسی کنیم تا خطای بعدی که باید برطرف کنیم را ببینیم:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

این خطا نشان می‌دهد که باید تابع وابسته‌ای به نام new برای ThreadPool ایجاد کنیم. همچنین می‌دانیم که new باید یک پارامتر داشته باشد که بتواند مقدار 4 را به عنوان آرگومان بپذیرد و یک نمونه از ThreadPool بازگرداند. بیایید ساده‌ترین تابع new که این خصوصیات را دارد پیاده‌سازی کنیم:

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

ما نوع پارامتر size را usize انتخاب کردیم چون می‌دانیم تعداد منفی thread منطقی نیست. همچنین می‌دانیم که این مقدار 4 را به‌عنوان تعداد عناصر در یک مجموعه از threadها استفاده خواهیم کرد، که برای همین منظور نوع usize مناسب است، همان‌طور که در بخش “انواع عدد صحیح” در فصل ۳ توضیح داده شده است.

بیایید دوباره کد را بررسی کنیم:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

اکنون خطا رخ می‌دهد چون متد execute روی ThreadPool تعریف نشده است.
به یاد بیاورید از بخش “ایجاد تعداد محدودی thread”
که تصمیم گرفتیم رابط کاربری thread pool ما شبیه به thread::spawn باشد.
علاوه بر این، متد execute را پیاده‌سازی خواهیم کرد تا closure دریافت‌شده را بگیرد
و آن را به یک thread بیکار در pool بدهد تا اجرا کند.

متد execute را روی ThreadPool تعریف می‌کنیم تا یک closure را به‌عنوان پارامتر بگیرد.
به یاد بیاورید از بخش “انتقال مقادیر گرفته‌شده از closure و traitهای Fn در فصل ۱۳
که می‌توانیم closureها را با سه trait مختلف به‌عنوان پارامتر بگیریم: Fn، FnMut و FnOnce.
باید تصمیم بگیریم در اینجا از کدام نوع closure استفاده کنیم.
می‌دانیم که قرار است کاری مشابه پیاده‌سازی thread::spawn در کتابخانه استاندارد انجام دهیم،
پس می‌توانیم به محدودیت‌هایی که امضای تابع thread::spawn روی پارامترش دارد نگاه کنیم.
مستندات به ما موارد زیر را نشان می‌دهد:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

نوع پارامتر F همان چیزی است که در اینجا به آن توجه داریم؛ پارامتر نوع T مربوط به مقدار بازگشتی است و ما به آن توجه نداریم. می‌توانیم ببینیم که spawn از FnOnce به عنوان محدودیت ویژگی روی F استفاده می‌کند. این احتمالاً چیزی است که ما نیز می‌خواهیم، زیرا در نهایت آرگومان دریافتی در execute را به spawn پاس می‌دهیم. ما اطمینان بیشتری داریم که FnOnce همان ویژگی مورد نظر ما است، زیرا Thread برای اجرای یک درخواست فقط Closure مربوط به آن درخواست را یک بار اجرا می‌کند، که با “Once” در FnOnce مطابقت دارد.

پارامتر نوع F همچنین دارای محدودیت ویژگی Send و محدودیت طول عمر 'static است، که در وضعیت ما مفید هستند: ما به Send نیاز داریم تا Closure را از یک Thread به Thread دیگر منتقل کنیم و به 'static نیاز داریم زیرا نمی‌دانیم اجرای Thread چه مدت طول می‌کشد. بیایید یک متد execute روی ThreadPool ایجاد کنیم که یک پارامتر عمومی از نوع F با این محدودیت‌ها بپذیرد:

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

ما همچنان از () پس از FnOnce استفاده می‌کنیم زیرا این FnOnce نشان‌دهنده یک Closure است که هیچ پارامتری نمی‌گیرد و نوع () را بازمی‌گرداند. درست مانند تعریف توابع، می‌توان نوع بازگشتی را از امضا حذف کرد، اما حتی اگر هیچ پارامتری نداشته باشیم، همچنان به پرانتزها نیاز داریم.

دوباره، این ساده‌ترین پیاده‌سازی متد execute است: هیچ کاری انجام نمی‌دهد، اما هدف ما فقط این است که کدمان کامپایل شود. بیایید دوباره آن را بررسی کنیم:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

کد کامپایل می‌شود! اما توجه داشته باشید که اگر cargo run را اجرا کنید و در مرورگر یک درخواست ارسال کنید، خطاهایی را در مرورگر خواهید دید که در ابتدای فصل دیده بودیم. کتابخانه ما هنوز Closure پاس‌داده‌شده به execute را فراخوانی نمی‌کند!

نکته: یک ضرب‌المثل درباره زبان‌هایی با کامپایلرهای سخت‌گیر، مانند Haskell و Rust، این است که “اگر کد کامپایل شود، کار می‌کند.” اما این ضرب‌المثل همیشه درست نیست. پروژه ما کامپایل می‌شود، اما هیچ کاری انجام نمی‌دهد! اگر در حال ساخت یک پروژه واقعی و کامل بودیم، اکنون زمان خوبی برای شروع نوشتن تست‌های واحد بود تا بررسی کنیم که کد هم کامپایل می‌شود و رفتار مورد نظر ما را دارد.

فرض کنید: اگر قرار بود به جای closure، یک future اجرا کنیم، چه تفاوت‌هایی در اینجا وجود داشت؟

اعتبارسنجی تعداد Threadها در new

فعلاً با پارامترهای new و execute کاری انجام نمی‌دهیم. بیایید بدنه‌ی این توابع را با رفتار مورد نظر پیاده‌سازی کنیم. برای شروع، به تابع new فکر کنیم. قبلاً برای پارامتر size نوع بدون علامت (unsigned) را انتخاب کردیم، چون یک pool با تعداد منفی thread منطقی نیست. اما یک pool با صفر thread نیز منطقی نیست، با این‌که صفر یک مقدار معتبر از نوع usize است. ما کدی اضافه خواهیم کرد که بررسی کند مقدار size بزرگ‌تر از صفر باشد، و اگر صفر دریافت شد، با استفاده از ماکروی assert! برنامه panic کند، همان‌طور که در لیستینگ 21-13 نشان داده شده است.

Filename: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-13: پیاده‌سازی ThreadPool::new برای توقف برنامه در صورت صفر بودن size

ما همچنین برخی مستندات برای ThreadPool خود با استفاده از نظرات داکیومنت (doc comments) اضافه کرده‌ایم. توجه داشته باشید که ما از اصول خوب مستندسازی پیروی کرده‌ایم و بخشی را اضافه کرده‌ایم که شرایطی که تابع ما ممکن است به وحشت بیفتد (panic) را توضیح می‌دهد، همان‌طور که در فصل ۱۴ مورد بحث قرار گرفت. دستور cargo doc --open را اجرا کنید و روی ساختار ThreadPool کلیک کنید تا ببینید مستندات تولیدشده برای new چگونه به نظر می‌رسند!

به جای اضافه کردن ماکروی assert! همان‌طور که اینجا انجام دادیم، می‌توانستیم new را به build تغییر دهیم و یک Result بازگردانیم، مانند آنچه با Config::build در پروژه I/O در لیست ۱۲-۹ انجام دادیم. اما در این مورد تصمیم گرفته‌ایم که تلاش برای ایجاد یک Thread Pool بدون هیچ Threadی باید یک خطای غیرقابل بازیابی باشد. اگر احساس جاه‌طلبی می‌کنید، سعی کنید تابعی به نام build با امضای زیر بنویسید تا با تابع new مقایسه کنید:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

ایجاد فضایی برای ذخیره Threadها

اکنون که روشی برای اطمینان از تعداد معتبر Threadهایی که در Pool ذخیره می‌شوند داریم، می‌توانیم این Threadها را ایجاد کرده و آن‌ها را در ساختار ThreadPool قبل از بازگرداندن ساختار ذخیره کنیم. اما چگونه می‌توانیم یک Thread را “ذخیره” کنیم؟ بیایید دوباره به امضای thread::spawn نگاه کنیم:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

تابع spawn یک JoinHandle<T> بازمی‌گرداند، جایی که T نوعی است که Closure بازمی‌گرداند. بیایید ما هم از JoinHandle استفاده کنیم و ببینیم چه اتفاقی می‌افتد. در مورد ما، Closureهایی که به Thread Pool ارسال می‌کنیم اتصال را مدیریت کرده و چیزی بازنمی‌گردانند، بنابراین T برابر با نوع واحد () خواهد بود.

کد موجود در لیست ۲۱-۱۴ کامپایل می‌شود اما هنوز هیچ Threadی ایجاد نمی‌کند. ما تعریف ThreadPool را تغییر داده‌ایم تا یک بردار از نمونه‌های thread::JoinHandle<()> را نگه دارد، بردار را با ظرفیتی برابر با size مقداردهی اولیه کرده‌ایم، یک حلقه for تنظیم کرده‌ایم که کدی برای ایجاد Threadها اجرا می‌کند، و یک نمونه از ThreadPool که آن‌ها را در خود دارد بازمی‌گرداند.

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
Listing 21-14: ایجاد یک بردار برای ThreadPool برای نگهداری Threadها

ما std::thread را در crate کتابخانه‌ای وارد حوزه کرده‌ایم، چون از thread::JoinHandle به‌عنوان نوع آیتم‌های موجود در بردار داخل ThreadPool استفاده می‌کنیم.

پس از دریافت مقدار معتبر برای size، ThreadPool ما یک بردار جدید ایجاد می‌کند که می‌تواند size آیتم را در خود نگه دارد. تابع with_capacity همان کاری را انجام می‌دهد که Vec::new انجام می‌دهد، اما با یک تفاوت مهم: فضای لازم را از پیش در بردار تخصیص می‌دهد. از آن‌جایی که می‌دانیم باید size عنصر در بردار ذخیره کنیم، انجام این تخصیص پیشاپیش کمی کارآمدتر از استفاده از Vec::new است، که هنگام وارد کردن عناصر، اندازه‌ی خود را تغییر می‌دهد.

وقتی دوباره cargo check را اجرا کنید، باید با موفقیت انجام شود.

ساختار Worker مسئول ارسال کد از ThreadPool به یک Thread

در حلقه for در لیست ۲۱-۱۴، نظری در مورد ایجاد Threadها گذاشتیم. در اینجا بررسی خواهیم کرد که چگونه واقعاً Threadها را ایجاد می‌کنیم. کتابخانه استاندارد thread::spawn را به عنوان روشی برای ایجاد Threadها ارائه می‌دهد، و thread::spawn انتظار دارد کدی دریافت کند که Thread بلافاصله پس از ایجاد اجرا کند. با این حال، در مورد ما، می‌خواهیم Threadها را ایجاد کنیم و آن‌ها را منتظر نگه داریم تا کدی که بعداً ارسال می‌کنیم را اجرا کنند. پیاده‌سازی Threadها در کتابخانه استاندارد هیچ راهی برای انجام این کار ارائه نمی‌دهد؛ بنابراین باید آن را به صورت دستی پیاده‌سازی کنیم.

این رفتار را با معرفی یک ساختار داده‌ی جدید بین ThreadPool و threadها پیاده‌سازی می‌کنیم که این رفتار جدید را مدیریت کند. این ساختار داده را Worker می‌نامیم که اصطلاح رایجی در پیاده‌سازی‌های pooling است. Worker کدی که باید اجرا شود را دریافت می‌کند و آن را در thread خودش اجرا می‌کند.

این را مانند افرادی در آشپزخانه‌ی یک رستوران تصور کنید: کارگران منتظر می‌مانند تا سفارش‌ها از مشتریان برسد، سپس مسئول پذیرش و آماده‌سازی آن سفارش‌ها هستند.

به جای نگه‌داشتن یک بردار از نمونه‌های JoinHandle<()> در thread pool، نمونه‌های Worker را ذخیره خواهیم کرد. هر Worker یک نمونه‌ی تک JoinHandle<()> نگه می‌دارد. سپس متدی روی Worker پیاده‌سازی می‌کنیم که یک closure از کد برای اجرا بگیرد و آن را به thread در حال اجرای مربوطه برای اجرا ارسال کند. همچنین به هر Worker یک id اختصاص می‌دهیم تا بتوانیم هنگام لاگ‌گیری یا اشکال‌زدایی، بین نمونه‌های مختلف Worker در pool تمایز قائل شویم.

این فرآیند جدیدی است که هنگام ایجاد یک ThreadPool اتفاق می‌افتد. کدی که Closure را به Thread ارسال می‌کند، پس از تنظیم Worker به این شکل پیاده‌سازی خواهد شد:

۱. یک struct به نام Worker تعریف کنید که شامل یک فیلد id و یک JoinHandle<()> باشد. ۲. ساختار ThreadPool را تغییر دهید تا یک بردار از نمونه‌های Worker نگه دارد. ۳. تابعی به نام Worker::new تعریف کنید که یک شماره‌ی id بگیرد و یک نمونه Worker بازگرداند که شامل آن id و یک thread ساخته شده با یک closure خالی باشد. ۴. در تابع ThreadPool::new، از شمارنده حلقه‌ی for برای تولید id استفاده کنید، یک Worker جدید با آن id بسازید و آن را در بردار ذخیره کنید.

اگر آماده یک چالش هستید، سعی کنید این تغییرات را خودتان پیاده‌سازی کنید قبل از اینکه به کد موجود در لیست ۲۱-۱۵ نگاه کنید.

آماده‌اید؟ در اینجا لیست ۲۱-۱۵ با یک روش برای انجام اصلاحات قبلی آورده شده است.

Filename: src/lib.rs
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-15: تغییر ThreadPool برای نگهداری نمونه‌های Worker به جای نگهداری مستقیم Threadها

ما نام فیلد موجود در ThreadPool را از threads به workers تغییر داده‌ایم زیرا اکنون نمونه‌های Worker را نگه می‌دارد، نه نمونه‌های JoinHandle<()>. از شمارنده حلقه for به عنوان آرگومان برای Worker::new استفاده می‌کنیم و هر Worker جدید را در بردار به نام workers ذخیره می‌کنیم.

کد خارجی (مانند سرور ما در src/main.rs) نیازی ندارد جزئیات پیاده‌سازی مربوط به استفاده از ساختار Worker در داخل ThreadPool را بداند، بنابراین ساختار Worker و تابع new آن را خصوصی می‌کنیم. تابع Worker::new از id داده‌شده استفاده کرده و یک نمونه JoinHandle<()> ایجاد می‌کند که با ایجاد یک Thread جدید با یک Closure خالی تولید می‌شود.

نکته: اگر سیستم‌عامل نتواند به دلیل کمبود منابع سیستم، یک Thread ایجاد کند، thread::spawn به وحشت خواهد افتاد (panic). این باعث می‌شود کل سرور ما به وحشت بیفتد، حتی اگر ایجاد برخی Threadها موفق باشد. برای سادگی، این رفتار مشکلی ندارد، اما در یک پیاده‌سازی تولیدی برای Thread Pool، احتمالاً از std::thread::Builder و متد spawn که یک Result بازمی‌گرداند، استفاده می‌کنید.

این کد کامپایل خواهد شد و تعداد نمونه‌های Worker را که به عنوان آرگومان به ThreadPool::new مشخص کرده‌ایم ذخیره می‌کند. اما ما هنوز Closureی که در execute دریافت می‌کنیم را پردازش نمی‌کنیم. بیایید بررسی کنیم چگونه این کار را انجام دهیم.

ارسال درخواست‌ها به Threadها از طریق Channelها

مشکل بعدی که به آن می‌پردازیم این است که Closureهایی که به thread::spawn داده شده‌اند، هیچ کاری انجام نمی‌دهند. در حال حاضر، Closureی که می‌خواهیم اجرا کنیم را در متد execute دریافت می‌کنیم. اما نیاز داریم که یک Closure به thread::spawn بدهیم تا در هنگام ایجاد هر Worker در حین ایجاد ThreadPool اجرا شود.

می‌خواهیم ساختارهای Worker که به تازگی ایجاد کرده‌ایم، کدی را که باید اجرا شود از یک صف که در ThreadPool نگهداری می‌شود دریافت کرده و آن کد را به Thread خود برای اجرا ارسال کنند.

Channelهایی که در فصل ۱۶ یاد گرفتیم—راهی ساده برای ارتباط بین دو Thread—برای این مورد استفاده مناسب هستند. ما از یک Channel به عنوان صف کارها استفاده خواهیم کرد و execute یک کار را از ThreadPool به نمونه‌های Worker ارسال می‌کند، که این کار را به Thread خود ارسال می‌کنند. برنامه به شرح زیر خواهد بود:

  1. ThreadPool یک Channel ایجاد کرده و نگهدارنده sender آن خواهد بود.
  2. هر Worker نگهدارنده receiver خواهد بود.
  3. یک ساختار Job جدید ایجاد خواهیم کرد که Closureهایی که می‌خواهیم از طریق Channel ارسال کنیم را نگه می‌دارد.
  4. متد execute کاری که می‌خواهد اجرا کند را از طریق sender ارسال خواهد کرد.
  5. در Thread خود، Worker بر receiver خود حلقه زده و Closureهای هر کاری که دریافت می‌کند را اجرا خواهد کرد.

بیایید با ایجاد یک Channel در ThreadPool::new و نگهداری sender در نمونه ThreadPool شروع کنیم، همان‌طور که در لیست ۲۱-۱۶ نشان داده شده است. ساختار Job در حال حاضر چیزی نگه نمی‌دارد، اما نوع آیتمی خواهد بود که از طریق Channel ارسال می‌کنیم.

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
Listing 21-16: تغییر ThreadPool برای ذخیره sender یک Channel که نمونه‌های Job را منتقل می‌کند

در ThreadPool::new، یک Channel جدید ایجاد می‌کنیم و Pool نگهدارنده sender خواهد بود. این کد با موفقیت کامپایل می‌شود.

بیایید هنگام ایجاد کانال توسط thread pool، receiver کانال را به هر Worker ارسال کنیم. می‌دانیم که می‌خواهیم از receiver در threadی که نمونه‌های Worker ایجاد می‌کنند استفاده کنیم، پس در closure به پارامتر receiver رفرنس می‌دهیم. کد موجود در لیستینگ 21-17 هنوز به‌طور کامل کامپایل نمی‌شود.

Filename: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-17: ارسال receiver به هر Worker

ما تغییرات کوچک و واضحی ایجاد کرده‌ایم: receiver را به Worker::new ارسال کرده‌ایم و سپس از آن در داخل Closure استفاده کرده‌ایم.

هنگامی که تلاش می‌کنیم این کد را بررسی کنیم، با این خطا مواجه می‌شویم:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

کد در تلاش است تا receiver را به چند نمونه‌ی مختلف از Worker ارسال کند.
این کار عملی نیست، همان‌طور که در فصل ۱۶ یاد گرفتیم: پیاده‌سازی کانال در Rust به صورت multiple producer و single consumer است.
یعنی نمی‌توانیم انتهای مصرف‌کننده‌ی کانال را clone کنیم تا این کد را اصلاح کنیم.
همچنین نمی‌خواهیم یک پیام را چند بار به چند مصرف‌کننده ارسال کنیم؛
هدف این است که یک لیست از پیام‌ها داشته باشیم که چند نمونه Worker آن را دریافت کنند،
طوری که هر پیام فقط یک بار پردازش شود.

علاوه بر این، برداشتن یک کار از صف کانال شامل تغییر receiver می‌شود، بنابراین Threadها به یک روش امن برای اشتراک و تغییر receiver نیاز دارند؛ در غیر این صورت، ممکن است با شرایط رقابتی (race conditions) مواجه شویم (همان‌طور که در فصل ۱۶ توضیح داده شد).

به یاد بیاورید اشاره‌گرهای هوشمند ایمن در برابر thread که در فصل ۱۶ بحث شدند: برای اشتراک مالکیت بین چند thread و اجازه دادن به تغییر مقدار توسط threadها، باید از Arc<Mutex<T>> استفاده کنیم. نوع Arc اجازه می‌دهد چند نمونه‌ی Worker مالک receiver باشند، و Mutex تضمین می‌کند که در هر لحظه فقط یک Worker بتواند از receiver یک کار دریافت کند. لیستینگ 21-18 تغییراتی را که باید انجام دهیم نشان می‌دهد.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-18: اشتراک‌گذاری receiver بین نمونه‌های Worker با استفاده از Arc و Mutex

در تابع ThreadPool::new، receiver را داخل یک Arc و یک Mutex قرار می‌دهیم. برای هر نمونه‌ی جدید از Worker، Arc را clone می‌کنیم تا شمارنده‌ی رفرنس افزایش یابد، به‌طوری که نمونه‌های Worker بتوانند مالکیت مشترک receiver را داشته باشند.

با این تغییرات، کد کامپایل می‌شود! به نتیجه نزدیک‌تر می‌شویم!

پیاده‌سازی متد execute

بیایید در نهایت متد execute را روی ThreadPool پیاده‌سازی کنیم.
همچنین Job را از یک struct به یک type alias برای یک trait object تبدیل می‌کنیم
که نوع closure ای را که execute دریافت می‌کند نگه می‌دارد.
همان‌طور که در بخش “ایجاد مترادف‌های نوع با type alias” در فصل ۲۰ بحث شد،
type alias به ما اجازه می‌دهد تا انواع طولانی را کوتاه‌تر کنیم و استفاده از آن‌ها را آسان‌تر سازیم.
به لیستینگ 21-19 نگاه کنید.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
Listing 21-19: ایجاد یک نام مستعار Job برای یک Box که هر Closure را نگه می‌دارد و سپس ارسال کار از طریق کانال

پس از ایجاد یک نمونه جدید Job با استفاده از Closureی که در execute دریافت می‌کنیم، آن کار را از طریق بخش ارسال‌کننده کانال ارسال می‌کنیم. ما برای حالتی که ارسال شکست بخورد، روی send از unwrap استفاده می‌کنیم. این حالت ممکن است رخ دهد، اگر مثلاً همه Threadهای ما از اجرا متوقف شوند، به این معنی که بخش دریافت‌کننده دیگر پیام‌های جدید را دریافت نمی‌کند. در حال حاضر، نمی‌توانیم Threadهای خود را از اجرا متوقف کنیم: Threadهای ما تا زمانی که Pool وجود دارد اجرا می‌شوند. دلیل استفاده از unwrap این است که می‌دانیم حالت شکست رخ نخواهد داد، اما کامپایلر این موضوع را نمی‌داند.

اما هنوز کار تمام نشده است! در Worker، closure که به thread::spawn داده می‌شود
هنوز فقط به انتهای دریافت‌کننده‌ی کانال رفرنس می‌دهد.
در عوض، نیاز داریم که closure به‌طور پیوسته در حلقه‌ای بی‌نهایت اجرا شود،
از انتهای دریافت‌کننده‌ی کانال درخواست کار کند و هرگاه کار دریافت کرد آن را اجرا نماید.
بیایید تغییرات نشان‌داده شده در لیستینگ 21-20 را در Worker::new اعمال کنیم.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-20: دریافت و اجرای کارها در thread نمونه‌ی Worker

در اینجا، ابتدا lock را روی receiver فراخوانی می‌کنیم تا mutex را به دست آوریم، و سپس unwrap را فراخوانی می‌کنیم تا در صورت بروز هرگونه خطا، برنامه متوقف شود. به دست آوردن یک قفل ممکن است شکست بخورد اگر mutex در یک وضعیت poisoned باشد، که ممکن است اتفاق بیفتد اگر یک Thread دیگر در حالی که قفل را نگه داشته است به جای آزاد کردن آن متوقف شده باشد. در این شرایط، فراخوانی unwrap برای متوقف کردن این Thread اقدام درستی است. می‌توانید این unwrap را به یک expect با یک پیام خطای معنادار برای خود تغییر دهید.

اگر قفل روی mutex را به دست آوریم، recv را فراخوانی می‌کنیم تا یک Job را از کانال دریافت کنیم. یک unwrap نهایی نیز در اینجا هر گونه خطا را برطرف می‌کند، که ممکن است رخ دهد اگر Threadی که sender را نگه داشته است خاموش شود، مشابه نحوه‌ای که متد send در صورت خاموش شدن receiver یک Err بازمی‌گرداند.

فراخوانی recv مسدود می‌شود، بنابراین اگر هنوز هیچ کاری وجود نداشته باشد، Thread فعلی منتظر می‌ماند تا یک کار در دسترس قرار گیرد. Mutex<T> تضمین می‌کند که در هر لحظه فقط یک Thread Worker در تلاش برای درخواست یک کار است.

Thread Pool ما اکنون در وضعیت کاری قرار دارد! دستور cargo run را اجرا کنید و چندین درخواست ارسال کنید:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

موفقیت! اکنون یک Thread Pool داریم که اتصالات را به صورت همزمان اجرا می‌کند. هرگز بیش از چهار Thread ایجاد نمی‌شود، بنابراین اگر سرور درخواست‌های زیادی دریافت کند، سیستم ما بارگذاری بیش از حد نخواهد شد. اگر یک درخواست به /sleep ارسال کنیم، سرور می‌تواند با استفاده از یک Thread دیگر به سایر درخواست‌ها پاسخ دهد.

توجه: اگر مسیر /sleep را هم‌زمان در چندین پنجره‌ی مرورگر باز کنید، ممکن است درخواست‌ها به‌صورت پشت سر هم و با فاصله‌های پنج ثانیه‌ای بارگذاری شوند. برخی مرورگرها به دلایل کش، چندین نمونه از یک درخواست مشابه را به‌صورت ترتیبی اجرا می‌کنند. این محدودیت ناشی از سرور وب ما نیست.

این زمان خوبی است که مکث کنیم و بررسی کنیم چگونه کدهای لیست‌های ۲۱-۱۸، ۲۱-۱۹ و ۲۱-۲۰ اگر به جای Closure از futures برای انجام کار استفاده می‌کردیم، متفاوت می‌بود. چه نوع‌هایی تغییر می‌کردند؟ آیا امضاهای متدها تغییر می‌کردند؟ کدام بخش‌های کد همان‌گونه باقی می‌ماندند؟

بعد از آشنایی با حلقه‌ی while let در فصل‌های ۱۷ و ۱۹، ممکن است این سؤال برایتان پیش آمده باشد که چرا کد thread مربوط به Worker را مانند آنچه در لیستینگ 21-21 نشان داده شده ننوشته‌ایم.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-21: یک پیاده‌سازی جایگزین برای Worker::new با استفاده از while let

این کد کامپایل می‌شود و اجرا می‌شود، اما منجر به رفتار مورد نظر برای threading نمی‌شود: یک درخواست کند همچنان باعث می‌شود سایر درخواست‌ها برای پردازش منتظر بمانند. دلیل آن کمی ظریف است: ساختار Mutex متد عمومی unlock ندارد، زیرا مالکیت قفل بر اساس طول عمر MutexGuard<T> درون LockResult<MutexGuard<T>> که متد lock بازمی‌گرداند است. در زمان کامپایل، بررسی‌کننده وام می‌تواند این قانون را اعمال کند که منبعی که توسط یک Mutex محافظت می‌شود نمی‌تواند دسترسی پیدا کند مگر اینکه قفل را نگه داشته باشیم. با این حال، این پیاده‌سازی همچنین می‌تواند منجر به نگه‌داشتن قفل بیش از حد انتظار شود اگر به طول عمر MutexGuard<T> توجه نکنیم.

کدی که در لیستینگ 21-20 با عبارت let job = receiver.lock().unwrap().recv().unwrap(); نوشته شده است، کار می‌کند زیرا در استفاده از let، هر مقدار موقتی که در سمت راست علامت مساوی به کار رفته باشد، بلافاصله پس از پایان دستور let رها (drop) می‌شود. اما در while let (و همچنین if let و match) مقدارهای موقتی تا پایان بلاک مربوطه رها نمی‌شوند. در لیستینگ 21-21، قفل (lock) تا پایان فراخوانی job() نگه داشته می‌شود، که این یعنی سایر نمونه‌های Worker نمی‌توانند در آن مدت کار دریافت کنند.