Logo ROOT   6.10/09
Reference Guide
TThreadPool.h
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Anar Manafov 20/09/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadPool
13 #define ROOT_TThreadPool
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TThreadPool //
18 // //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 // ROOT
23 #include "TObject.h"
24 #include "TMutex.h"
25 #include "TCondition.h"
26 #include "TThread.h"
27 // STD
28 #include <queue>
29 #include <vector>
30 #include <iostream>
31 #include <sstream>
32 
33 
34 //////////////////////////////////////////////////////////////////////////
35 // //
36 // TNonCopyable //
37 // Class which makes child to be non-copyable object. //
38 // //
39 //////////////////////////////////////////////////////////////////////////
40 class TNonCopyable {
41 protected:
44 private:
45  TNonCopyable(const TNonCopyable&);
46  const TNonCopyable& operator=(const TNonCopyable&);
47 };
48 
49 //////////////////////////////////////////////////////////////////////////
50 // //
51 // TThreadPoolTaskImp //
52 // A base class for thread pool tasks. Users must inherit their //
53 // tasks classes from it. //
54 // Example: //
55 // class TTestTask: public TThreadPoolTaskImp<TTestTask, int> //
56 // //
57 // in this example, //
58 // TTestTask - is a user class, which implements //
59 // thread pool task object. //
60 // int - is a type of argument to TTestTask::run method. //
61 // //
62 // Please see the tutorial "tutorials/thread/threadPool.C" for //
63 // more details on how to use TThreadPool. //
64 // //
65 //////////////////////////////////////////////////////////////////////////
66 template <class aTask, class aParam>
68 public:
69  bool run(aParam &param) {
70  aTask *pThis = reinterpret_cast<aTask *>(this);
71  return pThis->runTask(param);
72  }
73 };
74 
75 //////////////////////////////////////////////////////////////////////////
76 // //
77 // TThreadPoolTask //
78 // This is a supporting class for TThreadPool. //
79 // It wraps users task objects in order to pass tasks arguments in //
80 // type-safe way. //
81 // //
82 //////////////////////////////////////////////////////////////////////////
83 template <class aTask, class aParam>
85 public:
87 
88 public:
89  TThreadPoolTask(task_t &task, aParam &param):
90  fTask(task),
91  fTaskParam(param) {
92  }
93  bool run() {
94  return fTask.run(fTaskParam);
95  }
96 
97 private:
98  task_t &fTask;
99  aParam fTaskParam;
100 };
101 
102 //////////////////////////////////////////////////////////////////////////
103 // //
104 // TThreadPool //
105 // This class implement a simple Thread Pool pattern. //
106 // So far it supports only one type of queue - FIFO //
107 // //
108 // Please see the tutorial "tutorials/thread/threadPool.C" for //
109 // more details on how to use TThreadPool. //
110 // //
111 //////////////////////////////////////////////////////////////////////////
112 template <class aTask, class aParam>
113 class TThreadPool : public TNonCopyable {
114 
116  typedef std::queue<task_t*> taskqueue_t;
117  typedef std::vector<TThread*> threads_array_t;
118 
119 public:
120  TThreadPool(size_t threadsCount, bool needDbg = false):
121  fStopped(false),
122  fSuccessfulTasks(0),
123  fTasksCount(0),
124  fIdleThreads(threadsCount),
125  fSilent(!needDbg) {
126  fThreadNeeded = new TCondition(&fMutex);
127  fThreadAvailable = new TCondition(&fMutex);
128  fAllTasksDone = new TCondition(&fMutexAllTasksDone);
129 
130  for (size_t i = 0; i < threadsCount; ++i) {
131  TThread *pThread = new TThread(&TThreadPool::Executor, this);
132  fThreads.push_back(pThread);
133  pThread->Run();
134  }
135 
136  fThreadJoinHelper = new TThread(&TThreadPool::JoinHelper, this);
137 
138  if (needDbg) {
139  fThreadMonitor = new TThread(&TThreadPool::Monitor, this);
140  fThreadMonitor->Run();
141  }
142  }
143 
145  Stop();
146  // deleting threads
147  threads_array_t::const_iterator iter = fThreads.begin();
148  threads_array_t::const_iterator iter_end = fThreads.end();
149  for (; iter != iter_end; ++iter)
150  delete(*iter);
151 
152  delete fThreadJoinHelper;
153 
154  delete fThreadNeeded;
155  delete fThreadAvailable;
156  delete fAllTasksDone;
157  }
158 
159  void AddThread() {
160  TLockGuard lock(&fMutex);
161  TThread *pThread = new TThread(&TThreadPool::Executor, this);
162  fThreads.push_back(pThread);
163  pThread->Run();
164  ++fIdleThreads;
165  }
166 
167  void PushTask(typename TThreadPoolTask<aTask, aParam>::task_t &task, aParam param) {
168  {
169  DbgLog("Main thread. Try to push a task");
170 
171  TLockGuard lock(&fMutex);
172  task_t *t = new task_t(task, param);
173  fTasks.push(t);
174  ++fTasksCount;
175 
176  DbgLog("Main thread. the task is pushed");
177  }
178  TLockGuard lock(&fMutex);
179  fThreadNeeded->Broadcast();
180  }
181 
182  void Stop(bool processRemainingJobs = false) {
183  // prevent more jobs from being added to the queue
184  if (fStopped)
185  return;
186 
187  if (processRemainingJobs) {
188  TLockGuard lock(&fMutex);
189  // wait for queue to drain
190  while (!fTasks.empty() && !fStopped) {
191  DbgLog("Main thread is waiting");
192  fThreadAvailable->Wait();
193  DbgLog("Main thread is DONE waiting");
194  }
195  }
196  // tell all threads to stop
197  {
198  TLockGuard lock(&fMutex);
199  fStopped = true;
200  fThreadNeeded->Broadcast();
201  DbgLog("Main threads requests to STOP");
202  }
203 
204  // Waiting for all threads to complete
205  fThreadJoinHelper->Run();
206  fThreadJoinHelper->Join();
207  }
208 
209  void Drain() {
210  // This method stops the calling thread until the task queue is empty
211 
212  TLockGuard lock(&fMutexAllTasksDone);
213  fAllTasksDone->Wait();
214  }
215 
216  size_t TasksCount() const {
217  return fTasksCount;
218  }
219 
220  size_t SuccessfulTasks() const {
221  return fSuccessfulTasks;
222  }
223 
224  size_t IdleThreads() const {
225  return fIdleThreads;
226  }
227 
228 private:
229  static void* Monitor(void *arg) {
230  if (NULL == arg)
231  return NULL;
232 
233  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
234  while (true && !pThis->fStopped) {
235  std::stringstream ss;
236  ss
237  << ">>>> Check for tasks."
238  << " Number of Tasks: " << pThis->fTasks.size()
239  << "; Idle threads: " << pThis->IdleThreads();
240  pThis->DbgLog(ss.str());
241  sleep(1);
242  }
243  return NULL;
244  }
245 
246  static void* Executor(void *arg) {
247  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
248 
249  while (!pThis->fStopped) {
250  task_t *task(NULL);
251 
252  // There is a task, let's take it
253  {
254  // Find a task to perform
255  TLockGuard lock(&pThis->fMutex);
256  if (pThis->fTasks.empty() && !pThis->fStopped) {
257  pThis->DbgLog("waiting for a task");
258 
259  if (pThis->fThreads.size() == pThis->fIdleThreads) {
261  pThis->fAllTasksDone->Broadcast();
262  }
263 
264  // No tasks, we wait for a task to come
265  pThis->fThreadNeeded->Wait();
266 
267  pThis->DbgLog("done waiting for tasks");
268  }
269  }
270 
271  {
272  TLockGuard lock(&pThis->fMutex);
273  if (!pThis->fTasks.empty()) {
274  --pThis->fIdleThreads;
275  task = pThis->fTasks.front();
276  pThis->fTasks.pop();
277 
278  pThis->DbgLog("get the task");
279  } else if (pThis->fThreads.size() == pThis->fIdleThreads) {
281  pThis->fAllTasksDone->Broadcast();
282  }
283  pThis->DbgLog("done Check <<<<");
284  }
285 
286  // Execute the task
287  if (task) {
288  pThis->DbgLog("Run the task");
289 
290  if (task->run()) {
291  TLockGuard lock(&pThis->fMutex);
292  ++pThis->fSuccessfulTasks;
293  }
294  delete task;
295  task = NULL;
296 
297  TLockGuard lock(&pThis->fMutex);
298  ++pThis->fIdleThreads;
299 
300  pThis->DbgLog("Done Running the task");
301  }
302  // Task is done, report that the thread is free
303  TLockGuard lock(&pThis->fMutex);
304  pThis->fThreadAvailable->Broadcast();
305  }
306 
307  pThis->DbgLog("**** DONE ***");
308  return NULL;
309  }
310 
311  static void *JoinHelper(void *arg) {
312  TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
313  threads_array_t::const_iterator iter = pThis->fThreads.begin();
314  threads_array_t::const_iterator iter_end = pThis->fThreads.end();
315  for (; iter != iter_end; ++iter)
316  (*iter)->Join();
317 
318  return NULL;
319  }
320 
321  static bool IsThreadActive(TThread *pThread) {
322  // so far we consider only kRunningState as activity
323  return (pThread->GetState() == TThread::kRunningState);
324  }
325 
326  void DbgLog(const std::string &msg) {
327  if (fSilent)
328  return;
329  TLockGuard lock(&fDbgOutputMutex);
330  std::cout << "[" << TThread::SelfId() << "] " << msg << std::endl;
331  }
332 
333 private:
334  taskqueue_t fTasks;
340  threads_array_t fThreads;
343  volatile bool fStopped;
345  size_t fTasksCount;
346  size_t fIdleThreads;
348  bool fSilent; // No DBG messages
349 };
350 
351 #endif
Definition: TMutex.h:30
TCondition * fThreadNeeded
Definition: TThreadPool.h:336
TThreadPool(size_t threadsCount, bool needDbg=false)
Definition: TThreadPool.h:120
EState GetState() const
Definition: TThread.h:126
void Drain()
Definition: TThreadPool.h:209
TCondition * fThreadAvailable
Definition: TThreadPool.h:337
bool run(aParam &param)
Definition: TThreadPool.h:69
TCondition * fAllTasksDone
Definition: TThreadPool.h:339
static void * Executor(void *arg)
Definition: TThreadPool.h:246
task_t & fTask
Definition: TThreadPool.h:98
#define NULL
Definition: RtypesCore.h:88
void AddThread()
Definition: TThreadPool.h:159
TThreadPoolTask< aTask, aParam > task_t
Definition: TThreadPool.h:115
TThreadPoolTaskImp< aTask, aParam > task_t
Definition: TThreadPool.h:86
static Long_t SelfId()
Static method returning the id for the current thread.
Definition: TThread.cxx:538
Int_t Broadcast()
Definition: TCondition.h:54
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:552
TMutex fDbgOutputMutex
Definition: TThreadPool.h:347
Int_t Wait()
Wait to be signaled.
Definition: TCondition.cxx:75
aParam fTaskParam
Definition: TThreadPool.h:99
static void * JoinHelper(void *arg)
Definition: TThreadPool.h:311
TThread * fThreadMonitor
Definition: TThreadPool.h:342
size_t TasksCount() const
Definition: TThreadPool.h:216
void PushTask(typename TThreadPoolTask< aTask, aParam >::task_t &task, aParam param)
Definition: TThreadPool.h:167
TMutex fMutex
Definition: TThreadPool.h:335
const TNonCopyable & operator=(const TNonCopyable &)
threads_array_t fThreads
Definition: TThreadPool.h:340
static bool IsThreadActive(TThread *pThread)
Definition: TThreadPool.h:321
size_t fTasksCount
Definition: TThreadPool.h:345
std::vector< TThread * > threads_array_t
Definition: TThreadPool.h:117
TLine * l
Definition: textangle.C:4
std::queue< task_t * > taskqueue_t
Definition: TThreadPool.h:116
volatile bool fStopped
Definition: TThreadPool.h:343
taskqueue_t fTasks
Definition: TThreadPool.h:334
size_t IdleThreads() const
Definition: TThreadPool.h:224
static void * Monitor(void *arg)
Definition: TThreadPool.h:229
size_t fSuccessfulTasks
Definition: TThreadPool.h:344
TThread * fThreadJoinHelper
Definition: TThreadPool.h:341
TMutex fMutexAllTasksDone
Definition: TThreadPool.h:338
void DbgLog(const std::string &msg)
Definition: TThreadPool.h:326
R__EXTERN C unsigned int sleep(unsigned int seconds)
size_t SuccessfulTasks() const
Definition: TThreadPool.h:220
void Stop(bool processRemainingJobs=false)
Definition: TThreadPool.h:182
size_t fIdleThreads
Definition: TThreadPool.h:346
TThreadPoolTask(task_t &task, aParam &param)
Definition: TThreadPool.h:89