Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TThreadedObject.hxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Author: Danilo Piparo, CERN 11/2/2016
3
4/*************************************************************************
5 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TThreadedObject
13#define ROOT_TThreadedObject
14
15#include "ROOT/TSpinMutex.hxx"
16#include "TDirectory.h"
17#include "TError.h"
18#include "TList.h"
19#include "TROOT.h"
20
21
22#include <algorithm>
23#include <exception>
24#include <deque>
25#include <functional>
26#include <map>
27#include <memory>
28#include <mutex>
29#include <sstream>
30#include <string>
31#include <thread>
32#include <vector>
33
34class TH1;
35
36namespace ROOT {
37
38 /**
39 * \class ROOT::TNumSlots
40 * \brief Defines the number of threads in some of ROOT's interfaces.
41 */
struct TNumSlots {
   unsigned int fVal; ///< Number of slots.

   /// Two TNumSlots compare equal when they hold the same slot count.
   friend bool operator==(TNumSlots lhs, TNumSlots rhs)
   {
      return lhs.fVal == rhs.fVal;
   }

   /// Negation of operator==.
   friend bool operator!=(TNumSlots lhs, TNumSlots rhs)
   {
      return !(lhs == rhs);
   }
};
47
48 namespace Internal {
49
50 namespace TThreadedObjectUtils {
51
52
53 template<typename T, bool ISHISTO = std::is_base_of<TH1,T>::value>
54 struct Detacher{
55 static T* Detach(T* obj) {
56 return obj;
57 }
58 };
59
60 template<typename T>
61 struct Detacher<T, true>{
62 static T* Detach(T* obj) {
63 obj->SetDirectory(nullptr);
64 obj->ResetBit(kMustCleanup);
65 return obj;
66 }
67 };
68
69 /// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
70 template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
71 struct Cloner {
72 static T *Clone(const T *obj, TDirectory* d = nullptr) {
73 T* clone;
74 if (d){
76 clone = new T(*obj);
77 } else {
78 clone = new T(*obj);
79 }
80 return Detacher<T>::Detach(clone);
81 }
82 };
83
84 template<class T>
85 struct Cloner<T, false> {
86 static T *Clone(const T *obj, TDirectory* d = nullptr) {
87 T* clone;
88 if (d){
90 clone = (T*)obj->Clone();
91 } else {
92 clone = (T*)obj->Clone();
93 }
94 return clone;
95 }
96 };
97
98 template <class T, bool ISHISTO = std::is_base_of<TH1, T>::value>
99 struct DirCreator {
101 {
102 static unsigned dirCounter = 0;
103 const std::string dirName = "__TThreaded_dir_" + std::to_string(dirCounter++) + "_";
104 return gROOT->mkdir(dirName.c_str());
105 }
106 };
107
108 template <class T>
109 struct DirCreator<T, true> {
110 static TDirectory *Create() { return nullptr; }
111 };
112
113 } // End of namespace TThreadedObjectUtils
114 } // End of namespace Internal
115
116 namespace TThreadedObjectUtils {
117
/// Signature of a merge callback: receives the merge target and the vector of
/// per-slot object pointers.
template <class T>
using MergeFunctionType =
   std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>> &)>;
