55 ,
fPool(2 * clusterBunchSize)
66 std::unique_lock<std::mutex> lock(fLockWorkQueue);
68 fCvHasReadWork.notify_one();
74 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
76 fCvHasUnzipWork.notify_one();
84 std::vector<RUnzipItem> unzipItems;
86 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
87 fCvHasUnzipWork.wait(lock, [&]{
return !fUnzipQueue.empty(); });
88 while (!fUnzipQueue.empty()) {
89 unzipItems.emplace_back(std::move(fUnzipQueue.front()));
94 for (
auto &item : unzipItems) {
98 fPageSource.UnzipCluster(item.fCluster.get());
101 item.fPromise.set_value(std::move(item.fCluster));
109 std::vector<RReadItem> readItems;
110 std::vector<RCluster::RKey> clusterKeys;
111 std::int64_t bunchId = -1;
113 std::unique_lock<std::mutex> lock(fLockWorkQueue);
114 fCvHasReadWork.wait(lock, [&]{
return !fReadQueue.empty(); });
115 while (!fReadQueue.empty()) {
121 if ((bunchId >= 0) && (fReadQueue.front().fBunchId != bunchId))
123 readItems.emplace_back(std::move(fReadQueue.front()));
125 bunchId = readItems.back().fBunchId;
126 clusterKeys.emplace_back(readItems.back().fClusterKey);
130 auto clusters = fPageSource.LoadClusters(clusterKeys);
132 for (std::size_t i = 0; i < clusters.size(); ++i) {
135 bool discard =
false;
137 std::unique_lock<std::mutex> lock(fLockWorkQueue);
138 for (
auto &inFlight : fInFlightClusters) {
139 if (inFlight.fClusterKey.fClusterId != clusters[i]->GetId())
141 discard = inFlight.fIsExpired;
147 readItems[i].fPromise.set_value(std::move(clusters[i]));
150 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
151 fUnzipQueue.emplace(
RUnzipItem{std::move(clusters[i]), std::move(readItems[i].fPromise)});
152 fCvHasUnzipWork.notify_one();
161 for (
const auto &cptr : fPool) {
162 if (cptr && (cptr->GetId() == clusterId))
170 auto N = fPool.size();
171 for (
unsigned i = 0; i <
N; ++i) {
190 std::int64_t fBunchId = -1;
192 ColumnSet_t fColumnSet;
195 static constexpr std::int64_t kFlagRequired = 0x01;
196 static constexpr std::int64_t kFlagLast = 0x02;
199 std::map<DescriptorId_t, RInfo> fMap;
204 fMap.emplace(clusterId, info);
207 bool Contains(DescriptorId_t clusterId) {
208 return fMap.count(clusterId) > 0;
211 std::size_t GetSize()
const {
return fMap.size(); }
213 void Erase(DescriptorId_t clusterId,
const ColumnSet_t &columns)
215 auto itr = fMap.find(clusterId);
216 if (itr == fMap.end())
219 std::copy_if(itr->second.fColumnSet.begin(), itr->second.fColumnSet.end(), std::inserter(
d,
d.end()),
220 [&columns] (DescriptorId_t needle) { return columns.count(needle) == 0; });
224 itr->second.fColumnSet =
d;
228 decltype(fMap)::iterator begin() {
return fMap.begin(); }
229 decltype(fMap)::iterator end() {
return fMap.end(); }
238 std::set<DescriptorId_t> keep;
241 auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();
244 auto prev = clusterId;
245 for (
unsigned int i = 0; i < fWindowPre; ++i) {
246 prev = descriptorGuard->FindPrevClusterId(prev);
253 RProvides::RInfo provideInfo;
254 provideInfo.fColumnSet = columns;
255 provideInfo.fBunchId = fBunchId;
256 provideInfo.fFlags = RProvides::kFlagRequired;
257 for (
DescriptorId_t i = 0, next = clusterId; i < 2 * fClusterBunchSize; ++i) {
258 if (i == fClusterBunchSize)
259 provideInfo.fBunchId = ++fBunchId;
262 next = descriptorGuard->FindNextClusterId(cid);
264 provideInfo.fFlags |= RProvides::kFlagLast;
266 provide.Insert(cid, provideInfo);
270 provideInfo.fFlags = 0;
275 for (
auto &cptr : fPool) {
278 if (provide.Contains(cptr->GetId()) > 0)
280 if (keep.count(cptr->GetId()) > 0)
292 std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);
294 for (
auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
297 !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
299 if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
301 provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fColumnSet);
306 auto cptr = itr->fFuture.get();
308 if (!cptr || itr->fIsExpired) {
310 itr = fInFlightClusters.erase(itr);
315 auto existingCluster = FindInPool(cptr->GetId());
316 if (existingCluster) {
317 existingCluster->Adopt(std::move(*cptr));
319 auto idxFreeSlot = FindFreeSlot();
320 fPool[idxFreeSlot] = std::move(cptr);
322 itr = fInFlightClusters.erase(itr);
326 for (
auto &cptr : fPool) {
329 provide.Erase(cptr->GetId(), cptr->GetAvailColumns());
333 bool skipPrefetch =
false;
334 if (provide.GetSize() < fClusterBunchSize) {
336 for (
const auto &kv : provide) {
337 if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
339 skipPrefetch =
false;
349 for (
const auto &kv : provide) {
350 R__ASSERT(!kv.second.fColumnSet.empty());
354 readItem.
fBunchId = kv.second.fBunchId;
361 fInFlightClusters.emplace_back(std::move(inFlightCluster));
363 fReadQueue.emplace(std::move(readItem));
365 if (fReadQueue.size() > 0)
366 fCvHasReadWork.notify_one();
370 return WaitFor(clusterId, columns);
380 auto result = FindInPool(clusterId);
382 bool hasMissingColumn =
false;
383 for (
auto cid : columns) {
384 if (
result->ContainsColumn(cid))
387 hasMissingColumn =
true;
390 if (!hasMissingColumn)
395 decltype(fInFlightClusters)::iterator itr;
397 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
398 itr = fInFlightClusters.begin();
399 for (; itr != fInFlightClusters.end(); ++itr) {
400 if (itr->fClusterKey.fClusterId == clusterId)
403 R__ASSERT(itr != fInFlightClusters.end());
410 auto cptr = itr->fFuture.get();
412 result->Adopt(std::move(*cptr));
414 auto idxFreeSlot = FindFreeSlot();
415 fPool[idxFreeSlot] = std::move(cptr);
418 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
419 fInFlightClusters.erase(itr);
427 decltype(fInFlightClusters)::iterator itr;
429 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
430 itr = fInFlightClusters.begin();
431 if (itr == fInFlightClusters.end())
437 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
438 fInFlightClusters.erase(itr);
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Manages a set of clusters containing compressed and packed pages.
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
void ExecUnzipClusters()
The unzip thread routine which takes a loaded cluster and passes it to fPageSource....
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
std::thread fThreadUnzip
The unzip thread takes a loaded cluster and calls fPageSource->UnzipCluster() on it.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &columns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
RCluster * WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &columns)
Returns the given cluster from the pool, which needs to contain at least the columns columns.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< DescriptorId_t > ColumnSet_t
Abstract interface to read data from an ntuple.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Clusters that are currently being processed by the pipeline.
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
RCluster::RKey fClusterKey
std::future< std::unique_ptr< RCluster > > fFuture
Request to load a subset of the columns of a particular cluster.
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
std::promise< std::unique_ptr< RCluster > > fPromise
RCluster::RKey fClusterKey
Request to decompress and if necessary unpack compressed pages.
DescriptorId_t fClusterId