1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>
4/// \date 2020-07-08
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
9 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
16#include <ROOT/RError.hxx>
17#include <ROOT/RNTuple.hxx>
20#include <ROOT/RNTupleModel.hxx>
21#include <ROOT/RNTupleUtil.hxx>
23#include <ROOT/RPageStorage.hxx>
24#include <ROOT/RClusterPool.hxx>
26#include <ROOT/RNTupleZip.hxx>
27#include <ROOT/TTaskGroup.hxx>
28#include <TROOT.h>
29#include <TFileMergeInfo.h>
30#include <TError.h>
31#include <TFile.h>
32#include <TKey.h>
34#include <deque>
// RNTuple::Merge -- merges the RNTuple named by the first entry of `inputs` into the output file given as the
// second entry, reading it from every remaining entry (the input files). Returns 0 on success and -1 on failure.
// NOTE(review): the handler at the end of this function-try-block only catches RException; any other exception
// type would still escape, which conflicts with the no-throw requirement stated below. Consider a catch-all.
37// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
38try {
39 // Check the inputs
 // At least three entries are required: the RNTuple name, the output file and one or more input files.
40 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo)
41 return -1;
43 // Parse the input parameters
44 TIter itr(inputs);
46 // First entry is the RNTuple name
47 std::string ntupleName = std::string(itr()->GetName());
49 // Second entry is the output file
50 TFile *outFile = dynamic_cast<TFile *>(itr());
51 if (!outFile)
52 return -1;
54 // Check if the output file already has a key with that name
55 TKey *outKey = outFile->FindKey(ntupleName.c_str());
56 RNTuple *outNTuple = nullptr;
57 if (outKey) {
 // NOTE(review): the object returned by ReadObject() is never deleted in this function; if the caller owns it,
 // it leaks -- confirm the ownership contract.
58 outNTuple = outKey->ReadObject<RNTuple>();
59 if (!outNTuple) {
60 Error("RNTuple::Merge", "Output file already has key, but not of type RNTuple!");
61 return -1;
62 }
63 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
64 // the pointer we just got.
65 }
67 // The "fast" option is present if and only if we don't want to change compression.
 // kUnknownCompressionSettings means "keep whatever compression the input pages already have".
68 const int compression =
71 RNTupleWriteOptions writeOpts;
 // Buffered writing is disabled; the compression is only forced on the sink when it is known.
72 writeOpts.SetUseBufferedWrite(false);
73 if (compression != kUnknownCompressionSettings)
74 writeOpts.SetCompression(compression);
75 auto destination = std::make_unique<Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
77 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
78 if (outNTuple) {
79 auto source = Internal::RPageSourceFile::CreateFromAnchor(*outNTuple);
80 source->Attach();
81 auto desc = source->GetSharedDescriptorGuard();
82 destination->InitFromDescriptor(desc.GetRef());
83 }
85 // The remaining entries are the input files
86 std::vector<std::unique_ptr<Internal::RPageSourceFile>> sources;
87 std::vector<Internal::RPageSource *> sourcePtrs;
89 while (const auto &pitr = itr()) {
 // Every remaining entry must be a TFile holding an RNTuple with the same name; otherwise the whole merge is
 // abandoned with -1.
 // NOTE(review): as with ReadObject() above, the anchor returned by Get<RNTuple>() is not deleted here.
90 TFile *inFile = dynamic_cast<TFile *>(pitr);
91 RNTuple *anchor = inFile ? inFile->Get<RNTuple>(ntupleName.c_str()) : nullptr;
92 if (!anchor)
93 return -1;
94 sources.push_back(Internal::RPageSourceFile::CreateFromAnchor(*anchor));
95 }
97 // Interface conversion
 // `sources` keeps ownership; the merger only receives non-owning RPageSource pointers.
98 sourcePtrs.reserve(sources.size());
99 for (const auto &s : sources) {
100 sourcePtrs.push_back(s.get());
101 }
103 // Now merge
106 options.fCompressionSettings = compression;
107 merger.Merge(sourcePtrs, *destination, options);
109 // Provide the caller with a merged anchor object (even though we've already
110 // written it).
 // NOTE(review): the result of Get<RNTuple>() is dereferenced without a null check and is not deleted afterwards.
