Logo ROOT   6.18/05
Reference Guide
TThreadExecutor.hxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Author: Xavier Valls March 2016
3
4/*************************************************************************
5 * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TThreadExecutor
13#define ROOT_TThreadExecutor
14
15#include "RConfigure.h"
16
17// exclude in case ROOT does not have IMT support
18#ifndef R__USE_IMT
19// No need to error out for dictionaries.
20# if !defined(__ROOTCLING__) && !defined(G__DICTIONARY)
21# error "Cannot use ROOT::TThreadExecutor without defining R__USE_IMT."
22# endif
23#else
24
25#include "ROOT/TExecutor.hxx"
26#include "ROOT/TPoolManager.hxx"
27#include "TROOT.h"
28#include "TError.h"
29#include <functional>
30#include <memory>
31#include <numeric>
32
33namespace ROOT {
34
35 class TThreadExecutor: public TExecutor<TThreadExecutor> {
36 public:
37 explicit TThreadExecutor();
38
39 explicit TThreadExecutor(UInt_t nThreads);
40
43
44 template<class F>
45 void Foreach(F func, unsigned nTimes, unsigned nChunks = 0);
46 template<class F, class INTEGER>
47 void Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks = 0);
48 /// \cond
49 template<class F, class T>
50 void Foreach(F func, std::initializer_list<T> args, unsigned nChunks = 0);
51 /// \endcond
52 template<class F, class T>
53 void Foreach(F func, std::vector<T> &args, unsigned nChunks = 0);
54 template<class F, class T>
55 void Foreach(F func, const std::vector<T> &args, unsigned nChunks = 0);
56
58 template<class F, class Cond = noReferenceCond<F>>
59 auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
60 template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
61 auto Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
62 template<class F, class T, class Cond = noReferenceCond<F, T>>
63 auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
64
65 // // MapReduce
66 // // the late return types also check at compile-time whether redfunc is compatible with func,
67 // // other than checking that func is compatible with the type of arguments.
68 // // a static_assert check in TThreadExecutor::Reduce is used to check that redfunc is compatible with the type returned by func
70 template<class F, class R, class Cond = noReferenceCond<F>>
71 auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
72 template<class F, class R, class Cond = noReferenceCond<F>>
73 auto MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of<F()>::type;
74 template<class F, class INTEGER, class R, class Cond = noReferenceCond<F, INTEGER>>
75 auto MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(INTEGER)>::type;
76 /// \cond
77 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
78 auto MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type;
79 /// \endcond
80 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
81 auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
82 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
83 auto MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type;
84
86 template<class T, class BINARYOP> auto Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()));
87 template<class T, class R> auto Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));
88
89 unsigned GetPoolSize();
90
91 protected:
92 template<class F, class R, class Cond = noReferenceCond<F>>
93 auto Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F()>::type>;
94 template<class F, class INTEGER, class R, class Cond = noReferenceCond<F, INTEGER>>
95 auto Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
96 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
97 auto Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>;
98 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
99 auto Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>;
100
101 private:
102 void ParallelFor(unsigned start, unsigned end, unsigned step, const std::function<void(unsigned int i)> &f);
103 double ParallelReduce(const std::vector<double> &objs, const std::function<double(double a, double b)> &redfunc);
104 float ParallelReduce(const std::vector<float> &objs, const std::function<float(float a, float b)> &redfunc);
105 template<class T, class R>
106 auto SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));
107
108 std::shared_ptr<ROOT::Internal::TPoolManager> fSched = nullptr;
109 };
110
111 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
112
113 //////////////////////////////////////////////////////////////////////////
114 /// Execute func (with no arguments) nTimes in parallel.
115 /// Functions that take more than zero arguments can be executed (with
116 /// fixed arguments) by wrapping them in a lambda or with std::bind.
117 template<class F>
118 void TThreadExecutor::Foreach(F func, unsigned nTimes, unsigned nChunks) {
119 if (nChunks == 0) {
120 ParallelFor(0U, nTimes, 1, [&](unsigned int){func();});
121 return;
122 }
123
124 unsigned step = (nTimes + nChunks - 1) / nChunks;
125 auto lambda = [&](unsigned int i)
126 {
127 for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
128 func();
129 }
130 };
131 ParallelFor(0U, nTimes, step, lambda);
132 }
133
134 //////////////////////////////////////////////////////////////////////////
135 /// Execute func in parallel, taking an element of a
136 /// sequence as argument.
137 template<class F, class INTEGER>
138 void TThreadExecutor::Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks) {
139 if (nChunks == 0) {
140 ParallelFor(*args.begin(), *args.end(), args.step(), [&](unsigned int i){func(i);});
141 return;
142 }
143 unsigned start = *args.begin();
144 unsigned end = *args.end();
145 unsigned seqStep = args.step();
146 unsigned step = (end - start + nChunks - 1) / nChunks; //ceiling the division
147
148 auto lambda = [&](unsigned int i)
149 {
150 for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) {
151 func(i + j);
152 }
153 };
154 ParallelFor(start, end, step, lambda);
155 }
156
157 /// \cond
158 //////////////////////////////////////////////////////////////////////////
159 /// Execute func in parallel, taking an element of a
160 /// initializer_list as argument.
161 template<class F, class T>
162 void TThreadExecutor::Foreach(F func, std::initializer_list<T> args, unsigned nChunks) {
163 std::vector<T> vargs(std::move(args));
164 Foreach(func, vargs, nChunks);
165 }
166 /// \endcond
167
168 //////////////////////////////////////////////////////////////////////////
169 /// Execute func in parallel, taking an element of an
170 /// std::vector as argument.
171 template<class F, class T>
172 void TThreadExecutor::Foreach(F func, std::vector<T> &args, unsigned nChunks) {
173 unsigned int nToProcess = args.size();
174 if (nChunks == 0) {
175 ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
176 return;
177 }
178
179 unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
180 auto lambda = [&](unsigned int i)
181 {
182 for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
183 func(args[i + j]);
184 }
185 };
186 ParallelFor(0U, nToProcess, step, lambda);
187 }
188
189 //////////////////////////////////////////////////////////////////////////
190 /// Execute func in parallel, taking an element of a std::vector as argument.
191 template<class F, class T>
192 void TThreadExecutor::Foreach(F func, const std::vector<T> &args, unsigned nChunks) {
193 unsigned int nToProcess = args.size();
194 if (nChunks == 0) {
195 ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
196 return;
197 }
198
199 unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
200 auto lambda = [&](unsigned int i)
201 {
202 for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
203 func(args[i + j]);
204 }
205 };
206 ParallelFor(0U, nToProcess, step, lambda);
207 }
208
209 //////////////////////////////////////////////////////////////////////////
210 /// Execute func (with no arguments) nTimes in parallel.
211 /// A vector containg executions' results is returned.
212 /// Functions that take more than zero arguments can be executed (with
213 /// fixed arguments) by wrapping them in a lambda or with std::bind.
214 template<class F, class Cond>
215 auto TThreadExecutor::Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type> {
216 using retType = decltype(func());
217 std::vector<retType> reslist(nTimes);
218 auto lambda = [&](unsigned int i)
219 {
220 reslist[i] = func();
221 };
222 ParallelFor(0U, nTimes, 1, lambda);
223
224 return reslist;
225 }
226
227 //////////////////////////////////////////////////////////////////////////
228 /// Execute func in parallel, taking an element of a
229 /// sequence as argument.
230 /// A vector containg executions' results is returned.
231 template<class F, class INTEGER, class Cond>
232 auto TThreadExecutor::Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type> {
233 unsigned start = *args.begin();
234 unsigned end = *args.end();
235 unsigned seqStep = args.step();
236
237 using retType = decltype(func(start));
238 std::vector<retType> reslist(args.size());
239 auto lambda = [&](unsigned int i)
240 {
241 reslist[i] = func(i);
242 };
243 ParallelFor(start, end, seqStep, lambda);
244
245 return reslist;
246 }
247
248 //////////////////////////////////////////////////////////////////////////
249 /// Execute func (with no arguments) nTimes in parallel.
250 /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction;
251 /// A vector containg partial reductions' results is returned.
252 template<class F, class R, class Cond>
253 auto TThreadExecutor::Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F()>::type> {
254 if (nChunks == 0)
255 {
256 return Map(func, nTimes);
257 }
258
259 unsigned step = (nTimes + nChunks - 1) / nChunks;
260 // Avoid empty chunks
261 unsigned actualChunks = (nTimes + step - 1) / step;
262 using retType = decltype(func());
263 std::vector<retType> reslist(actualChunks);
264 auto lambda = [&](unsigned int i)
265 {
266 std::vector<retType> partialResults(std::min(nTimes-i, step));
267 for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
268 partialResults[j] = func();
269 }
270 reslist[i / step] = Reduce(partialResults, redfunc);
271 };
272 ParallelFor(0U, nTimes, step, lambda);
273
274 return reslist;
275 }
276
277 //////////////////////////////////////////////////////////////////////////
278 /// Execute func in parallel, taking an element of an
279 /// std::vector as argument.
280 /// A vector containg executions' results is returned.
281 // actual implementation of the Map method. all other calls with arguments eventually
282 // call this one
283 template<class F, class T, class Cond>
284 auto TThreadExecutor::Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type> {
285 // //check whether func is callable
286 using retType = decltype(func(args.front()));
287
288 unsigned int nToProcess = args.size();
289 std::vector<retType> reslist(nToProcess);
290
291 auto lambda = [&](unsigned int i)
292 {
293 reslist[i] = func(args[i]);
294 };
295
296 ParallelFor(0U, nToProcess, 1, lambda);
297
298 return reslist;
299 }
300
301 //////////////////////////////////////////////////////////////////////////
302 /// Execute func in parallel, taking an element of a
303 /// sequence as argument.
304 /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction\n
305 /// A vector containg partial reductions' results is returned.
306 template<class F, class INTEGER, class R, class Cond>
307 auto TThreadExecutor::Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(INTEGER)>::type> {
308 if (nChunks == 0)
309 {
310#ifdef _MSC_VER
311 // temporary work-around to silent the error C2668: 'ROOT::TThreadExecutor::Map':
312 // ambiguous call to overloaded function, due to a MS compiler bug
313 unsigned start = *args.begin();
314 unsigned end = *args.end();
315 unsigned seqStep = args.step();
316
317 using retType = decltype(func(start));
318 std::vector<retType> reslist(end - start);
319 auto lambda = [&](unsigned int i)
320 {
321 reslist[i] = func(i);
322 };
323 ParallelFor(start, end, seqStep, lambda);
324 return reslist;
325#else
326 return Map(func, args);
327#endif
328 }
329
330 unsigned start = *args.begin();
331 unsigned end = *args.end();
332 unsigned seqStep = args.step();
333 unsigned step = (end - start + nChunks - 1) / nChunks; //ceiling the division
334 // Avoid empty chunks
335 unsigned actualChunks = (end - start + step - 1) / step;
336
337 using retType = decltype(func(start));
338 std::vector<retType> reslist(actualChunks);
339 auto lambda = [&](unsigned int i)
340 {
341 std::vector<retType> partialResults(std::min(end-i, step));
342 for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) {
343 partialResults[j] = func(i + j);
344 }
345 reslist[i / step] = Reduce(partialResults, redfunc);
346 };
347 ParallelFor(start, end, step, lambda);
348
349 return reslist;
350 }
351
352/// \cond
353 //////////////////////////////////////////////////////////////////////////
354 /// Execute func in parallel, taking an element of an
355 /// std::vector as argument. Divides and groups the executions in nChunks with partial reduction.
356 /// If it doesn't make sense will reduce the number of chunks.\n
357 /// A vector containg partial reductions' results is returned.
358 template<class F, class T, class R, class Cond>
359 auto TThreadExecutor::Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type> {
360 if (nChunks == 0)
361 {
362 return Map(func, args);
363 }
364
365 unsigned int nToProcess = args.size();
366 unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
367 // Avoid empty chunks
368 unsigned actualChunks = (nToProcess + step - 1) / step;
369
370 using retType = decltype(func(args.front()));
371 std::vector<retType> reslist(actualChunks);
372 auto lambda = [&](unsigned int i)
373 {
374 std::vector<T> partialResults(step);
375 for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
376 partialResults[j] = func(args[i + j]);
377 }
378 reslist[i / step] = Reduce(partialResults, redfunc);
379 };
380
381 ParallelFor(0U, nToProcess, step, lambda);
382
383 return reslist;
384 }
385
386 //////////////////////////////////////////////////////////////////////////
387 /// Execute func in parallel, taking an element of an
388 /// std::initializer_list as an argument. Divides and groups the executions in nChunks with partial reduction.
389 /// If it doesn't make sense will reduce the number of chunks.\n
390 /// A vector containg partial reductions' results is returned.
391 template<class F, class T, class R, class Cond>
392 auto TThreadExecutor::Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type> {
393 std::vector<T> vargs(std::move(args));
394 const auto &reslist = Map(func, vargs, redfunc, nChunks);
395 return reslist;
396 }
397/// \endcond
398
399
400 //////////////////////////////////////////////////////////////////////////
401 /// This method behaves just like Map, but an additional redfunc function
402 /// must be provided. redfunc is applied to the vector Map would return and
403 /// must return the same type as func. In practice, redfunc can be used to
404 /// "squash" the vector returned by Map into a single object by merging,
405 /// adding, mixing the elements of the vector.\n
406 /// The fourth argument indicates the number of chunks we want to divide our work in.
407 template<class F, class R, class Cond>
408 auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type {
409 return Reduce(Map(func, nTimes), redfunc);
410 }
411
412 template<class F, class R, class Cond>
413 auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of<F()>::type {
414 return Reduce(Map(func, nTimes, redfunc, nChunks), redfunc);
415 }
416
417 template<class F, class INTEGER, class R, class Cond>
418 auto TThreadExecutor::MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(INTEGER)>::type {
419 return Reduce(Map(func, args, redfunc, nChunks), redfunc);
420 }
421 /// \cond
422 template<class F, class T, class R, class Cond>
423 auto TThreadExecutor::MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type {
424 return Reduce(Map(func, args, redfunc, nChunks), redfunc);
425 }
426 /// \endcond
427
428 template<class F, class T, class R, class Cond>
429 auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type {
430 return Reduce(Map(func, args), redfunc);
431 }
432
433 template<class F, class T, class R, class Cond>
434 auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type {
435 return Reduce(Map(func, args, redfunc, nChunks), redfunc);
436 }
437
438 //////////////////////////////////////////////////////////////////////////
439 /// "Reduce" an std::vector into a single object in parallel by passing a
440 /// binary operator as the second argument to act on pairs of elements of the std::vector.
441 template<class T, class BINARYOP>
442 auto TThreadExecutor::Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))
443 {
444 // check we can apply reduce to objs
445 static_assert(std::is_same<decltype(redfunc(objs.front(), objs.front())), T>::value, "redfunc does not have the correct signature");
446 return ParallelReduce(objs, redfunc);
447 }
448
449 //////////////////////////////////////////////////////////////////////////
450 /// "Reduce" an std::vector into a single object by passing a
451 /// function as the second argument defining the reduction operation.
452 template<class T, class R>
453 auto TThreadExecutor::Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
454 {
455 // check we can apply reduce to objs
456 static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
457 return SeqReduce(objs, redfunc);
458 }
459
460 template<class T, class R>
461 auto TThreadExecutor::SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
462 {
463 return redfunc(objs);
464 }
465
466} // namespace ROOT
467
468#endif // R__USE_IMT
469#endif
#define b(i)
Definition: RSha256.hxx:100
#define f(i)
Definition: RSha256.hxx:104
#define R(a, b, c, d, e, f, g, h, i)
Definition: RSha256.hxx:110
unsigned int UInt_t
Definition: RtypesCore.h:42
int type
Definition: TGX11.cxx:120
This class defines an interface to execute the same task multiple times in parallel,...
Definition: TExecutor.hxx:61
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
iterator begin() const
Definition: TSeq.hxx:163
T step() const
Definition: TSeq.hxx:184
iterator end() const
Definition: TSeq.hxx:166
This class provides a simple interface to execute the same task multiple times in parallel,...
auto SeqReduce(const std::vector< T > &objs, R redfunc) -> decltype(redfunc(objs))
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
TThreadExecutor & operator=(TThreadExecutor &)=delete
std::shared_ptr< ROOT::Internal::TPoolManager > fSched
void ParallelFor(unsigned start, unsigned end, unsigned step, const std::function< void(unsigned int i)> &f)
auto Map(F func, std::vector< T > &args, R redfunc, unsigned nChunks) -> std::vector< typename std::result_of< F(T)>::type >
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
auto Reduce(const std::vector< T > &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))
"Reduce" an std::vector into a single object in parallel by passing a binary operator as the second a...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute func (with no arguments) nTimes in parallel.
TThreadExecutor()
Class constructor.
TThreadExecutor(TThreadExecutor &)=delete
double ParallelReduce(const std::vector< double > &objs, const std::function< double(double a, double b)> &redfunc)
auto Map(F func, std::initializer_list< T > args, R redfunc, unsigned nChunks) -> std::vector< typename std::result_of< F(T)>::type >
#define F(x, y, z)
double T(double x)
Definition: ChebyshevPol.h:34
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:151
auto Map(Args &&... args) -> decltype(ROOT::Detail::VecOps::MapFromTuple(std::forward_as_tuple(args...), std::make_index_sequence< sizeof...(args) - 1 >()))
Create new collection applying a callable to the elements of the input collection.
Definition: RVec.hxx:907
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
auto * a
Definition: textangle.C:12