ROOT Reference Guide
 
RNTupleDS.hxx
/// \file RNTupleDS.hxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
/// \date 2018-10-04
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT_RNTupleDS
#define ROOT_RNTupleDS

#include <ROOT/RDataSource.hxx>
#include <ROOT/RNTupleUtil.hxx>
#include <string_view>

#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <unordered_map>

namespace ROOT {
class RNTuple;
class RFieldBase;

namespace Experimental {

namespace Internal {
class RNTupleColumnReader;
class RPageSource;
} // namespace Internal

/// The RDataSource implementation for RNTuple. Every RDF column is represented by exactly one RNTuple field.
class RNTupleDS final : public ROOT::RDF::RDataSource {

   /// The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
   /// GetEntryRanges() swaps fNextRanges and fCurrentRanges and uses the list of
   /// REntryRangeDS records to return the list of ranges ready to be used by the RDF loop manager.
   struct REntryRangeDS {
      std::unique_ptr<ROOT::Experimental::Internal::RPageSource> fSource;
      ULong64_t fFirstEntry = 0; ///< First entry index in fSource
      /// End entry index in fSource, i.e., the number of entries in the range is fLastEntry - fFirstEntry
      ULong64_t fLastEntry = 0;
   };

   /// A clone of the first page source's descriptor.
   ROOT::RNTupleDescriptor fPrincipalDescriptor;

   /// The data source may be constructed with an ntuple name and a list of files.
   std::string fNTupleName;
   std::vector<std::string> fFileNames;
   /// The staging area is relevant for chains of files, i.e. when fFileNames is not empty. In this case,
   /// files are opened in the background in batches of size `fNSlots` and kept in the staging area.
   /// The first file (chains or no chains) is always opened on construction in order to process the schema.
   /// For all subsequent files, the corresponding page sources in the staging area have only executed
   /// `LoadStructure()`, i.e. they should have a compressed buffer of the meta-data available.
   /// Concretely:
   /// 1. We open the first file on construction to read the schema and then move the corresponding page source
   ///    into the staging area.
   /// 2. On `Initialize()`, we start the I/O background thread, which in turn opens the first batch of files.
   /// 3. At the beginning of `GetEntryRanges()`, we
   ///    a) wait for the I/O thread to finish,
   ///    b) call `PrepareNextRanges()` in the main thread to move the page sources from the staging area
   ///       into `fNextRanges`; this will also call `Attach()` on the page sources (i.e., deserialize the meta-data),
   ///       and
   ///    c) trigger staging of the next batch of files in the I/O background thread.
   /// 4. On `Finalize()`, the I/O background thread is stopped.
   std::vector<std::unique_ptr<ROOT::Experimental::Internal::RPageSource>> fStagingArea;
   std::size_t fNextFileIndex = 0; ///< Index into fFileNames to the next file to process

   /// We prepare a prototype field for every column. If a column reader is actually requested
   /// in GetColumnReaders(), we move a clone of the field into a new column reader for RDataFrame.
   /// Only the clone connects to the backing page store and acquires I/O resources.
   /// The field IDs are set in the context of the first source and used as keys in fFieldId2QualifiedName.
   std::vector<std::unique_ptr<ROOT::RFieldBase>> fProtoFields;
   /// Columns may be requested with types other than the ones with which they were initially added as proto fields.
   /// For example, a column with a `ROOT::RVec<float>` proto field may instead be requested as a `std::vector<float>`.
   /// In this case, we create an alternative proto field and store it here, with the original index in `fProtoFields`
   /// as key. A single column can have more than one alternative proto field.
   std::unordered_map<std::size_t, std::vector<std::unique_ptr<ROOT::RFieldBase>>> fAlternativeProtoFields;
   /// Connects the IDs of active proto fields and their subfields to their fully qualified name (a.b.c.d).
   /// This enables the column reader to rewire the field IDs when the file changes (chain),
   /// using the fully qualified name as a search key in the descriptor of the other page sources.
   std::unordered_map<ROOT::DescriptorId_t, std::string> fFieldId2QualifiedName;
   std::vector<std::string> fColumnNames;
   std::vector<std::string> fColumnTypes;
   /// List of column readers returned by GetColumnReaders() organized by slot. Used to reconnect readers
   /// to new page sources when the files in the chain change.
   std::vector<std::vector<Internal::RNTupleColumnReader *>> fActiveColumnReaders;

   ULong64_t fSeenEntries = 0; ///< The number of entries so far returned by GetEntryRanges()
   std::vector<REntryRangeDS> fCurrentRanges; ///< Basis for the ranges returned by the last GetEntryRanges() call
   std::vector<REntryRangeDS> fNextRanges;    ///< Basis for the ranges populated by the PrepareNextRanges() call
   /// Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index in
   /// the fCurrentRanges vector. This is necessary because the returned ranges get distributed arbitrarily
   /// onto slots. In the InitSlot method, the column readers use this map to find the correct range to connect to.
   std::unordered_map<ULong64_t, std::size_t> fFirstEntry2RangeIdx;

   /// The background thread that runs StageNextSources()
   std::thread fThreadStaging;
   /// Protects the shared state between the main thread and the I/O thread
   std::mutex fMutexStaging;
   /// Signal for the state information of fIsReadyForStaging and fHasNextSources
   std::condition_variable fCvStaging;
   /// Is true when the staging thread should start working
   bool fIsReadyForStaging = false;
   /// Is true when the staging thread has populated fStagingArea with the next batch of files
   bool fHasNextSources = false;
   /// Is true when the I/O thread should quit
   bool fStagingThreadShouldTerminate = false;

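   /// \brief Holds useful information about fields added to the RNTupleDS
   struct RFieldInfo {
      ROOT::DescriptorId_t fFieldId;
      std::size_t fNRepetitions;
      // Enable `std::vector::emplace_back` for this type
      RFieldInfo(ROOT::DescriptorId_t fieldId, std::size_t nRepetitions)
         : fFieldId(fieldId), fNRepetitions(nRepetitions)
      {
      }
   };
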
   /// Provides the RDF column "colName" given the field identified by fieldId. For records and collections,
   /// AddField recurses into the subfields. The fieldInfos argument is a list of objects holding info
   /// about the fields of the outer collection(s) (w.r.t. fieldId). For instance, if fieldId refers to a
   /// `std::vector<Jet>`, with
   /// ~~~{.cpp}
   /// struct Jet {
   ///    float pt;
   ///    float eta;
   /// };
   /// ~~~
   /// AddField will recurse into `Jet.pt` and `Jet.eta` and provide the two inner fields as `ROOT::VecOps::RVec<float>`
   /// each.
   ///
   /// In case the field is a collection of type `ROOT::VecOps::RVec`, `std::vector` or `std::array`, its corresponding
   /// column is added as a `ROOT::VecOps::RVec`. Otherwise, the collection field's on-disk type is used. Note, however,
   /// that inner record members of such collections will still be added as `ROOT::VecOps::RVec` (e.g., `std::set<Jet>`
   /// will be added as a `std::set`, but `Jet.[pt|eta]` will be added as `ROOT::VecOps::RVec<float>`).
   void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId,
                 std::vector<RFieldInfo> fieldInfos, bool convertToRVec = true);

   /// The main function of the fThreadStaging background thread
   void ExecStaging();
   /// Starting from `fNextFileIndex`, opens the next `fNSlots` files. Calls `LoadStructure()` on the opened files.
   /// The very first file is already available from the constructor.
   void StageNextSources();
   /// Populates fNextRanges with the next set of entry ranges. Moves files from the staging area as necessary
   /// and aligns ranges with cluster boundaries for scheduling the tail of files.
   /// Upon return, the fNextRanges list is ordered. It usually has fNSlots elements; fewer if there
   /// is not enough work to give at least one cluster to every slot.
   void PrepareNextRanges();

   explicit RNTupleDS(std::unique_ptr<ROOT::Experimental::Internal::RPageSource> pageSource);

public:
   RNTupleDS(std::string_view ntupleName, std::string_view fileName);
   RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames);
   // Rule of five
   RNTupleDS(const RNTupleDS &) = delete;
   RNTupleDS &operator=(const RNTupleDS &) = delete;
   RNTupleDS(RNTupleDS &&) = delete;
   RNTupleDS &operator=(RNTupleDS &&) = delete;

   void SetNSlots(unsigned int nSlots) final;
   std::size_t GetNFiles() const final { return fFileNames.empty() ? 1 : fFileNames.size(); }
   const std::vector<std::string> &GetColumnNames() const final { return fColumnNames; }
   bool HasColumn(std::string_view colName) const final;
   std::string GetTypeName(std::string_view colName) const final;
   std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges() final;
   std::string GetLabel() final { return "RNTupleDS"; }

   void Initialize() final;
   void InitSlot(unsigned int slot, ULong64_t firstEntry) final;
   void FinalizeSlot(unsigned int slot) final;
   void Finalize() final;

   std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
   GetColumnReaders(unsigned int /*slot*/, std::string_view /*name*/, const std::type_info &) final;

   // Old API, unused
   bool SetEntry(unsigned int, ULong64_t) final { return true; }

protected:
   Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final;
};

} // namespace Experimental

class RDataFrame;

namespace RDF {
namespace Experimental {
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName);
RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames);
} // namespace Experimental
} // namespace RDF
} // namespace ROOT

#endif
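The staging protocol documented for `fStagingArea` (steps 1 to 4), together with the `fMutexStaging`, `fCvStaging`, `fIsReadyForStaging`, `fHasNextSources` and `fStagingThreadShouldTerminate` members, amounts to a producer/consumer handshake between the main thread and one I/O thread. The following is a minimal standalone sketch of that handshake, not ROOT's implementation: the class `StagingSketch` and its methods are hypothetical, file names stand in for `RPageSource` objects, and the first file is not pre-opened in a constructor as RNTupleDS does. Only the flag names mirror the header.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical model of the staging handshake. "Opening" a file is modeled by copying
// its name into the staging area; RNTupleDS stages RPageSource objects instead.
class StagingSketch {
public:
   StagingSketch(std::vector<std::string> fileNames, std::size_t nSlots)
      : fFileNames(std::move(fileNames)), fNSlots(nSlots)
   {
      assert(nSlots > 0);
   }
   ~StagingSketch() { Finalize(); }

   // Step 2: start the I/O thread and let it stage the first batch.
   void Initialize()
   {
      fThreadStaging = std::thread(&StagingSketch::ExecStaging, this);
      RequestNextBatch();
   }

   // Step 3a: wait until the I/O thread has filled the staging area, then take its contents.
   std::vector<std::string> TakeStagedBatch()
   {
      std::unique_lock<std::mutex> lock(fMutexStaging);
      fCvStaging.wait(lock, [this] { return fHasNextSources; });
      fHasNextSources = false;
      std::vector<std::string> batch;
      batch.swap(fStagingArea);
      return batch;
   }

   // Step 3c: trigger staging of the next batch in the background.
   void RequestNextBatch()
   {
      {
         std::lock_guard<std::mutex> lock(fMutexStaging);
         fIsReadyForStaging = true;
      }
      fCvStaging.notify_all();
   }

   // Step 4: stop the I/O thread (safe to call more than once).
   void Finalize()
   {
      {
         std::lock_guard<std::mutex> lock(fMutexStaging);
         fStagingThreadShouldTerminate = true;
      }
      fCvStaging.notify_all();
      if (fThreadStaging.joinable())
         fThreadStaging.join();
   }

private:
   // Body of the background thread: sleep until asked to stage or to quit.
   void ExecStaging()
   {
      std::unique_lock<std::mutex> lock(fMutexStaging);
      while (true) {
         fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
         if (fStagingThreadShouldTerminate)
            return;
         // Stage up to fNSlots files. With real files, the slow open is better done without holding the lock.
         for (std::size_t i = 0; i < fNSlots && fNextFileIndex < fFileNames.size(); ++i)
            fStagingArea.push_back(fFileNames[fNextFileIndex++]);
         fIsReadyForStaging = false;
         fHasNextSources = true;
         fCvStaging.notify_all();
      }
   }

   std::vector<std::string> fFileNames;
   std::size_t fNSlots;
   std::size_t fNextFileIndex = 0;
   std::vector<std::string> fStagingArea;
   std::thread fThreadStaging;
   std::mutex fMutexStaging;
   std::condition_variable fCvStaging;
   bool fIsReadyForStaging = false;
   bool fHasNextSources = false;
   bool fStagingThreadShouldTerminate = false;
};
```

With three file names and two slots, the first `TakeStagedBatch()` after `Initialize()` returns the first two names, and after `RequestNextBatch()` the next call returns the remaining one. A single condition variable with `notify_all()` is enough here because every waiter re-checks its own predicate.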
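The `PrepareNextRanges()` comment states that ranges are aligned with cluster boundaries and that there are usually `fNSlots` of them, fewer when there is not enough work to give at least one cluster to every slot. The sketch below shows one way to perform such a split for a single file. `SplitAtClusterBoundaries` is hypothetical, and it balances by number of clusters rather than number of entries; that choice is an assumption and may not match the heuristic RNTupleDS actually uses.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Splits the entries of one file into at most nSlots contiguous [first, last) ranges whose
// boundaries coincide with cluster boundaries. clusterStarts holds the first entry of each
// cluster in ascending order (starting at 0); nEntries is the total number of entries.
std::vector<std::pair<std::uint64_t, std::uint64_t>>
SplitAtClusterBoundaries(const std::vector<std::uint64_t> &clusterStarts, std::uint64_t nEntries, std::size_t nSlots)
{
   assert(nSlots > 0);
   std::vector<std::pair<std::uint64_t, std::uint64_t>> ranges;
   const std::size_t nClusters = clusterStarts.size();
   if (nClusters == 0)
      return ranges;
   // Fewer ranges than slots if there are not enough clusters to give one to every slot.
   const std::size_t nRanges = std::min(nSlots, nClusters);
   const std::size_t base = nClusters / nRanges;
   const std::size_t remainder = nClusters % nRanges;
   std::size_t clusterIdx = 0;
   for (std::size_t i = 0; i < nRanges; ++i) {
      // The first `remainder` ranges receive one extra cluster.
      const std::size_t nClustersInRange = base + (i < remainder ? 1 : 0);
      const std::uint64_t first = clusterStarts[clusterIdx];
      clusterIdx += nClustersInRange;
      // A range ends where the next cluster starts, or at the end of the file.
      const std::uint64_t last = (clusterIdx < nClusters) ? clusterStarts[clusterIdx] : nEntries;
      ranges.emplace_back(first, last);
   }
   return ranges;
}
```

For five clusters of ten entries each and two slots, this yields `[0, 30)` and `[30, 50)`; for two clusters and four slots, only two ranges come back.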
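The `REntryRangeDS` and `fFirstEntry2RangeIdx` comments describe a double-buffering scheme: `GetEntryRanges()` swaps `fNextRanges` into `fCurrentRanges`, turns each range into a global entry range, and remembers which index each returned range came from, so that `InitSlot()` can find it whichever slot RDataFrame assigns it to. The sketch assumes that global entry numbers are obtained by offsetting with the running count of entries returned so far (the role of `fSeenEntries`); `RangeSketch` and `RangeBookkeeper` are hypothetical simplifications that omit the page sources.

```cpp
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical simplification of REntryRangeDS: a range of local entries in one file.
struct RangeSketch {
   std::uint64_t fFirstEntry = 0; // first local entry
   std::uint64_t fLastEntry = 0;  // one past the last local entry
};

class RangeBookkeeper {
public:
   std::vector<RangeSketch> fNextRanges; // filled by a PrepareNextRanges()-like step

   // Swaps next and current ranges and returns global [begin, end) ranges for the event loop.
   std::vector<std::pair<std::uint64_t, std::uint64_t>> GetEntryRanges()
   {
      std::swap(fCurrentRanges, fNextRanges);
      fNextRanges.clear();
      fFirstEntry2RangeIdx.clear();
      std::vector<std::pair<std::uint64_t, std::uint64_t>> result;
      for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
         const std::uint64_t nEntries = fCurrentRanges[i].fLastEntry - fCurrentRanges[i].fFirstEntry;
         const std::uint64_t begin = fSeenEntries;
         fSeenEntries += nEntries;
         result.emplace_back(begin, fSeenEntries);
         fFirstEntry2RangeIdx[begin] = i; // remember where this global range came from
      }
      return result;
   }

   // What an InitSlot()-like call would use: map a global first entry back to its range.
   const RangeSketch &RangeForFirstEntry(std::uint64_t globalFirstEntry) const
   {
      return fCurrentRanges.at(fFirstEntry2RangeIdx.at(globalFirstEntry));
   }

private:
   std::vector<RangeSketch> fCurrentRanges;
   std::unordered_map<std::uint64_t, std::size_t> fFirstEntry2RangeIdx;
   std::uint64_t fSeenEntries = 0;
};
```

Because the map is keyed by the global first entry, the lookup does not depend on the order in which ranges are handed to slots.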
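The `fAlternativeProtoFields` comment describes a cache keyed by the index of the original proto field, in which each key may hold several alternatives (for example, a `ROOT::RVec<float>` column requested as `std::vector<float>`). A minimal sketch of that look-up-or-create pattern follows; `ProtoField`, which keeps only a type name, and `ProtoFieldRegistry` are hypothetical stand-ins for the `std::unique_ptr<ROOT::RFieldBase>` objects that the header stores.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for ROOT::RFieldBase: only the type name matters here.
struct ProtoField {
   std::string fTypeName;
};

class ProtoFieldRegistry {
public:
   // Registers the proto field created when the column was first added; returns its index.
   std::size_t AddProtoField(std::string typeName)
   {
      fProtoFields.push_back(std::make_unique<ProtoField>(ProtoField{std::move(typeName)}));
      return fProtoFields.size() - 1;
   }

   // Returns a proto field of the requested type for column `idx`: the original one if the
   // types match, otherwise a cached alternative, creating it on first request.
   const ProtoField &GetOrCreate(std::size_t idx, const std::string &requestedType)
   {
      assert(idx < fProtoFields.size());
      if (fProtoFields[idx]->fTypeName == requestedType)
         return *fProtoFields[idx];
      auto &alternatives = fAlternativeProtoFields[idx]; // empty list on first use
      for (const auto &alt : alternatives) {
         if (alt->fTypeName == requestedType)
            return *alt;
      }
      alternatives.push_back(std::make_unique<ProtoField>(ProtoField{requestedType}));
      return *alternatives.back();
   }

   std::size_t GetNAlternatives(std::size_t idx) const
   {
      auto it = fAlternativeProtoFields.find(idx);
      return it == fAlternativeProtoFields.end() ? 0 : it->second.size();
   }

private:
   std::vector<std::unique_ptr<ProtoField>> fProtoFields;
   std::unordered_map<std::size_t, std::vector<std::unique_ptr<ProtoField>>> fAlternativeProtoFields;
};
```

Storing `std::unique_ptr` elements keeps the returned references valid even when a list of alternatives grows.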
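The `fFieldId2QualifiedName` comment explains how readers are rewired when the chain moves to another file: field IDs are only meaningful within one descriptor, so the fully qualified name serves as the stable key. In the sketch below, each descriptor is modeled as a hypothetical name-to-ID map (`DescriptorSketch`) and `RewireFieldId` is a hypothetical helper; the real code searches an `RNTupleDescriptor` instead. It requires C++17 for `std::optional`.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>

using DescriptorId_t = std::uint64_t; // mirrors ROOT::DescriptorId_t

// Hypothetical stand-in for a descriptor: fully qualified field name -> field ID.
using DescriptorSketch = std::unordered_map<std::string, DescriptorId_t>;

// Translates a field ID from the first (principal) descriptor into the ID of the same
// field in another file's descriptor, using the qualified name as the search key.
std::optional<DescriptorId_t>
RewireFieldId(DescriptorId_t oldId, const std::unordered_map<DescriptorId_t, std::string> &fieldId2QualifiedName,
              const DescriptorSketch &newDescriptor)
{
   auto nameIt = fieldId2QualifiedName.find(oldId);
   if (nameIt == fieldId2QualifiedName.end())
      return std::nullopt; // the field was never activated
   auto idIt = newDescriptor.find(nameIt->second);
   if (idIt == newDescriptor.end())
      return std::nullopt; // schema mismatch: the field is missing in the new file
   return idIt->second;
}
```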
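The `AddField` comment describes how a collection of records is exposed: the collection itself becomes a `ROOT::VecOps::RVec` column (for `RVec`, `std::vector` and `std::array`), and each record member becomes its own `ROOT::VecOps::RVec<T>` column. The sketch below reproduces only this naming and typing rule on a hypothetical field tree (`FieldSketch`, `AddColumns` and `WrapInRVec` are illustrative names, as is the `jets.pt` column naming). It is a simplification, not the real algorithm: it ignores the `fieldInfos` bookkeeping, assumes at most one level of collection nesting, and does not handle collections such as `std::set` that keep their on-disk type. It requires C++17, which permits a `std::vector` of the incomplete type `FieldSketch`.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical field tree: a leaf has a type name, a record has member fields,
// and a collection has exactly one item field.
struct FieldSketch {
   enum class Kind { kLeaf, kRecord, kCollection };
   Kind fKind;
   std::string fName;
   std::string fTypeName;               // type name of a leaf or record
   std::vector<FieldSketch> fSubFields; // record members, or the single collection item
};

// Wraps `type` in one ROOT::VecOps::RVec per enclosing collection level.
std::string WrapInRVec(std::string type, int nCollectionLevels)
{
   for (int i = 0; i < nCollectionLevels; ++i)
      type = "ROOT::VecOps::RVec<" + type + ">";
   return type;
}

// Adds a column for `field` and recurses into records and collections,
// collecting column name -> column type in `columns`.
void AddColumns(const FieldSketch &field, const std::string &colName, int nCollectionLevels,
                std::map<std::string, std::string> &columns)
{
   switch (field.fKind) {
   case FieldSketch::Kind::kLeaf: columns[colName] = WrapInRVec(field.fTypeName, nCollectionLevels); break;
   case FieldSketch::Kind::kRecord:
      columns[colName] = WrapInRVec(field.fTypeName, nCollectionLevels);
      for (const auto &member : field.fSubFields)
         AddColumns(member, colName + "." + member.fName, nCollectionLevels, columns);
      break;
   case FieldSketch::Kind::kCollection: {
      assert(field.fSubFields.size() == 1 && "a collection has exactly one item field");
      const FieldSketch &item = field.fSubFields.front();
      // The collection column itself becomes an RVec of the item type.
      columns[colName] = WrapInRVec(item.fTypeName, nCollectionLevels + 1);
      // Members of a record item are exposed under the collection's name, one RVec column each;
      // the item field itself gets no separate column.
      if (item.fKind == FieldSketch::Kind::kRecord) {
         for (const auto &member : item.fSubFields)
            AddColumns(member, colName + "." + member.fName, nCollectionLevels + 1, columns);
      }
      break;
   }
   }
}
```

Applied to a `std::vector<Jet>` field named `jets`, this produces the three columns `jets` (`ROOT::VecOps::RVec<Jet>`), `jets.pt` and `jets.eta` (both `ROOT::VecOps::RVec<float>`), matching the behavior the comment describes for `Jet.pt` and `Jet.eta`.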