镇江网站制作教程一级做a免费体验区不用下载网站

当前位置: 首页 > news >正文

镇江网站制作教程,一级做a免费体验区不用下载网站,齐齐哈尔哪里做网站,替 wordpress后台登陆页面模板十七、day17 之前我们介绍了IOServicePool的方式#xff0c;一个IOServicePool开启n个线程和n个iocontext#xff0c;每个线程内独立运行iocontext, 各个iocontext监听各自绑定的socket是否就绪#xff0c;如果就绪就在各自线程里触发回调函数。为避免线程安全问题#xf…十七、day17 之前我们介绍了IOServicePool的方式一个IOServicePool开启n个线程和n个iocontext每个线程内独立运行iocontext, 各个iocontext监听各自绑定的socket是否就绪如果就绪就在各自线程里触发回调函数。为避免线程安全问题我们将网络数据封装为逻辑包投递给逻辑系统逻辑系统有一个单独线程处理这样将网络IO和逻辑处理解耦合极大的提高了服务器IO层面的吞吐率。 今天给大家介绍asio多线程模式的第二种多线程模式IOThreadPool我们只初始化一个iocontext用来监听服务器的读写事件包括新连接到来的监听也用这个iocontext。只是我们让iocontext.run在多个线程中调用启动一个 io_context由多个线程共享这样回调函数就会被不同的线程触发从这个角度看回调函数被并发调用了。 1. IOThreadPool实现

  1. IOThreadPool.h #pragma once #include Singleton.h #include boost/asio.hppclass AsioThreadPool : public SingletonAsioThreadPool { public:friend class SingletonAsioThreadPool;~AsioThreadPool() {}AsioThreadPool operator (const AsioThreadPool) delete;AsioThreadPool(const AsioThreadPool) delete;boost::asio::io_context GetIOService();void Stop(); private:AsioThreadPool(int threadNum std::thread::hardware_concurrency());boost::asio::io_context _service;std::unique_ptrboost::asio::io_context::work _work;std::vectorstd::thread _threads; };AsioThreadPool也是单例模式需继承单例模板类SingletonT_serviceAsioThreadPool多线程模式中只有一个io_context进行不同线程的调度所以仅初始化一个ioc_workwork的数量与ioc相同有且仅有一个_threads线程数默认与cpu核数相同 AsioThreadPool继承 Singleton 模板类时Singleton 类的拷贝构造函数和赋值运算符已经被删除这意味着AsioThreadPool也会继承这些删除操作。因此派生类默认不能被拷贝或赋值也不需要再次delte拷贝或者赋值但是可以写上增加代码阅读性。 2AsioThreadPool构造函数 AsioThreadPool::AsioThreadPool(int threadNum) : _work(new boost::asio::io_context::work(_service)){for (int i 0; i threadNum; i) {_threads.emplace_back(this {_service.run();});} }注意work是通过std::unique_ptr进行管理的所以work不能被拷贝仅仅可以通过移动语义转移。 将AsioThreadPool池中唯一的ioc与work绑定防止ioc返回然后启动threadNum个线程每个线程中都调用_service.run()。 _service.run函数内部就是从iocp或者epoll获取就绪描述符和绑定的回调函数进而调用回调函数因为回调函数是在不同的线程里调用的所以会存在不同的线程调用同一个socket的回调函数的情况。 _service.run内部在Linux环境下调用的是epoll_wait返回所有就绪的描述符列表在windows上会循环调用GetQueuedCompletionStatus函数返回就绪的描述符二者原理类似进而通过描述符找到对应的注册的回调函数然后调用回调函数。 二者的流程如下 a. IOCP IOCP的使用主要分为以下几步 1 创建完成端口(iocp)对象 2 创建一个或多个工作线程在完成端口上执行并处理投递到完成端口上的I/O请求 3 Socket关联iocp对象在Socket上投递网络事件 4 工作线程调用GetQueuedCompletionStatus函数获取完成通知封包取得事件信息并进行处理。该函数会阻塞直到有I/O操作完成并返回相应的事件信息线程接收到通知后就可以处理对应的I/O请求。 b. epoll 调用epoll_creat在内核中创建一张epoll表这张 epoll 表用于管理要监视的文件描述符开辟一片包含n个epoll_event大小的连续空间这个空间的大小是预期要监听的事件数量 n以便存放准备好的事件信息。将要监听的socket或其他文件描述符注册到epoll表里可以指定需要监听的事件类型如可读、可写等并将其与之前创建的 epoll 实例关联。调用epoll_wait传入之前我们开辟的连续空间这一调用会阻塞直到有事件发生。一旦有事件就绪epoll_wait 会返回就绪的 epoll_event 列表并将对应的 Socket 信息写入之前开辟的内存空间中供后续处理。 3其他函数 boost::asio::io_context AsioThreadPool::GetIOService() {return _service; }void AsioThreadPool::Stop() {_work.reset();for (auto t : _threads) {t.join(); } }GetIOService()用于返回多线程池中唯一的上下文服务Stop()用于将work释放以便ioc可以返回
  2. 隐患及如何解决 IOThreadPool模式有一个隐患同一个socket的就绪后触发的回调函数可能在不同的线程里比如第一次是在线程1第二次是在线程3如果这两次触发间隔时间不大那么很可能出现不同线程并发访问数据的情况比如在处理读事件时第一次回调触发后我们从socket的接收缓冲区读数据出来第二次回调触发,还是从socket的接收缓冲区读数据就会造成两个线程同时从socket中读数据的情况会造成数据混乱。因为多个线程都会执行io_context.run()每次调用都会返回一些就绪的回调函数比如线程1调用io_context.run()返回ReadHandler1以及绑定的socket线程2调用io_context.run()返回ReadHandler2以及绑定的socket线程3调用io_context.run()返回ReadHandler3以及绑定的socket但是ReadHandler1和ReadHandler3都是session1调用的那么可能会导致多个线程同时处理相同的回调导致线程安全问题 注但如果回调函数被派发到逻辑队列中或者进行加锁那么就不会存在不同线程访问同一个socket数据的线程安全问题。这里的隐患是不通过队列或者加锁处理逻辑问题会存在线程安全问题。 1通过strand改进 在多线程环境中触发回调函数时我们可以使用 asio 提供的串行类 strand 来进行封装从而实现串行调用。其基本原理是每个线程在调用函数时不直接执行该函数而是将要调用的函数投递到由 strand 管理的队列中随后由一个统一的线程从队列中取出并调用这些回调函数。通过这种方式函数调用是串行进行的从而解决由于线程并发带来的安全问题。 使用strand进行封装 图中当socket就绪后并不是由多个线程调用每个socket注册的回调函数而是将回调函数投递给strand管理的队列再由strand统一调度派发。为了让回调函数被派发到strand的队列我们只需要在注册回调函数时加一层strand的包装即可。 strand 实际上是将回调函数放入同一个队列如果有逻辑层或者通过加锁处理回调那么就不需要strand以确保线程安全。因此在描述符就绪后相应的回调会被放入该队列中进行处理。虽然使用 strand 可以保证在多线程环境下的安全性但在触发时仍然是单线程的这限制了整体性能。因此尽管多线程可以提高读写事件的派发效率但整体性能通常不如IOService_pool 这种方式更优。在实际工作中我们通常选择使用 IOService_pool因为它能够更好地平衡并发和性能。 a. CSession类中新增成员变量_strand boost::asio::strandboost::asio::io_context::executor_type _strand;b. 修改CSession的构造函数 CSession::CSession(boost::asio::io_context io_context, CServer* server):_socket(io_context), _server(server), _b_close(false),_b_head_parse(false), _strand(io_context.get_executor()){boost::uuids::uuid a_uuid boost::uuids::random_generator()();_uuid boost::uuids::to_string(a_uuid);_recv_head_node make_sharedMsgNode(HEAD_TOTAL_LEN); }可以看到_strand的初始化是放在初始化列表里利用io_context.get_executor()返回的执行器构造strand。因为在asio中无论iocontext还是strand底层都是通过executor调度的我们将他理解为调度器就可以。如果多个iocontext和strand的调度器是一个那他们的消息派发统一由这个调度器执行。我们利用iocontext的调度器构造strand这样他们统一由一个调度器管理。在绑定回调函数的调度器时我们选择strand绑定即可。 比如我们在Start函数里添加绑定 将回调函数的调用者绑定为_strand void CSession::Start(){::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),boost::asio::bind_executor(_strand, std::bind(CSession::HandleRead, this,std::placeholders::_1, std::placeholders::_2, SharedSelf()))); }修改前的代码为 void CSession::Start() {memset(_data, 0, MAX_LENGTH); // 缓冲区清零// 从套接字中读取数据并绑定回调函数headle_read_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2,shared_from_this()));//_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),// boost::asio::bind_executor(_strand, // std::bind(CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2,shared_from_this()))); }同样的道理在所有收发的地方都将调度器绑定为_strand 比如发送部分我们需要修改为如下 auto msgnode _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode-_data, msgnode-_total_len), boost::asio::bind_executor(_strand, std::bind(CSession::HandleWrite, this, std::placeholders::_1, SharedSelf())));3. 客户端 int main() {try {auto pool AsioThreadPool::GetInstance();boost::asio::io_context ioc;boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);// 必须异步等待否则建立线程进行处理signals.async_wait(ioc, pool {if (!error) {std::cout Signal signal_number received. std::endl;ioc.stop(); // 停止 io_contextpool-Stop();std::unique_lockstd::mutex lock(mutex_quit);bstop true;cond_quit.notify_one();}});CServer s(pool-GetIOService(), 10086);ioc.run();{std::unique_lockstd::mutex lock(mutex_quit);while (!bstop) {cond_quit.wait(lock);}}}catch (std::exception e) {std::cerr Exception: e.what() \n;}boost::asio::io_context io_context; }之所以用条件变量是因为传入Server的ioc是从线程池中获得的Server初始化之后并没有调用ioc.run()线程池中的ioc是运行在自己的线程中的下面这段是运行在主线程中的是为了防止主线程结束了而子线程还在运行所以必须设置条件变量防止主线程结束 {std::unique_lockstd::mutex lock(mutex_quit);while (!bstop) {cond_quit.wait(lock);}}如果不设置条件变量使主线程挂起那么当主线程结束后如果server还没有跑起来server相当于子线程那么会直接让子线程结束 CServer s(pool-GetIOService(), 10086);单独生成一个ioc运行监听信号当收到两个退出信号时服务器退出而server的ioc是从线程池中获取的。当获取到退出信号时首先将监听信号的ioc退出然后将线程池的unique_ptr释放work使得线程池中的ioc能退出最后上锁然后唤醒主线程主线程结束。 std::cout Signal signal_number received. std::endl;ioc.stop(); // 停止 io_contextpool-Stop();std::unique_lockstd::mutex lock(mutex_quit);bstop true;cond_quit.notify_one();4. 总结
  3. 在IOServicePool示例中使用main函数中定义的io_context来监听异常信号signal并且用于初始化Server来监听接收信号但在IOThreadPool示例中main函数定义的ioc仅仅用于监听异常信号而初始化Server启动异步接收async_accept是通过线程池中的ioc。 这是因为 第一种方式调用了n1个ioc.run()第二种方式调用了2个ioc.run() 前者在main函数中处理信号监听和Server任务当main中的ioc接收到客户端连接之后生成一个子session并从IOServicePool线程池中取一个ioc用于管理子Session的收发。也就是说main中的ioc用于监听信号和server任务处理而线程池中的ioc用于处理子Session的收发。main中的ioc一旦停止那么服务器就会停止客户端的连接服务session任务不会在增加而线程池中的ioc全部停止后线程的收发就会停止。后者再main函数中处理监听信号任务而server任务处理交给了线程池IOThreadPool中唯一的ioc进行处理。也就是说让一个ioc单独监听信号另一个ioc运行在多线程中做收发。main中的ioc停止后仅仅只是服务器的信号监听停止了而server的任务处理以及session的收发仍然会持续。 此外他们的退出机制也有些不同。 第一种方式信号处理函数在 ioc 上注册当收到信号时会调用 ioc.stop() 停止 io_context并调用线程池的 Stop 方法停止线程池的运行。这种方式不需要额外的同步机制因为主线程就是 ioc.run() 的执行线程一旦停止就会退出。第二种方式信号处理函数在主线程的 ioc 上注册但实际处理的 I/O 事件由线程池中的 io_context 完成。为了协调多线程退出需要使用 mutex 和条件变量cond_quit来确保所有线程安全地退出。信号处理函数在停止 io_context 后会通知等待在条件变量上的线程退出从而确保整个程序能够正确结束。
  4. 哪一种多线程模式的效率最高 直观上来说可能二者的效率都差不多其实IOServicePool的效率略高于IOThreadPool。 IOThreadPool 通常会涉及到 strandstrand 的作用是将回调函数放入同一个队列中执行以确保线程安全如果逻辑层或者通过加锁已经处理了回调的同步问题那么就不需要使用 strand。在这种机制下当描述符就绪时相应的回调会被放入这个队列中依次处理。虽然 strand 可以确保多线程环境下的安全性但由于回调函数的执行是串行的所以在事件触发时仍然是单线程操作这在一定程度上限制了整体性能的提升。 尽管多线程模式确实能提高读写事件的派发效率但由于 strand 的串行限制整体性能通常不如使用 IOService_pool 的方式。在实际工作中我们更倾向于使用 IOService_pool因为它在保证线程安全的同时能够更好地利用并发性提升程序性能和效率。