大陆365bet网址

C++跨平台轻量级线程池(ThreadPool)

C++跨平台轻量级线程池(ThreadPool)

一个跨平台, 简单易用的Header-only线程池库, 基于Task提交, 支持提交任意参数提交, 支持获取返回值.

GitHub仓库地址

📌 线程池简介

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程编程中。

它的核心思想是:预先创建一定数量的线程放在“池子”里,任务来了就把任务交给空闲的线程来处理,而不是每次都新建线程。

🚀特性亮点

任务提交灵活:支持任意可调用对象与参数组合,返回 std::future 获取执行结果

线程安全:使用 std::mutex / std::condition_variable / std::atomic 构建同步机制

跨平台:纯 C++11 实现,兼容 Windows 与 POSIX 等系统

Header-only:仅需包含 thread_pool.h,零依赖,即可使用

RAII 自动管理资源:析构时自动关闭线程池,防止资源泄露

任务等待机制:支持主动调用 wait_all() 等待所有任务完成

灵活关闭策略:默认是自动关闭线程池的, 如果有需要可以手动关闭线程池:

WaitForAllTasks: 等待所有任务完成后关闭

DiscardPendingTasks: 丢弃未开始的任务立即关闭

📦 快速开始

安装使用

拷贝thread_pool.h到你的项目目录,然后在代码中引入:

1

#include "thread_pool.h"

无需额外依赖,完全头文件实现。

基础示例代码

基础使用

1

2

3

4

5

6

7

8

9

10

11

12

13

14

#include "thread_pool.h"

#include

int main() {

abin::threadpool pool(4);

auto future1 = pool.submit([] { return 42; });

std::cout << "结果: " << future1.get() << "\n";

auto future2 = pool.submit([](int a, int b) { return a + b; }, 5, 7);

std::cout << "加法结果: " << future2.get() << "\n";

return 0;

}

提交任意类型任意参数的可调用对象

点击查看代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

#include "thread_pool.h"

#include

#include

#include

#include

void normal_function(int x)

{

std::cout << "normal_function: " << x << std::endl;

}

struct MyClass

{

void member_function(int y)

{

std::cout << "MyClass::member_function: " << y << std::endl;

}

int add(int a, int b)

{

return a + b;

}

};

struct Functor

{

void operator()(const std::string& msg) const

{

std::cout << "Functor called with: " << msg << std::endl;

}

};

int main()

{

abin::threadpool pool(4);

// 提交一个普通函数

pool.submit(normal_function, 42);

// 提交一个无捕获 lambda

pool.submit([] { std::cout << "lambda no capture\n"; });

// 提交一个有捕获 lambda

int value = 99;

pool.submit([value] { std::cout << "lambda with capture: " << value << "\n"; });

// 提交成员函数, 使用lambda

MyClass obj;

pool.submit([&obj] { obj.member_function(123); });

// 提交成员函数, 使用 std::mem_fn

std::future ret = pool.submit(std::mem_fn(&MyClass::add), &obj, 3, 4);

std::cout << "add result1: " << ret.get() << "\n";

// 提交成员函数, 使用 std::bind

std::future fut_add = pool.submit(std::bind(&MyClass::add, &obj, 2, 3));

std::cout << "add result2: " << fut_add.get() << "\n";

// 提交一个函数对象(仿函数)

Functor f;

pool.submit(f, "hello functor");

// 使用 std::bind 提交

auto bound = std::bind(&MyClass::add, &obj, 5, 6);

std::future fut_bound = pool.submit(bound);

std::cout << "bound result: " << fut_bound.get() << "\n";

// 提交一个 std::packaged_task(注意: 低版本msvc可能报错)

std::packaged_task task([] { return std::string("from packaged_task"); });

std::future fut_str = task.get_future();

pool.submit(std::move(task)); // 必须 move

std::cout << "packaged_task result: " << fut_str.get() << "\n";

pool.wait_all(); // 等待任务完成

std::cout << "===All tasks completed.===\n";

}

ThreadPool源码

点击查看线程池源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

