Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RTaskArena.cxx
Go to the documentation of this file.
1// Require TBB without captured exceptions
2#define TBB_USE_CAPTURED_EXCEPTION 0
3
4#include "ROOT/RTaskArena.hxx"
6#include "TError.h"
7#include "TROOT.h"
8#include "TSystem.h"
9#include "TThread.h"
10#include <fstream>
11#include <mutex>
12#include <string>
13#include <thread>
14#include <cmath>
15#include "tbb/task_arena.h"
16#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4
17#include "tbb/global_control.h"
18
19//////////////////////////////////////////////////////////////////////////
20///
21/// \class ROOT::Internal::RTaskArenaWrapper
22/// \ingroup Parallelism
23/// \brief Wrapper over tbb::task_arena
24///
25/// This class is a wrapper over tbb::task_arena, in order to keep
26/// TBB away from ROOT's headers. We keep a single global instance to be
27/// used by any parallel ROOT class with TBB as a backend.
28///
29/// TThreadExecutor, IMT and any class relying on TBB will get a pointer
30/// to the scheduler through `ROOT::Internal::GetGlobalTaskArena()`, which
31/// will return a reference to the only pointer to the TBB scheduler that
32/// will be active in any ROOT Process.
33///
34/// #### Examples:
35/// ~~~{.cpp}
36/// root[] auto gTA = ROOT::Internal::GetGlobalTaskArena(nWorkers) //get a shared_ptr to the global arena and initialize
37/// //it with nWorkers. Enable thread safety in ROOT
38/// root[] gTA->TaskArenaSize() // Get the current size of the arena (number of worker threads)
39/// root[] gTA->Access() //reference to the internal tbb::task_arena for interacting directly with it (needed to
40/// //call operations such as execute)
41/// root[] gTA->Access().max_concurrency() // call to tbb::task_arena::max_concurrency()
42/// ~~~
43///
44//////////////////////////////////////////////////////////////////////////
45
46namespace ROOT {
47namespace Internal {
48
49// Honor environment variable `ROOT_MAX_THREADS` if set.
50// Also honor cgroup quotas if set: see https://github.com/oneapi-src/oneTBB/issues/190
52{
53 if (const char *envMaxThreads = gSystem->Getenv("ROOT_MAX_THREADS")) {
54 char *str_end = nullptr;
55 long maxThreads = std::strtol(envMaxThreads, &str_end, 0 /*auto-detect base*/);
56 if (str_end == envMaxThreads && maxThreads == 0) {
57 Error("ROOT::Internal::LogicalCPUBandwidthControl()",
58 "cannot parse number in environment variable ROOT_MAX_THREADS; ignoring.");
59 } else if (maxThreads < 1) {
60 Error("ROOT::Internal::LogicalCPUBandwidthControl()",
61 "environment variable ROOT_MAX_THREADS must be >= 1, but set to %ld; ignoring.",
63 } else
64 return maxThreads;
65 }
66
67#ifdef R__LINUX
68 // Check for CFS bandwidth control
69 std::ifstream f("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file
70 if (f) {
71 float cfs_quota;
72 f >> cfs_quota;
73 f.close();
74 if (cfs_quota > 0) {
75 f.open("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file
76 float cfs_period;
77 f >> cfs_period;
78 f.close();
79 return static_cast<int>(std::ceil(cfs_quota / cfs_period));
80 }
81 }
82#endif
83 return std::thread::hardware_concurrency();
84}
85
86////////////////////////////////////////////////////////////////////////////////
87/// Initializes the tbb::task_arena within RTaskArenaWrapper.
88///
89/// * Can't be reinitialized
90/// * Checks for CPU bandwidth control and avoids oversubscribing
91/// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads,
92/// which is CPU affinity aware
93////////////////////////////////////////////////////////////////////////////////
95{
96 const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state
98 const unsigned bcCpus = LogicalCPUBandwidthControl();
99 if (maxConcurrency > bcCpus) {
100 Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", bcCpus);
102 }
103 if (maxConcurrency > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
104 Warning("RTaskArenaWrapper", "tbb::global_control is active, limiting the number of parallel workers"
105 "from this task arena available for execution.");
106 }
107 fTBBArena->initialize(maxConcurrency);
110}
111
112////////////////////////////////////////////////////////////////////////////////
113/// Initializes the tbb::task_arena within RTaskArenaWrapper by attaching to an
114/// existing arena.
115///
116/// * Can't be reinitialized
117////////////////////////////////////////////////////////////////////////////////
119 : fTBBArena(new ROpaqueTaskArena{tbb::task_arena::attach{}})
120{
121 fTBBArena->initialize(tbb::task_arena::attach{});
122 fNWorkers = fTBBArena->max_concurrency();
124}
125
130
132
134{
135 return fNWorkers;
136}
137////////////////////////////////////////////////////////////////////////////////
138/// Provides access to the wrapped tbb::task_arena.
139////////////////////////////////////////////////////////////////////////////////
144
145std::shared_ptr<ROOT::Internal::RTaskArenaWrapper>
147{
148 static std::weak_ptr<ROOT::Internal::RTaskArenaWrapper> weak_GTAWrapper;
149
150 static std::mutex m;
151 const std::lock_guard<std::mutex> lock{m};
152 if (auto sp = weak_GTAWrapper.lock()) {
153 if (maxConcurrency && (sp->TaskArenaSize() != maxConcurrency)) {
154 Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %d threads",
155 sp->TaskArenaSize());
156 }
157 return sp;
158 }
159 std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> sp;
161 sp = std::make_shared<ROOT::Internal::RTaskArenaWrapper>(ROOT::Internal::RTaskArenaWrapper::Attach{});
162 } else {
163 if (config == ROOT::EIMTConfig::kWholeMachine) {
164 maxConcurrency = 0;
165 }
166 sp = std::make_shared<ROOT::Internal::RTaskArenaWrapper>(maxConcurrency);
167 }
169 return sp;
170}
171
/// Overload of GetGlobalTaskArena that takes only an IMT configuration.
///
/// \param config IMT configuration; must compare below ROOT::EIMTConfig::kNumConfigs.
/// \return The result of the two-argument GetGlobalTaskArena overload, called with
///         a requested concurrency of 0 and the given `config`.
172std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(ROOT::EIMTConfig config)
173{
// Reject out-of-range configuration values up front; ::Fatal reports the offending
// value as an integer.
174 if (config >= ROOT::EIMTConfig::kNumConfigs)
175 ::Fatal("ROOT::Internal::GetGlobalTaskArena",
176 "Unsupported enum value %d", (int)config);
// Delegate to the (maxConcurrency, config) overload, passing 0 as maxConcurrency.
177 return GetGlobalTaskArena(0, config);
178}
179
180std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(unsigned maxConcurrency)
181{
183}
184
185} // namespace Internal
186} // namespace ROOT
#define f(i)
Definition RSha256.hxx:104
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:208
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:267
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
ROOT::ROpaqueTaskArena & Access()
Provides access to the wrapped tbb::task_arena.
RTaskArenaWrapper(unsigned maxConcurrency=0)
Initializes the tbb::task_arena within RTaskArenaWrapper.
std::unique_ptr< ROOT::ROpaqueTaskArena > fTBBArena
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1678
int LogicalCPUBandwidthControl()
Returns the available number of logical cores.
std::shared_ptr< ROOT::Internal::RTaskArenaWrapper > GetGlobalTaskArena(unsigned maxConcurrency=0)
Factory function returning a shared pointer to the instance of the global RTaskArenaWrapper.
Namespace for new ROOT classes and functions.
void EnableThreadSafety()
Enable support for multi-threading within the ROOT code in particular, enables the global mutex to ma...
Definition TROOT.cxx:506
EIMTConfig
Definition TROOT.h:83
@ kWholeMachine
Default configuration.
@ kNumConfigs
Number of supported IMT semantic configurations.
@ kExistingTBBArena
Use the existing TBB arena.
Marker for attaching to an existing tbb::task_arena.
TMarker m
Definition textangle.C:8