【项目实战】基于protobuf的发布订阅式消息队列(2)—— 线程池

张开发
2026/4/11 10:11:48 15 分钟阅读

分享文章

【项目实战】基于protobuf的发布订阅式消息队列(2)—— 线程池
目录一介绍一异步操作1.1 std::future 与 std::async 介绍1.2 promise::get_future1.3 packaged_task::get_future二线程池实现一介绍基础线程池的实现可以参考Linux系统编程——线程池_linux内核线程池-CSDN博客一个线程池其实很容易实现只需要有一组工作线程然后加上线程安全的任务队列然后这些工作线程不断从这个队列里取出任务然后执行难度不高但是这样的线程池无法获取这个任务执行的结果是“只进不出”的一种模式所以我们需要通过异步操作来让我们能够获取任务执行结果一异步操作1.1 std::future 与 std::async 介绍官方文档地址future - C Reference关于 std::futurestd::future 是C11标准库中的一个模板类它表示一个异步操作的结果可以帮助我们在需要的时候获取多线程编程中的异步任务的执行结果std::future 的一个重要特性是能够阻塞当前线程直到异步操作完成从而确保我们在获取结果时不会遇到未完成的操作应用场景异步任务当我们需要在后台执行一些耗时操作时如网络请求或计算密集型任务等std::future 可以用来表示这些异步任务的结果。通过将任务与主线程分离我们可以实现任务的并行处理从而提⾼程序的执行效率并发控制在多线程编程中我们可能需要等待某些任务完成后才能继续执行其他操作。通过使用 std::future我们可以实现线程之间的同步确保任务完成后再获取结果并继续执行后续操作结果获取std::future 提供了一种安全的方式来获取异步任务的结果。我们可以使用 std::future::get() 来获取任务的结果此函数会阻塞当前线程直到异步操作完成。这样在调用 get() 函数时我们可以确保已经获取到了所需的结果概括一下future 是一个可以获取某个异步线程里面执行的函数执行结果的对象可以保证在不同的线程中同步访问有结果就获取没结果就阻塞住关于 std::async前面说过futrue是获取异步线程里执行的函数的执行结果所以 future 的任务就是“获取结果”那么这个异步线程怎么来的呢执行的函数怎么放到这个异步线程里去执行呢async 的任务就是负责启动这个异步线程注意目前只是单纯介绍一个简单的异步线程还没有讲解到线程池部分async是一个函数有两个参数第一个参数就是要执行的函数名第二个就是要传递的参数可以是多个参数然后返回一个future对象相当于在后台建立一个线程然后执行函数不阻塞当前线程再举一个更简单的例子假设async是厨师我们用户是顾客厨师负责在后台做饭而我们顾客要想吃饭就必须通过取餐号futrue去取餐这个餐就是线程的执行结果代码示例#include iostream #include thread #include future int Add(int a, int b) { std::cout hello world std::endl; std::this_thread::sleep_for(std::chrono::seconds(3)); return a b; } int main() { std::futureint res std::async(std::launch::deferred, Add, 10, 20); // 这是 async 的带启动策略参数重载会在后面单独讲解 // 这里只要知道deferred的作用是不创建线程与执行在后面调用 get 或 wait 时才会创建线程和执行 int num res.get(); // 调用get后才会执行异步线程里的 Add 函数如果 Add 没有返回则会一直等待 std::cout num std::endl; return 0; }关于 async 带启动策略主要是两个关键参数std::launch::async立即创建线程并执行函数调用get 和 wait 时函数已经执行完或者正在执行std::launch::deferred调用时不立即创建线程不立即执行函数只有后面调用 get 或 wait时才会创建线程和执行如果不传这两个则由系统决定是开线程异步还是延迟执行这取决于当前线程资源1.2 promise::get_future除了 async 还有两个直接上代码#include iostream #include thread #include future void Add(int a, int b, std::promiseint prom) { std::this_thread::sleep_for(std::chrono::seconds(3)); prom.set_value(a b); // 可以手动设置结果 // 建立关系后可以在一个线程里对这个 promise 对象设置一个值然后另一个线程可以获取这个值这样也可以获取异步任务执行的结果常用于线程之间数据交流 return; } int main() { std::promiseint pro; std::futureint fu pro.get_future(); // 类似构建一种关联关系 std::thread thr(Add, 10, 20, std::ref(pro)); // 这里是强制取引用 int num fu.get(); // 如果函数中没有通过 set_value 设置值则会阻塞比如这里就是等待三秒后才获取结果 std::cout num std::endl; thr.join(); return 0; }1.3 packaged_task::get_future翻译过来就是“函数包”的意思功能简单介绍一下就是把一个函数 / 可调用对象包装成一个 “将来能拿结果” 的异步任务它不负责开线程不负责执行只负责两件事包装任务把任务的返回值 / 异常塞进一个 future 里话不多说直接上代码#include iostream #include thread #include future int Add(int a, int b) { std::this_thread::sleep_for(std::chrono::seconds(3)); return a b; } // packaged_task是一个模板类实例化的对象可以对一个函数进行二次封装 // 然后可以通过get_future获取一个 future 对象来获取封装的这个函数的异步执行结果 int main() { // std::packaged_taskint(int, int) task(Add); // std::futureint fu task.get_future(); // std::async(std::launch::async, task, 10, 29); //编译时会报错模板不匹配之类的错误因为packaged_task不能拷贝只能移动如果这里的task换成 std::move(task) 就不会报错了 // task(10. 20); //编译不报错但是只是在这里单纯地执行无异步效果 // std::thread thr(task, 10, 20); //编译报错和 async 同理 // 所以task可以被当作一个可调用对象来调用执行任务但是又不能完全当作一个函数来使用 std::shared_ptrstd::packaged_taskint(int, int) ptask std::make_sharedstd::packaged_taskint(int, int)(Add); std::futureint fu ptask-get_future(); std::thread thr([ptask]() { (*ptask)(10, 20); }); int num fu.get(); std::cout num std::endl; thr.join(); return 0; }重点为什么 packaged_task 对象不能被拷贝里面有一个promise对象这个负责把返回值传给 futurepromise 必须唯一因为它是结果的唯一发送方如果拷贝了就会出现两个 promise 都想往同一个 future 塞结果这就会导致一系列bug所以 promise 不能拷贝所以 packaged_task 也不能拷贝更细一点说std::promise 是独占型对象类似 unique在底层设计上就是不可拷贝的我们用的 shared_ptr 是因为我们拷贝的是指针不是 packaged_task 本身二线程池实现我们选择std::packaged_task 和 std::future 的组合来实现因为基于线程池执行任务的时候⼊⼝函数内部执行逻辑是固定的线程池要做的事情用户传入要执行的函数以及需要处理的数据函数的参数然后由线程池中的工作线程来执行函数完成任务线程需要实现的管理的成员任务池用 vector 维护的一个函数任务池子互斥锁条件变量实现同步互斥一定数量的工作线程用于不断从任务池取出任务执行任务结束运行标志以便于控制线程池的结束管理的操作入队任务入队一个函数和参数停止运行终止线程池线程池代码如下包括测试有大量注释详细介绍#include iostream #include functional #include memory #include thread #include future #include mutex #include vector #include atomic #include condition_variable class ThreadPool { public: using Functor std::functionvoid(void); // 声明一个类型这个类型是个函数类型 public: ThreadPool(int thread_num 1) : _stop(false) { for (int i 0; i thread_num; i) { _threads.emplace_back(ThreadPool::entry, this); // 在向量中直接构造线程对象第一个参数是指向线程入口函数this是作为参数传递给成员函数 } } ~ThreadPool() { // 直接调用 stop() 即可 stop(); } // push 要传入一个函数和若干参数push内部会将传入的函数封装成一个packaged_task异步任务 // 然后根据这个异步任务生成lambda表达式的可调用对象然后扔进任务池中接着由工作线程取出并执行 template typename F, typename... Args // 第一个表示函数第二个表示不定参数 auto push(F func, Args ...args) - std::futuredecltype(func(args...)) // 使用右值引用保持类型由于不确定函数返回值是什么所以使用 C11 的尾置返回类型语法配合decltype推导出函数返回值 { // 上面这一块文章后面会有详细说明 // 1推导返回值类型 using RetType decltype(func(args...)); // 2将传入函数封装成 packaged_task 对象 auto tmp_func std::bind(std::forwardF(func), std::forwardArgs(args)...); // bind 可以对函数进行参数绑定然后生成可调用对象这个对象可以不用传参直接调用 // 为什么要二次封装因为任务队列只能存一种类型而用户传给我们的函数类型可能是任意的 auto task std::make_sharedstd::packaged_taskRetType()(tmp_func); // 用智能指针是因为packaged_task 不能拷贝 // 3获取 future以后用来获取异步结果 std::futureRetType fut task-get_future(); // 4构造出一个 lambda 匿名函数捕获任务对象函数内指向任务对象 std::unique_lockstd::mutex lock(_mtx); _taskpool.push_back([task]() { (*task)(); }); // 先捕捉匿名对象通过智能指针直接解引用task // 5通知线程来取任务 _cv.notify_one(); return fut; } void stop() { if (_stop true) return; _stop true; _cv.notify_all(); for (auto e : _threads) e.join(); } private: void entry() // 从任务池取出任务然后执行 { while (!_stop) { // 如果一次只取一个任务的话就会有频繁的任务取出以及加锁等操作效率会大大降低所以在这里定义一个小的容器能够一次取出多个任务 std::vectorFunctor tmp_taskpool; std::unique_lockstd::mutex lock(_mtx); { // 等待任务池不为空或者 _stop 被置为返回 _cv.wait(lock, [this]() { return _stop || !_taskpool.empty(); }); // wait第二个参数是被唤醒的条件使用lamdba捕获this是为了让这个lamdba能够访问类成员变量 // 取出任务并执行 tmp_taskpool.swap(_taskpool); // 一次取出所有任务典型的空间换时间场景 } for (auto e : tmp_taskpool) { e(); // 执行任务 } } } private: std::atomicbool _stop; // 原子变量多线程编程中常用的同步机制它能够确保对共享变量的操作在执行时不会被其他线程的操作干扰 std::vectorFunctor _taskpool; // 任务池任务就是一个一个的函数通过下面的锁和条件变量控制访问 std::mutex _mtx; std::condition_variable _cv; // 条件变量 std::vectorstd::thread _threads; // 工作线程 }; int Add(int a, int b) { return a b; } int main() { ThreadPool pool; for (int i 1; i 10; i) { std::futureint fu pool.push(Add, 10, i); std::cout fu.get() std::endl; } pool.stop(); return 0; }关于push函数的说明具体说明注释已经解释清楚啦下面是额外补充说明假设传进来的是 int Add(int, int)那么 F 会变成 AddArgs 会变成 int, int 两个RetType 会变成 intbind 把 函数 f 和参数 args... 绑在一起变成一个不用传参就能直接调用的东西然后再二次封装成 package_task 因为只有这样才能让任务返回结果到 future然后再用 shared_ptr 包起来因为 packaged_task 不能拷贝为什么后面把任务放队列里时要用 lamdba因为任务队列在定义时只能存一种类型push函数就是把用户的函数打包成安全的异步任务然后扔线程池里push函数不关心执行过程只关心结果最后返回一个future类型给用户用户就可以根据这个 future 返回值拿到执行结果了

更多文章