Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TFilePrefetch.cxx
Go to the documentation of this file.
1// @(#)root/io:$Id$
2// Author: Elvin Sindrilaru 19/05/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "TFilePrefetch.h"
13#include "TTimeStamp.h"
14#include "TSystem.h"
15#include "TMD5.h"
16#include "TVirtualPerfStats.h"
17#include "TVirtualMonitoring.h"
18#include "TSemaphore.h"
19#include "TFPBlock.h"
20#include "strlcpy.h"
21
22#include <string>
23#include <sstream>
24#include <cstdio>
25#include <cstdlib>
26#include <cctype>
27#include <cassert>
28
29static const int kMAX_READ_SIZE = 2; //maximum size of the read list of blocks
30
31inline int xtod(char c) { return (c>='0' && c<='9') ? c-'0' : ((c>='A' && c<='F') ? c-'A'+10 : ((c>='a' && c<='f') ? c-'a'+10 : 0)); }
32
33
34/**
35\class TFilePrefetch
36\ingroup IO
37
38The prefetching mechanism uses two classes (TFilePrefetch and
39TFPBlock) to prefetch in advance a block of tree entries. There is
40a thread which takes care of actually transferring the blocks and
41making them available to the main requesting thread. Therefore,
42the time spent by the main thread waiting for the data before
43processing considerably decreases. Besides the prefetching
44mechanism, there is also a local caching option which can be
45enabled by the user. Both capabilities are disabled by default
46and must be explicitly enabled by the user.
47*/
48
49
50////////////////////////////////////////////////////////////////////////////////
51/// Constructor.
52
54 fFile(file),
55 fConsumer(0),
56 fThreadJoined(kTRUE),
57 fPrefetchFinished(kFALSE)
58{
59 fPendingBlocks = new TList();
60 fReadBlocks = new TList();
61
64
66}
67
68////////////////////////////////////////////////////////////////////////////////
69/// Destructor
70
82
83
84////////////////////////////////////////////////////////////////////////////////
85/// Signal the async prefetching thread that prefetching is over and join it
86
88{
89 // Inform the consumer thread that prefetching is over
90 {
91 std::lock_guard<std::mutex> lk(fMutexPendingList);
93 }
94 fNewBlockAdded.notify_one();
95
96 fConsumer->Join();
99}
100
101
102////////////////////////////////////////////////////////////////////////////////
103/// Read one block and insert it in prefetchBuffers list.
104
106{
107 char* path = 0;
108
109 if (CheckBlockInCache(path, block)){
110 block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
111 inCache = kTRUE;
112 }
113 else{
114 fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
115 if (fFile->GetArchive()) {
116 for (Int_t i = 0; i < block->GetNoElem(); i++)
117 block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
118 }
120 }
121 delete[] path;
122}
123
124////////////////////////////////////////////////////////////////////////////////
125/// Get blocks specified in prefetchBlocks.
126
128{
130 TFPBlock* block = 0;
131
132 while((block = GetPendingBlock())){
135 if (!inCache)
137 }
138}
139
140////////////////////////////////////////////////////////////////////////////////
141/// Search for a requested element in a block and return the index.
142
144{
145 Int_t first = 0, last = -1, mid = -1;
146 last = (Int_t) blockObj->GetNoElem()-1;
147
148 while (first <= last){
149 mid = first + (last - first) / 2;
150 if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
151 && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
152
153 *index = mid;
154 return true;
155 }
156 else if (blockObj->GetPos(mid) < offset){
157 first = mid + 1;
158 }
159 else{
160 last = mid - 1;
161 }
162 }
163 return false;
164}
165
166////////////////////////////////////////////////////////////////////////////////
167/// Return the time spent waiting for a buffer to be read, in microseconds.
168
173
174////////////////////////////////////////////////////////////////////////////////
175/// Return a prefetched element.
176
178{
179 Bool_t found = false;
180 TFPBlock* blockObj = 0;
181 Int_t index = -1;
182
183 std::unique_lock<std::mutex> lk(fMutexReadList);
184 while (1){
185 TIter iter(fReadBlocks);
186 while ((blockObj = (TFPBlock*) iter.Next())){
187 index = -1;
189 found = true;
190 break;
191 }
192 }
193 if (found)
194 break;
195 else{
197 fReadBlockAdded.wait(lk); //wait for a new block to be added
198 fWaitTime.Stop();
199 }
200 }
201
202 if (found){
203 char *pBuff = blockObj->GetPtrToPiece(index);
204 pBuff += (offset - blockObj->GetPos(index));
205 memcpy(buf, pBuff, len);
206 }
207 return found;
208}
209
210////////////////////////////////////////////////////////////////////////////////
211/// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
212
218
219////////////////////////////////////////////////////////////////////////////////
220/// Safe method to add a block to the pendingList.
221
223{
224 fMutexPendingList.lock();
226 fMutexPendingList.unlock();
227
228 fNewBlockAdded.notify_one();
229}
230
231////////////////////////////////////////////////////////////////////////////////
232/// Safe method to remove a block from the pendingList.
233
235{
236 TFPBlock* block = 0;
237
238 // Use the semaphore to deal with the case when the file pointer
239 // is changed on the fly by TChain
241 std::unique_lock<std::mutex> lk(fMutexPendingList);
242 // Wait unless there is a pending block or prefetching is over
243 fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
244 lk.unlock();
246
247 lk.lock();
248 if (fPendingBlocks->GetSize()){
251 }
252 return block;
253}
254
255////////////////////////////////////////////////////////////////////////////////
256/// Safe method to add a block to the readList.
257
259{
260 fMutexReadList.lock();
261
265 delete movedBlock;
266 movedBlock = 0;
267 }
268
270 fMutexReadList.unlock();
271
272 //signal the addition of a new block
273 fReadBlockAdded.notify_one();
274}
275
276
277////////////////////////////////////////////////////////////////////////////////
278/// Create a new block or recycle an old one.
279
281{
282 TFPBlock* blockObj = 0;
283
284 fMutexReadList.lock();
285
287 blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
289 fMutexReadList.unlock();
290 blockObj->ReallocBlock(offset, len, noblock);
291 }
292 else{
293 fMutexReadList.unlock();
295 }
296 return blockObj;
297}
298
299////////////////////////////////////////////////////////////////////////////////
300/// Return reference to the consumer thread.
301
303{
304 return fConsumer;
305}
306
307
308////////////////////////////////////////////////////////////////////////////////
309/// Change the file
310///
311/// When prefetching is enabled we also need to:
312/// - make sure the async thread is not doing any work
313/// - clear all blocks from prefetching and read list
314/// - reset the file pointer
315
317{
318 if (action == TFile::kDisconnect) {
319 if (!fThreadJoined) {
321 }
322
323 if (fFile) {
324 // Remove all pending and read blocks
325 fMutexPendingList.lock();
327 fMutexPendingList.unlock();
328
329 fMutexReadList.lock();
331 fMutexReadList.unlock();
332 }
333
334 fFile = file;
335 if (!fThreadJoined) {
337 }
338 } else {
339 // kDoNotDisconnect must reconnect to the same file
340 assert((fFile == file) && "kDoNotDisconnect must reattach to the same file");
341 }
342}
343
344
345////////////////////////////////////////////////////////////////////////////////
346/// Used to start the consumer thread.
347
349{
350 int rc;
351
352 fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
353 rc = fConsumer->Run();
354 if ( !rc ) {
356 }
357 return rc;
358}
359
360
361////////////////////////////////////////////////////////////////////////////////
362/// Execution loop of the consumer thread.
363
365{
367
368 while (!pClass->IsPrefetchFinished()) {
369 pClass->ReadListOfBlocks();
370 }
371
372 return (TThread::VoidRtnFunc_t) 1;
373}
374
375//############################# CACHING PART ###################################
376
377////////////////////////////////////////////////////////////////////////////////
378/// Sum up individual hex values to obtain a decimal value.
379
381{
382 Int_t result = 0;
383 const char* ptr = hex;
384
385 for(Int_t i=0; i < (Int_t)strlen(hex); i++)
386 result += xtod(ptr[i]);
387
388 return result;
389}
390
391////////////////////////////////////////////////////////////////////////////////
392/// Test if the block is in cache.
393
395{
396 if (fPathCache == "")
397 return false;
398
399 Bool_t found = false;
400 TString fullPath(fPathCache); // path of the cached files.
401
402 Int_t value = 0;
403
404 void *fdir = gSystem->OpenDirectory(fullPath);
405 if (!fdir)
407 else {
408 gSystem->FreeDirectory(fdir);
409 }
410 //dir is the sum of the MD5 hex digits modulo 16; filename is the MD5 hex digest of the block offsets
411 TMD5* md = new TMD5();
412
414 for (Int_t i=0; i < block->GetNoElem(); i++){
415 concatStr.Form("%lld", block->GetPos(i));
416 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
417 }
418
419 md->Final();
420 TString fileName( md->AsString() );
421 value = SumHex(fileName);
422 value = value % 16;
424 dirName.Form("%i", value);
425
426 fullPath += "/" + dirName + "/" + fileName;
427
428 FileStat_t stat;
429 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
430 path = new char[fullPath.Length() + 1];
431 strlcpy(path, fullPath,fullPath.Length() + 1);
432 found = true;
433 } else
434 found = false;
435
436 delete md;
437 return found;
438}
439
440////////////////////////////////////////////////////////////////////////////////
441/// Return a buffer from cache.
442
444{
445 char *buffer = 0;
446 TString strPath = path;
447
448 strPath += "?filetype=raw";
449 TFile* file = new TFile(strPath);
450
451 Double_t start = 0;
452 if (gPerfStats != 0) start = TTimeStamp();
453
454 buffer = (char*) calloc(length, sizeof(char));
455 file->ReadBuffer(buffer, 0, length);
456
461
464 if (gPerfStats != 0) {
465 gPerfStats->FileReadEvent(fFile, length, start);
466 }
467
468 file->Close();
469 delete file;
470 return buffer;
471}
472
473////////////////////////////////////////////////////////////////////////////////
474/// Save the block content in cache.
475
477{
478 if (fPathCache == "")
479 return;
480
481 //dir is the sum of the MD5 hex digits modulo 16; filename is the MD5 hex digest of the block offsets
482 TMD5* md = new TMD5();
483
485 for(Int_t i=0; i< block->GetNoElem(); i++){
486 concatStr.Form("%lld", block->GetPos(i));
487 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
488 }
489 md->Final();
490
491 TString fileName( md->AsString() );
492 Int_t value = SumHex(fileName);
493 value = value % 16;
494
497 dirName.Form("%i", value);
498 fullPath += ("/" + dirName);
499
500 void *fdir = gSystem->OpenDirectory(fullPath);
501 if (!fdir)
503 else {
504 gSystem->FreeDirectory(fdir);
505 }
506
507 TFile* file = 0;
508 fullPath += ("/" + fileName);
509 FileStat_t stat;
510 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
511 fullPath += "?filetype=raw";
512 file = TFile::Open(fullPath, "update");
513 } else{
514 fullPath += "?filetype=raw";
515 file = TFile::Open(fullPath, "new");
516 }
517
518 if (file) {
519 // coverity[unchecked_value] We do not print error message, have not error
520 // return code and close the file anyway, not need to check the return value.
521 file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
522 file->Close();
523 delete file;
524 }
525 delete md;
526}
527
528
529////////////////////////////////////////////////////////////////////////////////
530/// Set the path of the cache directory.
531
533{
534 fPathCache = path;
535
536 void *fdir = gSystem->OpenDirectory(path);
537 if (!fdir) {
538 return (!gSystem->mkdir(path) ? true : false);
539 } else {
540 gSystem->FreeDirectory(fdir);
541 }
542
543 // Directory already exists
544 return true;
545}
546
#define SafeDelete(p)
Definition RConfig.hxx:533
#define c(i)
Definition RSha256.hxx:101
int Int_t
Signed integer 4 bytes (int)
Definition RtypesCore.h:59
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
static const int kMAX_READ_SIZE
int xtod(char c)
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
#define gPerfStats
#define calloc
Definition civetweb.c:1576
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
This class represents the encapsulation of a block request.
Definition TFPBlock.h:22
The prefetching mechanism uses two classes (TFilePrefetch and TFPBlock) to prefetch in advance a bloc...
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
TFilePrefetch(TFile *)
Constructor.
TSemaphore * fSemChangeFile
semaphore used when changing a file in TChain
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.
TThread * fConsumer
consumer thread
std::condition_variable fNewBlockAdded
signal the addition of a new pending block
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
Long64_t GetWaitTime()
Return the time spent waiting for a buffer to be read, in microseconds.
Bool_t fThreadJoined
mark if async thread was joined
Bool_t SetCache(const char *)
Set the path of the cache directory.
TList * fReadBlocks
list of blocks read
Int_t ThreadStart()
Used to start the consumer thread.
std::atomic< Bool_t > fPrefetchFinished
true if prefetching is over
void ReadAsync(TFPBlock *, Bool_t &)
Read one block and insert it in prefetchBuffers list.
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
TString fPathCache
path to the cache directory
std::mutex fMutexReadList
mutex for the list of read blocks
TStopwatch fWaitTime
time waiting to prefetch a buffer (in usec)
std::mutex fMutexPendingList
mutex for the pending list
std::condition_variable fReadBlockAdded
signal the addition of a new read block
void WaitFinishPrefetch()
Signal the async prefetching thread that prefetching is over and join it.
TThread * GetThread() const
Return reference to the consumer thread.
TFile * fFile
reference to the file
TList * fPendingBlocks
list of pending blocks to be read
void SetFile(TFile *file, TFile::ECacheAction action=TFile::kDisconnect)
Change the file.
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
~TFilePrefetch() override
Destructor.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:131
static std::atomic< Long64_t > fgBytesRead
Number of bytes read by all TFile objects.
Definition TFile.h:161
Long64_t fBytesRead
Number of bytes read from this file.
Definition TFile.h:155
Long64_t GetArchiveOffset() const
Definition TFile.h:300
virtual Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
Read the nbuf blocks described in arrays pos and len.
Definition TFile.cxx:1851
virtual Int_t GetReadCalls() const
Definition TFile.h:323
virtual void SetReadCalls(Int_t readcalls=0)
Definition TFile.h:373
virtual Bool_t WriteBuffer(const char *buf, Int_t len)
Write a buffer to the file.
Definition TFile.cxx:2506
ECacheAction
TTreeCache flushing semantics.
Definition TFile.h:149
@ kDisconnect
Definition TFile.h:149
TArchiveFile * GetArchive() const
Definition TFile.h:299
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3764
void Close(Option_t *option="") override
Close a file.
Definition TFile.cxx:958
static std::atomic< Int_t > fgReadCalls
Number of read calls from all TFile objects.
Definition TFile.h:164
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition TFile.cxx:1800
TObject * Next()
A doubly linked list.
Definition TList.h:38
void Clear(Option_t *option="") override
Remove all objects from the list.
Definition TList.cxx:399
void Add(TObject *obj) override
Definition TList.h:81
TObject * Remove(TObject *obj) override
Remove object from the list.
Definition TList.cxx:819
TObject * First() const override
Return the first object in the list. Returns 0 when list is empty.
Definition TList.cxx:656
This code implements the MD5 message-digest algorithm.
Definition TMD5.h:44
Int_t Post()
Increment the value of the semaphore.
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:138
virtual void FreeDirectory(void *dirp)
Free a directory.
Definition TSystem.cxx:855
virtual void * OpenDirectory(const char *name)
Open a directory.
Definition TSystem.cxx:846
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
Definition TSystem.cxx:916
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition TSystem.cxx:1409
Legacy Code — TThread is a legacy interface: there will be no bug fixes...
Definition TThread.h:40
Long_t Join(void **ret=nullptr)
Join this thread.
Definition TThread.cxx:513
Int_t Run(void *arg=nullptr, const int affinity=-1)
Start the thread.
Definition TThread.cxx:569
void *(* VoidRtnFunc_t)(void *)
Definition TThread.h:52
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition TTimeStamp.h:45
virtual Bool_t SendFileReadProgress(TFile *)