Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RTaskArena.cxx
Go to the documentation of this file.
1#include "ROOT/RTaskArena.hxx"
3#include "TError.h"
4#include "TROOT.h"
5#include "TThread.h"
6#include <fstream>
7#include <mutex>
8#include <thread>
9#include "tbb/task_arena.h"
10#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4
11#include "tbb/global_control.h"
12
13//////////////////////////////////////////////////////////////////////////
14///
15/// \class ROOT::Internal::RTaskArenaWrapper
16/// \ingroup Parallelism
17/// \brief Wrapper over tbb::task_arena
18///
19/// This class is a wrapper over tbb::task_arena, in order to keep
20/// TBB away from ROOT's headers. We keep a single global instance to be
21/// used by any parallel ROOT class with TBB as a backend.
22///
23/// TThreadExecutor, IMT and any class relying on TBB will get a pointer
24/// to the scheduler through `ROOT::Internal::GetGlobalTaskArena()`, which
25/// will return areference to the only pointer to the TBB scheduler that
26/// will be active in any ROOT Process.
27///
28/// #### Examples:
29/// ~~~{.cpp}
30/// root[] auto gTA = ROOT::Internal::GetGlobalTaskArena(nWorkers) //get a shared_ptr to the global arena and initialize
31/// //it with nWorkers. Enable thread safety in ROOT
32/// root[] gTA->TaskArenaSize() // Get the current size of the arena (number of worker threads)
33/// root[] gTA->Access() //std::unique_ptr to the internal tbb::task_arena for interacting directly with it (needed to
34/// //call operations such as execute)
35/// root[] gTA->Access().max_concurrency() // call to tbb::task_arena::max_concurrency()
36/// ~~~
37///
38//////////////////////////////////////////////////////////////////////////
39
40namespace ROOT {
41namespace Internal {
42
44{
45#ifdef R__LINUX
46 // Check for CFS bandwith control
47 std::ifstream f("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file
48 if (f) {
49 float cfs_quota;
50 f >> cfs_quota;
51 f.close();
52 if (cfs_quota > 0) {
53 f.open("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file
54 float cfs_period;
55 f >> cfs_period;
56 f.close();
57 return static_cast<int>(std::ceil(cfs_quota / cfs_period));
58 }
59 }
60#endif
61 return std::thread::hardware_concurrency();
62}
63
64////////////////////////////////////////////////////////////////////////////////
65/// Initializes the tbb::task_arena within RTaskArenaWrapper.
66///
67/// * Can't be reinitialized
68/// * Checks for CPU bandwidth control and avoids oversubscribing
69/// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads,
70/// which is CPU affinity aware
71////////////////////////////////////////////////////////////////////////////////
72RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency) : fTBBArena(new ROpaqueTaskArena{})
73{
74 const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state
75 maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads;
76 const unsigned bcCpus = LogicalCPUBandwithControl();
77 if (maxConcurrency > bcCpus) {
78 Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", bcCpus);
79 maxConcurrency = bcCpus;
80 }
81 if (maxConcurrency > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
82 Warning("RTaskArenaWrapper", "tbb::global_control is active, limiting the number of parallel workers"
83 "from this task arena available for execution.");
84 }
85 fTBBArena->initialize(maxConcurrency);
86 fNWorkers = maxConcurrency;
88}
89
91{
92 fNWorkers = 0u;
93}
94
96
98{
99 return fNWorkers;
100}
101////////////////////////////////////////////////////////////////////////////////
102/// Provides access to the wrapped tbb::task_arena.
103////////////////////////////////////////////////////////////////////////////////
105{
106 return *fTBBArena;
107}
108
109std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(unsigned maxConcurrency)
110{
111 static std::weak_ptr<ROOT::Internal::RTaskArenaWrapper> weak_GTAWrapper;
112
113 static std::mutex m;
114 const std::lock_guard<std::mutex> lock{m};
115 if (auto sp = weak_GTAWrapper.lock()) {
116 if (maxConcurrency && (sp->TaskArenaSize() != maxConcurrency)) {
117 Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %d threads",
118 sp->TaskArenaSize());
119 }
120 return sp;
121 }
122 std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> sp(new ROOT::Internal::RTaskArenaWrapper(maxConcurrency));
123 weak_GTAWrapper = sp;
124 return sp;
125}
126
127} // namespace Internal
128} // namespace ROOT
#define f(i)
Definition RSha256.hxx:104
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:231
Wrapper for tbb::task_arena.
ROOT::ROpaqueTaskArena & Access()
Provides access to the wrapped tbb::task_arena.
RTaskArenaWrapper(unsigned maxConcurrency=0)
Initializes the tbb::task_arena within RTaskArenaWrapper.
std::unique_ptr< ROOT::ROpaqueTaskArena > fTBBArena
std::shared_ptr< ROOT::Internal::RTaskArenaWrapper > GetGlobalTaskArena(unsigned maxConcurrency=0)
Factory function returning a shared pointer to the instance of the global RTaskArenaWrapper.
int LogicalCPUBandwithControl()
Returns the available number of logical cores.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
void EnableThreadSafety()
Enables the global mutex to make ROOT thread safe/aware.
Definition TROOT.cxx:494
auto * m
Definition textangle.C:8