70 std::unique_lock<std::mutex> lock(fLockWorkQueue);
72 fCvHasReadWork.notify_one();
78 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
80 fCvHasUnzipWork.notify_one();
88 std::vector<RUnzipItem> unzipItems;
90 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
91 fCvHasUnzipWork.wait(lock, [&]{
return !fUnzipQueue.empty(); });
92 while (!fUnzipQueue.empty()) {
93 unzipItems.emplace_back(std::move(fUnzipQueue.front()));
98 for (
auto &item : unzipItems) {
102 fPageSource.UnzipCluster(item.fCluster.get());
105 item.fPromise.set_value(std::move(item.fCluster));
113 std::vector<RReadItem> readItems;
115 std::unique_lock<std::mutex> lock(fLockWorkQueue);
116 fCvHasReadWork.wait(lock, [&]{
return !fReadQueue.empty(); });
117 while (!fReadQueue.empty()) {
118 readItems.emplace_back(std::move(fReadQueue.front()));
123 for (
auto &item : readItems) {
128 auto cluster = fPageSource.LoadCluster(item.fClusterId, item.fColumns);
132 bool discard =
false;
134 std::unique_lock<std::mutex> lock(fLockWorkQueue);
135 for (
auto &inFlight : fInFlightClusters) {
136 if (inFlight.fClusterId != item.fClusterId)
138 discard = inFlight.fIsExpired;
144 item.fPromise.set_value(std::move(cluster));
147 std::unique_lock<std::mutex> lock(fLockUnzipQueue);
148 fUnzipQueue.emplace(
RUnzipItem{std::move(cluster), std::move(item.fPromise)});
149 fCvHasUnzipWork.notify_one();
158 for (
const auto &cptr : fPool) {
159 if (cptr && (cptr->GetId() == clusterId))
167 auto N = fPool.size();
168 for (
unsigned i = 0; i <
N; ++i) {
186 std::map<DescriptorId_t, ColumnSet_t> fMap;
191 fMap.emplace(clusterId, columns);
195 return fMap.count(clusterId) > 0;
198 void Erase(DescriptorId_t clusterId,
const ColumnSet_t &columns)
200 auto itr = fMap.find(clusterId);
201 if (itr == fMap.end())
204 std::copy_if(itr->second.begin(), itr->second.end(), std::inserter(
d,
d.end()),
205 [&columns] (DescriptorId_t needle) { return columns.count(needle) == 0; });
213 decltype(fMap)::iterator begin() {
return fMap.begin(); }
214 decltype(fMap)::iterator end() {
return fMap.end(); }
223 const auto &desc = fPageSource.GetDescriptor();
226 std::set<DescriptorId_t> keep;
227 auto prev = clusterId;
228 for (
unsigned int i = 0; i < fWindowPre; ++i) {
229 prev = desc.FindPrevClusterId(prev);
237 provide.Insert(clusterId, columns);
238 auto next = clusterId;
242 for (
unsigned int i = 1; i < fWindowPost; ++i) {
243 next = desc.FindNextClusterId(next);
246 provide.Insert(next, columns);
250 for (
auto &cptr : fPool) {
253 if (provide.Contains(cptr->GetId()) > 0)
255 if (keep.count(cptr->GetId()) > 0)
267 std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);
269 for (
auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
271 itr->fIsExpired = !provide.Contains(itr->fClusterId) && (keep.count(itr->fClusterId) == 0);
273 if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
275 provide.Erase(itr->fClusterId, itr->fColumns);
280 auto cptr = itr->fFuture.get();
282 if (!cptr || itr->fIsExpired) {
284 itr = fInFlightClusters.erase(itr);
289 auto existingCluster = FindInPool(cptr->GetId());
290 if (existingCluster) {
291 existingCluster->Adopt(std::move(*cptr));
293 auto idxFreeSlot = FindFreeSlot();
294 fPool[idxFreeSlot] = std::move(cptr);
296 itr = fInFlightClusters.erase(itr);
300 for (
auto &cptr : fPool) {
303 provide.Erase(cptr->GetId(), cptr->GetAvailColumns());
310 for (
const auto &kv : provide) {
319 inFlightCluster.
fColumns = kv.second;
321 fInFlightClusters.emplace_back(std::move(inFlightCluster));
323 fReadQueue.emplace(std::move(readItem));
325 if (fReadQueue.size() > 0)
326 fCvHasReadWork.notify_one();
329 return WaitFor(clusterId, columns);
339 auto result = FindInPool(clusterId);
341 bool hasMissingColumn =
false;
342 for (
auto cid : columns) {
343 if (result->ContainsColumn(cid))
346 hasMissingColumn =
true;
349 if (!hasMissingColumn)
354 decltype(fInFlightClusters)::iterator itr;
356 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
357 itr = fInFlightClusters.begin();
358 for (; itr != fInFlightClusters.end(); ++itr) {
359 if (itr->fClusterId == clusterId)
362 R__ASSERT(itr != fInFlightClusters.end());
369 auto cptr = itr->fFuture.get();
371 result->Adopt(std::move(*cptr));
373 auto idxFreeSlot = FindFreeSlot();
374 fPool[idxFreeSlot] = std::move(cptr);
377 std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
378 fInFlightClusters.erase(itr);
Manages a set of clusters containing compressed and packed pages.
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
void ExecUnzipClusters()
The unzip thread routine which takes a loaded cluster and passes it to fPageSource....
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
RCluster * WaitFor(DescriptorId_t clusterId, const RPageSource::ColumnSet_t &columns)
Returns the given cluster from the pool, which needs to contain at least the columns columns.
unsigned int fWindowPost
The number of desired clusters in the pool, including the currently active cluster.
void ExecReadClusters()
The I/O thread routine; there is exactly one I/O thread in-flight for every cluster pool.
std::thread fThreadUnzip
The unzip thread takes a loaded cluster and calls fPageSource->UnzipCluster() on it.
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
std::thread fThreadIo
The I/O thread calls RPageSource::LoadCluster() asynchronously.
RClusterPool(RPageSource &pageSource, unsigned int size)
unsigned int fWindowPre
The number of clusters before the currently active cluster that should stay in the pool if present.
RCluster * GetCluster(DescriptorId_t clusterId, const RPageSource::ColumnSet_t &columns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
An in-memory subset of the packed and compressed pages of a cluster.
Abstract interface to read data from an ntuple.
std::unordered_set< DescriptorId_t > ColumnSet_t
Derived from the model (fields) that are actually being requested at a given point in time.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Clusters that are currently being processed by the pipeline.
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
RPageSource::ColumnSet_t fColumns
std::future< std::unique_ptr< RCluster > > fFuture
DescriptorId_t fClusterId
Request to load a subset of the columns of a particular cluster.
DescriptorId_t fClusterId
std::promise< std::unique_ptr< RCluster > > fPromise
RPageSource::ColumnSet_t fColumns
Request to decompress and if necessary unpack compressed pages.