線程池的基本概念
不管線程池是什么東西!但是我們必須知道線程池被搞出來的目的就是:提高程序執行效率而設計出來的;
了解了線程池的目的后:我們就可以開始理解線程池:
首先回答一個問題:為什么會有線程池?
呃呃,我這么問就很奇怪,因為線程池是什么我都沒說,怎么會知道為什么會有線程池呢?所以我打算帶大家去思考一個場景:
當我們的程序中:有一批任務到來時候(通常該任務都是從網絡來的),我們就會創建一堆線程去處理這一批任務;
雖然說創建線程的成本開銷并不大,但是這里有個問題:當我們任務來到時候,你才去創建線程去處理這個任務,你不覺得這樣很慢嗎?
是否我們可以換個思路:假如我們有一種手段:使得任務一到來,就可以馬上有線程去處理這批任務,這樣是不是相對于前面等線程來到,再創建線程去處理時候快得多;
所以說:線程池就是基于上面的思路設計的;線程池就是:預先創建好一大批線程,同時線程池維護一個隊列,來存放到來的任務,當隊列中一旦有任務時候,預先創建好的一大批線程就可以并發處理這一批任務了;
我們抽象出一個模型:
任務派發者是誰? 是生產者;
任務存儲的隊列是什么?是一個容器,數組,鏈表,只要是可以存放產品(數據)的東西即可;
拿任務去處理的是誰?是消費者;
所以說:線程池本質就是一個生產者消費者的模型;
而我們線程池只需要關注兩個點:一個存放任務的隊列,和消費隊列任務的消費者即可;而生產者暫時不用關注,因為生產者是你外部搞出任務丟給線程池去使用;那么什么時候可以關心生產者呢?
也就是當我們去使用線程池的時候咯;這不就是妥妥的生產者消費者模型嘛!
線程池實現的基本思路:
在各個編程語言的語種中都有線程池的概念,并且很多語言中直接提供了線程池,作為程序猿直接使用就可以了,下面給大家介紹一下線程池的實現原理:
線程池的組成主要分為 3 個部分,這三部分配合工作就可以得到一個完整的線程池:
任務隊列,存儲需要處理的任務,由工作的線程來處理這些任務
通過線程池提供的 API 函數,將一個待處理的任務添加到任務隊列,或者從任務隊列中刪除;
已處理的任務會被從任務隊列中刪除;
線程池的使用者,也就是調用線程池函數往任務隊列中添加任務的線程就是生產者線程;
工作的線程(任務隊列任務的消費者) ,N個
線程池中維護了一定數量的工作線程,他們的作用是是不停的讀任務隊列,從里邊取出任務并處理
工作的線程相當于是任務隊列的消費者角色;
如果任務隊列為空,工作的線程將會被阻塞 (使用條件變量 / 信號量阻塞);
如果阻塞之后有了新的任務,由生產者將阻塞解除,工作線程開始工作;
管理者線程(不處理任務隊列中的任務),1個
它的任務是周期性的對任務隊列中的任務數量以及處于忙狀態的工作線程個數進行檢測;
當任務過多的時候,可以適當的創建一些新的工作線程;
當任務過少的時候,可以適當的銷毀一些工作的線程;
線程池的代碼
1.任務隊列的任務結構體
對于任務隊列:
里面存放的都是函數指針,該函數指針指向的就是處理任務的函數;
同時還要維護一個任務函數的形參;
typedef struct Task
{
void (*function)(void *args); //任務的函數指針
void *args; //任務函數的形參
} Task;
2. 線程池的定義
線程池里面最重要的是:
一個任務隊列;
多個消費者線程IDs;
一個管理者線程ID;
管理線程池的鎖;
管理任務隊列是否為滿和空的條件變量;
還有一些其他的輔助成員變量;
struct ThreadPool
{
Task *taskQ; //任務隊列
/*對于一個任務隊列:我們需要知道以下信息*/
int queueCapacity; //隊列的容量
int queueSize; //當前任務的個數
int queueFront; //隊頭取任務
int queueRear; //隊尾放任務
/*有了任務隊列后,還要有管理任務隊列的線程和從任務隊列拿任務的線程*/
pthread_t managerID; //管理者線程
/*設置為指針的目的:工作線程有多個*/
pthread_t *threadIDs; //工作線程(也就是消費者)
/*對于工作線程我們要知道以下這幾個消息方便管理*/
int minNum; //最少的工作線程數
int maxNum; //最多的工作線程數
int busyNum; //正在工作的線程數,也就是正在獲取任務處理的線程
int liveNum; //存貨的工作線程數(也就是被喚醒的線程,卻沒有資格去獲取任務的線程)
int exitNum; //銷毀的工作線程數(因為可能工作線程存在,但是卻不工作,我們需要殺掉一些不必要的線程)
/* 由于任務隊列為臨界資源:
工作線程(消費者)可能有多個會同時競爭該資源
同時多生產者線程之間(也就是往任務隊列放任務的線程)也會競爭該資源
所以我們要保證互斥訪問線程池的任務隊列
*/
pthread_mutex_t mutexpool; //鎖整個線程池
pthread_mutex_t mutexbusyNum; //鎖增在工作線程的數量
/*由于任務隊列滿,或者為空:
生產者和消費者都需要阻塞
所以需要條件變量,來保證
*/
pthread_cond_t notFull; //判斷線程池是否為滿
pthread_cond_t notEmpty; //判斷線程池是否為空
/*輔助成員主要判斷該線程池是否還在工作*/
int shutdown; //判斷是否需要銷毀線程池,是0不銷毀,是1銷毀
};
線程池的頭文件聲明
#pragma once
#include < pthread.h >
#include < string.h >
#include < unistd.h >
#include < malloc.h >
#include< stdio.h >
typedef struct ThreadPool ThreadPool; //線程池結構體,這里聲明的原因是結構體定義在線程池源文件中
//創建線程池并初始化
ThreadPool* threadPoolCreate(int min,int max,int queueSize);
//銷毀線程池
int threadPoolDestroy(ThreadPool* pool);
//給線程池添加任務
void threadPoolAdd(ThreadPool* pool,void(*functions)(void*),void* args);
//獲取線程池工作線程的個數
int threadBusyNum (ThreadPool* pool);
//獲取線程池存活的線程的個數
int threadLiveNum (ThreadPool* pool);
//工作線程
void* worker (void* args);
//管理線程
void* manager (void* args);
//線程退出函數
void threadExit(ThreadPool* pool);
線程池的源文件
#include"thread_pool.h"
const int WORK_THREAD_NUMBER = 2; //管理者線程要添加的工作線程個數,和銷毀的線程個數
/*
線程池:首先要有個任務隊列,在C語言中,
任務隊列是需要自己定義的,C++中可以直接使用容器queue
*/
//任務隊列存放的任務就是一個函數指針
typedef struct Task
{
void (*function)(void *args);
void *args;
} Task;
//再搞出一個線程池
struct ThreadPool
{
Task *taskQ; //任務隊列
/*對于一個任務隊列:我們需要知道以下信息*/
int queueCapacity; //隊列的容量
int queueSize; //當前任務的個數
int queueFront; //隊頭取任務
int queueRear; //隊尾放任務
/*有了任務隊列后,還要有管理任務隊列的線程和從任務隊列拿任務的線程*/
pthread_t managerID; //管理者線程
/*設置為指針的目的:工作線程有多個*/
pthread_t *threadIDs; //工作線程(也就是消費者)
/*對于工作線程我們要知道以下這幾個消息方便管理*/
int minNum; //最少的工作線程數
int maxNum; //最多的工作線程數
int busyNum; //正在工作的線程數,也就是正在獲取任務處理的線程
int liveNum; //存貨的工作線程數(也就是被喚醒的線程,卻沒有資格去獲取任務的線程)
int exitNum; //銷毀的工作線程數(因為可能工作線程存在,但是卻不工作,我們需要殺掉一些不必要的線程)
/* 由于任務隊列為臨界資源:
工作線程(消費者)可能有多個會同時競爭該資源
同時多生產者線程之間(也就是往任務隊列放任務的線程)也會競爭該資源
所以我們要保證互斥訪問線程池的任務隊列
*/
pthread_mutex_t mutexpool; //鎖整個線程池
pthread_mutex_t mutexbusyNum; //鎖增在工作線程的數量
/*由于任務隊列滿,或者為空:
生產者和消費者都需要阻塞
所以需要條件變量,來保證
*/
pthread_cond_t notFull; //判斷線程池是否為滿
pthread_cond_t notEmpty; //判斷線程池是否為空
/*輔助成員主要判斷該線程池是否還在工作*/
int shutdown; //判斷是否需要銷毀線程池,是0不銷毀,是1銷毀
};
//************************************************************************************************
/*由于我們的線程池被創建出來時候,就必須保證存在的,
所以我們返回值要設計為指針類型,不能是賦值拷貝的形式
并且如何考慮線程池需要傳入什么參數初始化呢?
*/
ThreadPool *threadPoolCreate(int min, int max, int queueSize)
{
//先搞出一個線程池
ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
do // do while(0)的設計是為了,假設開辟線程池,消費者線程IDs,任務隊列空間失敗,可以直接跳出循環統一處理釋放空間
{
if (pool == NULL)
{
printf("malloc threadPool is failedn");
break;
}
//搞出線程池后開始初始化里面的數據成員
//首先先搞出消費者線程出來
pool- >threadIDs = (pthread_t *)malloc(sizeof(pthread_t) * max);
if (pool- >threadIDs == NULL)
{
printf("malloc threadIDs is failedn");
/*如果沒有do while(0)的設計,這里直接返回,那么前面的pool內存池的空間沒有被釋放,這就會內存泄漏了*/
// return NULL;
//基于上面的注釋考慮,這里設計break;退出dowhile(0)然后處理
break;
}
//初始化消費者線程ID
/*這么做的目的是:在管理者線程中可以通過判斷線程ID是否為0,來說明該消費者線程是否被占用*/
memset(pool- >threadIDs, 0, sizeof(pthread_t) * max);
//初始化線程池的其他成員屬性
pool- >minNum = min;
pool- >maxNum = max;
pool- >busyNum = 0;
pool- >liveNum = min;
pool- >exitNum = 0;
//初始化鎖和條件變量
if (pthread_mutex_init(&pool- >mutexpool, NULL) != 0 ||
pthread_mutex_init(&pool- >mutexpool, NULL) != 0 ||
pthread_cond_init(&pool- >notEmpty, NULL) != 0 ||
pthread_cond_init(&pool- >notFull, NULL) != 0)
{
perror("mutex or condition failed:");
}
//初始化任務隊列
pool- >taskQ = (Task *)malloc(sizeof(Task) * queueSize);
if (pool- >taskQ == NULL)
{
printf("malloc taskQ is failedn");
break;
}
pool- >queueCapacity = queueSize;
pool- >queueSize = 0;
pool- >queueFront = 0;
pool- >queueRear = 0;
//剛開始不關閉線程池
pool- >shutdown = 0;
//創建管理者線程和消費者線程
pthread_create(&pool- >managerID, NULL, manager, (void *)pool);
int i = 0;
for (; i < min; ++i)
{
/*消費線程需要消費的是任務,
也就是taskQ,而taskQ又是pool的一個成員屬性
所以傳參時候,我們傳入pool就可以獲得taskQ了
*/
pthread_create(&pool- >threadIDs[i], NULL, worker, (void *)pool);
}
//創建成功初始化后,那么就可以把線程池返回去了
return pool;
} while (0);
//如果break出來,那么就是異常的開辟空間失敗,要釋放資源
if (pool)
free(pool);
if (pool && pool- >threadIDs)
free(pool- >threadIDs);
if (pool && pool- >taskQ)
free(pool- >taskQ);
return NULL;
}
//判斷任務隊列是否為空
static int taskQIsEmpty(ThreadPool *pool)
{
return pool- >queueSize == 0;
}
//判斷線程池是否還工作
static int isShutDown(ThreadPool *pool)
{
return pool- >shutdown == 1 ? 1 : 0;
}
//消費者線程
void *worker(void *args)
{
ThreadPool *pool = (ThreadPool *)args;
/*設計為死循環是:消費者要不斷從任務隊列拿任務來處理*/
while (1)
{
pthread_mutex_lock(&pool- >mutexpool);
//消費數據之前,要判斷任務隊列是否為空,空就需要掛起該線程
while (taskQIsEmpty(pool) && !isShutDown(pool))
{
pthread_cond_wait(&pool- >notEmpty, &pool- >mutexpool);
//線程被喚醒后,判斷是否需要銷毀該線程,因為有線程是多余的
if (pool- >exitNum > 0)
{
pool- >exitNum--;
if (pool- >liveNum > pool- >minNum)
{
pool- >liveNum--;
pthread_mutex_unlock(&pool- >mutexpool); //退出線程前解鎖,防止死鎖問題
threadExit(pool);
}
}
}
//還需要判斷線程池是否關閉了,關閉了就退出消費者線程即可
if (isShutDown(pool))
{
pthread_mutex_unlock(&pool- >mutexpool);
threadExit(pool);
}
//開始消費者拿任務
Task task; //保存任務的變量
task.function = pool- >taskQ[pool- >queueFront].function; //獲取到任務隊列的任務,就是一個函數指針
task.args = pool- >taskQ[pool- >queueFront].args; //獲取任務隊列任務的函數指針參數
//控制任務隊列的指針移動
pool- >queueFront++;
pool- >queueFront %= pool- >queueCapacity;
pool- >queueSize--;
pthread_mutex_unlock(&pool- >mutexpool);
//喚醒生產者
pthread_cond_signal(&pool- >notFull);
//拿到任務后就是處理任務
// 1.處理任務前,先處理busyNum
pthread_mutex_lock(&pool- >mutexbusyNum);
pool- >busyNum++;
pthread_mutex_unlock(&pool- >mutexbusyNum);
// 2. 這里處理任務就是調用任務函數
task.function(task.args);
//任務處理完就釋放參數的空間
free(task.args);
task.args = NULL;
printf("thread %ld ending working ... n", pthread_self());
// 3.處理完任務對其busyNum操作
pthread_mutex_lock(&pool- >mutexbusyNum);
pool- >busyNum--;
pthread_mutex_unlock(&pool- >mutexbusyNum);
}
}
//管理者線程
/*
主要是管理創建線程和銷毀線程
*/
void *manager(void *args)
{
ThreadPool *pool = (ThreadPool *)args;
//只要線程池沒關閉,那么管理者線程就一直工作
while (!isShutDown(pool))
{
//自己定制的檢查策略:我設置每個三秒檢測
sleep(3);
//取出線程池任務的數量和消費者的工作線程數量
pthread_mutex_lock(&pool- >mutexpool);
int queueSize = pool- >queueSize;
int liveNum = pool- >liveNum;
pthread_mutex_unlock(&pool- >mutexpool);
//獲取忙的消費者線程數量
pthread_mutex_lock(&pool- >mutexbusyNum);
int busyNum = pool- >busyNum;
pthread_mutex_unlock(&pool- >mutexbusyNum);
//開始管理線程
// 1.添加消費者線程
/*制定添加規則(也是自己設定的)
任務的個數 > 存活的線程個數 && 存活的線程個數 < 最大的線程個數
*/
if (queueSize > liveNum && liveNum < pool- >maxNum)
{
pthread_mutex_lock(&pool- >mutexpool); //這個鎖主要是操作了liveNum這個資源
int counter = 0; // counter表示要添加的消費者線程數量
//遍歷 消費者線程IDs數組,看看哪個位置可以放入新添加的線程
int i = 0;
for (; i < pool- >maxNum &&
counter < WORK_THREAD_NUMBER &&
pool- >liveNum < pool- >maxNum;
i++)
{
//為0表示消費者線程數組的位置可以放入線程ID
if (pool- >threadIDs[i] == 0)
{
pthread_create(&pool- >threadIDs[i], NULL, worker, pool);
counter++;
liveNum++;
}
}
pthread_mutex_unlock(&pool- >mutexpool);
}
//由于線程過多,可能要進行銷毀
// 2. 銷毀消費者線程
/*
銷毀線程的策略:
存活的線程數量 >忙的線程數量*2 && 存活線程數量 >最小線程數量
*/
if (liveNum > busyNum * 2 && liveNum > pool- >minNum)
{
pthread_mutex_lock(&pool- >mutexpool);
pool- >exitNum = WORK_THREAD_NUMBER;
pthread_mutex_unlock(&pool- >mutexpool);
//讓工作者線程去自殺
/*如何讓他自殺呢?
由于線程池有多余的消費者線程不工作
我們可以通過喚醒消費者線程,讓他去自己消亡
*/
int i = 0;
for (; i < WORK_THREAD_NUMBER; i++)
{
pthread_cond_signal(&pool- >notEmpty);
}
}
}
}
//線程退出函數
void threadExit(ThreadPool *pool)
{
pthread_t tid = pthread_self();
int i = 0;
//遍歷消費者線程的線程個數,找到退出線程的ID
for (; i < pool- >maxNum; i++)
{
if (pool- >threadIDs[i] == tid)
{
pool- >threadIDs[i] = 0;
printf("threadExit()消費者線程 :%ld exit...n", tid);
break;
}
}
pthread_exit(NULL);
}
static int taskQisFull(ThreadPool* pool)
{
return pool- >queueCapacity == pool- >queueSize;
}
//給線程池添加任務
void threadPoolAdd(ThreadPool* pool,void(*function)(void*),void* args)
{
pthread_mutex_lock(&pool- >mutexpool);
//生產者線程:任務隊列滿要阻塞自己
while(taskQisFull(pool) && !isShutDown(pool))
{
pthread_cond_wait(&pool- >notFull,&pool- >mutexpool);
}
if(isShutDown(pool))
{
pthread_mutex_unlock(&pool- >mutexpool);
return ;
}
//添加任務
pool- >taskQ[pool- >queueRear].function = function;
pool- >taskQ[pool- >queueRear].args = args;
pool- >queueRear++;
pool- >queueRear %= pool- >queueCapacity;
pool- >queueSize++;
pthread_mutex_unlock(&pool- >mutexpool);
//喚醒work線程:
pthread_cond_signal(&pool- >notEmpty);
}
//獲取線程池工作線程的個數
int threadBusyNum (ThreadPool* pool)
{
pthread_mutex_lock(&pool- >mutexbusyNum);
int busyNum = pool- >busyNum;
pthread_mutex_unlock(&pool- >mutexbusyNum);
return busyNum;
}
//獲取線程池存活的線程的個數
int threadLiveNum (ThreadPool* pool)
{
pthread_mutex_lock(&pool- >mutexpool);
int liveNum = pool- >liveNum;
pthread_mutex_unlock(&pool- >mutexpool);
return liveNum;
}
//銷毀線程池
int threadPoolDestroy(ThreadPool* pool)
{
if(pool == NULL)
{
return -1;
}
//關閉線程池
pool- >shutdown = 1;
//喚醒阻塞的消費者
//存活的線程有多少就喚醒多少
int i = 0;
for(;i < pool- >liveNum;i++)
{
pthread_cond_signal(&pool- >notEmpty);
}
pthread_join(pool- >managerID,NULL);
//釋放資源
if(pool- >taskQ )
free(pool- >taskQ);
if(pool- >threadIDs)
free(pool- >threadIDs);
pthread_mutex_destroy(&pool- >mutexbusyNum);
pthread_mutex_destroy(&pool- >mutexpool);
pthread_cond_destroy(&pool- >notFull);
pthread_cond_destroy(&pool- >notEmpty);
free(pool);
pool = NULL;
return 0;
}
線程池測試代碼
#include"thread_pool.h"
//任務處理函數
void taskFunction(void* args)
{
int num = *(int*)args;
printf("thread: %ld is working,number:%dn",pthread_self(),num);
sleep(1);
}
int main()
{
//創建線程池
ThreadPool* pool = threadPoolCreate(3,10,20);
//往線程池里面放任務
int i = 0;
for(; i< 20; i++)
{
int *num = (int*)malloc(sizeof(int));
*num = i+1;
threadPoolAdd(pool,taskFunction,(void*)num);
}
sleep(10);
threadPoolDestroy(pool);
return 0;
}
測試線程池結果
由于我的測試代碼:只搞了3個工作線程(消費者線程),任務隊列大小為20,并且搞了20個任務隊列進去,所以線程池就會有三個工作線程在搶奪任務工作!
-
程序
+關注
關注
117文章
3815瀏覽量
82018 -
容器
+關注
關注
0文章
503瀏覽量
22303 -
線程池
+關注
關注
0文章
57瀏覽量
7032 -
數組
+關注
關注
1文章
419瀏覽量
26256
發布評論請先 登錄
相關推薦
跨平臺的線程池組件--TP組件
python創建線程池的兩種方法
線程池的線程怎么釋放

Spring 的線程池應用

評論