RClusterPool.cxx
/// \file RClusterPool.cxx
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2020-03-11
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include <ROOT/RClusterPool.hxx>
#include <ROOT/RPageStorage.hxx>

#include <TError.h>

#include <algorithm>
#include <chrono>
#include <future>
#include <iostream>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <utility>

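// Strict weak ordering for in-flight clusters: first by cluster id, then by the number of columns,
// then lexicographically by the column ids themselves.
// Worked example (hypothetical values): a key (clusterId=3, columns={1,2}) orders before
// (clusterId=3, columns={1,2,5}) because its column set is smaller; for equal sizes, the first
// differing column id decides.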
bool ROOT::Internal::RClusterPool::RInFlightCluster::operator<(const RInFlightCluster &other) const
{
   if (fClusterKey.fClusterId == other.fClusterKey.fClusterId) {
      if (fClusterKey.fPhysicalColumnSet.size() == other.fClusterKey.fPhysicalColumnSet.size()) {
         for (auto itr1 = fClusterKey.fPhysicalColumnSet.begin(), itr2 = other.fClusterKey.fPhysicalColumnSet.begin();
              itr1 != fClusterKey.fPhysicalColumnSet.end(); ++itr1, ++itr2) {
            if (*itr1 == *itr2)
               continue;
            return *itr1 < *itr2;
         }
         // *this == other
         return false;
      }
      return fClusterKey.fPhysicalColumnSet.size() < other.fClusterKey.fPhysicalColumnSet.size();
   }
   return fClusterKey.fClusterId < other.fClusterKey.fClusterId;
}

ROOT::Internal::RClusterPool::RClusterPool(ROOT::Internal::RPageSource &pageSource, unsigned int clusterBunchSize)
   : fPageSource(pageSource), fClusterBunchSize(clusterBunchSize), fMetrics("RClusterPool")
{
   R__ASSERT(clusterBunchSize > 0);

   fCounters = std::make_unique<RCounters>(
      RCounters{*fMetrics.MakeCounter<RNTupleAtomicCounter *>("nCluster", "", "number of currently cached clusters")});
}

ROOT::Internal::RClusterPool::~RClusterPool()
{
   StopBackgroundThread();
}

void ROOT::Internal::RClusterPool::StartBackgroundThread()
{
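   // A joinable fThreadIo means the I/O thread is already running; starting is a no-op in that case.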
   if (fThreadIo.joinable())
      return;

   fThreadIo = std::thread(&RClusterPool::ExecReadClusters, this);
}

void ROOT::Internal::RClusterPool::StopBackgroundThread()
{
   if (!fThreadIo.joinable())
      return;

   {
      // Controlled shutdown of the I/O thread
      std::unique_lock<std::mutex> lock(fLockWorkQueue);
      fReadQueue.emplace_back(RReadItem());
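      // A default-constructed RReadItem carries kInvalidDescriptorId as its cluster id, which
      // ExecReadClusters() interprets as the request to terminate (see below).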
      fCvHasReadWork.notify_one();
   }
   fThreadIo.join();
}

void ROOT::Internal::RClusterPool::ExecReadClusters()
{
   std::deque<RReadItem> readItems;
   while (true) {
      {
         std::unique_lock<std::mutex> lock(fLockWorkQueue);
         fCvHasReadWork.wait(lock, [&]{ return !fReadQueue.empty(); });
         std::swap(readItems, fReadQueue);
      }

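      // Work items are processed bunch by bunch: consecutive items sharing a bunch id are
      // combined into a single vector read; a different bunch id starts a new LoadClusters() call.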
      while (!readItems.empty()) {
         std::vector<RCluster::RKey> clusterKeys;
         std::int64_t bunchId = -1;
         for (unsigned i = 0; i < readItems.size(); ++i) {
            const auto &item = readItems[i];
            // `kInvalidDescriptorId` is used as a marker for thread cancellation. Such an item causes the
            // thread to terminate; thus, it must appear last in the queue.
            if (R__unlikely(item.fClusterKey.fClusterId == ROOT::kInvalidDescriptorId)) {
               R__ASSERT(i == (readItems.size() - 1));
               return;
            }
            if ((bunchId >= 0) && (item.fBunchId != bunchId))
               break;
            bunchId = item.fBunchId;
            clusterKeys.emplace_back(item.fClusterKey);
         }

         auto clusters = fPageSource.LoadClusters(clusterKeys);
         for (std::size_t i = 0; i < clusters.size(); ++i) {
            readItems[i].fPromise.set_value(std::move(clusters[i]));
         }
         readItems.erase(readItems.begin(), readItems.begin() + clusters.size());
      }
   } // while (true)
}

namespace {

/// Helper class for the (cluster, column list) pairs that should be loaded in the background
class RProvides {
   using DescriptorId_t = ROOT::DescriptorId_t;
   using ColumnSet_t = ROOT::Internal::RCluster::ColumnSet_t;

public:
   struct RInfo {
      std::int64_t fBunchId = -1;
      std::int64_t fFlags = 0;
      ColumnSet_t fPhysicalColumnSet;
   };

   static constexpr std::int64_t kFlagRequired = 0x01;
   static constexpr std::int64_t kFlagLast = 0x02;

private:
   std::map<DescriptorId_t, RInfo> fMap;

public:
   void Insert(DescriptorId_t clusterId, const RInfo &info)
   {
      fMap.emplace(clusterId, info);
   }

   bool Contains(DescriptorId_t clusterId) {
      return fMap.count(clusterId) > 0;
   }

   std::size_t GetSize() const { return fMap.size(); }

   void Erase(DescriptorId_t clusterId, const ColumnSet_t &physicalColumns)
   {
      auto itr = fMap.find(clusterId);
      if (itr == fMap.end())
         return;
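      // Keep only the columns that are NOT in physicalColumns, i.e. compute the set difference
      // "columns still to be provided" minus "columns already taken care of elsewhere".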
      ColumnSet_t d;
      std::copy_if(itr->second.fPhysicalColumnSet.begin(), itr->second.fPhysicalColumnSet.end(),
                   std::inserter(d, d.end()),
                   [&physicalColumns](DescriptorId_t needle) { return physicalColumns.count(needle) == 0; });
      if (d.empty()) {
         fMap.erase(itr);
      } else {
         itr->second.fPhysicalColumnSet = d;
      }
   }

   decltype(fMap)::iterator begin() { return fMap.begin(); }
   decltype(fMap)::iterator end() { return fMap.end(); }
};

} // anonymous namespace

ROOT::Internal::RCluster *
ROOT::Internal::RClusterPool::GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
{
   StartBackgroundThread(); // ensure that the thread is started (no-op if it is already running)

   std::unordered_set<ROOT::DescriptorId_t> keep{fPageSource.GetPinnedClusters()};
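   // In addition to the pinned clusters themselves, keep their successors within a window of
   // twice the cluster bunch size, as long as they intersect with the page source's entry range.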
   for (auto cid : fPageSource.GetPinnedClusters()) {
      auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();

      for (ROOT::DescriptorId_t i = 1, next = cid; i < 2 * fClusterBunchSize; ++i) {
         next = descriptorGuard->FindNextClusterId(next);
         if (next == ROOT::kInvalidNTupleIndex ||
             !fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next))) {
            break;
         }

         keep.insert(next);
      }
   }

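   // The look-ahead window spans two bunches of clusters: the first bunch, starting at clusterId,
   // keeps the current bunch id; the second bunch gets a fresh bunch id so that the I/O thread
   // schedules it as a separate vector read.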
   RProvides provide;
   {
      auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();

      // Determine the following cluster ids and the column ids that we want to make available
      RProvides::RInfo provideInfo;
      provideInfo.fPhysicalColumnSet = physicalColumns;
      provideInfo.fBunchId = fBunchId;
      provideInfo.fFlags = RProvides::kFlagRequired;
      for (ROOT::DescriptorId_t i = 0, next = clusterId; i < 2 * fClusterBunchSize; ++i) {
         if (i == fClusterBunchSize)
            provideInfo.fBunchId = ++fBunchId;

         auto cid = next;
         next = descriptorGuard->FindNextClusterId(cid);
         if (next != ROOT::kInvalidNTupleIndex) {
            if (!fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next)))
               next = ROOT::kInvalidDescriptorId;
         }
         if (next == ROOT::kInvalidDescriptorId)
            provideInfo.fFlags |= RProvides::kFlagLast;

         provide.Insert(cid, provideInfo);

         if (next == ROOT::kInvalidDescriptorId)
            break;
         provideInfo.fFlags = 0;
      }
   } // descriptorGuard

   // Remove clusters from the cache that are neither in the look-ahead window nor in the set of pinned clusters
   for (auto itr = fPool.begin(); itr != fPool.end();) {
      if (provide.Contains(itr->first)) {
         ++itr;
         continue;
      }
      if (keep.count(itr->first) > 0) {
         ++itr;
         continue;
      }
      itr = fPool.erase(itr);
      fCounters->fNCluster.Dec();
   }

   // Move clusters that have arrived in the meantime into the cache pool
   {
      // This lock is held during iteration over several data structures: the collection of in-flight clusters,
      // the current pool of cached clusters, and the set of cluster ids to be preloaded.
      // All three collections are expected to be small (certainly < 100, more likely < 10). All operations
      // are non-blocking and only move around small items (pointers, ids, etc.). Thus the overall locking time
      // should still be reasonably small and the lock is rarely taken (usually once per cluster).
      std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);

      for (auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
         R__ASSERT(itr->fFuture.valid());
         if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
            // Remove the set of columns that are already scheduled for being loaded
            provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fPhysicalColumnSet);
            ++itr;
            continue;
         }

         auto cptr = itr->fFuture.get();
         R__ASSERT(cptr);

         const bool isExpired =
            !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
         if (isExpired) {
            cptr.reset();
            itr = fInFlightClusters.erase(itr);
            continue;
         }

         // Noop unless the page source has a task scheduler
         fPageSource.UnzipCluster(cptr.get());

         // We either put a fresh cluster into a free slot or we merge the cluster with an existing one
         auto existingCluster = fPool.find(cptr->GetId());
         if (existingCluster != fPool.end()) {
            existingCluster->second->Adopt(std::move(*cptr));
         } else {
            const auto cid = cptr->GetId();
            fPool.emplace(cid, std::move(cptr));
            fCounters->fNCluster.Inc();
         }
         itr = fInFlightClusters.erase(itr);
      }

      // Determine clusters which get triggered for background loading
      for (const auto &[_, cptr] : fPool) {
         provide.Erase(cptr->GetId(), cptr->GetAvailPhysicalColumns());
      }

      // Figure out whether enough work accumulated to justify I/O calls: defer prefetching if fewer
      // clusters than a full bunch are pending, unless one of them is required right now or is the last cluster
      bool skipPrefetch = false;
      if (provide.GetSize() < fClusterBunchSize) {
         skipPrefetch = true;
         for (const auto &kv : provide) {
            if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
               continue;
            skipPrefetch = false;
            break;
         }
      }

      // Update the work queue and the in-flight cluster list with new requests. We already hold the work queue
      // mutex.
      // TODO(jblomer): we should ensure that clusterId is given first to the I/O thread. That is usually the
      // case but it's not ensured by the code.
      if (!skipPrefetch) {
         for (const auto &kv : provide) {
            R__ASSERT(!kv.second.fPhysicalColumnSet.empty());

            RReadItem readItem;
            readItem.fClusterKey.fClusterId = kv.first;
            readItem.fBunchId = kv.second.fBunchId;
            readItem.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;

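            // The read item carries the promise and the matching in-flight entry keeps the
            // future; the I/O thread fulfills the promise once the cluster has been loaded.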
            RInFlightCluster inFlightCluster;
            inFlightCluster.fClusterKey.fClusterId = kv.first;
            inFlightCluster.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;
            inFlightCluster.fFuture = readItem.fPromise.get_future();
            fInFlightClusters.emplace_back(std::move(inFlightCluster));

            fReadQueue.emplace_back(std::move(readItem));
         }
         if (!fReadQueue.empty())
            fCvHasReadWork.notify_one();
      }
   } // work queue lock guard

   return WaitFor(clusterId, physicalColumns);
}

ROOT::Internal::RCluster *
ROOT::Internal::RClusterPool::WaitFor(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
{
   while (true) {
      // Fast exit: the cluster happens to be already present in the cache pool
      auto result = fPool.find(clusterId);
      if (result != fPool.end()) {
         bool hasMissingColumn = false;
         for (auto cid : physicalColumns) {
            if (result->second->ContainsColumn(cid))
               continue;

            hasMissingColumn = true;
            break;
         }
         if (!hasMissingColumn)
            return result->second.get();
      }

      // Otherwise the missing data must have been triggered for loading by now, so block and wait
      decltype(fInFlightClusters)::iterator itr;
      {
         std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
         itr = fInFlightClusters.begin();
         for (; itr != fInFlightClusters.end(); ++itr) {
            if (itr->fClusterKey.fClusterId == clusterId)
               break;
         }
         R__ASSERT(itr != fInFlightClusters.end());
         // Note that fInFlightClusters is accessed concurrently only by the I/O thread. The I/O thread
         // never changes the structure of the in-flight clusters array (it does not add, remove, or swap elements).
         // Therefore, it is safe to access the element pointed to by itr here even after fLockWorkQueue
         // is released. We need to release the lock before potentially blocking on the cluster future.
      }

      auto cptr = itr->fFuture.get();
      // We were blocked waiting for the cluster, so assume that nobody discarded it.
      R__ASSERT(cptr != nullptr);

      // Noop unless the page source has a task scheduler
      fPageSource.UnzipCluster(cptr.get());

      if (result != fPool.end()) {
         result->second->Adopt(std::move(*cptr));
      } else {
         const auto cid = cptr->GetId();
         fPool.emplace(cid, std::move(cptr));
         fCounters->fNCluster.Inc();
      }

      std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
      fInFlightClusters.erase(itr);
   }
}

void ROOT::Internal::RClusterPool::WaitForInFlightClusters()
{
   while (true) {
      decltype(fInFlightClusters)::iterator itr;
      {
         std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
         itr = fInFlightClusters.begin();
         while (itr != fInFlightClusters.end() &&
                itr->fFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
            ++itr;
         }
         if (itr == fInFlightClusters.end())
            break;
      }

      itr->fFuture.wait();
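      // The blocking wait happens outside the lock; afterwards, rescan the in-flight list from
      // the beginning because further futures may have become ready in the meantime.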
   }
}
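
Usage sketch (appended for illustration; not part of the original file): a hypothetical snippet
showing how a cluster pool is typically driven. It assumes a page source obtained elsewhere; the
function name, cluster id, column ids, and bunch size below are illustrative placeholders only.

// Hypothetical usage sketch under the assumptions stated above.
void UsageSketch(ROOT::Internal::RPageSource &source)
{
   ROOT::Internal::RClusterPool pool(source, /*clusterBunchSize=*/4);

   // Request cluster 0 with two physical columns. On a cache miss, GetCluster() blocks until
   // the background I/O thread has loaded the cluster, while successor clusters are prefetched.
   ROOT::Internal::RCluster::ColumnSet_t columns = {0, 1};
   ROOT::Internal::RCluster *cluster = pool.GetCluster(/*clusterId=*/0, columns);

   // The pool keeps ownership: the pointer remains valid until a later GetCluster() call
   // moves the look-ahead window past this cluster.
   R__ASSERT(cluster->ContainsColumn(0));
}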