江西九江永修网站建设怀化医保网站

当前位置: 首页 > news >正文

江西九江永修网站建设,怀化医保网站,微信公众号对接网站做,长春少儿编程培训机构C实现线程池一、前言二、线程池的接口设计2.1、类封装2.2、线程池的初始化2.3、线程池的启动2.4、线程池的停止2.5、线程的执行函数run()2.6、任务的运行函数2.7、等待所有线程结束三、测试线程池四、源码地址总结一、前言 C实现的线程池#xff0c;可能涉及以下知识点#… C实现线程池一、前言二、线程池的接口设计2.1、类封装2.2、线程池的初始化2.3、线程池的启动2.4、线程池的停止2.5、线程的执行函数run()2.6、任务的运行函数2.7、等待所有线程结束三、测试线程池四、源码地址总结一、前言 C实现的线程池可能涉及以下知识点 decltype。packaged_task。make_shared。mutex。unique_lock。notify_one。future。queue。bind。thread。 等等。
二、线程池的接口设计 1封装一个线程池的类。 2线程池的初始化设置线程的数量。 3启动线程池创建线程等工作。 4执行任务的函数。 5停止线程池。 6等所有任务执行完成退出执行函数。 2.1、类封装 线程池类采用c11来实现。 #ifndef _CPP_THREAD_POOLH #define _CPP_THREAD_POOLH#include iostream #include functional #include memory #include queue #include mutex #include vector #include thread #include future#ifdef WIN32 #include windows.h #else #include sys/time.h #endifusing namespace std;void getNow(timeval tv); int64_t getNowMs();#define TNOW getNow() #define TNOWMS getNowMs()class CPP_ThreadPool{ protected:struct TaskFunc{TaskFunc(uint64_t expireTime):_expireTime(expireTime){}int64_t _expireTime0;//超时的绝对时间functionvoid() _func;};typedef shared_ptrTaskFunc TaskFuncPtr;/ * brief 获取任务 ** *return TaskFuncPtr /bool get(TaskFuncPtr task);/** brief 线程池是否退出/bool isTerminate(){return _bTerminate;}/** brief 线程运行态*/void run();public: /** brief 构造函数 /CPP_ThreadPool(); / * brief 析构, 会停止所有线程 /virtual ~CPP_ThreadPool();/ * * brief 初始化. * * param num 工作线程个数 */bool init(size_t num);/** brief 停止所有线程, 会等待所有线程结束 */void stop();/** brief 启动所有线程 /bool start();/ * brief 等待当前任务队列中, 所有工作全部结束(队列无任务). * param millsecond 等待的时间(ms), -1:永远等待 * return true, 所有工作都处理完毕 * false,超时退出 /bool waitForAllDone(int millsecond-1);/** brief 获取线程个数. return size_t 线程个数 /size_t getThreadNum(){unique_lockmutex lock(_mutex);return _threads.size();}/** brief 获取当前线程池的任务数 return size_t 线程池的任务数 */size_t getJobNum(){unique_lockmutex lock(_mutex);return _tasks.size();}/** brief 用线程池启用任务(F是function, Args是参数) ** * param ParentFunctor * param tf * return 返回任务的future对象, 可以通过这个对象来获取返回值 /template class F,class… Argsauto exec(F f, Args… args)-futuredecltype(f(args…)){return exec(0,f,args…);}/ * unused.** brief 用线程池启用任务(F是function, Args是参数) * param 超时时间 单位ms (为0时不做超时控制) 若任务超时此任务将被丢弃 * param bind function * return 返回任务的future对象, 可以通过这个对象来获取返回值 ** template class F, class… Args * 它是c里新增的最强大的特性之一它对参数进行了高度泛化它能表示0到任意个数、任意类型的参数 * auto exec(F f, Args … args) - std::futuredecltype(f(args…)) * std::futuredecltype(f(args…))返回future调用者可以通过future获取返回值 * 返回值后置*/templateclass F,class… Argsauto exec(int64_t timeoutMs,F f,Args… args) - futuredecltype(f(args…)){//获取现在时间int64_t expireTime(timeoutMs0)?0:TNOWMStimeoutMs;// 定义返回值类型using retTypedecltype(f(args…));// 封装任务auto taskmake_sharedpackaged_taskretType()(bind(forwardF(f),forwardArgs(args)…));// 封装任务指针设置过期时间TaskFuncPtr fPtrmake_sharedTaskFunc(expireTime);fPtr-_functask{(task)();};unique_lockmutex lock(_mutex);// 插入任务_tasks.push(fPtr);// 唤醒阻塞的线程可以考虑只有任务队列为空的情 况再去notify_condition.notify_one();return task-get_future();}protected:size_t _threadNum;//线程数量bool _bTerminate;//判定是否终止线程池mutex _mutex; //唯一锁vectorthread _threads; //工作线程数组queueTaskFuncPtr _tasks; //任务队列condition_variable _condition;//条件变量atomicint _atomic{0};//原子变量 };#endif使用示例 CPP_ThreadPool tpool; tpool.init(5); //初始化线程池线程数 //启动线程方式 tpool.start(); //将任务丢到线程池中* tpool.exec(testFunction, 10); //参数和start相同 //等待线程池结束 tpool.waitForAllDone(1000); //参数0时, 表示无限等待(注意有人调用stop也会推出) //此时: 外部需要结束线程池是调用 tpool.stop(); 注意ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如 int testInt(int i) { return i; } auto f tpool.exec(testInt, 5); cout f.get() endl; //当testInt在线程池中执行后, f.get()会返回数值5 class Test { public: int test(int i); }; Test t; auto f tpool.exec(std::bind(Test::test, t, std::placeholders::_1), 10); //返回的future对象, 可以检查是否执行 cout f.get() endl; 2.2、线程池的初始化 主要是设置线程池中线程的数量如果线程池已经存在则直接返回防止重复初始化。 bool CPP_ThreadPool::init(size_t num) {unique_lockmutex lock(_mutex);if(!_threads.empty())return false;_threadNumnum;return true; }2.3、线程池的启动 根据设置的线程数量创建线程并保存在一个数组中。如果线程池已经存在则直接返回防止重复启动。 bool CPP_ThreadPool::start() {unique_lockmutex lock(_mutex);if(!_threads.empty())return false;for(size_t i0;i_threadNum;i){_threads.push_back(new thread(CPP_ThreadPool::run,this));}return true; }2.4、线程池的停止 设置线程退出条件并通知所有线程。停止时要等待所有线程都执行完任务再销毁线程。 需要注意锁的粒度。 void CPP_ThreadPool::stop() {// 注意要有这个{}不然会死锁。{unique_lockmutex lock(_mutex);_bTerminatetrue;_condition.notify_all();}size_t thdCount_threads.size();for(size_t i0;ithdCount;i){if(_threads[i]-joinable()){_threads[i]-join();}delete _threads[i];_threads[i]NULL;}unique_lockmutex lock(_mutex);_threads.clear(); }2.5、线程的执行函数run() 读取任务判断任务是否存在如果任务队列为空则进入等待状态直到任务队列不为空或退出线程池这里需要两次判断因为可能存在虚假唤醒。 执行任务调用匿名函数。 检测所有任务都是否执行完毕这里使用了原子变量来检测任务是否都执行完原因在于任务队列为空不代表任务已经执行完任务可能还在运行中、也可能是任务刚弹出还没运行使用原子变量来计数就更严谨。 bool CPP_ThreadPool::get(TaskFuncPtr task) {unique_lockmutex lock(_mutex);if(_tasks.empty())//判断任务是否存在{_condition.wait(lock,[this]{return _bTerminate || !_tasks.empty();//唤醒条件});}if(_bTerminate)return false;if(!_tasks.empty())//判断任务是否存在{taskmove(_tasks.front());// 使用移动语义_tasks.pop();//弹出一个任务return true;}return false; }// 执行任务的线程 void CPP_ThreadPool::run() {while(!isTerminate()){TaskFuncPtr task;// 读取任务bool okget(task);if(ok){_atomic;try{if(task-_expireTime!0 task-_expireTime TNOWMS){// 处理超时任务}elsetask-_func();//执行任务}catch(…){}–_atomic;// 任务执行完毕,这里只是为了通知waitForAllDoneunique_lockmutex lock(_mutex);if(_atomic0 _tasks.empty())_condition.notify_all();}} }2.6、任务的运行函数 这里使用了可变模块参数、智能指针、bind、function、捕获列表的相关技术知识。 返回任务的future对象, 可以通过这个对象来获取返回值。 超时时间 单位ms (为0时不做超时控制) 若任务超时此任务将被丢弃。 可变模块参数对参数进行了高度泛化它能表示0到任意个数、任意类型的参数。 /** brief 用线程池启用任务(F是function, Args是参数) ** * param ParentFunctor * param tf * return 返回任务的future对象, 可以通过这个对象来获取返回值 /template class F,class… Argsauto exec(F f, Args… args)-futuredecltype(f(args…)){return exec(0,f,args…);}/ * unused.** brief 用线程池启用任务(F是function, Args是参数) * param 超时时间 单位ms (为0时不做超时控制) 若任务超时此任务将被丢弃 * param bind function * return 返回任务的future对象, 可以通过这个对象来获取返回值 ** template class F, class… Args * 它是c里新增的最强大的特性之一它对参数进行了高度泛化它能表示0到任意个数、任意类型的参数 * auto exec(F f, Args … args) - std::futuredecltype(f(args…)) * std::futuredecltype(f(args…))返回future调用者可以通过future获取返回值 * 返回值后置*/templateclass F,class… Argsauto exec(int64_t timeoutMs,F f,Args… args) - futuredecltype(f(args…)){//获取现在时间int64_t expireTime(timeoutMs0)?0:TNOWMStimeoutMs;// 定义返回值类型using retTypedecltype(f(args…));// 封装任务auto taskmake_sharedpackaged_taskretType()(bind(forwardF(f),forwardArgs(args)…));// 封装任务指针设置过期时间TaskFuncPtr fPtrmake_sharedTaskFunc(expireTime);fPtr-_functask{(*task)();};unique_lockmutex lock(_mutex);// 插入任务_tasks.push(fPtr);// 唤醒阻塞的线程可以考虑只有任务队列为空的情 况再去notify_condition.notify_one();return task-get_future();}2.7、等待所有线程结束 bool CPP_ThreadPool::waitForAllDone(int millsecond) {unique_lockmutex lock(_mutex);if(_tasks.empty())return true;if(millsecond0){_condition.wait(lock,[this]{ return _tasks.empty();});return true;}else{return _condition.wait_for(lock,chrono::milliseconds(millsecond),[this]{ return _tasks.empty();});} }三、测试线程池 #include iostream #include cppThreadPool.husing namespace std;void func1(int a) { cout func1() a a endl; } void func2(int a, string b) { cout func2() a a , b b endl; }void func3() {coutfunc3endl; }void test01() {couttest 01endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池// 执行任务threadpool.exec(func1,10);threadpool.exec(func2,20,FLY.);threadpool.exec(1000,func3);threadpool.waitForAllDone();threadpool.stop(); }int func1_future(int a) { cout func1() a a endl; return a; }string func2_future(int a, string b) { cout func2() a a , b b endl; return b; }void test02() {couttest 02endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池futuredecltype(func1_future(0)) ret01threadpool.exec(func1_future,10);futurestring ret02threadpool.exec(func2_future,20,FLY.);threadpool.waitForAllDone();coutret01 ret01.get()endl;coutret02 ret02.get()endl;threadpool.stop();}class Test{ public:int test(int a){cout_name: a aendl;return a1;}void setname(string name){_namename;}string _name;};void test03() {couttest 03endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池Test t1;Test t2;t1.setname(Test 1);t2.setname(Test 2);auto f1threadpool.exec(bind(Test::test,t1,placeholders::_1),10);auto f2threadpool.exec(bind(Test::test,t2,placeholders::_1),20);threadpool.waitForAllDone();coutf1 f1.get()endl;coutf2 f2.get()endl;threadpool.stop();}int main(int argc,char **argv) {// 简单测试线程池test01();// 测试任务函数返回值test02();// 测试类对象函数的绑定test03();return 0; }执行结果 test 01 func1() a10 func2() a20, bFLY. func3 test 02 func1() a10 func2() a20, bFLY. ret01 10 ret02 FLY. test 03 Test 1: a 10 Test 2: a 20 f1 11 f2 21四、源码地址 源码已经上传github。 总结 线程池的核心初始化、线程启动、执行函数、线程停止。