Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleDS.cxx
Go to the documentation of this file.
1/// \file RNTupleDS.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
15#include <ROOT/RDataFrame.hxx>
16#include <ROOT/RDF/Utils.hxx>
17#include <ROOT/RField.hxx>
18#include <ROOT/RFieldUtils.hxx>
21#include <ROOT/RNTupleDS.hxx>
22#include <ROOT/RNTupleUtil.hxx>
23#include <ROOT/RPageStorage.hxx>
24#include <string_view>
25
26#include <TError.h>
27#include <TSystem.h>
28
29#include <cassert>
30#include <memory>
31#include <mutex>
32#include <string>
33#include <vector>
34#include <typeinfo>
35#include <utility>
36
37// clang-format off
38/**
39* \class ROOT::RDF::RNTupleDS
40* \ingroup dataframe
41* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data.
42*
43* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
44*
45* For each column containing an array or a collection, a corresponding column `#colname` (registered by this data
46* source under the name `R_rdf_sizeof_colname`) is available to access `colname.size()` without reading and
46* deserializing the collection values.
47*
48**/
49// clang-format on
50
51namespace ROOT::Internal::RDF {
52/// An artificial field that transforms an RNTuple column that contains the offset of collections into
53/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g.
54/// `R_rdf_sizeof_jets` for a collection named `jets`.
55///
56/// This field owns the collection offset field but instead of exposing the collection offsets it exposes
57/// the collection sizes (offset(N+1) - offset(N)). For the time being, we offer this functionality only in RDataFrame.
58/// TODO(jblomer): consider providing a general set of useful virtual fields as part of RNTuple.
60protected:
61 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view /* newName */) const final
62 {
63 return std::make_unique<RRDFCardinalityField>();
64 }
65 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
66
67public:
68 RRDFCardinalityField() : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kLeaf, false /* isSimple */) {}
71 ~RRDFCardinalityField() override = default;
72
82 // Field is only used for reading
83 void GenerateColumns() final { throw RException(R__FAIL("Cardinality fields must only be used for reading")); }
88
89 size_t GetValueSize() const final { return sizeof(std::size_t); }
90 size_t GetAlignment() const final { return alignof(std::size_t); }
91
92 /// Get the number of elements of the collection identified by globalIndex
100
101 /// Get the number of elements of the collection identified by clusterIndex
109};
110
111/**
112 * @brief An artificial field that provides the size of a fixed-size array
113 *
114 * This is the implementation of `R_rdf_sizeof_column` in case `column` contains
115 * fixed-size arrays on disk.
116 */
118private:
119 std::size_t fArrayLength;
120
121 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view) const final
122 {
123 return std::make_unique<RArraySizeField>(fArrayLength);
124 }
125 void GenerateColumns() final { throw RException(R__FAIL("RArraySizeField fields must only be used for reading")); }
127 void ReadGlobalImpl(ROOT::NTupleSize_t /*globalIndex*/, void *to) final
128 {
129 *static_cast<std::size_t *>(to) = fArrayLength;
130 }
131 void ReadInClusterImpl(RNTupleLocalIndex /*localIndex*/, void *to) final
132 {
133 *static_cast<std::size_t *>(to) = fArrayLength;
134 }
135
136public:
138 : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kLeaf, false /* isSimple */),
140 {
141 }
147
148 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
149 std::size_t GetValueSize() const final { return sizeof(std::size_t); }
150 std::size_t GetAlignment() const final { return alignof(std::size_t); }
151};
152
153/// Every RDF column is represented by exactly one RNTuple field
157
158 RNTupleDS *fDataSource; ///< The data source that owns this column reader
159 RFieldBase *fProtoField; ///< The prototype field from which fField is cloned
160 std::unique_ptr<RFieldBase> fField; ///< The field backing the RDF column
161 std::unique_ptr<RFieldBase::RValue> fValue; ///< The memory location used to read from fField
162 std::shared_ptr<void> fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources
163 Long64_t fLastEntry = -1; ///< Last entry number that was read
164 /// For chains, the logical entry and the physical entry in any particular file can be different.
165 /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the corresponding
166 /// data source was opened.
168
169public:
171 ~RNTupleColumnReader() override = default;
172
173 /// Connect the field and its subfields to the page source
175 {
176 assert(fLastEntry == -1);
178
179 // Create a new, real field from the prototype and set its field ID in the context of the given page source
181 {
182 auto descGuard = source.GetSharedDescriptorGuard();
183 // Set the on-disk field IDs for the field and the subfield
184 fField->SetOnDiskId(
186 auto iProto = fProtoField->cbegin();
187 auto iReal = fField->begin();
188 for (; iReal != fField->end(); ++iProto, ++iReal) {
189 iReal->SetOnDiskId(descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(iProto->GetOnDiskId())));
190 }
191 }
192
193 try {
195 } catch (const ROOT::RException &err) {
196 auto onDiskType = source.GetSharedDescriptorGuard()->GetFieldDescriptor(fField->GetOnDiskId()).GetTypeName();
197 std::string msg = "RNTupleDS: invalid type \"" + fField->GetTypeName() + "\" for column \"" +
198 fDataSource->fFieldId2QualifiedName[fField->GetOnDiskId()] + "\" with on-disk type \"" +
199 onDiskType + "\"";
200 throw std::runtime_error(msg);
201 }
202
203 if (fValuePtr) {
204 // When the reader reconnects to a new file, the fValuePtr is already set
205 fValue = std::make_unique<RFieldBase::RValue>(fField->BindValue(fValuePtr));
206 fValuePtr = nullptr;
207 } else {
208 // For the first file, create a new object for this field (reader)
209 fValue = std::make_unique<RFieldBase::RValue>(fField->CreateValue());
210 }
211 }
212
214 {
215 if (fValue && keepValue) {
216 fValuePtr = fValue->GetPtr<void>();
217 }
218 fValue = nullptr;
219 fField = nullptr;
220 fLastEntry = -1;
221 }
222
223 void *GetImpl(Long64_t entry) final
224 {
225 if (entry != fLastEntry) {
226 fValue->Read(entry - fEntryOffset);
228 }
229 return fValue->GetPtr<void>().get();
230 }
231};
232} // namespace ROOT::Internal::RDF
233
235
237 ROOT::DescriptorId_t fieldId, std::vector<RNTupleDS::RFieldInfo> fieldInfos,
238 bool convertToRVec)
239{
240 // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple
241 // using the following types and with a top-level field named "event" of type Event:
242 //
243 // struct Event {
244 // int id;
245 // std::vector<Track> tracks;
246 // };
247 // struct Track {
248 // std::vector<Hit> hits;
249 // };
250 // struct Hit {
251 // float x;
252 // float y;
253 // };
254 //
255 // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord).
256 // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the
257 // tree of sub fields and expose the following RDF columns:
258 //
259 // "event" [Event]
260 // "event.id" [int]
261 // "event.tracks" [RVec<Track>]
262 // "R_rdf_sizeof_event.tracks" [std::size_t]
263 // "event.tracks.hits" [RVec<RVec<Hit>>]
264 // "R_rdf_sizeof_event.tracks.hits" [RVec<std::size_t>]
265 // "event.tracks.hits.x" [RVec<RVec<float>>]
266 // "R_rdf_sizeof_event.tracks.hits.x" [RVec<std::size_t>]
267 // "event.tracks.hits.y" [RVec<RVec<float>>]
268 // "R_rdf_sizeof_event.tracks.hits.y" [RVec<std::size_t>]
269
270 const auto &fieldDesc = desc.GetFieldDescriptor(fieldId);
271 const auto &nRepetitions = fieldDesc.GetNRepetitions();
272 if ((fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) || (nRepetitions > 0)) {
273 // The field is a collection or a fixed-size array.
274 // We open a new collection scope with fieldID being the innermost collection. E.g. for "event.tracks.hits",
275 // fieldInfos would already contain the fieldID of "event.tracks"
276 fieldInfos.emplace_back(fieldId, nRepetitions);
277 }
278
279 if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) {
280 // Inner fields of collections are provided as projected collections of only that inner field,
281 // E.g. we provide a projected collection RVec<RVec<float>> for "event.tracks.hits.x" in the example
282 // above.
284 convertToRVec && (fieldDesc.GetTypeName().substr(0, 19) == "ROOT::VecOps::RVec<" ||
285 fieldDesc.GetTypeName().substr(0, 12) == "std::vector<" || fieldDesc.GetTypeName() == "");
286 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
288
289 // Note that at the end of the recursion, we have handled the inner sub collections as well as the
290 // collection as a whole, so we are done.
291 return;
292
293 } else if (nRepetitions > 0) {
294 // Fixed-size array, same logic as ROOT::RVec.
295 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
296 AddField(desc, colName, f.GetId(), fieldInfos);
297 return;
298 } else if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kRecord) {
299 // Inner fields of records are provided as individual RDF columns, e.g. "event.id"
300 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
301 auto innerName = colName.empty() ? f.GetFieldName() : (std::string(colName) + "." + f.GetFieldName());
302 // Inner fields of collections of records are always exposed as ROOT::RVec
303 AddField(desc, innerName, f.GetId(), fieldInfos);
304 }
305 }
306
307 // The fieldID could be the root field or the class of fieldId might not be loaded.
308 // In these cases, only the inner fields are exposed as RDF columns.
309 auto fieldOrException = ROOT::RFieldBase::Create(fieldDesc.GetFieldName(), fieldDesc.GetTypeName());
310 if (!fieldOrException)
311 return;
312 auto valueField = fieldOrException.Unwrap();
313 valueField->SetOnDiskId(fieldId);
314 for (auto &f : *valueField) {
315 f.SetOnDiskId(desc.FindFieldId(f.GetFieldName(), f.GetParent()->GetOnDiskId()));
316 }
317 std::unique_ptr<ROOT::RFieldBase> cardinalityField;
318 // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks")
319 if (!fieldInfos.empty()) {
320 const auto &info = fieldInfos.back();
321 if (info.fNRepetitions > 0) {
322 cardinalityField = std::make_unique<ROOT::Internal::RDF::RArraySizeField>(info.fNRepetitions);
323 } else {
324 cardinalityField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField>();
325 }
326 cardinalityField->SetOnDiskId(info.fFieldId);
327 }
328
329 for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) {
330 const auto &fieldInfo = *i;
331
332 if (fieldInfo.fNRepetitions > 0) {
333 // Fixed-size array, read it as ROOT::RVec in memory
334 valueField = std::make_unique<ROOT::RArrayAsRVecField>("", std::move(valueField), fieldInfo.fNRepetitions);
335 } else {
336 // Actual collection. A std::vector or ROOT::RVec gets added as a ROOT::RVec. All other collection types keep
337 // their original type.
338 if (convertToRVec) {
339 valueField = std::make_unique<ROOT::RRVecField>("", std::move(valueField));
340 } else {
341 auto outerFieldType = desc.GetFieldDescriptor(fieldInfo.fFieldId).GetTypeName();
343 }
344 }
345
346 valueField->SetOnDiskId(fieldInfo.fFieldId);
347
348 // Skip the inner-most collection level to construct the cardinality column
349 // It's taken care of by the `if (!fieldInfos.empty())` scope above
350 if (i != fieldInfos.rbegin()) {
351 if (fieldInfo.fNRepetitions > 0) {
352 // This collection level refers to a fixed-size array
354 std::make_unique<ROOT::RArrayAsRVecField>("", std::move(cardinalityField), fieldInfo.fNRepetitions);
355 } else {
356 // This collection level refers to an RVec
357 cardinalityField = std::make_unique<ROOT::RRVecField>("", std::move(cardinalityField));
358 }
359
360 cardinalityField->SetOnDiskId(fieldInfo.fFieldId);
361 }
362 }
363
364 if (cardinalityField) {
365 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
366 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
367 fProtoFields.emplace_back(std::move(cardinalityField));
368 }
369
370 fieldInfos.emplace_back(fieldId, nRepetitions);
371 fColumnNames.emplace_back(colName);
372 fColumnTypes.emplace_back(valueField->GetTypeName());
373 fProtoFields.emplace_back(std::move(valueField));
374}
375
376ROOT::RDF::RNTupleDS::RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource)
377{
378 pageSource->Attach();
379 fPrincipalDescriptor = pageSource->GetSharedDescriptorGuard()->Clone();
380 fStagingArea.emplace_back(std::move(pageSource));
381
382 AddField(fPrincipalDescriptor, "", fPrincipalDescriptor.GetFieldZeroId(),
383 std::vector<ROOT::RDF::RNTupleDS::RFieldInfo>());
384}
385
386namespace {
387
389{
390 // The setting is for now a global one, must be decided before running the
391 // program by setting the appropriate environment variable. Make sure that
392 // option configuration is thread-safe and happens only once.
394 static std::once_flag flag;
395 std::call_once(flag, []() {
396 if (auto env = gSystem->Getenv("ROOT_RNTUPLE_CLUSTERBUNCHSIZE"); env != nullptr && strlen(env) > 0) {
397 std::string envStr{env};
398 auto envNum{std::stoul(envStr)};
399 envNum = envNum == 0 ? 1 : envNum;
401 }
402 });
403 return opts;
404}
405
406std::unique_ptr<ROOT::Internal::RPageSource> CreatePageSource(std::string_view ntupleName, std::string_view fileName)
407{
409}
410} // namespace
411
412ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
413 : RNTupleDS(CreatePageSource(ntupleName, fileName))
414{
415 fFileNames = std::vector<std::string>{std::string{fileName}};
416}
417
418ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames)
419 : RNTupleDS(CreatePageSource(ntupleName, fileNames[0]))
420{
423 fStagingArea.resize(fFileNames.size());
424}
425
427ROOT::RDF::RNTupleDS::GetColumnReadersImpl(std::string_view /* name */, const std::type_info & /* ti */)
428{
429 // This datasource uses the newer GetColumnReaders() API
430 return {};
431}
432
433std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
434ROOT::RDF::RNTupleDS::GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info &tid)
435{
436 // At this point we can assume that `name` will be found in fColumnNames
437 const auto index = std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), name));
439
441 // If the field corresponding to the provided name is not a cardinality column and the requested type is different
442 // from the proto field that was created when the data source was constructed, we first have to create an
443 // alternative proto field for the column reader. Otherwise, we can directly use the existing proto field.
444 if (name.substr(0, 13) != "R_rdf_sizeof_" && requestedType != fColumnTypes[index]) {
445 auto &altProtoFields = fAlternativeProtoFields[index];
446 auto altProtoField = std::find_if(altProtoFields.begin(), altProtoFields.end(),
447 [&requestedType](const std::unique_ptr<ROOT::RFieldBase> &fld) {
448 return fld->GetTypeName() == requestedType;
449 });
451 field = altProtoField->get();
452 } else {
455 throw std::runtime_error("RNTupleDS: Could not create field with type \"" + requestedType +
456 "\" for column \"" + std::string(name));
457 }
459 newAltProtoField->SetOnDiskId(fProtoFields[index]->GetOnDiskId());
460 field = newAltProtoField.get();
461 altProtoFields.emplace_back(std::move(newAltProtoField));
462 }
463 } else {
464 field = fProtoFields[index].get();
465 }
466
467 // Map the field's and subfields' IDs to qualified names so that we can later connect the fields to
468 // other page sources from the chain
469 fFieldId2QualifiedName[field->GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(field->GetOnDiskId());
470 for (const auto &s : *field) {
471 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(s.GetOnDiskId());
472 }
473
474 auto reader = std::make_unique<ROOT::Internal::RDF::RNTupleColumnReader>(this, field);
475 fActiveColumnReaders[slot].emplace_back(reader.get());
476
477 return reader;
478}
479
481{
482 while (true) {
483 std::unique_lock lock(fMutexStaging);
484 fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
485 if (fStagingThreadShouldTerminate)
486 return;
487
488 assert(!fHasNextSources);
489 StageNextSources();
490 fHasNextSources = true;
491 fIsReadyForStaging = false;
492
493 lock.unlock();
494 fCvStaging.notify_one();
495 }
496}
497
499{
500 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
501 for (auto i = fNextFileIndex; (i < nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
502 if (fStagingThreadShouldTerminate)
503 return;
504
505 if (fStagingArea[i]) {
506 // The first file is already open and was used to read the schema
507 assert(i == 0);
508 } else {
509 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
510 fStagingArea[i]->LoadStructure();
511 }
512 }
513}
514
516{
517 assert(fNextRanges.empty());
518 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
519 auto nRemainingFiles = nFiles - fNextFileIndex;
520 if (nRemainingFiles == 0)
521 return;
522
523 // Easy work scheduling: one file per slot. We skip empty files (files without entries).
524 if (nRemainingFiles >= fNSlots) {
525 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles)) {
527
528 std::swap(fStagingArea[fNextFileIndex], range.fSource);
529
530 if (!range.fSource) {
531 // Typically, the prestaged source should have been present. Only if some of the files are empty, we need
532 // to open and attach files here.
533 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
534 }
535 range.fFileName = fFileNames[fNextFileIndex];
536 range.fSource->Attach();
537 fNextFileIndex++;
538
539 auto nEntries = range.fSource->GetNEntries();
540 if (nEntries == 0)
541 continue;
542
543 range.fLastEntry = nEntries; // whole file per slot, i.e. entry range [0..nEntries - 1]
544 fNextRanges.emplace_back(std::move(range));
545 }
546 return;
547 }
548
549 // Work scheduling of the tail: multiple slots work on the same file.
550 // Every slot still has its own page source but these page sources may open the same file.
551 // Again, we need to skip empty files.
552 unsigned int nSlotsPerFile = fNSlots / nRemainingFiles;
553 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles); ++i) {
554 std::unique_ptr<ROOT::Internal::RPageSource> source;
555 // Need to look for the file name to populate the sample info later
556 const auto &sourceFileName = fFileNames[fNextFileIndex];
557 std::swap(fStagingArea[fNextFileIndex], source);
558 if (!source) {
559 // Empty files trigger this condition
560 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
561 }
562 source->Attach();
563 fNextFileIndex++;
564
565 auto nEntries = source->GetNEntries();
566 if (nEntries == 0)
567 continue;
568
569 // If last file: use all remaining slots
570 if (i == (nRemainingFiles - 1))
571 nSlotsPerFile = fNSlots - fNextRanges.size();
572
573 std::vector<std::pair<ULong64_t, ULong64_t>> rangesByCluster;
574 {
575 auto descriptorGuard = source->GetSharedDescriptorGuard();
576 auto clusterId = descriptorGuard->FindClusterId(0, 0);
578 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterId);
579 rangesByCluster.emplace_back(std::make_pair<ULong64_t, ULong64_t>(
580 clusterDesc.GetFirstEntryIndex(), clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()));
581 clusterId = descriptorGuard->FindNextClusterId(clusterId);
582 }
583 }
584 const unsigned int nRangesByCluster = rangesByCluster.size();
585
586 // Distribute slots equidistantly over the entry range, aligned on cluster boundaries
588 const auto remainder = nRangesByCluster % nSlotsPerFile;
589 std::size_t iRange = 0;
590 unsigned int iSlot = 0;
591 const unsigned int N = std::min(nSlotsPerFile, nRangesByCluster);
592 for (; iSlot < N; ++iSlot) {
593 auto start = rangesByCluster[iRange].first;
594 iRange += nClustersPerSlot + static_cast<int>(iSlot < remainder);
595 assert(iRange > 0);
596 auto end = rangesByCluster[iRange - 1].second;
597
599 range.fFileName = sourceFileName;
600 // The last range for this file just takes the already opened page source. All previous ranges clone.
601 if (iSlot == N - 1) {
602 range.fSource = std::move(source);
603 } else {
604 range.fSource = source->Clone();
605 }
606 range.fSource->SetEntryRange({start, end - start});
607 range.fFirstEntry = start;
608 range.fLastEntry = end;
609 fNextRanges.emplace_back(std::move(range));
610 }
611 } // loop over tail of remaining files
612}
613
614std::vector<std::pair<ULong64_t, ULong64_t>> ROOT::RDF::RNTupleDS::GetEntryRanges()
615{
616 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
617
618 // We need to distinguish between single threaded and multi-threaded runs.
619 // In single threaded mode, InitSlot is only called once and column readers have to be rewired
620 // to new page sources of the chain in GetEntryRanges. In multi-threaded mode, on the other hand,
621 // InitSlot is called for every returned range, thus rewiring the column readers takes place in
622 // InitSlot and FinalizeSlot.
623
624 if (fNSlots == 1) {
625 for (auto r : fActiveColumnReaders[0]) {
626 r->Disconnect(true /* keepValue */);
627 }
628 }
629
630 // If we have fewer files than slots and we run multiple event loops, we can reuse fCurrentRanges and don't need
631 // to worry about loading the fNextRanges. I.e., in this case we don't enter the if block.
632 if (fCurrentRanges.empty() || (fSeenEntries > 0)) {
633 // Otherwise, i.e. start of the first event loop or in the middle of the event loop, prepare the next ranges
634 // and swap with the current ones.
635 {
636 std::unique_lock lock(fMutexStaging);
637 fCvStaging.wait(lock, [this] { return fHasNextSources; });
638 }
639 PrepareNextRanges();
640 if (fNextRanges.empty()) {
641 // No more data
642 return ranges;
643 }
644
645 assert(fNextRanges.size() <= fNSlots);
646
647 fCurrentRanges.clear();
648 std::swap(fCurrentRanges, fNextRanges);
649 }
650
651 // Stage next batch of files for the next call to GetEntryRanges()
652 {
653 std::lock_guard _(fMutexStaging);
654 fIsReadyForStaging = true;
655 fHasNextSources = false;
656 }
657 fCvStaging.notify_one();
658
659 // Create ranges for the RDF loop manager from the list of REntryRangeDS records.
660 // The entry ranges that are relative to the page source in REntryRangeDS are translated into absolute
661 // entry ranges, given the current state of the entry cursor.
662 // We remember the connection from first absolute entry index of a range to its REntryRangeDS record
663 // so that we can properly rewire the column reader in InitSlot
664 fFirstEntry2RangeIdx.clear();
666 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
667 // Several consecutive ranges may operate on the same file (each with their own page source clone).
668 // We can detect a change of file when the first entry number jumps back to 0.
669 if (fCurrentRanges[i].fFirstEntry == 0) {
670 // New source
671 fSeenEntries += nEntriesPerSource;
673 }
674 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntries;
675 auto end = fCurrentRanges[i].fLastEntry + fSeenEntries;
676 nEntriesPerSource += end - start;
677
678 fFirstEntry2RangeIdx[start] = i;
679 ranges.emplace_back(start, end);
680 }
681 fSeenEntries += nEntriesPerSource;
682
683 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
684 for (auto r : fActiveColumnReaders[0]) {
685 r->Connect(*fCurrentRanges[0].fSource, ranges[0].first);
686 }
687 }
688
689 return ranges;
690}
691
693{
694 if (fNSlots == 1) {
695 // Ensure the connection between slot and range is valid also in single-thread mode
696 fSlotsToRangeIdxs[0] = 0;
697 return;
698 }
699
700 // The same slot ID could be picked multiple times in the same execution, thus
701 // ending up processing different page sources. Here we re-establish the
702 // connection between the slot and the correct page source by finding which
703 // range index corresponds to the first entry passed.
704 auto idxRange = fFirstEntry2RangeIdx.at(firstEntry);
705 // We also remember this connection so it can later be retrieved in CreateSampleInfo
706 fSlotsToRangeIdxs[slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>()] = idxRange;
707
708 for (auto r : fActiveColumnReaders[slot]) {
709 r->Connect(*fCurrentRanges[idxRange].fSource, firstEntry - fCurrentRanges[idxRange].fFirstEntry);
710 }
711}
712
714{
715 if (fNSlots == 1)
716 return;
717
718 for (auto r : fActiveColumnReaders[slot]) {
719 r->Disconnect(true /* keepValue */);
720 }
721}
722
723std::string ROOT::RDF::RNTupleDS::GetTypeName(std::string_view colName) const
724{
725 auto colNamePos = std::find(fColumnNames.begin(), fColumnNames.end(), colName);
726
727 if (colNamePos == fColumnNames.end()) {
728 auto msg = std::string("RNTupleDS: There is no column with name \"") + std::string(colName) + "\"";
729 throw std::runtime_error(msg);
730 }
731
732 const auto index = std::distance(fColumnNames.begin(), colNamePos);
733 return fColumnTypes[index];
734}
735
736bool ROOT::RDF::RNTupleDS::HasColumn(std::string_view colName) const
737{
738 return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end();
739}
740
742{
743 fSeenEntries = 0;
744 fNextFileIndex = 0;
745 fIsReadyForStaging = fHasNextSources = fStagingThreadShouldTerminate = false;
746 fThreadStaging = std::thread(&RNTupleDS::ExecStaging, this);
747 assert(fNextRanges.empty());
748
749 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
750 // First event loop or large number of files: start the staging process.
751 {
752 std::lock_guard _(fMutexStaging);
753 fIsReadyForStaging = true;
754 }
755 fCvStaging.notify_one();
756 } else {
757 // Otherwise, we will reuse fCurrentRanges. Make sure that staging and preparing next ranges will be a noop
758 // (already at the end of the list of files).
759 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
760 }
761}
762
764{
765 for (unsigned int i = 0; i < fNSlots; ++i) {
766 for (auto r : fActiveColumnReaders[i]) {
767 r->Disconnect(false /* keepValue */);
768 }
769 }
770 {
771 std::lock_guard _(fMutexStaging);
772 fStagingThreadShouldTerminate = true;
773 }
774 fCvStaging.notify_one();
775 fThreadStaging.join();
776 // If we have a chain with more files than the number of slots, the files opened at the end of the
777 // event loop won't be reused when the event loop restarts, so we can close them.
778 if (fFileNames.size() > fNSlots) {
779 fCurrentRanges.clear();
780 fNextRanges.clear();
781 fStagingArea.clear();
782 fStagingArea.resize(fFileNames.size());
783 }
784}
785
787{
788 assert(fNSlots == 0);
789 assert(nSlots > 0);
790 fNSlots = nSlots;
791 fActiveColumnReaders.resize(fNSlots);
792 fSlotsToRangeIdxs.resize(fNSlots * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
793}
794
795ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, std::string_view fileName)
796{
797 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileName));
798}
799
800ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames)
801{
802 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileNames));
803}
804
805ROOT::RDF::RSampleInfo ROOT::Internal::RDF::RNTupleDS::CreateSampleInfo(
806 unsigned int slot, const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &sampleMap) const
807{
808 // The same slot ID could be picked multiple times in the same execution, thus
809 // ending up processing different page sources. Here we re-establish the
810 // connection between the slot and the correct page source by retrieving
811 // which range is connected currently to the slot
812 const auto &rangeIdx = fSlotsToRangeIdxs.at(slot * ROOT::Internal::RDF::CacheLineStep<std::size_t>());
813
814 // Missing source if a file does not exist
815 if (!fCurrentRanges[rangeIdx].fSource)
816 return ROOT::RDF::RSampleInfo{};
817
818 const auto &ntupleName = fCurrentRanges[rangeIdx].fSource->GetNTupleName();
819 const auto &ntuplePath = fCurrentRanges[rangeIdx].fFileName;
820 const auto ntupleID = std::string(ntuplePath) + '/' + ntupleName;
821
822 // TODO: There is no support for RNTuple in RDatasetSpec, thus the sample map
823 // is always empty at the moment.
824 if (sampleMap.empty())
826 ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry));
827
828 if (sampleMap.find(ntupleID) == sampleMap.end())
829 throw std::runtime_error("Full sample identifier '" + ntupleID + "' cannot be found in the available samples.");
830
832 ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry),
834}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define f(i)
Definition RSha256.hxx:104
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define _(A, B)
Definition cfortran.h:108
void GetCollectionInfo(const ROOT::NTupleSize_t globalIndex, RNTupleLocalIndex *collectionStart, ROOT::NTupleSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
Definition RColumn.hxx:283
An artificial field that provides the size of a fixed-size array.
void GenerateColumns(const ROOT::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
void ReadGlobalImpl(ROOT::NTupleSize_t, void *to) final
RArraySizeField(const RArraySizeField &other)=delete
RArraySizeField(RArraySizeField &&other)=default
RArraySizeField & operator=(RArraySizeField &&other)=default
void ReadInClusterImpl(RNTupleLocalIndex, void *to) final
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RArraySizeField(std::size_t arrayLength)
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RArraySizeField & operator=(const RArraySizeField &other)=delete
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Every RDF column is represented by exactly one RNTuple field.
void * GetImpl(Long64_t entry) final
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RNTupleDS * fDataSource
The data source that owns this column reader.
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Long64_t fLastEntry
Last entry number that was read.
An artificial field that transforms an RNTuple column that contains the offset of collections into co...
Definition RNTupleDS.cxx:59
RRDFCardinalityField(RRDFCardinalityField &&other)=default
size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
Definition RNTupleDS.cxx:90
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:83
void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
Definition RNTupleDS.cxx:93
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:84
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
Definition RNTupleDS.cxx:73
size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
Definition RNTupleDS.cxx:89
void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
Get the number of elements of the collection identified by localIndex.
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
Definition RNTupleDS.cxx:61
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
Definition RNTupleDS.cxx:65
static void SetClusterBunchSize(RNTupleReadOptions &options, unsigned int val)
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
std::vector< void * > Record_t
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:47
void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos, bool convertToRVec=true)
Provides the RDF column "colName" given the field identified by fieldID.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
void ExecStaging()
The main function of the fThreadStaging background thread.
std::vector< std::unique_ptr< ROOT::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
Definition RNTupleDS.hxx:83
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
Definition RNTupleDS.hxx:90
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
std::vector< std::string > fFileNames
Definition RNTupleDS.hxx:66
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
RNTupleDS(std::unique_ptr< ROOT::Internal::RPageSource > pageSource)
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
Definition RNTupleDS.hxx:99
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
Definition RNTupleDS.hxx:65
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
void Finalize() final
Convenience method called after concluding an event-loop.
std::vector< std::string > fColumnTypes
void Initialize() final
Convenience method called before starting an event-loop.
std::vector< std::string > fColumnNames
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree,...
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
The list of column representations a field can have.
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
RConstSchemaIterator cbegin() const
const std::string & GetFieldName() const
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId)
Factory method to resurrect a field from the stored on-disk type information.
ROOT::DescriptorId_t GetOnDiskId() const
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its subfields using a possibly new name and a new, unconnected set of columns.
The on-storage metadata of an RNTuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Common user-tunable settings for reading RNTuples.
const_iterator begin() const
const_iterator end() const
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1678
std::string TypeID2TypeName(const std::type_info &id)
Returns the name of a type starting from its type_info. An empty string is returned in case of failure...
Definition RDFUtils.cxx:129
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &)
std::string GetRenormalizedTypeName(const std::string &metaNormalizedName)
Given a type name normalized by ROOT meta, renormalize it for RNTuple. E.g., insert the std:: prefix.
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ENTupleStructure
The fields in the ntuple model tree can carry different structural information about the type system.
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:53