Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>,
3/// Giacomo Parolini <giacomo.parolini@cern.ch>
4/// \date 2020-07-08
5/// \warning This is part of the ROOT 7 prototype! It will
6/// change without notice. It might trigger earthquakes. Feedback is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RError.hxx>
17#include <ROOT/RNTuple.hxx>
20#include <ROOT/RNTupleModel.hxx>
21#include <ROOT/RNTupleTypes.hxx>
22#include <ROOT/RNTupleUtils.hxx>
25#include <ROOT/RPageStorage.hxx>
26#include <ROOT/RClusterPool.hxx>
28#include <ROOT/RNTupleZip.hxx>
30#include <TROOT.h>
31#include <TFileMergeInfo.h>
32#include <TFile.h>
33#include <TKey.h>
34
35#include <algorithm>
36#include <deque>
37#include <initializer_list>
38#include <unordered_map>
39#include <vector>
40
51
52using namespace ROOT::Experimental::Internal;
53
55{
56 static ROOT::RLogChannel sLog("ROOT.NTuple.Merge");
57 return sLog;
58}
59
60// TFile options parsing
61// -------------------------------------------------------------------------------------
62static bool BeginsWithDelimitedWord(const TString &str, const char *word)
63{
64 const Ssiz_t wordLen = strlen(word);
65 if (str.Length() < wordLen)
66 return false;
67 if (!str.BeginsWith(word, TString::ECaseCompare::kIgnoreCase))
68 return false;
69 return str.Length() == wordLen || str(wordLen) == ' ';
70}
71
72template <typename T>
73static std::optional<T> ParseStringOption(const TString &opts, const char *pattern,
74 std::initializer_list<std::pair<const char *, T>> validValues)
75{
76 const Ssiz_t patternLen = strlen(pattern);
77 assert(pattern[patternLen - 1] == '='); // we want to parse options with the format `option=Value`
78 if (auto idx = opts.Index(pattern, 0, TString::ECaseCompare::kIgnoreCase);
79 idx >= 0 && opts.Length() > idx + patternLen) {
80 auto sub = TString(opts(idx + patternLen, opts.Length() - idx - patternLen));
81 for (const auto &[name, value] : validValues) {
82 if (BeginsWithDelimitedWord(sub, name)) {
83 return value;
84 }
85 }
86 }
87 return std::nullopt;
88}
89
90static std::optional<ENTupleMergingMode> ParseOptionMergingMode(const TString &opts)
91{
92 return ParseStringOption<ENTupleMergingMode>(opts, "rntuple.MergingMode=",
93 {
97 });
98}
99
100static std::optional<ENTupleMergeErrBehavior> ParseOptionErrBehavior(const TString &opts)
101{
102 return ParseStringOption<ENTupleMergeErrBehavior>(opts, "rntuple.ErrBehavior=",
103 {
106 });
107}
108
109static std::optional<ENTupleMergeVersionBehavior> ParseOptionVersionBehavior(const TString &opts)
110{
112 opts, "rntuple.VersionBehavior=",
113 {
116 });
117}
118// -------------------------------------------------------------------------------------
119
120// Entry point for TFileMerger. Internally calls RNTupleMerger::Merge().
122// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
123try {
124 // Check the inputs
125 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo) {
126 R__LOG_ERROR(NTupleMergeLog()) << "Invalid inputs.";
127 return -1;
128 }
129
130 // Parse the input parameters
131 TIter itr(inputs);
132
133 // First entry is the RNTuple name
134 std::string ntupleName = std::string(itr()->GetName());
135
136 // Second entry is the output file
137 TObject *secondArg = itr();
138 TFile *outFile = dynamic_cast<TFile *>(secondArg);
139 if (!outFile) {
140 R__LOG_ERROR(NTupleMergeLog()) << "Second input parameter should be a TFile, but it's a "
141 << secondArg->ClassName() << ".";
142 return -1;
143 }
144
145 // Check if the output file already has a key with that name
146 TKey *outKey = outFile->FindKey(ntupleName.c_str());
147 ROOT::RNTuple *outNTuple = nullptr;
148 if (outKey) {
149 outNTuple = outKey->ReadObject<ROOT::RNTuple>();
150 if (!outNTuple) {
151 R__LOG_ERROR(NTupleMergeLog()) << "Output file already has key, but not of type RNTuple!";
152 return -1;
153 }
154 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
155 // pointer we just got.
156 }
157
158 const bool defaultComp = mergeInfo->fOptions.Contains("DefaultCompression");
159 const bool firstSrcComp = mergeInfo->fOptions.Contains("FirstSrcCompression");
160 const bool extraVerbose = mergeInfo->fOptions.Contains("rntuple.ExtraVerbose");
161 if (defaultComp && firstSrcComp) {
162 // this should never happen through hadd, but a user may call RNTuple::Merge() from custom code.
163 R__LOG_WARNING(NTupleMergeLog()) << "Passed both options \"DefaultCompression\" and \"FirstSrcCompression\": "
164 "only the latter will apply.";
165 }
166 std::optional<std::uint32_t> compression;
167 if (firstSrcComp) {
168 // user passed -ff or -fk: use the same compression as the first RNTuple we find in the sources.
169 // (do nothing here, the compression will be fetched below)
170 } else if (!defaultComp) {
171 // compression was explicitly passed by the user: use it.
172 compression = outFile->GetCompressionSettings();
173 } else {
174 // user passed no compression-related options: use default
176 R__LOG_INFO(NTupleMergeLog()) << "Using the default compression: " << *compression;
177 }
178
179 // The remaining entries are the input files
180 std::vector<std::unique_ptr<RPageSourceFile>> sources;
181 std::vector<RPageSource *> sourcePtrs;
182
183 while (const auto &pitr = itr()) {
184 TFile *inFile = dynamic_cast<TFile *>(pitr);
185 ROOT::RNTuple *anchor = inFile ? inFile->Get<ROOT::RNTuple>(ntupleName.c_str()) : nullptr;
186 if (!anchor) {
187 R__LOG_INFO(NTupleMergeLog()) << "No RNTuple anchor named '" << ntupleName << "' from file '"
188 << inFile->GetName() << "'";
189 continue;
190 }
191
192 auto source = RPageSourceFile::CreateFromAnchor(*anchor);
193 if (!compression) {
194 // Get the compression of this RNTuple and use it as the output compression.
195 // We currently assume all column ranges have the same compression, so we just peek at the first one.
197 auto descriptor = source->GetSharedDescriptorGuard();
198 auto clusterIter = descriptor->GetClusterIterable();
199 auto firstCluster = clusterIter.begin();
200 if (firstCluster == clusterIter.end()) {
202 << "Asked to use the first source's compression as the output compression, but the "
203 "first source (file '"
204 << inFile->GetName()
205 << "') has an empty RNTuple, therefore the output compression could not be "
206 "determined.";
207 return -1;
208 }
209 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
210 auto firstColRange = colRangeIter.begin();
211 if (firstColRange == colRangeIter.end()) {
213 << "Asked to use the first source's compression as the output compression, but the "
214 "first source (file '"
215 << inFile->GetName()
216 << "') has an empty RNTuple, therefore the output compression could not be "
217 "determined.";
218 return -1;
219 }
220 compression = (*firstColRange).GetCompressionSettings().value();
221 R__LOG_INFO(NTupleMergeLog()) << "Using the first RNTuple's compression: " << *compression;
222 }
223 sources.push_back(std::move(source));
224 }
225
226 RNTupleWriteOptions writeOpts;
227 assert(compression);
228 writeOpts.SetCompression(*compression);
229 auto destination = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
230 std::unique_ptr<ROOT::RNTupleModel> model;
231 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
232 if (outNTuple) {
233 auto outSource = RPageSourceFile::CreateFromAnchor(*outNTuple);
235 auto desc = outSource->GetSharedDescriptorGuard();
236 model = destination->InitFromDescriptor(desc.GetRef(), true /* copyClusters */);
237 }
238
239 // Interface conversion
240 sourcePtrs.reserve(sources.size());
241 for (const auto &s : sources) {
242 sourcePtrs.push_back(s.get());
243 }
244
245 // Now merge
246 RNTupleMerger merger{std::move(destination), std::move(model)};
247 RNTupleMergeOptions mergerOpts;
248 mergerOpts.fCompressionSettings = *compression;
249 mergerOpts.fExtraVerbose = extraVerbose;
250 if (auto mergingMode = ParseOptionMergingMode(mergeInfo->fOptions)) {
251 mergerOpts.fMergingMode = *mergingMode;
252 }
253 if (auto errBehavior = ParseOptionErrBehavior(mergeInfo->fOptions)) {
254 mergerOpts.fErrBehavior = *errBehavior;
255 }
256 if (auto versionBehavior = ParseOptionVersionBehavior(mergeInfo->fOptions)) {
257 mergerOpts.fVersionBehavior = *versionBehavior;
258 }
259 merger.Merge(sourcePtrs, mergerOpts).ThrowOnError();
260
261 // Provide the caller with a merged anchor object (even though we've already
262 // written it).
263 *this = *outFile->Get<ROOT::RNTuple>(ntupleName.c_str());
264
265 return 0;
266} catch (const std::exception &ex) {
267 R__LOG_ERROR(NTupleMergeLog()) << "Exception thrown while merging: " << ex.what();
268 return -1;
269}
270
271namespace {
272// Functor used to change the compression of a page to `options.fCompressionSettings`.
273struct RChangeCompressionFunc {
274 const RColumnElementBase &fSrcColElement;
275 const RColumnElementBase &fDstColElement;
276 const RNTupleMergeOptions &fMergeOptions;
277
278 RPageStorage::RSealedPage &fSealedPage;
280 std::byte *fBuffer;
281 std::size_t fBufSize;
282 const ROOT::RNTupleWriteOptions &fWriteOpts;
283
284 void operator()() const
285 {
286 assert(fSrcColElement.GetIdentifier() == fDstColElement.GetIdentifier());
287
288 fSealedPage.VerifyChecksumIfEnabled().ThrowOnError();
289
290 const auto bytesPacked = fSrcColElement.GetPackedSize(fSealedPage.GetNElements());
291 const auto compression = fMergeOptions.fCompressionSettings.value_or(0);
292 // TODO: this buffer could be kept and reused across pages
293 std::unique_ptr<std::byte[]> unzipBufOwned;
294 std::byte *unzipBuf;
295 if (compression != 0) {
296 unzipBufOwned = MakeUninitArray<std::byte>(bytesPacked);
297 unzipBuf = unzipBufOwned.get();
298 } else {
299 unzipBuf = fBuffer;
300 }
301 ROOT::Internal::RNTupleDecompressor::Unzip(fSealedPage.GetBuffer(), fSealedPage.GetDataSize(), bytesPacked,
302 unzipBuf);
303
304 const auto checksumSize = fWriteOpts.GetEnablePageChecksums() * sizeof(std::uint64_t);
305 std::size_t nBytesZipped;
306 if (compression != 0) {
307 assert(fBuffer != unzipBuf);
308 assert(fBufSize >= bytesPacked + checksumSize);
309 nBytesZipped = ROOT::Internal::RNTupleCompressor::Zip(unzipBuf, bytesPacked, compression, fBuffer);
310 } else {
311 nBytesZipped = bytesPacked;
312 }
313
314 fSealedPage = {fBuffer, nBytesZipped + checksumSize, fSealedPage.GetNElements(), fSealedPage.GetHasChecksum()};
315 fSealedPage.ChecksumIfEnabled();
316 }
317};
318
319struct RResealFunc {
320 const RColumnElementBase &fSrcColElement;
321 const RColumnElementBase &fDstColElement;
322 const RNTupleMergeOptions &fMergeOptions;
323
324 RPageStorage::RSealedPage &fSealedPage;
325 ROOT::Internal::RPageAllocator &fPageAlloc;
326 std::byte *fBuffer;
327 std::size_t fBufSize;
328 const ROOT::RNTupleWriteOptions &fWriteOpts;
329
330 void operator()() const
331 {
332 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap();
333 RPageSink::RSealPageConfig sealConf;
334 sealConf.fElement = &fDstColElement;
335 sealConf.fPage = &page;
336 sealConf.fBuffer = fBuffer;
337 sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings;
338 sealConf.fWriteChecksum = fWriteOpts.GetEnablePageChecksums();
339 assert(fBufSize >= fSealedPage.GetDataSize() + fSealedPage.GetHasChecksum() * sizeof(std::uint64_t));
340 auto refSealedPage = RPageSink::SealPage(sealConf);
341 fSealedPage = refSealedPage;
342 }
343};
344
345struct RTaskVisitor {
346 std::optional<ROOT::Experimental::TTaskGroup> &fGroup;
347
348 template <typename T>
349 void operator()(T &&f)
350 {
351 if (fGroup)
352 fGroup->Run(f);
353 else
354 f();
355 }
356};
357
358struct RCommonField {
359 const ROOT::RFieldDescriptor *fSrc;
360 const ROOT::RFieldDescriptor *fDst;
361
362 RCommonField(const ROOT::RFieldDescriptor &src, const ROOT::RFieldDescriptor &dst) : fSrc(&src), fDst(&dst) {}
363};
364
// Outcome of comparing the top-level fields of a destination and a source descriptor
// (filled in by the descriptor comparison routine further down in this file).
365struct RDescriptorsComparison {
// Top-level destination fields whose name was not found in the source.
366 std::vector<const ROOT::RFieldDescriptor *> fExtraDstFields;
// Top-level source fields whose name was not found in the destination.
367 std::vector<const ROOT::RFieldDescriptor *> fExtraSrcFields;
// Pairs of top-level fields that have the same name in both source and destination.
368 std::vector<RCommonField> fCommonFields;
369};
370
// Value type stored in ColumnIdMap_t (declared right after this struct).
// NOTE(review): presumably describes the id and type of the column in the output (destination) RNTuple --
// confirm against the code that fills the map.
371struct RColumnOutInfo {
372 ROOT::DescriptorId_t fColumnId;
373 ENTupleColumnType fColumnType;
374};
375
376// { fully.qualified.fieldName.colInputId => colOutputInfo }
377using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
378
// Two lists of column merge infos.
// NOTE(review): judging by the member names, these are the columns existing only in the destination and the
// columns shared by source and destination, respectively -- confirm against the code that builds the group.
379struct RColumnInfoGroup {
380 std::vector<RColumnMergeInfo> fExtraDstColumns;
381 // These are sorted by InputId
382 std::vector<RColumnMergeInfo> fCommonColumns;
383};
384
385} // namespace
386
387// These structs cannot be in the anon namespace because they're used in RNTupleMerger's private interface.
388namespace ROOT::Experimental::Internal {
390 // This column name is built as a dot-separated concatenation of the ancestry of
391 // the columns' parent fields' names plus the index of the column itself.
392 // e.g. "Muon.pt.x._0"
393 std::string fColumnName;
394 // The column id in the source RNTuple
396 // The corresponding column id in the destination RNTuple (the mapping happens in AddColumnsFromField())
399 // If nullopt, use the default in-memory type
400 std::optional<std::type_index> fInMemoryType;
403};
404
405// Data related to a single call of RNTupleMerger::Merge()
407 std::span<RPageSource *> fSources;
412
413 std::vector<RColumnMergeInfo> fColumns;
414 ColumnIdMap_t fColumnIdMap;
415
417
418 RNTupleMergeData(std::span<RPageSource *> sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
419 : fSources{sources}, fDestination{destination}, fMergeOpts{mergeOpts}, fDstDescriptor{destination.GetDescriptor()}
420 {
421 }
422};
423
425 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are
426 // never invalidated.
427 std::deque<RPageStorage::SealedPageSequence_t> fPagesV;
428 std::vector<RPageStorage::RSealedPageGroup> fGroups;
429 std::vector<std::unique_ptr<std::byte[]>> fBuffers;
430};
431
432std::ostream &operator<<(std::ostream &os, const std::optional<ROOT::RColumnDescriptor::RValueRange> &x)
433{
434 if (x) {
435 os << '(' << x->fMin << ", " << x->fMax << ')';
436 } else {
437 os << "(null)";
438 }
439 return os;
440}
441
442} // namespace ROOT::Experimental::Internal
443
445{
446 // clang-format off
467 // clang-format on
468 return false;
469}
470
471/// Compares the top level fields of `dst` and `src` and determines whether they can be merged or not.
472/// In addition, returns the differences between `dst` and `src`'s structures
473static ROOT::RResult<RDescriptorsComparison>
475{
476 // Cases:
477 // 1. dst == src
478 // 2. dst has fields that src hasn't
479 // 3. src has fields that dst hasn't
480 // 4. dst and src have fields that differ (compatible or incompatible)
481
482 std::vector<std::string> errors;
483 RDescriptorsComparison res;
484
485 std::vector<RCommonField> commonFields;
486
487 for (const auto &dstField : dst.GetTopLevelFields()) {
488 const auto srcFieldId = src.FindFieldId(dstField.GetFieldName());
489 if (srcFieldId != ROOT::kInvalidDescriptorId) {
490 const auto &srcField = src.GetFieldDescriptor(srcFieldId);
491 commonFields.push_back({srcField, dstField});
492 } else {
493 res.fExtraDstFields.emplace_back(&dstField);
494 }
495 }
496 for (const auto &srcField : src.GetTopLevelFields()) {
497 const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName());
498 if (dstFieldId == ROOT::kInvalidDescriptorId)
499 res.fExtraSrcFields.push_back(&srcField);
500 }
501
502 // Check compatibility of common fields
503 auto fieldsToCheck = commonFields;
504 // NOTE: using index-based for loop because the collection may get extended by the iteration
505 for (std::size_t fieldIdx = 0; fieldIdx < fieldsToCheck.size(); ++fieldIdx) {
506 const auto &field = fieldsToCheck[fieldIdx];
507
508 // NOTE: field.fSrc and field.fDst have the same name by construction
509 const auto &fieldName = field.fSrc->GetFieldName();
510
511 // Require that fields are both projected or both not projected
512 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
513 if (!projCompatible) {
514 std::stringstream ss;
515 ss << "Field `" << fieldName << "` is incompatible with previously-seen field with that name because the "
516 << (field.fSrc->IsProjectedField() ? "new" : "old") << " one is projected and the other isn't";
517 errors.push_back(ss.str());
518 } else if (field.fSrc->IsProjectedField()) {
519 // if both fields are projected, verify that they point to the same real field
520 const auto srcName = src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
521 const auto dstName = dst.GetQualifiedFieldName(field.fDst->GetProjectionSourceId());
522 if (srcName != dstName) {
523 std::stringstream ss;
524 ss << "Field `" << fieldName
525 << "` is projected to a different field than a previously-seen field with the same name (old: "
526 << dstName << ", new: " << srcName << ")";
527 errors.push_back(ss.str());
528 }
529 }
530
531 // Require that fields types match
532 // TODO(gparolini): allow non-identical but compatible types
533 const auto &srcTyName = field.fSrc->GetTypeName();
534 const auto &dstTyName = field.fDst->GetTypeName();
535 if (srcTyName != dstTyName) {
536 std::stringstream ss;
537 ss << "Field `" << fieldName
538 << "` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
539 << ", new: " << srcTyName << ")";
540 errors.push_back(ss.str());
541 }
542
543 // Require that type checksums match
544 const auto srcTyChk = field.fSrc->GetTypeChecksum();
545 const auto dstTyChk = field.fDst->GetTypeChecksum();
546 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
547 std::stringstream ss;
548 ss << "Field `" << field.fSrc->GetFieldName()
549 << "` has a different type checksum than previously-seen field with the same name";
550 errors.push_back(ss.str());
551 }
552
553 // Require that type versions match
554 const auto srcTyVer = field.fSrc->GetTypeVersion();
555 const auto dstTyVer = field.fDst->GetTypeVersion();
556 if (srcTyVer != dstTyVer) {
557 std::stringstream ss;
558 ss << "Field `" << field.fSrc->GetFieldName()
559 << "` has a different type version than previously-seen field with the same name (old: " << dstTyVer
560 << ", new: " << srcTyVer << ")";
561 errors.push_back(ss.str());
562 }
563
564 // Require that field versions match
565 const auto srcFldVer = field.fSrc->GetFieldVersion();
566 const auto dstFldVer = field.fDst->GetFieldVersion();
567 if (srcFldVer != dstFldVer) {
568 std::stringstream ss;
569 ss << "Field `" << field.fSrc->GetFieldName()
570 << "` has a different field version than previously-seen field with the same name (old: " << dstFldVer
571 << ", new: " << srcFldVer << ")";
572 errors.push_back(ss.str());
573 }
574
575 const auto srcRole = field.fSrc->GetStructure();
576 const auto dstRole = field.fDst->GetStructure();
577 if (srcRole != dstRole) {
578 std::stringstream ss;
579 ss << "Field `" << field.fSrc->GetFieldName()
580 << "` has a different structural role than previously-seen field with the same name (old: " << dstRole
581 << ", new: " << srcRole << ")";
582 errors.push_back(ss.str());
583 }
584
585 // Require that column representations match
586 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
587 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
588 if (srcNCols != dstNCols) {
589 std::stringstream ss;
590 ss << "Field `" << field.fSrc->GetFieldName()
591 << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
592 << ", new: " << srcNCols << ")";
593 errors.push_back(ss.str());
594 } else {
595 for (auto i = 0u; i < srcNCols; ++i) {
596 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
597 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
598 const auto &srcCol = src.GetColumnDescriptor(srcColId);
599 const auto &dstCol = dst.GetColumnDescriptor(dstColId);
600 // TODO(gparolini): currently we refuse to merge columns of different types unless they are Split/non-Split
601 // version of the same type, because we know how to treat that specific case. We should also properly handle
602 // different but compatible types.
603 if (srcCol.GetType() != dstCol.GetType() &&
604 !IsSplitOrUnsplitVersionOf(srcCol.GetType(), dstCol.GetType())) {
605 std::stringstream ss;
606 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
607 << "` has a different column type of the same column on the previously-seen field with the same name "
608 "(old: "
609 << RColumnElementBase::GetColumnTypeName(srcCol.GetType())
610 << ", new: " << RColumnElementBase::GetColumnTypeName(dstCol.GetType()) << ")";
611 errors.push_back(ss.str());
612 }
613 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
614 std::stringstream ss;
615 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
616 << "` has a different number of bits of the same column on the previously-seen field with the same "
617 "name "
618 "(old: "
619 << srcCol.GetBitsOnStorage() << ", new: " << dstCol.GetBitsOnStorage() << ")";
620 errors.push_back(ss.str());
621 }
622 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
623 std::stringstream ss;
624 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
625 << "` has a different value range of the same column on the previously-seen field with the same name "
626 "(old: "
627 << srcCol.GetValueRange() << ", new: " << dstCol.GetValueRange() << ")";
628 errors.push_back(ss.str());
629 }
630 if (srcCol.GetRepresentationIndex() > 0) {
631 std::stringstream ss;
632 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
633 << "` has a representation index higher than 0. This is not supported yet by the merger.";
634 errors.push_back(ss.str());
635 }
636 }
637 }
638
639 // Require that subfields are compatible
640 const auto &srcLinks = field.fSrc->GetLinkIds();
641 const auto &dstLinks = field.fDst->GetLinkIds();
642 if (srcLinks.size() != dstLinks.size()) {
643 std::stringstream ss;
644 ss << "Field `" << field.fSrc->GetFieldName()
645 << "` has a different number of children than previously-seen field with the same name (old: "
646 << dstLinks.size() << ", new: " << srcLinks.size() << ")";
647 errors.push_back(ss.str());
648 } else {
649 for (std::size_t linkIdx = 0, linkNum = srcLinks.size(); linkIdx < linkNum; ++linkIdx) {
650 const auto &srcSubfield = src.GetFieldDescriptor(srcLinks[linkIdx]);
651 const auto &dstSubfield = dst.GetFieldDescriptor(dstLinks[linkIdx]);
652 fieldsToCheck.push_back(RCommonField{srcSubfield, dstSubfield});
653 }
654 }
655 }
656
657 std::string errMsg;
658 for (const auto &err : errors)
659 errMsg += std::string("\n * ") + err;
660
661 if (!errMsg.empty())
662 errMsg = errMsg.substr(1); // strip initial newline
663
664 if (errMsg.length())
665 return R__FAIL(errMsg);
666
667 res.fCommonFields = std::move(commonFields);
668
669 return ROOT::RResult(res);
670}
671
672// Applies late model extension to `destination`, adding all `newFields` to it.
673[[nodiscard]]
674static ROOT::RResult<void>
675ExtendDestinationModel(std::span<const ROOT::RFieldDescriptor *> newFields, ROOT::RNTupleModel &dstModel,
676 RNTupleMergeData &mergeData, std::vector<RCommonField> &commonFields)
677{
678 assert(newFields.size() > 0); // no point in calling this with 0 new cols
679
680 dstModel.Unfreeze();
681 ROOT::Internal::RNTupleModelChangeset changeset{dstModel};
682
683 if (mergeData.fMergeOpts.fExtraVerbose) {
684 std::string msg = "destination doesn't contain field";
685 if (newFields.size() > 1)
686 msg += 's';
687 msg += ' ';
688 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](const auto &acc, const auto *field) {
689 return acc + (acc.length() ? ", " : "") + '`' + field->GetFieldName() + '`';
690 });
691 R__LOG_INFO(NTupleMergeLog()) << msg << ": adding " << (newFields.size() > 1 ? "them" : "it")
692 << " to the destination model (entry #" << mergeData.fNumDstEntries << ").";
693 }
694
695 changeset.fAddedFields.reserve(newFields.size());
696 // First add all non-projected fields...
697 for (const auto *fieldDesc : newFields) {
698 if (!fieldDesc->IsProjectedField()) {
699 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
700 changeset.AddField(std::move(field));
701 }
702 }
703 // ...then add all projected fields.
704 for (const auto *fieldDesc : newFields) {
705 if (!fieldDesc->IsProjectedField())
706 continue;
707
709 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
710 const auto sourceId = fieldDesc->GetProjectionSourceId();
711 const auto &sourceField = dstModel.GetConstField(mergeData.fSrcDescriptor->GetQualifiedFieldName(sourceId));
712 fieldMap[field.get()] = &sourceField;
713
714 for (const auto &subfield : *field) {
715 const auto &subFieldDesc = mergeData.fSrcDescriptor->GetFieldDescriptor(subfield.GetOnDiskId());
716 const auto subSourceId = subFieldDesc.GetProjectionSourceId();
717 const auto &subSourceField =
718 dstModel.GetConstField(mergeData.fSrcDescriptor->GetQualifiedFieldName(subSourceId));
719 fieldMap[&subfield] = &subSourceField;
720 }
721 changeset.fAddedProjectedFields.emplace_back(field.get());
722 ROOT::Internal::GetProjectedFieldsOfModel(dstModel).Add(std::move(field), fieldMap);
723 }
724 dstModel.Freeze();
725 try {
726 mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
727 } catch (const ROOT::RException &ex) {
728 return R__FAIL(ex.GetError().GetReport());
729 }
730
731 commonFields.reserve(commonFields.size() + newFields.size());
732 for (const auto *field : newFields) {
733 const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName());
734 const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId);
735 commonFields.emplace_back(*field, newFieldInDst);
736 }
737
739}
740
741// Generates default (zero) values for the given columns
742[[nodiscard]]
743static ROOT::RResult<void>
744GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
745 RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc,
746 const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
747{
748 if (!nEntriesToGenerate)
750
751 for (const auto &column : columns) {
752 const ROOT::RFieldDescriptor *field = column.fParentFieldDescriptor;
753
754 // Skip all auxiliary columns
755 assert(!field->GetLogicalColumnIds().empty());
756 if (field->GetLogicalColumnIds()[0] != column.fInputId)
757 continue;
758
759 // Check if this column is a child of a Collection or a Variant. If so, it has no data
760 // and can be skipped.
761 bool skipColumn = false;
762 auto nRepetitions = std::max<std::uint64_t>(field->GetNRepetitions(), 1);
763 for (auto parentId = field->GetParentId(); parentId != ROOT::kInvalidDescriptorId;) {
764 const ROOT::RFieldDescriptor &parent = column.fParentNTupleDescriptor->GetFieldDescriptor(parentId);
767 skipColumn = true;
768 break;
769 }
770 nRepetitions *= std::max<std::uint64_t>(parent.GetNRepetitions(), 1);
771 parentId = parent.GetParentId();
772 }
773 if (skipColumn)
774 continue;
775
776 const auto structure = field->GetStructure();
777
778 if (structure == ROOT::ENTupleStructure::kStreamer) {
779 return R__FAIL(
780 "Destination RNTuple contains a streamer field (" + field->GetFieldName() +
781 ") that is not present in one of the sources. "
782 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.");
783 }
784
785 // NOTE: we cannot have a Record here because it has no associated columns.
787 structure == ROOT::ENTupleStructure::kPlain);
788
789 const auto &columnDesc = dstDescriptor.GetColumnDescriptor(column.fOutputId);
790 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
791 const auto nElements = nEntriesToGenerate * nRepetitions;
792 const auto nBytesOnStorage = colElement->GetPackedSize(nElements);
793 // TODO(gparolini): make this configurable
794 constexpr auto kPageSizeLimit = 256 * 1024;
795 // TODO(gparolini): consider coalescing the last page if its size is less than some threshold
796 const size_t nPages = nBytesOnStorage / kPageSizeLimit + !!(nBytesOnStorage % kPageSizeLimit);
797 for (size_t i = 0; i < nPages; ++i) {
798 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : nBytesOnStorage - kPageSizeLimit * (nPages - 1);
799 const auto checksumSize = RPageStorage::kNBytesPageChecksum;
800 const auto bufSize = pageSize + checksumSize;
801 assert(pageSize % colElement->GetSize() == 0);
802 const auto nElementsPerPage = pageSize / colElement->GetSize();
803 auto page = pageAlloc.NewPage(colElement->GetSize(), nElementsPerPage);
804 page.GrowUnchecked(nElementsPerPage);
805 memset(page.GetBuffer(), 0, page.GetNBytes());
806
807 auto &buffer = sealedPageData.fBuffers.emplace_back(new std::byte[bufSize]);
809 sealConf.fElement = colElement.get();
810 sealConf.fPage = &page;
811 sealConf.fBuffer = buffer.get();
812 sealConf.fCompressionSettings = mergeData.fMergeOpts.fCompressionSettings.value();
814 auto sealedPage = RPageSink::SealPage(sealConf);
815
816 sealedPageData.fPagesV.push_back({sealedPage});
817 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
818 sealedPageData.fPagesV.back().cend());
819 }
820 }
822}
823
824// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
825// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
//
// For each of the first `nCommonColumnsInCluster` entries of `commonColumns`, the sealed pages of the cluster
// described by `clusterDesc` are collected into `sealedPageData.fPagesV` and registered in
// `sealedPageData.fGroups` under the output id of the column. Pages that need a different compression and/or
// on-disk encoding are rewritten into buffers stored in `sealedPageData.fBuffers`.
// An error coming from GenerateZeroPagesForColumns() is forwarded to the caller.
//
// NOTE(review): several lines of this function are not visible in this listing (the line carrying the function
// name and first parameter, the end of the parameter list, the statement guarded by the zero-column check, the
// declaration of `sealedPages`, the start of the verbose log statement, one statement before the R__ASSERT on
// the on-disk page and the final return). Confirm against the upstream file before relying on this text.
826ROOT::RResult<void>
828 const ROOT::RClusterDescriptor &clusterDesc,
829 std::span<const RColumnMergeInfo> commonColumns,
830 const RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster,
831 RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData,
833{
   // Only the first nCommonColumnsInCluster entries of commonColumns are processed; commonColumnSet is expected
   // to have exactly that many entries.
834 assert(nCommonColumnsInCluster == commonColumnSet.size());
835 assert(nCommonColumnsInCluster <= commonColumns.size());
836 if (nCommonColumnsInCluster == 0)
838
839 const RCluster *cluster = clusterPool.GetCluster(clusterDesc.GetId(), commonColumnSet);
840 // we expect the cluster pool to contain the requested set of columns, since they were
841 // validated by CompareDescriptorStructure().
842 assert(cluster);
843
844 for (size_t colIdx = 0; colIdx < nCommonColumnsInCluster; ++colIdx) {
845 const auto &column = commonColumns[colIdx];
846 const auto &columnId = column.fInputId;
847 R__ASSERT(clusterDesc.ContainsColumn(columnId));
848
   // Column elements for the on-disk type of the source column and for the type chosen for the destination
   // column. If the column carries an explicit in-memory type it is used for both elements, otherwise the
   // default element for the respective on-disk type is generated.
849 const auto &columnDesc = mergeData.fSrcDescriptor->GetColumnDescriptor(columnId);
850 const auto srcColElement = column.fInMemoryType
851 ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, columnDesc.GetType())
852 : RColumnElementBase::Generate(columnDesc.GetType());
853 const auto dstColElement = column.fInMemoryType
854 ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, column.fColumnType)
855 : RColumnElementBase::Generate(column.fColumnType);
856
857 // Now get the pages for this column in this cluster
858 const auto &pages = clusterDesc.GetPageRange(columnId);
859
   // One sealed page slot per page of this column in this cluster.
