61 std::unique_ptr<ROOT::RFieldBase>
CloneImpl(std::string_view )
const final
63 return std::make_unique<RRDFCardinalityField>();
98 *
static_cast<std::size_t *
>(to) =
size;
107 *
static_cast<std::size_t *
>(to) =
size;
121 std::unique_ptr<ROOT::RFieldBase>
CloneImpl(std::string_view)
const final
161 std::unique_ptr<RFieldBase::RValue>
fValue;
196 auto onDiskType =
source.GetSharedDescriptorGuard()->GetFieldDescriptor(
fField->GetOnDiskId()).GetTypeName();
197 std::string
msg =
"RNTupleDS: invalid type \"" +
fField->GetTypeName() +
"\" for column \"" +
200 throw std::runtime_error(
msg);
209 fValue = std::make_unique<RFieldBase::RValue>(
fField->CreateValue());
229 return fValue->GetPtr<
void>().get();
285 fieldDesc.GetTypeName().substr(0, 12) ==
"std::vector<" ||
fieldDesc.GetTypeName() ==
"");
315 f.SetOnDiskId(desc.
FindFieldId(
f.GetFieldName(),
f.GetParent()->GetOnDiskId()));
321 if (
info.fNRepetitions > 0) {
324 cardinalityField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField>();
379 fPrincipalDescriptor =
pageSource->GetSharedDescriptorGuard()->Clone();
380 fStagingArea.emplace_back(std::move(
pageSource));
382 AddField(fPrincipalDescriptor,
"", fPrincipalDescriptor.GetFieldZeroId(),
383 std::vector<ROOT::RDF::RNTupleDS::RFieldInfo>());
394 static std::once_flag
flag;
395 std::call_once(
flag, []() {
406std::unique_ptr<ROOT::Internal::RPageSource> CreatePageSource(std::string_view
ntupleName, std::string_view fileName)
415 fFileNames = std::vector<std::string>{std::string{fileName}};
433std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
437 const auto index = std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(),
name));
448 return fld->GetTypeName() == requestedType;
455 throw std::runtime_error(
"RNTupleDS: Could not create field with type \"" +
requestedType +
456 "\" for column \"" + std::string(
name));
469 fFieldId2QualifiedName[
field->GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(
field->GetOnDiskId());
470 for (
const auto &s : *
field) {
471 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(s.GetOnDiskId());
474 auto reader = std::make_unique<ROOT::Internal::RDF::RNTupleColumnReader>(
this,
field);
475 fActiveColumnReaders[
slot].emplace_back(
reader.get());
483 std::unique_lock lock(fMutexStaging);
484 fCvStaging.wait(lock, [
this] {
return fIsReadyForStaging || fStagingThreadShouldTerminate; });
485 if (fStagingThreadShouldTerminate)
490 fHasNextSources =
true;
491 fIsReadyForStaging =
false;
494 fCvStaging.notify_one();
500 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
501 for (
auto i = fNextFileIndex; (i <
nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
502 if (fStagingThreadShouldTerminate)
505 if (fStagingArea[i]) {
509 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
510 fStagingArea[i]->LoadStructure();
517 assert(fNextRanges.empty());
518 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
525 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex <
nFiles)) {
528 std::swap(fStagingArea[fNextFileIndex],
range.fSource);
530 if (!
range.fSource) {
533 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
535 range.fFileName = fFileNames[fNextFileIndex];
536 range.fSource->Attach();
544 fNextRanges.emplace_back(std::move(
range));
553 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex <
nFiles); ++i) {
554 std::unique_ptr<ROOT::Internal::RPageSource>
source;
557 std::swap(fStagingArea[fNextFileIndex],
source);
560 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
590 unsigned int iSlot = 0;
606 range.fSource->SetEntryRange({start, end - start});
607 range.fFirstEntry = start;
608 range.fLastEntry = end;
609 fNextRanges.emplace_back(std::move(
range));
616 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
625 for (
auto r : fActiveColumnReaders[0]) {
626 r->Disconnect(
true );
632 if (fCurrentRanges.empty() || (fSeenEntries > 0)) {
636 std::unique_lock lock(fMutexStaging);
637 fCvStaging.wait(lock, [
this] {
return fHasNextSources; });
640 if (fNextRanges.empty()) {
645 assert(fNextRanges.size() <= fNSlots);
647 fCurrentRanges.clear();
648 std::swap(fCurrentRanges, fNextRanges);
653 std::lock_guard
_(fMutexStaging);
654 fIsReadyForStaging =
true;
655 fHasNextSources =
false;
657 fCvStaging.notify_one();
664 fFirstEntry2RangeIdx.clear();
666 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
669 if (fCurrentRanges[i].fFirstEntry == 0) {
674 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntries;
675 auto end = fCurrentRanges[i].fLastEntry + fSeenEntries;
678 fFirstEntry2RangeIdx[start] = i;
679 ranges.emplace_back(start, end);
683 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
684 for (
auto r : fActiveColumnReaders[0]) {
685 r->Connect(*fCurrentRanges[0].fSource, ranges[0].first);
696 fSlotsToRangeIdxs[0] = 0;
706 fSlotsToRangeIdxs[
slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>()] =
idxRange;
708 for (
auto r : fActiveColumnReaders[
slot]) {
718 for (
auto r : fActiveColumnReaders[
slot]) {
719 r->Disconnect(
true );
728 auto msg = std::string(
"RNTupleDS: There is no column with name \"") + std::string(
colName) +
"\"";
729 throw std::runtime_error(
msg);
733 return fColumnTypes[
index];
738 return std::find(fColumnNames.begin(), fColumnNames.end(),
colName) != fColumnNames.end();
745 fIsReadyForStaging = fHasNextSources = fStagingThreadShouldTerminate =
false;
746 fThreadStaging = std::thread(&RNTupleDS::ExecStaging,
this);
747 assert(fNextRanges.empty());
749 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
752 std::lock_guard
_(fMutexStaging);
753 fIsReadyForStaging =
true;
755 fCvStaging.notify_one();
759 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
765 for (
unsigned int i = 0; i < fNSlots; ++i) {
766 for (
auto r : fActiveColumnReaders[i]) {
767 r->Disconnect(
false );
771 std::lock_guard
_(fMutexStaging);
772 fStagingThreadShouldTerminate =
true;
774 fCvStaging.notify_one();
775 fThreadStaging.join();
778 if (fFileNames.size() > fNSlots) {
779 fCurrentRanges.clear();
781 fStagingArea.clear();
782 fStagingArea.resize(fFileNames.size());
791 fActiveColumnReaders.resize(fNSlots);
792 fSlotsToRangeIdxs.resize(fNSlots * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
806 unsigned int slot,
const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &
sampleMap)
const
812 const auto &
rangeIdx = fSlotsToRangeIdxs.at(
slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
815 if (!fCurrentRanges[
rangeIdx].fSource)
829 throw std::runtime_error(
"Full sample identifier '" +
ntupleID +
"' cannot be found in the available samples.");
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
unsigned long long ULong64_t
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
R__EXTERN TSystem * gSystem
void GetCollectionInfo(const ROOT::NTupleSize_t globalIndex, RNTupleLocalIndex *collectionStart, ROOT::NTupleSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
An artificial field that provides the size of a fixed-size array.
void GenerateColumns(const ROOT::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
void ReadGlobalImpl(ROOT::NTupleSize_t, void *to) final
RArraySizeField(const RArraySizeField &other)=delete
RArraySizeField(RArraySizeField &&other)=default
RArraySizeField & operator=(RArraySizeField &&other)=default
void ReadInClusterImpl(RNTupleLocalIndex, void *to) final
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RArraySizeField(std::size_t arrayLength)
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
~RArraySizeField() final=default
RArraySizeField & operator=(const RArraySizeField &other)=delete
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Every RDF column is represented by exactly one RNTuple field.
void * GetImpl(Long64_t entry) final
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RNTupleDS * fDataSource
The data source that owns this column reader.
~RNTupleColumnReader() override=default
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Long64_t fLastEntry
Last entry number that was read.
void Disconnect(bool keepValue)
An artificial field that transforms an RNTuple column that contains the offset of collections into co...
RRDFCardinalityField(RRDFCardinalityField &&other)=default
size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
~RRDFCardinalityField() override=default
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
Get the number of elements of the collection identified by localIndex.
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
static void SetClusterBunchSize(RNTupleReadOptions &options, unsigned int val)
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
std::vector< void * > Record_t
The RDataSource implementation for RNTuple.
void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos, bool convertToRVec=true)
Provides the RDF column "colName" given the field identified by fieldID.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
void ExecStaging()
The main function of the fThreadStaging background thread.
std::vector< std::unique_ptr< ROOT::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
std::vector< std::string > fFileNames
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
RNTupleDS(std::unique_ptr< ROOT::Internal::RPageSource > pageSource)
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
void Finalize() final
Convenience method called after concluding an event-loop.
std::vector< std::string > fColumnTypes
void Initialize() final
Convenience method called before starting an event-loop.
std::vector< std::string > fColumnNames
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
Base class for all ROOT issued exceptions.
The list of column representations a field can have.
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
RConstSchemaIterator cbegin() const
const std::string & GetFieldName() const
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId)
Factory method to resurrect a field from the stored on-disk type information.
ROOT::DescriptorId_t GetOnDiskId() const
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its subfields using a possibly new name and a new, unconnected set of columns.
The on-storage metadata of an RNTuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Common user-tunable settings for reading RNTuples.
const_iterator begin() const
const_iterator end() const
virtual const char * Getenv(const char *env)
Get environment variable.
std::string TypeID2TypeName(const std::type_info &id)
Returns the name of a type starting from its type_info. An empty string is returned in case of failure...
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &)
std::string GetRenormalizedTypeName(const std::string &metaNormalizedName)
Given a type name normalized by ROOT meta, renormalize it for RNTuple. E.g., insert the std:: prefix.
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ENTupleStructure
The fields in the ntuple model tree can carry different structural information about the type system.
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.