你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

线程池实现

2021/12/6 0:45:15

简易版线程池实现

/*
 * my_threadpool.cpp
 *
 *  Created on: 2021年12月5日
 *      Author: LENOVO
 */

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <queue>

using namespace std;

#include <pthread.h>

//工作线程
typedef struct NWORKER
{
	pthread_t id;  //线程id
	int number;		//按线程创建顺序分配一个编号
	int terminate; //线程终止标记

	struct NTHREADPOOL *pool;

} nworker_t;

//任务队列
typedef struct NJOB
{
	void (*job_func)(struct NJOB *job);
	void *user_data;

} nJob_t;
//线程池

queue<nJob_t*> job_que; //任务队列
typedef struct NTHREADPOOL
{
	//queue<nJob_t*> worker_que; //任务队列

	pthread_cond_t cond;
	pthread_mutex_t mtx;


} nthreadPool_t;



void *thread_callback(void *arg)
{
	nworker_t *worker = (nworker_t *)arg;

	while(1)
	{
		pthread_mutex_lock(&worker->pool->mtx);
		printf("pthread_mutex_lock\n");
		while(job_que.empty())
		{
			if(worker->terminate) break;

			pthread_cond_wait(&worker->pool->cond,&worker->pool->mtx);

		}

		if(worker->terminate)
		{
			pthread_mutex_lock(&worker->pool->mtx);
			break;
		}

		nJob_t* job = job_que.front();
		if(job)
		{
			job_que.pop();
		}
		pthread_mutex_unlock(&worker->pool->mtx);

		if(job == NULL) continue;


		job->job_func(job);
	}

	free(worker);
	return NULL;
}

int thread_pool_create(nthreadPool_t *pool,int num_thread)
{
	if(NULL == pool)
	{
		printf("pool is NULL");
		return -1;
	}

	if(num_thread < 1) num_thread = 1;
	memset(pool,0,sizeof(nthreadPool_t));

//	cond
	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
	memcpy(&pool->cond,&blank_cond,sizeof(pthread_cond_t));

//	mutex
	pthread_mutex_t blank_mtx = PTHREAD_MUTEX_INITIALIZER;
	memcpy(&pool->mtx,&blank_mtx,sizeof(pthread_mutex_t));

	int idx = 0;
	for(idx = 0; idx < num_thread; idx++)
	{
		nworker_t *worker = (nworker_t *)malloc(sizeof(nworker_t));
		if(NULL == worker)
		{
			perror("nworker_t _malloc");
			return idx;
		}
		memset(worker,0,sizeof(nworker_t));

		worker->pool = pool;

		int ret = pthread_create(&worker->id,NULL,thread_callback,worker);
		if(ret)
		{
			perror("pthread_create");
			free(worker);
			return idx;
		}
		worker->number = idx;
	}
	return idx;
}

void thread_pool_push_job(nJob_t* job,nthreadPool_t *pool)
{
	pthread_mutex_lock(&pool->mtx);
	job_que.push(job);
	pthread_cond_signal(&pool->cond);
	pthread_mutex_unlock(&pool->mtx);
}

#if 1

#define TASK_COUNT		1000

void counter(nJob_t* job)
{

	if(NULL == job) return;
	int idx = *(int*)job->user_data;

	printf("job idx : %05d,selfid:%lu\n",idx,pthread_self());

	free(job->user_data);
	free(job);
}

int main()
{
	nthreadPool_t pool = {0};
	int num_thread = 10;

	int ret = thread_pool_create(&pool,num_thread);
	printf("pthread_num = %d\n",ret);
//
	for(int i = 0; i < TASK_COUNT; i++)
	{
		nJob_t* job = (nJob_t*)malloc(sizeof(nJob_t));
		if(NULL == job) exit(1);

		job->job_func = counter;
		job->user_data = malloc(sizeof(int));
		*(int *)job->user_data = i;

		thread_pool_push_job(job,&pool);

	}
	getchar();
}

#endif

刚开始在结构体中定义了一个queue,后来push时一直段错误
原因参考此博客 https://blog.csdn.net/weixin_44928892/article/details/109504171
原因:
先说结论:该段错误是由于当前Vector容器大小已达到容器容量上限,再次给容器添加元素导致内存空间不足,从而触发段错误。

1、当一个Vector容器被定义时,系统会给Vector分配一定的内存空间(Capacity),当Vector中元素不断增加至将要超过该容量时,

系统会分配一个更大的内存空间,在将之前的元素复制到新空间中,在新空间中继续添加元素。

2、结构体的数据成员是储存在一段连续的内存空间中的,且该空间大小通常是无法改变的。

当Vector被定义在结构体中时,遇到容量不足的情况,无法开辟新内存空间以存储新元素,便会导致内存不足,程序中断。