Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
RNTupleParallelWriter.cxx
Go to the documentation of this file.
1/// \file RNTupleParallelWriter.cxx
2/// \author Jonas Hahnfeld <jonas.hahnfeld@cern.ch>
3/// \date 2024-02-01
4
5/*************************************************************************
6 * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
14
16#include <ROOT/RNTupleModel.hxx>
17#include <ROOT/RNTupleUtils.hxx>
19#include <ROOT/RPageSinkBuf.hxx>
20#include <ROOT/RPageStorage.hxx>
22
23#include <TDirectory.h>
24#include <TError.h>
25#include <TFile.h>
26#include <TROOT.h>
27
28namespace {
29
38
39/// An internal RPageSink that enables multiple RNTupleFillContext to write into a single common RPageSink.
40///
41/// The setup with two contexts looks as follows:
42///
43/// +------ owned by RNTupleFillContext ------+
44/// | |
45/// RPageSinkBuf --- forwards to ---> RPageSynchronizingSink ---+
46/// (and owns) |
47/// (via raw fInnerSink ptr) +-- RPageSink (usually a persistent sink)
48/// |
49/// RPageSinkBuf --- forwards to ---> RPageSynchronizingSink ---+
50/// | (and owns) |
51/// | |
52/// +------ owned by RNTupleFillContext ------+
53///
54/// The mutex used by the synchronizing sinks is owned by the RNTupleParallelWriter that also owns the original model,
55/// the "final" sink (usually a persistent sink) and keeps weak_ptr's of the contexts (to make sure they are destroyed
56/// before the writer is destructed).
57class RPageSynchronizingSink : public RPageSink {
58private:
59 /// The wrapped inner sink, not owned by this class.
60 RPageSink *fInnerSink;
61 std::mutex *fMutex;
62
63public:
64 explicit RPageSynchronizingSink(RPageSink &inner, std::mutex &mutex)
65 : RPageSink(inner.GetNTupleName(), inner.GetWriteOptions()), fInnerSink(&inner), fMutex(&mutex)
66 {
67 // Do not observe the sink's metrics: It will contain some counters for all threads, which is misleading for the
68 // users.
69 // fMetrics.ObserveMetrics(fSink->GetMetrics());
70 }
71 RPageSynchronizingSink(const RPageSynchronizingSink &) = delete;
72 RPageSynchronizingSink &operator=(const RPageSynchronizingSink &) = delete;
73
74 const RNTupleDescriptor &GetDescriptor() const final { return fInnerSink->GetDescriptor(); }
75
76 NTupleSize_t GetNEntries() const final { return fInnerSink->GetNEntries(); }
77
78 ColumnHandle_t AddColumn(DescriptorId_t, RColumn &) final { return {}; }
79 void InitImpl(ROOT::RNTupleModel &) final {}
80 void UpdateSchema(const RNTupleModelChangeset &, NTupleSize_t) final
81 {
82 throw ROOT::RException(R__FAIL("UpdateSchema not supported via RPageSynchronizingSink"));
83 }
84 void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &) final
85 {
86 throw ROOT::RException(R__FAIL("UpdateExtraTypeInfo not supported via RPageSynchronizingSink"));
87 }
88
89 void CommitSuppressedColumn(ColumnHandle_t handle) final { fInnerSink->CommitSuppressedColumn(handle); }
90 void CommitPage(ColumnHandle_t, const RPage &) final
91 {
92 throw ROOT::RException(R__FAIL("should never commit single pages via RPageSynchronizingSink"));
93 }
94 void CommitSealedPage(DescriptorId_t, const RSealedPage &) final
95 {
96 throw ROOT::RException(R__FAIL("should never commit sealed pages via RPageSynchronizingSink"));
97 }
98 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final
99 {
100 fInnerSink->CommitSealedPageV(ranges);
101 }
102 std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final { return fInnerSink->CommitCluster(nNewEntries); }
103 RStagedCluster StageCluster(NTupleSize_t nNewEntries) final { return fInnerSink->StageCluster(nNewEntries); }
104 void CommitStagedClusters(std::span<RStagedCluster> clusters) final { fInnerSink->CommitStagedClusters(clusters); }
105 void CommitClusterGroup() final
106 {
107 throw ROOT::RException(R__FAIL("should never commit cluster group via RPageSynchronizingSink"));
108 }
109
110 ROOT::Internal::RNTupleLink CommitDatasetImpl() final
111 {
112 throw ROOT::RException(R__FAIL("should never commit dataset via RPageSynchronizingSink"));
113 }
114
115 RSinkGuard GetSinkGuard() final { return RSinkGuard(fMutex); }
116
117 std::unique_ptr<RPageSink> CloneAsHidden(std::string_view, const ROOT::RNTupleWriteOptions &) const final
118 {
119 throw ROOT::RException(R__FAIL("cloning a RPageSynchronizingSink is not implemented yet"));
120 }
121
122 void CommitAttributeSet(std::string_view, const ROOT::Internal::RNTupleLink &) final
123 {
124 throw ROOT::RException(R__FAIL("committing attribute sets is not implemented yet for parallel writing"));
125 }
126};
127
128} // namespace
129
130ROOT::RNTupleParallelWriter::RNTupleParallelWriter(std::unique_ptr<ROOT::RNTupleModel> model,
131 std::unique_ptr<RPageSink> sink)
132 : fSink(std::move(sink)), fModel(std::move(model)), fMetrics("RNTupleParallelWriter")
133{
134 if (fModel->GetRegisteredSubfieldNames().size() > 0) {
135 throw RException(R__FAIL("cannot create an RNTupleParallelWriter from a model with registered subfields"));
136 }
137 fModel->Freeze();
138 fSink->Init(*fModel.get());
139 fMetrics.ObserveMetrics(fSink->GetMetrics());
140}
141
// NOTE(review): this rendering omits the function signature (source line 142) and the statement inside `try`
// (source line 145). Presumably this is the destructor and it calls CommitDataset(); confirm against the source.
143{
144 try {
146 } catch (const RException &err) {
// A failed commit is logged to the RNTuple log channel instead of being rethrown.
// NOTE(review): only RException is caught here; any other exception type would still propagate out.
147 R__LOG_ERROR(ROOT::Internal::NTupleLog()) << "failure committing ntuple: " << err.GetError().GetReport();
148 }
149}
150
152{
153 if (fModel->IsExpired())
154 return;
155
156 for (const auto &context : fFillContexts) {
157 if (!context.expired()) {
158 throw RException(R__FAIL("RNTupleFillContext has not been destructed"));
159 }
160 }
161
162 // Now commit all clusters as a cluster group and then the dataset.
163 fSink->CommitClusterGroup();
164 fSink->CommitDataset();
165 fModel->Expire();
166}
167
168std::unique_ptr<ROOT::RNTupleParallelWriter>
169ROOT::RNTupleParallelWriter::Recreate(std::unique_ptr<ROOT::RNTupleModel> model, std::string_view ntupleName,
170 std::string_view storage, const ROOT::RNTupleWriteOptions &options)
171{
172 if (!options.GetUseBufferedWrite()) {
173 throw RException(R__FAIL("parallel writing requires buffering"));
174 }
175
176 auto sink = ROOT::Internal::RPagePersistentSink::Create(ntupleName, storage, options);
177 // Cannot use std::make_unique because the constructor of RNTupleParallelWriter is private.
178 return std::unique_ptr<RNTupleParallelWriter>(new RNTupleParallelWriter(std::move(model), std::move(sink)));
179}
180
181std::unique_ptr<ROOT::RNTupleParallelWriter>
182ROOT::RNTupleParallelWriter::Append(std::unique_ptr<ROOT::RNTupleModel> model, std::string_view ntupleName,
183 TDirectory &fileOrDirectory, const ROOT::RNTupleWriteOptions &options)
184{
185 auto file = fileOrDirectory.GetFile();
186 if (!file) {
187 throw RException(
188 R__FAIL("RNTupleParallelWriter only supports writing to a ROOT file. Cannot write into a directory "
189 "that is not backed by a file"));
190 }
191 if (!file->IsBinary()) {
192 throw RException(R__FAIL("RNTupleParallelWriter only supports writing to a ROOT file. Cannot write into " +
193 std::string(file->GetName())));
194 }
195 if (!file->IsWritable()) {
196 throw RException(R__FAIL("The file '" + std::string(file->GetName()) +
197 "' given to RNTupleParallelWriter is not writable. Open it with 'UPDATE' or 'RECREATE' "
198 "if you want to write into it."));
199 }
200 if (!options.GetUseBufferedWrite()) {
201 throw RException(R__FAIL("parallel writing requires buffering"));
202 }
203
204 auto sink = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, fileOrDirectory, options);
205 // Cannot use std::make_unique because the constructor of RNTupleParallelWriter is private.
206 return std::unique_ptr<RNTupleParallelWriter>(new RNTupleParallelWriter(std::move(model), std::move(sink)));
207}
208
209std::shared_ptr<ROOT::RNTupleFillContext> ROOT::RNTupleParallelWriter::CreateFillContext()
210{
211 std::lock_guard g(fMutex);
212
213 auto model = fModel->Clone();
214 auto sink =
215 std::make_unique<ROOT::Internal::RPageSinkBuf>(std::make_unique<RPageSynchronizingSink>(*fSink, fSinkMutex));
216
217 // Cannot use std::make_shared because the constructor of RNTupleFillContext is private. Also it would mean that the
218 // (direct) memory of all contexts stays around until the vector of weak_ptr's is cleared.
219 std::shared_ptr<RNTupleFillContext> context(new RNTupleFillContext(std::move(model), std::move(sink)));
220 fFillContexts.push_back(context);
221
222#ifdef R__USE_IMT
223 if (IsImplicitMTEnabled() &&
224 fSink->GetWriteOptions().GetUseImplicitMT() == ROOT::RNTupleWriteOptions::EImplicitMT::kOn) {
225 context->fZipTasks = std::make_unique<ROOT::Experimental::Internal::RNTupleImtTaskScheduler>();
226 context->fSink->SetTaskScheduler(context->fZipTasks.get());
227 }
228#endif
229
230 return context;
231}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define R__LOG_ERROR(...)
Definition RLogger.hxx:356
#define g(i)
Definition RSha256.hxx:105
Double_t err
Binding & operator=(OUT(*fun)(void))
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:37
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Guess the concrete derived page sink from the location.
Abstract interface to write data into an ntuple.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:43
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Field-specific extra type information from the header / extension header.
The on-storage metadata of an RNTuple.
A context for filling entries (data) into clusters of an RNTuple.
static std::unique_ptr< RNTupleParallelWriter > Recreate(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Recreate a new file and return a writer to write an RNTuple.
std::vector< std::weak_ptr< RNTupleFillContext > > fFillContexts
List of all created helpers. They must be destroyed before this RNTupleParallelWriter is destructed.
static std::unique_ptr< RNTupleParallelWriter > Append(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, TDirectory &fileOrDirectory, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Append an RNTuple to the existing file.
Experimental::Detail::RNTupleMetrics fMetrics
void CommitDataset()
Automatically called by the destructor.
std::mutex fMutex
A global mutex to protect the internal data structures of this object.
std::unique_ptr< ROOT::Internal::RPageSink > fSink
The final RPageSink that represents the synchronization point.
RNTupleParallelWriter(std::unique_ptr< ROOT::RNTupleModel > model, std::unique_ptr< ROOT::Internal::RPageSink > sink)
std::shared_ptr< RNTupleFillContext > CreateFillContext()
Create a new RNTupleFillContext that can be used to fill entries and prepare clusters in parallel.
std::unique_ptr< ROOT::RNTupleModel > fModel
The original RNTupleModel connected to fSink; needs to be destructed before it.
std::mutex fSinkMutex
A mutex to synchronize the final page sink.
Common user-tunable settings for storing RNTuples.
Abstract interface to write data into an ntuple.
Describe directory structure in memory.
Definition TDirectory.h:45
virtual TFile * GetFile() const
Definition TDirectory.h:221
ROOT::RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:669
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to a RNTupleModel.