仓库地址: 1037827920/web-server: 使用rust编写的简单web服务器 (github.com)
模拟慢请求
一个单线程版本的web服务器只能一次处理一个请求,可是如果一个请求持续的时间太长,就会导致其他请求有可能饥饿,下面使用sleep方式让每次请求持续5s,模拟真实的慢请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, };
fn main() { let listener = TcpListener::bind("localhost:8080").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } }
fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), };
let contents = fs::read_to_string(filename).unwrap(); let length = contents.len();
let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap(); }
|
运行代码后访问localhost:8080/sleep,然后紧接着继续运行localhost:8080,会发现后者的请求必须等待前者完成后才能被处理,下面使用线程池改善吞吐量
多线程Web服务器实现
线程池: 包含一组已经生成的线程,它们时刻等待着接收并处理新的任务,当程序接收到新任务时,它会将线程池中的一个线程指派给该任务,在该线程忙着处理时,新来的任务交给池中剩余的线程进行处理,最终,当执行任务的线程处理完后,它会被重新放入到线程池中,准备处理新任务。注意: 需要限制线程池中的线程数量,以保护服务器免受拒绝服务攻击(DoS)的影响:如果针对每个请求创建一个新线程,那么一个人向我们的服务器发出1000万个请求,会直接耗尽资源,导致后续用户的请求无法被处理,这也是拒绝服务名称的来源。
因此,需要对线程池进行一定的架构设计,首先是设定最大线程数的上限,其次是维护一个请求队列。池中的线程去队列中依次弹出请求并处理。
为每个请求单独生成一个线程
修改main函数,每次处理一个任务就创建一个新的线程并执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, };
fn main() { let listener = TcpListener::bind("localhost:8080").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } }
fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), };
let contents = fs::read_to_string(filename).unwrap(); let length = contents.len();
let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap(); }
|
这样简单粗暴就能实现多线程的Web服务器,但是这样不能达到限制线程池中线程数的效果
限制创建线程的数量
利用线程池,继续修改main函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| fn main() { let listener = TcpListener::bind("localhost:8080").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream) }); } }
|
可以看出,我们至少要实现ThreadPool这个结构体和execute方法
ThreadPool的初始化
首先要确定使用new还是build来初始化ThreadPool实例,new往往用于简单初始化一个实例,而build往往会完成更加复杂的构建工作,我们并不需要在初始化线程池的同时创建相应的线程,因此new更合适。
在src/lib.rs写入以下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| pub struct ThreadPool;
impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0);
ThreadPool } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { todo!(); } }
|
在src/main.rs中导入lib.rs的ThreadPool:
1
| use <project_name>::ThreadPool;
|
ThreadPool的存储
ThreadPool作为一个线程池,肯定是要能够存储线程的对吧,继续修改ThreadPool,添加threads
字段,使其能够存储线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| use std::thread::{self, Thread};
pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, }
impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size);
for _ in 0..size { todo!(); }
ThreadPool { threads } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { todo!(); } }
|
ThreadPool的设计
使用thread::spawn是生成线程的最好方式,但是它会立即执行传入的任务,我们需要的是创建线程和执行任务是要分离的。也就是说,我们可以先创建线程后这个线程就进入loop循环等待,直到有执行任务的信号过来这个线程才会执行任务。
可以考虑创建一个Worker结构体,存放id和对应的线程。作为ThreadPool和任务线程联系的桥梁,通过channel,ThreadPool持有Sender,通过execute方法将任务发送给Worker,而Worker持有Receiver,在loop循环中接收ThreadPool发送过来的任务。
ThreadPool结构体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| use std::{ sync::{mpsc, Arc, Mutex}, thread, };
pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, }
impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.send(job).unwrap(); } }
|
Worker结构体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| type Job = Box<dyn FnOnce() + Send + 'static>;
struct Worker { id: usize, thread: thread::JoinHandle<()>, }
impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Workder {id} got a job; executing"); job(); }); Worker { id, thread } } }
|
关闭和资源清理
为ThreadPool实现Drop
当线程池被Drop时,需要等待所有的子线程完成它们的工作,然后再退出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Workder {id} got a job; executing"); job(); }); Worker { id, thread: Some(thread) } } }
impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shuting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }
|
停止工作线程
虽然调用了join,但是目标线程依然不会停止,原因在于它们在无限地loop循环等待,需要channel的drop机制:释放sender后,receiver会收到错误,然后再退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, }
impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender: Some(sender) } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } }
impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in &mut self.workers { println!("Shuting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }
|
当sender被关闭后,将关闭对应的channel,所以loop的receiver就会收到一个错误,根据错误再进一步的错误:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing"); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } }); Worker { id, thread: Some(thread), } } }
|
测试
为了验证代码的正确性,修改main:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| fn main() { let listener = TcpListener::bind("localhost:8080").unwrap(); let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) { let stream = stream.unwrap();
pool.execute(|| { handle_connection(stream); }); }
println!("Shutting down."); }
|