Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RDFHelpers.cxx
Go to the documentation of this file.
1// Author: Stefan Wunsch, Enrico Guiraud CERN 09/2020
2
3/*************************************************************************
4 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#include "ROOT/RDFHelpers.hxx"
12
13#include "ROOT/RDF/RActionImpl.hxx" // for RActionImpl
14#include "ROOT/RDF/RFilterBase.hxx" // for RDFInternal
15#include "ROOT/RDF/RLoopManager.hxx" // for RLoopManager
16#include "ROOT/RDF/Utils.hxx"
17#include "ROOT/RResultHandle.hxx" // for RResultHandle, RunGraphs
18
19#include "TROOT.h" // IsImplicitMTEnabled
20#include "TError.h" // Warning
21#include "TStopwatch.h"
22#include "RConfigure.h" // R__USE_IMT
23#include "ROOT/RLogger.hxx"
24#include "ROOT/RSlotStack.hxx"
25#ifdef R__USE_IMT
27#endif // R__USE_IMT
28
29#include <algorithm>
30#include <cstdio>
31#include <iomanip>
32#include <iostream>
33#include <set>
34
35// TODO, this function should be part of core libraries
36#include <numeric>
37#if (!defined(_WIN32)) && (!defined(_WIN64))
38#include <unistd.h>
39#endif
40
41#if defined(_WIN32) || defined(_WIN64)
42#define WIN32_LEAN_AND_MEAN
43#define VC_EXTRALEAN
44#include <io.h>
45#include <Windows.h>
46#else
47#include <sys/ioctl.h>
48#endif
49
50class TTreeReader;
51
/// Return the width, in columns, of the terminal attached to stdout.
///
/// Used to size the progress bar and to blank the current terminal line before the
/// final statistics are printed. Returns 0 when stdout is not a terminal or its size
/// cannot be queried, so the result is always safe to use as a (possibly empty) width.
int get_tty_size()
{
#if defined(_WIN32) || defined(_WIN64)
   if (!_isatty(_fileno(stdout)))
      return 0;
   CONSOLE_SCREEN_BUFFER_INFO csbi;
   // csbi is only meaningful if the query succeeded.
   if (!GetConsoleScreenBufferInfo(GetStdHandle(STD_OUTPUT_HANDLE), &csbi))
      return 0;
   const int width = static_cast<int>(csbi.srWindow.Right - csbi.srWindow.Left + 1);
   return width > 0 ? width : 0;
#else
   // Zero-initialise so that a failed ioctl (e.g. stdout redirected to a file/pipe)
   // can never expose an uninitialised ws_col.
   struct winsize w{};
   if (ioctl(fileno(stdout), TIOCGWINSZ, &w) != 0)
      return 0;
   return static_cast<int>(w.ws_col);
#endif
}
71
73
/// Run the event loops of all distinct computation graphs referenced by `handles`.
///
/// Handles that share a loop manager are collapsed, so each event loop runs only once.
/// Handles whose results are already ready trigger a warning. The code for all graphs is
/// jitted with a single call before the loops are run. The visible fallback runs them
/// sequentially with std::for_each. The R__USE_IMT branch (source lines 128-129, missing
/// here) presumably runs them through a thread executor when implicit MT is enabled — confirm.
/// \param handles result handles whose computation graphs should be run
/// \return number of distinct event loops run; 0 if `handles` is empty or every result is ready
/// NOTE(review): source lines 97, 100-102, 107, 112, 128-129 and 137 are missing from this
/// extraction. They presumably include the `sw` stopwatch declaration, the verbosity check,
/// and the first line of each log statement.
74unsigned int ROOT::RDF::RunGraphs(std::vector<RResultHandle> handles)
75{
76 if (handles.empty()) {
77 Warning("RunGraphs", "Got an empty list of handles, now quitting.");
78 return 0u;
79 }
80
81 // Check that there are results which have not yet been run
82 const unsigned int nToRun =
83 std::count_if(handles.begin(), handles.end(), [](const auto &h) { return !h.IsReady(); });
84 if (nToRun < handles.size()) {
85 Warning("RunGraphs", "Got %zu handles from which %zu link to results which are already ready.", handles.size(),
86 handles.size() - nToRun);
87 }
88 if (nToRun == 0u)
89 return 0u;
90
91 // Find the unique event loops
// The comparator orders handles by their fLoopManager pointer. Handles that share a
// computation graph therefore compare equivalent, and std::set keeps only the first of
// them. Despite its name, `sameGraph` is a strict weak ordering, not an equality test.
92 auto sameGraph = [](const RResultHandle &a, const RResultHandle &b) { return a.fLoopManager < b.fLoopManager; };
93 std::set<RResultHandle, decltype(sameGraph)> s(handles.begin(), handles.end(), sameGraph);
94 std::vector<RResultHandle> uniqueLoops(s.begin(), s.end());
95
96 // Trigger jitting. One call is enough to jit the code required by all computation graphs.
98 sw.Start();
// NOTE(review): unlike `run` below, the Jit() calls in this scope dereference
// uniqueLoops[0].fLoopManager without checking it for null.
99 {
103 // a very high verbosity was requested, let's not silence anything
104 uniqueLoops[0].fLoopManager->Jit();
105 } else {
106 // silence logs from RLoopManager::Jit: RunGraphs does its own logging
108 uniqueLoops[0].fLoopManager->Jit();
109 }
110 }
111 sw.Stop();
// Report the jitting time. The opening line of this log statement (source line 112) is
// missing here; presumably it is an R__LOG_INFO on the RDataFrame log channel.
113 << "Just-in-time compilation phase for RunGraphs (" << uniqueLoops.size()
114 << " unique computation graphs) completed"
115 << (sw.RealTime() > 1e-3 ? " in " + std::to_string(sw.RealTime()) + " seconds." : " in less than 1ms.");
116
117 // Trigger the unique event loops
// All loop managers share one slot stack sized to ROOT's thread pool.
118 auto slotStack = std::make_shared<ROOT::Internal::RSlotStack>(ROOT::GetThreadPoolSize());
// Run a single event loop. Jitting was already done above, hence jit=false.
119 auto run = [&slotStack](RResultHandle &h) {
120 if (h.fLoopManager) {
121 h.fLoopManager->SetSlotStack(slotStack);
122 h.fLoopManager->Run(/*jit=*/false);
123 }
124 };
125
126 sw.Start();
127#ifdef R__USE_IMT
130 } else {
131#endif
132 std::for_each(uniqueLoops.begin(), uniqueLoops.end(), run);
133#ifdef R__USE_IMT
134 }
135#endif
136 sw.Stop();
// Report the run time. The opening line of this log statement (source line 137) is missing here.
138 << "Finished RunGraphs run (" << uniqueLoops.size() << " unique computation graphs, " << sw.CpuTime() << "s CPU, "
139 << sw.RealTime() << "s elapsed).";
140
141 return uniqueLoops.size();
142}
143
// Presumably the Snapshot overload of VariationsFor, judging by its error message. Varying a
// Snapshot result is not supported, so this body always throws std::logic_error.
// NOTE(review): the signature line (source line 144) is missing from this extraction.
145{
146 throw std::logic_error("Varying a Snapshot result is not implemented yet.");
147}
148
149namespace ROOT {
150namespace RDF {
151
152namespace Experimental {
153
/// Set the number of threads that share one clone of a thread-safe TH3 histogram in RDataFrame.
/// \param N number of threads per TH3 clone
/// NOTE(review): the body statement (source line 156) is missing from this extraction.
/// Presumably it assigns `N` to the value returned by NThreadPerTH3() — confirm.
154void ThreadsPerTH3(unsigned int N)
155{
157}
158
// ProgressHelper constructor.
// NOTE(review): two lines are missing from this extraction: the first signature line (source
// line 159) and the `#if defined(_WIN32) || defined(_WIN64)` that opens the platform branch
// (source line 165). The declaration is
// `ProgressHelper(std::size_t increment, unsigned int totalFiles=1,
//                 unsigned int progressBarWidth=40, unsigned int printInterval=1, bool useColors=true)`.
// `increment` is presumably the number of events between progress updates; AddProgressBar
// passes 1000 for it — confirm.
160 unsigned int printInterval, bool useColors)
161 : fPrintInterval(printInterval),
162 fIncrement{increment},
// NOTE(review): this assigns to the `progressBarWidth` parameter, so the caller's value
// (default 40) is discarded. The bar is always a quarter of get_tty_size(), which is 0
// whenever get_tty_size() returns less than 4 (e.g. 0 when stdout is not a console on Windows).
163 fBarWidth{progressBarWidth = int(get_tty_size() / 4)},
164 fTotalFiles{totalFiles},
166 fIsTTY{_isatty(_fileno(stdout)) != 0},
// NOTE(review): `false && useColors` is always false, so shell colours are never used on Windows.
167 fUseShellColours{false && useColors}
168#else
169 fIsTTY{isatty(fileno(stdout)) == 1},
// Relies on fIsTTY being declared (hence initialised) before fUseShellColours in the class
// definition, which is not visible here — confirm.
170 fUseShellColours{useColors && fIsTTY} // Control characters only with terminals.
171#endif
172{
173}
174
175/// Compute a running mean of events/s.
186
187/// Record current event counts and time stamp, populate evts/s statistics array.
206
207namespace {
208
/// Scope guard that snapshots an ostream's formatting state on construction and restores it on
/// destruction.
///
/// The saved state is the format flags, the fill character and the floating-point precision.
/// The printing helpers use manipulators such as std::scientific, std::setprecision and
/// std::setfill on the caller's stream (typically std::cout). Without the guard those settings
/// would leak into the user's subsequent output.
/// The field width is not saved: it is reset by the next formatted output anyway.
struct RestoreStreamState {
   explicit RestoreStreamState(std::ostream &stream)
      : fStream(stream), fFlags(stream.flags()), fFillChar(stream.fill()), fPrecision(stream.precision())
   {
   }

   ~RestoreStreamState()
   {
      fStream.flags(fFlags);
      fStream.fill(fFillChar);
      fStream.precision(fPrecision);
   }

   // A copy would restore the same stream a second time; the guard is strictly scope-bound.
   RestoreStreamState(const RestoreStreamState &) = delete;
   RestoreStreamState &operator=(const RestoreStreamState &) = delete;

   std::ostream &fStream;
   std::ios_base::fmtflags fFlags;
   std::ostream::char_type fFillChar;
   std::streamsize fPrecision;
};
221
222/// Format std::chrono::seconds as `1:30m`.
223std::ostream &operator<<(std::ostream &stream, std::chrono::seconds elapsedSeconds)
224{
225 RestoreStreamState restore(stream);
226 auto h = std::chrono::duration_cast<std::chrono::hours>(elapsedSeconds);
227 auto m = std::chrono::duration_cast<std::chrono::minutes>(elapsedSeconds - h);
228 auto s = (elapsedSeconds - h - m).count();
229
230 if (h.count() > 0)
231 stream << h.count() << ':' << std::setw(2) << std::right << std::setfill('0');
232 stream << m.count() << ':' << std::setw(2) << std::right << std::setfill('0') << s;
233 return stream << (h.count() > 0 ? 'h' : 'm');
234}
235
236} // namespace
237
238/// Print event and time statistics.
/// Writes the following, in order:
///   - elapsed time;
///   - current file index / total files;
///   - processed events, plus the current file's event total when it is non-zero;
///   - the running mean of events/s;
///   - when that total is known, an estimated remaining time.
/// NOTE(review): several source lines are missing from this extraction. Lines 244-245
/// presumably declare `currentFileIdx` and `GetNEventsOfCurrentFile`, which are used below
/// but not declared in the visible lines. Lines 248, 252, 257, 266, 274 and 280 precede each
/// ANSI colour escape, presumably as `if (fUseShellColours)` guards — confirm.
239void ProgressHelper::PrintStats(std::ostream &stream, std::size_t currentEventCount,
240 std::chrono::seconds elapsedSeconds) const
241{
// Restore the stream's formatting state when this function returns.
242 RestoreStreamState restore(stream);
243 auto evtpersec = EvtPerSec();
246 auto totalFiles = fTotalFiles;
247
249 stream << "\033[35m";
250 stream << "["
251 << "Elapsed time: " << elapsedSeconds << " ";
253 stream << "\033[0m";
254 stream << "processing file: " << currentFileIdx << " / " << totalFiles << " ";
255
256 // Event counts:
258 stream << "\033[32m";
259
260 stream << "processed evts: " << currentEventCount;
261 if (GetNEventsOfCurrentFile != 0) {
262 stream << " / " << std::scientific << std::setprecision(2) << GetNEventsOfCurrentFile;
263 }
264 stream << " ";
265
267 stream << "\033[0m";
268
269 // events/s
270 stream << std::scientific << std::setprecision(2) << evtpersec << " evt/s";
271
272 // Time statistics:
273 if (GetNEventsOfCurrentFile != 0) {
275 stream << "\033[35m";
// Estimated remaining time = (ComputeNEventsSoFar() - currentEventCount) / events-per-second.
// NOTE(review): evtpersec is a double. If it is 0, the quotient is inf/NaN and the cast to
// long long is undefined behaviour. If ComputeNEventsSoFar() returns an unsigned type, the
// subtraction also wraps when currentEventCount exceeds it. Consider guarding both cases.
276 std::chrono::seconds remainingSeconds(
277 static_cast<long long>((ComputeNEventsSoFar() - currentEventCount) / evtpersec));
278 stream << " " << remainingSeconds << " "
279 << " remaining time (per file being processed)";
281 stream << "\033[0m";
282 }
283
284 stream << "] ";
285}
286
/// Print the summary shown once processing has finished; ProgressBarAction::Finalize calls it.
/// It writes the total elapsed time, the files processed and the processed event count. The
/// files are printed as `totalFiles / totalFiles`, i.e. all of them are reported as processed.
/// NOTE(review): `totalEvents` is declared on source line 290. That line and the lines
/// preceding the colour escapes (293, 297, 302, 310) are missing from this extraction.
287void ProgressHelper::PrintStatsFinal(std::ostream &stream, std::chrono::seconds elapsedSeconds) const
288{
// Restore the stream's formatting state when this function returns.
289 RestoreStreamState restore(stream);
291 auto totalFiles = fTotalFiles;
292
294 stream << "\033[35m";
295 stream << "["
296 << "Total elapsed time: " << elapsedSeconds << " ";
298 stream << "\033[0m";
299 stream << "processed files: " << totalFiles << " / " << totalFiles << " ";
300
301 // Event counts:
303 stream << "\033[32m";
304
305 stream << "processed evts: " << totalEvents;
// When non-zero, the same count is printed again in scientific notation ("N / N"). This
// mirrors the "processed / total" layout used by PrintStats.
306 if (totalEvents != 0) {
307 stream << " / " << std::scientific << std::setprecision(2) << totalEvents;
308 }
309
311 stream << "\033[0m";
312
313 stream << "] ";
314}
315
316/// Print a progress bar of width `ProgressHelper::fBarWidth` if `fGetNEventsOfCurrentFile` is known.
/// NOTE(review): the other print functions use a local `GetNEventsOfCurrentFile`; no member
/// named `fGetNEventsOfCurrentFile` is visible. The declaration and the early-return
/// condition are on source lines 319-320, which are missing from this extraction. The
/// colour-guard lines (331, 334) are also missing.
317void ProgressHelper::PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
318{
321 return;
322
// Restore the stream's formatting state (the fill and alignment changed below) on return.
323 RestoreStreamState restore(stream);
324
// `completion` is declared on missing source line 325, presumably as the processed fraction of
// the current file. Clamping it to 1 keeps the bar within fBarWidth.
326 unsigned int nBar = std::min(completion, 1.) * fBarWidth;
327
// Always draw at least one character. The last one is '>' (the bar's head) unless the bar is full.
328 std::string bars(std::max(nBar, 1u), '=');
329 bars.back() = (nBar == fBarWidth) ? '=' : '>';
330
332 stream << "\033[33m";
// Left-align the bar and pad it with spaces to fBarWidth columns between the '|' delimiters.
333 stream << '|' << std::setfill(' ') << std::setw(fBarWidth) << std::left << bars << "| ";
335 stream << "\033[0m";
336}
337//*/
338
339class ProgressBarAction final : public ROOT::Detail::RDF::RActionImpl<ProgressBarAction> {
340public:
341 using Result_t = int;
342
343private:
344 std::shared_ptr<ProgressHelper> fHelper;
345 std::shared_ptr<int> fDummyResult = std::make_shared<int>();
346
347public:
348 ProgressBarAction(std::shared_ptr<ProgressHelper> r) : fHelper(std::move(r)) {}
349
350 std::shared_ptr<Result_t> GetResultPtr() const { return fDummyResult; }
351
352 void Initialize() {}
353 void InitTask(TTreeReader *, unsigned int) {}
354
355 void Exec(unsigned int) {}
356
357 void Finalize()
358 {
359 std::mutex fPrintMutex;
360 if (!fPrintMutex.try_lock())
361 return;
362 std::lock_guard<std::mutex> lockGuard(fPrintMutex, std::adopt_lock);
363 const auto &[eventCount, elapsedSeconds] = fHelper->RecordEvtCountAndTime();
364
365 // The next line resets the current line output in the terminal.
366 // Brings the cursor at the beginning ('\r'), prints whitespace with the
367 // same length as the terminal size, then resets the cursor again so we
368 // can print the final stats on a clean line.
369 std::cout << '\r' << std::string(get_tty_size(), ' ') << '\r';
370 fHelper->PrintStatsFinal(std::cout, elapsedSeconds);
371 std::cout << '\n';
372 }
373
374 std::string GetActionName() { return "ProgressBar"; }
375 // dummy implementation of PartialUpdate
376 int &PartialUpdate(unsigned int) { return *fDummyResult; }
377
379 {
380 return [this](unsigned int slot, const ROOT::RDF::RSampleInfo &id) {
381 this->fHelper->registerNewSample(slot, id);
382 return this->fHelper->ComputeNEventsSoFar();
383 };
384 }
385};
386
// Attach a progress bar to the computation graph `node`.
// NOTE(review): the signature line (source line 387) is missing from this extraction;
// presumably it is `void AddProgressBar(ROOT::RDF::RNode node)`.
// This books a ProgressBarAction and registers a partial-result callback. Both share one
// ProgressHelper; the lambda captures the shared_ptr by value, so the helper stays alive for as
// long as the callback does. The callback invokes the helper's operator(), presumably every
// 1000 entries per processing slot — confirm against RResultPtr::OnPartialResultSlot.
// NOTE(review): the literal 1000 is passed both as the helper's `increment` and as the callback
// interval. A single named constant would keep the two in sync.
388{
389 auto total_files = node.GetNFiles();
390 auto progress = std::make_shared<ProgressHelper>(1000, total_files);
391 ProgressBarAction c(progress);
392 auto r = node.Book<>(c);
393 r.OnPartialResultSlot(1000, [progress](unsigned int slot, auto &&arg) { (*progress)(slot, arg); });
394}
395
// Presumably the AddProgressBar overload that takes a concrete RDataFrame. It converts
// `dataframe` to the common RNode type.
// NOTE(review): the signature (source line 396) and the statement after the conversion (source
// line 399) are missing from this extraction. Presumably that statement forwards `node` to the
// RNode overload — confirm.
397{
398 auto node = ROOT::RDF::AsRNode(dataframe);
400}
401} // namespace Experimental
402} // namespace RDF
403} // namespace ROOT
int get_tty_size()
std::ostream & fStream
std::ostream::char_type fFillChar
std::ios_base::fmtflags fFlags
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
#define b(i)
Definition RSha256.hxx:100
#define c(i)
Definition RSha256.hxx:101
#define a(i)
Definition RSha256.hxx:99
#define h(i)
Definition RSha256.hxx:106
#define e(i)
Definition RSha256.hxx:103
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize id
Option_t Option_t width
TCanvas * bars()
Definition bars.C:1
Base class for action helpers, see RInterface::Book() for more information.
ROOT::RDF::SampleCallback_t GetSampleCallback() final
Override this method to register a callback that is executed before processing a new data sample ...
std::shared_ptr< ProgressHelper > fHelper
ProgressBarAction(std::shared_ptr< ProgressHelper > r)
void InitTask(TTreeReader *, unsigned int)
std::shared_ptr< Result_t > GetResultPtr() const
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
Print a progress bar of width ProgressHelper::fBarWidth if fGetNEventsOfCurrentFile is known.
void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
std::array< double, 20 > fEventsPerSecondStatistics
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
ProgressHelper(std::size_t increment, unsigned int totalFiles=1, unsigned int progressBarWidth=40, unsigned int printInterval=1, bool useColors=true)
Create a progress helper.
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
std::chrono::time_point< std::chrono::system_clock > fBeginTime
void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const
The public interface to the RDataFrame federation of classes.
RResultPtr< typename std::decay_t< Helper >::Result_t > Book(Helper &&helper, const ColumnNames_t &columns={})
Book execution of a custom action using a user-defined helper object.
A type-erased version of RResultPtr and RResultMap.
Smart pointer for the return type of actions.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
ELogLevel GetEffectiveVerbosity(const RLogManager &mgr) const
Definition RLogger.hxx:310
static RLogManager & Get()
Definition RLogger.cxx:60
Change the verbosity level (global or specific to the RLogChannel passed to the constructor) for the ...
Definition RLogger.hxx:240
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
Stopwatch class.
Definition TStopwatch.h:28
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:42
unsigned int & NThreadPerTH3()
Obtain or set the number of threads that will share a clone of a thread-safe 3D histogram.
Definition RDFUtils.cxx:63
RLogChannel & GetChannelOrManager()
Definition RLogger.hxx:299
void ThreadsPerTH3(unsigned int nThread=1)
Set the number of threads sharing one TH3 in RDataFrame.
RResultMap< T > VariationsFor(RResultPtr< T > resPtr)
Produce all required systematic variations for the given result.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
unsigned int RunGraphs(std::vector< RResultHandle > handles)
Run the event loops of multiple RDataFrames concurrently.
std::ostream & operator<<(std::ostream &os, const RDFDescription &description)
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
Namespace for new ROOT classes and functions.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:600
UInt_t GetThreadPoolSize()
Returns the size of ROOT's thread pool.
Definition TROOT.cxx:607
@ kDebug
Debug information; only useful for developers; can have added verbosity up to 255-kDebug.
@ kError
An error.
TMarker m
Definition textangle.C:8