75 fAsyncReading(kFALSE),
145 Info(
"TTreeCacheUnzip",
"Enabling Parallel Unzipping");
155 Warning(
"TTreeCacheUnzip",
"Parallel Option unknown");
236 if (entry == -1) entry=0;
270 if (!lbaskets || !entries)
continue;
274 for (
Int_t j=0;j<nb;j++) {
276 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j))
continue;
279 Int_t len = lbaskets[j];
280 if (pos <= 0 || len <= 0)
continue;
283 if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry))
continue;
286 if (j<nb-1) emax = entries[j+1]-1;
287 if (!elist->
ContainsRange(entries[j]+chainOffset,emax+chainOffset))
continue;
425 if (
gDebug > 0)
Info(
"SendSignal",
" fUnzipCondition->Signal()");
455 class TTreeCacheUnzipData {
470 if (nt > 10) nt = 10;
473 Info(
"StartThreadUnzip",
"Going to start %d threads.", nt);
475 for (
Int_t i = 0; i < nt; i++) {
481 Info(
"StartThreadUnzip",
"Going to start thread '%s'", nm.
Data());
483 TTreeCacheUnzipData *
d =
new TTreeCacheUnzipData;
489 Error(
"TTreeCacheUnzip::StartThreadUnzip",
" Unable to create new thread.");
514 for (
Int_t i = 0; i < 1; i++) {
541 TTreeCacheUnzipData *
d = (TTreeCacheUnzipData *)arg;
547 Int_t thrnum = d->fCount;
548 Int_t startindex = thrnum;
549 Int_t locbuffsz = 16384;
550 char *locbuff =
new char[16384];
559 if (myCycle != unzipMng->
fCycle) startindex = thrnum;
560 myCycle = unzipMng->
fCycle;
561 if (unzipMng->
fNseek) startindex = startindex % unzipMng->
fNseek;
562 else startindex = -1;
566 res = unzipMng->
UnzipCache(startindex, locbuffsz, locbuff);
616 Int_t nread = maxbytes;
619 if (nb < 0)
return nread;
621 const Int_t headerSize = 16;
622 if (nread < headerSize)
return nread;
627 if (!olen) olen = nbytes-klen;
666 Info(
"ResetCache",
"Changing fNseekMax from:%d to:%d", fNseekMax,
fNseek);
674 char **aUnzipChunks =
new char *[
fNseek];
675 memset(aUnzipChunks, 0,
fNseek*
sizeof(
char *));
739 char **aUnzipChunks =
new char *[
fNseek];
740 memset(aUnzipChunks, 0,
fNseek*
sizeof(
char *));
806 if ( myCycle !=
fCycle ) {
808 Info(
"GetUnzipBuffer",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
952 const Int_t hlen=128;
953 Int_t nbytes=0, objlen=0, keylen=0;
961 Error(
"UnzipBuffer",
"Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
975 Bool_t oldCase = objlen==nbytes-keylen
979 if (objlen > nbytes-keylen || oldCase) {
982 memcpy(*dest, src, keylen);
985 char *objbuf = *dest + keylen;
995 Info(
"UnzipBuffer",
" nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
996 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
997 if (oldCase && (nin > objlen || nbuf > objlen)) {
999 Info(
"UnzipBuffer",
"oldcase objlen :%d ", objlen);
1002 memcpy( *dest + keylen, src + keylen, objlen);
1007 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
1010 Info(
"UnzipBuffer",
"R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
1011 nin, bufcur, nbuf, objbuf, nout);
1015 if (noutot >= objlen)
break;
1020 if (noutot != objlen) {
1021 Error(
"UnzipBuffer",
"nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
1022 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
1024 if(alloc)
delete [] *
dest;
1030 memcpy(*dest, src, keylen);
1032 memcpy(*dest + keylen, src + keylen, objlen);
1062 const Int_t hlen=128;
1063 Int_t objlen=0, keylen=0;
1067 Int_t idxtounzip = -1;
1075 Info(
"UnzipCache",
"Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1091 Int_t reqi = (startindex+
ii) % fNseek;
1097 rdoffs =
fSeek[idxtounzip];
1108 if (idxtounzip < 0) {
1110 Info(
"UnzipCache",
"Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
1120 Info(
"UnzipCache",
"Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1128 if(locbuffsz < rdlen) {
1129 if (locbuff)
delete [] locbuff;
1131 locbuff =
new char[locbuffsz];
1134 if(locbuffsz > rdlen*3) {
1135 if (locbuff)
delete [] locbuff;
1136 locbuffsz = rdlen*2;
1137 locbuff =
new char[locbuffsz];
1142 Info(
"UnzipCache",
"Going to unzip block %d", idxtounzip);
1151 Info(
"UnzipCache",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1168 Info(
"UnzipCache",
"Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
1174 Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
1182 Info(
"UnzipCache",
"Block %d is too big, skipping.", idxtounzip);
1200 if ((loclen > 0) && (loclen == objlen+keylen)) {
1205 Info(
"UnzipCache",
"Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1226 Info(
"UnzipCache",
"reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
1227 idxtounzip, rdoffs, rdlen, loclen);
1233 Info(
"argh",
"loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
1249 printf(
"Number of blocks unzipped by threads: %d\n",
fNUnzip);
TCondition * fUnzipStartCondition
void Print(Option_t *option="") const
Print cache statistics.
static Int_t SetCancelDeferred()
Static method to set the cancellation response type of the calling thread to deferred, i.e.
Int_t fNtot
Total size of prefetched blocks.
void frombuf(char *&buf, Bool_t *x)
Long64_t fEntryMax
first entry in the cache
void WaitUnzipStartSignal()
TFile * fFile
Pointer to file.
Int_t StopThreadUnzip()
To stop the thread we only need to change the value of the variable fActiveThread to false and the lo...
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache ...
TObjArray * GetListOfBaskets()
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
Int_t fNStalls
number of blocks that were found in the cache
TEventList * GetEventList() const
virtual Long64_t GetReadEntry() const
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
A specialized TFileCacheRead object for a TTree.
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos is either kBeg, kCur or kEnd.
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Byte_t * fUnzipStatus
[fNseek] Individual unzipped chunks. Their summed size is kept under control.
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
static Int_t SetCancelOn()
Static method to turn on thread cancellation.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
virtual Long64_t GetBasketSeek(Int_t basket) const
Return address of basket in the file.
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase...
Long64_t * GetBasketEntry() const
TThread * fUnzipThread[10]
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache...
virtual void Print(Option_t *option="") const
Print cache statistics.
Int_t fNFound
number of blocks that were unzipped
static Bool_t IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer...
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped Returns -1 in case of read failure...
const char * Data() const
std::queue< Int_t > fActiveBlks
number of blocks that were not found in the cache and were unzipped
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * GetTreeOffset() const
Helper class to iterate over cluster of baskets.
static void * UnzipLoop(void *arg)
This is a static function.
virtual Int_t GetTreeNumber() const
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
Int_t fNMissed
number of hits which caused a stall
Int_t * GetBasketBytes() const
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
void Init(TClassEdit::TInterpreterLookupHelper *helper)
virtual Int_t GetBufferSize() const
virtual TClusterIterator GetClusterIterator(Long64_t firstentry)
Return an iterator over the cluster of baskets starting at firstentry.
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
static Long_t SelfId()
Static method returning the id for the current thread.
Specialization of TTreeCache for parallel Unzipping.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Int_t Run(void *arg=0)
Start the thread.
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
TTree * fTree
list of branch names in the cache
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
virtual TFile * GetFile() const
TObject * UncheckedAt(Int_t i) const
static Double_t fgRelBuffSize
Max Size for the ready unzipped blocks (default is 2*fBufferSize)
void SendUnzipStartSignal(Bool_t broadcast)
This will send the signal corresponding to the queue...
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
R__EXTERN TSystem * gSystem
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Int_t fNseekMax
The total sum of the currently unzipped blks.
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread) ...
A TEventList object is a list of selected events (entries) in a TTree.
virtual const char * GetName() const
Returns name of object.
Long_t Join(void **ret=0)
Join this thread.
Long64_t fEntryCurrent
last entry in the cache
Bool_t IsQueueEmpty()
It says if the queue is empty... useful to see if we have to process it.
virtual Int_t GetSize() const
virtual const char * GetName() const
Returns name of object.
Bool_t fIsLearning
pointer to the current Tree
Long64_t fUnzipBufferSize
fNseek can change so we need to know its max size
TCondition * fUnzipDoneCondition
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
#define R__LOCKGUARD(mutex)
void Init()
Initialization procedure common to all the constructors.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
Static function: sets the unzip relative buffer size.
static TTreeCacheUnzip::EParUnzipMode fgParallel
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
char ** fUnzipChunks
[fNseek] Length of the unzipped buffers
TDirectory * GetDirectory() const
Int_t fNbranches
next entry number where cache must be filled
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Bool_t IsActiveThread()
This indicates if the thread is active in this moment...
Int_t GetMaxBaskets() const
Int_t StartThreadUnzip(Int_t nthreads)
The Thread is only a part of the TTreeCache but it is the part that waits for info in the queue and p...
#define dest(otri, vertexptr)
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
A chain is a collection of files containing TTree objects.
ClassImp(TTreeCacheUnzip) TTreeCacheUnzip
Int_t TimedWaitRelative(ULong_t ms)
Wait to be signaled or till the timer times out.
virtual Long64_t GetEntries() const
A TTree object has a header with a name and a title.
A TTree is a list of TBranches.
Long64_t fEntryNext
current lowest entry number in the cache
Long64_t fTotalUnzipBytes
[fNSeek] For each blk, tells us if it's unzipped or pending
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
This inflates all the buffers in the cache.
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs, RAM size, etc. into the SysInfo_t s...
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Int_t fNseek
Number of blocks to be prefetched.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.