93 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
94 std::unique_ptr<char[]> *aUnzipChunks =
new std::unique_ptr<char[]>[newSize];
95 std::atomic<Byte_t> *aUnzipStatus =
new std::atomic<Byte_t>[newSize];
97 for (
Int_t i = 0; i < newSize; ++i)
98 aUnzipStatus[i].store(0);
100 for (
Int_t i = 0; i < oldSize; i++) {
151 return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
198 fIOMutex = std::make_unique<TMutex>(
true);
212 Info(
"TTreeCacheUnzip",
"Enabling Parallel Unzipping");
218 Warning(
"TTreeCacheUnzip",
"Parallel Option unknown");
222 if (
gEnv->GetValue(
"TFile.AsyncReading", 1)) {
282 if (entry == -1) entry = 0;
300 Int_t t = chain->GetTreeNumber();
301 chainOffset = chain->GetTreeOffset()[t];
311 if (
b->GetDirectory() ==
nullptr)
continue;
312 if (
b->GetDirectory()->GetFile() !=
fFile)
continue;
313 Int_t nb =
b->GetMaxBaskets();
314 Int_t *lbaskets =
b->GetBasketBytes();
316 if (!lbaskets || !entries)
continue;
319 Int_t blistsize =
b->GetListOfBaskets()->GetSize();
320 for (
Int_t j=0;j<nb;j++) {
322 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j))
continue;
325 Int_t len = lbaskets[j];
326 if (pos <= 0 || len <= 0)
continue;
329 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry))
continue;
332 if (j < nb - 1) emax = entries[j+1] - 1;
333 if (!elist->
ContainsRange(entries[j] + chainOffset, emax + chainOffset))
continue;
473 Int_t nread = maxbytes;
476 if (nb < 0)
return nread;
478 const Int_t headerSize = 16;
479 if (nread < headerSize)
return nread;
484 if (!olen) olen = nbytes - klen;
527 const Int_t hlen = 128;
528 Int_t objlen = 0, keylen = 0;
537 rdoffs =
fSeek[index];
551 char* locbuff =
nullptr;
553 locbuff =
new char[rdlen];
554 }
else if (rdlen * 3 < 16384) {
555 locbuff =
new char[rdlen * 2];
557 locbuff =
new char[16384];
564 if (locbuff)
delete [] locbuff;
570 Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
577 Info(
"UnzipCache",
"Block %d is too big, skipping.", index);
580 if (locbuff)
delete [] locbuff;
587 if ((loclen > 0) && (loclen == objlen + keylen)) {
590 if (locbuff)
delete [] locbuff;
601 if (locbuff)
delete [] locbuff;
613 auto mapFunction = [&]() {
614 auto unzipFunction = [&](
const std::vector<Int_t> &indices) {
618 for (
auto ii : indices) {
623 Info(
"UnzipCache",
"Unzipping failed or cache is in learning state");
630 std::vector<std::vector<Int_t>> basketIndices;
631 std::vector<Int_t> indices;
636 indices.push_back(i);
641 basketIndices.push_back(indices);
646 pool.
Foreach(unzipFunction, basketIndices);
740 if ( myCycle !=
fCycle ) {
742 Info(
"GetUnzipBuffer",
"Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
753 if ( (seekidx >= 0) && (
fUnzipState.IsUnzipped(seekidx)) ) {
857 const Int_t hlen = 128;
858 Int_t nbytes = 0, objlen = 0, keylen = 0;
861 Int_t nbytesRemain = nbytes - keylen;
862 Int_t objlenRemain = objlen;
869 if ((objlen > nbytes - keylen) &&
870 ((nbytesRemain < ROOT::Internal::kZipHeaderSize) || (R__unzip_header(&nin, bufcur, &nbuf) != 0))) {
871 Error(
"UnzipBuffer",
"Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
875 Int_t l = keylen + objlen;
885 bool oldCase = objlen == nbytes - keylen
887 &&
fFile->GetVersion() <= 30401;
889 if (objlen > nbytes-keylen || oldCase) {
892 memcpy(*dest, src, keylen);
895 char *objbuf = *dest + keylen;
902 while (nbytesRemain >= ROOT::Internal::kZipHeaderSize) {
903 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
904 if ((hc != 0) || (nin > nbytesRemain) || (nbuf > objlenRemain))
907 Info(
"UnzipBuffer",
" nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
908 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
909 if (oldCase && (nin > objlen || nbuf > objlen)) {
911 Info(
"UnzipBuffer",
"oldcase objlen :%d ", objlen);
914 memcpy(*dest + keylen, src + keylen, objlen);
919 R__unzip(&nin, bufcur, &nbuf,
reinterpret_cast<unsigned char *
>(objbuf), &nout);
922 Info(
"UnzipBuffer",
"R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
923 nin, bufcur, nbuf, objbuf, nout);
927 if (noutot >= objlen)
break;
931 objlenRemain -= nout;
934 if (noutot != objlen) {
935 Error(
"UnzipBuffer",
"nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
936 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
938 if(alloc)
delete [] *dest;
944 memcpy(*dest, src, keylen);
946 memcpy(*dest + keylen, src + keylen, objlen);
956 printf(
"******TreeCacheUnzip statistics for file: %s ******\n",
fFile->GetName());
958 printf(
"Number of blocks unzipped by threads: %d\n",
fNUnzip);
959 printf(
"Number of hits: %d\n",
fNFound);
960 printf(
"Number of stalls: %d\n",
fNStalls);
961 printf(
"Number of misses: %d\n",
fNMissed);
void frombuf(char *&buf, Bool_t *x)
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
unsigned char Byte_t
Byte (8 bits) (unsigned char).
int Int_t
Signed integer 4 bytes (int).
short Version_t
Class version identifier (short).
unsigned char UChar_t
Unsigned Character 1 byte (unsigned char).
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
short Short_t
Signed Short integer 2 bytes (short).
double Double_t
Double 8 bytes.
long long Long64_t
Portable signed long integer 8 bytes.
float Float_t
Float 4 bytes (float).
const char Option_t
Option string (const char).
#define R__LOCKGUARD(mutex)
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A TTree is a list of TBranches.
A chain is a collection of files containing TTree objects.
<div class="legacybox"><h2>Legacy Code</h2> TEventList is a legacy interface: there will be no bug fi...
virtual bool ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Int_t fNtot
Total size of prefetched blocks.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
TFile * fFile
Pointer to file.
Int_t fNseek
Number of blocks to be prefetched.
virtual Int_t GetBufferSize() const
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
void Init()
Initialization procedure common to all the constructors.
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Int_t SetBufferSize(Long64_t buffersize) override
Change the underlying buffer size of the cache.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
void UpdateBranches(TTree *tree) override
Update pointer to current Tree and recompute pointers to the branches in the cache.
Int_t AddBranch(TBranch *b, bool subbranches=false) override
Add a branch to the list of branches to be stored in the cache. This function is called by TBranch::Ge...
void ResetCache() override
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
void SetEntryRange(Long64_t emin, Long64_t emax) override
Set the minimum and maximum entry number to be processed. This information helps to optimize the numbe...
static bool IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
Int_t fNseekMax
! fNseek can change so we need to know its max size
Int_t fNStalls
! number of hits which caused a stall
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
bool FillBuffer() override
Fill the cache buffer with the branches in the cache.
Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc) override
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer... by reading the header at the beginning.
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relative buffer size
std::unique_ptr< TMutex > fIOMutex
Int_t fNUnzip
! number of blocks that were unzipped
Int_t CreateTasks()
We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total) to a task.
Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, bool *free) override
We try to read a buffer that has already been unzipped. Returns -1 in case of read failure,...
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache... by default it should be two times the size of the prefetchin...
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by a IMT task
void StopLearningPhase() override
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
void Print(Option_t *option="") const override
Print cache statistics.
bool fParallel
Indicate if we want to activate the parallelism (for this instance).
Int_t fNFound
! number of blocks that were found in the cache
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
~TTreeCacheUnzip() override
Destructor. (in general called by the TFile destructor).
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread).
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Int_t SetBufferSize(Long64_t buffersize) override
Change the underlying buffer size of the cache.
Long64_t fEntryMin
! first entry in the cache
TTreeCache(const TTreeCache &)=delete
this class cannot be copied
Long64_t fEntryNext
! next entry number where cache must be filled
bool fIsLearning
! true if cache is in learning mode
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed. This information helps to optimize the numbe...
Long64_t fEntryMax
! last entry in the cache
Long64_t fEntryCurrent
! current lowest entry number in the cache
Int_t fNReadPref
Number of blocks that were prefetched.
TTree * fTree
! pointer to the current Tree
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase.
Int_t fNbranches
! Number of branches in the cache
void Print(Option_t *option="") const override
Print cache statistics.
Int_t AddBranch(TBranch *b, bool subgbranches=false) override
Add a branch to the list of branches to be stored in the cache. This function is called by the user vi...
TObjArray * fBranches
! List of branches to be stored in the cache
Helper class to iterate over cluster of baskets.
A TTree represents a columnar dataset.
virtual TClusterIterator GetClusterIterator(Long64_t firstentry)
Return an iterator over the cluster of baskets starting at firstentry.
virtual Long64_t GetEntries() const
virtual Long64_t GetReadEntry() const
virtual TTree * GetTree() const
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Binary search in an array of n values to locate value.
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
void Clear(Int_t size)
Clear all baskets' state arrays.
bool IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
bool IsFinished(Int_t index) const
std::atomic< Byte_t > * fUnzipStatus
! [fNSeek]
void SetUnzipped(Int_t index, char *buf, Int_t len)
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
bool TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
void SetMissed(Int_t index)
bool IsProgress(Int_t index) const
bool IsUntouched(Int_t index) const
void SetFinished(Int_t index)
Set cache as finished.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.