55   , 
fPool(2 * clusterBunchSize)
 
   66      std::unique_lock<std::mutex> lock(fLockWorkQueue);
 
   68      fCvHasReadWork.notify_one();
 
   74      std::unique_lock<std::mutex> lock(fLockUnzipQueue);
 
   76      fCvHasUnzipWork.notify_one();
 
   84      std::vector<RUnzipItem> unzipItems;
 
   86         std::unique_lock<std::mutex> lock(fLockUnzipQueue);
 
   87         fCvHasUnzipWork.wait(lock, [&]{ 
return !fUnzipQueue.empty(); });
 
   88         while (!fUnzipQueue.empty()) {
 
   89            unzipItems.emplace_back(std::move(fUnzipQueue.front()));
 
   94      for (
auto &item : unzipItems) {
 
   98         fPageSource.UnzipCluster(item.fCluster.get());
 
  101         item.fPromise.set_value(std::move(item.fCluster));
 
  109      std::vector<RReadItem> readItems;
 
  110      std::vector<RCluster::RKey> clusterKeys;
 
  111      std::int64_t bunchId = -1;
 
  113         std::unique_lock<std::mutex> lock(fLockWorkQueue);
 
  114         fCvHasReadWork.wait(lock, [&]{ 
return !fReadQueue.empty(); });
 
  115         while (!fReadQueue.empty()) {
 
  121            if ((bunchId >= 0) && (fReadQueue.front().fBunchId != bunchId))
 
  123            readItems.emplace_back(std::move(fReadQueue.front()));
 
  125            bunchId = readItems.back().fBunchId;
 
  126            clusterKeys.emplace_back(readItems.back().fClusterKey);
 
  130      auto clusters = fPageSource.LoadClusters(clusterKeys);
 
  132      for (std::size_t i = 0; i < clusters.size(); ++i) {
 
  135         bool discard = 
false;
 
  137            std::unique_lock<std::mutex> lock(fLockWorkQueue);
 
  138            for (
auto &inFlight : fInFlightClusters) {
 
  139               if (inFlight.fClusterKey.fClusterId != clusters[i]->GetId())
 
  141               discard = inFlight.fIsExpired;
 
  147            readItems[i].fPromise.set_value(std::move(clusters[i]));
 
  150            std::unique_lock<std::mutex> lock(fLockUnzipQueue);
 
  151            fUnzipQueue.emplace(
RUnzipItem{std::move(clusters[i]), std::move(readItems[i].fPromise)});
 
  152            fCvHasUnzipWork.notify_one();
 
  161   for (
const auto &cptr : fPool) {
 
  162      if (cptr && (cptr->GetId() == clusterId))
 
  170   auto N = fPool.size();
 
  171   for (
unsigned i = 0; i < 
N; ++i) {
 
  190      std::int64_t fBunchId = -1;
 
  192      ColumnSet_t fColumnSet;
 
  195   static constexpr std::int64_t kFlagRequired = 0x01;
 
  196   static constexpr std::int64_t kFlagLast     = 0x02;
 
  199   std::map<DescriptorId_t, RInfo> fMap;
 
  204      fMap.emplace(clusterId, info);
 
  207   bool Contains(DescriptorId_t clusterId) {
 
  208      return fMap.count(clusterId) > 0;
 
  211   std::size_t GetSize()
 const { 
return fMap.size(); }
 
  213   void Erase(DescriptorId_t clusterId, 
const ColumnSet_t &columns)
 
  215      auto itr = fMap.find(clusterId);
 
  216      if (itr == fMap.end())
 
  219      std::copy_if(itr->second.fColumnSet.begin(), itr->second.fColumnSet.end(), std::inserter(
d, 
d.end()),
 
  220         [&columns] (DescriptorId_t needle) { return columns.count(needle) == 0; });
 
  224         itr->second.fColumnSet = 
d;
 
  228   decltype(fMap)::iterator begin() { 
return fMap.begin(); }
 
  229   decltype(fMap)::iterator end() { 
return fMap.end(); }
 
  238   std::set<DescriptorId_t> keep;
 
  241      auto descriptorGuard = fPageSource.GetSharedDescriptorGuard();
 
  244      auto prev = clusterId;
 
  245      for (
unsigned int i = 0; i < fWindowPre; ++i) {
 
  246         prev = descriptorGuard->FindPrevClusterId(prev);
 
  253      RProvides::RInfo provideInfo;
 
  254      provideInfo.fColumnSet = columns;
 
  255      provideInfo.fBunchId = fBunchId;
 
  256      provideInfo.fFlags = RProvides::kFlagRequired;
 
  257      for (
DescriptorId_t i = 0, next = clusterId; i < 2 * fClusterBunchSize; ++i) {
 
  258         if (i == fClusterBunchSize)
 
  259            provideInfo.fBunchId = ++fBunchId;
 
  262         next = descriptorGuard->FindNextClusterId(cid);
 
  264            provideInfo.fFlags |= RProvides::kFlagLast;
 
  266         provide.Insert(cid, provideInfo);
 
  270         provideInfo.fFlags = 0;
 
  275   for (
auto &cptr : fPool) {
 
  278      if (provide.Contains(cptr->GetId()) > 0)
 
  280      if (keep.count(cptr->GetId()) > 0)
 
  292      std::lock_guard<std::mutex> lockGuard(fLockWorkQueue);
 
  294      for (
auto itr = fInFlightClusters.begin(); itr != fInFlightClusters.end(); ) {
 
  297            !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
 
  299         if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
 
  301            provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fColumnSet);
 
  306         auto cptr = itr->fFuture.get();
 
  308         if (!cptr || itr->fIsExpired) {
 
  310            itr = fInFlightClusters.erase(itr);
 
  315         auto existingCluster = FindInPool(cptr->GetId());
 
  316         if (existingCluster) {
 
  317            existingCluster->Adopt(std::move(*cptr));
 
  319            auto idxFreeSlot = FindFreeSlot();
 
  320            fPool[idxFreeSlot] = std::move(cptr);
 
  322         itr = fInFlightClusters.erase(itr);
 
  326      for (
auto &cptr : fPool) {
 
  329         provide.Erase(cptr->GetId(), cptr->GetAvailColumns());
 
  333      bool skipPrefetch = 
false;
 
  334      if (provide.GetSize() < fClusterBunchSize) {
 
  336         for (
const auto &kv : provide) {
 
  337            if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
 
  339            skipPrefetch = 
false;
 
  349         for (
const auto &kv : provide) {
 
  350            R__ASSERT(!kv.second.fColumnSet.empty());
 
  354            readItem.
fBunchId = kv.second.fBunchId;
 
  361            fInFlightClusters.emplace_back(std::move(inFlightCluster));
 
  363            fReadQueue.emplace(std::move(readItem));
 
  365         if (fReadQueue.size() > 0)
 
  366            fCvHasReadWork.notify_one();
 
  370   return WaitFor(clusterId, columns);
 
  380      auto result = FindInPool(clusterId);
 
  382         bool hasMissingColumn = 
false;
 
  383         for (
auto cid : columns) {
 
  384            if (
result->ContainsColumn(cid))
 
  387            hasMissingColumn = 
true;
 
  390         if (!hasMissingColumn)
 
  395      decltype(fInFlightClusters)::iterator itr;
 
  397         std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
 
  398         itr = fInFlightClusters.begin();
 
  399         for (; itr != fInFlightClusters.end(); ++itr) {
 
  400            if (itr->fClusterKey.fClusterId == clusterId)
 
  403         R__ASSERT(itr != fInFlightClusters.end());
 
  410      auto cptr = itr->fFuture.get();
 
  412         result->Adopt(std::move(*cptr));
 
  414         auto idxFreeSlot = FindFreeSlot();
 
  415         fPool[idxFreeSlot] = std::move(cptr);
 
  418      std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
 
  419      fInFlightClusters.erase(itr);
 
  427      decltype(fInFlightClusters)::iterator itr;
 
  429         std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
 
  430         itr = fInFlightClusters.begin();
 
  431         if (itr == fInFlightClusters.end())
 
  437      std::lock_guard<std::mutex> lockGuardInFlightClusters(fLockWorkQueue);
 
  438      fInFlightClusters.erase(itr);
 
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
 
Managed a set of clusters containing compressed and packed pages.
 
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
 
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
 
void ExecUnzipClusters()
The unzip thread routine which takes a loaded cluster and passes it to fPageSource....
 
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
 
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
 
RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
 
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
 
std::thread fThreadUnzip
The unzip thread takes a loaded cluster and passes it to fPageSource->UnzipCluster() on it.
 
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &columns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
 
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
 
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
 
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
 
RCluster * WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &columns)
Returns the given cluster from the pool, which needs to contain at least the columns columns.
 
An in-memory subset of the packed and compressed pages of a cluster.
 
std::unordered_set< DescriptorId_t > ColumnSet_t
 
Abstract interface to read data from an ntuple.
 
std::uint64_t DescriptorId_t
Distriniguishes elements of the same type within a descriptor, e.g. different fields.
 
constexpr DescriptorId_t kInvalidDescriptorId
 
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
 
Clusters that are currently being processed by the pipeline.
 
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, than by the column ids in fColumns.
 
RCluster::RKey fClusterKey
 
std::future< std::unique_ptr< RCluster > > fFuture
 
Request to load a subset of the columns of a particular cluster.
 
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
 
std::promise< std::unique_ptr< RCluster > > fPromise
 
RCluster::RKey fClusterKey
 
Request to decompress and if necessary unpack compressed pages.
 
DescriptorId_t fClusterId