Logo ROOT   6.16/01
Reference Guide
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1// @(#)root/tree:$Id$
2// Author: Leandro Franco 10/04/2008
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TTreeCacheUnzip
13\ingroup tree
14
15Specialization of TTreeCache for parallel Unzipping.
16
17Fabrizio Furano (CERN) Aug 2009
18Core TTree-related code borrowed from the previous version
19 by Leandro Franco and Rene Brun
20
21## Parallel Unzipping
22
TTreeCache has been specialised so that additional threads can unzip
its content in advance. In this implementation we support up to 10
threads, but right now it makes more sense to limit their number to 1-2.
27
28The application reading data is carefully synchronized, in order to:
29 - if the block it wants is not unzipped, it self-unzips it without
30 waiting
31 - if the block is being unzipped in parallel, it waits only
32 for that unzip to finish
33 - if the block has already been unzipped, it takes it
34
This is meant to hide part of the unzipping latency, at the
expense of CPU time.
37
The default parameters are the same as in the previous version, i.e. 20%
of the TTreeCache cache size. To change it use
TTreeCacheUnzip::SetUnzipBufferSize(Long64_t bufferSize),
where bufferSize must be passed in bytes.
42*/
43
44#include "TTreeCacheUnzip.h"
45#include "TBranch.h"
46#include "TChain.h"
47#include "TEnv.h"
48#include "TEventList.h"
49#include "TFile.h"
50#include "TMath.h"
51#include "TMutex.h"
52#include "TROOT.h"
53#include "TVirtualMutex.h"
54
55#ifdef R__USE_IMT
57#endif
58
59extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
60extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
61
63
64// The unzip cache does not consume memory by itself, it just allocates in advance
65// mem blocks which are then picked as they are by the baskets.
66// Hence there is no good reason to limit it too much
68
70
71////////////////////////////////////////////////////////////////////////////////
72/// Clear all baskets' state arrays.
73
75 for (Int_t i = 0; i < size; i++) {
76 if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
77 if (fUnzipChunks) {
78 if (fUnzipChunks[i]) fUnzipChunks[i].reset();
79 }
80 if (fUnzipStatus) fUnzipStatus[i].store(0);
81 }
82}
83
84////////////////////////////////////////////////////////////////////////////////
85
87 return fUnzipStatus[index].load() == kUntouched;
88}
89
90////////////////////////////////////////////////////////////////////////////////
91
93 return fUnzipStatus[index].load() == kProgress;
94}
95
96////////////////////////////////////////////////////////////////////////////////
97
99 return fUnzipStatus[index].load() == kFinished;
100}
101
102////////////////////////////////////////////////////////////////////////////////
103/// Check if the basket is unzipped already. We must make sure the length in
104/// fUnzipLen is larger than 0.
105
107 return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
108}
109
110////////////////////////////////////////////////////////////////////////////////
111/// Reset all baskets' state arrays. This function is only called by main
112/// thread and parallel processing from upper layers should be disabled such
113/// as IMT in TTree::GetEntry(). Other threads should not call this function
114/// since it is not thread-safe.
115
117 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
118 std::unique_ptr<char[]> *aUnzipChunks = new std::unique_ptr<char[]>[newSize];
119 std::atomic<Byte_t> *aUnzipStatus = new std::atomic<Byte_t>[newSize];
120
121 for (Int_t i = 0; i < newSize; ++i)
122 aUnzipStatus[i].store(0);
123
124 for (Int_t i = 0; i < oldSize; i++) {
125 aUnzipLen[i] = fUnzipLen[i];
126 aUnzipChunks[i] = std::move(fUnzipChunks[i]);
127 aUnzipStatus[i].store(fUnzipStatus[i].load());
128 }
129
130 if (fUnzipChunks) delete [] fUnzipChunks;
131 if (fUnzipStatus) delete [] fUnzipStatus;
132
133 fUnzipLen = aUnzipLen;
134 fUnzipChunks = aUnzipChunks;
135 fUnzipStatus = aUnzipStatus;
136}
137
138////////////////////////////////////////////////////////////////////////////////
139/// Set cache as finished.
140/// There are three scenarios that a basket is set as finished:
141/// 1. The basket has already been unzipped.
142/// 2. The thread is aborted from unzipping process.
143/// 3. To avoid other tasks/threads work on this basket,
144/// main thread marks the basket as finished and designates itself
145/// to unzip this basket.
146
148 fUnzipLen[index] = 0;
149 fUnzipChunks[index].reset();
150 fUnzipStatus[index].store((Byte_t)kFinished);
151}
152
153////////////////////////////////////////////////////////////////////////////////
154
156 fUnzipChunks[index].reset();
157 fUnzipStatus[index].store((Byte_t)kFinished);
158}
159
160////////////////////////////////////////////////////////////////////////////////
161
163 // Update status array at the very end because we need to be synchronous with the main thread.
164 fUnzipLen[index] = len;
165 fUnzipChunks[index].reset(buf);
166 fUnzipStatus[index].store((Byte_t)kFinished);
167}
168
169////////////////////////////////////////////////////////////////////////////////
170/// Start unzipping the basket if it is untouched yet.
171
173 Byte_t oldValue = kUntouched;
174 Byte_t newValue = kProgress;
175 return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
176}
177
178////////////////////////////////////////////////////////////////////////////////
179
182 fEmpty(kTRUE),
183 fCycle(0),
184 fNseekMax(0),
187 fNFound(0),
188 fNMissed(0),
189 fNStalls(0),
190 fNUnzip(0)
191{
192 // Default Constructor.
193 Init();
194}
195
196////////////////////////////////////////////////////////////////////////////////
197/// Constructor.
198
200 fAsyncReading(kFALSE),
201 fEmpty(kTRUE),
202 fCycle(0),
203 fNseekMax(0),
204 fUnzipGroupSize(0),
205 fUnzipBufferSize(0),
206 fNFound(0),
207 fNMissed(0),
208 fNStalls(0),
209 fNUnzip(0)
210{
211 Init();
212}
213
214////////////////////////////////////////////////////////////////////////////////
215/// Initialization procedure common to all the constructors.
216
218{
219#ifdef R__USE_IMT
220 fUnzipTaskGroup.reset();
221#endif
222 fIOMutex = new TMutex(kTRUE);
223
224 fCompBuffer = new char[16384];
225 fCompBufferSize = 16384;
226
227 fUnzipGroupSize = 102400; // Each task unzips at least 100 KB
228
229 if (fgParallel == kDisable) {
231 }
232 else if(fgParallel == kEnable || fgParallel == kForce) {
234
235 if(gDebug > 0)
236 Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
237
239
240 }
241 else {
242 Warning("TTreeCacheUnzip", "Parallel Option unknown");
243 }
244
245 // Check if asynchronous reading is supported by this TFile specialization
246 if (gEnv->GetValue("TFile.AsyncReading", 1)) {
247 if (fFile && !(fFile->ReadBufferAsync(0, 0)))
249 }
250
251}
252
253////////////////////////////////////////////////////////////////////////////////
254/// Destructor. (in general called by the TFile destructor)
255
257{
258 ResetCache();
259 delete fIOMutex;
261}
262
263////////////////////////////////////////////////////////////////////////////////
264/// Add a branch to the list of branches to be stored in the cache
265/// this function is called by TBranch::GetBasket
266/// Returns:
267/// - 0 branch added or already included
268/// - -1 on error
269
271{
272 return TTreeCache::AddBranch(b, subbranches);
273}
274
275////////////////////////////////////////////////////////////////////////////////
276/// Add a branch to the list of branches to be stored in the cache
277/// this function is called by TBranch::GetBasket
278/// Returns:
279/// - 0 branch added or already included
280/// - -1 on error
281
282Int_t TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
283{
284 return TTreeCache::AddBranch(branch, subbranches);
285}
286
287////////////////////////////////////////////////////////////////////////////////
288
290{
291
292 if (fNbranches <= 0) return kFALSE;
293
294 // Fill the cache buffer with the branches in the cache.
296
298 Long64_t entry = tree->GetReadEntry();
299
300 // If the entry is in the range we previously prefetched, there is
301 // no point in retrying. Note that this will also return false
302 // during the training phase (fEntryNext is then set intentional to
303 // the end of the training phase).
304 if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
305
306 // Triggered by the user, not the learning phase
307 if (entry == -1) entry = 0;
308
309 TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
310 fEntryCurrent = clusterIter();
311 fEntryNext = clusterIter.GetNextEntry();
312
314 if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
316
317 // Check if owner has a TEventList set. If yes we optimize for this
318 // Special case reading only the baskets containing entries in the
319 // list.
320 TEventList *elist = fTree->GetEventList();
321 Long64_t chainOffset = 0;
322 if (elist) {
323 if (fTree->IsA() ==TChain::Class()) {
324 TChain *chain = (TChain*)fTree;
325 Int_t t = chain->GetTreeNumber();
326 chainOffset = chain->GetTreeOffset()[t];
327 }
328 }
329
330 //clear cache buffer
332
333 //store baskets
334 for (Int_t i = 0; i < fNbranches; i++) {
336 if (b->GetDirectory() == 0) continue;
337 if (b->GetDirectory()->GetFile() != fFile) continue;
338 Int_t nb = b->GetMaxBaskets();
339 Int_t *lbaskets = b->GetBasketBytes();
340 Long64_t *entries = b->GetBasketEntry();
341 if (!lbaskets || !entries) continue;
342 //we have found the branch. We now register all its baskets
343 //from the requested offset to the basket below fEntrymax
344 Int_t blistsize = b->GetListOfBaskets()->GetSize();
345 for (Int_t j=0;j<nb;j++) {
346 // This basket has already been read, skip it
347 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
348
349 Long64_t pos = b->GetBasketSeek(j);
350 Int_t len = lbaskets[j];
351 if (pos <= 0 || len <= 0) continue;
352 //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
353 if (entries[j] >= fEntryNext) continue;
354 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) continue;
355 if (elist) {
356 Long64_t emax = fEntryMax;
357 if (j < nb - 1) emax = entries[j+1] - 1;
358 if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset)) continue;
359 }
360 fNReadPref++;
361
362 TFileCacheRead::Prefetch(pos, len);
363 }
364 if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
365 }
366
367 // Now fix the size of the status arrays
368 ResetCache();
370
371 return kTRUE;
372}
373
374////////////////////////////////////////////////////////////////////////////////
375/// Change the underlying buffer size of the cache.
376/// Returns:
377/// - 0 if the buffer content is still available
378/// - 1 if some or all of the buffer content has been made unavailable
379/// - -1 on error
380
382{
383 Int_t res = TTreeCache::SetBufferSize(buffersize);
384 if (res < 0) {
385 return res;
386 }
388 ResetCache();
389 return 1;
390}
391
392////////////////////////////////////////////////////////////////////////////////
393/// Set the minimum and maximum entry number to be processed
394/// this information helps to optimize the number of baskets to read
395/// when prefetching the branch buffers.
396
398{
399 TTreeCache::SetEntryRange(emin, emax);
400}
401
402////////////////////////////////////////////////////////////////////////////////
403/// It's the same as TTreeCache::StopLearningPhase but we guarantee that
404/// we start the unzipping just after getting the buffers
405
407{
409}
410
411////////////////////////////////////////////////////////////////////////////////
412///update pointer to current Tree and recompute pointers to the branches in the cache
413
415{
417}
418
419////////////////////////////////////////////////////////////////////////////////
420// //
421// From now on we have the methods concerning the threading part of the cache //
422// //
423////////////////////////////////////////////////////////////////////////////////
424
425////////////////////////////////////////////////////////////////////////////////
426/// Static function that returns the parallel option
427/// (to indicate an additional thread)
428
430{
431 return fgParallel;
432}
433
434////////////////////////////////////////////////////////////////////////////////
/// Static function that tells whether multithreaded unzipping is activated
436
438{
439 if (fgParallel == kEnable || fgParallel == kForce)
440 return kTRUE;
441
442 return kFALSE;
443}
444
445////////////////////////////////////////////////////////////////////////////////
446/// Static function that (de)activates multithreading unzipping
447///
448/// The possible options are:
449/// - kEnable _Enable_ it, which causes an automatic detection and launches the
450/// additional thread if the number of cores in the machine is greater than
451/// one
452/// - kDisable _Disable_ will not activate the additional thread.
453/// - kForce _Force_ will start the additional thread even if there is only one
/// core. The default will be taken as kEnable.
455///
456/// Returns 0 if there was an error, 1 otherwise.
457
459{
461 fgParallel = option;
462 return 1;
463 }
464 return 0;
465}
466
467////////////////////////////////////////////////////////////////////////////////
468// //
469// From now on we have the methods concerning the unzipping part of the cache //
470// //
471////////////////////////////////////////////////////////////////////////////////
472
473////////////////////////////////////////////////////////////////////////////////
474/// Read the logical record header from the buffer buf.
475/// That must be the pointer tho the header part not the object by itself and
476/// must contain data of at least maxbytes
477/// Returns nread;
478///
479/// In output arguments:
480///
481/// - nbytes : number of bytes in record
482/// if negative, this is a deleted record
483/// if 0, cannot read record, wrong value of argument first
484/// - objlen : uncompressed object size
485/// - keylen : length of logical record header
486///
487/// Note that the arguments objlen and keylen are returned only
488/// if maxbytes >=16
489/// Note: This was adapted from TFile... so some things dont apply
490
491Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
492{
493 Version_t versionkey;
494 Short_t klen;
495 UInt_t datime;
496 Int_t nb = 0, olen;
497 Int_t nread = maxbytes;
498 frombuf(buf, &nb);
499 nbytes = nb;
500 if (nb < 0) return nread;
501 // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
502 const Int_t headerSize = 16;
503 if (nread < headerSize) return nread;
504 frombuf(buf, &versionkey);
505 frombuf(buf, &olen);
506 frombuf(buf, &datime);
507 frombuf(buf, &klen);
508 if (!olen) olen = nbytes - klen;
509 objlen = olen;
510 keylen = klen;
511 return nread;
512}
513
514////////////////////////////////////////////////////////////////////////////////
515/// This will delete the list of buffers that are in the unzipping cache
516/// and will reset certain values in the cache.
/// This name is ambiguous because the method doesn't reset the whole cache,
518/// only the part related to the unzipping
519/// Note: This method is completely different from TTreeCache::ResetCache(),
520/// in that method we were cleaning the prefetching buffer while here we
521/// delete the information about the unzipped buffers
522
524{
525 // Reset all the lists and wipe all the chunks
526 fCycle++;
528
529 if(fNseekMax < fNseek){
530 if (gDebug > 0)
531 Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
532
535 }
536 fEmpty = kTRUE;
537}
538
539////////////////////////////////////////////////////////////////////////////////
540/// This inflates a basket in the cache.. passing the data to a new
541/// buffer that will only wait there to be read...
542/// This function is responsible to update corresponding elements in
543/// fUnzipStatus, fUnzipChunks and fUnzipLen. Since we use atomic variables
544/// in fUnzipStatus to exclusively unzip the basket, we must update
545/// fUnzipStatus after fUnzipChunks and fUnzipLen and make sure fUnzipChunks
546/// and fUnzipLen are ready before main thread fetch the data.
547
549{
550 Int_t myCycle;
551 const Int_t hlen = 128;
552 Int_t objlen = 0, keylen = 0;
553 Int_t nbytes = 0;
554 Int_t readbuf = 0;
555
556 Long64_t rdoffs = 0;
557 Int_t rdlen = 0;
558
559 // To synchronize with the 'paging'
560 myCycle = fCycle;
561 rdoffs = fSeek[index];
562 rdlen = fSeekLen[index];
563
564 Int_t loc = -1;
565 if (!fNseek || fIsLearning) {
566 return 1;
567 }
568
569 if ((myCycle != fCycle) || !fIsTransferred) {
570 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
571 return 1;
572 }
573
574 // Prepare a memory buffer of adequate size
575 char* locbuff = 0;
576 if (rdlen > 16384) {
577 locbuff = new char[rdlen];
578 } else if (rdlen * 3 < 16384) {
579 locbuff = new char[rdlen * 2];
580 } else {
581 locbuff = new char[16384];
582 }
583
584 readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
585
586 if (readbuf <= 0) {
587 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
588 if (locbuff) delete [] locbuff;
589 return -1;
590 }
591
592 GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
593
594 Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
595 // If the single unzipped chunk is really too big, reset it to not processable
596 // I.e. mark it as done but set the pointer to 0
597 // This block will be unzipped synchronously in the main thread
598 // TODO: ROOT internally breaks zipped buffers into 16MB blocks, we can probably still unzip in parallel.
599 if (len > 4 * fUnzipBufferSize) {
600 if (gDebug > 0)
601 Info("UnzipCache", "Block %d is too big, skipping.", index);
602
603 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
604 if (locbuff) delete [] locbuff;
605 return 0;
606 }
607
608 // Unzip it into a new blk
609 char *ptr = 0;
610 Int_t loclen = UnzipBuffer(&ptr, locbuff);
611 if ((loclen > 0) && (loclen == objlen + keylen)) {
612 if ((myCycle != fCycle) || !fIsTransferred) {
613 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
614 if (locbuff) delete [] locbuff;
615 return 1;
616 }
617 fUnzipState.SetUnzipped(index, ptr, loclen); // Set it as done
618 fNUnzip++;
619 } else {
620 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
621 }
622
623 if (locbuff) delete [] locbuff;
624 return 0;
625}
626
627#ifdef R__USE_IMT
628////////////////////////////////////////////////////////////////////////////////
/// We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total)
/// to a task. In TTaskGroup, we use TThreadExecutor to do the actual work of unzipping
/// a group of baskets. The purpose of creating TTaskGroup is to avoid competing with the main thread.
632
634{
635 auto mapFunction = [&]() {
636 auto unzipFunction = [&](const std::vector<Int_t> &indices) {
637 // If cache is invalidated and we should return immediately.
638 if (!fIsTransferred) return nullptr;
639
640 for (auto ii : indices) {
641 if(fUnzipState.TryUnzipping(ii)) {
642 Int_t res = UnzipCache(ii);
643 if(res)
644 if (gDebug > 0)
645 Info("UnzipCache", "Unzipping failed or cache is in learning state");
646 }
647 }
648 return nullptr;
649 };
650
651 Int_t accusz = 0;
652 std::vector<std::vector<Int_t>> basketIndices;
653 std::vector<Int_t> indices;
654 if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
655 for (Int_t i = 0; i < fNseek; i++) {
656 while (accusz < fUnzipGroupSize) {
657 accusz += fSeekLen[i];
658 indices.push_back(i);
659 i++;
660 if (i >= fNseek) break;
661 }
662 if (i < fNseek) i--;
663 basketIndices.push_back(indices);
664 indices.clear();
665 accusz = 0;
666 }
668 pool.Foreach(unzipFunction, basketIndices);
669 };
670
672 fUnzipTaskGroup->Run(mapFunction);
673
674 return 0;
675}
676#endif
677
678////////////////////////////////////////////////////////////////////////////////
679/// We try to read a buffer that has already been unzipped
680/// Returns -1 in case of read failure, 0 in case it's not in the
681/// cache and n>0 in case read from cache (number of bytes copied).
682/// pos and len are the original values as were passed to ReadBuffer
683/// but instead we will return the inflated buffer.
684/// Note!! : If *buf == 0 we will allocate the buffer and it will be the
/// responsibility of the caller to free it... it is useful for example
686/// to pass it to the creator of TBuffer
687
689{
690 Int_t res = 0;
691 Int_t loc = -1;
692
693 // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
694 // pointer to the original zipped chunk
695 // its index in the original unsorted offsets lists
696 //
697 // Actually there are situations in which copying the buffer is not
698 // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
699 // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
700 // better to be done in the main thread.
701
702 Int_t myCycle = fCycle;
703
704 if (fParallel && !fIsLearning) {
705
706 if(fNseekMax < fNseek){
707 if (gDebug > 0)
708 Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
709
712 }
713
715 if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
716
717 // The buffer is, at minimum, in the file cache. We must know its index in the requests list
718 // In order to get its info
719 Int_t seekidx = fSeekIndex[loc];
720
721 do {
722
723 // If the block is ready we get it immediately.
724 // And also we don't have to alloc the blks. This is supposed to be
725 // the main thread of the app.
726 if (fUnzipState.IsUnzipped(seekidx)) {
727 if(!(*buf)) {
728 *buf = fUnzipState.fUnzipChunks[seekidx].get();
729 fUnzipState.fUnzipChunks[seekidx].release();
730 *free = kTRUE;
731 } else {
732 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
733 fUnzipState.fUnzipChunks[seekidx].reset();
734 *free = kFALSE;
735 }
736
737 fNFound++;
738 return fUnzipState.fUnzipLen[seekidx];
739 }
740
741 // If the requested basket is being unzipped by a background task, we try to steal a blk to unzip.
742 Int_t reqi = -1;
743
744 if (fUnzipState.IsProgress(seekidx)) {
745 if (fEmpty) {
746 for (Int_t ii = 0; ii < fNseek; ++ii) {
747 Int_t idx = (seekidx + 1 + ii) % fNseek;
748 if (fUnzipState.IsUntouched(idx)) {
749 if(fUnzipState.TryUnzipping(idx)) {
750 reqi = idx;
751 break;
752 }
753 }
754 }
755 if (reqi < 0) {
756 fEmpty = kFALSE;
757 } else {
758 UnzipCache(reqi);
759 }
760 }
761
762 if ( myCycle != fCycle ) {
763 if (gDebug > 0)
764 Info("GetUnzipBuffer", "Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
766
767 seekidx = -1;
768 break;
769 }
770 }
771
772 } while (fUnzipState.IsProgress(seekidx));
773
774 // Here the block is not pending. It could be done or aborted or not yet being processed.
775 if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
776 if(!(*buf)) {
777 *buf = fUnzipState.fUnzipChunks[seekidx].get();
778 fUnzipState.fUnzipChunks[seekidx].release();
779 *free = kTRUE;
780 } else {
781 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
782 fUnzipState.fUnzipChunks[seekidx].reset();
783 *free = kFALSE;
784 }
785
786 fNStalls++;
787 return fUnzipState.fUnzipLen[seekidx];
788 } else {
789 // This is a complete miss. We want to avoid the background tasks
790 // to try unzipping this block in the future.
791 fUnzipState.SetMissed(seekidx);
792 }
793 } else {
794 loc = -1;
796 }
797 }
798
799 if (len > fCompBufferSize) {
800 if(fCompBuffer) delete [] fCompBuffer;
801 fCompBuffer = new char[len];
802 fCompBufferSize = len;
803 } else {
804 if (fCompBufferSize > len * 4) {
805 if(fCompBuffer) delete [] fCompBuffer;
806 fCompBuffer = new char[len*2];
807 fCompBufferSize = len * 2;
808 }
809 }
810
811 res = 0;
812 if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
813 // Cache is invalidated and we need to wait for all unzipping tasks to befinished before fill new baskets in cache.
814#ifdef R__USE_IMT
815 if(fUnzipTaskGroup) {
816 fUnzipTaskGroup->Cancel();
817 fUnzipTaskGroup.reset();
818 }
819#endif
820 {
821 // Fill new baskets into cache.
823 fFile->Seek(pos);
824 res = fFile->ReadBuffer(fCompBuffer, len);
825 } // end of lock scope
826#ifdef R__USE_IMT
827 CreateTasks();
828#endif
829 }
830
831 if (res) res = -1;
832
833 if (!res) {
834 res = UnzipBuffer(buf, fCompBuffer);
835 *free = kTRUE;
836 }
837
838 if (!fIsLearning) {
839 fNMissed++;
840 }
841
842 return res;
843}
844
845////////////////////////////////////////////////////////////////////////////////
/// static function: Sets the unzip relative buffer size
847
849{
850 fgRelBuffSize = relbufferSize;
851}
852
853////////////////////////////////////////////////////////////////////////////////
854/// Sets the size for the unzipping cache... by default it should be
855/// two times the size of the prefetching cache
856
858{
859 fUnzipBufferSize = bufferSize;
860}
861
862////////////////////////////////////////////////////////////////////////////////
863/// Unzips a ROOT specific buffer... by reading the header at the beginning.
864/// returns the size of the inflated buffer or -1 if error
865/// Note!! : If *dest == 0 we will allocate the buffer and it will be the
/// responsibility of the caller to free it... it is useful for example
867/// to pass it to the creator of TBuffer
868/// src is the original buffer with the record (header+compressed data)
869/// *dest is the inflated buffer (including the header)
870
872{
873 Int_t uzlen = 0;
874 Bool_t alloc = kFALSE;
875
876 // Here we read the header of the buffer
877 const Int_t hlen = 128;
878 Int_t nbytes = 0, objlen = 0, keylen = 0;
879 GetRecordHeader(src, hlen, nbytes, objlen, keylen);
880
881 if (!(*dest)) {
882 /* early consistency check */
883 UChar_t *bufcur = (UChar_t *) (src + keylen);
884 Int_t nin, nbuf;
885 if(objlen > nbytes - keylen && R__unzip_header(&nin, bufcur, &nbuf) != 0) {
886 Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
887 uzlen = -1;
888 return uzlen;
889 }
890 Int_t l = keylen + objlen;
891 *dest = new char[l];
892 alloc = kTRUE;
893 }
894 // Must unzip the buffer
895 // fSeekPos[ind]; adress of zipped buffer
896 // fSeekLen[ind]; len of the zipped buffer
897 // &fBuffer[fSeekPos[ind]]; memory address
898
899 // This is similar to TBasket::ReadBasketBuffers
900 Bool_t oldCase = objlen == nbytes - keylen
901 && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
902 && fFile->GetVersion() <= 30401;
903
904 if (objlen > nbytes-keylen || oldCase) {
905
906 // Copy the key
907 memcpy(*dest, src, keylen);
908 uzlen += keylen;
909
910 char *objbuf = *dest + keylen;
911 UChar_t *bufcur = (UChar_t *) (src + keylen);
912 Int_t nin, nbuf;
913 Int_t nout = 0;
914 Int_t noutot = 0;
915
916 while (1) {
917 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
918 if (hc != 0) break;
919 if (gDebug > 2)
920 Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
921 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
922 if (oldCase && (nin > objlen || nbuf > objlen)) {
923 if (gDebug > 2)
924 Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
925
926 //buffer was very likely not compressed in an old version
927 memcpy(*dest + keylen, src + keylen, objlen);
928 uzlen += objlen;
929 return uzlen;
930 }
931
932 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
933
934 if (gDebug > 2)
935 Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
936 nin, bufcur, nbuf, objbuf, nout);
937
938 if (!nout) break;
939 noutot += nout;
940 if (noutot >= objlen) break;
941 bufcur += nin;
942 objbuf += nout;
943 }
944
945 if (noutot != objlen) {
946 Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
947 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
948 uzlen = -1;
949 if(alloc) delete [] *dest;
950 *dest = 0;
951 return uzlen;
952 }
953 uzlen += objlen;
954 } else {
955 memcpy(*dest, src, keylen);
956 uzlen += keylen;
957 memcpy(*dest + keylen, src + keylen, objlen);
958 uzlen += objlen;
959 }
960 return uzlen;
961}
962
963////////////////////////////////////////////////////////////////////////////////
964
965void TTreeCacheUnzip::Print(Option_t* option) const {
966
967 printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
968 printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
969 printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
970 printf("Number of hits: %d\n", fNFound);
971 printf("Number of stalls: %d\n", fNStalls);
972 printf("Number of misses: %d\n", fNMissed);
973
974 TTreeCache::Print(option);
975}
976
977////////////////////////////////////////////////////////////////////////////////
978
981 return TTreeCache::ReadBufferExt(buf, pos, len, loc);
982}
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:280
void Class()
Definition: Class.C:29
#define b(i)
Definition: RSha256.hxx:100
unsigned char Byte_t
Definition: RtypesCore.h:60
int Int_t
Definition: RtypesCore.h:41
short Version_t
Definition: RtypesCore.h:61
unsigned char UChar_t
Definition: RtypesCore.h:34
unsigned int UInt_t
Definition: RtypesCore.h:42
const Bool_t kFALSE
Definition: RtypesCore.h:88
bool Bool_t
Definition: RtypesCore.h:59
short Short_t
Definition: RtypesCore.h:35
double Double_t
Definition: RtypesCore.h:55
long long Long64_t
Definition: RtypesCore.h:69
float Float_t
Definition: RtypesCore.h:53
const Bool_t kTRUE
Definition: RtypesCore.h:87
const char Option_t
Definition: RtypesCore.h:62
#define ClassImp(name)
Definition: Rtypes.h:363
R__EXTERN Int_t gDebug
Definition: Rtypes.h:90
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
#define R__LOCKGUARD(mutex)
#define free
Definition: civetweb.c:1539
A class to manage the asynchronous execution of work items.
Definition: TTaskGroup.hxx:21
This class provides a simple interface to execute the same task multiple times in parallel,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute func (with no arguments) nTimes in parallel.
A TTree is a list of TBranches.
Definition: TBranch.h:64
A chain is a collection of files containing TTree objects.
Definition: TChain.h:33
virtual Int_t GetTreeNumber() const
Definition: TChain.h:116
Long64_t * GetTreeOffset() const
Definition: TChain.h:117
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Definition: TEventList.cxx:171
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Int_t fNtot
Total size of prefetched blocks.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
TFile * fFile
Pointer to file.
Int_t fNseek
Number of blocks to be prefetched.
virtual Int_t GetBufferSize() const
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos is either kBeg, kCur or kEnd.
Definition: TFile.cxx:2173
Int_t GetVersion() const
Definition: TFile.h:229
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition: TFile.cxx:5079
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition: TFile.cxx:1670
Definition: TMutex.h:30
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
TObject * UncheckedAt(Int_t i) const
Definition: TObjArray.h:89
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
Specialization of TTreeCache for parallel Unzipping.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
UnzipState_t fUnzipState
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
void Init()
Initialization procedure common to all the constructors.
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Int_t fNseekMax
! fNseek can change so we need to know its max size
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache
Int_t fNStalls
! number of hits which caused a stall
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer... by reading the header at the beginning.
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relative buffer size
void Print(Option_t *option="") const
Print cache statistics.
Int_t fNUnzip
! number of blocks that were unzipped
Int_t CreateTasks()
We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total) to a task.
Bool_t fParallel
Indicate if we want to activate the parallelism (for this instance)
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
static Bool_t IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache... by default it should be two times the size of the prefetchin...
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by an IMT task
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Int_t fNFound
! number of blocks that were found in the cache
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped Returns -1 in case of read failure,...
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread)
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by the user vi...
Definition: TTreeCache.cxx:367
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Long64_t fEntryMin
! first entry in the cache
Definition: TTreeCache.h:41
Long64_t fEntryNext
! next entry number where cache must be filled
Definition: TTreeCache.h:44
Bool_t fIsLearning
! true if cache is in learning mode
Definition: TTreeCache.h:57
TTree * GetTree() const
Definition: TTreeCache.h:152
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
Long64_t fEntryMax
! last entry in the cache
Definition: TTreeCache.h:42
Long64_t fEntryCurrent
! current lowest entry number in the cache
Definition: TTreeCache.h:43
Int_t fNReadPref
Number of blocks that were prefetched.
Definition: TTreeCache.h:52
TTree * fTree
! pointer to the current Tree
Definition: TTreeCache.h:56
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase.
Int_t fNbranches
! Number of branches in the cache
Definition: TTreeCache.h:47
virtual void Print(Option_t *option="") const
Print cache statistics.
TObjArray * fBranches
! List of branches to be stored in the cache
Definition: TTreeCache.h:54
Helper class to iterate over cluster of baskets.
Definition: TTree.h:247
Long64_t GetNextEntry()
Definition: TTree.h:284
A TTree object has a header with a name and a title.
Definition: TTree.h:71
TEventList * GetEventList() const
Definition: TTree.h:412
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Definition: TMathBase.h:278
Definition: tree.py:1
Bool_t IsFinished(Int_t index) const
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
void Clear(Int_t size)
Clear all baskets' state arrays.
Bool_t IsProgress(Int_t index) const
std::atomic< Byte_t > * fUnzipStatus
! [fNSeek]
void SetUnzipped(Int_t index, char *buf, Int_t len)
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
Bool_t TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
Bool_t IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
Bool_t IsUntouched(Int_t index) const
void SetFinished(Int_t index)
Set cache as finished.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
auto * l
Definition: textangle.C:4
#define dest(otri, vertexptr)
Definition: triangle.c:1040