111 *this = *outFile->Get<RNTuple>(ntupleName.c_str());
113 return 0;
114} catch (const RException &ex) {
115 Error("RNTuple::Merge", "Exception thrown while merging: %s", ex.what());
116 return -1;
121 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
 // BuildColumnIdMap -- used when fOutputIdMap is still empty: gives each column an output id equal to the map size
 // at that moment (0, 1, 2, ...) and records it under the key "<column name>.<type and version>".
 // NOTE(review): this relies on the keys being unique; a repeated key would not grow the map, so the next column
 // would be handed the same output id.
123 for (auto &column : columns) {
124 column.fColumnOutputId = fOutputIdMap.size();
125 fOutputIdMap[column.fColumnName + "." + column.fColumnTypeAndVersion] = column.fColumnOutputId;
126 }
131 std::vector<ROOT::Experimental::Internal::RNTupleMerger::RColumnInfo> &columns)
 // ValidateColumns -- checks `columns` against the schema already recorded in fOutputIdMap and copies the matching
 // output id into each column. Throws RException if the column counts differ or a column key is unknown.
133 // First ensure that we have the same number of columns
134 if (fOutputIdMap.size() != columns.size()) {
135 throw RException(R__FAIL("Columns between sources do NOT match"));
136 }
137 // Then ensure that we have the same names of columns and assign the ids
 // NOTE(review): fOutputIdMap.find() would avoid using std::out_of_range for control flow here.
138 for (auto &column : columns) {
139 try {
140 column.fColumnOutputId = fOutputIdMap.at(column.fColumnName + "." + column.fColumnTypeAndVersion);
141 } catch (const std::out_of_range &) {
142 throw RException(R__FAIL("Column NOT found in the first source w/ name " + column.fColumnName +
143 " type and version " + column.fColumnTypeAndVersion));
144 }
145 }
150 std::vector<RColumnInfo> &columns)
 // CollectColumns -- appends every column of `descriptor` (walking the field tree from field zero) to `columns`
 // and assigns their output ids. Propagates the RException thrown by ValidateColumns on a schema mismatch.
152 // Here we recursively find the columns and fill the RColumnInfo vector
153 AddColumnsFromField(columns, descriptor, descriptor.GetFieldZero());
154 // Then we either build the internal map (first source) or validate the columns against it (remaining sources)
155 // In either case, we also assign the output ids here
156 if (fOutputIdMap.empty()) {
157 BuildColumnIdMap(columns);
158 } else {
159 ValidateColumns(columns);
160 }
165 const RNTupleDescriptor &desc,
166 const RFieldDescriptor &fieldDesc,
167 const std::string &prefix)
 // AddColumnsFromField -- depth-first walk over the sub-fields of `fieldDesc`. Each column is named
 // "<prefix><field name>.<column index>" and keyed by "<field type name>.<field type version>"; the column's
 // physical id is also recorded (presumably as its input id -- confirm against RColumnInfo). Sub-fields are
 // visited with "<prefix><field name>." as their prefix.
169 for (const auto &field : desc.GetFieldIterable(fieldDesc)) {
170 std::string name = prefix + field.GetFieldName() + ".";
171 const std::string typeAndVersion = field.GetTypeName() + "." + std::to_string(field.GetTypeVersion());
172 auto columnIter = desc.GetColumnIterable(field);
 // NOTE(review): reserving an exact size once per field may defeat std::vector's geometric growth and cause
 // repeated reallocations for schemas with many fields.
173 columns.reserve(columns.size() + columnIter.count());
174 for (const auto &column : columnIter) {
175 columns.emplace_back(name + std::to_string(column.GetIndex()), typeAndVersion, column.GetPhysicalId(),
177 }
178 AddColumnsFromField(columns, desc, field, name);
179 }
// RNTupleMerger::Merge -- appends every cluster of every source, in order, to `destination` and then commits the
// dataset. Pages are forwarded as sealed pages that point into the source cluster's on-disk buffers. They are only
// decompressed and recompressed when options.fCompressionSettings is set and differs from the column range's
// settings. The output schema is taken from `destination` if it is already initialized, otherwise from the model
// of the first source. Throws RException (via CollectColumns) if a source's columns do not match.
183void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *> sources, RPageSink &destination,
184 const RNTupleMergeOptions &options)
186 std::vector<RColumnInfo> columns;
187 RCluster::ColumnSet_t columnSet;
 // Incremental merging: the output already has a schema, so seed the output column id map from it. Every source
 // is then validated against, and mapped onto, the existing output columns.
