RClusterPool.cxx
1/// \file RClusterPool.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2020-03-11
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RClusterPool.hxx>
17#include <ROOT/RNTupleDescriptor.hxx>
18#include <ROOT/RPageStorage.hxx>
19
20#include <TError.h>
21
22#include <algorithm>
23#include <chrono>
24#include <future>
25#include <iostream>
26#include <iterator>
27#include <map>
28#include <memory>
29#include <mutex>
30#include <set>
31#include <utility>
32
33bool ROOT::Experimental::Detail::RClusterPool::RInFlightCluster::operator <(const RInFlightCluster &other) const
34{
35 if (fClusterKey.fClusterId == other.fClusterKey.fClusterId) {
36 if (fClusterKey.fPhysicalColumnSet.size() == other.fClusterKey.fPhysicalColumnSet.size()) {
37 for (auto itr1 = fClusterKey.fPhysicalColumnSet.begin(), itr2 = other.fClusterKey.fPhysicalColumnSet.begin();
38 itr1 != fClusterKey.fPhysicalColumnSet.end(); ++itr1, ++itr2) {
39 if (*itr1 == *itr2)
40 continue;
41 return *itr1 < *itr2;
42 }
43 // *this == other
44 return false;
45 }
46 return fClusterKey.fPhysicalColumnSet.size() < other.fClusterKey.fPhysicalColumnSet.size();
47 }
48 return fClusterKey.fClusterId < other.fClusterKey.fClusterId;
49}
50
51ROOT::Experimental::Detail::RClusterPool::RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
52 : fPageSource(pageSource)
53 , fClusterBunchSize(clusterBunchSize)
54 , fPool(2 * clusterBunchSize)
55 , fThreadIo(&RClusterPool::ExecReadClusters, this)
56 , fThreadUnzip(&RClusterPool::ExecUnzipClusters, this)
57{
58 R__ASSERT(clusterBunchSize > 0);
59}
60
61ROOT::Experimental::Detail::RClusterPool::~RClusterPool()
62{
63 {
64 // Controlled shutdown of the I/O thread
65 std::unique_lock<std::mutex> lock(fLockWorkQueue);
66 fReadQueue.emplace_back(RReadItem());
67 fCvHasReadWork.notify_one();
68 }
69 fThreadIo.join();
70
71 {
72 // Controlled shutdown of the unzip thread
73 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
74 fUnzipQueue.emplace_back(RUnzipItem());
75 fCvHasUnzipWork.notify_one();
76 }
77 fThreadUnzip.join();
78}
79
80void ROOT::Experimental::Detail::RClusterPool::ExecUnzipClusters()
81{
82 // The thread keeps its local buffer of elements to be processed. On wakeup, the local copy is swapped with
83 // `fUnzipQueue`, which not only reduces contention but also reduces the overall number of allocations, as the
84 // internal storage of both copies is reused. The local copy should be cleared before the `std::swap()` in the next
85 // iteration.
86 std::deque<RUnzipItem> unzipItems;
87 while (true) {
88 {
89 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
90 fCvHasUnzipWork.wait(lock, [&]{ return !fUnzipQueue.empty(); });
91 std::swap(unzipItems, fUnzipQueue);
92 }
93
94 for (auto &item : unzipItems) {
95 if (!item.fCluster)
96 return;
97
98 fPageSource.UnzipCluster(item.fCluster.get());
99
100 // Afterwards the GetCluster() method in the main thread can pick up the cluster
101 item.fPromise.set_value(std::move(item.fCluster));
102 }
103 unzipItems.clear();
104 } // while (true)
105}
106
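// Note on the routine below: in each pass over the thread-local queue, only the leading items that
// share the same bunch id are grouped into a single LoadClusters() vector read; items belonging to a
// different bunch stay queued and are handled in a subsequent pass.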
107void ROOT::Experimental::Detail::RClusterPool::ExecReadClusters()
108{
109 std::deque<RReadItem> readItems;
110 while (true) {
111 {
112 std::unique_lock<std::mutex> lock(fLockWorkQueue);
113 fCvHasReadWork.wait(lock, [&]{ return !fReadQueue.empty(); });
114 std::swap(readItems, fReadQueue);
115 }
116
117 while (!readItems.empty()) {
118 std::vector<RCluster::RKey> clusterKeys;
119 std::int64_t bunchId = -1;
120 for (unsigned i = 0; i < readItems.size(); ++i) {
121 const auto &item = readItems[i];
122 // `kInvalidDescriptorId` is used as a marker for thread cancellation. Such an item causes the
123 // thread to terminate; thus, it must appear last in the queue.
124 if (R__unlikely(item.fClusterKey.fClusterId == kInvalidDescriptorId)) {
125 R__ASSERT(i == (readItems.size() - 1));
126 return;
127 }
128 if ((bunchId >= 0) && (item.fBunchId != bunchId))
129 break;
130 bunchId = item.fBunchId;
131 clusterKeys.emplace_back(item.fClusterKey);
132 }
133
134 auto clusters = fPageSource.LoadClusters(clusterKeys);
135 bool unzipQueueDirty = false;
136 for (std::size_t i = 0; i < clusters.size(); ++i) {
137 // Meanwhile, the user might have requested clusters outside the look-ahead window, so this cluster
138 // may no longer be needed; in that case we simply discard it right away instead of moving it to the pool
139 bool discard;
140 {
141 std::unique_lock<std::mutex> lock(fLockWorkQueue);
142 discard = std::any_of(fInFlightClusters.begin(), fInFlightClusters.end(),
143 [thisClusterId = clusters[i]->GetId()](auto &inFlight) {
144 return inFlight.fClusterKey.fClusterId == thisClusterId && inFlight.fIsExpired;
145 });
146 }
147 if (discard) {
148 clusters[i].reset();
149 readItems[i].fPromise.set_value(std::move(clusters[i]));
150 } else {
151 // Hand-over the loaded cluster pages to the unzip thread
152 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
153 fUnzipQueue.emplace_back(RUnzipItem{std::move(clusters[i]), std::move(readItems[i].fPromise)});
154 unzipQueueDirty = true;
155 }
156 }
157 readItems.erase(readItems.begin(), readItems.begin() + clusters.size());
158 if (unzipQueueDirty)
159 fCvHasUnzipWork.notify_one();
160 }
161 } // while (true)
162}
163
164ROOT::Experimental::Detail::RCluster *
165ROOT::Experimental::Detail::RClusterPool::FindInPool(DescriptorId_t clusterId) const
166{
167 for (const auto &cptr : fPool) {
168 if (cptr && (cptr->GetId() == clusterId))
169 return cptr.get();
170 }
171 return nullptr;
172}
173
174size_t ROOT::Experimental::Detail::RClusterPool::FindFreeSlot() const
175{
176 auto N = fPool.size();
177 for (unsigned i = 0; i < N; ++i) {
178 if (!fPool[i])
179 return i;
180 }
181
182 R__ASSERT(false);
183 return N;
184}
185
186
187namespace {
188
189/// Helper class for the (cluster, column list) pairs that should be loaded in the background
190class RProvides {
191 using DescriptorId_t = ROOT::Experimental::DescriptorId_t;
192 using ColumnSet_t = ROOT::Experimental::Detail::RCluster::ColumnSet_t;
193
194public:
195 struct RInfo {
196 std::int64_t fBunchId = -1;
197 std::int64_t fFlags = 0;
198 ColumnSet_t fPhysicalColumnSet;
199 };
200
201 static constexpr std::int64_t kFlagRequired = 0x01;
202 static constexpr std::int64_t kFlagLast = 0x02;
203
204private:
205 std::map<DescriptorId_t, RInfo> fMap;
206
207public:
208 void Insert(DescriptorId_t clusterId, const RInfo &info)
209 {
210 fMap.emplace(clusterId, info);
211 }
212
213 bool Contains(DescriptorId_t clusterId) {
214 return fMap.count(clusterId) > 0;
215 }
216
217 std::size_t GetSize() const { return fMap.size(); }
218
219 void Erase(DescriptorId_t clusterId, const ColumnSet_t &physicalColumns)
220 {
221 auto itr = fMap.find(clusterId);
222 if (itr == fMap.end())
223 return;
224 ColumnSet_t d;
225 std::copy_if(itr->second.fPhysicalColumnSet.begin(), itr->second.fPhysicalColumnSet.end(),
226 std::inserter(d, d.end()),
227 [&physicalColumns](DescriptorId_t needle) { return physicalColumns.count(needle) == 0; });
228 if (d.empty()) {
229 fMap.erase(itr);
230 } else {
231 itr->second.fPhysicalColumnSet = d;
232 }
233 }
234
235 decltype(fMap)::iterator begin() { return fMap.begin(); }
236 decltype(fMap)::iterator end() { return fMap.end(); }
237};
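// Illustration of the bookkeeping above (hypothetical ids): after Insert(7, info) with
// info.fPhysicalColumnSet = {1, 2, 3}, a call to Erase(7, {2, 3}) shrinks the entry for cluster 7 to
// the column set {1}; a subsequent Erase(7, {1}) removes the entry altogether, so Contains(7) returns
// false and GetSize() no longer counts it.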
238
239} // anonymous namespace
240
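// Illustration of the look-ahead bookkeeping in GetCluster() below, with hypothetical numbers that are
// not fixed by this file: for fClusterBunchSize = 2 and a request for cluster 10, up to 2 * 2 = 4
// following clusters are considered. Cluster 10 is marked required; clusters 10 and 11 share the
// current bunch id while clusters 12 and 13 get the next one, so the I/O thread later issues one
// vector read per bunch. Up to fWindowPre clusters before cluster 10 are kept in the pool if they are
// already cached, but they are not requested again.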
241ROOT::Experimental::Detail::RCluster *
242ROOT::Experimental::Detail::RClusterPool::GetCluster(DescriptorId_t clusterId,
243 const RCluster::ColumnSet_t &physicalColumns)
244{
245 std::set<DescriptorId_t> keep;
246 RProvides provide;
247 {
248 auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();
249
250 // Determine previous cluster ids that we keep if they happen to be in the pool
251 auto prev = clusterId;
252 for (unsigned int i = 0; i < fWindowPre; ++i) {
253 prev = descriptorGuard->FindPrevClusterId(prev);
254 if (prev == kInvalidDescriptorId)
255 break;
256 keep.insert(prev);
257 }
258
259 // Determine following cluster ids and the column ids that we want to make available
260 RProvides::RInfo provideInfo;
261 provideInfo.fPhysicalColumnSet = physicalColumns;
262 provideInfo.fBunchId = fBunchId;
263 provideInfo.fFlags = RProvides::kFlagRequired;
264 for (DescriptorId_t i = 0, next = clusterId; i < 2 * fClusterBunchSize; ++i) {
265 if (i == fClusterBunchSize)
266 provideInfo.fBunchId = ++fBunchId;
267
268 auto cid = next;
269 next = descriptorGuard->FindNextClusterId(cid);
270 if (next == kInvalidDescriptorId)
271 provideInfo.fFlags |= RProvides::kFlagLast;
272
273 provide.Insert(cid, provideInfo);
274
275 if (next == kInvalidDescriptorId)
276 break;
277 provideInfo.fFlags = 0;
278 }
279 } // descriptorGuard
280
281 // Clear the cache of clusters that are neither in the look-ahead nor in the look-back window
282 for (auto &cptr : fPool) {
283 if (!cptr)
284 continue;
285 if (provide.Contains(cptr->GetId()))
286 continue;
287 if (keep.count(cptr->GetId()) > 0)
288 continue;
289 cptr.reset();
290 }
291
292 // Move clusters that arrived in the meantime into the cache pool
293 {
294 // This lock is held during iteration over several data structures: the collection of in-flight clusters,
295 // the current pool of cached clusters, and the set of cluster ids to be preloaded.
296 // All three collections are expected to be small (certainly < 100, more likely < 10). All operations
297 // are non-blocking and only move around small items (pointers, ids, etc.). Thus the overall locking time should
298 // still be reasonably small, and the lock is rarely taken (usually once per cluster).
299 std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);
300
301 for (auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
302 R__ASSERT(itr->fFuture.valid());
303 itr->fIsExpired =
304 !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
305
306 if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
307 // Remove the columns that are already scheduled for loading
308 provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fPhysicalColumnSet);
309 ++itr;
310 continue;
311 }
312
313 auto cptr = itr->fFuture.get();
314 // If cptr is nullptr, the cluster expired previously and was released by the I/O thread
315 if (!cptr || itr->fIsExpired) {
316 cptr.reset();
317 itr = fInFlightClusters.erase(itr);
318 continue;
319 }
320
321 // We either put a fresh cluster into a free slot or we merge the cluster with an existing one
322 auto existingCluster = FindInPool(cptr->GetId());
323 if (existingCluster) {
324 existingCluster->Adopt(std::move(*cptr));
325 } else {
326 auto idxFreeSlot = FindFreeSlot();
327 fPool[idxFreeSlot] = std::move(cptr);
328 }
329 itr = fInFlightClusters.erase(itr);
330 }
331
332 // Drop from the provide list whatever is already available in the pool; the remainder is a candidate for background loading
333 for (auto &cptr : fPool) {
334 if (!cptr)
335 continue;
336 provide.Erase(cptr->GetId(), cptr->GetAvailPhysicalColumns());
337 }
338
339 // Figure out whether enough work has accumulated to justify I/O calls
340 bool skipPrefetch = false;
341 if (provide.GetSize() < fClusterBunchSize) {
342 skipPrefetch = true;
343 for (const auto &kv : provide) {
344 if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
345 continue;
346 skipPrefetch = false;
347 break;
348 }
349 }
350
351 // Update the work queue and the in-flight cluster list with new requests. We already hold the work queue
352 // mutex
353 // TODO(jblomer): we should ensure that clusterId is given first to the I/O thread. That is usually the
354 // case but it's not ensured by the code
355 if (!skipPrefetch) {
356 for (const auto &kv : provide) {
357 R__ASSERT(!kv.second.fPhysicalColumnSet.empty());
358
359 RReadItem readItem;
360 readItem.fClusterKey.fClusterId = kv.first;
361 readItem.fBunchId = kv.second.fBunchId;
362 readItem.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;
363
364 RInFlightCluster inFlightCluster;
365 inFlightCluster.fClusterKey.fClusterId = kv.first;
366 inFlightCluster.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;
367 inFlightCluster.fFuture = readItem.fPromise.get_future();
368 fInFlightClusters.emplace_back(std::move(inFlightCluster));
369
370 fReadQueue.emplace_back(std::move(readItem));
371 }
372 if (fReadQueue.size() > 0)
373 fCvHasReadWork.notify_one();
374 }
375 } // work queue lock guard
376
377 return WaitFor(clusterId, physicalColumns);
378}
379
380ROOT::Experimental::Detail::RCluster *
381ROOT::Experimental::Detail::RClusterPool::WaitFor(DescriptorId_t clusterId,
382 const RCluster::ColumnSet_t &physicalColumns)
383{
384 while (true) {
385 // Fast exit: the cluster happens to be already present in the cache pool
386 auto result = FindInPool(clusterId);
387 if (result) {
388 bool hasMissingColumn = false;
389 for (auto cid : physicalColumns) {
390 if (result->ContainsColumn(cid))
391 continue;
392
393 hasMissingColumn = true;
394 break;
395 }
396 if (!hasMissingColumn)
397 return result;
398 }
399
400 // Otherwise the missing data must have been triggered for loading by now, so block and wait
401 decltype(fInFlightClusters)::iterator itr;
402 {
403 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
404 itr = fInFlightClusters.begin();
405 for (; itr != fInFlightClusters.end(); ++itr) {
406 if (itr->fClusterKey.fClusterId == clusterId)
407 break;
408 }
409 R__ASSERT(itr != fInFlightClusters.end());
410 // Note that fInFlightClusters is accessed concurrently only by the I/O thread. The I/O thread
411 // never changes the structure of the in-flight clusters array (it does not add, remove, or swap elements).
412 // Therefore, it is safe to access the element pointed to by itr here even after fLockWorkQueue
413 // is released. We need to release the lock before potentially blocking on the cluster future.
414 }
415
416 auto cptr = itr->fFuture.get();
417 if (result) {
418 result->Adopt(std::move(*cptr));
419 } else {
420 auto idxFreeSlot = FindFreeSlot();
421 fPool[idxFreeSlot] = std::move(cptr);
422 }
423
424 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
425 fInFlightClusters.erase(itr);
426 }
427}
428
429
430void ROOT::Experimental::Detail::RClusterPool::WaitForInFlightClusters()
431{
432 while (true) {
433 decltype(fInFlightClusters)::iterator itr;
434 {
435 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
436 itr = fInFlightClusters.begin();
437 if (itr == fInFlightClusters.end())
438 return;
439 }
440
441 itr->fFuture.wait();
442
443 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
444 fInFlightClusters.erase(itr);
445 }
446}
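The shutdown and hand-off protocol implemented by the destructor, ExecReadClusters(), and ExecUnzipClusters() follows a common worker-queue pattern: producers append to a shared deque under a mutex and notify a condition variable, while the worker swaps the shared deque with a thread-local one, processes the items without holding the lock, and terminates when it encounters a sentinel item that is always enqueued last. The following is a minimal, self-contained sketch of that pattern; the names (Item, WorkQueue, Push) are illustrative and not part of the ROOT API.

#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

struct Item {
   bool fIsSentinel = false; // set to true for the sentinel that requests a controlled shutdown
   int fPayload = 0;
};

class WorkQueue {
   std::mutex fLock;
   std::condition_variable fCvHasWork;
   std::deque<Item> fQueue;

   void Exec()
   {
      std::deque<Item> items; // thread-local buffer; its storage is reused across iterations
      while (true) {
         {
            std::unique_lock<std::mutex> lock(fLock);
            fCvHasWork.wait(lock, [&] { return !fQueue.empty(); });
            std::swap(items, fQueue); // drain the shared queue in O(1) and keep the locked section short
         }
         for (auto &item : items) {
            if (item.fIsSentinel)
               return; // the sentinel is enqueued last, so no pending work is lost
            // ... process item.fPayload ...
         }
         items.clear();
      }
   }

   std::thread fWorker{&WorkQueue::Exec, this}; // declared last so the members it uses exist first

public:
   void Push(Item item)
   {
      std::lock_guard<std::mutex> lock(fLock);
      fQueue.emplace_back(std::move(item));
      fCvHasWork.notify_one();
   }

   ~WorkQueue()
   {
      Item sentinel;
      sentinel.fIsSentinel = true;
      Push(std::move(sentinel));
      fWorker.join();
   }
};

In RClusterPool, the same roles are played by fReadQueue/fUnzipQueue, fCvHasReadWork/fCvHasUnzipWork, and the default-constructed RReadItem/RUnzipItem that the destructor pushes to stop the two threads.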
#define R__unlikely(expr)
Definition RConfig.hxx:586
#define R__ASSERT(e)
Definition TError.h:118
Manages a set of clusters containing compressed and packed pages.
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
void ExecUnzipClusters()
The unzip thread routine which takes a loaded cluster and passes it to fPageSource....
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
void ExecReadClusters()
The I/O thread routine; there is exactly one I/O thread in flight for every cluster pool.
std::thread fThreadUnzip
The unzip thread takes a loaded cluster and calls fPageSource->UnzipCluster() on it.
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
RCluster * WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:155
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:157
Abstract interface to read data from an ntuple.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Clusters that are currently being processed by the pipeline.
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fClusterKey.fPhysicalColumnSet.
std::future< std::unique_ptr< RCluster > > fFuture
Request to load a subset of the columns of a particular cluster.
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
std::promise< std::unique_ptr< RCluster > > fPromise
Request to decompress and if necessary unpack compressed pages.
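Finally, a minimal usage sketch of the pool. It assumes an RPageSource obtained elsewhere and uses placeholder cluster and column ids; the namespace qualification follows the definitions in this listing and may differ between ROOT versions.

#include <ROOT/RCluster.hxx>
#include <ROOT/RClusterPool.hxx>
#include <ROOT/RPageStorage.hxx>

using ROOT::Experimental::DescriptorId_t;
using ROOT::Experimental::Detail::RCluster;
using ROOT::Experimental::Detail::RClusterPool;
using ROOT::Experimental::Detail::RPageSource;

void Example(RPageSource &source)
{
   // Up to 4 clusters are fetched per vector read; the pool keeps 2 * 4 slots.
   RClusterPool pool(source, /*clusterBunchSize=*/4);

   DescriptorId_t clusterId = 0;           // placeholder cluster id
   RCluster::ColumnSet_t columns = {0, 1}; // placeholder physical column ids

   // Returns the cluster from the pool or lets the I/O thread load it, and schedules the
   // following clusters for background loading.
   RCluster *cluster = pool.GetCluster(clusterId, columns);
   (void)cluster;

   // Mainly useful in tests: block until all scheduled background loads have finished.
   pool.WaitForInFlightClusters();
}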