39#include <initializer_list>
40#include <unordered_map>
76 std::initializer_list<std::pair<const char *, T>>
validValues)
96 {
"Filter", ENTupleMergingMode::kFilter},
97 {
"Union", ENTupleMergingMode::kUnion},
98 {
"Strict", ENTupleMergingMode::kStrict},
106 {
"Abort", ENTupleMergeErrBehavior::kAbort},
107 {
"Skip", ENTupleMergeErrBehavior::kSkip},
156 "only the latter will apply.";
172 std::vector<std::unique_ptr<RPageSourceFile>>
sources;
175 while (
const auto &
pitr =
itr()) {
180 <<
inFile->GetName() <<
"'";
188 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kRaw);
194 <<
"Asked to use the first source's compression as the output compression, but the "
195 "first source (file '"
197 <<
"') has an empty RNTuple, therefore the output compression could not be "
201 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
205 <<
"Asked to use the first source's compression as the output compression, but the "
206 "first source (file '"
208 <<
"') has an empty RNTuple, therefore the output compression could not be "
212 compression = (*firstColRange).GetCompressionSettings().value();
222 std::unique_ptr<ROOT::RNTupleModel> model;
226 outSource->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
227 auto desc =
outSource->GetSharedDescriptorGuard();
228 model =
destination->InitFromDescriptor(desc.GetRef(),
true );
233 for (
const auto &s :
sources) {
255}
catch (
const std::exception &
ex) {
262struct RChangeCompressionFunc {
270 std::size_t fBufSize;
303 std::size_t fBufSize;
308 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap();
310 sealConf.fElement = &fDstColElement;
322 std::optional<ROOT::Experimental::TTaskGroup> &fGroup;
324 template <
typename T>
341struct RDescriptorsComparison {
342 std::vector<const ROOT::RFieldDescriptor *> fExtraDstFields;
343 std::vector<const ROOT::RFieldDescriptor *> fExtraSrcFields;
344 std::vector<RCommonField> fCommonFields;
347struct RColumnOutInfo {
353using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
355struct RColumnInfoGroup {
356 std::vector<RColumnMergeInfo> fExtraDstColumns;
358 std::vector<RColumnMergeInfo> fCommonColumns;
403 std::deque<RPageStorage::SealedPageSequence_t>
fPagesV;
404 std::vector<RPageStorage::RSealedPageGroup>
fGroups;
405 std::vector<std::unique_ptr<std::uint8_t[]>>
fBuffers;
408std::ostream &
operator<<(std::ostream &os,
const std::optional<ROOT::RColumnDescriptor::RValueRange> &
x)
411 os <<
'(' <<
x->fMin <<
", " <<
x->fMax <<
')';
458 std::vector<std::string>
errors;
459 RDescriptorsComparison res;
463 for (
const auto &
dstField :
dst.GetTopLevelFields()) {
469 res.fExtraDstFields.emplace_back(&
dstField);
472 for (
const auto &
srcField :
src.GetTopLevelFields()) {
475 res.fExtraSrcFields.push_back(&
srcField);
490 std::stringstream
ss;
491 ss <<
"Field `" <<
fieldName <<
"` is incompatible with previously-seen field with that name because the "
492 << (
field.fSrc->IsProjectedField() ?
"new" :
"old") <<
" one is projected and the other isn't";
494 }
else if (
field.fSrc->IsProjectedField()) {
496 const auto srcName =
src.GetQualifiedFieldName(
field.fSrc->GetProjectionSourceId());
497 const auto dstName =
dst.GetQualifiedFieldName(
field.fDst->GetProjectionSourceId());
499 std::stringstream
ss;
501 <<
"` is projected to a different field than a previously-seen field with the same name (old: "
512 std::stringstream
ss;
514 <<
"` has a type incompatible with a previously-seen field with the same name: (old: " <<
dstTyName
523 std::stringstream
ss;
524 ss <<
"Field `" <<
field.fSrc->GetFieldName()
525 <<
"` has a different type checksum than previously-seen field with the same name";
533 std::stringstream
ss;
534 ss <<
"Field `" <<
field.fSrc->GetFieldName()
535 <<
"` has a different type version than previously-seen field with the same name (old: " <<
dstTyVer
544 std::stringstream
ss;
545 ss <<
"Field `" <<
field.fSrc->GetFieldName()
546 <<
"` has a different field version than previously-seen field with the same name (old: " <<
dstFldVer
554 std::stringstream
ss;
555 ss <<
"Field `" <<
field.fSrc->GetFieldName()
556 <<
"` has a different structural role than previously-seen field with the same name (old: " <<
dstRole
557 <<
", new: " <<
srcRole <<
")";
562 const auto srcNCols =
field.fSrc->GetLogicalColumnIds().size();
563 const auto dstNCols =
field.fDst->GetLogicalColumnIds().size();
565 std::stringstream
ss;
566 ss <<
"Field `" <<
field.fSrc->GetFieldName()
567 <<
"` has a different number of columns than previously-seen field with the same name (old: " <<
dstNCols
581 std::stringstream
ss;
582 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
583 <<
"` has a different column type of the same column on the previously-seen field with the same name "
589 if (
srcCol.GetBitsOnStorage() !=
dstCol.GetBitsOnStorage()) {
590 std::stringstream
ss;
591 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
592 <<
"` has a different number of bits of the same column on the previously-seen field with the same "
595 <<
srcCol.GetBitsOnStorage() <<
", new: " <<
dstCol.GetBitsOnStorage() <<
")";
599 std::stringstream
ss;
600 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
601 <<
"` has a different value range of the same column on the previously-seen field with the same name "
603 <<
srcCol.GetValueRange() <<
", new: " <<
dstCol.GetValueRange() <<
")";
606 if (
srcCol.GetRepresentationIndex() > 0) {
607 std::stringstream
ss;
608 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
609 <<
"` has a representation index higher than 0. This is not supported yet by the merger.";
619 std::stringstream
ss;
620 ss <<
"Field `" <<
field.fSrc->GetFieldName()
621 <<
"` has a different number of children than previously-seen field with the same name (old: "
634 for (
const auto &err :
errors)
635 errMsg += std::string(
"\n * ") + err;
659 if (
mergeData.fMergeOpts.fExtraVerbose) {
660 std::string
msg =
"destination doesn't contain field";
665 return acc + (acc.length() ?
", " :
"") +
'`' + field->GetFieldName() +
'`';
668 <<
" to the destination model (entry #" <<
mergeData.fNumDstEntries <<
").";
704 return R__FAIL(
ex.GetError().GetReport());
727 for (
const auto &column :
columns) {
732 if (
field->GetLogicalColumnIds()[0] != column.fInputId)
752 const auto structure =
field->GetStructure();
756 "Destination RNTuple contains a streamer field (" +
field->GetFieldName() +
757 ") that is not present in one of the sources. "
758 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.");
773 for (
size_t i = 0; i <
nPages; ++i) {
788 sealConf.fCompressionSettings =
mergeData.fMergeOpts.fCompressionSettings.value();
789 sealConf.fWriteChecksum =
mergeData.fDestination.GetWriteOptions().GetEnablePageChecksums();
822 const auto &
columnId = column.fInputId;
856 << (
needsResealing ?
"Resealing" :
"Recompressing") <<
" column " << column.fColumnName
858 <<
mergeData.fMergeOpts.fCompressionSettings.value()
899 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
912 RTaskVisitor{fTaskGroup}(RResealFunc{
923 RTaskVisitor{fTaskGroup}(RChangeCompressionFunc{
1039 if (
fieldType ==
"std::byte")
return typeid(std::byte);
1040 if (
fieldType ==
"char")
return typeid(char);
1041 if (
fieldType ==
"std::int8_t")
return typeid(std::int8_t);
1042 if (
fieldType ==
"std::uint8_t")
return typeid(std::uint8_t);
1043 if (
fieldType ==
"std::int16_t")
return typeid(std::int16_t);
1044 if (
fieldType ==
"std::uint16_t")
return typeid(std::uint16_t);
1045 if (
fieldType ==
"std::int32_t")
return typeid(std::int32_t);
1046 if (
fieldType ==
"std::uint32_t")
return typeid(std::uint32_t);
1047 if (
fieldType ==
"std::int64_t")
return typeid(std::int64_t);
1048 if (
fieldType ==
"std::uint64_t")
return typeid(std::uint64_t);
1049 if (
fieldType ==
"float")
return typeid(float);
1054 return std::nullopt;
1098 info.fOutputId = it->second.fColumnId;
1099 info.fColumnType = it->second.fColumnType;
1115 if (
mergeData.fMergeOpts.fExtraVerbose) {
1117 <<
", phys.id " <<
srcColumn.GetPhysicalId() <<
", type "
1119 <<
info.fOutputId <<
", type "
1145 RColumnInfoGroup res;
1155 std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(),
1156 [](
const auto &
a,
const auto &
b) { return a.fInputId < b.fInputId; });
1162 ColumnIdMap_t &
colIdMap,
const std::string &prefix =
"")
1167 RColumnOutInfo
info{};
1181 std::unique_ptr<ROOT::RNTupleModel> model)
1186 fModel(std::
move(model))
1213 return R__FAIL(std::string(
"The compression given to RNTupleMergeOptions is different from that of the "
1216 ") This is currently unsupported.");
1223 "passing an already-initialized destination to RNTupleMerger::Merge (i.e. trying to do incremental "
1224 "merging) can only be done by providing a valid ROOT::RNTupleModel when constructing the RNTupleMerger.");
1233 for (
const auto &
field :
mergeData.fDstDescriptor.GetTopLevelFields()) {
1238#define SKIP_OR_ABORT(errMsg) \
1240 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
1241 R__LOG_WARNING(NTupleMergeLog()) << "Skipping RNTuple due to: " << (errMsg); \
1244 return R__FAIL(errMsg); \
1252 source->LoadStreamerInfo();
1254 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
1268 SKIP_OR_ABORT(std::string(
"Source RNTuple has an incompatible schema with the destination:\n") +
1276 std::string
msg =
"Source RNTuple is missing the following fields:";
1278 msg +=
"\n " +
field->GetFieldName() +
" : " +
field->GetTypeName();
1284 if (
descCmp.fExtraSrcFields.size()) {
1292 std::string
msg =
"Source RNTuple has extra fields that the destination RNTuple doesn't have:";
1294 msg +=
"\n " +
field->GetFieldName() +
" : " +
field->GetTypeName();
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_WARNING(...)
#define R__LOG_ERROR(...)
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
static ROOT::RResult< void > GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span< const RColumnMergeInfo > columns, RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc, const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
static std::optional< ENTupleMergeErrBehavior > ParseOptionErrBehavior(const TString &opts)
static ROOT::RLogChannel & NTupleMergeLog()
#define SKIP_OR_ABORT(errMsg)
static std::optional< T > ParseStringOption(const TString &opts, const char *pattern, std::initializer_list< std::pair< const char *, T > > validValues)
static bool IsSplitOrUnsplitVersionOf(ENTupleColumnType a, ENTupleColumnType b)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
static std::optional< ENTupleMergingMode > ParseOptionMergingMode(const TString &opts)
static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc, ColumnIdMap_t &colIdMap, const std::string &prefix="")
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static ROOT::RResult< void > ExtendDestinationModel(std::span< const ROOT::RFieldDescriptor * > newFields, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Given a set of RPageSources merge them into an RPagePersistentSink, optionally changing their compres...
ROOT::RResult< void > MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, std::span< const RColumnMergeInfo > commonColumns, const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
std::optional< TTaskGroup > fTaskGroup
std::unique_ptr< ROOT::RNTupleModel > fModel
RNTupleMerger(std::unique_ptr< ROOT::Internal::RPagePersistentSink > destination, std::unique_ptr< ROOT::RNTupleModel > model)
Creates a RNTupleMerger with the given destination.
ROOT::RResult< void > MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span< const RColumnMergeInfo > commonColumns, std::span< const RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
std::unique_ptr< ROOT::Internal::RPagePersistentSink > fDestination
RResult< void > Merge(std::span< ROOT::Internal::RPageSource * > sources, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
A class to manage the asynchronous execution of work items.
Managed a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(ROOT::ENTupleColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
virtual RIdentifier GetIdentifier() const =0
std::size_t GetPackedSize(std::size_t nElements=1U) const
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size ls of the from buffer.
A helper class for serializing and deserialization of the RNTuple binary format.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
Common functionality of an ntuple storage for both reading and writing.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
std::deque< RSealedPage > SealedPageSequence_t
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Metadata for RNTuple clusters.
Base class for all ROOT issued exceptions.
Metadata stored for every field of an RNTuple.
ROOT::ENTupleStructure GetStructure() const
ROOT::DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
A log configuration for a channel, e.g.
The on-storage metadata of an RNTuple.
const RColumnDescriptor & GetColumnDescriptor(ROOT::DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
The RNTupleModel encapsulates the schema of an RNTuple.
Common user-tunable settings for storing RNTuples.
bool GetEnablePageChecksums() const
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
const_iterator begin() const
const_iterator end() const
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Collection abstract base class.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Mother of all ROOT objects.
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::ostream & operator<<(std::ostream &os, const std::optional< ROOT::RColumnDescriptor::RValueRange > &x)
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ROOT::DescriptorId_t fOutputId
const ROOT::RFieldDescriptor * fParentFieldDescriptor
ROOT::DescriptorId_t fInputId
std::optional< std::type_index > fInMemoryType
const ROOT::RNTupleDescriptor * fParentNTupleDescriptor
ENTupleColumnType fColumnType
const ROOT::RNTupleDescriptor * fSrcDescriptor
ColumnIdMap_t fColumnIdMap
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
std::span< RPageSource * > fSources
ROOT::NTupleSize_t fNumDstEntries
const RNTupleMergeOptions & fMergeOpts
std::vector< RColumnMergeInfo > fColumns
const ROOT::RNTupleDescriptor & fDstDescriptor
Set of merging options to pass to RNTupleMerger.
std::optional< std::uint32_t > fCompressionSettings
If fCompressionSettings is empty (the default), the merger will not change the compression of any of ...
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Parameters for the SealPage() method.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
std::uint32_t GetNElements() const
const void * GetBuffer() const
bool GetHasChecksum() const
std::size_t GetDataSize() const
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...