Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1// Authors: Rene Brun 04/06/2006
2// Leandro Franco 10/04/2008
3// Fabrizio Furano (CERN) Aug 2009
4
5/*************************************************************************
6 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13/**
14\class TTreeCacheUnzip
15\ingroup tree
16
17A TTreeCache which exploits parallelized decompression of its own content.
18
19*/
20
21#include "RZip.h"
22#include "TTreeCacheUnzip.h"
23#include "TBranch.h"
24#include "TChain.h"
25#include "TEnv.h"
26#include "TEventList.h"
27#include "TFile.h"
28#include "TMath.h"
29#include "TROOT.h"
30#include "TMutex.h"
31
32#ifdef R__USE_IMT
34#include "ROOT/TTaskGroup.hxx"
35#endif
36
37#include <memory>
38
40
41// The unzip cache does not consume memory by itself; it just allocates memory
42// blocks in advance, which are then picked up as they are by the baskets.
43// Hence there is no good reason to limit it too much.
45
46
47////////////////////////////////////////////////////////////////////////////////
48/// Clear all baskets' state arrays.
49
51 for (Int_t i = 0; i < size; i++) {
52 if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
53 if (fUnzipChunks) {
54 if (fUnzipChunks[i]) fUnzipChunks[i].reset();
55 }
56 if (fUnzipStatus) fUnzipStatus[i].store(0);
57 }
58}
59
60////////////////////////////////////////////////////////////////////////////////
61
63 return fUnzipStatus[index].load() == kUntouched;
64}
65
66////////////////////////////////////////////////////////////////////////////////
67
69 return fUnzipStatus[index].load() == kProgress;
70}
71
72////////////////////////////////////////////////////////////////////////////////
73
75 return fUnzipStatus[index].load() == kFinished;
76}
77
78////////////////////////////////////////////////////////////////////////////////
79/// Check if the basket is unzipped already. We must make sure the length in
80/// fUnzipLen is larger than 0.
81
83 return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
84}
85
86////////////////////////////////////////////////////////////////////////////////
87/// Reset all baskets' state arrays. This function is only called by the main
88/// thread, and parallel processing from upper layers (such as IMT in
89/// TTree::GetEntry()) should be disabled. Other threads must not call this
90/// function since it is not thread-safe.
91
93 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
94 std::unique_ptr<char[]> *aUnzipChunks = new std::unique_ptr<char[]>[newSize];
95 std::atomic<Byte_t> *aUnzipStatus = new std::atomic<Byte_t>[newSize];
96
97 for (Int_t i = 0; i < newSize; ++i)
98 aUnzipStatus[i].store(0);
99
100 for (Int_t i = 0; i < oldSize; i++) {
101 aUnzipLen[i] = fUnzipLen[i];
102 aUnzipChunks[i] = std::move(fUnzipChunks[i]);
103 aUnzipStatus[i].store(fUnzipStatus[i].load());
104 }
105
106 if (fUnzipChunks) delete [] fUnzipChunks;
107 if (fUnzipStatus) delete [] fUnzipStatus;
108
109 fUnzipLen = aUnzipLen;
110 fUnzipChunks = aUnzipChunks;
111 fUnzipStatus = aUnzipStatus;
112}
113
114////////////////////////////////////////////////////////////////////////////////
115/// Set cache as finished.
116/// There are three scenarios in which a basket is set as finished:
117/// 1. The basket has already been unzipped.
118/// 2. The thread has aborted the unzipping process.
119/// 3. To prevent other tasks/threads from working on this basket,
120/// the main thread marks the basket as finished and designates itself
121/// to unzip this basket.
122
124 fUnzipLen[index] = 0;
125 fUnzipChunks[index].reset();
126 fUnzipStatus[index].store((Byte_t)kFinished);
127}
128
129////////////////////////////////////////////////////////////////////////////////
130
132 fUnzipChunks[index].reset();
133 fUnzipStatus[index].store((Byte_t)kFinished);
134}
135
136////////////////////////////////////////////////////////////////////////////////
137
139 // Update status array at the very end because we need to be synchronous with the main thread.
140 fUnzipLen[index] = len;
141 fUnzipChunks[index].reset(buf);
142 fUnzipStatus[index].store((Byte_t)kFinished);
143}
144
145////////////////////////////////////////////////////////////////////////////////
146/// Start unzipping the basket if it is still untouched.
147
// Claim the basket with a single atomic compare-and-swap (no retry loop).
// The strong form is required here: compare_exchange_weak may fail
// spuriously, which would make the caller skip a basket that is actually
// still untouched (and, in GetUnzipBuffer, may end the block-stealing phase).
151 return fUnzipStatus[index].compare_exchange_strong(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
152}
153
154////////////////////////////////////////////////////////////////////////////////
155
158 fEmpty(true),
159 fCycle(0),
160 fNseekMax(0),
163 fNFound(0),
164 fNMissed(0),
165 fNStalls(0),
166 fNUnzip(0)
167{
168 // Default Constructor.
169 Init();
170}
171
172////////////////////////////////////////////////////////////////////////////////
173/// Constructor.
174
176 fAsyncReading(false),
177 fEmpty(true),
178 fCycle(0),
179 fNseekMax(0),
180 fUnzipGroupSize(0),
181 fUnzipBufferSize(0),
182 fNFound(0),
183 fNMissed(0),
184 fNStalls(0),
185 fNUnzip(0)
186{
187 Init();
188}
189
190////////////////////////////////////////////////////////////////////////////////
191/// Initialization procedure common to all the constructors.
192
194{
195#ifdef R__USE_IMT
196 fUnzipTaskGroup.reset();
197#endif
198 fIOMutex = std::make_unique<TMutex>(true);
199
200 fCompBuffer = new char[16384];
201 fCompBufferSize = 16384;
202
203 fUnzipGroupSize = 102400; // Each task unzips at least 100 KB
204
205 if (fgParallel == kDisable) {
206 fParallel = false;
207 }
208 else if(fgParallel == kEnable || fgParallel == kForce) {
210
211 if(gDebug > 0)
212 Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
213
214 fParallel = true;
215
216 }
217 else {
218 Warning("TTreeCacheUnzip", "Parallel Option unknown");
219 }
220
221 // Check if asynchronous reading is supported by this TFile specialization
222 if (gEnv->GetValue("TFile.AsyncReading", 1)) {
223 if (fFile && !(fFile->ReadBufferAsync(0, 0)))
224 fAsyncReading = true;
225 }
226
227}
228
229////////////////////////////////////////////////////////////////////////////////
230/// Destructor. (in general called by the TFile destructor)
231
237
238////////////////////////////////////////////////////////////////////////////////
239/// Add a branch to the list of branches to be stored in the cache;
240/// this function is called by TBranch::GetBasket.
241/// Returns:
242/// - 0 branch added or already included
243/// - -1 on error
244
249
250////////////////////////////////////////////////////////////////////////////////
251/// Add a branch to the list of branches to be stored in the cache;
252/// this function is called by TBranch::GetBasket.
253/// Returns:
254/// - 0 branch added or already included
255/// - -1 on error
256
258{
260}
261
262////////////////////////////////////////////////////////////////////////////////
263
265{
266
267 if (fNbranches <= 0) return false;
268
269 // Fill the cache buffer with the branches in the cache.
270 fIsTransferred = false;
271
272 TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
273 Long64_t entry = tree->GetReadEntry();
274
275 // If the entry is in the range we previously prefetched, there is
276 // no point in retrying. Note that this will also return false
277 // during the training phase (fEntryNext is then set intentional to
278 // the end of the training phase).
279 if (fEntryCurrent <= entry && entry < fEntryNext) return false;
280
281 // Triggered by the user, not the learning phase
282 if (entry == -1) entry = 0;
283
284 TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
286 fEntryNext = clusterIter.GetNextEntry();
287
289 if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
291
292 // Check if owner has a TEventList set. If yes we optimize for this
293 // Special case reading only the baskets containing entries in the
294 // list.
295 TEventList *elist = fTree->GetEventList();
297 if (elist) {
298 if (fTree->IsA() ==TChain::Class()) {
299 TChain *chain = (TChain*)fTree;
300 Int_t t = chain->GetTreeNumber();
301 chainOffset = chain->GetTreeOffset()[t];
302 }
303 }
304
305 //clear cache buffer
307
308 //store baskets
309 for (Int_t i = 0; i < fNbranches; i++) {
311 if (b->GetDirectory() == nullptr) continue;
312 if (b->GetDirectory()->GetFile() != fFile) continue;
313 Int_t nb = b->GetMaxBaskets();
314 Int_t *lbaskets = b->GetBasketBytes();
315 Long64_t *entries = b->GetBasketEntry();
316 if (!lbaskets || !entries) continue;
317 //we have found the branch. We now register all its baskets
318 //from the requested offset to the basket below fEntrymax
319 Int_t blistsize = b->GetListOfBaskets()->GetSize();
320 for (Int_t j=0;j<nb;j++) {
321 // This basket has already been read, skip it
322 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
323
324 Long64_t pos = b->GetBasketSeek(j);
325 Int_t len = lbaskets[j];
326 if (pos <= 0 || len <= 0) continue;
327 //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
328 if (entries[j] >= fEntryNext) continue;
329 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) continue;
330 if (elist) {
332 if (j < nb - 1) emax = entries[j+1] - 1;
333 if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset)) continue;
334 }
335 fNReadPref++;
336
338 }
339 if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
340 }
341
342 // Now fix the size of the status arrays
343 ResetCache();
344 fIsLearning = false;
345
346 return true;
347}
348
349////////////////////////////////////////////////////////////////////////////////
350/// Change the underlying buffer size of the cache.
351/// The buffersize might be clamped, see TFileCacheRead::SetBufferSize
352/// Returns:
353/// - 0 if the buffer content is still available
354/// - 1 if some or all of the buffer content has been made unavailable
355/// - -1 on error
356
358{
360 if (res < 0) {
361 return res;
362 }
364 ResetCache();
365 return 1;
366}
367
368////////////////////////////////////////////////////////////////////////////////
369/// Set the minimum and maximum entry number to be processed;
370/// this information helps optimize the number of baskets to read
371/// when prefetching the branch buffers.
372
377
378////////////////////////////////////////////////////////////////////////////////
379/// It's the same as TTreeCache::StopLearningPhase but we guarantee that
380/// we start the unzipping just after getting the buffers
381
386
387////////////////////////////////////////////////////////////////////////////////
388///update pointer to current Tree and recompute pointers to the branches in the cache
389
394
395////////////////////////////////////////////////////////////////////////////////
396// //
397// From now on we have the methods concerning the threading part of the cache //
398// //
399////////////////////////////////////////////////////////////////////////////////
400
401////////////////////////////////////////////////////////////////////////////////
402/// Static function that returns the parallel option
403/// (to indicate an additional thread)
404
409
410////////////////////////////////////////////////////////////////////////////////
411/// Static function that tells whether multithreaded unzipping is activated
412
414{
415 if (fgParallel == kEnable || fgParallel == kForce)
416 return true;
417
418 return false;
419}
420
421////////////////////////////////////////////////////////////////////////////////
422/// Static function that (de)activates multithreading unzipping
423///
424/// The possible options are:
425/// - kEnable _Enable_ it, which causes an automatic detection and launches the
426/// additional thread if the number of cores in the machine is greater than
427/// one
428/// - kDisable _Disable_ will not activate the additional thread.
429/// - kForce _Force_ will start the additional thread even if there is only one
430/// core. The default is kEnable.
431///
432/// Returns 0 if there was an error, 1 otherwise.
433
442
443////////////////////////////////////////////////////////////////////////////////
444// //
445// From now on we have the methods concerning the unzipping part of the cache //
446// //
447////////////////////////////////////////////////////////////////////////////////
448
449////////////////////////////////////////////////////////////////////////////////
450/// Read the logical record header from the buffer buf.
451/// This must be a pointer to the header part, not to the object itself, and
452/// the buffer must contain at least maxbytes bytes of data.
453/// Returns nread.
454///
455/// In output arguments:
456///
457/// - nbytes : number of bytes in record
458/// if negative, this is a deleted record
459/// if 0, cannot read record, wrong value of argument first
460/// - objlen : uncompressed object size
461/// - keylen : length of logical record header
462///
463/// Note that the arguments objlen and keylen are returned only
464/// if maxbytes >=16
465/// Note: This was adapted from TFile, so some things don't apply.
466
468{
472 Int_t nb = 0, olen;
474 frombuf(buf, &nb);
475 nbytes = nb;
476 if (nb < 0) return nread;
477 // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
478 const Int_t headerSize = 16;
479 if (nread < headerSize) return nread;
480 frombuf(buf, &versionkey);
481 frombuf(buf, &olen);
482 frombuf(buf, &datime);
483 frombuf(buf, &klen);
484 if (!olen) olen = nbytes - klen;
485 objlen = olen;
486 keylen = klen;
487 return nread;
488}
489
490////////////////////////////////////////////////////////////////////////////////
491/// This will delete the list of buffers that are in the unzipping cache
492/// and will reset certain values in the cache.
493/// This name is ambiguous because the method doesn't reset the whole cache,
494/// only the part related to unzipping.
495/// Note: This method is completely different from TTreeCache::ResetCache(),
496/// in that method we were cleaning the prefetching buffer while here we
497/// delete the information about the unzipped buffers
498
500{
501 // Reset all the lists and wipe all the chunks
502 fCycle++;
504
505 if(fNseekMax < fNseek){
506 if (gDebug > 0)
507 Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
508
511 }
512 fEmpty = true;
513}
514
515////////////////////////////////////////////////////////////////////////////////
516/// This inflates a basket in the cache, passing the data to a new
517/// buffer that will wait there until it is read.
518/// This function is responsible for updating the corresponding elements in
519/// fUnzipStatus, fUnzipChunks and fUnzipLen. Since we use atomic variables
520/// in fUnzipStatus to exclusively unzip the basket, we must update
521/// fUnzipStatus after fUnzipChunks and fUnzipLen, and make sure fUnzipChunks
522/// and fUnzipLen are ready before the main thread fetches the data.
523
525{
527 const Int_t hlen = 128;
528 Int_t objlen = 0, keylen = 0;
529 Int_t nbytes = 0;
530 Int_t readbuf = 0;
531
532 Long64_t rdoffs = 0;
533 Int_t rdlen = 0;
534
535 // To synchronize with the 'paging'
536 myCycle = fCycle;
537 rdoffs = fSeek[index];
539
540 Int_t loc = -1;
541 if (!fNseek || fIsLearning) {
542 return 1;
543 }
544
545 if ((myCycle != fCycle) || !fIsTransferred) {
546 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
547 return 1;
548 }
549
550 // Prepare a memory buffer of adequate size
551 char* locbuff = nullptr;
552 if (rdlen > 16384) {
553 locbuff = new char[rdlen];
554 } else if (rdlen * 3 < 16384) {
555 locbuff = new char[rdlen * 2];
556 } else {
557 locbuff = new char[16384];
558 }
559
561
562 if (readbuf <= 0) {
563 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
564 if (locbuff) delete [] locbuff;
565 return -1;
566 }
567
569
571 // If the single unzipped chunk is really too big, reset it to not processable
572 // I.e. mark it as done but set the pointer to 0
573 // This block will be unzipped synchronously in the main thread
574 // TODO: ROOT internally breaks zipped buffers into 16MB blocks, we can probably still unzip in parallel.
575 if (len > 4 * fUnzipBufferSize) {
576 if (gDebug > 0)
577 Info("UnzipCache", "Block %d is too big, skipping.", index);
578
579 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
580 if (locbuff) delete [] locbuff;
581 return 0;
582 }
583
584 // Unzip it into a new blk
585 char *ptr = nullptr;
587 if ((loclen > 0) && (loclen == objlen + keylen)) {
588 if ((myCycle != fCycle) || !fIsTransferred) {
589 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
590 if (locbuff) delete [] locbuff;
591 delete [] ptr;
592 return 1;
593 }
594 fUnzipState.SetUnzipped(index, ptr, loclen); // Set it as done
595 fNUnzip++;
596 } else {
597 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
598 delete [] ptr;
599 }
600
601 if (locbuff) delete [] locbuff;
602 return 0;
603}
604
605#ifdef R__USE_IMT
606////////////////////////////////////////////////////////////////////////////////
607/// We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total)
608/// to a task. In TTaskGroup, we use TThreadExecutor to do the actual work of unzipping
609/// a group of baskets. The purpose of creating a TTaskGroup is to avoid competing with the main thread.
610
612{
613 auto mapFunction = [&]() {
614 auto unzipFunction = [&](const std::vector<Int_t> &indices) {
615 // If cache is invalidated and we should return immediately.
616 if (!fIsTransferred) return nullptr;
617
618 for (auto ii : indices) {
620 Int_t res = UnzipCache(ii);
621 if(res)
622 if (gDebug > 0)
623 Info("UnzipCache", "Unzipping failed or cache is in learning state");
624 }
625 }
626 return nullptr;
627 };
628
629 Int_t accusz = 0;
630 std::vector<std::vector<Int_t>> basketIndices;
631 std::vector<Int_t> indices;
632 if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
633 for (Int_t i = 0; i < fNseek; i++) {
634 while (accusz < fUnzipGroupSize) {
635 accusz += fSeekLen[i];
636 indices.push_back(i);
637 i++;
638 if (i >= fNseek) break;
639 }
640 if (i < fNseek) i--;
641 basketIndices.push_back(indices);
642 indices.clear();
643 accusz = 0;
644 }
647 };
648
649 fUnzipTaskGroup = std::make_unique<ROOT::Experimental::TTaskGroup>();
651
652 return 0;
653}
654#endif
655
656////////////////////////////////////////////////////////////////////////////////
657/// We try to read a buffer that has already been unzipped
658/// Returns -1 in case of read failure, 0 in case it's not in the
659/// cache and n>0 in case read from cache (number of bytes copied).
660/// pos and len are the original values as were passed to ReadBuffer
661/// but instead we will return the inflated buffer.
662/// Note!! : If *buf == 0 we will allocate the buffer and it will be the
663/// responsibility of the caller to free it... it is useful for example
664/// to pass it to the creator of TBuffer
665
667{
668 Int_t res = 0;
669 Int_t loc = -1;
670
671 // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
672 // pointer to the original zipped chunk
673 // its index in the original unsorted offsets lists
674 //
675 // Actually there are situations in which copying the buffer is not
676 // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
677 // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
678 // better to be done in the main thread.
679
681
682 if (fParallel && !fIsLearning) {
683
684 if(fNseekMax < fNseek){
685 if (gDebug > 0)
686 Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
687
690 }
691
693 if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
694
695 // The buffer is, at minimum, in the file cache. We must know its index in the requests list
696 // In order to get its info
698
699 do {
700
701 // If the block is ready we get it immediately.
702 // And also we don't have to alloc the blks. This is supposed to be
703 // the main thread of the app.
705 if(!(*buf)) {
706 *buf = fUnzipState.fUnzipChunks[seekidx].get();
708 *free = true;
709 } else {
712 *free = false;
713 }
714
715 fNFound++;
717 }
718
719 // If the requested basket is being unzipped by a background task, we try to steal a blk to unzip.
720 Int_t reqi = -1;
721
723 if (fEmpty) {
724 for (Int_t ii = 0; ii < fNseek; ++ii) {
725 Int_t idx = (seekidx + 1 + ii) % fNseek;
726 if (fUnzipState.IsUntouched(idx)) {
727 if(fUnzipState.TryUnzipping(idx)) {
728 reqi = idx;
729 break;
730 }
731 }
732 }
733 if (reqi < 0) {
734 fEmpty = false;
735 } else {
737 }
738 }
739
740 if ( myCycle != fCycle ) {
741 if (gDebug > 0)
742 Info("GetUnzipBuffer", "Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
744
745 seekidx = -1;
746 break;
747 }
748 }
749
750 } while (fUnzipState.IsProgress(seekidx));
751
752 // Here the block is not pending. It could be done or aborted or not yet being processed.
753 if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
754 if(!(*buf)) {
755 *buf = fUnzipState.fUnzipChunks[seekidx].get();
757 *free = true;
758 } else {
761 *free = false;
762 }
763
764 fNStalls++;
766 } else {
767 // This is a complete miss. We want to avoid the background tasks
768 // to try unzipping this block in the future.
770 }
771 } else {
772 loc = -1;
773 fIsTransferred = false;
774 }
775 }
776
777 if (len > fCompBufferSize) {
778 if(fCompBuffer) delete [] fCompBuffer;
779 fCompBuffer = new char[len];
781 } else {
782 if (fCompBufferSize > len * 4) {
783 if(fCompBuffer) delete [] fCompBuffer;
784 fCompBuffer = new char[len*2];
785 fCompBufferSize = len * 2;
786 }
787 }
788
789 res = 0;
790 if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
791 // Cache is invalidated and we need to wait for all unzipping tasks to be finished before fill new baskets in cache.
792#ifdef R__USE_IMT
794 fUnzipTaskGroup->Cancel();
795 fUnzipTaskGroup.reset();
796 }
797#endif
798 {
799 // Fill new baskets into cache.
800 R__LOCKGUARD(fIOMutex.get());
801 fFile->Seek(pos);
803 } // end of lock scope
804#ifdef R__USE_IMT
806 CreateTasks();
807 }
808#endif
809 }
810
811 if (res) res = -1;
812
813 if (!res) {
814 res = UnzipBuffer(buf, fCompBuffer);
815 *free = true;
816 }
817
818 if (!fIsLearning) {
819 fNMissed++;
820 }
821
822 return res;
823}
824
825////////////////////////////////////////////////////////////////////////////////
826/// static function: Sets the unzip relative buffer size
827
832
833////////////////////////////////////////////////////////////////////////////////
834/// Sets the size of the unzipping cache. By default it should be
835/// two times the size of the prefetching cache.
836
838{
839 fUnzipBufferSize = bufferSize;
840}
841
842////////////////////////////////////////////////////////////////////////////////
844/// Unzips a ROOT-specific buffer by reading the header at the beginning.
845/// Returns the size of the inflated buffer, or -1 on error.
845/// Note!! : If *dest == 0 we will allocate the buffer and it will be the
846/// responsibility of the caller to free it... it is useful for example
847/// to pass it to the creator of TBuffer
848/// src is the original buffer with the record (header+compressed data)
849/// *dest is the inflated buffer (including the header)
850
852{
853 Int_t uzlen = 0;
854 bool alloc = false;
855
856 // Here we read the header of the buffer
857 const Int_t hlen = 128;
858 Int_t nbytes = 0, objlen = 0, keylen = 0;
860
863
864 if (!(*dest)) {
865 /* early consistency check */
866 UChar_t *bufcur = (UChar_t *) (src + keylen);
867 Int_t nin = 0;
868 Int_t nbuf = 0;
869 if ((objlen > nbytes - keylen) &&
870 ((nbytesRemain < ROOT::Internal::kZipHeaderSize) || (R__unzip_header(&nin, bufcur, &nbuf) != 0))) {
871 Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
872 uzlen = -1;
873 return uzlen;
874 }
875 Int_t l = keylen + objlen;
876 *dest = new char[l];
877 alloc = true;
878 }
879 // Must unzip the buffer
880 // fSeekPos[ind]; adress of zipped buffer
881 // fSeekLen[ind]; len of the zipped buffer
882 // &fBuffer[fSeekPos[ind]]; memory address
883
884 // This is similar to TBasket::ReadBasketBuffers
885 bool oldCase = objlen == nbytes - keylen
886 && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
887 && fFile->GetVersion() <= 30401;
888
889 if (objlen > nbytes-keylen || oldCase) {
890
891 // Copy the key
892 memcpy(*dest, src, keylen);
893 uzlen += keylen;
894
895 char *objbuf = *dest + keylen;
896 UChar_t *bufcur = (UChar_t *) (src + keylen);
897 Int_t nin = 0;
898 Int_t nbuf = 0;
899 Int_t nout = 0;
900 Int_t noutot = 0;
901
902 while (nbytesRemain >= ROOT::Internal::kZipHeaderSize) {
904 if ((hc != 0) || (nin > nbytesRemain) || (nbuf > objlenRemain))
905 break;
906 if (gDebug > 2)
907 Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
908 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
909 if (oldCase && (nin > objlen || nbuf > objlen)) {
910 if (gDebug > 2)
911 Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
912
913 //buffer was very likely not compressed in an old version
915 uzlen += objlen;
916 return uzlen;
917 }
918
919 R__unzip(&nin, bufcur, &nbuf, reinterpret_cast<unsigned char *>(objbuf), &nout);
920
921 if (gDebug > 2)
922 Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
924
925 if (!nout) break;
926 noutot += nout;
927 if (noutot >= objlen) break;
928 bufcur += nin;
929 objbuf += nout;
930 nbytesRemain -= nin;
932 }
933
934 if (noutot != objlen) {
935 Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
937 uzlen = -1;
938 if(alloc) delete [] *dest;
939 *dest = nullptr;
940 return uzlen;
941 }
942 uzlen += objlen;
943 } else {
944 memcpy(*dest, src, keylen);
945 uzlen += keylen;
947 uzlen += objlen;
948 }
949 return uzlen;
950}
951
952////////////////////////////////////////////////////////////////////////////////
953
955
956 printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
957 printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
958 printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
959 printf("Number of hits: %d\n", fNFound);
960 printf("Number of stalls: %d\n", fNStalls);
961 printf("Number of misses: %d\n", fNMissed);
962
964}
965
966////////////////////////////////////////////////////////////////////////////////
967
void frombuf(char *&buf, Bool_t *x)
Definition Bytes.h:278
#define b(i)
Definition RSha256.hxx:100
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
int Int_t
Signed integer 4 bytes (int)
Definition RtypesCore.h:59
unsigned char Byte_t
Byte (8 bits) (unsigned char)
Definition RtypesCore.h:78
short Version_t
Class version identifier (short)
Definition RtypesCore.h:79
float Float_t
Float 4 bytes (float)
Definition RtypesCore.h:71
short Short_t
Signed Short integer 2 bytes (short)
Definition RtypesCore.h:53
double Double_t
Double 8 bytes.
Definition RtypesCore.h:73
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
const char Option_t
Option string (const char)
Definition RtypesCore.h:80
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t dest
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
Int_t gDebug
Global variable setting the debug level. Set to 0 to disable, increase it in steps of 1 to increase t...
Definition TROOT.cxx:627
#define R__LOCKGUARD(mutex)
This class provides a simple interface to execute the same task multiple times in parallel threads,...
A TTree is a list of TBranches.
Definition TBranch.h:93
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
static TClass * Class()
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:503
<div class="legacybox"><h2>Legacy Code</h2> TEventList is a legacy interface: there will be no bug fi...
Definition TEventList.h:31
virtual bool ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Int_t fNtot
Total size of prefetched blocks.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
TFile * fFile
Pointer to file.
Int_t fNseek
Number of blocks to be prefetched.
virtual Int_t GetBufferSize() const
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos it either kBeg, kCur or kEnd.
Definition TFile.cxx:2304
Int_t GetVersion() const
Definition TFile.h:333
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition TFile.cxx:4879
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition TFile.cxx:1800
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
TObject * UncheckedAt(Int_t i) const
Definition TObjArray.h:90
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:1075
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1089
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:1063
UnzipState_t fUnzipState
void Init()
Initialization procedure common to all the constructors.
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Int_t SetBufferSize(Long64_t buffersize) override
Change the underlying buffer size of the cache.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
void UpdateBranches(TTree *tree) override
update pointer to current Tree and recompute pointers to the branches in the cache
Int_t AddBranch(TBranch *b, bool subbranches=false) override
Add a branch to the list of branches to be stored in the cache. This function is called by TBranch::Ge...
void ResetCache() override
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
void SetEntryRange(Long64_t emin, Long64_t emax) override
Set the minimum and maximum entry number to be processed; this information helps to optimize the numbe...
static bool IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
Int_t fNseekMax
! fNseek can change so we need to know its max size
Int_t fNStalls
! number of hits which caused a stall
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
bool FillBuffer() override
Fill the cache buffer with the branches in the cache.
Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc) override
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer... by reading the header at the beginning.
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relative buffer size
std::unique_ptr< TMutex > fIOMutex
Int_t fNUnzip
! number of blocks that were unzipped
Int_t CreateTasks()
We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total) to a task.
Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, bool *free) override
We try to read a buffer that has already been unzipped. Returns -1 in case of read failure,...
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache... by default it should be two times the size of the prefetchin...
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by a IMT task
void StopLearningPhase() override
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
void Print(Option_t *option="") const override
Print cache statistics.
bool fParallel
Indicate if we want to activate the parallelism (for this instance)
Int_t fNFound
! number of blocks that were found in the cache
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
~TTreeCacheUnzip() override
Destructor. (in general called by the TFile destructor)
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread)
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Int_t SetBufferSize(Long64_t buffersize) override
Change the underlying buffer size of the cache.
Long64_t fEntryMin
! first entry in the cache
Definition TTreeCache.h:38
Long64_t fEntryNext
! next entry number where cache must be filled
Definition TTreeCache.h:41
TTree * GetTree() const
Definition TTreeCache.h:149
bool fIsLearning
! true if cache is in learning mode
Definition TTreeCache.h:54
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed; this information helps to optimize the numbe...
Long64_t fEntryMax
! last entry in the cache
Definition TTreeCache.h:39
Long64_t fEntryCurrent
! current lowest entry number in the cache
Definition TTreeCache.h:40
Int_t fNReadPref
Number of blocks that were prefetched.
Definition TTreeCache.h:49
TTree * fTree
! pointer to the current Tree
Definition TTreeCache.h:53
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase.
Int_t fNbranches
! Number of branches in the cache
Definition TTreeCache.h:44
void Print(Option_t *option="") const override
Print cache statistics.
Int_t AddBranch(TBranch *b, bool subgbranches=false) override
Add a branch to the list of branches to be stored in the cache this function is called by the user vi...
TObjArray * fBranches
! List of branches to be stored in the cache
Definition TTreeCache.h:51
Helper class to iterate over cluster of baskets.
Definition TTree.h:314
A TTree represents a columnar dataset.
Definition TTree.h:89
TEventList * GetEventList() const
Definition TTree.h:560
TClass * IsA() const override
Definition TTree.h:757
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:600
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Binary search in an array of n values to locate value.
Definition TMathBase.h:329
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
void Clear(Int_t size)
Clear all baskets' state arrays.
bool IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
bool IsFinished(Int_t index) const
std::atomic< Byte_t > * fUnzipStatus
! [fNSeek]
void SetUnzipped(Int_t index, char *buf, Int_t len)
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
bool TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
bool IsProgress(Int_t index) const
bool IsUntouched(Int_t index) const
void SetFinished(Int_t index)
Set cache as finished.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
TLine l
Definition textangle.C:4