تبدیل سرور Single-Threaded به یک سرور Multithreaded
در حال حاضر، سرور هر درخواست را بهصورت ترتیبی پردازش میکند،
یعنی تا زمانی که پردازش درخواست اول به پایان نرسد، درخواست دوم پردازش نخواهد شد.
اگر سرور درخواستهای بیشتری دریافت کند، این اجرای سریالی به مرور زمان کمتر بهینه خواهد بود.
اگر سروری درخواستی دریافت کند که پردازش آن زمان زیادی ببرد،
درخواستهای بعدی باید تا پایان پردازش آن درخواست طولانی صبر کنند،
حتی اگر درخواستهای جدید بتوانند سریعتر پردازش شوند.
ما باید این مشکل را برطرف کنیم، اما ابتدا مشکل را به صورت عملی بررسی میکنیم.
شبیهسازی یک درخواست کند
میخواهیم ببینیم چگونه یک درخواست با پردازش کند میتواند بر درخواستهای دیگر به سرور فعلی ما تأثیر بگذارد.
لیستینگ 21-10 نحوهی رسیدگی به درخواستی به مسیر /sleep را پیادهسازی میکند
که با یک پاسخ شبیهسازی شدهی کند، باعث میشود سرور پنج ثانیه قبل از پاسخ دادن بخوابد.
use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
حالا که سه حالت داریم، به جای if
از match
استفاده کردهایم.
باید صریحاً روی یک slice
از request_line
تطبیق الگو (pattern matching) انجام دهیم
تا بتوانیم با مقادیر رشتهای literal تطبیق دهیم؛
چون match
مانند متد برابری بهصورت خودکار رفرنسگذاری و dereference نمیکند.
بازوی اول مشابه بلاک if
در لیستینگ 21-9 است.
بازوی دوم با درخواستی به مسیر /sleep مطابقت دارد.
وقتی این درخواست دریافت شود، سرور به مدت پنج ثانیه میخوابد
و سپس صفحهی HTML موفقیتآمیز را رندر میکند.
بازوی سوم همانند بلاک else
در لیستینگ 21-9 است.
میتوانید ببینید که سرور ما چقدر ابتدایی است: کتابخانههای واقعی مدیریت تشخیص درخواستهای متعدد را به روشی بسیار کمتر پرحرف انجام میدهند!
سرور را با دستور cargo run
اجرا کنید.
سپس دو پنجرهی مرورگر باز کنید: یکی برای آدرس http://127.0.0.1:7878 و دیگری برای http://127.0.0.1:7878/sleep.
اگر چند بار آدرس / را وارد کنید، مانند قبل پاسخ سریع دریافت خواهید کرد.
اما اگر ابتدا آدرس /sleep را وارد کنید و سپس آدرس / را بارگذاری کنید،
خواهید دید که / تا پایان پنج ثانیه خوابیدن sleep
منتظر میماند و سپس بارگذاری میشود.
بهبود توان عملیاتی با یک Thread Pool
یک Thread Pool گروهی از Threadهای ایجادشده است که منتظر و آماده برای مدیریت یک وظیفه هستند. وقتی برنامه یک وظیفه جدید دریافت میکند، یکی از Threadهای موجود در Pool به وظیفه اختصاص داده میشود و آن Thread وظیفه را پردازش میکند. Threadهای باقیمانده در Pool در دسترس هستند تا هر وظیفه دیگری که وارد شود را در حالی که Thread اول وظیفه خود را پردازش میکند، مدیریت کنند. وقتی Thread اول پردازش وظیفه خود را به پایان میرساند، به Pool Threadهای بیکار بازمیگردد و آماده برای مدیریت یک وظیفه جدید است. یک Thread Pool به شما امکان میدهد اتصالات را به صورت همزمان پردازش کنید و توان عملیاتی سرور خود را افزایش دهید.
ما تعداد Threadهای موجود در Pool را به یک عدد کوچک محدود خواهیم کرد تا از حملات Denial of Service (DoS) محافظت کنیم؛ اگر برنامه ما برای هر درخواست جدید یک Thread ایجاد کند، کسی که ۱۰ میلیون درخواست به سرور ما ارسال کند میتواند با استفاده از تمام منابع سرور، پردازش درخواستها را متوقف کند.
برای محافظت در برابر حملات DoS، تعداد threadهای موجود در thread pool را محدود میکنیم؛ زیرا اگر برنامهی ما برای هر درخواست یک thread جدید بسازد، کسی که ۱۰ میلیون درخواست به سرور ما ارسال کند میتواند با مصرف همهی منابع سرور، عملیات پردازش درخواستها را کاملاً متوقف کند.
به جای ایجاد threadهای نامحدود، یک تعداد ثابت thread در pool خواهیم داشت که منتظر میمانند.
درخواستهای ورودی به این pool ارسال میشوند تا پردازش شوند.
pool صفی از درخواستهای ورودی را نگه میدارد.
هر یک از threadهای pool یک درخواست را از صف بیرون میکشد، آن را پردازش میکند،
و سپس درخواست بعدی را از صف دریافت میکند.
با این طراحی، میتوانیم تا N
درخواست را بهصورت همزمان پردازش کنیم،
که N
برابر با تعداد threadها است.
اگر هر thread در حال پاسخ دادن به یک درخواست طولانی باشد،
درخواستهای بعدی میتوانند در صف منتظر بمانند،
اما ما تعداد درخواستهای طولانی که میتوانیم قبل از رسیدن به این نقطه پردازش کنیم را افزایش دادهایم.
این تکنیک یکی از روشهای متعددی است که برای افزایش throughput یک وب سرور وجود دارد. گزینههای دیگری که میتوانید بررسی کنید عبارتاند از مدل fork/join، مدل async I/O تکنخی، و مدل async I/O چندنخی. اگر به این موضوع علاقهمند هستید، میتوانید دربارهی راهحلهای دیگر مطالعه کنید و سعی کنید آنها را پیادهسازی کنید؛ با زبانی سطح پایین مانند Rust، تمام این گزینهها ممکن هستند.
مشابه روش توسعه مبتنی بر تست که در پروژه فصل ۱۲ استفاده کردیم، اینجا از توسعه مبتنی بر کامپایلر استفاده میکنیم. کدی را که توابع مورد نظرمان را فراخوانی میکند، مینویسیم و سپس به خطاهای کامپایلر نگاه میکنیم تا مشخص کنیم چه تغییراتی باید انجام دهیم تا کد کار کند. با این حال، پیش از انجام این کار، روش دیگری را که قرار نیست استفاده کنیم، به عنوان نقطه شروع بررسی خواهیم کرد.
ایجاد یک thread برای هر درخواست
ابتدا بیایید ببینیم کد ما چگونه خواهد بود اگر برای هر اتصال یک thread جدید ایجاد کند. همانطور که قبلاً گفته شد، این برنامهی نهایی ما نیست به دلیل مشکلات احتمالی ایجاد تعداد نامحدود thread، اما نقطهی شروع خوبی برای داشتن یک سرور چندنخی عملی است. سپس بهعنوان بهبود، thread pool را اضافه خواهیم کرد و مقایسهی این دو راهحل سادهتر خواهد بود.
لیستینگ 21-11 تغییرات لازم در تابع main
را نشان میدهد تا برای هر stream در حلقهی for
یک thread جدید ایجاد کند و آن را مدیریت نماید.
use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
همانطور که در فصل ۱۶ یاد گرفتید، thread::spawn
یک Thread جدید ایجاد کرده و سپس کد موجود در کلوزر را در Thread جدید اجرا میکند. اگر این کد را اجرا کنید و در مرورگر خود /sleep را باز کنید، سپس / را در دو تب دیگر باز کنید، خواهید دید که درخواستهای / لازم نیست منتظر پایان درخواست /sleep باشند. با این حال، همانطور که ذکر شد، این روش در نهایت سیستم را تحت فشار قرار میدهد زیرا شما تعداد نامحدودی Thread بدون محدودیت ایجاد میکنید.
ممکن است به یاد بیاورید که این دقیقاً همان شرایطی است که async و await در آن میدرخشند! این نکته را در ذهن داشته باشید در حالی که Thread Pool را میسازیم و به این فکر کنید که چگونه این شرایط با async متفاوت یا مشابه خواهد بود.
Creating a Finite Number of Threads
میخواهیم thread pool ما به روشی مشابه و آشنا کار کند،
بهطوری که تغییر از استفادهی مستقیم از threadها به استفاده از thread pool
نیاز به تغییرات بزرگ در کدی که از API ما استفاده میکند نداشته باشد.
لیستینگ 21-12 رابط فرضی ساختار ThreadPool
را نشان میدهد
که میخواهیم به جای thread::spawn
از آن استفاده کنیم.
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
ThreadPool
ما با استفاده از ThreadPool::new
یک thread pool جدید با تعداد قابل تنظیم thread ایجاد میکنیم،
در این مثال تعداد چهار thread است.
سپس در حلقهی for
، متد pool.execute
رابطی مشابه با thread::spawn
دارد،
که یک closure میگیرد و pool باید آن را برای هر stream اجرا کند.
ما باید pool.execute
را پیادهسازی کنیم تا این closure را بگیرد
و به یک thread در pool بدهد تا اجرا شود.
این کد هنوز کامپایل نخواهد شد، اما این کار را انجام میدهیم تا کامپایلر ما را در رفع خطاها راهنمایی کند.
ساخت ThreadPool
با استفاده از توسعهی مبتنی بر خطاهای کامپایلر
تغییرات لیستینگ 21-12 را در فایل src/main.rs اعمال کنید،
سپس اجازه دهید خطاهای کامپایلر از دستور cargo check
روند توسعهی ما را هدایت کنند.
در اینجا اولین خطایی که دریافت میکنیم آمده است:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
عالی! این خطا به ما میگوید که به یک نوع یا ماژول ThreadPool
نیاز داریم،
پس اکنون یکی میسازیم. پیادهسازی ThreadPool
ما مستقل از نوع کاری است که وبسرور انجام میدهد.
بنابراین، بیایید crate hello
را از یک crate دودویی به یک crate کتابخانهای تبدیل کنیم
تا پیادهسازی ThreadPool
را در آن قرار دهیم.
پس از تبدیل به crate کتابخانهای، میتوانیم از این کتابخانهی thread pool جداگانه برای هر کاری که میخواهیم با thread pool انجام دهیم استفاده کنیم،
نه فقط برای سرویسدهی به درخواستهای وب.
یک فایل src/lib.rs ایجاد کنید که شامل کد زیر باشد،
که سادهترین تعریف ممکن برای ThreadPool
است که فعلاً میتوانیم داشته باشیم:
pub struct ThreadPool;
سپس فایل main.rs را ویرایش کنید تا با افزودن کد زیر به بالای فایل src/main.rs،
ThreadPool
را از crate کتابخانهای وارد حوزه (scope) کنید:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
این کد همچنان کار نخواهد کرد، اما بیایید دوباره آن را بررسی کنیم تا خطای بعدی که باید برطرف کنیم را ببینیم:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
این خطا نشان میدهد که باید تابع وابستهای به نام new
برای ThreadPool
ایجاد کنیم. همچنین میدانیم که new
باید یک پارامتر داشته باشد که بتواند مقدار 4
را به عنوان آرگومان بپذیرد و یک نمونه از ThreadPool
بازگرداند. بیایید سادهترین تابع new
که این خصوصیات را دارد پیادهسازی کنیم:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
ما نوع پارامتر size
را usize
انتخاب کردیم چون میدانیم تعداد منفی thread منطقی نیست.
همچنین میدانیم که این مقدار 4
را بهعنوان تعداد عناصر در یک مجموعه از threadها استفاده خواهیم کرد،
که برای همین منظور نوع usize
مناسب است،
همانطور که در بخش “انواع عدد صحیح” در فصل ۳ توضیح داده شده است.
بیایید دوباره کد را بررسی کنیم:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
اکنون خطا رخ میدهد چون متد execute
روی ThreadPool
تعریف نشده است.
به یاد بیاورید از بخش “ایجاد تعداد محدودی thread”
که تصمیم گرفتیم رابط کاربری thread pool ما شبیه به thread::spawn
باشد.
علاوه بر این، متد execute
را پیادهسازی خواهیم کرد تا closure دریافتشده را بگیرد
و آن را به یک thread بیکار در pool بدهد تا اجرا کند.
متد execute
را روی ThreadPool
تعریف میکنیم تا یک closure را بهعنوان پارامتر بگیرد.
به یاد بیاورید از بخش “انتقال مقادیر گرفتهشده از closure و traitهای Fn
” در فصل ۱۳
که میتوانیم closureها را با سه trait مختلف بهعنوان پارامتر بگیریم: Fn
، FnMut
و FnOnce
.
باید تصمیم بگیریم در اینجا از کدام نوع closure استفاده کنیم.
میدانیم که قرار است کاری مشابه پیادهسازی thread::spawn
در کتابخانه استاندارد انجام دهیم،
پس میتوانیم به محدودیتهایی که امضای تابع thread::spawn
روی پارامترش دارد نگاه کنیم.
مستندات به ما موارد زیر را نشان میدهد:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
نوع پارامتر F
همان چیزی است که در اینجا به آن توجه داریم؛ پارامتر نوع T
مربوط به مقدار بازگشتی است و ما به آن توجه نداریم. میتوانیم ببینیم که spawn
از FnOnce
به عنوان محدودیت ویژگی روی F
استفاده میکند. این احتمالاً چیزی است که ما نیز میخواهیم، زیرا در نهایت آرگومان دریافتی در execute
را به spawn
پاس میدهیم. ما اطمینان بیشتری داریم که FnOnce
همان ویژگی مورد نظر ما است، زیرا Thread برای اجرای یک درخواست فقط Closure مربوط به آن درخواست را یک بار اجرا میکند، که با “Once” در FnOnce
مطابقت دارد.
پارامتر نوع F
همچنین دارای محدودیت ویژگی Send
و محدودیت طول عمر 'static
است، که در وضعیت ما مفید هستند: ما به Send
نیاز داریم تا Closure را از یک Thread به Thread دیگر منتقل کنیم و به 'static
نیاز داریم زیرا نمیدانیم اجرای Thread چه مدت طول میکشد. بیایید یک متد execute
روی ThreadPool
ایجاد کنیم که یک پارامتر عمومی از نوع F
با این محدودیتها بپذیرد:
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ما همچنان از ()
پس از FnOnce
استفاده میکنیم زیرا این FnOnce
نشاندهنده یک Closure است که هیچ پارامتری نمیگیرد و نوع ()
را بازمیگرداند. درست مانند تعریف توابع، میتوان نوع بازگشتی را از امضا حذف کرد، اما حتی اگر هیچ پارامتری نداشته باشیم، همچنان به پرانتزها نیاز داریم.
دوباره، این سادهترین پیادهسازی متد execute
است: هیچ کاری انجام نمیدهد،
اما هدف ما فقط این است که کدمان کامپایل شود.
بیایید دوباره آن را بررسی کنیم:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
کد کامپایل میشود! اما توجه داشته باشید که اگر cargo run
را اجرا کنید و در مرورگر یک درخواست ارسال کنید، خطاهایی را در مرورگر خواهید دید که در ابتدای فصل دیده بودیم. کتابخانه ما هنوز Closure پاسدادهشده به execute
را فراخوانی نمیکند!
نکته: یک ضربالمثل درباره زبانهایی با کامپایلرهای سختگیر، مانند Haskell و Rust، این است که “اگر کد کامپایل شود، کار میکند.” اما این ضربالمثل همیشه درست نیست. پروژه ما کامپایل میشود، اما هیچ کاری انجام نمیدهد! اگر در حال ساخت یک پروژه واقعی و کامل بودیم، اکنون زمان خوبی برای شروع نوشتن تستهای واحد بود تا بررسی کنیم که کد هم کامپایل میشود و رفتار مورد نظر ما را دارد.
فرض کنید: اگر قرار بود به جای closure، یک future اجرا کنیم، چه تفاوتهایی در اینجا وجود داشت؟
اعتبارسنجی تعداد Threadها در new
فعلاً با پارامترهای new
و execute
کاری انجام نمیدهیم.
بیایید بدنهی این توابع را با رفتار مورد نظر پیادهسازی کنیم.
برای شروع، به تابع new
فکر کنیم.
قبلاً برای پارامتر size
نوع بدون علامت (unsigned) را انتخاب کردیم،
چون یک pool با تعداد منفی thread منطقی نیست.
اما یک pool با صفر thread نیز منطقی نیست،
با اینکه صفر یک مقدار معتبر از نوع usize
است.
ما کدی اضافه خواهیم کرد که بررسی کند مقدار size
بزرگتر از صفر باشد،
و اگر صفر دریافت شد، با استفاده از ماکروی assert!
برنامه panic کند،
همانطور که در لیستینگ 21-13 نشان داده شده است.
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool::new
برای توقف برنامه در صورت صفر بودن size
ما همچنین برخی مستندات برای ThreadPool
خود با استفاده از نظرات داکیومنت (doc comments) اضافه کردهایم. توجه داشته باشید که ما از اصول خوب مستندسازی پیروی کردهایم و بخشی را اضافه کردهایم که شرایطی که تابع ما ممکن است به وحشت بیفتد (panic) را توضیح میدهد، همانطور که در فصل ۱۴ مورد بحث قرار گرفت. دستور cargo doc --open
را اجرا کنید و روی ساختار ThreadPool
کلیک کنید تا ببینید مستندات تولیدشده برای new
چگونه به نظر میرسند!
به جای اضافه کردن ماکروی assert!
همانطور که اینجا انجام دادیم، میتوانستیم new
را به build
تغییر دهیم و یک Result
بازگردانیم، مانند آنچه با Config::build
در پروژه I/O در لیست ۱۲-۹ انجام دادیم. اما در این مورد تصمیم گرفتهایم که تلاش برای ایجاد یک Thread Pool بدون هیچ Threadی باید یک خطای غیرقابل بازیابی باشد. اگر احساس جاهطلبی میکنید، سعی کنید تابعی به نام build
با امضای زیر بنویسید تا با تابع new
مقایسه کنید:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
ایجاد فضایی برای ذخیره Threadها
اکنون که روشی برای اطمینان از تعداد معتبر Threadهایی که در Pool ذخیره میشوند داریم، میتوانیم این Threadها را ایجاد کرده و آنها را در ساختار ThreadPool
قبل از بازگرداندن ساختار ذخیره کنیم. اما چگونه میتوانیم یک Thread را “ذخیره” کنیم؟ بیایید دوباره به امضای thread::spawn
نگاه کنیم:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
تابع spawn
یک JoinHandle<T>
بازمیگرداند، جایی که T
نوعی است که Closure بازمیگرداند. بیایید ما هم از JoinHandle
استفاده کنیم و ببینیم چه اتفاقی میافتد. در مورد ما، Closureهایی که به Thread Pool ارسال میکنیم اتصال را مدیریت کرده و چیزی بازنمیگردانند، بنابراین T
برابر با نوع واحد ()
خواهد بود.
کد موجود در لیست ۲۱-۱۴ کامپایل میشود اما هنوز هیچ Threadی ایجاد نمیکند. ما تعریف ThreadPool
را تغییر دادهایم تا یک بردار از نمونههای thread::JoinHandle<()>
را نگه دارد، بردار را با ظرفیتی برابر با size
مقداردهی اولیه کردهایم، یک حلقه for
تنظیم کردهایم که کدی برای ایجاد Threadها اجرا میکند، و یک نمونه از ThreadPool
که آنها را در خود دارد بازمیگرداند.
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool
برای نگهداری Threadهاما std::thread
را در crate کتابخانهای وارد حوزه کردهایم،
چون از thread::JoinHandle
بهعنوان نوع آیتمهای موجود در بردار داخل ThreadPool
استفاده میکنیم.
پس از دریافت مقدار معتبر برای size
، ThreadPool
ما یک بردار جدید ایجاد میکند
که میتواند size
آیتم را در خود نگه دارد.
تابع with_capacity
همان کاری را انجام میدهد که Vec::new
انجام میدهد،
اما با یک تفاوت مهم: فضای لازم را از پیش در بردار تخصیص میدهد.
از آنجایی که میدانیم باید size
عنصر در بردار ذخیره کنیم،
انجام این تخصیص پیشاپیش کمی کارآمدتر از استفاده از Vec::new
است،
که هنگام وارد کردن عناصر، اندازهی خود را تغییر میدهد.
وقتی دوباره cargo check
را اجرا کنید، باید با موفقیت انجام شود.
ساختار Worker
مسئول ارسال کد از ThreadPool
به یک Thread
در حلقه for
در لیست ۲۱-۱۴، نظری در مورد ایجاد Threadها گذاشتیم. در اینجا بررسی خواهیم کرد که چگونه واقعاً Threadها را ایجاد میکنیم. کتابخانه استاندارد thread::spawn
را به عنوان روشی برای ایجاد Threadها ارائه میدهد، و thread::spawn
انتظار دارد کدی دریافت کند که Thread بلافاصله پس از ایجاد اجرا کند. با این حال، در مورد ما، میخواهیم Threadها را ایجاد کنیم و آنها را منتظر نگه داریم تا کدی که بعداً ارسال میکنیم را اجرا کنند. پیادهسازی Threadها در کتابخانه استاندارد هیچ راهی برای انجام این کار ارائه نمیدهد؛ بنابراین باید آن را به صورت دستی پیادهسازی کنیم.
این رفتار را با معرفی یک ساختار دادهی جدید بین ThreadPool
و threadها پیادهسازی میکنیم که این رفتار جدید را مدیریت کند.
این ساختار داده را Worker مینامیم که اصطلاح رایجی در پیادهسازیهای pooling است.
Worker
کدی که باید اجرا شود را دریافت میکند و آن را در thread خودش اجرا میکند.
این را مانند افرادی در آشپزخانهی یک رستوران تصور کنید: کارگران منتظر میمانند تا سفارشها از مشتریان برسد، سپس مسئول پذیرش و آمادهسازی آن سفارشها هستند.
به جای نگهداشتن یک بردار از نمونههای JoinHandle<()>
در thread pool،
نمونههای Worker
را ذخیره خواهیم کرد.
هر Worker
یک نمونهی تک JoinHandle<()>
نگه میدارد.
سپس متدی روی Worker
پیادهسازی میکنیم که یک closure از کد برای اجرا بگیرد
و آن را به thread در حال اجرای مربوطه برای اجرا ارسال کند.
همچنین به هر Worker
یک id
اختصاص میدهیم تا بتوانیم هنگام لاگگیری یا اشکالزدایی،
بین نمونههای مختلف Worker
در pool تمایز قائل شویم.
این فرآیند جدیدی است که هنگام ایجاد یک ThreadPool
اتفاق میافتد. کدی که Closure را به Thread ارسال میکند، پس از تنظیم Worker
به این شکل پیادهسازی خواهد شد:
۱. یک struct
به نام Worker
تعریف کنید که شامل یک فیلد id
و یک JoinHandle<()>
باشد.
۲. ساختار ThreadPool
را تغییر دهید تا یک بردار از نمونههای Worker
نگه دارد.
۳. تابعی به نام Worker::new
تعریف کنید که یک شمارهی id
بگیرد و یک نمونه Worker
بازگرداند
که شامل آن id
و یک thread ساخته شده با یک closure خالی باشد.
۴. در تابع ThreadPool::new
، از شمارنده حلقهی for
برای تولید id
استفاده کنید،
یک Worker
جدید با آن id
بسازید و آن را در بردار ذخیره کنید.
اگر آماده یک چالش هستید، سعی کنید این تغییرات را خودتان پیادهسازی کنید قبل از اینکه به کد موجود در لیست ۲۱-۱۵ نگاه کنید.
آمادهاید؟ در اینجا لیست ۲۱-۱۵ با یک روش برای انجام اصلاحات قبلی آورده شده است.
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool
برای نگهداری نمونههای Worker
به جای نگهداری مستقیم Threadهاما نام فیلد موجود در ThreadPool
را از threads
به workers
تغییر دادهایم زیرا اکنون نمونههای Worker
را نگه میدارد، نه نمونههای JoinHandle<()>
. از شمارنده حلقه for
به عنوان آرگومان برای Worker::new
استفاده میکنیم و هر Worker
جدید را در بردار به نام workers
ذخیره میکنیم.
کد خارجی (مانند سرور ما در src/main.rs) نیازی ندارد جزئیات پیادهسازی مربوط به استفاده از ساختار Worker
در داخل ThreadPool
را بداند، بنابراین ساختار Worker
و تابع new
آن را خصوصی میکنیم. تابع Worker::new
از id
دادهشده استفاده کرده و یک نمونه JoinHandle<()>
ایجاد میکند که با ایجاد یک Thread جدید با یک Closure خالی تولید میشود.
نکته: اگر سیستمعامل نتواند به دلیل کمبود منابع سیستم، یک Thread ایجاد کند،
thread::spawn
به وحشت خواهد افتاد (panic). این باعث میشود کل سرور ما به وحشت بیفتد، حتی اگر ایجاد برخی Threadها موفق باشد. برای سادگی، این رفتار مشکلی ندارد، اما در یک پیادهسازی تولیدی برای Thread Pool، احتمالاً ازstd::thread::Builder
و متدspawn
که یکResult
بازمیگرداند، استفاده میکنید.
این کد کامپایل خواهد شد و تعداد نمونههای Worker
را که به عنوان آرگومان به ThreadPool::new
مشخص کردهایم ذخیره میکند. اما ما هنوز Closureی که در execute
دریافت میکنیم را پردازش نمیکنیم. بیایید بررسی کنیم چگونه این کار را انجام دهیم.
ارسال درخواستها به Threadها از طریق Channelها
مشکل بعدی که به آن میپردازیم این است که Closureهایی که به thread::spawn
داده شدهاند، هیچ کاری انجام نمیدهند. در حال حاضر، Closureی که میخواهیم اجرا کنیم را در متد execute
دریافت میکنیم. اما نیاز داریم که یک Closure به thread::spawn
بدهیم تا در هنگام ایجاد هر Worker
در حین ایجاد ThreadPool
اجرا شود.
میخواهیم ساختارهای Worker
که به تازگی ایجاد کردهایم، کدی را که باید اجرا شود از یک صف که در ThreadPool
نگهداری میشود دریافت کرده و آن کد را به Thread خود برای اجرا ارسال کنند.
Channelهایی که در فصل ۱۶ یاد گرفتیم—راهی ساده برای ارتباط بین دو Thread—برای این مورد استفاده مناسب هستند. ما از یک Channel به عنوان صف کارها استفاده خواهیم کرد و execute
یک کار را از ThreadPool
به نمونههای Worker
ارسال میکند، که این کار را به Thread خود ارسال میکنند. برنامه به شرح زیر خواهد بود:
ThreadPool
یک Channel ایجاد کرده و نگهدارنده sender آن خواهد بود.- هر
Worker
نگهدارنده receiver خواهد بود. - یک ساختار
Job
جدید ایجاد خواهیم کرد که Closureهایی که میخواهیم از طریق Channel ارسال کنیم را نگه میدارد. - متد
execute
کاری که میخواهد اجرا کند را از طریق sender ارسال خواهد کرد. - در Thread خود،
Worker
بر receiver خود حلقه زده و Closureهای هر کاری که دریافت میکند را اجرا خواهد کرد.
بیایید با ایجاد یک Channel در ThreadPool::new
و نگهداری sender در نمونه ThreadPool
شروع کنیم، همانطور که در لیست ۲۱-۱۶ نشان داده شده است. ساختار Job
در حال حاضر چیزی نگه نمیدارد، اما نوع آیتمی خواهد بود که از طریق Channel ارسال میکنیم.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool
برای ذخیره sender یک Channel که نمونههای Job
را منتقل میکنددر ThreadPool::new
، یک Channel جدید ایجاد میکنیم و Pool نگهدارنده sender خواهد بود. این کد با موفقیت کامپایل میشود.
بیایید هنگام ایجاد کانال توسط thread pool، receiver
کانال را به هر Worker
ارسال کنیم.
میدانیم که میخواهیم از receiver
در threadی که نمونههای Worker
ایجاد میکنند استفاده کنیم،
پس در closure به پارامتر receiver
رفرنس میدهیم.
کد موجود در لیستینگ 21-17 هنوز بهطور کامل کامپایل نمیشود.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
receiver
به هر Worker
ما تغییرات کوچک و واضحی ایجاد کردهایم: receiver را به Worker::new
ارسال کردهایم و سپس از آن در داخل Closure استفاده کردهایم.
هنگامی که تلاش میکنیم این کد را بررسی کنیم، با این خطا مواجه میشویم:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
کد در تلاش است تا receiver
را به چند نمونهی مختلف از Worker
ارسال کند.
این کار عملی نیست، همانطور که در فصل ۱۶ یاد گرفتیم: پیادهسازی کانال در Rust به صورت multiple producer و single consumer است.
یعنی نمیتوانیم انتهای مصرفکنندهی کانال را clone کنیم تا این کد را اصلاح کنیم.
همچنین نمیخواهیم یک پیام را چند بار به چند مصرفکننده ارسال کنیم؛
هدف این است که یک لیست از پیامها داشته باشیم که چند نمونه Worker
آن را دریافت کنند،
طوری که هر پیام فقط یک بار پردازش شود.
علاوه بر این، برداشتن یک کار از صف کانال شامل تغییر receiver
میشود، بنابراین Threadها به یک روش امن برای اشتراک و تغییر receiver
نیاز دارند؛ در غیر این صورت، ممکن است با شرایط رقابتی (race conditions) مواجه شویم (همانطور که در فصل ۱۶ توضیح داده شد).
به یاد بیاورید اشارهگرهای هوشمند ایمن در برابر thread که در فصل ۱۶ بحث شدند:
برای اشتراک مالکیت بین چند thread و اجازه دادن به تغییر مقدار توسط threadها،
باید از Arc<Mutex<T>>
استفاده کنیم.
نوع Arc
اجازه میدهد چند نمونهی Worker
مالک receiver
باشند،
و Mutex
تضمین میکند که در هر لحظه فقط یک Worker
بتواند از receiver
یک کار دریافت کند.
لیستینگ 21-18 تغییراتی را که باید انجام دهیم نشان میدهد.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Worker
با استفاده از Arc
و Mutex
در تابع ThreadPool::new
، receiver
را داخل یک Arc
و یک Mutex
قرار میدهیم.
برای هر نمونهی جدید از Worker
، Arc
را clone میکنیم تا شمارندهی رفرنس افزایش یابد،
بهطوری که نمونههای Worker
بتوانند مالکیت مشترک receiver
را داشته باشند.
با این تغییرات، کد کامپایل میشود! به نتیجه نزدیکتر میشویم!
پیادهسازی متد execute
بیایید در نهایت متد execute
را روی ThreadPool
پیادهسازی کنیم.
همچنین Job
را از یک struct
به یک type alias برای یک trait object تبدیل میکنیم
که نوع closure ای را که execute
دریافت میکند نگه میدارد.
همانطور که در بخش “ایجاد مترادفهای نوع با type alias” در فصل ۲۰ بحث شد،
type alias به ما اجازه میدهد تا انواع طولانی را کوتاهتر کنیم و استفاده از آنها را آسانتر سازیم.
به لیستینگ 21-19 نگاه کنید.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Job
برای یک Box
که هر Closure را نگه میدارد و سپس ارسال کار از طریق کانالپس از ایجاد یک نمونه جدید Job
با استفاده از Closureی که در execute
دریافت میکنیم، آن کار را از طریق بخش ارسالکننده کانال ارسال میکنیم. ما برای حالتی که ارسال شکست بخورد، روی send
از unwrap
استفاده میکنیم. این حالت ممکن است رخ دهد، اگر مثلاً همه Threadهای ما از اجرا متوقف شوند، به این معنی که بخش دریافتکننده دیگر پیامهای جدید را دریافت نمیکند. در حال حاضر، نمیتوانیم Threadهای خود را از اجرا متوقف کنیم: Threadهای ما تا زمانی که Pool وجود دارد اجرا میشوند. دلیل استفاده از unwrap
این است که میدانیم حالت شکست رخ نخواهد داد، اما کامپایلر این موضوع را نمیداند.
اما هنوز کار تمام نشده است! در Worker
، closure که به thread::spawn
داده میشود
هنوز فقط به انتهای دریافتکنندهی کانال رفرنس میدهد.
در عوض، نیاز داریم که closure بهطور پیوسته در حلقهای بینهایت اجرا شود،
از انتهای دریافتکنندهی کانال درخواست کار کند و هرگاه کار دریافت کرد آن را اجرا نماید.
بیایید تغییرات نشانداده شده در لیستینگ 21-20 را در Worker::new
اعمال کنیم.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Worker
در اینجا، ابتدا lock
را روی receiver
فراخوانی میکنیم تا mutex را به دست آوریم، و سپس unwrap
را فراخوانی میکنیم تا در صورت بروز هرگونه خطا، برنامه متوقف شود. به دست آوردن یک قفل ممکن است شکست بخورد اگر mutex در یک وضعیت poisoned باشد، که ممکن است اتفاق بیفتد اگر یک Thread دیگر در حالی که قفل را نگه داشته است به جای آزاد کردن آن متوقف شده باشد. در این شرایط، فراخوانی unwrap
برای متوقف کردن این Thread اقدام درستی است. میتوانید این unwrap
را به یک expect
با یک پیام خطای معنادار برای خود تغییر دهید.
اگر قفل روی mutex را به دست آوریم، recv
را فراخوانی میکنیم تا یک Job
را از کانال دریافت کنیم. یک unwrap
نهایی نیز در اینجا هر گونه خطا را برطرف میکند، که ممکن است رخ دهد اگر Threadی که sender را نگه داشته است خاموش شود، مشابه نحوهای که متد send
در صورت خاموش شدن receiver یک Err
بازمیگرداند.
فراخوانی recv
مسدود میشود، بنابراین اگر هنوز هیچ کاری وجود نداشته باشد، Thread فعلی منتظر میماند تا یک کار در دسترس قرار گیرد. Mutex<T>
تضمین میکند که در هر لحظه فقط یک Thread Worker
در تلاش برای درخواست یک کار است.
Thread Pool ما اکنون در وضعیت کاری قرار دارد! دستور cargo run
را اجرا کنید و چندین درخواست ارسال کنید:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
موفقیت! اکنون یک Thread Pool داریم که اتصالات را به صورت همزمان اجرا میکند. هرگز بیش از چهار Thread ایجاد نمیشود، بنابراین اگر سرور درخواستهای زیادی دریافت کند، سیستم ما بارگذاری بیش از حد نخواهد شد. اگر یک درخواست به /sleep ارسال کنیم، سرور میتواند با استفاده از یک Thread دیگر به سایر درخواستها پاسخ دهد.
توجه: اگر مسیر /sleep را همزمان در چندین پنجرهی مرورگر باز کنید، ممکن است درخواستها بهصورت پشت سر هم و با فاصلههای پنج ثانیهای بارگذاری شوند. برخی مرورگرها به دلایل کش، چندین نمونه از یک درخواست مشابه را بهصورت ترتیبی اجرا میکنند. این محدودیت ناشی از سرور وب ما نیست.
این زمان خوبی است که مکث کنیم و بررسی کنیم چگونه کدهای لیستهای ۲۱-۱۸، ۲۱-۱۹ و ۲۱-۲۰ اگر به جای Closure از futures برای انجام کار استفاده میکردیم، متفاوت میبود. چه نوعهایی تغییر میکردند؟ آیا امضاهای متدها تغییر میکردند؟ کدام بخشهای کد همانگونه باقی میماندند؟
بعد از آشنایی با حلقهی while let
در فصلهای ۱۷ و ۱۹،
ممکن است این سؤال برایتان پیش آمده باشد که چرا کد thread مربوط به Worker
را مانند آنچه در لیستینگ 21-21 نشان داده شده ننوشتهایم.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Worker::new
با استفاده از while let
این کد کامپایل میشود و اجرا میشود، اما منجر به رفتار مورد نظر برای threading نمیشود: یک درخواست کند همچنان باعث میشود سایر درخواستها برای پردازش منتظر بمانند. دلیل آن کمی ظریف است: ساختار Mutex
متد عمومی unlock
ندارد، زیرا مالکیت قفل بر اساس طول عمر MutexGuard<T>
درون LockResult<MutexGuard<T>>
که متد lock
بازمیگرداند است. در زمان کامپایل، بررسیکننده وام میتواند این قانون را اعمال کند که منبعی که توسط یک Mutex
محافظت میشود نمیتواند دسترسی پیدا کند مگر اینکه قفل را نگه داشته باشیم. با این حال، این پیادهسازی همچنین میتواند منجر به نگهداشتن قفل بیش از حد انتظار شود اگر به طول عمر MutexGuard<T>
توجه نکنیم.
کدی که در لیستینگ 21-20 با عبارت
let job = receiver.lock().unwrap().recv().unwrap();
نوشته شده است، کار میکند زیرا در استفاده از let
،
هر مقدار موقتی که در سمت راست علامت مساوی به کار رفته باشد،
بلافاصله پس از پایان دستور let
رها (drop) میشود.
اما در while let
(و همچنین if let
و match
) مقدارهای موقتی تا پایان بلاک مربوطه رها نمیشوند.
در لیستینگ 21-21، قفل (lock
) تا پایان فراخوانی job()
نگه داشته میشود،
که این یعنی سایر نمونههای Worker
نمیتوانند در آن مدت کار دریافت کنند.