32 if (!inputs || inputs->
GetEntries() < 3 || !mergeInfo)
39 std::string ntupleName = std::string(itr()->GetName());
47 TKey *outKey = outFile->
FindKey(ntupleName.c_str());
52 Error(
"RNTuple::Merge",
"Output file already has key, but not of type RNTuple!");
61 auto destination = std::make_unique<Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
67 auto desc = source->GetSharedDescriptorGuard();
68 destination->InitFromDescriptor(desc.GetRef());
72 std::vector<std::unique_ptr<Internal::RPageSourceFile>> sources;
73 std::vector<Internal::RPageSource *> sourcePtrs;
75 while (
const auto &pitr = itr()) {
84 for (
const auto &s : sources) {
85 sourcePtrs.push_back(s.get());
90 merger.
Merge(sourcePtrs, *destination);
94 *
this = *outFile->
Get<
RNTuple>(ntupleName.c_str());
101 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
103 for (
auto &column : columns) {
104 column.fColumnOutputId = fOutputIdMap.size();
105 fOutputIdMap[column.fColumnName +
"." + column.fColumnTypeAndVersion] = column.fColumnOutputId;
111 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
114 if (fOutputIdMap.size() != columns.size()) {
118 for (
auto &column : columns) {
120 column.fColumnOutputId = fOutputIdMap.at(column.fColumnName +
"." + column.fColumnTypeAndVersion);
121 }
catch (
const std::out_of_range &) {
122 throw RException(
R__FAIL(
"Column NOT found in the first source w/ name " + column.fColumnName +
123 " type and version " + column.fColumnTypeAndVersion));
129std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo>
132 std::vector<RColumnInfo> columns;
134 AddColumnsFromField(columns, descriptor, descriptor.
GetFieldZero());
137 if (fOutputIdMap.empty()) {
138 BuildColumnIdMap(columns);
140 ValidateColumns(columns);
147 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns,
const RNTupleDescriptor &desc,
151 std::string
name = prefix + field.GetFieldName() +
".";
152 const std::string typeAndVersion = field.GetTypeName() +
"." + std::to_string(field.GetTypeVersion());
154 columns.emplace_back(
name + std::to_string(column.GetIndex()), typeAndVersion, column.GetPhysicalId(),
157 AddColumnsFromField(columns, desc, field,
name);
169 for (
const auto &source : sources) {
173 if (source->GetNEntries() == 0) {
178 auto descriptor = source->GetSharedDescriptorGuard();
182 auto columns = CollectColumns(descriptor.GetRef());
186 auto model = descriptor->CreateModel();
187 destination.
Init(*model.get());
193 auto clusterId = descriptor->FindClusterId(0, 0);
196 auto &cluster = descriptor->GetClusterDescriptor(clusterId);
198 std::vector<std::unique_ptr<unsigned char[]>> buffers;
201 std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
202 std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;
204 for (
const auto &column : columns) {
208 auto columnId = column.fColumnInputId;
209 if (!cluster.ContainsColumn(columnId)) {
214 const auto &pages = cluster.GetPageRange(columnId);
220 for (
const auto &pageInfo : pages.fPageInfos) {
227 source->LoadSealedPage(columnId, clusterIndex, sealedPage);
231 auto buffer = std::make_unique<unsigned char[]>(sealedPage.
fSize);
232 sealedPage.
fBuffer = buffer.get();
233 source->LoadSealedPage(columnId, clusterIndex, sealedPage);
235 buffers.push_back(std::move(buffer));
236 sealedPages.push_back(std::move(sealedPage));
239 idx += pageInfo.fNElements;
243 sealedPagesV.push_back(std::move(sealedPages));
244 sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
245 sealedPagesV.back().cend());
256 clusterId = descriptor->FindNextClusterId(clusterId);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Given a set of RPageSources merge them into an RPageSink.
void AddColumnsFromField(std::vector< RColumnInfo > &columns, const RNTupleDescriptor &desc, const RFieldDescriptor &fieldDesc, const std::string &prefix="")
Recursively add columns from a given field.
void Merge(std::span< RPageSource * > sources, RPageSink &destination)
Merge a given set of sources into the destination.
std::vector< RColumnInfo > CollectColumns(const RNTupleDescriptor &descriptor)
Recursively collect all the columns for all the fields rooted at field zero.
void BuildColumnIdMap(std::vector< RColumnInfo > &columns)
Build the internal column id map from the first source. This is where we assign the output ids for the...
void ValidateColumns(std::vector< RColumnInfo > &columns)
Validate the columns against the internal map that is built from the first source. This is where we as...
Abstract interface to write data into an ntuple.
bool IsInitialized() const
virtual const RNTupleDescriptor & GetDescriptor() const =0
Return the RNTupleDescriptor being constructed.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket). In...
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)=0
Finalize the current cluster and create a new one for the following data.
virtual void CommitDataset()=0
Finalize the current cluster and the entire data set.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
std::deque< RSealedPage > SealedPageSequence_t
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
Meta-data stored for every field of an ntuple.
The on-storage meta-data of an ntuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
RColumnDescriptorIterable GetColumnIterable() const
const RFieldDescriptor & GetFieldZero() const
Common user-tunable settings for storing ntuples.
void SetUseBufferedWrite(bool val)
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
Collection abstract base class.
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Book space in a file, create I/O buffers, to fill them, (un)compress them.
T * ReadObject()
To read an object (non deriving from TObject) from the file.
constexpr DescriptorId_t kInvalidDescriptorId
A sealed page contains the bytes of a page as written to storage (packed & compressed).