Logo ROOT   6.14/05
Reference Guide
TThreadPool.h
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Anar Manafov 20/09/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadPool
13 #define ROOT_TThreadPool
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TThreadPool //
18 // //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 // ROOT
23 #include "TObject.h"
24 #include "TMutex.h"
25 #include "TCondition.h"
26 #include "TThread.h"
27 // STD
28 #include <queue>
29 #include <vector>
30 #include <iostream>
31 #include <sstream>
32 #ifdef _MSC_VER
33 #define sleep(s) _sleep(s)
34 #endif
35 
36 
37 //////////////////////////////////////////////////////////////////////////
38 // //
39 // TNonCopyable //
40 // Class which makes child to be non-copyable object. //
41 // //
42 //////////////////////////////////////////////////////////////////////////
43 class TNonCopyable {
44 protected:
47 private:
48  TNonCopyable(const TNonCopyable&);
49  const TNonCopyable& operator=(const TNonCopyable&);
50 };
51 
52 //////////////////////////////////////////////////////////////////////////
53 // //
54 // TThreadPoolTaskImp //
55 // A base class for thread pool tasks. Users must inherit their //
56 // tasks classes from it. //
57 // Example: //
58 // class TTestTask: public TThreadPoolTaskImp<TTestTask, int> //
59 // //
60 // in this example, //
61 // TTestTask - is a user class, which implements //
62 // thread pool task object. //
63 // int - is a type of argument to TTestTask::run method. //
64 // //
65 // Please see the tutorial "tutorials/thread/threadPool.C" for //
66 // more details on how to use TThreadPool. //
67 // //
68 //////////////////////////////////////////////////////////////////////////
69 template <class aTask, class aParam>
71 public:
72  bool run(aParam &param) {
73  aTask *pThis = reinterpret_cast<aTask *>(this);
74  return pThis->runTask(param);
75  }
76 };
77 
78 //////////////////////////////////////////////////////////////////////////
79 // //
80 // TThreadPoolTask //
81 // This is a supporting class for TThreadPool. //
82 // It wraps users task objects in order to pass tasks arguments in //
83 // type-safe way. //
84 // //
85 //////////////////////////////////////////////////////////////////////////
86 template <class aTask, class aParam>
88 public:
90 
91 public:
92  TThreadPoolTask(task_t &task, aParam &param):
93  fTask(task),
94  fTaskParam(param) {
95  }
96  bool run() {
97  return fTask.run(fTaskParam);
98  }
99 
100 private:
101  task_t &fTask;
102  aParam fTaskParam;
103 };
104 
105 //////////////////////////////////////////////////////////////////////////
106 // //
107 // TThreadPool //
108 // This class implement a simple Thread Pool pattern. //
109 // So far it supports only one type of queue - FIFO //
110 // //
111 // Please see the tutorial "tutorials/thread/threadPool.C" for //
112 // more details on how to use TThreadPool. //
113 // //
114 //////////////////////////////////////////////////////////////////////////
115 template <class aTask, class aParam>
116 class TThreadPool : public TNonCopyable {
117 
119  typedef std::queue<task_t*> taskqueue_t;
120  typedef std::vector<TThread*> threads_array_t;
121 
122 public:
123  TThreadPool(size_t threadsCount, bool needDbg = false):
124  fStopped(false),
125  fSuccessfulTasks(0),
126  fTasksCount(0),
127  fIdleThreads(threadsCount),
128  fSilent(!needDbg) {
129  fThreadNeeded = new TCondition(&fMutex);
130  fThreadAvailable = new TCondition(&fMutex);
131  fAllTasksDone = new TCondition(&fMutexAllTasksDone);
132 
133  for (size_t i = 0; i < threadsCount; ++i) {
134  TThread *pThread = new TThread(&TThreadPool::Executor, this);
135  fThreads.push_back(pThread);
136  pThread->Run();
137  }
138 
139  fThreadJoinHelper = new TThread(&TThreadPool::JoinHelper, this);
140 
141  if (needDbg) {
142  fThreadMonitor = new TThread(&TThreadPool::Monitor, this);
143  fThreadMonitor->Run();
144  }
145  }
146 
148  Stop();
149  // deleting threads
150  threads_array_t::const_iterator iter = fThreads.begin();
151  threads_array_t::const_iterator iter_end = fThreads.end();
152  for (; iter != iter_end; ++iter)
153  delete(*iter);
154 
155  delete fThreadJoinHelper;
156 
157  delete fThreadNeeded;
158  delete fThreadAvailable;
159  delete fAllTasksDone;
160  }
161 
162  void AddThread() {
163  TLockGuard lock(&fMutex);
164  TThread *pThread = new TThread(&TThreadPool::Executor, this);
165  fThreads.push_back(pThread);
166  pThread->Run();
167  ++fIdleThreads;
168  }
169 
170  void PushTask(typename TThreadPoolTask<aTask, aParam>::task_t &task, aParam param) {
171  {
172  DbgLog("Main thread. Try to push a task");
173 
174  TLockGuard lock(&fMutex);
175  task_t *t = new task_t(task, param);
176  fTasks.push(t);
177  ++fTasksCount;
178 
179  DbgLog("Main thread. the task is pushed");
180  }
181  TLockGuard lock(&fMutex);
182  fThreadNeeded->Broadcast();
183  }
184 
185  void Stop(bool processRemainingJobs = false) {
186  // prevent more jobs from being added to the queue
187  if (fStopped)
188  return;
189 
190  if (processRemainingJobs) {
191  TLockGuard lock(&fMutex);
192  // wait for queue to drain
193  while (!fTasks.empty() && !fStopped) {
194  DbgLog("Main thread is waiting");
195  fThreadAvailable->Wait();
196  DbgLog("Main thread is DONE waiting");
197  }
198  }
199  // tell all threads to stop
200  {
201  TLockGuard lock(&fMutex);
202  fStopped = true;
203  fThreadNeeded->Broadcast();
204  DbgLog("Main threads requests to STOP");
205  }
206 
207  // Waiting for all threads to complete
208  fThreadJoinHelper->Run();
209  fThreadJoinHelper->Join();
210  }
211 
212  void Drain() {
213  // This method stops the calling thread until the task queue is empty
214 
215  TLockGuard lock(&fMutexAllTasksDone);
216  fAllTasksDone->Wait();
217  }
218 
219  size_t TasksCount() const {
220  return fTasksCount;
221  }
222 
223  size_t SuccessfulTasks() const {
224  return fSuccessfulTasks;
225  }
226 
227  size_t IdleThreads() const {
228  return fIdleThreads;
229  }
230 
231 private:
232  static void* Monitor(void *arg) {
233  if (NULL == arg)
234  return NULL;
235 
236  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
237  while (true && !pThis->fStopped) {
238  std::stringstream ss;
239  ss
240  << ">>>> Check for tasks."
241  << " Number of Tasks: " << pThis->fTasks.size()
242  << "; Idle threads: " << pThis->IdleThreads();
243  pThis->DbgLog(ss.str());
244  sleep(1);
245  }
246  return NULL;
247  }
248 
249  static void* Executor(void *arg) {
250  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
251 
252  while (!pThis->fStopped) {
253  task_t *task(NULL);
254 
255  // There is a task, let's take it
256  {
257  // Find a task to perform
258  TLockGuard lock(&pThis->fMutex);
259  if (pThis->fTasks.empty() && !pThis->fStopped) {
260  pThis->DbgLog("waiting for a task");
261 
262  if (pThis->fThreads.size() == pThis->fIdleThreads) {
264  pThis->fAllTasksDone->Broadcast();
265  }
266 
267  // No tasks, we wait for a task to come
268  pThis->fThreadNeeded->Wait();
269 
270  pThis->DbgLog("done waiting for tasks");
271  }
272  }
273 
274  {
275  TLockGuard lock(&pThis->fMutex);
276  if (!pThis->fTasks.empty()) {
277  --pThis->fIdleThreads;
278  task = pThis->fTasks.front();
279  pThis->fTasks.pop();
280 
281  pThis->DbgLog("get the task");
282  } else if (pThis->fThreads.size() == pThis->fIdleThreads) {
284  pThis->fAllTasksDone->Broadcast();
285  }
286  pThis->DbgLog("done Check <<<<");
287  }
288 
289  // Execute the task
290  if (task) {
291  pThis->DbgLog("Run the task");
292 
293  if (task->run()) {
294  TLockGuard lock(&pThis->fMutex);
295  ++pThis->fSuccessfulTasks;
296  }
297  delete task;
298  task = NULL;
299 
300  TLockGuard lock(&pThis->fMutex);
301  ++pThis->fIdleThreads;
302 
303  pThis->DbgLog("Done Running the task");
304  }
305  // Task is done, report that the thread is free
306  TLockGuard lock(&pThis->fMutex);
307  pThis->fThreadAvailable->Broadcast();
308  }
309 
310  pThis->DbgLog("**** DONE ***");
311  return NULL;
312  }
313 
314  static void *JoinHelper(void *arg) {
315  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
316  threads_array_t::const_iterator iter = pThis->fThreads.begin();
317  threads_array_t::const_iterator iter_end = pThis->fThreads.end();
318  for (; iter != iter_end; ++iter)
319  (*iter)->Join();
320 
321  return NULL;
322  }
323 
324  static bool IsThreadActive(TThread *pThread) {
325  // so far we consider only kRunningState as activity
326  return (pThread->GetState() == TThread::kRunningState);
327  }
328 
329  void DbgLog(const std::string &msg) {
330  if (fSilent)
331  return;
332  TLockGuard lock(&fDbgOutputMutex);
333  std::cout << "[" << TThread::SelfId() << "] " << msg << std::endl;
334  }
335 
336 private:
337  taskqueue_t fTasks;
343  threads_array_t fThreads;
346  volatile bool fStopped;
348  size_t fTasksCount;
349  size_t fIdleThreads;
351  bool fSilent; // No DBG messages
352 };
353 
354 #endif
Definition: TMutex.h:30
TCondition * fThreadNeeded
Definition: TThreadPool.h:339
TThreadPool(size_t threadsCount, bool needDbg=false)
Definition: TThreadPool.h:123
EState GetState() const
Definition: TThread.h:126
void Drain()
Definition: TThreadPool.h:212
TCondition * fThreadAvailable
Definition: TThreadPool.h:340
bool run(aParam &param)
Definition: TThreadPool.h:72
TCondition * fAllTasksDone
Definition: TThreadPool.h:342
static void * Executor(void *arg)
Definition: TThreadPool.h:249
task_t & fTask
Definition: TThreadPool.h:101
void AddThread()
Definition: TThreadPool.h:162
TThreadPoolTask< aTask, aParam > task_t
Definition: TThreadPool.h:118
TThreadPoolTaskImp< aTask, aParam > task_t
Definition: TThreadPool.h:89
static Long_t SelfId()
Static method returning the id for the current thread.
Definition: TThread.cxx:547
Int_t Broadcast()
Definition: TCondition.h:54
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:561
TMutex fDbgOutputMutex
Definition: TThreadPool.h:350
Int_t Wait()
Wait to be signaled.
Definition: TCondition.cxx:75
static void * JoinHelper(void *arg)
Definition: TThreadPool.h:314
TThread * fThreadMonitor
Definition: TThreadPool.h:345
size_t TasksCount() const
Definition: TThreadPool.h:219
void PushTask(typename TThreadPoolTask< aTask, aParam >::task_t &task, aParam param)
Definition: TThreadPool.h:170
TMutex fMutex
Definition: TThreadPool.h:338
const TNonCopyable & operator=(const TNonCopyable &)
threads_array_t fThreads
Definition: TThreadPool.h:343
static bool IsThreadActive(TThread *pThread)
Definition: TThreadPool.h:324
size_t fTasksCount
Definition: TThreadPool.h:348
std::vector< TThread * > threads_array_t
Definition: TThreadPool.h:120
std::queue< task_t * > taskqueue_t
Definition: TThreadPool.h:119
volatile bool fStopped
Definition: TThreadPool.h:346
taskqueue_t fTasks
Definition: TThreadPool.h:337
size_t IdleThreads() const
Definition: TThreadPool.h:227
static void * Monitor(void *arg)
Definition: TThreadPool.h:232
size_t fSuccessfulTasks
Definition: TThreadPool.h:347
TThread * fThreadJoinHelper
Definition: TThreadPool.h:344
TMutex fMutexAllTasksDone
Definition: TThreadPool.h:341
void DbgLog(const std::string &msg)
Definition: TThreadPool.h:329
R__EXTERN C unsigned int sleep(unsigned int seconds)
size_t SuccessfulTasks() const
Definition: TThreadPool.h:223
auto * l
Definition: textangle.C:4
void Stop(bool processRemainingJobs=false)
Definition: TThreadPool.h:185
size_t fIdleThreads
Definition: TThreadPool.h:349
TThreadPoolTask(task_t &task, aParam &param)
Definition: TThreadPool.h:92