关键词搜索

源码搜索 ×
×

tars源码分析之13

发布2022-07-10浏览788次

详情内容

线程池是很典型,之前去阿里面试,就让写了一个线程池,来看看:

  1. #include "util/tc_thread_pool.h"
  2. #include "util/tc_common.h"
  3. #include <iostream>
  4. namespace tars
  5. {
  6. TC_ThreadPool::ThreadWorker::ThreadWorker(TC_ThreadPool *tpool)
  7. : _tpool(tpool)
  8. , _bTerminate(false)
  9. {
  10. }
  11. void TC_ThreadPool::ThreadWorker::terminate()
  12. {
  13. _bTerminate = true;
  14. _tpool->notifyT();
  15. }
  16. void TC_ThreadPool::ThreadWorker::run()
  17. {
  18. //调用初始化部分
  19. auto pst = _tpool->get();
  20. if(pst)
  21. {
  22. try
  23. {
  24. pst();
  25. }
  26. catch ( ... )
  27. {
  28. }
  29. }
  30. //调用处理部分
  31. while (!_bTerminate)
  32. {
  33. auto pfw = _tpool->get(this);
  34. if(pfw)
  35. {
  36. try
  37. {
  38. pfw();
  39. }
  40. catch ( ... )
  41. {
  42. }
  43. _tpool->idle(this);
  44. }
  45. }
  46. //结束
  47. _tpool->exit();
  48. }
  49. //
  50. //
  51. //
  52. TC_ThreadPool::KeyInitialize TC_ThreadPool::g_key_initialize;
  53. pthread_key_t TC_ThreadPool::g_key;
  54. void TC_ThreadPool::destructor(void *p)
  55. {
  56. ThreadData *ttd = (ThreadData*)p;
  57. delete ttd;
  58. }
  59. void TC_ThreadPool::exit()
  60. {
  61. TC_ThreadPool::ThreadData *p = getThreadData();
  62. if(p)
  63. {
  64. delete p;
  65. int ret = pthread_setspecific(g_key, NULL);
  66. if(ret != 0)
  67. {
  68. throw TC_ThreadPool_Exception("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
  69. }
  70. }
  71. _jobqueue.clear();
  72. }
  73. void TC_ThreadPool::setThreadData(TC_ThreadPool::ThreadData *p)
  74. {
  75. TC_ThreadPool::ThreadData *pOld = getThreadData();
  76. if(pOld != NULL && pOld != p)
  77. {
  78. delete pOld;
  79. }
  80. int ret = pthread_setspecific(g_key, (void *)p);
  81. if(ret != 0)
  82. {
  83. throw TC_ThreadPool_Exception("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
  84. }
  85. }
  86. TC_ThreadPool::ThreadData* TC_ThreadPool::getThreadData()
  87. {
  88. return (ThreadData *)pthread_getspecific(g_key);
  89. }
  90. void TC_ThreadPool::setThreadData(pthread_key_t pkey, ThreadData *p)
  91. {
  92. TC_ThreadPool::ThreadData *pOld = getThreadData(pkey);
  93. if(pOld != NULL && pOld != p)
  94. {
  95. delete pOld;
  96. }
  97. int ret = pthread_setspecific(pkey, (void *)p);
  98. if(ret != 0)
  99. {
  100. throw TC_ThreadPool_Exception("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
  101. }
  102. }
  103. TC_ThreadPool::ThreadData* TC_ThreadPool::getThreadData(pthread_key_t pkey)
  104. {
  105. return (ThreadData *)pthread_getspecific(pkey);
  106. }
  107. TC_ThreadPool::TC_ThreadPool()
  108. : _bAllDone(true)
  109. {
  110. }
  111. TC_ThreadPool::~TC_ThreadPool()
  112. {
  113. stop();
  114. clear();
  115. }
  116. void TC_ThreadPool::clear()
  117. {
  118. std::vector<ThreadWorker*>::iterator it = _jobthread.begin();
  119. while(it != _jobthread.end())
  120. {
  121. delete (*it);
  122. ++it;
  123. }
  124. _jobthread.clear();
  125. _busthread.clear();
  126. }
  127. void TC_ThreadPool::init(size_t num)
  128. {
  129. stop();
  130. Lock sync(*this);
  131. clear();
  132. for(size_t i = 0; i < num; i++)
  133. {
  134. _jobthread.push_back(new ThreadWorker(this));
  135. }
  136. }
  137. void TC_ThreadPool::stop()
  138. {
  139. Lock sync(*this);
  140. std::vector<ThreadWorker*>::iterator it = _jobthread.begin();
  141. while(it != _jobthread.end())
  142. {
  143. if ((*it)->isAlive())
  144. {
  145. (*it)->terminate();
  146. (*it)->getThreadControl().join();
  147. }
  148. ++it;
  149. }
  150. _bAllDone = true;
  151. }
  152. void TC_ThreadPool::start()
  153. {
  154. Lock sync(*this);
  155. std::vector<ThreadWorker*>::iterator it = _jobthread.begin();
  156. while(it != _jobthread.end())
  157. {
  158. (*it)->start();
  159. ++it;
  160. }
  161. _bAllDone = false;
  162. }
  163. bool TC_ThreadPool::finish()
  164. {
  165. return _startqueue.empty() && _jobqueue.empty() && _busthread.empty() && _bAllDone;
  166. }
  167. bool TC_ThreadPool::waitForAllDone(int millsecond)
  168. {
  169. Lock sync(_tmutex);
  170. start1:
  171. //任务队列和繁忙线程都是空的
  172. if (finish())
  173. {
  174. return true;
  175. }
  176. //永远等待
  177. if(millsecond < 0)
  178. {
  179. _tmutex.timedWait(1000);
  180. goto start1;
  181. }
  182. int64_t iNow= TC_Common::now2ms();
  183. int m = millsecond;
  184. start2:
  185. bool b = _tmutex.timedWait(millsecond);
  186. //完成处理了
  187. if(finish())
  188. {
  189. return true;
  190. }
  191. if(!b)
  192. {
  193. return false;
  194. }
  195. millsecond = max((int64_t)0, m - (TC_Common::now2ms() - iNow));
  196. goto start2;
  197. return false;
  198. }
  199. std::function<void ()> TC_ThreadPool::get(ThreadWorker *ptw)
  200. {
  201. std::function<void ()> res;
  202. if(!_jobqueue.pop_front(res, 1000))
  203. {
  204. return NULL;
  205. }
  206. {
  207. Lock sync(_tmutex);
  208. _busthread.insert(ptw);
  209. }
  210. return res;
  211. }
  212. std::function<void ()> TC_ThreadPool::get()
  213. {
  214. std::function<void ()> res;
  215. if(!_startqueue.pop_front(res))
  216. {
  217. return NULL;
  218. }
  219. return res;
  220. }
  221. void TC_ThreadPool::idle(ThreadWorker *ptw)
  222. {
  223. Lock sync(_tmutex);
  224. _busthread.erase(ptw);
  225. //无繁忙线程, 通知等待在线程池结束的线程醒过来
  226. if(_busthread.empty())
  227. {
  228. _bAllDone = true;
  229. _tmutex.notifyAll();
  230. }
  231. }
  232. void TC_ThreadPool::notifyT()
  233. {
  234. _jobqueue.notifyT();
  235. }
  236. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载