/**************************************************************************************************************

*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* @file: thread_pool.h

* @version: v0.9.2

* @description: A cross-platform, lightweight, easy-to-use C++11 thread pool that supports submitting tasks with

* arbitrary parameters and obtaining return values

* - Futures

* - Task-based: Supports tasks with arbitrary parameters, and obtains return values ​​through `std::future`.

* - Cross-Platform: Works on platforms supporting C++11.

* - Thread Safety: Uses `std::mutex`, `std::condition_variable`, and atomic variables for synchronization.

* - Flexible Shutdown: Two modes for shutdown: `WaitForAllTasks` and `DiscardPendingTasks`.

* - Lightweight & Easy-to-Use: Simple API with minimal setup.

*

* @author: abin

* @date: 2025-04-20

* @license: MIT

* @repository: https://github.com/abin-z/ThreadPool

*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

**************************************************************************************************************/

#ifndef ABIN_THREADPOOL_H

#define ABIN_THREADPOOL_H

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

namespace abin

{

/// @brief C++11的线程池, 提交任务支持任意多参数, 支持获取返回值

class threadpool

{

using task_t = std::function; // 定义任务类型为可调用对象

public:

/// @brief 线程池当前状态信息结构体

struct status_info

{

std::size_t total_threads; // 总线程数

std::size_t busy_threads; // 正在执行任务的线程数

std::size_t idle_threads; // 空闲线程数

std::size_t pending_tasks; // 等待中的任务数

bool running; // 线程池是否在运行

};

/// @brief 关闭线程池的模式

enum class shutdown_mode : unsigned char

{

/// @brief 等待所有已提交的任务完成后再关闭线程池

/// 在此模式下, 线程池会等待所有任务(包括已开始和未开始的任务)执行完成后再关闭.

WaitForAllTasks,

/// @brief 立即关闭线程池, 丢弃尚未开始的任务.

/// 在此模式下, 线程池会立即停止接收新任务, 丢弃所有尚未开始执行的任务,

/// 但已经开始执行的任务会继续执行, 直到它们完成.

DiscardPendingTasks

};

public:

/// @brief 构造函数, 初始化线程池并启动指定数量的工作线程

/// @param thread_count 要创建的线程数量, 默认为硬件支持的并发线程数(若无法获取则为 4)

explicit threadpool(std::size_t thread_count = default_thread_count())

{

launch_threads(validate_thread_count(thread_count)); // 创建线程

}

/// @brief 析构函数, 停止所有线程并等待它们完成

~threadpool()

{

shutdown(shutdown_mode::WaitForAllTasks);

}

/// @brief 提交任务到线程池并返回一个 future 对象, 用户可以通过它获取任务的返回值

///

/// @tparam F 任务类型的可调用对象

/// @tparam Args 可调用对象的参数类型

/// @param f 需要提交的任务

/// @param args 任务的参数

/// @return std::future 返回一个 future 对象, 允许用户获取任务的返回值

template

auto submit(F &&f, Args &&...args) -> std::future

{

if (!running_) throw std::runtime_error("error: ThreadPool is not running. Cannot submit new tasks.");

using return_type = decltype(f(args...));

// 将 f 包装成 task, task 是一个 shared_ptr 指向 packaged_task

auto task = std::make_shared>(

std::bind(std::forward(f), std::forward(args)...) // 将函数和参数封装成一个 return_type() 的可调用对象

);

std::future ret = task->get_future(); // 获取与 task 相关联的 future

{

std::lock_guard lock(mtx_);

task_queue_.emplace([task] { (*task)(); }); // 将任务添加到任务队列中

}

cv_.notify_one(); // 通知一个等待中的工作线程有新的任务可以执行

return ret; // 返回 future 对象

}

/// @brief 阻塞直到所有任务完成(任务队列为空且没有任务在执行), 若没有任务,立即返回

void wait_all()

{

if (busy_count_ == 0 && pending_tasks() == 0) return;

std::unique_lock lock(mtx_done_);

cv_done_.wait(lock, [this] { return busy_count_ == 0 && pending_tasks() == 0; });

}

/// @brief 关闭线程池

/// @param mode `WaitForAllTasks` 等待所有任务执行完成后再关闭; `DiscardPendingTasks` 立即关闭线程池,

/// 抛弃尚未开始的任务.

void shutdown(shutdown_mode mode = shutdown_mode::WaitForAllTasks)

{

{

std::lock_guard lock(mtx_);

if (!running_) return; // 已经关闭则直接返回

running_ = false;

if (mode == shutdown_mode::DiscardPendingTasks) // 放弃任务模式

{

std::queue empty;

std::swap(task_queue_, empty); // 清空任务队列

}

}

cv_.notify_all();

for (std::thread &worker : workers_)

{

if (worker.joinable()) worker.join();

}

workers_.clear();

}

/// @brief 重启线程池, 先关闭当前线程池(等待所有任务完成), 然后以指定的线程数量重新启动线程池.

/// @param thread_count 要创建的工作线程数量

void reboot(std::size_t thread_count)

{

shutdown(shutdown_mode::WaitForAllTasks);

{

std::lock_guard lock(mtx_);

if (running_) return; // 已重启, 无需再次初始化(幂等)

running_ = true;

launch_threads(validate_thread_count(thread_count));

}

}

/// @brief 当前线程池的总线程数量

std::size_t total_threads() const noexcept

{

return workers_.size();

}

/// @brief 获取当前等待的任务数量

std::size_t pending_tasks() const noexcept

{

std::lock_guard lock(mtx_);

return task_queue_.size();

}

/// @brief 获取繁忙的线程数量

std::size_t busy_threads() const noexcept

{

return busy_count_.load();

}

/// @brief 获取空闲线程数量

std::size_t idle_threads() const noexcept

{

return workers_.size() - busy_count_.load();

}

/// @brief 当前线程池是否正在运行(未停止)

bool is_running() const noexcept

{

return running_.load();

}

/// @brief 获取线程池的当前状态信息

status_info status() const noexcept

{

std::size_t total = 0;

std::size_t pending = 0;

{

std::lock_guard lock(mtx_);

total = workers_.size();

pending = task_queue_.size();

}

std::size_t busy = busy_count_.load();

std::size_t idle = total - busy;

return {total, busy, idle, pending, running_.load()};

}

// 禁用拷贝构造函数和拷贝赋值操作符

threadpool(const threadpool &) = delete;

threadpool &operator=(const threadpool &) = delete;

// 禁用移动构造函数和移动赋值操作符

threadpool(threadpool &&) = delete;

threadpool &operator=(threadpool &&) = delete;

private:

/// @brief 默认线程数, 获取硬件支持的并发线程数, 若无法获取则默认为4

static std::size_t default_thread_count()

{

auto n = std::thread::hardware_concurrency();

return n == 0 ? 4 : n;

}

/// @brief 验证线程数是否合法, 1 <= count <= 4096

static std::size_t validate_thread_count(std::size_t count)

{

if (count < 1 || count > 4096) throw std::invalid_argument("invalid thread_count: must be in range [1, 1024]");

return count;

}

/// @brief 启动线程池, 创建指定数量的工作线程

/// @param thread_count 线程池中线程的数量

void launch_threads(std::size_t thread_count)

{

if (!workers_.empty()) return; // 已经初始化过

for (std::size_t i = 0; i < thread_count; ++i)

{

// 创建并启动工作线程

workers_.emplace_back([this] {

while (true)

{

task_t task;

{

std::unique_lock lock(mtx_);

// 等待直到任务队列中有任务, 或者线程池已停止

cv_.wait(lock, [this] { return !running_ || !task_queue_.empty(); });

if (!running_ && task_queue_.empty()) return; // 如果线程池已经停止并且队列为空, 退出线程

task = std::move(task_queue_.front()); // 从队列中取出任务

task_queue_.pop();

}

++busy_count_;

task(); // 执行任务

--busy_count_;

// 判断任务是否已全部完成

if (busy_count_ == 0 && task_queue_.empty())

{

std::lock_guard lock(mtx_done_);

if (busy_count_ == 0 && pending_tasks() == 0) // 二次确认, 避免竞态

{

cv_done_.notify_all();

}

}

}

});

}

}

private:

std::vector workers_; // 工作线程集合,用于并发执行任务

std::queue task_queue_; // 等待执行的任务队列

std::condition_variable cv_; // 条件变量,用于通知工作线程有新任务到来

mutable std::mutex mtx_; // 主互斥锁,保护任务队列和与其相关的状态

std::atomic busy_count_{0}; // 正在执行任务的线程数量

std::atomic running_{true}; // 线程池是否处于运行状态

mutable std::mutex mtx_done_; // 用于保护完成通知的互斥锁(wait_all 用)

std::condition_variable cv_done_; // 条件变量,用于等待所有任务执行完毕(配合 wait_all 使用)

};

} // namespace abin

#endif // ABIN_THREADPOOL_H

← 微信附近人在哪里设置 阴阳师手游黑鬼使怎么样 黑鬼使御魂搭配 →

相关推荐