تبدیل سرور Single-Threaded به یک سرور Multithreaded
در حال حاضر، سرور هر درخواست را به نوبت پردازش میکند، به این معنی که تا زمانی که پردازش اولین اتصال تمام نشده باشد، اتصال دوم پردازش نمیشود. اگر سرور درخواستهای بیشتری دریافت کند، این اجرای سریال کمتر و کمتر بهینه خواهد بود. اگر سرور درخواستی دریافت کند که پردازش آن زمان زیادی میبرد، درخواستهای بعدی باید منتظر بمانند تا درخواست طولانی تمام شود، حتی اگر بتوان درخواستهای جدید را به سرعت پردازش کرد. ما باید این مشکل را رفع کنیم، اما ابتدا به این مشکل در عمل نگاه میکنیم.
شبیهسازی یک درخواست کند در پیادهسازی فعلی سرور
ما بررسی میکنیم که چگونه یک درخواست با پردازش کند میتواند بر سایر درخواستهای ارسالشده به پیادهسازی فعلی سرور تأثیر بگذارد. لیست ۲۱-۱۰ پیادهسازی مدیریت یک درخواست به /sleep را نشان میدهد که یک پاسخ کند شبیهسازیشده است و باعث میشود سرور قبل از پاسخ دادن به مدت ۵ ثانیه بخوابد.
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
ما از if
به match
تغییر دادهایم زیرا اکنون سه حالت داریم. باید بهطور صریح روی یک برش از request_line
الگو تطابق ایجاد کنیم تا مقادیر رشتهای را تطابق دهیم؛ match
به طور خودکار مرجعدهی و عدم مرجعدهی را مانند متد برابری انجام نمیدهد.
بازوی اول همان بلوک if
از لیست ۲۱-۹ است. بازوی دوم یک درخواست به /sleep را تطابق میدهد. وقتی آن درخواست دریافت شود، سرور به مدت ۵ ثانیه میخوابد قبل از اینکه صفحه HTML موفقیتآمیز را نمایش دهد. بازوی سوم همان بلوک else
از لیست ۲۱-۹ است.
میتوانید ببینید که سرور ما چقدر ابتدایی است: کتابخانههای واقعی مدیریت تشخیص درخواستهای متعدد را به روشی بسیار کمتر پرحرف انجام میدهند!
سرور را با استفاده از cargo run
اجرا کنید. سپس دو پنجره مرورگر باز کنید: یکی برای آدرس http://127.0.0.1:7878/ و دیگری برای آدرس http://127.0.0.1:7878/sleep. اگر چند بار آدرس / را وارد کنید، میبینید که سریع پاسخ میدهد. اما اگر آدرس /sleep را وارد کنید و سپس / را بارگذاری کنید، خواهید دید که / منتظر میماند تا درخواست /sleep برای ۵ ثانیه کامل بخوابد و سپس بارگذاری شود.
بهبود توان عملیاتی با یک Thread Pool
یک Thread Pool گروهی از Threadهای ایجادشده است که منتظر و آماده برای مدیریت یک وظیفه هستند. وقتی برنامه یک وظیفه جدید دریافت میکند، یکی از Threadهای موجود در Pool به وظیفه اختصاص داده میشود و آن Thread وظیفه را پردازش میکند. Threadهای باقیمانده در Pool در دسترس هستند تا هر وظیفه دیگری که وارد شود را در حالی که Thread اول وظیفه خود را پردازش میکند، مدیریت کنند. وقتی Thread اول پردازش وظیفه خود را به پایان میرساند، به Pool Threadهای بیکار بازمیگردد و آماده برای مدیریت یک وظیفه جدید است. یک Thread Pool به شما امکان میدهد اتصالات را به صورت همزمان پردازش کنید و توان عملیاتی سرور خود را افزایش دهید.
ما تعداد Threadهای موجود در Pool را به یک عدد کوچک محدود خواهیم کرد تا از حملات Denial of Service (DoS) محافظت کنیم؛ اگر برنامه ما برای هر درخواست جدید یک Thread ایجاد کند، کسی که ۱۰ میلیون درخواست به سرور ما ارسال کند میتواند با استفاده از تمام منابع سرور، پردازش درخواستها را متوقف کند.
به جای ایجاد تعداد نامحدودی از Threadها، تعداد ثابتی از Threadها را در Pool خواهیم داشت که منتظر پردازش وظایف هستند. درخواستهایی که وارد میشوند به Pool ارسال میشوند. Pool یک صف از درخواستهای ورودی را مدیریت خواهد کرد. هر یک از Threadها در Pool یک درخواست از صف برداشته، درخواست را پردازش میکند و سپس درخواست دیگری از صف درخواست میکند. با این طراحی، میتوانیم حداکثر تا N
درخواست را به صورت همزمان پردازش کنیم، جایی که N
تعداد Threadها است. اگر هر Thread به یک درخواست طولانی پاسخ دهد، درخواستهای بعدی ممکن است در صف پشتیبانی شوند، اما تعداد درخواستهای طولانی که میتوانیم قبل از رسیدن به این نقطه مدیریت کنیم افزایش یافته است.
این تکنیک تنها یکی از راههای بهبود توان عملیاتی یک وب سرور است. گزینههای دیگری که ممکن است بررسی کنید شامل مدل fork/join، مدل I/O async تکThreaded، یا مدل I/O async چندThreaded هستند. اگر به این موضوع علاقه دارید، میتوانید بیشتر در مورد راهحلهای دیگر بخوانید و آنها را پیادهسازی کنید؛ با یک زبان سطح پایین مانند Rust، همه این گزینهها ممکن هستند.
پیش از آنکه پیادهسازی یک Thread Pool را شروع کنیم، بیایید در مورد نحوه استفاده از Pool صحبت کنیم. وقتی قصد طراحی کدی را دارید، ابتدا نوشتن رابط کاربری (client interface) میتواند به طراحی شما کمک کند. API کد را به گونهای بنویسید که ساختاری برای نحوه فراخوانی آن داشته باشد؛ سپس قابلیتها را در آن ساختار پیادهسازی کنید به جای اینکه ابتدا قابلیتها را پیادهسازی کنید و سپس API عمومی را طراحی کنید.
مشابه روش توسعه مبتنی بر تست که در پروژه فصل ۱۲ استفاده کردیم، اینجا از توسعه مبتنی بر کامپایلر استفاده میکنیم. کدی را که توابع مورد نظرمان را فراخوانی میکند، مینویسیم و سپس به خطاهای کامپایلر نگاه میکنیم تا مشخص کنیم چه تغییراتی باید انجام دهیم تا کد کار کند. با این حال، پیش از انجام این کار، روش دیگری را که قرار نیست استفاده کنیم، به عنوان نقطه شروع بررسی خواهیم کرد.
ایجاد یک Thread جدید برای هر درخواست
ابتدا، بیایید بررسی کنیم که اگر کد ما برای هر اتصال یک Thread جدید ایجاد کند، چگونه به نظر میرسد. همانطور که قبلاً ذکر شد، این طرح نهایی ما نیست به دلیل مشکلاتی که ممکن است با ایجاد تعداد نامحدودی از Threadها پیش بیاید، اما این یک نقطه شروع برای ایجاد یک سرور Multithreaded کارا است. سپس Thread Pool را به عنوان یک بهبود اضافه خواهیم کرد، و مقایسه این دو راهحل آسانتر خواهد بود. لیست ۲۱-۱۱ تغییراتی را که باید در main
انجام دهیم تا برای هر جریان در حلقه for
یک Thread جدید ایجاد کنیم، نشان میدهد.
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
همانطور که در فصل ۱۶ یاد گرفتید، thread::spawn
یک Thread جدید ایجاد کرده و سپس کد موجود در کلوزر را در Thread جدید اجرا میکند. اگر این کد را اجرا کنید و در مرورگر خود /sleep را باز کنید، سپس / را در دو تب دیگر باز کنید، خواهید دید که درخواستهای / لازم نیست منتظر پایان درخواست /sleep باشند. با این حال، همانطور که ذکر شد، این روش در نهایت سیستم را تحت فشار قرار میدهد زیرا شما تعداد نامحدودی Thread بدون محدودیت ایجاد میکنید.
ممکن است به یاد بیاورید که این دقیقاً همان شرایطی است که async و await در آن میدرخشند! این نکته را در ذهن داشته باشید در حالی که Thread Pool را میسازیم و به این فکر کنید که چگونه این شرایط با async متفاوت یا مشابه خواهد بود.
Creating a Finite Number of Threads
ما میخواهیم Thread Pool ما به روشی مشابه و آشنا کار کند، به طوری که تغییر از استفاده از Threadها به Thread Pool نیاز به تغییرات زیادی در کدی که از API ما استفاده میکند نداشته باشد. لیست ۲۱-۱۲ رابط فرضی برای یک ساختار ThreadPool
را نشان میدهد که میخواهیم به جای thread::spawn
استفاده کنیم.
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
ThreadPool
ما از ThreadPool::new
برای ایجاد یک Thread Pool جدید با تعداد قابل تنظیم Threadها استفاده میکنیم، که در اینجا چهار است. سپس، در حلقه for
، متد pool.execute
رابطی مشابه با thread::spawn
دارد، به طوری که یک Closure را میگیرد که Pool باید برای هر جریان اجرا کند. ما نیاز داریم pool.execute
را پیادهسازی کنیم تا Closure را بگیرد و به یکی از Threadهای موجود در Pool برای اجرا بدهد. این کد هنوز کامپایل نمیشود، اما آن را امتحان میکنیم تا کامپایلر راهنمایی کند که چگونه آن را اصلاح کنیم.
ساخت ThreadPool
با استفاده از توسعه مبتنی بر کامپایلر
تغییرات لیست ۲۱-۱۲ را در فایل src/main.rs اعمال کنید و سپس از خطاهای کامپایلر که توسط cargo check
ارائه میشود برای هدایت توسعه استفاده کنید. اولین خطایی که دریافت میکنیم به صورت زیر است:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
عالی! این خطا به ما میگوید که به یک نوع یا ماژول ThreadPool
نیاز داریم، بنابراین اکنون آن را خواهیم ساخت. پیادهسازی ThreadPool
ما مستقل از نوع کاری است که وب سرور ما انجام میدهد. بنابراین، بیایید crate hello
را از یک crate باینری به یک crate کتابخانهای تغییر دهیم تا پیادهسازی ThreadPool
خود را در آن قرار دهیم. پس از تغییر به یک crate کتابخانهای، میتوانیم از کتابخانه Thread Pool جداگانه برای هر کاری که میخواهیم با استفاده از Thread Pool انجام دهیم استفاده کنیم، نه فقط برای سرویسدهی به درخواستهای وب.
فایلی به نام src/lib.rs ایجاد کنید که شامل تعریف زیر باشد، که سادهترین تعریف ممکن برای یک ساختار ThreadPool
است:
pub struct ThreadPool;
سپس فایل main.rs را ویرایش کنید تا ThreadPool
را از crate کتابخانهای وارد دامنه کنید. برای این کار کد زیر را به بالای فایل src/main.rs اضافه کنید:
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
این کد همچنان کار نخواهد کرد، اما بیایید دوباره آن را بررسی کنیم تا خطای بعدی که باید برطرف کنیم را ببینیم:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
این خطا نشان میدهد که باید تابع وابستهای به نام new
برای ThreadPool
ایجاد کنیم. همچنین میدانیم که new
باید یک پارامتر داشته باشد که بتواند مقدار 4
را به عنوان آرگومان بپذیرد و یک نمونه از ThreadPool
بازگرداند. بیایید سادهترین تابع new
که این خصوصیات را دارد پیادهسازی کنیم:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
ما نوع usize
را برای پارامتر size
انتخاب کردیم، زیرا میدانیم که تعداد منفی Threadها منطقی نیست. همچنین میدانیم که این مقدار 4
را به عنوان تعداد عناصر در یک مجموعه از Threadها استفاده خواهیم کرد، که نوع usize
برای آن مناسب است، همانطور که در بخش “نوعهای عدد صحیح” از فصل ۳ توضیح داده شد.
بیایید دوباره کد را بررسی کنیم:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
اکنون خطا به این دلیل است که متد execute
روی ThreadPool
تعریف نشده است. به یاد بیاورید که در بخش “ایجاد تعداد محدودی از Threadها” تصمیم گرفتیم که Thread Pool ما باید رابطی مشابه thread::spawn
داشته باشد. علاوه بر این، متد execute
را طوری پیادهسازی خواهیم کرد که Closure داده شده را بگیرد و آن را به یک Thread بیکار در Pool برای اجرا بدهد.
ما متد execute
را روی ThreadPool
تعریف میکنیم تا یک Closure را به عنوان پارامتر بپذیرد. به یاد بیاورید که در بخش “انتقال مقادیر گرفتهشده از Closure و ویژگیهای Fn
” از فصل ۱۳ توضیح داده شد که میتوانیم Closureها را با سه ویژگی مختلف به عنوان پارامتر بپذیریم: Fn
، FnMut
، و FnOnce
. باید تصمیم بگیریم که در اینجا از کدام نوع Closure استفاده کنیم. میدانیم که چیزی مشابه با پیادهسازی thread::spawn
در کتابخانه استاندارد انجام خواهیم داد، بنابراین میتوانیم به محدودیتهای امضای thread::spawn
روی پارامترش نگاه کنیم. مستندات به ما موارد زیر را نشان میدهد:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
نوع پارامتر F
همان چیزی است که در اینجا به آن توجه داریم؛ پارامتر نوع T
مربوط به مقدار بازگشتی است و ما به آن توجه نداریم. میتوانیم ببینیم که spawn
از FnOnce
به عنوان محدودیت ویژگی روی F
استفاده میکند. این احتمالاً چیزی است که ما نیز میخواهیم، زیرا در نهایت آرگومان دریافتی در execute
را به spawn
پاس میدهیم. ما اطمینان بیشتری داریم که FnOnce
همان ویژگی مورد نظر ما است، زیرا Thread برای اجرای یک درخواست فقط Closure مربوط به آن درخواست را یک بار اجرا میکند، که با “Once” در FnOnce
مطابقت دارد.
پارامتر نوع F
همچنین دارای محدودیت ویژگی Send
و محدودیت طول عمر 'static
است، که در وضعیت ما مفید هستند: ما به Send
نیاز داریم تا Closure را از یک Thread به Thread دیگر منتقل کنیم و به 'static
نیاز داریم زیرا نمیدانیم اجرای Thread چه مدت طول میکشد. بیایید یک متد execute
روی ThreadPool
ایجاد کنیم که یک پارامتر عمومی از نوع F
با این محدودیتها بپذیرد:
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ما همچنان از ()
پس از FnOnce
استفاده میکنیم زیرا این FnOnce
نشاندهنده یک Closure است که هیچ پارامتری نمیگیرد و نوع ()
را بازمیگرداند. درست مانند تعریف توابع، میتوان نوع بازگشتی را از امضا حذف کرد، اما حتی اگر هیچ پارامتری نداشته باشیم، همچنان به پرانتزها نیاز داریم.
دوباره، این سادهترین پیادهسازی ممکن برای متد execute
است: این متد هیچ کاری انجام نمیدهد، اما ما فقط تلاش میکنیم کد خود را کامپایل کنیم. بیایید دوباره کد را بررسی کنیم:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
کد کامپایل میشود! اما توجه داشته باشید که اگر cargo run
را اجرا کنید و در مرورگر یک درخواست ارسال کنید، خطاهایی را در مرورگر خواهید دید که در ابتدای فصل دیده بودیم. کتابخانه ما هنوز Closure پاسدادهشده به execute
را فراخوانی نمیکند!
نکته: یک ضربالمثل درباره زبانهایی با کامپایلرهای سختگیر، مانند Haskell و Rust، این است که “اگر کد کامپایل شود، کار میکند.” اما این ضربالمثل همیشه درست نیست. پروژه ما کامپایل میشود، اما هیچ کاری انجام نمیدهد! اگر در حال ساخت یک پروژه واقعی و کامل بودیم، اکنون زمان خوبی برای شروع نوشتن تستهای واحد بود تا بررسی کنیم که کد هم کامپایل میشود و رفتار مورد نظر ما را دارد.
توجه: اگر قصد داشتیم به جای یک Closure، یک future اجرا کنیم، چه تفاوتی در اینجا وجود داشت؟
اعتبارسنجی تعداد Threadها در new
در حال حاضر، ما هیچ کاری با پارامترهای new
و execute
انجام نمیدهیم. بیایید بدنه این توابع را با رفتار مورد نظر خود پیادهسازی کنیم. ابتدا، به تابع new
فکر کنیم. قبلاً یک نوع عدد صحیح بدون علامت برای پارامتر size
انتخاب کردیم، زیرا یک Pool با تعداد منفی Thread منطقی نیست. با این حال، یک Pool با صفر Thread نیز منطقی نیست، اما صفر یک مقدار معتبر برای usize
است. کدی اضافه خواهیم کرد تا بررسی کند که مقدار size
بیشتر از صفر باشد قبل از اینکه یک نمونه از ThreadPool
بازگردانیم و در صورت دریافت مقدار صفر، برنامه با استفاده از ماکروی assert!
متوقف شود، همانطور که در لیست ۲۱-۱۳ نشان داده شده است.
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool::new
برای توقف برنامه در صورت صفر بودن size
ما همچنین برخی مستندات برای ThreadPool
خود با استفاده از نظرات داکیومنت (doc comments) اضافه کردهایم. توجه داشته باشید که ما از اصول خوب مستندسازی پیروی کردهایم و بخشی را اضافه کردهایم که شرایطی که تابع ما ممکن است به وحشت بیفتد (panic) را توضیح میدهد، همانطور که در فصل ۱۴ مورد بحث قرار گرفت. دستور cargo doc --open
را اجرا کنید و روی ساختار ThreadPool
کلیک کنید تا ببینید مستندات تولیدشده برای new
چگونه به نظر میرسند!
به جای اضافه کردن ماکروی assert!
همانطور که اینجا انجام دادیم، میتوانستیم new
را به build
تغییر دهیم و یک Result
بازگردانیم، مانند آنچه با Config::build
در پروژه I/O در لیست ۱۲-۹ انجام دادیم. اما در این مورد تصمیم گرفتهایم که تلاش برای ایجاد یک Thread Pool بدون هیچ Threadی باید یک خطای غیرقابل بازیابی باشد. اگر احساس جاهطلبی میکنید، سعی کنید تابعی به نام build
با امضای زیر بنویسید تا با تابع new
مقایسه کنید:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
ایجاد فضایی برای ذخیره Threadها
اکنون که روشی برای اطمینان از تعداد معتبر Threadهایی که در Pool ذخیره میشوند داریم، میتوانیم این Threadها را ایجاد کرده و آنها را در ساختار ThreadPool
قبل از بازگرداندن ساختار ذخیره کنیم. اما چگونه میتوانیم یک Thread را “ذخیره” کنیم؟ بیایید دوباره به امضای thread::spawn
نگاه کنیم:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
تابع spawn
یک JoinHandle<T>
بازمیگرداند، جایی که T
نوعی است که Closure بازمیگرداند. بیایید ما هم از JoinHandle
استفاده کنیم و ببینیم چه اتفاقی میافتد. در مورد ما، Closureهایی که به Thread Pool ارسال میکنیم اتصال را مدیریت کرده و چیزی بازنمیگردانند، بنابراین T
برابر با نوع واحد ()
خواهد بود.
کد موجود در لیست ۲۱-۱۴ کامپایل میشود اما هنوز هیچ Threadی ایجاد نمیکند. ما تعریف ThreadPool
را تغییر دادهایم تا یک بردار از نمونههای thread::JoinHandle<()>
را نگه دارد، بردار را با ظرفیتی برابر با size
مقداردهی اولیه کردهایم، یک حلقه for
تنظیم کردهایم که کدی برای ایجاد Threadها اجرا میکند، و یک نمونه از ThreadPool
که آنها را در خود دارد بازمیگرداند.
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool
برای نگهداری Threadهاما std::thread
را در crate کتابخانهای وارد دامنه کردهایم، زیرا از thread::JoinHandle
به عنوان نوع آیتمهای موجود در بردار در ThreadPool
استفاده میکنیم.
هنگامی که یک مقدار معتبر برای size
دریافت شود، ThreadPool
ما یک بردار جدید ایجاد میکند که میتواند size
آیتم را در خود جای دهد. تابع with_capacity
همان کار Vec::new
را انجام میدهد اما با یک تفاوت مهم: فضای لازم را از قبل در بردار تخصیص میدهد. چون میدانیم که باید size
عنصر را در بردار ذخیره کنیم، انجام این تخصیص از ابتدا کمی کارآمدتر از استفاده از Vec::new
است که خودش در حین اضافه شدن عناصر تغییر اندازه میدهد.
وقتی دوباره cargo check
را اجرا کنید، باید با موفقیت انجام شود.
ساختار Worker
مسئول ارسال کد از ThreadPool
به یک Thread
در حلقه for
در لیست ۲۱-۱۴، نظری در مورد ایجاد Threadها گذاشتیم. در اینجا بررسی خواهیم کرد که چگونه واقعاً Threadها را ایجاد میکنیم. کتابخانه استاندارد thread::spawn
را به عنوان روشی برای ایجاد Threadها ارائه میدهد، و thread::spawn
انتظار دارد کدی دریافت کند که Thread بلافاصله پس از ایجاد اجرا کند. با این حال، در مورد ما، میخواهیم Threadها را ایجاد کنیم و آنها را منتظر نگه داریم تا کدی که بعداً ارسال میکنیم را اجرا کنند. پیادهسازی Threadها در کتابخانه استاندارد هیچ راهی برای انجام این کار ارائه نمیدهد؛ بنابراین باید آن را به صورت دستی پیادهسازی کنیم.
ما این رفتار را با معرفی یک ساختار داده جدید بین ThreadPool
و Threadها که این رفتار جدید را مدیریت میکند، پیادهسازی خواهیم کرد. این ساختار داده جدید را Worker مینامیم که یک اصطلاح رایج در پیادهسازیهای Pool است. Worker کدی را که باید اجرا شود دریافت میکند و آن را در Thread مربوط به Worker اجرا میکند. میتوانید به افرادی که در آشپزخانه یک رستوران کار میکنند فکر کنید: Workerها منتظر میمانند تا سفارشهایی از مشتریان دریافت کنند، و سپس مسئول گرفتن این سفارشها و انجام آنها هستند.
به جای ذخیره یک بردار از نمونههای JoinHandle<()>
در Thread Pool، ما نمونههایی از ساختار Worker
را ذخیره خواهیم کرد. هر Worker
یک نمونه JoinHandle<()>
را نگه میدارد. سپس یک متد روی Worker
پیادهسازی خواهیم کرد که یک Closure از کد برای اجرا بگیرد و آن را به Thread در حال اجرای Worker برای اجرا ارسال کند. همچنین به هر Worker یک id
اختصاص میدهیم تا هنگام ثبت لاگ یا اشکالزدایی بتوانیم بین Workerهای مختلف در Pool تمایز قائل شویم.
این فرآیند جدیدی است که هنگام ایجاد یک ThreadPool
اتفاق میافتد. کدی که Closure را به Thread ارسال میکند، پس از تنظیم Worker
به این شکل پیادهسازی خواهد شد:
- تعریف یک ساختار
Worker
که یکid
و یکJoinHandle<()>
نگه میدارد. - تغییر
ThreadPool
به طوری که یک بردار از نمونههایWorker
را ذخیره کند. - تعریف یک تابع
Worker::new
که یک عددid
میگیرد و یک نمونهWorker
بازمیگرداند که شاملid
و یک Thread ایجادشده با یک Closure خالی است. - در
ThreadPool::new
، از شمارنده حلقهfor
برای تولید یکid
استفاده کرده، یکWorker
جدید با آنid
ایجاد کرده و Worker را در بردار ذخیره میکنیم.
اگر آماده یک چالش هستید، سعی کنید این تغییرات را خودتان پیادهسازی کنید قبل از اینکه به کد موجود در لیست ۲۱-۱۵ نگاه کنید.
آمادهاید؟ در اینجا لیست ۲۱-۱۵ با یک روش برای انجام اصلاحات قبلی آورده شده است.
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool
برای نگهداری نمونههای Worker
به جای نگهداری مستقیم Threadهاما نام فیلد موجود در ThreadPool
را از threads
به workers
تغییر دادهایم زیرا اکنون نمونههای Worker
را نگه میدارد، نه نمونههای JoinHandle<()>
. از شمارنده حلقه for
به عنوان آرگومان برای Worker::new
استفاده میکنیم و هر Worker
جدید را در بردار به نام workers
ذخیره میکنیم.
کد خارجی (مانند سرور ما در src/main.rs) نیازی ندارد جزئیات پیادهسازی مربوط به استفاده از ساختار Worker
در داخل ThreadPool
را بداند، بنابراین ساختار Worker
و تابع new
آن را خصوصی میکنیم. تابع Worker::new
از id
دادهشده استفاده کرده و یک نمونه JoinHandle<()>
ایجاد میکند که با ایجاد یک Thread جدید با یک Closure خالی تولید میشود.
نکته: اگر سیستمعامل نتواند به دلیل کمبود منابع سیستم، یک Thread ایجاد کند،
thread::spawn
به وحشت خواهد افتاد (panic). این باعث میشود کل سرور ما به وحشت بیفتد، حتی اگر ایجاد برخی Threadها موفق باشد. برای سادگی، این رفتار مشکلی ندارد، اما در یک پیادهسازی تولیدی برای Thread Pool، احتمالاً ازstd::thread::Builder
و متدspawn
که یکResult
بازمیگرداند، استفاده میکنید.
این کد کامپایل خواهد شد و تعداد نمونههای Worker
را که به عنوان آرگومان به ThreadPool::new
مشخص کردهایم ذخیره میکند. اما ما هنوز Closureی که در execute
دریافت میکنیم را پردازش نمیکنیم. بیایید بررسی کنیم چگونه این کار را انجام دهیم.
ارسال درخواستها به Threadها از طریق Channelها
مشکل بعدی که به آن میپردازیم این است که Closureهایی که به thread::spawn
داده شدهاند، هیچ کاری انجام نمیدهند. در حال حاضر، Closureی که میخواهیم اجرا کنیم را در متد execute
دریافت میکنیم. اما نیاز داریم که یک Closure به thread::spawn
بدهیم تا در هنگام ایجاد هر Worker
در حین ایجاد ThreadPool
اجرا شود.
میخواهیم ساختارهای Worker
که به تازگی ایجاد کردهایم، کدی را که باید اجرا شود از یک صف که در ThreadPool
نگهداری میشود دریافت کرده و آن کد را به Thread خود برای اجرا ارسال کنند.
Channelهایی که در فصل ۱۶ یاد گرفتیم—راهی ساده برای ارتباط بین دو Thread—برای این مورد استفاده مناسب هستند. ما از یک Channel به عنوان صف کارها استفاده خواهیم کرد و execute
یک کار را از ThreadPool
به نمونههای Worker
ارسال میکند، که این کار را به Thread خود ارسال میکنند. برنامه به شرح زیر خواهد بود:
ThreadPool
یک Channel ایجاد کرده و نگهدارنده sender آن خواهد بود.- هر
Worker
نگهدارنده receiver خواهد بود. - یک ساختار
Job
جدید ایجاد خواهیم کرد که Closureهایی که میخواهیم از طریق Channel ارسال کنیم را نگه میدارد. - متد
execute
کاری که میخواهد اجرا کند را از طریق sender ارسال خواهد کرد. - در Thread خود،
Worker
بر receiver خود حلقه زده و Closureهای هر کاری که دریافت میکند را اجرا خواهد کرد.
بیایید با ایجاد یک Channel در ThreadPool::new
و نگهداری sender در نمونه ThreadPool
شروع کنیم، همانطور که در لیست ۲۱-۱۶ نشان داده شده است. ساختار Job
در حال حاضر چیزی نگه نمیدارد، اما نوع آیتمی خواهد بود که از طریق Channel ارسال میکنیم.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool
برای ذخیره sender یک Channel که نمونههای Job
را منتقل میکنددر ThreadPool::new
، یک Channel جدید ایجاد میکنیم و Pool نگهدارنده sender خواهد بود. این کد با موفقیت کامپایل میشود.
بیایید تلاش کنیم یک receiver از Channel را به هر Worker در هنگام ایجاد Channel توسط Thread Pool ارسال کنیم. میدانیم که میخواهیم receiver را در Threadی که Workerها ایجاد میکنند استفاده کنیم، بنابراین به پارامتر receiver
در Closure ارجاع میدهیم. کد موجود در لیست ۲۱-۱۷ هنوز کاملاً کامپایل نخواهد شد.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
ما تغییرات کوچک و واضحی ایجاد کردهایم: receiver را به Worker::new
ارسال کردهایم و سپس از آن در داخل Closure استفاده کردهایم.
هنگامی که تلاش میکنیم این کد را بررسی کنیم، با این خطا مواجه میشویم:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
کد در تلاش است receiver
را به چندین نمونه Worker
منتقل کند. این کار امکانپذیر نیست، همانطور که در فصل ۱۶ بحث شد: پیادهسازی کانال (channel) که Rust ارائه میدهد، از نوع چند تولیدکننده (multiple producer) و یک مصرفکننده (single consumer) است. این به این معنی است که نمیتوانیم به سادگی بخش مصرفکننده کانال را برای رفع این کد کپی کنیم. همچنین نمیخواهیم یک پیام را چندین بار به چند مصرفکننده ارسال کنیم؛ بلکه میخواهیم یک لیست از پیامها داشته باشیم که چندین Worker آن را پردازش کنند بهگونهای که هر پیام فقط یک بار پردازش شود.
علاوه بر این، برداشتن یک کار از صف کانال شامل تغییر receiver
میشود، بنابراین Threadها به یک روش امن برای اشتراک و تغییر receiver
نیاز دارند؛ در غیر این صورت، ممکن است با شرایط رقابتی (race conditions) مواجه شویم (همانطور که در فصل ۱۶ توضیح داده شد).
با یادآوری اشارهگر (Pointer)های هوشمند ایمن برای Threadها که در فصل ۱۶ معرفی شدند: برای اشتراک مالکیت میان چندین Thread و اجازه تغییر مقدار، نیاز به استفاده از Arc<Mutex<T>>
داریم. نوع Arc
به چندین Worker اجازه میدهد مالکیت receiver
را به اشتراک بگذارند و Mutex
تضمین میکند که فقط یک Worker در هر لحظه یک کار را از receiver
دریافت کند. لیست ۲۱-۱۸ تغییراتی را که باید اعمال کنیم نشان میدهد.
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
receiver
بین Workerها با استفاده از Arc
و Mutex
در ThreadPool::new
، receiver
را در یک Arc
و یک Mutex
قرار میدهیم. برای هر Worker جدید، Arc
را کپی میکنیم تا شمارنده مرجع افزایش یابد و Workerها بتوانند مالکیت receiver
را به اشتراک بگذارند.
با این تغییرات، کد کامپایل میشود! به نتیجه نزدیکتر میشویم!
پیادهسازی متد execute
در نهایت، بیایید متد execute
را روی ThreadPool
پیادهسازی کنیم. همچنین Job
را از یک ساختار به یک نام مستعار نوع (type alias) برای یک شیء ویژگی تغییر خواهیم داد که نوع Closureی که execute
دریافت میکند را نگه میدارد. همانطور که در بخش “ایجاد مترادفهای نوع با نام مستعار” از فصل ۲۰ بحث شد، نامهای مستعار نوع به ما امکان میدهند تایپهای طولانی را برای استفاده آسانتر کوتاه کنیم. به لیست ۲۱-۱۹ نگاه کنید.
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Job
برای یک Box
که هر Closure را نگه میدارد و سپس ارسال کار از طریق کانالپس از ایجاد یک نمونه جدید Job
با استفاده از Closureی که در execute
دریافت میکنیم، آن کار را از طریق بخش ارسالکننده کانال ارسال میکنیم. ما برای حالتی که ارسال شکست بخورد، روی send
از unwrap
استفاده میکنیم. این حالت ممکن است رخ دهد، اگر مثلاً همه Threadهای ما از اجرا متوقف شوند، به این معنی که بخش دریافتکننده دیگر پیامهای جدید را دریافت نمیکند. در حال حاضر، نمیتوانیم Threadهای خود را از اجرا متوقف کنیم: Threadهای ما تا زمانی که Pool وجود دارد اجرا میشوند. دلیل استفاده از unwrap
این است که میدانیم حالت شکست رخ نخواهد داد، اما کامپایلر این موضوع را نمیداند.
اما هنوز کاملاً کار تمام نشده است! در Worker، Closureی که به thread::spawn
ارسال میشود همچنان فقط به بخش دریافتکننده کانال اشاره میکند. در عوض، باید Closure به طور مداوم حلقه بزند، از بخش دریافتکننده کانال درخواست یک کار کند و کار را هنگام دریافت اجرا کند. بیایید تغییرات نشان دادهشده در لیست ۲۱-۲۰ را به Worker::new
اعمال کنیم.
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
در اینجا، ابتدا lock
را روی receiver
فراخوانی میکنیم تا mutex را به دست آوریم، و سپس unwrap
را فراخوانی میکنیم تا در صورت بروز هرگونه خطا، برنامه متوقف شود. به دست آوردن یک قفل ممکن است شکست بخورد اگر mutex در یک وضعیت poisoned باشد، که ممکن است اتفاق بیفتد اگر یک Thread دیگر در حالی که قفل را نگه داشته است به جای آزاد کردن آن متوقف شده باشد. در این شرایط، فراخوانی unwrap
برای متوقف کردن این Thread اقدام درستی است. میتوانید این unwrap
را به یک expect
با یک پیام خطای معنادار برای خود تغییر دهید.
اگر قفل روی mutex را به دست آوریم، recv
را فراخوانی میکنیم تا یک Job
را از کانال دریافت کنیم. یک unwrap
نهایی نیز در اینجا هر گونه خطا را برطرف میکند، که ممکن است رخ دهد اگر Threadی که sender را نگه داشته است خاموش شود، مشابه نحوهای که متد send
در صورت خاموش شدن receiver یک Err
بازمیگرداند.
فراخوانی recv
مسدود میشود، بنابراین اگر هنوز هیچ کاری وجود نداشته باشد، Thread فعلی منتظر میماند تا یک کار در دسترس قرار گیرد. Mutex<T>
تضمین میکند که در هر لحظه فقط یک Thread Worker
در تلاش برای درخواست یک کار است.
Thread Pool ما اکنون در وضعیت کاری قرار دارد! دستور cargo run
را اجرا کنید و چندین درخواست ارسال کنید:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
موفقیت! اکنون یک Thread Pool داریم که اتصالات را به صورت همزمان اجرا میکند. هرگز بیش از چهار Thread ایجاد نمیشود، بنابراین اگر سرور درخواستهای زیادی دریافت کند، سیستم ما بارگذاری بیش از حد نخواهد شد. اگر یک درخواست به /sleep ارسال کنیم، سرور میتواند با استفاده از یک Thread دیگر به سایر درخواستها پاسخ دهد.
نکته: اگر /sleep را به طور همزمان در چندین پنجره مرورگر باز کنید، ممکن است یکی پس از دیگری در فواصل ۵ ثانیهای بارگذاری شوند. برخی مرورگرهای وب به دلایل مربوط به کش، چندین نمونه از همان درخواست را به صورت متوالی اجرا میکنند. این محدودیت توسط وب سرور ما ایجاد نشده است.
این زمان خوبی است که مکث کنیم و بررسی کنیم چگونه کدهای لیستهای ۲۱-۱۸، ۲۱-۱۹ و ۲۱-۲۰ اگر به جای Closure از futures برای انجام کار استفاده میکردیم، متفاوت میبود. چه نوعهایی تغییر میکردند؟ آیا امضاهای متدها تغییر میکردند؟ کدام بخشهای کد همانگونه باقی میماندند؟
پس از یادگیری حلقه while let
در فصلهای ۱۷ و ۱۸، ممکن است تعجب کنید چرا کد Thread Worker را مانند لیست ۲۱-۲۱ ننوشتیم.
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Worker::new
با استفاده از while let
این کد کامپایل میشود و اجرا میشود، اما منجر به رفتار مورد نظر برای threading نمیشود: یک درخواست کند همچنان باعث میشود سایر درخواستها برای پردازش منتظر بمانند. دلیل آن کمی ظریف است: ساختار Mutex
متد عمومی unlock
ندارد، زیرا مالکیت قفل بر اساس طول عمر MutexGuard<T>
درون LockResult<MutexGuard<T>>
که متد lock
بازمیگرداند است. در زمان کامپایل، بررسیکننده وام میتواند این قانون را اعمال کند که منبعی که توسط یک Mutex
محافظت میشود نمیتواند دسترسی پیدا کند مگر اینکه قفل را نگه داشته باشیم. با این حال، این پیادهسازی همچنین میتواند منجر به نگهداشتن قفل بیش از حد انتظار شود اگر به طول عمر MutexGuard<T>
توجه نکنیم.
کد موجود در لیست ۲۱-۲۰ که از let job = receiver.lock().unwrap().recv().unwrap();
استفاده میکند کار میکند زیرا با let
، هر مقدار موقتی استفادهشده در عبارت سمت راست علامت برابر بلافاصله پس از پایان دستور let
حذف میشود. با این حال، while let
(و همچنین if let
و match
) مقادیر موقتی را تا پایان بلوک مرتبط حذف نمیکند. در لیست ۲۱-۲۱، قفل در طول فراخوانی به job()
نگه داشته میشود، به این معنی که سایر Workerها نمیتوانند کار دریافت کنند.