TThreadExecutor.cxx
#include "ROOT/TThreadExecutor.hxx"
#include "ROOT/RTaskArena.hxx"
#include "TError.h" // Warning()

#include <functional>
#include <numeric> // std::accumulate

#if !defined(_MSC_VER)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
#endif
#include "tbb/tbb.h"
#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4
#include "tbb/global_control.h"
#if !defined(_MSC_VER)
#pragma GCC diagnostic pop
#endif

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TThreadExecutor
/// \ingroup Parallelism
/// \brief This class provides a simple interface to execute the same task
/// multiple times in parallel, possibly with different arguments every
/// time. This mimics the behaviour of Python's Pool.map method.
///
/// ### ROOT::TThreadExecutor::Map
/// This class inherits its interfaces from ROOT::TExecutor.\n
/// The two possible usages of the Map method are:\n
/// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
/// * Map(F func, T& args): func is executed on each element of the collection of arguments args
///
/// For either signature, func is executed as many times as needed by a pool of
/// nThreads threads; nThreads defaults to the number of cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: ROOT::TThreadExecutor never
/// deletes what it returns, it simply forgets it.\n
///
/// \param func
/// \parblock
/// a lambda expression, an std::function, a loaded macro, a
/// functor class or a function that takes zero arguments (for the first signature)
/// or one (for the second signature).
/// \endparblock
/// \param args
/// \parblock
/// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
/// An integer only for the first.
/// \endparblock
/// **Note:** if the function to be executed takes more arguments than the
/// zero/one expected by the signature, but all the extra ones are fixed, the function can be
/// wrapped in a lambda or via std::bind to give it the right signature
/// (see the sketch after the examples below).\n
///
/// #### Return value:
/// An std::vector. The elements in the container
/// will be the objects returned by func.
///
///
/// #### Examples:
///
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto hists = pool.Map(CreateHisto, 10);
/// root[] ROOT::TThreadExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
/// ~~~
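///
/// A minimal sketch of the wrapping mentioned in the note above; `FillHisto` and `nBins`
/// are illustrative names, not part of ROOT's API:
/// ~~~{.cpp}
/// // Suppose FillHisto(int seed, int nBins) takes two arguments, but nBins is fixed:
/// int nBins = 64;
/// ROOT::TThreadExecutor pool;
/// // The capturing lambda exposes the single-argument signature Map expects.
/// auto hists = pool.Map([nBins](int seed) { return FillHisto(seed, nBins); }, ROOT::TSeqI(10));
/// ~~~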
///
/// ### ROOT::TThreadExecutor::MapReduce
/// This set of methods behaves exactly like Map, but takes an additional
/// function as a third argument. This function is applied to the set of
/// objects returned by the corresponding Map execution to "squash" them
/// into a single object. Because the number of chunks is subject to optimization,
/// this function should not depend on the size of the vector returned by Map.
///
/// If this function is a binary operator, the "squashing" is performed in parallel.
/// This is exclusive to ROOT::TThreadExecutor; no other ROOT::TExecutor-derived class offers it.\n
/// An integer can be passed as the fourth argument, indicating the number of chunks into which the work is divided.
/// This may be useful to avoid the overhead introduced when running really short tasks.
///
/// #### Examples:
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); })
/// root[] ROOT::TThreadExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
/// ~~~
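///
/// A sketch of the chunked variant described above (the chunk count of 4 is illustrative):
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool;
/// // 100 executions are split into 4 chunks; each chunk is reduced, then the partial
/// // results are reduced again, so sum == 100 regardless of the chunk count.
/// root[] auto sum = pool.MapReduce([]() { return 1; }, 100, [](const std::vector<int> &v) { return std::accumulate(v.begin(), v.end(), 0); }, 4);
/// ~~~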
///
//////////////////////////////////////////////////////////////////////////

/*
VERY IMPORTANT NOTE ABOUT WORK ISOLATION

We enclose the parallel_for and parallel_reduce invocations in a
task_arena::isolate because we want to prevent a thread from picking up an
outer task while the task it is running has spawned subtasks, e.g. with a parallel_for,
and is waiting for those inner tasks to complete.

While this change has a negligible performance impact, it has benefits for
several applications, for example big parallelised HEP frameworks and
RDataFrame analyses.
- For HEP frameworks, without work isolation, a huge framework task can be
pulled in by a yielding ROOT task. This delays the processing of the event,
which is interrupted by the long task.
For example, work isolation avoids that, while waiting for the parallel
flushing of baskets, an idle thread pulls in a very long simulation task.
- For RDataFrame analyses we want to guarantee that each entry is processed from
the beginning to the end without TBB interrupting it to pull in other work items.
As a corollary, using ROOT (or TBB in work isolation mode) in actions
and transformations guarantees that each entry is processed from beginning to
end without being interrupted by the processing of outer tasks.
*/

namespace ROOT {
namespace Internal {

/// A helper function to implement the TThreadExecutor::ParallelReduce methods
template<typename T>
static T ParallelReduceHelper(const std::vector<T> &objs, const std::function<T(T a, T b)> &redfunc)
{
   using BRange_t = tbb::blocked_range<decltype(objs.begin())>;

   auto pred = [redfunc](BRange_t const &range, T init) {
      return std::accumulate(range.begin(), range.end(), init, redfunc);
   };

   BRange_t objRange(objs.begin(), objs.end());

   return tbb::this_task_arena::isolate([&] {
      return tbb::parallel_reduce(objRange, T{}, pred, redfunc);
   });
}

} // End NS Internal

//////////////////////////////////////////////////////////////////////////
/// Class constructor.
/// If the scheduler is active (e.g. because another TThreadExecutor is in flight, or ROOT::EnableImplicitMT() was
/// called), work with the current pool of threads.
/// If not, initialize the pool of threads, spawning nThreads of them. nThreads' default value, 0, initializes the
/// pool with as many logical threads as are available in the system (see NLogicalCores in RTaskArenaWrapper.cxx).
///
/// At construction time, TThreadExecutor automatically enables ROOT's thread-safety locks by calling
/// ROOT::EnableThreadSafety().
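///
/// A brief usage sketch (illustrative values only):
/// ~~~{.cpp}
/// ROOT::TThreadExecutor pool;      // uses all available logical cores
/// ROOT::TThreadExecutor pool4(4u); // requests 4 threads (honoured only if no pool is active yet)
/// ~~~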
TThreadExecutor::TThreadExecutor(UInt_t nThreads)
{
   fTaskArenaW = ROOT::Internal::GetGlobalTaskArena(nThreads);
}

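//////////////////////////////////////////////////////////////////////////
/// Execute the function `f` in parallel for every index in `[start, end)`, advancing by `step`.
/// The loop runs inside this executor's task arena, wrapped in tbb::this_task_arena::isolate
/// (see the note on work isolation above). A warning is emitted if a tbb::global_control
/// limit caps the number of workers below the pool size.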
void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step,
                                  const std::function<void(unsigned int i)> &f)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelFor",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   fTaskArenaW->Access().execute([&] {
      tbb::this_task_arena::isolate([&] {
         tbb::parallel_for(start, end, step, f);
      });
   });
}

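//////////////////////////////////////////////////////////////////////////
/// Reduce a vector of doubles to a single value by applying the binary operation `redfunc`
/// in parallel (via ROOT::Internal::ParallelReduceHelper), inside this executor's task arena.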
double TThreadExecutor::ParallelReduce(const std::vector<double> &objs,
                                       const std::function<double(double a, double b)> &redfunc)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelReduce",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<double>(objs, redfunc); });
}

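//////////////////////////////////////////////////////////////////////////
/// Reduce a vector of floats to a single value by applying the binary operation `redfunc`
/// in parallel (via ROOT::Internal::ParallelReduceHelper), inside this executor's task arena.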
float TThreadExecutor::ParallelReduce(const std::vector<float> &objs,
                                      const std::function<float(float a, float b)> &redfunc)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelReduce",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<float>(objs, redfunc); });
}

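//////////////////////////////////////////////////////////////////////////
/// Return the number of worker threads in this executor's task arena.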
unsigned TThreadExecutor::GetPoolSize()
{
   return fTaskArenaW->TaskArenaSize();
}

} // namespace ROOT