Logo ROOT   6.16/01
Reference Guide
TFilePrefetch.cxx
Go to the documentation of this file.
1// @(#)root/io:$Id$
2// Author: Elvin Sindrilaru 19/05/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "TFilePrefetch.h"
13#include "TTimeStamp.h"
14#include "TVirtualPerfStats.h"
15#include "TVirtualMonitoring.h"
16
17#include <iostream>
18#include <string>
19#include <sstream>
20#include <cstdio>
21#include <cstdlib>
22#include <cctype>
23#include <cassert>
24
25static const int kMAX_READ_SIZE = 2; //maximum size of the read list of blocks
26
/// Convert a single hexadecimal digit character to its numeric value.
///
/// \param c character to decode; '0'-'9', 'A'-'F' and 'a'-'f' are recognised
/// \return the digit value in the range [0, 15]; any other character yields 0
inline int xtod(char c)
{
   if (c >= '0' && c <= '9') return c - '0';
   if (c >= 'A' && c <= 'F') return c - 'A' + 10;
   if (c >= 'a' && c <= 'f') return c - 'a' + 10;
   return 0; // non-hex characters map to 0, indistinguishable from '0'
}
28
29using namespace std;
30
32
33/**
34\class TFilePrefetch
35\ingroup IO
36
37The prefetching mechanism uses two classes (TFilePrefetch and
38TFPBlock) to prefetch in advance a block of tree entries. There is
39a thread which takes care of actually transferring the blocks and
40making them available to the main requesting thread. Therefore,
41the time spent by the main thread waiting for the data before
42processing considerably decreases. Besides the prefetching
43mechanisms there is also a local caching option which can be
44enabled by the user. Both capabilities are disabled by default
45and must be explicitly enabled by the user.
46*/
47
48
49////////////////////////////////////////////////////////////////////////////////
50/// Constructor.
51
53 fFile(file),
54 fConsumer(0),
55 fThreadJoined(kTRUE),
56 fPrefetchFinished(kFALSE)
57{
58 fPendingBlocks = new TList();
59 fReadBlocks = new TList();
60
63
65}
66
67////////////////////////////////////////////////////////////////////////////////
68/// Destructor
69
71{
72 if (!fThreadJoined) {
74 }
75
80}
81
82
83////////////////////////////////////////////////////////////////////////////////
84/// Killing the async prefetching thread
85
87{
88 // Inform the consumer thread that prefetching is over
89 {
90 std::lock_guard<std::mutex> lk(fMutexPendingList);
92 }
93 fNewBlockAdded.notify_one();
94
95 fConsumer->Join();
98}
99
100
101////////////////////////////////////////////////////////////////////////////////
102/// Read one block and insert it in prefetchBuffers list.
103
105{
// Fill `block` with data, either from the local cache or from fFile, and
// report through `inCache` which of the two sources was used.
// `path` is set by CheckBlockInCache to a new[]-allocated string on a cache
// hit and is left null otherwise.
106 char* path = 0;
107
108 if (CheckBlockInCache(path, block)){
// Cache hit: the block takes the buffer loaded from the cached file.
109 block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
110 inCache = kTRUE;
111 }
112 else{
// Cache miss: read every segment of the block from the file in one call.
// NOTE(review): the return value of ReadBuffers is not checked here.
113 fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
114 if (fFile->GetArchive()) {
// For files with an archive, shift each stored position back by the
// archive offset (presumably undoing an adjustment made during the
// read -- confirm against TFile::ReadBuffers).
115 for (Int_t i = 0; i < block->GetNoElem(); i++)
116 block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
117 }
118 inCache =kFALSE;
119 }
// delete[] on a null pointer is a no-op, so this is also safe on the miss path.
120 delete[] path;
121}
122
123////////////////////////////////////////////////////////////////////////////////
124/// Get blocks specified in prefetchBlocks.
125
127{
// Process pending blocks one by one until GetPendingBlock returns null.
128 Bool_t inCache = kFALSE;
129 TFPBlock* block = 0;
130
131 while((block = GetPendingBlock())){
// Load the data, then publish the block on the read list.
132 ReadAsync(block, inCache);
133 AddReadBlock(block);
// Only blocks that did not come from the cache are written to it.
// NOTE(review): the block is still accessed here after it was handed to
// the read list; CreateBlockObj can remove and reallocate blocks from that
// list -- confirm no other thread can recycle it in between.
134 if (!inCache)
135 SaveBlockInCache(block);
136 }
137}
138
139////////////////////////////////////////////////////////////////////////////////
140/// Search for a requested element in a block and return the index.
141
143{
// Binary search over the elements of `blockObj` for one that fully contains
// the range [offset, offset + len]. On success the element index is stored in
// *index and true is returned; otherwise false is returned and *index is left
// untouched.
// NOTE(review): the halving logic presumes the element positions are sorted in
// ascending order -- confirm against the code that fills the block.
144 Int_t first = 0, last = -1, mid = -1;
145 last = (Int_t) blockObj->GetNoElem()-1;
146
147 while (first <= last){
// Midpoint computed as first + (last - first) / 2 rather than (first + last) / 2.
148 mid = first + (last - first) / 2;
// Match when the request starts inside element `mid` and also ends inside it.
149 if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
150 && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
151
152 *index = mid;
153 return true;
154 }
// Element starts before the requested offset: continue in the upper half.
155 else if (blockObj->GetPos(mid) < offset){
156 first = mid + 1;
157 }
// Element starts at or after the requested offset: continue in the lower half.
158 else{
159 last = mid - 1;
160 }
161 }
162 return false;
163}
164
165////////////////////////////////////////////////////////////////////////////////
166/// Return the time spent waiting for the buffer to be read, in microseconds.
167
169{
// TStopwatch::RealTime() reports seconds (and stops the stopwatch if it is
// running); the factor 1.e+6 converts to microseconds before the result is
// converted to Long64_t.
170 return Long64_t(fWaitTime.RealTime()*1.e+6);
171}
172
173////////////////////////////////////////////////////////////////////////////////
174/// Return a prefetched element.
175
177{
178 Bool_t found = false;
179 TFPBlock* blockObj = 0;
180 Int_t index = -1;
181
182 std::unique_lock<std::mutex> lk(fMutexReadList);
183 while (1){
184 TIter iter(fReadBlocks);
185 while ((blockObj = (TFPBlock*) iter.Next())){
186 index = -1;
187 if (BinarySearchReadList(blockObj, offset, len, &index)){
188 found = true;
189 break;
190 }
191 }
192 if (found)
193 break;
194 else{
196 fReadBlockAdded.wait(lk); //wait for a new block to be added
197 fWaitTime.Stop();
198 }
199 }
200
201 if (found){
202 char *pBuff = blockObj->GetPtrToPiece(index);
203 pBuff += (offset - blockObj->GetPos(index));
204 memcpy(buf, pBuff, len);
205 }
206 return found;
207}
208
209////////////////////////////////////////////////////////////////////////////////
210/// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
211
212void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
213{
214 TFPBlock* block = CreateBlockObj(offset, len, nblock);
215 AddPendingBlock(block);
216}
217
218////////////////////////////////////////////////////////////////////////////////
219/// Safe method to add a block to the pendingList.
220
222{
// Append `block` to the pending list while holding fMutexPendingList.
// NOTE(review): lock()/unlock() are called by hand here, whereas other code in
// this file uses std::lock_guard / std::unique_lock on the same mutex.
223 fMutexPendingList.lock();
224 fPendingBlocks->Add(block);
225 fMutexPendingList.unlock();
226
// Signal one waiter on fNewBlockAdded after the mutex has been released.
227 fNewBlockAdded.notify_one();
228}
229
230////////////////////////////////////////////////////////////////////////////////
231/// Safe method to remove a block from the pendingList.
232
234{
235 TFPBlock* block = 0;
236
237 // Use the semaphore to deal with the case when the file pointer
238 // is changed on the fly by TChain
240 std::unique_lock<std::mutex> lk(fMutexPendingList);
241 // Wait unless there is a pending block or prefetching is over
242 fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
243 lk.unlock();
245
246 lk.lock();
247 if (fPendingBlocks->GetSize()){
248 block = (TFPBlock*)fPendingBlocks->First();
249 block = (TFPBlock*)fPendingBlocks->Remove(block);
250 }
251 return block;
252}
253
254////////////////////////////////////////////////////////////////////////////////
255/// Safe method to add a block to the readList.
256
258{
259 fMutexReadList.lock();
260
262 TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
263 movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
264 delete movedBlock;
265 movedBlock = 0;
266 }
267
268 fReadBlocks->Add(block);
269 fMutexReadList.unlock();
270
271 //signal the addition of a new block
272 fReadBlockAdded.notify_one();
273}
274
275
276////////////////////////////////////////////////////////////////////////////////
277/// Create a new block or recycle an old one.
278
280{
281 TFPBlock* blockObj = 0;
282
283 fMutexReadList.lock();
284
286 blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
287 fReadBlocks->Remove(blockObj);
288 fMutexReadList.unlock();
289 blockObj->ReallocBlock(offset, len, noblock);
290 }
291 else{
292 fMutexReadList.unlock();
293 blockObj = new TFPBlock(offset, len, noblock);
294 }
295 return blockObj;
296}
297
298////////////////////////////////////////////////////////////////////////////////
299/// Return reference to the consumer thread.
300
302{
// Plain accessor: returns the stored fConsumer pointer as-is.
303 return fConsumer;
304}
305
306
307////////////////////////////////////////////////////////////////////////////////
308/// Change the file
309///
310/// When prefetching is enabled we also need to:
311/// - make sure the async thread is not doing any work
312/// - clear all blocks from prefetching and read list
313/// - reset the file pointer
314
316{
317 if (action == TFile::kDisconnect) {
318 if (!fThreadJoined) {
320 }
321
322 if (fFile) {
323 // Remove all pending and read blocks
324 fMutexPendingList.lock();
326 fMutexPendingList.unlock();
327
328 fMutexReadList.lock();
330 fMutexReadList.unlock();
331 }
332
333 fFile = file;
334 if (!fThreadJoined) {
336 }
337 } else {
338 // kDoNotDisconnect must reconnect to the same file
339 assert((fFile == file) && "kDoNotDisconnect must reattach to the same file");
340 }
341}
342
343
344////////////////////////////////////////////////////////////////////////////////
345/// Used to start the consumer thread.
346
348{
349 int rc;
350
351 fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
352 rc = fConsumer->Run();
353 if ( !rc ) {
355 }
356 return rc;
357}
358
359
360////////////////////////////////////////////////////////////////////////////////
361/// Execution loop of the consumer thread.
362
364{
// `arg` is the TFilePrefetch instance passed at thread creation.
365 TFilePrefetch* pClass = (TFilePrefetch*) arg;
366
// Keep servicing pending blocks until the instance reports that prefetching
// has finished; each ReadListOfBlocks call returns once GetPendingBlock
// yields no block.
367 while (!pClass->IsPrefetchFinished()) {
368 pClass->ReadListOfBlocks();
369 }
370
// The integer 1 is cast to the function's return type and returned as the
// thread's result value.
371 return (TThread::VoidRtnFunc_t) 1;
372}
373
374//############################# CACHING PART ###################################
375
376////////////////////////////////////////////////////////////////////////////////
377/// Sum up individual hex values to obtain a decimal value.
378
380{
// Adds up the numeric value of every character of `hex` as decoded by xtod().
// This is a digit sum, not a positional hex-to-decimal conversion; characters
// that are not hex digits contribute 0.
381 Int_t result = 0;
382 const char* ptr = hex;
383
// strlen(hex) is re-evaluated on every iteration of the loop condition.
384 for(Int_t i=0; i < (Int_t)strlen(hex); i++)
385 result += xtod(ptr[i]);
386
387 return result;
388}
389
390////////////////////////////////////////////////////////////////////////////////
391/// Test if the block is in cache.
392
394{
// Looks up the cache file that corresponds to `block`. Returns true and sets
// `path` to a new[]-allocated copy of the file path when it exists (the caller
// must delete[] it); returns false and leaves `path` untouched otherwise.
// An empty cache path means caching is not configured.
395 if (fPathCache == "")
396 return false;
397
398 Bool_t found = false;
399 TString fullPath(fPathCache); // path of the cached files.
400
401 Int_t value = 0;
402
// Create the cache root directory if it cannot be opened.
// NOTE(review): the handle returned by OpenDirectory is discarded; presumably
// it should be released (TSystem::FreeDirectory) -- confirm.
403 if (!gSystem->OpenDirectory(fullPath))
404 gSystem->mkdir(fullPath);
405
406 //dir is the sum of the MD5 hex digits modulo 16; filename is the MD5 digest of the block's positions
407 TMD5* md = new TMD5();
408
// Feed the decimal text of every element position into the digest.
409 TString concatStr;
410 for (Int_t i=0; i < block->GetNoElem(); i++){
411 concatStr.Form("%lld", block->GetPos(i));
412 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
413 }
414
415 md->Final();
416 TString fileName( md->AsString() );
// Sub-directory name is SumHex(digest) % 16, printed in decimal.
417 value = SumHex(fileName);
418 value = value % 16;
419 TString dirName;
420 dirName.Form("%i", value);
421
422 fullPath += "/" + dirName + "/" + fileName;
423
// A zero return from GetPathInfo is treated as "the cache file exists".
424 FileStat_t stat;
425 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
426 path = new char[fullPath.Length() + 1];
427 strlcpy(path, fullPath,fullPath.Length() + 1);
428 found = true;
429 } else
430 found = false;
431
432 delete md;
433 return found;
434}
435
436////////////////////////////////////////////////////////////////////////////////
437/// Return a buffer from cache.
438
439char* TFilePrefetch::GetBlockFromCache(const char* path, Int_t length)
440{
441 char *buffer = 0;
442 TString strPath = path;
443
444 strPath += "?filetype=raw";
445 TFile* file = new TFile(strPath);
446
447 Double_t start = 0;
448 if (gPerfStats != 0) start = TTimeStamp();
449
450 buffer = (char*) calloc(length, sizeof(char));
451 file->ReadBuffer(buffer, 0, length);
452
453 fFile->fBytesRead += length;
454 fFile->fgBytesRead += length;
455 fFile->SetReadCalls(fFile->GetReadCalls() + 1);
456 fFile->fgReadCalls++;
457
460 if (gPerfStats != 0) {
461 gPerfStats->FileReadEvent(fFile, length, start);
462 }
463
464 file->Close();
465 delete file;
466 return buffer;
467}
468
469////////////////////////////////////////////////////////////////////////////////
470/// Save the block content in cache.
471
473{
// Writes the buffer of `block` to its cache file. Does nothing when no cache
// path is configured.
474 if (fPathCache == "")
475 return;
476
477 //dir is the sum of the MD5 hex digits modulo 16; filename is the MD5 digest of the block's positions
478 TMD5* md = new TMD5();
479
// Feed the decimal text of every element position into the digest.
480 TString concatStr;
481 for(Int_t i=0; i< block->GetNoElem(); i++){
482 concatStr.Form("%lld", block->GetPos(i));
483 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
484 }
485 md->Final();
486
487 TString fileName( md->AsString() );
// Sub-directory name is SumHex(digest) % 16, printed in decimal.
488 Int_t value = SumHex(fileName);
489 value = value % 16;
490
491 TString fullPath( fPathCache );
492 TString dirName;
493 dirName.Form("%i", value);
494 fullPath += ("/" + dirName);
495
// Create the sub-directory if it cannot be opened.
// NOTE(review): the handle returned by OpenDirectory is discarded; presumably
// it should be released (TSystem::FreeDirectory) -- confirm.
496 if (!gSystem->OpenDirectory(fullPath))
497 gSystem->mkdir(fullPath);
498
499 TFile* file = 0;
500 fullPath += ("/" + fileName);
// Open in "update" mode when the file already exists (GetPathInfo returns 0),
// otherwise create it with "new"; both use the raw file type.
501 FileStat_t stat;
502 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
503 fullPath += "?filetype=raw";
504 file = TFile::Open(fullPath, "update");
505 } else{
506 fullPath += "?filetype=raw";
507 file = TFile::Open(fullPath, "new");
508 }
509
// If the open failed (`file` is null) the block is simply not cached.
510 if (file) {
511 // coverity[unchecked_value] We do not print error message, have not error
512 // return code and close the file anyway, not need to check the return value.
513 file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
514 file->Close();
515 delete file;
516 }
517 delete md;
518}
519
520
521////////////////////////////////////////////////////////////////////////////////
522/// Set the path of the cache directory.
523
525{
// Records `path` as the cache directory and makes sure it exists.
// Returns true when the directory can be opened or mkdir returns 0, false
// otherwise. fPathCache is assigned before the directory is checked, so it
// keeps the new value even when false is returned.
526 fPathCache = path;
527
// NOTE(review): the handle returned by OpenDirectory is discarded; presumably
// it should be released (TSystem::FreeDirectory) -- confirm.
528 if (!gSystem->OpenDirectory(path)){
// A zero return from mkdir is reported as success.
529 return (!gSystem->mkdir(path) ? true : false);
530 }
531
532 // Directory already exists
533 return true;
534}
535
#define SafeDelete(p)
Definition: RConfig.hxx:529
#define c(i)
Definition: RSha256.hxx:101
int Int_t
Definition: RtypesCore.h:41
unsigned char UChar_t
Definition: RtypesCore.h:34
const Bool_t kFALSE
Definition: RtypesCore.h:88
bool Bool_t
Definition: RtypesCore.h:59
double Double_t
Definition: RtypesCore.h:55
long long Long64_t
Definition: RtypesCore.h:69
const Bool_t kTRUE
Definition: RtypesCore.h:87
#define ClassImp(name)
Definition: Rtypes.h:363
static const int kMAX_READ_SIZE
int xtod(char c)
R__EXTERN TSystem * gSystem
Definition: TSystem.h:540
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
#define gPerfStats
#define calloc
Definition: civetweb.c:1537
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
This class represents the encapsulation of a block request.
Definition: TFPBlock.h:22
char * GetPtrToPiece(Int_t index) const
Get block buffer.
Definition: TFPBlock.h:109
void SetPos(Int_t, Long64_t)
Set pos value for index idx.
Definition: TFPBlock.cxx:72
void SetBuffer(char *)
Set block buffer.
Definition: TFPBlock.cxx:81
Long64_t GetDataSize() const
Return size of the data in the block.
Definition: TFPBlock.h:73
void ReallocBlock(Long64_t *, Int_t *, Int_t)
Reallocate the block's buffer based on the length of the elements it will contain.
Definition: TFPBlock.cxx:94
Int_t GetLen(Int_t) const
Get length of the element at index i.
Definition: TFPBlock.h:97
Int_t GetNoElem() const
Return number of elements in the block.
Definition: TFPBlock.h:85
Long64_t GetPos(Int_t) const
Get position of the element at index i.
Definition: TFPBlock.h:91
char * GetBuffer() const
Get block buffer.
Definition: TFPBlock.h:103
The prefetching mechanism uses two classes (TFilePrefetch and TFPBlock) to prefetch in advance a bloc...
Definition: TFilePrefetch.h:31
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
TFilePrefetch(TFile *)
Constructor.
TSemaphore * fSemChangeFile
Definition: TFilePrefetch.h:42
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.
TThread * fConsumer
Definition: TFilePrefetch.h:37
std::condition_variable fNewBlockAdded
Definition: TFilePrefetch.h:40
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
Long64_t GetWaitTime()
Return the time spent waiting for the buffer to be read, in microseconds.
Bool_t fThreadJoined
Definition: TFilePrefetch.h:45
Bool_t SetCache(const char *)
Set the path of the cache directory.
TList * fReadBlocks
Definition: TFilePrefetch.h:36
Int_t ThreadStart()
Used to start the consumer thread.
std::atomic< Bool_t > fPrefetchFinished
Definition: TFilePrefetch.h:46
void ReadAsync(TFPBlock *, Bool_t &)
Read one block and insert it in prefetchBuffers list.
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
virtual ~TFilePrefetch()
Destructor.
TString fPathCache
Definition: TFilePrefetch.h:43
std::mutex fMutexReadList
Definition: TFilePrefetch.h:39
TStopwatch fWaitTime
Definition: TFilePrefetch.h:44
std::mutex fMutexPendingList
Definition: TFilePrefetch.h:38
std::condition_variable fReadBlockAdded
Definition: TFilePrefetch.h:41
void WaitFinishPrefetch()
Killing the async prefetching thread.
TThread * GetThread() const
Return reference to the consumer thread.
TList * fPendingBlocks
Definition: TFilePrefetch.h:35
void SetFile(TFile *file, TFile::ECacheAction action=TFile::kDisconnect)
Change the file.
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
Bool_t IsPrefetchFinished() const
Definition: TFilePrefetch.h:80
ECacheAction
TTreeCache flushing semantics.
Definition: TFile.h:65
@ kDisconnect
Definition: TFile.h:65
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseGeneralPurpose, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3975
TObject * Next()
Definition: TCollection.h:249
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:818
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:655
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:399
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:44
const char * AsString() const
Return message digest as string.
Definition: TMD5.cxx:220
void Update(const UChar_t *buf, UInt_t len)
Update TMD5 object to reflect the concatenation of another buffer full of bytes.
Definition: TMD5.cxx:108
void Final()
MD5 finalization, ends an MD5 message-digest operation, writing the message digest and zeroizing ...
Definition: TMD5.cxx:167
Int_t Post()
Increment the value of the semaphore.
Definition: TSemaphore.cxx:103
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
Definition: TSemaphore.cxx:35
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Definition: TStopwatch.cxx:58
void Stop()
Stop the stopwatch.
Definition: TStopwatch.cxx:77
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Definition: TSystem.cxx:843
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
Definition: TSystem.cxx:913
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition: TSystem.cxx:1388
Long_t Join(void **ret=0)
Join this thread.
Definition: TThread.cxx:508
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:561
void *(* VoidRtnFunc_t)(void *)
Definition: TThread.h:49
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:71
virtual Bool_t SendFileReadProgress(TFile *)
Definition: file.py:1
Definition: first.py:1
STL namespace.