Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>,
4/// Giacomo Parolini <giacomo.parolini@cern.ch>
5/// \date 2020-07-08
6/// \warning This is part of the ROOT 7 prototype! It will
7/// change without notice. It might trigger earthquakes. Feedback is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
17#include <ROOT/RError.hxx>
18#include <ROOT/RNTuple.hxx>
21#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleUtil.hxx>
24#include <ROOT/RPageStorage.hxx>
25#include <ROOT/RClusterPool.hxx>
27#include <ROOT/RNTupleZip.hxx>
29#include <TROOT.h>
30#include <TFileMergeInfo.h>
31#include <TError.h>
32#include <TFile.h>
33#include <TKey.h>
34
35#include <algorithm>
36#include <deque>
37#include <inttypes.h> // for PRIu64
38#include <unordered_map>
39#include <vector>
40
41using namespace ROOT::Experimental;
42using namespace ROOT::Experimental::Internal;
43
44// Entry point for TFileMerger. Internally calls RNTupleMerger::Merge().
46// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
47try {
48 // Check the inputs
49 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo) {
50 Error("RNTuple::Merge", "Invalid inputs.");
51 return -1;
52 }
53
54 // Parse the input parameters
55 TIter itr(inputs);
56
57 // First entry is the RNTuple name
58 std::string ntupleName = std::string(itr()->GetName());
59
60 // Second entry is the output file
61 TObject *secondArg = itr();
62 TFile *outFile = dynamic_cast<TFile *>(secondArg);
63 if (!outFile) {
64 Error("RNTuple::Merge", "Second input parameter should be a TFile, but it's a %s.", secondArg->ClassName());
65 return -1;
66 }
67
68 // Check if the output file already has a key with that name
69 TKey *outKey = outFile->FindKey(ntupleName.c_str());
70 RNTuple *outNTuple = nullptr;
71 if (outKey) {
72 outNTuple = outKey->ReadObject<RNTuple>();
73 if (!outNTuple) {
74 Error("RNTuple::Merge", "Output file already has key, but not of type RNTuple!");
75 return -1;
76 }
77 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
78 // pointer we just got.
79 }
80
81 // The "fast" option is present if and only if we don't want to change compression.
82 const int compression =
84
85 RNTupleWriteOptions writeOpts;
86 writeOpts.SetUseBufferedWrite(false);
87 if (compression != kUnknownCompressionSettings)
88 writeOpts.SetCompression(compression);
89 auto destination = std::make_unique<RPageSinkFile>(ntupleName, *outFile, writeOpts);
90
91 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
92 if (outNTuple) {
93 auto source = RPageSourceFile::CreateFromAnchor(*outNTuple);
94 source->Attach();
95 auto desc = source->GetSharedDescriptorGuard();
96 destination->InitFromDescriptor(desc.GetRef());
97 }
98
99 // The remaining entries are the input files
100 std::vector<std::unique_ptr<RPageSourceFile>> sources;
101 std::vector<RPageSource *> sourcePtrs;
102
103 while (const auto &pitr = itr()) {
104 TFile *inFile = dynamic_cast<TFile *>(pitr);
105 RNTuple *anchor = inFile ? inFile->Get<RNTuple>(ntupleName.c_str()) : nullptr;
106 if (!anchor) {
107 Error("RNTuple::Merge", "Failed to retrieve RNTuple anchor named '%s' from file '%s'", ntupleName.c_str(),
108 inFile->GetName());
109 return -1;
110 }
111 sources.push_back(RPageSourceFile::CreateFromAnchor(*anchor));
112 }
113
114 // Interface conversion
115 sourcePtrs.reserve(sources.size());
116 for (const auto &s : sources) {
117 sourcePtrs.push_back(s.get());
118 }
119
120 // Now merge
121 RNTupleMerger merger;
122 RNTupleMergeOptions mergerOpts;
123 mergerOpts.fCompressionSettings = compression;
124 merger.Merge(sourcePtrs, *destination).ThrowOnError();
125
126 // Provide the caller with a merged anchor object (even though we've already
127 // written it).
128 *this = *outFile->Get<RNTuple>(ntupleName.c_str());
129
130 return 0;
131} catch (const RException &ex) {
132 Error("RNTuple::Merge", "Exception thrown while merging: %s", ex.what());
133 return -1;
134}
135
136namespace {
137// Functor used to change the compression of a page to `options.fCompressionSettings`.
138struct RChangeCompressionFunc {
139 DescriptorId_t fOutputColumnId;
140
141 const RColumnElementBase &fSrcColElement;
142 const RColumnElementBase &fDstColElement;
143 const RNTupleMergeOptions &fMergeOptions;
144
145 RPageStorage::RSealedPage &fSealedPage;
146 RPageAllocator &fPageAlloc;
147 std::uint8_t *fBuffer;
148
149 void operator()() const
150 {
151 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fOutputColumnId, fPageAlloc).Unwrap();
153 sealConf.fElement = &fDstColElement;
154 sealConf.fPage = &page;
155 sealConf.fBuffer = fBuffer;
156 sealConf.fCompressionSetting = fMergeOptions.fCompressionSettings;
157 sealConf.fWriteChecksum = fSealedPage.GetHasChecksum();
158 auto refSealedPage = RPageSink::SealPage(sealConf);
159 fSealedPage = refSealedPage;
160 }
161};
162
// Outcome of CompareDescriptorStructure(): the top-level fields of the destination (`dst`) and source (`src`)
// descriptors, partitioned by whether a field with the same name was found on the other side.
// All pointers refer to field descriptors owned by the compared descriptors.
163struct RDescriptorsComparison {
 // Top-level fields of `dst` whose name could not be found in `src`.
164 std::vector<const RFieldDescriptor *> fExtraDstFields;
 // Top-level fields of `src` whose name could not be found in `dst`.
165 std::vector<const RFieldDescriptor *> fExtraSrcFields;
 // Fields found on both sides; the stored pointers are the `src`-side descriptors.
 // In Union mode RNTupleMerger::Merge() also appends fExtraSrcFields here after extending the destination model.
166 std::vector<const RFieldDescriptor *> fCommonFields;
167};
168
// Output-side information remembered for a column; this is the mapped value of ColumnIdMap_t.
// NOTE: AddColumnsFromField() aggregate-initializes this struct positionally, so the member order matters.
169struct RColumnOutInfo {
 // ID assigned to the column in the merged output.
170 DescriptorId_t fColumnId;
 // Column type recorded for the column when it was first registered.
171 EColumnType fColumnType;
172};
173
174// { fully.qualified.fieldName.colInputId => colOutputInfo }
175using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
176
// Column lists produced by GatherColumnInfos() for one source.
177struct RColumnInfoGroup {
 // Columns of the fields in RDescriptorsComparison::fExtraDstFields, collected from the destination descriptor.
178 std::vector<RColumnInfo> fExtraDstColumns;
 // Columns of the fields in RDescriptorsComparison::fCommonFields, collected from the source descriptor.
179 std::vector<RColumnInfo> fCommonColumns;
180};
181
182} // namespace
183
184// These structs cannot be in the anon namespace because they're used in RNTupleMerger's private interface.
187 // This column name is built as a dot-separated concatenation of the ancestry of
188 // the columns' parent fields' names plus the index of the column itself.
189 // e.g. "Muon.pt.x._0"
190 std::string fColumnName;
195};
196
197// Data related to a single call of RNTupleMerger::Merge()
199 std::span<RPageSource *> fSources;
204
205 std::vector<RColumnInfo> fColumns;
206 ColumnIdMap_t fColumnIdMap;
207
209
210 RNTupleMergeData(std::span<RPageSource *> sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
211 : fSources{sources}, fDestination{destination}, fMergeOpts{mergeOpts}, fDstDescriptor{destination.GetDescriptor()}
212 {
213 }
214};
215
217 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are
218 // never invalidated.
219 std::deque<RPageStorage::SealedPageSequence_t> fPagesV;
220 std::vector<RPageStorage::RSealedPageGroup> fGroups;
221 std::vector<std::unique_ptr<std::uint8_t[]>> fBuffers;
222};
223} // namespace ROOT::Experimental::Internal
224
225/// Compares the top level fields of `dst` and `src` and determines whether they can be merged or not.
226/// In addition, returns the differences between `dst` and `src`'s structures
229{
230 // Cases:
231 // 1. dst == src
232 // 2. dst has fields that src hasn't
233 // 3. src has fields that dst hasn't
234 // 4. dst and src have fields that differ (compatible or incompatible)
235
236 std::vector<std::string> errors;
237 RDescriptorsComparison res;
238
239 struct RCommonField {
240 const RFieldDescriptor *fDst;
241 const RFieldDescriptor *fSrc;
242 };
243 std::vector<RCommonField> commonFields;
244
245 for (const auto &dstField : dst.GetTopLevelFields()) {
246 const auto srcFieldId = src.FindFieldId(dstField.GetFieldName());
247 if (srcFieldId != kInvalidDescriptorId) {
248 const auto &srcField = src.GetFieldDescriptor(srcFieldId);
249 commonFields.push_back({&dstField, &srcField});
250 } else {
251 res.fExtraDstFields.emplace_back(&dstField);
252 }
253 }
254 for (const auto &srcField : src.GetTopLevelFields()) {
255 const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName());
256 if (dstFieldId == kInvalidDescriptorId)
257 res.fExtraSrcFields.push_back(&srcField);
258 }
259
260 // Check compatibility of common fields
261 for (const auto &field : commonFields) {
262 // NOTE: field.fSrc and field.fDst have the same name by construction
263 const auto &fieldName = field.fSrc->GetFieldName();
264
265 // Require that fields are both projected or both not projected
266 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
267 if (!projCompatible) {
268 std::stringstream ss;
269 ss << "Field `" << fieldName << "` is incompatible with previously-seen field with that name because the "
270 << (field.fSrc->IsProjectedField() ? "new" : "old") << " one is projected and the other isn't";
271 errors.push_back(ss.str());
272 } else if (field.fSrc->IsProjectedField()) {
273 // if both fields are projected, verify that they point to the same real field
274 const auto srcName = src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
275 const auto dstName = dst.GetQualifiedFieldName(field.fDst->GetProjectionSourceId());
276 if (srcName != dstName) {
277 std::stringstream ss;
278 ss << "Field `" << fieldName
279 << "` is projected to a different field than a previously-seen field with the same name (old: " << dstName
280 << ", new: " << srcName << ")";
281 errors.push_back(ss.str());
282 }
283 }
284
285 // Require that fields types match
286 // TODO(gparolini): allow non-identical but compatible types
287 const auto &srcTyName = field.fSrc->GetTypeName();
288 const auto &dstTyName = field.fDst->GetTypeName();
289 if (srcTyName != dstTyName) {
290 std::stringstream ss;
291 ss << "Field `" << fieldName
292 << "` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
293 << ", new: " << srcTyName << ")";
294 errors.push_back(ss.str());
295 }
296
297 const auto srcTyChk = field.fSrc->GetTypeChecksum();
298 const auto dstTyChk = field.fDst->GetTypeChecksum();
299 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
300 std::stringstream ss;
301 ss << "Field `" << field.fSrc->GetFieldName()
302 << "` has a different type checksum than previously-seen field with the same name";
303 errors.push_back(ss.str());
304 }
305
306 const auto srcTyVer = field.fSrc->GetTypeVersion();
307 const auto dstTyVer = field.fDst->GetTypeVersion();
308 if (srcTyVer != dstTyVer) {
309 std::stringstream ss;
310 ss << "Field `" << field.fSrc->GetFieldName()
311 << "` has a different type version than previously-seen field with the same name (old: " << dstTyVer
312 << ", new: " << srcTyVer << ")";
313 errors.push_back(ss.str());
314 }
315 }
316
317 std::string errMsg;
318 for (const auto &err : errors)
319 errMsg += std::string("\n * ") + err;
320
321 if (!errMsg.empty())
322 errMsg = errMsg.substr(1); // strip initial newline
323
324 if (errMsg.length())
325 return R__FAIL(errMsg);
326
327 res.fCommonFields.reserve(commonFields.size());
328 for (const auto &[_, srcField] : commonFields) {
329 res.fCommonFields.emplace_back(srcField);
330 }
331
332 // TODO(gparolini): we should exhaustively check the field tree rather than just the top level fields,
333 // in case the user forgets to change the version number on one field.
334
335 return RResult(res);
336}
337
338// Applies late model extension to `destination`, adding all `newFields` to it.
339static void ExtendDestinationModel(std::span<const RFieldDescriptor *> newFields, RNTupleModel &dstModel,
340 RNTupleMergeData &mergeData)
341{
342 assert(newFields.size() > 0); // no point in calling this with 0 new cols
343
344 dstModel.Unfreeze();
345 RNTupleModelChangeset changeset{dstModel};
346
347 std::string msg = "destination doesn't contain field";
348 if (newFields.size() > 1)
349 msg += 's';
350 msg += ' ';
351 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](const auto &acc, const auto *field) {
352 return acc + (acc.length() ? ", " : "") + '`' + field->GetFieldName() + '`';
353 });
354 Info("RNTuple::Merge", "%s: adding %s to the destination model (entry #%" PRIu64 ").", msg.c_str(),
355 (newFields.size() > 1 ? "them" : "it"), mergeData.fNumDstEntries);
356
357 changeset.fAddedFields.reserve(newFields.size());
358 for (const auto *fieldDesc : newFields) {
359 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
360 if (fieldDesc->IsProjectedField())
361 changeset.fAddedProjectedFields.emplace_back(field.get());
362 else
363 changeset.fAddedFields.emplace_back(field.get());
364 changeset.fModel.AddField(std::move(field));
365 }
366 dstModel.Freeze();
367 mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
368}
369
370// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
371// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
373 std::span<RColumnInfo> commonColumns, RCluster::ColumnSet_t commonColumnSet,
374 RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
375{
376 assert(commonColumns.size() == commonColumnSet.size());
377 if (commonColumns.empty())
378 return;
379
380 const RCluster *cluster = clusterPool.GetCluster(clusterId, commonColumnSet);
381 // we expect the cluster pool to contain the requested set of columns, since they were
382 // validated by CompareDescriptorStructures().
383 assert(cluster);
384
385 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
386
387 for (const auto &column : commonColumns) {
388 const auto &columnId = column.fInputId;
389 R__ASSERT(clusterDesc.ContainsColumn(columnId));
390
391 const auto &columnDesc = mergeData.fSrcDescriptor->GetColumnDescriptor(columnId);
392 const auto srcColElement = RColumnElementBase::Generate(columnDesc.GetType());
393 const auto dstColElement = RColumnElementBase::Generate(column.fColumnType);
394
395 // Now get the pages for this column in this cluster
396 const auto &pages = clusterDesc.GetPageRange(columnId);
397
399 sealedPages.resize(pages.fPageInfos.size());
400
401 // Each column range potentially has a distinct compression settings
402 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
403 const bool needsCompressionChange = mergeData.fMergeOpts.fCompressionSettings != kUnknownCompressionSettings &&
404 colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings;
405
406 if (needsCompressionChange && mergeData.fMergeOpts.fExtraVerbose)
407 Info("RNTuple::Merge", "Column %s: changing source compression from %d to %d", column.fColumnName.c_str(),
408 colRangeCompressionSettings, mergeData.fMergeOpts.fCompressionSettings);
409
410 // If the column range is already uncompressed we don't need to allocate any new buffer, so we don't
411 // bother reserving memory for them.
412 size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
413 if (colRangeCompressionSettings != 0)
414 sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.fPageInfos.size());
415
416 // Loop over the pages
417 std::uint64_t pageIdx = 0;
418 for (const auto &pageInfo : pages.fPageInfos) {
419 assert(pageIdx < sealedPages.size());
420 assert(sealedPageData.fBuffers.size() == 0 || pageIdx < sealedPageData.fBuffers.size());
421
422 ROnDiskPage::Key key{columnId, pageIdx};
423 auto onDiskPage = cluster->GetOnDiskPage(key);
424
425 const auto checksumSize = pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum;
426 RPageStorage::RSealedPage &sealedPage = sealedPages[pageIdx];
427 sealedPage.SetNElements(pageInfo.fNElements);
428 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
429 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + checksumSize);
430 sealedPage.SetBuffer(onDiskPage->GetAddress());
431 // TODO(gparolini): more graceful error handling (skip the page?)
433 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
434
435 if (needsCompressionChange) {
436 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
437 auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
438 buffer = std::make_unique<std::uint8_t[]>(uncompressedSize + checksumSize);
439 RChangeCompressionFunc compressTask{
440 column.fOutputId, *srcColElement, *dstColElement, mergeData.fMergeOpts,
441 sealedPage, *fPageAlloc, buffer.get(),
442 };
443
444 if (fTaskGroup)
445 fTaskGroup->Run(compressTask);
446 else
447 compressTask();
448 }
449
450 ++pageIdx;
451
452 } // end of loop over pages
453
454 if (fTaskGroup)
455 fTaskGroup->Wait();
456
457 sealedPageData.fPagesV.push_back(std::move(sealedPages));
458 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
459 sealedPageData.fPagesV.back().cend());
460 } // end loop over common columns
461}
462
463// Generates default values for columns that are not present in the current source RNTuple
464// but are present in the destination's schema.
465static void GenerateExtraDstColumns(size_t nClusterEntries, std::span<RColumnInfo> extraDstColumns,
466 RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
467{
468 for (const auto &column : extraDstColumns) {
469 const auto &columnId = column.fInputId;
470 const auto &columnDesc = mergeData.fDstDescriptor.GetColumnDescriptor(columnId);
471 const RFieldDescriptor *field = column.fParentField;
472
473 // Skip all auxiliary columns
474 if (field->GetLogicalColumnIds()[0] != columnId)
475 continue;
476
477 // Check if this column is a child of a Collection or a Variant. If so, it has no data
478 // and can be skipped.
479 bool skipColumn = false;
480 auto nRepetitions = std::max<std::uint64_t>(field->GetNRepetitions(), 1);
481 for (auto parentId = field->GetParentId(); parentId != kInvalidDescriptorId;) {
482 const RFieldDescriptor &parent = mergeData.fSrcDescriptor->GetFieldDescriptor(parentId);
485 skipColumn = true;
486 break;
487 }
488 nRepetitions *= std::max<std::uint64_t>(parent.GetNRepetitions(), 1);
489 parentId = parent.GetParentId();
490 }
491 if (skipColumn)
492 continue;
493
494 const auto structure = field->GetStructure();
495
496 if (structure == ENTupleStructure::kUnsplit) {
497 Fatal(
498 "RNTuple::Merge",
499 "Destination RNTuple contains an Unsplit field (%s) that is not present in one of the sources. "
500 "Creating a default value for an Unsplit field is ill-defined, therefore the merging process will abort.",
501 field->GetFieldName().c_str());
502 continue;
503 }
504
505 // NOTE: we cannot have a Record here because it has no associated columns.
507 structure == ENTupleStructure::kLeaf);
508
509 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
510 const auto nElements = nClusterEntries * nRepetitions;
511 const auto bytesOnStorage = colElement->GetPackedSize(nElements);
512 constexpr auto kPageSizeLimit = 256 * 1024;
513 // TODO(gparolini): consider coalescing the last page if its size is less than some threshold
514 const size_t nPages = bytesOnStorage / kPageSizeLimit + !!(bytesOnStorage % kPageSizeLimit);
515 for (size_t i = 0; i < nPages; ++i) {
516 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : bytesOnStorage - kPageSizeLimit * (nPages - 1);
517 const auto checksumSize = RPageStorage::kNBytesPageChecksum;
518 const auto bufSize = pageSize + checksumSize;
519 auto &buffer = sealedPageData.fBuffers.emplace_back(new unsigned char[bufSize]);
520
521 RPageStorage::RSealedPage sealedPage{buffer.get(), bufSize, static_cast<std::uint32_t>(nElements), true};
522 memset(buffer.get(), 0, pageSize);
523 sealedPage.ChecksumIfEnabled();
524
525 sealedPageData.fPagesV.push_back({sealedPage});
526 }
527
528 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
529 sealedPageData.fPagesV.back().cend());
530 }
531}
532
533// Iterates over all clusters of `source` and merges their pages into `destination`.
534// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
535// the destination's schemas.
536// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
537// compression is unspecified or matches the original compression settings.
538void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<RColumnInfo> commonColumns,
539 std::span<RColumnInfo> extraDstColumns, RNTupleMergeData &mergeData)
540{
541 RClusterPool clusterPool{source};
542
543 // Convert columns to a ColumnSet for the ClusterPool query
544 RCluster::ColumnSet_t commonColumnSet;
545 commonColumnSet.reserve(commonColumns.size());
546 for (const auto &column : commonColumns)
547 commonColumnSet.emplace(column.fInputId);
548
549 RCluster::ColumnSet_t extraDstColumnSet;
550 extraDstColumnSet.reserve(extraDstColumns.size());
551 for (const auto &column : extraDstColumns)
552 extraDstColumnSet.emplace(column.fInputId);
553
554 // Loop over all clusters in this file.
555 // descriptor->GetClusterIterable() doesn't guarantee any specific order, so we explicitly
556 // request the first cluster.
557 DescriptorId_t clusterId = mergeData.fSrcDescriptor->FindClusterId(0, 0);
558 while (clusterId != kInvalidDescriptorId) {
559 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
560 const auto nClusterEntries = clusterDesc.GetNEntries();
561 R__ASSERT(nClusterEntries > 0);
562
563 RSealedPageMergeData sealedPageData;
564
565 if (!commonColumnSet.empty()) {
566 MergeCommonColumns(clusterPool, clusterId, commonColumns, commonColumnSet, sealedPageData, mergeData);
567 }
568
569 if (!extraDstColumnSet.empty()) {
570 GenerateExtraDstColumns(nClusterEntries, extraDstColumns, sealedPageData, mergeData);
571 }
572
573 // Commit the pages and the clusters
574 mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
575 mergeData.fDestination.CommitCluster(nClusterEntries);
576 mergeData.fNumDstEntries += nClusterEntries;
577
578 // Go to the next cluster
579 clusterId = mergeData.fSrcDescriptor->FindNextClusterId(clusterId);
580 }
581
582 // TODO(gparolini): when we get serious about huge file support (>~ 100GB) we might want to check here
583 // the size of the running page list and commit a cluster group when it exceeds some threshold,
584 // which would prevent the page list from getting too large.
585 // However, as of today, we aren't really handling such huge files, and even relatively big ones
586 // such as the CMS dataset have a page list size of about only 2 MB.
587 // So currently we simply merge all cluster groups into one.
588}
589
590// Given a field, fill `columns` and `colIdMap` with information about all columns belonging to it and its subfields.
591// `colIdMap` is used to map matching columns from different sources to the same output column in the destination.
592// We match columns by their "fully qualified name", which is the concatenation of their ancestor fields' names
593// and the column index.
594// By this point, since we called `CompareDescriptorStructures()` earlier, we should be guaranteed that two matching
595// columns will have at least compatible representations.
596static void AddColumnsFromField(std::vector<RColumnInfo> &columns, const RNTupleDescriptor &srcDesc,
597 RNTupleMergeData &mergeData, const RFieldDescriptor &fieldDesc,
598 const std::string &prefix = "")
599{
600 std::string name = prefix + '.' + fieldDesc.GetFieldName();
601
602 const auto &columnIds = fieldDesc.GetLogicalColumnIds();
603 columns.reserve(columns.size() + columnIds.size());
604 for (const auto &columnId : columnIds) {
605 const auto &srcColumn = srcDesc.GetColumnDescriptor(columnId);
606 RColumnInfo info;
607 info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex());
608 info.fInputId = columnId;
609 info.fParentField = &fieldDesc;
610
611 if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) {
612 info.fOutputId = it->second.fColumnId;
613 info.fColumnType = it->second.fColumnType;
614 } else {
615 info.fOutputId = mergeData.fColumnIdMap.size();
616 // NOTE(gparolini): map the type of src column to the type of dst column.
617 // This mapping is only relevant for common columns and it's done to ensure we keep a consistent
618 // on-disk representation of the same column.
619 // This is also important to do for first source when it is used to generate the destination sink,
620 // because even in that case their column representations may differ.
621 // e.g. if the destination has a different compression than the source, an integer column might be
622 // zigzag-encoded in the source but not in the destination.
623 const auto &dstColumn = (&mergeData.fDstDescriptor == &srcDesc)
624 ? srcColumn
625 : mergeData.fDstDescriptor.GetColumnDescriptor(columnId);
626 info.fColumnType = dstColumn.GetType();
627 mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
628 }
629 columns.emplace_back(info);
630 }
631
632 for (const auto &field : srcDesc.GetFieldIterable(fieldDesc))
633 AddColumnsFromField(columns, srcDesc, mergeData, field, name);
634}
635
636// Converts the fields comparison data to the corresponding column information.
637// While doing so, it collects such information in `colIdMap`, which is used by later calls to this function
638// to map already-seen column names to their chosen outputId, type and so on.
639static RColumnInfoGroup
640GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
641{
642 RColumnInfoGroup res;
643 for (const RFieldDescriptor *field : descCmp.fExtraDstFields) {
644 AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field);
645 }
646 for (const auto *field : descCmp.fCommonFields) {
647 AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *field);
648 }
649 return res;
650}
651
653 // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime
654 // of the RNTuples we are going to handle (e.g. we can reset the arena at every source)
655 : fPageAlloc(std::make_unique<RPageAllocatorHeap>())
656{
657#ifdef R__USE_IMT
660#endif
661}
662
664RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
665{
666 RNTupleMergeData mergeData{sources, destination, mergeOpts};
667
668 std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
669
670#define SKIP_OR_ABORT(errMsg) \
671 do { \
672 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
673 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
674 continue; \
675 } else { \
676 return R__FAIL(errMsg); \
677 } \
678 } while (0)
679
680 // Merge main loop
681 for (RPageSource *source : sources) {
682 source->Attach();
683 auto srcDescriptor = source->GetSharedDescriptorGuard();
684 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
685
686 // Create sink from the input model if not initialized
687 if (!destination.IsInitialized()) {
688 model = srcDescriptor->CreateModel();
689 destination.Init(*model);
690 }
691
692 for (const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
693 destination.UpdateExtraTypeInfo(extraTypeInfoDesc);
694
695 auto descCmpRes = CompareDescriptorStructure(mergeData.fDstDescriptor, srcDescriptor.GetRef());
696 if (!descCmpRes) {
698 std::string("Source RNTuple will be skipped due to incompatible schema with the destination:\n") +
699 descCmpRes.GetError()->GetReport());
700 }
701 auto descCmp = descCmpRes.Unwrap();
702
703 // If the current source is missing some fields and we're not in Union mode, error
704 // (if we are in Union mode, MergeSourceClusters will fill the missing fields with default values).
705 if (mergeOpts.fMergingMode != ENTupleMergingMode::kUnion && !descCmp.fExtraDstFields.empty()) {
706 std::string msg = "Source RNTuple is missing the following fields:";
707 for (const auto *field : descCmp.fExtraDstFields) {
708 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
709 }
710 SKIP_OR_ABORT(msg);
711 }
712
713 // handle extra src fields
714 if (descCmp.fExtraSrcFields.size()) {
715 if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) {
716 // late model extension for all fExtraSrcFields in Union mode
717 ExtendDestinationModel(descCmp.fExtraSrcFields, *model, mergeData);
718 descCmp.fCommonFields.insert(descCmp.fCommonFields.end(), descCmp.fExtraSrcFields.begin(),
719 descCmp.fExtraSrcFields.end());
720 } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) {
721 // If the current source has extra fields and we're in Strict mode, error
722 std::string msg = "Source RNTuple has extra fields that the destination RNTuple doesn't have:";
723 for (const auto *field : descCmp.fExtraSrcFields) {
724 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
725 }
726 SKIP_OR_ABORT(msg);
727 }
728 }
729
730 // handle extra dst fields & common fields
731 auto columnInfos = GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
732 MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
733 } // end loop over sources
734
735 // Commit the output
736 destination.CommitClusterGroup();
737 destination.CommitDataset();
738
739 return RResult<void>::Success();
740}
fBuffer
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
static RResult< RDescriptorsComparison > CompareDescriptorStructure(const RNTupleDescriptor &dst, const RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
#define SKIP_OR_ABORT(errMsg)
static void GenerateExtraDstColumns(size_t nClusterEntries, std::span< RColumnInfo > extraDstColumns, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static void ExtendDestinationModel(std::span< const RFieldDescriptor * > newFields, RNTupleModel &dstModel, RNTupleMergeData &mergeData)
static void AddColumnsFromField(std::vector< RColumnInfo > &columns, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const RFieldDescriptor &fieldDesc, const std::string &prefix="")
long long Long64_t
Definition RtypesCore.h:69
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:218
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:244
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
char name[80]
Definition TGX11.cxx:110
TRObject operator()(const T1 &t1) const
#define _(A, B)
Definition cfortran.h:108
The available trivial, native content types of a column.
Manages a set of clusters containing compressed and packed pages.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:32
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
A column element encapsulates the translation between basic C++ types and their column representation...
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
Given a set of RPageSources merge them into an RPageSink, optionally changing their compression.
RResult< void > Merge(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
void MergeCommonColumns(RClusterPool &clusterPool, DescriptorId_t clusterId, std::span< RColumnInfo > commonColumns, RCluster::ColumnSet_t commonColumnSet, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
void MergeSourceClusters(RPageSource &source, std::span< RColumnInfo > commonColumns, std::span< RColumnInfo > extraDstColumns, RNTupleMergeData &mergeData)
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket) In...
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
std::deque< RSealedPage > SealedPageSequence_t
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Meta-data stored for every field of an ntuple.
const std::string & GetFieldName() const
const std::vector< DescriptorId_t > & GetLogicalColumnIds() const
The on-storage meta-data of an ntuple.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
DescriptorId_t FindClusterId(DescriptorId_t physicalColumnId, NTupleSize_t index) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
std::string GetQualifiedFieldName(DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d In case of invalid ...
DescriptorId_t FindFieldId(std::string_view fieldName, DescriptorId_t parentId) const
const RColumnDescriptor & GetColumnDescriptor(DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(DescriptorId_t fieldId) const
RFieldDescriptorIterable GetTopLevelFields() const
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for storing ntuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:61
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:281
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:194
A class to manage the asynchronous execution of work items.
Collection abstract base class.
Definition TCollection.h:65
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
Int_t GetCompressionSettings() const
Definition TFile.h:397
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
T * ReadObject()
To read an object (non deriving from TObject) from the file.
Definition TKey.h:103
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition TObject.cxx:213
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:632
Double_t ex[n]
Definition legend1.C:17
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr int kUnknownCompressionSettings
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
int fCompressionSettings
If fCompressionSettings == kUnknownCompressionSettings (the default), the merger will not change the ...
ENTupleMergingMode fMergingMode
Determines how the merging treats sources with different models (see ENTupleMergingMode).
bool fExtraVerbose
If true, the merger will emit further diagnostics and information.
The incremental changes to an RNTupleModel.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers