Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RDFHelpers.cxx
Go to the documentation of this file.
1// Author: Stefan Wunsch, Enrico Guiraud CERN 09/2020
2
3/*************************************************************************
4 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#include "ROOT/RDFHelpers.hxx"
12#include "TROOT.h" // IsImplicitMTEnabled
13#include "TError.h" // Warning
14#include "TStopwatch.h"
15#include "RConfigure.h" // R__USE_IMT
16#include "ROOT/RLogger.hxx"
17#include "ROOT/RDF/RLoopManager.hxx" // for RLoopManager
18#include "ROOT/RDF/Utils.hxx"
19#include "ROOT/RResultHandle.hxx" // for RResultHandle, RunGraphs
20#ifdef R__USE_IMT
22#endif // R__USE_IMT
23
24#include <algorithm>
25#include <iostream>
26#include <set>
27#include <stdio.h>
28
29// TODO, this function should be part of core libraries
30#include <numeric>
31#if (!defined(_WIN32)) && (!defined(_WIN64))
32#include <unistd.h>
33#endif
34
35#if defined(_WIN32) || defined(_WIN64)
36#define WIN32_LEAN_AND_MEAN
37#define VC_EXTRALEAN
38#include <io.h>
39#include <Windows.h>
40#else
41#include <sys/ioctl.h>
42#endif
43
/// Get the width, in columns, of the terminal attached to stdout (used to size the progress bar).
///
/// \return The number of columns, or 0 when stdout is not a terminal or its size cannot be queried.
int get_tty_size()
{
#if defined(_WIN32) || defined(_WIN64)
   if (!_isatty(_fileno(stdout)))
      return 0;
   CONSOLE_SCREEN_BUFFER_INFO csbi;
   if (!GetConsoleScreenBufferInfo(GetStdHandle(STD_OUTPUT_HANDLE), &csbi))
      return 0;
   return static_cast<int>(csbi.srWindow.Right - csbi.srWindow.Left + 1);
#else
   // Zero-initialise the struct and check the ioctl result: when stdout is redirected to a file or a pipe
   // the request fails and leaves the struct untouched, so reading ws_col unconditionally would hand back
   // an indeterminate value.
   struct winsize w = {};
   if (ioctl(fileno(stdout), TIOCGWINSZ, &w) == -1)
      return 0;
   return static_cast<int>(w.ws_col);
#endif
}
63
65
66unsigned int ROOT::RDF::RunGraphs(std::vector<RResultHandle> handles)
67{
68 if (handles.empty()) {
69 Warning("RunGraphs", "Got an empty list of handles, now quitting.");
70 return 0u;
71 }
72
73 // Check that there are results which have not yet been run
74 const unsigned int nToRun =
75 std::count_if(handles.begin(), handles.end(), [](const auto &h) { return !h.IsReady(); });
76 if (nToRun < handles.size()) {
77 Warning("RunGraphs", "Got %lu handles from which %lu link to results which are already ready.", handles.size(),
78 handles.size() - nToRun);
79 }
80 if (nToRun == 0u)
81 return 0u;
82
83 // Find the unique event loops
84 auto sameGraph = [](const RResultHandle &a, const RResultHandle &b) { return a.fLoopManager < b.fLoopManager; };
85 std::set<RResultHandle, decltype(sameGraph)> s(handles.begin(), handles.end(), sameGraph);
86 std::vector<RResultHandle> uniqueLoops(s.begin(), s.end());
87
88 // Trigger jitting. One call is enough to jit the code required by all computation graphs.
89 TStopwatch sw;
90 sw.Start();
91 {
92 const auto effectiveVerbosity =
95 if (effectiveVerbosity >= ROOT::Experimental::ELogLevel::kDebug + 10) {
96 // a very high verbosity was requested, let's not silence anything
97 uniqueLoops[0].fLoopManager->Jit();
98 } else {
99 // silence logs from RLoopManager::Jit: RunGraphs does its own logging
102 uniqueLoops[0].fLoopManager->Jit();
103 }
104 }
105 sw.Stop();
107 << "Just-in-time compilation phase for RunGraphs (" << uniqueLoops.size()
108 << " unique computation graphs) completed"
109 << (sw.RealTime() > 1e-3 ? " in " + std::to_string(sw.RealTime()) + " seconds." : " in less than 1ms.");
110
111 // Trigger the unique event loops
112 auto run = [](RResultHandle &h) {
113 if (h.fLoopManager)
114 h.fLoopManager->Run(/*jit=*/false);
115 };
116
117 sw.Start();
118#ifdef R__USE_IMT
120 ROOT::TThreadExecutor{}.Foreach(run, uniqueLoops);
121 } else {
122#endif
123 std::for_each(uniqueLoops.begin(), uniqueLoops.end(), run);
124#ifdef R__USE_IMT
125 }
126#endif
127 sw.Stop();
129 << "Finished RunGraphs run (" << uniqueLoops.size() << " unique computation graphs, " << sw.CpuTime() << "s CPU, "
130 << sw.RealTime() << "s elapsed).";
131
132 return uniqueLoops.size();
133}
134
136{
137 throw std::logic_error("Varying a Snapshot result is not implemented yet.");
138}
139
140namespace ROOT {
141namespace RDF {
142
143namespace Experimental {
144
/// Construct a progress helper and probe stdout once for terminal capabilities.
///
/// \param increment Stored in fIncrement.
/// \param totalFiles Stored in fTotalFiles.
/// \param progressBarWidth Overwritten before it is read: fBarWidth is always get_tty_size() / 4.
/// \param printInterval Stored in fPrintInterval.
/// \param useColors Request for colour escape codes; see the platform-specific initialisers below.
145ProgressHelper::ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int progressBarWidth,
146 unsigned int printInterval, bool useColors)
147 : fPrintInterval(printInterval),
148 fIncrement{increment},
// NOTE(review): the assignment replaces the caller's progressBarWidth with a quarter of the terminal
// width, so the argument value is never used -- confirm whether that is intended.
149 fBarWidth{progressBarWidth = int(get_tty_size() / 4)},
150 fTotalFiles{totalFiles},
151#if defined(_WIN32) || defined(_WIN64)
152 fIsTTY{_isatty(_fileno(stdout)) != 0},
// `false && useColors` is always false: colours are unconditionally disabled in Windows builds.
153 fUseShellColours{false && useColors}
154#else
155 fIsTTY{isatty(fileno(stdout)) == 1},
156 fUseShellColours{useColors && fIsTTY} // Control characters only with terminals.
157#endif
158{
159}
160
161/// Compute a running mean of events/s.
163{
165 return std::accumulate(fEventsPerSecondStatistics.begin(),
168 else
169 return std::accumulate(fEventsPerSecondStatistics.begin(), fEventsPerSecondStatistics.end(), 0.) /
171}
172
173/// Record current event counts and time stamp, populate evts/s statistics array.
174std::pair<std::size_t, std::chrono::seconds> ProgressHelper::RecordEvtCountAndTime()
175{
176 using namespace std::chrono;
177
178 auto currentEventCount = fProcessedEvents.load();
179 auto eventsPerTimeInterval = currentEventCount - fLastProcessedEvents;
180 fLastProcessedEvents = currentEventCount;
181
182 auto oldPrintTime = fLastPrintTime;
183 auto newPrintTime = system_clock::now();
184 fLastPrintTime = newPrintTime;
185
186 duration<double> secondsCurrentInterval = newPrintTime - oldPrintTime;
188 eventsPerTimeInterval / secondsCurrentInterval.count();
189
190 return {currentEventCount, duration_cast<seconds>(newPrintTime - fBeginTime)};
191}
192
193namespace {
/// Format std::chrono::seconds as `1:30m`, or as `1:01:30h` once at least one hour has elapsed.
///
/// Minutes (when hours are shown) and seconds are zero-padded to two digits. The fill character and
/// format flags of the stream are put back to the caller's values before returning, so the zero padding
/// used here does not leak into later output on the same stream.
///
/// \param stream Stream to write to.
/// \param elapsedSeconds Duration to format.
/// \return The stream that was passed in.
std::ostream &operator<<(std::ostream &stream, std::chrono::seconds elapsedSeconds)
{
   const auto h = std::chrono::duration_cast<std::chrono::hours>(elapsedSeconds);
   const auto m = std::chrono::duration_cast<std::chrono::minutes>(elapsedSeconds - h);
   const auto s = (elapsedSeconds - h - m).count();
   const bool showHours = h.count() > 0;

   // std::setfill and std::right are sticky (unlike std::setw), so remember what the caller had.
   const std::ios_base::fmtflags callerFlags = stream.flags();
   const auto callerFill = stream.fill();

   if (showHours)
      stream << h.count() << ':' << std::setw(2) << std::right << std::setfill('0');
   stream << m.count() << ':' << std::setw(2) << std::right << std::setfill('0') << s;
   stream << (showHours ? 'h' : 'm');

   stream.flags(callerFlags);
   stream.fill(callerFill);
   return stream;
}
205
/// Scope guard that records a stream's format flags and fill character on construction and puts both
/// back on destruction. Precision and field width are not recorded.
struct RestoreStreamState {
   /// \param stream Stream whose flags and fill character are captured; it must outlive this guard.
   explicit RestoreStreamState(std::ostream &stream)
      : fStream(stream), fFlags(stream.flags()), fFillChar(stream.fill())
   {
   }

   RestoreStreamState(const RestoreStreamState &) = delete;
   RestoreStreamState &operator=(const RestoreStreamState &) = delete;

   ~RestoreStreamState()
   {
      // flags() replaces the whole flag set. setf() would only OR the saved bits back in and leave any
      // flag that was switched on in the meantime (std::left, std::scientific, ...) still set.
      fStream.flags(fFlags);
      fStream.fill(fFillChar);
   }

   std::ostream &fStream;             ///< Stream being guarded.
   std::ios_base::fmtflags fFlags;    ///< Format flags at construction time.
   std::ostream::char_type fFillChar; ///< Fill character at construction time.
};
218} // namespace
219
220/// Print event and time statistics.
221void ProgressHelper::PrintStats(std::ostream &stream, std::size_t currentEventCount,
222 std::chrono::seconds elapsedSeconds) const
223{
224 auto evtpersec = EvtPerSec();
225 auto GetNEventsOfCurrentFile = ComputeNEventsSoFar();
226 auto currentFileIdx = ComputeCurrentFileIdx();
227 auto totalFiles = fTotalFiles;
228
230 stream << "\033[35m";
231 stream << "["
232 << "Elapsed time: " << elapsedSeconds << " ";
234 stream << "\033[0m";
235 stream << "processing file: " << currentFileIdx << " / " << totalFiles << " ";
236
237 // Event counts:
239 stream << "\033[32m";
240
241 stream << "processed evts: " << currentEventCount;
242 if (GetNEventsOfCurrentFile != 0) {
243 stream << " / " << std::scientific << std::setprecision(2) << GetNEventsOfCurrentFile;
244 }
245 stream << " ";
246
248 stream << "\033[0m";
249
250 // events/s
251 stream << std::scientific << std::setprecision(2) << evtpersec << " evt/s";
252
253 // Time statistics:
254 if (GetNEventsOfCurrentFile != 0) {
256 stream << "\033[35m";
257 std::chrono::seconds remainingSeconds(
258 static_cast<long long>((ComputeNEventsSoFar() - currentEventCount) / evtpersec));
259 stream << " " << remainingSeconds << " "
260 << " remaining time (per file being processed)";
262 stream << "\033[0m";
263 }
264
265 stream << "] ";
266}
267
268void ProgressHelper::PrintStatsFinal(std::ostream &stream, std::chrono::seconds elapsedSeconds) const
269{
270 auto totalEvents = ComputeNEventsSoFar();
271 auto totalFiles = fTotalFiles;
272
274 stream << "\033[35m";
275 stream << "["
276 << "Total elapsed time: " << elapsedSeconds << " ";
278 stream << "\033[0m";
279 stream << "processed files: " << totalFiles << " / " << totalFiles << " ";
280
281 // Event counts:
283 stream << "\033[32m";
284
285 stream << "processed evts: " << totalEvents;
286 if (totalEvents != 0) {
287 stream << " / " << std::scientific << std::setprecision(2) << totalEvents;
288 }
289
291 stream << "\033[0m";
292
293 stream << "] ";
294}
295
296/// Print a progress bar of width `ProgressHelper::fBarWidth` if `fGetNEventsOfCurrentFile` is known.
297void ProgressHelper::PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
298{
299 auto GetNEventsOfCurrentFile = ComputeNEventsSoFar();
300 if (GetNEventsOfCurrentFile == 0)
301 return;
302
303 RestoreStreamState restore(stream);
304
305 double completion = double(currentEventCount) / GetNEventsOfCurrentFile;
306 unsigned int nBar = std::min(completion, 1.) * fBarWidth;
307
308 std::string bars(std::max(nBar, 1u), '=');
309 bars.back() = (nBar == fBarWidth) ? '=' : '>';
310
312 stream << "\033[33m";
313 stream << '|' << std::setfill(' ') << std::setw(fBarWidth) << std::left << bars << "| ";
315 stream << "\033[0m";
316}
317//*/
318
319class ProgressBarAction final : public ROOT::Detail::RDF::RActionImpl<ProgressBarAction> {
320public:
321 using Result_t = int;
322
323private:
324 std::shared_ptr<ProgressHelper> fHelper;
325 std::shared_ptr<int> fDummyResult = std::make_shared<int>();
326
327public:
328 ProgressBarAction(std::shared_ptr<ProgressHelper> r) : fHelper(std::move(r)) {}
329
330 std::shared_ptr<Result_t> GetResultPtr() const { return fDummyResult; }
331
332 void Initialize() {}
333 void InitTask(TTreeReader *, unsigned int) {}
334
335 void Exec(unsigned int) {}
336
337 void Finalize()
338 {
339 std::mutex fPrintMutex;
340 if (!fPrintMutex.try_lock())
341 return;
342 std::lock_guard<std::mutex> lockGuard(fPrintMutex, std::adopt_lock);
343 const auto &[eventCount, elapsedSeconds] = fHelper->RecordEvtCountAndTime();
344
345 // The next line resets the current line output in the terminal.
346 // Brings the cursor at the beginning ('\r'), prints whitespace with the
347 // same length as the terminal size, then resets the cursor again so we
348 // can print the final stats on a clean line.
349 std::cout << '\r' << std::string(get_tty_size(), ' ') << '\r';
350 fHelper->PrintStatsFinal(std::cout, elapsedSeconds);
351 std::cout << '\n';
352 }
353
354 std::string GetActionName() { return "ProgressBar"; }
355 // dummy implementation of PartialUpdate
356 int &PartialUpdate(unsigned int) { return *fDummyResult; }
357
359 {
360 return [this](unsigned int slot, const ROOT::RDF::RSampleInfo &id) {
361 this->fHelper->registerNewSample(slot, id);
362 return this->fHelper->ComputeNEventsSoFar();
363 };
364 }
365};
366
368{
369 auto total_files = node.GetNFiles();
370 auto progress = std::make_shared<ProgressHelper>(1000, total_files);
371 ProgressBarAction c(progress);
372 auto r = node.Book<>(c);
373 r.OnPartialResultSlot(1000, [progress](unsigned int slot, auto &&arg) { (*progress)(slot, arg); });
374}
375
377{
378 auto node = ROOT::RDF::AsRNode(dataframe);
380}
381} // namespace Experimental
382} // namespace RDF
383} // namespace ROOT
int get_tty_size()
std::ostream & fStream
std::ostream::char_type fFillChar
std::ios_base::fmtflags fFlags
#define R__LOG_INFO(...)
Definition RLogger.hxx:364
#define b(i)
Definition RSha256.hxx:100
#define c(i)
Definition RSha256.hxx:101
#define a(i)
Definition RSha256.hxx:99
#define h(i)
Definition RSha256.hxx:106
#define e(i)
Definition RSha256.hxx:103
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:399
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize id
Option_t Option_t width
TCanvas * bars()
Definition bars.C:1
Base class for action helpers, see RInterface::Book() for more information.
ELogLevel GetEffectiveVerbosity(const RLogManager &mgr) const
Definition RLogger.hxx:313
static RLogManager & Get()
Definition RLogger.cxx:62
Change the verbosity level (global or specific to the RLogChannel passed to the constructor) for the ...
Definition RLogger.hxx:243
std::shared_ptr< ProgressHelper > fHelper
ProgressBarAction(std::shared_ptr< ProgressHelper > r)
void InitTask(TTreeReader *, unsigned int)
std::shared_ptr< Result_t > GetResultPtr() const
ROOT::RDF::SampleCallback_t GetSampleCallback()
Override this method to register a callback that is executed before the processing a new data sample ...
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
Print a progress bar of width ProgressHelper::fBarWidth if fGetNEventsOfCurrentFile is known.
void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
std::array< double, 20 > fEventsPerSecondStatistics
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
ProgressHelper(std::size_t increment, unsigned int totalFiles=1, unsigned int progressBarWidth=40, unsigned int printInterval=1, bool useColors=true)
Create a progress helper.
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
std::chrono::time_point< std::chrono::system_clock > fBeginTime
void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const
The public interface to the RDataFrame federation of classes.
RResultPtr< typename std::decay_t< Helper >::Result_t > Book(Helper &&helper, const ColumnNames_t &columns={})
Book execution of a custom action using a user-defined helper object.
A type-erased version of RResultPtr and RResultMap.
Smart pointer for the return type of actions.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
ROOT::Experimental::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:37
RLogChannel & GetChannelOrManager()
Definition RLogger.hxx:302
@ kDebug
Debug information; only useful for developers; can have added verbosity up to 255-kDebug.
RResultMap< T > VariationsFor(RResultPtr< T > resPtr)
Produce all required systematic variations for the given result.
void AddProgressBar(ROOT::RDF::RNode df)
unsigned int RunGraphs(std::vector< RResultHandle > handles)
Trigger the event loop of multiple RDataFrames concurrently.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with a RDataFrame computation graph via e....
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
TMarker m
Definition textangle.C:8