861 sealedPages.resize(pages.GetPageInfos().size());
862
863 // Each column range potentially has distinct compression settings
   // GetCompressionSettings() returns a std::optional; value() throws if it is empty.
864 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).GetCompressionSettings().value();
865
866 // Select "merging level". There are 3 levels, from fastest to slowest, depending on the case:
867 // L1: compression and encoding of src and dest both match: we can simply copy the page
868 // L2: compression of dest doesn't match the src but encoding does: we must recompress the page but can avoid
869 // resealing it.
870 // L3: on-disk encoding doesn't match: we need to reseal the page, which implies decompressing and recompressing
871 // it.
872 const bool compressionIsDifferent =
873 colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value();
874 const bool needsResealing =
875 srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType;
876 const bool needsRecompressing = compressionIsDifferent || needsResealing;
877
878 if (needsRecompressing && mergeData.fMergeOpts.fExtraVerbose) {
880 << (needsResealing ? "Resealing" : "Recompressing") << " column " << column.fColumnName
881 << ": { compression: " << colRangeCompressionSettings << " => "
882 << mergeData.fMergeOpts.fCompressionSettings.value()
883 << ", onDiskType: " << RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType)
884 << " => " << RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType);
885 }
886
   // Index in fBuffers of the first buffer that belongs to this column; buffers added earlier precede it.
