Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TThreadedObject.hxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Author: Danilo Piparo, CERN 11/2/2016
3
4/*************************************************************************
5 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TThreadedObject
13#define ROOT_TThreadedObject
14
15#include "ROOT/TSpinMutex.hxx"
16#include "TDirectory.h"
17#include "TError.h"
18#include "TList.h"
19#include "TROOT.h"
20
21
22#include <algorithm>
23#include <exception>
24#include <deque>
25#include <functional>
26#include <map>
27#include <memory>
28#include <mutex>
29#include <sstream>
30#include <string>
31#include <thread>
32#include <vector>
33
34class TH1;
35
36namespace ROOT {
37
38 /**
39 * \class ROOT::TNumSlots
40 * \brief Defines the number of threads in some of ROOT's interfaces.
 *
 * Wraps a plain unsigned slot count in a dedicated type. In this file it is the
 * first parameter of the TThreadedObject constructor that pre-allocates slots.
 * Two TNumSlots compare equal exactly when their fVal members are equal.
41 */
42 struct TNumSlots {
43 unsigned int fVal; // number of slots
// Equality and inequality compare the wrapped counts only.
44 friend bool operator==(TNumSlots lhs, TNumSlots rhs) { return lhs.fVal == rhs.fVal; }
45 friend bool operator!=(TNumSlots lhs, TNumSlots rhs) { return lhs.fVal != rhs.fVal; }
46 };
47
48 namespace Internal {
49
50 namespace TThreadedObjectUtils {
51
52
/// Primary template. ISHISTO defaults to whether T derives from TH1; when it is
/// false this version is used and Detach() returns the pointer it was given, unchanged.
53 template<typename T, bool ISHISTO = std::is_base_of<TH1,T>::value>
54 struct Detacher{
55 static T* Detach(T* obj) {
56 return obj;
57 }
58 };
59
/// Specialisation for ISHISTO == true (by default: T derives from TH1).
/// Detach() calls SetDirectory(nullptr) on the object, resets its kMustCleanup
/// bit, and then returns the same pointer.
60 template<typename T>
61 struct Detacher<T, true>{
62 static T* Detach(T* obj) {
63 obj->SetDirectory(nullptr);
64 obj->ResetBit(kMustCleanup);
65 return obj;
66 }
67 };
68
69 /// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
70 template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
71 struct Cloner {
72 static T *Clone(const T *obj, TDirectory* d = nullptr) {
73 T* clone;
74 if (d){
76 clone = new T(*obj);
77 } else {
78 clone = new T(*obj);
79 }
80 return Detacher<T>::Detach(clone);
81 }
82 };
83
84 template<class T>
85 struct Cloner<T, false> {
86 static T *Clone(const T *obj, TDirectory* d = nullptr) {
87 T* clone;
88 if (d){
90 clone = (T*)obj->Clone();
91 } else {
92 clone = (T*)obj->Clone();
93 }
94 return clone;
95 }
96 };
97
98 template <class T, bool ISHISTO = std::is_base_of<TH1, T>::value>
99 struct DirCreator {
101 {
102 static unsigned dirCounter = 0;
103 const std::string dirName = "__TThreaded_dir_" + std::to_string(dirCounter++) + "_";
104 return gROOT->mkdir(dirName.c_str());
105 }
106 };
107
/// Specialisation for ISHISTO == true (by default: T derives from TH1).
/// Create() makes no directory and always returns nullptr.
108 template <class T>
109 struct DirCreator<T, true> {
110 static TDirectory *Create() { return nullptr; }
111 };
112
113 } // End of namespace TThreadedObjectUtils
114 } // End of namespace Internal
115
116 namespace TThreadedObjectUtils {
117
118 template<class T>
119 using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
120
121 /// Merge TObjects
///
/// Collects the raw pointers of all entries of `objs` into a TList and passes
/// that list to `target->Merge()`.
/// \param target Object that receives the merge. If it is null the function returns immediately.
/// \param objs Objects to merge. Null entries and any entry equal to `target` are left out of the list.
122 template<class T>
123 void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
124 {
125 if (!target) return;
126 TList objTList;
127 // Cannot do better than this
// NOTE(review): `auto obj` copies each shared_ptr on every iteration; binding by
// const reference would avoid the copy.
128 for (auto obj : objs) {
129 if (obj && obj != target) objTList.Add(obj.get());
130 }
131 target->Merge(&objTList);
132 }
133 } // end of namespace TThreadedObjectUtils
134
135 /**
136 * \class ROOT::TThreadedObject
137 * \brief A wrapper to make object instances thread private, lazily.
138 * \tparam T Class of the object to be made thread private (e.g. TH1F)
139 * \ingroup Parallelism
140 *
141 * A wrapper which makes objects thread private. The methods of the underlying
142 * object can be invoked via the arrow operator. The object is created in
143 * a specific thread lazily, i.e. upon invocation of one of its methods.
144 * The correct object pointer from within a particular thread can be accessed
145 * with the overloaded arrow operator or with the Get method.
146 * In case an elaborate thread management is in place, e.g. in presence of
147 * stream of operations or "processing slots", it is also possible to
148 * manually select the correct object pointer explicitly.
149 */
150 template<class T>
152 public:
153 /// The initial number of empty processing slots that a TThreadedObject is constructed with by default.
154 /// Deprecated: TThreadedObject grows as more slots are required.
155 static constexpr const TNumSlots fgMaxSlots{64};
156
158
159 /// Construct the TThreadedObject with initSlots empty slots and the "model" of the thread private objects.
160 /// \param initSlots Set the initial number of slots of the TThreadedObject.
161 /// \tparam ARGS Arguments' class type of the constructor of T
162 /// \param args variadic arguments
163 ///
164 /// This form of the constructor is useful to manually pre-set the content of a given number of slots
165 /// when used in combination with TThreadedObject::SetAtSlot().
166 template <class... ARGS>
167 TThreadedObject(TNumSlots initSlots, ARGS &&... args) : fIsMerged(false)
168 {
169 const auto nSlots = initSlots.fVal;
170 fObjPointers.resize(nSlots);
171
172 // create at least one directory (we need it for fModel), plus others as needed by the size of fObjPointers
174 for (auto i = 1u; i < nSlots; ++i)
176
178 fModel.reset(Internal::TThreadedObjectUtils::Detacher<T>::Detach(new T(std::forward<ARGS>(args)...)));
179 }
180
181 /// Construct the TThreadedObject and the "model" of the thread private objects.
182 /// \tparam ARGS Arguments of the constructor of T
183 template<class ...ARGS>
185
186 /// Return the number of currently available slots.
187 ///
188 /// The method is safe to call concurrently with other TThreadedObject methods.
189 /// Note that slots could be available but contain no data (i.e. a nullptr) if
190 /// they have not been used yet.
/// \return The current size of fObjPointers, read while holding fSpinMutex.
191 unsigned GetNSlots() const
192 {
193 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
194 return fObjPointers.size();
195 }
196
197 /// Access a particular processing slot.
198 ///
199 /// This method is thread-safe as long as concurrent calls request different slots (i.e. pass a different
200 /// argument) and no thread accesses slot `i` via the arrow operator, so mixing usage of GetAtSlot
201 /// with usage of the arrow operator can be dangerous.
202 std::shared_ptr<T> GetAtSlot(unsigned i)
203 {
204 std::size_t nAvailableSlots;
205 {
206 // fObjPointers can grow due to a concurrent operation on this TThreadedObject, need to lock
207 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
208 nAvailableSlots = fObjPointers.size();
209 }
210
211 if (i >= nAvailableSlots) {
212 Warning("TThreadedObject::GetAtSlot", "This slot does not exist.");
213 return nullptr;
214 }
215
216 auto &objPointer = fObjPointers[i];
217 if (!objPointer)
219 return objPointer;
220 }
221
222 /// Set the value of a particular slot.
223 ///
224 /// This method is thread-safe as long as concurrent calls access different slots (i.e. pass a different
225 /// argument) and no thread accesses slot `i` via the arrow operator, so mixing usage of SetAtSlot
226 /// with usage of the arrow operator can be dangerous.
/// \param i Index of the slot to overwrite. If it is not smaller than the current number of
///          slots, a warning is emitted and nothing is stored.
/// \param v Pointer to store in slot `i`.
227 void SetAtSlot(unsigned i, std::shared_ptr<T> v)
228 {
229 std::size_t nAvailableSlots;
230 {
231 // fObjPointers can grow due to a concurrent operation on this TThreadedObject, need to lock
232 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
233 nAvailableSlots = fObjPointers.size();
234 }
235
236 if (i >= nAvailableSlots) {
237 Warning("TThreadedObject::SetAtSlot", "This slot does not exist, doing nothing.");
238 return;
239 }
240
// Only the size read above happens under the lock; this assignment does not.
241 fObjPointers[i] = v;
242 }
243
244 /// Access a particular slot which corresponds to a single thread.
245 /// This is in general faster than the GetAtSlot method but it is
246 /// the responsibility of the caller to make sure that the slot exists
247 /// and to check that the contained object is initialized (and not
248 /// a nullptr).
/// No bounds check and no locking are performed here.
/// \param i Index of the slot to read.
/// \return A copy of the shared pointer stored in slot `i`.
249 std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
250 {
251 return fObjPointers[i];
252 }
253
254 /// Access a particular slot which corresponds to a single thread.
255 /// This method is faster than the GetAtSlotUnchecked method but
256 /// the caller is responsible for making sure that the slot exists, for
257 /// checking that the contained object is initialized and that the returned
258 /// pointer will not outlive the TThreadedObject that returned it, which
259 /// maintains ownership of the actual object.
/// No bounds check and no locking are performed here.
/// \param i Index of the slot to read.
/// \return The raw pointer held by the shared pointer in slot `i`.
260 T* GetAtSlotRaw(unsigned i) const
261 {
262 return fObjPointers[i].get();
263 }
264
265 /// Access the pointer corresponding to the current slot. This method is
266 /// not adequate for being called inside tight loops as it implies a
267 /// lookup in a mapping between the threadIDs and the slot indices.
268 /// A good practice consists in copying the pointer onto the stack and
269 /// proceeding with the loop as shown in this work item (pseudo-code) which
270 /// will be sent to different threads:
271 /// ~~~{.cpp}
272 /// auto workItem = [](){
273 /// auto objPtr = tthreadedObject.Get();
274 /// for (auto i : ROOT::TSeqI(1000)) {
275 /// // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
276 /// objPtr->FastMethod(i);
277 /// }
278 /// }
279 /// ~~~
280 std::shared_ptr<T> Get()
281 {
283 }
284
285 /// Access the wrapped object and allow to call its methods.
287 {
288 return Get().get();
289 }
290
291 /// Merge all the thread private objects. The merge is performed once: it does not
292 /// create any new object but destroys the present bookkeeping, collapsing
293 /// all objects into the one at slot 0.
/// Any later call emits a warning and returns the pointer at slot 0 again without re-merging.
/// \param mergeFunction Callable invoked with the slot-0 pointer as target and a vector holding
///        a copy of every slot pointer (slot 0 included).
/// \return The shared pointer stored at slot 0.
/// NOTE(review): slot 0 is indexed without checking that fObjPointers is non-empty.
294 std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
295 {
296 // Do not merge twice: if a merge already happened, warn and return the previous result.
297 if (fIsMerged) {
298 Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
299 return fObjPointers[0];
300 }
301 // need to convert to std::vector because historically mergeFunction requires a vector
302 auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
303 mergeFunction(fObjPointers[0], vecOfObjPtrs);
304 fIsMerged = true;
305 return fObjPointers[0];
306 }
307
308 /// Merge all the thread private objects. Can be called many times. It
309 /// does create a new instance of class T to represent the "Sum" object.
310 /// This method is not thread safe: correct or acceptable behaviours
311 /// depend on the nature of T and of the merging function.
312 std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
313 {
314 if (fIsMerged) {
315 Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
316 return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
317 }
319 std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
320 // need to convert to std::vector because historically mergeFunction requires a vector
321 auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
322 mergeFunction(targetPtrShared, vecOfObjPtrs);
323 return std::unique_ptr<T>(targetPtr);
324 }
325
326 private:
327 std::unique_ptr<T> fModel; ///< Use to store a "model" of the object
328 // std::deque's guarantee that references to the elements are not invalidated when appending new slots
329 std::deque<std::shared_ptr<T>> fObjPointers; ///< An object pointer per slot
330 // If the object is a histogram, we also create dummy directories that the histogram associates with
331 // so we do not pollute gDirectory
332 std::deque<TDirectory*> fDirectories; ///< A TDirectory per slot
333 std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
334 mutable ROOT::TSpinMutex fSpinMutex; ///< Protects concurrent access to fThrIDSlotMap, fObjPointers
335 bool fIsMerged : 1; ///< Remember if the objects have been merged already
336
337 /// Get the slot number for this threadID, make a slot if needed
339 {
340 const auto thisThreadID = std::this_thread::get_id();
341 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
342 const auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
343 if (thisSlotNumIt != fThrIDSlotMap.end())
344 return thisSlotNumIt->second;
345 const auto newIndex = fThrIDSlotMap.size();
346 fThrIDSlotMap[thisThreadID] = newIndex;
347 R__ASSERT(newIndex <= fObjPointers.size() && "This should never happen, we should create new slots as needed");
348 if (newIndex == fObjPointers.size()) {
350 fObjPointers.emplace_back(nullptr);
351 }
352 return newIndex;
353 }
354 };
355
/// Namespace-scope definition of the static constexpr data member fgMaxSlots,
/// which is declared and initialised inside TThreadedObject.
356 template<class T>
357 constexpr const TNumSlots TThreadedObject<T>::fgMaxSlots;
358
359} // End ROOT namespace
360
361////////////////////////////////////////////////////////////////////////////////
362/// Print a TThreadedObject at the prompt.
/// Returns a fixed description followed by the printed value of the model object.
363
364namespace cling {
365 template<class T>
366 std::string printValue(ROOT::TThreadedObject<T> *val)
367 {
// NOTE(review): the cast reinterprets the TThreadedObject pointer as a pointer to
// std::unique_ptr<T>. fModel is the first non-static data member declared in the
// class, so this presumably relies on that layout; it breaks if members are reordered.
368 auto model = ((std::unique_ptr<T>*)(val))->get();
369 std::ostringstream ret;
370 ret << "A wrapper to make object instances thread private, lazily. "
371 << "The model which is replicated is " << printValue(model);
372 return ret.str();
373 }
374}
375
376
377#endif
#define d(i)
Definition RSha256.hxx:102
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t target
@ kMustCleanup
Definition TObject.h:368
#define gROOT
Definition TROOT.h:406
A spin mutex class which respects the STL interface for mutexes.
A wrapper to make object instances thread private, lazily.
std::map< std::thread::id, unsigned > fThrIDSlotMap
A mapping between the thread IDs and the slots.
T * operator->()
Access the wrapped object and allow to call its methods.
void SetAtSlot(unsigned i, std::shared_ptr< T > v)
Set the value of a particular slot.
static constexpr const TNumSlots fgMaxSlots
The initial number of empty processing slots that a TThreadedObject is constructed with by default.
std::shared_ptr< T > Merge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
std::deque< TDirectory * > fDirectories
A TDirectory per slot.
std::shared_ptr< T > GetAtSlot(unsigned i)
Access a particular processing slot.
std::shared_ptr< T > Get()
Access the pointer corresponding to the current slot.
ROOT::TSpinMutex fSpinMutex
Protects concurrent access to fThrIDSlotMap, fObjPointers.
TThreadedObject(const TThreadedObject &)=delete
std::unique_ptr< T > fModel
Use to store a "model" of the object.
std::deque< std::shared_ptr< T > > fObjPointers
An object pointer per slot.
std::unique_ptr< T > SnapshotMerge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
std::shared_ptr< T > GetAtSlotUnchecked(unsigned i) const
Access a particular slot which corresponds to a single thread.
unsigned GetThisSlotNumber()
Get the slot number for this threadID, make a slot if needed.
TThreadedObject(TNumSlots initSlots, ARGS &&... args)
Construct the TThreadedObject with initSlots empty slots and the "model" of the thread private object...
T * GetAtSlotRaw(unsigned i) const
Access a particular slot which corresponds to a single thread.
unsigned GetNSlots() const
Return the number of currently available slots.
TThreadedObject(ARGS &&... args)
Construct the TThreadedObject and the "model" of the thread private objects.
bool fIsMerged
Remember if the objects have been merged already.
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
Describe directory structure in memory.
Definition TDirectory.h:45
virtual TDirectory * mkdir(const char *name, const char *title="", Bool_t returnExistingDirectory=kFALSE)
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
TH1 is the base class of all histogram classes in ROOT.
Definition TH1.h:59
A doubly linked list.
Definition TList.h:38
void Add(TObject *obj) override
Definition TList.h:81
void MergeTObjects(std::shared_ptr< T > target, std::vector< std::shared_ptr< T > > &objs)
Merge TObjects.
std::function< void(std::shared_ptr< T >, std::vector< std::shared_ptr< T > > &)> MergeFunctionType
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
static T * Clone(const T *obj, TDirectory *d=nullptr)
Return a copy of the object or a "Clone" if the copy constructor is not implemented.
static T * Clone(const T *obj, TDirectory *d=nullptr)
Defines the number of threads in some of ROOT's interfaces.
friend bool operator!=(TNumSlots lhs, TNumSlots rhs)
friend bool operator==(TNumSlots lhs, TNumSlots rhs)
#define ARGS(alist)
Definition gifencode.c:10