Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1// Authors: Rene Brun 04/06/2006
2// Leandro Franco 10/04/2008
3// Fabrizio Furano (CERN) Aug 2009
4
5/*************************************************************************
6 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13/**
14\class TTreeCacheUnzip
15\ingroup tree
16
17A TTreeCache which exploits parallelized decompression of its own content.
18
19*/
20
21#include "TTreeCacheUnzip.h"
22#include "TBranch.h"
23#include "TChain.h"
24#include "TEnv.h"
25#include "TEventList.h"
26#include "TFile.h"
27#include "TMath.h"
28#include "TROOT.h"
29#include "TMutex.h"
30
31#ifdef R__USE_IMT
33#include "ROOT/TTaskGroup.hxx"
34#endif
35
36#include <memory>
37
38extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
40
42
43// The unzip cache does not consume memory by itself; it just allocates memory blocks
44// in advance, which are then taken over as they are by the baskets.
45// Hence there is no good reason to limit it too much.
47
48
49////////////////////////////////////////////////////////////////////////////////
50/// Clear all baskets' state arrays.
51
53 for (Int_t i = 0; i < size; i++) {
54 if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
55 if (fUnzipChunks) {
56 if (fUnzipChunks[i]) fUnzipChunks[i].reset();
57 }
58 if (fUnzipStatus) fUnzipStatus[i].store(0);
59 }
60}
61
62////////////////////////////////////////////////////////////////////////////////
63
65 return fUnzipStatus[index].load() == kUntouched;
66}
67
68////////////////////////////////////////////////////////////////////////////////
69
71 return fUnzipStatus[index].load() == kProgress;
72}
73
74////////////////////////////////////////////////////////////////////////////////
75
77 return fUnzipStatus[index].load() == kFinished;
78}
79
80////////////////////////////////////////////////////////////////////////////////
81/// Check if the basket is unzipped already. We must make sure the length in
82/// fUnzipLen is larger than 0.
83
85 return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
86}
87
88////////////////////////////////////////////////////////////////////////////////
89/// Reset all baskets' state arrays. This function is only called by the main
90/// thread, and parallel processing from upper layers (such as IMT in
91/// TTree::GetEntry()) should be disabled. Other threads must not call this
92/// function since it is not thread-safe.
93
95 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
96 std::unique_ptr<char[]> *aUnzipChunks = new std::unique_ptr<char[]>[newSize];
97 std::atomic<Byte_t> *aUnzipStatus = new std::atomic<Byte_t>[newSize];
98
99 for (Int_t i = 0; i < newSize; ++i)
100 aUnzipStatus[i].store(0);
101
102 for (Int_t i = 0; i < oldSize; i++) {
103 aUnzipLen[i] = fUnzipLen[i];
104 aUnzipChunks[i] = std::move(fUnzipChunks[i]);
105 aUnzipStatus[i].store(fUnzipStatus[i].load());
106 }
107
108 if (fUnzipChunks) delete [] fUnzipChunks;
109 if (fUnzipStatus) delete [] fUnzipStatus;
110
111 fUnzipLen = aUnzipLen;
112 fUnzipChunks = aUnzipChunks;
113 fUnzipStatus = aUnzipStatus;
114}
115
116////////////////////////////////////////////////////////////////////////////////
117/// Set the basket's cache state as finished.
118/// There are three scenarios in which a basket is set as finished:
119/// 1. The basket has already been unzipped.
120/// 2. The thread aborted the unzipping process.
121/// 3. To prevent other tasks/threads from working on this basket,
122/// the main thread marks the basket as finished and designates itself
123/// to unzip this basket.
124
126 fUnzipLen[index] = 0;
127 fUnzipChunks[index].reset();
128 fUnzipStatus[index].store((Byte_t)kFinished);
129}
130
131////////////////////////////////////////////////////////////////////////////////
132
134 fUnzipChunks[index].reset();
135 fUnzipStatus[index].store((Byte_t)kFinished);
136}
137
138////////////////////////////////////////////////////////////////////////////////
139
141 // Update status array at the very end because we need to be synchronous with the main thread.
142 fUnzipLen[index] = len;
143 fUnzipChunks[index].reset(buf);
144 fUnzipStatus[index].store((Byte_t)kFinished);
145}
146
147////////////////////////////////////////////////////////////////////////////////
148/// Start unzipping the basket if it has not been touched yet.
149
153 return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
154}
155
156////////////////////////////////////////////////////////////////////////////////
157
160 fEmpty(true),
161 fCycle(0),
162 fNseekMax(0),
165 fNFound(0),
166 fNMissed(0),
167 fNStalls(0),
168 fNUnzip(0)
169{
170 // Default Constructor.
171 Init();
172}
173
174////////////////////////////////////////////////////////////////////////////////
175/// Constructor.
176
178 fAsyncReading(false),
179 fEmpty(true),
180 fCycle(0),
181 fNseekMax(0),
182 fUnzipGroupSize(0),
183 fUnzipBufferSize(0),
184 fNFound(0),
185 fNMissed(0),
186 fNStalls(0),
187 fNUnzip(0)
188{
189 Init();
190}
191
192////////////////////////////////////////////////////////////////////////////////
193/// Initialization procedure common to all the constructors.
194
196{
197#ifdef R__USE_IMT
198 fUnzipTaskGroup.reset();
199#endif
200 fIOMutex = std::make_unique<TMutex>(true);
201
202 fCompBuffer = new char[16384];
203 fCompBufferSize = 16384;
204
205 fUnzipGroupSize = 102400; // Each task unzips at least 100 KB
206
207 if (fgParallel == kDisable) {
208 fParallel = false;
209 }
210 else if(fgParallel == kEnable || fgParallel == kForce) {
212
213 if(gDebug > 0)
214 Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
215
216 fParallel = true;
217
218 }
219 else {
220 Warning("TTreeCacheUnzip", "Parallel Option unknown");
221 }
222
223 // Check if asynchronous reading is supported by this TFile specialization
224 if (gEnv->GetValue("TFile.AsyncReading", 1)) {
225 if (fFile && !(fFile->ReadBufferAsync(0, 0)))
226 fAsyncReading = true;
227 }
228
229}
230
231////////////////////////////////////////////////////////////////////////////////
232/// Destructor. (in general called by the TFile destructor)
233
239
240////////////////////////////////////////////////////////////////////////////////
241/// Add a branch to the list of branches to be stored in the cache.
242/// This function is called by TBranch::GetBasket.
243/// Returns:
244/// - 0 branch added or already included
245/// - -1 on error
246
251
252////////////////////////////////////////////////////////////////////////////////
253/// Add a branch to the list of branches to be stored in the cache.
254/// This function is called by TBranch::GetBasket.
255/// Returns:
256/// - 0 branch added or already included
257/// - -1 on error
258
260{
262}
263
264////////////////////////////////////////////////////////////////////////////////
265
267{
268
269 if (fNbranches <= 0) return false;
270
271 // Fill the cache buffer with the branches in the cache.
272 fIsTransferred = false;
273
274 TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
275 Long64_t entry = tree->GetReadEntry();
276
277 // If the entry is in the range we previously prefetched, there is
278 // no point in retrying. Note that this will also return false
279 // during the training phase (fEntryNext is then set intentional to
280 // the end of the training phase).
281 if (fEntryCurrent <= entry && entry < fEntryNext) return false;
282
283 // Triggered by the user, not the learning phase
284 if (entry == -1) entry = 0;
285
286 TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
288 fEntryNext = clusterIter.GetNextEntry();
289
291 if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
293
294 // Check if owner has a TEventList set. If yes we optimize for this
295 // Special case reading only the baskets containing entries in the
296 // list.
297 TEventList *elist = fTree->GetEventList();
299 if (elist) {
300 if (fTree->IsA() ==TChain::Class()) {
301 TChain *chain = (TChain*)fTree;
302 Int_t t = chain->GetTreeNumber();
303 chainOffset = chain->GetTreeOffset()[t];
304 }
305 }
306
307 //clear cache buffer
309
310 //store baskets
311 for (Int_t i = 0; i < fNbranches; i++) {
313 if (b->GetDirectory() == nullptr) continue;
314 if (b->GetDirectory()->GetFile() != fFile) continue;
315 Int_t nb = b->GetMaxBaskets();
316 Int_t *lbaskets = b->GetBasketBytes();
317 Long64_t *entries = b->GetBasketEntry();
318 if (!lbaskets || !entries) continue;
319 //we have found the branch. We now register all its baskets
320 //from the requested offset to the basket below fEntrymax
321 Int_t blistsize = b->GetListOfBaskets()->GetSize();
322 for (Int_t j=0;j<nb;j++) {
323 // This basket has already been read, skip it
324 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
325
326 Long64_t pos = b->GetBasketSeek(j);
327 Int_t len = lbaskets[j];
328 if (pos <= 0 || len <= 0) continue;
329 //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
330 if (entries[j] >= fEntryNext) continue;
331 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) continue;
332 if (elist) {
334 if (j < nb - 1) emax = entries[j+1] - 1;
335 if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset)) continue;
336 }
337 fNReadPref++;
338
340 }
341 if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
342 }
343
344 // Now fix the size of the status arrays
345 ResetCache();
346 fIsLearning = false;
347
348 return true;
349}
350
351////////////////////////////////////////////////////////////////////////////////
352/// Change the underlying buffer size of the cache.
353/// The buffersize might be clamped, see TFileCacheRead::SetBufferSize
354/// Returns:
355/// - 0 if the buffer content is still available
356/// - 1 if some or all of the buffer content has been made unavailable
357/// - -1 on error
358
360{
362 if (res < 0) {
363 return res;
364 }
366 ResetCache();
367 return 1;
368}
369
370////////////////////////////////////////////////////////////////////////////////
371/// Set the minimum and maximum entry number to be processed.
372/// This information helps to optimize the number of baskets to read
373/// when prefetching the branch buffers.
374
379
380////////////////////////////////////////////////////////////////////////////////
381/// It's the same as TTreeCache::StopLearningPhase, but we guarantee that
382/// we start the unzipping just after getting the buffers.
383
388
389////////////////////////////////////////////////////////////////////////////////
390/// Update the pointer to the current tree and recompute the pointers to the branches in the cache.
391
396
397////////////////////////////////////////////////////////////////////////////////
398// //
399// From now on we have the methods concerning the threading part of the cache //
400// //
401////////////////////////////////////////////////////////////////////////////////
402
403////////////////////////////////////////////////////////////////////////////////
404/// Static function that returns the parallel option
405/// (to indicate an additional thread)
406
411
412////////////////////////////////////////////////////////////////////////////////
413/// Static function that tells whether multithreaded unzipping is activated.
414
416{
417 if (fgParallel == kEnable || fgParallel == kForce)
418 return true;
419
420 return false;
421}
422
423////////////////////////////////////////////////////////////////////////////////
424/// Static function that (de)activates multithreaded unzipping.
425///
426/// The possible options are:
427/// - kEnable _Enable_ it, which causes an automatic detection and launches the
428/// additional thread if the number of cores in the machine is greater than
429/// one.
430/// - kDisable _Disable_ will not activate the additional thread.
431/// - kForce _Force_ will start the additional thread even if there is only one
432/// core. The default is kEnable.
433///
434/// Returns 0 if there was an error, 1 otherwise.
435
444
445////////////////////////////////////////////////////////////////////////////////
446// //
447// From now on we have the methods concerning the unzipping part of the cache //
448// //
449////////////////////////////////////////////////////////////////////////////////
450
451////////////////////////////////////////////////////////////////////////////////
452/// Read the logical record header from the buffer buf.
453/// buf must point to the header part, not to the object itself, and
454/// must contain at least maxbytes of data.
455/// Returns nread.
456///
457/// Output arguments:
458///
459/// - nbytes : number of bytes in record
460/// if negative, this is a deleted record
461/// if 0, cannot read record, wrong value of argument first
462/// - objlen : uncompressed object size
463/// - keylen : length of logical record header
464///
465/// Note that the arguments objlen and keylen are returned only
466/// if maxbytes >= 16.
467/// Note: This was adapted from TFile, so some things don't apply.
468
470{
474 Int_t nb = 0, olen;
476 frombuf(buf, &nb);
477 nbytes = nb;
478 if (nb < 0) return nread;
479 // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
480 const Int_t headerSize = 16;
481 if (nread < headerSize) return nread;
482 frombuf(buf, &versionkey);
483 frombuf(buf, &olen);
484 frombuf(buf, &datime);
485 frombuf(buf, &klen);
486 if (!olen) olen = nbytes - klen;
487 objlen = olen;
488 keylen = klen;
489 return nread;
490}
491
492////////////////////////////////////////////////////////////////////////////////
493/// This will delete the list of buffers that are in the unzipping cache
494/// and will reset certain values in the cache.
495/// This name is ambiguous because the method doesn't reset the whole cache,
496/// only the part related to the unzipping.
497/// Note: This method is completely different from TTreeCache::ResetCache():
498/// that method cleans the prefetching buffer, while this one deletes
499/// the information about the unzipped buffers.
500
502{
503 // Reset all the lists and wipe all the chunks
504 fCycle++;
506
507 if(fNseekMax < fNseek){
508 if (gDebug > 0)
509 Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
510
513 }
514 fEmpty = true;
515}
516
517////////////////////////////////////////////////////////////////////////////////
518/// This inflates a basket in the cache, passing the data to a new
519/// buffer that will just wait there to be read.
520/// This function is responsible for updating the corresponding elements in
521/// fUnzipStatus, fUnzipChunks and fUnzipLen. Since we use atomic variables
522/// in fUnzipStatus to exclusively unzip the basket, we must update
523/// fUnzipStatus after fUnzipChunks and fUnzipLen, to make sure fUnzipChunks
524/// and fUnzipLen are ready before the main thread fetches the data.
525
527{
529 const Int_t hlen = 128;
530 Int_t objlen = 0, keylen = 0;
531 Int_t nbytes = 0;
532 Int_t readbuf = 0;
533
534 Long64_t rdoffs = 0;
535 Int_t rdlen = 0;
536
537 // To synchronize with the 'paging'
538 myCycle = fCycle;
539 rdoffs = fSeek[index];
541
542 Int_t loc = -1;
543 if (!fNseek || fIsLearning) {
544 return 1;
545 }
546
547 if ((myCycle != fCycle) || !fIsTransferred) {
548 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
549 return 1;
550 }
551
552 // Prepare a memory buffer of adequate size
553 char* locbuff = nullptr;
554 if (rdlen > 16384) {
555 locbuff = new char[rdlen];
556 } else if (rdlen * 3 < 16384) {
557 locbuff = new char[rdlen * 2];
558 } else {
559 locbuff = new char[16384];
560 }
561
563
564 if (readbuf <= 0) {
565 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
566 if (locbuff) delete [] locbuff;
567 return -1;
568 }
569
571
573 // If the single unzipped chunk is really too big, reset it to not processable
574 // I.e. mark it as done but set the pointer to 0
575 // This block will be unzipped synchronously in the main thread
576 // TODO: ROOT internally breaks zipped buffers into 16MB blocks, we can probably still unzip in parallel.
577 if (len > 4 * fUnzipBufferSize) {
578 if (gDebug > 0)
579 Info("UnzipCache", "Block %d is too big, skipping.", index);
580
581 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
582 if (locbuff) delete [] locbuff;
583 return 0;
584 }
585
586 // Unzip it into a new blk
587 char *ptr = nullptr;
589 if ((loclen > 0) && (loclen == objlen + keylen)) {
590 if ((myCycle != fCycle) || !fIsTransferred) {
591 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
592 if (locbuff) delete [] locbuff;
593 delete [] ptr;
594 return 1;
595 }
596 fUnzipState.SetUnzipped(index, ptr, loclen); // Set it as done
597 fNUnzip++;
598 } else {
599 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
600 delete [] ptr;
601 }
602
603 if (locbuff) delete [] locbuff;
604 return 0;
605}
606
607#ifdef R__USE_IMT
608////////////////////////////////////////////////////////////////////////////////
609/// We create a TTaskGroup and asynchronously map each group of baskets (~100 kB each)
610/// to a task. In TTaskGroup, we use TThreadExecutor to do the actual work of unzipping
611/// a group of baskets. The purpose of creating a TTaskGroup is to avoid competing with the main thread.
612
614{
615 auto mapFunction = [&]() {
616 auto unzipFunction = [&](const std::vector<Int_t> &indices) {
617 // If cache is invalidated and we should return immediately.
618 if (!fIsTransferred) return nullptr;
619
620 for (auto ii : indices) {
622 Int_t res = UnzipCache(ii);
623 if(res)
624 if (gDebug > 0)
625 Info("UnzipCache", "Unzipping failed or cache is in learning state");
626 }
627 }
628 return nullptr;
629 };
630
631 Int_t accusz = 0;
632 std::vector<std::vector<Int_t>> basketIndices;
633 std::vector<Int_t> indices;
634 if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
635 for (Int_t i = 0; i < fNseek; i++) {
636 while (accusz < fUnzipGroupSize) {
637 accusz += fSeekLen[i];
638 indices.push_back(i);
639 i++;
640 if (i >= fNseek) break;
641 }
642 if (i < fNseek) i--;
643 basketIndices.push_back(indices);
644 indices.clear();
645 accusz = 0;
646 }
649 };
650
651 fUnzipTaskGroup = std::make_unique<ROOT::Experimental::TTaskGroup>();
653
654 return 0;
655}
656#endif
657
658////////////////////////////////////////////////////////////////////////////////
659/// We try to read a buffer that has already been unzipped.
660/// Returns -1 in case of read failure, 0 in case it's not in the
661/// cache and n>0 in case read from cache (number of bytes copied).
662/// pos and len are the original values as were passed to ReadBuffer
663/// but instead we will return the inflated buffer.
664/// Note!! : If *buf == 0 we will allocate the buffer and it will be the
665/// responsibility of the caller to free it... it is useful for example
666/// to pass it to the creator of TBuffer
667
669{
670 Int_t res = 0;
671 Int_t loc = -1;
672
673 // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
674 // pointer to the original zipped chunk
675 // its index in the original unsorted offsets lists
676 //
677 // Actually there are situations in which copying the buffer is not
678 // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
679 // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
680 // better to be done in the main thread.
681
683
684 if (fParallel && !fIsLearning) {
685
686 if(fNseekMax < fNseek){
687 if (gDebug > 0)
688 Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
689
692 }
693
695 if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
696
697 // The buffer is, at minimum, in the file cache. We must know its index in the requests list
698 // In order to get its info
700
701 do {
702
703 // If the block is ready we get it immediately.
704 // And also we don't have to alloc the blks. This is supposed to be
705 // the main thread of the app.
707 if(!(*buf)) {
708 *buf = fUnzipState.fUnzipChunks[seekidx].get();
710 *free = true;
711 } else {
714 *free = false;
715 }
716
717 fNFound++;
719 }
720
721 // If the requested basket is being unzipped by a background task, we try to steal a blk to unzip.
722 Int_t reqi = -1;
723
725 if (fEmpty) {
726 for (Int_t ii = 0; ii < fNseek; ++ii) {
727 Int_t idx = (seekidx + 1 + ii) % fNseek;
728 if (fUnzipState.IsUntouched(idx)) {
729 if(fUnzipState.TryUnzipping(idx)) {
730 reqi = idx;
731 break;
732 }
733 }
734 }
735 if (reqi < 0) {
736 fEmpty = false;
737 } else {
739 }
740 }
741
742 if ( myCycle != fCycle ) {
743 if (gDebug > 0)
744 Info("GetUnzipBuffer", "Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
746
747 seekidx = -1;
748 break;
749 }
750 }
751
752 } while (fUnzipState.IsProgress(seekidx));
753
754 // Here the block is not pending. It could be done or aborted or not yet being processed.
755 if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
756 if(!(*buf)) {
757 *buf = fUnzipState.fUnzipChunks[seekidx].get();
759 *free = true;
760 } else {
763 *free = false;
764 }
765
766 fNStalls++;
768 } else {
769 // This is a complete miss. We want to avoid the background tasks
770 // to try unzipping this block in the future.
772 }
773 } else {
774 loc = -1;
775 fIsTransferred = false;
776 }
777 }
778
779 if (len > fCompBufferSize) {
780 if(fCompBuffer) delete [] fCompBuffer;
781 fCompBuffer = new char[len];
783 } else {
784 if (fCompBufferSize > len * 4) {
785 if(fCompBuffer) delete [] fCompBuffer;
786 fCompBuffer = new char[len*2];
787 fCompBufferSize = len * 2;
788 }
789 }
790
791 res = 0;
792 if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
793 // Cache is invalidated and we need to wait for all unzipping tasks to be finished before fill new baskets in cache.
794#ifdef R__USE_IMT
796 fUnzipTaskGroup->Cancel();
797 fUnzipTaskGroup.reset();
798 }
799#endif
800 {
801 // Fill new baskets into cache.
802 R__LOCKGUARD(fIOMutex.get());
803 fFile->Seek(pos);
805 } // end of lock scope
806#ifdef R__USE_IMT
808 CreateTasks();
809 }
810#endif
811 }
812
813 if (res) res = -1;
814
815 if (!res) {
816 res = UnzipBuffer(buf, fCompBuffer);
817 *free = true;
818 }
819
820 if (!fIsLearning) {
821 fNMissed++;
822 }
823
824 return res;
825}
826
827////////////////////////////////////////////////////////////////////////////////
828/// static function: Sets the unzip relative buffer size
829
834
835////////////////////////////////////////////////////////////////////////////////
836/// Sets the size for the unzipping cache. By default it should be
837/// twice the size of the prefetching cache.
838
840{
841 fUnzipBufferSize = bufferSize;
842}
843
844////////////////////////////////////////////////////////////////////////////////
845/// Unzips a ROOT-specific buffer by reading the header at the beginning.
846/// Returns the size of the inflated buffer, or -1 in case of error.
847/// Note!! : If *dest == 0 we will allocate the buffer and it will be the
848/// responsibility of the caller to free it... it is useful for example
849/// to pass it to the creator of TBuffer
850/// src is the original buffer with the record (header+compressed data)
851/// *dest is the inflated buffer (including the header)
852
854{
855 Int_t uzlen = 0;
856 bool alloc = false;
857
858 // Here we read the header of the buffer
859 const Int_t hlen = 128;
860 Int_t nbytes = 0, objlen = 0, keylen = 0;
862
863 if (!(*dest)) {
864 /* early consistency check */
865 UChar_t *bufcur = (UChar_t *) (src + keylen);
866 Int_t nin, nbuf;
867 if(objlen > nbytes - keylen && R__unzip_header(&nin, bufcur, &nbuf) != 0) {
868 Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
869 uzlen = -1;
870 return uzlen;
871 }
872 Int_t l = keylen + objlen;
873 *dest = new char[l];
874 alloc = true;
875 }
876 // Must unzip the buffer
877 // fSeekPos[ind]; adress of zipped buffer
878 // fSeekLen[ind]; len of the zipped buffer
879 // &fBuffer[fSeekPos[ind]]; memory address
880
881 // This is similar to TBasket::ReadBasketBuffers
882 bool oldCase = objlen == nbytes - keylen
883 && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
884 && fFile->GetVersion() <= 30401;
885
886 if (objlen > nbytes-keylen || oldCase) {
887
888 // Copy the key
889 memcpy(*dest, src, keylen);
890 uzlen += keylen;
891
892 char *objbuf = *dest + keylen;
893 UChar_t *bufcur = (UChar_t *) (src + keylen);
894 Int_t nin, nbuf;
895 Int_t nout = 0;
896 Int_t noutot = 0;
897
898 while (true) {
900 if (hc != 0) break;
901 if (gDebug > 2)
902 Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
903 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
904 if (oldCase && (nin > objlen || nbuf > objlen)) {
905 if (gDebug > 2)
906 Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
907
908 //buffer was very likely not compressed in an old version
910 uzlen += objlen;
911 return uzlen;
912 }
913
915
916 if (gDebug > 2)
917 Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
919
920 if (!nout) break;
921 noutot += nout;
922 if (noutot >= objlen) break;
923 bufcur += nin;
924 objbuf += nout;
925 }
926
927 if (noutot != objlen) {
928 Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
930 uzlen = -1;
931 if(alloc) delete [] *dest;
932 *dest = nullptr;
933 return uzlen;
934 }
935 uzlen += objlen;
936 } else {
937 memcpy(*dest, src, keylen);
938 uzlen += keylen;
940 uzlen += objlen;
941 }
942 return uzlen;
943}
944
945////////////////////////////////////////////////////////////////////////////////
946
948
949 printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
950 printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
951 printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
952 printf("Number of hits: %d\n", fNFound);
953 printf("Number of stalls: %d\n", fNStalls);
954 printf("Number of misses: %d\n", fNMissed);
955
957}
958
959////////////////////////////////////////////////////////////////////////////////
960
void frombuf(char *&buf, Bool_t *x)
Definition Bytes.h:278
#define b(i)
Definition RSha256.hxx:100
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
int Int_t
Signed integer 4 bytes (int)
Definition RtypesCore.h:59
unsigned char Byte_t
Byte (8 bits) (unsigned char)
Definition RtypesCore.h:78
short Version_t
Class version identifier (short)
Definition RtypesCore.h:79
unsigned char UChar_t
Unsigned Character 1 byte (unsigned char)
Definition RtypesCore.h:52
float Float_t
Float 4 bytes (float)
Definition RtypesCore.h:71
short Short_t
Signed Short integer 2 bytes (short)
Definition RtypesCore.h:53
double Double_t
Double 8 bytes.
Definition RtypesCore.h:73
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
const char Option_t
Option string (const char)
Definition RtypesCore.h:80
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t dest
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
Int_t gDebug
Global variable setting the debug level. Set to 0 to disable, increase it in steps of 1 to increase t...
Definition TROOT.cxx:627
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
#define R__LOCKGUARD(mutex)
This class provides a simple interface to execute the same task multiple times in parallel threads,...
A TTree is a list of TBranches.
Definition TBranch.h:93
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
static TClass * Class()
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:490
<div class="legacybox"><h2>Legacy Code</h2> TEventList is a legacy interface: there will be no bug fi...
Definition TEventList.h:31
virtual bool ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Int_t fNtot
Total size of prefetched blocks.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
TFile * fFile
Pointer to file.
Int_t fNseek
Number of blocks to be prefetched.
virtual Int_t GetBufferSize() const
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos it either kBeg, kCur or kEnd.
Definition TFile.cxx:2304
Int_t GetVersion() const
Definition TFile.h:324
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition TFile.cxx:4877
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition TFile.cxx:1800
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
TObject * UncheckedAt(Int_t i) const
Definition TObjArray.h:84
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:1057
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1071
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:1045
UnzipState_t fUnzipState
void Init()
Initialization procedure common to all the constructors.
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Int_t SetBufferSize(Long64_t buffersize) override
Change the underlying buffer size of the cache.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
void UpdateBranches(TTree *tree) override
update pointer to current Tree and recompute pointers to the branches in the cache
Int_t AddBranch(TBranch *b, bool subbranches=false) override
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
void ResetCache() override
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
void SetEntryRange(Long64_t emin, Long64_t emax) override
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
static bool IsParallelUnzip()
Static function that tells wether the multithreading unzipping is activated.
Int_t fNseekMax
! fNseek can change so we need to know its max size
Int_t fNStalls
! number of hits which caused a stall
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
bool FillBuffer() override
Fill the cache buffer with the branches in the cache.
Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc) override
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer... by reading the header at the beginning.
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relative buffer size
std::unique_ptr< TMutex > fIOMutex
Int_t fNUnzip
! number of blocks that were unzipped
Int_t CreateTasks()
We create a TTaskGroup and asynchronously maps each group of baskets(> 100 kB in total) to a task.
Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, bool *free) override
We try to read a buffer that has already been unzipped Returns -1 in case of read failure,...
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache... by default it should be two times the size of the prefetchin...
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by a IMT task
void StopLearningPhase() override
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
void Print(Option_t *option="") const override
Print cache statistics.
bool fParallel
Indicate if we want to activate the parallelism (for this instance)
Int_t fNFound
! number of blocks that were found in the cache
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
~TTreeCacheUnzip() override
Destructor. (in general called by the TFile destructor)
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread)
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Int_t SetBufferSize(Long64_t buffersize) override
Change the underlying buffer size of the cache.
Long64_t fEntryMin
! first entry in the cache
Definition TTreeCache.h:38
Long64_t fEntryNext
! next entry number where cache must be filled
Definition TTreeCache.h:41
TTree * GetTree() const
Definition TTreeCache.h:149
bool fIsLearning
! true if cache is in learning mode
Definition TTreeCache.h:54
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
Long64_t fEntryMax
! last entry in the cache
Definition TTreeCache.h:39
Long64_t fEntryCurrent
! current lowest entry number in the cache
Definition TTreeCache.h:40
Int_t fNReadPref
Number of blocks that were prefetched.
Definition TTreeCache.h:49
TTree * fTree
! pointer to the current Tree
Definition TTreeCache.h:53
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase.
Int_t fNbranches
! Number of branches in the cache
Definition TTreeCache.h:44
void Print(Option_t *option="") const override
Print cache statistics.
Int_t AddBranch(TBranch *b, bool subgbranches=false) override
Add a branch to the list of branches to be stored in the cache this function is called by the user vi...
TObjArray * fBranches
! List of branches to be stored in the cache
Definition TTreeCache.h:51
Helper class to iterate over cluster of baskets.
Definition TTree.h:306
A TTree represents a columnar dataset.
Definition TTree.h:89
TEventList * GetEventList() const
Definition TTree.h:552
TClass * IsA() const override
Definition TTree.h:744
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:600
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Binary search in an array of n values to locate value.
Definition TMathBase.h:348
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
void Clear(Int_t size)
Clear all baskets' state arrays.
bool IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
bool IsFinished(Int_t index) const
std::atomic< Byte_t > * fUnzipStatus
! [fNSeek]
void SetUnzipped(Int_t index, char *buf, Int_t len)
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
bool TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
bool IsProgress(Int_t index) const
bool IsUntouched(Int_t index) const
void SetFinished(Int_t index)
Set cache as finished.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
TLine l
Definition textangle.C:4