887 size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
888 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
889 // bother reserving memory for them.
890 if (needsRecompressing)
891 sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.GetPageInfos().size());
892
893 // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry
894 // with zeroes, like we do for extraDstColumns.
895 // As an optimization, we don't do this for the first source (since we can rely on the FirstElementIndex and
896 // deferred column mechanism in that case).
897 // TODO: also avoid doing this if we added no real page of this column to the destination yet.
898 if (columnDesc.GetFirstElementIndex() > clusterDesc.GetFirstEntryIndex() && mergeData.fNumDstEntries > 0) {
899 const auto nMissingEntries = columnDesc.GetFirstElementIndex() - clusterDesc.GetFirstEntryIndex();
900 auto res = GenerateZeroPagesForColumns(nMissingEntries, {&column, 1}, sealedPageData, pageAlloc,
901 mergeData.fDstDescriptor, mergeData);
902 if (!res)
903 return R__FORWARD_ERROR(res);
904 }
905
906 // Loop over the pages
907 std::uint64_t pageIdx = 0;
908 for (const auto &pageInfo : pages.GetPageInfos()) {
909 assert(pageIdx < sealedPages.size());
   // NOTE(review): this check does not take pageBufferBaseIdx into account, although the buffer is later
   // accessed at pageBufferBaseIdx + pageIdx.
910 assert(sealedPageData.fBuffers.size() == 0 || pageIdx < sealedPageData.fBuffers.size());
911 assert(pageInfo.GetLocator().GetType() != RNTupleLocator::kTypePageZero);
912
913 ROOT::Internal::ROnDiskPage::Key key{columnId, pageIdx};
914 auto onDiskPage = cluster->GetOnDiskPage(key);
915
   // HasChecksum() is a bool, so this is either 0 or the size of the page checksum.
916 const auto checksumSize = pageInfo.HasChecksum() * RPageStorage::kNBytesPageChecksum;
   // Describe the on-disk page as a sealed page that points straight into the cluster data (no copy yet).
917 RPageStorage::RSealedPage &sealedPage = sealedPages[pageIdx];
918 sealedPage.SetNElements(pageInfo.GetNElements());
919 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
920 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + checksumSize);
   // NOTE(review): onDiskPage is dereferenced here, before the R__ASSERT below checks it for null. Confirm
   // that GetOnDiskPage() cannot return nullptr at this point or move the check above this line.
