Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RTaskArena.cxx
Go to the documentation of this file.
1// Require TBB without captured exceptions
2#define TBB_USE_CAPTURED_EXCEPTION 0
3
4#include "ROOT/RTaskArena.hxx"
6#include "TError.h"
7#include "TROOT.h"
8#include "TSystem.h"
9#include "TThread.h"
10#include <fstream>
11#include <mutex>
12#include <string>
13#include <thread>
14#include "tbb/task_arena.h"
15#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4
16#include "tbb/global_control.h"
17
18//////////////////////////////////////////////////////////////////////////
19///
20/// \class ROOT::Internal::RTaskArenaWrapper
21/// \ingroup Parallelism
22/// \brief Wrapper over tbb::task_arena
23///
24/// This class is a wrapper over tbb::task_arena, in order to keep
25/// TBB away from ROOT's headers. We keep a single global instance to be
26/// used by any parallel ROOT class with TBB as a backend.
27///
28/// TThreadExecutor, IMT and any class relying on TBB will get a pointer
29/// to the scheduler through `ROOT::Internal::GetGlobalTaskArena()`, which
30/// will return a reference to the only pointer to the TBB scheduler that
31/// will be active in any ROOT Process.
32///
33/// #### Examples:
34/// ~~~{.cpp}
35/// root[] auto gTA = ROOT::Internal::GetGlobalTaskArena(nWorkers) //get a shared_ptr to the global arena and initialize
36/// //it with nWorkers. Enable thread safety in ROOT
37/// root[] gTA->TaskArenaSize() // Get the current size of the arena (number of worker threads)
38/// root[] gTA->Access() // reference to the internal tbb::task_arena for interacting directly with it (needed to
39/// //call operations such as execute)
40/// root[] gTA->Access().max_concurrency() // call to tbb::task_arena::max_concurrency()
41/// ~~~
42///
43//////////////////////////////////////////////////////////////////////////
44
45namespace ROOT {
46namespace Internal {
47
48// Honor environment variable `ROOT_MAX_THREADS` if set.
49// Also honor cgroup quotas if set: see https://github.com/oneapi-src/oneTBB/issues/190
51{
52 if (const char *envMaxThreads = gSystem->Getenv("ROOT_MAX_THREADS")) {
53 char *str_end = nullptr;
54 long maxThreads = std::strtol(envMaxThreads, &str_end, 0 /*auto-detect base*/);
55 if (str_end == envMaxThreads && maxThreads == 0) {
56 Error("ROOT::Internal::LogicalCPUBandwidthControl()",
57 "cannot parse number in environment variable ROOT_MAX_THREADS; ignoring.");
58 } else if (maxThreads < 1) {
59 Error("ROOT::Internal::LogicalCPUBandwidthControl()",
60 "environment variable ROOT_MAX_THREADS must be >= 1, but set to %ld; ignoring.",
61 maxThreads);
62 } else
63 return maxThreads;
64 }
65
66#ifdef R__LINUX
67 // Check for CFS bandwith control
68 std::ifstream f("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file
69 if (f) {
70 float cfs_quota;
71 f >> cfs_quota;
72 f.close();
73 if (cfs_quota > 0) {
74 f.open("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file
75 float cfs_period;
76 f >> cfs_period;
77 f.close();
78 return static_cast<int>(std::ceil(cfs_quota / cfs_period));
79 }
80 }
81#endif
82 return std::thread::hardware_concurrency();
83}
84
85////////////////////////////////////////////////////////////////////////////////
86/// Initializes the tbb::task_arena within RTaskArenaWrapper.
87///
88/// * Can't be reinitialized
89/// * Checks for CPU bandwidth control and avoids oversubscribing
90/// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads,
91/// which is CPU affinity aware
92////////////////////////////////////////////////////////////////////////////////
93RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency) : fTBBArena(new ROpaqueTaskArena{})
94{
95 const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state
96 maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads;
97 const unsigned bcCpus = LogicalCPUBandwidthControl();
98 if (maxConcurrency > bcCpus) {
99 Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", bcCpus);
100 maxConcurrency = bcCpus;
101 }
102 if (maxConcurrency > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
103 Warning("RTaskArenaWrapper", "tbb::global_control is active, limiting the number of parallel workers"
104 "from this task arena available for execution.");
105 }
106 fTBBArena->initialize(maxConcurrency);
107 fNWorkers = maxConcurrency;
109}
110
112{
113 fNWorkers = 0u;
114}
115
116unsigned RTaskArenaWrapper::fNWorkers = 0u;
117
119{
120 return fNWorkers;
121}
122////////////////////////////////////////////////////////////////////////////////
123/// Provides access to the wrapped tbb::task_arena.
124////////////////////////////////////////////////////////////////////////////////
126{
127 return *fTBBArena;
128}
129
130std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(unsigned maxConcurrency)
131{
132 static std::weak_ptr<ROOT::Internal::RTaskArenaWrapper> weak_GTAWrapper;
133
134 static std::mutex m;
135 const std::lock_guard<std::mutex> lock{m};
136 if (auto sp = weak_GTAWrapper.lock()) {
137 if (maxConcurrency && (sp->TaskArenaSize() != maxConcurrency)) {
138 Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %d threads",
139 sp->TaskArenaSize());
140 }
141 return sp;
142 }
143 std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> sp(new ROOT::Internal::RTaskArenaWrapper(maxConcurrency));
144 weak_GTAWrapper = sp;
145 return sp;
146}
147
148} // namespace Internal
149} // namespace ROOT
#define f(i)
Definition RSha256.hxx:104
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
R__EXTERN TSystem * gSystem
Definition TSystem.h:555
Wrapper for tbb::task_arena.
ROOT::ROpaqueTaskArena & Access()
Provides access to the wrapped tbb::task_arena.
RTaskArenaWrapper(unsigned maxConcurrency=0)
Initializes the tbb::task_arena within RTaskArenaWrapper.
std::unique_ptr< ROOT::ROpaqueTaskArena > fTBBArena
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1665
int LogicalCPUBandwidthControl()
Returns the available number of logical cores.
std::shared_ptr< ROOT::Internal::RTaskArenaWrapper > GetGlobalTaskArena(unsigned maxConcurrency=0)
Factory function returning a shared pointer to the instance of the global RTaskArenaWrapper.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
void EnableThreadSafety()
Enable support for multi-threading within the ROOT code in particular, enables the global mutex to ma...
Definition TROOT.cxx:501
TMarker m
Definition textangle.C:8