Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleDS.cxx
Go to the documentation of this file.
1/// \file RNTupleDS.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
15#include <ROOT/RDataFrame.hxx>
16#include <ROOT/RDF/Utils.hxx>
17#include <ROOT/RField.hxx>
18#include <ROOT/RFieldUtils.hxx>
21#include <ROOT/RNTupleDS.hxx>
22#include <ROOT/RNTupleTypes.hxx>
23#include <ROOT/RPageStorage.hxx>
24#include <string_view>
25
26#include <TError.h>
27#include <TSystem.h>
28
29#include <cassert>
30#include <memory>
31#include <mutex>
32#include <string>
33#include <vector>
34#include <typeinfo>
35#include <utility>
36
37// clang-format off
38/**
39* \class ROOT::RDF::RNTupleDS
40* \ingroup dataframe
41* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data.
42*
43* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
44*
* For each column containing an array or a collection, a corresponding column `R_rdf_sizeof_colname` (also
* available as `#colname`) gives access to `colname.size()` without reading and deserializing the collection values.
47*
48**/
49// clang-format on
50
51namespace ROOT::Internal::RDF {
52/// An artificial field that transforms an RNTuple column that contains the offset of collections into
53/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g.
54/// `R_rdf_sizeof_jets` for a collection named `jets`.
55///
56/// This field owns the collection offset field but instead of exposing the collection offsets it exposes
57/// the collection sizes (offset(N+1) - offset(N)). For the time being, we offer this functionality only in RDataFrame.
58/// TODO(jblomer): consider providing a general set of useful virtual fields as part of RNTuple.
60protected:
61 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view /* newName */) const final
62 {
63 return std::make_unique<RRDFCardinalityField>();
64 }
65 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
66
67 // We construct these fields and know that they match the page source
69
70public:
71 RRDFCardinalityField() : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kPlain, false /* isSimple */) {}
74 ~RRDFCardinalityField() override = default;
75
85 // Field is only used for reading
86 void GenerateColumns() final { throw RException(R__FAIL("Cardinality fields must only be used for reading")); }
91
92 size_t GetValueSize() const final { return sizeof(std::size_t); }
93 size_t GetAlignment() const final { return alignof(std::size_t); }
94
95 /// Get the number of elements of the collection identified by globalIndex
103
104 /// Get the number of elements of the collection identified by clusterIndex
112};
113
114/**
115 * @brief An artificial field that provides the size of a fixed-size array
116 *
117 * This is the implementation of `R_rdf_sizeof_column` in case `column` contains
118 * fixed-size arrays on disk.
119 */
121private:
122 std::size_t fArrayLength;
123
124 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view) const final
125 {
126 return std::make_unique<RArraySizeField>(fArrayLength);
127 }
128 void GenerateColumns() final { throw RException(R__FAIL("RArraySizeField fields must only be used for reading")); }
130 void ReadGlobalImpl(ROOT::NTupleSize_t /*globalIndex*/, void *to) final
131 {
132 *static_cast<std::size_t *>(to) = fArrayLength;
133 }
134 void ReadInClusterImpl(RNTupleLocalIndex /*localIndex*/, void *to) final
135 {
136 *static_cast<std::size_t *>(to) = fArrayLength;
137 }
138
139 // We construct these fields and know that they match the page source
141
142public:
144 : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kPlain, false /* isSimple */),
146 {
147 }
153
154 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
155 std::size_t GetValueSize() const final { return sizeof(std::size_t); }
156 std::size_t GetAlignment() const final { return alignof(std::size_t); }
157};
158
159/// Every RDF column is represented by exactly one RNTuple field
163
164 RNTupleDS *fDataSource; ///< The data source that owns this column reader
165 RFieldBase *fProtoField; ///< The prototype field from which fField is cloned
166 std::unique_ptr<RFieldBase> fField; ///< The field backing the RDF column
167 std::unique_ptr<RFieldBase::RValue> fValue; ///< The memory location used to read from fField
168 std::shared_ptr<void> fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources
169 Long64_t fLastEntry = -1; ///< Last entry number that was read
170 /// For chains, the logical entry and the physical entry in any particular file can be different.
171 /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the
172 /// corresponding data source was opened.
174
175public:
177 ~RNTupleColumnReader() override = default;
178
179 /// Connect the field and its subfields to the page source
181 {
182 assert(fLastEntry == -1);
183
185
186 // Create a new, real field from the prototype and set its field ID in the context of the given page source
188 {
189 auto descGuard = source.GetSharedDescriptorGuard();
190 // Set the on-disk field IDs for the field and the subfield
191 fField->SetOnDiskId(
193 auto iProto = fProtoField->cbegin();
194 auto iReal = fField->begin();
195 for (; iReal != fField->end(); ++iProto, ++iReal) {
196 iReal->SetOnDiskId(descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(iProto->GetOnDiskId())));
197 }
198 }
199
202 fieldZero.Attach(std::move(fField));
203 try {
205 } catch (const ROOT::RException &err) {
206 fField = std::move(fieldZero.ReleaseSubfields()[0]);
207 auto onDiskType = source.GetSharedDescriptorGuard()->GetFieldDescriptor(fField->GetOnDiskId()).GetTypeName();
208 std::string msg = "RNTupleDS: invalid type \"" + fField->GetTypeName() + "\" for column \"" +
209 fDataSource->fFieldId2QualifiedName[fField->GetOnDiskId()] + "\" with on-disk type \"" +
210 onDiskType + "\"";
211 throw std::runtime_error(msg);
212 }
213 fField = std::move(fieldZero.ReleaseSubfields()[0]);
214
215 if (fValuePtr) {
216 // When the reader reconnects to a new file, the fValuePtr is already set
217 fValue = std::make_unique<RFieldBase::RValue>(fField->BindValue(fValuePtr));
218 fValuePtr = nullptr;
219 } else {
220 // For the first file, create a new object for this field (reader)
221 fValue = std::make_unique<RFieldBase::RValue>(fField->CreateValue());
222 }
223 }
224
226 {
227 if (fValue && keepValue) {
228 fValuePtr = fValue->GetPtr<void>();
229 }
230 fValue = nullptr;
231 fField = nullptr;
232 fLastEntry = -1;
233 }
234
235 void *GetImpl(Long64_t entry) final
236 {
237 if (entry != fLastEntry) {
238 fValue->Read(entry - fEntryOffset);
240 }
241 return fValue->GetPtr<void>().get();
242 }
243};
244} // namespace ROOT::Internal::RDF
245
247
249 ROOT::DescriptorId_t fieldId, std::vector<RNTupleDS::RFieldInfo> fieldInfos,
250 bool convertToRVec)
251{
252 // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple
253 // using the following types and with a top-level field named "event" of type Event:
254 //
255 // struct Event {
256 // int id;
257 // std::vector<Track> tracks;
258 // };
259 // struct Track {
260 // std::vector<Hit> hits;
261 // };
262 // struct Hit {
263 // float x;
264 // float y;
265 // };
266 //
267 // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord).
268 // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the
269 // tree of sub fields and expose the following RDF columns:
270 //
271 // "event" [Event]
272 // "event.id" [int]
273 // "event.tracks" [RVec<Track>]
274 // "R_rdf_sizeof_event.tracks" [unsigned int]
275 // "event.tracks.hits" [RVec<RVec<Hit>>]
276 // "R_rdf_sizeof_event.tracks.hits" [RVec<unsigned int>]
277 // "event.tracks.hits.x" [RVec<RVec<float>>]
278 // "R_rdf_sizeof_event.tracks.hits.x" [RVec<unsigned int>]
279 // "event.tracks.hits.y" [RVec<RVec<float>>]
280 // "R_rdf_sizeof_event.tracks.hits.y" [RVec<unsigned int>]
281
282 const auto &fieldDesc = desc.GetFieldDescriptor(fieldId);
283 const auto &nRepetitions = fieldDesc.GetNRepetitions();
284 if ((fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) || (nRepetitions > 0)) {
285 // The field is a collection or a fixed-size array.
286 // We open a new collection scope with fieldID being the inner most collection. E.g. for "event.tracks.hits",
287 // fieldInfos would already contain the fieldID of "event.tracks"
288 fieldInfos.emplace_back(fieldId, nRepetitions);
289 }
290
291 if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) {
292 // Inner fields of collections are provided as projected collections of only that inner field,
293 // E.g. we provide a projected collection RVec<RVec<float>> for "event.tracks.hits.x" in the example
294 // above.
296 convertToRVec && (fieldDesc.GetTypeName().substr(0, 19) == "ROOT::VecOps::RVec<" ||
297 fieldDesc.GetTypeName().substr(0, 12) == "std::vector<" || fieldDesc.GetTypeName() == "");
298 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
300
301 // Note that at the end of the recursion, we handled the inner sub collections as well as the
302 // collection as whole, so we are done.
303 return;
304
305 } else if (nRepetitions > 0) {
306 // Fixed-size array, same logic as ROOT::RVec.
307 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
308 AddField(desc, colName, f.GetId(), fieldInfos);
309 return;
310 } else if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kRecord) {
311 // Inner fields of records are provided as individual RDF columns, e.g. "event.id"
312 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
313 auto innerName = colName.empty() ? f.GetFieldName() : (std::string(colName) + "." + f.GetFieldName());
314 // Inner fields of collections of records are always exposed as ROOT::RVec
315 AddField(desc, innerName, f.GetId(), fieldInfos);
316 }
317 }
318
319 // The fieldID could be the root field or the class of fieldId might not be loaded.
320 // In these cases, only the inner fields are exposed as RDF columns.
321 auto fieldOrException = ROOT::RFieldBase::Create(fieldDesc.GetFieldName(), fieldDesc.GetTypeName());
322 if (!fieldOrException)
323 return;
324 auto valueField = fieldOrException.Unwrap();
325 valueField->SetOnDiskId(fieldId);
326 for (auto &f : *valueField) {
327 f.SetOnDiskId(desc.FindFieldId(f.GetFieldName(), f.GetParent()->GetOnDiskId()));
328 }
329 std::unique_ptr<ROOT::RFieldBase> cardinalityField;
330 // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks")
331 if (!fieldInfos.empty()) {
332 const auto &info = fieldInfos.back();
333 if (info.fNRepetitions > 0) {
334 cardinalityField = std::make_unique<ROOT::Internal::RDF::RArraySizeField>(info.fNRepetitions);
335 } else {
336 cardinalityField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField>();
337 }
338 cardinalityField->SetOnDiskId(info.fFieldId);
339 }
340
341 for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) {
342 const auto &fieldInfo = *i;
343
344 if (fieldInfo.fNRepetitions > 0) {
345 // Fixed-size array, read it as ROOT::RVec in memory
346 valueField = std::make_unique<ROOT::RArrayAsRVecField>("", std::move(valueField), fieldInfo.fNRepetitions);
347 } else {
348 // Actual collection. A std::vector or ROOT::RVec gets added as a ROOT::RVec. All other collection types keep
349 // their original type.
350 if (convertToRVec) {
351 valueField = std::make_unique<ROOT::RRVecField>("", std::move(valueField));
352 } else {
353 auto outerFieldType = desc.GetFieldDescriptor(fieldInfo.fFieldId).GetTypeName();
355 }
356 }
357
358 valueField->SetOnDiskId(fieldInfo.fFieldId);
359
360 // Skip the inner-most collection level to construct the cardinality column
361 // It's taken care of by the `if (!fieldInfos.empty())` scope above
362 if (i != fieldInfos.rbegin()) {
363 if (fieldInfo.fNRepetitions > 0) {
364 // This collection level refers to a fixed-size array
366 std::make_unique<ROOT::RArrayAsRVecField>("", std::move(cardinalityField), fieldInfo.fNRepetitions);
367 } else {
368 // This collection level refers to an RVec
369 cardinalityField = std::make_unique<ROOT::RRVecField>("", std::move(cardinalityField));
370 }
371
372 cardinalityField->SetOnDiskId(fieldInfo.fFieldId);
373 }
374 }
375
376 if (cardinalityField) {
377 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
378 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
379 fProtoFields.emplace_back(std::move(cardinalityField));
380 }
381
382 fieldInfos.emplace_back(fieldId, nRepetitions);
383 fColumnNames.emplace_back(colName);
384 fColumnTypes.emplace_back(valueField->GetTypeName());
385 fProtoFields.emplace_back(std::move(valueField));
386}
387
388ROOT::RDF::RNTupleDS::RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource)
389{
390 pageSource->Attach();
391 fPrincipalDescriptor = pageSource->GetSharedDescriptorGuard()->Clone();
392 fStagingArea.emplace_back(std::move(pageSource));
393
394 AddField(fPrincipalDescriptor, "", fPrincipalDescriptor.GetFieldZeroId(),
395 std::vector<ROOT::RDF::RNTupleDS::RFieldInfo>());
396}
397
398namespace {
399
401{
402 // The setting is for now a global one, must be decided before running the
403 // program by setting the appropriate environment variable. Make sure that
404 // option configuration is thread-safe and happens only once.
406 static std::once_flag flag;
407 std::call_once(flag, []() {
408 if (auto env = gSystem->Getenv("ROOT_RNTUPLE_CLUSTERBUNCHSIZE"); env != nullptr && strlen(env) > 0) {
409 std::string envStr{env};
410 auto envNum{std::stoul(envStr)};
411 envNum = envNum == 0 ? 1 : envNum;
413 }
414 });
415 return opts;
416}
417
418std::unique_ptr<ROOT::Internal::RPageSource> CreatePageSource(std::string_view ntupleName, std::string_view fileName)
419{
421}
422} // namespace
423
424ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
425 : RNTupleDS(CreatePageSource(ntupleName, fileName))
426{
427 fFileNames = std::vector<std::string>{std::string{fileName}};
428}
429
430ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames)
431 : RNTupleDS(CreatePageSource(ntupleName, fileNames[0]))
432{
435 fStagingArea.resize(fFileNames.size());
436}
437
438ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames,
439 const std::pair<ULong64_t, ULong64_t> &range)
441{
443}
444
446ROOT::RDF::RNTupleDS::GetColumnReadersImpl(std::string_view /* name */, const std::type_info & /* ti */)
447{
448 // This datasource uses the newer GetColumnReaders() API
449 return {};
450}
451
453{
454 // At this point we can assume that `name` will be found in fColumnNames
455 const auto index =
456 std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), fieldName));
457
458 // A reader was requested but we don't have RTTI for it, this is encoded with the tag UseNativeDataType. We can just
459 // return the available protofield
461 return fProtoFields[index].get();
462 }
463
464 // The user explicitly requested a type
466
467 // If the field corresponding to the provided name is not a cardinality column and the requested type is different
468 // from the proto field that was created when the data source was constructed, we first have to create an
469 // alternative proto field for the column reader. Otherwise, we can directly use the existing proto field.
470 if (fieldName.substr(0, 13) != "R_rdf_sizeof_" && requestedType != fColumnTypes[index]) {
471 auto &altProtoFields = fAlternativeProtoFields[index];
472
473 // If we can find the requested type in the registered alternative protofields, return the corresponding field
474 if (auto altProtoField = std::find_if(altProtoFields.begin(), altProtoFields.end(),
475 [&requestedType](const std::unique_ptr<ROOT::RFieldBase> &fld) {
476 return fld->GetTypeName() == requestedType;
477 });
479 return altProtoField->get();
480 }
481
482 // Otherwise, create a new protofield and register it in the alternatives before returning
485 throw std::runtime_error("RNTupleDS: Could not create field with type \"" + requestedType +
486 "\" for column \"" + std::string(fieldName) + "\"");
487 }
489 newAltProtoField->SetOnDiskId(fProtoFields[index]->GetOnDiskId());
490 auto *newField = newAltProtoField.get();
491 altProtoFields.emplace_back(std::move(newAltProtoField));
492 return newField;
493 }
494
495 // General case: there was a correspondence between the user-requested type and the corresponding column type
496 return fProtoFields[index].get();
497}
498
499std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
500ROOT::RDF::RNTupleDS::GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info &tid)
501{
502 ROOT::RFieldBase *field = GetFieldWithTypeChecks(name, tid);
503 assert(field != nullptr);
504
505 // Map the field's and subfields' IDs to qualified names so that we can later connect the fields to
506 // other page sources from the chain
507 fFieldId2QualifiedName[field->GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(field->GetOnDiskId());
508 for (const auto &s : *field) {
509 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(s.GetOnDiskId());
510 }
511
512 auto reader = std::make_unique<ROOT::Internal::RDF::RNTupleColumnReader>(this, field);
513 fActiveColumnReaders[slot].emplace_back(reader.get());
514
515 return reader;
516}
517
519{
520 while (true) {
521 std::unique_lock lock(fMutexStaging);
522 fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
523 if (fStagingThreadShouldTerminate)
524 return;
525
526 assert(!fHasNextSources);
527 StageNextSources();
528 fHasNextSources = true;
529 fIsReadyForStaging = false;
530
531 lock.unlock();
532 fCvStaging.notify_one();
533 }
534}
535
537{
538 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
539
540 for (auto i = fNextFileIndex; (i < nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
541
542 if (fStagingThreadShouldTerminate)
543 return;
544
545 if (fStagingArea[i]) {
546 // The first file is already open and was used to read the schema
547 assert(i == 0);
548 } else {
549 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
550 fStagingArea[i]->LoadStructure();
551 }
552 }
553}
554
556{
557 assert(fNextRanges.empty());
558
559 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
560 auto nRemainingFiles = nFiles - fNextFileIndex;
561
562 if (nRemainingFiles == 0)
563 return;
564
565 // Easy work scheduling: one file per slot. We skip empty files (files without entries).
566
567 if ((nRemainingFiles >= fNSlots) || (fGlobalEntryRange.has_value())) {
568 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles)) {
570
571 std::swap(fStagingArea[fNextFileIndex], range.fSource);
572
573 if (!range.fSource) {
574 // Typically, the prestaged source should have been present. Only if some of the files are empty, we need
575 // to open and attach files here.
576 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
577 }
578 range.fFileName = fFileNames[fNextFileIndex];
579 range.fSource->Attach();
580 fNextFileIndex++;
581 auto nEntries = range.fSource->GetNEntries();
582 if (nEntries == 0)
583 continue;
584 range.fLastEntry = nEntries; // whole file per slot, i.e. entry range [0..nEntries - 1]
585
586 fNextRanges.emplace_back(std::move(range));
587 }
588 return;
589 }
590
591 // Work scheduling of the tail: multiple slots work on the same file.
592 // Every slot still has its own page source but these page sources may open the same file.
593 // Again, we need to skip empty files.
594 unsigned int nSlotsPerFile = fNSlots / nRemainingFiles;
595 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles); ++i) {
596 std::unique_ptr<ROOT::Internal::RPageSource> source;
597 // Need to look for the file name to populate the sample info later
598 const auto &sourceFileName = fFileNames[fNextFileIndex];
599 std::swap(fStagingArea[fNextFileIndex], source);
600 if (!source) {
601 // Empty files trigger this condition
602 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
603 }
604 source->Attach();
605 fNextFileIndex++;
606
607 auto nEntries = source->GetNEntries();
608 if (nEntries == 0)
609 continue;
610
611 // If last file: use all remaining slots
612 if (i == (nRemainingFiles - 1))
613 nSlotsPerFile = fNSlots - fNextRanges.size();
614
615 const auto rangesByCluster = [&source]() {
616 // Take the shared lock of the descriptor just for the time necessary
617 const auto descGuard = source->GetSharedDescriptorGuard();
619 }();
620
621 const unsigned int nRangesByCluster = rangesByCluster.size();
622
623 // Distribute slots equidistantly over the entry range, aligned on cluster boundaries
625 const auto remainder = nRangesByCluster % nSlotsPerFile;
626 std::size_t iRange = 0;
627 unsigned int iSlot = 0;
628 const unsigned int N = std::min(nSlotsPerFile, nRangesByCluster);
629 for (; iSlot < N; ++iSlot) {
630 auto start = rangesByCluster[iRange].fFirstEntry;
631 iRange += nClustersPerSlot + static_cast<int>(iSlot < remainder);
632 assert(iRange > 0);
633 auto end = rangesByCluster[iRange - 1].fLastEntryPlusOne;
634
636 range.fFileName = sourceFileName;
637 // The last range for this file just takes the already opened page source. All previous ranges clone.
638 if (iSlot == N - 1) {
639 range.fSource = std::move(source);
640 } else {
641 range.fSource = source->Clone();
642 }
643 range.fSource->SetEntryRange({start, end - start});
644 range.fFirstEntry = start;
645 range.fLastEntry = end;
646 fNextRanges.emplace_back(std::move(range));
647 }
648 } // loop over tail of remaining files
649}
650
651std::vector<std::pair<ULong64_t, ULong64_t>> ROOT::RDF::RNTupleDS::GetEntryRanges()
652{
653 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
654
655 // We need to distinguish between single threaded and multi-threaded runs.
656 // In single threaded mode, InitSlot is only called once and column readers have to be rewired
657 // to new page sources of the chain in GetEntryRanges. In multi-threaded mode, on the other hand,
658 // InitSlot is called for every returned range, thus rewiring the column readers takes place in
659 // InitSlot and FinalizeSlot.
660
661 if (fNSlots == 1) {
662 for (auto r : fActiveColumnReaders[0]) {
663 r->Disconnect(true /* keepValue */);
664 }
665 }
666
667 // If we have fewer files than slots and we run multiple event loops, we can reuse fCurrentRanges and don't need
668 // to worry about loading the fNextRanges. I.e., in this case we don't enter the if block.
669 if (fCurrentRanges.empty() || fSeenEntriesNoGlobalRange > 0) {
670 // Otherwise, i.e. start of the first event loop or in the middle of the event loop, prepare the next ranges
671 // and swap with the current ones.
672 {
673 std::unique_lock lock(fMutexStaging);
674 fCvStaging.wait(lock, [this] { return fHasNextSources; });
675 }
676 PrepareNextRanges();
677 if (fNextRanges.empty()) {
678 // No more data
679 return ranges;
680 }
681
682 assert(fNextRanges.size() <= fNSlots);
683
684 fCurrentRanges.clear();
685 std::swap(fCurrentRanges, fNextRanges);
686 }
687
688 // Stage next batch of files for the next call to GetEntryRanges()
689 {
690 std::lock_guard _(fMutexStaging);
691 fIsReadyForStaging = true;
692 fHasNextSources = false;
693 }
694 fCvStaging.notify_one();
695
696 // Create ranges for the RDF loop manager from the list of REntryRangeDS records.
697 // The entry ranges that are relative to the page source in REntryRangeDS are translated into absolute
698 // entry ranges, given the current state of the entry cursor.
699 // We remember the connection from first absolute entry index of a range to its REntryRangeDS record
700 // so that we can properly rewire the column reader in InitSlot
701 fFirstEntry2RangeIdx.clear();
702 fOriginalRanges.clear();
703
705
706 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
707
708 // Several consecutive ranges may operate on the same file (each with their own page source clone).
709 // We can detect a change of file when the first entry number jumps back to 0.
710 if (fCurrentRanges[i].fFirstEntry == 0) {
711 // New source
712 fSeenEntriesNoGlobalRange += nEntriesPerSource;
714 }
715
716 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntriesNoGlobalRange;
717 auto end = fCurrentRanges[i].fLastEntry + fSeenEntriesNoGlobalRange;
718
719 nEntriesPerSource += end - start;
720
721 if (fGlobalEntryRange.has_value()) {
722
723 // We need to consider different scenarios for when we have GlobalRanges set by the user.
724 // Consider a simple case of 3 files, with original ranges set as (consecutive entries of 3 files):
725 // [0, 20], [20, 45], [45, 65]
726 // we will now see what happens in each of the scenarios below when GlobalRanges can be set to different
727 // values:
728 // a) [2, 5] - we stay in file 1
729 // - hence we will use the 1st case and get the range [2,5], in this case we also need to quit further
730 // processing from the other files by entering case 3
731 // b) [2, 21] - we start in file 1 and finish in file 2
732 // - use the 2nd case first, as 21 > 20 (end of first file), then we will go to case 1, resulting in ranges:
733 // [2, 20], [20, 21], c) [21 - 40] - we skip file 1, start in file 2 and stay in file 2
734 // - to skip the first file, we use the 4th case, followed by the 1st case, resulting range is: [21, 40]
735 // d) [21 - 65] - we skip file 1, start in file 2 and continue to file 3
736 // - to skip the first file, we use the 4th case, we continue with the 2nd case, and use the 1st case at the
737 // end, resulting ranges are [21, 45], [45, 65]
738 // The first case
739 if (fGlobalEntryRange->first >= start && fGlobalEntryRange->second <= end) {
740 fOriginalRanges.emplace_back(start, end);
741 fFirstEntry2RangeIdx[fGlobalEntryRange->first] = i;
742 ranges.emplace_back(fGlobalEntryRange->first, fGlobalEntryRange->second);
743 }
744
745 // The second case:
746 // The `fGlobalEntryRange->first < end` condition is to distinguish this case from the 4th case.
747 else if (fGlobalEntryRange->second > end && fGlobalEntryRange->first < end) {
748 fOriginalRanges.emplace_back(start, end);
749 fFirstEntry2RangeIdx[fGlobalEntryRange->first] = i;
750 ranges.emplace_back(fGlobalEntryRange->first, end);
751 std::optional<std::pair<ULong64_t, ULong64_t>> newvalues({end, fGlobalEntryRange->second});
752 fGlobalEntryRange.swap(newvalues);
753 }
754 // The third case, needed to correctly quit processing if we only stay in the first file
755 else if (fGlobalEntryRange->second < start) {
756 return ranges;
757 }
758
759 // The fourth case:
760 else if (fGlobalEntryRange->first >= end) {
761 fOriginalRanges.emplace_back(start, end);
762 fFirstEntry2RangeIdx[start] = i;
763 ranges.emplace_back(start, start);
764 }
765 }
766
767 else {
768 fFirstEntry2RangeIdx[start] = i;
769 fOriginalRanges.emplace_back(start, end);
770 ranges.emplace_back(start, end);
771 }
772 }
773
774 fSeenEntriesNoGlobalRange += nEntriesPerSource;
775
776 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
777 for (auto r : fActiveColumnReaders[0]) {
778 r->Connect(*fCurrentRanges[0].fSource, fOriginalRanges[0].first);
779 }
780 }
781
782 return ranges;
783}
784
786{
787 if (fNSlots == 1) {
788 // Ensure the connection between slot and range is valid also in single-thread mode
789 fSlotsToRangeIdxs[0] = 0;
790 return;
791 }
792
793 // The same slot ID could be picked multiple times in the same execution, thus
794 // ending up processing different page sources. Here we re-establish the
795 // connection between the slot and the correct page source by finding which
796 // range index corresponds to the first entry passed.
797 auto idxRange = fFirstEntry2RangeIdx.at(firstEntry);
798
799 // We also remember this connection so it can later be retrieved in CreateSampleInfo
800 fSlotsToRangeIdxs[slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>()] = idxRange;
801
802 for (auto r : fActiveColumnReaders[slot]) {
803 r->Connect(*fCurrentRanges[idxRange].fSource,
804 fOriginalRanges[idxRange].first - fCurrentRanges[idxRange].fFirstEntry);
805 }
806}
807
809{
810 if (fNSlots == 1)
811 return;
812
813 for (auto r : fActiveColumnReaders[slot]) {
814 r->Disconnect(true /* keepValue */);
815 }
816}
817
818std::string ROOT::RDF::RNTupleDS::GetTypeName(std::string_view colName) const
819{
820 auto colNamePos = std::find(fColumnNames.begin(), fColumnNames.end(), colName);
821
822 if (colNamePos == fColumnNames.end()) {
823 auto msg = std::string("RNTupleDS: There is no column with name \"") + std::string(colName) + "\"";
824 throw std::runtime_error(msg);
825 }
826
827 const auto index = std::distance(fColumnNames.begin(), colNamePos);
828 return fColumnTypes[index];
829}
830
831bool ROOT::RDF::RNTupleDS::HasColumn(std::string_view colName) const
832{
833 return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end();
834}
835
837{
838 fSeenEntriesNoGlobalRange = 0;
839 fNextFileIndex = 0;
840 fIsReadyForStaging = fHasNextSources = fStagingThreadShouldTerminate = false;
841 fThreadStaging = std::thread(&RNTupleDS::ExecStaging, this);
842 assert(fNextRanges.empty());
843
844 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
845 // First event loop or large number of files: start the staging process.
846 {
847 std::lock_guard _(fMutexStaging);
848 fIsReadyForStaging = true;
849 }
850 fCvStaging.notify_one();
851 } else {
852 // Otherwise, we will reuse fCurrentRanges. Make sure that staging and preparing next ranges will be a noop
853 // (already at the end of the list of files).
854 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
855 }
856}
857
859{
860 for (unsigned int i = 0; i < fNSlots; ++i) {
861 for (auto r : fActiveColumnReaders[i]) {
862 r->Disconnect(false /* keepValue */);
863 }
864 }
865 {
866 std::lock_guard _(fMutexStaging);
867 fStagingThreadShouldTerminate = true;
868 }
869 fCvStaging.notify_one();
870 fThreadStaging.join();
871 // If we have a chain with more files than the number of slots, the files opened at the end of the
872 // event loop won't be reused when the event loop restarts, so we can close them.
873 if (fFileNames.size() > fNSlots) {
874 fCurrentRanges.clear();
875 fNextRanges.clear();
876 fStagingArea.clear();
877 fStagingArea.resize(fFileNames.size());
878 }
879}
880
882{
883 assert(fNSlots == 0);
884 assert(nSlots > 0);
885 fNSlots = nSlots;
886 fActiveColumnReaders.resize(fNSlots);
887 fSlotsToRangeIdxs.resize(fNSlots * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
888}
889
890ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, std::string_view fileName)
891{
892 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileName));
893}
894
895ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames)
896{
897 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileNames));
898}
899
900ROOT::RDF::RSampleInfo ROOT::Internal::RDF::RNTupleDS::CreateSampleInfo(
901 unsigned int slot, const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &sampleMap) const
902{
903 // The same slot ID could be picked multiple times in the same execution, thus
904 // ending up processing different page sources. Here we re-establish the
905 // connection between the slot and the correct page source by retrieving
906 // which range is connected currently to the slot
907
908 const auto &rangeIdx = fSlotsToRangeIdxs.at(slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
909
910 // Missing source if a file does not exist
911 if (!fCurrentRanges[rangeIdx].fSource)
912 return ROOT::RDF::RSampleInfo{};
913
914 const auto &ntupleName = fCurrentRanges[rangeIdx].fSource->GetNTupleName();
915 const auto &ntuplePath = fCurrentRanges[rangeIdx].fFileName;
916 const auto ntupleID = std::string(ntuplePath) + '/' + ntupleName;
917
918 if (sampleMap.empty())
920 ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry));
921
922 if (sampleMap.find(ntupleID) == sampleMap.end())
923 throw std::runtime_error("Full sample identifier '" + ntupleID + "' cannot be found in the available samples.");
924
926 ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry),
927 sampleMap.at(ntupleID));
928}
929
932 const std::pair<ULong64_t, ULong64_t> &range)
933{
934 std::unique_ptr<ROOT::RDF::RNTupleDS> ds{new ROOT::RDF::RNTupleDS(ntupleName, fileNames, range)};
935 return ROOT::RDataFrame(std::move(ds));
936}
937
938std::pair<std::vector<ROOT::Internal::RNTupleClusterBoundaries>, ROOT::NTupleSize_t>
939ROOT::Internal::RDF::GetClustersAndEntries(std::string_view ntupleName, std::string_view location)
940{
942 source->Attach();
943 const auto descGuard = source->GetSharedDescriptorGuard();
944 return std::make_pair(ROOT::Internal::GetClusterBoundaries(descGuard.GetRef()), descGuard->GetNEntries());
945}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define f(i)
Definition RSha256.hxx:104
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define _(A, B)
Definition cfortran.h:108
void GetCollectionInfo(const ROOT::NTupleSize_t globalIndex, RNTupleLocalIndex *collectionStart, ROOT::NTupleSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
Definition RColumn.hxx:283
An artificial field that provides the size of a fixed-size array.
void GenerateColumns(const ROOT::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
void ReadGlobalImpl(ROOT::NTupleSize_t, void *to) final
RArraySizeField(const RArraySizeField &other)=delete
RArraySizeField(RArraySizeField &&other)=default
RArraySizeField & operator=(RArraySizeField &&other)=default
void ReadInClusterImpl(RNTupleLocalIndex, void *to) final
void ReconcileOnDiskField(const RNTupleDescriptor &) final
For non-artificial fields, check compatibility of the in-memory field and the on-disk field.
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RArraySizeField(std::size_t arrayLength)
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RArraySizeField & operator=(const RArraySizeField &other)=delete
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Every RDF column is represented by exactly one RNTuple field.
void * GetImpl(Long64_t entry) final
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RNTupleDS * fDataSource
The data source that owns this column reader.
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Long64_t fLastEntry
Last entry number that was read.
An artificial field that transforms an RNTuple column that contains the offset of collections into collection sizes.
Definition RNTupleDS.cxx:59
RRDFCardinalityField(RRDFCardinalityField &&other)=default
size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
Definition RNTupleDS.cxx:93
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:86
void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
Definition RNTupleDS.cxx:96
void ReconcileOnDiskField(const RNTupleDescriptor &) final
For non-artificial fields, check compatibility of the in-memory field and the on-disk field.
Definition RNTupleDS.cxx:68
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:87
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
Definition RNTupleDS.cxx:76
size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
Definition RNTupleDS.cxx:92
void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
Get the number of elements of the collection identified by localIndex.
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
Definition RNTupleDS.cxx:61
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
Definition RNTupleDS.cxx:65
static void SetClusterBunchSize(RNTupleReadOptions &options, unsigned int val)
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
std::vector< void * > Record_t
std::optional< std::pair< ULong64_t, ULong64_t > > fGlobalEntryRange
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:74
void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos, bool convertToRVec=true)
Provides the RDF column "colName" given the field identified by fieldID.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
void ExecStaging()
The main function of the fThreadStaging background thread.
std::vector< std::unique_ptr< ROOT::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
ROOT::RFieldBase * GetFieldWithTypeChecks(std::string_view fieldName, const std::type_info &tid)
std::vector< std::string > fFileNames
Definition RNTupleDS.hxx:93
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
RNTupleDS(std::unique_ptr< ROOT::Internal::RPageSource > pageSource)
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
Definition RNTupleDS.hxx:92
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
void Finalize() final
Convenience method called after concluding an event-loop.
std::vector< std::string > fColumnTypes
void Initialize() final
Convenience method called before starting an event-loop.
std::vector< std::string > fColumnNames
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
The list of column representations a field can have.
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
RConstSchemaIterator cbegin() const
const std::string & GetFieldName() const
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId)
Factory method to resurrect a field from the stored on-disk type information.
ROOT::DescriptorId_t GetOnDiskId() const
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its subfields using a possibly new name and a new, unconnected set of columns.
The container field for an ntuple model, which itself has no physical representation.
Definition RField.hxx:59
The on-storage metadata of an RNTuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Common user-tunable settings for reading RNTuples.
const_iterator begin() const
const_iterator end() const
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1676
std::string TypeID2TypeName(const std::type_info &id)
Returns the name of a type starting from its type_info. An empty string is returned in case of failure...
Definition RDFUtils.cxx:178
ROOT::RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector< std::string > &fileNames, const std::pair< ULong64_t, ULong64_t > &range)
Internal overload of the function that allows passing a range of entries.
std::pair< std::vector< ROOT::Internal::RNTupleClusterBoundaries >, ROOT::NTupleSize_t > GetClustersAndEntries(std::string_view ntupleName, std::string_view location)
Retrieves the cluster boundaries and the number of entries for the input RNTuple.
void SetAllowFieldSubstitutions(RFieldZero &fieldZero, bool val)
Definition RField.cxx:34
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &)
std::vector< ROOT::Internal::RNTupleClusterBoundaries > GetClusterBoundaries(const RNTupleDescriptor &desc)
Return the cluster boundaries for each cluster in this RNTuple.
std::string GetRenormalizedTypeName(const std::string &metaNormalizedName)
Given a type name normalized by ROOT meta, renormalize it for RNTuple, e.g. by inserting the std:: prefix.
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
std::vector< std::string > ColumnNames_t
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
ENTupleStructure
The fields in the RNTuple data model tree can carry different structural information about the type s...
Tag to let data sources use the native data type when creating a column reader.
Definition Utils.hxx:344
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:80