921 sealedPage.SetBuffer(onDiskPage->GetAddress());
922 // TODO(gparolini): more graceful error handling (skip the page?)
924 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
925
926 if (needsRecompressing) {
   // The destination buffer is sized for the uncompressed page plus the optional checksum.
927 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
928 auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
929 const auto bufSize = uncompressedSize + checksumSize;
930 // NOTE: we currently allocate the max possible size for this buffer and don't shrink it afterward.
931 // We might want to introduce an option that trades speed for memory usage and shrink the buffer to fit
932 // the actual data size after recompressing.
933 buffer = MakeUninitArray<std::byte>(bufSize);
934
   // NOTE(review): both functors below receive the member *fPageAlloc, whereas the zero-page path above
   // uses `pageAlloc`; the caller visible in this file passes *fPageAlloc, so the two coincide there.
   // RTaskVisitor is constructed from fTaskGroup; presumably it runs the functor inline when no task group
   // is set - TODO confirm.
935 // clang-format off
936 if (needsResealing) {
937 RTaskVisitor{fTaskGroup}(RResealFunc{
938 *srcColElement,
939 *dstColElement,
940 mergeData.fMergeOpts,
941 sealedPage,
942 *fPageAlloc,
943 buffer.get(),
944 bufSize,
945 mergeData.fDestination.GetWriteOptions()
946 });
947 } else {
948 RTaskVisitor{fTaskGroup}(RChangeCompressionFunc{
949 *srcColElement,
950 *dstColElement,
951 mergeData.fMergeOpts,
952 sealedPage,
953 *fPageAlloc,
954 buffer.get(),
955 bufSize,
956 mergeData.fDestination.GetWriteOptions()
957 });
958 }
959 // clang-format on
960 }
961
962 ++pageIdx;
963
964 } // end of loop over pages
965
   // Wait for the tasks of this column before its pages are moved into fPagesV.
