简易版线程池实现
/*
* my_threadpool.cpp
*
* Created on: 2021年12月5日
* Author: LENOVO
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <queue>
using namespace std;
#include <pthread.h>
//工作线程
typedef struct NWORKER
{
pthread_t id; //线程id
int number; //按线程创建顺序分配一个编号
int terminate; //线程终止标记
struct NTHREADPOOL *pool;
} nworker_t;
//任务队列
typedef struct NJOB
{
void (*job_func)(struct NJOB *job);
void *user_data;
} nJob_t;
//线程池
queue<nJob_t*> job_que; //任务队列
typedef struct NTHREADPOOL
{
//queue<nJob_t*> worker_que; //任务队列
pthread_cond_t cond;
pthread_mutex_t mtx;
} nthreadPool_t;
void *thread_callback(void *arg)
{
nworker_t *worker = (nworker_t *)arg;
while(1)
{
pthread_mutex_lock(&worker->pool->mtx);
printf("pthread_mutex_lock\n");
while(job_que.empty())
{
if(worker->terminate) break;
pthread_cond_wait(&worker->pool->cond,&worker->pool->mtx);
}
if(worker->terminate)
{
pthread_mutex_lock(&worker->pool->mtx);
break;
}
nJob_t* job = job_que.front();
if(job)
{
job_que.pop();
}
pthread_mutex_unlock(&worker->pool->mtx);
if(job == NULL) continue;
job->job_func(job);
}
free(worker);
return NULL;
}
int thread_pool_create(nthreadPool_t *pool,int num_thread)
{
if(NULL == pool)
{
printf("pool is NULL");
return -1;
}
if(num_thread < 1) num_thread = 1;
memset(pool,0,sizeof(nthreadPool_t));
// cond
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->cond,&blank_cond,sizeof(pthread_cond_t));
// mutex
pthread_mutex_t blank_mtx = PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->mtx,&blank_mtx,sizeof(pthread_mutex_t));
int idx = 0;
for(idx = 0; idx < num_thread; idx++)
{
nworker_t *worker = (nworker_t *)malloc(sizeof(nworker_t));
if(NULL == worker)
{
perror("nworker_t _malloc");
return idx;
}
memset(worker,0,sizeof(nworker_t));
worker->pool = pool;
int ret = pthread_create(&worker->id,NULL,thread_callback,worker);
if(ret)
{
perror("pthread_create");
free(worker);
return idx;
}
worker->number = idx;
}
return idx;
}
void thread_pool_push_job(nJob_t* job,nthreadPool_t *pool)
{
pthread_mutex_lock(&pool->mtx);
job_que.push(job);
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mtx);
}
#if 1
#define TASK_COUNT 1000
void counter(nJob_t* job)
{
if(NULL == job) return;
int idx = *(int*)job->user_data;
printf("job idx : %05d,selfid:%lu\n",idx,pthread_self());
free(job->user_data);
free(job);
}
int main()
{
nthreadPool_t pool = {0};
int num_thread = 10;
int ret = thread_pool_create(&pool,num_thread);
printf("pthread_num = %d\n",ret);
//
for(int i = 0; i < TASK_COUNT; i++)
{
nJob_t* job = (nJob_t*)malloc(sizeof(nJob_t));
if(NULL == job) exit(1);
job->job_func = counter;
job->user_data = malloc(sizeof(int));
*(int *)job->user_data = i;
thread_pool_push_job(job,&pool);
}
getchar();
}
#endif
刚开始在结构体中定义了一个queue,后来push时一直段错误
原因参考此博客 https://blog.csdn.net/weixin_44928892/article/details/109504171
原因:
先说结论:该段错误是由于当前Vector容器大小已达到容器容量上限,再次给容器添加元素导致内存空间不足,从而触发段错误。
1、当一个Vector容器被定义时,系统会给Vector分配一定的内存空间(Capacity),当Vector中元素不断增加至将要超过该容量时,
系统会分配一个更大的内存空间,在将之前的元素复制到新空间中,在新空间中继续添加元素。
2、结构体的数据成员是储存在一段连续的内存空间中的,且该空间大小通常是无法改变的。
当Vector被定义在结构体中时,遇到容量不足的情况,无法开辟新内存空间以存储新元素,便会导致内存不足,程序中断。