RNTupleDS.hxx
/// \file RNTupleDS.hxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
/// \date 2018-10-04
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT_RNTupleDS
#define ROOT_RNTupleDS

#include <ROOT/RDataFrame.hxx>
#include <ROOT/RDataSource.hxx>
#include <ROOT/RNTupleUtil.hxx>
#include <string_view>

#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <unordered_map>

namespace ROOT {
class RNTuple;

namespace Experimental {
class RFieldBase;
class RNTupleDescriptor;

namespace Internal {
class RNTupleColumnReader;
class RPageSource;
}

class RNTupleDS final : public ROOT::RDF::RDataSource {

   /// PrepareNextRanges() populates the fNextRanges list with REntryRangeDS records.
   /// GetEntryRanges() swaps fNextRanges and fCurrentRanges and uses the list of
   /// REntryRangeDS records to return the list of ranges ready to use by the RDF loop manager.
   struct REntryRangeDS {
      std::unique_ptr<ROOT::Experimental::Internal::RPageSource> fSource;
      ULong64_t fFirstEntry = 0; ///< First entry index in fSource
      /// End entry index in fSource, i.e. the number of entries in the range is fLastEntry - fFirstEntry
      ULong64_t fLastEntry = 0;
   };

   /// A clone of the first page source's descriptor.
   std::unique_ptr<RNTupleDescriptor> fPrincipalDescriptor;

   /// The data source may be constructed with an ntuple name and a list of files
   std::string fNTupleName;
   std::vector<std::string> fFileNames;
   /// The staging area is relevant for chains of files, i.e. when fFileNames is not empty. In this case,
   /// files are opened in the background in batches of size `fNSlots` and kept in the staging area.
   /// The first file (chains or no chains) is always opened on construction in order to process the schema.
   /// For all subsequent files, the corresponding page sources in the staging area have only executed
   /// `LoadStructure()`, i.e. they should have a compressed buffer of the meta-data available.
   /// Concretely:
   ///   1. We open the first file on construction to read the schema and then move the corresponding page source
   ///      into the staging area.
   ///   2. On `Initialize()`, we start the I/O background thread, which in turn opens the first batch of files.
   ///   3. At the beginning of `GetEntryRanges()`, we
   ///      a) wait for the I/O thread to finish,
   ///      b) call `PrepareNextRanges()` in the main thread to move the page sources from the staging area
   ///         into `fNextRanges`; this will also call `Attach()` on the page sources (i.e., deserialize the meta-data),
   ///         and
   ///      c) trigger staging of the next batch of files in the I/O background thread.
   ///   4. On `Finalize()`, the I/O background thread is stopped.
   std::vector<std::unique_ptr<ROOT::Experimental::Internal::RPageSource>> fStagingArea;
   std::size_t fNextFileIndex = 0; ///< Index into fFileNames to the next file to process

   /// We prepare a prototype field for every column. If a column reader is actually requested
   /// in GetColumnReaders(), we move a clone of the field into a new column reader for RDataFrame.
   /// Only the clone connects to the backing page store and acquires I/O resources.
   /// The field IDs are set in the context of the first source and used as keys in fFieldId2QualifiedName.
   std::vector<std::unique_ptr<ROOT::Experimental::RFieldBase>> fProtoFields;
   /// Connects the IDs of active proto fields and their subfields to their fully qualified name (a.b.c.d).
   /// This enables the column reader to rewire the field IDs when the file changes (chain),
   /// using the fully qualified name as a search key in the descriptor of the other page sources.
   std::unordered_map<ROOT::Experimental::DescriptorId_t, std::string> fFieldId2QualifiedName;
   std::vector<std::string> fColumnNames;
   std::vector<std::string> fColumnTypes;
   /// List of column readers returned by GetColumnReaders() organized by slot. Used to reconnect readers
   /// to new page sources when the files in the chain change.
   std::vector<std::vector<Internal::RNTupleColumnReader *>> fActiveColumnReaders;

   unsigned int fNSlots = 0;
   ULong64_t fSeenEntries = 0;                ///< The number of entries so far returned by GetEntryRanges()
   std::vector<REntryRangeDS> fCurrentRanges; ///< Basis for the ranges returned by the last GetEntryRanges() call
   std::vector<REntryRangeDS> fNextRanges;    ///< Basis for the ranges populated by the PrepareNextRanges() call
   /// Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index in
   /// the fCurrentRanges vector. This is necessary because the returned ranges get distributed arbitrarily
   /// onto slots. In the InitSlot method, the column readers use this map to find the correct range to connect to.
   std::unordered_map<ULong64_t, std::size_t> fFirstEntry2RangeIdx;

   /// The background thread that runs StageNextSources()
   std::thread fThreadStaging;
   /// Protects the shared state between the main thread and the I/O thread
   std::mutex fMutexStaging;
   /// Signal for the state information of fIsReadyForStaging and fHasNextSources
   std::condition_variable fCvStaging;
   /// Is true when the staging thread should start working
   bool fIsReadyForStaging = false;
   /// Is true when the staging thread has populated fStagingArea with the next batch of files
   bool fHasNextSources = false;
   /// Is true when the I/O thread should quit
   bool fStagingThreadShouldTerminate = false;

   /// \brief Holds useful information about fields added to the RNTupleDS
   struct RFieldInfo {
      DescriptorId_t fFieldId;
      std::size_t fNRepetitions;
      // Enable `std::vector::emplace_back` for this type
      RFieldInfo(DescriptorId_t fieldId, std::size_t nRepetitions) : fFieldId(fieldId), fNRepetitions(nRepetitions) {}
   };

   /// Provides the RDF column "colName" given the field identified by fieldId. For records and collections,
   /// AddField recurses into the sub fields. The fieldInfos argument is a list of objects holding info
   /// about the fields of the outer collection(s) (w.r.t. fieldId). For instance, if fieldId refers to an
   /// `std::vector<Jet>`, with
   ///     struct Jet {
   ///        float pt;
   ///        float eta;
   ///     };
   /// AddField will recurse into Jet.pt and Jet.eta and provide the two inner fields as std::vector<float> each.
   void AddField(const RNTupleDescriptor &desc, std::string_view colName, DescriptorId_t fieldId,
                 std::vector<RFieldInfo> fieldInfos);

   /// The main function of the fThreadStaging background thread
   void ExecStaging();
   /// Starting from `fNextFileIndex`, opens the next `fNSlots` files. Calls `LoadStructure()` on the opened files.
   /// The very first file is already available from the constructor.
   void StageNextSources();
   /// Populates fNextRanges with the next set of entry ranges. Moves files from the staging area as necessary
   /// and aligns ranges with cluster boundaries for scheduling the tail of files.
   /// Upon return, the fNextRanges list is ordered. It usually has fNSlots elements; fewer if there
   /// is not enough work to give at least one cluster to every slot.
   void PrepareNextRanges();

   explicit RNTupleDS(std::unique_ptr<ROOT::Experimental::Internal::RPageSource> pageSource);

public:
   RNTupleDS(std::string_view ntupleName, std::string_view fileName);
   RNTupleDS(ROOT::RNTuple *ntuple);
   RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames);
   // Rule of five
   RNTupleDS(const RNTupleDS &) = delete;
   RNTupleDS &operator=(const RNTupleDS &) = delete;
   RNTupleDS(RNTupleDS &&) = delete;
   RNTupleDS &operator=(RNTupleDS &&) = delete;
   ~RNTupleDS() final;

   void SetNSlots(unsigned int nSlots) final;
   std::size_t GetNFiles() const final { return fFileNames.empty() ? 1 : fFileNames.size(); }
   const std::vector<std::string> &GetColumnNames() const final { return fColumnNames; }
   bool HasColumn(std::string_view colName) const final;
   std::string GetTypeName(std::string_view colName) const final;
   std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges() final;
   std::string GetLabel() final { return "RNTupleDS"; }

   void Initialize() final;
   void InitSlot(unsigned int slot, ULong64_t firstEntry) final;
   void FinalizeSlot(unsigned int slot) final;
   void Finalize() final;

   std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
   GetColumnReaders(unsigned int /*slot*/, std::string_view /*name*/, const std::type_info &) final;

   // Old API, unused
   bool SetEntry(unsigned int, ULong64_t) final { return true; }

protected:
   Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final;
};

} // namespace Experimental

namespace RDF {
namespace Experimental {
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName);
RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames);
RDataFrame FromRNTuple(ROOT::RNTuple *ntuple);
} // namespace Experimental
} // namespace RDF

} // namespace ROOT

#endif
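
The staging protocol described in the `fStagingArea` comment (start the I/O thread on `Initialize()`, wait for a staged batch, hand it over, trigger the next batch, stop on `Finalize()`) can be illustrated with a small standalone sketch. This is not the ROOT implementation: the class `StagingDemo` and the methods `RequestStaging()` and `NextBatch()` are hypothetical, file names stand in for page sources, and "opening" a file is reduced to copying its name. Only the member names `fStagingArea`, `fNextFileIndex`, `fNSlots`, `fThreadStaging`, `fMutexStaging`, `fCvStaging`, `fIsReadyForStaging`, `fHasNextSources`, and `fStagingThreadShouldTerminate` mirror the declarations in the header.

```cpp
#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for RNTupleDS: stages file names instead of page sources.
class StagingDemo {
   std::vector<std::string> fFileNames;
   std::vector<std::string> fStagingArea; // the real class keeps page sources here
   std::size_t fNextFileIndex = 0;
   std::size_t fNSlots;

   std::thread fThreadStaging;
   std::mutex fMutexStaging;
   std::condition_variable fCvStaging;
   bool fIsReadyForStaging = false;
   bool fHasNextSources = false;
   bool fStagingThreadShouldTerminate = false;

   // Background thread: wait for a request, stage one batch, signal completion.
   void ExecStaging()
   {
      while (true) {
         std::unique_lock<std::mutex> lock(fMutexStaging);
         fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
         if (fStagingThreadShouldTerminate)
            return;
         fIsReadyForStaging = false;
         // "Open" up to fNSlots files; done under the lock to keep the sketch short.
         const std::size_t end = std::min(fNextFileIndex + fNSlots, fFileNames.size());
         for (; fNextFileIndex < end; ++fNextFileIndex)
            fStagingArea.push_back(fFileNames[fNextFileIndex]);
         fHasNextSources = true;
         lock.unlock();
         fCvStaging.notify_one();
      }
   }

   // Ask the background thread to stage the next batch. Must be called without holding the mutex.
   void RequestStaging()
   {
      {
         std::lock_guard<std::mutex> guard(fMutexStaging);
         fIsReadyForStaging = true;
      }
      fCvStaging.notify_one();
   }

public:
   StagingDemo(std::vector<std::string> fileNames, std::size_t nSlots)
      : fFileNames(std::move(fileNames)), fNSlots(nSlots)
   {
   }
   ~StagingDemo() { Finalize(); }

   // Step 2: start the I/O thread, which stages the first batch.
   void Initialize()
   {
      fThreadStaging = std::thread(&StagingDemo::ExecStaging, this);
      RequestStaging();
   }

   // Step 3: wait for the staged batch, take it over, and trigger staging of the next one.
   // An empty result means that all files have been handed out.
   std::vector<std::string> NextBatch()
   {
      std::vector<std::string> batch;
      {
         std::unique_lock<std::mutex> lock(fMutexStaging);
         fCvStaging.wait(lock, [this] { return fHasNextSources; });
         fHasNextSources = false;
         batch.swap(fStagingArea);
      }
      RequestStaging();
      return batch;
   }

   // Step 4: stop the I/O thread. Safe to call more than once.
   void Finalize()
   {
      if (!fThreadStaging.joinable())
         return;
      {
         std::lock_guard<std::mutex> guard(fMutexStaging);
         fStagingThreadShouldTerminate = true;
      }
      fCvStaging.notify_one();
      fThreadStaging.join();
   }
};
```

With five file names and two slots, successive `NextBatch()` calls in this sketch yield batches of two, two, and one file, followed by an empty batch. All three flags are read and written only while `fMutexStaging` is held, which is what allows a single condition variable to serve both directions of the handshake.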
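
The recursion described in the `AddField` comment, using the `std::vector<Jet>` example, can be sketched on a simplified field tree. Everything here is an assumption made for illustration: `FieldNode`, `Columns`, `AddFieldSketch`, and `MakeJetsField` are hypothetical names, the item field name `_0` and the dotted column names (`jets.pt`) are assumed conventions, and the sketch only covers the inner fields, not any column for the collection itself. The real method works on an `RNTupleDescriptor` and a list of `RFieldInfo` objects rather than a plain nesting counter.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified model of a field tree; not ROOT's descriptor classes.
struct FieldNode {
   enum class Kind { kLeaf, kRecord, kCollection };
   Kind fKind;
   std::string fName;
   std::string fTypeName;             // only meaningful for leaves
   std::vector<FieldNode> fSubFields; // record members, or the single item field of a collection
};

using Columns = std::vector<std::pair<std::string, std::string>>; // (column name, type name)

// Walks the tree below `field` and emits one column per leaf. Each enclosing
// collection wraps the leaf type in one more std::vector<...>.
void AddFieldSketch(const FieldNode &field, const std::string &colName, unsigned nEnclosingCollections,
                    Columns &out)
{
   switch (field.fKind) {
   case FieldNode::Kind::kCollection:
      // The item field keeps the column name of the collection but sits one level deeper.
      AddFieldSketch(field.fSubFields.at(0), colName, nEnclosingCollections + 1, out);
      break;
   case FieldNode::Kind::kRecord:
      for (const auto &sub : field.fSubFields)
         AddFieldSketch(sub, colName + "." + sub.fName, nEnclosingCollections, out);
      break;
   case FieldNode::Kind::kLeaf: {
      std::string type = field.fTypeName;
      for (unsigned i = 0; i < nEnclosingCollections; ++i)
         type = "std::vector<" + type + ">";
      out.emplace_back(colName, type);
      break;
   }
   }
}

// Builds the field tree for a `std::vector<Jet>` with `float pt` and `float eta` members.
FieldNode MakeJetsField()
{
   using K = FieldNode::Kind;
   FieldNode pt{K::kLeaf, "pt", "float", {}};
   FieldNode eta{K::kLeaf, "eta", "float", {}};
   FieldNode jet{K::kRecord, "_0", "", {pt, eta}};
   return FieldNode{K::kCollection, "jets", "", {jet}};
}
```

Calling `AddFieldSketch(MakeJetsField(), "jets", 0, cols)` fills `cols` with `jets.pt` and `jets.eta`, both typed `std::vector<float>`, which matches the behavior the comment describes for the two inner fields.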