146 Info(
"TTreeCacheUnzip",
"Enabling Parallel Unzipping");
156 Warning(
"TTreeCacheUnzip",
"Parallel Option unknown");
237 if (entry == -1) entry=0;
271 if (!lbaskets || !entries)
continue;
275 for (
Int_t j=0;j<nb;j++) {
277 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j))
continue;
280 Int_t len = lbaskets[j];
281 if (pos <= 0 || len <= 0)
continue;
284 if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry))
continue;
287 if (j<nb-1) emax = entries[j+1]-1;
288 if (!elist->
ContainsRange(entries[j]+chainOffset,emax+chainOffset))
continue;
426 if (
gDebug > 0)
Info(
"SendSignal",
" fUnzipCondition->Signal()");
456 class TTreeCacheUnzipData {
471 if (nt > 10) nt = 10;
474 Info(
"StartThreadUnzip",
"Going to start %d threads.", nt);
476 for (
Int_t i = 0; i < nt; i++) {
482 Info(
"StartThreadUnzip",
"Going to start thread '%s'", nm.
Data());
484 TTreeCacheUnzipData *d =
new TTreeCacheUnzipData;
490 Error(
"TTreeCacheUnzip::StartThreadUnzip",
" Unable to create new thread.");
515 for (
Int_t i = 0; i < 1; i++) {
542 TTreeCacheUnzipData *d = (TTreeCacheUnzipData *)arg;
548 Int_t thrnum = d->fCount;
549 Int_t startindex = thrnum;
550 Int_t locbuffsz = 16384;
551 char *locbuff =
new char[16384];
560 if (myCycle != unzipMng->
fCycle) startindex = thrnum;
561 myCycle = unzipMng->
fCycle;
562 if (unzipMng->
fNseek) startindex = startindex % unzipMng->
fNseek;
563 else startindex = -1;
567 res = unzipMng->
UnzipCache(startindex, locbuffsz, locbuff);
617 Int_t nread = maxbytes;
620 if (nb < 0)
return nread;
622 const Int_t headerSize = 16;
623 if (nread < headerSize)
return nread;
628 if (!olen) olen = nbytes-klen;
667 Info(
"ResetCache",
"Changing fNseekMax from:%d to:%d", fNseekMax,
fNseek);
675 char **aUnzipChunks =
new char *[
fNseek];
676 memset(aUnzipChunks, 0,
fNseek*
sizeof(
char *));
740 char **aUnzipChunks =
new char *[
fNseek];
741 memset(aUnzipChunks, 0,
fNseek*
sizeof(
char *));
807 if ( myCycle !=
fCycle ) {
809 Info(
"GetUnzipBuffer",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
953 const Int_t hlen=128;
954 Int_t nbytes=0, objlen=0, keylen=0;
962 Error(
"UnzipBuffer",
"Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
976 Bool_t oldCase = objlen==nbytes-keylen
980 if (objlen > nbytes-keylen || oldCase) {
983 memcpy(*dest, src, keylen);
986 char *objbuf = *dest + keylen;
996 Info(
"UnzipBuffer",
" nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
997 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
998 if (oldCase && (nin > objlen || nbuf > objlen)) {
1000 Info(
"UnzipBuffer",
"oldcase objlen :%d ", objlen);
1003 memcpy( *dest + keylen, src + keylen, objlen);
1008 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
1011 Info(
"UnzipBuffer",
"R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
1012 nin, bufcur, nbuf, objbuf, nout);
1016 if (noutot >= objlen)
break;
1021 if (noutot != objlen) {
1022 Error(
"UnzipBuffer",
"nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
1023 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
1025 if(alloc)
delete [] *
dest;
1031 memcpy(*dest, src, keylen);
1033 memcpy(*dest + keylen, src + keylen, objlen);
1063 const Int_t hlen=128;
1064 Int_t objlen=0, keylen=0;
1068 Int_t idxtounzip = -1;
1076 Info(
"UnzipCache",
"Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1092 Int_t reqi = (startindex+ii) % fNseek;
1098 rdoffs =
fSeek[idxtounzip];
1109 if (idxtounzip < 0) {
1111 Info(
"UnzipCache",
"Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
1121 Info(
"UnzipCache",
"Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1129 if(locbuffsz < rdlen) {
1130 if (locbuff)
delete [] locbuff;
1132 locbuff =
new char[locbuffsz];
1134 }
else if(locbuffsz > rdlen*3) {
1135 if (locbuff)
delete [] locbuff;
1136 locbuffsz = rdlen*2;
1137 locbuff =
new char[locbuffsz];
1142 Info(
"UnzipCache",
"Going to unzip block %d", idxtounzip);
1151 Info(
"UnzipCache",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1168 Info(
"UnzipCache",
"Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
1174 Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
1182 Info(
"UnzipCache",
"Block %d is too big, skipping.", idxtounzip);
1200 if ((loclen > 0) && (loclen == objlen+keylen)) {
1205 Info(
"UnzipCache",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1226 Info(
"UnzipCache",
"reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
1227 idxtounzip, rdoffs, rdlen, loclen);
1233 Info(
"argh",
"loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
1246 printf(
"******TreeCacheUnzip statistics for file: %s ******\n",
fFile->
GetName());
1248 printf(
"Number of blocks unzipped by threads: %d\n",
fNUnzip);
1249 printf(
"Number of hits: %d\n",
fNFound);
1250 printf(
"Number of stalls: %d\n",
fNStalls);
1251 printf(
"Number of misses: %d\n",
fNMissed);
TCondition * fUnzipStartCondition
Used to signal the threads to start.
virtual const char * GetName() const
Returns name of object.
static Int_t SetCancelDeferred()
Static method to set the cancellation response type of the calling thread to deferred, i.e.
Int_t fNtot
Total size of prefetched blocks.
void frombuf(char *&buf, Bool_t *x)
Long64_t fEntryMax
! last entry in the cache
Long64_t * GetBasketEntry() const
Int_t fNUnzip
! number of blocks that were unzipped
void WaitUnzipStartSignal()
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
TFile * fFile
Pointer to file.
Int_t StopThreadUnzip()
To stop the thread we only need to change the value of the variable fActiveThread to false and the lo...
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache ...
TObjArray * GetListOfBaskets()
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
Int_t fNStalls
! number of hits which caused a stall
TObjArray * fBranches
! List of branches to be stored in the cache
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
TMutex * fMutexList
Mutex to protect the various lists. Used by the condvars.
A specialized TFileCacheRead object for a TTree.
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos is either kBeg, kCur or kEnd.
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
Byte_t * fUnzipStatus
! [fNSeek] For each blk, tells us if it's unzipped or pending
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
TDirectory * GetDirectory() const
Long64_t fEntryMin
! first entry in the cache
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
static Int_t SetCancelOn()
Static method to turn on thread cancellation.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Int_t * fUnzipLen
! [fNseek] Length of the unzipped buffers
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase...
TThread * fUnzipThread[10]
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
Long64_t * GetTreeOffset() const
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache...
Bool_t fActiveThread
Used to terminate gracefully the unzippers.
Int_t fNFound
! number of blocks that were found in the cache
static Bool_t IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer...
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped. Returns -1 in case of read failure...
std::queue< Int_t > fActiveBlks
The blocks which are active now.
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Helper class to iterate over cluster of baskets.
Int_t * GetBasketBytes() const
static void * UnzipLoop(void *arg)
This is a static function.
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs, RAM size, etc. into the SysInfo_t s...
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
virtual Long64_t GetReadEntry() const
Bool_t fIsTransferred
True when fBuffer contains something valid.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
virtual TClusterIterator GetClusterIterator(Long64_t firstentry)
Return an iterator over the cluster of baskets starting at firstentry.
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void Print(Option_t *option="") const
Print cache statistics.
static Long_t SelfId()
Static method returning the id for the current thread.
Specialization of TTreeCache for parallel Unzipping.
Int_t Run(void *arg=0)
Start the thread.
Bool_t fParallel
Indicate if we want to activate the parallelism (for this instance)
virtual TFile * GetFile() const
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
TTree * fTree
! pointer to the current Tree
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
void SendUnzipStartSignal(Bool_t broadcast)
This will send the signal corresponding to the queue...
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
R__EXTERN TSystem * gSystem
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
Int_t fNseekMax
! fNseek can change so we need to know its max size
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Int_t GetMaxBaskets() const
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread) ...
A TEventList object is a list of selected events (entries) in a TTree.
Long_t Join(void **ret=0)
Join this thread.
static constexpr double nm
Int_t fNReadPref
Number of blocks that were prefetched.
virtual Long64_t GetBasketSeek(Int_t basket) const
Return address of basket in the file.
Long64_t fEntryCurrent
! current lowest entry number in the cache
Bool_t IsQueueEmpty()
It says if the queue is empty... useful to see if we have to process it.
TObject * UncheckedAt(Int_t i) const
virtual void Print(Option_t *option="") const
Print cache statistics.
Bool_t fIsLearning
! true if cache is in learning mode
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
virtual Int_t GetTreeNumber() const
virtual Int_t GetBufferSize() const
TCondition * fUnzipDoneCondition
Used to wait for an unzip tour to finish. Gives the Async feel.
#define R__LOCKGUARD(mutex)
void Init()
Initialization procedure common to all the constructors.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
Static function: sets the unzip relative buffer size.
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
virtual Long64_t GetEntries() const
char ** fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
Int_t fNbranches
! Number of branches in the cache
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Bool_t IsActiveThread()
This indicates if the thread is active in this moment...
Int_t StartThreadUnzip(Int_t nthreads)
The Thread is only a part of the TTreeCache but it is the part that waits for info in the queue and p...
#define dest(otri, vertexptr)
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
A chain is a collection of files containing TTree objects.
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t Double_t Double_t Double_t Double_t b
Int_t TimedWaitRelative(ULong_t ms)
Wait to be signaled or till the timer times out.
A TTree object has a header with a name and a title.
TEventList * GetEventList() const
virtual const char * GetName() const
Returns name of object.
virtual Int_t GetSize() const
A TTree is a list of TBranches.
Long64_t fEntryNext
! next entry number where cache must be filled
Long64_t fTotalUnzipBytes
! The total sum of the currently unzipped blks
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
This inflates all the buffers in the cache.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Int_t fNseek
Number of blocks to be prefetched.
const char * Data() const