1. 태스크 노드
typedef void (*cb_fun)(void *); //任务结构体 typedef struct task { void *argv; //任务函数的参数(任务执行结束前,要保证参数地址有效) cb_fun handler; //任务函数(返回值必须为0 非0值用作增加线程,和销毁线程池) struct task *next; //任务链指针 }zoey_task_t;
Handler는 실제 태스크 함수인 함수 포인터, argv는 함수의 매개변수, next는 다음 태스크를 가리킨다.
2. Task Queue
typedef struct task_queue { zoey_task_t *head; //队列头 zoey_task_t **tail; //队列尾 unsigned int maxtasknum; //最大任务限制 unsigned int curtasknum; //当前任务数 }zoey_task_queue_t;
Head는 작업 큐의 헤드 포인터, tail은 작업 큐의 tail 포인터, maxtasknum은 큐의 최대 작업 수 제한, curtasknum은 현재 대기열에 있는 작업 수입니다.
3. 스레드 풀
typedef struct threadpool { pthread_mutex_t mutex; //互斥锁 pthread_cond_t cond; //条件锁 zoey_task_queue_t tasks;//任务队列 unsigned int threadnum; //线程数 unsigned int thread_stack_size; //线程堆栈大小 }zoey_threadpool_t;
mutex는 뮤텍스 잠금이고 cond는 조건부 잠금입니다. Mutex와 cond는 함께 수신하거나 추가할 스레드 풀 작업의 상호 배제를 보장합니다.
작업은 작업 대기열을 가리킵니다.
Threadnum은 스레드 풀에 있는 스레드 수
Thread_stack_size는 스레드 스택 크기
4. 시작 구성
//配置参数 typedef struct threadpool_conf { unsigned int threadnum; //线程数 unsigned int thread_stack_size;//线程堆栈大小 unsigned int maxtasknum;//最大任务限制 }zoey_threadpool_conf_t;
시작 구성 구조는 스레드 풀을 초기화할 때의 일부 매개변수입니다.
5. 스레드 풀 초기화
먼저 매개변수가 올바른지 확인한 후 mutex, cond, key(pthread_key_t)를 초기화합니다. 키는 스레드 전역 변수를 읽고 쓰는 데 사용됩니다. 이 전역 변수는 스레드 종료 여부를 제어합니다.
드디어 스레드를 생성합니다.
zoey_threadpool_t* zoey_threadpool_init(zoey_threadpool_conf_t *conf) { zoey_threadpool_t *pool = null; int error_flag_mutex = 0; int error_flag_cond = 0; pthread_attr_t attr; do{ if (z_conf_check(conf) == -1){ //检查参数是否合法 break; } pool = (zoey_threadpool_t *)malloc(sizeof(zoey_threadpool_t));//申请线程池句柄 if (pool == null){ break; } //初始化线程池基本参数 pool->threadnum = conf->threadnum; pool->thread_stack_size = conf->thread_stack_size; pool->tasks.maxtasknum = conf->maxtasknum; pool->tasks.curtasknum = 0; z_task_queue_init(&pool->tasks); if (z_thread_key_create() != 0){//创建一个pthread_key_t,用以访问线程全局变量。 free(pool); break; } if (z_thread_mutex_create(&pool->mutex) != 0){ //初始化互斥锁 z_thread_key_destroy(); free(pool); break; } if (z_thread_cond_create(&pool->cond) != 0){ //初始化条件锁 z_thread_key_destroy(); z_thread_mutex_destroy(&pool->mutex); free(pool); break; } if (z_threadpool_create(pool) != 0){ //创建线程池 z_thread_key_destroy(); z_thread_mutex_destroy(&pool->mutex); z_thread_cond_destroy(&pool->cond); free(pool); break; } return pool; }while(0); return null; }
6. 작업 추가
먼저 작업 노드를 신청하고 인스턴스화 후 작업 대기열에 노드를 추가하고 ++ 현재 작업 대기열 번호를 추가하고 다른 프로세스에 새 작업을 알립니다. 전체 프로세스가 잠겨 있습니다.
int zoey_threadpool_add_task(zoey_threadpool_t *pool, cb_fun handler, void* argv) { zoey_task_t *task = null; //申请一个任务节点并赋值 task = (zoey_task_t *)malloc(sizeof(zoey_task_t)); if (task == null){ return -1; } task->handler = handler; task->argv = argv; task->next = null; if (pthread_mutex_lock(&pool->mutex) != 0){ //加锁 free(task); return -1; } do{ if (pool->tasks.curtasknum >= pool->tasks.maxtasknum){//判断工作队列中的任务数是否达到限制 break; } //将任务节点尾插到任务队列 *(pool->tasks.tail) = task; pool->tasks.tail = &task->next; pool->tasks.curtasknum++; //通知阻塞的线程 if (pthread_cond_signal(&pool->cond) != 0){ break; } //解锁 pthread_mutex_unlock(&pool->mutex); return 0; }while(0); pthread_mutex_unlock(&pool->mutex); free(task); return -1; }
7. 스레드 풀 파괴
스레드 풀을 파괴한다는 것은 실제로 작업 큐에 작업을 추가하는 것이지만 추가된 작업은 스레드를 종료시키는 것입니다. z_threadpool_exit_cb 함수는 잠금을 0으로 설정한 다음 스레드를 종료합니다. 잠금 0은 이 스레드가 종료된 후 다음 스레드를 종료함을 의미합니다. 스레드를 종료하면 모든 리소스가 해제됩니다.
void zoey_threadpool_destroy(zoey_threadpool_t *pool) { unsigned int n = 0; volatile unsigned int lock; //z_threadpool_exit_cb函数会使对应线程退出 for (; n < pool->threadnum; n++){ lock = 1; if (zoey_threadpool_add_task(pool, z_threadpool_exit_cb, &lock) != 0){ return; } while (lock){ usleep(1); } } z_thread_mutex_destroy(&pool->mutex); z_thread_cond_destroy(&pool->cond); z_thread_key_destroy(); free(pool); }8. 스레드 추가하기
매우 간단합니다. 다른 스레드와 스레드 수를 생성하면 됩니다++. 잠그다.
int zoey_thread_add(zoey_threadpool_t *pool) { int ret = 0; if (pthread_mutex_lock(&pool->mutex) != 0){ return -1; } ret = z_thread_add(pool); pthread_mutex_unlock(&pool->mutex); return ret; }9. 작업 대기열의 최대 작업 제한을 변경합니다
num=0일 때 스레드 수를 무한으로 설정합니다.
void zoey_set_max_tasknum(zoey_threadpool_t *pool,unsigned int num) { if (pthread_mutex_lock(&pool->mutex) != 0){ return -1; } z_change_maxtask_num(pool, num); //改变最大任务限制 pthread_mutex_unlock(&pool->mutex); }10. 사용예제
int main()
{
int array[10000] = {0};
int i = 0;
zoey_threadpool_conf_t conf = {5,0,5}; //实例化启动参数
zoey_threadpool_t *pool = zoey_threadpool_init(&conf);//初始化线程池
if (pool == null){
return 0;
}
for (; i < 10000; i++){
array[i] = i;
if (i == 80){
zoey_thread_add(pool); //增加线程
zoey_thread_add(pool);
}
if (i == 100){
zoey_set_max_tasknum(pool, 0); //改变最大任务数 0为不做上限
}
while(1){
if (zoey_threadpool_add_task(pool, testfun, &array[i]) == 0){
break;
}
printf("error in i = %d\n",i);
}
}
zoey_threadpool_destroy(pool);
while(1){
sleep(5);
}
return 0;
}
위 내용은 nginx 스레드 풀의 소스 코드는 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!