189 if (destination.IsInitialized()) {
190 CollectColumns(destination.GetDescriptor(), columns);
191 }
193 std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
 // Optional task group used to recompress pages in parallel; while it stays empty, recompression runs inline.
194 std::optional<TTaskGroup> taskGroup;
195#ifdef R__USE_IMT
197 taskGroup = TTaskGroup();
200 // Append the sources to the destination one-by-one
201 for (const auto &source : sources) {
202 source->Attach();
204 RClusterPool clusterPool{*source};
206 // Get a handle on the descriptor (metadata)
207 auto descriptor = source->GetSharedDescriptorGuard();
209 // Collect all the columns
210 // The column name : output column id map is only built once
211 columns.clear(), columnSet.clear();
212 CollectColumns(descriptor.GetRef(), columns);
 // The set of input column ids the cluster pool has to load for every cluster of this source.
213 columnSet.reserve(columns.size());
214 for (const auto &column : columns)
215 columnSet.emplace(column.fColumnInputId);
217 // Create sink from the input model if not initialized
218 if (!destination.IsInitialized()) {
219 model = descriptor->CreateModel();
220 destination.Init(*model.get());
221 }
223 for (const auto &extraTypeInfoDesc : descriptor->GetExtraTypeInfoIterable()) {
224 destination.UpdateExtraTypeInfo(extraTypeInfoDesc);
225 }
227 // Make sure the source contains events to be merged
 // An empty source has still contributed its schema and extra type info above; it just has no clusters to copy.
228 if (source->GetNEntries() == 0) {
229 continue;
230 }
232 // Now loop over all clusters in this file
233 // descriptor->GetClusterIterable() doesn't guarantee any specific order...
234 // Find the first cluster id and iterate from there...
235 auto clusterId = descriptor->FindClusterId(0, 0);
237 while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
238 auto *cluster = clusterPool.GetCluster(clusterId, columnSet);
239 assert(cluster);
240 const auto &clusterDesc = descriptor->GetClusterDescriptor(clusterId);
242 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are never
243 // invalidated.
244 std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
245 std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;
246 std::vector<std::unique_ptr<unsigned char[]>> sealedPageBuffers;
248 for (const auto &column : columns) {
250 // See if this cluster contains this column
251 // if not, there is nothing to read/do...
252 auto columnId = column.fColumnInputId;
253 if (!clusterDesc.ContainsColumn(columnId)) {
254 continue;
255 }
257 const auto &columnDesc = descriptor->GetColumnDescriptor(columnId);
258 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
260 // Now get the pages for this column in this cluster
261 const auto &pages = clusterDesc.GetPageRange(columnId);
264 sealedPages.resize(pages.fPageInfos.size());
266 // Each column range may have its own compression settings
267 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
268 const bool needsCompressionChange = options.fCompressionSettings != kUnknownCompressionSettings &&
269 colRangeCompressionSettings != options.fCompressionSettings;
271 // If the column range is already uncompressed we don't need to allocate any new buffer, so we don't
272 // bother reserving memory for them.
273 size_t pageBufferBaseIdx = sealedPageBuffers.size();
274 if (colRangeCompressionSettings != 0)
275 sealedPageBuffers.resize(sealedPageBuffers.size() + pages.fPageInfos.size());
277 std::uint64_t pageIdx = 0;
279 // Loop over the pages
280 for (const auto &pageInfo : pages.fPageInfos) {
 // NOTE(review): sealedPageBuffers accumulates slots across columns, so the second assertion does not take
 // pageBufferBaseIdx into account; the slot actually used is sealedPageBuffers[pageBufferBaseIdx + pageIdx].
281 assert(pageIdx < sealedPages.size());
282 assert(sealedPageBuffers.size() == 0 || pageIdx < sealedPageBuffers.size());
284 ROnDiskPage::Key key{columnId, pageIdx};
285 auto onDiskPage = cluster->GetOnDiskPage(key);
 // By default the sealed page aliases the on-disk page owned by the cluster (no copy). Its buffer size is the
 // stored data plus the checksum bytes, if any.
287 const auto checksumSize = pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum;
288 RPageStorage::RSealedPage &sealedPage = sealedPages[pageIdx];
289 sealedPage.SetNElements(pageInfo.fNElements);
290 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
291 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + checksumSize);
 // NOTE(review): onDiskPage is dereferenced here before the R__ASSERT below checks it for null; the assertion
 // should come first.
292 sealedPage.SetBuffer(onDiskPage->GetAddress());
294 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
296 // Change compression if needed
297 if (needsCompressionChange) {
 // colElement, pageInfo, options, sealedPage and sealedPageBuffers are captured by reference. This is safe
 // because taskGroup->Wait() runs right after this page loop, before any of them goes out of scope and before
 // sealedPageBuffers is resized for the next column.
298 auto taskFunc = [ // values in
299 pageIdx, colRangeCompressionSettings, pageBufferBaseIdx, checksumSize,
300 // const refs in
301 &colElement, &pageInfo, &options,
302 // refs in-out
303 &sealedPage, &sealedPageBuffers]() {
304 // Step 1: prepare the source data.
305 // Unzip the source buffer into the zip staging buffer. This is a memcpy if the source was
306 // already uncompressed.
307 // Note that the checksum, if present, is not zipped, so we only need to unzip
308 // `sealedPage.GetDataSize()` bytes.
309 const auto uncompressedSize = colElement->GetSize() * sealedPage.GetNElements();
310 auto zipBuffer = std::make_unique<unsigned char[]>(uncompressedSize);
311 RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), uncompressedSize,
312 zipBuffer.get());
314 // Step 2: prepare the destination buffer.
315 if (uncompressedSize != sealedPage.GetDataSize()) {
316 // source page is compressed
317 R__ASSERT(colRangeCompressionSettings != 0);
319 // We need to reallocate sealedPage's buffer because we are going to recompress the data
320 // with a different algorithm/level. Since we don't know a priori how big that'll be, the
321 // only safe bet is to allocate a buffer big enough to hold as many bytes as the uncompressed
322 // data.
323 R__ASSERT(sealedPage.GetDataSize() < uncompressedSize);
324 auto &newBuf = sealedPageBuffers[pageBufferBaseIdx + pageIdx];
325 newBuf = std::make_unique<unsigned char[]>(uncompressedSize + checksumSize);
326 sealedPage.SetBuffer(newBuf.get());
327 } else {
328 // source page is uncompressed. We can reuse the sealedPage's buffer since it's big
329 // enough.
330 // Note that this does not necessarily mean that the column range's compressionSettings are 0,
331 // as a page might have been stored uncompressed because it was not compressible with its
332 // advertised compression settings.
 // NOTE(review): in this branch the buffer is still the cluster's on-disk page memory, so the Zip call
 // below overwrites cluster-owned data in place (through the const_cast).
333 }
335 const auto newNBytes =
336 RNTupleCompressor::Zip(zipBuffer.get(), uncompressedSize, options.fCompressionSettings,
337 const_cast<void *>(sealedPage.GetBuffer()));
338 sealedPage.SetBufferSize(newNBytes + checksumSize);
339 if (pageInfo.fHasChecksum) {
340 // Calculate new checksum (this must happen after setting the new buffer size!)
341 sealedPage.ChecksumIfEnabled();
342 }
343 };
345 if (taskGroup)
346 taskGroup->Run(taskFunc);
347 else
348 taskFunc();
349 }
351 ++pageIdx;
353 } // end of loop over pages
 // Wait for every recompression task of this column before its sealed pages are handed over.
 // NOTE(review): waiting once per column limits parallelism to the pages of a single column.
355 if (taskGroup)
356 taskGroup->Wait();
357 sealedPagesV.push_back(std::move(sealedPages));
358 sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
359 sealedPagesV.back().cend());
361 } // end of loop over columns
363 // Now commit all pages to the output
364 destination.CommitSealedPageV(sealedPageGroups);
366 // Commit the clusters
367 destination.CommitCluster(clusterDesc.GetNEntries());
369 // Go to the next cluster
370 clusterId = descriptor->FindNextClusterId(clusterId);
372 } // end of loop over clusters
374 // Commit all clusters for this input
375 destination.CommitClusterGroup();
377 } // end of loop over sources
379 // Commit the output
380 destination.CommitDataset();
