RClusterPool.cxx
/// \file RClusterPool.cxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2020-03-11
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include <ROOT/RClusterPool.hxx>
#include <ROOT/RPageStorage.hxx>

#include <TError.h>

#include <algorithm>
#include <chrono>
#include <future>
#include <iostream>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <utility>

bool ROOT::Internal::RClusterPool::RInFlightCluster::operator<(const RInFlightCluster &other) const
{
   if (fClusterKey.fClusterId == other.fClusterKey.fClusterId) {
      if (fClusterKey.fPhysicalColumnSet.size() == other.fClusterKey.fPhysicalColumnSet.size()) {
         for (auto itr1 = fClusterKey.fPhysicalColumnSet.begin(), itr2 = other.fClusterKey.fPhysicalColumnSet.begin();
              itr1 != fClusterKey.fPhysicalColumnSet.end(); ++itr1, ++itr2) {
            if (*itr1 == *itr2)
               continue;
            return *itr1 < *itr2;
         }
         // *this == other
         return false;
      }
      return fClusterKey.fPhysicalColumnSet.size() < other.fClusterKey.fPhysicalColumnSet.size();
   }
   return fClusterKey.fClusterId < other.fClusterKey.fClusterId;
}
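The comparator above falls back from the cluster id to the column-set size to an element-wise comparison of column ids, which yields a strict weak ordering. The following standalone sketch (not part of ROOT) demonstrates the same three-level ordering on a hypothetical MiniKey type; it uses std::set so that the element-wise step iterates in a deterministic order:

#include <cassert>
#include <cstdint>
#include <set>

struct MiniKey {
   std::uint64_t fClusterId;
   std::set<std::uint64_t> fColumns;

   bool operator<(const MiniKey &other) const
   {
      if (fClusterId != other.fClusterId)
         return fClusterId < other.fClusterId; // first: cluster id
      if (fColumns.size() != other.fColumns.size())
         return fColumns.size() < other.fColumns.size(); // then: number of columns
      return fColumns < other.fColumns; // finally: element-wise column ids
   }
};

int main()
{
   assert((MiniKey{1, {2, 3}} < MiniKey{2, {0}}));    // cluster id dominates
   assert((MiniKey{1, {9}} < MiniKey{1, {0, 1}}));    // smaller column set orders first
   assert((MiniKey{1, {2, 3}} < MiniKey{1, {2, 4}})); // equal sizes: compare column ids
   return 0;
}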

ROOT::Internal::RClusterPool::RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
   : fPageSource(pageSource),
     fClusterBunchSize(clusterBunchSize),
     fPool(2 * clusterBunchSize),
     fThreadIo(&RClusterPool::ExecReadClusters, this)
{
   R__ASSERT(clusterBunchSize > 0);
}

ROOT::Internal::RClusterPool::~RClusterPool()
{
   {
      // Controlled shutdown of the I/O thread
      std::unique_lock<std::mutex> lock(fLockWorkQueue);
      fReadQueue.emplace_back(RReadItem());
      fCvHasReadWork.notify_one();
   }
   fThreadIo.join();
}
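The destructor shuts the I/O thread down with a sentinel work item (a default-constructed RReadItem, whose cluster id is kInvalidDescriptorId), queued last and signalled through the condition variable. A minimal, self-contained sketch of this poison-pill pattern, with a hypothetical Worker class and std::optional as the sentinel, could look as follows:

#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>

// A worker thread that processes integer jobs; an empty optional is the poison pill.
class Worker {
   std::mutex fLock;
   std::condition_variable fCv;
   std::deque<std::optional<int>> fQueue;
   std::thread fThread{&Worker::Run, this}; // declared last: starts after the members above exist

   void Run()
   {
      while (true) {
         std::optional<int> job;
         {
            std::unique_lock<std::mutex> lock(fLock);
            fCv.wait(lock, [&] { return !fQueue.empty(); });
            job = std::move(fQueue.front());
            fQueue.pop_front();
         }
         if (!job)
            return; // sentinel received: controlled shutdown
         // ... process *job ...
      }
   }

public:
   void Submit(int job)
   {
      {
         std::lock_guard<std::mutex> lock(fLock);
         fQueue.emplace_back(job);
      }
      fCv.notify_one();
   }

   ~Worker()
   {
      {
         std::lock_guard<std::mutex> lock(fLock);
         fQueue.emplace_back(std::nullopt); // the sentinel must be the last queued item
      }
      fCv.notify_one();
      fThread.join();
   }
};

int main()
{
   Worker w;
   w.Submit(42);
   return 0; // ~Worker() queues the sentinel and joins the thread
}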

void ROOT::Internal::RClusterPool::ExecReadClusters()
{
   std::deque<RReadItem> readItems;
   while (true) {
      {
         std::unique_lock<std::mutex> lock(fLockWorkQueue);
         fCvHasReadWork.wait(lock, [&]{ return !fReadQueue.empty(); });
         std::swap(readItems, fReadQueue);
      }

      while (!readItems.empty()) {
         std::vector<RCluster::RKey> clusterKeys;
         std::int64_t bunchId = -1;
         for (unsigned i = 0; i < readItems.size(); ++i) {
            const auto &item = readItems[i];
            // `kInvalidDescriptorId` is used as a marker for thread cancellation. Such an item causes the
            // thread to terminate; thus, it must appear last in the queue.
            if (R__unlikely(item.fClusterKey.fClusterId == ROOT::kInvalidDescriptorId)) {
               R__ASSERT(i == (readItems.size() - 1));
               return;
            }
            if ((bunchId >= 0) && (item.fBunchId != bunchId))
               break;
            bunchId = item.fBunchId;
            clusterKeys.emplace_back(item.fClusterKey);
         }

         auto clusters = fPageSource.LoadClusters(clusterKeys);
         for (std::size_t i = 0; i < clusters.size(); ++i) {
            readItems[i].fPromise.set_value(std::move(clusters[i]));
         }
         readItems.erase(readItems.begin(), readItems.begin() + clusters.size());
      }
   } // while (true)
}
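ExecReadClusters() drains the work queue in bunches: it groups a maximal prefix of items that share a bunch id, issues one vector read for the whole group, and then fulfills the corresponding promises in queue order. The sketch below (not ROOT code) shows the promise/future hand-off for such a batched request, with a hypothetical LoadAll() standing in for RPageSource::LoadClusters():

#include <cassert>
#include <future>
#include <string>
#include <vector>

struct Request {
   int fKey = 0;
   std::promise<std::string> fPromise;
};

// Hypothetical stand-in for a batched (vector) read
std::vector<std::string> LoadAll(const std::vector<int> &keys)
{
   std::vector<std::string> results;
   for (auto k : keys)
      results.push_back("payload " + std::to_string(k));
   return results;
}

int main()
{
   std::vector<Request> requests(3);
   std::vector<std::future<std::string>> futures;
   std::vector<int> keys;
   for (int i = 0; i < 3; ++i) {
      requests[i].fKey = i;
      futures.push_back(requests[i].fPromise.get_future());
      keys.push_back(i);
   }

   // One I/O call serves the whole batch; each result then fulfills the matching
   // promise in queue order, waking up whoever waits on the corresponding future.
   auto results = LoadAll(keys);
   for (std::size_t i = 0; i < results.size(); ++i)
      requests[i].fPromise.set_value(std::move(results[i]));

   assert(futures[2].get() == "payload 2");
   return 0;
}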

ROOT::Internal::RCluster *ROOT::Internal::RClusterPool::FindInPool(ROOT::DescriptorId_t clusterId) const
{
   for (const auto &cptr : fPool) {
      if (cptr && (cptr->GetId() == clusterId))
         return cptr.get();
   }
   return nullptr;
}

size_t ROOT::Internal::RClusterPool::FindFreeSlot() const
{
   auto N = fPool.size();
   for (unsigned i = 0; i < N; ++i) {
      if (!fPool[i])
         return i;
   }

   R__ASSERT(false);
   return N;
}

namespace {

/// Helper class for the (cluster, column list) pairs that should be loaded in the background
class RProvides {
   using DescriptorId_t = ROOT::DescriptorId_t;
   using ColumnSet_t = ROOT::Internal::RCluster::ColumnSet_t;

public:
   struct RInfo {
      std::int64_t fBunchId = -1;
      std::int64_t fFlags = 0;
      ColumnSet_t fPhysicalColumnSet;
   };

   static constexpr std::int64_t kFlagRequired = 0x01;
   static constexpr std::int64_t kFlagLast = 0x02;

private:
   std::map<DescriptorId_t, RInfo> fMap;

public:
   void Insert(DescriptorId_t clusterId, const RInfo &info)
   {
      fMap.emplace(clusterId, info);
   }

   bool Contains(DescriptorId_t clusterId) {
      return fMap.count(clusterId) > 0;
   }

   std::size_t GetSize() const { return fMap.size(); }

   void Erase(DescriptorId_t clusterId, const ColumnSet_t &physicalColumns)
   {
      auto itr = fMap.find(clusterId);
      if (itr == fMap.end())
         return;
      ColumnSet_t d;
      std::copy_if(itr->second.fPhysicalColumnSet.begin(), itr->second.fPhysicalColumnSet.end(),
                   std::inserter(d, d.end()),
                   [&physicalColumns](DescriptorId_t needle) { return physicalColumns.count(needle) == 0; });
      if (d.empty()) {
         fMap.erase(itr);
      } else {
         itr->second.fPhysicalColumnSet = d;
      }
   }

   decltype(fMap)::iterator begin() { return fMap.begin(); }
   decltype(fMap)::iterator end() { return fMap.end(); }
};

} // anonymous namespace
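RProvides::Erase() removes the columns that are already covered by computing a set difference with std::copy_if into a fresh set. The same operation shown standalone (not ROOT code):

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <unordered_set>

int main()
{
   using ColumnSet = std::unordered_set<std::uint64_t>;
   ColumnSet scheduled{0, 1, 2, 3}; // columns still wanted for a cluster
   ColumnSet inFlight{1, 3};        // columns already being loaded

   // Keep only the columns that are not yet covered (scheduled minus inFlight)
   ColumnSet remaining;
   std::copy_if(scheduled.begin(), scheduled.end(), std::inserter(remaining, remaining.end()),
                [&inFlight](std::uint64_t needle) { return inFlight.count(needle) == 0; });

   assert(remaining == (ColumnSet{0, 2}));
   return 0;
}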

ROOT::Internal::RCluster *
ROOT::Internal::RClusterPool::GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
{
   std::set<ROOT::DescriptorId_t> keep;
   RProvides provide;
   {
      auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();

      // Determine previous cluster ids that we keep if they happen to be in the pool
      auto prev = clusterId;
      for (unsigned int i = 0; i < fWindowPre; ++i) {
         prev = descriptorGuard->FindPrevClusterId(prev);
         if (prev == ROOT::kInvalidDescriptorId)
            break;
         keep.insert(prev);
      }

      // Determine following cluster ids and the column ids that we want to make available
      RProvides::RInfo provideInfo;
      provideInfo.fPhysicalColumnSet = physicalColumns;
      provideInfo.fBunchId = fBunchId;
      provideInfo.fFlags = RProvides::kFlagRequired;
      for (ROOT::DescriptorId_t i = 0, next = clusterId; i < 2 * fClusterBunchSize; ++i) {
         if (i == fClusterBunchSize)
            provideInfo.fBunchId = ++fBunchId;

         auto cid = next;
         next = descriptorGuard->FindNextClusterId(cid);
         if (next != ROOT::kInvalidNTupleIndex) {
            if (!fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next)))
               next = ROOT::kInvalidDescriptorId;
         }
         if (next == ROOT::kInvalidDescriptorId)
            provideInfo.fFlags |= RProvides::kFlagLast;

         provide.Insert(cid, provideInfo);

         if (next == ROOT::kInvalidDescriptorId)
            break;
         provideInfo.fFlags = 0;
      }
   } // descriptorGuard

   // Clear the cache from clusters not in the look-ahead or the look-back window
   for (auto &cptr : fPool) {
      if (!cptr)
         continue;
      if (provide.Contains(cptr->GetId()))
         continue;
      if (keep.count(cptr->GetId()) > 0)
         continue;
      cptr.reset();
   }

   // Move clusters that meanwhile arrived into cache pool
   {
      // This lock is held during iteration over several data structures: the collection of in-flight clusters,
      // the current pool of cached clusters, and the set of cluster ids to be preloaded.
      // All three collections are expected to be small (certainly < 100, more likely < 10). All operations
      // are non-blocking and moving around small items (pointers, ids, etc). Thus the overall locking time should
      // still be reasonably small and the lock is rarely taken (usually once per cluster).
      std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);

      for (auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
         R__ASSERT(itr->fFuture.valid());
         if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
            // Remove the set of columns that are already scheduled for being loaded
            provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fPhysicalColumnSet);
            ++itr;
            continue;
         }

         auto cptr = itr->fFuture.get();

         const bool isExpired =
            !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
         if (isExpired) {
            cptr.reset();
            itr = fInFlightClusters.erase(itr);
            continue;
         }

         // Noop unless the page source has a task scheduler
         fPageSource.UnzipCluster(cptr.get());

         // We either put a fresh cluster into a free slot or we merge the cluster with an existing one
         auto existingCluster = FindInPool(cptr->GetId());
         if (existingCluster) {
            existingCluster->Adopt(std::move(*cptr));
         } else {
            auto idxFreeSlot = FindFreeSlot();
            fPool[idxFreeSlot] = std::move(cptr);
         }
         itr = fInFlightClusters.erase(itr);
      }

      // Determine clusters which get triggered for background loading
      for (auto &cptr : fPool) {
         if (!cptr)
            continue;
         provide.Erase(cptr->GetId(), cptr->GetAvailPhysicalColumns());
      }

      // Figure out if enough work accumulated to justify I/O calls
      bool skipPrefetch = false;
      if (provide.GetSize() < fClusterBunchSize) {
         skipPrefetch = true;
         for (const auto &kv : provide) {
            if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
               continue;
            skipPrefetch = false;
            break;
         }
      }

      // Update the work queue and the in-flight cluster list with new requests. We already hold the work queue
      // mutex.
      // TODO(jblomer): we should ensure that clusterId is given first to the I/O thread. That is usually the
      // case but it's not ensured by the code.
      if (!skipPrefetch) {
         for (const auto &kv : provide) {
            R__ASSERT(!kv.second.fPhysicalColumnSet.empty());

            RReadItem readItem;
            readItem.fClusterKey.fClusterId = kv.first;
            readItem.fBunchId = kv.second.fBunchId;
            readItem.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;

            RInFlightCluster inFlightCluster;
            inFlightCluster.fClusterKey.fClusterId = kv.first;
            inFlightCluster.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;
            inFlightCluster.fFuture = readItem.fPromise.get_future();
            fInFlightClusters.emplace_back(std::move(inFlightCluster));

            fReadQueue.emplace_back(std::move(readItem));
         }
         if (!fReadQueue.empty())
            fCvHasReadWork.notify_one();
      }
   } // work queue lock guard

   return WaitFor(clusterId, physicalColumns);
}
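GetCluster() maintains a sliding window around the requested cluster: a look-back set of fWindowPre clusters to keep, and a look-ahead map of up to 2 * fClusterBunchSize clusters to provide, split into two bunches. The toy model below (not ROOT code, ignoring entry ranges and the descriptor's actual cluster ordering) illustrates the window arithmetic on a linear sequence of cluster ids:

#include <cassert>
#include <cstdint>
#include <map>
#include <set>

int main()
{
   const std::uint64_t clusterId = 5;
   const unsigned windowPre = 1;
   const unsigned bunchSize = 2;

   // Look-back window: clusters kept if they happen to be cached
   std::set<std::uint64_t> keep;
   for (unsigned i = 1; i <= windowPre; ++i)
      if (clusterId >= i)
         keep.insert(clusterId - i);

   // Look-ahead window: cluster id -> bunch id, in two bunches
   std::map<std::uint64_t, std::int64_t> provide;
   std::int64_t bunchId = 0;
   for (unsigned i = 0; i < 2 * bunchSize; ++i) {
      if (i == bunchSize)
         ++bunchId; // the second half goes into the next bunch
      provide[clusterId + i] = bunchId;
   }

   assert(keep == (std::set<std::uint64_t>{4}));
   assert(provide.size() == 4);
   assert(provide[5] == 0 && provide[6] == 0 && provide[7] == 1 && provide[8] == 1);
   return 0;
}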

ROOT::Internal::RCluster *
ROOT::Internal::RClusterPool::WaitFor(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
{
   while (true) {
      // Fast exit: the cluster happens to be already present in the cache pool
      auto result = FindInPool(clusterId);
      if (result) {
         bool hasMissingColumn = false;
         for (auto cid : physicalColumns) {
            if (result->ContainsColumn(cid))
               continue;

            hasMissingColumn = true;
            break;
         }
         if (!hasMissingColumn)
            return result;
      }

      // Otherwise the missing data must have been triggered for loading by now, so block and wait
      decltype(fInFlightClusters)::iterator itr;
      {
         std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
         itr = fInFlightClusters.begin();
         for (; itr != fInFlightClusters.end(); ++itr) {
            if (itr->fClusterKey.fClusterId == clusterId)
               break;
         }
         R__ASSERT(itr != fInFlightClusters.end());
         // Note that fInFlightClusters is accessed concurrently only by the I/O thread. The I/O thread
         // never changes the structure of the in-flight clusters array (it does not add, remove, or swap elements).
         // Therefore, it is safe to access the element pointed to by itr here even after fLockWorkQueue
         // is released. We need to release the lock before potentially blocking on the cluster future.
      }

      auto cptr = itr->fFuture.get();
      // We were blocked waiting for the cluster, so assume that nobody discarded it.
      R__ASSERT(cptr != nullptr);

      // Noop unless the page source has a task scheduler
      fPageSource.UnzipCluster(cptr.get());

      if (result) {
         result->Adopt(std::move(*cptr));
      } else {
         auto idxFreeSlot = FindFreeSlot();
         fPool[idxFreeSlot] = std::move(cptr);
      }

      std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
      fInFlightClusters.erase(itr);
   }
}
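WaitFor() locates the in-flight entry under the work-queue lock but releases the lock before blocking on the future; otherwise the I/O thread, which needs the same lock to pick up work, could never fulfill the promise. A minimal sketch of this find-unlock-wait-relock pattern (not ROOT code, assuming no other thread erases the found element in the meantime):

#include <future>
#include <list>
#include <mutex>
#include <thread>

struct Entry {
   int fKey;
   std::future<int> fFuture;
};

std::mutex gLock;
std::list<Entry> gInFlight;

int WaitForKey(int key)
{
   std::list<Entry>::iterator itr;
   {
      std::lock_guard<std::mutex> lock(gLock);
      for (itr = gInFlight.begin(); itr != gInFlight.end(); ++itr)
         if (itr->fKey == key)
            break;
      // assume the entry is present; std::list iterators stay valid after unlocking
   }
   int value = itr->fFuture.get(); // may block; the producer needs gLock to make progress
   std::lock_guard<std::mutex> lock(gLock);
   gInFlight.erase(itr);
   return value;
}

int main()
{
   std::promise<int> p;
   gInFlight.push_back({7, p.get_future()});
   std::thread producer([&] {
      std::lock_guard<std::mutex> lock(gLock); // the producer briefly takes the same lock
      p.set_value(42);
   });
   int v = WaitForKey(7);
   producer.join();
   return v == 42 ? 0 : 1;
}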

void ROOT::Internal::RClusterPool::WaitForInFlightClusters()
{
   while (true) {
      decltype(fInFlightClusters)::iterator itr;
      {
         std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
         itr = fInFlightClusters.begin();
         if (itr == fInFlightClusters.end())
            return;
      }

      itr->fFuture.wait();

      std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
      fInFlightClusters.erase(itr);
   }
}