Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
RNTupleExporter.cxx
Go to the documentation of this file.
1/// \file RNTupleExporter.cxx
2/// \author Giacomo Parolini <giacomo.parolini@cern.ch>
3/// \date 2024-12-10
4/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
5/// is welcome!
6
7/*************************************************************************
8 * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. *
9 * All rights reserved. *
10 * *
11 * For the licensing terms see $ROOTSYS/LICENSE. *
12 * For the list of contributors see $ROOTSYS/README/CREDITS. *
13 *************************************************************************/
14
15#include <ROOT/RError.hxx>
17#include <ROOT/RPageStorage.hxx>
19#include <ROOT/RNTupleZip.hxx>
20#include <ROOT/RClusterPool.hxx>
21#include <ROOT/RLogger.hxx>
22#include <fstream>
23#include <sstream>
24#include <vector>
25
27
28namespace {
29
30ROOT::RLogChannel &RNTupleExporterLog()
31{
32 static RLogChannel sLog("ROOT.RNTupleExporter");
33 return sLog;
34}
35
36struct RColumnExportInfo {
37 const ROOT::RColumnDescriptor *fColDesc;
38 const ROOT::RFieldDescriptor *fFieldDesc;
39 std::string fQualifiedName;
40
41 RColumnExportInfo(const ROOT::RNTupleDescriptor &desc, const ROOT::RColumnDescriptor &colDesc,
42 const ROOT::RFieldDescriptor &fieldDesc)
43 : fColDesc(&colDesc),
44 fFieldDesc(&fieldDesc),
45 // NOTE: we don't need to keep the column representation index into account because exactly 1 representation
46 // is active per page, so there is no risk of name collisions.
47 fQualifiedName(desc.GetQualifiedFieldName(fieldDesc.GetId()) + '-' + std::to_string(colDesc.GetIndex()))
48 {
49 }
50};
51
/// Summary returned while collecting the columns of a field subtree.
struct RAddColumnsResult {
   /// Number of columns visited, including the ones rejected by the column type filter.
   int fNColsTotal = 0;

   /// Adds the counters of a sub-result (e.g. from a recursive call) into this one.
   RAddColumnsResult &operator+=(const RAddColumnsResult &other)
   {
      this->fNColsTotal = this->fNColsTotal + other.fNColsTotal;
      return *this;
   }
};
61
62template <typename T>
63bool ItemIsFilteredOut(const RNTupleExporter::RFilter<T> &filter, const T &item)
64{
65 bool filterHasType = filter.fSet.find(item) != filter.fSet.end();
66 bool isFiltered = (filter.fType == RNTupleExporter::EFilterType::kBlacklist) == filterHasType;
67 return isFiltered;
68}
69
70RAddColumnsResult AddColumnsFromField(std::vector<RColumnExportInfo> &vec, const ROOT::RNTupleDescriptor &desc,
71 const ROOT::RFieldDescriptor &fieldDesc,
72 const RNTupleExporter::RPagesOptions &options)
73{
74 R__LOG_DEBUG(1, RNTupleExporterLog()) << "processing field \"" << desc.GetQualifiedFieldName(fieldDesc.GetId())
75 << "\"";
76
77 RAddColumnsResult res{};
78
79 for (const auto &subfieldDesc : desc.GetFieldIterable(fieldDesc)) {
80 if (subfieldDesc.IsProjectedField())
81 continue;
82
83 for (const auto &colDesc : desc.GetColumnIterable(subfieldDesc)) {
84 // Filter columns by type
85 bool typeIsFiltered = ItemIsFilteredOut(options.fColumnTypeFilter, colDesc.GetType());
86 if (!typeIsFiltered)
87 vec.emplace_back(desc, colDesc, subfieldDesc);
88 res.fNColsTotal += 1;
89 }
90 res += AddColumnsFromField(vec, desc, subfieldDesc, options);
91 }
92
93 return res;
94}
95
96int CountPages(const ROOT::RNTupleDescriptor &desc, std::span<const RColumnExportInfo> columns)
97{
98 int nPages = 0;
99 auto clusterId = desc.FindClusterId(0, 0);
100 while (clusterId != kInvalidDescriptorId) {
101 const auto &clusterDesc = desc.GetClusterDescriptor(clusterId);
102 for (const auto &colInfo : columns) {
103 const auto &pages = clusterDesc.GetPageRange(colInfo.fColDesc->GetPhysicalId());
104 nPages += pages.GetPageInfos().size();
105 }
106 clusterId = desc.FindNextClusterId(clusterId);
107 }
108 return nPages;
109}
110
111} // namespace
112
115{
117 throw ROOT::RException(R__FAIL("exporting checksums is incompatible with decompressing the pages"));
118
119 RPagesResult res = {};
120
121 // make sure the source is attached
122 source.Attach();
123
124 auto desc = source.GetSharedDescriptorGuard();
125 ROOT::Internal::RClusterPool clusterPool{source};
126
127 // Collect column info
128 std::vector<RColumnExportInfo> columnInfos;
129 const RAddColumnsResult addColRes = AddColumnsFromField(columnInfos, desc.GetRef(), desc->GetFieldZero(), options);
130
131 // Collect ColumnSet for the cluster pool query
133 columnSet.reserve(columnInfos.size());
134 for (const auto &colInfo : columnInfos) {
135 columnSet.emplace(colInfo.fColDesc->GetPhysicalId());
136 }
137
138 const auto nPages = CountPages(desc.GetRef(), columnInfos);
139
140 const bool showProgress = (options.fFlags & RPagesOptions::kShowProgressBar) != 0;
141 res.fExportedFileNames.reserve(nPages);
142
143 // Iterate over the clusters in order and dump pages
144 auto clusterId = nPages > 0 ? desc->FindClusterId(0, 0) : ROOT::kInvalidDescriptorId;
145 int pagesExported = 0;
146 int prevIntPercent = 0;
147 std::vector<char> unzipBuf; // Only used when pages get decompressed
148 while (clusterId != ROOT::kInvalidDescriptorId) {
149 const auto &clusterDesc = desc->GetClusterDescriptor(clusterId);
150 const ROOT::Internal::RCluster *cluster = clusterPool.GetCluster(clusterId, columnSet);
151 for (const auto &colInfo : columnInfos) {
152 auto columnId = colInfo.fColDesc->GetPhysicalId();
153 const auto &pages = clusterDesc.GetPageRange(columnId);
154 const auto &colRange = clusterDesc.GetColumnRange(columnId);
155 auto colElement = ROOT::Internal::RColumnElementBase::Generate<void>(colInfo.fColDesc->GetType());
156 colElement->SetBitsOnStorage(colInfo.fColDesc->GetBitsOnStorage());
157
158 std::uint64_t pageIdx = 0;
159
160 R__LOG_DEBUG(0, RNTupleExporterLog())
161 << "exporting column \"" << colInfo.fQualifiedName << "\" (" << pages.GetPageInfos().size() << " pages)";
162
163 // We should never try to export a suppressed column range
164 assert(!colRange.IsSuppressed() || pages.GetPageInfos().empty());
165
166 for (const auto &pageInfo : pages.GetPageInfos()) {
167 ROOT::Internal::ROnDiskPage::Key key{columnId, pageIdx};
168 const ROOT::Internal::ROnDiskPage *onDiskPage = cluster->GetOnDiskPage(key);
169
170 // prepare the output file
171 std::ostringstream ss{options.fOutputPath, std::ios_base::ate};
172 assert(colRange.GetCompressionSettings());
173 ss << "/cluster_" << clusterDesc.GetId() << "_" << colInfo.fQualifiedName << "_page_" << pageIdx
174 << "_elems_" << pageInfo.GetNElements() << "_comp_" << *colRange.GetCompressionSettings() << ".page";
175 const auto outFileName = ss.str();
176 std::ofstream outFile{outFileName, std::ios_base::binary};
177 if (!outFile) {
178 throw ROOT::RException(
179 R__FAIL(std::string("output path ") + options.fOutputPath + " does not exist or is not writable!"));
180 }
181
182 // dump the page
183 const auto *pageBuf = static_cast<const char *>(onDiskPage->GetAddress());
184 if (options.fFlags & RPagesOptions::kDecompress) {
185 const auto nbytesPacked = colElement->GetPackedSize(pageInfo.GetNElements());
186 const auto nbytesData = pageInfo.GetLocator().GetNBytesOnStorage();
187 if (unzipBuf.size() < nbytesPacked)
188 unzipBuf.resize(nbytesPacked);
189 ROOT::Internal::RNTupleDecompressor::Unzip(pageBuf, nbytesData, nbytesPacked, &unzipBuf[0]);
190 outFile.write(unzipBuf.data(), nbytesPacked);
191 } else {
192 const bool includeChecksum =
193 (options.fFlags & RPagesOptions::kIncludeChecksums) != 0 && pageInfo.HasChecksum();
194 const std::size_t maybeChecksumSize = includeChecksum * 8;
195 const auto nbytesData = pageInfo.GetLocator().GetNBytesOnStorage() + maybeChecksumSize;
196 outFile.write(pageBuf, nbytesData);
197 }
198
199 res.fExportedFileNames.push_back(outFileName);
200
201 ++pageIdx, ++pagesExported;
202
203 if (showProgress) {
204 int intPercent = static_cast<int>(100.f * pagesExported / res.fExportedFileNames.size());
205 if (intPercent != prevIntPercent) {
206 fprintf(stderr, "\rExport progress: %02d%%", intPercent);
207 if (intPercent == 100)
208 fprintf(stderr, "\n");
209 prevIntPercent = intPercent;
210 }
211 }
212 }
213 }
214 clusterId = desc->FindNextClusterId(clusterId);
215 }
216
217 assert(res.fExportedFileNames.size() == static_cast<size_t>(pagesExported));
218 std::ostringstream ss;
219 ss << "exported " << res.fExportedFileNames.size() << " pages (";
220 if (options.fColumnTypeFilter.fSet.empty()) {
221 ss << addColRes.fNColsTotal << " columns)";
222 } else {
223 auto nColsFilteredOut = addColRes.fNColsTotal - columnInfos.size();
224 ss << nColsFilteredOut << "/" << addColRes.fNColsTotal << " columns filtered out)";
225 }
226 R__LOG_INFO(RNTupleExporterLog()) << ss.str();
227
228 return res;
229}
230
231} // namespace ROOT::Experimental::Internal
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:359
#define R__LOG_INFO(...)
Definition RLogger.hxx:358
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
std::string & operator+=(std::string &left, const TString &right)
Definition TString.h:495
@ kBlacklist
Don't export items contained in the filter's set.
static RPagesResult ExportPages(ROOT::Internal::RPageSource &source, const RPagesOptions &options={})
Given a page source, writes all its pages to individual files (1 per page).
Manages a set of clusters containing compressed and packed pages.
RCluster * GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:147
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:149
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:30
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the `from` buffer.
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:40
const void * GetAddress() const
Definition RCluster.hxx:62
Abstract interface to read data from an ntuple.
void Attach(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode=ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode::kForReading)
Open the physical storage container and deserialize header and footer.
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
Takes the read lock for the descriptor.
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
ROOT::DescriptorId_t GetId() const
ROOT::DescriptorId_t FindNextClusterId(ROOT::DescriptorId_t clusterId) const
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
RColumnDescriptorIterable GetColumnIterable() const
ROOT::DescriptorId_t FindClusterId(ROOT::NTupleSize_t entryIdx) const
const RClusterDescriptor & GetClusterDescriptor(ROOT::DescriptorId_t clusterId) const
std::string GetQualifiedFieldName(ROOT::DescriptorId_t fieldId) const
Walks up the parents of the field ID and returns a field name of the form a.b.c.d In case of invalid ...
const RFieldDescriptor & GetFieldZero() const
constexpr DescriptorId_t kInvalidDescriptorId
RFilter< ENTupleColumnType > fColumnTypeFilter
Optional filter that determines which columns are included or excluded from being exported.
@ kDecompress
If enabled, uncompress (but don't unpack) the page (mutually exclusive with kIncludeChecksums).
@ kShowProgressBar
If enabled, the exporter will report the current progress on the stderr.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:50