Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RDFHelpers.cxx
Go to the documentation of this file.
1// Author: Stefan Wunsch, Enrico Guiraud CERN 09/2020
2
3/*************************************************************************
4 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#include "ROOT/RDFHelpers.hxx"
12#include "TROOT.h" // IsImplicitMTEnabled
13#include "TError.h" // Warning
14#include "TStopwatch.h"
15#include "RConfigure.h" // R__USE_IMT
16#include "ROOT/RLogger.hxx"
17#include "ROOT/RDF/RLoopManager.hxx" // for RLoopManager
18#include "ROOT/RDF/Utils.hxx"
19#include "ROOT/RResultHandle.hxx" // for RResultHandle, RunGraphs
20#ifdef R__USE_IMT
22#endif // R__USE_IMT
23
24#include <algorithm>
25#include <iostream>
26#include <set>
27#include <cstdio>
28
29// TODO, this function should be part of core libraries
30#include <numeric>
31#if (!defined(_WIN32)) && (!defined(_WIN64))
32#include <unistd.h>
33#endif
34
35#if defined(_WIN32) || defined(_WIN64)
36#define WIN32_LEAN_AND_MEAN
37#define VC_EXTRALEAN
38#include <io.h>
39#include <Windows.h>
40#else
41#include <sys/ioctl.h>
42#endif
43
// Get terminal size for progress bar.
// Returns the width, in columns, of the terminal attached to stdout, or 0 when stdout is not a
// terminal or its size cannot be queried.
int get_tty_size()
{
#if defined(_WIN32) || defined(_WIN64)
   if (!_isatty(_fileno(stdout)))
      return 0;
   CONSOLE_SCREEN_BUFFER_INFO csbi;
   if (!GetConsoleScreenBufferInfo(GetStdHandle(STD_OUTPUT_HANDLE), &csbi))
      return 0;
   return static_cast<int>(csbi.srWindow.Right - csbi.srWindow.Left + 1);
#else
   // Zero-initialise and check the ioctl result: when stdout is redirected (ENOTTY) or the query
   // otherwise fails, `w` is left untouched, and reading an uninitialised ws_col would yield an
   // arbitrary width.
   struct winsize w {};
   if (ioctl(fileno(stdout), TIOCGWINSZ, &w) != 0)
      return 0;
   return static_cast<int>(w.ws_col);
#endif
}
63
65
/// Trigger the event loops of the computation graphs referenced by `handles`, running each distinct
/// event loop (RLoopManager) once.
///
/// - An empty `handles` emits a Warning and returns 0.
/// - Handles whose results are already ready (IsReady()) are reported via Warning; if no handle
///   remains to be run, 0 is returned without running anything.
/// - Jitting is triggered once, through the first unique loop, before any loop runs; per the inline
///   comments, logs from RLoopManager::Jit are silenced unless the RDF verbosity is >= kDebug + 10.
/// - The unique loops are then run with jit=false: through ROOT::TThreadExecutor when built with
///   R__USE_IMT (the runtime condition selecting that path is not visible here; presumably
///   ROOT::IsImplicitMTEnabled()), sequentially with std::for_each otherwise.
///
/// \return the number of unique computation graphs that were run.
66unsigned int ROOT::RDF::RunGraphs(std::vector<RResultHandle> handles)
67{
68 if (handles.empty()) {
69 Warning("RunGraphs", "Got an empty list of handles, now quitting.");
70 return 0u;
71 }
72
73 // Check that there are results which have not yet been run
74 const unsigned int nToRun =
75 std::count_if(handles.begin(), handles.end(), [](const auto &h) { return !h.IsReady(); });
76 if (nToRun < handles.size()) {
77 Warning("RunGraphs", "Got %zu handles from which %zu link to results which are already ready.", handles.size(),
78 handles.size() - nToRun);
79 }
80 if (nToRun == 0u)
81 return 0u;
82
83 // Find the unique event loops
// Despite its name, `sameGraph` is a strict ordering on the fLoopManager pointer: handles sharing a
// loop manager compare equivalent, so the std::set keeps one handle per event loop.
// NOTE(review): the set is built from all handles, not only the !IsReady() ones; unless fLoopManager
// is cleared for ready results (not visible here), their event loops are run again below.
84 auto sameGraph = [](const RResultHandle &a, const RResultHandle &b) { return a.fLoopManager < b.fLoopManager; };
85 std::set<RResultHandle, decltype(sameGraph)> s(handles.begin(), handles.end(), sameGraph);
86 std::vector<RResultHandle> uniqueLoops(s.begin(), s.end());
87
88 // Trigger jitting. One call is enough to jit the code required by all computation graphs.
89 TStopwatch sw;
90 sw.Start();
// NOTE(review): Jit() is called on uniqueLoops[0].fLoopManager without the null check that `run`
// performs below; if a handle can carry a null fLoopManager, the pointer ordering above may place it
// first — confirm whether that can happen.
91 {
92 const auto effectiveVerbosity =
95 if (effectiveVerbosity >= ROOT::Experimental::ELogLevel::kDebug + 10) {
96 // a very high verbosity was requested, let's not silence anything
97 uniqueLoops[0].fLoopManager->Jit();
98 } else {
99 // silence logs from RLoopManager::Jit: RunGraphs does its own logging
102 uniqueLoops[0].fLoopManager->Jit();
103 }
104 }
105 sw.Stop();
107 << "Just-in-time compilation phase for RunGraphs (" << uniqueLoops.size()
108 << " unique computation graphs) completed"
109 << (sw.RealTime() > 1e-3 ? " in " + std::to_string(sw.RealTime()) + " seconds." : " in less than 1ms.");
110
111 // Trigger the unique event loops
// jit=false: all required code was compiled in the single Jit() call above.
112 auto run = [](RResultHandle &h) {
113 if (h.fLoopManager)
114 h.fLoopManager->Run(/*jit=*/false);
115 };
116
117 sw.Start();
118#ifdef R__USE_IMT
120 ROOT::TThreadExecutor{}.Foreach(run, uniqueLoops);
121 } else {
122#endif
123 std::for_each(uniqueLoops.begin(), uniqueLoops.end(), run);
124#ifdef R__USE_IMT
125 }
126#endif
127 sw.Stop();
129 << "Finished RunGraphs run (" << uniqueLoops.size() << " unique computation graphs, " << sw.CpuTime() << "s CPU, "
130 << sw.RealTime() << "s elapsed).";
131
132 return uniqueLoops.size();
133}
134
136{
137 throw std::logic_error("Varying a Snapshot result is not implemented yet.");
138}
139
140namespace ROOT {
141namespace RDF {
142
143namespace Experimental {
144
145ProgressHelper::ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int progressBarWidth,
146 unsigned int printInterval, bool useColors)
147 : fPrintInterval(printInterval),
148 fIncrement{increment},
149 fBarWidth{progressBarWidth = int(get_tty_size() / 4)},
150 fTotalFiles{totalFiles},
151#if defined(_WIN32) || defined(_WIN64)
152 fIsTTY{_isatty(_fileno(stdout)) != 0},
153 fUseShellColours{false && useColors}
154#else
155 fIsTTY{isatty(fileno(stdout)) == 1},
156 fUseShellColours{useColors && fIsTTY} // Control characters only with terminals.
157#endif
158{
159}
160
161/// Compute a running mean of events/s.
/// The mean is taken over the samples stored in fEventsPerSecondStatistics, summed with
/// std::accumulate. NOTE(review): the branch condition and both divisors are missing from this
/// view; presumably the first branch averages only the slots filled so far and the `else` branch
/// averages the whole array once it has wrapped around — confirm against the full source.
163{
165 return std::accumulate(fEventsPerSecondStatistics.begin(),
168 else
169 return std::accumulate(fEventsPerSecondStatistics.begin(), fEventsPerSecondStatistics.end(), 0.) /
171}
172
173/// Record current event counts and time stamp, populate evts/s statistics array.
/// Returns {events processed so far (loaded from fProcessedEvents), whole seconds elapsed since
/// fBeginTime}. Updates fLastProcessedEvents and fLastPrintTime to the current values.
174std::pair<std::size_t, std::chrono::seconds> ProgressHelper::RecordEvtCountAndTime()
175{
176 using namespace std::chrono;
177
178 auto currentEventCount = fProcessedEvents.load();
// Events processed since the previous call.
179 auto eventsPerTimeInterval = currentEventCount - fLastProcessedEvents;
180 fLastProcessedEvents = currentEventCount;
181
182 auto oldPrintTime = fLastPrintTime;
183 auto newPrintTime = system_clock::now();
184 fLastPrintTime = newPrintTime;
185
// NOTE(review): if two calls observe the same system_clock::now() value this interval is zero and the
// rate computed below is a division by zero. system_clock is also not monotonic (wall-clock changes can
// make the interval negative); steady_clock would be more robust but needs the member types changed.
// The assignment target of the rate (presumably a slot of fEventsPerSecondStatistics) is missing here.
186 duration<double> secondsCurrentInterval = newPrintTime - oldPrintTime;
188 eventsPerTimeInterval / secondsCurrentInterval.count();
189
190 return {currentEventCount, duration_cast<seconds>(newPrintTime - fBeginTime)};
191}
192
namespace {

/// RAII guard: snapshots a stream's formatting state (flags, fill character and precision) on
/// construction and restores it on destruction, so helpers that use std::setprecision,
/// std::setfill, std::scientific, ... do not leak those settings into the caller's stream.
struct RestoreStreamState {
   explicit RestoreStreamState(std::ostream &stream)
      : fStream(stream), fFlags(stream.flags()), fFillChar(stream.fill()), fPrecision(stream.precision())
   {
   }
   ~RestoreStreamState()
   {
      fStream.flags(fFlags);
      fStream.fill(fFillChar);
      // Precision is not part of flags(): without this, a std::setprecision(2) issued while the guard
      // is alive would remain in effect on the caller's stream afterwards.
      fStream.precision(fPrecision);
   }
   RestoreStreamState(const RestoreStreamState &) = delete;
   RestoreStreamState &operator=(const RestoreStreamState &) = delete;

   std::ostream &fStream;
   std::ios_base::fmtflags fFlags;
   std::ostream::char_type fFillChar;
   std::streamsize fPrecision;
};

/// Format std::chrono::seconds as `M:SSm` (e.g. `1:30m`), or as `H:MM:SSh` once at least one hour
/// has elapsed. The stream's formatting state is restored on return.
std::ostream &operator<<(std::ostream &stream, std::chrono::seconds elapsedSeconds)
{
   RestoreStreamState restore(stream);
   const auto h = std::chrono::duration_cast<std::chrono::hours>(elapsedSeconds);
   const auto m = std::chrono::duration_cast<std::chrono::minutes>(elapsedSeconds - h);
   const auto s = (elapsedSeconds - h - m).count();

   // With hours present, the setw/setfill below pad the minutes (the next numeric output) to 2 digits.
   if (h.count() > 0)
      stream << h.count() << ':' << std::setw(2) << std::right << std::setfill('0');
   stream << m.count() << ':' << std::setw(2) << std::right << std::setfill('0') << s;
   return stream << (h.count() > 0 ? 'h' : 'm');
}

} // namespace
223
224/// Print event and time statistics.
/// Writes, in order: the elapsed time, the current file index out of fTotalFiles, the events
/// processed so far (followed by the value of ComputeNEventsSoFar() when it is non-zero), the running
/// rate from EvtPerSec(), and, when ComputeNEventsSoFar() is non-zero, an estimate of the remaining
/// time. RestoreStreamState restores the stream's flags and fill character on return.
/// NOTE(review): the ANSI colour escapes appear to be guarded by lines missing from this view
/// (presumably `if (fUseShellColours)`) — confirm.
225void ProgressHelper::PrintStats(std::ostream &stream, std::size_t currentEventCount,
226 std::chrono::seconds elapsedSeconds) const
227{
228 RestoreStreamState restore(stream);
229 auto evtpersec = EvtPerSec();
230 auto GetNEventsOfCurrentFile = ComputeNEventsSoFar();
231 auto currentFileIdx = ComputeCurrentFileIdx();
232 auto totalFiles = fTotalFiles;
233
235 stream << "\033[35m";
236 stream << "["
237 << "Elapsed time: " << elapsedSeconds << " ";
239 stream << "\033[0m";
240 stream << "processing file: " << currentFileIdx << " / " << totalFiles << " ";
241
242 // Event counts:
244 stream << "\033[32m";
245
246 stream << "processed evts: " << currentEventCount;
// NOTE(review): std::scientific/std::setprecision only affect floating-point output; if
// ComputeNEventsSoFar() returns an integer type they have no visible effect on this value.
247 if (GetNEventsOfCurrentFile != 0) {
248 stream << " / " << std::scientific << std::setprecision(2) << GetNEventsOfCurrentFile;
249 }
250 stream << " ";
251
253 stream << "\033[0m";
254
255 // events/s
256 stream << std::scientific << std::setprecision(2) << evtpersec << " evt/s";
257
258 // Time statistics:
// NOTE(review): the estimate divides by `evtpersec` (a double); if it is 0 the quotient is inf/NaN and
// the static_cast<long long> is undefined behaviour. If ComputeNEventsSoFar() returns an unsigned type,
// the subtraction wraps around whenever currentEventCount exceeds it.
259 if (GetNEventsOfCurrentFile != 0) {
261 stream << "\033[35m";
262 std::chrono::seconds remainingSeconds(
263 static_cast<long long>((ComputeNEventsSoFar() - currentEventCount) / evtpersec));
264 stream << " " << remainingSeconds << " "
265 << " remaining time (per file being processed)";
267 stream << "\033[0m";
268 }
269
270 stream << "] ";
271}
272
/// Print the end-of-run summary: total elapsed time, the number of processed files (fTotalFiles)
/// and the event count returned by ComputeNEventsSoFar(). RestoreStreamState restores the stream's
/// flags and fill character on return.
/// NOTE(review): both counters are printed as "N / N" (the same value on each side of the slash),
/// and the colour escapes appear guarded by lines missing from this view — confirm intent.
273void ProgressHelper::PrintStatsFinal(std::ostream &stream, std::chrono::seconds elapsedSeconds) const
274{
275 RestoreStreamState restore(stream);
276 auto totalEvents = ComputeNEventsSoFar();
277 auto totalFiles = fTotalFiles;
278
280 stream << "\033[35m";
281 stream << "["
282 << "Total elapsed time: " << elapsedSeconds << " ";
284 stream << "\033[0m";
285 stream << "processed files: " << totalFiles << " / " << totalFiles << " ";
286
287 // Event counts:
289 stream << "\033[32m";
290
291 stream << "processed evts: " << totalEvents;
292 if (totalEvents != 0) {
293 stream << " / " << std::scientific << std::setprecision(2) << totalEvents;
294 }
295
297 stream << "\033[0m";
298
299 stream << "] ";
300}
301
302/// Print a progress bar of width `ProgressHelper::fBarWidth`, filled in proportion to
/// currentEventCount / ComputeNEventsSoFar(); nothing is printed when that total is 0.
/// At least one character is drawn: '>' marks the head of a partial bar, '=' ends a full one.
303void ProgressHelper::PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
304{
305 auto GetNEventsOfCurrentFile = ComputeNEventsSoFar();
306 if (GetNEventsOfCurrentFile == 0)
307 return;
308
309 RestoreStreamState restore(stream);
310
311 double completion = double(currentEventCount) / GetNEventsOfCurrentFile;
// Clamp to 1 so the filled part never exceeds fBarWidth.
312 unsigned int nBar = std::min(completion, 1.) * fBarWidth;
313
314 std::string bars(std::max(nBar, 1u), '=');
315 bars.back() = (nBar == fBarWidth) ? '=' : '>';
316
318 stream << "\033[33m";
// Left-justify the bar and pad with spaces up to fBarWidth so the closing '|' stays in place.
319 stream << '|' << std::setfill(' ') << std::setw(fBarWidth) << std::left << bars << "| ";
321 stream << "\033[0m";
322}
323//*/
324
325class ProgressBarAction final : public ROOT::Detail::RDF::RActionImpl<ProgressBarAction> {
326public:
327 using Result_t = int;
328
329private:
330 std::shared_ptr<ProgressHelper> fHelper;
331 std::shared_ptr<int> fDummyResult = std::make_shared<int>();
332
333public:
334 ProgressBarAction(std::shared_ptr<ProgressHelper> r) : fHelper(std::move(r)) {}
335
336 std::shared_ptr<Result_t> GetResultPtr() const { return fDummyResult; }
337
338 void Initialize() {}
339 void InitTask(TTreeReader *, unsigned int) {}
340
341 void Exec(unsigned int) {}
342
343 void Finalize()
344 {
345 std::mutex fPrintMutex;
346 if (!fPrintMutex.try_lock())
347 return;
348 std::lock_guard<std::mutex> lockGuard(fPrintMutex, std::adopt_lock);
349 const auto &[eventCount, elapsedSeconds] = fHelper->RecordEvtCountAndTime();
350
351 // The next line resets the current line output in the terminal.
352 // Brings the cursor at the beginning ('\r'), prints whitespace with the
353 // same length as the terminal size, then resets the cursor again so we
354 // can print the final stats on a clean line.
355 std::cout << '\r' << std::string(get_tty_size(), ' ') << '\r';
356 fHelper->PrintStatsFinal(std::cout, elapsedSeconds);
357 std::cout << '\n';
358 }
359
360 std::string GetActionName() { return "ProgressBar"; }
361 // dummy implementation of PartialUpdate
362 int &PartialUpdate(unsigned int) { return *fDummyResult; }
363
365 {
366 return [this](unsigned int slot, const ROOT::RDF::RSampleInfo &id) {
367 this->fHelper->registerNewSample(slot, id);
368 return this->fHelper->ComputeNEventsSoFar();
369 };
370 }
371};
372
374{
375 auto total_files = node.GetNFiles();
376 auto progress = std::make_shared<ProgressHelper>(1000, total_files);
377 ProgressBarAction c(progress);
378 auto r = node.Book<>(c);
379 r.OnPartialResultSlot(1000, [progress](unsigned int slot, auto &&arg) { (*progress)(slot, arg); });
380}
381
383{
// Convert the dataframe to the type-erased ROOT::RDF::RNode via AsRNode.
// NOTE(review): the statement that consumes `node` is missing from this view; presumably it forwards
// to the RNode overload of AddProgressBar — confirm.
384 auto node = ROOT::RDF::AsRNode(dataframe);
386}
387} // namespace Experimental
388} // namespace RDF
389} // namespace ROOT
int get_tty_size()
std::ostream & fStream
std::ostream::char_type fFillChar
std::ios_base::fmtflags fFlags
#define R__LOG_INFO(...)
Definition RLogger.hxx:364
#define b(i)
Definition RSha256.hxx:100
#define c(i)
Definition RSha256.hxx:101
#define a(i)
Definition RSha256.hxx:99
#define h(i)
Definition RSha256.hxx:106
#define e(i)
Definition RSha256.hxx:103
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize id
Option_t Option_t width
TCanvas * bars()
Definition bars.C:1
Base class for action helpers, see RInterface::Book() for more information.
ELogLevel GetEffectiveVerbosity(const RLogManager &mgr) const
Definition RLogger.hxx:313
static RLogManager & Get()
Definition RLogger.cxx:62
Change the verbosity level (global or specific to the RLogChannel passed to the constructor) for the ...
Definition RLogger.hxx:243
ROOT::RDF::SampleCallback_t GetSampleCallback() final
Override this method to register a callback that is executed before the processing a new data sample ...
std::shared_ptr< ProgressHelper > fHelper
ProgressBarAction(std::shared_ptr< ProgressHelper > r)
void InitTask(TTreeReader *, unsigned int)
std::shared_ptr< Result_t > GetResultPtr() const
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
Print a progress bar of width ProgressHelper::fBarWidth if fGetNEventsOfCurrentFile is known.
void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
std::array< double, 20 > fEventsPerSecondStatistics
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
ProgressHelper(std::size_t increment, unsigned int totalFiles=1, unsigned int progressBarWidth=40, unsigned int printInterval=1, bool useColors=true)
Create a progress helper.
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
std::chrono::time_point< std::chrono::system_clock > fBeginTime
void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const
The public interface to the RDataFrame federation of classes.
RResultPtr< typename std::decay_t< Helper >::Result_t > Book(Helper &&helper, const ColumnNames_t &columns={})
Book execution of a custom action using a user-defined helper object.
A type-erased version of RResultPtr and RResultMap.
Smart pointer for the return type of actions.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
ROOT::Experimental::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:37
RLogChannel & GetChannelOrManager()
Definition RLogger.hxx:302
@ kDebug
Debug information; only useful for developers; can have added verbosity up to 255-kDebug.
RResultMap< T > VariationsFor(RResultPtr< T > resPtr)
Produce all required systematic variations for the given result.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
unsigned int RunGraphs(std::vector< RResultHandle > handles)
Trigger the event loop of multiple RDataFrames concurrently.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
TMarker m
Definition textangle.C:8