38#include <unordered_map>
49 if (!inputs || inputs->
GetEntries() < 3 || !mergeInfo) {
50 Error(
"RNTuple::Merge",
"Invalid inputs.");
58 std::string ntupleName = std::string(itr()->GetName());
62 TFile *outFile =
dynamic_cast<TFile *
>(secondArg);
64 Error(
"RNTuple::Merge",
"Second input parameter should be a TFile, but it's a %s.", secondArg->
ClassName());
69 TKey *outKey = outFile->
FindKey(ntupleName.c_str());
74 Error(
"RNTuple::Merge",
"Output file already has key, but not of type RNTuple!");
82 const int compression =
89 auto destination = std::make_unique<RPageSinkFile>(ntupleName, *outFile, writeOpts);
95 auto desc = source->GetSharedDescriptorGuard();
96 destination->InitFromDescriptor(desc.GetRef());
100 std::vector<std::unique_ptr<RPageSourceFile>> sources;
101 std::vector<RPageSource *> sourcePtrs;
103 while (
const auto &pitr = itr()) {
107 Error(
"RNTuple::Merge",
"Failed to retrieve RNTuple anchor named '%s' from file '%s'", ntupleName.c_str(),
115 sourcePtrs.reserve(sources.size());
116 for (
const auto &s : sources) {
117 sourcePtrs.push_back(s.get());
128 *
this = *outFile->
Get<
RNTuple>(ntupleName.c_str());
132 Error(
"RNTuple::Merge",
"Exception thrown while merging: %s",
ex.what());
138struct RChangeCompressionFunc {
153 sealConf.
fElement = &fDstColElement;
154 sealConf.
fPage = &page;
159 fSealedPage = refSealedPage;
163struct RDescriptorsComparison {
164 std::vector<const RFieldDescriptor *> fExtraDstFields;
165 std::vector<const RFieldDescriptor *> fExtraSrcFields;
166 std::vector<const RFieldDescriptor *> fCommonFields;
169struct RColumnOutInfo {
175using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
177struct RColumnInfoGroup {
178 std::vector<RColumnInfo> fExtraDstColumns;
179 std::vector<RColumnInfo> fCommonColumns;
219 std::deque<RPageStorage::SealedPageSequence_t>
fPagesV;
220 std::vector<RPageStorage::RSealedPageGroup>
fGroups;
221 std::vector<std::unique_ptr<std::uint8_t[]>>
fBuffers;
236 std::vector<std::string> errors;
237 RDescriptorsComparison res;
239 struct RCommonField {
243 std::vector<RCommonField> commonFields;
246 const auto srcFieldId =
src.FindFieldId(dstField.GetFieldName());
248 const auto &srcField =
src.GetFieldDescriptor(srcFieldId);
249 commonFields.push_back({&dstField, &srcField});
251 res.fExtraDstFields.emplace_back(&dstField);
254 for (
const auto &srcField :
src.GetTopLevelFields()) {
255 const auto dstFieldId = dst.
FindFieldId(srcField.GetFieldName());
257 res.fExtraSrcFields.push_back(&srcField);
261 for (
const auto &field : commonFields) {
263 const auto &fieldName = field.fSrc->GetFieldName();
266 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
267 if (!projCompatible) {
268 std::stringstream ss;
269 ss <<
"Field `" << fieldName <<
"` is incompatible with previously-seen field with that name because the "
270 << (field.fSrc->IsProjectedField() ?
"new" :
"old") <<
" one is projected and the other isn't";
271 errors.push_back(ss.str());
272 }
else if (field.fSrc->IsProjectedField()) {
274 const auto srcName =
src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
276 if (srcName != dstName) {
277 std::stringstream ss;
278 ss <<
"Field `" << fieldName
279 <<
"` is projected to a different field than a previously-seen field with the same name (old: " << dstName
280 <<
", new: " << srcName <<
")";
281 errors.push_back(ss.str());
287 const auto &srcTyName = field.fSrc->GetTypeName();
288 const auto &dstTyName = field.fDst->GetTypeName();
289 if (srcTyName != dstTyName) {
290 std::stringstream ss;
291 ss <<
"Field `" << fieldName
292 <<
"` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
293 <<
", new: " << srcTyName <<
")";
294 errors.push_back(ss.str());
297 const auto srcTyChk = field.fSrc->GetTypeChecksum();
298 const auto dstTyChk = field.fDst->GetTypeChecksum();
299 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
300 std::stringstream ss;
301 ss <<
"Field `" << field.fSrc->GetFieldName()
302 <<
"` has a different type checksum than previously-seen field with the same name";
303 errors.push_back(ss.str());
306 const auto srcTyVer = field.fSrc->GetTypeVersion();
307 const auto dstTyVer = field.fDst->GetTypeVersion();
308 if (srcTyVer != dstTyVer) {
309 std::stringstream ss;
310 ss <<
"Field `" << field.fSrc->GetFieldName()
311 <<
"` has a different type version than previously-seen field with the same name (old: " << dstTyVer
312 <<
", new: " << srcTyVer <<
")";
313 errors.push_back(ss.str());
318 for (
const auto &err : errors)
319 errMsg += std::string(
"\n * ") + err;
322 errMsg = errMsg.substr(1);
327 res.fCommonFields.reserve(commonFields.size());
328 for (
const auto &[
_, srcField] : commonFields) {
329 res.fCommonFields.emplace_back(srcField);
342 assert(newFields.size() > 0);
347 std::string msg =
"destination doesn't contain field";
348 if (newFields.size() > 1)
351 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](
const auto &acc,
const auto *field) {
352 return acc + (acc.length() ?
", " :
"") +
'`' + field->GetFieldName() +
'`';
354 Info(
"RNTuple::Merge",
"%s: adding %s to the destination model (entry #%" PRIu64
").", msg.c_str(),
355 (newFields.size() > 1 ?
"them" :
"it"), mergeData.
fNumDstEntries);
357 changeset.fAddedFields.reserve(newFields.size());
358 for (
const auto *fieldDesc : newFields) {
360 if (fieldDesc->IsProjectedField())
361 changeset.fAddedProjectedFields.emplace_back(field.get());
363 changeset.fAddedFields.emplace_back(field.get());
364 changeset.fModel.AddField(std::move(field));
376 assert(commonColumns.size() == commonColumnSet.size());
377 if (commonColumns.empty())
387 for (
const auto &column : commonColumns) {
388 const auto &columnId = column.fInputId;
389 R__ASSERT(clusterDesc.ContainsColumn(columnId));
396 const auto &pages = clusterDesc.GetPageRange(columnId);
399 sealedPages.resize(pages.fPageInfos.size());
402 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
407 Info(
"RNTuple::Merge",
"Column %s: changing source compression from %d to %d", column.fColumnName.c_str(),
412 size_t pageBufferBaseIdx = sealedPageData.
fBuffers.size();
413 if (colRangeCompressionSettings != 0)
414 sealedPageData.
fBuffers.resize(sealedPageData.
fBuffers.size() + pages.fPageInfos.size());
417 std::uint64_t pageIdx = 0;
418 for (
const auto &pageInfo : pages.fPageInfos) {
419 assert(pageIdx < sealedPages.size());
420 assert(sealedPageData.
fBuffers.size() == 0 || pageIdx < sealedPageData.
fBuffers.size());
429 sealedPage.
SetBufferSize(pageInfo.fLocator.fBytesOnStorage + checksumSize);
430 sealedPage.
SetBuffer(onDiskPage->GetAddress());
435 if (needsCompressionChange) {
436 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.
GetNElements();
437 auto &buffer = sealedPageData.
fBuffers[pageBufferBaseIdx + pageIdx];
438 buffer = std::make_unique<std::uint8_t[]>(uncompressedSize + checksumSize);
439 RChangeCompressionFunc compressTask{
440 column.fOutputId, *srcColElement, *dstColElement, mergeData.
fMergeOpts,
441 sealedPage, *fPageAlloc, buffer.get(),
445 fTaskGroup->Run(compressTask);
457 sealedPageData.
fPagesV.push_back(std::move(sealedPages));
458 sealedPageData.
fGroups.emplace_back(column.fOutputId, sealedPageData.
fPagesV.back().cbegin(),
459 sealedPageData.
fPagesV.back().cend());
468 for (
const auto &column : extraDstColumns) {
469 const auto &columnId = column.fInputId;
479 bool skipColumn =
false;
480 auto nRepetitions = std::max<std::uint64_t>(field->
GetNRepetitions(), 1);
499 "Destination RNTuple contains an Unsplit field (%s) that is not present in one of the sources. "
500 "Creating a default value for an Unsplit field is ill-defined, therefore the merging process will abort.",
510 const auto nElements = nClusterEntries * nRepetitions;
511 const auto bytesOnStorage = colElement->GetPackedSize(nElements);
512 constexpr auto kPageSizeLimit = 256 * 1024;
514 const size_t nPages = bytesOnStorage / kPageSizeLimit + !!(bytesOnStorage % kPageSizeLimit);
515 for (
size_t i = 0; i < nPages; ++i) {
516 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : bytesOnStorage - kPageSizeLimit * (nPages - 1);
518 const auto bufSize = pageSize + checksumSize;
519 auto &buffer = sealedPageData.
fBuffers.emplace_back(
new unsigned char[bufSize]);
522 memset(buffer.get(), 0, pageSize);
523 sealedPage.ChecksumIfEnabled();
525 sealedPageData.
fPagesV.push_back({sealedPage});
528 sealedPageData.
fGroups.emplace_back(column.fOutputId, sealedPageData.
fPagesV.back().cbegin(),
529 sealedPageData.
fPagesV.back().cend());
545 commonColumnSet.reserve(commonColumns.size());
546 for (
const auto &column : commonColumns)
547 commonColumnSet.emplace(column.fInputId);
550 extraDstColumnSet.reserve(extraDstColumns.size());
551 for (
const auto &column : extraDstColumns)
552 extraDstColumnSet.emplace(column.fInputId);
560 const auto nClusterEntries = clusterDesc.
GetNEntries();
565 if (!commonColumnSet.empty()) {
566 MergeCommonColumns(clusterPool, clusterId, commonColumns, commonColumnSet, sealedPageData, mergeData);
569 if (!extraDstColumnSet.empty()) {
598 const std::string &prefix =
"")
603 columns.reserve(columns.size() + columnIds.size());
604 for (
const auto &columnId : columnIds) {
629 columns.emplace_back(info);
639static RColumnInfoGroup
642 RColumnInfoGroup res;
646 for (
const auto *field : descCmp.fCommonFields) {
668 std::unique_ptr<RNTupleModel> model;
670#define SKIP_OR_ABORT(errMsg) \
672 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
673 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
676 return R__FAIL(errMsg); \
683 auto srcDescriptor = source->GetSharedDescriptorGuard();
684 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
688 model = srcDescriptor->CreateModel();
689 destination.
Init(*model);
692 for (
const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
698 std::string(
"Source RNTuple will be skipped due to incompatible schema with the destination:\n") +
699 descCmpRes.GetError()->GetReport());
701 auto descCmp = descCmpRes.Unwrap();
706 std::string msg =
"Source RNTuple is missing the following fields:";
707 for (
const auto *field : descCmp.fExtraDstFields) {
708 msg +=
"\n " + field->GetFieldName() +
" : " + field->GetTypeName();
714 if (descCmp.fExtraSrcFields.size()) {
718 descCmp.fCommonFields.insert(descCmp.fCommonFields.end(), descCmp.fExtraSrcFields.begin(),
719 descCmp.fExtraSrcFields.end());
722 std::string msg =
"Source RNTuple has extra fields that the destination RNTuple doesn't have:";
723 for (
const auto *field : descCmp.fExtraSrcFields) {
724 msg +=
"\n " + field->GetFieldName() +
" : " + field->GetTypeName();
731 auto columnInfos =
GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
732 MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
static RResult< RDescriptorsComparison > CompareDescriptorStructure(const RNTupleDescriptor &dst, const RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
#define SKIP_OR_ABORT(errMsg)
static void GenerateExtraDstColumns(size_t nClusterEntries, std::span< RColumnInfo > extraDstColumns, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static void ExtendDestinationModel(std::span< const RFieldDescriptor * > newFields, RNTupleModel &dstModel, RNTupleMergeData &mergeData)
static void AddColumnsFromField(std::vector< RColumnInfo > &columns, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const RFieldDescriptor &fieldDesc, const std::string &prefix="")
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Managed a set of clusters containing compressed and packed pages.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
std::unordered_set< DescriptorId_t > ColumnSet_t
A column element encapsulates the translation between basic C++ types and their column representation...
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
Given a set of RPageSources, merge them into an RPageSink, optionally changing their compression.
std::optional< TTaskGroup > fTaskGroup
RResult< void > Merge(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
void MergeCommonColumns(RClusterPool &clusterPool, DescriptorId_t clusterId, std::span< RColumnInfo > commonColumns, RCluster::ColumnSet_t commonColumnSet, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
void MergeSourceClusters(RPageSource &source, std::span< RColumnInfo > commonColumns, std::span< RColumnInfo > extraDstColumns, RNTupleMergeData &mergeData)
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
bool IsInitialized() const
virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket). In...
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
std::deque< RSealedPage > SealedPageSequence_t
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64-bit xxhash3.
ClusterSize_t GetNEntries() const
Base class for all ROOT issued exceptions.
Meta-data stored for every field of an ntuple.
DescriptorId_t GetParentId() const
const std::string & GetFieldName() const
const std::vector< DescriptorId_t > & GetLogicalColumnIds() const
std::uint64_t GetNRepetitions() const
ENTupleStructure GetStructure() const
The on-storage meta-data of an ntuple.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
DescriptorId_t FindClusterId(DescriptorId_t physicalColumnId, NTupleSize_t index) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
std::string GetQualifiedFieldName(DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d. In case of invalid ...
DescriptorId_t FindFieldId(std::string_view fieldName, DescriptorId_t parentId) const
const RColumnDescriptor & GetColumnDescriptor(DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(DescriptorId_t fieldId) const
RFieldDescriptorIterable GetTopLevelFields() const
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for storing ntuples.
void SetCompression(int val)
void SetUseBufferedWrite(bool val)
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
A class to manage the asynchronous execution of work items.
Collection abstract base class.
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Int_t GetCompressionSettings() const
Book space in a file, create I/O buffers, fill them, and (un)compress them.
T * ReadObject()
To read an object (not deriving from TObject) from the file.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr int kUnknownCompressionSettings
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
const RFieldDescriptor * fParentField
ColumnIdMap_t fColumnIdMap
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
std::span< RPageSource * > fSources
const RNTupleDescriptor & fDstDescriptor
const RNTupleDescriptor * fSrcDescriptor
std::vector< RColumnInfo > fColumns
const RNTupleMergeOptions & fMergeOpts
NTupleSize_t fNumDstEntries
int fCompressionSettings
If fCompressionSettings == kUnknownCompressionSettings (the default), the merger will not change the ...
ENTupleMergingMode fMergingMode
Determines how the merging treats sources with different models (see ENTupleMergingMode).
bool fExtraVerbose
If true, the merger will emit further diagnostics and information.
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Parameters for the SealPage() method.
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
const RPage * fPage
Input page to be sealed.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
void SetHasChecksum(bool hasChecksum)
void SetNElements(std::uint32_t nElements)
void SetBuffer(const void *buffer)
void SetBufferSize(std::size_t bufferSize)
RResult< void > VerifyChecksumIfEnabled() const
std::uint32_t GetNElements() const
std::size_t GetBufferSize() const
bool GetHasChecksum() const
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers