Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
RPagePool.cxx
Go to the documentation of this file.
1/// \file RPagePool.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \date 2018-10-04
4
5/*************************************************************************
6 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include <ROOT/RPagePool.hxx>
14#include <ROOT/RPageStorage.hxx>
15#include <ROOT/RColumn.hxx>
16
17#include <TError.h>
18
19#include <algorithm>
20#include <cstdlib>
21#include <utility>
22
24{
// Register the "nPage" gauge with the pool's metrics object. It counts the
// number of currently cached pages: incremented in AddPage(), decremented in
// ErasePage().
26 fCounters = std::make_unique<RCounters>(
27 RCounters{*fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPage", "", "number of currently cached pages")});
28}
29
// Inserts `page` into the pool under `key`, or bumps the reference counter of
// an already-cached page covering the same position. Returns the (possibly
// pre-existing) entry. Caller must hold fLock; the returned reference is only
// valid until fEntries is next modified (emplace_back may reallocate).
31ROOT::Internal::RPagePool::AddPage(RPage page, const RKey &key, std::int64_t initialRefCounter)
32{
// A given buffer must not be registered twice.
33 assert(fLookupByBuffer.count(page.GetBuffer()) == 0);
34
// Index the new page would get if it is appended to fEntries below.
35 const auto entryIndex = fEntries.size();
36
37 auto itrPageSet = fLookupByKey.find(key);
38 if (itrPageSet != fLookupByKey.end()) {
// Try to claim the page's position slot; if a page at the same position is
// already cached, emplace leaves the map unchanged and isNew is false.
39 auto [itrEntryIdx, isNew] = itrPageSet->second.emplace(RPagePosition(page), entryIndex);
40 if (!isNew) {
41 assert(itrEntryIdx->second < fEntries.size());
42 // We require that pages cover pairwise distinct element ranges of the column
43 assert(fEntries[itrEntryIdx->second].fPage.GetGlobalRangeLast() == page.GetGlobalRangeLast());
// Duplicate insert: fold the requested references into the existing entry
// (initialRefCounter may be 0, see PreloadPage()) and drop `page`.
44 fEntries[itrEntryIdx->second].fRefCounter += initialRefCounter;
45 return fEntries[itrEntryIdx->second];
46 }
47 } else {
// First page for this key: create the position map with a single element.
48 fLookupByKey.emplace(key, std::map<RPagePosition, std::size_t>{{RPagePosition(page), entryIndex}});
49 }
50
51 fLookupByBuffer[page.GetBuffer()] = entryIndex;
52
53 fCounters->fNPage.Inc();
54 return fEntries.emplace_back(REntry{std::move(page), key, initialRefCounter});
55}
56
58{
// Register the page with an initial reference count of 1 and hand out a
// RPageRef that releases the reference on destruction.
59 std::lock_guard<std::mutex> lockGuard(fLock);
60 return RPageRef(AddPage(std::move(page), key, 1).fPage, this);
61}
62
64{
65 std::lock_guard<std::mutex> lockGuard(fLock);
// Insert with an initial reference count of 0: nobody holds the page yet.
66 const auto &entry = AddPage(std::move(page), key, 0);
// If the page was already cached with active references, AddPage() returned
// that entry with its counter unchanged — only a genuinely unreferenced page
// goes onto the unused list (from which Evict() may later remove it).
67 if (entry.fRefCounter == 0)
68 AddToUnusedPages(entry.fPage);
69}
70
// Removes entry `entryIdx` from all three data structures. `lookupByBufferItr`
// must be the fLookupByBuffer iterator for that entry's buffer. Caller must
// hold fLock and guarantee the entry's reference counter is 0 (or that the
// page is otherwise safe to drop).
71void ROOT::Internal::RPagePool::ErasePage(std::size_t entryIdx, decltype(fLookupByBuffer)::iterator lookupByBufferItr)
72{
// Drop the buffer -> entry index mapping of the removed page.
73 fLookupByBuffer.erase(lookupByBufferItr);
74
// Drop the page's slot in the per-key position map; remove the key entirely
// if this was its last page.
75 auto itrPageSet = fLookupByKey.find(fEntries[entryIdx].fKey);
76 assert(itrPageSet != fLookupByKey.end());
77 itrPageSet->second.erase(RPagePosition(fEntries[entryIdx].fPage));
78 if (itrPageSet->second.empty())
79 fLookupByKey.erase(itrPageSet);
80
// Swap-and-pop: move the last entry into the freed slot so fEntries stays
// dense, then repoint both lookup structures at the new index.
81 const auto N = fEntries.size();
82 assert(entryIdx < N);
83 if (entryIdx != (N - 1)) {
84 fLookupByBuffer[fEntries[N - 1].fPage.GetBuffer()] = entryIdx;
85 itrPageSet = fLookupByKey.find(fEntries[N - 1].fKey);
86 assert(itrPageSet != fLookupByKey.end());
87 auto itrEntryIdx = itrPageSet->second.find(RPagePosition(fEntries[N - 1].fPage));
88 assert(itrEntryIdx != itrPageSet->second.end());
89 itrEntryIdx->second = entryIdx;
90 fEntries[entryIdx] = std::move(fEntries[N - 1]);
91 }
92
93 fCounters->fNPage.Dec();
94 fEntries.resize(N - 1);
95}
96
98{
// Releasing a null page reference is a no-op.
99 if (page.IsNull()) return;
100 std::lock_guard<std::mutex> lockGuard(fLock);
101
102 auto itrLookup = fLookupByBuffer.find(page.GetBuffer());
103 assert(itrLookup != fLookupByBuffer.end());
104 const auto idx = itrLookup->second;
105
106 assert(fEntries[idx].fRefCounter >= 1);
107 if (--fEntries[idx].fRefCounter == 0) {
// When the last reference goes away: keep the page cached on the unused
// list if its cluster is pinned by the page source, otherwise drop it from
// the pool immediately.
108 if (fPageSource.GetPinnedClusters().count(page.GetClusterInfo().GetId()) > 0) {
109 AddToUnusedPages(page);
110 } else {
111 ErasePage(idx, itrLookup);
112 }
113 }
114}
115
117{
// Record the page's buffer under its cluster id so Evict(clusterId) can find
// all unreferenced pages of that cluster. Caller must hold fLock.
118 fUnusedPages[page.GetClusterInfo().GetId()].emplace(page.GetBuffer());
119}
120
122{
// The page must have been registered via AddToUnusedPages() before.
123 auto itr = fUnusedPages.find(page.GetClusterInfo().GetId());
124 assert(itr != fUnusedPages.end());
125 itr->second.erase(page.GetBuffer());
// Drop the cluster's (now empty) buffer set to keep the map small.
126 if (itr->second.empty())
127 fUnusedPages.erase(itr);
128}
129
131{
132 std::lock_guard<std::mutex> lockGuard(fLock);
133 auto itrPageSet = fLookupByKey.find(key);
134 if (itrPageSet == fLookupByKey.end())
135 return RPageRef();
136 assert(!itrPageSet->second.empty());
137
// upper_bound yields the first page starting after globalIndex; its
// predecessor is the only candidate that can contain the index.
138 auto itrEntryIdx = itrPageSet->second.upper_bound(RPagePosition(globalIndex));
139 if (itrEntryIdx == itrPageSet->second.begin())
140 return RPageRef();
141
142 --itrEntryIdx;
// Contains() filters out the case where globalIndex falls into a gap after
// the candidate page's last element.
143 if (fEntries[itrEntryIdx->second].fPage.Contains(globalIndex)) {
// Resurrecting a page whose counter was 0: it is no longer evictable.
144 if (fEntries[itrEntryIdx->second].fRefCounter == 0)
145 RemoveFromUnusedPages(fEntries[itrEntryIdx->second].fPage);
146 fEntries[itrEntryIdx->second].fRefCounter++;
147 return RPageRef(fEntries[itrEntryIdx->second].fPage, this);
148 }
149 return RPageRef();
150}
151
153{
// Same lookup as the global-index overload, keyed on a cluster-local index.
154 std::lock_guard<std::mutex> lockGuard(fLock);
155 auto itrPageSet = fLookupByKey.find(key);
156 if (itrPageSet == fLookupByKey.end())
157 return RPageRef();
158 assert(!itrPageSet->second.empty());
159
// upper_bound yields the first page starting after localIndex; only its
// predecessor can contain the index.
160 auto itrEntryIdx = itrPageSet->second.upper_bound(RPagePosition(localIndex));
161 if (itrEntryIdx == itrPageSet->second.begin())
162 return RPageRef();
163
164 --itrEntryIdx;
165 if (fEntries[itrEntryIdx->second].fPage.Contains(localIndex)) {
// Resurrecting a page whose counter was 0: it is no longer evictable.
166 if (fEntries[itrEntryIdx->second].fRefCounter == 0)
167 RemoveFromUnusedPages(fEntries[itrEntryIdx->second].fPage);
168 fEntries[itrEntryIdx->second].fRefCounter++;
169 return RPageRef(fEntries[itrEntryIdx->second].fPage, this);
170 }
171 return RPageRef();
172}
173
175{
176 std::lock_guard<std::mutex> lockGuard(fLock);
// Nothing to do if the cluster has no unreferenced pages.
177 auto itr = fUnusedPages.find(clusterId);
178 if (itr == fUnusedPages.end())
179 return;
180
// Iterating itr->second while erasing is safe: ErasePage() modifies
// fLookupByBuffer, fLookupByKey, and fEntries but never fUnusedPages.
181 for (auto pageBuffer : itr->second) {
182 const auto itrLookupByBuffer = fLookupByBuffer.find(pageBuffer);
183 assert(itrLookupByBuffer != fLookupByBuffer.end());
184 const auto entryIdx = itrLookupByBuffer->second;
// Only pages with reference counter 0 ever sit on the unused list.
185 assert(fEntries[entryIdx].fRefCounter == 0);
186 ErasePage(entryIdx, itrLookupByBuffer);
187 }
188
// All of the cluster's buffers are gone; drop the bookkeeping entry as well.
189 fUnusedPages.erase(itr);
190}
#define N
A thread-safe integral performance counter.
A thread-safe integral performance counter.
std::unique_ptr< RCounters > fCounters
void ErasePage(std::size_t entryIdx, decltype(fLookupByBuffer)::iterator lookupByBufferItr)
Called both by ReleasePage() and by Evict() to remove an unused page from the pool.
Definition RPagePool.cxx:71
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
The page pool counters are observed by the page source.
void Evict(ROOT::DescriptorId_t clusterId)
Removes unused pages (pages with reference counter 0) from the page pool.
REntry & AddPage(RPage page, const RKey &key, std::int64_t initialRefCounter)
Add a new page to the fLookupByBuffer and fLookupByKey data structures.
Definition RPagePool.cxx:31
std::unordered_map< ROOT::DescriptorId_t, std::unordered_set< void * > > fUnusedPages
Remembers pages with reference counter 0, organized by the page's cluster id.
std::mutex fLock
The page pool is accessed concurrently due to parallel decompression.
RPageRef GetPage(RKey key, ROOT::NTupleSize_t globalIndex)
Tries to find the page corresponding to column and index in the cache.
RPagePool(RPageSource &pageSource)
Definition RPagePool.cxx:23
std::unordered_map< RKey, std::map< RPagePosition, std::size_t >, RKeyHasher > fLookupByKey
Used in GetPage() to find the right page in fEntries.
void AddToUnusedPages(const RPage &page)
Called by PreloadPage() if the page at hand is new and thus added with ref counter 0.
void PreloadPage(RPage page, RKey key)
Like RegisterPage() but the reference counter is initialized to 0.
Definition RPagePool.cxx:63
RPageRef RegisterPage(RPage page, RKey key)
Adds a new page to the pool.
Definition RPagePool.cxx:57
void RemoveFromUnusedPages(const RPage &page)
Called by GetPage(), when the reference counter increases from zero to one.
RPageSource & fPageSource
Every page pool is associated to exactly one page source.
std::unordered_map< void *, std::size_t > fLookupByBuffer
Used in ReleasePage() to find the page index in fEntries.
std::vector< REntry > fEntries
All cached pages in the page pool.
void ReleasePage(const RPage &page)
Give back a page to the pool and decrease the reference counter.
Definition RPagePool.cxx:97
Reference to a page stored in the page pool.
Abstract interface to read data from an ntuple.
ROOT::DescriptorId_t GetId() const
Definition RPage.hxx:62
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:43
bool IsNull() const
Definition RPage.hxx:173
ROOT::NTupleSize_t GetGlobalRangeLast() const
Definition RPage.hxx:123
void * GetBuffer() const
Definition RPage.hxx:142
const RClusterInfo & GetClusterInfo() const
Definition RPage.hxx:126
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
Performance counters that get registered in fMetrics.
Every page in the page pool is annotated with a search key and a reference counter.
Definition RPagePool.hxx:76
Used in fLookupByKey to store both the absolute and the cluster-local page index of the referenced pa...
Definition RPagePool.hxx:85