966 if (fTaskGroup)
967 fTaskGroup->Wait();
968
   // Hand the pages over to sealedPageData and register them as a group under the output column id.
969 sealedPageData.fPagesV.push_back(std::move(sealedPages));
970 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
971 sealedPageData.fPagesV.back().cend());
972 } // end loop over common columns
973
975}
976
977// Iterates over all clusters of `source` and merges their pages into `destination`.
978// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
979// the destination's schemas.
980// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
981// compression is unspecified or matches the original compression settings.
//
// For every source cluster the pages of the common columns are merged, zero pages are generated for the
// columns that have no data in that cluster, and the result is committed to `mergeData.fDestination` as one
// cluster. `mergeData.fNumDstEntries` is advanced by the number of entries of each committed cluster.
// Errors from MergeCommonColumns() and GenerateZeroPagesForColumns() are forwarded to the caller.
//
// NOTE(review): the line with the return type and the final return statement are not visible in this listing.
983RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
984 std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
985{
986 ROOT::Internal::RClusterPool clusterPool{source};
987
   // Starts out as a copy of extraDstColumns; for every cluster it is reset to that prefix and then extended
   // with the common columns that are absent from the cluster.
988 std::vector<RColumnMergeInfo> missingColumns{extraDstColumns.begin(), extraDstColumns.end()};
989
990 // Loop over all clusters in this file.
991 // descriptor->GetClusterIterable() doesn't guarantee any specific order, so we explicitly
992 // request the first cluster.
993 ROOT::DescriptorId_t clusterId = mergeData.fSrcDescriptor->FindClusterId(0, 0);
994 while (clusterId != ROOT::kInvalidDescriptorId) {
995 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
996 const auto nClusterEntries = clusterDesc.GetNEntries();
997 R__ASSERT(nClusterEntries > 0);
998
999 // NOTE: just because a column is in `commonColumns` it doesn't mean that each cluster in the source contains it,
1000 // as it may be a deferred column that only has real data in a future cluster.
1001 // We need to figure out which columns are actually present in this cluster so we only merge their pages (the
1002 // missing columns are handled by synthesizing zero pages - see below).
1003 size_t nCommonColumnsInCluster = commonColumns.size();
1004 while (nCommonColumnsInCluster > 0) {
1005 // Since `commonColumns` is sorted by column input id, we can simply traverse it from the back and stop as
1006 // soon as we find a common column that appears in this cluster: we know that in that case all previous
1007 // columns must appear as well.
1008 if (clusterDesc.ContainsColumn(commonColumns[nCommonColumnsInCluster - 1].fInputId))
1009 break;
1010 --nCommonColumnsInCluster;
1011 }
1012
1013 // Convert columns to a ColumnSet for the ClusterPool query
1014 RCluster::ColumnSet_t commonColumnSet;
1015 commonColumnSet.reserve(nCommonColumnsInCluster);
1016 for (size_t i = 0; i < nCommonColumnsInCluster; ++i)
1017 commonColumnSet.emplace(commonColumns[i].fInputId);
1018
1019 // For each cluster, the "missing columns" are the union of the extraDstColumns and the common columns
1020 // that are not present in the cluster. We generate zero pages for all of them.
   // The resize drops the common columns appended while processing the previous cluster.
1021 missingColumns.resize(extraDstColumns.size());
1022 for (size_t i = nCommonColumnsInCluster; i < commonColumns.size(); ++i)
1023 missingColumns.push_back(commonColumns[i]);
1024
   // A fresh sealedPageData per cluster: it collects the pages of this cluster only.
1025 RSealedPageMergeData sealedPageData;
1026 auto res = MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster,
1027 sealedPageData, mergeData, *fPageAlloc);
1028 if (!res)
1029 return R__FORWARD_ERROR(res);
1030
1031 res = GenerateZeroPagesForColumns(nClusterEntries, missingColumns, sealedPageData, *fPageAlloc,
1032 mergeData.fDstDescriptor, mergeData);
1033 if (!res)
1034 return R__FORWARD_ERROR(res);
1035
1036 // Commit the pages and the clusters
1037 mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
1038 mergeData.fDestination.CommitCluster(nClusterEntries);
1039 mergeData.fNumDstEntries += nClusterEntries;
1040
1041 // Go to the next cluster
1042 clusterId = mergeData.fSrcDescriptor->FindNextClusterId(clusterId);
1043 }
1044
1045 // TODO(gparolini): when we get serious about huge file support (>~ 100GB) we might want to check here
1046 // the size of the running page list and commit a cluster group when it exceeds some threshold,
1047 // which would prevent the page list from getting too large.
1048 // However, as of today, we aren't really handling such huge files, and even relatively big ones
1049 // such as the CMS dataset have a page list size of about only 2 MB.
1050 // So currently we simply merge all cluster groups into one.
1052}
1053
// Returns the in-memory C++ type to use for a column, given the type name of its field and its on-disk type.
// The on-disk type takes precedence: index columns map to RColumnIndex and switch columns to RColumnSwitch.
// Otherwise the field type name is matched against a fixed list of fundamental types. If nothing matches,
// std::nullopt is returned and the caller falls back to the default in-memory type.
// NOTE(review): the first condition ends with `||` and its continuation line is not visible in this listing;
// presumably it also tests the 64 bit index types - TODO confirm against the upstream file.
1054static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
1055{
1056 if (onDiskType == ENTupleColumnType::kIndex32 || onDiskType == ENTupleColumnType::kSplitIndex32 ||
1058 return typeid(ROOT::Internal::RColumnIndex);
1059
1060 if (onDiskType == ENTupleColumnType::kSwitch)
1061 return typeid(ROOT::Internal::RColumnSwitch);
1062
   // The comparisons below are exact string matches on the type name of the field.
1063 // clang-format off
1064 if (fieldType == "bool") return typeid(bool);
1065 if (fieldType == "std::byte") return typeid(std::byte);
1066 if (fieldType == "char") return typeid(char);
1067 if (fieldType == "std::int8_t") return typeid(std::int8_t);
1068 if (fieldType == "std::uint8_t") return typeid(std::uint8_t);
1069 if (fieldType == "std::int16_t") return typeid(std::int16_t);
1070 if (fieldType == "std::uint16_t") return typeid(std::uint16_t);
1071 if (fieldType == "std::int32_t") return typeid(std::int32_t);
1072 if (fieldType == "std::uint32_t") return typeid(std::uint32_t);
1073 if (fieldType == "std::int64_t") return typeid(std::int64_t);
1074 if (fieldType == "std::uint64_t") return typeid(std::uint64_t);
1075 if (fieldType == "float") return typeid(float);
1076 if (fieldType == "double") return typeid(double);
1077 // clang-format on
1078
1079 // if the type is not one of those above, we use the default in-memory type.
1080 return std::nullopt;
1081}
1082
1083// Given a field, fill `columns` and `mergeData.fColumnIdMap` with information about all columns belonging to it and its
1084// subfields. `mergeData.fColumnIdMap` is used to map matching columns from different sources to the same output column
1085// in the destination. We match columns by their "fully qualified name", which is the concatenation of their ancestor
1086// fields' names and the column index. By this point, since we called `CompareDescriptorStructure()` earlier, we should
1087// be guaranteed that two matching columns will have at least compatible representations. NOTE: srcFieldDesc and
1088// dstFieldDesc may alias.
//
// `prefix` is the qualified name of the parent field; it is empty for top-level fields, so every qualified
// name starts with a dot. The function recurses into the subfields, pairing the i-th child of the source
// field with the i-th child of the destination field.
// NOTE(review): the last line of the verbose log statement is not visible in this listing.
1089static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const ROOT::RNTupleDescriptor &srcDesc,
1090 RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc,
1091 const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix = "")
1092{
1093 std::string name = prefix + '.' + srcFieldDesc.GetFieldName();
1094
1095 const auto &columnIds = srcFieldDesc.GetLogicalColumnIds();
1096 columns.reserve(columns.size() + columnIds.size());
1097 // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with
1098 // different column representations.
   // NOTE(review): the loop calls GetLogicalColumnIds() again instead of using `columnIds`, and the
   // projected-field check below does not depend on `i`, so it could be hoisted out of the loop.
1099 for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) {
1100 // We don't want to try and merge alias columns
1101 if (srcFieldDesc.IsProjectedField())
1102 continue;
1103
1104 auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i];
1105 const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId);
1106
1107 RColumnMergeInfo info{};
1108 info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex());
1109 info.fInputId = srcColumn.GetPhysicalId();
1110 // NOTE(gparolini): the parent field is used when synthesizing zero pages, which happens in 2 situations:
1111 // 1. when adding extra dst columns (in which case we need to synthesize zero pages for the incoming src), and
1112 // 2. when merging a deferred column into an existing column (in which case we need to fill the "hole" with
1113 // zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this function),
1114 // but for the second case they're not, and we need to pick the source field because we will then check the
1115 // column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see
1116 // GenerateZeroPagesForColumns()).
1117 info.fParentFieldDescriptor = &srcFieldDesc;
1118 // Save the parent field descriptor since this may be either the source or destination descriptor depending on
1119 // whether this is an extraDstField or a commonField. We will need this in GenerateZeroPagesForColumns() to
1120 // properly walk up the field hierarchy.
1121 info.fParentNTupleDescriptor = &srcDesc;
1122
   // A column name that is already known keeps the output id and type recorded for it. A new name gets the
   // next free output id (the current size of the map) and the type of the matching destination column.
