11 #include "RConfigure.h" 35 TActionBase::TActionBase(
TLoopManager *implPtr,
const unsigned int nSlots) : fImplPtr(implPtr), fNSlots(nSlots)
44 const bool isDSColumn)
45 :
fImplPtr(implPtr), fName(name),
fNSlots(nSlots), fIsDataSourceColumn(isDSColumn)
65 :
fImplPtr(implPtr), fLastResult(nSlots), fAccepted(nSlots), fRejected(nSlots),
fName(name),
fNSlots(nSlots)
76 return !
fName.empty();
85 double perc = accepted;
89 Printf(
"%-10s: pass=%-10lld all=%-10lld -- %8.3f %%",
fName.c_str(), accepted, all, perc);
99 void TSlotStack::ReturnSlot(
unsigned int slotNumber)
101 auto &index = GetIndex();
102 auto &count = GetCount();
103 assert(count > 0U &&
"TSlotStack has a reference count relative to an index which will become negative.");
107 std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
108 fBuf[fCursor++] = slotNumber;
109 assert(fCursor <= fBuf.size() &&
"TSlotStack assumes that at most a fixed number of values can be present in the " 110 "stack. fCursor is greater than the size of the internal buffer. This violates " 115 unsigned int TSlotStack::GetSlot()
117 auto &index = GetIndex();
118 auto &count = GetCount();
120 if (UINT_MAX != index)
122 std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
123 assert(fCursor > 0 &&
"TSlotStack assumes that a value can be always obtained. In this case fCursor is <=0 and this " 124 "violates such assumption.");
125 index = fBuf[--fCursor];
130 : fTree(
std::shared_ptr<TTree>(tree, [](TTree *) {})),
fDefaultColumns(defaultBranches),
159 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
167 entryRanges.emplace_back(start, end);
172 auto genFunction = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
173 auto slot = slotStack.
GetSlot();
175 for (
auto currEntry = range.first; currEntry < range.second; ++currEntry) {
183 pool.
Foreach(genFunction, entryRanges);
185 #endif // not implemented otherwise 203 std::unique_ptr<ttpmt_t> tp;
204 tp.reset(
new ttpmt_t(*
fTree));
206 tp->Process([
this, &slotStack](
TTreeReader &
r) ->
void {
207 auto slot = slotStack.
GetSlot();
216 #endif // no-op otherwise (will not be called) 223 if (0 ==
fTree->GetEntriesFast())
return;
240 while (!ranges.empty()) {
243 for (
const auto &range : ranges) {
244 auto end = range.second;
245 for (
auto entry = range.first; entry < end; ++entry) {
265 auto runOnRange = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
266 const auto slot = slotStack.
GetSlot();
269 const auto end = range.second;
270 for (
auto entry = range.first; entry < end; ++entry) {
281 while (!ranges.empty()) {
282 pool.
Foreach(runOnRange, ranges);
286 #endif // not implemented otherwise (never called) 293 for (
auto &actionPtr :
fBookedActions) actionPtr->Run(slot, entry);
294 for (
auto &namedFilterPtr :
fBookedNamedFilters) namedFilterPtr->CheckFilters(slot, entry);
295 for (
auto &callback :
fCallbacks) callback(slot);
322 customColumn.second->InitNode();
335 *readiness.get() =
true;
337 fResProxyReadiness.clear();
343 ptr->ResetChildrenCount();
345 ptr->ResetChildrenCount();
346 for (
auto &ptr : fBookedRanges)
347 ptr->ResetChildrenCount();
364 auto error = TInterpreter::EErrorCode::kNoError;
366 if (TInterpreter::EErrorCode::kNoError != error) {
367 std::string exceptionText =
368 "An error occurred while jitting. The lines above might indicate the cause of the crash\n";
369 throw std::runtime_error(exceptionText.c_str());
382 for (
auto &actionPtr :
fBookedActions) actionPtr->TriggerChildrenCount();
442 if (filterPtr->HasName()) {
450 const auto &
name = columnPtr->GetName();
479 if (everyNEvents == 0ull)
486 const unsigned int nSlots)
487 : fImplPtr(implPtr), fStart(start), fStop(stop), fStride(stride),
fNSlots(nSlots)
void Foreach(F func, unsigned nTimes)
Execute func (with no arguments) nTimes in parallel.
RangeBaseVec_t fBookedRanges
FilterBaseVec_t fBookedFilters
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
basic_string_view< char > string_view
Namespace for new ROOT classes and functions.
Long64_t fLastCheckedEntry
Long64_t GetCurrentEntry() const
Returns the index of the current entry being read.
std::map< std::string, TCustomColumnBasePtr_t > fBookedCustomColumns
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
TLoopManager * GetImplPtr() const
TTree()
Default constructor and I/O constructor.
ColumnNames_t fCustomColumnNames
Contains names of all custom columns defined in the functional graph.
const ColumnNames_t fDefaultColumns
const std::unique_ptr< TDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
TLoopManager * GetImplPtr() const
TLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
void Book(const ActionBasePtr_t &actionPtr)
void CleanUpTask(unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
TCustomColumnBase(TLoopManager *df, std::string_view name, const unsigned int nSlots, const bool isDSColumn)
TFilterBase(TLoopManager *df, std::string_view name, const unsigned int nSlots)
std::vector< ULong64_t > fAccepted
std::vector< ULong64_t > fRejected
std::shared_ptr< TFilterBase > FilterBasePtr_t
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void RunEmptySource()
Run event loop with no source files, in sequence.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source...
void ReturnSlot(unsigned int slotNumber)
TRangeBase(TLoopManager *implPtr, unsigned int start, unsigned int stop, unsigned int stride, const unsigned int nSlots)
void Report() const
Call PrintReport on all booked filters.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
bool CheckFilters(int, unsigned int)
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
::TDirectory *const fDirPtr
bool fMustRunNamedFilters
ActionBaseVec_t fBookedActions
std::vector< Long64_t > fLastCheckedEntry
This class provides a simple interface to execute the same task multiple times in parallel...
std::vector< TCallback > fCallbacks
Registered callbacks.
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
const unsigned int fNSlots
Number of thread slots used by this node, inherited from parent node.
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the TDataFrame...
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
const unsigned int fNSlots
std::string fToJit
String containing all BuildAndBook actions that should be jitted before running.
FilterBaseVec_t fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
bool fHasStopped
True if the end of the range has been reached.
void JitActions()
Jit all actions that required runtime column type inference, and clean the fToJit member variable...
Describe directory structure in memory.
unsigned long long ULong64_t
std::vector< Long64_t > fLastCheckedEntry
const unsigned int fNSlots
Number of thread slots used by this node.
const ULong64_t fNEmptyEntries
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
TCustomColumnBase * GetBookedBranch(const std::string &name) const
std::shared_ptr< TDFInternal::TActionBase > ActionBasePtr_t
ULong64_t fNProcessedEntries
unsigned int GetNSlots() const
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
TLoopManager * GetImplPtr()
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
TLoopManager * GetImplPtr() const
Bool_t Next()
Move to the next entry (or index of the TEntryList if that is set).
std::shared_ptr< TTree > fTree
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::string GetName() const
std::shared_ptr< TCustomColumnBase > TCustomColumnBasePtr_t
std::vector< std::shared_ptr< bool > > fResProxyReadiness
const unsigned int fNSlots
Number of thread slots used by this node, inherited from parent node.
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
A class to process the entries of a TTree in parallel.
::TDirectory * GetDirectory() const
std::vector< TOneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
std::shared_ptr< TRangeBase > RangeBasePtr_t