Logo ROOT   6.18/05
Reference Guide
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1// Authors: Rene Brun 04/06/2006
2// Leandro Franco 10/04/2008
3// Fabrizio Furano (CERN) Aug 2009
4
5/*************************************************************************
6 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13/**
14\class TTreeCacheUnzip
15\ingroup tree
16
17A TTreeCache which exploits parallelized decompression of its own content.
18
19*/
20
21#include "TTreeCacheUnzip.h"
22#include "TBranch.h"
23#include "TChain.h"
24#include "TEnv.h"
25#include "TEventList.h"
26#include "TFile.h"
27#include "TMath.h"
28#include "TMutex.h"
29#include "ROOT/RMakeUnique.hxx"
30
31#ifdef R__USE_IMT
33#include "ROOT/TTaskGroup.hxx"
34#endif
35
36extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
37extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
38
40
41// The unzip cache does not consume memory by itself, it just allocates in advance
42// mem blocks which are then picked as they are by the baskets.
43// Hence there is no good reason to limit it too much
45
47
48////////////////////////////////////////////////////////////////////////////////
49/// Clear all baskets' state arrays.
50
52 for (Int_t i = 0; i < size; i++) {
53 if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
54 if (fUnzipChunks) {
55 if (fUnzipChunks[i]) fUnzipChunks[i].reset();
56 }
57 if (fUnzipStatus) fUnzipStatus[i].store(0);
58 }
59}
60
61////////////////////////////////////////////////////////////////////////////////
62
64 return fUnzipStatus[index].load() == kUntouched;
65}
66
67////////////////////////////////////////////////////////////////////////////////
68
70 return fUnzipStatus[index].load() == kProgress;
71}
72
73////////////////////////////////////////////////////////////////////////////////
74
76 return fUnzipStatus[index].load() == kFinished;
77}
78
79////////////////////////////////////////////////////////////////////////////////
80/// Check if the basket is unzipped already. We must make sure the length in
81/// fUnzipLen is larger than 0.
82
84 return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
85}
86
87////////////////////////////////////////////////////////////////////////////////
88/// Reset all baskets' state arrays. This function is only called by main
89/// thread and parallel processing from upper layers should be disabled such
90/// as IMT in TTree::GetEntry(). Other threads should not call this function
91/// since it is not thread-safe.
92
94 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
95 std::unique_ptr<char[]> *aUnzipChunks = new std::unique_ptr<char[]>[newSize];
96 std::atomic<Byte_t> *aUnzipStatus = new std::atomic<Byte_t>[newSize];
97
98 for (Int_t i = 0; i < newSize; ++i)
99 aUnzipStatus[i].store(0);
100
101 for (Int_t i = 0; i < oldSize; i++) {
102 aUnzipLen[i] = fUnzipLen[i];
103 aUnzipChunks[i] = std::move(fUnzipChunks[i]);
104 aUnzipStatus[i].store(fUnzipStatus[i].load());
105 }
106
107 if (fUnzipChunks) delete [] fUnzipChunks;
108 if (fUnzipStatus) delete [] fUnzipStatus;
109
110 fUnzipLen = aUnzipLen;
111 fUnzipChunks = aUnzipChunks;
112 fUnzipStatus = aUnzipStatus;
113}
114
115////////////////////////////////////////////////////////////////////////////////
116/// Set cache as finished.
117/// There are three scenarios that a basket is set as finished:
118/// 1. The basket has already been unzipped.
119/// 2. The thread is aborted from unzipping process.
120/// 3. To avoid other tasks/threads work on this basket,
121/// main thread marks the basket as finished and designates itself
122/// to unzip this basket.
123
125 fUnzipLen[index] = 0;
126 fUnzipChunks[index].reset();
127 fUnzipStatus[index].store((Byte_t)kFinished);
128}
129
130////////////////////////////////////////////////////////////////////////////////
131
133 fUnzipChunks[index].reset();
134 fUnzipStatus[index].store((Byte_t)kFinished);
135}
136
137////////////////////////////////////////////////////////////////////////////////
138
140 // Update status array at the very end because we need to be synchronous with the main thread.
141 fUnzipLen[index] = len;
142 fUnzipChunks[index].reset(buf);
143 fUnzipStatus[index].store((Byte_t)kFinished);
144}
145
146////////////////////////////////////////////////////////////////////////////////
147/// Start unzipping the basket if it is untouched yet.
148
150 Byte_t oldValue = kUntouched;
151 Byte_t newValue = kProgress;
152 return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
153}
154
155////////////////////////////////////////////////////////////////////////////////
156
159 fEmpty(kTRUE),
160 fCycle(0),
161 fNseekMax(0),
164 fNFound(0),
165 fNMissed(0),
166 fNStalls(0),
167 fNUnzip(0)
168{
169 // Default Constructor.
170 Init();
171}
172
173////////////////////////////////////////////////////////////////////////////////
174/// Constructor.
175
177 fAsyncReading(kFALSE),
178 fEmpty(kTRUE),
179 fCycle(0),
180 fNseekMax(0),
181 fUnzipGroupSize(0),
182 fUnzipBufferSize(0),
183 fNFound(0),
184 fNMissed(0),
185 fNStalls(0),
186 fNUnzip(0)
187{
188 Init();
189}
190
191////////////////////////////////////////////////////////////////////////////////
192/// Initialization procedure common to all the constructors.
193
195{
196#ifdef R__USE_IMT
197 fUnzipTaskGroup.reset();
198#endif
199 fIOMutex = std::make_unique<TMutex>(kTRUE);
200
201 fCompBuffer = new char[16384];
202 fCompBufferSize = 16384;
203
204 fUnzipGroupSize = 102400; // Each task unzips at least 100 KB
205
206 if (fgParallel == kDisable) {
208 }
209 else if(fgParallel == kEnable || fgParallel == kForce) {
211
212 if(gDebug > 0)
213 Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
214
216
217 }
218 else {
219 Warning("TTreeCacheUnzip", "Parallel Option unknown");
220 }
221
222 // Check if asynchronous reading is supported by this TFile specialization
223 if (gEnv->GetValue("TFile.AsyncReading", 1)) {
224 if (fFile && !(fFile->ReadBufferAsync(0, 0)))
226 }
227
228}
229
230////////////////////////////////////////////////////////////////////////////////
231/// Destructor. (in general called by the TFile destructor)
232
234{
235 ResetCache();
237}
238
239////////////////////////////////////////////////////////////////////////////////
240/// Add a branch to the list of branches to be stored in the cache
241/// this function is called by TBranch::GetBasket
242/// Returns:
243/// - 0 branch added or already included
244/// - -1 on error
245
247{
248 return TTreeCache::AddBranch(b, subbranches);
249}
250
251////////////////////////////////////////////////////////////////////////////////
252/// Add a branch to the list of branches to be stored in the cache
253/// this function is called by TBranch::GetBasket
254/// Returns:
255/// - 0 branch added or already included
256/// - -1 on error
257
258Int_t TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
259{
260 return TTreeCache::AddBranch(branch, subbranches);
261}
262
263////////////////////////////////////////////////////////////////////////////////
264
266{
267
268 if (fNbranches <= 0) return kFALSE;
269
270 // Fill the cache buffer with the branches in the cache.
272
274 Long64_t entry = tree->GetReadEntry();
275
276 // If the entry is in the range we previously prefetched, there is
277 // no point in retrying. Note that this will also return false
278 // during the training phase (fEntryNext is then set intentional to
279 // the end of the training phase).
280 if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
281
282 // Triggered by the user, not the learning phase
283 if (entry == -1) entry = 0;
284
285 TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
286 fEntryCurrent = clusterIter();
287 fEntryNext = clusterIter.GetNextEntry();
288
290 if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
292
293 // Check if owner has a TEventList set. If yes we optimize for this
294 // Special case reading only the baskets containing entries in the
295 // list.
296 TEventList *elist = fTree->GetEventList();
297 Long64_t chainOffset = 0;
298 if (elist) {
299 if (fTree->IsA() ==TChain::Class()) {
300 TChain *chain = (TChain*)fTree;
301 Int_t t = chain->GetTreeNumber();
302 chainOffset = chain->GetTreeOffset()[t];
303 }
304 }
305
306 //clear cache buffer
308
309 //store baskets
310 for (Int_t i = 0; i < fNbranches; i++) {
312 if (b->GetDirectory() == 0) continue;
313 if (b->GetDirectory()->GetFile() != fFile) continue;
314 Int_t nb = b->GetMaxBaskets();
315 Int_t *lbaskets = b->GetBasketBytes();
316 Long64_t *entries = b->GetBasketEntry();
317 if (!lbaskets || !entries) continue;
318 //we have found the branch. We now register all its baskets
319 //from the requested offset to the basket below fEntrymax
320 Int_t blistsize = b->GetListOfBaskets()->GetSize();
321 for (Int_t j=0;j<nb;j++) {
322 // This basket has already been read, skip it
323 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
324
325 Long64_t pos = b->GetBasketSeek(j);
326 Int_t len = lbaskets[j];
327 if (pos <= 0 || len <= 0) continue;
328 //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
329 if (entries[j] >= fEntryNext) continue;
330 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) continue;
331 if (elist) {
332 Long64_t emax = fEntryMax;
333 if (j < nb - 1) emax = entries[j+1] - 1;
334 if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset)) continue;
335 }
336 fNReadPref++;
337
338 TFileCacheRead::Prefetch(pos, len);
339 }
340 if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
341 }
342
343 // Now fix the size of the status arrays
344 ResetCache();
346
347 return kTRUE;
348}
349
350////////////////////////////////////////////////////////////////////////////////
351/// Change the underlying buffer size of the cache.
352/// Returns:
353/// - 0 if the buffer content is still available
354/// - 1 if some or all of the buffer content has been made unavailable
355/// - -1 on error
356
358{
359 Int_t res = TTreeCache::SetBufferSize(buffersize);
360 if (res < 0) {
361 return res;
362 }
364 ResetCache();
365 return 1;
366}
367
368////////////////////////////////////////////////////////////////////////////////
369/// Set the minimum and maximum entry number to be processed
370/// this information helps to optimize the number of baskets to read
371/// when prefetching the branch buffers.
372
374{
375 TTreeCache::SetEntryRange(emin, emax);
376}
377
378////////////////////////////////////////////////////////////////////////////////
379/// It's the same as TTreeCache::StopLearningPhase but we guarantee that
380/// we start the unzipping just after getting the buffers
381
383{
385}
386
387////////////////////////////////////////////////////////////////////////////////
388///update pointer to current Tree and recompute pointers to the branches in the cache
389
391{
393}
394
395////////////////////////////////////////////////////////////////////////////////
396// //
397// From now on we have the methods concerning the threading part of the cache //
398// //
399////////////////////////////////////////////////////////////////////////////////
400
401////////////////////////////////////////////////////////////////////////////////
402/// Static function that returns the parallel option
403/// (to indicate an additional thread)
404
406{
407 return fgParallel;
408}
409
410////////////////////////////////////////////////////////////////////////////////
411/// Static function that tells wether the multithreading unzipping is activated
412
414{
415 if (fgParallel == kEnable || fgParallel == kForce)
416 return kTRUE;
417
418 return kFALSE;
419}
420
421////////////////////////////////////////////////////////////////////////////////
422/// Static function that (de)activates multithreading unzipping
423///
424/// The possible options are:
425/// - kEnable _Enable_ it, which causes an automatic detection and launches the
426/// additional thread if the number of cores in the machine is greater than
427/// one
428/// - kDisable _Disable_ will not activate the additional thread.
429/// - kForce _Force_ will start the additional thread even if there is only one
430/// core. the default will be taken as kEnable.
431///
432/// Returns 0 if there was an error, 1 otherwise.
433
435{
437 fgParallel = option;
438 return 1;
439 }
440 return 0;
441}
442
443////////////////////////////////////////////////////////////////////////////////
444// //
445// From now on we have the methods concerning the unzipping part of the cache //
446// //
447////////////////////////////////////////////////////////////////////////////////
448
449////////////////////////////////////////////////////////////////////////////////
450/// Read the logical record header from the buffer buf.
451/// That must be the pointer tho the header part not the object by itself and
452/// must contain data of at least maxbytes
453/// Returns nread;
454///
455/// In output arguments:
456///
457/// - nbytes : number of bytes in record
458/// if negative, this is a deleted record
459/// if 0, cannot read record, wrong value of argument first
460/// - objlen : uncompressed object size
461/// - keylen : length of logical record header
462///
463/// Note that the arguments objlen and keylen are returned only
464/// if maxbytes >=16
465/// Note: This was adapted from TFile... so some things dont apply
466
467Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
468{
469 Version_t versionkey;
470 Short_t klen;
471 UInt_t datime;
472 Int_t nb = 0, olen;
473 Int_t nread = maxbytes;
474 frombuf(buf, &nb);
475 nbytes = nb;
476 if (nb < 0) return nread;
477 // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
478 const Int_t headerSize = 16;
479 if (nread < headerSize) return nread;
480 frombuf(buf, &versionkey);
481 frombuf(buf, &olen);
482 frombuf(buf, &datime);
483 frombuf(buf, &klen);
484 if (!olen) olen = nbytes - klen;
485 objlen = olen;
486 keylen = klen;
487 return nread;
488}
489
490////////////////////////////////////////////////////////////////////////////////
491/// This will delete the list of buffers that are in the unzipping cache
492/// and will reset certain values in the cache.
493/// This name is ambiguos because the method doesn't reset the whole cache,
494/// only the part related to the unzipping
495/// Note: This method is completely different from TTreeCache::ResetCache(),
496/// in that method we were cleaning the prefetching buffer while here we
497/// delete the information about the unzipped buffers
498
500{
501 // Reset all the lists and wipe all the chunks
502 fCycle++;
504
505 if(fNseekMax < fNseek){
506 if (gDebug > 0)
507 Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
508
511 }
512 fEmpty = kTRUE;
513}
514
515////////////////////////////////////////////////////////////////////////////////
516/// This inflates a basket in the cache.. passing the data to a new
517/// buffer that will only wait there to be read...
518/// This function is responsible to update corresponding elements in
519/// fUnzipStatus, fUnzipChunks and fUnzipLen. Since we use atomic variables
520/// in fUnzipStatus to exclusively unzip the basket, we must update
521/// fUnzipStatus after fUnzipChunks and fUnzipLen and make sure fUnzipChunks
522/// and fUnzipLen are ready before main thread fetch the data.
523
525{
526 Int_t myCycle;
527 const Int_t hlen = 128;
528 Int_t objlen = 0, keylen = 0;
529 Int_t nbytes = 0;
530 Int_t readbuf = 0;
531
532 Long64_t rdoffs = 0;
533 Int_t rdlen = 0;
534
535 // To synchronize with the 'paging'
536 myCycle = fCycle;
537 rdoffs = fSeek[index];
538 rdlen = fSeekLen[index];
539
540 Int_t loc = -1;
541 if (!fNseek || fIsLearning) {
542 return 1;
543 }
544
545 if ((myCycle != fCycle) || !fIsTransferred) {
546 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
547 return 1;
548 }
549
550 // Prepare a memory buffer of adequate size
551 char* locbuff = 0;
552 if (rdlen > 16384) {
553 locbuff = new char[rdlen];
554 } else if (rdlen * 3 < 16384) {
555 locbuff = new char[rdlen * 2];
556 } else {
557 locbuff = new char[16384];
558 }
559
560 readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
561
562 if (readbuf <= 0) {
563 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
564 if (locbuff) delete [] locbuff;
565 return -1;
566 }
567
568 GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
569
570 Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
571 // If the single unzipped chunk is really too big, reset it to not processable
572 // I.e. mark it as done but set the pointer to 0
573 // This block will be unzipped synchronously in the main thread
574 // TODO: ROOT internally breaks zipped buffers into 16MB blocks, we can probably still unzip in parallel.
575 if (len > 4 * fUnzipBufferSize) {
576 if (gDebug > 0)
577 Info("UnzipCache", "Block %d is too big, skipping.", index);
578
579 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
580 if (locbuff) delete [] locbuff;
581 return 0;
582 }
583
584 // Unzip it into a new blk
585 char *ptr = 0;
586 Int_t loclen = UnzipBuffer(&ptr, locbuff);
587 if ((loclen > 0) && (loclen == objlen + keylen)) {
588 if ((myCycle != fCycle) || !fIsTransferred) {
589 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
590 if (locbuff) delete [] locbuff;
591 return 1;
592 }
593 fUnzipState.SetUnzipped(index, ptr, loclen); // Set it as done
594 fNUnzip++;
595 } else {
596 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
597 }
598
599 if (locbuff) delete [] locbuff;
600 return 0;
601}
602
603#ifdef R__USE_IMT
604////////////////////////////////////////////////////////////////////////////////
605/// We create a TTaskGroup and asynchronously maps each group of baskets(> 100 kB in total)
606/// to a task. In TTaskGroup, we use TThreadExecutor to do the actually work of unzipping
607/// a group of basket. The purpose of creating TTaskGroup is to avoid competing with main thread.
608
610{
611 auto mapFunction = [&]() {
612 auto unzipFunction = [&](const std::vector<Int_t> &indices) {
613 // If cache is invalidated and we should return immediately.
614 if (!fIsTransferred) return nullptr;
615
616 for (auto ii : indices) {
617 if(fUnzipState.TryUnzipping(ii)) {
618 Int_t res = UnzipCache(ii);
619 if(res)
620 if (gDebug > 0)
621 Info("UnzipCache", "Unzipping failed or cache is in learning state");
622 }
623 }
624 return nullptr;
625 };
626
627 Int_t accusz = 0;
628 std::vector<std::vector<Int_t>> basketIndices;
629 std::vector<Int_t> indices;
630 if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
631 for (Int_t i = 0; i < fNseek; i++) {
632 while (accusz < fUnzipGroupSize) {
633 accusz += fSeekLen[i];
634 indices.push_back(i);
635 i++;
636 if (i >= fNseek) break;
637 }
638 if (i < fNseek) i--;
639 basketIndices.push_back(indices);
640 indices.clear();
641 accusz = 0;
642 }
644 pool.Foreach(unzipFunction, basketIndices);
645 };
646
648 fUnzipTaskGroup->Run(mapFunction);
649
650 return 0;
651}
652#endif
653
654////////////////////////////////////////////////////////////////////////////////
655/// We try to read a buffer that has already been unzipped
656/// Returns -1 in case of read failure, 0 in case it's not in the
657/// cache and n>0 in case read from cache (number of bytes copied).
658/// pos and len are the original values as were passed to ReadBuffer
659/// but instead we will return the inflated buffer.
660/// Note!! : If *buf == 0 we will allocate the buffer and it will be the
661/// responsability of the caller to free it... it is useful for example
662/// to pass it to the creator of TBuffer
663
665{
666 Int_t res = 0;
667 Int_t loc = -1;
668
669 // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
670 // pointer to the original zipped chunk
671 // its index in the original unsorted offsets lists
672 //
673 // Actually there are situations in which copying the buffer is not
674 // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
675 // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
676 // better to be done in the main thread.
677
678 Int_t myCycle = fCycle;
679
680 if (fParallel && !fIsLearning) {
681
682 if(fNseekMax < fNseek){
683 if (gDebug > 0)
684 Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
685
688 }
689
691 if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
692
693 // The buffer is, at minimum, in the file cache. We must know its index in the requests list
694 // In order to get its info
695 Int_t seekidx = fSeekIndex[loc];
696
697 do {
698
699 // If the block is ready we get it immediately.
700 // And also we don't have to alloc the blks. This is supposed to be
701 // the main thread of the app.
702 if (fUnzipState.IsUnzipped(seekidx)) {
703 if(!(*buf)) {
704 *buf = fUnzipState.fUnzipChunks[seekidx].get();
705 fUnzipState.fUnzipChunks[seekidx].release();
706 *free = kTRUE;
707 } else {
708 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
709 fUnzipState.fUnzipChunks[seekidx].reset();
710 *free = kFALSE;
711 }
712
713 fNFound++;
714 return fUnzipState.fUnzipLen[seekidx];
715 }
716
717 // If the requested basket is being unzipped by a background task, we try to steal a blk to unzip.
718 Int_t reqi = -1;
719
720 if (fUnzipState.IsProgress(seekidx)) {
721 if (fEmpty) {
722 for (Int_t ii = 0; ii < fNseek; ++ii) {
723 Int_t idx = (seekidx + 1 + ii) % fNseek;
724 if (fUnzipState.IsUntouched(idx)) {
725 if(fUnzipState.TryUnzipping(idx)) {
726 reqi = idx;
727 break;
728 }
729 }
730 }
731 if (reqi < 0) {
732 fEmpty = kFALSE;
733 } else {
734 UnzipCache(reqi);
735 }
736 }
737
738 if ( myCycle != fCycle ) {
739 if (gDebug > 0)
740 Info("GetUnzipBuffer", "Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
742
743 seekidx = -1;
744 break;
745 }
746 }
747
748 } while (fUnzipState.IsProgress(seekidx));
749
750 // Here the block is not pending. It could be done or aborted or not yet being processed.
751 if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
752 if(!(*buf)) {
753 *buf = fUnzipState.fUnzipChunks[seekidx].get();
754 fUnzipState.fUnzipChunks[seekidx].release();
755 *free = kTRUE;
756 } else {
757 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
758 fUnzipState.fUnzipChunks[seekidx].reset();
759 *free = kFALSE;
760 }
761
762 fNStalls++;
763 return fUnzipState.fUnzipLen[seekidx];
764 } else {
765 // This is a complete miss. We want to avoid the background tasks
766 // to try unzipping this block in the future.
767 fUnzipState.SetMissed(seekidx);
768 }
769 } else {
770 loc = -1;
772 }
773 }
774
775 if (len > fCompBufferSize) {
776 if(fCompBuffer) delete [] fCompBuffer;
777 fCompBuffer = new char[len];
778 fCompBufferSize = len;
779 } else {
780 if (fCompBufferSize > len * 4) {
781 if(fCompBuffer) delete [] fCompBuffer;
782 fCompBuffer = new char[len*2];
783 fCompBufferSize = len * 2;
784 }
785 }
786
787 res = 0;
788 if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
789 // Cache is invalidated and we need to wait for all unzipping tasks to befinished before fill new baskets in cache.
790#ifdef R__USE_IMT
792 fUnzipTaskGroup->Cancel();
793 fUnzipTaskGroup.reset();
794 }
795#endif
796 {
797 // Fill new baskets into cache.
798 R__LOCKGUARD(fIOMutex.get());
799 fFile->Seek(pos);
800 res = fFile->ReadBuffer(fCompBuffer, len);
801 } // end of lock scope
802#ifdef R__USE_IMT
804 CreateTasks();
805 }
806#endif
807 }
808
809 if (res) res = -1;
810
811 if (!res) {
812 res = UnzipBuffer(buf, fCompBuffer);
813 *free = kTRUE;
814 }
815
816 if (!fIsLearning) {
817 fNMissed++;
818 }
819
820 return res;
821}
822
823////////////////////////////////////////////////////////////////////////////////
824/// static function: Sets the unzip relatibe buffer size
825
827{
828 fgRelBuffSize = relbufferSize;
829}
830
831////////////////////////////////////////////////////////////////////////////////
832/// Sets the size for the unzipping cache... by default it should be
833/// two times the size of the prefetching cache
834
836{
837 fUnzipBufferSize = bufferSize;
838}
839
840////////////////////////////////////////////////////////////////////////////////
841/// Unzips a ROOT specific buffer... by reading the header at the beginning.
842/// returns the size of the inflated buffer or -1 if error
843/// Note!! : If *dest == 0 we will allocate the buffer and it will be the
844/// responsability of the caller to free it... it is useful for example
845/// to pass it to the creator of TBuffer
846/// src is the original buffer with the record (header+compressed data)
847/// *dest is the inflated buffer (including the header)
848
850{
851 Int_t uzlen = 0;
852 Bool_t alloc = kFALSE;
853
854 // Here we read the header of the buffer
855 const Int_t hlen = 128;
856 Int_t nbytes = 0, objlen = 0, keylen = 0;
857 GetRecordHeader(src, hlen, nbytes, objlen, keylen);
858
859 if (!(*dest)) {
860 /* early consistency check */
861 UChar_t *bufcur = (UChar_t *) (src + keylen);
862 Int_t nin, nbuf;
863 if(objlen > nbytes - keylen && R__unzip_header(&nin, bufcur, &nbuf) != 0) {
864 Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
865 uzlen = -1;
866 return uzlen;
867 }
868 Int_t l = keylen + objlen;
869 *dest = new char[l];
870 alloc = kTRUE;
871 }
872 // Must unzip the buffer
873 // fSeekPos[ind]; adress of zipped buffer
874 // fSeekLen[ind]; len of the zipped buffer
875 // &fBuffer[fSeekPos[ind]]; memory address
876
877 // This is similar to TBasket::ReadBasketBuffers
878 Bool_t oldCase = objlen == nbytes - keylen
879 && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
880 && fFile->GetVersion() <= 30401;
881
882 if (objlen > nbytes-keylen || oldCase) {
883
884 // Copy the key
885 memcpy(*dest, src, keylen);
886 uzlen += keylen;
887
888 char *objbuf = *dest + keylen;
889 UChar_t *bufcur = (UChar_t *) (src + keylen);
890 Int_t nin, nbuf;
891 Int_t nout = 0;
892 Int_t noutot = 0;
893
894 while (1) {
895 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
896 if (hc != 0) break;
897 if (gDebug > 2)
898 Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
899 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
900 if (oldCase && (nin > objlen || nbuf > objlen)) {
901 if (gDebug > 2)
902 Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
903
904 //buffer was very likely not compressed in an old version
905 memcpy(*dest + keylen, src + keylen, objlen);
906 uzlen += objlen;
907 return uzlen;
908 }
909
910 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
911
912 if (gDebug > 2)
913 Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
914 nin, bufcur, nbuf, objbuf, nout);
915
916 if (!nout) break;
917 noutot += nout;
918 if (noutot >= objlen) break;
919 bufcur += nin;
920 objbuf += nout;
921 }
922
923 if (noutot != objlen) {
924 Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
925 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
926 uzlen = -1;
927 if(alloc) delete [] *dest;
928 *dest = 0;
929 return uzlen;
930 }
931 uzlen += objlen;
932 } else {
933 memcpy(*dest, src, keylen);
934 uzlen += keylen;
935 memcpy(*dest + keylen, src + keylen, objlen);
936 uzlen += objlen;
937 }
938 return uzlen;
939}
940
941////////////////////////////////////////////////////////////////////////////////
942
943void TTreeCacheUnzip::Print(Option_t* option) const {
944
945 printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
946 printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
947 printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
948 printf("Number of hits: %d\n", fNFound);
949 printf("Number of stalls: %d\n", fNStalls);
950 printf("Number of misses: %d\n", fNMissed);
951
952 TTreeCache::Print(option);
953}
954
955////////////////////////////////////////////////////////////////////////////////
956
958 R__LOCKGUARD(fIOMutex.get());
959 return TTreeCache::ReadBufferExt(buf, pos, len, loc);
960}
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:280
void Class()
Definition: Class.C:29
#define b(i)
Definition: RSha256.hxx:100
unsigned char Byte_t
Definition: RtypesCore.h:60
int Int_t
Definition: RtypesCore.h:41
short Version_t
Definition: RtypesCore.h:61
unsigned char UChar_t
Definition: RtypesCore.h:34
unsigned int UInt_t
Definition: RtypesCore.h:42
const Bool_t kFALSE
Definition: RtypesCore.h:88
bool Bool_t
Definition: RtypesCore.h:59
short Short_t
Definition: RtypesCore.h:35
double Double_t
Definition: RtypesCore.h:55
long long Long64_t
Definition: RtypesCore.h:69
float Float_t
Definition: RtypesCore.h:53
const Bool_t kTRUE
Definition: RtypesCore.h:87
const char Option_t
Definition: RtypesCore.h:62
#define ClassImp(name)
Definition: Rtypes.h:365
R__EXTERN Int_t gDebug
Definition: Rtypes.h:91
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
#define R__LOCKGUARD(mutex)
#define free
Definition: civetweb.c:1539
A class to manage the asynchronous execution of work items.
Definition: TTaskGroup.hxx:21
This class provides a simple interface to execute the same task multiple times in parallel,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute func (with no arguments) nTimes in parallel.
A TTree is a list of TBranches.
Definition: TBranch.h:65
A chain is a collection of files containing TTree objects.
Definition: TChain.h:34
virtual Int_t GetTreeNumber() const
Definition: TChain.h:117
Long64_t * GetTreeOffset() const
Definition: TChain.h:118
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Definition: TEventList.cxx:171
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Int_t fNtot
Total size of prefetched blocks.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
TFile * fFile
Pointer to file.
Int_t fNseek
Number of blocks to be prefetched.
virtual Int_t GetBufferSize() const
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos it either kBeg, kCur or kEnd.
Definition: TFile.cxx:2175
Int_t GetVersion() const
Definition: TFile.h:229
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition: TFile.cxx:5082
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition: TFile.cxx:1672
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
TObject * UncheckedAt(Int_t i) const
Definition: TObjArray.h:90
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
A TTreeCache which exploits parallelized decompression of its own content.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
UnzipState_t fUnzipState
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
void Init()
Initialization procedure common to all the constructors.
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Int_t fNseekMax
! fNseek can change so we need to know its max size
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache
Int_t fNStalls
! number of hits which caused a stall
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer... by reading the header at the beginning.
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relatibe buffer size
void Print(Option_t *option="") const
Print cache statistics.
std::unique_ptr< TMutex > fIOMutex
Int_t fNUnzip
! number of blocks that were unzipped
Int_t CreateTasks()
We create a TTaskGroup and asynchronously maps each group of baskets(> 100 kB in total) to a task.
Bool_t fParallel
Indicate if we want to activate the parallelism (for this instance)
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
static Bool_t IsParallelUnzip()
Static function that tells wether the multithreading unzipping is activated.
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache... by default it should be two times the size of the prefetchin...
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by a IMT task
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Int_t fNFound
! number of blocks that were found in the cache
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped Returns -1 in case of read failure,...
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread)
A cache to speed-up the reading of ROOT datasets.
Definition: TTreeCache.h:35
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by the user vi...
Definition: TTreeCache.cxx:368
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Long64_t fEntryMin
! first entry in the cache
Definition: TTreeCache.h:41
Long64_t fEntryNext
! next entry number where cache must be filled
Definition: TTreeCache.h:44
Bool_t fIsLearning
! true if cache is in learning mode
Definition: TTreeCache.h:57
TTree * GetTree() const
Definition: TTreeCache.h:152
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
Long64_t fEntryMax
! last entry in the cache
Definition: TTreeCache.h:42
Long64_t fEntryCurrent
! current lowest entry number in the cache
Definition: TTreeCache.h:43
Int_t fNReadPref
Number of blocks that were prefetched.
Definition: TTreeCache.h:52
TTree * fTree
! pointer to the current Tree
Definition: TTreeCache.h:56
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase.
Int_t fNbranches
! Number of branches in the cache
Definition: TTreeCache.h:47
virtual void Print(Option_t *option="") const
Print cache statistics.
TObjArray * fBranches
! List of branches to be stored in the cache
Definition: TTreeCache.h:54
Helper class to iterate over cluster of baskets.
Definition: TTree.h:247
Long64_t GetNextEntry()
Definition: TTree.h:284
A TTree represents a columnar dataset.
Definition: TTree.h:71
TEventList * GetEventList() const
Definition: TTree.h:412
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition: TROOT.cxx:610
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Definition: TMathBase.h:278
Definition: tree.py:1
Bool_t IsFinished(Int_t index) const
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
void Clear(Int_t size)
Clear all baskets' state arrays.
Bool_t IsProgress(Int_t index) const
std::atomic< Byte_t > * fUnzipStatus
! [fNSeek]
void SetUnzipped(Int_t index, char *buf, Int_t len)
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
Bool_t TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
Bool_t IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
Bool_t IsUntouched(Int_t index) const
void SetFinished(Int_t index)
Set cache as finished.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
auto * l
Definition: textangle.C:4
#define dest(otri, vertexptr)
Definition: triangle.c:1040