Logo ROOT   6.14/05
Reference Guide
TThreadedObject.hxx
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Danilo Piparo, CERN 11/2/2016
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadedObject
13 #define ROOT_TThreadedObject
14 
15 #include "TList.h"
16 #include "TError.h"
17 
18 #include <functional>
19 #include <map>
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <thread>
24 #include <vector>
25 
26 #include "ROOT/TSpinMutex.hxx"
27 #include "TROOT.h"
28 
29 class TH1;
30 
31 namespace ROOT {
32 
33  namespace Internal {
34 
35  namespace TThreadedObjectUtils {
36 
37  /// Get the unique index identifying a TThreadedObject.
38  inline unsigned GetTThreadedObjectIndex() {
39  static unsigned fgTThreadedObjectIndex = 0;
40  return fgTThreadedObjectIndex++;
41  }
42 
43  template<typename T, bool ISHISTO = std::is_base_of<TH1,T>::value>
44  struct Detacher{
45  static T* Detach(T* obj) {
46  return obj;
47  }
48  };
49 
50  template<typename T>
51  struct Detacher<T, true>{
52  static T* Detach(T* obj) {
53  obj->SetDirectory(nullptr);
54  obj->ResetBit(kMustCleanup);
55  return obj;
56  }
57  };
58 
59  /// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
60  template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
61  struct Cloner {
62  static T *Clone(const T *obj, TDirectory* d = nullptr) {
63  T* clone;
64  if (d){
65  TDirectory::TContext ctxt(d);
66  clone = new T(*obj);
67  } else {
68  clone = new T(*obj);
69  }
70  return Detacher<T>::Detach(clone);
71  }
72  };
73 
74  template<class T>
75  struct Cloner<T, false> {
76  static T *Clone(const T *obj, TDirectory* d = nullptr) {
77  T* clone;
78  if (d){
79  TDirectory::TContext ctxt(d);
80  clone = (T*)obj->Clone();
81  } else {
82  clone = (T*)obj->Clone();
83  }
84  return clone;
85  }
86  };
87 
88  template<class T, bool ISHISTO = std::is_base_of<TH1,T>::value>
89  struct DirCreator{
90  static std::vector<TDirectory*> Create(unsigned maxSlots) {
91  std::string dirName = "__TThreaded_dir_";
92  dirName += std::to_string(ROOT::Internal::TThreadedObjectUtils::GetTThreadedObjectIndex()) + "_";
93  std::vector<TDirectory*> dirs;
94  dirs.reserve(maxSlots);
95  for (unsigned i=0; i< maxSlots;++i) {
96  auto dir = gROOT->mkdir((dirName+std::to_string(i)).c_str());
97  dirs.emplace_back(dir);
98  }
99  return dirs;
100  }
101  };
102 
103  template<class T>
104  struct DirCreator<T, true>{
105  static std::vector<TDirectory*> Create(unsigned maxSlots) {
106  std::vector<TDirectory*> dirs(maxSlots, nullptr);
107  return dirs;
108  }
109  };
110 
111  } // End of namespace TThreadedObjectUtils
112  } // End of namespace Internals
113 
114  namespace TThreadedObjectUtils {
115 
116  template<class T>
117  using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
118  /// Merge TObjects
119  template<class T>
120  void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
121  {
122  if (!target) return;
123  TList objTList;
124  // Cannot do better than this
125  for (auto obj : objs) {
126  if (obj && obj != target) objTList.Add(obj.get());
127  }
128  target->Merge(&objTList);
129  }
130  } // end of namespace TThreadedObjectUtils
131 
132  /**
133  * \class ROOT::TThreadedObject
134  * \brief A wrapper to make object instances thread private, lazily.
135  * \tparam T Class of the object to be made thread private (e.g. TH1F)
136  * \ingroup Multicore
137  *
138  * A wrapper which makes objects thread private. The methods of the underlying
139  * object can be invoked via the the arrow operator. The object is created in
140  * a specific thread lazily, i.e. upon invocation of one of its methods.
141  * The correct object pointer from within a particular thread can be accessed
142  * with the overloaded arrow operator or with the Get method.
143  * In case an elaborate thread management is in place, e.g. in presence of
144  * stream of operations or "processing slots", it is also possible to
145  * manually select the correct object pointer explicitly.
146  */
147  template<class T>
149  public:
150  static unsigned fgMaxSlots; ///< The maximum number of processing slots (distinct threads) which the instances can manage
151  TThreadedObject(const TThreadedObject&) = delete;
152  /// Construct the TThreaded object and the "model" of the thread private
153  /// objects.
154  /// \tparam ARGS Arguments of the constructor of T
155  template<class ...ARGS>
156  TThreadedObject(ARGS&&... args): fObjPointers(fgMaxSlots, nullptr)
157  {
159 
160  TDirectory::TContext ctxt(fDirectories[0]);
161  fModel.reset(Internal::TThreadedObjectUtils::Detacher<T>::Detach(new T(std::forward<ARGS>(args)...)));
162  }
163 
164  /// Access a particular processing slot. This
165  /// method is *thread-unsafe*: it cannot be invoked from two different
166  /// threads with the same argument.
167  std::shared_ptr<T> GetAtSlot(unsigned i)
168  {
169  if ( i >= fObjPointers.size()) {
170  Warning("TThreadedObject::GetAtSlot", "Maximum number of slots reached.");
171  return nullptr;
172  }
173  auto objPointer = fObjPointers[i];
174  if (!objPointer) {
175  objPointer.reset(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get(), fDirectories[i]));
176  fObjPointers[i] = objPointer;
177  }
178  return objPointer;
179  }
180 
181  /// Set the value of a particular slot.
182  void SetAtSlot(unsigned i, std::shared_ptr<T> v)
183  {
184  fObjPointers[i] = v;
185  }
186 
187  /// Access a particular slot which corresponds to a single thread.
188  /// This is in general faster than the GetAtSlot method but it is
189  /// responsibility of the caller to make sure that an object is
190  /// initialised for the particular slot.
191  std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
192  {
193  return fObjPointers[i];
194  }
195 
196  /// Access a particular slot which corresponds to a single thread.
197  /// This overload is faster than the GetAtSlotUnchecked method but
198  /// the caller is responsible to make sure that an object is
199  /// initialised for the particular slot and that the returned pointer
200  /// will not outlive the TThreadedObject that returned it.
201  T* GetAtSlotRaw(unsigned i) const
202  {
203  return fObjPointers[i].get();
204  }
205 
206  /// Access the pointer corresponding to the current slot. This method is
207  /// not adequate for being called inside tight loops as it implies a
208  /// lookup in a mapping between the threadIDs and the slot indices.
209  /// A good practice consists in copying the pointer onto the stack and
210  /// proceed with the loop as shown in this work item (psudo-code) which
211  /// will be sent to different threads:
212  /// ~~~{.cpp}
213  /// auto workItem = [](){
214  /// auto objPtr = tthreadedObject.Get();
215  /// for (auto i : ROOT::TSeqI(1000)) {
216  /// // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
217  /// objPtr->FastMethod(i);
218  /// }
219  /// }
220  /// ~~~
221  std::shared_ptr<T> Get()
222  {
223  return GetAtSlot(GetThisSlotNumber());
224  }
225 
226  /// Access the wrapped object and allow to call its methods.
228  {
229  return Get().get();
230  }
231 
232  /// Merge all the thread private objects. Can be called once: it does not
233  /// create any new object but destroys the present bookkeping collapsing
234  /// all objects into the one at slot 0.
235  std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
236  {
237  // We do not return if we already merged.
238  if (fIsMerged) {
239  Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
240  return fObjPointers[0];
241  }
242  mergeFunction(fObjPointers[0], fObjPointers);
243  fIsMerged = true;
244  return fObjPointers[0];
245  }
246 
247  /// Merge all the thread private objects. Can be called many times. It
248  /// does create a new instance of class T to represent the "Sum" object.
249  /// This method is not thread safe: correct or acceptable behaviours
250  /// depend on the nature of T and of the merging function.
251  std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
252  {
253  if (fIsMerged) {
254  Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
255  return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
256  }
257  auto targetPtr = Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get());
258  std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
259  mergeFunction(targetPtrShared, fObjPointers);
260  return std::unique_ptr<T>(targetPtr);
261  }
262 
263  private:
264  std::unique_ptr<T> fModel; ///< Use to store a "model" of the object
265  std::vector<std::shared_ptr<T>> fObjPointers; ///< A pointer per thread is kept.
266  std::vector<TDirectory*> fDirectories; ///< A TDirectory per thread is kept.
267  std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
268  unsigned fCurrMaxSlotIndex = 0; ///< The maximum slot index
269  bool fIsMerged = false; ///< Remember if the objects have been merged already
270  ROOT::TSpinMutex fThrIDSlotMutex; ///< Mutex to protect the ID-slot map access
271 
272  /// Get the slot number for this threadID.
273  unsigned GetThisSlotNumber()
274  {
275  const auto thisThreadID = std::this_thread::get_id();
276  unsigned thisIndex;
277  {
278  std::lock_guard<ROOT::TSpinMutex> lg(fThrIDSlotMutex);
279  auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
280  if (thisSlotNumIt != fThrIDSlotMap.end()) return thisSlotNumIt->second;
281  thisIndex = fCurrMaxSlotIndex++;
282  fThrIDSlotMap[thisThreadID] = thisIndex;
283  }
284  return thisIndex;
285  }
286 
287  };
288 
289  template<class T> unsigned TThreadedObject<T>::fgMaxSlots = 64;
290 
291 } // End ROOT namespace
292 
293 #include <sstream>
294 
295 ////////////////////////////////////////////////////////////////////////////////
296 /// Print a TThreadedObject at the prompt:
297 
298 namespace cling {
299  template<class T>
301  {
302  auto model = ((std::unique_ptr<T>*)(val))->get();
303  std::ostringstream ret;
304  ret << "A wrapper to make object instances thread private, lazily. "
305  << "The model which is replicated is " << printValue(model);
306  return ret.str();
307  }
308 }
309 
310 
311 #endif
std::shared_ptr< T > GetAtSlotUnchecked(unsigned i) const
Access a particular slot which corresponds to a single thread.
#define ARGS(alist)
Definition: gifencode.c:10
T * operator->()
Access the wrapped object and allow to call its methods.
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
double T(double x)
Definition: ChebyshevPol.h:34
A wrapper to make object instances thread private, lazily.
#define gROOT
Definition: TROOT.h:410
std::string printValue(ROOT::TThreadedObject< T > *val)
A spin mutex class which respects the STL interface for mutexes.
Definition: TSpinMutex.hxx:40
std::unique_ptr< T > fModel
Use to store a "model" of the object.
std::shared_ptr< T > Get()
Access the pointer corresponding to the current slot.
void SetAtSlot(unsigned i, std::shared_ptr< T > v)
Set the value of a particular slot.
std::vector< std::shared_ptr< T > > fObjPointers
A pointer per thread is kept.
std::map< std::thread::id, unsigned > fThrIDSlotMap
A mapping between the thread IDs and the slots.
static std::vector< TDirectory * > Create(unsigned maxSlots)
A doubly linked list.
Definition: TList.h:44
T * GetAtSlotRaw(unsigned i) const
Access a particular slot which corresponds to a single thread.
unsigned GetThisSlotNumber()
Get the slot number for this threadID.
SVector< double, 2 > v
Definition: Dict.h:5
static unsigned fgMaxSlots
The maximum number of processing slots (distinct threads) which the instances can manage...
std::shared_ptr< T > GetAtSlot(unsigned i)
Access a particular processing slot.
void Warning(const char *location, const char *msgfmt,...)
void MergeTObjects(std::shared_ptr< T > target, std::vector< std::shared_ptr< T >> &objs)
Merge TObjects.
#define d(i)
Definition: RSha256.hxx:102
static T * Clone(const T *obj, TDirectory *d=nullptr)
Describe directory structure in memory.
Definition: TDirectory.h:34
Print a TSeq at the prompt:
Definition: TDatime.h:115
std::shared_ptr< T > Merge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
The TH1 histogram class.
Definition: TH1.h:56
std::vector< TDirectory * > fDirectories
A TDirectory per thread is kept.
ROOT::TSpinMutex fThrIDSlotMutex
Mutex to protect the ID-slot map access.
std::function< void(std::shared_ptr< T >, std::vector< std::shared_ptr< T > > &)> MergeFunctionType
virtual void Add(TObject *obj)
Definition: TList.h:87
static T * Clone(const T *obj, TDirectory *d=nullptr)
unsigned GetTThreadedObjectIndex()
Get the unique index identifying a TThreadedObject.
TThreadedObject(ARGS &&... args)
Construct the TThreaded object and the "model" of the thread private objects.
Return a copy of the object or a "Clone" if the copy constructor is not implemented.
std::unique_ptr< T > SnapshotMerge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
static std::vector< TDirectory * > Create(unsigned maxSlots)