ROOT  6.06/09
Reference Guide
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1 // @(#)root/tree:$Id$
2 // Author: Leandro Franco 10/04/2008
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TTreeCacheUnzip
13 
14 Specialization of TTreeCache for parallel Unzipping.
15 
16 Fabrizio Furano (CERN) Aug 2009
17 Core TTree-related code borrowed from the previous version
18  by Leandro Franco and Rene Brun
19 
20 ## Parallel Unzipping
21 
22 TTreeCache has been specialised in order to let additional threads
23 free to unzip in advance its content. In this implementation we
24 support up to 10 threads, but right now it makes more sense to
25 limit their number to 1-2
26 
27 The application reading data is carefully synchronized, in order to:
28  - if the block it wants is not unzipped, it self-unzips it without
29  waiting
30  - if the block is being unzipped in parallel, it waits only
31  for that unzip to finish
32  - if the block has already been unzipped, it takes it
33 
34 This is supposed to cancel a part of the unzipping latency, at the
35 expenses of cpu time.
36 
37 The default parameters are the same of the prev version, i.e. 20%
38 of the TTreeCache cache size. To change it use
39 TTreeCache::SetUnzipBufferSize(Long64_t bufferSize)
40 where bufferSize must be passed in bytes.
41 */
42 
43 #include "TTreeCacheUnzip.h"
44 #include "TChain.h"
45 #include "TBranch.h"
46 #include "TFile.h"
47 #include "TEventList.h"
48 #include "TVirtualMutex.h"
49 #include "TThread.h"
50 #include "TCondition.h"
51 #include "TMath.h"
52 #include "Bytes.h"
53 
54 #include "TEnv.h"
55 
56 #define THREADCNT 2
57 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
58 extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
59 
61 
62 // The unzip cache does not consume memory by itself, it just allocates in advance
63 // mem blocks which are then picked as they are by the baskets.
64 // Hence there is no good reason to limit it too much
66 
68 
69 ////////////////////////////////////////////////////////////////////////////////
70 
72 
73  fActiveThread(kFALSE),
74  fAsyncReading(kFALSE),
75  fCycle(0),
76  fLastReadPos(0),
77  fBlocksToGo(0),
78  fUnzipLen(0),
79  fUnzipChunks(0),
80  fUnzipStatus(0),
81  fTotalUnzipBytes(0),
82  fNseekMax(0),
83  fUnzipBufferSize(0),
84  fNUnzip(0),
85  fNFound(0),
86  fNStalls(0),
87  fNMissed(0)
88 
89 {
90  // Default Constructor.
91 
92  Init();
93 }
94 
95 ////////////////////////////////////////////////////////////////////////////////
96 /// Constructor.
97 
98 TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
99  fActiveThread(kFALSE),
100  fAsyncReading(kFALSE),
101  fCycle(0),
102  fLastReadPos(0),
103  fBlocksToGo(0),
104  fUnzipLen(0),
105  fUnzipChunks(0),
106  fUnzipStatus(0),
107  fTotalUnzipBytes(0),
108  fNseekMax(0),
109  fUnzipBufferSize(0),
110  fNUnzip(0),
111  fNFound(0),
112  fNStalls(0),
113  fNMissed(0)
114 {
115  Init();
116 }
117 
118 ////////////////////////////////////////////////////////////////////////////////
119 /// Initialization procedure common to all the constructors.
120 
122 {
123  fMutexList = new TMutex(kTRUE);
124  fIOMutex = new TMutex(kTRUE);
125 
128 
129  fTotalUnzipBytes = 0;
130 
131  fCompBuffer = new char[16384];
132  fCompBufferSize = 16384;
133 
134  if (fgParallel == kDisable) {
135  fParallel = kFALSE;
136  }
137  else if(fgParallel == kEnable || fgParallel == kForce) {
138  SysInfo_t info;
139  gSystem->GetSysInfo(&info);
140 
142 
143  if(gDebug > 0)
144  Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
145 
146  fParallel = kTRUE;
147 
148  for (Int_t i = 0; i < 10; i++) fUnzipThread[i] = 0;
149 
151 
152  }
153  else {
154  Warning("TTreeCacheUnzip", "Parallel Option unknown");
155  }
156 
157  // Check if asynchronous reading is supported by this TFile specialization
158  if (gEnv->GetValue("TFile.AsyncReading", 1)) {
159  if (fFile && !(fFile->ReadBufferAsync(0, 0)))
161  }
162 
163 }
164 
165 ////////////////////////////////////////////////////////////////////////////////
166 /// Destructor. (in general called by the TFile destructor)
167 
169 {
170  ResetCache();
171 
172  if (IsActiveThread())
173  StopThreadUnzip();
174 
175  delete [] fUnzipLen;
176 
177  delete fUnzipStartCondition;
178  delete fUnzipDoneCondition;
179 
180  delete fMutexList;
181  delete fIOMutex;
182 
183  delete [] fUnzipStatus;
184  delete [] fUnzipChunks;
185 }
186 
187 ////////////////////////////////////////////////////////////////////////////////
188 /// Add a branch to the list of branches to be stored in the cache
189 /// this function is called by TBranch::GetBasket
190 /// Returns:
191 /// - 0 branch added or already included
192 /// - -1 on error
193 
194 Int_t TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches /*= kFALSE*/)
195 {
197 
198  return TTreeCache::AddBranch(b, subbranches);
199 }
200 
201 ////////////////////////////////////////////////////////////////////////////////
202 /// Add a branch to the list of branches to be stored in the cache
203 /// this function is called by TBranch::GetBasket
204 /// Returns:
205 /// - 0 branch added or already included
206 /// - -1 on error
207 
208 Int_t TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
209 {
211 
212  return TTreeCache::AddBranch(branch, subbranches);
213 }
214 
215 ////////////////////////////////////////////////////////////////////////////////
216 
218 {
219  if (fNbranches <= 0) return kFALSE;
220  {
221  // Fill the cache buffer with the branches in the cache.
224 
225  TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
226  Long64_t entry = tree->GetReadEntry();
227 
228  // If the entry is in the range we previously prefetched, there is
229  // no point in retrying. Note that this will also return false
230  // during the training phase (fEntryNext is then set intentional to
231  // the end of the training phase).
232  if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
233 
234  // Triggered by the user, not the learning phase
235  if (entry == -1) entry=0;
236 
237  TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
238  fEntryCurrent = clusterIter();
239  fEntryNext = clusterIter.GetNextEntry();
240 
242  if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
244 
245  // Check if owner has a TEventList set. If yes we optimize for this
246  // Special case reading only the baskets containing entries in the
247  // list.
248  TEventList *elist = fTree->GetEventList();
249  Long64_t chainOffset = 0;
250  if (elist) {
251  if (fTree->IsA() ==TChain::Class()) {
252  TChain *chain = (TChain*)fTree;
253  Int_t t = chain->GetTreeNumber();
254  chainOffset = chain->GetTreeOffset()[t];
255  }
256  }
257 
258  //clear cache buffer
260 
261  //store baskets
262  for (Int_t i=0;i<fNbranches;i++) {
264  if (b->GetDirectory()==0) continue;
265  if (b->GetDirectory()->GetFile() != fFile) continue;
266  Int_t nb = b->GetMaxBaskets();
267  Int_t *lbaskets = b->GetBasketBytes();
268  Long64_t *entries = b->GetBasketEntry();
269  if (!lbaskets || !entries) continue;
270  //we have found the branch. We now register all its baskets
271  //from the requested offset to the basket below fEntrymax
272  Int_t blistsize = b->GetListOfBaskets()->GetSize();
273  for (Int_t j=0;j<nb;j++) {
274  // This basket has already been read, skip it
275  if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
276 
277  Long64_t pos = b->GetBasketSeek(j);
278  Int_t len = lbaskets[j];
279  if (pos <= 0 || len <= 0) continue;
280  //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
281  if (entries[j] >= fEntryNext) continue;
282  if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry)) continue;
283  if (elist) {
284  Long64_t emax = fEntryMax;
285  if (j<nb-1) emax = entries[j+1]-1;
286  if (!elist->ContainsRange(entries[j]+chainOffset,emax+chainOffset)) continue;
287  }
288  fNReadPref++;
289 
290  TFileCacheRead::Prefetch(pos,len);
291  }
292  if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n",entry,((TBranch*)fBranches->UncheckedAt(i))->GetName(),fEntryNext,fNseek,fNtot);
293  }
294 
295  // Now fix the size of the status arrays
296  ResetCache();
297 
299 
300  }
301 
302  return kTRUE;
303 }
304 
305 ////////////////////////////////////////////////////////////////////////////////
306 /// Change the underlying buffer size of the cache.
307 /// Returns:
308 /// - 0 if the buffer content is still available
309 /// - 1 if some or all of the buffer content has been made unavailable
310 /// - -1 on error
311 
313 {
315 
316  Int_t res = TTreeCache::SetBufferSize(buffersize);
317  if (res < 0) {
318  return res;
319  }
321  ResetCache();
322  return 1;
323 }
324 
325 ////////////////////////////////////////////////////////////////////////////////
326 /// Set the minimum and maximum entry number to be processed
327 /// this information helps to optimize the number of baskets to read
328 /// when prefetching the branch buffers.
329 
331 {
333 
334  TTreeCache::SetEntryRange(emin, emax);
335 }
336 
337 ////////////////////////////////////////////////////////////////////////////////
338 /// It's the same as TTreeCache::StopLearningPhase but we guarantee that
339 /// we start the unzipping just after getting the buffers
340 
342 {
344 
346 
347 }
348 
349 ////////////////////////////////////////////////////////////////////////////////
350 ///update pointer to current Tree and recompute pointers to the branches in the cache
351 
353 {
355 
357 }
358 
359 ////////////////////////////////////////////////////////////////////////////////
360 // //
361 // From now on we have the methods concerning the threading part of the cache //
362 // //
363 ////////////////////////////////////////////////////////////////////////////////
364 
365 ////////////////////////////////////////////////////////////////////////////////
366 /// Static function that returns the parallel option
367 /// (to indicate an additional thread)
368 
370 {
371  return fgParallel;
372 }
373 
374 ////////////////////////////////////////////////////////////////////////////////
375 /// Static function that tells wether the multithreading unzipping is activated
376 
378 {
379  if (fgParallel == kEnable || fgParallel == kForce)
380  return kTRUE;
381 
382  return kFALSE;
383 }
384 
385 ////////////////////////////////////////////////////////////////////////////////
386 /// This indicates if the thread is active in this moment...
387 /// this variable is very important because if we change it from true to
388 /// false the thread will stop... ( see StopThreadTreeCacheUnzip() )
389 
391 {
393 
394  return fActiveThread;
395 }
396 
397 ////////////////////////////////////////////////////////////////////////////////
398 /// It says if the queue is empty... useful to see if we have to process it.
399 
401 {
403 
404  if ( fIsLearning )
405  return kTRUE;
406 
407  return kFALSE;
408 }
409 
411 {
412  // Here the threads sleep waiting for some blocks to unzip
413 
415 
416 }
417 
418 ////////////////////////////////////////////////////////////////////////////////
419 /// This will send the signal corresponfing to the queue... normally used
420 /// when we want to start processing the list of buffers.
421 
423 {
424  if (gDebug > 0) Info("SendSignal", " fUnzipCondition->Signal()");
425 
426  if (broadcast)
428  else
430 }
431 
432 ////////////////////////////////////////////////////////////////////////////////
433 /// Static function that (de)activates multithreading unzipping
434 ///
435 /// The possible options are:
436 /// - kEnable _Enable_ it, which causes an automatic detection and launches the
437 /// additional thread if the number of cores in the machine is greater than
438 /// one
439 /// - kDisable _Disable_ will not activate the additional thread.
440 /// - kForce _Force_ will start the additional thread even if there is only one
441 /// core. the default will be taken as kEnable.
442 ///
443 /// Returns 0 if there was an error, 1 otherwise.
444 
446 {
448  fgParallel = option;
449  return 1;
450  }
451  return 0;
452 }
453 
454 class TTreeCacheUnzipData {
455 public:
456  TTreeCacheUnzip *fInstance;
457  Int_t fCount;
458 };
459 
460 ////////////////////////////////////////////////////////////////////////////////
461 /// The Thread is only a part of the TTreeCache but it is the part that
462 /// waits for info in the queue and process it... unfortunatly, a Thread is
463 /// not an object an we have to deal with it in the old C-Style way
464 /// Returns 0 if the thread was initialized or 1 if it was already running
465 
467 {
468  Int_t nt = nthreads;
469  if (nt > 10) nt = 10;
470 
471  if (gDebug > 0)
472  Info("StartThreadUnzip", "Going to start %d threads.", nt);
473 
474  for (Int_t i = 0; i < nt; i++) {
475  if (!fUnzipThread[i]) {
476  TString nm("UnzipLoop");
477  nm += i;
478 
479  if (gDebug > 0)
480  Info("StartThreadUnzip", "Going to start thread '%s'", nm.Data());
481 
482  TTreeCacheUnzipData *d = new TTreeCacheUnzipData;
483  d->fInstance = this;
484  d->fCount = i;
485 
486  fUnzipThread[i] = new TThread(nm.Data(), UnzipLoop, (void*)d);
487  if (!fUnzipThread[i])
488  Error("TTreeCacheUnzip::StartThreadUnzip", " Unable to create new thread.");
489 
490  fUnzipThread[i]->Run();
491 
492  // There is at least one active thread
494 
495  }
496  }
497 
498  return (fActiveThread == kTRUE);
499 }
500 
501 ////////////////////////////////////////////////////////////////////////////////
502 /// To stop the thread we only need to change the value of the variable
503 /// fActiveThread to false and the loop will stop (of course, we will have)
504 /// to do the cleaning after that.
505 ///
506 /// Note: The syncronization part is important here or we will try to delete
507 /// teh object while it's still processing the queue
508 
510 {
512 
513  for (Int_t i = 0; i < 1; i++) {
514  if(fUnzipThread[i]){
515 
517 
518  if (fUnzipThread[i]->Exists()) {
519  fUnzipThread[i]->Join();
520  delete fUnzipThread[i];
521  }
522  }
523 
524  }
525 
526  return 1;
527 }
528 
529 ////////////////////////////////////////////////////////////////////////////////
530 /// This is a static function.
531 ///
532 /// This is the call that will be executed in the Thread generated by
533 /// StartThreadTreeCacheUnzip... what we want to do is to inflate the next
534 /// series of buffers leaving them in the second cache.
535 ///
536 /// Returns 0 when it finishes
537 
539 {
540  TTreeCacheUnzipData *d = (TTreeCacheUnzipData *)arg;
541  TTreeCacheUnzip *unzipMng = d->fInstance;
542 
545 
546  Int_t thrnum = d->fCount;
547  Int_t startindex = thrnum;
548  Int_t locbuffsz = 16384;
549  char *locbuff = new char[16384];
550  Int_t res = 0;
551  Int_t myCycle = 0;
552 
553  while( unzipMng->IsActiveThread() ) {
554  res = 1;
555 
556  {
557  R__LOCKGUARD(unzipMng->fMutexList);
558  if (myCycle != unzipMng->fCycle) startindex = thrnum;
559  myCycle = unzipMng->fCycle;
560  if (unzipMng->fNseek) startindex = startindex % unzipMng->fNseek;
561  else startindex = -1;
562  }
563 
564  if (startindex >= 0)
565  res = unzipMng->UnzipCache(startindex, locbuffsz, locbuff);
566 
567  {
568  R__LOCKGUARD(unzipMng->fMutexList);
569 
570  if(!unzipMng->IsActiveThread()) break;
571 
572  if ((res == 1) || (!unzipMng->fIsTransferred)) {
573  unzipMng->WaitUnzipStartSignal();
574  startindex = unzipMng->fLastReadPos+3+thrnum;
575  }
576  }
577 
578  }
579 
580  delete d;
581  delete [] locbuff;
582  return (void *)0;
583 }
584 
585 ////////////////////////////////////////////////////////////////////////////////
586 // //
587 // From now on we have the methods concerning the unzipping part of the cache //
588 // //
589 ////////////////////////////////////////////////////////////////////////////////
590 
591 ////////////////////////////////////////////////////////////////////////////////
592 /// Read the logical record header from the buffer buf.
593 /// That must be the pointer tho the header part not the object by itself and
594 /// must contain data of at least maxbytes
595 /// Returns nread;
596 ///
597 /// In output arguments:
598 ///
599 /// - nbytes : number of bytes in record
600 /// if negative, this is a deleted record
601 /// if 0, cannot read record, wrong value of argument first
602 /// - objlen : uncompressed object size
603 /// - keylen : length of logical record header
604 ///
605 /// Note that the arguments objlen and keylen are returned only
606 /// if maxbytes >=16
607 /// Note: This was adapted from TFile... so some things dont apply
608 
609 Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
610 {
611  Version_t versionkey;
612  Short_t klen;
613  UInt_t datime;
614  Int_t nb = 0,olen;
615  Int_t nread = maxbytes;
616  frombuf(buf,&nb);
617  nbytes = nb;
618  if (nb < 0) return nread;
619  // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
620  const Int_t headerSize = 16;
621  if (nread < headerSize) return nread;
622  frombuf(buf, &versionkey);
623  frombuf(buf, &olen);
624  frombuf(buf, &datime);
625  frombuf(buf, &klen);
626  if (!olen) olen = nbytes-klen;
627  objlen = olen;
628  keylen = klen;
629  return nread;
630 }
631 
632 ////////////////////////////////////////////////////////////////////////////////
633 /// This will delete the list of buffers that are in the unzipping cache
634 /// and will reset certain values in the cache.
635 /// This name is ambiguos because the method doesn't reset the whole cache,
636 /// only the part related to the unzipping
637 /// Note: This method is completely different from TTreeCache::ResetCache(),
638 /// in that method we were cleaning the prefetching buffer while here we
639 /// delete the information about the unzipped buffers
640 
642 {
643  {
645 
646  if (gDebug > 0)
647  Info("ResetCache", "Thread: %ld -- Resetting the cache. fNseek:%d fNSeekMax:%d fTotalUnzipBytes:%lld", TThread::SelfId(), fNseek, fNseekMax, fTotalUnzipBytes);
648 
649  // Reset all the lists and wipe all the chunks
650  fCycle++;
651  for (Int_t i = 0; i < fNseekMax; i++) {
652  if (fUnzipLen) fUnzipLen[i] = 0;
653  if (fUnzipChunks) {
654  if (fUnzipChunks[i]) delete [] fUnzipChunks[i];
655  fUnzipChunks[i] = 0;
656  }
657  if (fUnzipStatus) fUnzipStatus[i] = 0;
658 
659  }
660 
661  while (fActiveBlks.size()) fActiveBlks.pop();
662 
663  if(fNseekMax < fNseek){
664  if (gDebug > 0)
665  Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
666 
667  Byte_t *aUnzipStatus = new Byte_t[fNseek];
668  memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
669 
670  Int_t *aUnzipLen = new Int_t[fNseek];
671  memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
672 
673  char **aUnzipChunks = new char *[fNseek];
674  memset(aUnzipChunks, 0, fNseek*sizeof(char *));
675 
676  if (fUnzipStatus) delete [] fUnzipStatus;
677  if (fUnzipLen) delete [] fUnzipLen;
678  if (fUnzipChunks) delete [] fUnzipChunks;
679 
680  fUnzipStatus = aUnzipStatus;
681  fUnzipLen = aUnzipLen;
682  fUnzipChunks = aUnzipChunks;
683 
684  fNseekMax = fNseek;
685  }
686 
687  fLastReadPos = 0;
688  fTotalUnzipBytes = 0;
690  }
691 
693 
694 }
695 
696 ////////////////////////////////////////////////////////////////////////////////
697 /// We try to read a buffer that has already been unzipped
698 /// Returns -1 in case of read failure, 0 in case it's not in the
699 /// cache and n>0 in case read from cache (number of bytes copied).
700 /// pos and len are the original values as were passed to ReadBuffer
701 /// but instead we will return the inflated buffer.
702 /// Note!! : If *buf == 0 we will allocate the buffer and it will be the
703 /// responsability of the caller to free it... it is useful for example
704 /// to pass it to the creator of TBuffer
705 
707 {
708  Int_t res = 0;
709  Int_t loc = -1;
710 
711  {
713 
714  // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
715  // pointer to the original zipped chunk
716  // its index in the original unsorted offsets lists
717  //
718  // Actually there are situations in which copying the buffer is not
719  // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
720  // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
721  // better to be done in the main thread.
722 
723  // And now loc is the position of the chunk in the array of the sorted chunks
724  Int_t myCycle = fCycle;
725 
726  if (fParallel && !fIsLearning) {
727 
728  if(fNseekMax < fNseek){
729  if (gDebug > 0)
730  Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
731 
732  Byte_t *aUnzipStatus = new Byte_t[fNseek];
733  memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
734 
735  Int_t *aUnzipLen = new Int_t[fNseek];
736  memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
737 
738  char **aUnzipChunks = new char *[fNseek];
739  memset(aUnzipChunks, 0, fNseek*sizeof(char *));
740 
741  for (Int_t i = 0; i < fNseekMax; i++) {
742  aUnzipStatus[i] = fUnzipStatus[i];
743  aUnzipLen[i] = fUnzipLen[i];
744  aUnzipChunks[i] = fUnzipChunks[i];
745  }
746 
747  if (fUnzipStatus) delete [] fUnzipStatus;
748  if (fUnzipLen) delete [] fUnzipLen;
749  if (fUnzipChunks) delete [] fUnzipChunks;
750 
751  fUnzipStatus = aUnzipStatus;
752  fUnzipLen = aUnzipLen;
753  fUnzipChunks = aUnzipChunks;
754 
755  fNseekMax = fNseek;
756  }
757 
759  if ( (fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc]) ) {
760 
761  // The buffer is, at minimum, in the file cache. We must know its index in the requests list
762  // In order to get its info
763  Int_t seekidx = fSeekIndex[loc];
764 
765  fLastReadPos = seekidx;
766 
767  do {
768 
769  // If the block is ready we get it immediately.
770  // And also we don't have to alloc the blks. This is supposed to be
771  // the main thread of the app.
772  if ((fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0)) {
773 
774  //if (gDebug > 0)
775  // Info("GetUnzipBuffer", "++++++++++++++++++++ CacheHIT Block wanted: %d len:%d req_len:%d fNseek:%d", seekidx, fUnzipLen[seekidx], len, fNseek);
776 
777  if(!(*buf)) {
778  *buf = fUnzipChunks[seekidx];
779  fUnzipChunks[seekidx] = 0;
780  fTotalUnzipBytes -= fUnzipLen[seekidx];
782  *free = kTRUE;
783  }
784  else {
785  memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
786  delete fUnzipChunks[seekidx];
787  fTotalUnzipBytes -= fUnzipLen[seekidx];
788  fUnzipChunks[seekidx] = 0;
790  *free = kFALSE;
791  }
792 
793  fNFound++;
794 
795  return fUnzipLen[seekidx];
796  }
797 
798  // If the status of the unzipped chunk is pending
799  // we wait on the condvar, hoping that the next signal is the good one
800  if ( fUnzipStatus[seekidx] == 1 ) {
801  //fMutexList->UnLock();
803  //fMutexList->Lock();
804 
805  if ( myCycle != fCycle ) {
806  if (gDebug > 0)
807  Info("GetUnzipBuffer", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
809 
810  fLastReadPos = 0;
811 
812  seekidx = -1;
813  break;
814  }
815 
816  }
817 
818  } while ( fUnzipStatus[seekidx] == 1 );
819 
820  //if (gDebug > 0)
821  // Info("GetUnzipBuffer", "------- Block wanted: %d status: %d len: %d chunk: %llx ", seekidx, fUnzipStatus[seekidx], fUnzipLen[seekidx], fUnzipChunks[seekidx]);
822 
823  // Here the block is not pending. It could be done or aborted or not yet being processed.
824  if ( (seekidx >= 0) && (fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0) ) {
825 
826  //if (gDebug > 0)
827  // Info("GetUnzipBuffer", "++++++++++++++++++++ CacheLateHIT Block wanted: %d len:%d fNseek:%d", seekidx, fUnzipLen[seekidx], fNseek);
828 
829  if(!(*buf)) {
830  *buf = fUnzipChunks[seekidx];
831  fUnzipChunks[seekidx] = 0;
832  fTotalUnzipBytes -= fUnzipLen[seekidx];
834  *free = kTRUE;
835  }
836  else {
837  memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
838  delete fUnzipChunks[seekidx];
839  fTotalUnzipBytes -= fUnzipLen[seekidx];
840  fUnzipChunks[seekidx] = 0;
842  *free = kFALSE;
843  }
844 
845  fNStalls++;
846 
847  return fUnzipLen[seekidx];
848  }
849  else {
850  // This is a complete miss. We want to avoid the threads
851  // to try unzipping this block in the future.
852  fUnzipStatus[seekidx] = 2;
853  fUnzipChunks[seekidx] = 0;
854 
857 
858  //if (gDebug > 0)
859  // Info("GetUnzipBuffer", "++++++++++++++++++++ CacheMISS Block wanted: %d len:%d fNseek:%d", seekidx, len, fNseek);
860  }
861 
862  } else {
863  loc = -1;
864  //fLastReadPos = 0;
866  }
867 
868  } else {
869  // We need to reset it for new transferences...
870  //ResetCache();
871  //TFileCacheRead::Prefetch(0,0);
872  }
873 
874  } // scope of the lock!
875 
876  if (len > fCompBufferSize) {
877  delete [] fCompBuffer;
878  fCompBuffer = new char[len];
879  fCompBufferSize = len;
880  } else {
881  if (fCompBufferSize > len*4) {
882  delete [] fCompBuffer;
883  fCompBuffer = new char[len*2];
884  fCompBufferSize = len*2;
885  }
886  }
887 
888  {
890  // Here we know that the async unzip of the wanted chunk
891  // was not done for some reason. We continue.
892 
893  res = 0;
894  if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
895  //Info("GetUnzipBuffer", "++++++++++++++++++++ CacheMISS %d %d", loc, fNseek);
896  fFile->Seek(pos);
897  res = fFile->ReadBuffer(fCompBuffer, len);
898  }
899 
900  if (res) res = -1;
901 
902  } // scope of the lock!
903 
904  if (!res) {
905  res = UnzipBuffer(buf, fCompBuffer);
906  *free = kTRUE;
907  }
908 
909  if (!fIsLearning) {
910  fNMissed++;
911  }
912 
913  return res;
914 
915 }
916 
917 ////////////////////////////////////////////////////////////////////////////////
918 /// static function: Sets the unzip relatibe buffer size
919 
921 {
922  fgRelBuffSize = relbufferSize;
923 }
924 
925 ////////////////////////////////////////////////////////////////////////////////
926 /// Sets the size for the unzipping cache... by default it should be
927 /// two times the size of the prefetching cache
928 
930 {
932 
933  fUnzipBufferSize = bufferSize;
934 }
935 
936 ////////////////////////////////////////////////////////////////////////////////
937 /// Unzips a ROOT specific buffer... by reading the header at the beginning.
938 /// returns the size of the inflated buffer or -1 if error
939 /// Note!! : If *dest == 0 we will allocate the buffer and it will be the
940 /// responsability of the caller to free it... it is useful for example
941 /// to pass it to the creator of TBuffer
942 /// src is the original buffer with the record (header+compressed data)
943 /// *dest is the inflated buffer (including the header)
944 
946 {
947  Int_t uzlen = 0;
948  Bool_t alloc = kFALSE;
949 
950  // Here we read the header of the buffer
951  const Int_t hlen=128;
952  Int_t nbytes=0, objlen=0, keylen=0;
953  GetRecordHeader(src, hlen, nbytes, objlen, keylen);
954 
955  if (!(*dest)) {
956  /* early consistency check */
957  UChar_t *bufcur = (UChar_t *) (src + keylen);
958  Int_t nin, nbuf;
959  if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
960  Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
961  uzlen = -1;
962  return uzlen;
963  }
964  Int_t l = keylen+objlen;
965  *dest = new char[l];
966  alloc = kTRUE;
967  }
968  // Must unzip the buffer
969  // fSeekPos[ind]; adress of zipped buffer
970  // fSeekLen[ind]; len of the zipped buffer
971  // &fBuffer[fSeekPos[ind]]; memory address
972 
973  // This is similar to TBasket::ReadBasketBuffers
974  Bool_t oldCase = objlen==nbytes-keylen
975  && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel()!=0
976  && fFile->GetVersion()<=30401;
977 
978  if (objlen > nbytes-keylen || oldCase) {
979 
980  // Copy the key
981  memcpy(*dest, src, keylen);
982  uzlen += keylen;
983 
984  char *objbuf = *dest + keylen;
985  UChar_t *bufcur = (UChar_t *) (src + keylen);
986  Int_t nin, nbuf;
987  Int_t nout = 0;
988  Int_t noutot = 0;
989 
990  while (1) {
991  Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
992  if (hc!=0) break;
993  if (gDebug > 2)
994  Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
995  nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
996  if (oldCase && (nin > objlen || nbuf > objlen)) {
997  if (gDebug > 2)
998  Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
999 
1000  //buffer was very likely not compressed in an old version
1001  memcpy( *dest + keylen, src + keylen, objlen);
1002  uzlen += objlen;
1003  return uzlen;
1004  }
1005 
1006  R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
1007 
1008  if (gDebug > 2)
1009  Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
1010  nin, bufcur, nbuf, objbuf, nout);
1011 
1012  if (!nout) break;
1013  noutot += nout;
1014  if (noutot >= objlen) break;
1015  bufcur += nin;
1016  objbuf += nout;
1017  }
1018 
1019  if (noutot != objlen) {
1020  Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
1021  nbytes,keylen,objlen, noutot,nout,nin,nbuf);
1022  uzlen = -1;
1023  if(alloc) delete [] *dest;
1024  *dest = 0;
1025  return uzlen;
1026  }
1027  uzlen += objlen;
1028  } else {
1029  memcpy(*dest, src, keylen);
1030  uzlen += keylen;
1031  memcpy(*dest + keylen, src + keylen, objlen);
1032  uzlen += objlen;
1033  }
1034  return uzlen;
1035 }
1036 
1037 ////////////////////////////////////////////////////////////////////////////////
1038 /// This inflates all the buffers in the cache.. passing the data to a new
1039 /// buffer that will only wait there to be read...
1040 /// We can not inflate all the buffers in the cache so we will try to do
1041 /// it until the cache gets full... there is a member called fUnzipBufferSize which will
1042 /// tell us the max size we can allocate for this cache.
1043 ///
1044 /// note that we will unzip in the order they were put into the cache not
1045 /// the order of the transference so it has to be read in that order or the
1046 /// pre-unzipping will be useless.
1047 ///
1048 /// startindex is used as start index to check for blks to be unzipped
1049 ///
1050 /// returns 0 in normal conditions or -1 if error, 1 if it would like to sleep
1051 ///
1052 /// This func is supposed to compete among an indefinite number of threads to get a chunk to inflate
1053 /// in order to accommodate multiple unzippers
1054 /// Since everything is so async, we cannot use a fixed buffer, we are forced to keep
1055 /// the individual chunks as separate blocks, whose summed size does not exceed the maximum
1056 /// allowed. The pointers are kept globally in the array fUnzipChunks
1057 
1058 Int_t TTreeCacheUnzip::UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
1059 {
1060  Int_t myCycle;
1061  const Int_t hlen=128;
1062  Int_t objlen=0, keylen=0;
1063  Int_t nbytes=0;
1064  Int_t readbuf = 0;
1065 
1066  Int_t idxtounzip = -1;
1067  Long64_t rdoffs = 0;
1068  Int_t rdlen = 0;
1069  {
1071 
1072  if (!IsActiveThread() || !fNseek || fIsLearning || !fIsTransferred) {
1073  if (gDebug > 0)
1074  Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1076  return 1;
1077  }
1078 
1079  // To synchronize with the 'paging'
1080  myCycle = fCycle;
1081 
1082  // Try to look for a blk to unzip
1083  idxtounzip = -1;
1084  rdoffs = 0;
1085  rdlen = 0;
1087 
1088  if (fBlocksToGo > 0) {
1089  for (Int_t ii=0; ii < fNseek; ii++) {
1090  Int_t reqi = (startindex+ii) % fNseek;
1091  if (!fUnzipStatus[reqi] && (fSeekLen[reqi] > 256) ) {
1092  // We found a chunk which is not unzipped nor pending
1093  fUnzipStatus[reqi] = 1; // Set it as pending
1094  idxtounzip = reqi;
1095 
1096  rdoffs = fSeek[idxtounzip];
1097  rdlen = fSeekLen[idxtounzip];
1098  break;
1099  }
1100  }
1101  if (idxtounzip < 0) fBlocksToGo = 0;
1102  }
1103  }
1104 
1105  } // lock scope
1106 
1107  if (idxtounzip < 0) {
1108  if (gDebug > 0)
1109  Info("UnzipCache", "Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
1110  startindex, fTotalUnzipBytes, fUnzipBufferSize, fNseek );
1111  return 1;
1112  }
1113 
1114  // And here we have a new blk to unzip
1115  startindex = idxtounzip+THREADCNT;
1116 
1117  if (!IsActiveThread() || !fNseek || fIsLearning ) {
1118  if (gDebug > 0)
1119  Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1121  return 1;
1122  }
1123 
1124  Int_t loc = -1;
1125 
1126  // Prepare a static tmp buf of adequate size
1127  if(locbuffsz < rdlen) {
1128  if (locbuff) delete [] locbuff;
1129  locbuffsz = rdlen;
1130  locbuff = new char[locbuffsz];
1131  //memset(locbuff, 0, locbuffsz);
1132  } else if(locbuffsz > rdlen*3) {
1133  if (locbuff) delete [] locbuff;
1134  locbuffsz = rdlen*2;
1135  locbuff = new char[locbuffsz];
1136  //memset(locbuff, 0, locbuffsz);
1137  }
1138 
1139  if (gDebug > 0)
1140  Info("UnzipCache", "Going to unzip block %d", idxtounzip);
1141 
1142  readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
1143 
1144  {
1146 
1147  if ( (myCycle != fCycle) || !fIsTransferred ) {
1148  if (gDebug > 0)
1149  Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1151 
1152  fUnzipStatus[idxtounzip] = 2; // Set it as not done
1153  fUnzipChunks[idxtounzip] = 0;
1154  fUnzipLen[idxtounzip] = 0;
1156 
1157  startindex = 0;
1158  return 1;
1159  }
1160 
1161  if (readbuf <= 0) {
1162  fUnzipStatus[idxtounzip] = 2; // Set it as not done
1163  fUnzipChunks[idxtounzip] = 0;
1164  fUnzipLen[idxtounzip] = 0;
1165  if (gDebug > 0)
1166  Info("UnzipCache", "Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
1167  return -1;
1168  }
1169 
1170  GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
1171 
1172  Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
1173 
1174  // If the single unzipped chunk is really too big, reset it to not processable
1175  // I.e. mark it as done but set the pointer to 0
1176  // This block will be unzipped synchronously in the main thread
1177  if (len > 4*fUnzipBufferSize) {
1178 
1179  //if (gDebug > 0)
1180  Info("UnzipCache", "Block %d is too big, skipping.", idxtounzip);
1181 
1182  fUnzipStatus[idxtounzip] = 2; // Set it as done
1183  fUnzipChunks[idxtounzip] = 0;
1184  fUnzipLen[idxtounzip] = 0;
1185 
1187  return 0;
1188  }
1189 
1190  } // Scope of the lock
1191 
1192  // Unzip it into a new blk
1193  char *ptr = 0;
1194  Int_t loclen = 0;
1195 
1196  loclen = UnzipBuffer(&ptr, locbuff);
1197 
1198  if ((loclen > 0) && (loclen == objlen+keylen)) {
1200 
1201  if ( (myCycle != fCycle) || !fIsTransferred) {
1202  if (gDebug > 0)
1203  Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1205  delete [] ptr;
1206 
1207  fUnzipStatus[idxtounzip] = 2; // Set it as not done
1208  fUnzipChunks[idxtounzip] = 0;
1209  fUnzipLen[idxtounzip] = 0;
1210 
1211  startindex = 0;
1213  return 1;
1214  }
1215 
1216  fUnzipStatus[idxtounzip] = 2; // Set it as done
1217  fUnzipChunks[idxtounzip] = ptr;
1218  fUnzipLen[idxtounzip] = loclen;
1219  fTotalUnzipBytes += loclen;
1220 
1221  fActiveBlks.push(idxtounzip);
1222 
1223  if (gDebug > 0)
1224  Info("UnzipCache", "reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
1225  idxtounzip, rdoffs, rdlen, loclen);
1226 
1227  fNUnzip++;
1228  }
1229  else {
1231  Info("argh", "loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
1232  fUnzipStatus[idxtounzip] = 2; // Set it as done
1233  fUnzipChunks[idxtounzip] = 0;
1234  fUnzipLen[idxtounzip] = 0;
1235  }
1236 
1238 
1239  delete [] ptr;
1240  return 0;
1241 }
1242 
1243 void TTreeCacheUnzip::Print(Option_t* option) const {
1244 
1245  printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
1246  printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
1247  printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
1248  printf("Number of hits: %d\n", fNFound);
1249  printf("Number of stalls: %d\n", fNStalls);
1250  printf("Number of misses: %d\n", fNMissed);
1251 
1252  TTreeCache::Print(option);
1253 }
1254 
1255 ////////////////////////////////////////////////////////////////////////////////
1256 
1259  return TTreeCache::ReadBufferExt(buf, pos, len, loc);
1260 
1261 }
TCondition * fUnzipStartCondition
void Print(Option_t *option="") const
Print cache statistics.
static Int_t SetCancelDeferred()
Static method to set the cancellation response type of the calling thread to deferred, i.e.
Definition: TThread.cxx:652
Int_t fNtot
Total size of prefetched blocks.
Definition: TMutex.h:37
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:282
Long64_t fEntryMax
first entry in the cache
Definition: TTreeCache.h:41
Long64_t GetNextEntry()
Definition: TTree.h:278
TFile * fFile
Pointer to file.
long long Long64_t
Definition: RtypesCore.h:69
Int_t StopThreadUnzip()
To stop the thread we only need to change the value of the variable fActiveThread to false and the lo...
short Version_t
Definition: RtypesCore.h:61
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache ...
TObjArray * GetListOfBaskets()
Definition: TBranch.h:176
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
float Float_t
Definition: RtypesCore.h:53
Int_t fNStalls
number of blocks that were found in the cache
TObjArray * fBranches
Definition: TTreeCache.h:48
const char Option_t
Definition: RtypesCore.h:62
TEventList * GetEventList() const
Definition: TTree.h:392
virtual Long64_t GetReadEntry() const
Definition: TTree.h:424
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
A specialized TFileCacheRead object for a TTree.
Definition: TTreeCache.h:34
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos it either kBeg, kCur or kEnd.
Definition: TFile.cxx:2085
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
Byte_t * fUnzipStatus
[fNseek] Individual unzipped chunks. Their summed size is kept under control.
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition: TFile.cxx:1596
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Long64_t fEntryMin
Definition: TTreeCache.h:40
Basic string class.
Definition: TString.h:137
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
static Int_t SetCancelOn()
Static method to turn on thread cancellation.
Definition: TThread.cxx:632
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual Long64_t GetBasketSeek(Int_t basket) const
Return address of basket in the file.
Definition: TBranch.cxx:1143
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase...
TTree * GetTree() const
Definition: TTreeCache.h:88
Long64_t * GetBasketEntry() const
Definition: TBranch.h:149
TThread * fUnzipThread[10]
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
Definition: TTreeCache.cxx:330
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache...
virtual void Print(Option_t *option="") const
Print cache statistics.
Definition: TTreeCache.cxx:994
Int_t fNFound
number of blocks that were unzipped
static Bool_t IsParallelUnzip()
Static function that tells wether the multithreading unzipping is activated.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer...
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped Returns -1 in case of read failure...
const char * Data() const
Definition: TString.h:349
std::queue< Int_t > fActiveBlks
number of blocks that were not found in the cache and were unzipped
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * GetTreeOffset() const
Definition: TChain.h:119
Helper class to iterate over cluster of baskets.
Definition: TTree.h:245
static void * UnzipLoop(void *arg)
This is a static function.
Vc_ALWAYS_INLINE void free(T *p)
Frees memory that was allocated with Vc::malloc.
Definition: memory.h:94
void Class()
Definition: Class.C:29
#define THREADCNT
virtual Int_t GetTreeNumber() const
Definition: TChain.h:118
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
unsigned char Byte_t
Definition: RtypesCore.h:60
Bool_t fIsTransferred
True when fBuffer contains something valid.
Int_t fNMissed
number of hits which caused a stall
Int_t * GetBasketBytes() const
Definition: TBranch.h:148
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Definition: TEventList.cxx:169
void Init(TClassEdit::TInterpreterLookupHelper *helper)
Definition: TClassEdit.cxx:118
virtual Int_t GetBufferSize() const
virtual TClusterIterator GetClusterIterator(Long64_t firstentry)
Return an iterator over the cluster of baskets starting at firstentry.
Definition: TTree.cxx:4995
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
static Long_t SelfId()
Static method returning the id for the current thread.
Definition: TThread.cxx:537
Specialization of TTreeCache for parallel Unzipping.
Int_t Broadcast()
Definition: TCondition.h:58
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:551
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
TTree * fTree
list of branch names in the cache
Definition: TTreeCache.h:50
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
virtual TFile * GetFile() const
Definition: TDirectory.h:152
TObject * UncheckedAt(Int_t i) const
Definition: TObjArray.h:91
static Double_t fgRelBuffSize
Max Size for the ready unzipped blocks (default is 2*fBufferSize)
void SendUnzipStartSignal(Bool_t broadcast)
This will send the signal corresponfing to the queue...
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
Int_t fNseekMax
The total sum of the currently unzipped blks.
unsigned int UInt_t
Definition: RtypesCore.h:42
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread) ...
short Short_t
Definition: RtypesCore.h:35
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
TLine * l
Definition: textangle.C:4
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
Long_t Join(void **ret=0)
Join this thread.
Definition: TThread.cxx:498
Long64_t entry
Int_t fNReadPref
Definition: TTreeCache.h:47
Long64_t fEntryCurrent
last entry in the cache
Definition: TTreeCache.h:42
Bool_t IsQueueEmpty()
It says if the queue is empty... useful to see if we have to process it.
virtual Int_t GetSize() const
Definition: TCollection.h:95
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:415
double Double_t
Definition: RtypesCore.h:55
Bool_t fIsLearning
pointer to the current Tree
Definition: TTreeCache.h:51
Long64_t fUnzipBufferSize
fNseek can change so we need to know its max size
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
TCondition * fUnzipDoneCondition
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
#define R__LOCKGUARD(mutex)
void Init()
Initialization procedure common to all the constructors.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relatibe buffer size
static TTreeCacheUnzip::EParUnzipMode fgParallel
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
char ** fUnzipChunks
[fNseek] Length of the unzipped buffers
TDirectory * GetDirectory() const
Definition: TBranch.h:157
Int_t fNbranches
next entry number where cache must be filled
Definition: TTreeCache.h:44
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Bool_t IsActiveThread()
This indicates if the thread is active in this moment...
Int_t GetMaxBaskets() const
Definition: TBranch.h:179
Int_t StartThreadUnzip(Int_t nthreads)
The Thread is only a part of the TTreeCache but it is the part that waits for info in the queue and p...
#define dest(otri, vertexptr)
Definition: triangle.c:1040
Int_t GetVersion() const
Definition: TFile.h:205
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
A chain is a collection of files containg TTree objects.
Definition: TChain.h:35
ClassImp(TTreeCacheUnzip) TTreeCacheUnzip
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
Int_t TimedWaitRelative(ULong_t ms)
Wait to be signaled or till the timer times out.
Definition: TCondition.cxx:113
virtual Long64_t GetEntries() const
Definition: TTree.h:382
A TTree object has a header with a name and a title.
Definition: TTree.h:94
unsigned char UChar_t
Definition: RtypesCore.h:34
A TTree is a list of TBranches.
Definition: TBranch.h:58
Long64_t fEntryNext
current lowest entry number in the cache
Definition: TTreeCache.h:43
Long64_t fTotalUnzipBytes
[fNSeek] For each blk, tells us if it's unzipped or pending
const Bool_t kTRUE
Definition: Rtypes.h:91
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
This inflates all the buffers in the cache.
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Definition: TMath.h:944
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2412
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition: TFile.cxx:4955
Int_t fNseek
Number of blocks to be prefetched.
Int_t Signal()
Definition: TCondition.h:57
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904