Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RDFHelpers.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 02/2018
2
3/*************************************************************************
4 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11// This header contains helper free functions that slim down RDataFrame's programming model
12
13#ifndef ROOT_RDF_HELPERS
14#define ROOT_RDF_HELPERS
15
19#include <ROOT/RResultHandle.hxx> // users of RunGraphs might rely on this transitive include
20#include <ROOT/TypeTraits.hxx>
21
22#include <array>
23#include <chrono>
24#include <fstream>
25#include <functional>
26#include <map>
27#include <memory>
28#include <mutex>
29#include <type_traits>
30#include <utility> // std::index_sequence
31#include <vector>
32
33namespace ROOT {
34namespace Internal {
35namespace RDF {
36template <typename... ArgTypes, typename F>
38{
39 return std::function<bool(ArgTypes...)>([=](ArgTypes... args) mutable { return !f(args...); });
40}
41
42template <typename... ArgTypes, typename Ret, typename... Args>
43std::function<bool(ArgTypes...)> NotHelper(ROOT::TypeTraits::TypeList<ArgTypes...>, Ret (*f)(Args...))
44{
45 return std::function<bool(ArgTypes...)>([=](ArgTypes... args) mutable { return !f(args...); });
46}
47
48template <typename I, typename T, typename F>
50
51template <std::size_t... N, typename T, typename F>
52class PassAsVecHelper<std::index_sequence<N...>, T, F> {
53 template <std::size_t Idx>
54 using AlwaysT = T;
55 std::decay_t<F> fFunc;
56
57public:
58 PassAsVecHelper(F &&f) : fFunc(std::forward<F>(f)) {}
59 auto operator()(AlwaysT<N>... args) -> decltype(fFunc({args...})) { return fFunc({args...}); }
60};
61
62template <std::size_t N, typename T, typename F>
64{
65 return PassAsVecHelper<std::make_index_sequence<N>, T, F>(std::forward<F>(f));
66}
67
68} // namespace RDF
69} // namespace Internal
70
71namespace RDF {
73
74// clang-format off
75/// Given a callable with signature bool(T1, T2, ...) return a callable with same signature that returns the negated result
76///
77/// The callable must have one single non-template definition of operator(). This is a limitation with respect to
78/// std::not_fn, required for interoperability with RDataFrame.
79// clang-format on
80template <typename F,
81 typename Args = typename ROOT::TypeTraits::CallableTraits<std::decay_t<F>>::arg_types_nodecay,
82 typename Ret = typename ROOT::TypeTraits::CallableTraits<std::decay_t<F>>::ret_type>
83auto Not(F &&f) -> decltype(RDFInternal::NotHelper(Args(), std::forward<F>(f)))
84{
85 static_assert(std::is_same<Ret, bool>::value, "RDF::Not requires a callable that returns a bool.");
86 return RDFInternal::NotHelper(Args(), std::forward<F>(f));
87}
88
89// clang-format off
90/// PassAsVec is a callable generator that allows passing N variables of type T to a function as a single collection.
91///
92/// PassAsVec<N, T>(func) returns a callable that takes N arguments of type T, passes them down to function `func` as
93/// an initializer list `{t1, t2, t3,..., tN}` and returns whatever f({t1, t2, t3, ..., tN}) returns.
94///
95/// Note that for this to work with RDataFrame the type of all columns that the callable is applied to must be exactly T.
96/// Example usage together with RDataFrame ("varX" columns must all be `float` variables):
97/// \code
98/// bool myVecFunc(std::vector<float> args);
99/// df.Filter(PassAsVec<3, float>(myVecFunc), {"var1", "var2", "var3"});
100/// \endcode
101// clang-format on
102template <std::size_t N, typename T, typename F>
104{
106}
107
108// clang-format off
109/// Create a graphviz representation of the dataframe computation graph, return it as a string.
110/// \param[in] node any node of the graph. Called on the head (first) node, it prints the entire graph. Otherwise, only the branch the node belongs to.
111///
112/// The output can be displayed with a command akin to `dot -Tpng output.dot > output.png && open output.png`.
113///
114/// Note that "hanging" Defines, i.e. Defines without downstream nodes, will not be displayed by SaveGraph as they are
115/// effectively optimized away from the computation graph.
116///
117/// Note that SaveGraph is not thread-safe and must not be called concurrently from different threads.
118// clang-format on
119template <typename NodeType>
120std::string SaveGraph(NodeType node)
121{
123 return helper.RepresentGraph(node);
124}
125
126// clang-format off
127/// Create a graphviz representation of the dataframe computation graph, write it to the specified file.
128/// \param[in] node any node of the graph. Called on the head (first) node, it prints the entire graph. Otherwise, only the branch the node belongs to.
129/// \param[in] outputFile file where to save the representation.
130///
131/// The output can be displayed with a command akin to `dot -Tpng output.dot > output.png && open output.png`.
132///
133/// Note that "hanging" Defines, i.e. Defines without downstream nodes, will not be displayed by SaveGraph as they are
134/// effectively optimized away from the computation graph.
135///
136/// Note that SaveGraph is not thread-safe and must not be called concurrently from different threads.
137// clang-format on
138template <typename NodeType>
139void SaveGraph(NodeType node, const std::string &outputFile)
140{
142 std::string dotGraph = helper.RepresentGraph(node);
143
144 std::ofstream out(outputFile);
145 if (!out.is_open()) {
146 throw std::runtime_error("Could not open output file \"" + outputFile + "\" for writing");
147 }
148
149 out << dotGraph;
150 out.close();
151}
152
153// clang-format off
154/// Cast a RDataFrame node to the common type ROOT::RDF::RNode
155/// \param[in] node Any node of a RDataFrame graph
156// clang-format on
157template <typename NodeType>
158RNode AsRNode(NodeType node)
159{
160 return node;
161}
162
163// clang-format off
164/// Trigger the event loop of multiple RDataFrames concurrently
165/// \param[in] handles A vector of RResultHandles
166/// \return The number of distinct computation graphs that have been processed
167///
168/// This function triggers the event loop of all computation graphs which relate to the
169/// given RResultHandles. The advantage compared to running the event loop implicitly by accessing the
170/// RResultPtr is that the event loops will run concurrently. Therefore, the overall
171/// computation of all results is generally more efficient.
172/// It should be noted that user-defined operations (e.g., Filters and Defines) of the different RDataFrame graphs are assumed to be safe to call concurrently.
173///
174/// ~~~{.cpp}
175/// ROOT::RDataFrame df1("tree1", "file1.root");
176/// auto r1 = df1.Histo1D("var1");
177///
178/// ROOT::RDataFrame df2("tree2", "file2.root");
179/// auto r2 = df2.Sum("var2");
180///
181/// // RResultPtr -> RResultHandle conversion is automatic
182/// ROOT::RDF::RunGraphs({r1, r2});
183/// ~~~
184// clang-format on
185unsigned int RunGraphs(std::vector<RResultHandle> handles);
186
187namespace Experimental {
188
189/// \brief Produce all required systematic variations for the given result.
190/// \param[in] resPtr The result for which variations should be produced.
191/// \return A \ref ROOT::RDF::Experimental::RResultMap "RResultMap" object with full variation names as strings
192/// (e.g. "pt:down") and the corresponding varied results as values.
193///
194/// A given input RResultPtr<T> produces a corresponding RResultMap<T> with a "nominal"
195/// key that will return a value identical to the one contained in the original RResultPtr.
196/// Other keys correspond to the varied values of this result, one for each variation
197/// that the result depends on.
198/// VariationsFor does not trigger the event loop. The event loop is only triggered
199/// upon first access to a valid key, similarly to what happens with RResultPtr.
200///
201/// If the result does not depend, directly or indirectly, on any registered systematic variation, the
202/// returned RResultMap will contain only the "nominal" key.
203///
204/// See RDataFrame's \ref ROOT::RDF::RInterface::Vary() "Vary" method for more information and example usages.
205///
206/// \note Currently, producing variations for the results of \ref ROOT::RDF::RInterface::Display() "Display",
207/// \ref ROOT::RDF::RInterface::Report() "Report" and \ref ROOT::RDF::RInterface::Snapshot() "Snapshot"
208/// actions is not supported.
209//
210// An overview of how systematic variations work internally. Given N variations (including the nominal):
211//
212// RResultMap owns RVariedAction
213// N results N action helpers
214// N previous filters
215// N*#input_cols column readers
216//
217// ...and each RFilter and RDefine knows for what universe it needs to construct column readers ("nominal" by default).
218template <typename T>
220{
221 R__ASSERT(resPtr != nullptr && "Calling VariationsFor on an empty RResultPtr");
222
223 // populate parts of the computation graph for which we only have "empty shells", e.g. RJittedActions and
224 // RJittedFilters
225 resPtr.fLoopManager->Jit();
226
227 std::unique_ptr<RDFInternal::RActionBase> variedAction;
228 std::vector<std::shared_ptr<T>> variedResults;
229
230 std::shared_ptr<RDFInternal::RActionBase> nominalAction = resPtr.fActionPtr;
231 std::vector<std::string> variations = nominalAction->GetVariations();
232 const auto nVariations = variations.size();
233
234 if (nVariations > 0) {
235 // clone the result once for each variation
236 variedResults.reserve(nVariations);
237 for (auto i = 0u; i < nVariations; ++i){
238 // implicitly assuming that T is copiable: this should be the case
239 // for all result types in use, as they are copied for each slot
240 variedResults.emplace_back(new T{*resPtr.fObjPtr});
241
242 // Check if the result's type T inherits from TNamed
243 if constexpr (std::is_base_of<TNamed, T>::value) {
244 // Get the current variation name
245 std::string variationName = variations[i];
246 // Replace the colon with an underscore
247 std::replace(variationName.begin(), variationName.end(), ':', '_');
248 // Get a pointer to the corresponding varied result
249 auto &variedResult = variedResults.back();
250 // Set the varied result's name to NOMINALNAME_VARIATIONNAME
251 variedResult->SetName((std::string(variedResult->GetName()) + "_" + variationName).c_str());
252 }
253 }
254
255 std::vector<void *> typeErasedResults;
256 typeErasedResults.reserve(variedResults.size());
257 for (auto &res : variedResults)
258 typeErasedResults.emplace_back(&res);
259
260 // Create the RVariedAction and inject it in the computation graph.
261 // This recursively creates all the required varied column readers and upstream nodes of the computation graph.
262 variedAction = nominalAction->MakeVariedAction(std::move(typeErasedResults));
263 }
264
265 return RDFInternal::MakeResultMap<T>(resPtr.fObjPtr, std::move(variedResults), std::move(variations),
266 *resPtr.fLoopManager, std::move(nominalAction), std::move(variedAction));
267}
268
271
272/// \brief Add ProgressBar to a ROOT::RDF::RNode
273/// \param[in] df RDataFrame node at which ProgressBar is called.
274///
275/// The ProgressBar can be added not only at the RDataFrame head node, but also at any computational node,
276/// such as Filter or Define.
277/// ###Example usage:
278/// ~~~{.cpp}
279/// ROOT::RDataFrame df("tree", "file.root");
280/// auto df_1 = ROOT::RDF::RNode(df.Filter("x>1"));
281/// ROOT::RDF::Experimental::AddProgressBar(df_1);
282/// ~~~
284
285/// \brief Add ProgressBar to an RDataFrame
286/// \param[in] df RDataFrame for which ProgressBar is called.
287///
288/// This function adds a ProgressBar to display the event statistics in the terminal every
289/// \b m events and every \b n seconds, including elapsed time, currently processed file,
290/// currently processed events, the rate of event processing
291/// and an estimated remaining time (per file being processed).
292/// ProgressBar should be added after the dataframe object (df) is created:
293/// ~~~{.cpp}
294/// ROOT::RDataFrame df("tree", "file.root");
295/// ROOT::RDF::Experimental::AddProgressBar(df);
296/// ~~~
297/// For more details see ROOT::RDF::Experimental::ProgressHelper Class.
299
301
302/// RDF progress helper.
303/// This class provides callback functions to the RDataFrame. The event statistics
304/// (including elapsed time, currently processed file, currently processed events, the rate of event processing
305/// and an estimated remaining time (per file being processed))
306/// are recorded and printed in the terminal every m events and every n seconds.
307/// ProgressHelper::operator()(unsigned int, T&) is thread safe, and can be used as a callback in MT mode.
308/// ProgressBar should be added after creating the dataframe object (df):
309/// ~~~{.cpp}
310/// ROOT::RDataFrame df("tree", "file.root");
311/// ROOT::RDF::Experimental::AddProgressBar(df);
312/// ~~~
313/// alternatively RDataFrame can be cast to an RNode first giving it more flexibility.
314/// For example, it can be called at any computational node, such as Filter or Define, not only the head node,
315/// with no change to the ProgressBar function itself:
316/// ~~~{.cpp}
317/// ROOT::RDataFrame df("tree", "file.root");
318/// auto df_1 = ROOT::RDF::RNode(df.Filter("x>1"));
319/// ROOT::RDF::Experimental::AddProgressBar(df_1);
320/// ~~~
322private:
323 double EvtPerSec() const;
324 std::pair<std::size_t, std::chrono::seconds> RecordEvtCountAndTime();
325 void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const;
326 void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const;
327 void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const;
328
329 std::chrono::time_point<std::chrono::system_clock> fBeginTime = std::chrono::system_clock::now();
330 std::chrono::time_point<std::chrono::system_clock> fLastPrintTime = fBeginTime;
331 std::chrono::seconds fPrintInterval{1};
332
333 std::atomic<std::size_t> fProcessedEvents{0};
334 std::size_t fLastProcessedEvents{0};
335 std::size_t fIncrement;
336
338 std::map<std::string, ULong64_t> fSampleNameToEventEntries; // Filename, events in the file
339
340 std::array<double, 20> fEventsPerSecondStatistics;
342
343 unsigned int fBarWidth;
344 unsigned int fTotalFiles;
345
346 std::mutex fPrintMutex;
347 bool fIsTTY;
349
350 std::shared_ptr<TTree> fTree{nullptr};
351
352public:
353 /// Create a progress helper.
354 /// \param increment RDF callbacks are called every `n` events. Pass this `n` here.
355 /// \param totalFiles read total number of files in the RDF.
356 /// \param progressBarWidth Number of characters the progress bar will occupy.
357 /// \param printInterval Update stats every `n` seconds.
358 /// \param useColors Use shell colour codes to colour the output. Automatically disabled when
359 /// we are not writing to a tty.
360 ProgressHelper(std::size_t increment, unsigned int totalFiles = 1, unsigned int progressBarWidth = 40,
361 unsigned int printInterval = 1, bool useColors = true);
362
363 ~ProgressHelper() = default;
364
365 friend class ProgressBarAction;
366
367 /// Register a new sample for completion statistics.
368 /// \see ROOT::RDF::RInterface::DefinePerSample().
369 /// The *id.AsString()* refers to the name of the currently processed file.
370 /// The idea is to populate the event entries in the *fSampleNameToEventEntries* map
371 /// by selecting the greater of the two values:
372 /// *id.EntryRange().second* which is the upper event entry range of the processed sample
373 /// and the current value of the event entries in the *fSampleNameToEventEntries* map.
374 /// In the single threaded case, the two numbers are the same as the entry range corresponds
375 /// to the number of events in an individual file (each sample is simply a single file).
376 /// In the multithreaded case, the idea is to accumulate the higher event entry value until
377 /// the total number of events in a given file is reached.
378 void registerNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id)
379 {
380 std::lock_guard<std::mutex> lock(fSampleNameToEventEntriesMutex);
381 fSampleNameToEventEntries[id.AsString()] =
382 std::max(id.EntryRange().second, fSampleNameToEventEntries[id.AsString()]);
383 }
384
385 /// Thread-safe callback for RDataFrame.
386 /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the
387 /// fPrintInterval). \param slot Ignored. \param value Ignored.
388 template <typename T>
389 void operator()(unsigned int /*slot*/, T &value)
390 {
392 }
393 // clang-format off
394 /// Thread-safe callback for RDataFrame.
395 /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the fPrintInterval).
396 /// \param value Ignored.
397 // clang-format on
398 template <typename T>
399 void operator()(T & /*value*/)
400 {
401 using namespace std::chrono;
402 // ***************************************************
403 // Warning: Here, everything needs to be thread safe:
404 // ***************************************************
406
407 // We only print every n seconds.
408 if (duration_cast<seconds>(system_clock::now() - fLastPrintTime) < fPrintInterval) {
409 return;
410 }
411
412 // ***************************************************
413 // Protected by lock from here:
414 // ***************************************************
415 if (!fPrintMutex.try_lock())
416 return;
417 std::lock_guard<std::mutex> lockGuard(fPrintMutex, std::adopt_lock);
418
419 std::size_t eventCount;
420 seconds elapsedSeconds;
421 std::tie(eventCount, elapsedSeconds) = RecordEvtCountAndTime();
422
423 if (fIsTTY)
424 std::cout << "\r";
425
426 PrintProgressBar(std::cout, eventCount);
427 PrintStats(std::cout, eventCount, elapsedSeconds);
428
429 if (fIsTTY)
430 std::cout << std::flush;
431 else
432 std::cout << std::endl;
433 }
434
435 std::size_t ComputeNEventsSoFar() const
436 {
437 std::unique_lock<std::mutex> lock(fSampleNameToEventEntriesMutex);
438 std::size_t result = 0;
439 for (const auto &item : fSampleNameToEventEntries)
440 result += item.second;
441 return result;
442 }
443
444 unsigned int ComputeCurrentFileIdx() const
445 {
446 std::unique_lock<std::mutex> lock(fSampleNameToEventEntriesMutex);
447 return fSampleNameToEventEntries.size();
448 }
449};
450} // namespace Experimental
451} // namespace RDF
452} // namespace ROOT
453#endif
#define f(i)
Definition RSha256.hxx:104
#define R__ASSERT(e)
Definition TError.h:118
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
TRObject operator()(const T1 &t1) const
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
std::string RepresentGraph(ROOT::RDataFrame &rDataFrame)
Starting from the root node, prints the entire graph.
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void registerNewSample(unsigned int, const ROOT::RDF::RSampleInfo &id)
Register a new sample for completion statistics.
void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
Print a progress bar of width ProgressHelper::fBarWidth if fGetNEventsOfCurrentFile is known.
void operator()(unsigned int, T &value)
Thread-safe callback for RDataFrame.
void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
std::map< std::string, ULong64_t > fSampleNameToEventEntries
std::array< double, 20 > fEventsPerSecondStatistics
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
std::chrono::time_point< std::chrono::system_clock > fBeginTime
void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const
void operator()(T &)
Thread-safe callback for RDataFrame.
The public interface to the RDataFrame federation of classes.
Smart pointer for the return type of actions.
RDFDetail::RLoopManager * fLoopManager
Non-owning pointer to the RLoopManager at the root of this computation graph.
std::shared_ptr< RDFInternal::RActionBase > fActionPtr
Owning pointer to the action that will produce this result.
SPT_t fObjPtr
Shared pointer encapsulating the wrapped result.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
#define F(x, y, z)
std::function< bool(ArgTypes...)> NotHelper(ROOT::TypeTraits::TypeList< ArgTypes... >, F &&f)
auto PassAsVec(F &&f) -> PassAsVecHelper< std::make_index_sequence< N >, T, F >
RResultMap< T > VariationsFor(RResultPtr< T > resPtr)
Produce all required systematic variations for the given result.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
unsigned int RunGraphs(std::vector< RResultHandle > handles)
Trigger the event loop of multiple RDataFrames concurrently.
auto Not(F &&f) -> decltype(RDFInternal::NotHelper(Args(), std::forward< F >(f)))
Given a callable with signature bool(T1, T2, ...) return a callable with same signature that returns ...
std::string SaveGraph(NodeType node)
Create a graphviz representation of the dataframe computation graph, return it as a string.
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.
Lightweight storage for a collection of types.