Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>
4/// \date 2020-07-08
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RError.hxx>
17#include <ROOT/RNTuple.hxx>
20#include <ROOT/RNTupleModel.hxx>
21#include <ROOT/RNTupleUtil.hxx>
23#include <ROOT/RPageStorage.hxx>
24#include <ROOT/RClusterPool.hxx>
26#include <ROOT/RNTupleZip.hxx>
27#include <ROOT/TTaskGroup.hxx>
28#include <TROOT.h>
29#include <TFileMergeInfo.h>
30#include <TError.h>
31#include <TFile.h>
32#include <TKey.h>
33
34#include <deque>
35
/// Merge the RNTuple named by the first entry of `inputs` into the output file (hadd / TFileMerger interface).
///
/// Layout of `inputs`, as read below:
///  - 1st entry: any object whose GetName() is the RNTuple name;
///  - 2nd entry: the output TFile;
///  - remaining entries: input TFiles, each of which must contain an RNTuple of that name.
/// If the output file already holds an RNTuple under that name, its descriptor seeds the page sink so that the
/// inputs are appended to it (incremental merging). `mergeInfo` must be non-null.
/// On success, *this is overwritten with the anchor read back from the output file.
///
/// \return 0 on success; -1 on bad arguments, a missing or mistyped RNTuple, or a caught RException.
37// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
38try {
39 // Check the inputs
// At least three entries are needed: the name, the output file and one input file.
40 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo)
41 return -1;
42
43 // Parse the input parameters
44 TIter itr(inputs);
45
46 // First entry is the RNTuple name
47 std::string ntupleName = std::string(itr()->GetName());
48
49 // Second entry is the output file
50 TFile *outFile = dynamic_cast<TFile *>(itr());
51 if (!outFile)
52 return -1;
53
54 // Check if the output file already has a key with that name
55 TKey *outKey = outFile->FindKey(ntupleName.c_str());
56 RNTuple *outNTuple = nullptr;
57 if (outKey) {
// NOTE(review): nothing in this function deletes the object returned by ReadObject (nor those returned by the
// Get<RNTuple> calls further down). If these objects are caller-owned (confirm in the TKey/TDirectory docs), they
// leak; holding them in std::unique_ptr<RNTuple> would fix that.
58 outNTuple = outKey->ReadObject<RNTuple>();
59 if (!outNTuple) {
60 Error("RNTuple::Merge", "Output file already has key, but not of type RNTuple!");
61 return -1;
62 }
63 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
64 // the pointer we just got.
65 }
66
67 // The "fast" option is present if and only if we don't want to change compression.
// NOTE(review): the initializer of `compression` (source line 69) is missing from this rendering. From the comment
// above and the uses below, it presumably yields kUnknownCompressionSettings when the "fast" option is set and the
// output file's compression settings otherwise. Confirm against the real source.
68 const int compression =
70
71 RNTupleWriteOptions writeOpts;
72 writeOpts.SetUseBufferedWrite(false);
// Only override the sink's compression when a concrete setting was requested.
73 if (compression != kUnknownCompressionSettings)
74 writeOpts.SetCompression(compression);
75 auto destination = std::make_unique<Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
76
77 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
78 if (outNTuple) {
79 auto source = Internal::RPageSourceFile::CreateFromAnchor(*outNTuple);
80 source->Attach();
81 auto desc = source->GetSharedDescriptorGuard();
82 destination->InitFromDescriptor(desc.GetRef());
83 }
84
85 // The remaining entries are the input files
// `sources` owns the page sources; `sourcePtrs` is the non-owning view handed to the merger.
86 std::vector<std::unique_ptr<Internal::RPageSourceFile>> sources;
87 std::vector<Internal::RPageSource *> sourcePtrs;
88
// Every remaining entry must be a TFile containing an RNTuple named `ntupleName`, otherwise the merge fails.
89 while (const auto &pitr = itr()) {
90 TFile *inFile = dynamic_cast<TFile *>(pitr);
91 RNTuple *anchor = inFile ? inFile->Get<RNTuple>(ntupleName.c_str()) : nullptr;
92 if (!anchor)
93 return -1;
94 sources.push_back(Internal::RPageSourceFile::CreateFromAnchor(*anchor));
95 }
96
97 // Interface conversion
98 sourcePtrs.reserve(sources.size());
99 for (const auto &s : sources) {
100 sourcePtrs.push_back(s.get());
101 }
102
103 // Now merge
// NOTE(review): the declarations of `merger` and `options` (source lines 104-105) are missing from this rendering.
// Judging by their use, they are presumably an Internal::RNTupleMerger and an RNTupleMergeOptions.
106 options.fCompressionSettings = compression;
107 merger.Merge(sourcePtrs, *destination, options);
108
109 // Provide the caller with a merged anchor object (even though we've already
110 // written it).
// NOTE(review): Get<RNTuple> may return nullptr, which is dereferenced here without a check. The returned object is
// also not freed after being copied into *this.
111 *this = *outFile->Get<RNTuple>(ntupleName.c_str());
112
113 return 0;
// NOTE(review): only RException is caught below. Any other exception (e.g. std::bad_alloc) still escapes, despite
// the no-throw requirement stated at the top. A trailing catch (const std::exception &) would close that gap.
114} catch (const RException &ex) {
115 Error("RNTuple::Merge", "Exception thrown while merging: %s", ex.what());
116 return -1;
117}
118
119////////////////////////////////////////////////////////////////////////////////
/// Build fOutputIdMap from the columns of the first source, and assign their output ids.
///
/// Each column is keyed by `fColumnName + "." + fColumnTypeAndVersion`. It receives the current size of
/// fOutputIdMap as its fColumnOutputId, so, starting from an empty map, ids are 0, 1, 2, ... in `columns` order.
/// NOTE(review): this relies on the keys being unique. A repeated key would not grow the map, and the next column
/// would be given an id that is already in use. Keys built by AddColumnsFromField embed the field path and the
/// column index, so this presumably cannot happen; re-check if the key format changes.
121 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
122{
123 for (auto &column : columns) {
// size() is read before the insertion below, so it is the index of the next new key.
124 column.fColumnOutputId = fOutputIdMap.size();
125 fOutputIdMap[column.fColumnName + "." + column.fColumnTypeAndVersion] = column.fColumnOutputId;
126 }
127}
128
129////////////////////////////////////////////////////////////////////////////////
/// Check that `columns` (from a source after the first) match fOutputIdMap, and assign their output ids from it.
///
/// \throws RException if the number of columns differs from the size of fOutputIdMap, or if a column's
///         `fColumnName + "." + fColumnTypeAndVersion` key is not present in fOutputIdMap.
131 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
132{
133 // First ensure that we have the same number of columns
134 if (fOutputIdMap.size() != columns.size()) {
135 throw RException(R__FAIL("Columns between sources do NOT match"));
136 }
137 // Then ensure that we have the same names of columns and assign the ids
138 for (auto &column : columns) {
139 try {
// NOTE(review): fOutputIdMap.find() would avoid using std::out_of_range for control flow here.
140 column.fColumnOutputId = fOutputIdMap.at(column.fColumnName + "." + column.fColumnTypeAndVersion);
141 } catch (const std::out_of_range &) {
142 throw RException(R__FAIL("Column NOT found in the first source w/ name " + column.fColumnName +
143 " type and version " + column.fColumnTypeAndVersion));
144 }
145 }
146}
147
148////////////////////////////////////////////////////////////////////////////////
/// Append all columns of `descriptor` (walking the field tree from field zero) to `columns`, and assign their
/// output ids.
///
/// The first call, made while fOutputIdMap is empty, defines the output column ids. Later calls validate against it.
/// \throws RException (from ValidateColumns) if the columns do not match the first collected set.
/// NOTE(review): a descriptor without any column leaves fOutputIdMap empty, so the next call builds the map instead
/// of validating against it.
150 std::vector<RColumnInfo> &columns)
151{
152 // Here we recursively find the columns and fill the RColumnInfo vector
153 AddColumnsFromField(columns, descriptor, descriptor.GetFieldZero());
154 // Then we either build the internal map (first source) or validate the columns against it (remaining sources)
155 // In either case, we also assign the output ids here
156 if (fOutputIdMap.empty()) {
157 BuildColumnIdMap(columns);
158 } else {
159 ValidateColumns(columns);
160 }
161}
162
163////////////////////////////////////////////////////////////////////////////////
/// Recursively append to `columns` one RColumnInfo per column of every descendant field of `fieldDesc`.
/// The columns of `fieldDesc` itself are not added.
///
/// Fields are visited depth-first: a field's own columns are added before those of its sub-fields. Each column is
/// named `<prefix><fieldName>.<column.GetIndex()>` and tagged with its field's `<typeName>.<typeVersion>`.
/// Sub-fields receive `<prefix><fieldName>.` as their prefix.
/// NOTE(review): reserving the exact size once per field can defeat the vector's geometric growth and cause repeated
/// reallocations for schemas with many fields.
/// NOTE(review): the remaining arguments of the emplace_back below (source line 176) are missing from this rendering.
165 const RNTupleDescriptor &desc,
166 const RFieldDescriptor &fieldDesc,
167 const std::string &prefix)
168{
169 for (const auto &field : desc.GetFieldIterable(fieldDesc)) {
170 std::string name = prefix + field.GetFieldName() + ".";
171 const std::string typeAndVersion = field.GetTypeName() + "." + std::to_string(field.GetTypeVersion());
172 auto columnIter = desc.GetColumnIterable(field);
173 columns.reserve(columns.size() + columnIter.count());
174 for (const auto &column : columnIter) {
175 columns.emplace_back(name + std::to_string(column.GetIndex()), typeAndVersion, column.GetPhysicalId(),
177 }
178 AddColumnsFromField(columns, desc, field, name);
179 }
180}
181
182////////////////////////////////////////////////////////////////////////////////
/// Append all `sources`, cluster by cluster, to `destination`, optionally changing the page compression.
///
/// - If `destination` is already initialized (incremental merge), its columns define the output column ids.
///   Otherwise it is initialized from the model of the first source processed.
/// - Each source's columns are validated against that set (see CollectColumns), and its extra type info records are
///   forwarded to `destination`. Sources without entries contribute no clusters.
/// - Pages are copied as sealed pages. When options.fCompressionSettings is not kUnknownCompressionSettings and
///   differs from a column range's settings, each page of that range is unzipped and re-zipped. This runs through
///   `taskGroup` when one was created, and inline otherwise.
/// - Commits: one CommitCluster per input cluster, one CommitClusterGroup per source with entries, and a final
///   CommitDataset.
///
/// \throws RException if a source's columns do not match the first set of columns.
183void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination,
184 const RNTupleMergeOptions &options)
185{
186 std::vector<RColumnInfo> columns;
187 RCluster::ColumnSet_t columnSet;
188
// Incremental merge: the existing output defines the output column ids.
189 if (destination.IsInitialized()) {
190 CollectColumns(destination.GetDescriptor(), columns);
191 }
192
193 std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
194 std::optional<TTaskGroup> taskGroup;
// NOTE(review): source line 196 is missing from this rendering. It presumably makes the assignment below
// conditional (ROOT::IsImplicitMTEnabled() is referenced by this file); confirm against the real source.
195#ifdef R__USE_IMT
197 taskGroup = TTaskGroup();
198#endif
199
200 // Append the sources to the destination one-by-one
201 for (const auto &source : sources) {
202 source->Attach();
203
204 RClusterPool clusterPool{*source};
205
206 // Get a handle on the descriptor (metadata)
207 auto descriptor = source->GetSharedDescriptorGuard();
208
209 // Collect all the columns
210 // The column name : output column id map is only built once
211 columns.clear(), columnSet.clear();
212 CollectColumns(descriptor.GetRef(), columns);
213 columnSet.reserve(columns.size());
214 for (const auto &column : columns)
215 columnSet.emplace(column.fColumnInputId);
216
217 // Create sink from the input model if not initialized
218 if (!destination.IsInitialized()) {
219 model = descriptor->CreateModel();
220 destination.Init(*model.get());
221 }
222
223 for (const auto &extraTypeInfoDesc : descriptor->GetExtraTypeInfoIterable()) {
224 destination.UpdateExtraTypeInfo(extraTypeInfoDesc);
225 }
226
227 // Make sure the source contains events to be merged
228 if (source->GetNEntries() == 0) {
229 continue;
230 }
231
232 // Now loop over all clusters in this file
233 // descriptor->GetClusterIterable() doesn't guarantee any specific order...
234 // Find the first cluster id and iterate from there...
235 auto clusterId = descriptor->FindClusterId(0, 0);
236
237 while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
238 auto *cluster = clusterPool.GetCluster(clusterId, columnSet);
239 assert(cluster);
240 const auto &clusterDesc = descriptor->GetClusterDescriptor(clusterId);
241
242 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are never
243 // invalidated.
244 std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
245 std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;
// Owns the re-allocated buffers of recompressed pages; must outlive CommitSealedPageV below.
246 std::vector<std::unique_ptr<unsigned char[]>> sealedPageBuffers;
247
248 for (const auto &column : columns) {
249
250 // See if this cluster contains this column
251 // if not, there is nothing to read/do...
252 auto columnId = column.fColumnInputId;
253 if (!clusterDesc.ContainsColumn(columnId)) {
254 continue;
255 }
256
257 const auto &columnDesc = descriptor->GetColumnDescriptor(columnId);
258 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
259
260 // Now get the pages for this column in this cluster
261 const auto &pages = clusterDesc.GetPageRange(columnId);
262
// NOTE(review): the declaration of `sealedPages` (source line 263) is missing from this rendering. Judging by its
// later move into `sealedPagesV`, it is presumably an RPageStorage::SealedPageSequence_t.
264 sealedPages.resize(pages.fPageInfos.size());
265
266 // Each column range potentially has a distinct compression setting
267 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
268 const bool needsCompressionChange = options.fCompressionSettings != kUnknownCompressionSettings &&
269 colRangeCompressionSettings != options.fCompressionSettings;
270
271 // If the column range is already uncompressed we don't need to allocate any new buffer, so we don't
272 // bother reserving memory for them.
273 size_t pageBufferBaseIdx = sealedPageBuffers.size();
274 if (colRangeCompressionSettings != 0)
275 sealedPageBuffers.resize(sealedPageBuffers.size() + pages.fPageInfos.size());
276
277 std::uint64_t pageIdx = 0;
278
279 // Loop over the pages
280 for (const auto &pageInfo : pages.fPageInfos) {
281 assert(pageIdx < sealedPages.size());
// NOTE(review): this checks pageIdx against the whole buffer vector, not pageBufferBaseIdx + pageIdx (the index
// taskFunc actually uses), so it cannot catch an out-of-range buffer index.
282 assert(sealedPageBuffers.size() == 0 || pageIdx < sealedPageBuffers.size());
283
284 ROnDiskPage::Key key{columnId, pageIdx};
285 auto onDiskPage = cluster->GetOnDiskPage(key);
286
287 const auto checksumSize = pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum;
288 RPageStorage::RSealedPage &sealedPage = sealedPages[pageIdx];
289 sealedPage.SetNElements(pageInfo.fNElements);
290 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
291 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + checksumSize);
// NOTE(review): onDiskPage is dereferenced on the next line but only null-checked in the R__ASSERT after it. The
// check should come first. Source line 293, between these two statements, is missing from this rendering.
292 sealedPage.SetBuffer(onDiskPage->GetAddress());
294 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
295
296 // Change compression if needed
297 if (needsCompressionChange) {
298 auto taskFunc = [ // values in
299 pageIdx, colRangeCompressionSettings, pageBufferBaseIdx, checksumSize,
300 // const refs in
301 &colElement, &pageInfo, &options,
302 // refs in-out
303 &sealedPage, &sealedPageBuffers]() {
304 // Step 1: prepare the source data.
305 // Unzip the source buffer into the zip staging buffer. This is a memcpy if the source was
306 // already uncompressed.
307 // Note that the checksum, if present, is not zipped, so we only need to unzip
308 // `sealedPage.GetDataSize()` bytes.
309 const auto uncompressedSize = colElement->GetSize() * sealedPage.GetNElements();
310 auto zipBuffer = std::make_unique<unsigned char[]>(uncompressedSize);
311 RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), uncompressedSize,
312 zipBuffer.get());
313
314 // Step 2: prepare the destination buffer.
315 if (uncompressedSize != sealedPage.GetDataSize()) {
316 // source page is compressed
317 R__ASSERT(colRangeCompressionSettings != 0);
318
319 // We need to reallocate sealedPage's buffer because we are going to recompress the data
320 // with a different algorithm/level. Since we don't know a priori how big that'll be, the
321 // only safe bet is to allocate a buffer big enough to hold as many bytes as the uncompressed
322 // data.
323 R__ASSERT(sealedPage.GetDataSize() < uncompressedSize);
324 auto &newBuf = sealedPageBuffers[pageBufferBaseIdx + pageIdx];
325 newBuf = std::make_unique<unsigned char[]>(uncompressedSize + checksumSize);
326 sealedPage.SetBuffer(newBuf.get());
327 } else {
328 // source page is uncompressed. We can reuse the sealedPage's buffer since it's big
329 // enough.
330 // Note that this does not necessarily mean that the column range's compressionSettings are 0,
331 // as a page might have been stored uncompressed because it was not compressible with its
332 // advertised compression settings.
// NOTE(review): in this branch the Zip below writes, through the const_cast, into the buffer taken from
// onDiskPage->GetAddress(), i.e. into page memory owned by `cluster`.
333 }
334
335 const auto newNBytes =
336 RNTupleCompressor::Zip(zipBuffer.get(), uncompressedSize, options.fCompressionSettings,
337 const_cast<void *>(sealedPage.GetBuffer()));
338 sealedPage.SetBufferSize(newNBytes + checksumSize);
339 if (pageInfo.fHasChecksum) {
340 // Calculate new checksum (this must happen after setting the new buffer size!)
341 sealedPage.ChecksumIfEnabled();
342 }
343 };
344
345 if (taskGroup)
346 taskGroup->Run(taskFunc);
347 else
348 taskFunc();
349 }
350
351 ++pageIdx;
352
353 } // end of loop over pages
354
// Wait for every task of this column before `sealedPages` is moved out and before the next column resizes
// `sealedPageBuffers`; the tasks hold references into both.
// NOTE(review): waiting once per column limits parallelism to the pages of a single column.
355 if (taskGroup)
356 taskGroup->Wait();
357 sealedPagesV.push_back(std::move(sealedPages));
358 sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
359 sealedPagesV.back().cend());
360
361 } // end of loop over columns
362
363 // Now commit all pages to the output
364 destination.CommitSealedPageV(sealedPageGroups);
365
366 // Commit the clusters
367 destination.CommitCluster(clusterDesc.GetNEntries());
368
369 // Go to the next cluster
370 clusterId = descriptor->FindNextClusterId(clusterId);
371
372 } // end of loop over clusters
373
374 // Commit all clusters for this input
375 destination.CommitClusterGroup();
376
377 } // end of loop over sources
378
379 // Commit the output
// NOTE(review): when `sources` is empty and `destination` was not initialized beforehand, this is reached without
// destination.Init() having been called. Confirm that the sink tolerates that.
380 destination.CommitDataset();
381}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
long long Long64_t
Definition RtypesCore.h:69
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
char name[80]
Definition TGX11.cxx:110
Managed a set of clusters containing compressed and packed pages.
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
Given a set of RPageSources merge them into an RPageSink, optionally changing their compression.
void AddColumnsFromField(std::vector< RColumnInfo > &columns, const RNTupleDescriptor &desc, const RFieldDescriptor &fieldDesc, const std::string &prefix="")
Recursively add columns from a given field.
void Merge(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &options=RNTupleMergeOptions())
Merge a given set of sources into the destination.
void CollectColumns(const RNTupleDescriptor &descriptor, std::vector< RColumnInfo > &columns)
Recursively collect all the columns for all the fields rooted at field zero.
void BuildColumnIdMap(std::vector< RColumnInfo > &columns)
Build the internal column id map from the first source. This is where we assign the output ids for the...
void ValidateColumns(std::vector< RColumnInfo > &columns)
Validate the columns against the internal map that is built from the first source. This is where we as...
Abstract interface to write data into an ntuple.
virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual const RNTupleDescriptor & GetDescriptor() const =0
Return the RNTupleDescriptor being constructed.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., keys in a TFile or an S3 bucket). In...
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)=0
Finalize the current cluster and create a new one for the following data.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
std::deque< RSealedPage > SealedPageSequence_t
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Meta-data stored for every field of an ntuple.
The on-storage meta-data of an ntuple.
RColumnDescriptorIterable GetColumnIterable() const
const RFieldDescriptor & GetFieldZero() const
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
Common user-tunable settings for storing ntuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:61
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:281
A class to manage the asynchronous execution of work items.
Collection abstract base class.
Definition TCollection.h:65
virtual Int_t GetEntries() const
TKey * FindKey(const char *keyname) const override
Find key with name keyname in the current directory.
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
Int_t GetCompressionSettings() const
Definition TFile.h:397
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
T * ReadObject()
To read an object (non deriving from TObject) from the file.
Definition TKey.h:103
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:632
Double_t ex[n]
Definition legend1.C:17
constexpr int kUnknownCompressionSettings
constexpr DescriptorId_t kInvalidDescriptorId
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
int fCompressionSettings
If fCompressionSettings == kUnknownCompressionSettings (the default), the merger will not change the ...
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
A sealed page contains the bytes of a page as written to storage (packed & compressed).