Logo ROOT   6.12/07
Reference Guide
TResultProxy.hxx
Go to the documentation of this file.
1 // Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2 
3 /*************************************************************************
4  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
5  * All rights reserved. *
6  * *
7  * For the licensing terms see $ROOTSYS/LICENSE. *
8  * For the list of contributors see $ROOTSYS/README/CREDITS. *
9  *************************************************************************/
10 
11 #ifndef ROOT_TRESULTPROXY
12 #define ROOT_TRESULTPROXY
13 
14 #include "ROOT/TypeTraits.hxx"
15 #include "ROOT/TDFNodes.hxx"
16 #include "TError.h" // Warning
17 
18 #include <memory>
19 #include <functional>
20 
21 namespace ROOT {
22 
23 namespace Experimental {
24 namespace TDF {
25 // Fwd decl for MakeResultProxy
26 template <typename T>
28 }
29 }
30 
31 namespace Detail {
32 namespace TDF {
34 // Fwd decls of the MakeResultProxy factories: TResultProxy names both overloads in its friend declarations,
// so they must be declared before the class definition.
// Overload for non-jitted actions: the TActionBase pointer is already known when the proxy is created.
35 template <typename T>
36 TResultProxy<T> MakeResultProxy(const std::shared_ptr<T> &r, const std::shared_ptr<TLoopManager> &df,
37  TDFInternal::TActionBase *actionPtr);
// Overload for jitted actions: also returns the shared holder through which the TActionBase pointer is set later
// (right before the event loop, per the comment on the definition further below).
38 template <typename T>
39 std::pair<TResultProxy<T>, std::shared_ptr<ROOT::Internal::TDF::TActionBase *>>
40 MakeResultProxy(const std::shared_ptr<T> &r, const std::shared_ptr<TLoopManager> &df);
41 } // ns TDF
42 } // ns Detail
43 
44 namespace Experimental {
45 namespace TDF {
47 namespace TDFDetail = ROOT::Detail::TDF;
48 namespace TTraits = ROOT::TypeTraits;
49 
50 /// Smart pointer for the return type of actions
51 /**
52 \class ROOT::Experimental::TDF::TResultProxy
53 \ingroup dataframe
54 \brief A wrapper around the result of TDataFrame actions able to trigger calculations lazily.
55 \tparam T Type of the action result
56 
57 A smart pointer which allows to access the result of a TDataFrame action. The
58 methods of the encapsulated object can be accessed via the arrow operator.
59 Upon invocation of the arrow operator or dereferencing (`operator*`), the
60 loop on the events and calculations of all scheduled actions are executed
61 if needed.
62 It is possible to iterate on the result proxy if the proxied object is a collection.
63 ~~~{.cpp}
64 for (auto& myItem : myResultProxy) { ... };
65 ~~~
66 If iteration is not supported by the type of the proxied object, a compilation error is thrown.
67 
68 */
69 template <typename T>
70 class TResultProxy {
71  // private using declarations
72  using SPT_t = std::shared_ptr<T>;
73  using SPTLM_t = std::shared_ptr<TDFDetail::TLoopManager>;
74  using WPTLM_t = std::weak_ptr<TDFDetail::TLoopManager>;
75  using ShrdPtrBool_t = std::shared_ptr<bool>;
76 
77  // friend declarations
78  template <typename W>
79  friend TResultProxy<W>
80  TDFDetail::MakeResultProxy(const std::shared_ptr<W> &, const SPTLM_t &, TDFInternal::TActionBase *);
81  template <typename W>
82  friend std::pair<TResultProxy<W>, std::shared_ptr<TDFInternal::TActionBase *>>
83  TDFDetail::MakeResultProxy(const std::shared_ptr<W> &, const SPTLM_t &);
84 
85  /// \cond HIDDEN_SYMBOLS
86  template <typename V, bool isCont = TTraits::IsContainer<V>::value>
87  struct TIterationHelper {
88  using Iterator_t = void;
89  void GetBegin(const V &) { static_assert(sizeof(V) == 0, "It does not make sense to ask begin for this class."); }
90  void GetEnd(const V &) { static_assert(sizeof(V) == 0, "It does not make sense to ask end for this class."); }
91  };
92 
93  template <typename V>
94  struct TIterationHelper<V, true> {
95  using Iterator_t = decltype(std::begin(std::declval<V>()));
96  static Iterator_t GetBegin(const V &v) { return std::begin(v); };
97  static Iterator_t GetEnd(const V &v) { return std::end(v); };
98  };
99  /// \endcond
100 
101  /// State registered also in the TLoopManager until the event loop is executed
102  const ShrdPtrBool_t fReadiness = std::make_shared<bool>(false);
103  WPTLM_t fImplWeakPtr; ///< Points to the TLoopManager at the root of the functional graph
104  const SPT_t fObjPtr; ///< Shared pointer encapsulating the wrapped result
105  /// Shared_ptr to a _pointer_ to the TDF action that produces this result. It is set at construction time for
106  /// non-jitted actions, and at jitting time for jitted actions (at the time of writing, this means right
107  /// before the event-loop).
108  // N.B. what's on the heap is the _pointer_ to TActionBase, we are _not_ taking shared ownership of a TAction.
109  // This cannot be a unique_ptr because that would disallow copy-construction of TResultProxies.
110  // It cannot be just a pointer to TActionBase because we need something to store in the callback callable that will
111  // be passed to TLoopManager _before_ the pointer to TActionBase is set in the case of jitted actions.
112  const std::shared_ptr<TDFInternal::TActionBase *> fActionPtrPtr;
113 
114  /// Triggers the event loop in the TLoopManager instance to which it's associated via the fImplWeakPtr
115  void TriggerRun();
116 
117  /// Get the pointer to the encapsulated result.
118  /// Ownership is not transferred to the caller.
119  /// Triggers event loop and execution of all actions booked in the associated TLoopManager.
120  T *Get()
121  {
122  if (!*fReadiness)
123  TriggerRun();
124  return fObjPtr.get();
125  }
126 
127  TResultProxy(const SPT_t &objPtr, const ShrdPtrBool_t &readiness, const SPTLM_t &loopManager,
128  TDFInternal::TActionBase *actionPtr = nullptr)
129  : fReadiness(readiness), fImplWeakPtr(loopManager), fObjPtr(objPtr),
130  fActionPtrPtr(new (TDFInternal::TActionBase *)(actionPtr))
131  {
132  }
133 
134  std::shared_ptr<TDFInternal::TActionBase *> GetActionPtrPtr() const { return fActionPtrPtr; }
135 
136 public:
137  using Value_t = T; ///< Convenience alias to simplify access to proxied type
138  static constexpr ULong64_t kOnce = 0ull; ///< Convenience definition to express a callback must be executed once
139 
140  TResultProxy() = delete;
141  TResultProxy(const TResultProxy &) = default;
142  TResultProxy(TResultProxy &&) = default;
143 
144  /// Get a const reference to the encapsulated object.
145  /// Triggers event loop and execution of all actions booked in the associated TLoopManager.
146  const T &GetValue() { return *Get(); }
147 
148  /// Get a pointer to the encapsulated object.
149  /// Triggers event loop and execution of all actions booked in the associated TLoopManager.
150  T &operator*() { return *Get(); }
151 
152  /// Get a pointer to the encapsulated object.
153  /// Ownership is not transferred to the caller.
154  /// Triggers event loop and execution of all actions booked in the associated TLoopManager.
155  T *operator->() { return Get(); }
156 
157  /// Return an iterator to the beginning of the contained object if this makes
158  /// sense, throw a compilation error otherwise
159  typename TIterationHelper<T>::Iterator_t begin()
160  {
161  if (!*fReadiness)
162  TriggerRun();
163  return TIterationHelper<T>::GetBegin(*fObjPtr);
164  }
165 
166  /// Return an iterator to the end of the contained object if this makes
167  /// sense, throw a compilation error otherwise
168  typename TIterationHelper<T>::Iterator_t end()
169  {
170  if (!*fReadiness)
171  TriggerRun();
172  return TIterationHelper<T>::GetEnd(*fObjPtr);
173  }
174 
175  /// Register a callback that TDataFrame will execute "everyNEvents" on a partial result.
176  ///
177  /// \param[in] everyNEvents Frequency at which the callback will be called, as a number of events processed
178  /// \param[in] callback a callable with signature `void(Value_t&)` where Value_t is the type of the value contained in this TResultProxy
179  /// \return this TResultProxy, to allow chaining of OnPartialResultSlot with other calls
180  ///
181  /// The callback must be a callable (lambda, function, functor class...) that takes a reference to the result type as
182  /// argument and returns nothing. TDataFrame will invoke registered callbacks passing partial action results as
183  /// arguments to them (e.g. a histogram filled with a part of the selected events, a counter incremented only up to a
184  /// certain point, a mean over a subset of the events and so forth).
185  ///
186  /// Callbacks can be used e.g. to inspect partial results of the analysis while the event loop is running. For
187  /// example one can draw an up-to-date version of a result histogram every 100 entries like this:
188  /// \code{.cpp}
189  /// auto h = tdf.Histo1D("x");
190  /// TCanvas c("c","x hist");
191  /// h.OnPartialResult(100, [&c](TH1D &h_) { c.cd(); h_.Draw(); c.Update(); });
192  /// h->Draw(); // event loop runs here, this `Draw` is executed after the event loop is finished
193  /// \endcode
194  ///
195  /// A value of 0 for everyNEvents indicates the callback must be executed only once, before running the event loop.
196  /// A conveniece definition `kOnce` is provided to make this fact more expressive in user code (see snippet below).
197  /// Multiple callbacks can be registered with the same TResultProxy (i.e. results of TDataFrame actions) and will
198  /// be executed sequentially. Callbacks are executed in the order they were registered.
199  /// The type of the value contained in a TResultProxy is also available as TResultProxy<T>::Value_t, e.g.
200  /// \code{.cpp}
201  /// auto h = tdf.Histo1D("x");
202  /// // h.kOnce is 0
203  /// // decltype(h)::Value_t is TH1D
204  /// \endcode
205  ///
206  /// When implicit multi-threading is enabled, the callback:
207  /// - will never be executed by multiple threads concurrently: it needs not be thread-safe. For example the snippet
208  /// above that draws the partial histogram on a canvas works seamlessly in multi-thread event loops.
209  /// - will always be executed "everyNEvents": partial results will "contain" that number of events more from
210  /// one call to the next
211  /// - might be executed by a different worker thread at different times: the value of `std::this_thread::get_id()`
212  /// might change between calls
213  /// To register a callback that is called by _each_ worker thread (concurrently) every N events one can use
214  /// OnPartialResultSlot.
215  TResultProxy<T> &OnPartialResult(ULong64_t everyNEvents, std::function<void(T&)> callback)
216  {
217  auto lm = fImplWeakPtr.lock();
218  if (!lm)
219  throw std::runtime_error("The main TDataFrame is not reachable: did it go out of scope?");
220  const auto nSlots = lm->GetNSlots();
221  auto actionPtrPtr = fActionPtrPtr.get();
222  auto c = [nSlots, actionPtrPtr, callback](unsigned int slot) {
223  if (slot != nSlots - 1)
224  return;
225  auto partialResult = static_cast<Value_t*>((*actionPtrPtr)->PartialUpdate(slot));
226  callback(*partialResult);
227  };
228  lm->RegisterCallback(everyNEvents, std::move(c));
229  return *this;
230  }
231 
232  /// Register a callback that TDataFrame will execute in each worker thread concurrently on that thread's partial result.
233  ///
234  /// \param[in] everyNEvents Frequency at which the callback will be called by each thread, as a number of events processed
235  /// \param[in] a callable with signature `void(unsigned int, Value_t&)` where Value_t is the type of the value contained in this TResultProxy
236  /// \return this TResultProxy, to allow chaining of OnPartialResultSlot with other calls
237  ///
238  /// See `OnPartialResult` for a generic explanation of the callback mechanism.
239  /// Compared to `OnPartialResult`, this method has two major differences:
240  /// - all worker threads invoke the callback once every specified number of events. The event count is per-thread,
241  /// and callback invocation might happen concurrently (i.e. the callback must be thread-safe)
242  /// - the callable must take an extra `unsigned int` parameter corresponding to a multi-thread "processing slot":
243  /// this is a "helper value" to simplify writing thread-safe callbacks: different worker threads might invoke the
244  /// callback concurrently but always with different `slot` numbers.
245  /// - a value of 0 for everyNEvents indicates the callback must be executed once _per slot_.
246  ///
247  /// For example, the following snippet prints out a thread-safe progress bar of the events processed by TDataFrame
248  /// \code
249  /// auto c = tdf.Count(); // any action would do, but `Count` is the most lightweight
250  /// std::string progress;
251  /// std::mutex bar_mutex;
252  /// c.OnPartialResultSlot(nEvents / 100, [&progress, &bar_mutex](unsigned int, ULong64_t &) {
253  /// std::lock_guard<std::mutex> lg(bar_mutex);
254  /// progress.push_back('#');
255  /// std::cout << "\r[" << std::left << std::setw(100) << progress << ']' << std::flush;
256  /// });
257  /// std::cout << "Analysis running..." << std::endl;
258  /// *c; // trigger the event loop by accessing an action's result
259  /// std::cout << "\nDone!" << std::endl;
260  /// \endcode
261  TResultProxy<T> &OnPartialResultSlot(ULong64_t everyNEvents, std::function<void(unsigned int, T&)> callback)
262  {
263  auto lm = fImplWeakPtr.lock();
264  if (!lm)
265  throw std::runtime_error("The main TDataFrame is not reachable: did it go out of scope?");
266  auto actionPtrPtr = fActionPtrPtr.get();
267  auto c = [actionPtrPtr, callback](unsigned int slot) {
268  auto partialResult = static_cast<Value_t*>((*actionPtrPtr)->PartialUpdate(slot));
269  callback(slot, *partialResult);
270  };
271  lm->RegisterCallback(everyNEvents, std::move(c));
272  return *this;
273  }
274 };
275 
276 template <typename T>
278 {
279  auto df = fImplWeakPtr.lock();
280  if (!df) {
281  throw std::runtime_error("The main TDataFrame is not reachable: did it go out of scope?");
282  }
283  df->Run();
284 }
285 } // end NS TDF
286 } // end NS Experimental
287 
288 namespace Detail {
289 namespace TDF {
290 /// Create a TResultProxy and set its pointer to the corresponding TAction
291 /// This overload is invoked by non-jitted actions, as they have access to TAction before constructing TResultProxy.
292 template <typename T>
293 TResultProxy<T> MakeResultProxy(const std::shared_ptr<T> &r, const std::shared_ptr<TLoopManager> &df,
294  TDFInternal::TActionBase *actionPtr)
295 {
296  auto readiness = std::make_shared<bool>(false);
297  auto resPtr = TResultProxy<T>(r, readiness, df, actionPtr);
298  df->Book(readiness);
299  return resPtr;
300 }
301 
302 /// Create a TResultProxy and return it together with its pointer to TAction
303 /// This overload is invoked by jitted actions; the pointer to TAction will be set right before the loop by jitted code
304 template <typename T>
305 std::pair<TResultProxy<T>, std::shared_ptr<TDFInternal::TActionBase *>>
306 MakeResultProxy(const std::shared_ptr<T> &r, const std::shared_ptr<TLoopManager> &df)
307 {
308  auto readiness = std::make_shared<bool>(false);
309  auto resPtr = TResultProxy<T>(r, readiness, df);
310  df->Book(readiness);
311  return std::make_pair(resPtr, resPtr.GetActionPtrPtr());
312 }
313 } // end NS TDF
314 } // end NS Detail
315 } // end NS ROOT
316 
317 #endif // ROOT_TRESULTPROXY
const T & GetValue()
Get a const reference to the encapsulated object.
const SPT_t fObjPtr
Shared pointer encapsulating the wrapped result.
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
std::shared_ptr< TDFDetail::TLoopManager > SPTLM_t
double T(double x)
Definition: ChebyshevPol.h:34
T Value_t
Convenience alias to simplify access to proxied type.
std::shared_ptr< TDFInternal::TActionBase * > GetActionPtrPtr() const
T * Get()
Get the pointer to the encapsulated result.
TResultProxy< T > & OnPartialResult(ULong64_t everyNEvents, std::function< void(T &)> callback)
Register a callback that TDataFrame will execute "everyNEvents" on a partial result.
std::weak_ptr< TDFDetail::TLoopManager > WPTLM_t
WPTLM_t fImplWeakPtr
Points to the TLoopManager at the root of the functional graph.
TResultProxy< T > & OnPartialResultSlot(ULong64_t everyNEvents, std::function< void(unsigned int, T &)> callback)
Register a callback that TDataFrame will execute in each worker thread concurrently on that thread's ...
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:146
Smart pointer for the return type of actions.
ROOT::R::TRInterface & r
Definition: Object.C:4
SVector< double, 2 > v
Definition: Dict.h:5
TIterationHelper< T >::Iterator_t begin()
Return an iterator to the beginning of the contained object if this makes sense, throw a compilation ...
const std::shared_ptr< TDFInternal::TActionBase * > fActionPtrPtr
Shared_ptr to a pointer to the TDF action that produces this result.
std::pair< TResultProxy< T >, std::shared_ptr< TDFInternal::TActionBase * > > MakeResultProxy(const std::shared_ptr< T > &r, const std::shared_ptr< TLoopManager > &df)
Create a TResultProxy and return it together with its pointer to TAction This overload is invoked by ...
unsigned long long ULong64_t
Definition: RtypesCore.h:70
ROOT type_traits extensions.
Definition: TypeTraits.hxx:23
TResultProxy(const SPT_t &objPtr, const ShrdPtrBool_t &readiness, const SPTLM_t &loopManager, TDFInternal::TActionBase *actionPtr=nullptr)
T * operator->()
Get a pointer to the encapsulated object.
std::shared_ptr< bool > ShrdPtrBool_t
void TriggerRun()
Triggers the event loop in the TLoopManager instance to which it's associated via the fImplWeakPtr...
T & operator*()
Get a reference to the encapsulated object.
typedef void((*Func_t)())
TIterationHelper< T >::Iterator_t end()
Return an iterator to the end of the contained object if this makes sense, throw a compilation error ...