Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleDS.cxx
Go to the documentation of this file.
1/// \file RNTupleDS.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
5/// \date 2018-10-04
6/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
7/// is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
18#include <ROOT/RField.hxx>
21#include <ROOT/RNTupleDS.hxx>
22#include <ROOT/RNTupleUtil.hxx>
23#include <ROOT/RPageStorage.hxx>
24#include <string_view>
25
26#include <TError.h>
27#include <TSystem.h>
28
29#include <cassert>
30#include <memory>
31#include <mutex>
32#include <string>
33#include <vector>
34#include <typeinfo>
35#include <utility>
36
37// clang-format off
38/**
39* \class ROOT::Experimental::RNTupleDS
40* \ingroup dataframe
41* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data.
42*
43* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
44*
45* For each column containing an array or a collection, a corresponding column `#colname` is available to access
46* `colname.size()` without reading and deserializing the collection values.
47*
48**/
49// clang-format on
50
51namespace ROOT {
52namespace Experimental {
53namespace Internal {
54
55/// An artificial field that transforms an RNTuple column that contains the offset of collections into
56/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g.
57/// `R_rdf_sizeof_jets` for a collection named `jets`.
58///
59/// This field owns the collection offset field but instead of exposing the collection offsets it exposes
60/// the collection sizes (offset(N+1) - offset(N)). For the time being, we offer this functionality only in RDataFrame.
61/// TODO(jblomer): consider providing a general set of useful virtual fields as part of RNTuple.
63protected:
64 std::unique_ptr<ROOT::Experimental::RFieldBase> CloneImpl(std::string_view /* newName */) const final
65 {
66 return std::make_unique<RRDFCardinalityField>();
67 }
68 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
69
70public:
71 static std::string TypeName() { return "std::size_t"; }
73 : ROOT::Experimental::RFieldBase("", TypeName(), ENTupleStructure::kLeaf, false /* isSimple */)
74 {
75 }
79
81 {
82 static RColumnRepresentations representations(
84 {});
85 return representations;
86 }
87 // Field is only used for reading
88 void GenerateColumns() final { assert(false && "Cardinality fields must only be used for reading"); }
89 void GenerateColumns(const RNTupleDescriptor &desc) final { GenerateColumnsImpl<Internal::RColumnIndex>(desc); }
90
91 size_t GetValueSize() const final { return sizeof(std::size_t); }
92 size_t GetAlignment() const final { return alignof(std::size_t); }
93
94 /// Get the number of elements of the collection identified by globalIndex
95 void ReadGlobalImpl(ROOT::Experimental::NTupleSize_t globalIndex, void *to) final
96 {
97 RClusterIndex collectionStart;
99 fPrincipalColumn->GetCollectionInfo(globalIndex, &collectionStart, &size);
100 *static_cast<std::size_t *>(to) = size;
101 }
102
103 /// Get the number of elements of the collection identified by clusterIndex
104 void ReadInClusterImpl(ROOT::Experimental::RClusterIndex clusterIndex, void *to) final
105 {
106 RClusterIndex collectionStart;
108 fPrincipalColumn->GetCollectionInfo(clusterIndex, &collectionStart, &size);
109 *static_cast<std::size_t *>(to) = size;
110 }
111};
112
113/**
114 * @brief An artificial field that provides the size of a fixed-size array
115 *
116 * This is the implementation of `R_rdf_sizeof_column` in case `column` contains
117 * fixed-size arrays on disk.
118 */
120private:
121 std::size_t fArrayLength;
122
123 std::unique_ptr<ROOT::Experimental::RFieldBase> CloneImpl(std::string_view) const final
124 {
125 return std::make_unique<RArraySizeField>(fArrayLength);
126 }
127 void GenerateColumns() final { assert(false && "RArraySizeField fields must only be used for reading"); }
129 void ReadGlobalImpl(NTupleSize_t /*globalIndex*/, void *to) final { *static_cast<std::size_t *>(to) = fArrayLength; }
130 void ReadInClusterImpl(RClusterIndex /*clusterIndex*/, void *to) final
131 {
132 *static_cast<std::size_t *>(to) = fArrayLength;
133 }
134
135public:
136 RArraySizeField(std::size_t arrayLength)
137 : ROOT::Experimental::RFieldBase("", "std::size_t", ENTupleStructure::kLeaf, false /* isSimple */),
138 fArrayLength(arrayLength)
139 {
140 }
141 RArraySizeField(const RArraySizeField &other) = delete;
145 ~RArraySizeField() final = default;
146
147 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
148 std::size_t GetValueSize() const final { return sizeof(std::size_t); }
149 std::size_t GetAlignment() const final { return alignof(std::size_t); }
150};
151
152/// Every RDF column is represented by exactly one RNTuple field
156
157 RNTupleDS *fDataSource; ///< The data source that owns this column reader
158 RFieldBase *fProtoField; ///< The prototype field from which fField is cloned
159 std::unique_ptr<RFieldBase> fField; ///< The field backing the RDF column
160 std::unique_ptr<RFieldBase::RValue> fValue; ///< The memory location used to read from fField
161 std::shared_ptr<void> fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources
162 Long64_t fLastEntry = -1; ///< Last entry number that was read
163 /// For chains, the logical entry and the physical entry in any particular file can be different.
164 /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the corresponding
165 /// data source was opened.
167
168public:
169 RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField) : fDataSource(ds), fProtoField(protoField) {}
171
172 /// Connect the field and its subfields to the page source
173 void Connect(RPageSource &source, Long64_t entryOffset)
174 {
175 assert(fLastEntry == -1);
176 fEntryOffset = entryOffset;
177
178 // Create a new, real field from the prototype and set its field ID in the context of the given page source
180 {
181 auto descGuard = source.GetSharedDescriptorGuard();
182 // Set the on-disk field IDs for the field and the subfield
183 fField->SetOnDiskId(
184 descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(fProtoField->GetOnDiskId())));
185 auto iProto = fProtoField->cbegin();
186 auto iReal = fField->begin();
187 for (; iReal != fField->end(); ++iProto, ++iReal) {
188 iReal->SetOnDiskId(descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(iProto->GetOnDiskId())));
189 }
190 }
191
193
194 if (fValuePtr) {
195 // When the reader reconnects to a new file, the fValuePtr is already set
196 fValue = std::make_unique<RFieldBase::RValue>(fField->BindValue(fValuePtr));
197 fValuePtr = nullptr;
198 } else {
199 // For the first file, create a new object for this field (reader)
200 fValue = std::make_unique<RFieldBase::RValue>(fField->CreateValue());
201 }
202 }
203
204 void Disconnect(bool keepValue)
205 {
206 if (fValue && keepValue) {
207 fValuePtr = fValue->GetPtr<void>();
208 }
209 fValue = nullptr;
210 fField = nullptr;
211 fLastEntry = -1;
212 }
213
214 void *GetImpl(Long64_t entry) final
215 {
216 if (entry != fLastEntry) {
217 fValue->Read(entry - fEntryOffset);
218 fLastEntry = entry;
219 }
220 return fValue->GetPtr<void>().get();
221 }
222};
223
224} // namespace Internal
225
226RNTupleDS::~RNTupleDS() = default;
227
228void RNTupleDS::AddField(const RNTupleDescriptor &desc, std::string_view colName, DescriptorId_t fieldId,
229 std::vector<RNTupleDS::RFieldInfo> fieldInfos)
230{
231 // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple
232 // using the following types and with a top-level field named "event" of type Event:
233 //
234 // struct Event {
235 // int id;
236 // std::vector<Track> tracks;
237 // };
238 // struct Track {
239 // std::vector<Hit> hits;
240 // };
241 // struct Hit {
242 // float x;
243 // float y;
244 // };
245 //
246 // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord).
247 // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the
248 // tree of sub fields and expose the following RDF columns:
249 //
250 // "event" [Event]
251 // "event.id" [int]
252 // "event.tracks" [RVec<Track>]
253 // "R_rdf_sizeof_event.tracks" [unsigned int]
254 // "event.tracks.hits" [RVec<RVec<Hit>>]
255 // "R_rdf_sizeof_event.tracks.hits" [RVec<unsigned int>]
256 // "event.tracks.hits.x" [RVec<RVec<float>>]
257 // "R_rdf_sizeof_event.tracks.hits.x" [RVec<unsigned int>]
258 // "event.tracks.hits.y" [RVec<RVec<float>>]
259 // "R_rdf_sizeof_event.tracks.hits.y" [RVec<unsigned int>]
260
261 const auto &fieldDesc = desc.GetFieldDescriptor(fieldId);
262 const auto &nRepetitions = fieldDesc.GetNRepetitions();
263 if ((fieldDesc.GetStructure() == ENTupleStructure::kCollection) || (nRepetitions > 0)) {
264 // The field is a collection or a fixed-size array.
265 // We open a new collection scope with fieldID being the inner most collection. E.g. for "event.tracks.hits",
266 // fieldInfos would already contain the fieldID of "event.tracks"
267 fieldInfos.emplace_back(fieldId, nRepetitions);
268 }
269
270 if (fieldDesc.GetStructure() == ENTupleStructure::kCollection) {
271 // Inner fields of collections are provided as projected collections of only that inner field,
272 // E.g. we provide a projected collection RVec<RVec<float>> for "event.tracks.hits.x" in the example
273 // above.
274
275 if (fieldDesc.GetTypeName().empty()) {
276 // Anonymous collection with one or several sub fields
277 auto cardinalityField = std::make_unique<ROOT::Experimental::Internal::RRDFCardinalityField>();
278 cardinalityField->SetOnDiskId(fieldId);
279 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
280 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
281 fProtoFields.emplace_back(std::move(cardinalityField));
282
283 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
284 AddField(desc, std::string(colName) + "." + f.GetFieldName(), f.GetId(), fieldInfos);
285 }
286 } else {
287 // ROOT::RVec with exactly one sub field
288 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
289 AddField(desc, colName, f.GetId(), fieldInfos);
290 }
291 // Note that at the end of the recursion, we handled the inner sub collections as well as the
292 // collection as whole, so we are done.
293 return;
294
295 } else if (nRepetitions > 0) {
296 // Fixed-size array, same logic as ROOT::RVec.
297 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
298 AddField(desc, colName, f.GetId(), fieldInfos);
299 return;
300 } else if (fieldDesc.GetStructure() == ENTupleStructure::kRecord) {
301 // Inner fields of records are provided as individual RDF columns, e.g. "event.id"
302 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
303 auto innerName = colName.empty() ? f.GetFieldName() : (std::string(colName) + "." + f.GetFieldName());
304 AddField(desc, innerName, f.GetId(), fieldInfos);
305 }
306 }
307
308 // The fieldID could be the root field or the class of fieldId might not be loaded.
309 // In these cases, only the inner fields are exposed as RDF columns.
310 auto fieldOrException = RFieldBase::Create(fieldDesc.GetFieldName(), fieldDesc.GetTypeName());
311 if (!fieldOrException)
312 return;
313 auto valueField = fieldOrException.Unwrap();
314 valueField->SetOnDiskId(fieldId);
315 for (auto &f : *valueField) {
316 f.SetOnDiskId(desc.FindFieldId(f.GetFieldName(), f.GetParent()->GetOnDiskId()));
317 }
318 std::unique_ptr<RFieldBase> cardinalityField;
319 // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks")
320 if (!fieldInfos.empty()) {
321 const auto &info = fieldInfos.back();
322 if (info.fNRepetitions > 0) {
323 cardinalityField = std::make_unique<ROOT::Experimental::Internal::RArraySizeField>(info.fNRepetitions);
324 } else {
325 cardinalityField = std::make_unique<ROOT::Experimental::Internal::RRDFCardinalityField>();
326 }
327 cardinalityField->SetOnDiskId(info.fFieldId);
328 }
329
330 for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) {
331 const auto &fieldInfo = *i;
332
333 if (fieldInfo.fNRepetitions > 0) {
334 // Fixed-size array, read it as ROOT::RVec in memory
335 valueField =
336 std::make_unique<ROOT::Experimental::RArrayAsRVecField>("", std::move(valueField), fieldInfo.fNRepetitions);
337 } else {
338 // Actual ROOT::RVec
339 valueField = std::make_unique<ROOT::Experimental::RRVecField>("", std::move(valueField));
340 }
341
342 valueField->SetOnDiskId(fieldInfo.fFieldId);
343
344 // Skip the inner-most collection level to construct the cardinality column
345 // It's taken care of by the `if (!fieldInfos.empty())` scope above
346 if (i != fieldInfos.rbegin()) {
347 if (fieldInfo.fNRepetitions > 0) {
348 // This collection level refers to a fixed-size array
349 cardinalityField = std::make_unique<ROOT::Experimental::RArrayAsRVecField>("", std::move(cardinalityField),
350 fieldInfo.fNRepetitions);
351 } else {
352 // This collection level refers to an RVec
353 cardinalityField = std::make_unique<ROOT::Experimental::RRVecField>("", std::move(cardinalityField));
354 }
355
356 cardinalityField->SetOnDiskId(fieldInfo.fFieldId);
357 }
358 }
359
360 if (cardinalityField) {
361 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
362 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
363 fProtoFields.emplace_back(std::move(cardinalityField));
364 }
365
366 fieldInfos.emplace_back(fieldId, nRepetitions);
367 fColumnNames.emplace_back(colName);
368 fColumnTypes.emplace_back(valueField->GetTypeName());
369 fProtoFields.emplace_back(std::move(valueField));
370}
371
372RNTupleDS::RNTupleDS(std::unique_ptr<Internal::RPageSource> pageSource)
373{
374 pageSource->Attach();
375 fPrincipalDescriptor = pageSource->GetSharedDescriptorGuard()->Clone();
376 fStagingArea.emplace_back(std::move(pageSource));
377
378 AddField(*fPrincipalDescriptor, "", fPrincipalDescriptor->GetFieldZeroId(),
379 std::vector<ROOT::Experimental::RNTupleDS::RFieldInfo>());
380}
381
382namespace {
383
385{
386 // The setting is for now a global one, must be decided before running the
387 // program by setting the appropriate environment variable. Make sure that
388 // option configuration is thread-safe and happens only once.
390 static std::once_flag flag;
391 std::call_once(flag, []() {
392 if (auto env = gSystem->Getenv("ROOT_RNTUPLE_CLUSTERBUNCHSIZE"); env != nullptr && strlen(env) > 0) {
393 std::string envStr{env};
394 auto envNum{std::stoul(envStr)};
395 envNum = envNum == 0 ? 1 : envNum;
396 opts.SetClusterBunchSize(envNum);
397 }
398 });
399 return opts;
400}
401
402std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
403CreatePageSource(std::string_view ntupleName, std::string_view fileName)
404{
405 return ROOT::Experimental::Internal::RPageSource::Create(ntupleName, fileName, GetOpts());
406}
407} // namespace
408
409RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
410 : RNTupleDS(CreatePageSource(ntupleName, fileName))
411{
412}
413
415 : RNTupleDS(ROOT::Experimental::Internal::RPageSourceFile::CreateFromAnchor(*ntuple))
416{
417}
418
419RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames)
420 : RNTupleDS(CreatePageSource(ntupleName, fileNames[0]))
421{
422 fNTupleName = ntupleName;
423 fFileNames = fileNames;
424 fStagingArea.resize(fFileNames.size());
425}
426
427RDF::RDataSource::Record_t RNTupleDS::GetColumnReadersImpl(std::string_view /* name */, const std::type_info & /* ti */)
428{
429 // This datasource uses the newer GetColumnReaders() API
430 return {};
431}
432
433std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
434RNTupleDS::GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info & /*tid*/)
435{
436 // At this point we can assume that `name` will be found in fColumnNames
437 // TODO(jblomer): check incoming type
438 const auto index = std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), name));
439 auto field = fProtoFields[index].get();
440
441 // Map the field's and subfields' IDs to qualified names so that we can later connect the fields to
442 // other page sources from the chain
443 fFieldId2QualifiedName[field->GetOnDiskId()] = fPrincipalDescriptor->GetQualifiedFieldName(field->GetOnDiskId());
444 for (const auto &s : *field) {
445 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor->GetQualifiedFieldName(s.GetOnDiskId());
446 }
447
448 auto reader = std::make_unique<Internal::RNTupleColumnReader>(this, field);
449 fActiveColumnReaders[slot].emplace_back(reader.get());
450
451 return reader;
452}
453
455{
456 while (true) {
457 std::unique_lock lock(fMutexStaging);
458 fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
460 return;
461
462 assert(!fHasNextSources);
464 fHasNextSources = true;
465 fIsReadyForStaging = false;
466
467 lock.unlock();
468 fCvStaging.notify_one();
469 }
470}
471
473{
474 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
475 for (auto i = fNextFileIndex; (i < nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
477 return;
478
479 if (fStagingArea[i]) {
480 // The first file is already open and was used to read the schema
481 assert(i == 0);
482 } else {
483 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
484 fStagingArea[i]->LoadStructure();
485 }
486 }
487}
488
490{
491 assert(fNextRanges.empty());
492 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
493 auto nRemainingFiles = nFiles - fNextFileIndex;
494 if (nRemainingFiles == 0)
495 return;
496
497 // Easy work scheduling: one file per slot. We skip empty files (files without entries).
498 if (nRemainingFiles >= fNSlots) {
499 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles)) {
500 REntryRangeDS range;
501
502 std::swap(fStagingArea[fNextFileIndex], range.fSource);
503
504 if (!range.fSource) {
505 // Typically, the prestaged source should have been present. Only if some of the files are empty, we need
506 // to open and attach files here.
507 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
508 }
509 range.fSource->Attach();
511
512 auto nEntries = range.fSource->GetNEntries();
513 if (nEntries == 0)
514 continue;
515
516 range.fLastEntry = nEntries; // whole file per slot, i.e. entry range [0..nEntries - 1]
517 fNextRanges.emplace_back(std::move(range));
518 }
519 return;
520 }
521
522 // Work scheduling of the tail: multiple slots work on the same file.
523 // Every slot still has its own page source but these page sources may open the same file.
524 // Again, we need to skip empty files.
525 unsigned int nSlotsPerFile = fNSlots / nRemainingFiles;
526 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles); ++i) {
527 std::unique_ptr<Internal::RPageSource> source;
528 std::swap(fStagingArea[fNextFileIndex], source);
529 if (!source) {
530 // Empty files trigger this condition
531 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
532 }
533 source->Attach();
535
536 auto nEntries = source->GetNEntries();
537 if (nEntries == 0)
538 continue;
539
540 // If last file: use all remaining slots
541 if (i == (nRemainingFiles - 1))
542 nSlotsPerFile = fNSlots - fNextRanges.size();
543
544 std::vector<std::pair<ULong64_t, ULong64_t>> rangesByCluster;
545 {
546 auto descriptorGuard = source->GetSharedDescriptorGuard();
547 auto clusterId = descriptorGuard->FindClusterId(0, 0);
548 while (clusterId != kInvalidDescriptorId) {
549 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterId);
550 rangesByCluster.emplace_back(std::make_pair<ULong64_t, ULong64_t>(
551 clusterDesc.GetFirstEntryIndex(), clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()));
552 clusterId = descriptorGuard->FindNextClusterId(clusterId);
553 }
554 }
555 const unsigned int nRangesByCluster = rangesByCluster.size();
556
557 // Distribute slots equidistantly over the entry range, aligned on cluster boundaries
558 const auto nClustersPerSlot = nRangesByCluster / nSlotsPerFile;
559 const auto remainder = nRangesByCluster % nSlotsPerFile;
560 std::size_t iRange = 0;
561 unsigned int iSlot = 0;
562 const unsigned int N = std::min(nSlotsPerFile, nRangesByCluster);
563 for (; iSlot < N; ++iSlot) {
564 auto start = rangesByCluster[iRange].first;
565 iRange += nClustersPerSlot + static_cast<int>(iSlot < remainder);
566 assert(iRange > 0);
567 auto end = rangesByCluster[iRange - 1].second;
568
569 REntryRangeDS range;
570 // The last range for this file just takes the already opened page source. All previous ranges clone.
571 if (iSlot == N - 1) {
572 range.fSource = std::move(source);
573 } else {
574 range.fSource = source->Clone();
575 }
576 range.fSource->SetEntryRange({start, end - start});
577 range.fFirstEntry = start;
578 range.fLastEntry = end;
579 fNextRanges.emplace_back(std::move(range));
580 }
581 } // loop over tail of remaining files
582}
583
584std::vector<std::pair<ULong64_t, ULong64_t>> RNTupleDS::GetEntryRanges()
585{
586 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
587
588 // We need to distinguish between single threaded and multi-threaded runs.
589 // In single threaded mode, InitSlot is only called once and column readers have to be rewired
590 // to new page sources of the chain in GetEntryRanges. In multi-threaded mode, on the other hand,
591 // InitSlot is called for every returned range, thus rewiring the column readers takes place in
592 // InitSlot and FinalizeSlot.
593
594 if (fNSlots == 1) {
595 for (auto r : fActiveColumnReaders[0]) {
596 r->Disconnect(true /* keepValue */);
597 }
598 }
599
600 // If we have fewer files than slots and we run multiple event loops, we can reuse fCurrentRanges and don't need
601 // to worry about loading the fNextRanges. I.e., in this case we don't enter the if block.
602 if (fCurrentRanges.empty() || (fSeenEntries > 0)) {
603 // Otherwise, i.e. start of the first event loop or in the middle of the event loop, prepare the next ranges
604 // and swap with the current ones.
605 {
606 std::unique_lock lock(fMutexStaging);
607 fCvStaging.wait(lock, [this] { return fHasNextSources; });
608 }
610 if (fNextRanges.empty()) {
611 // No more data
612 return ranges;
613 }
614
615 assert(fNextRanges.size() <= fNSlots);
616
617 fCurrentRanges.clear();
618 std::swap(fCurrentRanges, fNextRanges);
619 }
620
621 // Stage next batch of files for the next call to GetEntryRanges()
622 {
623 std::lock_guard _(fMutexStaging);
624 fIsReadyForStaging = true;
625 fHasNextSources = false;
626 }
627 fCvStaging.notify_one();
628
629 // Create ranges for the RDF loop manager from the list of REntryRangeDS records.
630 // The entry ranges that are relative to the page source in REntryRangeDS are translated into absolute
631 // entry ranges, given the current state of the entry cursor.
632 // We remember the connection from first absolute entry index of a range to its REntryRangeDS record
633 // so that we can properly rewire the column reader in InitSlot
634 fFirstEntry2RangeIdx.clear();
635 ULong64_t nEntriesPerSource = 0;
636 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
637 // Several consecutive ranges may operate on the same file (each with their own page source clone).
638 // We can detect a change of file when the first entry number jumps back to 0.
639 if (fCurrentRanges[i].fFirstEntry == 0) {
640 // New source
641 fSeenEntries += nEntriesPerSource;
642 nEntriesPerSource = 0;
643 }
644 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntries;
645 auto end = fCurrentRanges[i].fLastEntry + fSeenEntries;
646 nEntriesPerSource += end - start;
647
648 fFirstEntry2RangeIdx[start] = i;
649 ranges.emplace_back(start, end);
650 }
651 fSeenEntries += nEntriesPerSource;
652
653 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
654 for (auto r : fActiveColumnReaders[0]) {
655 r->Connect(*fCurrentRanges[0].fSource, ranges[0].first);
656 }
657 }
658
659 return ranges;
660}
661
662void RNTupleDS::InitSlot(unsigned int slot, ULong64_t firstEntry)
663{
664 if (fNSlots == 1)
665 return;
666
667 auto idxRange = fFirstEntry2RangeIdx.at(firstEntry);
668 for (auto r : fActiveColumnReaders[slot]) {
669 r->Connect(*fCurrentRanges[idxRange].fSource, firstEntry - fCurrentRanges[idxRange].fFirstEntry);
670 }
671}
672
673void RNTupleDS::FinalizeSlot(unsigned int slot)
674{
675 if (fNSlots == 1)
676 return;
677
678 for (auto r : fActiveColumnReaders[slot]) {
679 r->Disconnect(true /* keepValue */);
680 }
681}
682
683std::string RNTupleDS::GetTypeName(std::string_view colName) const
684{
685 const auto index = std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), colName));
686 return fColumnTypes[index];
687}
688
689bool RNTupleDS::HasColumn(std::string_view colName) const
690{
691 return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end();
692}
693
695{
696 fSeenEntries = 0;
697 fNextFileIndex = 0;
699 fThreadStaging = std::thread(&RNTupleDS::ExecStaging, this);
700 assert(fNextRanges.empty());
701
702 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
703 // First event loop or large number of files: start the staging process.
704 {
705 std::lock_guard _(fMutexStaging);
706 fIsReadyForStaging = true;
707 }
708 fCvStaging.notify_one();
709 } else {
710 // Otherwise, we will reuse fCurrentRanges. Make sure that staging and preparing next ranges will be a noop
711 // (already at the end of the list of files).
712 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
713 }
714}
715
717{
718 for (unsigned int i = 0; i < fNSlots; ++i) {
719 for (auto r : fActiveColumnReaders[i]) {
720 r->Disconnect(false /* keepValue */);
721 }
722 }
723 {
724 std::lock_guard _(fMutexStaging);
726 }
727 fCvStaging.notify_one();
728 fThreadStaging.join();
729 // If we have a chain with more files than the number of slots, the files opened at the end of the
730 // event loop won't be reused when the event loop restarts, so we can close them.
731 if (fFileNames.size() > fNSlots) {
732 fCurrentRanges.clear();
733 fNextRanges.clear();
734 fStagingArea.clear();
735 fStagingArea.resize(fFileNames.size());
736 }
737}
738
739void RNTupleDS::SetNSlots(unsigned int nSlots)
740{
741 assert(fNSlots == 0);
742 assert(nSlots > 0);
743 fNSlots = nSlots;
745}
746} // namespace Experimental
747} // namespace ROOT
748
749ROOT::RDataFrame ROOT::RDF::Experimental::FromRNTuple(std::string_view ntupleName, std::string_view fileName)
750{
751 return ROOT::RDataFrame(std::make_unique<ROOT::Experimental::RNTupleDS>(ntupleName, fileName));
752}
753
755ROOT::RDF::Experimental::FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames)
756{
757 return ROOT::RDataFrame(std::make_unique<ROOT::Experimental::RNTupleDS>(ntupleName, fileNames));
758}
#define f(i)
Definition RSha256.hxx:104
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:561
#define _(A, B)
Definition cfortran.h:108
An artificial field that provides the size of a fixed-size array.
RArraySizeField & operator=(RArraySizeField &&other)=default
std::unique_ptr< ROOT::Experimental::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
void ReadGlobalImpl(NTupleSize_t, void *to) final
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RArraySizeField(RArraySizeField &&other)=default
void GenerateColumns(const ROOT::Experimental::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponsing to the field type ...
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RArraySizeField(const RArraySizeField &other)=delete
void ReadInClusterImpl(RClusterIndex, void *to) final
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponsing to the field type ...
RArraySizeField & operator=(const RArraySizeField &other)=delete
void GetCollectionInfo(const NTupleSize_t globalIndex, RClusterIndex *collectionStart, NTupleSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
Definition RColumn.hxx:283
Every RDF column is represented by exactly one RNTuple field.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
Long64_t fLastEntry
Last entry number that was read.
RNTupleDS * fDataSource
The data source that owns this column reader.
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
Takes the read lock for the descriptor.
An artificial field that transforms an RNTuple column that contains the offset of collections into co...
Definition RNTupleDS.cxx:62
std::unique_ptr< ROOT::Experimental::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
Definition RNTupleDS.cxx:64
void GenerateColumns(const RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponsing to the field type ...
Definition RNTupleDS.cxx:89
void ReadInClusterImpl(ROOT::Experimental::RClusterIndex clusterIndex, void *to) final
Get the number of elements of the collection identified by clusterIndex.
RRDFCardinalityField(RRDFCardinalityField &&other)=default
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
Definition RNTupleDS.cxx:80
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponsing to the field type ...
Definition RNTupleDS.cxx:88
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
Definition RNTupleDS.cxx:91
size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
Definition RNTupleDS.cxx:92
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
Definition RNTupleDS.cxx:68
void ReadGlobalImpl(ROOT::Experimental::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
Definition RNTupleDS.cxx:95
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Some fields have multiple possible column representations, e.g.
A field translates read and write calls from/to underlying columns to/from tree values.
const std::string & GetFieldName() const
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its sub fields using a possibly new name and a new, unconnected set of columns.
RConstSchemaIterator cbegin() const
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &canonicalType, const std::string &typeAlias, bool continueOnError=false)
Factory method to resurrect a field from the stored on-disk type information.
Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
DescriptorId_t GetOnDiskId() const
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:46
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
void ExecStaging()
The main function of the fThreadStaging background thread.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
std::thread fThreadStaging
The background thread that runs StageNextSources()
void Initialize() final
Convenience method called before starting an event-loop.
void AddField(const RNTupleDescriptor &desc, std::string_view colName, DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos)
Provides the RDF column "colName" given the field identified by fieldID.
std::size_t fNextFileIndex
Index into fFileNames to the next file to process.
Definition RNTupleDS.hxx:82
std::vector< std::string > fColumnNames
Definition RNTupleDS.hxx:93
std::unordered_map< ULong64_t, std::size_t > fFirstEntry2RangeIdx
Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index...
void Finalize() final
Convenience method called after concluding an event-loop.
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
std::unordered_map< ROOT::Experimental::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
Definition RNTupleDS.hxx:92
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
std::vector< std::vector< Internal::RNTupleColumnReader * > > fActiveColumnReaders
List of column readers returned by GetColumnReaders() organized by slot.
Definition RNTupleDS.hxx:97
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
std::vector< REntryRangeDS > fCurrentRanges
Basis for the ranges returned by the last GetEntryRanges() call.
bool fStagingThreadShouldTerminate
Is true when the I/O thread should quit.
std::unique_ptr< RNTupleDescriptor > fPrincipalDescriptor
A clone of the first pages source's descriptor.
Definition RNTupleDS.hxx:60
bool fHasNextSources
Is true when the staging thread has populated the next batch of files to fStagingArea.
std::vector< REntryRangeDS > fNextRanges
Basis for the ranges populated by the PrepareNextRanges() call.
std::vector< std::unique_ptr< ROOT::Experimental::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
Definition RNTupleDS.hxx:88
std::mutex fMutexStaging
Protects the shared state between the main thread and the I/O thread.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
std::vector< std::unique_ptr< ROOT::Experimental::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
Definition RNTupleDS.hxx:81
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
std::vector< std::string > fFileNames
Definition RNTupleDS.hxx:64
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
Definition RNTupleDS.hxx:63
std::vector< std::string > fColumnTypes
Definition RNTupleDS.hxx:94
RNTupleDS(std::unique_ptr< ROOT::Experimental::Internal::RPageSource > pageSource)
ULong64_t fSeenEntries
The number of entries so far returned by GetEntryRanges()
std::condition_variable fCvStaging
Signal for the state information of fIsReadyForStaging and fHasNextSources.
bool fIsReadyForStaging
Is true when the staging thread should start working.
The on-storage meta-data of an ntuple.
DescriptorId_t FindFieldId(std::string_view fieldName, DescriptorId_t parentId) const
const RFieldDescriptor & GetFieldDescriptor(DescriptorId_t fieldId) const
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
Common user-tunable settings for reading ntuples.
std::vector< void * > Record_t
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:69
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1665
void CallConnectPageSourceOnField(RFieldBase &, RPageSource &)
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distriniguishes elements of the same type within a descriptor, e.g. different fields.
ENTupleStructure
The fields in the ntuple model tree can carry different structural information about the type system.
constexpr DescriptorId_t kInvalidDescriptorId
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:52
std::unique_ptr< ROOT::Experimental::Internal::RPageSource > fSource
Definition RNTupleDS.hxx:53
ULong64_t fLastEntry
End entry index in fSource, e.g. the number of entries in the range is fLastEntry - fFirstEntry.
Definition RNTupleDS.hxx:56
ULong64_t fFirstEntry
First entry index in fSource.
Definition RNTupleDS.hxx:54