46 TTHREAD_TLS(
unsigned int) count = 0U;
51 TTHREAD_TLS(
unsigned int) index = UINT_MAX;
55 std::vector<unsigned int>
fBuf;
/// Build a slot stack holding `size` entries.
/// fCursor is initialised to `size` and fBuf is filled by std::iota with the
/// consecutive values 0, 1, ..., size - 1.
60 TSlotStack(
unsigned int size) : fCursor(size), fBuf(size) { std::iota(fBuf.begin(), fBuf.end(), 0U); }
84 class TLoopManager :
public std::enable_shared_from_this<TLoopManager> {
86 enum class ELoopType { kROOTFiles, kROOTFilesMT, kNoFiles, kNoFilesMT, kDataSource, kDataSourceMT };
96 : fFun(
std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull) {}
100 auto &c = fCounters[slot];
118 if (fHasBeenCalled[slot] == 1)
121 fHasBeenCalled[slot] = 1;
133 std::shared_ptr<TTree> fTree{
nullptr};
138 const unsigned int fNSlots{1};
139 bool fMustRunNamedFilters{
true};
140 unsigned int fNChildren{0};
141 unsigned int fNStopsReceived{0};
150 void RunEmptySourceMT();
151 void RunEmptySource();
152 void RunTreeProcessorMT();
153 void RunTreeReader();
154 void RunDataSourceMT();
155 void RunDataSource();
156 void RunAndCheckFilters(
unsigned int slot,
Long64_t entry);
160 void CleanUpTask(
unsigned int slot);
162 void EvalChildrenCounts();
167 TLoopManager(std::unique_ptr<TDataSource> ds,
const ColumnNames_t &defaultBranches);
/// Return a std::shared_ptr to this TLoopManager, obtained through
/// std::enable_shared_from_this (the class derives from it).
/// NOTE(review): shared_from_this() requires that this object is already
/// managed by a std::shared_ptr -- confirm all construction sites do so.
173 std::shared_ptr<TLoopManager>
GetSharedPtr() {
return shared_from_this(); }
174 const ColumnNames_t &GetDefaultColumnNames()
const;
176 TTree *GetTree()
const;
/// Read-only access to fBookedCustomColumns, the map from a string key to the
/// shared pointer of the corresponding booked custom-column node.
/// The key is presumably the column name -- TODO confirm against Book().
178 const std::map<std::string, TCustomColumnBasePtr_t> &
GetBookedColumns()
const {
return fBookedCustomColumns; }
185 void Book(
const std::shared_ptr<bool> &branchPtr);
187 bool CheckFilters(
int,
unsigned int);
/// Append `s` to fToJit, the string accumulating all BuildAndBook actions
/// that should be jitted before running.
196 void Jit(
const std::string &
s) { fToJit.append(s); }
/// Record `alias` as an alias for column `colName` in fAliasColumnNameMap.
/// The assignment goes through std::map::operator[], so an existing entry
/// for the same alias is overwritten.
199 void AddColumnAlias(
const std::string &alias,
const std::string &colName) { fAliasColumnNameMap[alias] = colName; }
/// Read-only access to fAliasColumnNameMap (alias -> column name pairs).
200 const std::map<std::string, std::string> &
GetAliasMap()
const {
return fAliasColumnNameMap; }
231 template <
typename T>
244 unsigned int fSlot = std::numeric_limits<unsigned int>::max();
262 bool fArrayHasBeenChecked =
false;
271 constexpr
bool useReaderValue = std::is_same<ProxyParam_t, T>::value;
272 if (useReaderValue) {
273 fColumnKind = EColumnKind::kTreeValue;
276 fColumnKind = EColumnKind::kTreeArray;
282 template <
typename U =
T,
283 typename std::enable_if<std::is_same<typename TColumnValue<U>::ProxyParam_t, U>::value,
int>::type = 0>
287 template <typename U = T, typename std::enable_if<!std::is_same<ProxyParam_t, U>::value,
int>::type = 0>
290 auto &readerArray = *fReaderArrays.back();
295 if (!fArrayHasBeenChecked) {
296 if (readerArray.GetSize() > 1) {
297 if (1 != (&readerArray[1] - &readerArray[0])) {
298 std::string exceptionText =
"Branch ";
299 exceptionText += readerArray.GetBranchName();
301 " hangs from a non-split branch. For this reason, it cannot be accessed via a TArrayBranch." 302 " Please read the top level branch instead.";
303 throw std::runtime_error(exceptionText);
305 fArrayHasBeenChecked =
true;
314 switch (fColumnKind) {
315 case EColumnKind::kTreeValue: fReaderValues.pop_back();
break;
316 case EColumnKind::kTreeArray: fReaderArrays.pop_back();
break;
317 case EColumnKind::kCustomColumn:
318 fCustomColumns.pop_back();
319 fCustomValuePtrs.pop_back();
321 case EColumnKind::kDataSource:
322 fCustomColumns.pop_back();
323 fDSValuePtrs.pop_back();
330 template <
typename T>
334 template <
typename... BranchTypes>
336 using type = std::tuple<TColumnValue<BranchTypes>...>;
339 template <
typename BranchType>
343 template <
typename ValueTuple,
int...
S>
347 std::initializer_list<int> expander{(std::get<S>(values).
Reset(), 0)...};
364 virtual void Run(
unsigned int slot,
Long64_t entry) = 0;
365 virtual void InitSlot(
TTreeReader *
r,
unsigned int slot) = 0;
366 virtual void TriggerChildrenCount() = 0;
367 virtual void ClearValueReaders(
unsigned int slot) = 0;
371 virtual void *PartialUpdate(
unsigned int slot) = 0;
374 template <
typename Helper,
typename PrevDataFrame,
typename BranchTypes_t =
typename Helper::BranchTypes_t>
376 using TypeInd_t = GenStaticSeq_t<BranchTypes_t::list_size>;
381 std::vector<TDFValueTuple_t<BranchTypes_t>>
fValues;
384 TAction(Helper &&
h,
const ColumnNames_t &bl, PrevDataFrame &pd)
396 InitTDFValues(slot, fValues[slot],
r, fBranches, fImplPtr->GetCustomColumnNames(), fImplPtr->GetBookedColumns(),
398 fHelper.InitSlot(
r, slot);
404 if (fPrevData.CheckFilters(slot, entry))
409 void Exec(
unsigned int slot,
Long64_t entry, TDFInternal::StaticSeq<S...>)
412 fHelper.Exec(slot, std::get<S>(fValues[slot]).Get(entry)...);
/// Final override of the PartialUpdate hook: forwards to
/// PartialUpdateImpl(slot) and returns the untyped pointer it yields.
/// One PartialUpdateImpl overload returns the address of
/// fHelper.PartialUpdate(slot); the other throws std::runtime_error
/// (presumably chosen when the helper has no PartialUpdate -- TODO confirm).
422 void *
PartialUpdate(
unsigned int slot)
final {
return PartialUpdateImpl(slot); }
427 template <
typename H = Helper>
428 auto PartialUpdateImpl(
unsigned int slot) -> decltype(std::declval<H>().PartialUpdate(slot), (
void *)(
nullptr))
430 return &fHelper.PartialUpdate(slot);
434 throw std::runtime_error(
"This action does not support callbacks yet!");
449 unsigned int fNChildren{0};
450 unsigned int fNStopsReceived{0};
460 virtual void InitSlot(
TTreeReader *
r,
unsigned int slot) = 0;
461 virtual void *GetValuePtr(
unsigned int slot) = 0;
462 virtual const std::type_info &GetTypeId()
const = 0;
465 virtual void Update(
unsigned int slot,
Long64_t entry) = 0;
466 virtual void ClearValueReaders(
unsigned int slot) = 0;
472 namespace TCCHelperTypes {
481 template <
typename F,
typename UPDATE_HELPER_TYPE = TCCHelperTypes::TNothing>
492 using BranchTypes_t =
typename TDFInternal::RemoveFirstTwoParametersIf<std::is_same<TSlotAndEntry, UHT_t>::value,
494 using TypeInd_t = TDFInternal::GenStaticSeq_t<BranchTypes_t::list_size>;
495 using ret_type =
typename CallableTraits<F>::ret_type;
498 typename std::conditional<std::is_same<ret_type, bool>::value, std::deque<ret_type>, std::vector<ret_type>>
::type;
504 std::vector<TDFInternal::TDFValueTuple_t<BranchTypes_t>>
fValues;
508 bool isDSColumn =
false)
510 fLastResults(fNSlots), fValues(fNSlots)
519 TDFInternal::InitTDFValues(slot, fValues[slot],
r, fBranches, fImplPtr->GetCustomColumnNames(),
520 fImplPtr->GetBookedColumns(),
TypeInd_t());
/// Return the address of the cached result for `slot` (fLastResults[slot])
/// as an untyped pointer; callers cast it back to the concrete value type.
/// No bounds check is performed on `slot`.
523 void *
GetValuePtr(
unsigned int slot)
final {
return static_cast<void *
>(&fLastResults[slot]); }
527 if (entry != fLastCheckedEntry[slot]) {
530 fLastCheckedEntry[slot] = entry;
539 template <
int...
S,
typename... BranchTypes>
543 fLastResults[slot] = fExpression(std::get<S>(fValues[slot]).Get(entry)...);
549 template <
int...
S,
typename... BranchTypes>
553 fLastResults[slot] = fExpression(slot, std::get<S>(fValues[slot]).Get(entry)...);
559 template <
int...
S,
typename... BranchTypes>
563 fLastResults[slot] = fExpression(slot, entry, std::get<S>(fValues[slot]).Get(entry)...);
577 std::vector<int> fLastResult = {
true};
578 std::vector<ULong64_t> fAccepted = {0};
579 std::vector<ULong64_t> fRejected = {0};
581 unsigned int fNChildren{0};
582 unsigned int fNStopsReceived{0};
590 virtual void InitSlot(
TTreeReader *
r,
unsigned int slot) = 0;
591 virtual bool CheckFilters(
unsigned int slot,
Long64_t entry) = 0;
592 virtual void Report()
const = 0;
593 virtual void PartialReport()
const = 0;
595 bool HasName()
const;
596 void PrintReport()
const;
597 virtual void IncrChildrenCount() = 0;
598 virtual void StopProcessing() = 0;
604 virtual void TriggerChildrenCount() = 0;
608 assert(!fName.empty());
610 std::fill(fAccepted.begin(), fAccepted.end(), 0);
611 std::fill(fRejected.begin(), fRejected.end(), 0);
613 virtual void ClearValueReaders(
unsigned int slot) = 0;
617 template <
typename FilterF,
typename PrevDataFrame>
620 using TypeInd_t = TDFInternal::GenStaticSeq_t<BranchTypes_t::list_size>;
625 std::vector<TDFInternal::TDFValueTuple_t<BranchTypes_t>>
fValues;
639 if (entry != fLastCheckedEntry[slot]) {
640 if (!fPrevData.CheckFilters(slot, entry)) {
642 fLastResult[slot] =
false;
645 auto passed = CheckFilterHelper(slot, entry,
TypeInd_t());
646 passed ? ++fAccepted[slot] : ++fRejected[slot];
647 fLastResult[slot] = passed;
649 fLastCheckedEntry[slot] = entry;
651 return fLastResult[slot];
657 return fFilter(std::get<S>(fValues[slot]).Get(entry)...);
665 TDFInternal::InitTDFValues(slot, fValues[slot],
r, fBranches, fImplPtr->GetCustomColumnNames(),
666 fImplPtr->GetBookedColumns(),
TypeInd_t());
/// Report() does nothing beyond delegating to PartialReport().
670 void Report() const final { PartialReport(); }
674 fPrevData.PartialReport();
681 if (fNStopsReceived == fNChildren)
682 fPrevData.StopProcessing();
689 if (fNChildren == 1 && fName.empty())
690 fPrevData.IncrChildrenCount();
695 assert(!fName.empty());
696 fPrevData.IncrChildrenCount();
710 bool fLastResult{
true};
712 unsigned int fNChildren{0};
713 unsigned int fNStopsReceived{0};
714 bool fHasStopped{
false};
717 void ResetCounters();
721 const unsigned int nSlots);
726 virtual bool CheckFilters(
unsigned int slot,
Long64_t entry) = 0;
727 virtual void Report()
const = 0;
728 virtual void PartialReport()
const = 0;
729 virtual void IncrChildrenCount() = 0;
730 virtual void StopProcessing() = 0;
740 template <
typename PrevData>
745 TRange(
unsigned int start,
unsigned int stop,
unsigned int stride, PrevData &pd)
756 if (entry != fLastCheckedEntry) {
759 if (!fPrevData.CheckFilters(slot, entry)) {
764 ++fNProcessedEntries;
765 if (fNProcessedEntries <= fStart || (fStop > 0 && fNProcessedEntries > fStop) ||
766 (fStride != 1 && fNProcessedEntries % fStride != 0))
770 if (fNProcessedEntries == fStop) {
772 fPrevData.StopProcessing();
775 fLastCheckedEntry = entry;
/// Forward the report request to the previous node by calling
/// fPrevData.PartialReport(); nothing is reported here directly.
782 void Report() const final { fPrevData.PartialReport(); }
789 if (fNStopsReceived == fNChildren && !fHasStopped)
790 fPrevData.StopProcessing();
798 fPrevData.IncrChildrenCount();
809 template <
typename T>
812 fCustomColumns.emplace_back(customColumn);
814 throw std::runtime_error(
815 std::string(
"TColumnValue: type specified for column \"" + customColumn->
GetName() +
"\" is ") +
816 typeid(
T).name() +
" but temporary column has type " + customColumn->
GetTypeId().name());
819 fColumnKind = EColumnKind::kDataSource;
820 fDSValuePtrs.emplace_back(static_cast<T **>(customColumn->
GetValuePtr(slot)));
822 fColumnKind = EColumnKind::kCustomColumn;
823 fCustomValuePtrs.emplace_back(static_cast<T *>(customColumn->
GetValuePtr(slot)));
832 template <
typename T>
833 template <
typename U,
834 typename std::enable_if<std::is_same<typename TColumnValue<U>::ProxyParam_t, U>::value,
int>
::type>
837 if (fColumnKind == EColumnKind::kTreeValue) {
838 return *(fReaderValues.back()->Get());
840 fCustomColumns.back()->Update(fSlot, entry);
841 return fColumnKind == EColumnKind::kCustomColumn ? *fCustomValuePtrs.back() : **fDSValuePtrs.back();
848 #endif // ROOT_TDFNODES
typename TTDFValueTuple< BranchType >::type TDFValueTuple_t
std::string GetName(const std::string &scope_name)
void StopProcessing() final
RangeBaseVec_t fBookedRanges
ColumnNames_t fDefinedDataSourceColumns
List of data-source columns that have been Defined so far.
TDataSource * GetDataSource() const
std::vector< std::unique_ptr< TTreeReaderValue< T > > > fReaderValues
Owning ptrs to a TTreeReaderValue. Used for non-temporary columns when T != TArrayBranch<U> ...
FilterBaseVec_t fBookedFilters
typename CallableTraits< FilterF >::arg_types BranchTypes_t
void Report() const final
bool CheckFilters(unsigned int slot, Long64_t entry) final
void operator()(unsigned int slot)
const ColumnNames_t fBranches
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
basic_string_view< char > string_view
std::function< void(unsigned int)> Callback_t
Namespace for new ROOT classes and functions.
T & Get(Long64_t entry)
This overload is used to return scalar quantities (i.e. types that are not read into a TArrayBranch) ...
PrevDataFrame & fPrevData
std::vector< TDFValueTuple_t< BranchTypes_t > > fValues
std::map< std::string, TCustomColumnBasePtr_t > fBookedCustomColumns
void PartialReport() const final
unsigned int & GetCount()
void InitSlot(TTreeReader *r, unsigned int slot) final
typename std::conditional< std::is_same< ret_type, bool >::value, std::deque< ret_type >, std::vector< ret_type > >::type ValuesPerSlot_t
void ResetChildrenCount()
ColumnNames_t fCustomColumnNames
Contains names of all custom columns defined in the functional graph.
void PartialReport() const final
void * GetValuePtr(unsigned int slot) final
const ColumnNames_t fDefaultColumns
const std::unique_ptr< TDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
void * PartialUpdateImpl(...)
std::vector< TDFInternal::TDFValueTuple_t< BranchTypes_t > > fValues
void * PartialUpdate(unsigned int slot) final
This method is invoked to update a partial result during the event loop, right before passing the res...
void PartialReport() const
End of recursive chain of calls, does nothing.
unsigned int GetNSlots() const
typename CallableTraits< F >::arg_types FunParamTypes_t
void TriggerChildrenCount() final
std::vector< unsigned int > fBuf
virtual void ClearValueReaders(unsigned int slot) final
const ColumnNames_t fBranches
A spin mutex class which respects the STL interface for mutexes.
const std::map< std::string, TCustomColumnBasePtr_t > & GetBookedColumns() const
typename TDFInternal::RemoveFirstParameterIf< std::is_same< TSlot, UHT_t >::value, FunParamTypes_t >::type BranchTypesTmp_t
TArrayBranch< ProxyParam_t > Get(Long64_t)
This overload is used to return arrays (i.e. types that are read into a TArrayBranch) ...
TAction(Helper &&h, const ColumnNames_t &bl, PrevDataFrame &pd)
void AddDataSourceColumn(std::string_view name)
const std::map< std::string, std::string > & GetAliasMap() const
void StopProcessing() final
std::vector< ULong64_t > fCounters
void MakeProxy(TTreeReader *r, const std::string &bn)
void UpdateHelper(unsigned int slot, Long64_t entry, TDFInternal::StaticSeq< S... >, TypeList< BranchTypes... >, TCCHelperTypes::TSlotAndEntry *)
const std::type_info & GetTypeId() const
std::shared_ptr< TFilterBase > FilterBasePtr_t
unsigned int GetNSlots() const
void ReturnSlot(unsigned int slotNumber)
Helper class that updates and returns TTree branches as well as TDataFrame temporary columns...
unsigned int GetNSlots() const
ULong64_t GetNEmptyEntries() const
typename CallableTraits< F >::ret_type ret_type
std::shared_ptr< TLoopManager > GetSharedPtr()
void SetTmpColumn(unsigned int slot, TCustomColumnBase *tmpColumn)
const ColumnNames_t & GetCustomColumnNames() const
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
bool MustRunNamedFilters() const
EColumnKind
TColumnValue behaves slightly differently depending on whether the column comes from a TTreeReader, a TDataFrame Define or a TDataSource.
void InitSlot(TTreeReader *r, unsigned int slot) final
void operator()(unsigned int slot)
Extracts data from a TTree.
void InitSlot(TTreeReader *r, unsigned int slot) final
ActionBaseVec_t fBookedActions
PrevDataFrame & fPrevData
When using TDataFrame to read data from a ROOT file, users can specify that the type of a branch is T...
typename std::conditional< std::is_same< ReaderValueOrArray_t< T >, TTreeReaderValue< T > >::value, T, TakeFirstParameter_t< T > >::type ProxyParam_t
std::vector< FilterBasePtr_t > FilterBaseVec_t
std::vector< Long64_t > fLastCheckedEntry
RooArgSet S(const RooAbsArg &v1)
TRange(unsigned int start, unsigned int stop, unsigned int stride, PrevData &pd)
const ColumnNames_t & GetDefinedDataSourceColumns() const
std::vector< TCallback > fCallbacks
Registered callbacks.
const unsigned int fNSlots
Number of thread slots used by this node, inherited from parent node.
const bool fIsDataSourceColumn
Does the custom column refer to a data-source column (as opposed to a user-defined column)?
void Report() const final
TDataSource defines an API that TDataFrame can use to read arbitrary data formats.
std::vector< T * > fCustomValuePtrs
Non-owning ptrs to the value of a custom column.
std::vector< RangeBasePtr_t > RangeBaseVec_t
std::vector< T ** > fDSValuePtrs
Non-owning ptrs to the value of a data-source column.
std::vector< ActionBasePtr_t > ActionBaseVec_t
void TriggerChildrenCount() final
unsigned int GetNSlots() const
bool IsDataSourceColumn() const
TOneTimeCallback(Callback_t &&f, unsigned int nSlots)
void ClearValueReaders(unsigned int slot) final
void UpdateHelper(unsigned int slot, Long64_t entry, TDFInternal::StaticSeq< S... >, TypeList< BranchTypes... >, TCCHelperTypes::TSlot *)
TCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
void Exec(unsigned int slot, Long64_t entry, TDFInternal::StaticSeq< S... >)
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
std::string fToJit
String containing all BuildAndBook actions that should be jitted before running.
std::vector< TDFInternal::TDFValueTuple_t< BranchTypes_t > > fValues
std::map< std::string, std::string > fAliasColumnNameMap
ColumnNameAlias-columnName pairs.
void Jit(const std::string &s)
void Update(unsigned int slot, Long64_t entry) final
void Reset(Detail::TBranchProxy *x)
FilterBaseVec_t fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
std::tuple< TColumnValue< BranchTypes >... > type
TFilter(FilterF &&f, const ColumnNames_t &bl, PrevDataFrame &pd, std::string_view name="")
TDFInternal::GenStaticSeq_t< BranchTypes_t::list_size > TypeInd_t
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
Lightweight storage for a collection of types.
TCustomColumn(std::string_view name, F &&expression, const ColumnNames_t &bl, TLoopManager *lm, bool isDSColumn=false)
std::vector< std::unique_ptr< TTreeReaderArray< ProxyParam_t > > > fReaderArrays
Owning ptrs to a TTreeReaderArray. Used for non-temporary columns when T == TArrayBranch<U>.
virtual const std::type_info & GetTypeId() const =0
const unsigned int fNSlots
Number of thread slots used by this node, inherited from parent node.
Describe directory structure in memory.
unsigned long long ULong64_t
std::vector< Long64_t > fLastCheckedEntry
const unsigned int fNSlots
Number of thread slots used by this node.
void Run(unsigned int slot, Long64_t entry) final
ROOT type_traits extensions.
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
typename TDFInternal::RemoveFirstTwoParametersIf< std::is_same< TSlotAndEntry, UHT_t >::value, BranchTypesTmp_t >::type BranchTypes_t
static constexpr double s
ValuesPerSlot_t fLastResults
std::shared_ptr< TDFInternal::TActionBase > ActionBasePtr_t
TDFInternal::GenStaticSeq_t< BranchTypes_t::list_size > TypeInd_t
Binding & operator=(OUT(*fun)(void))
unsigned int GetNSlots() const
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
TSlotStack(unsigned int size)
typedef void((*Func_t)())
std::vector< int > fHasBeenCalled
void ResetChildrenCount()
unsigned int & GetIndex()
const ColumnNames_t fBranches
void IncrChildrenCount() final
void AddColumnAlias(const std::string &alias, const std::string &colName)
virtual void ClearValueReaders(unsigned int slot) final
std::string GetName() const
std::vector< TCustomColumnBase * > fCustomColumns
Non-owning ptrs to the node responsible for the custom column. Needed when querying custom values...
bool CheckFilterHelper(unsigned int slot, Long64_t entry, TDFInternal::StaticSeq< S... >)
std::shared_ptr< TCustomColumnBase > TCustomColumnBasePtr_t
std::vector< std::shared_ptr< bool > > fResProxyReadiness
void ResetTDFValueTuple(ValueTuple &values, StaticSeq< S... >)
Clear the proxies of a tuple of TColumnValues.
auto PartialUpdateImpl(unsigned int slot) -> decltype(std::declval< H >().PartialUpdate(slot),(void *)(nullptr))
void UpdateHelper(unsigned int slot, Long64_t entry, TDFInternal::StaticSeq< S... >, TypeList< BranchTypes... >, TCCHelperTypes::TNothing *)
void IncrChildrenCount() final
const unsigned int fNSlots
Number of thread slots used by this node, inherited from parent node.
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
std::vector< TOneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
virtual void * GetValuePtr(unsigned int slot)=0
GenStaticSeq_t< BranchTypes_t::list_size > TypeInd_t
std::shared_ptr< TRangeBase > RangeBasePtr_t
void SetTree(const std::shared_ptr< TTree > &tree)
bool CheckFilters(unsigned int slot, Long64_t entry) final
Ranges act as filters when it comes to selecting entries that downstream nodes should process...