Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleDS.cxx
Go to the documentation of this file.
1/// \file RNTupleDS.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
15#include <ROOT/RDataFrame.hxx>
16#include <ROOT/RDF/Utils.hxx>
17#include <ROOT/RField.hxx>
18#include <ROOT/RFieldUtils.hxx>
21#include <ROOT/RNTupleDS.hxx>
22#include <ROOT/RNTupleTypes.hxx>
23#include <ROOT/RPageStorage.hxx>
24#include <string_view>
25
26#include <TError.h>
27#include <TSystem.h>
28
29#include <cassert>
30#include <memory>
31#include <mutex>
32#include <string>
33#include <vector>
34#include <typeinfo>
35#include <utility>
36
37// clang-format off
38/**
39* \class ROOT::RDF::RNTupleDS
40* \ingroup dataframe
41* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data.
42*
43* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
44*
45* For each column containing an array or a collection, a corresponding column `#colname` is available to access
46* `colname.size()` without reading and deserializing the collection values.
47*
48**/
49// clang-format on
50
51namespace ROOT::Internal::RDF {
52/// An artificial field that transforms an RNTuple column that contains the offset of collections into
53/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g.
54/// `R_rdf_sizeof_jets` for a collection named `jets`.
55///
56/// This field owns the collection offset field but instead of exposing the collection offsets it exposes
57/// the collection sizes (offset(N+1) - offset(N)). For the time being, we offer this functionality only in RDataFrame.
58/// TODO(jblomer): consider providing a general set of useful virtual fields as part of RNTuple.
60protected:
61 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view /* newName */) const final
62 {
63 return std::make_unique<RRDFCardinalityField>();
64 }
65 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
66
67 // We construct these fields and know that they match the page source
69
70public:
71 RRDFCardinalityField() : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kPlain, false /* isSimple */) {}
74 ~RRDFCardinalityField() override = default;
75
85 // Field is only used for reading
86 void GenerateColumns() final { throw RException(R__FAIL("Cardinality fields must only be used for reading")); }
91
92 size_t GetValueSize() const final { return sizeof(std::size_t); }
93 size_t GetAlignment() const final { return alignof(std::size_t); }
94
95 /// Get the number of elements of the collection identified by globalIndex
103
104 /// Get the number of elements of the collection identified by clusterIndex
112};
113
114/**
115 * @brief An artificial field that provides the size of a fixed-size array
116 *
117 * This is the implementation of `R_rdf_sizeof_column` in case `column` contains
118 * fixed-size arrays on disk.
119 */
121private:
122 std::size_t fArrayLength;
123
124 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view) const final
125 {
126 return std::make_unique<RArraySizeField>(fArrayLength);
127 }
128 void GenerateColumns() final { throw RException(R__FAIL("RArraySizeField fields must only be used for reading")); }
130 void ReadGlobalImpl(ROOT::NTupleSize_t /*globalIndex*/, void *to) final
131 {
132 *static_cast<std::size_t *>(to) = fArrayLength;
133 }
134 void ReadInClusterImpl(RNTupleLocalIndex /*localIndex*/, void *to) final
135 {
136 *static_cast<std::size_t *>(to) = fArrayLength;
137 }
138
139 // We construct these fields and know that they match the page source
141
142public:
144 : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kPlain, false /* isSimple */),
146 {
147 }
153
154 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
155 std::size_t GetValueSize() const final { return sizeof(std::size_t); }
156 std::size_t GetAlignment() const final { return alignof(std::size_t); }
157};
158
159/// Every RDF column is represented by exactly one RNTuple field
163
164 RNTupleDS *fDataSource; ///< The data source that owns this column reader
165 RFieldBase *fProtoField; ///< The prototype field from which fField is cloned
166 std::unique_ptr<RFieldBase> fField; ///< The field backing the RDF column
167 std::unique_ptr<RFieldBase::RValue> fValue; ///< The memory location used to read from fField
168 std::shared_ptr<void> fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources
169 Long64_t fLastEntry = -1; ///< Last entry number that was read
170 /// For chains, the logical entry and the physical entry in any particular file can be different.
171 /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the
172 /// corresponding data source was opened.
174
175public:
177 ~RNTupleColumnReader() override = default;
178
179 /// Connect the field and its subfields to the page source
181 {
182 assert(fLastEntry == -1);
183
185
186 // Create a new, real field from the prototype and set its field ID in the context of the given page source
188 {
189 auto descGuard = source.GetSharedDescriptorGuard();
190 // Set the on-disk field IDs for the field and the subfield
191 fField->SetOnDiskId(
193 auto iProto = fProtoField->cbegin();
194 auto iReal = fField->begin();
195 for (; iReal != fField->end(); ++iProto, ++iReal) {
196 iReal->SetOnDiskId(descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(iProto->GetOnDiskId())));
197 }
198 }
199
200 try {
202 } catch (const ROOT::RException &err) {
203 auto onDiskType = source.GetSharedDescriptorGuard()->GetFieldDescriptor(fField->GetOnDiskId()).GetTypeName();
204 std::string msg = "RNTupleDS: invalid type \"" + fField->GetTypeName() + "\" for column \"" +
205 fDataSource->fFieldId2QualifiedName[fField->GetOnDiskId()] + "\" with on-disk type \"" +
206 onDiskType + "\"";
207 throw std::runtime_error(msg);
208 }
209
210 if (fValuePtr) {
211 // When the reader reconnects to a new file, the fValuePtr is already set
212 fValue = std::make_unique<RFieldBase::RValue>(fField->BindValue(fValuePtr));
213 fValuePtr = nullptr;
214 } else {
215 // For the first file, create a new object for this field (reader)
216 fValue = std::make_unique<RFieldBase::RValue>(fField->CreateValue());
217 }
218 }
219
221 {
222 if (fValue && keepValue) {
223 fValuePtr = fValue->GetPtr<void>();
224 }
225 fValue = nullptr;
226 fField = nullptr;
227 fLastEntry = -1;
228 }
229
230 void *GetImpl(Long64_t entry) final
231 {
232 if (entry != fLastEntry) {
233 fValue->Read(entry - fEntryOffset);
235 }
236 return fValue->GetPtr<void>().get();
237 }
238};
239} // namespace ROOT::Internal::RDF
240
242
244 ROOT::DescriptorId_t fieldId, std::vector<RNTupleDS::RFieldInfo> fieldInfos,
245 bool convertToRVec)
246{
247 // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple
248 // using the following types and with a top-level field named "event" of type Event:
249 //
250 // struct Event {
251 // int id;
252 // std::vector<Track> tracks;
253 // };
254 // struct Track {
255 // std::vector<Hit> hits;
256 // };
257 // struct Hit {
258 // float x;
259 // float y;
260 // };
261 //
262 // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord).
263 // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the
264 // tree of sub fields and expose the following RDF columns:
265 //
266 // "event" [Event]
267 // "event.id" [int]
268 // "event.tracks" [RVec<Track>]
269 // "R_rdf_sizeof_event.tracks" [unsigned int]
270 // "event.tracks.hits" [RVec<RVec<Hit>>]
271 // "R_rdf_sizeof_event.tracks.hits" [RVec<unsigned int>]
272 // "event.tracks.hits.x" [RVec<RVec<float>>]
273 // "R_rdf_sizeof_event.tracks.hits.x" [RVec<unsigned int>]
274 // "event.tracks.hits.y" [RVec<RVec<float>>]
275 // "R_rdf_sizeof_event.tracks.hits.y" [RVec<unsigned int>]
276
277 const auto &fieldDesc = desc.GetFieldDescriptor(fieldId);
278 const auto &nRepetitions = fieldDesc.GetNRepetitions();
279 if ((fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) || (nRepetitions > 0)) {
280 // The field is a collection or a fixed-size array.
281 // We open a new collection scope with fieldID being the inner most collection. E.g. for "event.tracks.hits",
282 // fieldInfos would already contain the fieldID of "event.tracks"
283 fieldInfos.emplace_back(fieldId, nRepetitions);
284 }
285
286 if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) {
287 // Inner fields of collections are provided as projected collections of only that inner field,
288 // E.g. we provide a projected collection RVec<RVec<float>> for "event.tracks.hits.x" in the example
289 // above.
291 convertToRVec && (fieldDesc.GetTypeName().substr(0, 19) == "ROOT::VecOps::RVec<" ||
292 fieldDesc.GetTypeName().substr(0, 12) == "std::vector<" || fieldDesc.GetTypeName() == "");
293 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
295
296 // Note that at the end of the recursion, we handled the inner sub collections as well as the
297 // collection as whole, so we are done.
298 return;
299
300 } else if (nRepetitions > 0) {
301 // Fixed-size array, same logic as ROOT::RVec.
302 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
303 AddField(desc, colName, f.GetId(), fieldInfos);
304 return;
305 } else if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kRecord) {
306 // Inner fields of records are provided as individual RDF columns, e.g. "event.id"
307 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
308 auto innerName = colName.empty() ? f.GetFieldName() : (std::string(colName) + "." + f.GetFieldName());
309 // Inner fields of collections of records are always exposed as ROOT::RVec
310 AddField(desc, innerName, f.GetId(), fieldInfos);
311 }
312 }
313
314 // The fieldID could be the root field or the class of fieldId might not be loaded.
315 // In these cases, only the inner fields are exposed as RDF columns.
316 auto fieldOrException = ROOT::RFieldBase::Create(fieldDesc.GetFieldName(), fieldDesc.GetTypeName());
317 if (!fieldOrException)
318 return;
319 auto valueField = fieldOrException.Unwrap();
320 valueField->SetOnDiskId(fieldId);
321 for (auto &f : *valueField) {
322 f.SetOnDiskId(desc.FindFieldId(f.GetFieldName(), f.GetParent()->GetOnDiskId()));
323 }
324 std::unique_ptr<ROOT::RFieldBase> cardinalityField;
325 // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks")
326 if (!fieldInfos.empty()) {
327 const auto &info = fieldInfos.back();
328 if (info.fNRepetitions > 0) {
329 cardinalityField = std::make_unique<ROOT::Internal::RDF::RArraySizeField>(info.fNRepetitions);
330 } else {
331 cardinalityField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField>();
332 }
333 cardinalityField->SetOnDiskId(info.fFieldId);
334 }
335
336 for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) {
337 const auto &fieldInfo = *i;
338
339 if (fieldInfo.fNRepetitions > 0) {
340 // Fixed-size array, read it as ROOT::RVec in memory
341 valueField = std::make_unique<ROOT::RArrayAsRVecField>("", std::move(valueField), fieldInfo.fNRepetitions);
342 } else {
343 // Actual collection. A std::vector or ROOT::RVec gets added as a ROOT::RVec. All other collection types keep
344 // their original type.
345 if (convertToRVec) {
346 valueField = std::make_unique<ROOT::RRVecField>("", std::move(valueField));
347 } else {
348 auto outerFieldType = desc.GetFieldDescriptor(fieldInfo.fFieldId).GetTypeName();
350 }
351 }
352
353 valueField->SetOnDiskId(fieldInfo.fFieldId);
354
355 // Skip the inner-most collection level to construct the cardinality column
356 // It's taken care of by the `if (!fieldInfos.empty())` scope above
357 if (i != fieldInfos.rbegin()) {
358 if (fieldInfo.fNRepetitions > 0) {
359 // This collection level refers to a fixed-size array
361 std::make_unique<ROOT::RArrayAsRVecField>("", std::move(cardinalityField), fieldInfo.fNRepetitions);
362 } else {
363 // This collection level refers to an RVec
364 cardinalityField = std::make_unique<ROOT::RRVecField>("", std::move(cardinalityField));
365 }
366
367 cardinalityField->SetOnDiskId(fieldInfo.fFieldId);
368 }
369 }
370
371 if (cardinalityField) {
372 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
373 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
374 fProtoFields.emplace_back(std::move(cardinalityField));
375 }
376
377 fieldInfos.emplace_back(fieldId, nRepetitions);
378 fColumnNames.emplace_back(colName);
379 fColumnTypes.emplace_back(valueField->GetTypeName());
380 fProtoFields.emplace_back(std::move(valueField));
381}
382
383ROOT::RDF::RNTupleDS::RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource)
384{
385 pageSource->Attach();
386 fPrincipalDescriptor = pageSource->GetSharedDescriptorGuard()->Clone();
387 fStagingArea.emplace_back(std::move(pageSource));
388
389 AddField(fPrincipalDescriptor, "", fPrincipalDescriptor.GetFieldZeroId(),
390 std::vector<ROOT::RDF::RNTupleDS::RFieldInfo>());
391}
392
393namespace {
394
396{
397 // The setting is for now a global one, must be decided before running the
398 // program by setting the appropriate environment variable. Make sure that
399 // option configuration is thread-safe and happens only once.
401 static std::once_flag flag;
402 std::call_once(flag, []() {
403 if (auto env = gSystem->Getenv("ROOT_RNTUPLE_CLUSTERBUNCHSIZE"); env != nullptr && strlen(env) > 0) {
404 std::string envStr{env};
405 auto envNum{std::stoul(envStr)};
406 envNum = envNum == 0 ? 1 : envNum;
408 }
409 });
410 return opts;
411}
412
413std::unique_ptr<ROOT::Internal::RPageSource> CreatePageSource(std::string_view ntupleName, std::string_view fileName)
414{
416}
417} // namespace
418
419ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
420 : RNTupleDS(CreatePageSource(ntupleName, fileName))
421{
422 fFileNames = std::vector<std::string>{std::string{fileName}};
423}
424
425ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames)
426 : RNTupleDS(CreatePageSource(ntupleName, fileNames[0]))
427{
430 fStagingArea.resize(fFileNames.size());
431}
432
433ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames,
434 const std::pair<ULong64_t, ULong64_t> &range)
436{
438}
439
441ROOT::RDF::RNTupleDS::GetColumnReadersImpl(std::string_view /* name */, const std::type_info & /* ti */)
442{
443 // This datasource uses the newer GetColumnReaders() API
444 return {};
445}
446
448{
449 // At this point we can assume that `name` will be found in fColumnNames
450 const auto index =
451 std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), fieldName));
452
453 // A reader was requested but we don't have RTTI for it, this is encoded with the tag UseNativeDataType. We can just
454 // return the available protofield
456 return fProtoFields[index].get();
457 }
458
459 // The user explicitly requested a type
461
462 // If the field corresponding to the provided name is not a cardinality column and the requested type is different
463 // from the proto field that was created when the data source was constructed, we first have to create an
464 // alternative proto field for the column reader. Otherwise, we can directly use the existing proto field.
465 if (fieldName.substr(0, 13) != "R_rdf_sizeof_" && requestedType != fColumnTypes[index]) {
466 auto &altProtoFields = fAlternativeProtoFields[index];
467
468 // If we can find the requested type in the registered alternative protofields, return the corresponding field
469 if (auto altProtoField = std::find_if(altProtoFields.begin(), altProtoFields.end(),
470 [&requestedType](const std::unique_ptr<ROOT::RFieldBase> &fld) {
471 return fld->GetTypeName() == requestedType;
472 });
474 return altProtoField->get();
475 }
476
477 // Otherwise, create a new protofield and register it in the alternatives before returning
480 throw std::runtime_error("RNTupleDS: Could not create field with type \"" + requestedType +
481 "\" for column \"" + std::string(fieldName) + "\"");
482 }
484 newAltProtoField->SetOnDiskId(fProtoFields[index]->GetOnDiskId());
485 auto *newField = newAltProtoField.get();
486 altProtoFields.emplace_back(std::move(newAltProtoField));
487 return newField;
488 }
489
490 // General case: there was a correspondence between the user-requested type and the corresponding column type
491 return fProtoFields[index].get();
492}
493
494std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
495ROOT::RDF::RNTupleDS::GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info &tid)
496{
497 ROOT::RFieldBase *field = GetFieldWithTypeChecks(name, tid);
498 assert(field != nullptr);
499
500 // Map the field's and subfields' IDs to qualified names so that we can later connect the fields to
501 // other page sources from the chain
502 fFieldId2QualifiedName[field->GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(field->GetOnDiskId());
503 for (const auto &s : *field) {
504 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(s.GetOnDiskId());
505 }
506
507 auto reader = std::make_unique<ROOT::Internal::RDF::RNTupleColumnReader>(this, field);
508 fActiveColumnReaders[slot].emplace_back(reader.get());
509
510 return reader;
511}
512
514{
515 while (true) {
516 std::unique_lock lock(fMutexStaging);
517 fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
518 if (fStagingThreadShouldTerminate)
519 return;
520
521 assert(!fHasNextSources);
522 StageNextSources();
523 fHasNextSources = true;
524 fIsReadyForStaging = false;
525
526 lock.unlock();
527 fCvStaging.notify_one();
528 }
529}
530
532{
533 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
534
535 for (auto i = fNextFileIndex; (i < nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
536
537 if (fStagingThreadShouldTerminate)
538 return;
539
540 if (fStagingArea[i]) {
541 // The first file is already open and was used to read the schema
542 assert(i == 0);
543 } else {
544 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
545 fStagingArea[i]->LoadStructure();
546 }
547 }
548}
549
551{
552 assert(fNextRanges.empty());
553
554 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
555 auto nRemainingFiles = nFiles - fNextFileIndex;
556
557 if (nRemainingFiles == 0)
558 return;
559
560 // Easy work scheduling: one file per slot. We skip empty files (files without entries).
561
562 if ((nRemainingFiles >= fNSlots) || (fGlobalEntryRange.has_value())) {
563 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles)) {
565
566 std::swap(fStagingArea[fNextFileIndex], range.fSource);
567
568 if (!range.fSource) {
569 // Typically, the prestaged source should have been present. Only if some of the files are empty, we need
570 // to open and attach files here.
571 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
572 }
573 range.fFileName = fFileNames[fNextFileIndex];
574 range.fSource->Attach();
575 fNextFileIndex++;
576 auto nEntries = range.fSource->GetNEntries();
577 if (nEntries == 0)
578 continue;
579 range.fLastEntry = nEntries; // whole file per slot, i.e. entry range [0..nEntries - 1]
580
581 fNextRanges.emplace_back(std::move(range));
582 }
583 return;
584 }
585
586 // Work scheduling of the tail: multiple slots work on the same file.
587 // Every slot still has its own page source but these page sources may open the same file.
588 // Again, we need to skip empty files.
589 unsigned int nSlotsPerFile = fNSlots / nRemainingFiles;
590 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles); ++i) {
591 std::unique_ptr<ROOT::Internal::RPageSource> source;
592 // Need to look for the file name to populate the sample info later
593 const auto &sourceFileName = fFileNames[fNextFileIndex];
594 std::swap(fStagingArea[fNextFileIndex], source);
595 if (!source) {
596 // Empty files trigger this condition
597 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
598 }
599 source->Attach();
600 fNextFileIndex++;
601
602 auto nEntries = source->GetNEntries();
603 if (nEntries == 0)
604 continue;
605
606 // If last file: use all remaining slots
607 if (i == (nRemainingFiles - 1))
608 nSlotsPerFile = fNSlots - fNextRanges.size();
609
610 const auto rangesByCluster = [&source]() {
611 // Take the shared lock of the descriptor just for the time necessary
612 const auto descGuard = source->GetSharedDescriptorGuard();
614 }();
615
616 const unsigned int nRangesByCluster = rangesByCluster.size();
617
618 // Distribute slots equidistantly over the entry range, aligned on cluster boundaries
620 const auto remainder = nRangesByCluster % nSlotsPerFile;
621 std::size_t iRange = 0;
622 unsigned int iSlot = 0;
623 const unsigned int N = std::min(nSlotsPerFile, nRangesByCluster);
624 for (; iSlot < N; ++iSlot) {
625 auto start = rangesByCluster[iRange].fFirstEntry;
626 iRange += nClustersPerSlot + static_cast<int>(iSlot < remainder);
627 assert(iRange > 0);
628 auto end = rangesByCluster[iRange - 1].fLastEntryPlusOne;
629
631 range.fFileName = sourceFileName;
632 // The last range for this file just takes the already opened page source. All previous ranges clone.
633 if (iSlot == N - 1) {
634 range.fSource = std::move(source);
635 } else {
636 range.fSource = source->Clone();
637 }
638 range.fSource->SetEntryRange({start, end - start});
639 range.fFirstEntry = start;
640 range.fLastEntry = end;
641 fNextRanges.emplace_back(std::move(range));
642 }
643 } // loop over tail of remaining files
644}
645
646std::vector<std::pair<ULong64_t, ULong64_t>> ROOT::RDF::RNTupleDS::GetEntryRanges()
647{
648 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
649
650 // We need to distinguish between single threaded and multi-threaded runs.
651 // In single threaded mode, InitSlot is only called once and column readers have to be rewired
652 // to new page sources of the chain in GetEntryRanges. In multi-threaded mode, on the other hand,
653 // InitSlot is called for every returned range, thus rewiring the column readers takes place in
654 // InitSlot and FinalizeSlot.
655
656 if (fNSlots == 1) {
657 for (auto r : fActiveColumnReaders[0]) {
658 r->Disconnect(true /* keepValue */);
659 }
660 }
661
662 // If we have fewer files than slots and we run multiple event loops, we can reuse fCurrentRanges and don't need
663 // to worry about loading the fNextRanges. I.e., in this case we don't enter the if block.
664 if (fCurrentRanges.empty() || fSeenEntriesNoGlobalRange > 0) {
665 // Otherwise, i.e. start of the first event loop or in the middle of the event loop, prepare the next ranges
666 // and swap with the current ones.
667 {
668 std::unique_lock lock(fMutexStaging);
669 fCvStaging.wait(lock, [this] { return fHasNextSources; });
670 }
671 PrepareNextRanges();
672 if (fNextRanges.empty()) {
673 // No more data
674 return ranges;
675 }
676
677 assert(fNextRanges.size() <= fNSlots);
678
679 fCurrentRanges.clear();
680 std::swap(fCurrentRanges, fNextRanges);
681 }
682
683 // Stage next batch of files for the next call to GetEntryRanges()
684 {
685 std::lock_guard _(fMutexStaging);
686 fIsReadyForStaging = true;
687 fHasNextSources = false;
688 }
689 fCvStaging.notify_one();
690
691 // Create ranges for the RDF loop manager from the list of REntryRangeDS records.
692 // The entry ranges that are relative to the page source in REntryRangeDS are translated into absolute
693 // entry ranges, given the current state of the entry cursor.
694 // We remember the connection from first absolute entry index of a range to its REntryRangeDS record
695 // so that we can properly rewire the column reader in InitSlot
696 fFirstEntry2RangeIdx.clear();
697 fOriginalRanges.clear();
698
700
701 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
702
703 // Several consecutive ranges may operate on the same file (each with their own page source clone).
704 // We can detect a change of file when the first entry number jumps back to 0.
705 if (fCurrentRanges[i].fFirstEntry == 0) {
706 // New source
707 fSeenEntriesNoGlobalRange += nEntriesPerSource;
709 }
710
711 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntriesNoGlobalRange;
712 auto end = fCurrentRanges[i].fLastEntry + fSeenEntriesNoGlobalRange;
713
714 nEntriesPerSource += end - start;
715
716 if (fGlobalEntryRange.has_value()) {
717
718 // We need to consider different scenarios for when we have GlobalRanges set by the user.
719 // Consider a simple case of 3 files, with original ranges set as (consecutive entries of 3 files):
720 // [0, 20], [20, 45], [45, 65]
721 // we will now see what happens in each of the scenarios below when GlobalRanges can be set to different
722 // values:
723 // a) [2, 5] - we stay in file 1
724 // - hence we will use the 1st case and get the range [2,5], in this case we also need to quit further
725 // processing from the other files by entering case 3
726 // b) [2, 21] - we start in file 1 and finish in file 2
727 // - use the 2nd case first, as 21 > 20 (end of first file), then we will go to case 1, resulting in ranges:
728 // [2, 20], [20, 21], c) [21 - 40] - we skip file 1, start in file 2 and stay in file 2
729 // - to skip the first file, we use the 4th case, followed by the 1st case, resulting range is: [21, 40]
730 // d) [21 - 65] - we skip file 1, start in file 2 and continue to file 3
731 // - to skip the first file, we use the 4th case, we continue with the 2nd case, and use the 1st case at the
732 // end, resulting ranges are [21, 45], [45, 65]
733 // The first case
734 if (fGlobalEntryRange->first >= start && fGlobalEntryRange->second <= end) {
735 fOriginalRanges.emplace_back(start, end);
736 fFirstEntry2RangeIdx[fGlobalEntryRange->first] = i;
737 ranges.emplace_back(fGlobalEntryRange->first, fGlobalEntryRange->second);
738 }
739
740 // The second case:
741 // The `fGlobalEntryRange->first < end` condition is to distinguish this case from the 4th case.
742 else if (fGlobalEntryRange->second > end && fGlobalEntryRange->first < end) {
743 fOriginalRanges.emplace_back(start, end);
744 fFirstEntry2RangeIdx[fGlobalEntryRange->first] = i;
745 ranges.emplace_back(fGlobalEntryRange->first, end);
746 std::optional<std::pair<ULong64_t, ULong64_t>> newvalues({end, fGlobalEntryRange->second});
747 fGlobalEntryRange.swap(newvalues);
748 }
749 // The third case, needed to correctly quit processing if we only stay in the first file
750 else if (fGlobalEntryRange->second < start) {
751 return ranges;
752 }
753
754 // The fourth case:
755 else if (fGlobalEntryRange->first >= end) {
756 fOriginalRanges.emplace_back(start, end);
757 fFirstEntry2RangeIdx[start] = i;
758 ranges.emplace_back(start, start);
759 }
760 }
761
762 else {
763 fFirstEntry2RangeIdx[start] = i;
764 fOriginalRanges.emplace_back(start, end);
765 ranges.emplace_back(start, end);
766 }
767 }
768
769 fSeenEntriesNoGlobalRange += nEntriesPerSource;
770
771 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
772 for (auto r : fActiveColumnReaders[0]) {
773 r->Connect(*fCurrentRanges[0].fSource, fOriginalRanges[0].first);
774 }
775 }
776
777 return ranges;
778}
779
781{
782 if (fNSlots == 1) {
783 // Ensure the connection between slot and range is valid also in single-thread mode
784 fSlotsToRangeIdxs[0] = 0;
785 return;
786 }
787
788 // The same slot ID could be picked multiple times in the same execution, thus
789 // ending up processing different page sources. Here we re-establish the
790 // connection between the slot and the correct page source by finding which
791 // range index corresponds to the first entry passed.
792 auto idxRange = fFirstEntry2RangeIdx.at(firstEntry);
793
794 // We also remember this connection so it can later be retrieved in CreateSampleInfo
795 fSlotsToRangeIdxs[slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>()] = idxRange;
796
797 for (auto r : fActiveColumnReaders[slot]) {
798 r->Connect(*fCurrentRanges[idxRange].fSource,
799 fOriginalRanges[idxRange].first - fCurrentRanges[idxRange].fFirstEntry);
800 }
801}
802
804{
805 if (fNSlots == 1)
806 return;
807
808 for (auto r : fActiveColumnReaders[slot]) {
809 r->Disconnect(true /* keepValue */);
810 }
811}
812
813std::string ROOT::RDF::RNTupleDS::GetTypeName(std::string_view colName) const
814{
815 auto colNamePos = std::find(fColumnNames.begin(), fColumnNames.end(), colName);
816
817 if (colNamePos == fColumnNames.end()) {
818 auto msg = std::string("RNTupleDS: There is no column with name \"") + std::string(colName) + "\"";
819 throw std::runtime_error(msg);
820 }
821
822 const auto index = std::distance(fColumnNames.begin(), colNamePos);
823 return fColumnTypes[index];
824}
825
826bool ROOT::RDF::RNTupleDS::HasColumn(std::string_view colName) const
827{
828 return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end();
829}
830
832{
833 fSeenEntriesNoGlobalRange = 0;
834 fNextFileIndex = 0;
835 fIsReadyForStaging = fHasNextSources = fStagingThreadShouldTerminate = false;
836 fThreadStaging = std::thread(&RNTupleDS::ExecStaging, this);
837 assert(fNextRanges.empty());
838
839 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
840 // First event loop or large number of files: start the staging process.
841 {
842 std::lock_guard _(fMutexStaging);
843 fIsReadyForStaging = true;
844 }
845 fCvStaging.notify_one();
846 } else {
847 // Otherwise, we will reuse fCurrentRanges. Make sure that staging and preparing next ranges will be a noop
848 // (already at the end of the list of files).
849 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
850 }
851}
852
854{
855 for (unsigned int i = 0; i < fNSlots; ++i) {
856 for (auto r : fActiveColumnReaders[i]) {
857 r->Disconnect(false /* keepValue */);
858 }
859 }
860 {
861 std::lock_guard _(fMutexStaging);
862 fStagingThreadShouldTerminate = true;
863 }
864 fCvStaging.notify_one();
865 fThreadStaging.join();
866 // If we have a chain with more files than the number of slots, the files opened at the end of the
867 // event loop won't be reused when the event loop restarts, so we can close them.
868 if (fFileNames.size() > fNSlots) {
869 fCurrentRanges.clear();
870 fNextRanges.clear();
871 fStagingArea.clear();
872 fStagingArea.resize(fFileNames.size());
873 }
874}
875
877{
878 assert(fNSlots == 0);
879 assert(nSlots > 0);
880 fNSlots = nSlots;
881 fActiveColumnReaders.resize(fNSlots);
882 fSlotsToRangeIdxs.resize(fNSlots * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
883}
884
885ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, std::string_view fileName)
886{
887 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileName));
888}
889
890ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames)
891{
892 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileNames));
893}
894
895ROOT::RDF::RSampleInfo ROOT::Internal::RDF::RNTupleDS::CreateSampleInfo(
896 unsigned int slot, const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &sampleMap) const
897{
898 // The same slot ID could be picked multiple times in the same execution, thus
899 // ending up processing different page sources. Here we re-establish the
900 // connection between the slot and the correct page source by retrieving
901 // which range is connected currently to the slot
902
903 const auto &rangeIdx = fSlotsToRangeIdxs.at(slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
904
905 // Missing source if a file does not exist
906 if (!fCurrentRanges[rangeIdx].fSource)
907 return ROOT::RDF::RSampleInfo{};
908
909 const auto &ntupleName = fCurrentRanges[rangeIdx].fSource->GetNTupleName();
910 const auto &ntuplePath = fCurrentRanges[rangeIdx].fFileName;
911 const auto ntupleID = std::string(ntuplePath) + '/' + ntupleName;
912
913 if (sampleMap.empty())
915 ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry));
916
917 if (sampleMap.find(ntupleID) == sampleMap.end())
918 throw std::runtime_error("Full sample identifier '" + ntupleID + "' cannot be found in the available samples.");
919
921 ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry),
922 sampleMap.at(ntupleID));
923}
924
927 const std::pair<ULong64_t, ULong64_t> &range)
928{
929 std::unique_ptr<ROOT::RDF::RNTupleDS> ds{new ROOT::RDF::RNTupleDS(ntupleName, fileNames, range)};
930 return ROOT::RDataFrame(std::move(ds));
931}
932
933std::pair<std::vector<ROOT::Internal::RNTupleClusterBoundaries>, ROOT::NTupleSize_t>
934ROOT::Internal::RDF::GetClustersAndEntries(std::string_view ntupleName, std::string_view location)
935{
937 source->Attach();
938 const auto descGuard = source->GetSharedDescriptorGuard();
939 return std::make_pair(ROOT::Internal::GetClusterBoundaries(descGuard.GetRef()), descGuard->GetNEntries());
940}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define f(i)
Definition RSha256.hxx:104
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define _(A, B)
Definition cfortran.h:108
void GetCollectionInfo(const ROOT::NTupleSize_t globalIndex, RNTupleLocalIndex *collectionStart, ROOT::NTupleSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
Definition RColumn.hxx:283
An artificial field that provides the size of a fixed-size array.
void GenerateColumns(const ROOT::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
void ReadGlobalImpl(ROOT::NTupleSize_t, void *to) final
RArraySizeField(const RArraySizeField &other)=delete
RArraySizeField(RArraySizeField &&other)=default
RArraySizeField & operator=(RArraySizeField &&other)=default
void ReadInClusterImpl(RNTupleLocalIndex, void *to) final
void ReconcileOnDiskField(const RNTupleDescriptor &) final
For non-artificial fields, check compatibility of the in-memory field and the on-disk field.
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RArraySizeField(std::size_t arrayLength)
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RArraySizeField & operator=(const RArraySizeField &other)=delete
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Every RDF column is represented by exactly one RNTuple field.
void * GetImpl(Long64_t entry) final
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RNTupleDS * fDataSource
The data source that owns this column reader.
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Long64_t fLastEntry
Last entry number that was read.
An artificial field that transforms an RNTuple column that contains the offset of collections into co...
Definition RNTupleDS.cxx:59
RRDFCardinalityField(RRDFCardinalityField &&other)=default
size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
Definition RNTupleDS.cxx:93
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:86
void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
Definition RNTupleDS.cxx:96
void ReconcileOnDiskField(const RNTupleDescriptor &) final
For non-artificial fields, check compatibility of the in-memory field and the on-disk field.
Definition RNTupleDS.cxx:68
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:87
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
Definition RNTupleDS.cxx:76
size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
Definition RNTupleDS.cxx:92
void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
Get the number of elements of the collection identified by localIndex.
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
Definition RNTupleDS.cxx:61
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
Definition RNTupleDS.cxx:65
static void SetClusterBunchSize(RNTupleReadOptions &options, unsigned int val)
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
std::vector< void * > Record_t
std::optional< std::pair< ULong64_t, ULong64_t > > fGlobalEntryRange
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:74
void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos, bool convertToRVec=true)
Provides the RDF column "colName" given the field identified by fieldID.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
void ExecStaging()
The main function of the fThreadStaging background thread.
std::vector< std::unique_ptr< ROOT::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
ROOT::RFieldBase * GetFieldWithTypeChecks(std::string_view fieldName, const std::type_info &tid)
std::vector< std::string > fFileNames
Definition RNTupleDS.hxx:93
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
RNTupleDS(std::unique_ptr< ROOT::Internal::RPageSource > pageSource)
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
Definition RNTupleDS.hxx:92
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
void Finalize() final
Convenience method called after concluding an event-loop.
std::vector< std::string > fColumnTypes
void Initialize() final
Convenience method called before starting an event-loop.
std::vector< std::string > fColumnNames
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
The list of column representations a field can have.
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
RConstSchemaIterator cbegin() const
const std::string & GetFieldName() const
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId)
Factory method to resurrect a field from the stored on-disk type information.
ROOT::DescriptorId_t GetOnDiskId() const
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its subfields using a possibly new name and a new, unconnected set of columns.
The on-storage metadata of an RNTuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Common user-tunable settings for reading RNTuples.
const_iterator begin() const
const_iterator end() const
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1676
std::string TypeID2TypeName(const std::type_info &id)
Returns the name of a type starting from its type_info An empty string is returned in case of failure...
Definition RDFUtils.cxx:178
ROOT::RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector< std::string > &fileNames, const std::pair< ULong64_t, ULong64_t > &range)
Internal overload of the function that allows passing a range of entries.
std::pair< std::vector< ROOT::Internal::RNTupleClusterBoundaries >, ROOT::NTupleSize_t > GetClustersAndEntries(std::string_view ntupleName, std::string_view location)
Retrieves the cluster boundaries and the number of entries for the input RNTuple.
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &)
std::vector< ROOT::Internal::RNTupleClusterBoundaries > GetClusterBoundaries(const RNTupleDescriptor &desc)
Return the cluster boundaries for each cluster in this RNTuple.
std::string GetRenormalizedTypeName(const std::string &metaNormalizedName)
Given a type name normalized by ROOT meta, renormalize it for RNTuple. E.g., insert the std:: prefix.
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
std::vector< std::string > ColumnNames_t
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
ENTupleStructure
The fields in the RNTuple data model tree can carry different structural information about the type s...
Tag to let data sources use the native data type when creating a column reader.
Definition Utils.hxx:344
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:80