Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.cxx
Go to the documentation of this file.
1/// \file RPageSinkBuf.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Max Orok <maxwellorok@gmail.com>
5/// \date 2021-03-17
6/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
7/// is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
18#include <ROOT/RNTupleModel.hxx>
19#include <ROOT/RNTupleZip.hxx>
20#include <ROOT/RPageSinkBuf.hxx>
21
// Constructor of the buffered page sink. NOTE(review): the signature line was
// lost in this listing; per the cross-reference section it is
// RPageSinkBuf(std::unique_ptr<RPageSink> inner) — confirm against the header.
// Takes ownership of the inner sink, forwards its ntuple name and write
// options to the RPageSink base, and registers a "ParallelZip" counter plus
// the inner sink's metrics under this sink's own metrics object.
23 : RPageSink(inner->GetNTupleName(), inner->GetWriteOptions())
24 , fMetrics("RPageSinkBuf")
25 , fInnerSink(std::move(inner))
26{
   // Counter flags (0/1) whether parallel page compression is actually used;
   // it is set in CommitPageImpl only when a task scheduler is available.
27 fCounters = std::unique_ptr<RCounters>(new RCounters{
28 *fMetrics.MakeCounter<RNTuplePlainCounter*>("ParallelZip", "",
29 "compressing pages in parallel")
30 });
   // Surface the inner sink's counters through this sink's metrics.
31 fMetrics.ObserveMetrics(fInnerSink->GetMetrics());
32}
33
// RPageSinkBuf::CreateImpl (signature line missing from this listing; the
// cross-reference shows void CreateImpl(const RNTupleModel &model) final).
// Sizes the per-column page buffers to the number of columns seen so far and
// forwards ntuple creation to the inner sink using a private clone of the
// model, so the caller's model remains untouched by the inner sink.
35{
36 fBufferedColumns.resize(fLastColumnId);
37 fInnerModel = model.Clone();
38 fInnerSink->Create(*fInnerModel);
39}
40
// RPageSinkBuf::CommitPageImpl (signature line missing from this listing; the
// cross-reference shows RNTupleLocator CommitPageImpl(ColumnHandle_t
// columnHandle, const RPage &page) final). Copies the caller's page into a
// buffered page owned by this sink and, if a task scheduler is set, kicks off
// asynchronous sealing (packing + compression) of that copy. The page is not
// handed to the inner sink here — that happens in CommitClusterImpl.
43{
44 // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
45 RPage bufPage = ReservePage(columnHandle, page.GetNElements());
46 // make sure the page is aware of how many elements it will have
47 bufPage.GrowUnchecked(page.GetNElements());
48 memcpy(bufPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
49 // Safety: RColumnBuf::iterators are guaranteed to be valid until the
50 // element is destroyed. In other words, all buffered page iterators are
51 // valid until the return value of DrainBufferedPages() goes out of scope in
52 // CommitCluster().
53 RColumnBuf::iterator zipItem =
54 fBufferedColumns.at(columnHandle.fId).BufferPage(columnHandle, bufPage);
   // Without a scheduler the page stays buffered uncompressed; CommitCluster
   // will commit it through the unsealed path.
55 if (!fTaskScheduler) {
56 return RNTupleLocator{};
57 }
58 fCounters->fParallelZip.SetValue(1);
59 // Thread safety: Each thread works on a distinct zipItem which owns its
60 // compression buffer.
61 zipItem->AllocateSealedPageBuf();
62 R__ASSERT(zipItem->fBuf);
   // The sealing task captures the stable iterator (see Safety note above) and
   // the column id by value; the result lands in zipItem->fSealedPage, which is
   // consumed by CommitClusterImpl after fTaskScheduler->Wait().
63 fTaskScheduler->AddTask([this, zipItem, colId = columnHandle.fId] {
64 zipItem->fSealedPage = SealPage(zipItem->fPage,
65 *fBufferedColumns.at(colId).GetHandle().fColumn->GetElement(),
66 GetWriteOptions().GetCompression(), zipItem->fBuf.get()
67 );
68 });
69
70 // we're feeding bad locators to fOpenPageRanges but it should not matter
71 // because they never get written out
72 return RNTupleLocator{};
73}
74
// RPageSinkBuf::CommitSealedPageImpl — the first line(s) of the signature were
// lost in this listing; only the parameter list continuation survives. Per the
// cross-reference it is RNTupleLocator CommitSealedPageImpl(DescriptorId_t
// columnId, const RSealedPage &sealedPage) final. Already-sealed pages need no
// buffering or compression, so they are passed straight through to the inner
// sink; the locator returned here is a dummy (see comment below).
77 DescriptorId_t columnId, const RSealedPage &sealedPage)
78{
79 fInnerSink->CommitSealedPage(columnId, sealedPage);
80 // we're feeding bad locators to fOpenPageRanges but it should not matter
81 // because they never get written out
82 return RNTupleLocator{};
83}
84
// RPageSinkBuf::CommitClusterImpl — the function-name line of the signature is
// missing from this listing; the cross-reference shows std::uint64_t
// CommitClusterImpl(NTupleSize_t nEntries) final, returning the number of
// bytes written to storage (excluding metadata). Waits for all outstanding
// sealing tasks, then drains every buffered column: sealed pages go through
// the sealed-page path, unsealed ones through the regular path, and each
// buffered page's memory is released afterwards. Finally the cluster commit
// itself is delegated to the inner sink.
85std::uint64_t
87{
   // Barrier: all SealPage tasks queued by CommitPageImpl must finish before
   // their fSealedPage results are read below.
88 if (fTaskScheduler) {
89 fTaskScheduler->Wait();
90 fTaskScheduler->Reset();
91 }
92
93 for (auto &bufColumn : fBufferedColumns) {
94 for (auto &bufPage : bufColumn.DrainBufferedPages()) {
95 if (bufPage.IsSealed()) {
96 fInnerSink->CommitSealedPage(bufColumn.GetHandle().fId, bufPage.fSealedPage);
97 } else {
98 fInnerSink->CommitPage(bufColumn.GetHandle(), bufPage.fPage);
99 }
       // The buffered copy was reserved from the inner sink in CommitPageImpl;
       // return it now that its contents have been committed.
100 ReleasePage(bufPage.fPage);
101 }
102 }
103 return fInnerSink->CommitCluster(nEntries);
104}
105
// RPageSinkBuf::CommitDatasetImpl (signature line missing from this listing).
// Nothing is buffered at dataset granularity, so finalizing the dataset is
// delegated directly to the inner sink.
107{
108 fInnerSink->CommitDataset();
109}
110
// RPageSinkBuf::ReservePage (signature line missing from this listing; the
// cross-reference shows RPage ReservePage(ColumnHandle_t columnHandle,
// std::size_t nElements) final). Page memory management is fully delegated:
// pages handed out here come from the inner sink's allocator, which is why
// CommitClusterImpl returns them via ReleasePage below.
113{
114 return fInnerSink->ReservePage(columnHandle, nElements);
115}
116
// RPageSinkBuf::ReleasePage (signature line missing from this listing).
// Counterpart of ReservePage: hands a page back to the inner sink's allocator.
118{
119 fInnerSink->ReleasePage(page);
120}
#define R__ASSERT(e)
Definition TError.h:118
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non-thread-safe integral performance counter.
std::deque< RPageZipItem >::iterator iterator
RNTupleLocator CommitSealedPageImpl(DescriptorId_t columnId, const RSealedPage &sealedPage) final
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::unique_ptr< RCounters > fCounters
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void CreateImpl(const RNTupleModel &model) final
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
Abstract interface to write data into an ntuple.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
ClusterSize_t::ValueType GetNElements() const
Definition RPage.hxx:83
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:81
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Definition RPage.hxx:109
The RNTupleModel encapsulates the schema of an ntuple.
std::unique_ptr< RNTupleModel > Clone() const
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
I/O performance counters that get registered in fMetrics.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Generic information about the physical location of data.