重庆网站建设夹夹虫seo快速提高网站转化率

当前位置: 首页 > news >正文

重庆网站建设夹夹虫,seo快速提高网站转化率,扬中论坛扬中人家,wordpress会员查看内容收费这一章将实现一个手写的 web server 和 多线程的服务器#xff0c;用到之前学到的所有特性 简单的web server 作为一个 web 服务器#xff0c;我们首先要能接收到请求#xff0c;目前市面上的 web 服务大多数都是基于 HTTP 和 HTTPS 协议的#xff0c;而他们有是基于 TCP…这一章将实现一个手写的 web server 和 多线程的服务器用到之前学到的所有特性 简单的web server 作为一个 web 服务器我们首先要能接收到请求目前市面上的 web 服务大多数都是基于 HTTP 和 HTTPS 协议的而他们有是基于 TCP 协议传输的 所以我们希望我们的服务器可以监听 TCP 连接rust 的库函数中提供了 TcpListener 这样的函数来监听 TCP 连接我们新建一个项目然后在 src/main.rs 中编写监听功能。 TcpListener 的 incoming 方法返回一个迭代器它提供了一系列的流流stream代表一个客户端和服务端之间打开的连接 use std::net::TcpListener;fn main() {let listener TcpListener::bind(127.0.0.1:7878).unwrap();for stream in listener.incoming() {let stream stream.unwrap();println!(Connection established!);} }运行代码之后访问 127.0.0.1:7878 就可以看到接收到了信息如果你成功接收到了信息那么下一步就是怎么样处理信息我们使用一个缓冲区接收我们的数据此时它是一个 u8 类型的字节流数据我们将它转化为字符串打印出来 use std::{io::{prelude::, BufReader},net::{TcpListener, TcpStream}, };fn main() {let listener TcpListener::bind(127.0.0.1:7878).unwrap();for stream in listener.incoming() {let stream stream.unwrap();handle_connection(stream);} }fn handle_connection(mut stream: TcpStream) {let mut buffer [0; 1024];stream.read(mut buffer).unwrap();println!(Request: {}, String::from_utf8_lossy(buffer[..])); }现在运行我们的程序它将打印出一个完整的 TCP 请求作为一个服务器对于一个请求我们需要做出我们的回复提供给它需要的资源这里我们就返回先尝试返回一个简单的响应 因为 stream 的 write_all 方法获取一个 [u8] 并直接将这些字节发送给连接。所以我们调用as_bytes 对数据进行转化 fn handle_connection(mut stream: TcpStream) {let mut buffer [0; 1024];stream.read(mut buffer).unwrap();let response HTTP/1.1 200 OK\r\n\r\n;stream.write_all(response.as_bytes()).unwrap();stream.flush().unwrap(); }此时我们再次访问我们的地址将不会得到报错信息而是返回一个空白的页面说明我们的页面请求已经得到了响应那么接下来我们需要让我们的服务器能够返回真实的 web 页面 我们编写一个 hello.html 页面放在项目的根目录下 !DOCTYPE html html langenheadmeta charsetutf-8titleHello!/title/headbodyh1Hello!/h1pHi from Rust/p/body /html之后在返回处理方法中读取我们刚刚编写页面的内容构造成 TCP 请求的返回格式进行返回 use std::{fs,io::{prelude::, BufReader},net::{TcpListener, TcpStream}, }; // –snip–fn handle_connection(mut stream: TcpStream) {let mut buffer [0; 1024];stream.read(mut buffer).unwrap();let status_line HTTP/1.1 200 OK;let contents fs::read_to_string(hello.html).unwrap();let length contents.len();let response format!({status_line}\r\nContent-Length: {length}\r\n\r\n{contents});stream.write_all(response.as_bytes()).unwrap();stream.flush().unwrap(); }当然我们的服务器不可能只返回一个页面对于不同路径的 TCP 请求我们需要为他们返回不同的页面所以我们做了一个模式匹配如果是请求的主页我们返回一个页面如果请求其他页面我们返回一个 404 页面表示找不到 fn handle_connection(mut stream: TcpStream) {let mut buffer [0; 1024];stream.read(mut buffer).unwrap();let get bGET / HTTP/1.1\r\n;let (status_line, filename) if buffer.starts_with(get) {(HTTP/1.1 200 OK, index.html)} else {(HTTP/1.1 404 NOT FOUND, 404.html)};let contents fs::read_to_string(filename).unwrap();let response format!({}\r\nContent-Length: {}\r\n\r\n{},status_line,contents.len(),contents);stream.write_all(response.as_bytes()).unwrap();stream.flush().unwrap(); }多线程的web server 之前搭建的服务器一次只能处理一个请求如果我们遇到多个请求的情况只有当上一个请求处理完毕才能处理下一个请求当一个请求时间很长的时候会极大影响整体的性能比如我们构造一个 sleep 路径它在阻塞 5 秒钟后才返回那么整个服务器的所有请求都会阻塞 use std::fs; use std::io::prelude::; use std::net::TcpListener; use std::net::TcpStream; use std::thread; use std::time::Duration; fn handle_connection(mut stream: TcpStream) {let mut buffer [0; 1024];stream.read(mut buffer).unwrap();let get bGET / HTTP/1.1\r\n;let sleep bGET /sleep HTTP/1.1\r\n;let (status_line, filename) if buffer.starts_with(get) {(HTTP/1.1 200 OK, index.html)} else if buffer.starts_with(sleep) {thread::sleep(Duration::from_secs(5));(HTTP/1.1 200 OK, index.html)} else {(HTTP/1.1 404 NOT FOUND, 404.html)};let contents fs::read_to_string(filename).unwrap();let response format!({}\r\nContent-Length: {}\r\n\r\n{},status_line,contents.len(),contents);stream.write_all(response.as_bytes()).unwrap();stream.flush().unwrap(); }为了解决这个情况我们需要一个多线程的服务器我们需要设计一个 线程池thread pool。当程序收到一个新任务线程池中的一个线程会被分配任务这个线程会离开并处理任务。其余的线程则可用于处理在第一个线程处理任务的同时处理其他接收到的任务。当第一个线程处理完任务时它会返回空闲线程池中等待处理新任务。线程池允许我们并发处理连接增加 server 的吞吐量。同时为了保证我们的服务器 不被 Dos 攻击我们需要限定我们线程的数量所有我们的线程池应该在初始化的时候可以限定最大线程数所以我们的线程池调用应该是这样的 使用 ThreadPool::new 来创建一个新的线程池它有一个可配置的线程数的参数pool.execute 有着类似 thread::spawn 的接口它获取一个线程池运行于每一个流的闭包。 fn main() {let listener TcpListener::bind(127.0.0.1:7878).unwrap();let pool ThreadPool::new(4);for stream in listener.incoming() {let stream stream.unwrap();pool.execute(|| {handle_connection(stream);});} }有了目标之后我们就可以编写我们的 ThreadPool 程序我们创建 src/lib.rs 文件在其中创建我们的 ThreadPool 它包含一个 new 函数以及一个 执行函数 execute pub struct ThreadPool;impl ThreadPool {pub fn new(size: usize) - ThreadPool {ThreadPool} } pub fn executeF(self, f: F)whereF: FnOnce() Send static,{}这里传入泛型如此约束的原因是我们的 execute 函数参照了标准库的 thread::spawn 函数它的定义如下 pub fn spawnF, T(f: F) - JoinHandleTwhereF: FnOnce() - T,F: Send static,T: Send static,之后我们可以看到spawn 返回一个 JoinHandle 类型的数据所以我们也可以使用它来包裹我们的线程我们在 ThreadPool 结构中定义 Vec 来存储我们的线程 use std::thread;pub struct ThreadPool {threads: Vecthread::JoinHandle(), }impl ThreadPool {// –snip–pub fn new(size: usize) - ThreadPool {assert!(size 0);let mut threads Vec::with_capacity(size);for _ in 0..size {// create some threads and store them in the vector}ThreadPool { threads }}// –snip– }此时我们的项目已经可以运行了我们还没有实现我们的线程的具体内容我们将要实现的行为是创建线程并稍后发送代码所以我们需要在 ThreadPool 和线程间引入一个新数据类型来管理这种新行为这个数据结构称为 Worker每一个 Worker 会储存一个单独的 JoinHandle() 实例。接着会在 Worker 上实现一个方法它会获取需要允许代码的闭包并将其发送给已经运行的线程执行。我们还会赋予每一个 worker id这样就可以在日志和调试中区别线程池中的不同 worker。 use std::thread;pub struct ThreadPool {workers: VecWorker, }impl ThreadPool {// –snip–pub fn new(size: usize) - ThreadPool {assert!(size 0);let mut workers Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id));}ThreadPool { workers }}// –snip– } struct Worker {id: usize,thread: thread::JoinHandle(), } impl Worker {fn new(id: usize) - Worker {let thread thread::spawn(|| {});Worker { id, thread }} }我们已经可以创建线程但是之后我们需要将我们通过 execute 方法传递进来的方法放入我们的线程中我们使用 信道 —— 作为沟通工具方案如下 ThreadPool 会创建一个信道并充当发送者。每个 Worker 将会充当接收者。新建一个 Job 结构体来存放用于向信道中发送的闭包。execute 方法会在发送者发出期望执行的任务。在线程中Worker 会遍历接收者并执行任何接收到的任务。 use std::{sync::mpsc, thread};pub struct ThreadPool {workers: VecWorker,sender: mpsc::SenderJob, }struct Job;impl ThreadPool {// –snip–pub fn new(size: usize) - ThreadPool {assert!(size 0);let (sender, receiver) mpsc::channel();let mut workers Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, receiver));}ThreadPool { workers, sender }}// –snip– }// –snip– impl Worker {fn new(id: usize, receiver: mpsc::ReceiverJob) - Worker {let thread thread::spawn(|| {receiver;});Worker { id, thread }} }但是上述的代码有一个问题这段代码尝试将 receiver 传递给多个 Worker 实例。这是不行的Rust 所提供的信道实现是多 生产者单 消费者 的。为了在多个线程间共享所有权并允许线程修改其值需要使用 ArcMutexT。Arc 使得多个 worker 拥有接收端而 Mutex 则确保一次只有一个 worker 能从接收端得到任务 use std::{sync::{mpsc, Arc, Mutex},thread, }; // –snip–impl ThreadPool {// –snip–pub fn new(size: usize) - ThreadPool {assert!(size 0);let (sender, receiver) mpsc::channel();let receiver Arc::new(Mutex::new(receiver));let mut workers Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(receiver)));}ThreadPool { workers, sender }}// –snip– }// –snip–impl Worker {fn new(id: usize, receiver: ArcMutexmpsc::ReceiverJob) - Worker {// –snip–} }最后我们需要实现 ThreadPool 上的 execute 方法它将接收到的方法传递到线程中并且执行它。而在 worker 中传递给 thread::spawn 的闭包仍然还只是 引用 了信道的接收端。我们需要闭包一直循环向信道的接收端请求任务并在得到任务时执行他们。 这里调用了 lock 来获取互斥器如果互斥器处于一种叫做 被污染poisoned的状态时获取锁可能会失败这可能发生于其他线程在持有锁时 panic 了且没有释放锁。如果锁定了互斥器接着调用 recv 从信道中接收 Job调用 recv 会阻塞当前线程所以如果还没有任务其会等待直到有可用的任务。 type Job Boxdyn FnOnce() Send static;impl ThreadPool {// –snip–pub fn executeF(self, f: F)whereF: FnOnce() Send static,{let job Box::new(f);self.sender.send(job).unwrap();} }// –snip–impl Worker {fn new(id: usize, receiver: ArcMutexmpsc::ReceiverJob) - Worker {let thread thread::spawn(move || loop {let job receiver.lock().unwrap().recv().unwrap();println!(Worker {id} got a job; executing.);job();});Worker { id, thread }} } 停机与清理 之前编写代码已经可以完整运行了但是面临两个问题一个是我们不能停止我们的服务器只能使用命令行的强制停止程序命令并且我们的服务器并没有清理所有的内容所以我们需要继续完善我们的服务器 首先我们为线程池实现 Drop。当线程池被丢弃时应该 join 所有线程以确保他们完成其操作。这里首先遍历线程池中的每个 workers。对于每一个线程会打印出说明信息表明此特定 worker 正在关闭接着在 worker 线程上调用 join。如果 join 调用失败通过 unwrap 使得 panic 并进行不优雅的关闭。 impl Drop for ThreadPool {fn drop(mut self) {for worker in mut self.workers {println!(Shutting down worker {}, worker.id);worker.thread.join().unwrap();}} }因为目前只有每一个 worker 的可变借用而 join 需要获取其参数的所有权。如果 Worker 存放的是 Optionthread::JoinHandle()就可以在 Option 上调用 take 方法将值从 Some 成员中移动出来而对 None 成员不做处理。换句话说正在运行的 Worker 的 thread 将是 Some 成员值而当需要清理 worker 时将 Some 替换为 None这样 worker 就没有可以运行的线程了。所以我们需要为此需要更新 Worker 的定义为如下 struct Worker {id: usize,thread: Optionthread::JoinHandle(), }对此当新建 Worker 时需要将 thread 值封装进 Some。 impl Worker {fn new(id: usize, receiver: ArcMutexmpsc::ReceiverJob) - Worker {// –snip–Worker {id,thread: Some(thread),}} }impl Drop for ThreadPool {fn drop(mut self) {for worker in mut self.workers {println!(Shutting down worker {}, worker.id);if let Some(thread) worker.thread.take() {thread.join().unwrap();}}} }现在程序可以通过编译了但是存在的问题是 Worker 中分配的线程所运行的闭包中的逻辑调用 join 并不会关闭线程因为他们一直 loop 来寻找任务。如果采用这个实现来尝试丢弃 ThreadPool则主线程会永远阻塞在等待第一个线程结束上。 所以我们需要修改 ThreadPool 的 drop 实现并修改 Worker 循环。我们需要在等待线程结束前显式丢弃 sender pub struct ThreadPool {workers: VecWorker,sender: Optionmpsc::SenderJob, } // –snip– impl ThreadPool {pub fn new(size: usize) - ThreadPool {// –snip–ThreadPool {workers,sender: Some(sender),}}pub fn executeF(self, f: F)whereF: FnOnce() Send static,{let job Box::new(f);self.sender.asref().unwrap().send(job).unwrap();} }impl Drop for ThreadPool {fn drop(mut self) {drop(self.sender.take());for worker in mut self.workers {println!(Shutting down worker {}, worker.id);if let Some(thread) worker.thread.take() {thread.join().unwrap();}}} }丢弃 sender 会关闭信道这表明不会有更多的消息被发送这时 worker 中的无限循环中的所有 recv 调用都会返回错误。我们修改 Worker 循环在这种情况下优雅地退出这意味着当 ThreadPool 的 drop 实现调用 join 时线程会结束。 impl Worker {fn new(id: usize, receiver: ArcMutexmpsc::ReceiverJob) - Worker {let thread thread::spawn(move || loop {let message receiver.lock().unwrap().recv();match message {Ok(job) {println!(Worker {id} got a job; executing.);job();}Err() {println!(Worker {id} disconnected; shutting down.);break;}}});Worker {id,thread: Some(thread),}} }修改 main 函数可以让收到指定数量的请求后停机 fn main() {let listener TcpListener::bind(127.0.0.1:7878).unwrap();let pool ThreadPool::new(4);for stream in listener.incoming().take(2) {let stream stream.unwrap();pool.execute(|| {handle_connection(stream);});}println!(Shutting down.); }至此一个完整的小型 web 服务器搭建完毕完整的代码如下 文件名src/main.rs use hello::ThreadPool;//改成你hello的项目名字 use std::fs; use std::io::prelude::; use std::net::TcpListener; use std::net::TcpStream; use std::thread; use std::time::Duration;fn main() {let listener TcpListener::bind(127.0.0.1:7878).unwrap();let pool ThreadPool::new(4);for stream in listener.incoming().take(2) {let stream stream.unwrap();pool.execute(|| {handle_connection(stream);});}println!(Shutting down.); }fn handle_connection(mut stream: TcpStream) {let mut buffer [0; 1024];stream.read(mut buffer).unwrap();let get bGET / HTTP/1.1\r\n;let sleep bGET /sleep HTTP/1.1\r\n;let (status_line, filename) if buffer.starts_with(get) {(HTTP/1.1 200 OK, hello.html)} else if buffer.starts_with(sleep) {thread::sleep(Duration::from_secs(5));(HTTP/1.1 200 OK, hello.html)} else {(HTTP/1.1 404 NOT FOUND, 404.html)};let contents fs::read_to_string(filename).unwrap();let response format!({}\r\nContent-Length: {}\r\n\r\n{},status_line,contents.len(),contents);stream.write_all(response.as_bytes()).unwrap();stream.flush().unwrap(); }文件名src/lib.rs use std::{sync::{mpsc, Arc, Mutex},thread, };pub struct ThreadPool {workers: VecWorker,sender: Optionmpsc::SenderJob, }type Job Boxdyn FnOnce() Send static;impl ThreadPool {/// Create a new ThreadPool.////// The size is the number of threads in the pool.////// # Panics////// The new function will panic if the size is zero.pub fn new(size: usize) - ThreadPool {assert!(size 0);let (sender, receiver) mpsc::channel();let receiver Arc::new(Mutex::new(receiver));let mut workers Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(receiver)));}ThreadPool {workers,sender: Some(sender),}}pub fn executeF(self, f: F)whereF: FnOnce() Send static,{let job Box::new(f);self.sender.asref().unwrap().send(job).unwrap();} }impl Drop for ThreadPool {fn drop(mut self) {drop(self.sender.take());for worker in mut self.workers {println!(Shutting down worker {}, worker.id);if let Some(thread) worker.thread.take() {thread.join().unwrap();}}} }struct Worker {id: usize,thread: Optionthread::JoinHandle(), }impl Worker {fn new(id: usize, receiver: ArcMutexmpsc::ReceiverJob) - Worker {let thread thread::spawn(move || loop {let message receiver.lock().unwrap().recv();match message {Ok(job) {println!(Worker {id} got a job; executing.);job();}Err() {println!(Worker {id} disconnected; shutting down.);break;}}});Worker {id,thread: Some(thread),}} }这里还有很多可以做的事如果你希望继续增强这个项目如下是一些点子 为 ThreadPool 和其公有方法增加更多文档为库的功能增加测试将 unwrap 调用改为更健壮的错误处理使用 ThreadPool 进行其他不同于处理网络请求的任务在 crates.io 上寻找一个线程池 crate 并使用它实现一个类似的 web server将其 API 和鲁棒性与我们的实现做对比