// TThreadExecutor.cxx
#include "ROOT/TThreadExecutor.hxx"
#if !defined(_MSC_VER)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
#endif
#include "tbb/tbb.h"
#if !defined(_MSC_VER)
#pragma GCC diagnostic pop
#endif

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TThreadExecutor
/// \ingroup Parallelism
/// \brief This class provides a simple interface to execute the same task
/// multiple times in parallel, possibly with different arguments every
/// time. This mimics the behaviour of Python's Pool.map method.
///
/// ### ROOT::TThreadExecutor::Map
/// This class inherits its interfaces from ROOT::TExecutor.\n
/// The two possible usages of the Map method are:\n
/// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
/// * Map(F func, T& args): func is executed on each element of the collection of arguments args
///
/// For either signature, func is executed as many times as needed by a pool of
/// nThreads threads; nThreads defaults to the number of logical cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: ROOT::TThreadExecutor never
/// deletes what it returns, it simply forgets it.\n
///
/// \param func
/// \parblock
/// a lambda expression, a std::function, a loaded macro, a
/// functor class or a function that takes zero arguments (for the first signature)
/// or one (for the second signature).
/// \endparblock
/// \param args
/// \parblock
/// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
/// For the first signature, an integer (nTimes).
/// \endparblock
/// **Note:** in cases where the function to be executed takes more than
/// zero/one argument but all are fixed except zero/one, the function can be wrapped
/// in a lambda or via std::bind to give it the right signature.\n
///
/// #### Return value:
/// A std::vector. The elements in the container
/// will be the objects returned by func.
///
///
/// #### Examples:
///
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto hists = pool.Map(CreateHisto, 10);
/// root[] ROOT::TThreadExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
/// ~~~
///
/// ### ROOT::TThreadExecutor::MapReduce
/// This set of methods behaves exactly like Map, but takes an additional
/// function as a third argument. This function is applied to the set of
/// objects returned by the corresponding Map execution to "squash" them
/// into a single object. Because the number of chunks may be optimized internally,
/// this function must not depend on the size of the vector it receives.
///
/// If this function is a binary operator, the "squashing" will be performed in parallel.
/// This is exclusive to ROOT::TThreadExecutor and not available in any other ROOT::TExecutor-derived class.\n
/// An integer can be passed as the fourth argument to indicate the number of chunks into which the work is divided.
/// This may be useful to avoid the overhead introduced when running really short tasks.
///
/// #### Examples:
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); });
/// root[] ROOT::TThreadExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
/// ~~~
///
//////////////////////////////////////////////////////////////////////////
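/*
The chunking rule described for MapReduce can be illustrated without ROOT or
TBB. The following is a minimal, sequential sketch, assuming that the reduction
function is first applied to the results of each chunk and then once more to
the per-chunk partial results. ChunkedMapReduceSketch is a hypothetical name
used only for illustration; it is not part of ROOT and is not meant to describe
how TThreadExecutor is actually implemented.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical helper, not part of ROOT: executes func nTimes, grouped into
// nChunks chunks. redfunc reduces each chunk, then reduces the partial results.
template <typename T>
T ChunkedMapReduceSketch(const std::function<T()> &func, unsigned nTimes,
                         const std::function<T(const std::vector<T> &)> &redfunc, unsigned nChunks)
{
   assert(nChunks > 0 && nChunks <= nTimes);
   std::vector<T> partials;
   for (unsigned c = 0; c < nChunks; ++c) {
      // Spread the nTimes executions as evenly as possible over the chunks.
      const auto begin = 1ull * c * nTimes / nChunks;
      const auto end = 1ull * (c + 1) * nTimes / nChunks;
      std::vector<T> results;
      for (auto i = begin; i < end; ++i)
         results.push_back(func());
      // Here redfunc sees (end - begin) elements, not nTimes elements.
      partials.push_back(redfunc(results));
   }
   // Here redfunc sees nChunks elements.
   return redfunc(partials);
}
```

In this sketch a summing redfunc gives the same result for any number of
chunks, whereas a redfunc that returns the size of its input would return the
number of chunks instead of nTimes. This is the kind of size dependence the
documentation above warns against.
*/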

/*
VERY IMPORTANT NOTE ABOUT WORK ISOLATION

We enclose the parallel_for and parallel_reduce invocations in a
this_task_arena::isolate because we want to prevent a thread from starting to
execute an outer task while the task it is running has spawned subtasks, e.g.
with a parallel_for, and is waiting for those inner tasks to complete.

While this change has a negligible performance impact, it has benefits for
several applications, for example big parallelised HEP frameworks and
RDataFrame analyses.
- For HEP frameworks, without work isolation, it can happen that a huge
framework task is pulled in by a yielding ROOT task.
This delays the processing of the event that is interrupted by the
long task.
For example, work isolation prevents a very long simulation task from being
pulled in by the idle task during the wait caused by the parallel flushing of baskets.
- For RDataFrame analyses we want to guarantee that each entry is processed from
the beginning to the end without TBB interrupting it to pull in other work items.
As a corollary, the usage of ROOT (or TBB in work isolation mode) in actions
and transformations guarantees that each entry is processed from the beginning to
the end without being interrupted by the processing of outer tasks.
*/

namespace ROOT {
namespace Internal {

/// A helper function to implement the TThreadExecutor::ParallelReduce methods
template<typename T>
static T ParallelReduceHelper(const std::vector<T> &objs, const std::function<T(T a, T b)> &redfunc)
{
   using BRange_t = tbb::blocked_range<decltype(objs.begin())>;

   // Folds one sub-range of objs, starting from the value handed in by TBB.
   auto pred = [redfunc](BRange_t const & range, T init) {
      return std::accumulate(range.begin(), range.end(), init, redfunc);
   };

   BRange_t objRange(objs.begin(), objs.end());

   // T{} is passed as the identity value of the reduction; see the work isolation note above for isolate.
   return tbb::this_task_arena::isolate([&] {
      return tbb::parallel_reduce(objRange, T{}, pred, redfunc);
   });

}
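/*
Because the helper above passes T{} to tbb::parallel_reduce as the identity
value, the reduction function appears to be expected to treat a
value-initialised T as neutral. The following sequential, std-only sketch
mimics that behaviour under this assumption: every chunk is folded starting
from T{} and the partial results are combined with redfunc. ChunkedReduceSketch
and its chunkSize parameter are hypothetical and only serve as illustration;
TBB chooses its own sub-ranges.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <numeric>
#include <vector>

// Hypothetical sequential stand-in, not part of ROOT: folds objs chunk by
// chunk, seeding every chunk and the running total with T{}.
template <typename T>
T ChunkedReduceSketch(const std::vector<T> &objs, const std::function<T(T, T)> &redfunc, std::size_t chunkSize)
{
   assert(chunkSize > 0);
   T total{};
   for (std::size_t begin = 0; begin < objs.size(); begin += chunkSize) {
      const std::size_t end = std::min(begin + chunkSize, objs.size());
      const auto first = objs.begin() + static_cast<std::ptrdiff_t>(begin);
      const auto last = objs.begin() + static_cast<std::ptrdiff_t>(end);
      // Each chunk starts from T{}, so T{} takes part in every partial result.
      const T partial = std::accumulate(first, last, T{}, redfunc);
      total = redfunc(total, partial);
   }
   return total;
}
```

In this sketch a sum behaves as expected, since 0 is neutral for addition. A
product over the same input collapses to 0, because T{} is 0 for arithmetic
types and is multiplied into every chunk.
*/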

} // End NS Internal

//////////////////////////////////////////////////////////////////////////
/// Class constructor.
/// If the scheduler is active (e.g. because another TThreadExecutor is in flight, or ROOT::EnableImplicitMT() was
/// called), work with the current pool of threads.
/// If not, initialize the pool of threads, spawning nThreads. nThreads' default value, 0, initializes the
/// pool with as many logical threads as are available in the system (see NLogicalCores in RTaskArenaWrapper.cxx).
///
/// At construction time, TThreadExecutor automatically enables ROOT's thread-safety locks, as if
/// ROOT::EnableThreadSafety() had been called.
TThreadExecutor::TThreadExecutor(UInt_t nThreads)
{
   fTaskArenaW = ROOT::Internal::GetGlobalTaskArena(nThreads);
}

void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step,
                                  const std::function<void(unsigned int i)> &f)
{
   fTaskArenaW->Access().execute([&] {
      tbb::this_task_arena::isolate([&] {
         tbb::parallel_for(start, end, step, f);
      });
   });
}
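/*
The index sequence visited by ParallelFor can be sketched with plain
std::thread. The sketch assumes that tbb::parallel_for(start, end, step, f)
calls f(i) for i = start, start + step, ... while i < end, in no particular
order and possibly concurrently, so f has to be safe to call from several
threads. ParallelForSketch and its nWorkers parameter are hypothetical; the
way indices are assigned to workers here is a simplification and not TBB's
scheduling.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical std-only stand-in, not part of ROOT or TBB.
void ParallelForSketch(unsigned start, unsigned end, unsigned step, const std::function<void(unsigned)> &f,
                       unsigned nWorkers = 2)
{
   assert(step > 0 && nWorkers > 0);
   std::vector<std::thread> workers;
   for (unsigned w = 0; w < nWorkers; ++w) {
      workers.emplace_back([=, &f] {
         // Worker w takes every nWorkers-th index of start, start + step, ...
         // A 64-bit counter avoids wrap-around close to the unsigned limit.
         for (unsigned long long i = start + 1ull * w * step; i < end; i += 1ull * nWorkers * step)
            f(static_cast<unsigned>(i));
      });
   }
   for (auto &t : workers)
      t.join();
}
```

For example, ParallelForSketch(0, 10, 3, f) invokes f for the indices 0, 3, 6
and 9, each exactly once.
*/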

double TThreadExecutor::ParallelReduce(const std::vector<double> &objs,
                                       const std::function<double(double a, double b)> &redfunc)
{
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<double>(objs, redfunc); });
}

float TThreadExecutor::ParallelReduce(const std::vector<float> &objs,
                                      const std::function<float(float a, float b)> &redfunc)
{
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<float>(objs, redfunc); });
}

unsigned TThreadExecutor::GetPoolSize()
{
   return fTaskArenaW->TaskArenaSize();
}

} // namespace ROOT