15#include "RConfigure.h"
31#if (!defined(_WIN32)) && (!defined(_WIN64))
35#if defined(_WIN32) || defined(_WIN64)
36#define WIN32_LEAN_AND_MEAN
47#if defined(_WIN32) || defined(_WIN64)
48 if (!_isatty(_fileno(stdout)))
51 CONSOLE_SCREEN_BUFFER_INFO csbi;
52 if (GetConsoleScreenBufferInfo(GetStdHandle(STD_OUTPUT_HANDLE), &csbi))
53 width = (
int)(csbi.srWindow.Right - csbi.srWindow.Left + 1);
58 ioctl(fileno(stdout), TIOCGWINSZ, &
w);
68 if (handles.empty()) {
69 Warning(
"RunGraphs",
"Got an empty list of handles, now quitting.");
74 const unsigned int nToRun =
75 std::count_if(handles.begin(), handles.end(), [](
const auto &
h) { return !h.IsReady(); });
76 if (nToRun < handles.size()) {
77 Warning(
"RunGraphs",
"Got %zu handles from which %zu link to results which are already ready.", handles.size(),
78 handles.size() - nToRun);
85 std::set<
RResultHandle,
decltype(sameGraph)> s(handles.begin(), handles.end(), sameGraph);
86 std::vector<RResultHandle> uniqueLoops(s.begin(), s.end());
92 const auto effectiveVerbosity =
97 uniqueLoops[0].fLoopManager->Jit();
102 uniqueLoops[0].fLoopManager->Jit();
107 <<
"Just-in-time compilation phase for RunGraphs (" << uniqueLoops.size()
108 <<
" unique computation graphs) completed"
109 << (sw.
RealTime() > 1
e-3 ?
" in " + std::to_string(sw.
RealTime()) +
" seconds." :
" in less than 1ms.");
114 h.fLoopManager->Run(
false);
123 std::for_each(uniqueLoops.begin(), uniqueLoops.end(), run);
129 <<
"Finished RunGraphs run (" << uniqueLoops.size() <<
" unique computation graphs, " << sw.
CpuTime() <<
"s CPU, "
132 return uniqueLoops.size();
137 throw std::logic_error(
"Varying a Snapshot result is not implemented yet.");
143namespace Experimental {
146 unsigned int printInterval,
bool useColors)
147 : fPrintInterval(printInterval),
148 fIncrement{increment},
150 fTotalFiles{totalFiles},
151#if defined(_WIN32) || defined(_WIN64)
152 fIsTTY{_isatty(_fileno(stdout)) != 0},
153 fUseShellColours{false && useColors}
155 fIsTTY{isatty(fileno(stdout)) == 1},
156 fUseShellColours{useColors && fIsTTY}
176 using namespace std::chrono;
183 auto newPrintTime = system_clock::now();
186 duration<double> secondsCurrentInterval = newPrintTime - oldPrintTime;
188 eventsPerTimeInterval / secondsCurrentInterval.count();
190 return {currentEventCount, duration_cast<seconds>(newPrintTime -
fBeginTime)};
195struct RestoreStreamState {
196 RestoreStreamState(std::ostream &stream) :
fStream(stream),
fFlags(stream.flags()),
fFillChar(stream.fill()) {}
197 ~RestoreStreamState()
209std::ostream &
operator<<(std::ostream &stream, std::chrono::seconds elapsedSeconds)
211 RestoreStreamState restore(stream);
212 auto h = std::chrono::duration_cast<std::chrono::hours>(elapsedSeconds);
213 auto m = std::chrono::duration_cast<std::chrono::minutes>(elapsedSeconds -
h);
214 auto s = (elapsedSeconds -
h -
m).count();
217 stream <<
h.count() <<
':' << std::setw(2) << std::right << std::setfill(
'0');
218 stream <<
m.count() <<
':' << std::setw(2) << std::right << std::setfill(
'0') << s;
219 return stream << (
h.count() > 0 ?
'h' :
'm');
226 std::chrono::seconds elapsedSeconds)
const
228 RestoreStreamState restore(stream);
235 stream <<
"\033[35m";
237 <<
"Elapsed time: " << elapsedSeconds <<
" ";
240 stream <<
"processing file: " << currentFileIdx <<
" / " << totalFiles <<
" ";
244 stream <<
"\033[32m";
246 stream <<
"processed evts: " << currentEventCount;
247 if (GetNEventsOfCurrentFile != 0) {
248 stream <<
" / " << std::scientific << std::setprecision(2) << GetNEventsOfCurrentFile;
256 stream << std::scientific << std::setprecision(2) << evtpersec <<
" evt/s";
259 if (GetNEventsOfCurrentFile != 0) {
261 stream <<
"\033[35m";
262 std::chrono::seconds remainingSeconds(
264 stream <<
" " << remainingSeconds <<
" "
265 <<
" remaining time (per file being processed)";
275 RestoreStreamState restore(stream);
280 stream <<
"\033[35m";
282 <<
"Total elapsed time: " << elapsedSeconds <<
" ";
285 stream <<
"processed files: " << totalFiles <<
" / " << totalFiles <<
" ";
289 stream <<
"\033[32m";
291 stream <<
"processed evts: " << totalEvents;
292 if (totalEvents != 0) {
293 stream <<
" / " << std::scientific << std::setprecision(2) << totalEvents;
306 if (GetNEventsOfCurrentFile == 0)
309 RestoreStreamState restore(stream);
311 double completion =
double(currentEventCount) / GetNEventsOfCurrentFile;
312 unsigned int nBar = std::min(completion, 1.) *
fBarWidth;
314 std::string
bars(std::max(nBar, 1u),
'=');
318 stream <<
"\033[33m";
319 stream <<
'|' << std::setfill(
' ') << std::setw(
fBarWidth) << std::left <<
bars <<
"| ";
345 std::mutex fPrintMutex;
346 if (!fPrintMutex.try_lock())
348 std::lock_guard<std::mutex> lockGuard(fPrintMutex, std::adopt_lock);
349 const auto &[eventCount, elapsedSeconds] =
fHelper->RecordEvtCountAndTime();
355 std::cout <<
'\r' << std::string(
get_tty_size(),
' ') <<
'\r';
356 fHelper->PrintStatsFinal(std::cout, elapsedSeconds);
367 this->fHelper->registerNewSample(slot,
id);
368 return this->fHelper->ComputeNEventsSoFar();
376 auto progress = std::make_shared<ProgressHelper>(1000, total_files);
379 r.OnPartialResultSlot(1000, [progress](
unsigned int slot,
auto &&arg) { (*progress)(slot, arg); });
std::ostream::char_type fFillChar
std::ios_base::fmtflags fFlags
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize id
Base class for action helpers, see RInterface::Book() for more information.
ELogLevel GetEffectiveVerbosity(const RLogManager &mgr) const
static RLogManager & Get()
Change the verbosity level (global or specific to the RLogChannel passed to the constructor) for the ...
ROOT::RDF::SampleCallback_t GetSampleCallback() final
Override this method to register a callback that is executed before processing a new data sample ...
std::shared_ptr< ProgressHelper > fHelper
int & PartialUpdate(unsigned int)
ProgressBarAction(std::shared_ptr< ProgressHelper > r)
void InitTask(TTreeReader *, unsigned int)
std::shared_ptr< int > fDummyResult
std::shared_ptr< Result_t > GetResultPtr() const
std::string GetActionName()
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
Print a progress bar of width ProgressHelper::fBarWidth if GetNEventsOfCurrentFile is known.
void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
std::size_t fLastProcessedEvents
std::array< double, 20 > fEventsPerSecondStatistics
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
unsigned int ComputeCurrentFileIdx() const
ProgressHelper(std::size_t increment, unsigned int totalFiles=1, unsigned int progressBarWidth=40, unsigned int printInterval=1, bool useColors=true)
Create a progress helper.
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
std::chrono::time_point< std::chrono::system_clock > fBeginTime
void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const
std::size_t fEventsPerSecondStatisticsIndex
std::size_t ComputeNEventsSoFar() const
The public interface to the RDataFrame federation of classes.
RResultPtr< typename std::decay_t< Helper >::Result_t > Book(Helper &&helper, const ColumnNames_t &columns={})
Book execution of a custom action using a user-defined helper object.
A type-erased version of RResultPtr and RResultMap.
Smart pointer for the return type of actions.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree, ...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
ROOT::Experimental::RLogChannel & RDFLogChannel()
RLogChannel & GetChannelOrManager()
@ kDebug
Debug information; only useful for developers; can have added verbosity up to 255-kDebug.
RResultMap< T > VariationsFor(RResultPtr< T > resPtr)
Produce all required systematic variations for the given result.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
unsigned int RunGraphs(std::vector< RResultHandle > handles)
Trigger the event loop of multiple RDataFrames concurrently.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.