Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
RNTupleDS.cxx
Go to the documentation of this file.
1/// \file RNTupleDS.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
15#include <ROOT/RDataFrame.hxx>
16#include <ROOT/RDF/Utils.hxx>
17#include <ROOT/RField.hxx>
18#include <ROOT/RFieldUtils.hxx>
21#include <ROOT/RNTupleDS.hxx>
22#include <ROOT/RNTupleTypes.hxx>
23#include <ROOT/RPageStorage.hxx>
24#include <string_view>
25
26#include <TError.h>
27#include <TSystem.h>
28
29#include <cassert>
30#include <limits>
31#include <memory>
32#include <mutex>
33#include <string>
34#include <vector>
35#include <typeinfo>
36#include <type_traits>
37#include <utility>
38
39// clang-format off
40/**
41* \class ROOT::RDF::RNTupleDS
42* \ingroup dataframe
43* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data.
44*
45* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
46*
47* For each column containing an array or a collection, a corresponding column `#colname` is available to access
48* `colname.size()` without reading and deserializing the collection values.
49*
50**/
51// clang-format on
52
53namespace ROOT::Internal::RDF {
55protected:
56 // We construct these fields and know that they match the page source
58
59 RRDFCardinalityFieldBase(std::string_view name, std::string_view type)
60 : ROOT::RFieldBase(name, type, ROOT::ENTupleStructure::kPlain, false /* isSimple */)
61 {
62 }
63
64 // Field is only used for reading
65 void GenerateColumns() final { throw RException(R__FAIL("Cardinality fields must only be used for reading")); }
70
71public:
76 ~RRDFCardinalityFieldBase() override = default;
77
87};
88
89/// An artificial field that transforms an RNTuple column that contains the offset of collections into
90/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g.
91/// `R_rdf_sizeof_jets` for a collection named `jets`.
92///
93/// This is similar to the RCardinalityField but it presents itself as an integer type.
94/// The template argument T must be an integral type.
95template <typename T>
97 static_assert(std::is_integral_v<T>, "T must be an integral type");
98
100 {
101 if constexpr (std::is_same_v<T, bool> || std::is_same_v<T, std::uint64_t>)
102 return;
103 if (size > static_cast<ROOT::NTupleSize_t>(std::numeric_limits<T>::max())) {
104 throw RException(R__FAIL(std::string("integer overflow in field ") + GetFieldName() +
105 ". Please read the column with a larger-sized integral type."));
106 }
107 }
108
109protected:
110 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view newName) const final
111 {
112 return std::make_unique<RRDFCardinalityField>(newName);
113 }
114 void ConstructValue(void *where) const final { *static_cast<T *>(where) = 0; }
115
116public:
125 ~RRDFCardinalityField() override = default;
126
127 std::size_t GetValueSize() const final { return sizeof(T); }
128 std::size_t GetAlignment() const final { return alignof(T); }
129
130 /// Get the number of elements of the collection identified by globalIndex
131 void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
132 {
133 RNTupleLocalIndex collectionStart;
135 fPrincipalColumn->GetCollectionInfo(globalIndex, &collectionStart, &size);
137 *static_cast<T *>(to) = size;
138 }
139
140 /// Get the number of elements of the collection identified by clusterIndex
141 void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
142 {
143 RNTupleLocalIndex collectionStart;
145 fPrincipalColumn->GetCollectionInfo(localIndex, &collectionStart, &size);
147 *static_cast<T *>(to) = size;
148 }
149};
150
151/**
152 * @brief An artificial field that provides the size of a fixed-size array
153 *
154 * This is the implementation of `R_rdf_sizeof_column` in case `column` contains
155 * fixed-size arrays on disk.
156 */
157class RArraySizeField final : public ROOT::RFieldBase {
158private:
159 std::size_t fArrayLength;
160
161 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view newName) const final
162 {
163 return std::make_unique<RArraySizeField>(newName, fArrayLength);
164 }
165 void GenerateColumns() final { throw RException(R__FAIL("RArraySizeField fields must only be used for reading")); }
167 void ReadGlobalImpl(ROOT::NTupleSize_t /*globalIndex*/, void *to) final
168 {
169 *static_cast<std::size_t *>(to) = fArrayLength;
170 }
171 void ReadInClusterImpl(RNTupleLocalIndex /*localIndex*/, void *to) final
172 {
173 *static_cast<std::size_t *>(to) = fArrayLength;
174 }
175
176 // We construct these fields and know that they match the page source
178
179public:
180 RArraySizeField(std::string_view name, std::size_t arrayLength)
181 : ROOT::RFieldBase(name, ROOT::Internal::GetRenormalizedTypeName(typeid(std::size_t)),
182 ROOT::ENTupleStructure::kPlain, false /* isSimple */),
183 fArrayLength(arrayLength)
184 {
185 }
186 RArraySizeField(const RArraySizeField &other) = delete;
190 ~RArraySizeField() final = default;
191
192 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
193 std::size_t GetValueSize() const final { return sizeof(std::size_t); }
194 std::size_t GetAlignment() const final { return alignof(std::size_t); }
195};
196
197/// Every RDF column is represented by exactly one RNTuple field
201
202 RNTupleDS *fDataSource; ///< The data source that owns this column reader
203 RFieldBase *fProtoField; ///< The prototype field from which fField is cloned
204 std::unique_ptr<RFieldBase> fField; ///< The field backing the RDF column
205 std::unique_ptr<RFieldBase::RValue> fValue; ///< The memory location used to read from fField
206 std::shared_ptr<void> fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources
207 Long64_t fLastEntry = -1; ///< Last entry number that was read
208 /// For chains, the logical entry and the physical entry in any particular file can be different.
209 /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the
210 /// corresponding data source was opened.
212
213public:
214 RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField) : fDataSource(ds), fProtoField(protoField) {}
215 ~RNTupleColumnReader() override = default;
216
217 /// Connect the field and its subfields to the page source
218 void Connect(RPageSource &source, Long64_t entryOffset)
219 {
220 assert(fLastEntry == -1);
221
222 fEntryOffset = entryOffset;
223
224 // Create a new, real field from the prototype and set its field ID in the context of the given page source
225 fField = fProtoField->Clone(fProtoField->GetFieldName());
226 {
227 auto descGuard = source.GetSharedDescriptorGuard();
228 // Set the on-disk field IDs for the field and the subfield
229 fField->SetOnDiskId(
230 descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(fProtoField->GetOnDiskId())));
231 auto iProto = fProtoField->cbegin();
232 auto iReal = fField->begin();
233 for (; iReal != fField->end(); ++iProto, ++iReal) {
234 iReal->SetOnDiskId(descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(iProto->GetOnDiskId())));
235 }
236 }
237
238 RFieldZero fieldZero;
240 fieldZero.Attach(std::move(fField));
241 try {
243 } catch (const ROOT::RException &) {
244 fField = std::move(fieldZero.ReleaseSubfields()[0]);
245 auto onDiskType = source.GetSharedDescriptorGuard()->GetFieldDescriptor(fField->GetOnDiskId()).GetTypeName();
246 std::string msg = "RNTupleDS: invalid type \"" + fField->GetTypeName() + "\" for column \"" +
247 fDataSource->fFieldId2QualifiedName[fField->GetOnDiskId()] + "\" with on-disk type \"" +
248 onDiskType + "\"";
249 throw std::runtime_error(msg);
250 }
251 fField = std::move(fieldZero.ReleaseSubfields()[0]);
252
253 if (fValuePtr) {
254 // When the reader reconnects to a new file, the fValuePtr is already set
255 fValue = std::make_unique<RFieldBase::RValue>(fField->BindValue(fValuePtr));
256 fValuePtr = nullptr;
257 } else {
258 // For the first file, create a new object for this field (reader)
259 fValue = std::make_unique<RFieldBase::RValue>(fField->CreateValue());
260 }
261 }
262
263 void Disconnect(bool keepValue)
264 {
265 if (fValue && keepValue) {
266 fValuePtr = fValue->GetPtr<void>();
267 }
268 fValue = nullptr;
269 fField = nullptr;
270 fLastEntry = -1;
271 }
272
273 void *GetImpl(Long64_t entry) final
274 {
275 if (entry != fLastEntry) {
276 fValue->Read(entry - fEntryOffset);
277 fLastEntry = entry;
278 }
279 return fValue->GetPtr<void>().get();
280 }
281};
282} // namespace ROOT::Internal::RDF
283
285
286void ROOT::RDF::RNTupleDS::AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName,
287 ROOT::DescriptorId_t fieldId, std::vector<RNTupleDS::RFieldInfo> fieldInfos,
288 bool convertToRVec)
289{
290 // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple
291 // using the following types and with a top-level field named "event" of type Event:
292 //
293 // struct Event {
294 // int id;
295 // std::vector<Track> tracks;
296 // };
297 // struct Track {
298 // std::vector<Hit> hits;
299 // };
300 // struct Hit {
301 // float x;
302 // float y;
303 // };
304 //
305 // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord).
306 // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the
307 // tree of sub fields and expose the following RDF columns:
308 //
309 // "event" [Event]
310 // "event.id" [int]
311 // "event.tracks" [RVec<Track>]
312 // "R_rdf_sizeof_event.tracks" [unsigned int]
313 // "event.tracks.hits" [RVec<RVec<Hit>>]
314 // "R_rdf_sizeof_event.tracks.hits" [RVec<unsigned int>]
315 // "event.tracks.hits.x" [RVec<RVec<float>>]
316 // "R_rdf_sizeof_event.tracks.hits.x" [RVec<unsigned int>]
317 // "event.tracks.hits.y" [RVec<RVec<float>>]
318 // "R_rdf_sizeof_event.tracks.hits.y" [RVec<unsigned int>]
319
320 const auto &fieldDesc = desc.GetFieldDescriptor(fieldId);
321 const auto &nRepetitions = fieldDesc.GetNRepetitions();
322 if ((fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) || (nRepetitions > 0)) {
323 // The field is a collection or a fixed-size array.
324 // We open a new collection scope with fieldID being the inner most collection. E.g. for "event.tracks.hits",
325 // fieldInfos would already contain the fieldID of "event.tracks"
326 fieldInfos.emplace_back(fieldId, nRepetitions);
327 }
328
329 if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) {
330 // Inner fields of collections are provided as projected collections of only that inner field,
331 // E.g. we provide a projected collection RVec<RVec<float>> for "event.tracks.hits.x" in the example
332 // above.
333 bool representableAsRVec =
334 convertToRVec && (fieldDesc.GetTypeName().substr(0, 19) == "ROOT::VecOps::RVec<" ||
335 fieldDesc.GetTypeName().substr(0, 12) == "std::vector<" || fieldDesc.GetTypeName() == "");
336 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
337 AddField(desc, colName, f.GetId(), fieldInfos, representableAsRVec);
338
339 // Note that at the end of the recursion, we handled the inner sub collections as well as the
340 // collection as whole, so we are done.
341 return;
342
343 } else if (nRepetitions > 0) {
344 // Fixed-size array, same logic as ROOT::RVec.
345 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
346 AddField(desc, colName, f.GetId(), fieldInfos);
347 return;
348 } else if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kRecord) {
349 // Inner fields of records are provided as individual RDF columns, e.g. "event.id"
350 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
351 auto innerName = colName.empty() ? f.GetFieldName() : (std::string(colName) + "." + f.GetFieldName());
352 // Inner fields of collections of records are always exposed as ROOT::RVec
353 AddField(desc, innerName, f.GetId(), fieldInfos);
354 }
355 }
356
357 // The fieldID could be the root field or the class of fieldId might not be loaded.
358 // In these cases, only the inner fields are exposed as RDF columns.
359 auto fieldOrException = ROOT::RFieldBase::Create(fieldDesc.GetFieldName(), fieldDesc.GetTypeName());
360 if (!fieldOrException)
361 return;
362 auto valueField = fieldOrException.Unwrap();
363 if (const auto cardinalityField = dynamic_cast<const ROOT::RCardinalityField *>(valueField.get())) {
364 // Cardinality fields in RDataFrame are presented as integers
365 if (cardinalityField->As32Bit()) {
366 valueField =
367 std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::uint32_t>>(fieldDesc.GetFieldName());
368 } else if (cardinalityField->As64Bit()) {
369 valueField =
370 std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::uint64_t>>(fieldDesc.GetFieldName());
371 } else {
372 R__ASSERT(false && "cardinality field stored with an unexpected integer type");
373 }
374 }
375 valueField->SetOnDiskId(fieldId);
376 for (auto &f : *valueField) {
377 f.SetOnDiskId(desc.FindFieldId(f.GetFieldName(), f.GetParent()->GetOnDiskId()));
378 }
379 std::unique_ptr<ROOT::RFieldBase> cardinalityField;
380 // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks")
381 if (!fieldInfos.empty()) {
382 const auto &info = fieldInfos.back();
383 const std::string name = "R_rdf_sizeof_" + desc.GetFieldDescriptor(info.fFieldId).GetFieldName();
384 if (info.fNRepetitions > 0) {
385 cardinalityField = std::make_unique<ROOT::Internal::RDF::RArraySizeField>(name, info.fNRepetitions);
386 } else {
387 cardinalityField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::size_t>>(name);
388 }
389 cardinalityField->SetOnDiskId(info.fFieldId);
390 }
391
392 for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) {
393 const auto &fieldInfo = *i;
394
395 const auto valueFieldName = valueField->GetFieldName();
396
397 if (fieldInfo.fNRepetitions > 0) {
398 // Fixed-size array, read it as ROOT::RVec in memory
399 valueField =
400 std::make_unique<ROOT::RArrayAsRVecField>(valueFieldName, valueField->Clone("_0"), fieldInfo.fNRepetitions);
401 } else {
402 // Actual collection. A std::vector or ROOT::RVec gets added as a ROOT::RVec. All other collection types keep
403 // their original type.
404 if (convertToRVec) {
405 valueField = std::make_unique<ROOT::RRVecField>(valueFieldName, valueField->Clone("_0"));
406 } else {
407 auto outerFieldType = desc.GetFieldDescriptor(fieldInfo.fFieldId).GetTypeName();
408 valueField = ROOT::RFieldBase::Create(valueFieldName, outerFieldType).Unwrap();
409 }
410 }
411
412 valueField->SetOnDiskId(fieldInfo.fFieldId);
413
414 // Skip the inner-most collection level to construct the cardinality column
415 // It's taken care of by the `if (!fieldInfos.empty())` scope above
416 if (i != fieldInfos.rbegin()) {
417 const auto cardinalityFieldName = cardinalityField->GetFieldName();
418 if (fieldInfo.fNRepetitions > 0) {
419 // This collection level refers to a fixed-size array
420 cardinalityField = std::make_unique<ROOT::RArrayAsRVecField>(
421 cardinalityFieldName, cardinalityField->Clone("_0"), fieldInfo.fNRepetitions);
422 } else {
423 // This collection level refers to an RVec
424 cardinalityField = std::make_unique<ROOT::RRVecField>(cardinalityFieldName, cardinalityField->Clone("_0"));
425 }
426
427 cardinalityField->SetOnDiskId(fieldInfo.fFieldId);
428 }
429 }
430
431 if (cardinalityField) {
432 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
433 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
434 fProtoFields.emplace_back(std::move(cardinalityField));
435 }
436
437 fieldInfos.emplace_back(fieldId, nRepetitions);
438 fColumnNames.emplace_back(colName);
439 fColumnTypes.emplace_back(valueField->GetTypeName());
440 fProtoFields.emplace_back(std::move(valueField));
441}
442
443ROOT::RDF::RNTupleDS::RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource)
444{
445 pageSource->Attach();
446 fPrincipalDescriptor = pageSource->GetSharedDescriptorGuard()->Clone();
447 fStagingArea.emplace_back(std::move(pageSource));
448
449 AddField(fPrincipalDescriptor, "", fPrincipalDescriptor.GetFieldZeroId(),
450 std::vector<ROOT::RDF::RNTupleDS::RFieldInfo>());
451
452 auto topLevelFields = fPrincipalDescriptor.GetTopLevelFields();
453 const auto nTopLevelFields = std::distance(topLevelFields.begin(), topLevelFields.end());
454 fTopLevelFieldNames.reserve(nTopLevelFields);
455 for (const auto &field : topLevelFields)
456 fTopLevelFieldNames.push_back(field.GetFieldName());
457}
458
459namespace {
460
461const ROOT::RNTupleReadOptions &GetOpts()
462{
463 // The setting is for now a global one, must be decided before running the
464 // program by setting the appropriate environment variable. Make sure that
465 // option configuration is thread-safe and happens only once.
466 static ROOT::RNTupleReadOptions opts;
467 static std::once_flag flag;
468 std::call_once(flag, []() {
469 if (auto env = gSystem->Getenv("ROOT_RNTUPLE_CLUSTERBUNCHSIZE"); env != nullptr && strlen(env) > 0) {
470 std::string envStr{env};
471 auto envNum{std::stoul(envStr)};
472 envNum = envNum == 0 ? 1 : envNum;
474 }
475 });
476 return opts;
477}
478
479std::unique_ptr<ROOT::Internal::RPageSource> CreatePageSource(std::string_view ntupleName, std::string_view fileName)
480{
481 return ROOT::Internal::RPageSource::Create(ntupleName, fileName, GetOpts());
482}
483} // namespace
484
485ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
486 : RNTupleDS(CreatePageSource(ntupleName, fileName))
487{
488 fNTupleName = ntupleName;
489 fFileNames = std::vector<std::string>{std::string{fileName}};
490}
491
492ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames)
493 : RNTupleDS(CreatePageSource(ntupleName, fileNames[0]))
494{
495 fNTupleName = ntupleName;
496 fFileNames = fileNames;
497 fStagingArea.resize(fFileNames.size());
498}
499
500ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames,
501 const std::pair<ULong64_t, ULong64_t> &range)
502 : RNTupleDS(ntupleName, fileNames)
503{
504 fGlobalEntryRange = range;
505}
506
508ROOT::RDF::RNTupleDS::GetColumnReadersImpl(std::string_view /* name */, const std::type_info & /* ti */)
509{
510 // This datasource uses the newer GetColumnReaders() API
511 return {};
512}
513
514ROOT::RFieldBase *ROOT::RDF::RNTupleDS::GetFieldWithTypeChecks(std::string_view fieldName, const std::type_info &tid)
515{
516 // At this point we can assume that `name` will be found in fColumnNames
517 const auto index =
518 std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), fieldName));
519
520 // A reader was requested but we don't have RTTI for it, this is encoded with the tag UseNativeDataType. We can just
521 // return the available protofield
522 if (tid == typeid(ROOT::Internal::RDF::UseNativeDataType)) {
523 return fProtoFields[index].get();
524 }
525
526 // The user explicitly requested a type
528
529 // If the requested type is different from the proto field that was created when the data source was constructed,
530 // we first have to create an alternative proto field for the column reader.
531 // Otherwise, we can directly use the existing proto field.
532 if (requestedType != fColumnTypes[index]) {
533 auto &altProtoFields = fAlternativeProtoFields[index];
534
535 // If we can find the requested type in the registered alternative protofields, return the corresponding field
536 if (auto altProtoField = std::find_if(altProtoFields.begin(), altProtoFields.end(),
537 [&requestedType](const std::unique_ptr<ROOT::RFieldBase> &fld) {
538 return fld->GetTypeName() == requestedType;
539 });
540 altProtoField != altProtoFields.end()) {
541 return altProtoField->get();
542 }
543
544 // Otherwise, create a new protofield and register it in the alternatives before returning
545 std::unique_ptr<RFieldBase> newAltProtoField;
546 const std::string strName = std::string(fieldName);
547 if (dynamic_cast<ROOT::Internal::RDF::RRDFCardinalityFieldBase *>(fProtoFields[index].get())) {
548 if (requestedType == "bool") {
549 newAltProtoField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<bool>>(strName);
550 } else if (requestedType == "char") {
551 newAltProtoField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<char>>(strName);
552 } else if (requestedType == "std::int8_t") {
553 newAltProtoField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::int8_t>>(strName);
554 } else if (requestedType == "std::uint8_t") {
555 newAltProtoField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::uint8_t>>(strName);
556 } else if (requestedType == "std::int16_t") {
557 newAltProtoField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::int16_t>>(strName);
558 } else if (requestedType == "std::uint16_t") {
559 newAltProtoField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::uint16_t>>(strName);
560 } else if (requestedType == "std::int32_t") {
561 newAltProtoField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::int32_t>>(strName);
562 } else if (requestedType == "std::uint32_t") {
563 newAltProtoField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::uint32_t>>(strName);
564 } else if (requestedType == "std::int64_t") {
565 newAltProtoField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::int64_t>>(strName);
566 } else if (requestedType == "std::uint64_t") {
567 newAltProtoField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField<std::uint64_t>>(strName);
568 } else {
569 throw std::runtime_error("RNTupleDS: Could not create field with type \"" + requestedType +
570 "\" for column \"" + std::string(fieldName) + "\"");
571 }
572 } else {
573 auto newAltProtoFieldOrException = ROOT::RFieldBase::Create(strName, requestedType);
574 if (!newAltProtoFieldOrException) {
575 throw std::runtime_error("RNTupleDS: Could not create field with type \"" + requestedType +
576 "\" for column \"" + std::string(fieldName) + "\"");
577 }
578 newAltProtoField = newAltProtoFieldOrException.Unwrap();
579 }
580 newAltProtoField->SetOnDiskId(fProtoFields[index]->GetOnDiskId());
581 auto *newField = newAltProtoField.get();
582 altProtoFields.emplace_back(std::move(newAltProtoField));
583 return newField;
584 }
585
586 // General case: there was a correspondence between the user-requested type and the corresponding column type
587 return fProtoFields[index].get();
588}
589
590std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
591ROOT::RDF::RNTupleDS::GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info &tid)
592{
594 assert(field != nullptr);
595
596 // Map the field's and subfields' IDs to qualified names so that we can later connect the fields to
597 // other page sources from the chain
598 fFieldId2QualifiedName[field->GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(field->GetOnDiskId());
599 for (const auto &s : *field) {
600 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(s.GetOnDiskId());
601 }
602
603 auto reader = std::make_unique<ROOT::Internal::RDF::RNTupleColumnReader>(this, field);
604 fActiveColumnReaders[slot].emplace_back(reader.get());
605
606 return reader;
607}
608
609void ROOT::RDF::RNTupleDS::ExecStaging()
610{
611 while (true) {
612 std::unique_lock lock(fMutexStaging);
613 fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
614 if (fStagingThreadShouldTerminate)
615 return;
616
617 assert(!fHasNextSources);
618 StageNextSources();
619 fHasNextSources = true;
620 fIsReadyForStaging = false;
621
622 lock.unlock();
623 fCvStaging.notify_one();
624 }
625}
626
628{
629 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
630
631 for (auto i = fNextFileIndex; (i < nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
632
634 return;
635
636 if (fStagingArea[i]) {
637 // The first file is already open and was used to read the schema
638 assert(i == 0);
639 } else {
640 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
641 fStagingArea[i]->LoadStructure();
642 }
643 }
644}
645
647{
648 assert(fNextRanges.empty());
649
650 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
651 auto nRemainingFiles = nFiles - fNextFileIndex;
652
653 if (nRemainingFiles == 0)
654 return;
655
656 // Easy work scheduling: one file per slot. We skip empty files (files without entries).
657
658 if ((nRemainingFiles >= fNSlots) || (fGlobalEntryRange.has_value())) {
659 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles)) {
660 REntryRangeDS range;
661
662 std::swap(fStagingArea[fNextFileIndex], range.fSource);
663
664 if (!range.fSource) {
665 // Typically, the prestaged source should have been present. Only if some of the files are empty, we need
666 // to open and attach files here.
667 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
668 }
670 range.fSource->Attach();
672 auto nEntries = range.fSource->GetNEntries();
673 if (nEntries == 0)
674 continue;
675 range.fLastEntry = nEntries; // whole file per slot, i.e. entry range [0..nEntries - 1]
676
677 fNextRanges.emplace_back(std::move(range));
678 }
679 return;
680 }
681
682 // Work scheduling of the tail: multiple slots work on the same file.
683 // Every slot still has its own page source but these page sources may open the same file.
684 // Again, we need to skip empty files.
685 unsigned int nSlotsPerFile = fNSlots / nRemainingFiles;
686 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles); ++i) {
687 std::unique_ptr<ROOT::Internal::RPageSource> source;
688 // Need to look for the file name to populate the sample info later
689 const auto &sourceFileName = fFileNames[fNextFileIndex];
690 std::swap(fStagingArea[fNextFileIndex], source);
691 if (!source) {
692 // Empty files trigger this condition
693 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
694 }
695 source->Attach();
697
698 auto nEntries = source->GetNEntries();
699 if (nEntries == 0)
700 continue;
701
702 // If last file: use all remaining slots
703 if (i == (nRemainingFiles - 1))
704 nSlotsPerFile = fNSlots - fNextRanges.size();
705
706 const auto rangesByCluster = [&source]() {
707 // Take the shared lock of the descriptor just for the time necessary
708 const auto descGuard = source->GetSharedDescriptorGuard();
709 return ROOT::Internal::GetClusterBoundaries(descGuard.GetRef());
710 }();
711
712 const unsigned int nRangesByCluster = rangesByCluster.size();
713
714 // Distribute slots equidistantly over the entry range, aligned on cluster boundaries
715 const auto nClustersPerSlot = nRangesByCluster / nSlotsPerFile;
716 const auto remainder = nRangesByCluster % nSlotsPerFile;
717 std::size_t iRange = 0;
718 unsigned int iSlot = 0;
719 const unsigned int N = std::min(nSlotsPerFile, nRangesByCluster);
720 for (; iSlot < N; ++iSlot) {
721 auto start = rangesByCluster[iRange].fFirstEntry;
722 iRange += nClustersPerSlot + static_cast<int>(iSlot < remainder);
723 assert(iRange > 0);
724 auto end = rangesByCluster[iRange - 1].fLastEntryPlusOne;
725
726 REntryRangeDS range;
727 range.fFileName = sourceFileName;
728 // The last range for this file just takes the already opened page source. All previous ranges clone.
729 if (iSlot == N - 1) {
730 range.fSource = std::move(source);
731 } else {
732 range.fSource = source->Clone();
733 }
734 range.fSource->SetEntryRange({start, end - start});
735 range.fFirstEntry = start;
736 range.fLastEntry = end;
737 fNextRanges.emplace_back(std::move(range));
738 }
739 } // loop over tail of remaining files
740}
741
742std::vector<std::pair<ULong64_t, ULong64_t>> ROOT::RDF::RNTupleDS::GetEntryRanges()
743{
744 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
745
746 // We need to distinguish between single threaded and multi-threaded runs.
747 // In single threaded mode, InitSlot is only called once and column readers have to be rewired
748 // to new page sources of the chain in GetEntryRanges. In multi-threaded mode, on the other hand,
749 // InitSlot is called for every returned range, thus rewiring the column readers takes place in
750 // InitSlot and FinalizeSlot.
751
752 if (fNSlots == 1) {
753 for (auto r : fActiveColumnReaders[0]) {
754 r->Disconnect(true /* keepValue */);
755 }
756 }
757
758 // If we have fewer files than slots and we run multiple event loops, we can reuse fCurrentRanges and don't need
759 // to worry about loading the fNextRanges. I.e., in this case we don't enter the if block.
760 if (fCurrentRanges.empty() || fSeenEntriesNoGlobalRange > 0) {
761 // Otherwise, i.e. start of the first event loop or in the middle of the event loop, prepare the next ranges
762 // and swap with the current ones.
763 {
764 std::unique_lock lock(fMutexStaging);
765 fCvStaging.wait(lock, [this] { return fHasNextSources; });
766 }
768 if (fNextRanges.empty()) {
769 // No more data
770 return ranges;
771 }
772
773 assert(fNextRanges.size() <= fNSlots);
774
775 fCurrentRanges.clear();
776 std::swap(fCurrentRanges, fNextRanges);
777 }
778
779 // Stage next batch of files for the next call to GetEntryRanges()
780 {
781 std::lock_guard _(fMutexStaging);
782 fIsReadyForStaging = true;
783 fHasNextSources = false;
784 }
785 fCvStaging.notify_one();
786
787 // Create ranges for the RDF loop manager from the list of REntryRangeDS records.
788 // The entry ranges that are relative to the page source in REntryRangeDS are translated into absolute
789 // entry ranges, given the current state of the entry cursor.
790 // We remember the connection from first absolute entry index of a range to its REntryRangeDS record
791 // so that we can properly rewire the column reader in InitSlot
792 fFirstEntry2RangeIdx.clear();
793 fOriginalRanges.clear();
794
795 ULong64_t nEntriesPerSource = 0;
796
797 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
798
799 // Several consecutive ranges may operate on the same file (each with their own page source clone).
800 // We can detect a change of file when the first entry number jumps back to 0.
801 if (fCurrentRanges[i].fFirstEntry == 0) {
802 // New source
803 fSeenEntriesNoGlobalRange += nEntriesPerSource;
804 nEntriesPerSource = 0;
805 }
806
807 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntriesNoGlobalRange;
808 auto end = fCurrentRanges[i].fLastEntry + fSeenEntriesNoGlobalRange;
809
810 nEntriesPerSource += end - start;
811
812 if (fGlobalEntryRange.has_value()) {
813
814 // We need to consider different scenarios for when we have GlobalRanges set by the user.
815 // Consider a simple case of 3 files, with original ranges set as (consecutive entries of 3 files):
816 // [0, 20], [20, 45], [45, 65]
817 // we will now see what happens in each of the scenarios below when GlobalRanges can be set to different
818 // values:
819 // a) [2, 5] - we stay in file 1
820 // - hence we will use the 1st case and get the range [2,5], in this case we also need to quit further
821 // processing from the other files by entering case 3
822 // b) [2, 21] - we start in file 1 and finish in file 2
823 // - use the 2nd case first, as 21 > 20 (end of first file), then we will go to case 1, resulting in ranges:
824 // [2, 20], [20, 21], c) [21 - 40] - we skip file 1, start in file 2 and stay in file 2
825 // - to skip the first file, we use the 4th case, followed by the 1st case, resulting range is: [21, 40]
826 // d) [21 - 65] - we skip file 1, start in file 2 and continue to file 3
827 // - to skip the first file, we use the 4th case, we continue with the 2nd case, and use the 1st case at the
828 // end, resulting ranges are [21, 45], [45, 65]
829 // The first case
830 if (fGlobalEntryRange->first >= start && fGlobalEntryRange->second <= end) {
831 fOriginalRanges.emplace_back(start, end);
833 ranges.emplace_back(fGlobalEntryRange->first, fGlobalEntryRange->second);
834 }
835
836 // The second case:
837 // The `fGlobalEntryRange->first < end` condition is to distinguish this case from the 4th case.
838 else if (fGlobalEntryRange->second > end && fGlobalEntryRange->first < end) {
839 fOriginalRanges.emplace_back(start, end);
841 ranges.emplace_back(fGlobalEntryRange->first, end);
842 std::optional<std::pair<ULong64_t, ULong64_t>> newvalues({end, fGlobalEntryRange->second});
843 fGlobalEntryRange.swap(newvalues);
844 }
845 // The third case, needed to correctly quit processing if we only stay in the first file
846 else if (fGlobalEntryRange->second < start) {
847 return ranges;
848 }
849
850 // The fourth case:
851 else if (fGlobalEntryRange->first >= end) {
852 fOriginalRanges.emplace_back(start, end);
854 ranges.emplace_back(start, start);
855 }
856 }
857
858 else {
860 fOriginalRanges.emplace_back(start, end);
861 ranges.emplace_back(start, end);
862 }
863 }
864
865 fSeenEntriesNoGlobalRange += nEntriesPerSource;
866
867 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
868 for (auto r : fActiveColumnReaders[0]) {
869 r->Connect(*fCurrentRanges[0].fSource, fOriginalRanges[0].first);
870 }
871 }
872
873 return ranges;
874}
875
876void ROOT::RDF::RNTupleDS::InitSlot(unsigned int slot, ULong64_t firstEntry)
877{
878 if (fNSlots == 1) {
879 // Ensure the connection between slot and range is valid also in single-thread mode
880 fSlotsToRangeIdxs[0] = 0;
881 return;
882 }
883
884 // The same slot ID could be picked multiple times in the same execution, thus
885 // ending up processing different page sources. Here we re-establish the
886 // connection between the slot and the correct page source by finding which
887 // range index corresponds to the first entry passed.
888 auto idxRange = fFirstEntry2RangeIdx.at(firstEntry);
889
890 // We also remember this connection so it can later be retrieved in CreateSampleInfo
892
893 for (auto r : fActiveColumnReaders[slot]) {
894 r->Connect(*fCurrentRanges[idxRange].fSource,
895 fOriginalRanges[idxRange].first - fCurrentRanges[idxRange].fFirstEntry);
896 }
897}
898
900{
901 if (fNSlots == 1)
902 return;
903
904 for (auto r : fActiveColumnReaders[slot]) {
905 r->Disconnect(true /* keepValue */);
906 }
907}
908
909std::string ROOT::RDF::RNTupleDS::GetTypeName(std::string_view colName) const
910{
911 auto colNamePos = std::find(fColumnNames.begin(), fColumnNames.end(), colName);
912
913 if (colNamePos == fColumnNames.end()) {
914 auto msg = std::string("RNTupleDS: There is no column with name \"") + std::string(colName) + "\"";
915 throw std::runtime_error(msg);
916 }
917
918 const auto index = std::distance(fColumnNames.begin(), colNamePos);
919 return fColumnTypes[index];
920}
921
922bool ROOT::RDF::RNTupleDS::HasColumn(std::string_view colName) const
923{
924 return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end();
925}
926
928{
930 fNextFileIndex = 0;
932 fThreadStaging = std::thread(&RNTupleDS::ExecStaging, this);
933 assert(fNextRanges.empty());
934
935 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
936 // First event loop or large number of files: start the staging process.
937 {
938 std::lock_guard _(fMutexStaging);
939 fIsReadyForStaging = true;
940 }
941 fCvStaging.notify_one();
942 } else {
943 // Otherwise, we will reuse fCurrentRanges. Make sure that staging and preparing next ranges will be a noop
944 // (already at the end of the list of files).
945 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
946 }
947}
948
950{
951 for (unsigned int i = 0; i < fNSlots; ++i) {
952 for (auto r : fActiveColumnReaders[i]) {
953 r->Disconnect(false /* keepValue */);
954 }
955 }
956 {
957 std::lock_guard _(fMutexStaging);
959 }
960 fCvStaging.notify_one();
961 fThreadStaging.join();
962 // If we have a chain with more files than the number of slots, the files opened at the end of the
963 // event loop won't be reused when the event loop restarts, so we can close them.
964 if (fFileNames.size() > fNSlots) {
965 fCurrentRanges.clear();
966 fNextRanges.clear();
967 fStagingArea.clear();
968 fStagingArea.resize(fFileNames.size());
969 }
970}
971
972void ROOT::RDF::RNTupleDS::SetNSlots(unsigned int nSlots)
973{
974 assert(fNSlots == 0);
975 assert(nSlots > 0);
976 fNSlots = nSlots;
979}
980
981ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, std::string_view fileName)
982{
983 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileName));
984}
985
986ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames)
987{
988 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileNames));
989}
990
991ROOT::RDF::RSampleInfo ROOT::Internal::RDF::RNTupleDS::CreateSampleInfo(
992 unsigned int slot, const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &sampleMap) const
993{
994 // The same slot ID could be picked multiple times in the same execution, thus
995 // ending up processing different page sources. Here we re-establish the
996 // connection between the slot and the correct page source by retrieving
997 // which range is connected currently to the slot
998
999 const auto &rangeIdx = fSlotsToRangeIdxs.at(slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
1000
1001 // Missing source if a file does not exist
1002 if (!fCurrentRanges[rangeIdx].fSource)
1003 return ROOT::RDF::RSampleInfo{};
1004
1005 const auto &ntupleName = fCurrentRanges[rangeIdx].fSource->GetNTupleName();
1006 const auto &ntuplePath = fCurrentRanges[rangeIdx].fFileName;
1007 const auto ntupleID = std::string(ntuplePath) + '/' + ntupleName;
1008
1009 if (sampleMap.empty())
1011 ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry), nullptr,
1012 fPrincipalDescriptor.GetNEntries());
1013
1014 if (sampleMap.find(ntupleID) == sampleMap.end())
1015 throw std::runtime_error("Full sample identifier '" + ntupleID + "' cannot be found in the available samples.");
1016
1018 ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry),
1019 sampleMap.at(ntupleID), fPrincipalDescriptor.GetNEntries());
1020}
1021
1023 const ROOT::RDF::ColumnNames_t &fileNames,
1024 const std::pair<ULong64_t, ULong64_t> &range)
1025{
1026 std::unique_ptr<ROOT::RDF::RNTupleDS> ds{new ROOT::RDF::RNTupleDS(ntupleName, fileNames, range)};
1027 return ROOT::RDataFrame(std::move(ds));
1028}
1029
1030std::pair<std::vector<ROOT::Internal::RNTupleClusterBoundaries>, ROOT::NTupleSize_t>
1031ROOT::Internal::RDF::GetClustersAndEntries(std::string_view ntupleName, std::string_view location)
1032{
1033 auto source = ROOT::Internal::RPageSource::Create(ntupleName, location);
1034 source->Attach();
1035 const auto descGuard = source->GetSharedDescriptorGuard();
1036 return std::make_pair(ROOT::Internal::GetClusterBoundaries(descGuard.GetRef()), descGuard->GetNEntries());
1037}
ROOT::R::TRInterface & r
Definition Object.C:4
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define f(i)
Definition RSha256.hxx:104
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
start
Definition Rotated.cxx:223
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
#define N
char name[80]
Definition TGX11.cxx:148
extern TSystem * gSystem
Definition TSystem.h:582
#define _(A, B)
Definition cfortran.h:108
void GenerateColumns(const ROOT::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
void ReadGlobalImpl(ROOT::NTupleSize_t, void *to) final
RArraySizeField(const RArraySizeField &other)=delete
RArraySizeField(RArraySizeField &&other)=default
RArraySizeField & operator=(RArraySizeField &&other)=default
RArraySizeField(std::string_view name, std::size_t arrayLength)
void ReadInClusterImpl(RNTupleLocalIndex, void *to) final
void ReconcileOnDiskField(const RNTupleDescriptor &) final
For non-artificial fields, check compatibility of the in-memory field and the on-disk field.
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RArraySizeField & operator=(const RArraySizeField &other)=delete
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view newName) const final
Called by Clone(), which additionally copies the on-disk ID.
ROOT::Internal::RPageSource RPageSource
void * GetImpl(Long64_t entry) final
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RNTupleDS * fDataSource
The data source that owns this column reader.
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Long64_t fLastEntry
Last entry number that was read.
void ReconcileOnDiskField(const RNTupleDescriptor &) final
For non-artificial fields, check compatibility of the in-memory field and the on-disk field.
Definition RNTupleDS.cxx:57
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:65
RRDFCardinalityFieldBase(std::string_view name, std::string_view type)
Definition RNTupleDS.cxx:59
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
Definition RNTupleDS.cxx:78
void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:66
RRDFCardinalityFieldBase(const RRDFCardinalityFieldBase &other)=delete
RRDFCardinalityFieldBase(RRDFCardinalityFieldBase &&other)=default
RRDFCardinalityFieldBase & operator=(const RRDFCardinalityFieldBase &other)=delete
RRDFCardinalityFieldBase & operator=(RRDFCardinalityFieldBase &&other)=default
RRDFCardinalityField(RRDFCardinalityField &&other)=default
void CheckSize(ROOT::NTupleSize_t size) const
Definition RNTupleDS.cxx:99
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RRDFCardinalityField & operator=(const RRDFCardinalityField &other)=delete
void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
Get the number of elements of the collection identified by clusterIndex.
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RRDFCardinalityField(const RRDFCardinalityField &other)=delete
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view newName) const final
Called by Clone(), which additionally copies the on-disk ID.
RRDFCardinalityField(std::string_view name)
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
static void SetClusterBunchSize(RNTupleReadOptions &options, unsigned int val)
Abstract interface to read data from an ntuple.
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
Takes the read lock for the descriptor.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location).
std::vector< void * > Record_t
std::optional< std::pair< ULong64_t, ULong64_t > > fGlobalEntryRange
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:82
bool fHasNextSources
Is true when the staging thread has populated the next batch of files to fStagingArea.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
std::vector< REntryRangeDS > fNextRanges
Basis for the ranges populated by the PrepareNextRanges() call.
std::unordered_map< ULong64_t, std::size_t > fFirstEntry2RangeIdx
Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index...
bool fStagingThreadShouldTerminate
Is true when the I/O thread should quit.
std::vector< std::vector< ROOT::Internal::RDF::RNTupleColumnReader * > > fActiveColumnReaders
List of column readers returned by GetColumnReaders() organized by slot.
std::vector< std::unique_ptr< ROOT::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
std::vector< std::pair< ULong64_t, ULong64_t > > fOriginalRanges
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
std::vector< std::size_t > fSlotsToRangeIdxs
One element per slot, corresponding to the current range index for that slot, as filled by InitSlot.
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
ROOT::RFieldBase * GetFieldWithTypeChecks(std::string_view fieldName, const std::type_info &tid)
std::vector< std::string > fFileNames
bool fIsReadyForStaging
Is true when the staging thread should start working.
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
ROOT::RNTupleDescriptor fPrincipalDescriptor
A clone of the first pages source's descriptor.
Definition RNTupleDS.hxx:97
ULong64_t fSeenEntriesNoGlobalRange
The number of entries seen so far in GetEntryRanges().
std::vector< REntryRangeDS > fCurrentRanges
Basis for the ranges returned by the last GetEntryRanges() call.
std::vector< std::string > fTopLevelFieldNames
RNTupleDS(std::unique_ptr< ROOT::Internal::RPageSource > pageSource)
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
std::size_t fNextFileIndex
Index into fFileNames to the next file to process.
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
void StageNextSources()
Provides the RDF column "colName" given the field identified by fieldID.
std::condition_variable fCvStaging
Signal for the state information of fIsReadyForStaging and fHasNextSources.
void Finalize() final
Convenience method called after concluding an event-loop.
std::thread fThreadStaging
The background thread that runs StageNextSources().
std::mutex fMutexStaging
Protects the shared state between the main thread and the I/O thread.
std::unordered_map< std::size_t, std::vector< std::unique_ptr< ROOT::RFieldBase > > > fAlternativeProtoFields
Columns may be requested with types other than with which they were initially added as proto fields.
std::vector< std::string > fColumnTypes
void Initialize() final
Convenience method called before starting an event-loop.
std::vector< std::string > fColumnNames
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
The list of column representations a field can have.
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
const std::string & GetFieldName() const
RFieldBase(std::string_view name, std::string_view type, ROOT::ENTupleStructure structure, bool isSimple, std::size_t nRepetitions=0)
The constructor creates the underlying column objects and connects them to either a sink or a source.
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId)
Factory method to resurrect a field from the stored on-disk type information.
ROOT::DescriptorId_t GetOnDiskId() const
void GenerateColumnsImpl()
For writing, use the currently set column representative.
std::uint64_t GetNRepetitions() const
const std::string & GetFieldName() const
const std::string & GetTypeName() const
The container field for an ntuple model, which itself has no physical representation.
Definition RField.hxx:58
std::vector< std::unique_ptr< RFieldBase > > ReleaseSubfields()
Moves all subfields into the returned vector.
Definition RField.cxx:64
void Attach(std::unique_ptr< RFieldBase > child)
A public version of the Attach method that allows piece-wise construction of the zero field.
Definition RField.cxx:40
The on-storage metadata of an RNTuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Common user-tunable settings for reading RNTuples.
std::string TypeID2TypeName(const std::type_info &id)
Returns the name of a type starting from its type_info An empty string is returned in case of failure...
Definition RDFUtils.cxx:191
ROOT::RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector< std::string > &fileNames, const std::pair< ULong64_t, ULong64_t > &range)
Internal overload of the function that allows passing a range of entries.
std::pair< std::vector< ROOT::Internal::RNTupleClusterBoundaries >, ROOT::NTupleSize_t > GetClustersAndEntries(std::string_view ntupleName, std::string_view location)
Retrieves the cluster boundaries and the number of entries for the input RNTuple.
constexpr std::size_t CacheLineStep()
Stepping through CacheLineStep<T> values in a vector<T> brings you to a new cache line.
Definition Utils.hxx:225
void SetAllowFieldSubstitutions(RFieldZero &fieldZero, bool val)
Definition RField.cxx:35
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &)
std::vector< ROOT::Internal::RNTupleClusterBoundaries > GetClusterBoundaries(const RNTupleDescriptor &desc)
Return the cluster boundaries for each cluster in this RNTuple.
std::string GetRenormalizedTypeName(const std::string &metaNormalizedName)
Given a type name normalized by ROOT meta, renormalize it for RNTuple. E.g., insert std::prefix.
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
std::vector< std::string > ColumnNames_t
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
ENTupleStructure
The fields in the RNTuple data model tree can carry different structural information about the type s...
Tag to let data sources use the native data type when creating a column reader.
Definition Utils.hxx:347
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:88
std::string_view fFileName
Storage location of the current RNTuple.
Definition RNTupleDS.hxx:93
ULong64_t fLastEntry
End entry index in fSource, e.g. the number of entries in the range is fLastEntry - fFirstEntry.
Definition RNTupleDS.hxx:92
std::unique_ptr< ROOT::Internal::RPageSource > fSource
Definition RNTupleDS.hxx:89
ULong64_t fFirstEntry
First entry index in fSource.
Definition RNTupleDS.hxx:90