Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleDS.hxx
Go to the documentation of this file.
1/// \file RNTupleDS.hxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
5/// \date 2018-10-04
6/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
7/// is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
17#ifndef ROOT_RNTupleDS
18#define ROOT_RNTupleDS
19
20#include <ROOT/RDataSource.hxx>
22#include <ROOT/RNTupleTypes.hxx>
23#include <string_view>
24
25#include <condition_variable>
26#include <cstdint>
27#include <memory>
28#include <mutex>
29#include <optional>
30#include <string>
31#include <thread>
32#include <vector>
33#include <unordered_map>
34
35// Follow RDF namespace convention
36namespace ROOT {
37class RDataFrame;
38}
39namespace ROOT::Internal::RDF {
40/**
41 * \brief Internal overload of the function that allows passing a range of entries
42 *
43 * The event range will be respected when processing this RNTuple. It is assumed
44 * that processing happens within one thread only.
45 */
46ROOT::RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames,
47 const std::pair<ULong64_t, ULong64_t> &range);
48/**
49 * \brief Retrieves the cluster boundaries and the number of entries for the input RNTuple
50 *
51 * \param[in] ntupleName The name of the RNTuple dataset
52 * \param[in] location The location of the RNTuple dataset (e.g. a path to a file)
53 *
54 * \note This function is a helper for the Python side to avoid having to deal
55 * with the shared descriptor guard.
56 */
57std::pair<std::vector<ROOT::Internal::RNTupleClusterBoundaries>, ROOT::NTupleSize_t>
58GetClustersAndEntries(std::string_view ntupleName, std::string_view location);
59} // namespace ROOT::Internal::RDF
60
// Forward declarations of ROOT classes named in this header.
namespace ROOT {
class RFieldBase;
class RDataFrame;
class RNTuple;
} // namespace ROOT

namespace ROOT::Detail::RDF {
class RNodeBase;
} // namespace ROOT::Detail::RDF

