Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
RNTupleDS.cxx
Go to the documentation of this file.
1/// \file RNTupleDS.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
5/// \date 2018-10-04
6/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
7/// is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
18#include <ROOT/RDataFrame.hxx>
19#include <ROOT/RField.hxx>
20#include <ROOT/RFieldUtils.hxx>
23#include <ROOT/RNTupleDS.hxx>
24#include <ROOT/RNTupleUtil.hxx>
25#include <ROOT/RPageStorage.hxx>
26#include <string_view>
27
28#include <TError.h>
29#include <TSystem.h>
30
31#include <cassert>
32#include <memory>
33#include <mutex>
34#include <string>
35#include <vector>
36#include <typeinfo>
37#include <utility>
38
39// clang-format off
40/**
41* \class ROOT::Experimental::RNTupleDS
42* \ingroup dataframe
43* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data.
44*
45* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
46*
47* For each column containing an array or a collection, a corresponding column `R_rdf_sizeof_colname` is available to access
48* `colname.size()` without reading and deserializing the collection values.
49*
50**/
51// clang-format on
52
53namespace ROOT {
54namespace Experimental {
55namespace Internal {
56
57/// An artificial field that transforms an RNTuple column that contains the offset of collections into
58/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g.
59/// `R_rdf_sizeof_jets` for a collection named `jets`.
60///
61/// This field owns the collection offset field but instead of exposing the collection offsets it exposes
62/// the collection sizes (offset(N+1) - offset(N)). For the time being, we offer this functionality only in RDataFrame.
63/// TODO(jblomer): consider providing a general set of useful virtual fields as part of RNTuple.
65protected:
66 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view /* newName */) const final
67 {
68 return std::make_unique<RRDFCardinalityField>();
69 }
70 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
71
72public:
73 RRDFCardinalityField() : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kLeaf, false /* isSimple */) {}
77
87 // Field is only used for reading
88 void GenerateColumns() final { assert(false && "Cardinality fields must only be used for reading"); }
93
94 size_t GetValueSize() const final { return sizeof(std::size_t); }
95 size_t GetAlignment() const final { return alignof(std::size_t); }
96
97 /// Get the number of elements of the collection identified by globalIndex
105
106 /// Get the number of elements of the collection identified by clusterIndex
114};
115
116/**
117 * @brief An artificial field that provides the size of a fixed-size array
118 *
119 * This is the implementation of `R_rdf_sizeof_column` in case `column` contains
120 * fixed-size arrays on disk.
121 */
123private:
124 std::size_t fArrayLength;
125
126 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view) const final
127 {
128 return std::make_unique<RArraySizeField>(fArrayLength);
129 }
130 void GenerateColumns() final { assert(false && "RArraySizeField fields must only be used for reading"); }
132 void ReadGlobalImpl(ROOT::NTupleSize_t /*globalIndex*/, void *to) final
133 {
134 *static_cast<std::size_t *>(to) = fArrayLength;
135 }
136 void ReadInClusterImpl(RNTupleLocalIndex /*localIndex*/, void *to) final
137 {
138 *static_cast<std::size_t *>(to) = fArrayLength;
139 }
140
141public:
143 : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kLeaf, false /* isSimple */),
145 {
146 }
152
153 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
154 std::size_t GetValueSize() const final { return sizeof(std::size_t); }
155 std::size_t GetAlignment() const final { return alignof(std::size_t); }
156};
157
158/// Every RDF column is represented by exactly one RNTuple field
162
163 RNTupleDS *fDataSource; ///< The data source that owns this column reader
164 RFieldBase *fProtoField; ///< The prototype field from which fField is cloned
165 std::unique_ptr<RFieldBase> fField; ///< The field backing the RDF column
166 std::unique_ptr<RFieldBase::RValue> fValue; ///< The memory location used to read from fField
167 std::shared_ptr<void> fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources
168 Long64_t fLastEntry = -1; ///< Last entry number that was read
169 /// For chains, the logical entry and the physical entry in any particular file can be different.
170 /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the corresponding
171 /// data source was opened.
173
174public:
177
178 /// Connect the field and its subfields to the page source
180 {
181 assert(fLastEntry == -1);
183
184 // Create a new, real field from the prototype and set its field ID in the context of the given page source
186 {
187 auto descGuard = source.GetSharedDescriptorGuard();
188 // Set the on-disk field IDs for the field and the subfield
189 fField->SetOnDiskId(
191 auto iProto = fProtoField->cbegin();
192 auto iReal = fField->begin();
193 for (; iReal != fField->end(); ++iProto, ++iReal) {
194 iReal->SetOnDiskId(descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(iProto->GetOnDiskId())));
195 }
196 }
197
198 try {
200 } catch (const ROOT::RException &err) {
201 auto onDiskType = source.GetSharedDescriptorGuard()->GetFieldDescriptor(fField->GetOnDiskId()).GetTypeName();
202 std::string msg = "RNTupleDS: invalid type \"" + fField->GetTypeName() + "\" for column \"" +
203 fDataSource->fFieldId2QualifiedName[fField->GetOnDiskId()] + "\" with on-disk type \"" +
204 onDiskType + "\"";
205 throw std::runtime_error(msg);
206 }
207
208 if (fValuePtr) {
209 // When the reader reconnects to a new file, the fValuePtr is already set
210 fValue = std::make_unique<RFieldBase::RValue>(fField->BindValue(fValuePtr));
211 fValuePtr = nullptr;
212 } else {
213 // For the first file, create a new object for this field (reader)
214 fValue = std::make_unique<RFieldBase::RValue>(fField->CreateValue());
215 }
216 }
217
219 {
220 if (fValue && keepValue) {
221 fValuePtr = fValue->GetPtr<void>();
222 }
223 fValue = nullptr;
224 fField = nullptr;
225 fLastEntry = -1;
226 }
227
228 void *GetImpl(Long64_t entry) final
229 {
230 if (entry != fLastEntry) {
231 fValue->Read(entry - fEntryOffset);
233 }
234 return fValue->GetPtr<void>().get();
235 }
236};
237
238} // namespace Internal
239
/// Defaulted destructor: all cleanup is left to the members' own destructors.
/// NOTE(review): fThreadStaging is a std::thread that Initialize() starts and Finalize() joins; if the data source were
/// destroyed while that thread is still joinable (e.g. Finalize() never ran), std::thread's destructor would call
/// std::terminate -- confirm this cannot happen.
240RNTupleDS::~RNTupleDS() = default;
241
243 std::vector<RNTupleDS::RFieldInfo> fieldInfos, bool convertToRVec)
244{
245 // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple
246 // using the following types and with a top-level field named "event" of type Event:
247 //
248 // struct Event {
249 // int id;
250 // std::vector<Track> tracks;
251 // };
252 // struct Track {
253 // std::vector<Hit> hits;
254 // };
255 // struct Hit {
256 // float x;
257 // float y;
258 // };
259 //
260 // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord).
261 // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the
262 // tree of sub fields and expose the following RDF columns:
263 //
264 // "event" [Event]
265 // "event.id" [int]
266 // "event.tracks" [RVec<Track>]
267 // "R_rdf_sizeof_event.tracks" [unsigned int]
268 // "event.tracks.hits" [RVec<RVec<Hit>>]
269 // "R_rdf_sizeof_event.tracks.hits" [RVec<unsigned int>]
270 // "event.tracks.hits.x" [RVec<RVec<float>>]
271 // "R_rdf_sizeof_event.tracks.hits.x" [RVec<unsigned int>]
272 // "event.tracks.hits.y" [RVec<RVec<float>>]
273 // "R_rdf_sizeof_event.tracks.hits.y" [RVec<unsigned int>]
274
275 const auto &fieldDesc = desc.GetFieldDescriptor(fieldId);
276 const auto &nRepetitions = fieldDesc.GetNRepetitions();
277 if ((fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) || (nRepetitions > 0)) {
278 // The field is a collection or a fixed-size array.
279 // We open a new collection scope with fieldID being the inner most collection. E.g. for "event.tracks.hits",
280 // fieldInfos would already contain the fieldID of "event.tracks"
281 fieldInfos.emplace_back(fieldId, nRepetitions);
282 }
283
284 if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) {
285 // Inner fields of collections are provided as projected collections of only that inner field,
286 // E.g. we provide a projected collection RVec<RVec<float>> for "event.tracks.hits.x" in the example
287 // above.
289 convertToRVec && (fieldDesc.GetTypeName().substr(0, 19) == "ROOT::VecOps::RVec<" ||
290 fieldDesc.GetTypeName().substr(0, 12) == "std::vector<" || fieldDesc.GetTypeName() == "");
291 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
293
294 // Note that at the end of the recursion, we handled the inner sub collections as well as the
295 // collection as whole, so we are done.
296 return;
297
298 } else if (nRepetitions > 0) {
299 // Fixed-size array, same logic as ROOT::RVec.
300 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
301 AddField(desc, colName, f.GetId(), fieldInfos);
302 return;
303 } else if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kRecord) {
304 // Inner fields of records are provided as individual RDF columns, e.g. "event.id"
305 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
306 auto innerName = colName.empty() ? f.GetFieldName() : (std::string(colName) + "." + f.GetFieldName());
307 // Inner fields of collections of records are always exposed as ROOT::RVec
308 AddField(desc, innerName, f.GetId(), fieldInfos);
309 }
310 }
311
312 // The fieldID could be the root field or the class of fieldId might not be loaded.
313 // In these cases, only the inner fields are exposed as RDF columns.
314 auto fieldOrException = ROOT::RFieldBase::Create(fieldDesc.GetFieldName(), fieldDesc.GetTypeName());
315 if (!fieldOrException)
316 return;
317 auto valueField = fieldOrException.Unwrap();
318 valueField->SetOnDiskId(fieldId);
319 for (auto &f : *valueField) {
320 f.SetOnDiskId(desc.FindFieldId(f.GetFieldName(), f.GetParent()->GetOnDiskId()));
321 }
322 std::unique_ptr<ROOT::RFieldBase> cardinalityField;
323 // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks")
324 if (!fieldInfos.empty()) {
325 const auto &info = fieldInfos.back();
326 if (info.fNRepetitions > 0) {
327 cardinalityField = std::make_unique<ROOT::Experimental::Internal::RArraySizeField>(info.fNRepetitions);
328 } else {
329 cardinalityField = std::make_unique<ROOT::Experimental::Internal::RRDFCardinalityField>();
330 }
331 cardinalityField->SetOnDiskId(info.fFieldId);
332 }
333
334 for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) {
335 const auto &fieldInfo = *i;
336
337 if (fieldInfo.fNRepetitions > 0) {
338 // Fixed-size array, read it as ROOT::RVec in memory
339 valueField = std::make_unique<ROOT::RArrayAsRVecField>("", std::move(valueField), fieldInfo.fNRepetitions);
340 } else {
341 // Actual collection. A std::vector or ROOT::RVec gets added as a ROOT::RVec. All other collection types keep
342 // their original type.
343 if (convertToRVec) {
344 valueField = std::make_unique<ROOT::RRVecField>("", std::move(valueField));
345 } else {
346 auto outerFieldType = desc.GetFieldDescriptor(fieldInfo.fFieldId).GetTypeName();
348 }
349 }
350
351 valueField->SetOnDiskId(fieldInfo.fFieldId);
352
353 // Skip the inner-most collection level to construct the cardinality column
354 // It's taken care of by the `if (!fieldInfos.empty())` scope above
355 if (i != fieldInfos.rbegin()) {
356 if (fieldInfo.fNRepetitions > 0) {
357 // This collection level refers to a fixed-size array
359 std::make_unique<ROOT::RArrayAsRVecField>("", std::move(cardinalityField), fieldInfo.fNRepetitions);
360 } else {
361 // This collection level refers to an RVec
362 cardinalityField = std::make_unique<ROOT::RRVecField>("", std::move(cardinalityField));
363 }
364
365 cardinalityField->SetOnDiskId(fieldInfo.fFieldId);
366 }
367 }
368
369 if (cardinalityField) {
370 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
371 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
372 fProtoFields.emplace_back(std::move(cardinalityField));
373 }
374
375 fieldInfos.emplace_back(fieldId, nRepetitions);
376 fColumnNames.emplace_back(colName);
377 fColumnTypes.emplace_back(valueField->GetTypeName());
378 fProtoFields.emplace_back(std::move(valueField));
379}
380
381RNTupleDS::RNTupleDS(std::unique_ptr<Internal::RPageSource> pageSource)
382{
383 pageSource->Attach();
384 fPrincipalDescriptor = pageSource->GetSharedDescriptorGuard()->Clone();
385 fStagingArea.emplace_back(std::move(pageSource));
386
388 std::vector<ROOT::Experimental::RNTupleDS::RFieldInfo>());
389}
390
391namespace {
392
394{
395 // The setting is for now a global one, must be decided before running the
396 // program by setting the appropriate environment variable. Make sure that
397 // option configuration is thread-safe and happens only once.
399 static std::once_flag flag;
400 std::call_once(flag, []() {
401 if (auto env = gSystem->Getenv("ROOT_RNTUPLE_CLUSTERBUNCHSIZE"); env != nullptr && strlen(env) > 0) {
402 std::string envStr{env};
403 auto envNum{std::stoul(envStr)};
404 envNum = envNum == 0 ? 1 : envNum;
406 }
407 });
408 return opts;
409}
410
411std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
412CreatePageSource(std::string_view ntupleName, std::string_view fileName)
413{
415}
416} // namespace
417
418RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
419 : RNTupleDS(CreatePageSource(ntupleName, fileName))
420{
421}
422
424 : RNTupleDS(ROOT::Experimental::Internal::RPageSourceFile::CreateFromAnchor(*ntuple))
425{
426}
427
428RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames)
429 : RNTupleDS(CreatePageSource(ntupleName, fileNames[0]))
430{
433 fStagingArea.resize(fFileNames.size());
434}
435
436RDF::RDataSource::Record_t RNTupleDS::GetColumnReadersImpl(std::string_view /* name */, const std::type_info & /* ti */)
437{
438 // This datasource uses the newer GetColumnReaders() API
439 return {};
440}
441
442std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
443RNTupleDS::GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info &tid)
444{
445 // At this point we can assume that `name` will be found in fColumnNames
446 const auto index = std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), name));
448
450 // If the field corresponding to the provided name is not a cardinality column and the requested type is different
451 // from the proto field that was created when the data source was constructed, we first have to create an
452 // alternative proto field for the column reader. Otherwise, we can directly use the existing proto field.
453 if (name.substr(0, 13) != "R_rdf_sizeof_" && requestedType != fColumnTypes[index]) {
455 auto altProtoField = std::find_if(altProtoFields.begin(), altProtoFields.end(),
456 [&requestedType](const std::unique_ptr<ROOT::RFieldBase> &fld) {
457 return fld->GetTypeName() == requestedType;
458 });
460 field = altProtoField->get();
461 } else {
464 throw std::runtime_error("RNTupleDS: Could not create field with type \"" + requestedType +
465 "\" for column \"" + std::string(name));
466 }
468 newAltProtoField->SetOnDiskId(fProtoFields[index]->GetOnDiskId());
469 field = newAltProtoField.get();
470 altProtoFields.emplace_back(std::move(newAltProtoField));
471 }
472 } else {
473 field = fProtoFields[index].get();
474 }
475
476 // Map the field's and subfields' IDs to qualified names so that we can later connect the fields to
477 // other page sources from the chain
479 for (const auto &s : *field) {
480 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(s.GetOnDiskId());
481 }
482
483 auto reader = std::make_unique<Internal::RNTupleColumnReader>(this, field);
484 fActiveColumnReaders[slot].emplace_back(reader.get());
485
486 return reader;
487}
488
490{
491 while (true) {
492 std::unique_lock lock(fMutexStaging);
493 fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
495 return;
496
499 fHasNextSources = true;
500 fIsReadyForStaging = false;
501
502 lock.unlock();
503 fCvStaging.notify_one();
504 }
505}
506
508{
509 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
510 for (auto i = fNextFileIndex; (i < nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
512 return;
513
514 if (fStagingArea[i]) {
515 // The first file is already open and was used to read the schema
516 assert(i == 0);
517 } else {
518 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
519 fStagingArea[i]->LoadStructure();
520 }
521 }
522}
523
525{
526 assert(fNextRanges.empty());
527 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
529 if (nRemainingFiles == 0)
530 return;
531
532 // Easy work scheduling: one file per slot. We skip empty files (files without entries).
533 if (nRemainingFiles >= fNSlots) {
534 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles)) {
536
537 std::swap(fStagingArea[fNextFileIndex], range.fSource);
538
539 if (!range.fSource) {
540 // Typically, the prestaged source should have been present. Only if some of the files are empty, we need
541 // to open and attach files here.
542 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
543 }
544 range.fSource->Attach();
546
547 auto nEntries = range.fSource->GetNEntries();
548 if (nEntries == 0)
549 continue;
550
551 range.fLastEntry = nEntries; // whole file per slot, i.e. entry range [0..nEntries - 1]
552 fNextRanges.emplace_back(std::move(range));
553 }
554 return;
555 }
556
557 // Work scheduling of the tail: multiple slots work on the same file.
558 // Every slot still has its own page source but these page sources may open the same file.
559 // Again, we need to skip empty files.
560 unsigned int nSlotsPerFile = fNSlots / nRemainingFiles;
561 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles); ++i) {
562 std::unique_ptr<Internal::RPageSource> source;
564 if (!source) {
565 // Empty files trigger this condition
566 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
567 }
568 source->Attach();
570
571 auto nEntries = source->GetNEntries();
572 if (nEntries == 0)
573 continue;
574
575 // If last file: use all remaining slots
576 if (i == (nRemainingFiles - 1))
578
579 std::vector<std::pair<ULong64_t, ULong64_t>> rangesByCluster;
580 {
581 auto descriptorGuard = source->GetSharedDescriptorGuard();
582 auto clusterId = descriptorGuard->FindClusterId(0, 0);
584 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterId);
585 rangesByCluster.emplace_back(std::make_pair<ULong64_t, ULong64_t>(
586 clusterDesc.GetFirstEntryIndex(), clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()));
587 clusterId = descriptorGuard->FindNextClusterId(clusterId);
588 }
589 }
590 const unsigned int nRangesByCluster = rangesByCluster.size();
591
592 // Distribute slots equidistantly over the entry range, aligned on cluster boundaries
594 const auto remainder = nRangesByCluster % nSlotsPerFile;
595 std::size_t iRange = 0;
596 unsigned int iSlot = 0;
597 const unsigned int N = std::min(nSlotsPerFile, nRangesByCluster);
598 for (; iSlot < N; ++iSlot) {
599 auto start = rangesByCluster[iRange].first;
600 iRange += nClustersPerSlot + static_cast<int>(iSlot < remainder);
601 assert(iRange > 0);
602 auto end = rangesByCluster[iRange - 1].second;
603
605 // The last range for this file just takes the already opened page source. All previous ranges clone.
606 if (iSlot == N - 1) {
607 range.fSource = std::move(source);
608 } else {
609 range.fSource = source->Clone();
610 }
611 range.fSource->SetEntryRange({start, end - start});
612 range.fFirstEntry = start;
613 range.fLastEntry = end;
614 fNextRanges.emplace_back(std::move(range));
615 }
616 } // loop over tail of remaining files
617}
618
619std::vector<std::pair<ULong64_t, ULong64_t>> RNTupleDS::GetEntryRanges()
620{
621 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
622
623 // We need to distinguish between single threaded and multi-threaded runs.
624 // In single threaded mode, InitSlot is only called once and column readers have to be rewired
625 // to new page sources of the chain in GetEntryRanges. In multi-threaded mode, on the other hand,
626 // InitSlot is called for every returned range, thus rewiring the column readers takes place in
627 // InitSlot and FinalizeSlot.
628
629 if (fNSlots == 1) {
630 for (auto r : fActiveColumnReaders[0]) {
631 r->Disconnect(true /* keepValue */);
632 }
633 }
634
635 // If we have fewer files than slots and we run multiple event loops, we can reuse fCurrentRanges and don't need
636 // to worry about loading the fNextRanges. I.e., in this case we don't enter the if block.
637 if (fCurrentRanges.empty() || (fSeenEntries > 0)) {
638 // Otherwise, i.e. start of the first event loop or in the middle of the event loop, prepare the next ranges
639 // and swap with the current ones.
640 {
641 std::unique_lock lock(fMutexStaging);
642 fCvStaging.wait(lock, [this] { return fHasNextSources; });
643 }
645 if (fNextRanges.empty()) {
646 // No more data
647 return ranges;
648 }
649
650 assert(fNextRanges.size() <= fNSlots);
651
652 fCurrentRanges.clear();
653 std::swap(fCurrentRanges, fNextRanges);
654 }
655
656 // Stage next batch of files for the next call to GetEntryRanges()
657 {
658 std::lock_guard _(fMutexStaging);
659 fIsReadyForStaging = true;
660 fHasNextSources = false;
661 }
662 fCvStaging.notify_one();
663
664 // Create ranges for the RDF loop manager from the list of REntryRangeDS records.
665 // The entry ranges that are relative to the page source in REntryRangeDS are translated into absolute
666 // entry ranges, given the current state of the entry cursor.
667 // We remember the connection from first absolute entry index of a range to its REntryRangeDS record
668 // so that we can properly rewire the column reader in InitSlot
669 fFirstEntry2RangeIdx.clear();
671 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
672 // Several consecutive ranges may operate on the same file (each with their own page source clone).
673 // We can detect a change of file when the first entry number jumps back to 0.
674 if (fCurrentRanges[i].fFirstEntry == 0) {
675 // New source
678 }
679 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntries;
680 auto end = fCurrentRanges[i].fLastEntry + fSeenEntries;
681 nEntriesPerSource += end - start;
682
683 fFirstEntry2RangeIdx[start] = i;
684 ranges.emplace_back(start, end);
685 }
687
688 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
689 for (auto r : fActiveColumnReaders[0]) {
690 r->Connect(*fCurrentRanges[0].fSource, ranges[0].first);
691 }
692 }
693
694 return ranges;
695}
696
698{
699 if (fNSlots == 1)
700 return;
701
703 for (auto r : fActiveColumnReaders[slot]) {
704 r->Connect(*fCurrentRanges[idxRange].fSource, firstEntry - fCurrentRanges[idxRange].fFirstEntry);
705 }
706}
707
709{
710 if (fNSlots == 1)
711 return;
712
713 for (auto r : fActiveColumnReaders[slot]) {
714 r->Disconnect(true /* keepValue */);
715 }
716}
717
718std::string RNTupleDS::GetTypeName(std::string_view colName) const
719{
720 auto colNamePos = std::find(fColumnNames.begin(), fColumnNames.end(), colName);
721
722 if (colNamePos == fColumnNames.end()) {
723 auto msg = std::string("RNTupleDS: There is no column with name \"") + std::string(colName) + "\"";
724 throw std::runtime_error(msg);
725 }
726
727 const auto index = std::distance(fColumnNames.begin(), colNamePos);
728 return fColumnTypes[index];
729}
730
731bool RNTupleDS::HasColumn(std::string_view colName) const
732{
733 return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end();
734}
735
737{
738 fSeenEntries = 0;
739 fNextFileIndex = 0;
741 fThreadStaging = std::thread(&RNTupleDS::ExecStaging, this);
742 assert(fNextRanges.empty());
743
744 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
745 // First event loop or large number of files: start the staging process.
746 {
747 std::lock_guard _(fMutexStaging);
748 fIsReadyForStaging = true;
749 }
750 fCvStaging.notify_one();
751 } else {
752 // Otherwise, we will reuse fCurrentRanges. Make sure that staging and preparing next ranges will be a noop
753 // (already at the end of the list of files).
754 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
755 }
756}
757
759{
760 for (unsigned int i = 0; i < fNSlots; ++i) {
761 for (auto r : fActiveColumnReaders[i]) {
762 r->Disconnect(false /* keepValue */);
763 }
764 }
765 {
766 std::lock_guard _(fMutexStaging);
768 }
769 fCvStaging.notify_one();
770 fThreadStaging.join();
771 // If we have a chain with more files than the number of slots, the files opened at the end of the
772 // event loop won't be reused when the event loop restarts, so we can close them.
773 if (fFileNames.size() > fNSlots) {
774 fCurrentRanges.clear();
775 fNextRanges.clear();
776 fStagingArea.clear();
777 fStagingArea.resize(fFileNames.size());
778 }
779}
780
781void RNTupleDS::SetNSlots(unsigned int nSlots)
782{
783 assert(fNSlots == 0);
784 assert(nSlots > 0);
785 fNSlots = nSlots;
787}
788} // namespace Experimental
789} // namespace ROOT
790
791ROOT::RDataFrame ROOT::RDF::Experimental::FromRNTuple(std::string_view ntupleName, std::string_view fileName)
792{
793 return ROOT::RDataFrame(std::make_unique<ROOT::Experimental::RNTupleDS>(ntupleName, fileName));
794}
795
797ROOT::RDF::Experimental::FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames)
798{
799 return ROOT::RDataFrame(std::make_unique<ROOT::Experimental::RNTupleDS>(ntupleName, fileNames));
800}
#define f(i)
Definition RSha256.hxx:104
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define _(A, B)
Definition cfortran.h:108
An artificial field that provides the size of a fixed-size array.
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
RArraySizeField & operator=(RArraySizeField &&other)=default
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
void ReadInClusterImpl(RNTupleLocalIndex, void *to) final
void GenerateColumns(const ROOT::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
void ReadGlobalImpl(ROOT::NTupleSize_t, void *to) final
RArraySizeField(RArraySizeField &&other)=default
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RArraySizeField(const RArraySizeField &other)=delete
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
RArraySizeField & operator=(const RArraySizeField &other)=delete
Every RDF column is represented by exactly one RNTuple field.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
Long64_t fLastEntry
Last entry number that was read.
RNTupleDS * fDataSource
The data source that owns this column reader.
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
An artificial field that transforms an RNTuple column that contains the offset of collections into co...
Definition RNTupleDS.cxx:64
void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
Get the number of elements of the collection identified by clusterIndex.
void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
Definition RNTupleDS.cxx:98
RRDFCardinalityField(RRDFCardinalityField &&other)=default
void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:89
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
Definition RNTupleDS.cxx:66
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
Definition RNTupleDS.cxx:78
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:88
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
Definition RNTupleDS.cxx:94
size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
Definition RNTupleDS.cxx:95
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
Definition RNTupleDS.cxx:70
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:45
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos, bool convertToRVec=true)
Provides the RDF column "colName" given the field identified by fieldID.
void ExecStaging()
The main function of the fThreadStaging background thread.
ROOT::RNTupleDescriptor fPrincipalDescriptor
A clone of the first page source's descriptor.
Definition RNTupleDS.hxx:59
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
Definition RNTupleDS.hxx:87
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
std::thread fThreadStaging
The background thread that runs StageNextSources()
void Initialize() final
Convenience method called before starting an event-loop.
std::size_t fNextFileIndex
Index into fFileNames to the next file to process.
Definition RNTupleDS.hxx:81
std::vector< std::string > fColumnNames
Definition RNTupleDS.hxx:97
std::unordered_map< ULong64_t, std::size_t > fFirstEntry2RangeIdx
Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index...
void Finalize() final
Convenience method called after concluding an event-loop.
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
std::vector< std::vector< Internal::RNTupleColumnReader * > > fActiveColumnReaders
List of column readers returned by GetColumnReaders() organized by slot.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
std::vector< REntryRangeDS > fCurrentRanges
Basis for the ranges returned by the last GetEntryRanges() call.
bool fStagingThreadShouldTerminate
Is true when the I/O thread should quit.
bool fHasNextSources
Is true when the staging thread has populated the next batch of files to fStagingArea.
std::vector< REntryRangeDS > fNextRanges
Basis for the ranges populated by the PrepareNextRanges() call.
std::mutex fMutexStaging
Protects the shared state between the main thread and the I/O thread.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
std::vector< std::unique_ptr< ROOT::Experimental::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
Definition RNTupleDS.hxx:80
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
std::unordered_map< std::size_t, std::vector< std::unique_ptr< ROOT::RFieldBase > > > fAlternativeProtoFields
Columns may be requested with types other than with which they were initially added as proto fields.
Definition RNTupleDS.hxx:92
std::vector< std::string > fFileNames
Definition RNTupleDS.hxx:63
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
Definition RNTupleDS.hxx:62
std::vector< std::string > fColumnTypes
Definition RNTupleDS.hxx:98
RNTupleDS(std::unique_ptr< ROOT::Experimental::Internal::RPageSource > pageSource)
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
Definition RNTupleDS.hxx:96
ULong64_t fSeenEntries
The number of entries so far returned by GetEntryRanges()
std::condition_variable fCvStaging
Signal for the state information of fIsReadyForStaging and fHasNextSources.
bool fIsReadyForStaging
Is true when the staging thread should start working.
void GetCollectionInfo(const ROOT::NTupleSize_t globalIndex, RNTupleLocalIndex *collectionStart, ROOT::NTupleSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
Definition RColumn.hxx:285
static void SetClusterBunchSize(RNTupleReadOptions &options, unsigned int val)
std::vector< void * > Record_t
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
The list of column representations a field can have.
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
RConstSchemaIterator cbegin() const
const std::string & GetFieldName() const
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId)
Factory method to resurrect a field from the stored on-disk type information.
ROOT::DescriptorId_t GetOnDiskId() const
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its subfields using a possibly new name and a new, unconnected set of columns.
The on-storage metadata of an RNTuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t GetFieldZeroId() const
Returns the logical parent of all top-level RNTuple data fields.
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
std::string GetQualifiedFieldName(ROOT::DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d. In case of invalid ...
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Common user-tunable settings for reading RNTuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:69
const_iterator begin() const
const_iterator end() const
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1676
std::string TypeID2TypeName(const std::type_info &id)
Returns the name of a type starting from its type_info An empty string is returned in case of failure...
Definition RDFUtils.cxx:121
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Experimental::Internal::RPageSource &)
std::string GetRenormalizedTypeName(const std::string &metaNormalizedName)
Given a type name normalized by ROOT meta, renormalize it for RNTuple, e.g., insert the std:: prefix.
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:51