使用rust实现一个异步运行时是async-std的单线程Web服务器。
仓库地址: 1037827920/web-server:
使用rust编写的简单web服务器 (github.com)
在之前的单线程版本的Web服务器代码上进行修改,具体代码在给的仓库地址中。
实现异步代码
首先将handle_connection修改为async实现:
1
   | async fn handle_connection(mut stream: TcpStream) {}
 
  | 
 
该修改会将函数的返回值从()变成Future<Output =
()>,因此直接运行将不再有任何效果,只用通过.await或执行器的poll。
使用async-std作为异步运行时:
async-std运行时允许使用属性#[async_std::main]将我们的fn
main函数变成async fn
main,这样就可以在main函数中直接调用其他async函数,否则你得用block_on方法来让main去阻塞等待异步函数的完成,但是这种简单粗暴的阻塞等待方式并不灵活
Cargo.toml:
1 2 3 4 5 6
   | [dependencies] futures = "0.3"
  [dependencies.async-std] version = "1.6" features = ["attributes"]
 
  | 
 
下面将main函数修改为异步的,并在其中调用前面修改的异步版本handle_connection:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | use std::{     io::{prelude::*, BufReader},     net::{TcpListener, TcpStream},     fs,     time::Duration, }; extern crate async_std; use async_std::task;
  #[async_std::main] async fn main() {     let listener = TcpListener::bind("localhost:8080").unwrap();     for stream in listener.incoming() {         let stream = stream.unwrap();                  handle_connection(stream).await;     } }
 
  | 
 
实现异步版本的handle_connection:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
   | 
  async fn handle_connection(mut stream: TcpStream) {     let buf_reader = BufReader::new(&mut stream);          let request_line = buf_reader.lines().next().unwrap().unwrap();
           let (status_line, filename) = match &request_line[..] {         "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),          "GET /sleep HTTP/1.1" => {                           task::sleep(Duration::from_secs(5)).await;             ("HTTP/1.1 200 OK", "hello.html")         }         _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),     };
      let contents = fs::read_to_string(filename).unwrap();     let length = contents.len();          let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
           stream.write_all(response.as_bytes()).unwrap(); }
 
  | 
 
可以看出,只是把函数变成async往往是不够的,还需要将它内部的代码也都变成异步兼容,阻塞线程绝对是不可行的
但是线程web服务器还是不能进行并发处理请求,原因是listener.incoming()是阻塞的迭代器。当listener在等待连接时,执行器是无法执行其他Future的,而且只有当我们处理完已有的连接后,才能接收新的连接。
并发地处理连接
上面的解决方法是将listener.incoming()从一个阻塞的迭代器变成一个非阻塞的Stream
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
   | use std::{     fs,     time::Duration, }; extern crate async_std; use async_std::{     net::{TcpListener, TcpStream},     io::{prelude::*, BufReader},     task, }; use futures::StreamExt;
  #[async_std::main] async fn main() {     let listener = TcpListener::bind("localhost:8080").await.unwrap();          listener         .incoming()         .for_each_concurrent(None, |tcpstream| async move {             let tpcstream = tcpstream.unwrap();             handle_connection(tpcstream).await;         })         .await; }
 
  | 
 
异步版本的TcpListener为listener.incoming()实现了Stream
trait,这样listener.incoming()不再阻塞,且使用for_each_concurrent可以并发地处理从Stream获取的元素。
现在关键在于handle_connection不能再阻塞:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
   | 
  async fn handle_connection(mut stream: TcpStream) {     let buf_reader = BufReader::new(&mut stream);          let request_line = buf_reader.lines().next().await.unwrap().unwrap();
           let (status_line, filename) = match &request_line[..] {         "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),          "GET /sleep HTTP/1.1" => {                           task::sleep(Duration::from_secs(5)).await;             ("HTTP/1.1 200 OK", "hello.html")         }         _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),     };
      let contents = fs::read_to_string(filename).unwrap();     let length = contents.len();          let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
           stream.write_all(response.as_bytes()).await.unwrap(); }
 
  | 
 
在将数据读写改造成异步后,现在该函数也彻底变成了异步版本,可以并发地处理连接
使用多线程提升性能
async并发和多线程其实并不冲突,async-std包也允许我们使用多个线程去处理,由于handle_connection实现了Send
trait不会阻塞,因此使用async_std::task::spawn是非常安全的:
1 2 3 4 5 6 7 8 9 10 11 12 13
   | use async_std::task::spawn;
  #[async_std::main] async fn main() {     let listener = TcpListener::bind("localhost:8080").await.unwarp():     listener     	.incoming()     	.for_each_concurrent(None, |stream| async move {             let stream = stream.unwrap();             spawn(handle_connection(stream));     })     .await; }
 
  | 
 
但是这里是为每个请求都单独创建了一个线程,实际上需要限制创建线程的数量,可以通过线程池来实现。具体可以看这篇无async的多线程版本的Web服务器