namespace ROOT::RDF {
/// Forward declaration of the public interface to the RDataFrame federation of classes.
template <typename T>
class RInterface;
} // namespace ROOT::RDF
73namespace ROOT::Internal::RDF {
74class RNTupleColumnReader;
75std::vector<std::pair<std::uint64_t, std::uint64_t>>
77}
// Forward declaration of the page source type owned (via std::unique_ptr) by the data source.
namespace ROOT::Internal {
class RPageSource;
} // namespace ROOT::Internal
81
82namespace ROOT::RDF {
85
86 /// The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
87 /// The GetEntryRanges() swaps fNextRanges and fCurrentRanges and uses the list of
88 /// REntryRangeDS records to return the list of ranges ready to use by the RDF loop manager.
90 std::unique_ptr<ROOT::Internal::RPageSource> fSource;
91 ULong64_t fFirstEntry = 0; ///< First entry index in fSource
92 /// End entry index in fSource, i.e. the number of entries in the range is fLastEntry - fFirstEntry
94 std::string_view fFileName; ///< Storage location of the current RNTuple
95 };
96
97 /// A clone of the first page source's descriptor.
99
100 /// The data source may be constructed with an ntuple name and a list of files
101 std::string fNTupleName;
102 std::vector<std::string> fFileNames;
103 /// The staging area is relevant for chains of files, i.e. when fFileNames is not empty. In this case,
104 /// files are opened in the background in batches of size `fNSlots` and kept in the staging area.
105 /// The first file (chains or no chains) is always opened on construction in order to process the schema.
106 /// For all subsequent files, the corresponding page sources in the staging area have only had `LoadStructure()` called,
107 /// i.e. they should have a compressed buffer of the meta-data available.
108 /// Concretely:
109 /// 1. We open the first file on construction to read the schema and then move the corresponding page source
110 /// in the staging area.
111 /// 2. On `Initialize()`, we start the I/O background thread, which in turn opens the first batch of files.
112 /// 3. At the beginning of `GetEntryRanges()`, we
113 /// a) wait for the I/O thread to finish,
114 /// b) call `PrepareNextRanges()` in the main thread to move the page sources from the staging area
115 /// into `fNextRanges`; this will also call `Attach()` on the page sources (i.e., deserialize the meta-data),
116 /// and
117 /// c) trigger staging of the next batch of files in the I/O background thread.
118 /// 4. On `Finalize()`, the I/O background thread is stopped.
119 std::vector<std::unique_ptr<ROOT::Internal::RPageSource>> fStagingArea;
120 std::size_t fNextFileIndex = 0; ///< Index into fFileNames to the next file to process
121
122 /// We prepare a prototype field for every column. If a column reader is actually requested
123 /// in GetColumnReaders(), we move a clone of the field into a new column reader for RDataFrame.
124 /// Only the clone connects to the backing page store and acquires I/O resources.
125 /// The field IDs are set in the context of the first source and used as keys in fFieldId2QualifiedName.
126 std::vector<std::unique_ptr<ROOT::RFieldBase>> fProtoFields;
127 /// Columns may be requested with types other than those with which they were initially added as proto fields. For example,
128 /// a column with a `ROOT::RVec<float>` proto field may instead be requested as a `std::vector<float>`. In case this
129 /// happens, we create an alternative proto field and store it here, with the original index in `fProtoFields` as
130 /// key. A single column can have more than one alternative proto field.
131 std::unordered_map<std::size_t, std::vector<std::unique_ptr<ROOT::RFieldBase>>> fAlternativeProtoFields;
132 /// Connects the IDs of active proto fields and their subfields to their fully qualified name (a.b.c.d).
133 /// This enables the column reader to rewire the field IDs when the file changes (chain),
134 /// using the fully qualified name as a search key in the descriptor of the other page sources.
135 std::unordered_map<ROOT::DescriptorId_t, std::string> fFieldId2QualifiedName;
136 std::vector<std::string> fColumnNames;
137 std::vector<std::string> fColumnTypes;
138 std::vector<std::string> fTopLevelFieldNames;
139 /// List of column readers returned by GetColumnReaders() organized by slot. Used to reconnect readers
140 /// to new page sources when the files in the chain change.
141 std::vector<std::vector<ROOT::Internal::RDF::RNTupleColumnReader *>> fActiveColumnReaders;
142
143 ULong64_t fSeenEntriesNoGlobalRange = 0; ///< The number of entries seen so far in GetEntryRanges()
144
145 std::vector<REntryRangeDS> fCurrentRanges; ///< Basis for the ranges returned by the last GetEntryRanges() call
146 std::vector<REntryRangeDS> fNextRanges; ///< Basis for the ranges populated by the PrepareNextRanges() call
147 /// Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index in
148 /// the fCurrentRanges vector. This is necessary because the returned ranges get distributed arbitrarily
149 /// onto slots. In the InitSlot method, the column readers use this map to find the correct range to connect to.
150 std::unordered_map<ULong64_t, std::size_t> fFirstEntry2RangeIdx;
151 // Keep track of the scheduled entries - necessary for processing of GlobalEntries
152 std::vector<std::pair<ULong64_t, ULong64_t>> fOriginalRanges;
153 /// One element per slot, corresponding to the current range index for that slot, as filled by InitSlot
154 std::vector<std::size_t> fSlotsToRangeIdxs;
155
156 /// The background thread that runs StageNextSources()
157 std::thread fThreadStaging;
158 /// Protects the shared state between the main thread and the I/O thread
159 std::mutex fMutexStaging;
160 /// Signal for the state information of fIsReadyForStaging and fHasNextSources
161 std::condition_variable fCvStaging;
162 /// Is true when the staging thread should start working
163 bool fIsReadyForStaging = false;
164 /// Is true when the staging thread has populated the next batch of files to fStagingArea
165 bool fHasNextSources = false;
166 /// Is true when the I/O thread should quit
168
169 /// \brief Holds useful information about fields added to the RNTupleDS
170 struct RFieldInfo {
172 std::size_t fNRepetitions;
173 // Enable `std::vector::emplace_back` for this type
178 };
179
180 /// Provides the RDF column "colName" given the field identified by fieldId. For records and collections,
181 /// AddField recurses into the sub fields. The fieldInfos argument is a list of objects holding info
182 /// about the fields of the outer collection(s) (w.r.t. fieldId). For instance, if fieldId refers to an
183 /// `std::vector<Jet>`, with
184 /// ~~~{.cpp}
185 /// struct Jet {
186 /// float pt;
187 /// float eta;
188 /// };
189 /// ~~~
190 /// AddField will recurse into `Jet.pt` and `Jet.eta` and provide the two inner fields as `ROOT::VecOps::RVec<float>`
191 /// each.
192 ///
193 /// In case the field is a collection of type `ROOT::VecOps::RVec`, `std::vector` or `std::array`, its corresponding
194 /// column is added as a `ROOT::VecOps::RVec`. Otherwise, the collection field's on-disk type is used. Note, however,
195 /// that inner record members of such collections will still be added as `ROOT::VecOps::RVec` (e.g., `std::set<Jet>`
196 /// will be added as a `std::set`, but `Jet.[pt|eta]` will be added as `ROOT::VecOps::RVec<float>`).
197 void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId,
198 std::vector<RFieldInfo> fieldInfos, bool convertToRVec = true);
199
200 /// The main function of the fThreadStaging background thread
201 void ExecStaging();
202 /// Starting from `fNextFileIndex`, opens the next `fNSlots` files. Calls `LoadStructure()` on the opened files.
203 /// The very first file is already available from the constructor.
204 void StageNextSources();
205 /// Populates fNextRanges with the next set of entry ranges. Moves files from the staging area as necessary
206 /// and aligns ranges with cluster boundaries for scheduling the tail of files.
207 /// Upon return, the fNextRanges list is ordered. It usually has fNSlots elements; fewer if there
208 /// is not enough work to give at least one cluster to every slot.
209 void PrepareNextRanges();
210
211 explicit RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource);
212
213 ROOT::RFieldBase *GetFieldWithTypeChecks(std::string_view fieldName, const std::type_info &tid);
214
216 const std::vector<std::string> &fileNames,
217 const std::pair<ULong64_t, ULong64_t> &range);
218
219 // This function needs to access private members fNTupleName and fFileNames
220 friend std::vector<std::pair<std::uint64_t, std::uint64_t>> ROOT::Internal::RDF::GetDatasetGlobalClusterBoundaries(
222
223 explicit RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames,
224 const std::pair<ULong64_t, ULong64_t> &range);
225
226public:
227 RNTupleDS(std::string_view ntupleName, std::string_view fileName);
228 RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames);
229 // Rule of five
230 RNTupleDS(const RNTupleDS &) = delete;
231 RNTupleDS &operator=(const RNTupleDS &) = delete;
232 RNTupleDS(RNTupleDS &&) = delete;
235
236 void SetNSlots(unsigned int nSlots) final;
237 std::size_t GetNFiles() const final { return fFileNames.empty() ? 1 : fFileNames.size(); }
238 const std::vector<std::string> &GetColumnNames() const final { return fColumnNames; }
239 const std::vector<std::string> &GetTopLevelFieldNames() const final { return fTopLevelFieldNames; }
240 bool HasColumn(std::string_view colName) const final;
241 std::string GetTypeName(std::string_view colName) const final;
242 std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges() final;
243 std::string GetLabel() final { return "RNTupleDS"; }
244
245 void Initialize() final;
246 void InitSlot(unsigned int slot, ULong64_t firstEntry) final;
247 void FinalizeSlot(unsigned int slot) final;
248 void Finalize() final;
249
250 std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
251 GetColumnReaders(unsigned int /*slot*/, std::string_view /*name*/, const std::type_info &) final;
252
253 ROOT::RDF::RSampleInfo
254 CreateSampleInfo(unsigned int,
255 const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &) const final;
256
257 // Old API, unused
258 bool SetEntry(unsigned int, ULong64_t) final { return true; }
259
260protected:
261 Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final;
262};
263} // namespace ROOT::RDF
264
265namespace ROOT::RDF {
266RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName);
267RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames);
268} // namespace ROOT::RDF
269
270#endif
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
char name[80]
Definition TGX11.cxx:148
Pure virtual base class for all column reader types.
Every RDF column is represented by exactly one RNTuple field.
Abstract interface to read data from an ntuple.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
friend ROOT::RDF::RSampleInfo ROOT::Internal::RDF::CreateSampleInfo(const ROOT::RDF::RDataSource &, unsigned int, const std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > &)
std::vector< void * > Record_t
The public interface to the RDataFrame federation of classes.
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:83
bool fHasNextSources
Is true when the staging thread has populated the next batch of files to fStagingArea.
const std::vector< std::string > & GetColumnNames() const final
Returns a reference to the collection of the dataset's column names.
void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos, bool convertToRVec=true)
Provides the RDF column "colName" given the field identified by fieldID.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
std::size_t GetNFiles() const final
Returns the number of files from which the dataset is constructed.
std::vector< REntryRangeDS > fNextRanges
Basis for the ranges populated by the PrepareNextRanges() call.
std::unordered_map< ULong64_t, std::size_t > fFirstEntry2RangeIdx
Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index...
void ExecStaging()
The main function of the fThreadStaging background thread.
bool fStagingThreadShouldTerminate
Is true when the I/O thread should quit.
RNTupleDS(RNTupleDS &&)=delete
std::vector< std::vector< ROOT::Internal::RDF::RNTupleColumnReader * > > fActiveColumnReaders
List of column readers returned by GetColumnReaders() organized by slot.
std::vector< std::unique_ptr< ROOT::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
std::vector< std::pair< ULong64_t, ULong64_t > > fOriginalRanges
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
std::vector< std::size_t > fSlotsToRangeIdxs
One element per slot, corresponding to the current range index for that slot, as filled by InitSlot.
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
ROOT::RFieldBase * GetFieldWithTypeChecks(std::string_view fieldName, const std::type_info &tid)
RNTupleDS & operator=(const RNTupleDS &)=delete
std::vector< std::string > fFileNames
bool fIsReadyForStaging
Is true when the staging thread should start working.
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
ROOT::RNTupleDescriptor fPrincipalDescriptor
A clone of the first page source's descriptor.
Definition RNTupleDS.hxx:98
ULong64_t fSeenEntriesNoGlobalRange
The number of entries seen so far in GetEntryRanges()
std::vector< REntryRangeDS > fCurrentRanges
Basis for the ranges returned by the last GetEntryRanges() call.
std::vector< std::string > fTopLevelFieldNames
RNTupleDS(std::unique_ptr< ROOT::Internal::RPageSource > pageSource)
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
std::size_t fNextFileIndex
Index into fFileNames to the next file to process.
const std::vector< std::string > & GetTopLevelFieldNames() const final
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
std::condition_variable fCvStaging
Signal for the state information of fIsReadyForStaging and fHasNextSources.
RNTupleDS(const RNTupleDS &)=delete
void Finalize() final
Convenience method called after concluding an event-loop.
std::string GetLabel() final
Return a string representation of the datasource type.
RNTupleDS & operator=(RNTupleDS &&)=delete
std::thread fThreadStaging
The background thread that runs StageNextSources()
std::mutex fMutexStaging
Protects the shared state between the main thread and the I/O thread.
std::unordered_map< std::size_t, std::vector< std::unique_ptr< ROOT::RFieldBase > > > fAlternativeProtoFields
Columns may be requested with types other than those with which they were initially added as proto fields.
bool SetEntry(unsigned int, ULong64_t) final
Advance the "cursors" returned by GetColumnReaders to the selected entry for a particular slot.
std::vector< std::string > fColumnTypes
void Initialize() final
Convenience method called before starting an event-loop.
std::vector< std::string > fColumnNames
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage metadata of an RNTuple.
ROOT::RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector< std::string > &fileNames, const std::pair< ULong64_t, ULong64_t > &range)
Internal overload of the function that allows passing a range of entries.
std::pair< std::vector< ROOT::Internal::RNTupleClusterBoundaries >, ROOT::NTupleSize_t > GetClustersAndEntries(std::string_view ntupleName, std::string_view location)
Retrieves the cluster boundaries and the number of entries for the input RNTuple.
std::vector< std::pair< std::uint64_t, std::uint64_t > > GetDatasetGlobalClusterBoundaries(const RNode &node)
Retrieve the cluster boundaries for each cluster in the dataset, across files, with a global offset.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:89
std::string_view fFileName
Storage location of the current RNTuple.
Definition RNTupleDS.hxx:94
ULong64_t fLastEntry
End entry index in fSource, i.e. the number of entries in the range is fLastEntry - fFirstEntry.
Definition RNTupleDS.hxx:93
std::unique_ptr< ROOT::Internal::RPageSource > fSource
Definition RNTupleDS.hxx:90
ULong64_t fFirstEntry
First entry index in fSource.
Definition RNTupleDS.hxx:91
Holds useful information about fields added to the RNTupleDS.
RFieldInfo(ROOT::DescriptorId_t fieldId, std::size_t nRepetitions)
ROOT::DescriptorId_t fFieldId