37#include <initializer_list>
38#include <unordered_map>
64 const Ssiz_t wordLen = strlen(word);
65 if (str.Length() < wordLen)
69 return str.Length() == wordLen || str(wordLen) ==
' ';
74 std::initializer_list<std::pair<const char *, T>> validValues)
76 const Ssiz_t patternLen = strlen(pattern);
77 assert(pattern[patternLen - 1] ==
'=');
79 idx >= 0 && opts.
Length() > idx + patternLen) {
80 auto sub =
TString(opts(idx + patternLen, opts.
Length() - idx - patternLen));
81 for (
const auto &[
name, value] : validValues) {
112 opts,
"rntuple.VersionBehavior=",
125 if (!inputs || inputs->
GetEntries() < 3 || !mergeInfo) {
134 std::string ntupleName = std::string(itr()->GetName());
138 TFile *outFile =
dynamic_cast<TFile *
>(secondArg);
146 TKey *outKey = outFile->
FindKey(ntupleName.c_str());
158 const bool defaultComp = mergeInfo->
fOptions.
Contains(
"DefaultCompression");
159 const bool firstSrcComp = mergeInfo->
fOptions.
Contains(
"FirstSrcCompression");
160 const bool extraVerbose = mergeInfo->
fOptions.
Contains(
"rntuple.ExtraVerbose");
161 if (defaultComp && firstSrcComp) {
164 "only the latter will apply.";
166 std::optional<std::uint32_t> compression;
170 }
else if (!defaultComp) {
180 std::vector<std::unique_ptr<RPageSourceFile>> sources;
181 std::vector<RPageSource *> sourcePtrs;
183 while (
const auto &pitr = itr()) {
197 auto descriptor = source->GetSharedDescriptorGuard();
198 auto clusterIter = descriptor->GetClusterIterable();
199 auto firstCluster = clusterIter.begin();
200 if (firstCluster == clusterIter.end()) {
202 <<
"Asked to use the first source's compression as the output compression, but the "
203 "first source (file '"
205 <<
"') has an empty RNTuple, therefore the output compression could not be "
209 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
210 auto firstColRange = colRangeIter.begin();
211 if (firstColRange == colRangeIter.end()) {
213 <<
"Asked to use the first source's compression as the output compression, but the "
214 "first source (file '"
216 <<
"') has an empty RNTuple, therefore the output compression could not be "
220 compression = (*firstColRange).GetCompressionSettings().value();
223 sources.push_back(std::move(source));
229 auto destination = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
230 std::unique_ptr<ROOT::RNTupleModel> model;
235 auto desc = outSource->GetSharedDescriptorGuard();
236 model = destination->InitFromDescriptor(desc.GetRef(),
true );
240 sourcePtrs.reserve(sources.size());
241 for (
const auto &s : sources) {
242 sourcePtrs.push_back(s.get());
246 RNTupleMerger merger{std::move(destination), std::move(model)};
266}
catch (
const std::exception &
ex) {
273struct RChangeCompressionFunc {
281 std::size_t fBufSize;
293 std::unique_ptr<std::byte[]> unzipBufOwned;
295 if (compression != 0) {
297 unzipBuf = unzipBufOwned.get();
305 std::size_t nBytesZipped;
306 if (compression != 0) {
308 assert(fBufSize >= bytesPacked + checksumSize);
311 nBytesZipped = bytesPacked;
320 const RColumnElementBase &fSrcColElement;
321 const RColumnElementBase &fDstColElement;
322 const RNTupleMergeOptions &fMergeOptions;
324 RPageStorage::RSealedPage &fSealedPage;
325 ROOT::Internal::RPageAllocator &fPageAlloc;
327 std::size_t fBufSize;
328 const ROOT::RNTupleWriteOptions &fWriteOpts;
333 RPageSink::RSealPageConfig sealConf;
334 sealConf.
fElement = &fDstColElement;
335 sealConf.
fPage = &page;
341 fSealedPage = refSealedPage;
346 std::optional<ROOT::Experimental::TTaskGroup> &fGroup;
348 template <
typename T>
359 const ROOT::RFieldDescriptor *fSrc;
360 const ROOT::RFieldDescriptor *fDst;
362 RCommonField(
const ROOT::RFieldDescriptor &src,
const ROOT::RFieldDescriptor &dst) : fSrc(&src), fDst(&dst) {}
365struct RDescriptorsComparison {
366 std::vector<const ROOT::RFieldDescriptor *> fExtraDstFields;
367 std::vector<const ROOT::RFieldDescriptor *> fExtraSrcFields;
368 std::vector<RCommonField> fCommonFields;
371struct RColumnOutInfo {
377using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
379struct RColumnInfoGroup {
380 std::vector<RColumnMergeInfo> fExtraDstColumns;
382 std::vector<RColumnMergeInfo> fCommonColumns;
388namespace ROOT::Experimental::Internal {
427 std::deque<RPageStorage::SealedPageSequence_t>
fPagesV;
428 std::vector<RPageStorage::RSealedPageGroup>
fGroups;
429 std::vector<std::unique_ptr<std::byte[]>>
fBuffers;
432std::ostream &
operator<<(std::ostream &os,
const std::optional<ROOT::RColumnDescriptor::RValueRange> &
x)
435 os <<
'(' <<
x->fMin <<
", " <<
x->fMax <<
')';
473static ROOT::RResult<RDescriptorsComparison>
482 std::vector<std::string> errors;
483 RDescriptorsComparison res;
485 std::vector<RCommonField> commonFields;
488 const auto srcFieldId = src.
FindFieldId(dstField.GetFieldName());
491 commonFields.push_back({srcField, dstField});
493 res.fExtraDstFields.emplace_back(&dstField);
497 const auto dstFieldId = dst.
FindFieldId(srcField.GetFieldName());
499 res.fExtraSrcFields.push_back(&srcField);
503 auto fieldsToCheck = commonFields;
505 for (std::size_t fieldIdx = 0; fieldIdx < fieldsToCheck.size(); ++fieldIdx) {
506 const auto &field = fieldsToCheck[fieldIdx];
509 const auto &fieldName = field.fSrc->GetFieldName();
512 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
513 if (!projCompatible) {
514 std::stringstream ss;
515 ss <<
"Field `" << fieldName <<
"` is incompatible with previously-seen field with that name because the "
516 << (field.fSrc->IsProjectedField() ?
"new" :
"old") <<
" one is projected and the other isn't";
517 errors.push_back(ss.str());
518 }
else if (field.fSrc->IsProjectedField()) {
522 if (srcName != dstName) {
523 std::stringstream ss;
524 ss <<
"Field `" << fieldName
525 <<
"` is projected to a different field than a previously-seen field with the same name (old: "
526 << dstName <<
", new: " << srcName <<
")";
527 errors.push_back(ss.str());
533 const auto &srcTyName = field.fSrc->GetTypeName();
534 const auto &dstTyName = field.fDst->GetTypeName();
535 if (srcTyName != dstTyName) {
536 std::stringstream ss;
537 ss <<
"Field `" << fieldName
538 <<
"` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
539 <<
", new: " << srcTyName <<
")";
540 errors.push_back(ss.str());
544 const auto srcTyChk = field.fSrc->GetTypeChecksum();
545 const auto dstTyChk = field.fDst->GetTypeChecksum();
546 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
547 std::stringstream ss;
548 ss <<
"Field `" << field.fSrc->GetFieldName()
549 <<
"` has a different type checksum than previously-seen field with the same name";
550 errors.push_back(ss.str());
554 const auto srcTyVer = field.fSrc->GetTypeVersion();
555 const auto dstTyVer = field.fDst->GetTypeVersion();
556 if (srcTyVer != dstTyVer) {
557 std::stringstream ss;
558 ss <<
"Field `" << field.fSrc->GetFieldName()
559 <<
"` has a different type version than previously-seen field with the same name (old: " << dstTyVer
560 <<
", new: " << srcTyVer <<
")";
561 errors.push_back(ss.str());
565 const auto srcFldVer = field.fSrc->GetFieldVersion();
566 const auto dstFldVer = field.fDst->GetFieldVersion();
567 if (srcFldVer != dstFldVer) {
568 std::stringstream ss;
569 ss <<
"Field `" << field.fSrc->GetFieldName()
570 <<
"` has a different field version than previously-seen field with the same name (old: " << dstFldVer
571 <<
", new: " << srcFldVer <<
")";
572 errors.push_back(ss.str());
575 const auto srcRole = field.fSrc->GetStructure();
576 const auto dstRole = field.fDst->GetStructure();
577 if (srcRole != dstRole) {
578 std::stringstream ss;
579 ss <<
"Field `" << field.fSrc->GetFieldName()
580 <<
"` has a different structural role than previously-seen field with the same name (old: " << dstRole
581 <<
", new: " << srcRole <<
")";
582 errors.push_back(ss.str());
586 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
587 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
588 if (srcNCols != dstNCols) {
589 std::stringstream ss;
590 ss <<
"Field `" << field.fSrc->GetFieldName()
591 <<
"` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
592 <<
", new: " << srcNCols <<
")";
593 errors.push_back(ss.str());
595 for (
auto i = 0u; i < srcNCols; ++i) {
596 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
597 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
603 if (srcCol.GetType() != dstCol.GetType() &&
605 std::stringstream ss;
606 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
607 <<
"` has a different column type of the same column on the previously-seen field with the same name "
611 errors.push_back(ss.str());
613 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
614 std::stringstream ss;
615 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
616 <<
"` has a different number of bits of the same column on the previously-seen field with the same "
619 << srcCol.GetBitsOnStorage() <<
", new: " << dstCol.GetBitsOnStorage() <<
")";
620 errors.push_back(ss.str());
622 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
623 std::stringstream ss;
624 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
625 <<
"` has a different value range of the same column on the previously-seen field with the same name "
627 << srcCol.GetValueRange() <<
", new: " << dstCol.GetValueRange() <<
")";
628 errors.push_back(ss.str());
630 if (srcCol.GetRepresentationIndex() > 0) {
631 std::stringstream ss;
632 ss << i <<
"-th column of field `" << field.fSrc->GetFieldName()
633 <<
"` has a representation index higher than 0. This is not supported yet by the merger.";
634 errors.push_back(ss.str());
640 const auto &srcLinks = field.fSrc->GetLinkIds();
641 const auto &dstLinks = field.fDst->GetLinkIds();
642 if (srcLinks.size() != dstLinks.size()) {
643 std::stringstream ss;
644 ss <<
"Field `" << field.fSrc->GetFieldName()
645 <<
"` has a different number of children than previously-seen field with the same name (old: "
646 << dstLinks.size() <<
", new: " << srcLinks.size() <<
")";
647 errors.push_back(ss.str());
649 for (std::size_t linkIdx = 0, linkNum = srcLinks.size(); linkIdx < linkNum; ++linkIdx) {
652 fieldsToCheck.push_back(RCommonField{srcSubfield, dstSubfield});
658 for (
const auto &
err : errors)
659 errMsg += std::string(
"\n * ") +
err;
662 errMsg = errMsg.substr(1);
667 res.fCommonFields = std::move(commonFields);
674static ROOT::RResult<void>
678 assert(newFields.size() > 0);
684 std::string msg =
"destination doesn't contain field";
685 if (newFields.size() > 1)
688 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](
const auto &acc,
const auto *field) {
689 return acc + (acc.length() ?
", " :
"") +
'`' + field->GetFieldName() +
'`';
692 <<
" to the destination model (entry #" << mergeData.
fNumDstEntries <<
").";
697 for (
const auto *fieldDesc : newFields) {
698 if (!fieldDesc->IsProjectedField()) {
700 changeset.
AddField(std::move(field));
704 for (
const auto *fieldDesc : newFields) {
705 if (!fieldDesc->IsProjectedField())
710 const auto sourceId = fieldDesc->GetProjectionSourceId();
712 fieldMap[field.get()] = &sourceField;
714 for (
const auto &subfield : *field) {
717 const auto &subSourceField =
719 fieldMap[&subfield] = &subSourceField;
728 return R__FAIL(
ex.GetError().GetReport());
731 commonFields.reserve(commonFields.size() + newFields.size());
732 for (
const auto *field : newFields) {
735 commonFields.emplace_back(*field, newFieldInDst);
743static ROOT::RResult<void>
748 if (!nEntriesToGenerate)
751 for (
const auto &column : columns) {
761 bool skipColumn =
false;
762 auto nRepetitions = std::max<std::uint64_t>(field->
GetNRepetitions(), 1);
780 "Destination RNTuple contains a streamer field (" + field->
GetFieldName() +
781 ") that is not present in one of the sources. "
782 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.");
791 const auto nElements = nEntriesToGenerate * nRepetitions;
792 const auto nBytesOnStorage = colElement->GetPackedSize(nElements);
794 constexpr auto kPageSizeLimit = 256 * 1024;
796 const size_t nPages = nBytesOnStorage / kPageSizeLimit + !!(nBytesOnStorage % kPageSizeLimit);
797 for (
size_t i = 0; i < nPages; ++i) {
798 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : nBytesOnStorage - kPageSizeLimit * (nPages - 1);
800 const auto bufSize = pageSize + checksumSize;
801 assert(pageSize % colElement->GetSize() == 0);
802 const auto nElementsPerPage = pageSize / colElement->GetSize();
803 auto page = pageAlloc.
NewPage(colElement->GetSize(), nElementsPerPage);
805 memset(page.GetBuffer(), 0, page.GetNBytes());
807 auto &buffer = sealedPageData.
fBuffers.emplace_back(
new std::byte[bufSize]);
809 sealConf.
fElement = colElement.get();
810 sealConf.
fPage = &page;
811 sealConf.
fBuffer = buffer.get();
816 sealedPageData.
fPagesV.push_back({sealedPage});
817 sealedPageData.
fGroups.emplace_back(column.fOutputId, sealedPageData.
fPagesV.back().cbegin(),
818 sealedPageData.
fPagesV.back().cend());
829 std::span<const RColumnMergeInfo> commonColumns,
834 assert(nCommonColumnsInCluster == commonColumnSet.size());
835 assert(nCommonColumnsInCluster <= commonColumns.size());
836 if (nCommonColumnsInCluster == 0)
844 for (
size_t colIdx = 0; colIdx < nCommonColumnsInCluster; ++colIdx) {
845 const auto &column = commonColumns[colIdx];
846 const auto &columnId = column.fInputId;
850 const auto srcColElement = column.fInMemoryType
853 const auto dstColElement = column.fInMemoryType
861 sealedPages.resize(pages.GetPageInfos().size());
872 const bool compressionIsDifferent =
874 const bool needsResealing =
875 srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType;
876 const bool needsRecompressing = compressionIsDifferent || needsResealing;
880 << (needsResealing ?
"Resealing" :
"Recompressing") <<
" column " << column.fColumnName
881 <<
": { compression: " << colRangeCompressionSettings <<
" => "
887 size_t pageBufferBaseIdx = sealedPageData.
fBuffers.size();
890 if (needsRecompressing)
891 sealedPageData.
fBuffers.resize(sealedPageData.
fBuffers.size() + pages.GetPageInfos().size());
899 const auto nMissingEntries = columnDesc.GetFirstElementIndex() - clusterDesc.
GetFirstEntryIndex();
907 std::uint64_t pageIdx = 0;
908 for (
const auto &pageInfo : pages.GetPageInfos()) {
909 assert(pageIdx < sealedPages.size());
910 assert(sealedPageData.
fBuffers.size() == 0 || pageIdx < sealedPageData.
fBuffers.size());
920 sealedPage.
SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + checksumSize);
921 sealedPage.
SetBuffer(onDiskPage->GetAddress());
926 if (needsRecompressing) {
927 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.
GetNElements();
928 auto &buffer = sealedPageData.
fBuffers[pageBufferBaseIdx + pageIdx];
929 const auto bufSize = uncompressedSize + checksumSize;
936 if (needsResealing) {
948 RTaskVisitor{
fTaskGroup}(RChangeCompressionFunc{
969 sealedPageData.
fPagesV.push_back(std::move(sealedPages));
970 sealedPageData.
fGroups.emplace_back(column.fOutputId, sealedPageData.
fPagesV.back().cbegin(),
971 sealedPageData.
fPagesV.back().cend());
984 std::span<const RColumnMergeInfo> extraDstColumns,
RNTupleMergeData &mergeData)
988 std::vector<RColumnMergeInfo> missingColumns{extraDstColumns.begin(), extraDstColumns.end()};
996 const auto nClusterEntries = clusterDesc.
GetNEntries();
1003 size_t nCommonColumnsInCluster = commonColumns.size();
1004 while (nCommonColumnsInCluster > 0) {
1008 if (clusterDesc.ContainsColumn(commonColumns[nCommonColumnsInCluster - 1].fInputId))
1010 --nCommonColumnsInCluster;
1015 commonColumnSet.reserve(nCommonColumnsInCluster);
1016 for (
size_t i = 0; i < nCommonColumnsInCluster; ++i)
1017 commonColumnSet.emplace(commonColumns[i].fInputId);
1021 missingColumns.resize(extraDstColumns.size());
1022 for (
size_t i = nCommonColumnsInCluster; i < commonColumns.size(); ++i)
1023 missingColumns.push_back(commonColumns[i]);
1026 auto res =
MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster,
1064 if (fieldType ==
"bool")
return typeid(
bool);
1065 if (fieldType ==
"std::byte")
return typeid(std::byte);
1066 if (fieldType ==
"char")
return typeid(char);
1067 if (fieldType ==
"std::int8_t")
return typeid(std::int8_t);
1068 if (fieldType ==
"std::uint8_t")
return typeid(std::uint8_t);
1069 if (fieldType ==
"std::int16_t")
return typeid(std::int16_t);
1070 if (fieldType ==
"std::uint16_t")
return typeid(std::uint16_t);
1071 if (fieldType ==
"std::int32_t")
return typeid(std::int32_t);
1072 if (fieldType ==
"std::uint32_t")
return typeid(std::uint32_t);
1073 if (fieldType ==
"std::int64_t")
return typeid(std::int64_t);
1074 if (fieldType ==
"std::uint64_t")
return typeid(std::uint64_t);
1075 if (fieldType ==
"float")
return typeid(float);
1076 if (fieldType ==
"double")
return typeid(
double);
1080 return std::nullopt;
1096 columns.reserve(columns.size() + columnIds.size());
1109 info.
fInputId = srcColumn.GetPhysicalId();
1143 <<
", phys.id " << srcColumn.GetPhysicalId() <<
", type "
1152 columns.emplace_back(info);
1155 const auto &srcChildrenIds = srcFieldDesc.
GetLinkIds();
1156 const auto &dstChildrenIds = dstFieldDesc.
GetLinkIds();
1157 assert(srcChildrenIds.size() == dstChildrenIds.size());
1158 for (
auto i = 0u; i < srcChildrenIds.size(); ++i) {
1171 RColumnInfoGroup res;
1175 for (
const auto &[srcField, dstField] : descCmp.fCommonFields) {
1181 std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(),
1182 [](
const auto &
a,
const auto &
b) { return a.fInputId < b.fInputId; });
1188 ColumnIdMap_t &colIdMap,
const std::string &prefix =
"")
1193 RColumnOutInfo info{};
1194 const auto colName =
name +
'.' + std::to_string(colDesc.GetIndex());
1195 info.fColumnId = colDesc.GetLogicalId();
1196 info.fColumnType = colDesc.GetType();
1197 colIdMap[colName] = info;
1200 for (
const auto &subId : fieldDesc.
GetLinkIds()) {
1207 std::unique_ptr<ROOT::RNTupleModel> model)
1235 const auto dstCompSettings =
fDestination->GetWriteOptions().GetCompression();
1239 return R__FAIL(std::string(
"The compression given to RNTupleMergeOptions is different from that of the "
1241 std::to_string(*mergeOpts.
fCompressionSettings) +
", sink: " + std::to_string(dstCompSettings) +
1242 ") This is currently unsupported.");
1249 "passing an already-initialized destination to RNTupleMerger::Merge (i.e. trying to do incremental "
1250 "merging) can only be done by providing a valid ROOT::RNTupleModel when constructing the RNTupleMerger.");
1264#define SKIP_OR_ABORT(errMsg) \
1266 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
1267 R__LOG_WARNING(NTupleMergeLog()) << "Skipping RNTuple due to: " << (errMsg); \
1270 return R__FAIL(errMsg); \
1278 source->LoadStreamerInfo();
1281 auto srcDescriptor = source->GetSharedDescriptorGuard();
1288 <<
"' has a higher format version than the latest supported by this version "
1289 "of ROOT. Merging will work but some features may be dropped.";
1292 "' has a higher format version than the latest supported by this version. Refusing to "
1293 "merge, since RNTupleMergeOptions::fVersionBehavior is set to AbortOnHigherVersion.");
1302 for (
const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
1307 SKIP_OR_ABORT(std::string(
"Source RNTuple has an incompatible schema with the destination:\n") +
1308 descCmpRes.GetError()->GetReport());
1310 auto descCmp = descCmpRes.Unwrap();
1315 std::string msg =
"Source RNTuple is missing the following fields:";
1316 for (
const auto *field : descCmp.fExtraDstFields) {
1317 msg +=
"\n " + field->GetFieldName() +
" : " + field->GetTypeName();
1323 if (descCmp.fExtraSrcFields.size()) {
1331 std::string msg =
"Source RNTuple has extra fields that the destination RNTuple doesn't have:";
1332 for (
const auto *field : descCmp.fExtraSrcFields) {
1333 msg +=
"\n " + field->GetFieldName() +
" : " + field->GetTypeName();
1340 auto columnInfos =
GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
1341 auto res =
MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking).
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_WARNING(...)
#define R__LOG_ERROR(...)
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
static ROOT::RResult< void > GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span< const RColumnMergeInfo > columns, RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc, const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
static std::optional< ENTupleMergeErrBehavior > ParseOptionErrBehavior(const TString &opts)
static ROOT::RLogChannel & NTupleMergeLog()
#define SKIP_OR_ABORT(errMsg)
static std::optional< T > ParseStringOption(const TString &opts, const char *pattern, std::initializer_list< std::pair< const char *, T > > validValues)
static bool IsSplitOrUnsplitVersionOf(ENTupleColumnType a, ENTupleColumnType b)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
static std::optional< ENTupleMergingMode > ParseOptionMergingMode(const TString &opts)
static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc, ColumnIdMap_t &colIdMap, const std::string &prefix="")
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static ROOT::RResult< void > ExtendDestinationModel(std::span< const ROOT::RFieldDescriptor * > newFields, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static std::optional< ENTupleMergeVersionBehavior > ParseOptionVersionBehavior(const TString &opts)
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
int Ssiz_t
String size (currently int).
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Given a set of RPageSources merge them into an RPagePersistentSink, optionally changing their compres...
ROOT::RResult< void > MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, std::span< const RColumnMergeInfo > commonColumns, const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
std::optional< TTaskGroup > fTaskGroup
std::unique_ptr< ROOT::RNTupleModel > fModel
RNTupleMerger(std::unique_ptr< ROOT::Internal::RPagePersistentSink > destination, std::unique_ptr< ROOT::RNTupleModel > model)
Creates a RNTupleMerger with the given destination.
ROOT::RResult< void > MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span< const RColumnMergeInfo > commonColumns, std::span< const RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
std::unique_ptr< ROOT::Internal::RPagePersistentSink > fDestination
std::unique_ptr< ROOT::Internal::RPageAllocator > fPageAlloc
RResult< void > Merge(std::span< ROOT::Internal::RPageSource * > sources, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
A class to manage the asynchronous execution of work items.
Manages a set of clusters containing compressed and packed pages.
RCluster * GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(ROOT::ENTupleColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
virtual RIdentifier GetIdentifier() const =0
std::size_t GetPackedSize(std::size_t nElements=1U) const
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
A helper class for serializing and deserialization of the RNTuple binary format.
@ kRaw
Deserializes the descriptor as-is without performing any additional fixup.
@ kForWriting
Deserializes the descriptor and performs fixup on the suppressed column ranges.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
virtual RPage NewPage(std::size_t elementSize, std::size_t nElements)=0
Reserves memory large enough to hold nElements of the given size.
Abstract interface to write data into an ntuple.
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
virtual std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
virtual void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
virtual ROOT::NTupleSize_t GetNEntries() const =0
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
Common functionality of an ntuple storage for both reading and writing.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
std::deque< RSealedPage > SealedPageSequence_t
void * GrowUnchecked(std::uint32_t nElements)
Increases the number elements in the page.
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
std::optional< std::uint32_t > GetCompressionSettings() const
Metadata for RNTuple clusters.
ROOT::NTupleSize_t GetNEntries() const
ROOT::DescriptorId_t GetId() const
const RPageRange & GetPageRange(ROOT::DescriptorId_t physicalId) const
bool ContainsColumn(ROOT::DescriptorId_t physicalId) const
const RColumnRange & GetColumnRange(ROOT::DescriptorId_t physicalId) const
ROOT::NTupleSize_t GetFirstEntryIndex() const
Base class for all ROOT issued exceptions.
Metadata stored for every field of an RNTuple.
const std::vector< ROOT::DescriptorId_t > & GetLogicalColumnIds() const
ROOT::ENTupleStructure GetStructure() const
const std::vector< ROOT::DescriptorId_t > & GetLinkIds() const
ROOT::DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
bool IsProjectedField() const
ROOT::DescriptorId_t GetProjectionSourceId() const
const std::string & GetFieldName() const
const std::string & GetTypeName() const
A log configuration for a channel, e.g.
The on-storage metadata of an RNTuple.
const RColumnDescriptor & GetColumnDescriptor(ROOT::DescriptorId_t columnId) const
ROOT::DescriptorId_t FindNextClusterId(ROOT::DescriptorId_t clusterId) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
const std::string & GetName() const
ROOT::DescriptorId_t FindClusterId(ROOT::NTupleSize_t entryIdx) const
std::uint64_t GetVersion() const
const RClusterDescriptor & GetClusterDescriptor(ROOT::DescriptorId_t clusterId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
RFieldDescriptorIterable GetTopLevelFields() const
std::string GetQualifiedFieldName(ROOT::DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d In case of invalid ...
The RNTupleModel encapsulates the schema of an RNTuple.
void Unfreeze()
Transitions an RNTupleModel from the frozen state back to the building state, invalidating all previo...
void Freeze()
Transitions an RNTupleModel from the building state to the frozen state, disabling adding additional ...
const ROOT::RFieldBase & GetConstField(std::string_view fieldName) const
Common user-tunable settings for storing RNTuples.
bool GetEnablePageChecksums() const
void SetCompression(std::uint32_t val)
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merges this NTuple with the input list entries.
static constexpr std::uint64_t GetCurrentVersion()
Returns the RNTuple version in the following form: Epoch: 2 most significant bytes Major: next 2 byte...
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
static RResult< ROOT::Internal::RPage > UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element, ROOT::Internal::RPageAllocator &pageAlloc)
Helper for unstreaming a page.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
Collection abstract base class.
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A class to pass information from the TFileMerger to the objects being merged.
TString fOptions
Additional text based option being passed down to customize the merge.
A file, usually with extension .root, that stores data and code in the form of serialized objects in ...
Int_t GetCompressionSettings() const
Book space in a file, create I/O buffers, to fill them, (un)compress them.
T * ReadObject()
To read an object (not deriving from TObject) from the file.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
@ kFilter
The merger will discard all columns that aren't present in the prototype model (i....
@ kAbortOnHigherVersion
The merger will refuse to merge RNTuples with higher versions than the latest supported by this ROOT ...
@ kWarnOnHigherVersion
The merger will emit a warning when merging RNTuples with higher version than the latest supported by...
@ kAbort
The merger will abort merging as soon as an error is encountered.
@ kSkip
Upon errors, the merger will skip the current source and continue.
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ROOT::DescriptorId_t fOutputId
const ROOT::RFieldDescriptor * fParentFieldDescriptor
ROOT::DescriptorId_t fInputId
std::optional< std::type_index > fInMemoryType
const ROOT::RNTupleDescriptor * fParentNTupleDescriptor
ENTupleColumnType fColumnType
const ROOT::RNTupleDescriptor * fSrcDescriptor
ColumnIdMap_t fColumnIdMap
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
std::span< RPageSource * > fSources
ROOT::NTupleSize_t fNumDstEntries
const RNTupleMergeOptions & fMergeOpts
std::vector< RColumnMergeInfo > fColumns
const ROOT::RNTupleDescriptor & fDstDescriptor
Set of merging options to pass to RNTupleMerger.
ENTupleMergingMode fMergingMode
Determines how the merging treats sources with different models (see ENTupleMergingMode).
ENTupleMergeVersionBehavior fVersionBehavior
Determines how the Merge function behaves depending on the RNTuple sources' version.
ENTupleMergeErrBehavior fErrBehavior
Determines how the Merge function behaves upon merging errors.
std::optional< std::uint32_t > fCompressionSettings
If fCompressionSettings is empty (the default), the merger will not change the compression of any of ...
bool fExtraVerbose
If true, the merger will emit further diagnostics and information.
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::vector< std::unique_ptr< std::byte[]> > fBuffers
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
The incremental changes to a RNTupleModel.
void AddField(std::unique_ptr< ROOT::RFieldBase > field, std::string_view parentName="")
std::vector< ROOT::RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< ROOT::RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
void SetBuffer(const void *buffer)
std::uint32_t GetNElements() const
void SetHasChecksum(bool hasChecksum)
void SetNElements(std::uint32_t nElements)
std::size_t GetBufferSize() const
void SetBufferSize(std::size_t bufferSize)
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...
Parameters for the SealPage() method.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
std::uint32_t GetNElements() const
const void * GetBuffer() const
bool GetHasChecksum() const
std::size_t GetDataSize() const