Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleDS.cxx
Go to the documentation of this file.
1/// \file RNTupleDS.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
15#include <ROOT/RDataFrame.hxx>
16#include <ROOT/RDF/Utils.hxx>
17#include <ROOT/RField.hxx>
18#include <ROOT/RFieldUtils.hxx>
21#include <ROOT/RNTupleDS.hxx>
22#include <ROOT/RNTupleTypes.hxx>
23#include <ROOT/RPageStorage.hxx>
24#include <string_view>
25
26#include <TError.h>
27#include <TSystem.h>
28
29#include <cassert>
30#include <memory>
31#include <mutex>
32#include <string>
33#include <vector>
34#include <typeinfo>
35#include <utility>
36
37// clang-format off
38/**
39* \class ROOT::RDF::RNTupleDS
40* \ingroup dataframe
41* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data.
42*
43* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
44*
45* For each column containing an array or a collection, a corresponding column `R_rdf_sizeof_colname` is available to access
46* `colname.size()` without reading and deserializing the collection values.
47*
48**/
49// clang-format on
50
51namespace ROOT::Internal::RDF {
52/// An artificial field that transforms an RNTuple column that contains the offset of collections into
53/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g.
54/// `R_rdf_sizeof_jets` for a collection named `jets`.
55///
56/// This field owns the collection offset field but instead of exposing the collection offsets it exposes
57/// the collection sizes (offset(N+1) - offset(N)). For the time being, we offer this functionality only in RDataFrame.
58/// TODO(jblomer): consider providing a general set of useful virtual fields as part of RNTuple.
60protected:
61 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view /* newName */) const final
62 {
63 return std::make_unique<RRDFCardinalityField>();
64 }
65 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
66
67public:
68 RRDFCardinalityField() : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kPlain, false /* isSimple */) {}
71 ~RRDFCardinalityField() override = default;
72
82 // Field is only used for reading
83 void GenerateColumns() final { throw RException(R__FAIL("Cardinality fields must only be used for reading")); }
88
89 size_t GetValueSize() const final { return sizeof(std::size_t); }
90 size_t GetAlignment() const final { return alignof(std::size_t); }
91
92 /// Get the number of elements of the collection identified by globalIndex
100
101 /// Get the number of elements of the collection identified by clusterIndex
109};
110
111/**
112 * @brief An artificial field that provides the size of a fixed-size array
113 *
114 * This is the implementation of `R_rdf_sizeof_column` in case `column` contains
115 * fixed-size arrays on disk.
116 */
118private:
119 std::size_t fArrayLength;
120
121 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view) const final
122 {
123 return std::make_unique<RArraySizeField>(fArrayLength);
124 }
125 void GenerateColumns() final { throw RException(R__FAIL("RArraySizeField fields must only be used for reading")); }
127 void ReadGlobalImpl(ROOT::NTupleSize_t /*globalIndex*/, void *to) final
128 {
129 *static_cast<std::size_t *>(to) = fArrayLength;
130 }
131 void ReadInClusterImpl(RNTupleLocalIndex /*localIndex*/, void *to) final
132 {
133 *static_cast<std::size_t *>(to) = fArrayLength;
134 }
135
136public:
138 : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kPlain, false /* isSimple */),
140 {
141 }
147
148 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
149 std::size_t GetValueSize() const final { return sizeof(std::size_t); }
150 std::size_t GetAlignment() const final { return alignof(std::size_t); }
151};
152
153/// Every RDF column is represented by exactly one RNTuple field
157
158 RNTupleDS *fDataSource; ///< The data source that owns this column reader
159 RFieldBase *fProtoField; ///< The prototype field from which fField is cloned
160 std::unique_ptr<RFieldBase> fField; ///< The field backing the RDF column
161 std::unique_ptr<RFieldBase::RValue> fValue; ///< The memory location used to read from fField
162 std::shared_ptr<void> fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources
163 Long64_t fLastEntry = -1; ///< Last entry number that was read
164 /// For chains, the logical entry and the physical entry in any particular file can be different.
165 /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the
166 /// corresponding data source was opened.
168
169public:
171 ~RNTupleColumnReader() override = default;
172
173 /// Connect the field and its subfields to the page source
175 {
176 assert(fLastEntry == -1);
177
179
180 // Create a new, real field from the prototype and set its field ID in the context of the given page source
182 {
183 auto descGuard = source.GetSharedDescriptorGuard();
184 // Set the on-disk field IDs for the field and the subfield
185 fField->SetOnDiskId(
187 auto iProto = fProtoField->cbegin();
188 auto iReal = fField->begin();
189 for (; iReal != fField->end(); ++iProto, ++iReal) {
190 iReal->SetOnDiskId(descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(iProto->GetOnDiskId())));
191 }
192 }
193
194 try {
196 } catch (const ROOT::RException &err) {
197 auto onDiskType = source.GetSharedDescriptorGuard()->GetFieldDescriptor(fField->GetOnDiskId()).GetTypeName();
198 std::string msg = "RNTupleDS: invalid type \"" + fField->GetTypeName() + "\" for column \"" +
199 fDataSource->fFieldId2QualifiedName[fField->GetOnDiskId()] + "\" with on-disk type \"" +
200 onDiskType + "\"";
201 throw std::runtime_error(msg);
202 }
203
204 if (fValuePtr) {
205 // When the reader reconnects to a new file, the fValuePtr is already set
206 fValue = std::make_unique<RFieldBase::RValue>(fField->BindValue(fValuePtr));
207 fValuePtr = nullptr;
208 } else {
209 // For the first file, create a new object for this field (reader)
210 fValue = std::make_unique<RFieldBase::RValue>(fField->CreateValue());
211 }
212 }
213
215 {
216 if (fValue && keepValue) {
217 fValuePtr = fValue->GetPtr<void>();
218 }
219 fValue = nullptr;
220 fField = nullptr;
221 fLastEntry = -1;
222 }
223
224 void *GetImpl(Long64_t entry) final
225 {
226 if (entry != fLastEntry) {
227 fValue->Read(entry - fEntryOffset);
229 }
230 return fValue->GetPtr<void>().get();
231 }
232};
233} // namespace ROOT::Internal::RDF
234
236
238 ROOT::DescriptorId_t fieldId, std::vector<RNTupleDS::RFieldInfo> fieldInfos,
239 bool convertToRVec)
240{
241 // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple
242 // using the following types and with a top-level field named "event" of type Event:
243 //
244 // struct Event {
245 // int id;
246 // std::vector<Track> tracks;
247 // };
248 // struct Track {
249 // std::vector<Hit> hits;
250 // };
251 // struct Hit {
252 // float x;
253 // float y;
254 // };
255 //
256 // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord).
257 // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the
258 // tree of sub fields and expose the following RDF columns:
259 //
260 // "event" [Event]
261 // "event.id" [int]
262 // "event.tracks" [RVec<Track>]
263 // "R_rdf_sizeof_event.tracks" [std::size_t]
264 // "event.tracks.hits" [RVec<RVec<Hit>>]
265 // "R_rdf_sizeof_event.tracks.hits" [RVec<std::size_t>]
266 // "event.tracks.hits.x" [RVec<RVec<float>>]
267 // "R_rdf_sizeof_event.tracks.hits.x" [RVec<std::size_t>]
268 // "event.tracks.hits.y" [RVec<RVec<float>>]
269 // "R_rdf_sizeof_event.tracks.hits.y" [RVec<std::size_t>]
270
271 const auto &fieldDesc = desc.GetFieldDescriptor(fieldId);
272 const auto &nRepetitions = fieldDesc.GetNRepetitions();
273 if ((fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) || (nRepetitions > 0)) {
274 // The field is a collection or a fixed-size array.
275 // We open a new collection scope with fieldID being the inner most collection. E.g. for "event.tracks.hits",
276 // fieldInfos would already contain the fieldID of "event.tracks"
277 fieldInfos.emplace_back(fieldId, nRepetitions);
278 }
279
280 if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) {
281 // Inner fields of collections are provided as projected collections of only that inner field,
282 // E.g. we provide a projected collection RVec<RVec<float>> for "event.tracks.hits.x" in the example
283 // above.
285 convertToRVec && (fieldDesc.GetTypeName().substr(0, 19) == "ROOT::VecOps::RVec<" ||
286 fieldDesc.GetTypeName().substr(0, 12) == "std::vector<" || fieldDesc.GetTypeName() == "");
287 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
289
290 // Note that at the end of the recursion, we handled the inner sub collections as well as the
291 // collection as whole, so we are done.
292 return;
293
294 } else if (nRepetitions > 0) {
295 // Fixed-size array, same logic as ROOT::RVec.
296 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
297 AddField(desc, colName, f.GetId(), fieldInfos);
298 return;
299 } else if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kRecord) {
300 // Inner fields of records are provided as individual RDF columns, e.g. "event.id"
301 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
302 auto innerName = colName.empty() ? f.GetFieldName() : (std::string(colName) + "." + f.GetFieldName());
303 // Inner fields of collections of records are always exposed as ROOT::RVec
304 AddField(desc, innerName, f.GetId(), fieldInfos);
305 }
306 }
307
308 // The fieldID could be the root field or the class of fieldId might not be loaded.
309 // In these cases, only the inner fields are exposed as RDF columns.
310 auto fieldOrException = ROOT::RFieldBase::Create(fieldDesc.GetFieldName(), fieldDesc.GetTypeName());
311 if (!fieldOrException)
312 return;
313 auto valueField = fieldOrException.Unwrap();
314 valueField->SetOnDiskId(fieldId);
315 for (auto &f : *valueField) {
316 f.SetOnDiskId(desc.FindFieldId(f.GetFieldName(), f.GetParent()->GetOnDiskId()));
317 }
318 std::unique_ptr<ROOT::RFieldBase> cardinalityField;
319 // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks")
320 if (!fieldInfos.empty()) {
321 const auto &info = fieldInfos.back();
322 if (info.fNRepetitions > 0) {
323 cardinalityField = std::make_unique<ROOT::Internal::RDF::RArraySizeField>(info.fNRepetitions);
324 } else {
325 cardinalityField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField>();
326 }
327 cardinalityField->SetOnDiskId(info.fFieldId);
328 }
329
330 for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) {
331 const auto &fieldInfo = *i;
332
333 if (fieldInfo.fNRepetitions > 0) {
334 // Fixed-size array, read it as ROOT::RVec in memory
335 valueField = std::make_unique<ROOT::RArrayAsRVecField>("", std::move(valueField), fieldInfo.fNRepetitions);
336 } else {
337 // Actual collection. A std::vector or ROOT::RVec gets added as a ROOT::RVec. All other collection types keep
338 // their original type.
339 if (convertToRVec) {
340 valueField = std::make_unique<ROOT::RRVecField>("", std::move(valueField));
341 } else {
342 auto outerFieldType = desc.GetFieldDescriptor(fieldInfo.fFieldId).GetTypeName();
344 }
345 }
346
347 valueField->SetOnDiskId(fieldInfo.fFieldId);
348
349 // Skip the inner-most collection level to construct the cardinality column
350 // It's taken care of by the `if (!fieldInfos.empty())` scope above
351 if (i != fieldInfos.rbegin()) {
352 if (fieldInfo.fNRepetitions > 0) {
353 // This collection level refers to a fixed-size array
355 std::make_unique<ROOT::RArrayAsRVecField>("", std::move(cardinalityField), fieldInfo.fNRepetitions);
356 } else {
357 // This collection level refers to an RVec
358 cardinalityField = std::make_unique<ROOT::RRVecField>("", std::move(cardinalityField));
359 }
360
361 cardinalityField->SetOnDiskId(fieldInfo.fFieldId);
362 }
363 }
364
365 if (cardinalityField) {
366 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
367 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
368 fProtoFields.emplace_back(std::move(cardinalityField));
369 }
370
371 fieldInfos.emplace_back(fieldId, nRepetitions);
372 fColumnNames.emplace_back(colName);
373 fColumnTypes.emplace_back(valueField->GetTypeName());
374 fProtoFields.emplace_back(std::move(valueField));
375}
376
377ROOT::RDF::RNTupleDS::RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource)
378{
379 pageSource->Attach();
380 fPrincipalDescriptor = pageSource->GetSharedDescriptorGuard()->Clone();
381 fStagingArea.emplace_back(std::move(pageSource));
382
383 AddField(fPrincipalDescriptor, "", fPrincipalDescriptor.GetFieldZeroId(),
384 std::vector<ROOT::RDF::RNTupleDS::RFieldInfo>());
385}
386
387namespace {
388
390{
391 // The setting is for now a global one, must be decided before running the
392 // program by setting the appropriate environment variable. Make sure that
393 // option configuration is thread-safe and happens only once.
395 static std::once_flag flag;
396 std::call_once(flag, []() {
397 if (auto env = gSystem->Getenv("ROOT_RNTUPLE_CLUSTERBUNCHSIZE"); env != nullptr && strlen(env) > 0) {
398 std::string envStr{env};
399 auto envNum{std::stoul(envStr)};
400 envNum = envNum == 0 ? 1 : envNum;
402 }
403 });
404 return opts;
405}
406
407std::unique_ptr<ROOT::Internal::RPageSource> CreatePageSource(std::string_view ntupleName, std::string_view fileName)
408{
410}
411} // namespace
412
413ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
414 : RNTupleDS(CreatePageSource(ntupleName, fileName))
415{
416 fFileNames = std::vector<std::string>{std::string{fileName}};
417}
418
419ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames)
420 : RNTupleDS(CreatePageSource(ntupleName, fileNames[0]))
421{
424 fStagingArea.resize(fFileNames.size());
425}
426
427ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames,
428 const std::pair<ULong64_t, ULong64_t> &range)
430{
432}
433
435ROOT::RDF::RNTupleDS::GetColumnReadersImpl(std::string_view /* name */, const std::type_info & /* ti */)
436{
437 // This datasource uses the newer GetColumnReaders() API
438 return {};
439}
440
442{
443 // At this point we can assume that `fieldName` will be found in fColumnNames
444 const auto index =
445 std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), fieldName));
446
447 // A reader was requested but we don't have RTTI for it, this is encoded with the tag UseNativeDataType. We can just
448 // return the available protofield
450 return fProtoFields[index].get();
451 }
452
453 // The user explicitly requested a type
455
456 // If the field corresponding to the provided name is not a cardinality column and the requested type is different
457 // from the proto field that was created when the data source was constructed, we first have to create an
458 // alternative proto field for the column reader. Otherwise, we can directly use the existing proto field.
459 if (fieldName.substr(0, 13) != "R_rdf_sizeof_" && requestedType != fColumnTypes[index]) {
460 auto &altProtoFields = fAlternativeProtoFields[index];
461
462 // If we can find the requested type in the registered alternative protofields, return the corresponding field
463 if (auto altProtoField = std::find_if(altProtoFields.begin(), altProtoFields.end(),
464 [&requestedType](const std::unique_ptr<ROOT::RFieldBase> &fld) {
465 return fld->GetTypeName() == requestedType;
466 });
468 return altProtoField->get();
469 }
470
471 // Otherwise, create a new protofield and register it in the alternatives before returning
474 throw std::runtime_error("RNTupleDS: Could not create field with type \"" + requestedType +
475 "\" for column \"" + std::string(fieldName) + "\"");
476 }
478 newAltProtoField->SetOnDiskId(fProtoFields[index]->GetOnDiskId());
479 auto *newField = newAltProtoField.get();
480 altProtoFields.emplace_back(std::move(newAltProtoField));
481 return newField;
482 }
483
484 // General case: there was a correspondence between the user-requested type and the corresponding column type
485 return fProtoFields[index].get();
486}
487
488std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
489ROOT::RDF::RNTupleDS::GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info &tid)
490{
491 ROOT::RFieldBase *field = GetFieldWithTypeChecks(name, tid);
492 assert(field != nullptr);
493
494 // Map the field's and subfields' IDs to qualified names so that we can later connect the fields to
495 // other page sources from the chain
496 fFieldId2QualifiedName[field->GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(field->GetOnDiskId());
497 for (const auto &s : *field) {
498 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(s.GetOnDiskId());
499 }
500
501 auto reader = std::make_unique<ROOT::Internal::RDF::RNTupleColumnReader>(this, field);
502 fActiveColumnReaders[slot].emplace_back(reader.get());
503
504 return reader;
505}
506
508{
509 while (true) {
510 std::unique_lock lock(fMutexStaging);
511 fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
512 if (fStagingThreadShouldTerminate)
513 return;
514
515 assert(!fHasNextSources);
516 StageNextSources();
517 fHasNextSources = true;
518 fIsReadyForStaging = false;
519
520 lock.unlock();
521 fCvStaging.notify_one();
522 }
523}
524
526{
527 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
528
529 for (auto i = fNextFileIndex; (i < nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
530
531 if (fStagingThreadShouldTerminate)
532 return;
533
534 if (fStagingArea[i]) {
535 // The first file is already open and was used to read the schema
536 assert(i == 0);
537 } else {
538 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
539 fStagingArea[i]->LoadStructure();
540 }
541 }
542}
543
545{
546 assert(fNextRanges.empty());
547
548 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
549 auto nRemainingFiles = nFiles - fNextFileIndex;
550
551 if (nRemainingFiles == 0)
552 return;
553
554 // Easy work scheduling: one file per slot. We skip empty files (files without entries).
555
556 if ((nRemainingFiles >= fNSlots) || (fGlobalEntryRange.has_value())) {
557 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles)) {
559
560 std::swap(fStagingArea[fNextFileIndex], range.fSource);
561
562 if (!range.fSource) {
563 // Typically, the prestaged source should have been present. Only if some of the files are empty, we need
564 // to open and attach files here.
565 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
566 }
567 range.fFileName = fFileNames[fNextFileIndex];
568 range.fSource->Attach();
569 fNextFileIndex++;
570 auto nEntries = range.fSource->GetNEntries();
571 if (nEntries == 0)
572 continue;
573 range.fLastEntry = nEntries; // whole file per slot, i.e. entry range [0..nEntries - 1]
574
575 fNextRanges.emplace_back(std::move(range));
576 }
577 return;
578 }
579
580 // Work scheduling of the tail: multiple slots work on the same file.
581 // Every slot still has its own page source but these page sources may open the same file.
582 // Again, we need to skip empty files.
583 unsigned int nSlotsPerFile = fNSlots / nRemainingFiles;
584 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles); ++i) {
585 std::unique_ptr<ROOT::Internal::RPageSource> source;
586 // Need to look for the file name to populate the sample info later
587 const auto &sourceFileName = fFileNames[fNextFileIndex];
588 std::swap(fStagingArea[fNextFileIndex], source);
589 if (!source) {
590 // Empty files trigger this condition
591 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
592 }
593 source->Attach();
594 fNextFileIndex++;
595
596 auto nEntries = source->GetNEntries();
597 if (nEntries == 0)
598 continue;
599
600 // If last file: use all remaining slots
601 if (i == (nRemainingFiles - 1))
602 nSlotsPerFile = fNSlots - fNextRanges.size();
603
604 const auto rangesByCluster = [&source]() {
605 // Take the shared lock of the descriptor just for the time necessary
606 const auto descGuard = source->GetSharedDescriptorGuard();
608 }();
609
610 const unsigned int nRangesByCluster = rangesByCluster.size();
611
612 // Distribute slots equidistantly over the entry range, aligned on cluster boundaries
614 const auto remainder = nRangesByCluster % nSlotsPerFile;
615 std::size_t iRange = 0;
616 unsigned int iSlot = 0;
617 const unsigned int N = std::min(nSlotsPerFile, nRangesByCluster);
618 for (; iSlot < N; ++iSlot) {
619 auto start = rangesByCluster[iRange].fFirstEntry;
620 iRange += nClustersPerSlot + static_cast<int>(iSlot < remainder);
621 assert(iRange > 0);
622 auto end = rangesByCluster[iRange - 1].fLastEntryPlusOne;
623
625 range.fFileName = sourceFileName;
626 // The last range for this file just takes the already opened page source. All previous ranges clone.
627 if (iSlot == N - 1) {
628 range.fSource = std::move(source);
629 } else {
630 range.fSource = source->Clone();
631 }
632 range.fSource->SetEntryRange({start, end - start});
633 range.fFirstEntry = start;
634 range.fLastEntry = end;
635 fNextRanges.emplace_back(std::move(range));
636 }
637 } // loop over tail of remaining files
638}
639
640std::vector<std::pair<ULong64_t, ULong64_t>> ROOT::RDF::RNTupleDS::GetEntryRanges()
641{
642 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
643
644 // We need to distinguish between single threaded and multi-threaded runs.
645 // In single threaded mode, InitSlot is only called once and column readers have to be rewired
646 // to new page sources of the chain in GetEntryRanges. In multi-threaded mode, on the other hand,
647 // InitSlot is called for every returned range, thus rewiring the column readers takes place in
648 // InitSlot and FinalizeSlot.
649
650 if (fNSlots == 1) {
651 for (auto r : fActiveColumnReaders[0]) {
652 r->Disconnect(true /* keepValue */);
653 }
654 }
655
656 // If we have fewer files than slots and we run multiple event loops, we can reuse fCurrentRanges and don't need
657 // to worry about loading the fNextRanges. I.e., in this case we don't enter the if block.
658 if (fCurrentRanges.empty() || fSeenEntriesNoGlobalRange > 0) {
659 // Otherwise, i.e. start of the first event loop or in the middle of the event loop, prepare the next ranges
660 // and swap with the current ones.
661 {
662 std::unique_lock lock(fMutexStaging);
663 fCvStaging.wait(lock, [this] { return fHasNextSources; });
664 }
665 PrepareNextRanges();
666 if (fNextRanges.empty()) {
667 // No more data
668 return ranges;
669 }
670
671 assert(fNextRanges.size() <= fNSlots);
672
673 fCurrentRanges.clear();
674 std::swap(fCurrentRanges, fNextRanges);
675 }
676
677 // Stage next batch of files for the next call to GetEntryRanges()
678 {
679 std::lock_guard _(fMutexStaging);
680 fIsReadyForStaging = true;
681 fHasNextSources = false;
682 }
683 fCvStaging.notify_one();
684
685 // Create ranges for the RDF loop manager from the list of REntryRangeDS records.
686 // The entry ranges that are relative to the page source in REntryRangeDS are translated into absolute
687 // entry ranges, given the current state of the entry cursor.
688 // We remember the connection from first absolute entry index of a range to its REntryRangeDS record
689 // so that we can properly rewire the column reader in InitSlot
690 fFirstEntry2RangeIdx.clear();
691 fOriginalRanges.clear();
692
694
695 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
696
697 // Several consecutive ranges may operate on the same file (each with their own page source clone).
698 // We can detect a change of file when the first entry number jumps back to 0.
699 if (fCurrentRanges[i].fFirstEntry == 0) {
700 // New source
701 fSeenEntriesNoGlobalRange += nEntriesPerSource;
703 }
704
705 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntriesNoGlobalRange;
706 auto end = fCurrentRanges[i].fLastEntry + fSeenEntriesNoGlobalRange;
707
708 nEntriesPerSource += end - start;
709
710 if (fGlobalEntryRange.has_value()) {
711
712 // We need to consider different scenarios for when we have GlobalRanges set by the user.
713 // Consider a simple case of 3 files, with original ranges set as (consecutive entries of 3 files):
714 // [0, 20], [20, 45], [45, 65]
715 // we will now see what happens in each of the scenarios below when GlobalRanges can be set to different
716 // values:
717 // a) [2, 5] - we stay in file 1
718 // - hence we will use the 1st case and get the range [2,5], in this case we also need to quit further
719 // processing from the other files by entering case 3
720 // b) [2, 21] - we start in file 1 and finish in file 2
721 // - use the 2nd case first, as 21 > 20 (end of first file), then we will go to case 1, resulting in ranges:
722 // [2, 20], [20, 21], c) [21 - 40] - we skip file 1, start in file 2 and stay in file 2
723 // - to skip the first file, we use the 4th case, followed by the 1st case, resulting range is: [21, 40]
724 // d) [21 - 65] - we skip file 1, start in file 2 and continue to file 3
725 // - to skip the first file, we use the 4th case, we continue with the 2nd case, and use the 1st case at the
726 // end, resulting ranges are [21, 45], [45, 65]
727 // The first case
728 if (fGlobalEntryRange->first >= start && fGlobalEntryRange->second <= end) {
729 fOriginalRanges.emplace_back(start, end);
730 fFirstEntry2RangeIdx[fGlobalEntryRange->first] = i;
731 ranges.emplace_back(fGlobalEntryRange->first, fGlobalEntryRange->second);
732 }
733
734 // The second case:
735 // The `fGlobalEntryRange->first < end` condition is to distinguish this case from the 4th case.
736 else if (fGlobalEntryRange->second > end && fGlobalEntryRange->first < end) {
737 fOriginalRanges.emplace_back(start, end);
738 fFirstEntry2RangeIdx[fGlobalEntryRange->first] = i;
739 ranges.emplace_back(fGlobalEntryRange->first, end);
740 std::optional<std::pair<ULong64_t, ULong64_t>> newvalues({end, fGlobalEntryRange->second});
741 fGlobalEntryRange.swap(newvalues);
742 }
743 // The third case, needed to correctly quit processing if we only stay in the first file
744 else if (fGlobalEntryRange->second < start) {
745 return ranges;
746 }
747
748 // The fourth case:
749 else if (fGlobalEntryRange->first >= end) {
750 fOriginalRanges.emplace_back(start, end);
751 fFirstEntry2RangeIdx[start] = i;
752 ranges.emplace_back(start, start);
753 }
754 }
755
756 else {
757 fFirstEntry2RangeIdx[start] = i;
758 fOriginalRanges.emplace_back(start, end);
759 ranges.emplace_back(start, end);
760 }
761 }
762
763 fSeenEntriesNoGlobalRange += nEntriesPerSource;
764
765 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
766 for (auto r : fActiveColumnReaders[0]) {
767 r->Connect(*fCurrentRanges[0].fSource, fOriginalRanges[0].first);
768 }
769 }
770
771 return ranges;
772}
773
775{
776 if (fNSlots == 1) {
777 // Ensure the connection between slot and range is valid also in single-thread mode
778 fSlotsToRangeIdxs[0] = 0;
779 return;
780 }
781
782 // The same slot ID could be picked multiple times in the same execution, thus
783 // ending up processing different page sources. Here we re-establish the
784 // connection between the slot and the correct page source by finding which
785 // range index corresponds to the first entry passed.
786 auto idxRange = fFirstEntry2RangeIdx.at(firstEntry);
787
788 // We also remember this connection so it can later be retrieved in CreateSampleInfo
789 fSlotsToRangeIdxs[slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>()] = idxRange;
790
791 for (auto r : fActiveColumnReaders[slot]) {
792 r->Connect(*fCurrentRanges[idxRange].fSource,
793 fOriginalRanges[idxRange].first - fCurrentRanges[idxRange].fFirstEntry);
794 }
795}
796
798{
799 if (fNSlots == 1)
800 return;
801
802 for (auto r : fActiveColumnReaders[slot]) {
803 r->Disconnect(true /* keepValue */);
804 }
805}
806
807std::string ROOT::RDF::RNTupleDS::GetTypeName(std::string_view colName) const
808{
809 auto colNamePos = std::find(fColumnNames.begin(), fColumnNames.end(), colName);
810
811 if (colNamePos == fColumnNames.end()) {
812 auto msg = std::string("RNTupleDS: There is no column with name \"") + std::string(colName) + "\"";
813 throw std::runtime_error(msg);
814 }
815
816 const auto index = std::distance(fColumnNames.begin(), colNamePos);
817 return fColumnTypes[index];
818}
819
820bool ROOT::RDF::RNTupleDS::HasColumn(std::string_view colName) const
821{
822 return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end();
823}
824
826{
827 fSeenEntriesNoGlobalRange = 0;
828 fNextFileIndex = 0;
829 fIsReadyForStaging = fHasNextSources = fStagingThreadShouldTerminate = false;
830 fThreadStaging = std::thread(&RNTupleDS::ExecStaging, this);
831 assert(fNextRanges.empty());
832
833 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
834 // First event loop or large number of files: start the staging process.
835 {
836 std::lock_guard _(fMutexStaging);
837 fIsReadyForStaging = true;
838 }
839 fCvStaging.notify_one();
840 } else {
841 // Otherwise, we will reuse fCurrentRanges. Make sure that staging and preparing next ranges will be a noop
842 // (already at the end of the list of files).
843 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
844 }
845}
846
848{
849 for (unsigned int i = 0; i < fNSlots; ++i) {
850 for (auto r : fActiveColumnReaders[i]) {
851 r->Disconnect(false /* keepValue */);
852 }
853 }
854 {
855 std::lock_guard _(fMutexStaging);
856 fStagingThreadShouldTerminate = true;
857 }
858 fCvStaging.notify_one();
859 fThreadStaging.join();
860 // If we have a chain with more files than the number of slots, the files opened at the end of the
861 // event loop won't be reused when the event loop restarts, so we can close them.
862 if (fFileNames.size() > fNSlots) {
863 fCurrentRanges.clear();
864 fNextRanges.clear();
865 fStagingArea.clear();
866 fStagingArea.resize(fFileNames.size());
867 }
868}
869
871{
872 assert(fNSlots == 0);
873 assert(nSlots > 0);
874 fNSlots = nSlots;
875 fActiveColumnReaders.resize(fNSlots);
876 fSlotsToRangeIdxs.resize(fNSlots * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
877}
878
879ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, std::string_view fileName)
880{
881 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileName));
882}
883
884ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames)
885{
886 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileNames));
887}
888
889ROOT::RDF::RSampleInfo ROOT::Internal::RDF::RNTupleDS::CreateSampleInfo(
890 unsigned int slot, const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &sampleMap) const
891{
892 // The same slot ID could be picked multiple times in the same execution, thus
893 // ending up processing different page sources. Here we re-establish the
894 // connection between the slot and the correct page source by retrieving
895 // which range is connected currently to the slot
896
897 const auto &rangeIdx = fSlotsToRangeIdxs.at(slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
898
899 // Missing source if a file does not exist
900 if (!fCurrentRanges[rangeIdx].fSource)
901 return ROOT::RDF::RSampleInfo{};
902
903 const auto &ntupleName = fCurrentRanges[rangeIdx].fSource->GetNTupleName();
904 const auto &ntuplePath = fCurrentRanges[rangeIdx].fFileName;
905 const auto ntupleID = std::string(ntuplePath) + '/' + ntupleName;
906
907 if (sampleMap.empty())
909 ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry));
910
911 if (sampleMap.find(ntupleID) == sampleMap.end())
912 throw std::runtime_error("Full sample identifier '" + ntupleID + "' cannot be found in the available samples.");
913
915 ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry),
916 sampleMap.at(ntupleID));
917}
918
921 const std::pair<ULong64_t, ULong64_t> &range)
922{
923 std::unique_ptr<ROOT::RDF::RNTupleDS> ds{new ROOT::RDF::RNTupleDS(ntupleName, fileNames, range)};
924 return ROOT::RDataFrame(std::move(ds));
925}
926
927std::pair<std::vector<ROOT::Internal::RNTupleClusterBoundaries>, ROOT::NTupleSize_t>
928ROOT::Internal::RDF::GetClustersAndEntries(std::string_view ntupleName, std::string_view location)
929{
931 source->Attach();
932 const auto descGuard = source->GetSharedDescriptorGuard();
933 return std::make_pair(ROOT::Internal::GetClusterBoundaries(descGuard.GetRef()), descGuard->GetNEntries());
934}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define f(i)
Definition RSha256.hxx:104
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define _(A, B)
Definition cfortran.h:108
void GetCollectionInfo(const ROOT::NTupleSize_t globalIndex, RNTupleLocalIndex *collectionStart, ROOT::NTupleSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
Definition RColumn.hxx:283
An artificial field that provides the size of a fixed-size array.
void GenerateColumns(const ROOT::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
void ReadGlobalImpl(ROOT::NTupleSize_t, void *to) final
RArraySizeField(const RArraySizeField &other)=delete
RArraySizeField(RArraySizeField &&other)=default
RArraySizeField & operator=(RArraySizeField &&other)=default
void ReadInClusterImpl(RNTupleLocalIndex, void *to) final
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RArraySizeField(std::size_t arrayLength)
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RArraySizeField & operator=(const RArraySizeField &other)=delete
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Every RDF column is represented by exactly one RNTuple field.
void * GetImpl(Long64_t entry) final
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RNTupleDS * fDataSource
The data source that owns this column reader.
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Long64_t fLastEntry
Last entry number that was read.
An artificial field that transforms an RNTuple column that contains the offset of collections into collection sizes.
Definition RNTupleDS.cxx:59
RRDFCardinalityField(RRDFCardinalityField &&other)=default
size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
Definition RNTupleDS.cxx:90
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:83
void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
Definition RNTupleDS.cxx:93
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:84
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
Definition RNTupleDS.cxx:73
size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
Definition RNTupleDS.cxx:89
void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
Get the number of elements of the collection identified by clusterIndex.
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
Definition RNTupleDS.cxx:61
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
Definition RNTupleDS.cxx:65
static void SetClusterBunchSize(RNTupleReadOptions &options, unsigned int val)
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
std::vector< void * > Record_t
std::optional< std::pair< ULong64_t, ULong64_t > > fGlobalEntryRange
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:74
void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos, bool convertToRVec=true)
Provides the RDF column "colName" given the field identified by fieldID.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
void ExecStaging()
The main function of the fThreadStaging background thread.
std::vector< std::unique_ptr< ROOT::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
ROOT::RFieldBase * GetFieldWithTypeChecks(std::string_view fieldName, const std::type_info &tid)
std::vector< std::string > fFileNames
Definition RNTupleDS.hxx:93
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
RNTupleDS(std::unique_ptr< ROOT::Internal::RPageSource > pageSource)
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
Definition RNTupleDS.hxx:92
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
void Finalize() final
Convenience method called after concluding an event-loop.
std::vector< std::string > fColumnTypes
void Initialize() final
Convenience method called before starting an event-loop.
std::vector< std::string > fColumnNames
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
Type-erased vector of pointers to pointers to column values, one per slot.
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
The list of column representations a field can have.
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
RConstSchemaIterator cbegin() const
const std::string & GetFieldName() const
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId)
Factory method to resurrect a field from the stored on-disk type information.
ROOT::DescriptorId_t GetOnDiskId() const
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its subfields using a possibly new name and a new, unconnected set of columns.
The on-storage metadata of an RNTuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Common user-tunable settings for reading RNTuples.
const_iterator begin() const
const_iterator end() const
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1678
std::string TypeID2TypeName(const std::type_info &id)
Returns the name of a type starting from its type_info An empty string is returned in case of failure...
Definition RDFUtils.cxx:178
ROOT::RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector< std::string > &fileNames, const std::pair< ULong64_t, ULong64_t > &range)
Internal overload of the function that allows passing a range of entries.
std::pair< std::vector< ROOT::Internal::RNTupleClusterBoundaries >, ROOT::NTupleSize_t > GetClustersAndEntries(std::string_view ntupleName, std::string_view location)
Retrieves the cluster boundaries and the number of entries for the input RNTuple.
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &)
std::vector< ROOT::Internal::RNTupleClusterBoundaries > GetClusterBoundaries(const RNTupleDescriptor &desc)
Return the cluster boundaries for each cluster in this RNTuple.
std::string GetRenormalizedTypeName(const std::string &metaNormalizedName)
Given a type name normalized by ROOT meta, renormalize it for RNTuple. E.g., insert std::prefix.
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
std::vector< std::string > ColumnNames_t
Namespace for new ROOT classes and functions.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
ENTupleStructure
The fields in the RNTuple data model tree can carry different structural information about the type s...
Tag to let data sources use the native data type when creating a column reader.
Definition Utils.hxx:344
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:80