目录
线程池(Thread Pool)
线程池(Thread Pool)是一种用于管理多线程环境的技术。Thread Pool 会在 User Process 中预先创建一组 User Threads,并在需要时重复使用它们,而不是频繁的创建新线程。
线程池可以有效提高程序性能和可靠性:
- 避免频繁创建、销毁线程,降低了系统开销;
- 限制线程数量,防止过度占用系统资源;
- 提高了程序的响应速度和吞吐量;
- 管理线程的生命周期,避免了线程泄漏的风险;
- 可实现任务优先级。
tiny-threadpool
- Github:https://github.com/JmilkFan/tiny-threadpool
tiny-threadpool fork 自 C-Thread-Pool,是一个开源的轻量级线程池,实现了以下功能。
- 符合 ANCI C 和 POSIX 风格;
- 支持 Thread 的 Pause(暂停)、Resume(恢复)、Wait(等待);
- 提供简洁的 APIs。
数据结构设计
Task / Job
Task / Job 本质是由 Producer 发出的任务请求,通常数量庞大,需要高并发的进行处理。
/**
* Job 应该包含以下 3 个成员:
* 1. 线程入口函数;
* 2. 线程入口函数的参数;
* 3. 指向下一个 Job 的指针。
*/
typedef struct job{
struct job* prev; // 指向下一个 Job,添加 New Job 入队尾(Rear)时,上一次的 Rear Job,应该要指向下一个 New Job,然后 New Job 成为新的 Near Job。
void (*function)(void* arg); // 线程入口函数的类型声明
void* arg; // 线程入口函数的参数的类型声明
} job;
Task / Job Queue
Queue 用于缓存 Jobs,并且是 FIFO 的。在某些场景中,可能会要求 Queue 的长度是可调整的,也可能会要求有多条不同优先级的 Queues。
/**
* Job Queue 应该包含以下 5 个成员:
* 1. 队头
* 2. 队尾
* 3. 队长
* 4. 互斥锁,保证高并发 Jobs 入队/出队是 FIFO 的。
* 5. 队列状态:1)空队列;2)非空队列;
*/
typedef struct jobqueue{
job *front; // 指向队头
job *rear; // 指向队尾
int len; // 队列长度
pthread_mutex_t rwmutex; // 任务队列的锁
bsem *has_jobs; // 指向一个二元信号量,用于表示 Queue 是否为空。
} jobqueue;
/**
* 二元信号量,用于标识 Job Queue 是否为空。
* 同样需要使用互斥锁和条件变量来保证高并发处理时二元值的安全性。
*/
typedef struct bsem {
pthread_mutex_t mutex; // 信号量的锁
pthread_cond_t cond; // 信号量的条件
int v; // 信号量的互斥值
// 0:空队列;
// 1:非空队列。
} bsem;
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
Worker / Thread
Worker / Thread 是 Thread Pool 分配出来真正处理 Job 的执行单元,它们是线程入口函数的 Caller(调用者),所以也称为 Consumer。
/**
* Thread 应该包含以下 3 个成员:
* 1. 友好的 ID,便于调试。区别于 Kernel 分配的 TID。
* 2. 指向 pthread 实体的指针
* 3. 指向 Thread Pool 的指针,以此来获得/释放线程池的锁和条件变量
*/
typedef struct thread{
int id; // 友好 ID
pthread_t pthread; // 指向一个 pThread 实体
struct thpool_* thpool_p; // 指向线程池
} thread;
Thread Pool Manager
Thread Pool Manager 用于管理 Thread Pool 资源:例如:创建/销毁多线程、维护任务队列、线程池状态、互斥锁和条件变量等。
当有 Job 需要处理时,Manager 就从 Pool 中获取一个可用的 Thread 来处理。当 Job 完成后,Manager 回收 Thread,而不是销毁它。
/**
* Thread Pool Manager 应该包含以下 5 个成员:
* 1. 多线程列表
* 2. 活跃线程数量
* 3. 工作线程数量(可用线程数 = 活跃线程数量 - 工作线程数量)
* 4. 任务队列
* 5. 互斥锁
* 6. 条件变量
*/
typedef struct thpool_{
thread** threads; // 指向线程指针数组
volatile int num_threads_alive; // 当前活跃的线程数量
volatile int num_threads_working; // 当前工作中的线程数量
jobqueue jobqueue; // 线程池关联的任务队列
pthread_mutex_t thcount_lock; // 线程池的锁
pthread_cond_t threads_all_idle; // 线程池的条件变量
} thpool_;
- 12
- 13
- 14
- 15
- 16
- 17
Public APIs
typedef struct thpool_* threadpool;
// 创建一个包含有指定数量线程的线程池
threadpool thpool_init(int num_threads);
// 添加 task 到 task queue 中。
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);
// 等待所有 tasks 执行完。
void thpool_wait(threadpool);
// 暂停所有 tasks,使它们进入 sleep 状态。通过信号机制来实现。
void thpool_pause(threadpool);
// 恢复所有 tasks 继续执行。
void thpool_resume(threadpool);
// 销毁所有 tasks,如果有 task 正在执行,则先等待 task 执行完。
void thpool_destroy(threadpool);
// 获取当前正在工作中的线程数量。
int thpool_num_threads_working(threadpool);
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
Private Functions
// 构造 struct thread,并调用 pthread_create() 创建线程
static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id)
// 当线程被暂停时会在这里休眠
static void thread_hold(int sig_id)
// 线程在此函数中执行任务
static void* thread_do(struct thread* thread_p)
// 销毁 struct thread
static void thread_destroy (thread* thread_p)
// 任务队列相关的操作集合
static int jobqueue_init(jobqueue* jobqueue_p)
static void jobqueue_clear(jobqueue* jobqueue_p)
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob)
static struct job* jobqueue_pull(jobqueue* jobqueue_p)
static void jobqueue_destroy(jobqueue* jobqueue_p)
// 信号量相关的操作集合
static void bsem_init(bsem *bsem_p, int value)
static void bsem_reset(bsem *bsem_p)
static void bsem_post(bsem *bsem_p)
static void bsem_post_all(bsem *bsem_p)
static void bsem_wait(bsem* bsem_p)
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
运行示例
$ gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example
$ ./example
Making threadpool with 4 threads
THPOOL_DEBUG: Created thread 0 in pool
THPOOL_DEBUG: Created thread 1 in pool
THPOOL_DEBUG: Created thread 2 in pool
THPOOL_DEBUG: Created thread 3 in pool
Adding 40 tasks to threadpool
Thread #245600256 working on 0
Thread #246136832 working on 2
Thread #246673408 working on 3
Thread #246673408 working on 6
Thread #246673408 working on 7
...
Killing threadpool
- 12
- 13
- 14
- 15
- 16