1123 if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) {
1124 info.fOutputId = it->second.fColumnId;
1125 info.fColumnType = it->second.fColumnType;
1126 } else {
1127 info.fOutputId = mergeData.fColumnIdMap.size();
1128 // NOTE(gparolini): map the type of src column to the type of dst column.
1129 // This mapping is only relevant for common columns and it's done to ensure we keep a consistent
1130 // on-disk representation of the same column.
1131 // This is also important to do for first source when it is used to generate the destination sink,
1132 // because even in that case their column representations may differ.
1133 // e.g. if the destination has a different compression than the source, an integer column might be
1134 // zigzag-encoded in the source but not in the destination.
1135 auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[i];
1136 const auto &dstColumn = mergeData.fDstDescriptor.GetColumnDescriptor(dstColumnId);
1137 info.fColumnType = dstColumn.GetType();
1138 mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
1139 }
1140
1141 if (mergeData.fMergeOpts.fExtraVerbose) {
1142 R__LOG_INFO(NTupleMergeLog()) << "Adding column " << info.fColumnName << " with log.id " << srcColumnId
1143 << ", phys.id " << srcColumn.GetPhysicalId() << ", type "
1144 << RColumnElementBase::GetColumnTypeName(srcColumn.GetType()) << " -> log.id "
1145 << info.fOutputId << ", type "
1147 }
1148
1149 // Since we disallow merging fields of different types, src and dstFieldDesc must have the same type name.
1150 assert(srcFieldDesc.GetTypeName() == dstFieldDesc.GetTypeName());
1151 info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), info.fColumnType);
1152 columns.emplace_back(info);
1153 }
1154
   // Recurse into the subfields; source children are looked up in srcDesc, destination children in the
   // destination descriptor, and both lists are expected to have the same length.
