c++实现简单的线程池

2025-05-29 0 80

c++线程池,继承CDoit,实现其中的start和end

头文件

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96
/*

* 多线程管理类

*

*/

#ifndef CTHREADPOOLMANAGE_H

#define CTHREADPOOLMANAGE_H

#include <iostream>

#include <pthread.h>

#include <unistd.h>

#include <list>

#include <vector>

#include <time.h>

#include <asm/errno.h>

#define USLEEP_TIME 100

#define CHECK_TIME 1

using namespace std;

class CDoit

{

public:

virtual int start(void *){};

virtual int end(){};

};

class CthreadPoolManage

{

private:

int _minThreads; //最少保留几个线程

int _maxThreads; //最多可以有几个线程

int _waitSec; //空闲多少秒后将线程关闭

class threadInfo{

public:

threadInfo(){

isbusy = false;

doFlag = true;

}

//

pthread_mutex_t mtx=PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t cond=PTHREAD_COND_INITIALIZER;

bool isbusy; //是否空闲

bool doFlag;

//

time_t beginTime; //线程不工作开始时间

pthread_t cThreadPid; //线程id

pthread_attr_t cThreadAttr; //线程属性

CDoit * doit; //任务类

void * value; //需要传递的值

};

//线程函数

static void* startThread(void*);

//任务队列锁

pthread_mutex_t _duty_mutex;

//任务队列

list<threadInfo*> _dutyList;

//线程队列锁

pthread_mutex_t _thread_mutex;

//线程队列

list<threadInfo*> _threadList;

///初始化,创建最小个数线程///

void initThread();

///任务分配线程///

static void* taskAllocation(void*arg);

pthread_t tasktPid;

///线程销毁、状态检查线程///

static void* checkThread(void* arg);

pthread_t checktPid;

bool checkrun;

//线程异常退出清理

static void threadCleanUp(void* arg);

//

int addThread(list<threadInfo*> *plist,threadInfo* ptinfo);

public:

CthreadPoolManage();

/*

保留的最少线程,最多线程数,空闲多久销毁,保留几个线程的冗余

*/

CthreadPoolManage(int min,int max,int waitSec);

~CthreadPoolManage();

int start();

//任务注入器

int putDuty(CDoit *,void *);

int getNowThreadNum();

};

#endif // CTHREADPOOLMANAGE_H

CPP文件

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240
/*

* 线程池,线程管理类

*

*/

#include "cthreadpoolmanage.h"

CthreadPoolManage::CthreadPoolManage()

{

_minThreads = 5; //最少保留几个线程

_maxThreads = 5; //最多可以有几个线程

_waitSec = 10; //空闲多少秒后将线程关闭

pthread_mutex_init(&_duty_mutex, NULL);

pthread_mutex_init(&_thread_mutex, NULL);

checkrun = true;

}

CthreadPoolManage::CthreadPoolManage(int min, int max, int waitSec)

{

CthreadPoolManage();

_minThreads = min; //最少保留几个线程

_maxThreads = max; //最多可以有几个线程

_waitSec = waitSec; //空闲多少秒后将线程关闭

}

CthreadPoolManage::~CthreadPoolManage()

{

}

void CthreadPoolManage::threadCleanUp(void* arg)

{

threadInfo* tinfo = (threadInfo*)arg;

tinfo->isbusy = false;

pthread_mutex_unlock(&tinfo->mtx);

pthread_attr_destroy (&tinfo->cThreadAttr);

delete tinfo;

}

void* CthreadPoolManage::startThread(void* arg)

{

cout<<"线程开始工作"<<endl;

threadInfo* tinfo = (threadInfo*)arg;

pthread_cleanup_push(threadCleanUp,arg);

while(tinfo->doFlag){

pthread_mutex_lock(&tinfo->mtx);

if(tinfo->doit == NULL)

{

cout<<"开始等待任务"<<endl;

pthread_cond_wait(&tinfo->cond,&tinfo->mtx);

cout<<"有任务了"<<endl;

}

tinfo->isbusy = true;

tinfo->doit->start(tinfo->value);

tinfo->doit->end();

tinfo->doit=NULL;

tinfo->isbusy = false;

time( &tinfo->beginTime);

pthread_mutex_unlock(&tinfo->mtx);

}

//0正常执行到这儿不执行清理函数,异常会执行

pthread_cleanup_pop(0);

pthread_attr_destroy (&tinfo->cThreadAttr);

delete tinfo;

cout<<"线程结束"<<endl;

}

void CthreadPoolManage::initThread()

{

int i = 0;

for(i = 0;i<this->_minThreads;i++)

{

threadInfo *tinfo = new threadInfo;

tinfo->doit = NULL;

tinfo->value = NULL;

tinfo->isbusy = false;

tinfo->doFlag = true;

// PTHREAD_CREATE_DETACHED (分离线程) 和 PTHREAD _CREATE_JOINABLE (非分离线程)

pthread_attr_init(&tinfo->cThreadAttr);

pthread_attr_setdetachstate(&tinfo->cThreadAttr,PTHREAD_CREATE_DETACHED );

cout<<"初始化了一个线程"<<endl;

if(pthread_create(&tinfo->cThreadPid,&tinfo->cThreadAttr,startThread,(void *)tinfo) != 0)

{

cout<<"创建线程失败"<<endl;

break;

}

this->_threadList.push_back(tinfo);

}

}

