TThreadExecutor.cxx
// Require TBB without captured exceptions
#define TBB_USE_CAPTURED_EXCEPTION 0

#include "ROOT/TThreadExecutor.hxx"
#include "ROOT/RTaskArena.hxx"
#include "TError.h" // for Warning()

#include <numeric> // for std::accumulate

#if !defined(_MSC_VER)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
#endif
#include "tbb/tbb.h"
#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4
#include "tbb/global_control.h"
#if !defined(_MSC_VER)
#pragma GCC diagnostic pop
#endif

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TThreadExecutor
/// \ingroup Parallelism
/// \brief This class provides a simple interface to execute the same task
/// multiple times in parallel threads, possibly with different arguments every
/// time.
///
/// ### ROOT::TThreadExecutor::Map
/// This class inherits its interfaces from ROOT::TExecutorCRTP, adapting them for multithreaded
/// parallelism, and extends them by supporting:
/// * Parallel `Foreach` operations.
/// * Custom task granularity and partial reduction, by specifying a reduction function
/// and the number of chunks as extra parameters of the Map call. This is especially useful
/// to reduce the size of intermediate results when dealing with a sizeable number of elements
/// in the input data.
///
/// The two possible usages of the Map method are:\n
/// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
/// * Map(F func, T& args): func is executed on each element of the collection of arguments args
///
/// For either signature, func is executed as many times as needed by a pool of
/// nThreads threads, where nThreads typically defaults to the number of cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: ROOT::TThreadExecutor never
/// deletes what it returns, it simply forgets it.\n
///
/// \param func
/// \parblock
/// a callable object, such as a lambda expression, an std::function, a
/// functor object or a function that takes zero arguments (for the first signature)
/// or one (for the second signature).
/// \endparblock
/// \param args
/// \parblock
/// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
/// An integer only for the first.
/// \endparblock
/// **Note:** in cases where the function to be executed takes more than
/// zero/one argument, but all arguments except zero/one are fixed, the function can be wrapped
/// in a lambda or via std::bind to give it the right signature.\n
///
/// #### Return value:
/// An std::vector. The elements in the container
/// will be the objects returned by func.
///
///
/// #### Examples:
///
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto hists = pool.Map(CreateHisto, 10);
/// root[] ROOT::TThreadExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
/// ~~~
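///
/// As noted above, a function with additional fixed arguments can be wrapped in a
/// lambda to match the expected signature. A minimal sketch (`Scale` and its
/// arguments are hypothetical names chosen for illustration):
/// ~~~{.cpp}
/// double Scale(double x, double factor) { return x * factor; }
/// // Fix the second argument so that Map sees a one-argument callable:
/// ROOT::TThreadExecutor pool;
/// auto doubled = pool.Map([](double x) { return Scale(x, 2.); }, std::vector<double>{1., 2., 3.});
/// ~~~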
///
/// ### ROOT::TThreadExecutor::MapReduce
/// This set of methods behaves exactly like Map, but takes an additional
/// function as a third argument. This function is applied to the set of
/// objects returned by the corresponding Map execution to "squash" them
/// into a single object. Because the number of chunks is optimized internally,
/// this function should not depend on the size of the vector returned by Map.
///
/// If this function is a binary operator, the "squashing" will be performed in parallel.
/// This feature is exclusive to ROOT::TThreadExecutor: no other ROOT::TExecutorCRTP-derived class provides it.\n
///
/// An integer can be passed as the fourth argument to indicate the number of chunks into which the work is divided.
/// This may be useful to avoid the overhead introduced when running really short tasks.
///
/// #### Examples:
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](const std::vector<int> &v) { return std::accumulate(v.begin(), v.end(), 0); })
/// root[] ROOT::TThreadExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
/// ~~~
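///
/// A chunked MapReduce with a binary operator as reduction function might look
/// like this (the chunk count of 4 is an arbitrary illustrative value):
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto sum = pool.MapReduce([](int x) { return x; }, ROOT::TSeqI(100), [](int a, int b) { return a + b; }, 4);
/// ~~~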
///
//////////////////////////////////////////////////////////////////////////

/*
VERY IMPORTANT NOTE ABOUT WORK ISOLATION

We enclose the parallel_for and parallel_reduce invocations in a
task_arena::isolate because we want to prevent a thread from picking up an
outer task while the task it is running has spawned subtasks, e.g. with a
parallel_for, and is waiting for those inner tasks to complete.

While this change has a negligible performance impact, it has benefits for
several applications, for example big parallelised HEP frameworks and
RDataFrame analyses.
- For HEP frameworks, without work isolation it can happen that a huge
framework task is pulled in by a yielding ROOT task.
This delays the processing of the event, which is interrupted by the long task.
For example, work isolation avoids that, during the wait due to the parallel
flushing of baskets, a very long simulation task is pulled in by the idle thread.
- For RDataFrame analyses we want to guarantee that each entry is processed from
the beginning to the end without TBB interrupting it to pull in other work items.
As a corollary, the usage of ROOT (or TBB in work isolation mode) in actions
and transformations guarantees that each entry is processed from the beginning to
the end without being interrupted by the processing of outer tasks.
*/

namespace ROOT {
namespace Internal {

/// A helper function to implement the TThreadExecutor::ParallelReduce methods
template<typename T>
static T ParallelReduceHelper(const std::vector<T> &objs, const std::function<T(T a, T b)> &redfunc)
{
   using BRange_t = tbb::blocked_range<decltype(objs.begin())>;

   auto pred = [redfunc](BRange_t const &range, T init) {
      return std::accumulate(range.begin(), range.end(), init, redfunc);
   };

   BRange_t objRange(objs.begin(), objs.end());

   return tbb::this_task_arena::isolate([&] {
      return tbb::parallel_reduce(objRange, T{}, pred, redfunc);
   });
}

} // End NS Internal

//////////////////////////////////////////////////////////////////////////
/// \brief Class constructor.
/// If the scheduler is active (e.g. because another TThreadExecutor is in flight, or ROOT::EnableImplicitMT() was
/// called), work with the current pool of threads.
/// If not, initialize the pool of threads, spawning nThreads. nThreads' default value, 0, initializes the
/// pool with as many logical threads as are available in the system (see NLogicalCores in RTaskArenaWrapper.cxx).
///
/// At construction time, TThreadExecutor automatically enables ROOT's thread-safety locks as per calling
/// ROOT::EnableThreadSafety().
TThreadExecutor::TThreadExecutor(UInt_t nThreads) : fTaskArenaW(ROOT::Internal::GetGlobalTaskArena(nThreads))
{
   ROOT::EnableThreadSafety();
}

//////////////////////////////////////////////////////////////////////////
/// \brief Execute a function in parallel over the indices of a loop.
///
/// \param start Start index of the loop.
/// \param end End index of the loop.
/// \param step Step size of the loop.
/// \param f function to execute.
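///
/// A minimal sketch (range and loop body chosen for illustration; writes to
/// distinct indices are safe to run concurrently):
/// ~~~{.cpp}
/// ROOT::TThreadExecutor pool;
/// std::vector<double> v(100);
/// pool.ParallelFor(0u, 100u, 1u, [&](unsigned int i) { v[i] = i * i; });
/// ~~~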
void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step,
                                  const std::function<void(unsigned int i)> &f)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelFor",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   fTaskArenaW->Access().execute([&] {
      tbb::this_task_arena::isolate([&] {
         tbb::parallel_for(start, end, step, f);
      });
   });
}

//////////////////////////////////////////////////////////////////////////
/// \brief "Reduce" in parallel an std::vector<double> into a single double value
///
/// \param objs A vector of elements to combine.
/// \param redfunc Reduction function to combine the elements of the vector `objs`.
/// \return A value result of combining the vector elements into a single object of the same type.
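///
/// For example (values chosen for illustration; redfunc should be associative
/// and commutative, with the default-constructed value as its identity, since
/// the underlying parallel reduction uses `T{}` as the initial value):
/// ~~~{.cpp}
/// ROOT::TThreadExecutor pool;
/// std::vector<double> vals{1., 2., 3., 4.};
/// auto sum = pool.ParallelReduce(vals, [](double a, double b) { return a + b; });
/// ~~~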
double TThreadExecutor::ParallelReduce(const std::vector<double> &objs,
                                       const std::function<double(double a, double b)> &redfunc)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelReduce",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<double>(objs, redfunc); });
}

//////////////////////////////////////////////////////////////////////////
/// \brief "Reduce" in parallel an std::vector<float> into a single float value
///
/// \param objs A vector of elements to combine.
/// \param redfunc Reduction function to combine the elements of the vector `objs`.
/// \return A value result of combining the vector elements into a single object of the same type.
float TThreadExecutor::ParallelReduce(const std::vector<float> &objs,
                                      const std::function<float(float a, float b)> &redfunc)
{
   if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      Warning("TThreadExecutor::ParallelReduce",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
   }
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<float>(objs, redfunc); });
}

//////////////////////////////////////////////////////////////////////////
/// \brief Returns the number of worker threads in the task arena.
/// \return the number of worker threads assigned to the task arena.
unsigned TThreadExecutor::GetPoolSize() const
{
   return fTaskArenaW->TaskArenaSize();
}

} // namespace ROOT