1155 const auto &srcChildrenIds = srcFieldDesc.GetLinkIds();
1156 const auto &dstChildrenIds = dstFieldDesc.GetLinkIds();
1157 assert(srcChildrenIds.size() == dstChildrenIds.size());
1158 for (auto i = 0u; i < srcChildrenIds.size(); ++i) {
1159 const auto &srcChild = srcDesc.GetFieldDescriptor(srcChildrenIds[i]);
1160 const auto &dstChild = mergeData.fDstDescriptor.GetFieldDescriptor(dstChildrenIds[i]);
1161 AddColumnsFromField(columns, srcDesc, mergeData, srcChild, dstChild, name);
1162 }
1163}
1164
1165// Converts the fields comparison data to the corresponding column information.
1166// While doing so, it collects such information in `mergeData.fColumnIdMap`, which is used by later calls to this
1167// function to map already-seen column names to their chosen outputId, type and so on.
1168static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc,
1169 RNTupleMergeData &mergeData)
1170{
1171 RColumnInfoGroup res;
1172 for (const ROOT::RFieldDescriptor *field : descCmp.fExtraDstFields) {
1173 AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field, *field);
1174 }
1175 for (const auto &[srcField, dstField] : descCmp.fCommonFields) {
1176 AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *srcField, *dstField);
1177 }
1178
1179 // Sort the commonColumns by ID so we can more easily tell how many common columns each cluster has
1180 // (since each cluster must contain all columns of the previous cluster plus potentially some new ones)
1181 std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(),
1182 [](const auto &a, const auto &b) { return a.fInputId < b.fInputId; });
1183
1184 return res;
1185}
1186
1187static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc,
1188 ColumnIdMap_t &colIdMap, const std::string &prefix = "")
1189{
1190 std::string name = prefix + '.' + fieldDesc.GetFieldName();
1191 for (const auto &colId : fieldDesc.GetLogicalColumnIds()) {
1192 const auto &colDesc = desc.GetColumnDescriptor(colId);
1193 RColumnOutInfo info{};
1194 const auto colName = name + '.' + std::to_string(colDesc.GetIndex());
1195 info.fColumnId = colDesc.GetLogicalId();
1196 info.fColumnType = colDesc.GetType();
1197 colIdMap[colName] = info;
1198 }
1199
1200 for (const auto &subId : fieldDesc.GetLinkIds()) {
1201 const auto &subfield = desc.GetFieldDescriptor(subId);
1202 PrefillColumnMap(desc, subfield, colIdMap, name);
1203 }
1204}
1205
// Creates a merger that writes into `destination`. `model` is stored in fModel and may be null; Merge()
// requires it to be set exactly when the destination is already initialized. Pages are allocated through a
// heap page allocator owned by the merger.
// NOTE(review): the first statement of the body and the content of the R__USE_IMT section are not visible in
// this listing.
1206RNTupleMerger::RNTupleMerger(std::unique_ptr<ROOT::Internal::RPagePersistentSink> destination,
1207 std::unique_ptr<ROOT::RNTupleModel> model)
1208 // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime
1209 // of the RNTuples we are going to handle (e.g. we can reset the arena at every source)
1210 : fDestination(std::move(destination)),
1211 fPageAlloc(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()),
1212 fModel(std::move(model))
1213{
1215
   // Only compiled when ROOT is built with R__USE_IMT defined.
1216#ifdef R__USE_IMT
1219#endif
1220}
1221
1222RNTupleMerger::RNTupleMerger(std::unique_ptr<ROOT::Internal::RPagePersistentSink> destination)
1223 : RNTupleMerger(std::move(destination), nullptr)
1224{
1225}
1226
// Merges all `sources` into the destination sink of this merger.
// The options are copied so that an unset compression can default to the one of the sink. An error is
// returned if an explicitly requested compression differs from the one of the sink, or if the presence of a
// model does not match the initialization state of the destination.
1227ROOT::RResult<void> RNTupleMerger::Merge(std::span<RPageSource *> sources, const RNTupleMergeOptions &mergeOptsIn)
1228{
1229 RNTupleMergeOptions mergeOpts = mergeOptsIn;
1230
1231 assert(fDestination);
1232
1233 // Set compression settings if unset and verify it's compatible with the sink
1234 {
1235 const auto dstCompSettings = fDestination->GetWriteOptions().GetCompression();
1236 if (!mergeOpts.fCompressionSettings) {
1237 mergeOpts.fCompressionSettings = dstCompSettings;
1238 } else if (*mergeOpts.fCompressionSettings != dstCompSettings) {
1239 return R__FAIL(std::string("The compression given to RNTupleMergeOptions is different from that of the "
1240 "sink! (opts: ") +
1241 std::to_string(*mergeOpts.fCompressionSettings) + ", sink: " + std::to_string(dstCompSettings) +
1242 ") This is currently unsupported.");
1243 }
1244 }
1245
1246 // we should have a model if and only if the destination is initialized.
1247 if (!!fModel != fDestination->IsInitialized()) {
1248 return R__FAIL(
1249 "passing an already-initialized destination to RNTupleMerger::Merge (i.e. trying to do incremental "
1250 "merging) can only be done by providing a valid ROOT::RNTupleModel when constructing the RNTupleMerger.");
1251 }
1252
   // State shared by the merging steps. The entry counter starts at the number of entries the destination
   // already holds.
1253 RNTupleMergeData mergeData{sources, *fDestination, mergeOpts};
1254 mergeData.fNumDstEntries = mergeData.fDestination.GetNEntries();
1255
1256 if (fModel) {
1257 // If this is an incremental merging, pre-fill the column id map with the existing destination ids.
1258 // Otherwise we would generate new output ids that may not match the ones in the destination!
1259 for (const auto &field : mergeData.fDstDescriptor.GetTopLevelFields()) {
1260 PrefillColumnMap(fDestination->GetDescriptor(), field, mergeData.fColumnIdMap);
1261 }
1262 }
1263
// Error policy helper for the loop over the sources: depending on `mergeOpts.fErrBehavior` it either logs a
// warning and moves on to the next source (kSkip) or fails the whole merge with `errMsg`.
// It must be expanded directly inside the body of the loop over the sources, as a statement of its own
// within a braced block, and it relies on a variable named `mergeOpts` being in scope.
// The expansion deliberately is a plain if/else and not the usual do { } while (0) wrapper: inside such a
// wrapper `continue` would only leave the wrapper loop and the current source would not be skipped.
#define SKIP_OR_ABORT(errMsg)                                                      \
   if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) {                 \
      R__LOG_WARNING(NTupleMergeLog()) << "Skipping RNTuple due to: " << (errMsg); \
      continue;                                                                    \
   } else {                                                                        \
      return R__FAIL(errMsg);                                                      \
   }
1273
1274 // Merge main loop
   // Each source is checked against the destination and then merged cluster by cluster. SKIP_OR_ABORT is
   // meant to either skip the current source or fail the whole merge, depending on mergeOpts.fErrBehavior.
1275 for (RPageSource *source : sources) {
1276 // We need to make sure the streamer info from the source files is loaded otherwise we may not be able
1277 // to build the streamer info of user-defined types unless we have their dictionaries available.
1278 source->LoadStreamerInfo();
1279
   // The descriptor guard is local to this iteration and mergeData.fSrcDescriptor points into it.
   // NOTE(review): the pointer is not reset at the end of the iteration, so it must not be used after the loop.
1281 auto srcDescriptor = source->GetSharedDescriptorGuard();
1282 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
1283
   // NOTE(review): the lines opening the format version check are not visible in this listing. Judging from
   // the messages, a source with a higher format version is either merged with a warning or rejected.
1287 << "RNTuple '" << mergeData.fSrcDescriptor->GetName()
1288 << "' has a higher format version than the latest supported by this version "
1289 "of ROOT. Merging will work but some features may be dropped.";
1290 } else {
1291 return R__FAIL("RNTuple '" + mergeData.fSrcDescriptor->GetName() +
1292 "' has a higher format version than the latest supported by this version. Refusing to "
1293 "merge, since RNTupleMergeOptions::fVersionBehavior is set to AbortOnHigherVersion.");
1294 }
1295 }
1296
1297 // Create sink from the input model if not initialized
   // This happens at most once: afterwards fModel is set.
1298 if (!fModel) {
1299 fModel = fDestination->InitFromDescriptor(srcDescriptor.GetRef(), false /* copyClusters */);
1300 }
1301
1302 for (const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
1303 fDestination->UpdateExtraTypeInfo(extraTypeInfoDesc);
1304
   // Classify the fields into common ones, fields only in the destination and fields only in the source.
1305 auto descCmpRes = CompareDescriptorStructure(mergeData.fDstDescriptor, srcDescriptor.GetRef());
1306 if (!descCmpRes) {
1307 SKIP_OR_ABORT(std::string("Source RNTuple has an incompatible schema with the destination:\n") +
1308 descCmpRes.GetError()->GetReport());
1309 }
1310 auto descCmp = descCmpRes.Unwrap();
1311
1312 // If the current source is missing some fields and we're not in Union mode, error
1313 // (if we are in Union mode, MergeSourceClusters will fill the missing fields with default values).
1314 if (mergeOpts.fMergingMode != ENTupleMergingMode::kUnion && !descCmp.fExtraDstFields.empty()) {
1315 std::string msg = "Source RNTuple is missing the following fields:";
1316 for (const auto *field : descCmp.fExtraDstFields) {
1317 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
1318 }
1319 SKIP_OR_ABORT(msg);
1320 }
1321
1322 // handle extra src fields
   // Union mode extends the destination model, Strict mode reports an error; any other mode leaves the
   // extra source fields untouched here.
1323 if (descCmp.fExtraSrcFields.size()) {
1324 if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) {
1325 // late model extension for all fExtraSrcFields in Union mode
1326 auto res = ExtendDestinationModel(descCmp.fExtraSrcFields, *fModel, mergeData, descCmp.fCommonFields);
1327 if (!res)
1328 return R__FORWARD_ERROR(res);
1329 } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) {
1330 // If the current source has extra fields and we're in Strict mode, error
1331 std::string msg = "Source RNTuple has extra fields that the destination RNTuple doesn't have:";
1332 for (const auto *field : descCmp.fExtraSrcFields) {
1333 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
1334 }
1335 SKIP_OR_ABORT(msg);
1336 }
1337 }
1338
1339 // handle extra dst fields & common fields
1340 auto columnInfos = GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
1341 auto res = MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
1342 if (!res)
1343 return R__FORWARD_ERROR(res);
1344 } // end loop over sources
1345
1346 if (fDestination->GetNEntries() == 0)
1347 R__LOG_WARNING(NTupleMergeLog()) << "Output RNTuple '" << fDestination->GetNTupleName() << "' has no entries.";
1348
1349 // Commit the output
   // All clusters written by this call end up in a single cluster group.