120
121 /// Merge TObjects
122 template<class T>
123 void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
124 {
125 if (!target) return;
126 TList objTList;
127 // Cannot do better than this
128 for (auto obj : objs) {
129 if (obj && obj != target) objTList.Add(obj.get());
130 }
131 target->Merge(&objTList);
132 }
133 } // end of namespace TThreadedObjectUtils
134
135 /**
136 * \class ROOT::TThreadedObject
137 * \brief A wrapper to make object instances thread private, lazily.
138 * \tparam T Class of the object to be made thread private (e.g. TH1F)
139 * \ingroup Multicore
140 *
141 * A wrapper which makes objects thread private. The methods of the underlying
142 * object can be invoked via the arrow operator. The object is created in
143 * a specific thread lazily, i.e. upon invocation of one of its methods.
144 * The correct object pointer from within a particular thread can be accessed
145 * with the overloaded arrow operator or with the Get method.
146 * In case an elaborate thread management is in place, e.g. in presence of
147 * stream of operations or "processing slots", it is also possible to
148 * manually select the correct object pointer explicitly.
149 */
150 template<class T>
152 public:
153 /// The initial number of empty processing slots that a TThreadedObject is constructed with by default.
154 /// Deprecated: TThreadedObject grows as more slots are required.
155 static constexpr const TNumSlots fgMaxSlots{64};
156
158
159 /// Construct the TThreadedObject with initSlots empty slots and the "model" of the thread private objects.
160 /// \param initSlots Set the initial number of slots of the TThreadedObject.
161 /// \tparam ARGS Arguments of the constructor of T
162 ///
163 /// This form of the constructor is useful to manually pre-set the content of a given number of slots
164 /// when used in combination with TThreadedObject::SetAtSlot().
165 template <class... ARGS>
166 TThreadedObject(TNumSlots initSlots, ARGS &&... args) : fIsMerged(false)
167 {
168 const auto nSlots = initSlots.fVal;
169 fObjPointers.resize(nSlots);
170
171 // create at least one directory (we need it for fModel), plus others as needed by the size of fObjPointers
173 for (auto i = 1u; i < nSlots; ++i)
175
177 fModel.reset(Internal::TThreadedObjectUtils::Detacher<T>::Detach(new T(std::forward<ARGS>(args)...)));
178 }
179
180 /// Construct the TThreadedObject and the "model" of the thread private objects.
181 /// \tparam ARGS Arguments of the constructor of T
182 template<class ...ARGS>
184
185 /// Return the number of currently available slots.
186 ///
187 /// The method is safe to call concurrently to other TThreadedObject methods.
188 /// Note that slots could be available but contain no data (i.e. a nullptr) if
189 /// they have not been used yet.
190 unsigned GetNSlots() const
191 {
192 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
193 return fObjPointers.size();
194 }
195
196 /// Access a particular processing slot.
197 ///
198 /// This method is thread-safe as long as concurrent calls request different slots (i.e. pass a different
199 /// argument) and no thread accesses slot `i` via the arrow operator, so mixing usage of GetAtSlot
200 /// with usage of the arrow operator can be dangerous.
201 std::shared_ptr<T> GetAtSlot(unsigned i)
202 {
203 std::size_t nAvailableSlots;
204 {
205 // fObjPointers can grow due to a concurrent operation on this TThreadedObject, need to lock
206 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
207 nAvailableSlots = fObjPointers.size();
208 }
209
210 if (i >= nAvailableSlots) {
211 Warning("TThreadedObject::GetAtSlot", "This slot does not exist.");
212 return nullptr;
213 }
214
215 auto &objPointer = fObjPointers[i];
216 if (!objPointer)
218 return objPointer;
219 }
220
221 /// Set the value of a particular slot.
222 ///
223 /// This method is thread-safe as long as concurrent calls access different slots (i.e. pass a different
224 /// argument) and no thread accesses slot `i` via the arrow operator, so mixing usage of SetAtSlot
225 /// with usage of the arrow operator can be dangerous.
226 void SetAtSlot(unsigned i, std::shared_ptr<T> v)
227 {
228 std::size_t nAvailableSlots;
229 {
230 // fObjPointers can grow due to a concurrent operation on this TThreadedObject, need to lock
231 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
232 nAvailableSlots = fObjPointers.size();
233 }
234
235 if (i >= nAvailableSlots) {
236 Warning("TThreadedObject::SetAtSlot", "This slot does not exist, doing nothing.");
237 return;
238 }
239
240 fObjPointers[i] = v;
241 }
242
243 /// Access a particular slot which corresponds to a single thread.
244 /// This is in general faster than the GetAtSlot method but it is
245 /// responsibility of the caller to make sure that the slot exists
246 /// and to check that the contained object is initialized (and not
247 /// a nullptr).
248 std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
249 {
250 return fObjPointers[i];
251 }
252
253 /// Access a particular slot which corresponds to a single thread.
254 /// This overload is faster than the GetAtSlotUnchecked method but
255 /// the caller is responsible to make sure that the slot exists, to
256 /// check that the contained object is initialized and that the returned
257 /// pointer will not outlive the TThreadedObject that returned it, which
258 /// maintains ownership of the actual object.
259 T* GetAtSlotRaw(unsigned i) const
260 {
261 return fObjPointers[i].get();
262 }
263
264 /// Access the pointer corresponding to the current slot. This method is
265 /// not adequate for being called inside tight loops as it implies a
266 /// lookup in a mapping between the threadIDs and the slot indices.
267 /// A good practice consists in copying the pointer onto the stack and
268 /// proceed with the loop as shown in this work item (pseudo-code) which
269 /// will be sent to different threads:
270 /// ~~~{.cpp}
271 /// auto workItem = [](){
272 /// auto objPtr = tthreadedObject.Get();
273 /// for (auto i : ROOT::TSeqI(1000)) {
274 /// // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
275 /// objPtr->FastMethod(i);
276 /// }
277 /// }
278 /// ~~~
279 std::shared_ptr<T> Get()
280 {
282 }
283
284 /// Access the wrapped object and allow to call its methods.
286 {
287 return Get().get();
288 }
289
290 /// Merge all the thread private objects. Can be called once: it does not
291 /// create any new object but destroys the present bookkeeping collapsing
292 /// all objects into the one at slot 0.
293 std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
294 {
295 // We do not return if we already merged.
296 if (fIsMerged) {
297 Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
298 return fObjPointers[0];
299 }
300 // need to convert to std::vector because historically mergeFunction requires a vector
301 auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
302 mergeFunction(fObjPointers[0], vecOfObjPtrs);
303 fIsMerged = true;
304 return fObjPointers[0];
305 }
306
307 /// Merge all the thread private objects. Can be called many times. It
308 /// does create a new instance of class T to represent the "Sum" object.
309 /// This method is not thread safe: correct or acceptable behaviours
310 /// depend on the nature of T and of the merging function.
311 std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
312 {
313 if (fIsMerged) {
314 Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
315 return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
316 }
318 std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
319 // need to convert to std::vector because historically mergeFunction requires a vector
320 auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
321 mergeFunction(targetPtrShared, vecOfObjPtrs);
322 return std::unique_ptr<T>(targetPtr);
323 }
324
325 private:
326 std::unique_ptr<T> fModel; ///< Use to store a "model" of the object
327 // std::deque's guarantee that references to the elements are not invalidated when appending new slots
328 std::deque<std::shared_ptr<T>> fObjPointers; ///< An object pointer per slot
329 // If the object is a histogram, we also create dummy directories that the histogram associates with
330 // so we do not pollute gDirectory
331 std::deque<TDirectory*> fDirectories; ///< A TDirectory per slot
332 std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
333 mutable ROOT::TSpinMutex fSpinMutex; ///< Protects concurrent access to fThrIDSlotMap, fObjPointers
334 bool fIsMerged : 1; ///< Remember if the objects have been merged already
335
336 /// Get the slot number for this threadID, make a slot if needed
338 {
339 const auto thisThreadID = std::this_thread::get_id();
340 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
341 const auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
342 if (thisSlotNumIt != fThrIDSlotMap.end())
343 return thisSlotNumIt->second;
344 const auto newIndex = fThrIDSlotMap.size();
345 fThrIDSlotMap[thisThreadID] = newIndex;
346 R__ASSERT(newIndex <= fObjPointers.size() && "This should never happen, we should create new slots as needed");
347 if (newIndex == fObjPointers.size()) {
349 fObjPointers.emplace_back(nullptr);
350 }
351 return newIndex;
352 }
353 };
354
355 template<class T>
356 constexpr const TNumSlots TThreadedObject<T>::fgMaxSlots;
357
358} // End ROOT namespace
359
360////////////////////////////////////////////////////////////////////////////////
361/// Print a TThreadedObject at the prompt:
362
363namespace cling {
364 template<class T>
365 std::string printValue(ROOT::TThreadedObject<T> *val)
366 {
367 auto model = ((std::unique_ptr<T>*)(val))->get();
368 std::ostringstream ret;
369 ret << "A wrapper to make object instances thread private, lazily. "
370 << "The model which is replicated is " << printValue(model);
371 return ret.str();
372 }
373}
374
375
376#endif
#define d(i)
Definition RSha256.hxx:102
#define R__ASSERT(e)
Definition TError.h:120
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:231
@ kMustCleanup
Definition TObject.h:355
#define gROOT
Definition TROOT.h:406
typedef void((*Func_t)())
A spin mutex class which respects the STL interface for mutexes.
A wrapper to make object instances thread private, lazily.
std::map< std::thread::id, unsigned > fThrIDSlotMap
A mapping between the thread IDs and the slots.
T * operator->()
Access the wrapped object and allow to call its methods.
void SetAtSlot(unsigned i, std::shared_ptr< T > v)
Set the value of a particular slot.
static constexpr const TNumSlots fgMaxSlots
The initial number of empty processing slots that a TThreadedObject is constructed with by default.
std::shared_ptr< T > Merge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
std::deque< TDirectory * > fDirectories
A TDirectory per slot.
std::shared_ptr< T > GetAtSlot(unsigned i)
Access a particular processing slot.
std::shared_ptr< T > Get()
Access the pointer corresponding to the current slot.
ROOT::TSpinMutex fSpinMutex
Protects concurrent access to fThrIDSlotMap, fObjPointers.
TThreadedObject(const TThreadedObject &)=delete
std::unique_ptr< T > fModel
Use to store a "model" of the object.
std::deque< std::shared_ptr< T > > fObjPointers
An object pointer per slot.
std::unique_ptr< T > SnapshotMerge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
std::shared_ptr< T > GetAtSlotUnchecked(unsigned i) const
Access a particular slot which corresponds to a single thread.
unsigned GetThisSlotNumber()
Get the slot number for this threadID, make a slot if needed.
TThreadedObject(TNumSlots initSlots, ARGS &&... args)
Construct the TThreadedObject with initSlots empty slots and the "model" of the thread private object...
T * GetAtSlotRaw(unsigned i) const
Access a particular slot which corresponds to a single thread.
unsigned GetNSlots() const
Return the number of currently available slot.
TThreadedObject(ARGS &&... args)
Construct the TThreadedObject and the "model" of the thread private objects.
bool fIsMerged
Remember if the objects have been merged already.
Small helper to keep current directory context.
Definition TDirectory.h:52
Describe directory structure in memory.
Definition TDirectory.h:45
virtual TDirectory * mkdir(const char *name, const char *title="", Bool_t returnExistingDirectory=kFALSE)
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
TH1 is the base class of all histogram classes in ROOT.
Definition TH1.h:58
A doubly linked list.
Definition TList.h:44
virtual void Add(TObject *obj)
Definition TList.h:87
void MergeTObjects(std::shared_ptr< T > target, std::vector< std::shared_ptr< T > > &objs)
Merge TObjects.
std::function< void(std::shared_ptr< T >, std::vector< std::shared_ptr< T > > &)> MergeFunctionType
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
static T * Clone(const T *obj, TDirectory *d=nullptr)
Return a copy of the object or a "Clone" if the copy constructor is not implemented.
static T * Clone(const T *obj, TDirectory *d=nullptr)
Defines the number of threads in some of ROOT's interfaces.
friend bool operator!=(TNumSlots lhs, TNumSlots rhs)
friend bool operator==(TNumSlots lhs, TNumSlots rhs)
#define ARGS(alist)
Definition gifencode.c:10