// Per-context page sink created by RNTupleParallelWriter::CreateFillContext, which puts an Internal::RPageSinkBuf
// in front of it. It does not store data itself. Descriptor queries, batched sealed pages, cluster commits and page
// reservation/release are forwarded to one inner sink shared by all contexts. GetSinkGuard hands out a guard built
// from a mutex that is likewise shared by all contexts.
54class RPageSynchronizingSink :
public RPageSink {
// Non-owning pointer to the wrapped sink. CreateFillContext passes the writer's *fSink, which the writer keeps owning.
57 RPageSink *fInnerSink;
// Initializes the RPageSink base with the inner sink's ntuple name and write options, so this sink reports the same
// name and options as the sink it forwards to. Also stores non-owning pointers to `inner` and `mutex`.
61 explicit RPageSynchronizingSink(RPageSink &inner, std::mutex &mutex)
62 : RPageSink(inner.GetNTupleName(), inner.GetWriteOptions()), fInnerSink(&inner), fMutex(&mutex)
// Non-copyable: copy construction and copy assignment are deleted.
68 RPageSynchronizingSink(
const RPageSynchronizingSink &) =
delete;
69 RPageSynchronizingSink &
operator=(
const RPageSynchronizingSink &) =
delete;
// Returns the inner sink's descriptor, i.e. the one descriptor shared by every sink wrapping that inner sink.
71 const RNTupleDescriptor &GetDescriptor() const final {
return fInnerSink->GetDescriptor(); }
// Intentionally a no-op: returns a value-initialized handle and does not forward the column to the inner sink.
// NOTE(review): presumably the inner sink already knows all columns from the writer's original model — confirm.
73 ColumnHandle_t AddColumn(DescriptorId_t,
const RColumn &)
final {
return {}; }
// Intentionally empty: this sink performs no initialization of its own.
// NOTE(review): the inner sink is presumably initialized by the writer with its original model — confirm.
74 void InitImpl(RNTupleModel &)
final {}
// Schema changes cannot be made through this sink; always throws RException.
75 void UpdateSchema(
const RNTupleModelChangeset &, NTupleSize_t)
final
77 throw RException(
R__FAIL(
"UpdateSchema not supported via RPageSynchronizingSink"));
// Committing individual pages through this sink is a usage error; always throws RException.
// NOTE(review): presumably the RPageSinkBuf placed in front of this sink buffers pages and hands them over in
// batches via CommitSealedPageV — confirm.
80 void CommitPage(ColumnHandle_t,
const RPage &)
final
82 throw RException(
R__FAIL(
"should never commit single pages via RPageSynchronizingSink"));
// Committing a single sealed page is rejected as well; only the batched CommitSealedPageV is forwarded.
// Always throws RException.
84 void CommitSealedPage(DescriptorId_t,
const RSealedPage &)
final
86 throw RException(
R__FAIL(
"should never commit sealed pages via RPageSynchronizingSink"));
// Forwards a batch of sealed page groups unchanged to the shared inner sink.
// NOTE(review): no lock is taken in the visible lines of this method; callers presumably hold the guard returned by
// GetSinkGuard while calling it, since the inner sink is shared — confirm.
88 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
final
90 fInnerSink->CommitSealedPageV(ranges);
// Forwards the cluster commit (with nNewEntries unchanged) to the shared inner sink and returns its result.
92 std::uint64_t CommitCluster(NTupleSize_t nNewEntries)
final {
return fInnerSink->CommitCluster(nNewEntries); }
// Rejected here; RNTupleParallelWriter calls CommitClusterGroup() on its own fSink directly when finalizing.
// Always throws RException.
93 void CommitClusterGroup() final
95 throw RException(
R__FAIL(
"should never commit cluster group via RPageSynchronizingSink"));
// Rejected here; RNTupleParallelWriter calls CommitDataset() on its own fSink directly when finalizing.
// Always throws RException.
97 void CommitDataset() final {
throw RException(
R__FAIL(
"should never commit dataset via RPageSynchronizingSink")); }
// Reserves the page from the shared inner sink, passing the column handle and element count through unchanged.
99 RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
final
101 return fInnerSink->ReservePage(columnHandle, nElements);
// Hands the page back to the inner sink, which is also where ReservePage obtains pages.
103 void ReleasePage(RPage &page)
final { fInnerSink->ReleasePage(page); }
// Returns a guard built from fMutex, the mutex given to the constructor. CreateFillContext passes the writer's
// fSinkMutex to every instance, so all fill contexts obtain guards on the same mutex.
105 RSinkGuard GetSinkGuard() final {
return RSinkGuard(fMutex); }
// Constructor: takes ownership of the final (shared) sink and of the original model, and names the writer's
// metrics "RNTupleParallelWriter".
111 std::unique_ptr<Internal::RPageSink> sink)
112 : fSink(std::move(sink)), fModel(std::move(model)), fMetrics(
"RNTupleParallelWriter")
// Checks every registered fill context for liveness. fFillContexts presumably holds std::weak_ptr, given the
// expired() call and the shared_ptr appended in CreateFillContext.
// NOTE(review): the handling of a context that is still alive is not visible here; presumably it is reported as
// an error (R__LOG_ERROR is available in this file) — confirm.
121 for (
const auto &context : fFillContexts) {
122 if (!context.expired()) {
// Finalizes the ntuple on the writer-owned sink directly; RPageSynchronizingSink rejects both of these calls.
130 fSink->CommitClusterGroup();
131 fSink->CommitDataset();
// Factory returning a new writer that takes ownership of `model` and `sink`.
137std::unique_ptr<ROOT::Experimental::RNTupleParallelWriter>
// Constructed with `new` rather than std::make_unique.
// NOTE(review): presumably because the constructor is not publicly accessible — confirm.
147 return std::unique_ptr<RNTupleParallelWriter>(
new RNTupleParallelWriter(std::move(model), std::move(sink)));
// Factory that writes into a caller-provided TFile: creates an Internal::RPageSinkFile for `ntupleName` in `file`
// with `options`, then builds the writer around that sink and `model`.
150std::unique_ptr<ROOT::Experimental::RNTupleParallelWriter>
158 auto sink = std::make_unique<Internal::RPageSinkFile>(ntupleName, file, options);
// Constructed with `new` rather than std::make_unique.
// NOTE(review): presumably because the constructor is not publicly accessible — confirm.
160 return std::unique_ptr<RNTupleParallelWriter>(
new RNTupleParallelWriter(std::move(model), std::move(sink)));
// Serializes concurrent calls on the writer's fMutex. The visible work done under it includes cloning fModel
// and appending to fFillContexts.
165 std::lock_guard
g(fMutex);
// Each fill context gets its own clone of the original model.
167 auto model = fModel->Clone();
// Per-context sink chain: an RPageSinkBuf in front of an RPageSynchronizingSink. The RPageSynchronizingSink
// forwards to the shared fSink and builds its sink guard from the shared fSinkMutex.
171 auto sink = std::make_unique<Internal::RPageSinkBuf>(std::make_unique<RPageSynchronizingSink>(*fSink, fSinkMutex));
// Constructed with `new` rather than std::make_shared.
// NOTE(review): presumably because RNTupleFillContext's constructor is not publicly accessible — confirm.
175 std::shared_ptr<RNTupleFillContext> context(
new RNTupleFillContext(std::move(model), std::move(sink)));
// Remembers the context so the writer can later check whether it is still alive.
176 fFillContexts.push_back(context);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_ERROR(...)
Binding & operator=(OUT(*fun)(void))
void ObserveMetrics(RNTupleMetrics &observee)
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the location.
Abstract interface to write data into an ntuple.
A page is a slice of a column that is mapped into memory.
std::string GetReport() const
Format a diagnostics report, e.g. for an exception message.
Base class for all ROOT issued exceptions.
const RError & GetError() const
The on-storage meta-data of an ntuple.
A context for filling entries (data) into clusters of an RNTuple.
The RNTupleModel encapsulates the schema of an ntuple.
A writer to fill an RNTuple from multiple contexts.
RNTupleParallelWriter(std::unique_ptr< RNTupleModel > model, std::unique_ptr< Internal::RPageSink > sink)
std::unique_ptr< RNTupleModel > fModel
The original RNTupleModel connected to fSink; needs to be destructed before it.
std::unique_ptr< Internal::RPageSink > fSink
The final RPageSink that represents the synchronization point.
static std::unique_ptr< RNTupleParallelWriter > Recreate(std::unique_ptr< RNTupleModel > model, std::string_view ntupleName, std::string_view storage, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Recreate a new file and return a writer to write an ntuple.
std::shared_ptr< RNTupleFillContext > CreateFillContext()
Create a new RNTupleFillContext that can be used to fill entries and prepare clusters in parallel.
static std::unique_ptr< RNTupleParallelWriter > Append(std::unique_ptr< RNTupleModel > model, std::string_view ntupleName, TFile &file, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Append an ntuple to the existing file, which must not be accessed while data is filled into any creat...
Detail::RNTupleMetrics fMetrics
Common user-tunable settings for storing ntuples.
bool GetUseBufferedWrite() const
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
The incremental changes to a RNTupleModel