74 fAsyncReading(kFALSE),
144 Info(
"TTreeCacheUnzip",
"Enabling Parallel Unzipping");
154 Warning(
"TTreeCacheUnzip",
"Parallel Option unknown");
235 if (entry == -1) entry=0;
269 if (!lbaskets || !entries)
continue;
273 for (
Int_t j=0;j<nb;j++) {
275 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j))
continue;
278 Int_t len = lbaskets[j];
279 if (pos <= 0 || len <= 0)
continue;
282 if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry))
continue;
285 if (j<nb-1) emax = entries[j+1]-1;
286 if (!elist->
ContainsRange(entries[j]+chainOffset,emax+chainOffset))
continue;
424 if (
gDebug > 0)
Info(
"SendSignal",
" fUnzipCondition->Signal()");
454 class TTreeCacheUnzipData {
469 if (nt > 10) nt = 10;
472 Info(
"StartThreadUnzip",
"Going to start %d threads.", nt);
474 for (
Int_t i = 0; i < nt; i++) {
480 Info(
"StartThreadUnzip",
"Going to start thread '%s'", nm.
Data());
482 TTreeCacheUnzipData *d =
new TTreeCacheUnzipData;
488 Error(
"TTreeCacheUnzip::StartThreadUnzip",
" Unable to create new thread.");
513 for (
Int_t i = 0; i < 1; i++) {
540 TTreeCacheUnzipData *d = (TTreeCacheUnzipData *)arg;
546 Int_t thrnum = d->fCount;
547 Int_t startindex = thrnum;
548 Int_t locbuffsz = 16384;
549 char *locbuff =
new char[16384];
558 if (myCycle != unzipMng->
fCycle) startindex = thrnum;
559 myCycle = unzipMng->
fCycle;
560 if (unzipMng->
fNseek) startindex = startindex % unzipMng->
fNseek;
561 else startindex = -1;
565 res = unzipMng->
UnzipCache(startindex, locbuffsz, locbuff);
615 Int_t nread = maxbytes;
618 if (nb < 0)
return nread;
620 const Int_t headerSize = 16;
621 if (nread < headerSize)
return nread;
626 if (!olen) olen = nbytes-klen;
665 Info(
"ResetCache",
"Changing fNseekMax from:%d to:%d", fNseekMax,
fNseek);
673 char **aUnzipChunks =
new char *[
fNseek];
674 memset(aUnzipChunks, 0,
fNseek*
sizeof(
char *));
738 char **aUnzipChunks =
new char *[
fNseek];
739 memset(aUnzipChunks, 0,
fNseek*
sizeof(
char *));
805 if ( myCycle !=
fCycle ) {
807 Info(
"GetUnzipBuffer",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
951 const Int_t hlen=128;
952 Int_t nbytes=0, objlen=0, keylen=0;
960 Error(
"UnzipBuffer",
"Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
974 Bool_t oldCase = objlen==nbytes-keylen
978 if (objlen > nbytes-keylen || oldCase) {
981 memcpy(*dest, src, keylen);
984 char *objbuf = *dest + keylen;
994 Info(
"UnzipBuffer",
" nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
995 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
996 if (oldCase && (nin > objlen || nbuf > objlen)) {
998 Info(
"UnzipBuffer",
"oldcase objlen :%d ", objlen);
1001 memcpy( *dest + keylen, src + keylen, objlen);
1006 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
1009 Info(
"UnzipBuffer",
"R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
1010 nin, bufcur, nbuf, objbuf, nout);
1014 if (noutot >= objlen)
break;
1019 if (noutot != objlen) {
1020 Error(
"UnzipBuffer",
"nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
1021 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
1023 if(alloc)
delete [] *
dest;
1029 memcpy(*dest, src, keylen);
1031 memcpy(*dest + keylen, src + keylen, objlen);
1061 const Int_t hlen=128;
1062 Int_t objlen=0, keylen=0;
1066 Int_t idxtounzip = -1;
1074 Info(
"UnzipCache",
"Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1090 Int_t reqi = (startindex+ii) % fNseek;
1096 rdoffs =
fSeek[idxtounzip];
1107 if (idxtounzip < 0) {
1109 Info(
"UnzipCache",
"Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
1119 Info(
"UnzipCache",
"Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1127 if(locbuffsz < rdlen) {
1128 if (locbuff)
delete [] locbuff;
1130 locbuff =
new char[locbuffsz];
1132 }
else if(locbuffsz > rdlen*3) {
1133 if (locbuff)
delete [] locbuff;
1134 locbuffsz = rdlen*2;
1135 locbuff =
new char[locbuffsz];
1140 Info(
"UnzipCache",
"Going to unzip block %d", idxtounzip);
1149 Info(
"UnzipCache",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1166 Info(
"UnzipCache",
"Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
1172 Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
1180 Info(
"UnzipCache",
"Block %d is too big, skipping.", idxtounzip);
1198 if ((loclen > 0) && (loclen == objlen+keylen)) {
1203 Info(
"UnzipCache",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1224 Info(
"UnzipCache",
"reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
1225 idxtounzip, rdoffs, rdlen, loclen);
1231 Info(
"argh",
"loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
1247 printf(
"Number of blocks unzipped by threads: %d\n",
fNUnzip);
TCondition * fUnzipStartCondition
void Print(Option_t *option="") const
Print cache statistics.
static Int_t SetCancelDeferred()
Static method to set the cancellation response type of the calling thread to deferred, i.e.
Int_t fNtot
Total size of prefetched blocks.
void frombuf(char *&buf, Bool_t *x)
Long64_t fEntryMax
first entry in the cache
void WaitUnzipStartSignal()
TFile * fFile
Pointer to file.
Int_t StopThreadUnzip()
To stop the thread we only need to change the value of the variable fActiveThread to false and the lo...
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache ...
TObjArray * GetListOfBaskets()
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
Int_t fNStalls
number of blocks that were found in the cache
TEventList * GetEventList() const
virtual Long64_t GetReadEntry() const
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
A specialized TFileCacheRead object for a TTree.
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos is either kBeg, kCur or kEnd.
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Byte_t * fUnzipStatus
[fNseek] Individual unzipped chunks. Their summed size is kept under control.
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
static Int_t SetCancelOn()
Static method to turn on thread cancellation.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
virtual Long64_t GetBasketSeek(Int_t basket) const
Return address of basket in the file.
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase...
Long64_t * GetBasketEntry() const
TThread * fUnzipThread[10]
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache...
virtual void Print(Option_t *option="") const
Print cache statistics.
Int_t fNFound
number of blocks that were unzipped
static Bool_t IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer...
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped Returns -1 in case of read failure...
const char * Data() const
std::queue< Int_t > fActiveBlks
number of blocks that were not found in the cache and were unzipped
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * GetTreeOffset() const
Helper class to iterate over cluster of baskets.
static void * UnzipLoop(void *arg)
This is a static function.
Vc_ALWAYS_INLINE void free(T *p)
Frees memory that was allocated with Vc::malloc.
virtual Int_t GetTreeNumber() const
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
Int_t fNMissed
number of hits which caused a stall
Int_t * GetBasketBytes() const
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
void Init(TClassEdit::TInterpreterLookupHelper *helper)
virtual Int_t GetBufferSize() const
virtual TClusterIterator GetClusterIterator(Long64_t firstentry)
Return an iterator over the cluster of baskets starting at firstentry.
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
static Long_t SelfId()
Static method returning the id for the current thread.
Specialization of TTreeCache for parallel Unzipping.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Int_t Run(void *arg=0)
Start the thread.
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
TTree * fTree
list of branch names in the cache
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
virtual TFile * GetFile() const
TObject * UncheckedAt(Int_t i) const
static Double_t fgRelBuffSize
Max Size for the ready unzipped blocks (default is 2*fBufferSize)
void SendUnzipStartSignal(Bool_t broadcast)
This will send the signal corresponding to the queue...
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
R__EXTERN TSystem * gSystem
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Int_t fNseekMax
The total sum of the currently unzipped blks.
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread) ...
A TEventList object is a list of selected events (entries) in a TTree.
virtual const char * GetName() const
Returns name of object.
Long_t Join(void **ret=0)
Join this thread.
Long64_t fEntryCurrent
last entry in the cache
Bool_t IsQueueEmpty()
It says if the queue is empty... useful to see if we have to process it.
virtual Int_t GetSize() const
virtual const char * GetName() const
Returns name of object.
Bool_t fIsLearning
pointer to the current Tree
Long64_t fUnzipBufferSize
fNseek can change so we need to know its max size
TCondition * fUnzipDoneCondition
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
#define R__LOCKGUARD(mutex)
void Init()
Initialization procedure common to all the constructors.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
Static function: sets the unzip relative buffer size.
static TTreeCacheUnzip::EParUnzipMode fgParallel
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
char ** fUnzipChunks
[fNseek] Length of the unzipped buffers
TDirectory * GetDirectory() const
Int_t fNbranches
next entry number where cache must be filled
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Bool_t IsActiveThread()
This indicates if the thread is active in this moment...
Int_t GetMaxBaskets() const
Int_t StartThreadUnzip(Int_t nthreads)
The Thread is only a part of the TTreeCache but it is the part that waits for info in the queue and p...
#define dest(otri, vertexptr)
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
A chain is a collection of files containing TTree objects.
ClassImp(TTreeCacheUnzip) TTreeCacheUnzip
Int_t TimedWaitRelative(ULong_t ms)
Wait to be signaled or till the timer times out.
virtual Long64_t GetEntries() const
A TTree object has a header with a name and a title.
A TTree is a list of TBranches.
Long64_t fEntryNext
current lowest entry number in the cache
Long64_t fTotalUnzipBytes
[fNSeek] For each blk, tells us if it's unzipped or pending
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
This inflates all the buffers in the cache.
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Int_t fNseek
Number of blocks to be prefetched.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.