22#include "RConfigure.h"
38#if defined(_WIN32) || defined(_WIN64)
39#define WIN32_LEAN_AND_MEAN
55#if defined(_WIN32) || defined(_WIN64)
56 if (!_isatty(_fileno(stdout)))
59 CONSOLE_SCREEN_BUFFER_INFO csbi;
60 if (GetConsoleScreenBufferInfo(GetStdHandle(STD_OUTPUT_HANDLE), &csbi))
61 width = (int)(csbi.srWindow.Right - csbi.srWindow.Left + 1);
66 ioctl(fileno(stdout), TIOCGWINSZ, &w);
67 width = (int)(w.ws_col);
73struct RestoreStreamState {
74 RestoreStreamState(std::ostream &stream) : fStream(stream) { fPreservedState.copyfmt(stream); }
75 ~RestoreStreamState() { fStream.copyfmt(fPreservedState); }
77 std::ostream &fStream;
78 std::ios fPreservedState{
nullptr};
82std::string prettyPrint(std::chrono::seconds
const &elapsedSeconds)
84 std::stringstream str;
85 auto h = std::chrono::duration_cast<std::chrono::hours>(elapsedSeconds);
86 auto m = std::chrono::duration_cast<std::chrono::minutes>(elapsedSeconds -
h);
87 auto s = (elapsedSeconds -
h -
m).count();
90 str <<
h.count() <<
':' << std::setw(2) << std::right << std::setfill(
'0');
91 str <<
m.count() <<
':' << std::setw(2) << std::right << std::setfill(
'0') << s;
92 str << (
h.count() > 0 ?
'h' :
'm');
103 if (handles.empty()) {
104 Warning(
"RunGraphs",
"Got an empty list of handles, now quitting.");
109 const unsigned int nToRun =
110 std::count_if(handles.begin(), handles.end(), [](
const auto &
h) { return !h.IsReady(); });
111 if (nToRun < handles.size()) {
112 Warning(
"RunGraphs",
"Got %zu handles from which %zu link to results which are already ready.", handles.size(),
113 handles.size() - nToRun);
120 std::set<
RResultHandle,
decltype(sameGraph)> s(handles.begin(), handles.end(), sameGraph);
121 std::vector<RResultHandle> uniqueLoops(s.begin(), s.end());
131 uniqueLoops[0].fLoopManager->Jit();
135 uniqueLoops[0].fLoopManager->Jit();
140 <<
"Just-in-time compilation phase for RunGraphs (" << uniqueLoops.size()
141 <<
" unique computation graphs) completed"
142 << (sw.
RealTime() > 1
e-3 ?
" in " + std::to_string(sw.
RealTime()) +
" seconds." :
" in less than 1ms.");
147 if (
h.fLoopManager) {
148 h.fLoopManager->SetSlotStack(slotStack);
149 h.fLoopManager->Run(
false);
159 std::for_each(uniqueLoops.begin(), uniqueLoops.end(), run);
165 <<
"Finished RunGraphs run (" << uniqueLoops.size() <<
" unique computation graphs, " << sw.
CpuTime() <<
"s CPU, "
168 return uniqueLoops.size();
181#
if defined(_WIN32) || defined(_WIN64)
182 fIsTTY{_isatty(_fileno(stdout)) != 0},
185 fIsTTY{isatty(fileno(stdout)) == 1},
225 return n > 0 ?
sum /
n : 0.;
232 std::size_t result = 0;
234 result += item.second;
242 using namespace std::chrono;
249 auto newPrintTime = system_clock::now();
252 duration<double> secondsCurrentInterval = newPrintTime - oldPrintTime;
255 eventsPerTimeInterval / secondsCurrentInterval.count();
257 return {currentEventCount, duration_cast<seconds>(newPrintTime -
fBeginTime)};
262 std::chrono::seconds elapsedSeconds)
const
264 std::ostringstream buffer;
267 std::size_t currentFileIdx;
273 if (totalEventsInOpenFiles == 0)
276 double completion = 0.;
279 completion = fractionSeenFiles * (
double(currentEventCount) / totalEventsInOpenFiles);
281 completion =
double(currentEventCount) / totalEventsInOpenFiles;
287 unsigned int nBar = std::min(completion, 1.) * barWidth;
288 std::string
bars(std::max(nBar, 1u),
'=');
289 bars.back() = (nBar == barWidth) ?
'=' :
'>';
292 buffer <<
"\033[33m";
293 buffer <<
'|' << std::setfill(
' ') << std::setw(barWidth) << std::left <<
bars <<
"| ";
301 buffer <<
"\033[35m";
302 buffer <<
"Elapsed: " << prettyPrint(elapsedSeconds) <<
" ";
305 buffer <<
"files: " << currentFileIdx <<
" / " <<
fTotalFiles <<
" ";
309 buffer <<
"\033[32m";
311 buffer <<
"events: " << currentEventCount;
312 if (totalEventsInOpenFiles != 0) {
313 buffer <<
" / " << std::scientific << std::setprecision(2);
315 buffer << totalEventsInOpenFiles;
317 buffer <<
"(" << totalEventsInOpenFiles <<
" + x)";
325 buffer << std::scientific << std::setprecision(2) << evtpersec <<
" evt/s";
332 if (totalEventsInOpenFiles != 0) {
334 buffer <<
"\033[35m";
336 std::chrono::seconds remainingSeconds;
339 std::chrono::seconds{
static_cast<long long>((
ComputeTotalEvents() - currentEventCount) / evtpersec)};
342 std::chrono::seconds{
static_cast<long long>(elapsedSeconds.count() / completion - elapsedSeconds.count())};
344 buffer <<
" remaining ca.: " << prettyPrint(remainingSeconds);
352 RestoreStreamState restore(stream);
353 stream << std::left << std::setw(
fNColumns - 1) << buffer.str();
358 auto &stream = std::cout;
359 RestoreStreamState restore(stream);
360 const auto elapsedSeconds =
361 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
fBeginTime);
369 stream <<
'\r' << std::string(
fNColumns,
' ') <<
'\r';
372 stream <<
"\033[35m";
374 <<
"Total elapsed time: " << prettyPrint(elapsedSeconds) <<
" ";
377 stream <<
"processed files: " <<
fTotalFiles <<
" ";
381 stream <<
"\033[32m";
383 stream <<
"processed events: " << totalEvents;
388 stream <<
" " << std::scientific << std::setprecision(2) << (
double)totalEvents / elapsedSeconds.count()
399 using namespace std::chrono;
413 std::unique_lock<std::mutex> lockGuard(
fUpdateMutex, std::try_to_lock);
425 std::cout << std::flush;
427 std::cout << std::endl;
463 constexpr std::size_t callbackEveryNEvents = 1000;
465 auto progress = std::make_shared<ProgressHelper>(callbackEveryNEvents, total_files);
468 r.OnPartialResultSlot(callbackEveryNEvents, [progress](
unsigned int slot,
auto &&arg) { (*progress)(slot, arg); });
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Base class for action helpers, see RInterface::Book() for more information.
ROOT::RDF::SampleCallback_t GetSampleCallback() final
Override this method to register a callback that is executed before processing a new data sample ...
std::shared_ptr< ProgressHelper > fHelper
int & PartialUpdate(unsigned int)
ProgressBarAction(std::shared_ptr< ProgressHelper > r)
void InitTask(TTreeReader *, unsigned int)
std::shared_ptr< int > fDummyResult
std::shared_ptr< Result_t > GetResultPtr() const
std::string GetActionName()
unsigned int const fTotalFiles
std::size_t ComputeTotalEvents() const
Compute total events in all open files.
void PrintStatsFinal() const
ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int printInterval=0, bool useColors=true)
Create a progress helper.
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void RegisterNewSample(unsigned int, const ROOT::RDF::RSampleInfo &id)
Register a new sample for completion statistics.
unsigned int const fNColumns
unsigned int fEventsPerSecondStatisticsCounter
std::chrono::time_point< std::chrono::system_clock > const fBeginTime
void Update()
Record number of events processed and update progress bar.
std::map< std::string, ULong64_t > fSampleNameToEventEntries
std::size_t fLastProcessedEvents
std::mutex fSampleNameToEventEntriesMutex
std::chrono::seconds const fPrintInterval
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
std::array< double, 10 > fEventsPerSecondStatistics
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
std::size_t const fIncrement
bool const fUseShellColours
void PrintProgressAndStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
RResultPtr< typename std::decay_t< Helper >::Result_t > Book(Helper &&helper, const ColumnNames_t &columns={})
Book execution of a custom action using a user-defined helper object.
A type-erased version of RResultPtr and RResultMap.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree, ...
ELogLevel GetEffectiveVerbosity(const RLogManager &mgr) const
static RLogManager & Get()
Change the verbosity level (global or specific to the RLogChannel passed to the constructor) for the ...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
ROOT::RLogChannel & RDFLogChannel()
unsigned int & NThreadPerTH3()
Obtain or set the number of threads that will share a clone of a thread-safe 3D histogram.
if(pos!=-1) leafTypeName.Remove(pos)
RLogChannel & GetChannelOrManager()
void ThreadsPerTH3(unsigned int nThread=1)
Set the number of threads sharing one TH3 in RDataFrame.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
unsigned int RunGraphs(std::vector< RResultHandle > handles)
Run the event loops of multiple RDataFrames concurrently.
RInterface<::ROOT::Detail::RDF::RNodeBase > RNode
RNode AsRNode(NodeType node)
Cast an RDataFrame node to the common type ROOT::RDF::RNode.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
UInt_t GetThreadPoolSize()
Returns the size of ROOT's thread pool.
@ kDebug
Debug information; only useful for developers; can have added verbosity up to 255-kDebug.
static uint64_t sum(uint64_t i)