Loading [MathJax]/extensions/tex2jax.js
Logo ROOT  
Reference Guide
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
TThreadPool.h
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Author: Anar Manafov 20/09/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TThreadPool
13#define ROOT_TThreadPool
14
15//////////////////////////////////////////////////////////////////////////
16// //
17// TThreadPool //
18// //
19// //
20//////////////////////////////////////////////////////////////////////////
21
22// ROOT
23#include "TObject.h"
24#include "TMutex.h"
25#include "TCondition.h"
26#include "TThread.h"
27// STD
28#include <queue>
29#include <vector>
30#include <iostream>
31#include <sstream>
32#ifdef _MSC_VER
33#define sleep(s) _sleep(1000 * (s)) // _sleep() takes milliseconds; scale so sleep(s) keeps its seconds meaning
34#else
35#include <unistd.h>
36#endif
37
38
39//////////////////////////////////////////////////////////////////////////
40// //
41// TNonCopyable //
42// Base class that makes any class derived from it non-copyable. //
43// //
44//////////////////////////////////////////////////////////////////////////
46protected:
49private:
52};
53
54//////////////////////////////////////////////////////////////////////////
55// //
56// TThreadPoolTaskImp //
57// A base class for thread pool tasks. Users must derive their //
58// task classes from it. //
59// Example: //
60// class TTestTask: public TThreadPoolTaskImp<TTestTask, int> //
61// //
62// in this example, //
63// TTestTask - is a user class, which implements //
64// thread pool task object. //
65// int - is a type of argument to TTestTask::run method. //
66// //
67// Please see the tutorial "tutorials/thread/threadPool.C" for //
68// more details on how to use TThreadPool. //
69// //
70//////////////////////////////////////////////////////////////////////////
71template <class aTask, class aParam>
73public:
74 bool run(aParam &param) { // forwards param to aTask::runTask() and returns its result
75 aTask *pThis = static_cast<aTask *>(this); // CRTP downcast: static_cast adjusts the pointer when this base is not at offset 0 and fails to compile if aTask does not derive from this class
76 return pThis->runTask(param);
77 }
78};
79
80//////////////////////////////////////////////////////////////////////////
81// //
82// TThreadPoolTask //
83// This is a supporting class for TThreadPool. //
84// It wraps user task objects in order to pass task arguments in a //
85// type-safe way. //
86// //
87//////////////////////////////////////////////////////////////////////////
88template <class aTask, class aParam>
90public:
92
93public:
94 TThreadPoolTask(task_t &task, aParam &param): // pairs a task with the argument that run() later hands to it
95 fTask(task), // fTask is declared as task_t&, so the caller's task object must outlive this wrapper
96 fTaskParam(param) { // NOTE(review): fTaskParam's declaration is not visible here - confirm whether param is copied or referenced
97 }
98 bool run() { // executes the wrapped task with the stored argument
99 return fTask.run(fTaskParam); // the task's own result is returned unchanged
100 }
101
102private:
105};
106
107//////////////////////////////////////////////////////////////////////////
108// //
109// TThreadPool //
110// This class implements a simple Thread Pool pattern. //
111// So far it supports only one type of queue: FIFO. //
112// //
113// Please see the tutorial "tutorials/thread/threadPool.C" for //
114// more details on how to use TThreadPool. //
115// //
116//////////////////////////////////////////////////////////////////////////
117template <class aTask, class aParam>
118class TThreadPool : public TNonCopyable {
119
121 typedef std::queue<task_t*> taskqueue_t;
122 typedef std::vector<TThread*> threads_array_t;
123
124public:
125 TThreadPool(size_t threadsCount, bool needDbg = false):
126 fStopped(false),
128 fTasksCount(0),
129 fIdleThreads(threadsCount),
130 fSilent(!needDbg) {
134
135 for (size_t i = 0; i < threadsCount; ++i) {
136 TThread *pThread = new TThread(&TThreadPool::Executor, this);
137 fThreads.push_back(pThread);
138 pThread->Run();
139 }
140
142
143 if (needDbg) {
146 }
147 }
148
150 Stop();
151 // deleting threads
152 threads_array_t::const_iterator iter = fThreads.begin();
153 threads_array_t::const_iterator iter_end = fThreads.end();
154 for (; iter != iter_end; ++iter)
155 delete(*iter);
156
157 delete fThreadJoinHelper;
158
159 delete fThreadNeeded;
160 delete fThreadAvailable;
161 delete fAllTasksDone;
162 }
163
164 void AddThread() { // grows the pool by one worker thread
165 TLockGuard lock(&fMutex); // workers read fThreads.size() and fIdleThreads while holding fMutex
166 TThread *pThread = new TThread(&TThreadPool::Executor, this); // worker entry point is Executor, with this pool as its argument
167 fThreads.push_back(pThread); // entries of fThreads are deleted by the destructor
168 pThread->Run();
169 ++fIdleThreads; // the new worker counts as idle until it takes a task
170 }
171
172 void PushTask(typename TThreadPoolTask<aTask, aParam>::task_t &task, aParam param) {
173 {
174 DbgLog("Main thread. Try to push a task");
175
176 TLockGuard lock(&fMutex);
177 task_t *t = new task_t(task, param);
178 fTasks.push(t);
179 ++fTasksCount;
180
181 DbgLog("Main thread. the task is pushed");
182 }
183 TLockGuard lock(&fMutex);
185 }
186
187 void Stop(bool processRemainingJobs = false) {
188 // prevent more jobs from being added to the queue
189 if (fStopped)
190 return;
191
192 if (processRemainingJobs) {
193 TLockGuard lock(&fMutex);
194 // wait for queue to drain
195 while (!fTasks.empty() && !fStopped) {
196 DbgLog("Main thread is waiting");
198 DbgLog("Main thread is DONE waiting");
199 }
200 }
201 // tell all threads to stop
202 {
203 TLockGuard lock(&fMutex);
204 fStopped = true;
206 DbgLog("Main threads requests to STOP");
207 }
208
209 // Waiting for all threads to complete
212 }
213
214 void Drain() {
215 // This method stops the calling thread until the task queue is empty
216
219 }
220
221 size_t TasksCount() const { // returns fTasksCount, which PushTask increments once per queued task
222 return fTasksCount; // NOTE(review): read without taking fMutex
223 }
224
225 size_t SuccessfulTasks() const { // returns fSuccessfulTasks, which Executor increments each time a task's run() returns true
226 return fSuccessfulTasks; // NOTE(review): read without taking fMutex
227 }
228
229 size_t IdleThreads() const { // returns fIdleThreads: starts at the constructor's threadsCount, decremented when a worker takes a task, incremented when it finishes
230 return fIdleThreads; // NOTE(review): read without taking fMutex
231 }
232
233private:
234 static void* Monitor(void *arg) { // thread body: repeatedly logs queue length and idle-thread count, pausing with sleep(1) between reports, until fStopped is set
235 if (NULL == arg)
236 return NULL; // no pool to monitor
237 TThreadPool *pThis = static_cast<TThreadPool*>(arg); // arg is expected to be the pool itself
238 while (!pThis->fStopped) {
239 std::stringstream ss;
240 {
241 TLockGuard lock(&pThis->fMutex); // fTasks and fIdleThreads are modified by other threads under fMutex, so read them under it too
242 ss << ">>>> Check for tasks." << " Number of Tasks: " << pThis->fTasks.size()
243 << "; Idle threads: " << pThis->IdleThreads();
244 } // fMutex is released before logging and sleeping
245 pThis->DbgLog(ss.str());
246 sleep(1);
247 }
248 return NULL;
249 }
250
251 static void* Executor(void *arg) {
252 TThreadPool *pThis = reinterpret_cast<TThreadPool*>(arg);
253
254 while (!pThis->fStopped) {
255 task_t *task(NULL);
256
257 // There is a task, let's take it
258 {
259 // Find a task to perform
260 TLockGuard lock(&pThis->fMutex);
261 if (pThis->fTasks.empty() && !pThis->fStopped) {
262 pThis->DbgLog("waiting for a task");
263
264 if (pThis->fThreads.size() == pThis->fIdleThreads) {
266 pThis->fAllTasksDone->Broadcast();
267 }
268
269 // No tasks, we wait for a task to come
270 pThis->fThreadNeeded->Wait();
271
272 pThis->DbgLog("done waiting for tasks");
273 }
274 }
275
276 {
277 TLockGuard lock(&pThis->fMutex);
278 if (!pThis->fTasks.empty()) {
279 --pThis->fIdleThreads;
280 task = pThis->fTasks.front();
281 pThis->fTasks.pop();
282
283 pThis->DbgLog("get the task");
284 } else if (pThis->fThreads.size() == pThis->fIdleThreads) {
286 pThis->fAllTasksDone->Broadcast();
287 }
288 pThis->DbgLog("done Check <<<<");
289 }
290
291 // Execute the task
292 if (task) {
293 pThis->DbgLog("Run the task");
294
295 if (task->run()) {
296 TLockGuard lock(&pThis->fMutex);
297 ++pThis->fSuccessfulTasks;
298 }
299 delete task;
300 task = NULL;
301
302 TLockGuard lock(&pThis->fMutex);
303 ++pThis->fIdleThreads;
304
305 pThis->DbgLog("Done Running the task");
306 }
307 // Task is done, report that the thread is free
308 TLockGuard lock(&pThis->fMutex);
309 pThis->fThreadAvailable->Broadcast();
310 }
311
312 pThis->DbgLog("**** DONE ***");
313 return NULL;
314 }
315
316 static void *JoinHelper(void *arg) { // thread body that blocks until every thread in fThreads has been joined
317 TThreadPool *pool = static_cast<TThreadPool*>(arg); // arg carries the pool pointer
318 const threads_array_t &workers = pool->fThreads;
319 const size_t total = workers.size(); // count is taken once, before any Join() call
320 for (size_t idx = 0; idx < total; ++idx)
321 workers[idx]->Join();
322
323 return NULL;
324 }
325
326 static bool IsThreadActive(TThread *pThread) { // true only when the thread's state equals TThread::kRunningState
327 // so far we consider only kRunningState as activity
328 return (pThread->GetState() == TThread::kRunningState); // pThread is dereferenced without a null check
329 }
330
331 void DbgLog(const std::string &msg) {
332 if (fSilent)
333 return;
335 std::cout << "[" << TThread::SelfId() << "] " << msg << std::endl;
336 }
337
338private:
348 volatile bool fStopped;
353 bool fSilent; // No DBG messages
354};
355
356#endif
R__EXTERN C unsigned int sleep(unsigned int seconds)
Int_t Broadcast()
Definition: TCondition.h:54
Int_t Wait()
Wait to be signaled.
Definition: TCondition.cxx:75
Definition: TMutex.h:30
const TNonCopyable & operator=(const TNonCopyable &)
TNonCopyable(const TNonCopyable &)
bool run(aParam &param)
Definition: TThreadPool.h:74
TThreadPoolTask(task_t &task, aParam &param)
Definition: TThreadPool.h:94
task_t & fTask
Definition: TThreadPool.h:103
TThreadPoolTaskImp< aTask, aParam > task_t
Definition: TThreadPool.h:91
TThreadPool(size_t threadsCount, bool needDbg=false)
Definition: TThreadPool.h:125
TThreadPoolTask< aTask, aParam > task_t
Definition: TThreadPool.h:120
TCondition * fThreadAvailable
Definition: TThreadPool.h:342
size_t fSuccessfulTasks
Definition: TThreadPool.h:349
threads_array_t fThreads
Definition: TThreadPool.h:345
size_t TasksCount() const
Definition: TThreadPool.h:221
TMutex fMutexAllTasksDone
Definition: TThreadPool.h:343
void Stop(bool processRemainingJobs=false)
Definition: TThreadPool.h:187
TThread * fThreadJoinHelper
Definition: TThreadPool.h:346
TCondition * fAllTasksDone
Definition: TThreadPool.h:344
static void * Executor(void *arg)
Definition: TThreadPool.h:251
size_t fIdleThreads
Definition: TThreadPool.h:351
void Drain()
Definition: TThreadPool.h:214
static bool IsThreadActive(TThread *pThread)
Definition: TThreadPool.h:326
static void * JoinHelper(void *arg)
Definition: TThreadPool.h:316
void DbgLog(const std::string &msg)
Definition: TThreadPool.h:331
static void * Monitor(void *arg)
Definition: TThreadPool.h:234
TCondition * fThreadNeeded
Definition: TThreadPool.h:341
TMutex fMutex
Definition: TThreadPool.h:340
size_t fTasksCount
Definition: TThreadPool.h:350
TMutex fDbgOutputMutex
Definition: TThreadPool.h:352
void AddThread()
Definition: TThreadPool.h:164
size_t IdleThreads() const
Definition: TThreadPool.h:229
void PushTask(typename TThreadPoolTask< aTask, aParam >::task_t &task, aParam param)
Definition: TThreadPool.h:172
taskqueue_t fTasks
Definition: TThreadPool.h:339
TThread * fThreadMonitor
Definition: TThreadPool.h:347
volatile bool fStopped
Definition: TThreadPool.h:348
size_t SuccessfulTasks() const
Definition: TThreadPool.h:225
std::vector< TThread * > threads_array_t
Definition: TThreadPool.h:122
std::queue< task_t * > taskqueue_t
Definition: TThreadPool.h:121
EState GetState() const
Definition: TThread.h:125
static Long_t SelfId()
Static method returning the id for the current thread.
Definition: TThread.cxx:548
Long_t Join(void **ret=0)
Join this thread.
Definition: TThread.cxx:509
@ kRunningState
Definition: TThread.h:60
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:562
auto * l
Definition: textangle.C:4