int CthreadPoolManage::addThread(std::list< CthreadPoolManage::threadInfo* >* plist, CthreadPoolManage::threadInfo* ptinfo)

{

threadInfo *tinfo = new threadInfo;

tinfo->doit = ptinfo->doit;

tinfo->value = ptinfo->value;

tinfo->isbusy = true;

if(pthread_create(&tinfo->cThreadPid,NULL,startThread,(void *)tinfo) != 0)

{

cout<<"创建线程失败"<<endl;

return -1;

}

plist->push_back(tinfo);

return 0;

}

int CthreadPoolManage::putDuty(CDoit* doit, void* value)

{

threadInfo *tinfo = new threadInfo;

time( &tinfo->beginTime);

tinfo->doit= doit;

tinfo->value = value;

pthread_mutex_lock(&_duty_mutex);

this->_dutyList.push_back(tinfo);

pthread_mutex_unlock(&_duty_mutex);

return 0;

}

void* CthreadPoolManage::taskAllocation(void*arg)

{

CthreadPoolManage * ptmanage = (CthreadPoolManage*)arg;

int size_1 = 0;

int size_2 = 0;

int i_1 = 0;

int i_2 = 0;

bool a_1 = true;

bool a_2 = true;

threadInfo* ptinfo;

threadInfo* ptinfoTmp;

while(true){

size_1 = 0;

size_2 = 0;

pthread_mutex_lock(&ptmanage->_duty_mutex);

pthread_mutex_lock(&ptmanage->_thread_mutex);

size_1 = ptmanage->_dutyList.size();

size_2 =ptmanage->_threadList.size();

for(list<threadInfo*>::iterator itorti1 = ptmanage->_dutyList.begin();itorti1 !=ptmanage->_dutyList.end();)

{

ptinfo = *itorti1;

a_1 = true;

for(list<threadInfo*>::iterator itorti2 = ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();itorti2++){

ptinfoTmp = *itorti2;

if(EBUSY == pthread_mutex_trylock(&ptinfoTmp->mtx))

{

continue;

}

if(!ptinfoTmp->isbusy)

{

ptinfoTmp->doit = ptinfo->doit;

ptinfoTmp->value = ptinfo->value;

ptinfoTmp->isbusy = true;

pthread_cond_signal(&ptinfoTmp->cond);

pthread_mutex_unlock(&ptinfoTmp->mtx);

a_1 = false;

delete ptinfo;

break;

}

pthread_mutex_unlock(&ptinfoTmp->mtx);

}

if(a_1){

if(ptmanage->_threadList.size()>ptmanage->_maxThreads||ptmanage->addThread(&ptmanage->_threadList,ptinfo)!=0)

{

itorti1++;

continue;

}else{

itorti1 = ptmanage->_dutyList.erase(itorti1);

}

delete ptinfo;

}else{

itorti1 = ptmanage->_dutyList.erase(itorti1);

}

}

pthread_mutex_unlock(&ptmanage->_duty_mutex);

pthread_mutex_unlock(&ptmanage->_thread_mutex);

usleep(USLEEP_TIME);

}

return 0;

}

void* CthreadPoolManage::checkThread(void* arg)

{

CthreadPoolManage * ptmanage = (CthreadPoolManage*)arg;

threadInfo* ptinfo;

time_t nowtime;

while(ptmanage->checkrun){

sleep(CHECK_TIME);

pthread_mutex_lock(&ptmanage->_thread_mutex);

if(ptmanage->_threadList.size()<=ptmanage->_minThreads)

{

continue;

}

for(list<threadInfo*>::iterator itorti2 = ptmanage->_threadList.begin();itorti2!=ptmanage->_threadList.end();){

ptinfo = *itorti2;

if(EBUSY == pthread_mutex_trylock(&ptinfo->mtx))

{

itorti2++;

continue;

}

time(&nowtime);

if(ptinfo->isbusy == false && nowtime-ptinfo->beginTime>ptmanage->_waitSec)

{

ptinfo->doFlag = false;

itorti2 = ptmanage->_threadList.erase(itorti2);

}else{

itorti2++;

}

pthread_mutex_unlock(&ptinfo->mtx);

}

pthread_mutex_unlock(&ptmanage->_thread_mutex);

}

}

int CthreadPoolManage::start()

{

//初始化

this->initThread();

//启动任务分配线程

if(pthread_create(&tasktPid,NULL,taskAllocation,(void *)this) != 0)

{

cout<<"创建任务分配线程失败"<<endl;

return -1;

}

//创建现程状态分配管理线程

if(pthread_create(&checktPid,NULL,checkThread,(void *)this) != 0)

{

cout<<"创建线程状态分配管理线程失败"<<endl;

return -1;

}

return 0;

}

///////////////////////////////

int CthreadPoolManage::getNowThreadNum()

{

int num = 0;

pthread_mutex_lock(&this->_thread_mutex);

num = this->_threadList.size();

pthread_mutex_unlock(&this->_thread_mutex);

return num ;

}

以上所述就是本文的全部内容了,希望大家能够喜欢。

请您花一点时间将文章分享给您的朋友或者留下评论。我们将会由衷感谢您的支持!

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 c++实现简单的线程池 https://www.kuaiidc.com/107537.html

相关文章

发表评论
暂无评论