39#include <initializer_list>
40#include <unordered_map>
76 std::initializer_list<std::pair<const char *, T>>
validValues)
96 {
"Filter", ENTupleMergingMode::kFilter},
97 {
"Union", ENTupleMergingMode::kUnion},
98 {
"Strict", ENTupleMergingMode::kStrict},
106 {
"Abort", ENTupleMergeErrBehavior::kAbort},
107 {
"Skip", ENTupleMergeErrBehavior::kSkip},
156 "only the latter will apply.";
172 std::vector<std::unique_ptr<RPageSourceFile>>
sources;
175 while (
const auto &
pitr =
itr()) {
180 <<
inFile->GetName() <<
"'";
188 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kRaw);
194 <<
"Asked to use the first source's compression as the output compression, but the "
195 "first source (file '"
197 <<
"') has an empty RNTuple, therefore the output compression could not be "
201 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
205 <<
"Asked to use the first source's compression as the output compression, but the "
206 "first source (file '"
208 <<
"') has an empty RNTuple, therefore the output compression could not be "
212 compression = (*firstColRange).GetCompressionSettings().value();
222 std::unique_ptr<ROOT::RNTupleModel> model;
226 outSource->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
227 auto desc =
outSource->GetSharedDescriptorGuard();
228 model =
destination->InitFromDescriptor(desc.GetRef(),
true );
233 for (
const auto &s :
sources) {
255}
catch (
const std::exception &
ex) {
262struct RChangeCompressionFunc {
273 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap();
275 sealConf.fElement = &fDstColElement;
292struct RDescriptorsComparison {
293 std::vector<const ROOT::RFieldDescriptor *> fExtraDstFields;
294 std::vector<const ROOT::RFieldDescriptor *> fExtraSrcFields;
295 std::vector<RCommonField> fCommonFields;
298struct RColumnOutInfo {
304using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
306struct RColumnInfoGroup {
307 std::vector<RColumnMergeInfo> fExtraDstColumns;
309 std::vector<RColumnMergeInfo> fCommonColumns;
354 std::deque<RPageStorage::SealedPageSequence_t>
fPagesV;
355 std::vector<RPageStorage::RSealedPageGroup>
fGroups;
356 std::vector<std::unique_ptr<std::uint8_t[]>>
fBuffers;
359std::ostream &
operator<<(std::ostream &os,
const std::optional<ROOT::RColumnDescriptor::RValueRange> &
x)
362 os <<
'(' <<
x->fMin <<
", " <<
x->fMax <<
')';
409 std::vector<std::string>
errors;
410 RDescriptorsComparison res;
414 for (
const auto &
dstField :
dst.GetTopLevelFields()) {
420 res.fExtraDstFields.emplace_back(&
dstField);
423 for (
const auto &
srcField :
src.GetTopLevelFields()) {
426 res.fExtraSrcFields.push_back(&
srcField);
441 std::stringstream
ss;
442 ss <<
"Field `" <<
fieldName <<
"` is incompatible with previously-seen field with that name because the "
443 << (
field.fSrc->IsProjectedField() ?
"new" :
"old") <<
" one is projected and the other isn't";
445 }
else if (
field.fSrc->IsProjectedField()) {
447 const auto srcName =
src.GetQualifiedFieldName(
field.fSrc->GetProjectionSourceId());
448 const auto dstName =
dst.GetQualifiedFieldName(
field.fDst->GetProjectionSourceId());
450 std::stringstream
ss;
452 <<
"` is projected to a different field than a previously-seen field with the same name (old: "
463 std::stringstream
ss;
465 <<
"` has a type incompatible with a previously-seen field with the same name: (old: " <<
dstTyName
474 std::stringstream
ss;
475 ss <<
"Field `" <<
field.fSrc->GetFieldName()
476 <<
"` has a different type checksum than previously-seen field with the same name";
484 std::stringstream
ss;
485 ss <<
"Field `" <<
field.fSrc->GetFieldName()
486 <<
"` has a different type version than previously-seen field with the same name (old: " <<
dstTyVer
494 std::stringstream
ss;
495 ss <<
"Field `" <<
field.fSrc->GetFieldName()
496 <<
"` has a different structural role than previously-seen field with the same name (old: " <<
dstRole
497 <<
", new: " <<
srcRole <<
")";
502 const auto srcNCols =
field.fSrc->GetLogicalColumnIds().size();
503 const auto dstNCols =
field.fDst->GetLogicalColumnIds().size();
505 std::stringstream
ss;
506 ss <<
"Field `" <<
field.fSrc->GetFieldName()
507 <<
"` has a different number of columns than previously-seen field with the same name (old: " <<
dstNCols
521 std::stringstream
ss;
522 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
523 <<
"` has a different column type of the same column on the previously-seen field with the same name "
529 if (
srcCol.GetBitsOnStorage() !=
dstCol.GetBitsOnStorage()) {
530 std::stringstream
ss;
531 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
532 <<
"` has a different number of bits of the same column on the previously-seen field with the same "
535 <<
srcCol.GetBitsOnStorage() <<
", new: " <<
dstCol.GetBitsOnStorage() <<
")";
539 std::stringstream
ss;
540 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
541 <<
"` has a different value range of the same column on the previously-seen field with the same name "
543 <<
srcCol.GetValueRange() <<
", new: " <<
dstCol.GetValueRange() <<
")";
546 if (
srcCol.GetRepresentationIndex() > 0) {
547 std::stringstream
ss;
548 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
549 <<
"` has a representation index higher than 0. This is not supported yet by the merger.";
559 std::stringstream
ss;
560 ss <<
"Field `" <<
field.fSrc->GetFieldName()
561 <<
"` has a different number of children than previously-seen field with the same name (old: "
574 for (
const auto &err :
errors)
575 errMsg += std::string(
"\n * ") + err;
599 if (
mergeData.fMergeOpts.fExtraVerbose) {
600 std::string
msg =
"destination doesn't contain field";
605 return acc + (acc.length() ?
", " :
"") +
'`' + field->GetFieldName() +
'`';
608 <<
" to the destination model (entry #" <<
mergeData.fNumDstEntries <<
").";
644 return R__FAIL(
ex.GetError().GetReport());
667 for (
const auto &column :
columns) {
672 if (
field->GetLogicalColumnIds()[0] != column.fInputId)
692 const auto structure =
field->GetStructure();
696 "Destination RNTuple contains a streamer field (" +
field->GetFieldName() +
697 ") that is not present in one of the sources. "
698 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.");
713 for (
size_t i = 0; i <
nPages; ++i) {
728 sealConf.fCompressionSettings =
mergeData.fMergeOpts.fCompressionSettings.value();
729 sealConf.fWriteChecksum =
mergeData.fDestination.GetWriteOptions().GetEnablePageChecksums();
762 const auto &
columnId = column.fInputId;
790 <<
mergeData.fMergeOpts.fCompressionSettings.value()
831 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
951 if (
fieldType ==
"std::byte")
return typeid(std::byte);
952 if (
fieldType ==
"char")
return typeid(char);
953 if (
fieldType ==
"std::int8_t")
return typeid(std::int8_t);
954 if (
fieldType ==
"std::uint8_t")
return typeid(std::uint8_t);
955 if (
fieldType ==
"std::int16_t")
return typeid(std::int16_t);
956 if (
fieldType ==
"std::uint16_t")
return typeid(std::uint16_t);
957 if (
fieldType ==
"std::int32_t")
return typeid(std::int32_t);
958 if (
fieldType ==
"std::uint32_t")
return typeid(std::uint32_t);
959 if (
fieldType ==
"std::int64_t")
return typeid(std::int64_t);
960 if (
fieldType ==
"std::uint64_t")
return typeid(std::uint64_t);
961 if (
fieldType ==
"float")
return typeid(float);
1010 info.fOutputId = it->second.fColumnId;
1011 info.fColumnType = it->second.fColumnType;
1027 if (
mergeData.fMergeOpts.fExtraVerbose) {
1029 <<
", phys.id " <<
srcColumn.GetPhysicalId() <<
", type "
1031 <<
info.fOutputId <<
", type "
1057 RColumnInfoGroup res;
1067 std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(),
1068 [](
const auto &
a,
const auto &
b) { return a.fInputId < b.fInputId; });
1074 ColumnIdMap_t &
colIdMap,
const std::string &prefix =
"")
1079 RColumnOutInfo
info{};
1093 std::unique_ptr<ROOT::RNTupleModel> model)
1098 fModel(std::
move(model))
1125 return R__FAIL(std::string(
"The compression given to RNTupleMergeOptions is different from that of the "
1128 ") This is currently unsupported.");
1135 "passing an already-initialized destination to RNTupleMerger::Merge (i.e. trying to do incremental "
1136 "merging) can only be done by providing a valid ROOT::RNTupleModel when constructing the RNTupleMerger.");
1145 for (
const auto &
field :
mergeData.fDstDescriptor.GetTopLevelFields()) {
1150#define SKIP_OR_ABORT(errMsg) \
1152 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
1153 R__LOG_WARNING(NTupleMergeLog()) << "Skipping RNTuple due to: " << (errMsg); \
1156 return R__FAIL(errMsg); \
1162 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
1176 SKIP_OR_ABORT(std::string(
"Source RNTuple has an incompatible schema with the destination:\n") +
1184 std::string
msg =
"Source RNTuple is missing the following fields:";
1186 msg +=
"\n " +
field->GetFieldName() +
" : " +
field->GetTypeName();
1192 if (
descCmp.fExtraSrcFields.size()) {
1200 std::string
msg =
"Source RNTuple has extra fields that the destination RNTuple doesn't have:";
1202 msg +=
"\n " +
field->GetFieldName() +
" : " +
field->GetTypeName();
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_WARNING(...)
#define R__LOG_ERROR(...)
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
static ROOT::RResult< void > GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span< const RColumnMergeInfo > columns, RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc, const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
static std::optional< ENTupleMergeErrBehavior > ParseOptionErrBehavior(const TString &opts)
static ROOT::RLogChannel & NTupleMergeLog()
#define SKIP_OR_ABORT(errMsg)
static std::optional< T > ParseStringOption(const TString &opts, const char *pattern, std::initializer_list< std::pair< const char *, T > > validValues)
static bool IsSplitOrUnsplitVersionOf(ENTupleColumnType a, ENTupleColumnType b)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
static std::optional< ENTupleMergingMode > ParseOptionMergingMode(const TString &opts)
static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc, ColumnIdMap_t &colIdMap, const std::string &prefix="")
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static ROOT::RResult< void > ExtendDestinationModel(std::span< const ROOT::RFieldDescriptor * > newFields, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Given a set of RPageSources merge them into an RPagePersistentSink, optionally changing their compres...
ROOT::RResult< void > MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, std::span< const RColumnMergeInfo > commonColumns, const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
std::optional< TTaskGroup > fTaskGroup
std::unique_ptr< ROOT::RNTupleModel > fModel
RNTupleMerger(std::unique_ptr< ROOT::Internal::RPagePersistentSink > destination, std::unique_ptr< ROOT::RNTupleModel > model)
Creates a RNTupleMerger with the given destination.
ROOT::RResult< void > MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span< const RColumnMergeInfo > commonColumns, std::span< const RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
std::unique_ptr< ROOT::Internal::RPagePersistentSink > fDestination
RResult< void > Merge(std::span< ROOT::Internal::RPageSource * > sources, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
A class to manage the asynchronous execution of work items.
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(ROOT::ENTupleColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
A helper class for serialization and deserialization of the RNTuple binary format.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
Common functionality of an ntuple storage for both reading and writing.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
std::deque< RSealedPage > SealedPageSequence_t
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Metadata for RNTuple clusters.
Base class for all ROOT issued exceptions.
Metadata stored for every field of an RNTuple.
ROOT::ENTupleStructure GetStructure() const
ROOT::DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
A log configuration for a channel, e.g.
The on-storage metadata of an RNTuple.
const RColumnDescriptor & GetColumnDescriptor(ROOT::DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
The RNTupleModel encapsulates the schema of an RNTuple.
Common user-tunable settings for storing RNTuples.
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
const_iterator begin() const
const_iterator end() const
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Collection abstract base class.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Mother of all ROOT objects.
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::ostream & operator<<(std::ostream &os, const std::optional< ROOT::RColumnDescriptor::RValueRange > &x)
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
Namespace for new ROOT classes and functions.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ROOT::DescriptorId_t fOutputId
const ROOT::RFieldDescriptor * fParentFieldDescriptor
ROOT::DescriptorId_t fInputId
std::optional< std::type_index > fInMemoryType
const ROOT::RNTupleDescriptor * fParentNTupleDescriptor
ENTupleColumnType fColumnType
const ROOT::RNTupleDescriptor * fSrcDescriptor
ColumnIdMap_t fColumnIdMap
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
std::span< RPageSource * > fSources
ROOT::NTupleSize_t fNumDstEntries
const RNTupleMergeOptions & fMergeOpts
std::vector< RColumnMergeInfo > fColumns
const ROOT::RNTupleDescriptor & fDstDescriptor
Set of merging options to pass to RNTupleMerger.
std::optional< std::uint32_t > fCompressionSettings
If fCompressionSettings is empty (the default), the merger will not change the compression of any of ...
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Parameters for the SealPage() method.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
bool GetHasChecksum() const
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...