11 #include "RConfigure.h" 35 TActionBase::TActionBase(
TLoopManager *implPtr,
const ColumnNames_t &tmpBranches,
unsigned int nSlots)
36 : fImplPtr(implPtr), fTmpBranches(tmpBranches), fNSlots(nSlots)
65 :
fImplPtr(implPtr),
fTmpBranches(tmpBranches), fLastCheckedEntry(nSlots, -1), fLastResult(nSlots),
66 fAccepted(nSlots), fRejected(nSlots),
fName(name),
fNSlots(nSlots)
82 return !
fName.empty();
91 double perc = accepted;
92 if (all > 0) perc /= all;
94 Printf(
"%-10s: pass=%-10lld all=%-10lld -- %8.3f %%",
fName.c_str(), accepted, all, perc);
103 unsigned int fCursor;
104 std::vector<unsigned int> fBuf;
108 TSlotStack() =
delete;
109 TSlotStack(
unsigned int size) : fCursor(size), fBuf(size) { std::iota(fBuf.begin(), fBuf.end(), 0U); }
110 void Push(
unsigned int slotNumber);
114 void TSlotStack::Push(
unsigned int slotNumber)
116 std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
117 fBuf[fCursor++] = slotNumber;
118 assert(fCursor <= fBuf.size() &&
"TSlotStack assumes that at most a fixed number of values can be present in the " 119 "stack. fCursor is greater than the size of the internal buffer. This violates " 123 unsigned int TSlotStack::Pop()
125 assert(fCursor > 0 &&
126 "TSlotStack assumes that a value can be always popped. fCursor is <=0 and this violates such assumption.");
127 std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
128 return fBuf[--fCursor];
143 for (
auto &actionPtr :
fBookedActions) actionPtr->Run(slot, entry);
144 for (
auto &namedFilterPtr :
fBookedNamedFilters) namedFilterPtr->CheckFilters(slot, entry);
152 for (
auto &pair :
fBookedBranches) pair.second->ClearValueReaders(slot);
167 std::vector<std::pair<Long64_t, Long64_t>> entryRanges;
170 Long64_t end = start + nEntriesPerSlot;
175 entryRanges.emplace_back(start, end);
180 auto genFunction = [
this, &slotStack](
const std::pair<Long64_t, Long64_t> &range) {
181 auto slot = slotStack.Pop();
183 for (
auto currEntry = range.first; currEntry < range.second; ++currEntry) {
187 slotStack.Push(slot);
191 pool.
Foreach(genFunction, entryRanges);
194 std::unique_ptr<ttpmt_t> tp;
195 tp.reset(
new ttpmt_t(*
fTree));
197 tp->Process([
this, &slotStack](
TTreeReader &
r) ->
void {
198 auto slot = slotStack.Pop();
205 slotStack.Push(slot);
235 *readiness.get() =
true;
238 fResProxyReadiness.clear();
251 for (
auto &bookedBranch :
fBookedBranches) bookedBranch.second->Init(r, slot);
299 if (filterPtr->HasName()) {
332 unsigned int stride,
unsigned int nSlots)
333 : fImplPtr(implPtr), fTmpBranches(tmpBranches), fStart(start), fStop(stop), fStride(stride),
fNSlots(nSlots)
void Foreach(F func, unsigned nTimes)
Execute func (with no arguments) nTimes in parallel.
RangeBaseVec_t fBookedRanges
ColumnNames_t GetTmpBranches() const
FilterBaseVec_t fBookedFilters
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Namespace for new ROOT classes and functions.
Long64_t GetCurrentEntry() const
Returns the index of the current entry being read.
TLoopManager * GetImplPtr() const
TRangeBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, unsigned int start, unsigned int stop, unsigned int stride, unsigned int nSlots)
TTree()
Default constructor and I/O constructor.
TLoopManager * GetImplPtr() const
TLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
std::map< std::string, TmpBranchBasePtr_t > fBookedBranches
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
A spin mutex class which respects the STL interface for mutexes.
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
void InitAllNodes(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes.
void Book(const ActionBasePtr_t &actionPtr)
void CleanUpTask(unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
const ColumnNames_t fTmpBranches
std::vector< ULong64_t > fAccepted
std::vector< ULong64_t > fRejected
std::shared_ptr< TFilterBase > FilterBasePtr_t
TCustomColumnBase(TLoopManager *df, const ColumnNames_t &tmpBranches, std::string_view name, unsigned int nSlots)
const ColumnNames_t & GetDefaultBranches() const
void Report() const
Call PrintReport on all booked filters.
bool CheckFilters(int, unsigned int)
const ColumnNames_t fTmpBranches
ColumnNames_t GetTmpBranches() const
ActionBaseVec_t fBookedActions
This class provides a simple interface to execute the same task multiple times in parallel...
const unsigned int fNSlots
Number of thread slots used by this node, inherited from parent node.
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
const Long64_t fNEmptyEntries
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
std::shared_ptr< TCustomColumnBase > TmpBranchBasePtr_t
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
const unsigned int fNSlots
FilterBaseVec_t fBookedNamedFilters
TFilterBase(TLoopManager *df, const ColumnNames_t &tmpBranches, std::string_view name, unsigned int nSlots)
ColumnNames_t fTmpBranches
Describe directory structure in memory.
const unsigned int fNSlots
Number of thread slots used by this node.
ColumnNames_t fTmpBranches
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
const ColumnNames_t fDefaultBranches
ColumnNames_t GetTmpBranches() const
TCustomColumnBase * GetBookedBranch(const std::string &name) const
std::shared_ptr< TDFInternal::TActionBase > ActionBasePtr_t
unsigned int GetNSlots() const
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
TLoopManager * GetImplPtr()
TLoopManager * GetImplPtr() const
Bool_t Next()
Move to the next entry (or index of the TEntryList if that is set).
std::shared_ptr< TTree > fTree
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::string GetName() const
std::vector< std::shared_ptr< bool > > fResProxyReadiness
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
A class to process the entries of a TTree in parallel.
::TDirectory * GetDirectory() const
std::shared_ptr< TRangeBase > RangeBasePtr_t