Logo ROOT   6.10/09
Reference Guide
TThreadedObject.hxx
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Danilo Piparo, CERN 11/2/2016
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadedObject
13 #define ROOT_TThreadedObject
14 
15 #include "TList.h"
16 #include "TError.h"
17 
18 #include <functional>
19 #include <map>
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <thread>
24 #include <vector>
25 
26 #include "ROOT/TSpinMutex.hxx"
27 #include "TROOT.h"
28 
29 namespace ROOT {
30 
31  namespace Internal {
32 
33  namespace TThreadedObjectUtils {
34 
35  /// Get the unique index identifying a TThreadedObject.
36  inline unsigned GetTThreadedObjectIndex() {
37  static unsigned fgTThreadedObjectIndex = 0;
38  return fgTThreadedObjectIndex++;
39  }
40 
41  /// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
42  template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
43  struct Cloner {
44  static T *Clone(const T *obj, TDirectory* d = nullptr) {
45  T* clone;
46  if (d){
47  TDirectory::TContext ctxt(d);
48  clone = new T(*obj);
49  } else {
50  clone = new T(*obj);
51  }
52  return clone;
53  }
54  };
55 
56  template<class T>
57  struct Cloner<T, false> {
58  static T *Clone(const T *obj, TDirectory* d = nullptr) {
59  T* clone;
60  if (d){
61  TDirectory::TContext ctxt(d);
62  clone = (T*)obj->Clone();
63  } else {
64  clone = (T*)obj->Clone();
65  }
66  return clone;
67  }
68  };
69 
70  } // End of namespace TThreadedObjectUtils
71  } // End of namespace Internals
72 
73  namespace TThreadedObjectUtils {
74 
75  template<class T>
76  using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
77  /// Merge TObjects
78  template<class T>
79  void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
80  {
81  if (!target) return;
82  TList objTList;
83  // Cannot do better than this
84  for (auto obj : objs) {
85  if (obj && obj != target) objTList.Add(obj.get());
86  }
87  target->Merge(&objTList);
88  }
89  } // end of namespace TThreadedObjectUtils
90 
91  /**
92  * \class ROOT::TThreadedObject
93  * \brief A wrapper to make object instances thread private, lazily.
94  * \tparam T Class of the object to be made thread private (e.g. TH1F)
95  * \ingroup Multicore
96  *
97  * A wrapper which makes objects thread private. The methods of the underlying
98  * object can be invoked via the the arrow operator. The object is created in
99  * a specific thread lazily, i.e. upon invocation of one of its methods.
100  * The correct object pointer from within a particular thread can be accessed
101  * with the overloaded arrow operator or with the Get method.
102  * In case an elaborate thread management is in place, e.g. in presence of
103  * stream of operations or "processing slots", it is also possible to
104  * manually select the correct object pointer explicitly.
105  */
106  template<class T>
108  public:
109  static unsigned fgMaxSlots; ///< The maximum number of processing slots (distinct threads) which the instances can manage
110  TThreadedObject(const TThreadedObject&) = delete;
111  /// Construct the TThreaded object and the "model" of the thread private
112  /// objects.
113  /// \tparam ARGS Arguments of the constructor of T
114  template<class ...ARGS>
115  TThreadedObject(ARGS&&... args): fObjPointers(fgMaxSlots, nullptr)
116  {
117  fDirectories.reserve(fgMaxSlots);
118 
119  std::string dirName = "__TThreaded_dir_";
120  dirName += std::to_string(ROOT::Internal::TThreadedObjectUtils::GetTThreadedObjectIndex()) + "_";
121  for (unsigned i=0; i< fgMaxSlots;++i) {
122  fDirectories.emplace_back(gROOT->mkdir((dirName+std::to_string(i)).c_str()));
123  }
124 
125  TDirectory::TContext ctxt(fDirectories[0]);
126  fModel.reset(new T(std::forward<ARGS>(args)...));
127  }
128 
129  /// Access a particular processing slot. This
130  /// method is *thread-unsafe*: it cannot be invoked from two different
131  /// threads with the same argument.
132  std::shared_ptr<T> GetAtSlot(unsigned i)
133  {
134  if ( i >= fObjPointers.size()) {
135  Warning("TThreadedObject::GetAtSlot", "Maximum number of slots reached.");
136  return nullptr;
137  }
138  auto objPointer = fObjPointers[i];
139  if (!objPointer) {
140  objPointer.reset(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get(), fDirectories[i]));
141  fObjPointers[i] = objPointer;
142  }
143  return objPointer;
144  }
145 
146  /// Set the value of a particular slot.
147  void SetAtSlot(unsigned i, std::shared_ptr<T> v)
148  {
149  fObjPointers[i] = v;
150  }
151 
152  /// Access a particular slot which corresponds to a single thread.
153  /// This is in general faster than the GetAtSlot method but it is
154  /// responsibility of the caller to make sure that an object is
155  /// initialised for the particular slot.
156  std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
157  {
158  return fObjPointers[i];
159  }
160 
161  /// Access the pointer corresponding to the current slot. This method is
162  /// not adequate for being called inside tight loops as it implies a
163  /// lookup in a mapping between the threadIDs and the slot indices.
164  /// A good practice consists in copying the pointer onto the stack and
165  /// proceed with the loop as shown in this work item (psudo-code) which
166  /// will be sent to different threads:
167  /// ~~~{.cpp}
168  /// auto workItem = [](){
169  /// auto objPtr = tthreadedObject.Get();
170  /// for (auto i : ROOT::TSeqI(1000)) {
171  /// // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
172  /// objPtr->FastMethod(i);
173  /// }
174  /// }
175  /// ~~~
176  std::shared_ptr<T> Get()
177  {
178  return GetAtSlot(GetThisSlotNumber());
179  }
180 
181  /// Access the wrapped object and allow to call its methods.
183  {
184  return Get().get();
185  }
186 
187  /// Merge all the thread private objects. Can be called once: it does not
188  /// create any new object but destroys the present bookkeping collapsing
189  /// all objects into the one at slot 0.
190  std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
191  {
192  // We do not return if we already merged.
193  if (fIsMerged) {
194  Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
195  return fObjPointers[0];
196  }
197  mergeFunction(fObjPointers[0], fObjPointers);
198  fIsMerged = true;
199  return fObjPointers[0];
200  }
201 
202  /// Merge all the thread private objects. Can be called many times. It
203  /// does create a new instance of class T to represent the "Sum" object.
204  /// This method is not thread safe: correct or acceptable behaviours
205  /// depend on the nature of T and of the merging function.
206  std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
207  {
208  if (fIsMerged) {
209  Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
210  return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
211  }
212  auto targetPtr = Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get());
213  std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
214  mergeFunction(targetPtrShared, fObjPointers);
215  return std::unique_ptr<T>(targetPtr);
216  }
217 
218  private:
219  std::unique_ptr<T> fModel; ///< Use to store a "model" of the object
220  std::vector<std::shared_ptr<T>> fObjPointers; ///< A pointer per thread is kept.
221  std::vector<TDirectory*> fDirectories; ///< A TDirectory per thread is kept.
222  std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
223  unsigned fCurrMaxSlotIndex = 0; ///< The maximum slot index
224  bool fIsMerged = false; ///< Remember if the objects have been merged already
225  ROOT::TSpinMutex fThrIDSlotMutex; ///< Mutex to protect the ID-slot map access
226 
227  /// Get the slot number for this threadID.
228  unsigned GetThisSlotNumber()
229  {
230  const auto thisThreadID = std::this_thread::get_id();
231  unsigned thisIndex;
232  {
233  std::lock_guard<ROOT::TSpinMutex> lg(fThrIDSlotMutex);
234  auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
235  if (thisSlotNumIt != fThrIDSlotMap.end()) return thisSlotNumIt->second;
236  thisIndex = fCurrMaxSlotIndex++;
237  fThrIDSlotMap[thisThreadID] = thisIndex;
238  }
239  return thisIndex;
240  }
241 
242  };
243 
244  template<class T> unsigned TThreadedObject<T>::fgMaxSlots = 64;
245 
246 } // End ROOT namespace
247 
248 #include <sstream>
249 
250 ////////////////////////////////////////////////////////////////////////////////
251 /// Print a TThreadedObject at the prompt:
252 
253 namespace cling {
254  template<class T>
256  {
257  auto model = ((std::unique_ptr<T>*)(val))->get();
258  std::ostringstream ret;
259  ret << "A wrapper to make object instances thread private, lazily. "
260  << "The model which is replicated is " << printValue(model);
261  return ret.str();
262  }
263 }
264 
265 
266 #endif
std::shared_ptr< T > GetAtSlotUnchecked(unsigned i) const
Access a particular slot which corresponds to a single thread.
#define ARGS(alist)
Definition: gifencode.c:10
T * operator->()
Access the wrapped object and allow to call its methods.
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
double T(double x)
Definition: ChebyshevPol.h:34
A wrapper to make object instances thread private, lazily.
#define gROOT
Definition: TROOT.h:375
std::string printValue(ROOT::TThreadedObject< T > *val)
A spin mutex class which respects the STL interface for mutexes.
Definition: TSpinMutex.hxx:40
std::unique_ptr< T > fModel
Use to store a "model" of the object.
std::shared_ptr< T > Get()
Access the pointer corresponding to the current slot.
void SetAtSlot(unsigned i, std::shared_ptr< T > v)
Set the value of a particular slot.
std::vector< std::shared_ptr< T > > fObjPointers
A pointer per thread is kept.
std::map< std::thread::id, unsigned > fThrIDSlotMap
A mapping between the thread IDs and the slots.
A doubly linked list.
Definition: TList.h:43
unsigned GetThisSlotNumber()
Get the slot number for this threadID.
SVector< double, 2 > v
Definition: Dict.h:5
static unsigned fgMaxSlots
The maximum number of processing slots (distinct threads) which the instances can manage...
std::shared_ptr< T > GetAtSlot(unsigned i)
Access a particular processing slot.
void Warning(const char *location, const char *msgfmt,...)
void MergeTObjects(std::shared_ptr< T > target, std::vector< std::shared_ptr< T >> &objs)
Merge TObjects.
static T * Clone(const T *obj, TDirectory *d=nullptr)
Describe directory structure in memory.
Definition: TDirectory.h:34
Print a TSeq at the prompt:
Definition: TDatime.h:115
std::shared_ptr< T > Merge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.
std::vector< TDirectory * > fDirectories
A TDirectory per thread is kept.
ROOT::TSpinMutex fThrIDSlotMutex
Mutex to protect the ID-slot map access.
std::function< void(std::shared_ptr< T >, std::vector< std::shared_ptr< T > > &)> MergeFunctionType
virtual void Add(TObject *obj)
Definition: TList.h:77
static T * Clone(const T *obj, TDirectory *d=nullptr)
unsigned GetTThreadedObjectIndex()
Get the unique index identifying a TThreadedObject.
TThreadedObject(ARGS &&... args)
Construct the TThreaded object and the "model" of the thread private objects.
Return a copy of the object or a "Clone" if the copy constructor is not implemented.
std::unique_ptr< T > SnapshotMerge(TThreadedObjectUtils::MergeFunctionType< T > mergeFunction=TThreadedObjectUtils::MergeTObjects< T >)
Merge all the thread private objects.