Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
RDFHelpers.cxx
Go to the documentation of this file.
1// Author: Stefan Wunsch, Enrico Guiraud CERN 09/2020
2
3/*************************************************************************
4 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#include "ROOT/RDFHelpers.hxx"
12#include "TROOT.h" // IsImplicitMTEnabled
13#include "TError.h" // Warning
14#include "TStopwatch.h"
15#include "RConfigure.h" // R__USE_IMT
16#include "ROOT/RLogger.hxx"
17#include "ROOT/RDF/RLoopManager.hxx" // for RLoopManager
18#include "ROOT/RDF/Utils.hxx"
19#include "ROOT/RResultHandle.hxx" // for RResultHandle, RunGraphs
20#include "ROOT/RSlotStack.hxx"
21#ifdef R__USE_IMT
23#endif // R__USE_IMT
24
25#include <algorithm>
26#include <iostream>
27#include <set>
28#include <cstdio>
29
30// TODO, this function should be part of core libraries
31#include <numeric>
32#if (!defined(_WIN32)) && (!defined(_WIN64))
33#include <unistd.h>
34#endif
35
36#if defined(_WIN32) || defined(_WIN64)
37#define WIN32_LEAN_AND_MEAN
38#define VC_EXTRALEAN
39#include <io.h>
40#include <Windows.h>
41#else
42#include <sys/ioctl.h>
43#endif
44
/// Get the width of the terminal attached to stdout, used to size the progress bar.
///
/// \return The number of columns of the terminal, or 0 if stdout is not a terminal or its size
///         cannot be queried.
int get_tty_size()
{
#if defined(_WIN32) || defined(_WIN64)
   if (!_isatty(_fileno(stdout)))
      return 0;
   int width = 0;
   CONSOLE_SCREEN_BUFFER_INFO csbi;
   if (GetConsoleScreenBufferInfo(GetStdHandle(STD_OUTPUT_HANDLE), &csbi))
      width = static_cast<int>(csbi.srWindow.Right - csbi.srWindow.Left + 1);
   return width;
#else
   // Mirror the Windows branch: no terminal, no width.
   if (!isatty(fileno(stdout)))
      return 0;
   // Zero-initialise and check the ioctl result: if TIOCGWINSZ fails, `w` would otherwise be read
   // uninitialised and a garbage width would be used for the bar and for the line-clearing padding.
   struct winsize w {};
   if (ioctl(fileno(stdout), TIOCGWINSZ, &w) != 0)
      return 0;
   return static_cast<int>(w.ws_col);
#endif
}
64
66
/// Run the event loops of the computation graphs reached through `handles`.
///
/// Emits a warning when `handles` is empty or when some handles already link to ready results.
/// The handles are de-duplicated by their fLoopManager pointer. Jitting is triggered once, through the
/// first unique loop manager. Each non-null unique loop manager then receives a shared RSlotStack sized by
/// ROOT::GetThreadPoolSize() and is run with Run(/*jit=*/false).
///
/// \param[in] handles Result handles whose computation graphs should be run.
/// \return Number of unique loop managers found among the handles. Returns 0 if `handles` is empty or
///         every handle already links to a ready result.
67unsigned int ROOT::RDF::RunGraphs(std::vector<RResultHandle> handles)
68{
69 if (handles.empty()) {
70 Warning("RunGraphs", "Got an empty list of handles, now quitting.");
71 return 0u;
72 }
73
74 // Check that there are results which have not yet been run
75 const unsigned int nToRun =
76 std::count_if(handles.begin(), handles.end(), [](const auto &h) { return !h.IsReady(); });
77 if (nToRun < handles.size()) {
78 Warning("RunGraphs", "Got %zu handles from which %zu link to results which are already ready.", handles.size(),
79 handles.size() - nToRun);
80 }
81 if (nToRun == 0u)
82 return 0u;
83
84 // Find the unique event loops
// `sameGraph` is a strict ordering on the fLoopManager pointer, not an equality test.
// Used as the std::set comparator, it keeps one handle per distinct loop manager.
85 auto sameGraph = [](const RResultHandle &a, const RResultHandle &b) { return a.fLoopManager < b.fLoopManager; };
86 std::set<RResultHandle, decltype(sameGraph)> s(handles.begin(), handles.end(), sameGraph);
87 std::vector<RResultHandle> uniqueLoops(s.begin(), s.end());
// NOTE(review): the set is built from every handle, including those whose results are already ready.
// Their loop managers are therefore also run below and counted in the return value. Confirm this is intended.
88
89 // Trigger jitting. One call is enough to jit the code required by all computation graphs.
// `sw` is presumably the TStopwatch declared on a line not shown in this listing (TODO confirm).
91 sw.Start();
92 {
// The verbosity check selecting between the two branches below is on lines not shown in this listing.
96 // a very high verbosity was requested, let's not silence anything
97 uniqueLoops[0].fLoopManager->Jit();
98 } else {
99 // silence logs from RLoopManager::Jit: RunGraphs does its own logging
101 uniqueLoops[0].fLoopManager->Jit();
102 }
103 }
104 sw.Stop();
// The insertions below continue an info-level log statement whose opening line is not shown
// (presumably R__LOG_INFO on the RDF log channel).
106 << "Just-in-time compilation phase for RunGraphs (" << uniqueLoops.size()
107 << " unique computation graphs) completed"
108 << (sw.RealTime() > 1e-3 ? " in " + std::to_string(sw.RealTime()) + " seconds." : " in less than 1ms.");
109
110 // Trigger the unique event loops
// All loop managers share one slot stack. Handles with a null fLoopManager are skipped.
111 auto slotStack = std::make_shared<ROOT::Internal::RSlotStack>(ROOT::GetThreadPoolSize());
112 auto run = [&slotStack](RResultHandle &h) {
113 if (h.fLoopManager) {
114 h.fLoopManager->SetSlotStack(slotStack);
115 h.fLoopManager->Run(/*jit=*/false);
116 }
117 };
118
119 sw.Start();
// Under R__USE_IMT, a branch on lines not shown in this listing presumably runs `run` over uniqueLoops
// in parallel when implicit MT is enabled. Otherwise the loops are run sequentially by std::for_each.
120#ifdef R__USE_IMT
123 } else {
124#endif
125 std::for_each(uniqueLoops.begin(), uniqueLoops.end(), run);
126#ifdef R__USE_IMT
127 }
128#endif
129 sw.Stop();
// Continuation of a second info-level log statement whose opening line is not shown.
131 << "Finished RunGraphs run (" << uniqueLoops.size() << " unique computation graphs, " << sw.CpuTime() << "s CPU, "
132 << sw.RealTime() << "s elapsed).";
133
134 return uniqueLoops.size();
135}
136
// Body of a definition whose signature line is not shown in this listing.
// It unconditionally throws std::logic_error because varying a Snapshot result is not implemented.
138{
139 throw std::logic_error("Varying a Snapshot result is not implemented yet.");
140}
141
142namespace ROOT {
143namespace RDF {
144
145namespace Experimental {
146
148 unsigned int printInterval, bool useColors)
// ProgressHelper constructor (the first line of its signature is not shown in this listing).
//  - increment: stored in fIncrement.
//  - totalFiles: stored in fTotalFiles, later printed by the statistics functions.
//  - progressBarWidth: NOTE(review) effectively ignored, see fBarWidth below.
//  - printInterval: stored in fPrintInterval.
//  - useColors: request ANSI colour codes. Honoured only when stdout is a terminal, and never on Windows.
149 : fPrintInterval(printInterval),
150 fIncrement{increment},
// NOTE(review): this assigns to the progressBarWidth parameter, so the caller-supplied width is discarded.
// The bar is always get_tty_size() / 4 columns wide (integer division).
151 fBarWidth{progressBarWidth = int(get_tty_size() / 4)},
152 fTotalFiles{totalFiles},
// Windows branch (its opening #if is not shown in this listing): shell colours are always disabled.
154 fIsTTY{_isatty(_fileno(stdout)) != 0},
155 fUseShellColours{false && useColors}
156#else
// Non-Windows branch: fUseShellColours reads fIsTTY. This relies on fIsTTY being declared before
// fUseShellColours in the class (header not visible, confirm).
157 fIsTTY{isatty(fileno(stdout)) == 1},
158 fUseShellColours{useColors && fIsTTY} // Control characters only with terminals.
159#endif
160{
161}
162
163/// Compute a running mean of events/s.
174
175/// Record current event counts and time stamp, populate evts/s statistics array.
194
namespace {

/// RAII guard for an output stream's formatting state.
///
/// It snapshots the flags, fill character and floating-point precision on construction and restores them
/// on destruction. Helpers that use manipulators such as std::scientific, std::setprecision or
/// std::setfill therefore do not leak them into the caller's stream (e.g. std::cout).
struct RestoreStreamState {
   explicit RestoreStreamState(std::ostream &stream)
      : fStream(stream), fFlags(stream.flags()), fFillChar(stream.fill()), fPrecision(stream.precision())
   {
   }

   ~RestoreStreamState()
   {
      fStream.flags(fFlags);
      fStream.fill(fFillChar);
      // std::setprecision changes precision(), which is not part of flags(), so it must be restored separately.
      fStream.precision(fPrecision);
   }

   // The guard refers to the stream; a copy would restore the state a second time.
   RestoreStreamState(const RestoreStreamState &) = delete;
   RestoreStreamState &operator=(const RestoreStreamState &) = delete;

   std::ostream &fStream;
   std::ios_base::fmtflags fFlags;
   std::ostream::char_type fFillChar;
   std::streamsize fPrecision;
};

/// Format std::chrono::seconds as `M:SSm` (e.g. `1:30m`), or as `H:MM:SSh` (e.g. `1:02:03h`)
/// once the duration reaches one hour.
std::ostream &operator<<(std::ostream &stream, std::chrono::seconds elapsedSeconds)
{
   RestoreStreamState restore(stream);
   const auto h = std::chrono::duration_cast<std::chrono::hours>(elapsedSeconds);
   const auto m = std::chrono::duration_cast<std::chrono::minutes>(elapsedSeconds - h);
   const auto s = (elapsedSeconds - h - m).count();
   const bool hasHours = h.count() > 0;

   // With hours present, the minutes are zero-padded to two digits; the seconds always are.
   if (hasHours)
      stream << h.count() << ':' << std::setw(2) << std::right << std::setfill('0');
   stream << m.count() << ':' << std::setw(2) << std::right << std::setfill('0') << s;
   return stream << (hasHours ? 'h' : 'm');
}

} // namespace
225
226/// Print event and time statistics.
/// Writes one bracketed segment containing:
///  - the elapsed time;
///  - the current file index / total files;
///  - the processed events, followed by GetNEventsOfCurrentFile when that is non-zero;
///  - the running mean of events/s from EvtPerSec();
///  - when GetNEventsOfCurrentFile is non-zero, an estimate of the remaining time.
/// \param[in] stream Output stream. RestoreStreamState saves its flags and fill character and restores them on return.
/// \param[in] currentEventCount Number of events processed so far.
/// \param[in] elapsedSeconds Elapsed time to print.
227void ProgressHelper::PrintStats(std::ostream &stream, std::size_t currentEventCount,
228 std::chrono::seconds elapsedSeconds) const
229{
230 RestoreStreamState restore(stream);
231 auto evtpersec = EvtPerSec();
// currentFileIdx and GetNEventsOfCurrentFile are locals declared on lines not shown in this listing.
234 auto totalFiles = fTotalFiles;
235
// Each ANSI escape sequence below is presumably guarded by fUseShellColours on a line not shown in this listing.
237 stream << "\033[35m";
238 stream << "["
239 << "Elapsed time: " << elapsedSeconds << " ";
241 stream << "\033[0m";
242 stream << "processing file: " << currentFileIdx << " / " << totalFiles << " ";
243
244 // Event counts:
246 stream << "\033[32m";
247
248 stream << "processed evts: " << currentEventCount;
// NOTE(review): std::scientific and std::setprecision only affect floating-point output.
// If GetNEventsOfCurrentFile is an integer, they have no visible effect here.
249 if (GetNEventsOfCurrentFile != 0) {
250 stream << " / " << std::scientific << std::setprecision(2) << GetNEventsOfCurrentFile;
251 }
252 stream << " ";
253
255 stream << "\033[0m";
256
257 // events/s
258 stream << std::scientific << std::setprecision(2) << evtpersec << " evt/s";
259
260 // Time statistics:
261 if (GetNEventsOfCurrentFile != 0) {
263 stream << "\033[35m";
// NOTE(review): two hazards in this estimate.
//  - If evtpersec is 0 (EvtPerSec() is not visible here), the division yields inf/NaN and the
//    static_cast<long long> is undefined behaviour.
//  - If ComputeNEventsSoFar() returns an unsigned value smaller than currentEventCount, the
//    subtraction wraps around.
// Consider guarding both.
264 std::chrono::seconds remainingSeconds(
265 static_cast<long long>((ComputeNEventsSoFar() - currentEventCount) / evtpersec));
266 stream << " " << remainingSeconds << " "
267 << " remaining time (per file being processed)";
269 stream << "\033[0m";
270 }
271
272 stream << "] ";
273}
274
/// Print the final summary segment: total elapsed time, processed files and processed events.
/// "processed files" always prints fTotalFiles on both sides of the slash.
/// \param[in] stream Output stream. RestoreStreamState saves its flags and fill character and restores them on return.
/// \param[in] elapsedSeconds Total elapsed time to print.
275void ProgressHelper::PrintStatsFinal(std::ostream &stream, std::chrono::seconds elapsedSeconds) const
276{
277 RestoreStreamState restore(stream);
// totalEvents is a local declared on a line not shown in this listing.
279 auto totalFiles = fTotalFiles;
280
// Each ANSI escape sequence below is presumably guarded by fUseShellColours on a line not shown in this listing.
282 stream << "\033[35m";
283 stream << "["
284 << "Total elapsed time: " << elapsedSeconds << " ";
286 stream << "\033[0m";
287 stream << "processed files: " << totalFiles << " / " << totalFiles << " ";
288
289 // Event counts:
291 stream << "\033[32m";
292
// When non-zero, totalEvents is printed again after the slash ("N / N").
293 stream << "processed evts: " << totalEvents;
294 if (totalEvents != 0) {
295 stream << " / " << std::scientific << std::setprecision(2) << totalEvents;
296 }
297
299 stream << "\033[0m";
300
301 stream << "] ";
302}
303
304/// Print a progress bar of width `ProgressHelper::fBarWidth` if `fGetNEventsOfCurrentFile` is known.
/// Output shape: '|', the bar left-aligned and padded with spaces to fBarWidth, then "| ".
/// The bar always contains at least one character. Its last character is '>' while nBar is below
/// fBarWidth, and '=' once the bar is full (including when fBarWidth is 0).
/// \param[in] currentEventCount Number of events processed so far. It presumably feeds `completion` on a
///            line not shown in this listing.
305void ProgressHelper::PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
306{
// The early-return condition is on lines not shown in this listing.
309 return;
310
311 RestoreStreamState restore(stream);
312
// `completion` is a double (it is passed to std::min together with 1.). Clamping it to 1 keeps nBar <= fBarWidth.
314 unsigned int nBar = std::min(completion, 1.) * fBarWidth;
315
// Allocate at least one character so that bars.back() is valid even when nBar is 0.
316 std::string bars(std::max(nBar, 1u), '=');
317 bars.back() = (nBar == fBarWidth) ? '=' : '>';
318
320 stream << "\033[33m";
321 stream << '|' << std::setfill(' ') << std::setw(fBarWidth) << std::left << bars << "| ";
323 stream << "\033[0m";
324}
325//*/
326
327class ProgressBarAction final : public ROOT::Detail::RDF::RActionImpl<ProgressBarAction> {
328public:
329 using Result_t = int;
330
331private:
332 std::shared_ptr<ProgressHelper> fHelper;
333 std::shared_ptr<int> fDummyResult = std::make_shared<int>();
334
335public:
336 ProgressBarAction(std::shared_ptr<ProgressHelper> r) : fHelper(std::move(r)) {}
337
338 std::shared_ptr<Result_t> GetResultPtr() const { return fDummyResult; }
339
340 void Initialize() {}
341 void InitTask(TTreeReader *, unsigned int) {}
342
343 void Exec(unsigned int) {}
344
345 void Finalize()
346 {
347 std::mutex fPrintMutex;
348 if (!fPrintMutex.try_lock())
349 return;
350 std::lock_guard<std::mutex> lockGuard(fPrintMutex, std::adopt_lock);
351 const auto &[eventCount, elapsedSeconds] = fHelper->RecordEvtCountAndTime();
352
353 // The next line resets the current line output in the terminal.
354 // Brings the cursor at the beginning ('\r'), prints whitespace with the
355 // same length as the terminal size, then resets the cursor again so we
356 // can print the final stats on a clean line.
357 std::cout << '\r' << std::string(get_tty_size(), ' ') << '\r';
358 fHelper->PrintStatsFinal(std::cout, elapsedSeconds);
359 std::cout << '\n';
360 }
361
362 std::string GetActionName() { return "ProgressBar"; }
363 // dummy implementation of PartialUpdate
364 int &PartialUpdate(unsigned int) { return *fDummyResult; }
365
367 {
368 return [this](unsigned int slot, const ROOT::RDF::RSampleInfo &id) {
369 this->fHelper->registerNewSample(slot, id);
370 return this->fHelper->ComputeNEventsSoFar();
371 };
372 }
373};
374
376{
377 auto total_files = node.GetNFiles();
378 auto progress = std::make_shared<ProgressHelper>(1000, total_files);
379 ProgressBarAction c(progress);
380 auto r = node.Book<>(c);
381 r.OnPartialResultSlot(1000, [progress](unsigned int slot, auto &&arg) { (*progress)(slot, arg); });
382}
383
385{
// Wrap the RDataFrame as a generic RNode.
// The statement that uses `node` is on a line not shown in this listing. It presumably forwards to the
// RNode overload of AddProgressBar (TODO confirm).
386 auto node = ROOT::RDF::AsRNode(dataframe);
388}
389} // namespace Experimental
390} // namespace RDF
391} // namespace ROOT
int get_tty_size()
std::ostream & fStream
std::ostream::char_type fFillChar
std::ios_base::fmtflags fFlags
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
#define b(i)
Definition RSha256.hxx:100
#define c(i)
Definition RSha256.hxx:101
#define a(i)
Definition RSha256.hxx:99
#define h(i)
Definition RSha256.hxx:106
#define e(i)
Definition RSha256.hxx:103
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize id
Option_t Option_t width
TCanvas * bars()
Definition bars.C:1
Base class for action helpers, see RInterface::Book() for more information.
ROOT::RDF::SampleCallback_t GetSampleCallback() final
Override this method to register a callback that is executed before the processing a new data sample ...
std::shared_ptr< ProgressHelper > fHelper
ProgressBarAction(std::shared_ptr< ProgressHelper > r)
void InitTask(TTreeReader *, unsigned int)
std::shared_ptr< Result_t > GetResultPtr() const
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
Print a progress bar of width ProgressHelper::fBarWidth if the number of events of the current file (GetNEventsOfCurrentFile) is known.
void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
std::array< double, 20 > fEventsPerSecondStatistics
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
ProgressHelper(std::size_t increment, unsigned int totalFiles=1, unsigned int progressBarWidth=40, unsigned int printInterval=1, bool useColors=true)
Create a progress helper.
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
std::chrono::time_point< std::chrono::system_clock > fBeginTime
void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const
The public interface to the RDataFrame federation of classes.
RResultPtr< typename std::decay_t< Helper >::Result_t > Book(Helper &&helper, const ColumnNames_t &columns={})
Book execution of a custom action using a user-defined helper object.
A type-erased version of RResultPtr and RResultMap.
Smart pointer for the return type of actions.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
ELogLevel GetEffectiveVerbosity(const RLogManager &mgr) const
Definition RLogger.hxx:310
static RLogManager & Get()
Definition RLogger.cxx:60
Change the verbosity level (global or specific to the RLogChannel passed to the constructor) for the ...
Definition RLogger.hxx:240
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
Stopwatch class.
Definition TStopwatch.h:28
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:41
RLogChannel & GetChannelOrManager()
Definition RLogger.hxx:299
RResultMap< T > VariationsFor(RResultPtr< T > resPtr)
Produce all required systematic variations for the given result.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
unsigned int RunGraphs(std::vector< RResultHandle > handles)
Run the event loops of multiple RDataFrames concurrently.
std::ostream & operator<<(std::ostream &os, const RDFDescription &description)
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
UInt_t GetThreadPoolSize()
Returns the size of ROOT's thread pool.
Definition TROOT.cxx:577
@ kDebug
Debug information; only useful for developers; can have added verbosity up to 255-kDebug.
@ kError
An error.
TMarker m
Definition textangle.C:8