Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1// Authors: Rene Brun 04/06/2006
2// Leandro Franco 10/04/2008
3// Fabrizio Furano (CERN) Aug 2009
4
5/*************************************************************************
6 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13/**
14\class TTreeCacheUnzip
15\ingroup tree
16
17A TTreeCache which exploits parallelized decompression of its own content.
18
19*/
20
21#include "TTreeCacheUnzip.h"
22#include "TBranch.h"
23#include "TChain.h"
24#include "TEnv.h"
25#include "TEventList.h"
26#include "TFile.h"
27#include "TMath.h"
28#include "TROOT.h"
29#include "TMutex.h"
30
31#ifdef R__USE_IMT
33#include "ROOT/TTaskGroup.hxx"
34#endif
35
36#include <memory>
37
38extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
39extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
40
42
43// The unzip cache does not consume memory by itself, it just allocates in advance
44// mem blocks which are then picked as they are by the baskets.
45// Hence there is no good reason to limit it too much
47
49
50////////////////////////////////////////////////////////////////////////////////
51/// Clear all baskets' state arrays.
52
54 for (Int_t i = 0; i < size; i++) {
55 if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
56 if (fUnzipChunks) {
57 if (fUnzipChunks[i]) fUnzipChunks[i].reset();
58 }
59 if (fUnzipStatus) fUnzipStatus[i].store(0);
60 }
61}
62
63////////////////////////////////////////////////////////////////////////////////
64
66 return fUnzipStatus[index].load() == kUntouched;
67}
68
69////////////////////////////////////////////////////////////////////////////////
70
72 return fUnzipStatus[index].load() == kProgress;
73}
74
75////////////////////////////////////////////////////////////////////////////////
76
78 return fUnzipStatus[index].load() == kFinished;
79}
80
81////////////////////////////////////////////////////////////////////////////////
82/// Check if the basket is unzipped already. We must make sure the length in
83/// fUnzipLen is larger than 0.
84
86 return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
87}
88
89////////////////////////////////////////////////////////////////////////////////
90/// Reset all baskets' state arrays. This function is only called by main
91/// thread and parallel processing from upper layers should be disabled such
92/// as IMT in TTree::GetEntry(). Other threads should not call this function
93/// since it is not thread-safe.
94
96 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
97 std::unique_ptr<char[]> *aUnzipChunks = new std::unique_ptr<char[]>[newSize];
98 std::atomic<Byte_t> *aUnzipStatus = new std::atomic<Byte_t>[newSize];
99
100 for (Int_t i = 0; i < newSize; ++i)
101 aUnzipStatus[i].store(0);
102
103 for (Int_t i = 0; i < oldSize; i++) {
104 aUnzipLen[i] = fUnzipLen[i];
105 aUnzipChunks[i] = std::move(fUnzipChunks[i]);
106 aUnzipStatus[i].store(fUnzipStatus[i].load());
107 }
108
109 if (fUnzipChunks) delete [] fUnzipChunks;
110 if (fUnzipStatus) delete [] fUnzipStatus;
111
112 fUnzipLen = aUnzipLen;
113 fUnzipChunks = aUnzipChunks;
114 fUnzipStatus = aUnzipStatus;
115}
116
117////////////////////////////////////////////////////////////////////////////////
118/// Set cache as finished.
119/// There are three scenarios that a basket is set as finished:
120/// 1. The basket has already been unzipped.
121/// 2. The thread is aborted from unzipping process.
122/// 3. To avoid other tasks/threads work on this basket,
123/// main thread marks the basket as finished and designates itself
124/// to unzip this basket.
125
127 fUnzipLen[index] = 0;
128 fUnzipChunks[index].reset();
129 fUnzipStatus[index].store((Byte_t)kFinished);
130}
131
132////////////////////////////////////////////////////////////////////////////////
133
135 fUnzipChunks[index].reset();
136 fUnzipStatus[index].store((Byte_t)kFinished);
137}
138
139////////////////////////////////////////////////////////////////////////////////
140
142 // Update status array at the very end because we need to be synchronous with the main thread.
143 fUnzipLen[index] = len;
144 fUnzipChunks[index].reset(buf);
145 fUnzipStatus[index].store((Byte_t)kFinished);
146}
147
148////////////////////////////////////////////////////////////////////////////////
149/// Start unzipping the basket if it is untouched yet.
150
152 Byte_t oldValue = kUntouched;
153 Byte_t newValue = kProgress;
154 return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
155}
156
157////////////////////////////////////////////////////////////////////////////////
158
160 fAsyncReading(false),
161 fEmpty(true),
162 fCycle(0),
163 fNseekMax(0),
166 fNFound(0),
167 fNMissed(0),
168 fNStalls(0),
169 fNUnzip(0)
170{
171 // Default Constructor.
172 Init();
173}
174
175////////////////////////////////////////////////////////////////////////////////
176/// Constructor.
/// Forwards `tree` and `buffersize` to the TTreeCache base-class constructor,
/// gives every member named in the initializer list a fixed starting value and
/// then calls Init().
177
178TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
179 fAsyncReading(false),
180 fEmpty(true),
181 fCycle(0),
182 fNseekMax(0),
183 fUnzipGroupSize(0),
184 fUnzipBufferSize(0),
185 fNFound(0),
186 fNMissed(0),
187 fNStalls(0),
188 fNUnzip(0)
189{
 // NOTE(review): the common initialization code in this file assigns
 // fUnzipGroupSize = 102400, so the 0 set above looks like a placeholder
 // only — confirm against Init().
190 Init();
191}
192
193////////////////////////////////////////////////////////////////////////////////
194/// Initialization procedure common to all the constructors.
195
197{
198#ifdef R__USE_IMT
199 fUnzipTaskGroup.reset();
200#endif
201 fIOMutex = std::make_unique<TMutex>(true);
202
203 fCompBuffer = new char[16384];
204 fCompBufferSize = 16384;
205
206 fUnzipGroupSize = 102400; // Each task unzips at least 100 KB
207
208 if (fgParallel == kDisable) {
209 fParallel = false;
210 }
211 else if(fgParallel == kEnable || fgParallel == kForce) {
213
214 if(gDebug > 0)
215 Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
216
217 fParallel = true;
218
219 }
220 else {
221 Warning("TTreeCacheUnzip", "Parallel Option unknown");
222 }
223
224 // Check if asynchronous reading is supported by this TFile specialization
225 if (gEnv->GetValue("TFile.AsyncReading", 1)) {
226 if (fFile && !(fFile->ReadBufferAsync(0, 0)))
227 fAsyncReading = true;
228 }
229
230}
231
232////////////////////////////////////////////////////////////////////////////////
233/// Destructor. (in general called by the TFile destructor)
234
236{
237 ResetCache();
239}
240
241////////////////////////////////////////////////////////////////////////////////
242/// Add a branch to the list of branches to be stored in the cache
243/// this function is called by TBranch::GetBasket
244/// Returns:
245/// - 0 branch added or already included
246/// - -1 on error
247
248Int_t TTreeCacheUnzip::AddBranch(TBranch *b, bool subbranches /*= false*/)
249{
 // Straight delegation: both arguments are handed unchanged to the base class
 // and its return value is passed back; this override does no work of its own.
250 return TTreeCache::AddBranch(b, subbranches);
251}
252
253////////////////////////////////////////////////////////////////////////////////
254/// Add a branch to the list of branches to be stored in the cache
255/// this function is called by TBranch::GetBasket
256/// Returns:
257/// - 0 branch added or already included
258/// - -1 on error
259
260Int_t TTreeCacheUnzip::AddBranch(const char *branch, bool subbranches /*= false*/)
261{
 // String overload: `branch` and `subbranches` are forwarded unchanged to
 // TTreeCache::AddBranch and its result is returned as-is.
262 return TTreeCache::AddBranch(branch, subbranches);
263}
264
265////////////////////////////////////////////////////////////////////////////////
266
268{
269
270 if (fNbranches <= 0) return false;
271
272 // Fill the cache buffer with the branches in the cache.
273 fIsTransferred = false;
274
275 TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
276 Long64_t entry = tree->GetReadEntry();
277
278 // If the entry is in the range we previously prefetched, there is
279 // no point in retrying. Note that this will also return false
280 // during the training phase (fEntryNext is then set intentionally to
281 // the end of the training phase).
282 if (fEntryCurrent <= entry && entry < fEntryNext) return false;
283
284 // Triggered by the user, not the learning phase
285 if (entry == -1) entry = 0;
286
287 TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
288 fEntryCurrent = clusterIter();
289 fEntryNext = clusterIter.GetNextEntry();
290
292 if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
294
295 // Check if owner has a TEventList set. If yes we optimize for this
296 // Special case reading only the baskets containing entries in the
297 // list.
298 TEventList *elist = fTree->GetEventList();
299 Long64_t chainOffset = 0;
300 if (elist) {
301 if (fTree->IsA() ==TChain::Class()) {
302 TChain *chain = (TChain*)fTree;
303 Int_t t = chain->GetTreeNumber();
304 chainOffset = chain->GetTreeOffset()[t];
305 }
306 }
307
308 //clear cache buffer
310
311 //store baskets
312 for (Int_t i = 0; i < fNbranches; i++) {
314 if (b->GetDirectory() == nullptr) continue;
315 if (b->GetDirectory()->GetFile() != fFile) continue;
316 Int_t nb = b->GetMaxBaskets();
317 Int_t *lbaskets = b->GetBasketBytes();
318 Long64_t *entries = b->GetBasketEntry();
319 if (!lbaskets || !entries) continue;
320 //we have found the branch. We now register all its baskets
321 //from the requested offset to the basket below fEntrymax
322 Int_t blistsize = b->GetListOfBaskets()->GetSize();
323 for (Int_t j=0;j<nb;j++) {
324 // This basket has already been read, skip it
325 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
326
327 Long64_t pos = b->GetBasketSeek(j);
328 Int_t len = lbaskets[j];
329 if (pos <= 0 || len <= 0) continue;
330 //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
331 if (entries[j] >= fEntryNext) continue;
332 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) continue;
333 if (elist) {
334 Long64_t emax = fEntryMax;
335 if (j < nb - 1) emax = entries[j+1] - 1;
336 if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset)) continue;
337 }
338 fNReadPref++;
339
341 }
342 if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
343 }
344
345 // Now fix the size of the status arrays
346 ResetCache();
347 fIsLearning = false;
348
349 return true;
350}
351
352////////////////////////////////////////////////////////////////////////////////
353/// Change the underlying buffer size of the cache.
354/// Returns:
355/// - 0 if the buffer content is still available
356/// - 1 if some or all of the buffer content has been made unavailable
357/// - -1 on error
358
360{
361 Int_t res = TTreeCache::SetBufferSize(buffersize);
362 if (res < 0) {
363 return res;
364 }
366 ResetCache();
367 return 1;
368}
369
370////////////////////////////////////////////////////////////////////////////////
371/// Set the minimum and maximum entry number to be processed
372/// this information helps to optimize the number of baskets to read
373/// when prefetching the branch buffers.
374
376{
377 TTreeCache::SetEntryRange(emin, emax);
378}
379
380////////////////////////////////////////////////////////////////////////////////
381/// It's the same as TTreeCache::StopLearningPhase but we guarantee that
382/// we start the unzipping just after getting the buffers
383
385{
387}
388
389////////////////////////////////////////////////////////////////////////////////
390///update pointer to current Tree and recompute pointers to the branches in the cache
391
393{
395}
396
397////////////////////////////////////////////////////////////////////////////////
398// //
399// From now on we have the methods concerning the threading part of the cache //
400// //
401////////////////////////////////////////////////////////////////////////////////
402
403////////////////////////////////////////////////////////////////////////////////
404/// Static function that returns the parallel option
405/// (to indicate an additional thread)
406
408{
409 return fgParallel;
410}
411
412////////////////////////////////////////////////////////////////////////////////
413/// Static function that tells whether the multithreaded unzipping is activated
414
416{
417 if (fgParallel == kEnable || fgParallel == kForce)
418 return true;
419
420 return false;
421}
422
423////////////////////////////////////////////////////////////////////////////////
424/// Static function that (de)activates multithreading unzipping
425///
426/// The possible options are:
427/// - kEnable _Enable_ it, which causes an automatic detection and launches the
428/// additional thread if the number of cores in the machine is greater than
429/// one
430/// - kDisable _Disable_ will not activate the additional thread.
431/// - kForce _Force_ will start the additional thread even if there is only one
432/// core. The default will be taken as kEnable.
433///
434/// Returns 0 if there was an error, 1 otherwise.
435
437{
440 return 1;
441 }
442 return 0;
443}
444
445////////////////////////////////////////////////////////////////////////////////
446// //
447// From now on we have the methods concerning the unzipping part of the cache //
448// //
449////////////////////////////////////////////////////////////////////////////////
450
451////////////////////////////////////////////////////////////////////////////////
452/// Read the logical record header from the buffer buf.
453/// That must be the pointer to the header part, not the object itself, and
454/// must contain data of at least maxbytes
455/// Returns nread;
456///
457/// In output arguments:
458///
459/// - nbytes : number of bytes in record
460/// if negative, this is a deleted record
461/// if 0, cannot read record, wrong value of argument first
462/// - objlen : uncompressed object size
463/// - keylen : length of logical record header
464///
465/// Note that the arguments objlen and keylen are returned only
466/// if maxbytes >=16
467/// Note: This was adapted from TFile... so some things don't apply
468
469Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
470{
 // `buf` is received by value, so the caller's pointer is never modified.
471 Version_t versionkey;
472 Short_t klen;
473 UInt_t datime;
474 Int_t nb = 0, olen;
 // nread is never changed below: every return path hands back maxbytes.
475 Int_t nread = maxbytes;
 // First header field: the record's byte count.
 // NOTE(review): frombuf takes the pointer by reference and presumably advances
 // it past the field it decodes — confirm in Bytes.h.
476 frombuf(buf, &nb);
477 nbytes = nb;
 // Negative byte count: return early, leaving objlen and keylen untouched.
478 if (nb < 0) return nread;
479 // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
480 const Int_t headerSize = 16;
 // Fewer than headerSize bytes announced by the caller: objlen and keylen are
 // left untouched (only nbytes has been written).
481 if (nread < headerSize) return nread;
 // Remaining fields, decoded in this order: key version, object length,
 // datime, key length. versionkey and datime are read but not used afterwards.
482 frombuf(buf, &versionkey);
483 frombuf(buf, &olen);
484 frombuf(buf, &datime);
485 frombuf(buf, &klen);
 // A stored object length of 0 is replaced by nbytes - klen.
486 if (!olen) olen = nbytes - klen;
487 objlen = olen;
488 keylen = klen;
489 return nread;
490}
491
492////////////////////////////////////////////////////////////////////////////////
493/// This will delete the list of buffers that are in the unzipping cache
494/// and will reset certain values in the cache.
495/// This name is ambiguous because the method doesn't reset the whole cache,
496/// only the part related to the unzipping
497/// Note: This method is completely different from TTreeCache::ResetCache(),
498/// in that method we were cleaning the prefetching buffer while here we
499/// delete the information about the unzipped buffers
500
502{
503 // Reset all the lists and wipe all the chunks
504 fCycle++;
506
507 if(fNseekMax < fNseek){
508 if (gDebug > 0)
509 Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
510
513 }
514 fEmpty = true;
515}
516
517////////////////////////////////////////////////////////////////////////////////
518/// This inflates a basket in the cache.. passing the data to a new
519/// buffer that will only wait there to be read...
520/// This function is responsible to update corresponding elements in
521/// fUnzipStatus, fUnzipChunks and fUnzipLen. Since we use atomic variables
522/// in fUnzipStatus to exclusively unzip the basket, we must update
523/// fUnzipStatus after fUnzipChunks and fUnzipLen and make sure fUnzipChunks
524/// and fUnzipLen are ready before main thread fetch the data.
525
527{
528 Int_t myCycle;
529 const Int_t hlen = 128;
530 Int_t objlen = 0, keylen = 0;
531 Int_t nbytes = 0;
532 Int_t readbuf = 0;
533
534 Long64_t rdoffs = 0;
535 Int_t rdlen = 0;
536
537 // To synchronize with the 'paging'
538 myCycle = fCycle;
539 rdoffs = fSeek[index];
540 rdlen = fSeekLen[index];
541
542 Int_t loc = -1;
543 if (!fNseek || fIsLearning) {
544 return 1;
545 }
546
547 if ((myCycle != fCycle) || !fIsTransferred) {
548 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
549 return 1;
550 }
551
552 // Prepare a memory buffer of adequate size
553 char* locbuff = nullptr;
554 if (rdlen > 16384) {
555 locbuff = new char[rdlen];
556 } else if (rdlen * 3 < 16384) {
557 locbuff = new char[rdlen * 2];
558 } else {
559 locbuff = new char[16384];
560 }
561
562 readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
563
564 if (readbuf <= 0) {
565 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
566 if (locbuff) delete [] locbuff;
567 return -1;
568 }
569
570 GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
571
572 Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
573 // If the single unzipped chunk is really too big, reset it to not processable
574 // I.e. mark it as done but set the pointer to 0
575 // This block will be unzipped synchronously in the main thread
576 // TODO: ROOT internally breaks zipped buffers into 16MB blocks, we can probably still unzip in parallel.
577 if (len > 4 * fUnzipBufferSize) {
578 if (gDebug > 0)
579 Info("UnzipCache", "Block %d is too big, skipping.", index);
580
581 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
582 if (locbuff) delete [] locbuff;
583 return 0;
584 }
585
586 // Unzip it into a new blk
587 char *ptr = nullptr;
588 Int_t loclen = UnzipBuffer(&ptr, locbuff);
589 if ((loclen > 0) && (loclen == objlen + keylen)) {
590 if ((myCycle != fCycle) || !fIsTransferred) {
591 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
592 if (locbuff) delete [] locbuff;
593 delete [] ptr;
594 return 1;
595 }
596 fUnzipState.SetUnzipped(index, ptr, loclen); // Set it as done
597 fNUnzip++;
598 } else {
599 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
600 delete [] ptr;
601 }
602
603 if (locbuff) delete [] locbuff;
604 return 0;
605}
606
607#ifdef R__USE_IMT
608////////////////////////////////////////////////////////////////////////////////
609/// We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total)
610/// to a task. In TTaskGroup, we use TThreadExecutor to do the actual work of unzipping
611/// a group of baskets. The purpose of creating TTaskGroup is to avoid competing with main thread.
612
614{
615 auto mapFunction = [&]() {
616 auto unzipFunction = [&](const std::vector<Int_t> &indices) {
617 // If cache is invalidated and we should return immediately.
618 if (!fIsTransferred) return nullptr;
619
620 for (auto ii : indices) {
621 if(fUnzipState.TryUnzipping(ii)) {
622 Int_t res = UnzipCache(ii);
623 if(res)
624 if (gDebug > 0)
625 Info("UnzipCache", "Unzipping failed or cache is in learning state");
626 }
627 }
628 return nullptr;
629 };
630
631 Int_t accusz = 0;
632 std::vector<std::vector<Int_t>> basketIndices;
633 std::vector<Int_t> indices;
634 if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
635 for (Int_t i = 0; i < fNseek; i++) {
636 while (accusz < fUnzipGroupSize) {
637 accusz += fSeekLen[i];
638 indices.push_back(i);
639 i++;
640 if (i >= fNseek) break;
641 }
642 if (i < fNseek) i--;
643 basketIndices.push_back(indices);
644 indices.clear();
645 accusz = 0;
646 }
648 pool.Foreach(unzipFunction, basketIndices);
649 };
650
651 fUnzipTaskGroup = std::make_unique<ROOT::Experimental::TTaskGroup>();
652 fUnzipTaskGroup->Run(mapFunction);
653
654 return 0;
655}
656#endif
657
658////////////////////////////////////////////////////////////////////////////////
659/// We try to read a buffer that has already been unzipped
660/// Returns -1 in case of read failure, 0 in case it's not in the
661/// cache and n>0 in case read from cache (number of bytes copied).
662/// pos and len are the original values as were passed to ReadBuffer
663/// but instead we will return the inflated buffer.
664/// Note!! : If *buf == 0 we will allocate the buffer and it will be the
665/// responsibility of the caller to free it... it is useful for example
666/// to pass it to the creator of TBuffer
667
669{
670 Int_t res = 0;
671 Int_t loc = -1;
672
673 // We go straight to TTreeCache/TFileCacheRead, in order to get the info we need
674 // pointer to the original zipped chunk
675 // its index in the original unsorted offsets lists
676 //
677 // Actually there are situations in which copying the buffer is not
678 // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
679 // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
680 // better to be done in the main thread.
681
682 Int_t myCycle = fCycle;
683
684 if (fParallel && !fIsLearning) {
685
686 if(fNseekMax < fNseek){
687 if (gDebug > 0)
688 Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
689
692 }
693
695 if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
696
697 // The buffer is, at minimum, in the file cache. We must know its index in the requests list
698 // In order to get its info
699 Int_t seekidx = fSeekIndex[loc];
700
701 do {
702
703 // If the block is ready we get it immediately.
704 // And also we don't have to alloc the blks. This is supposed to be
705 // the main thread of the app.
706 if (fUnzipState.IsUnzipped(seekidx)) {
707 if(!(*buf)) {
708 *buf = fUnzipState.fUnzipChunks[seekidx].get();
709 fUnzipState.fUnzipChunks[seekidx].release();
710 *free = true;
711 } else {
712 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
713 fUnzipState.fUnzipChunks[seekidx].reset();
714 *free = false;
715 }
716
717 fNFound++;
718 return fUnzipState.fUnzipLen[seekidx];
719 }
720
721 // If the requested basket is being unzipped by a background task, we try to steal a blk to unzip.
722 Int_t reqi = -1;
723
724 if (fUnzipState.IsProgress(seekidx)) {
725 if (fEmpty) {
726 for (Int_t ii = 0; ii < fNseek; ++ii) {
727 Int_t idx = (seekidx + 1 + ii) % fNseek;
728 if (fUnzipState.IsUntouched(idx)) {
729 if(fUnzipState.TryUnzipping(idx)) {
730 reqi = idx;
731 break;
732 }
733 }
734 }
735 if (reqi < 0) {
736 fEmpty = false;
737 } else {
738 UnzipCache(reqi);
739 }
740 }
741
742 if ( myCycle != fCycle ) {
743 if (gDebug > 0)
744 Info("GetUnzipBuffer", "Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
746
747 seekidx = -1;
748 break;
749 }
750 }
751
752 } while (fUnzipState.IsProgress(seekidx));
753
754 // Here the block is not pending. It could be done or aborted or not yet being processed.
755 if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
756 if(!(*buf)) {
757 *buf = fUnzipState.fUnzipChunks[seekidx].get();
758 fUnzipState.fUnzipChunks[seekidx].release();
759 *free = true;
760 } else {
761 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
762 fUnzipState.fUnzipChunks[seekidx].reset();
763 *free = false;
764 }
765
766 fNStalls++;
767 return fUnzipState.fUnzipLen[seekidx];
768 } else {
769 // This is a complete miss. We want to avoid the background tasks
770 // to try unzipping this block in the future.
771 fUnzipState.SetMissed(seekidx);
772 }
773 } else {
774 loc = -1;
775 fIsTransferred = false;
776 }
777 }
778
779 if (len > fCompBufferSize) {
780 if(fCompBuffer) delete [] fCompBuffer;
781 fCompBuffer = new char[len];
783 } else {
784 if (fCompBufferSize > len * 4) {
785 if(fCompBuffer) delete [] fCompBuffer;
786 fCompBuffer = new char[len*2];
787 fCompBufferSize = len * 2;
788 }
789 }
790
791 res = 0;
792 if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
793 // Cache is invalidated and we need to wait for all unzipping tasks to be finished before fill new baskets in cache.
794#ifdef R__USE_IMT
796 fUnzipTaskGroup->Cancel();
797 fUnzipTaskGroup.reset();
798 }
799#endif
800 {
801 // Fill new baskets into cache.
802 R__LOCKGUARD(fIOMutex.get());
803 fFile->Seek(pos);
805 } // end of lock scope
806#ifdef R__USE_IMT
808 CreateTasks();
809 }
810#endif
811 }
812
813 if (res) res = -1;
814
815 if (!res) {
816 res = UnzipBuffer(buf, fCompBuffer);
817 *free = true;
818 }
819
820 if (!fIsLearning) {
821 fNMissed++;
822 }
823
824 return res;
825}
826
827////////////////////////////////////////////////////////////////////////////////
828/// static function: Sets the unzip relative buffer size
829
831{
832 fgRelBuffSize = relbufferSize;
833}
834
835////////////////////////////////////////////////////////////////////////////////
836/// Sets the size for the unzipping cache... by default it should be
837/// two times the size of the prefetching cache
838
840{
841 fUnzipBufferSize = bufferSize;
842}
843
844////////////////////////////////////////////////////////////////////////////////
845/// Unzips a ROOT specific buffer... by reading the header at the beginning.
846/// returns the size of the inflated buffer or -1 if error
847/// Note!! : If *dest == 0 we will allocate the buffer and it will be the
848/// responsibility of the caller to free it... it is useful for example
849/// to pass it to the creator of TBuffer
850/// src is the original buffer with the record (header+compressed data)
851/// *dest is the inflated buffer (including the header)
852
854{
855 Int_t uzlen = 0;
856 bool alloc = false;
857
858 // Here we read the header of the buffer
859 const Int_t hlen = 128;
860 Int_t nbytes = 0, objlen = 0, keylen = 0;
861 GetRecordHeader(src, hlen, nbytes, objlen, keylen);
862
863 if (!(*dest)) {
864 /* early consistency check */
865 UChar_t *bufcur = (UChar_t *) (src + keylen);
866 Int_t nin, nbuf;
867 if(objlen > nbytes - keylen && R__unzip_header(&nin, bufcur, &nbuf) != 0) {
868 Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
869 uzlen = -1;
870 return uzlen;
871 }
872 Int_t l = keylen + objlen;
873 *dest = new char[l];
874 alloc = true;
875 }
876 // Must unzip the buffer
877 // fSeekPos[ind]; address of zipped buffer
878 // fSeekLen[ind]; len of the zipped buffer
879 // &fBuffer[fSeekPos[ind]]; memory address
880
881 // This is similar to TBasket::ReadBasketBuffers
882 bool oldCase = objlen == nbytes - keylen
883 && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
884 && fFile->GetVersion() <= 30401;
885
886 if (objlen > nbytes-keylen || oldCase) {
887
888 // Copy the key
889 memcpy(*dest, src, keylen);
890 uzlen += keylen;
891
892 char *objbuf = *dest + keylen;
893 UChar_t *bufcur = (UChar_t *) (src + keylen);
894 Int_t nin, nbuf;
895 Int_t nout = 0;
896 Int_t noutot = 0;
897
898 while (true) {
899 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
900 if (hc != 0) break;
901 if (gDebug > 2)
902 Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
903 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
904 if (oldCase && (nin > objlen || nbuf > objlen)) {
905 if (gDebug > 2)
906 Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
907
908 //buffer was very likely not compressed in an old version
909 memcpy(*dest + keylen, src + keylen, objlen);
910 uzlen += objlen;
911 return uzlen;
912 }
913
914 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
915
916 if (gDebug > 2)
917 Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
918 nin, bufcur, nbuf, objbuf, nout);
919
920 if (!nout) break;
921 noutot += nout;
922 if (noutot >= objlen) break;
923 bufcur += nin;
924 objbuf += nout;
925 }
926
927 if (noutot != objlen) {
928 Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
929 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
930 uzlen = -1;
931 if(alloc) delete [] *dest;
932 *dest = nullptr;
933 return uzlen;
934 }
935 uzlen += objlen;
936 } else {
937 memcpy(*dest, src, keylen);
938 uzlen += keylen;
939 memcpy(*dest + keylen, src + keylen, objlen);
940 uzlen += objlen;
941 }
942 return uzlen;
943}
944
945////////////////////////////////////////////////////////////////////////////////
946
948
949 printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
950 printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
951 printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
952 printf("Number of hits: %d\n", fNFound);
953 printf("Number of stalls: %d\n", fNStalls);
954 printf("Number of misses: %d\n", fNMissed);
955
957}
958
959////////////////////////////////////////////////////////////////////////////////
960
962 R__LOCKGUARD(fIOMutex.get());
963 return TTreeCache::ReadBufferExt(buf, pos, len, loc);
964}
void frombuf(char *&buf, Bool_t *x)
Definition Bytes.h:278
#define b(i)
Definition RSha256.hxx:100
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
int Int_t
Definition RtypesCore.h:45
unsigned char Byte_t
Definition RtypesCore.h:64
short Version_t
Definition RtypesCore.h:65
unsigned char UChar_t
Definition RtypesCore.h:38
float Float_t
Definition RtypesCore.h:57
short Short_t
Definition RtypesCore.h:39
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:80
const char Option_t
Definition RtypesCore.h:66
#define ClassImp(name)
Definition Rtypes.h:377
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t dest
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
Int_t gDebug
Definition TROOT.cxx:597
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
#define R__LOCKGUARD(mutex)
#define free
Definition civetweb.c:1539
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A TTree is a list of TBranches.
Definition TBranch.h:93
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
static TClass * Class()
Long64_t * GetTreeOffset() const
Definition TChain.h:121
Int_t GetTreeNumber() const override
Definition TChain.h:120
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
Legacy Code: TEventList is a legacy interface: there will be no bug fi...
Definition TEventList.h:31
virtual bool ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Int_t fNtot
Total size of prefetched blocks.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
TFile * fFile
Pointer to file.
Int_t fNseek
Number of blocks to be prefetched.
virtual Int_t GetBufferSize() const
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos is either kBeg, kCur or kEnd.
Definition TFile.cxx:2274
Int_t GetVersion() const
Definition TFile.h:245
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition TFile.cxx:5208
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition TFile.cxx:1770
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
TObject * UncheckedAt(Int_t i) const
Definition TObjArray.h:84
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:973
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:987
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:961
A TTreeCache which exploits parallelized decompression of its own content.
UnzipState_t fUnzipState
void Init()
Initialization procedure common to all the constructors.
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
void UpdateBranches(TTree *tree) override
Update pointer to current Tree and recompute pointers to the branches in the cache.
Int_t SetBufferSize(Int_t buffersize) override
Change the underlying buffer size of the cache.
Int_t AddBranch(TBranch *b, bool subbranches=false) override
Add a branch to the list of branches to be stored in the cache. This function is called by TBranch::Ge...
void ResetCache() override
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
void SetEntryRange(Long64_t emin, Long64_t emax) override
Set the minimum and maximum entry number to be processed. This information helps to optimize the numbe...
static bool IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
Int_t fNseekMax
! fNseek can change so we need to know its max size
Int_t fNStalls
! number of hits which caused a stall
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
bool FillBuffer() override
Fill the cache buffer with the branches in the cache.
Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc) override
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer... by reading the header at the beginning.
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relative buffer size
std::unique_ptr< TMutex > fIOMutex
Int_t fNUnzip
! number of blocks that were unzipped
Int_t CreateTasks()
We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total) to a task.
Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, bool *free) override
We try to read a buffer that has already been unzipped. Returns -1 in case of read failure,...
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache... by default it should be two times the size of the prefetchin...
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by an IMT task
void StopLearningPhase() override
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
void Print(Option_t *option="") const override
Print cache statistics.
bool fParallel
Indicate if we want to activate the parallelism (for this instance)
Int_t fNFound
! number of blocks that were found in the cache
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
~TTreeCacheUnzip() override
Destructor. (in general called by the TFile destructor)
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread)
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Long64_t fEntryMin
! first entry in the cache
Definition TTreeCache.h:38
Long64_t fEntryNext
! next entry number where cache must be filled
Definition TTreeCache.h:41
TTree * GetTree() const
Definition TTreeCache.h:149
bool fIsLearning
! true if cache is in learning mode
Definition TTreeCache.h:54
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed. This information helps to optimize the numbe...
Long64_t fEntryMax
! last entry in the cache
Definition TTreeCache.h:39
Long64_t fEntryCurrent
! current lowest entry number in the cache
Definition TTreeCache.h:40
Int_t SetBufferSize(Int_t buffersize) override
Change the underlying buffer size of the cache.
Int_t fNReadPref
Number of blocks that were prefetched.
Definition TTreeCache.h:49
TTree * fTree
! pointer to the current Tree
Definition TTreeCache.h:53
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase.
Int_t fNbranches
! Number of branches in the cache
Definition TTreeCache.h:44
void Print(Option_t *option="") const override
Print cache statistics.
Int_t AddBranch(TBranch *b, bool subgbranches=false) override
Add a branch to the list of branches to be stored in the cache. This function is called by the user vi...
TObjArray * fBranches
! List of branches to be stored in the cache
Definition TTreeCache.h:51
Helper class to iterate over cluster of baskets.
Definition TTree.h:270
Long64_t GetNextEntry()
Definition TTree.h:307
A TTree represents a columnar dataset.
Definition TTree.h:79
TEventList * GetEventList() const
Definition TTree.h:473
TClass * IsA() const override
Definition TTree.h:659
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Binary search in an array of n values to locate value.
Definition TMathBase.h:347
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
void Clear(Int_t size)
Clear all baskets' state arrays.
bool IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
bool IsFinished(Int_t index) const
std::atomic< Byte_t > * fUnzipStatus
! [fNSeek]
void SetUnzipped(Int_t index, char *buf, Int_t len)
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
bool TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
bool IsProgress(Int_t index) const
bool IsUntouched(Int_t index) const
void SetFinished(Int_t index)
Set cache as finished.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
TLine l
Definition textangle.C:4