Logo ROOT   6.18/05
Reference Guide
TThreadPool.h
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Author: Anar Manafov 20/09/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TThreadPool
13#define ROOT_TThreadPool
14
15//////////////////////////////////////////////////////////////////////////
16// //
17// TThreadPool //
18// //
19// //
20//////////////////////////////////////////////////////////////////////////
21
22// ROOT
23#include "TObject.h"
24#include "TMutex.h"
25#include "TCondition.h"
26#include "TThread.h"
27// STD
28#include <queue>
29#include <vector>
30#include <iostream>
31#include <sstream>
32#ifdef _MSC_VER
33#define sleep(s) _sleep(s)
34#endif
35
36
37//////////////////////////////////////////////////////////////////////////
38// //
39// TNonCopyable //
40// Class which makes child to be non-copyable object. //
41// //
42//////////////////////////////////////////////////////////////////////////
44protected:
47private:
50};
51
52//////////////////////////////////////////////////////////////////////////
53// //
54// TThreadPoolTaskImp //
55// A base class for thread pool tasks. Users must inherit their //
56// tasks classes from it. //
57// Example: //
58// class TTestTask: public TThreadPoolTaskImp<TTestTask, int> //
59// //
60// in this example, //
61// TTestTask - is a user class, which implements //
62// thread pool task object. //
63// int - is a type of argument to TTestTask::run method. //
64// //
65// Please see the tutorial "tutorials/thread/threadPool.C" for //
66// more details on how to use TThreadPool. //
67// //
68//////////////////////////////////////////////////////////////////////////
69template <class aTask, class aParam>
71public:
72 bool run(aParam &param) {
73 aTask *pThis = reinterpret_cast<aTask *>(this);
74 return pThis->runTask(param);
75 }
76};
77
78//////////////////////////////////////////////////////////////////////////
79// //
80// TThreadPoolTask //
81// This is a supporting class for TThreadPool. //
82// It wraps users task objects in order to pass tasks arguments in //
83// type-safe way. //
84// //
85//////////////////////////////////////////////////////////////////////////
86template <class aTask, class aParam>
88public:
90
91public:
92 TThreadPoolTask(task_t &task, aParam &param):
93 fTask(task),
94 fTaskParam(param) {
95 }
96 bool run() {
97 return fTask.run(fTaskParam);
98 }
99
100private:
103};
104
105//////////////////////////////////////////////////////////////////////////
106// //
107// TThreadPool //
108// This class implement a simple Thread Pool pattern. //
109// So far it supports only one type of queue - FIFO //
110// //
111// Please see the tutorial "tutorials/thread/threadPool.C" for //
112// more details on how to use TThreadPool. //
113// //
114//////////////////////////////////////////////////////////////////////////
115template <class aTask, class aParam>
116class TThreadPool : public TNonCopyable {
117
119 typedef std::queue<task_t*> taskqueue_t;
120 typedef std::vector<TThread*> threads_array_t;
121
122public:
123 TThreadPool(size_t threadsCount, bool needDbg = false):
124 fStopped(false),
126 fTasksCount(0),
127 fIdleThreads(threadsCount),
128 fSilent(!needDbg) {
132
133 for (size_t i = 0; i < threadsCount; ++i) {
134 TThread *pThread = new TThread(&TThreadPool::Executor, this);
135 fThreads.push_back(pThread);
136 pThread->Run();
137 }
138
140
141 if (needDbg) {
144 }
145 }
146
148 Stop();
149 // deleting threads
150 threads_array_t::const_iterator iter = fThreads.begin();
151 threads_array_t::const_iterator iter_end = fThreads.end();
152 for (; iter != iter_end; ++iter)
153 delete(*iter);
154
155 delete fThreadJoinHelper;
156
157 delete fThreadNeeded;
158 delete fThreadAvailable;
159 delete fAllTasksDone;
160 }
161
162 void AddThread() {
163 TLockGuard lock(&fMutex);
164 TThread *pThread = new TThread(&TThreadPool::Executor, this);
165 fThreads.push_back(pThread);
166 pThread->Run();
167 ++fIdleThreads;
168 }
169
170 void PushTask(typename TThreadPoolTask<aTask, aParam>::task_t &task, aParam param) {
171 {
172 DbgLog("Main thread. Try to push a task");
173
174 TLockGuard lock(&fMutex);
175 task_t *t = new task_t(task, param);
176 fTasks.push(t);
177 ++fTasksCount;
178
179 DbgLog("Main thread. the task is pushed");
180 }
181 TLockGuard lock(&fMutex);
183 }
184
185 void Stop(bool processRemainingJobs = false) {
186 // prevent more jobs from being added to the queue
187 if (fStopped)
188 return;
189
190 if (processRemainingJobs) {
191 TLockGuard lock(&fMutex);
192 // wait for queue to drain
193 while (!fTasks.empty() && !fStopped) {
194 DbgLog("Main thread is waiting");
196 DbgLog("Main thread is DONE waiting");
197 }
198 }
199 // tell all threads to stop
200 {
201 TLockGuard lock(&fMutex);
202 fStopped = true;
204 DbgLog("Main threads requests to STOP");
205 }
206
207 // Waiting for all threads to complete
210 }
211
212 void Drain() {
213 // This method stops the calling thread until the task queue is empty
214
217 }
218
219 size_t TasksCount() const {
220 return fTasksCount;
221 }
222
223 size_t SuccessfulTasks() const {
224 return fSuccessfulTasks;
225 }
226
227 size_t IdleThreads() const {
228 return fIdleThreads;
229 }
230
231private:
232 static void* Monitor(void *arg) {
233 if (NULL == arg)
234 return NULL;
235
236 TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
237 while (true && !pThis->fStopped) {
238 std::stringstream ss;
239 ss
240 << ">>>> Check for tasks."
241 << " Number of Tasks: " << pThis->fTasks.size()
242 << "; Idle threads: " << pThis->IdleThreads();
243 pThis->DbgLog(ss.str());
244 sleep(1);
245 }
246 return NULL;
247 }
248
249 static void* Executor(void *arg) {
250 TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
251
252 while (!pThis->fStopped) {
253 task_t *task(NULL);
254
255 // There is a task, let's take it
256 {
257 // Find a task to perform
258 TLockGuard lock(&pThis->fMutex);
259 if (pThis->fTasks.empty() && !pThis->fStopped) {
260 pThis->DbgLog("waiting for a task");
261
262 if (pThis->fThreads.size() == pThis->fIdleThreads) {
264 pThis->fAllTasksDone->Broadcast();
265 }
266
267 // No tasks, we wait for a task to come
268 pThis->fThreadNeeded->Wait();
269
270 pThis->DbgLog("done waiting for tasks");
271 }
272 }
273
274 {
275 TLockGuard lock(&pThis->fMutex);
276 if (!pThis->fTasks.empty()) {
277 --pThis->fIdleThreads;
278 task = pThis->fTasks.front();
279 pThis->fTasks.pop();
280
281 pThis->DbgLog("get the task");
282 } else if (pThis->fThreads.size() == pThis->fIdleThreads) {
284 pThis->fAllTasksDone->Broadcast();
285 }
286 pThis->DbgLog("done Check <<<<");
287 }
288
289 // Execute the task
290 if (task) {
291 pThis->DbgLog("Run the task");
292
293 if (task->run()) {
294 TLockGuard lock(&pThis->fMutex);
295 ++pThis->fSuccessfulTasks;
296 }
297 delete task;
298 task = NULL;
299
300 TLockGuard lock(&pThis->fMutex);
301 ++pThis->fIdleThreads;
302
303 pThis->DbgLog("Done Running the task");
304 }
305 // Task is done, report that the thread is free
306 TLockGuard lock(&pThis->fMutex);
307 pThis->fThreadAvailable->Broadcast();
308 }
309
310 pThis->DbgLog("**** DONE ***");
311 return NULL;
312 }
313
314 static void *JoinHelper(void *arg) {
315 TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
316 threads_array_t::const_iterator iter = pThis->fThreads.begin();
317 threads_array_t::const_iterator iter_end = pThis->fThreads.end();
318 for (; iter != iter_end; ++iter)
319 (*iter)->Join();
320
321 return NULL;
322 }
323
324 static bool IsThreadActive(TThread *pThread) {
325 // so far we consider only kRunningState as activity
326 return (pThread->GetState() == TThread::kRunningState);
327 }
328
329 void DbgLog(const std::string &msg) {
330 if (fSilent)
331 return;
333 std::cout << "[" << TThread::SelfId() << "] " << msg << std::endl;
334 }
335
336private:
346 volatile bool fStopped;
351 bool fSilent; // No DBG messages
352};
353
354#endif
R__EXTERN C unsigned int sleep(unsigned int seconds)
Int_t Broadcast()
Definition: TCondition.h:54
Int_t Wait()
Wait to be signaled.
Definition: TCondition.cxx:75
Definition: TMutex.h:30
const TNonCopyable & operator=(const TNonCopyable &)
TNonCopyable(const TNonCopyable &)
bool run(aParam &param)
Definition: TThreadPool.h:72
TThreadPoolTask(task_t &task, aParam &param)
Definition: TThreadPool.h:92
task_t & fTask
Definition: TThreadPool.h:101
TThreadPoolTaskImp< aTask, aParam > task_t
Definition: TThreadPool.h:89
TThreadPool(size_t threadsCount, bool needDbg=false)
Definition: TThreadPool.h:123
TThreadPoolTask< aTask, aParam > task_t
Definition: TThreadPool.h:118
TCondition * fThreadAvailable
Definition: TThreadPool.h:340
size_t fSuccessfulTasks
Definition: TThreadPool.h:347
threads_array_t fThreads
Definition: TThreadPool.h:343
size_t TasksCount() const
Definition: TThreadPool.h:219
TMutex fMutexAllTasksDone
Definition: TThreadPool.h:341
void Stop(bool processRemainingJobs=false)
Definition: TThreadPool.h:185
TThread * fThreadJoinHelper
Definition: TThreadPool.h:344
TCondition * fAllTasksDone
Definition: TThreadPool.h:342
static void * Executor(void *arg)
Definition: TThreadPool.h:249
size_t fIdleThreads
Definition: TThreadPool.h:349
void Drain()
Definition: TThreadPool.h:212
static bool IsThreadActive(TThread *pThread)
Definition: TThreadPool.h:324
static void * JoinHelper(void *arg)
Definition: TThreadPool.h:314
void DbgLog(const std::string &msg)
Definition: TThreadPool.h:329
static void * Monitor(void *arg)
Definition: TThreadPool.h:232
TCondition * fThreadNeeded
Definition: TThreadPool.h:339
TMutex fMutex
Definition: TThreadPool.h:338
size_t fTasksCount
Definition: TThreadPool.h:348
TMutex fDbgOutputMutex
Definition: TThreadPool.h:350
void AddThread()
Definition: TThreadPool.h:162
size_t IdleThreads() const
Definition: TThreadPool.h:227
void PushTask(typename TThreadPoolTask< aTask, aParam >::task_t &task, aParam param)
Definition: TThreadPool.h:170
taskqueue_t fTasks
Definition: TThreadPool.h:337
TThread * fThreadMonitor
Definition: TThreadPool.h:345
volatile bool fStopped
Definition: TThreadPool.h:346
size_t SuccessfulTasks() const
Definition: TThreadPool.h:223
std::vector< TThread * > threads_array_t
Definition: TThreadPool.h:120
std::queue< task_t * > taskqueue_t
Definition: TThreadPool.h:119
EState GetState() const
Definition: TThread.h:126
static Long_t SelfId()
Static method returning the id for the current thread.
Definition: TThread.cxx:547
Long_t Join(void **ret=0)
Join this thread.
Definition: TThread.cxx:508
@ kRunningState
Definition: TThread.h:61
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:561
auto * l
Definition: textangle.C:4