1350 fDestination->CommitClusterGroup();
1351 fDestination->CommitDataset();
1352
1353 return RResult<void>::Success();
1354}
fBuffer
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking).
Definition RError.hxx:303
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define R__LOG_WARNING(...)
Definition RLogger.hxx:357
#define R__LOG_ERROR(...)
Definition RLogger.hxx:356
#define R__LOG_INFO(...)
Definition RLogger.hxx:358
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
static ROOT::RResult< void > GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span< const RColumnMergeInfo > columns, RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc, const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
static std::optional< ENTupleMergeErrBehavior > ParseOptionErrBehavior(const TString &opts)
static ROOT::RLogChannel & NTupleMergeLog()
#define SKIP_OR_ABORT(errMsg)
static std::optional< T > ParseStringOption(const TString &opts, const char *pattern, std::initializer_list< std::pair< const char *, T > > validValues)
static bool IsSplitOrUnsplitVersionOf(ENTupleColumnType a, ENTupleColumnType b)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
static std::optional< ENTupleMergingMode > ParseOptionMergingMode(const TString &opts)
static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc, ColumnIdMap_t &colIdMap, const std::string &prefix="")
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static ROOT::RResult< void > ExtendDestinationModel(std::span< const ROOT::RFieldDescriptor * > newFields, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static std::optional< ENTupleMergeVersionBehavior > ParseOptionVersionBehavior(const TString &opts)
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define a(i)
Definition RSha256.hxx:99
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
int Ssiz_t
String size (currently int).
Definition RtypesCore.h:81
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
char name[80]
Definition TGX11.cxx:148
Double_t err
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Given a set of RPageSources merge them into an RPagePersistentSink, optionally changing their compres...
ROOT::RResult< void > MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, std::span< const RColumnMergeInfo > commonColumns, const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
std::unique_ptr< ROOT::RNTupleModel > fModel
RNTupleMerger(std::unique_ptr< ROOT::Internal::RPagePersistentSink > destination, std::unique_ptr< ROOT::RNTupleModel > model)
Creates a RNTupleMerger with the given destination.
ROOT::RResult< void > MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span< const RColumnMergeInfo > commonColumns, std::span< const RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
std::unique_ptr< ROOT::Internal::RPagePersistentSink > fDestination
std::unique_ptr< ROOT::Internal::RPageAllocator > fPageAlloc
RResult< void > Merge(std::span< ROOT::Internal::RPageSource * > sources, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
A class to manage the asynchronous execution of work items.
Manages a set of clusters containing compressed and packed pages.
RCluster * GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:147
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:149
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:30
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(ROOT::ENTupleColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
virtual RIdentifier GetIdentifier() const =0
std::size_t GetPackedSize(std::size_t nElements=1U) const
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
A helper class for serializing and deserialization of the RNTuple binary format.
@ kRaw
Deserializes the descriptor as-is without performing any additional fixup.
@ kForWriting
Deserializes the descriptor and performs fixup on the suppressed column ranges.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
virtual RPage NewPage(std::size_t elementSize, std::size_t nElements)=0
Reserves memory large enough to hold nElements of the given size.
Abstract interface to write data into an ntuple.
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
virtual std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
virtual void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
virtual ROOT::NTupleSize_t GetNEntries() const =0
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
Common functionality of an ntuple storage for both reading and writing.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
std::deque< RSealedPage > SealedPageSequence_t
void * GrowUnchecked(std::uint32_t nElements)
Increases the number of elements in the page.
Definition RPage.hxx:149
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
std::optional< std::uint32_t > GetCompressionSettings() const
Metadata for RNTuple clusters.
ROOT::NTupleSize_t GetNEntries() const
ROOT::DescriptorId_t GetId() const
const RPageRange & GetPageRange(ROOT::DescriptorId_t physicalId) const
bool ContainsColumn(ROOT::DescriptorId_t physicalId) const
const RColumnRange & GetColumnRange(ROOT::DescriptorId_t physicalId) const
ROOT::NTupleSize_t GetFirstEntryIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Metadata stored for every field of an RNTuple.
const std::vector< ROOT::DescriptorId_t > & GetLogicalColumnIds() const
ROOT::ENTupleStructure GetStructure() const
const std::vector< ROOT::DescriptorId_t > & GetLinkIds() const
ROOT::DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
ROOT::DescriptorId_t GetProjectionSourceId() const
const std::string & GetFieldName() const
const std::string & GetTypeName() const
A log configuration for a channel, e.g.
Definition RLogger.hxx:97
The on-storage metadata of an RNTuple.
const RColumnDescriptor & GetColumnDescriptor(ROOT::DescriptorId_t columnId) const
ROOT::DescriptorId_t FindNextClusterId(ROOT::DescriptorId_t clusterId) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
const std::string & GetName() const
ROOT::DescriptorId_t FindClusterId(ROOT::NTupleSize_t entryIdx) const
std::uint64_t GetVersion() const
const RClusterDescriptor & GetClusterDescriptor(ROOT::DescriptorId_t clusterId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
RFieldDescriptorIterable GetTopLevelFields() const
std::string GetQualifiedFieldName(ROOT::DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d. In case of invalid ...
The RNTupleModel encapsulates the schema of an RNTuple.
void Unfreeze()
Transitions an RNTupleModel from the frozen state back to the building state, invalidating all previo...
void Freeze()
Transitions an RNTupleModel from the building state to the frozen state, disabling adding additional ...
const ROOT::RFieldBase & GetConstField(std::string_view fieldName) const
Common user-tunable settings for storing RNTuples.
void SetCompression(std::uint32_t val)
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:67
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merges this NTuple with the input list entries.
static constexpr std::uint64_t GetCurrentVersion()
Returns the RNTuple version in the following form: Epoch: 2 most significant bytes Major: next 2 byte...
Definition RNTuple.hxx:89
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:289
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:197
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
static RResult< ROOT::Internal::RPage > UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element, ROOT::Internal::RPageAllocator &pageAlloc)
Helper for unstreaming a page.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
Collection abstract base class.
Definition TCollection.h:65
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A class to pass information from the TFileMerger to the objects being merged.
TString fOptions
Additional text based option being passed down to customize the merge.
A file, usually with extension .root, that stores data and code in the form of serialized objects in ...
Definition TFile.h:130
Int_t GetCompressionSettings() const
Definition TFile.h:489
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
T * ReadObject()
To read an object (non deriving from TObject) from the file.
Definition TKey.h:105
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
Mother of all ROOT objects.
Definition TObject.h:42
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition TObject.cxx:227
Basic string class.
Definition TString.h:138
Ssiz_t Length() const
Definition TString.h:425
@ kIgnoreCase
Definition TString.h:285
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:641
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition TString.h:660
Double_t x[n]
Definition legend1.C:17
Double_t ex[n]
Definition legend1.C:17
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
@ kFilter
The merger will discard all columns that aren't present in the prototype model (i....
@ kAbortOnHigherVersion
The merger will refuse to merge RNTuples with higher versions than the latest supported by this ROOT ...
@ kWarnOnHigherVersion
The merger will emit a warning when merging RNTuples with higher version than the latest supported by...
@ kAbort
The merger will abort merging as soon as an error is encountered.
@ kSkip
Upon errors, the merger will skip the current source and continue.
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:669
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ENTupleColumnType
const ROOT::RFieldDescriptor * fParentFieldDescriptor
std::optional< std::type_index > fInMemoryType
const ROOT::RNTupleDescriptor * fParentNTupleDescriptor
const ROOT::RNTupleDescriptor * fSrcDescriptor
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
const ROOT::RNTupleDescriptor & fDstDescriptor
Set of merging options to pass to RNTupleMerger.
ENTupleMergingMode fMergingMode
Determines how the merging treats sources with different models (see ENTupleMergingMode).
ENTupleMergeVersionBehavior fVersionBehavior
Determines how the Merge function behaves depending on the RNTuple sources' version.
ENTupleMergeErrBehavior fErrBehavior
Determines how the Merge function behaves upon merging errors.
std::optional< std::uint32_t > fCompressionSettings
If fCompressionSettings is empty (the default), the merger will not change the compression of any of ...
bool fExtraVerbose
If true, the merger will emit further diagnostics and information.
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::vector< std::unique_ptr< std::byte[]> > fBuffers
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
The incremental changes to a RNTupleModel.
void AddField(std::unique_ptr< ROOT::RFieldBase > field, std::string_view parentName="")
std::vector< ROOT::RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< ROOT::RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:50
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
void SetNElements(std::uint32_t nElements)
void SetBufferSize(std::size_t bufferSize)
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...
Definition Compression.h:58
Parameters for the SealPage() method.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
std::uint32_t GetNElements() const
const void * GetBuffer() const
std::size_t GetDataSize() const