Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleParallelWriter.cxx
Go to the documentation of this file.
1/// \file RNTupleParallelWriter.cxx
2/// \ingroup NTuple
3/// \author Jonas Hahnfeld <jonas.hahnfeld@cern.ch>
4/// \date 2024-02-01
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
17
19#include <ROOT/RNTupleModel.hxx>
20#include <ROOT/RNTupleUtils.hxx>
22#include <ROOT/RPageSinkBuf.hxx>
23#include <ROOT/RPageStorage.hxx>
25
26#include <TDirectory.h>
27#include <TError.h>
28#include <TFile.h>
29#include <TROOT.h>

#include <algorithm>
30
31namespace {
32
41
42/// An internal RPageSink that enables multiple RNTupleFillContext to write into a single common RPageSink.
43///
44/// The setup with two contexts looks as follows:
45///
46/// +------ owned by RNTupleFillContext ------+
47/// | |
48/// RPageSinkBuf --- forwards to ---> RPageSynchronizingSink ---+
49/// (and owns) |
50/// (via raw fInnerSink ptr) +-- RPageSink (usually a persistent sink)
51/// |
52/// RPageSinkBuf --- forwards to ---> RPageSynchronizingSink ---+
53/// | (and owns) |
54/// | |
55/// +------ owned by RNTupleFillContext ------+
56///
57/// The mutex used by the synchronizing sinks is owned by the RNTupleParallelWriter that also owns the original model,
58/// the "final" sink (usually a persistent sink) and keeps weak_ptr's of the contexts (to make sure they are destroyed
59/// before the writer is destructed).
60class RPageSynchronizingSink : public RPageSink {
61private:
62 /// The wrapped inner sink, not owned by this class.
63 RPageSink *fInnerSink;
64 std::mutex *fMutex;
65
66public:
67 explicit RPageSynchronizingSink(RPageSink &inner, std::mutex &mutex)
68 : RPageSink(inner.GetNTupleName(), inner.GetWriteOptions()), fInnerSink(&inner), fMutex(&mutex)
69 {
70 // Do not observe the sink's metrics: It will contain some counters for all threads, which is misleading for the
71 // users.
72 // fMetrics.ObserveMetrics(fSink->GetMetrics());
73 }
74 RPageSynchronizingSink(const RPageSynchronizingSink &) = delete;
75 RPageSynchronizingSink &operator=(const RPageSynchronizingSink &) = delete;
76
77 const RNTupleDescriptor &GetDescriptor() const final { return fInnerSink->GetDescriptor(); }
78
79 NTupleSize_t GetNEntries() const final { return fInnerSink->GetNEntries(); }
80
81 ColumnHandle_t AddColumn(DescriptorId_t, RColumn &) final { return {}; }
82 void InitImpl(ROOT::RNTupleModel &) final {}
83 void UpdateSchema(const RNTupleModelChangeset &, NTupleSize_t) final
84 {
85 throw ROOT::RException(R__FAIL("UpdateSchema not supported via RPageSynchronizingSink"));
86 }
87 void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &) final
88 {
89 throw ROOT::RException(R__FAIL("UpdateExtraTypeInfo not supported via RPageSynchronizingSink"));
90 }
91
92 void CommitSuppressedColumn(ColumnHandle_t handle) final { fInnerSink->CommitSuppressedColumn(handle); }
93 void CommitPage(ColumnHandle_t, const RPage &) final
94 {
95 throw ROOT::RException(R__FAIL("should never commit single pages via RPageSynchronizingSink"));
96 }
97 void CommitSealedPage(DescriptorId_t, const RSealedPage &) final
98 {
99 throw ROOT::RException(R__FAIL("should never commit sealed pages via RPageSynchronizingSink"));
100 }
101 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final
102 {
103 fInnerSink->CommitSealedPageV(ranges);
104 }
105 std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final { return fInnerSink->CommitCluster(nNewEntries); }
106 RStagedCluster StageCluster(NTupleSize_t nNewEntries) final { return fInnerSink->StageCluster(nNewEntries); }
107 void CommitStagedClusters(std::span<RStagedCluster> clusters) final { fInnerSink->CommitStagedClusters(clusters); }
108 void CommitClusterGroup() final
109 {
110 throw ROOT::RException(R__FAIL("should never commit cluster group via RPageSynchronizingSink"));
111 }
112 void CommitDatasetImpl() final
113 {
114 throw ROOT::RException(R__FAIL("should never commit dataset via RPageSynchronizingSink"));
115 }
116
117 RSinkGuard GetSinkGuard() final { return RSinkGuard(fMutex); }
118};
119
120} // namespace
121
123 std::unique_ptr<RPageSink> sink)
124 : fSink(std::move(sink)), fModel(std::move(model)), fMetrics("RNTupleParallelWriter")
125{
   // Constructor RNTupleParallelWriter(model, sink); its first signature line is not shown in this listing.
   // Takes ownership of the model and of the final sink that every fill context eventually forwards to.
   // Models with registered subfields are rejected with an RException.
126 if (fModel->GetRegisteredSubfieldNames().size() > 0) {
127 throw RException(R__FAIL("cannot create an RNTupleParallelWriter from a model with registered subfields"));
128 }
   // Freeze the original model, then initialize the final sink from it. CreateFillContext() later clones this
   // frozen model for each new context.
129 fModel->Freeze();
130 fSink->Init(*fModel.get());
   // Report the final sink's metrics through this writer's "RNTupleParallelWriter" metrics object.
131 fMetrics.ObserveMetrics(fSink->GetMetrics());
132}
133
135{
   // NOTE(review): the signature line is missing from this listing. This appears to be ~RNTupleParallelWriter(),
   // which the CommitDataset() documentation says calls it automatically.
   // Commit the dataset on destruction. An RException is logged to the NTuple log channel instead of being
   // propagated out of the destructor. Exceptions of other types are not caught here.
136 try {
137 CommitDataset();
138 } catch (const RException &err) {
139 R__LOG_ERROR(ROOT::Internal::NTupleLog()) << "failure committing ntuple: " << err.GetError().GetReport();
140 }
141}
142
144{
   // Body of CommitDataset(); its signature line is not shown in this listing.
   // fModel->Expire() at the end marks the dataset as committed, so any later call returns immediately here.
145 if (fModel->IsExpired())
146 return;
147
   // Every context handed out by CreateFillContext() must already be destroyed. The writer only holds weak_ptr's to
   // them, and an RException is thrown if any is still alive.
   // NOTE(review): unlike CreateFillContext(), this function does not lock fMutex. Concurrent calls of the two are not
   // synchronized here.
148 for (const auto &context : fFillContexts) {
149 if (!context.expired()) {
150 throw RException(R__FAIL("RNTupleFillContext has not been destructed"));
151 }
152 }
153
   // If either sink call below throws, the model stays unexpired. A later call (e.g. from the destructor) will then
   // attempt the commit again.
154 // Now commit all clusters as a cluster group and then the dataset.
155 fSink->CommitClusterGroup();
156 fSink->CommitDataset();
157 fModel->Expire();
158}
159
/// Recreate a new file at `storage` and return a writer that fills the ntuple `ntupleName` from multiple contexts.
/// Throws an RException if buffered writing is disabled in `options`.
160std::unique_ptr<ROOT::Experimental::RNTupleParallelWriter>
161ROOT::Experimental::RNTupleParallelWriter::Recreate(std::unique_ptr<ROOT::RNTupleModel> model,
162 std::string_view ntupleName, std::string_view storage,
163 const ROOT::RNTupleWriteOptions &options)
164{
165 if (!options.GetUseBufferedWrite()) {
166 throw RException(R__FAIL("parallel writing requires buffering"));
167 }
168
// NOTE(review): source line 169, which must declare `sink`, is missing from this listing. It presumably calls the
// page-sink factory `Create(ntupleName, location, options)` ("Guess the concrete derived page ... from the
// location") with `storage`. Confirm against the original source.
170 // Cannot use std::make_unique because the constructor of RNTupleParallelWriter is private.
171 return std::unique_ptr<RNTupleParallelWriter>(new RNTupleParallelWriter(std::move(model), std::move(sink)));
172}
173
174std::unique_ptr<ROOT::Experimental::RNTupleParallelWriter>
175ROOT::Experimental::RNTupleParallelWriter::Append(std::unique_ptr<ROOT::RNTupleModel> model,
176 std::string_view ntupleName, TDirectory &fileOrDirectory,
177 const ROOT::RNTupleWriteOptions &options)
178{
179 auto file = fileOrDirectory.GetFile();
180 if (!file) {
181 throw RException(
182 R__FAIL("RNTupleParallelWriter only supports writing to a ROOT file. Cannot write into a directory "
183 "that is not backed by a file"));
184 }
185 if (!file->IsBinary()) {
186 throw RException(R__FAIL("RNTupleParallelWriter only supports writing to a ROOT file. Cannot write into " +
187 std::string(file->GetName())));
188 }
189 if (!options.GetUseBufferedWrite()) {
190 throw RException(R__FAIL("parallel writing requires buffering"));
191 }
192
193 auto sink = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, fileOrDirectory, options);
194 // Cannot use std::make_unique because the constructor of RNTupleParallelWriter is private.
195 return std::unique_ptr<RNTupleParallelWriter>(new RNTupleParallelWriter(std::move(model), std::move(sink)));
196}
197
198std::shared_ptr<ROOT::Experimental::RNTupleFillContext> ROOT::Experimental::RNTupleParallelWriter::CreateFillContext()
199{
200 std::lock_guard g(fMutex);
201
202 auto model = fModel->Clone();
203 auto sink =
204 std::make_unique<ROOT::Internal::RPageSinkBuf>(std::make_unique<RPageSynchronizingSink>(*fSink, fSinkMutex));
205
206 // Cannot use std::make_shared because the constructor of RNTupleFillContext is private. Also it would mean that the
207 // (direct) memory of all contexts stays around until the vector of weak_ptr's is cleared.
208 std::shared_ptr<RNTupleFillContext> context(new RNTupleFillContext(std::move(model), std::move(sink)));
209 fFillContexts.push_back(context);
210
211#ifdef R__USE_IMT
212 if (IsImplicitMTEnabled() &&
213 fSink->GetWriteOptions().GetUseImplicitMT() == ROOT::RNTupleWriteOptions::EImplicitMT::kOn) {
214 context->fZipTasks = std::make_unique<ROOT::Experimental::Internal::RNTupleImtTaskScheduler>();
215 context->fSink->SetTaskScheduler(context->fZipTasks.get());
216 }
217#endif
218
219 return context;
220}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define R__LOG_ERROR(...)
Definition RLogger.hxx:357
#define g(i)
Definition RSha256.hxx:105
Binding & operator=(OUT(*fun)(void))
void ObserveMetrics(RNTupleMetrics &observee)
A context for filling entries (data) into clusters of an RNTuple.
A writer to fill an RNTuple from multiple contexts.
static std::unique_ptr< RNTupleParallelWriter > Append(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, TDirectory &fileOrDirectory, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Append an ntuple to the existing file, which must not be accessed while data is filled into any creat...
static std::unique_ptr< RNTupleParallelWriter > Recreate(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Recreate a new file and return a writer to write an ntuple.
std::shared_ptr< RNTupleFillContext > CreateFillContext()
Create a new RNTupleFillContext that can be used to fill entries and prepare clusters in parallel.
RNTupleParallelWriter(std::unique_ptr< ROOT::RNTupleModel > model, std::unique_ptr< ROOT::Internal::RPageSink > sink)
std::unique_ptr< ROOT::RNTupleModel > fModel
The original RNTupleModel connected to fSink; needs to be destructed before it.
void CommitDataset()
Automatically called by the destructor.
std::unique_ptr< ROOT::Internal::RPageSink > fSink
The final RPageSink that represents the synchronization point.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:38
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Guess the concrete derived page sink from the location.
Abstract interface to write data into an ntuple.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:44
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
const RError & GetError() const
Definition RError.hxx:84
Field-specific extra type information from the header / extension header.
The on-storage metadata of an RNTuple.
The RNTupleModel encapsulates the schema of an RNTuple.
Common user-tunable settings for storing RNTuples.
Describe directory structure in memory.
Definition TDirectory.h:45
ROOT::RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:600
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to an RNTupleModel