ROOT Reference Guide
 
RClusterPool.cxx
/// \file RClusterPool.cxx
/// \ingroup NTuple
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2020-03-11
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/
#include <ROOT/RClusterPool.hxx>
#include <ROOT/RPageStorage.hxx>

#include <TError.h>

#include <algorithm>
#include <chrono>
#include <future>
#include <iostream>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <utility>
bool ROOT::Internal::RClusterPool::RInFlightCluster::operator<(const RInFlightCluster &other) const
{
   if (fClusterKey.fClusterId == other.fClusterKey.fClusterId) {
      if (fClusterKey.fPhysicalColumnSet.size() == other.fClusterKey.fPhysicalColumnSet.size()) {
         for (auto itr1 = fClusterKey.fPhysicalColumnSet.begin(), itr2 = other.fClusterKey.fPhysicalColumnSet.begin();
              itr1 != fClusterKey.fPhysicalColumnSet.end(); ++itr1, ++itr2) {
            if (*itr1 == *itr2)
               continue;
            return *itr1 < *itr2;
         }
         // *this == other
         return false;
      }
      return fClusterKey.fPhysicalColumnSet.size() < other.fClusterKey.fPhysicalColumnSet.size();
   }
   return fClusterKey.fClusterId < other.fClusterKey.fClusterId;
}
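
// Illustrative note (editorial addition, not from the ROOT sources): the ordering above sorts
// in-flight clusters first by cluster id, then by the size of the requested column set, and
// finally lexicographically by column id. For example, {clusterId=3, columns={0,1}} sorts
// before {clusterId=3, columns={0,2}}, and both sort before {clusterId=4, columns={0}}.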

ROOT::Internal::RClusterPool::RClusterPool(ROOT::Internal::RPageSource &pageSource, unsigned int clusterBunchSize)
   : fPageSource(pageSource), fClusterBunchSize(clusterBunchSize)
{
   R__ASSERT(clusterBunchSize > 0);

   fCounters = std::make_unique<RCounters>(
      RCounters{*fMetrics.MakeCounter<RNTupleAtomicCounter *>("nCluster", "", "number of currently cached clusters")});
}

ROOT::Internal::RClusterPool::~RClusterPool()
{
   StopBackgroundThread();
}

void ROOT::Internal::RClusterPool::StartBackgroundThread()
{
   if (fThreadIo.joinable())
      return;

   fThreadIo = std::thread(&RClusterPool::ExecReadClusters, this);
}

void ROOT::Internal::RClusterPool::StopBackgroundThread()
{
   if (!fThreadIo.joinable())
      return;

   {
      // Controlled shutdown of the I/O thread
      std::unique_lock<std::mutex> lock(fLockWorkQueue);
      fReadQueue.emplace_back(RReadItem());
      fCvHasReadWork.notify_one();
   }
   fThreadIo.join();
}

void ROOT::Internal::RClusterPool::ExecReadClusters()
{
   std::deque<RReadItem> readItems;
   while (true) {
      {
         std::unique_lock<std::mutex> lock(fLockWorkQueue);
         fCvHasReadWork.wait(lock, [&]{ return !fReadQueue.empty(); });
         std::swap(readItems, fReadQueue);
      }

      while (!readItems.empty()) {
         std::vector<RCluster::RKey> clusterKeys;
         std::int64_t bunchId = -1;
         for (unsigned i = 0; i < readItems.size(); ++i) {
            const auto &item = readItems[i];
            // `kInvalidDescriptorId` is used as a marker for thread cancellation. Such an item causes the
            // thread to terminate; thus, it must appear last in the queue.
            if (R__unlikely(item.fClusterKey.fClusterId == ROOT::kInvalidDescriptorId)) {
               R__ASSERT(i == (readItems.size() - 1));
               return;
            }
            if ((bunchId >= 0) && (item.fBunchId != bunchId))
               break;
            bunchId = item.fBunchId;
            clusterKeys.emplace_back(item.fClusterKey);
         }

         auto clusters = fPageSource.LoadClusters(clusterKeys);
         for (std::size_t i = 0; i < clusters.size(); ++i) {
            readItems[i].fPromise.set_value(std::move(clusters[i]));
         }
         readItems.erase(readItems.begin(), readItems.begin() + clusters.size());
      }
   } // while (true)
}
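
// Illustration of the bunching above (editorial addition, not from the ROOT sources): if the
// queue holds items for clusters 5 and 6 with fBunchId == 0 followed by cluster 7 with
// fBunchId == 1, the first LoadClusters() call receives the keys for clusters 5 and 6 in one
// vector read and a second call receives cluster 7. A trailing item whose cluster id is
// kInvalidDescriptorId is never handed to LoadClusters(); it only terminates the I/O thread.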

namespace {

/// Helper class for the (cluster, column list) pairs that should be loaded in the background
class RProvides {
   using DescriptorId_t = ROOT::DescriptorId_t;
   using ColumnSet_t = ROOT::Internal::RCluster::ColumnSet_t;

public:
   struct RInfo {
      std::int64_t fBunchId = -1;
      std::int64_t fFlags = 0;
      ColumnSet_t fPhysicalColumnSet;
   };

   static constexpr std::int64_t kFlagRequired = 0x01;
   static constexpr std::int64_t kFlagLast = 0x02;

private:
   std::map<DescriptorId_t, RInfo> fMap;

public:
   void Insert(DescriptorId_t clusterId, const RInfo &info)
   {
      fMap.emplace(clusterId, info);
   }

   bool Contains(DescriptorId_t clusterId) {
      return fMap.count(clusterId) > 0;
   }

   std::size_t GetSize() const { return fMap.size(); }

   void Erase(DescriptorId_t clusterId, const ColumnSet_t &physicalColumns)
   {
      auto itr = fMap.find(clusterId);
      if (itr == fMap.end())
         return;
      ColumnSet_t d;
      std::copy_if(itr->second.fPhysicalColumnSet.begin(), itr->second.fPhysicalColumnSet.end(),
                   std::inserter(d, d.end()),
                   [&physicalColumns](DescriptorId_t needle) { return physicalColumns.count(needle) == 0; });
      if (d.empty()) {
         fMap.erase(itr);
      } else {
         itr->second.fPhysicalColumnSet = d;
      }
   }

   decltype(fMap)::iterator begin() { return fMap.begin(); }
   decltype(fMap)::iterator end() { return fMap.end(); }
};

} // anonymous namespace
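
// Illustration of RProvides::Erase (editorial addition, not from the ROOT sources): if cluster 7
// is registered with columns {0, 1, 2}, Erase(7, {1, 2}) shrinks the entry to {0}; a subsequent
// Erase(7, {0}) removes the entry altogether, so GetSize() decreases by one.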

ROOT::Internal::RCluster *
ROOT::Internal::RClusterPool::GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
{
   StartBackgroundThread(); // ensure that the thread is started (no-op if it is already running)

   std::unordered_set<ROOT::DescriptorId_t> keep{fPageSource.GetPinnedClusters()};
   for (auto cid : fPageSource.GetPinnedClusters()) {
      auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();

      for (ROOT::DescriptorId_t i = 1, next = cid; i < 2 * fClusterBunchSize; ++i) {
         next = descriptorGuard->FindNextClusterId(next);
         if (next == ROOT::kInvalidNTupleIndex ||
             !fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next))) {
            break;
         }

         keep.insert(next);
      }
   }

   RProvides provide;
   {
      auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();

      // Determine the following cluster ids and the column ids that we want to make available
      RProvides::RInfo provideInfo;
      provideInfo.fPhysicalColumnSet = physicalColumns;
      provideInfo.fBunchId = fBunchId;
      provideInfo.fFlags = RProvides::kFlagRequired;
      for (ROOT::DescriptorId_t i = 0, next = clusterId; i < 2 * fClusterBunchSize; ++i) {
         if (i == fClusterBunchSize)
            provideInfo.fBunchId = ++fBunchId;

         auto cid = next;
         next = descriptorGuard->FindNextClusterId(cid);
         if (next != ROOT::kInvalidNTupleIndex) {
            if (!fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next)))
               next = ROOT::kInvalidDescriptorId;
         }
         if (next == ROOT::kInvalidDescriptorId)
            provideInfo.fFlags |= RProvides::kFlagLast;

         provide.Insert(cid, provideInfo);

         if (next == ROOT::kInvalidDescriptorId)
            break;
         provideInfo.fFlags = 0;
      }
   } // descriptorGuard
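
   // Worked example of the look-ahead above (editorial addition, not from the ROOT sources):
   // with fClusterBunchSize == 2, clusterId == 10, and consecutive clusters available, the loop
   // inspects up to 4 clusters. Clusters 10 and 11 keep the current bunch id, clusters 12 and 13
   // get the next bunch id, and only cluster 10 carries kFlagRequired. The last inserted cluster
   // is tagged kFlagLast when the descriptor has no further clusters or the next one falls
   // outside the page source's entry range.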

   // Clear the cache from clusters not in the look-ahead window or the set of pinned clusters
   for (auto itr = fPool.begin(); itr != fPool.end();) {
      if (provide.Contains(itr->first) > 0) {
         ++itr;
         continue;
      }
      if (keep.count(itr->first) > 0) {
         ++itr;
         continue;
      }
      itr = fPool.erase(itr);
      fCounters->fNCluster.Dec();
   }

   // Move clusters that meanwhile arrived into the cache pool
   {
      // This lock is held during iteration over several data structures: the collection of in-flight clusters,
      // the current pool of cached clusters, and the set of cluster ids to be preloaded.
      // All three collections are expected to be small (certainly < 100, more likely < 10). All operations
      // are non-blocking and moving around small items (pointers, ids, etc). Thus the overall locking time should
      // still be reasonably small and the lock is rarely taken (usually once per cluster).
      std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);

      for (auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
         R__ASSERT(itr->fFuture.valid());
         if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
            // Remove the set of columns that are already scheduled for being loaded
            provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fPhysicalColumnSet);
            ++itr;
            continue;
         }

         auto cptr = itr->fFuture.get();

         const bool isExpired =
            !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
         if (isExpired) {
            cptr.reset();
            itr = fInFlightClusters.erase(itr);
            continue;
         }

         // Noop unless the page source has a task scheduler
         fPageSource.UnzipCluster(cptr.get());

         // We either put a fresh cluster into a free slot or we merge the cluster with an existing one
         auto existingCluster = fPool.find(cptr->GetId());
         if (existingCluster != fPool.end()) {
            existingCluster->second->Adopt(std::move(*cptr));
         } else {
            const auto cid = cptr->GetId();
            fPool.emplace(cid, std::move(cptr));
            fCounters->fNCluster.Inc();
         }
         itr = fInFlightClusters.erase(itr);
      }

      // Determine clusters which get triggered for background loading
      for (const auto &[_, cptr] : fPool) {
         provide.Erase(cptr->GetId(), cptr->GetAvailPhysicalColumns());
      }

      // Figure out if enough work accumulated to justify I/O calls
      bool skipPrefetch = false;
      if (provide.GetSize() < fClusterBunchSize) {
         skipPrefetch = true;
         for (const auto &kv : provide) {
            if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
               continue;
            skipPrefetch = false;
            break;
         }
      }
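
      // Reading of the heuristic above (editorial addition, not from the ROOT sources): if fewer
      // than fClusterBunchSize clusters still need loading, the prefetch is normally postponed
      // until more work accumulates; it goes ahead anyway when the missing set still contains the
      // caller's cluster (kFlagRequired) or the last cluster of the entry range (kFlagLast).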

      // Update the work queue and the in-flight cluster list with new requests. We already hold the work queue
      // mutex
      // TODO(jblomer): we should ensure that clusterId is given first to the I/O thread. That is usually the
      // case but it's not ensured by the code
      if (!skipPrefetch) {
         for (const auto &kv : provide) {
            R__ASSERT(!kv.second.fPhysicalColumnSet.empty());

            RReadItem readItem;
            readItem.fClusterKey.fClusterId = kv.first;
            readItem.fBunchId = kv.second.fBunchId;
            readItem.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;

            RInFlightCluster inFlightCluster;
            inFlightCluster.fClusterKey.fClusterId = kv.first;
            inFlightCluster.fClusterKey.fPhysicalColumnSet = kv.second.fPhysicalColumnSet;
            inFlightCluster.fFuture = readItem.fPromise.get_future();
            fInFlightClusters.emplace_back(std::move(inFlightCluster));

            fReadQueue.emplace_back(std::move(readItem));
         }
         if (!fReadQueue.empty())
            fCvHasReadWork.notify_one();
      }
   } // work queue lock guard

   return WaitFor(clusterId, physicalColumns);
}

ROOT::Internal::RCluster *
ROOT::Internal::RClusterPool::WaitFor(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
{
   while (true) {
      // Fast exit: the cluster happens to be already present in the cache pool
      auto result = fPool.find(clusterId);
      if (result != fPool.end()) {
         bool hasMissingColumn = false;
         for (auto cid : physicalColumns) {
            if (result->second->ContainsColumn(cid))
               continue;

            hasMissingColumn = true;
            break;
         }
         if (!hasMissingColumn)
            return result->second.get();
      }

      // Otherwise the missing data must have been triggered for loading by now, so block and wait
      decltype(fInFlightClusters)::iterator itr;
      {
         std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
         itr = fInFlightClusters.begin();
         for (; itr != fInFlightClusters.end(); ++itr) {
            if (itr->fClusterKey.fClusterId == clusterId)
               break;
         }
         R__ASSERT(itr != fInFlightClusters.end());
         // Note that fInFlightClusters is accessed concurrently only by the I/O thread. The I/O thread
         // never changes the structure of the in-flight clusters array (it does not add, remove, or swap elements).
         // Therefore, it is safe to access the element pointed to by itr here even after fLockWorkQueue
         // is released. We need to release the lock before potentially blocking on the cluster future.
      }

      auto cptr = itr->fFuture.get();
      // We were blocked waiting for the cluster, so assume that nobody discarded it.
      R__ASSERT(cptr != nullptr);

      // Noop unless the page source has a task scheduler
      fPageSource.UnzipCluster(cptr.get());

      if (result != fPool.end()) {
         result->second->Adopt(std::move(*cptr));
      } else {
         const auto cid = cptr->GetId();
         fPool.emplace(cid, std::move(cptr));
         fCounters->fNCluster.Inc();
      }

      std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
      fInFlightClusters.erase(itr);
   }
}

void ROOT::Internal::RClusterPool::WaitForInFlightClusters()
{
   while (true) {
      decltype(fInFlightClusters)::iterator itr;
      {
         std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
         itr = fInFlightClusters.begin();
         while (itr != fInFlightClusters.end() &&
                itr->fFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
            ++itr;
         }
         if (itr == fInFlightClusters.end())
            break;
      }

      itr->fFuture.wait();
   }
}
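
A minimal usage sketch (editorial addition, not part of the Reference Guide page). It assumes the ROOT::Internal namespace spelling used in the listing above; the function name ReadFirstCluster, the already attached page source, and the column ids are placeholders for illustration only.

#include <ROOT/RCluster.hxx>
#include <ROOT/RClusterPool.hxx>
#include <ROOT/RPageStorage.hxx>

using ROOT::Internal::RCluster;
using ROOT::Internal::RClusterPool;

// `source` stands for an already attached page source; how it is created is outside the
// scope of this sketch.
void ReadFirstCluster(ROOT::Internal::RPageSource &source)
{
   // Clusters are read in bunches of four by the pool's background I/O thread.
   RClusterPool pool(source, /*clusterBunchSize=*/4);

   // Request the compressed pages of physical columns 0 and 1 of cluster 0. The call returns a
   // cached cluster if available; otherwise it schedules the look-ahead window for background
   // loading and blocks until the requested cluster has been loaded.
   RCluster::ColumnSet_t columns{0, 1};
   RCluster *cluster = pool.GetCluster(0, columns);
   (void)cluster; // the cluster remains owned by the pool
}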
#define R__unlikely(expr)
Definition RConfig.hxx:596
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
A thread-safe integral performance counter.
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
RCluster * WaitFor(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
The cluster pool counters are observed by the page source.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
std::unique_ptr< RCounters > fCounters
void StopBackgroundThread()
Stop the I/O background thread. No-op if already stopped. Called by the destructor.
void ExecReadClusters()
The I/O thread routine; there is exactly one I/O thread per cluster pool.
RCluster * GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread load it.
RClusterPool(ROOT::Internal::RPageSource &pageSource, unsigned int clusterBunchSize)
void StartBackgroundThread()
Spawn the I/O background thread. No-op if already started.
ROOT::Internal::RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:148
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:150
Abstract interface to read data from an ntuple.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
constexpr DescriptorId_t kInvalidDescriptorId
Performance counters that get registered in fMetrics.
Clusters that are currently being processed by the pipeline.
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
Request to load a subset of the columns of a particular cluster.
ROOT::DescriptorId_t fClusterId
Definition RCluster.hxx:153