Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TFilePrefetch.cxx
Go to the documentation of this file.
1// @(#)root/io:$Id$
2// Author: Elvin Sindrilaru 19/05/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "TFilePrefetch.h"
13#include "TTimeStamp.h"
14#include "TSystem.h"
15#include "TMD5.h"
16#include "TVirtualPerfStats.h"
17#include "TVirtualMonitoring.h"
18#include "TSemaphore.h"
19#include "TFPBlock.h"
20#include "strlcpy.h"
21
22#include <string>
23#include <sstream>
24#include <cstdio>
25#include <cstdlib>
26#include <cctype>
27#include <cassert>
28
// Maximum size of the read list of blocks (fReadBlocks).
static constexpr int kMAX_READ_SIZE = 2;
30
/// Convert one hexadecimal digit character ('0'-'9', 'A'-'F', 'a'-'f') to its
/// numeric value. Any character outside those ranges maps to 0.
inline int xtod(char c)
{
   if (c >= '0' && c <= '9')
      return c - '0';
   if (c >= 'A' && c <= 'F')
      return 10 + (c - 'A');
   if (c >= 'a' && c <= 'f')
      return 10 + (c - 'a');
   return 0;
}
32
33using namespace std;
34
36
37/**
38\class TFilePrefetch
39\ingroup IO
40
41The prefetching mechanism uses two classes (TFilePrefetch and
42TFPBlock) to prefetch in advance a block of tree entries. There is
43a thread which takes care of actually transferring the blocks and
44making them available to the main requesting thread. Therefore,
45the time spent by the main thread waiting for the data before
46processing considerably decreases. Besides the prefetching
47mechanisms there is also a local caching option which can be
48enabled by the user. Both capabilities are disabled by default
49and must be explicitly enabled by the user.
50*/
51
52
53////////////////////////////////////////////////////////////////////////////////
54/// Constructor.
55
57 fFile(file),
58 fConsumer(0),
59 fThreadJoined(kTRUE),
60 fPrefetchFinished(kFALSE)
61{
62 fPendingBlocks = new TList();
63 fReadBlocks = new TList();
64
67
69}
70
71////////////////////////////////////////////////////////////////////////////////
72/// Destructor
73
75{
76 if (!fThreadJoined) {
78 }
79
84}
85
86
87////////////////////////////////////////////////////////////////////////////////
88/// Killing the async prefetching thread
89
91{
92 // Inform the consumer thread that prefetching is over
93 {
94 std::lock_guard<std::mutex> lk(fMutexPendingList);
96 }
97 fNewBlockAdded.notify_one();
98
99 fConsumer->Join();
102}
103
104
105////////////////////////////////////////////////////////////////////////////////
106/// Read one block and insert it in prefetchBuffers list.
107
109{
110 char* path = 0;
111
112 if (CheckBlockInCache(path, block)){
113 block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
114 inCache = kTRUE;
115 }
116 else{
117 fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
118 if (fFile->GetArchive()) {
119 for (Int_t i = 0; i < block->GetNoElem(); i++)
120 block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
121 }
122 inCache =kFALSE;
123 }
124 delete[] path;
125}
126
127////////////////////////////////////////////////////////////////////////////////
128/// Get blocks specified in prefetchBlocks.
129
131{
132 Bool_t inCache = kFALSE;
133 TFPBlock* block = 0;
134
135 while((block = GetPendingBlock())){
136 ReadAsync(block, inCache);
137 AddReadBlock(block);
138 if (!inCache)
139 SaveBlockInCache(block);
140 }
141}
142
143////////////////////////////////////////////////////////////////////////////////
144/// Search for a requested element in a block and return the index.
145
147{
148 Int_t first = 0, last = -1, mid = -1;
149 last = (Int_t) blockObj->GetNoElem()-1;
150
151 while (first <= last){
152 mid = first + (last - first) / 2;
153 if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
154 && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
155
156 *index = mid;
157 return true;
158 }
159 else if (blockObj->GetPos(mid) < offset){
160 first = mid + 1;
161 }
162 else{
163 last = mid - 1;
164 }
165 }
166 return false;
167}
168
169////////////////////////////////////////////////////////////////////////////////
170/// Return the time spent waiting for the buffer to be read, in microseconds.
171
173{
174 return Long64_t(fWaitTime.RealTime()*1.e+6);
175}
176
177////////////////////////////////////////////////////////////////////////////////
178/// Return a prefetched element.
179
181{
182 Bool_t found = false;
183 TFPBlock* blockObj = 0;
184 Int_t index = -1;
185
186 std::unique_lock<std::mutex> lk(fMutexReadList);
187 while (1){
188 TIter iter(fReadBlocks);
189 while ((blockObj = (TFPBlock*) iter.Next())){
190 index = -1;
191 if (BinarySearchReadList(blockObj, offset, len, &index)){
192 found = true;
193 break;
194 }
195 }
196 if (found)
197 break;
198 else{
200 fReadBlockAdded.wait(lk); //wait for a new block to be added
201 fWaitTime.Stop();
202 }
203 }
204
205 if (found){
206 char *pBuff = blockObj->GetPtrToPiece(index);
207 pBuff += (offset - blockObj->GetPos(index));
208 memcpy(buf, pBuff, len);
209 }
210 return found;
211}
212
213////////////////////////////////////////////////////////////////////////////////
214/// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
215
216void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
217{
218 TFPBlock* block = CreateBlockObj(offset, len, nblock);
219 AddPendingBlock(block);
220}
221
222////////////////////////////////////////////////////////////////////////////////
223/// Safe method to add a block to the pendingList.
224
226{
227 fMutexPendingList.lock();
228 fPendingBlocks->Add(block);
229 fMutexPendingList.unlock();
230
231 fNewBlockAdded.notify_one();
232}
233
234////////////////////////////////////////////////////////////////////////////////
235/// Safe method to remove a block from the pendingList.
236
238{
239 TFPBlock* block = 0;
240
241 // Use the semaphore to deal with the case when the file pointer
242 // is changed on the fly by TChain
244 std::unique_lock<std::mutex> lk(fMutexPendingList);
245 // Wait unless there is a pending block or prefetching is over
246 fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
247 lk.unlock();
249
250 lk.lock();
251 if (fPendingBlocks->GetSize()){
252 block = (TFPBlock*)fPendingBlocks->First();
253 block = (TFPBlock*)fPendingBlocks->Remove(block);
254 }
255 return block;
256}
257
258////////////////////////////////////////////////////////////////////////////////
259/// Safe method to add a block to the readList.
260
262{
263 fMutexReadList.lock();
264
266 TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
267 movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
268 delete movedBlock;
269 movedBlock = 0;
270 }
271
272 fReadBlocks->Add(block);
273 fMutexReadList.unlock();
274
275 //signal the addition of a new block
276 fReadBlockAdded.notify_one();
277}
278
279
280////////////////////////////////////////////////////////////////////////////////
281/// Create a new block or recycle an old one.
282
284{
285 TFPBlock* blockObj = 0;
286
287 fMutexReadList.lock();
288
290 blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
291 fReadBlocks->Remove(blockObj);
292 fMutexReadList.unlock();
293 blockObj->ReallocBlock(offset, len, noblock);
294 }
295 else{
296 fMutexReadList.unlock();
297 blockObj = new TFPBlock(offset, len, noblock);
298 }
299 return blockObj;
300}
301
302////////////////////////////////////////////////////////////////////////////////
303/// Return reference to the consumer thread.
304
306{
307 return fConsumer;
308}
309
310
311////////////////////////////////////////////////////////////////////////////////
312/// Change the file
313///
314/// When prefetching is enabled we also need to:
315/// - make sure the async thread is not doing any work
316/// - clear all blocks from prefetching and read list
317/// - reset the file pointer
318
320{
321 if (action == TFile::kDisconnect) {
322 if (!fThreadJoined) {
324 }
325
326 if (fFile) {
327 // Remove all pending and read blocks
328 fMutexPendingList.lock();
330 fMutexPendingList.unlock();
331
332 fMutexReadList.lock();
334 fMutexReadList.unlock();
335 }
336
337 fFile = file;
338 if (!fThreadJoined) {
340 }
341 } else {
342 // kDoNotDisconnect must reconnect to the same file
343 assert((fFile == file) && "kDoNotDisconnect must reattach to the same file");
344 }
345}
346
347
348////////////////////////////////////////////////////////////////////////////////
349/// Used to start the consumer thread.
350
352{
353 int rc;
354
355 fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
356 rc = fConsumer->Run();
357 if ( !rc ) {
359 }
360 return rc;
361}
362
363
364////////////////////////////////////////////////////////////////////////////////
365/// Execution loop of the consumer thread.
366
368{
369 TFilePrefetch* pClass = (TFilePrefetch*) arg;
370
371 while (!pClass->IsPrefetchFinished()) {
372 pClass->ReadListOfBlocks();
373 }
374
375 return (TThread::VoidRtnFunc_t) 1;
376}
377
378//############################# CACHING PART ###################################
379
380////////////////////////////////////////////////////////////////////////////////
381/// Sum up individual hex values to obtain a decimal value.
382
384{
385 Int_t result = 0;
386 const char* ptr = hex;
387
388 for(Int_t i=0; i < (Int_t)strlen(hex); i++)
389 result += xtod(ptr[i]);
390
391 return result;
392}
393
394////////////////////////////////////////////////////////////////////////////////
395/// Test if the block is in cache.
396
398{
399 if (fPathCache == "")
400 return false;
401
402 Bool_t found = false;
403 TString fullPath(fPathCache); // path of the cached files.
404
405 Int_t value = 0;
406
407 if (!gSystem->OpenDirectory(fullPath))
408 gSystem->mkdir(fullPath);
409
410 //dir is SHA1 value modulo 16; filename is the value of the SHA1(offset+len)
411 TMD5* md = new TMD5();
412
413 TString concatStr;
414 for (Int_t i=0; i < block->GetNoElem(); i++){
415 concatStr.Form("%lld", block->GetPos(i));
416 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
417 }
418
419 md->Final();
420 TString fileName( md->AsString() );
421 value = SumHex(fileName);
422 value = value % 16;
423 TString dirName;
424 dirName.Form("%i", value);
425
426 fullPath += "/" + dirName + "/" + fileName;
427
428 FileStat_t stat;
429 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
430 path = new char[fullPath.Length() + 1];
431 strlcpy(path, fullPath,fullPath.Length() + 1);
432 found = true;
433 } else
434 found = false;
435
436 delete md;
437 return found;
438}
439
440////////////////////////////////////////////////////////////////////////////////
441/// Return a buffer from cache.
442
443char* TFilePrefetch::GetBlockFromCache(const char* path, Int_t length)
444{
445 char *buffer = 0;
446 TString strPath = path;
447
448 strPath += "?filetype=raw";
449 TFile* file = new TFile(strPath);
450
451 Double_t start = 0;
452 if (gPerfStats != 0) start = TTimeStamp();
453
454 buffer = (char*) calloc(length, sizeof(char));
455 file->ReadBuffer(buffer, 0, length);
456
457 fFile->fBytesRead += length;
458 fFile->fgBytesRead += length;
461
464 if (gPerfStats != 0) {
465 gPerfStats->FileReadEvent(fFile, length, start);
466 }
467
468 file->Close();
469 delete file;
470 return buffer;
471}
472
473////////////////////////////////////////////////////////////////////////////////
474/// Save the block content in cache.
475
477{
478 if (fPathCache == "")
479 return;
480
481 //dir is SHA1 value modulo 16; filename is the value of the SHA1
482 TMD5* md = new TMD5();
483
484 TString concatStr;
485 for(Int_t i=0; i< block->GetNoElem(); i++){
486 concatStr.Form("%lld", block->GetPos(i));
487 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
488 }
489 md->Final();
490
491 TString fileName( md->AsString() );
492 Int_t value = SumHex(fileName);
493 value = value % 16;
494
495 TString fullPath( fPathCache );
496 TString dirName;
497 dirName.Form("%i", value);
498 fullPath += ("/" + dirName);
499
500 if (!gSystem->OpenDirectory(fullPath))
501 gSystem->mkdir(fullPath);
502
503 TFile* file = 0;
504 fullPath += ("/" + fileName);
505 FileStat_t stat;
506 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
507 fullPath += "?filetype=raw";
508 file = TFile::Open(fullPath, "update");
509 } else{
510 fullPath += "?filetype=raw";
511 file = TFile::Open(fullPath, "new");
512 }
513
514 if (file) {
515 // coverity[unchecked_value] We do not print error message, have not error
516 // return code and close the file anyway, not need to check the return value.
517 file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
518 file->Close();
519 delete file;
520 }
521 delete md;
522}
523
524
525////////////////////////////////////////////////////////////////////////////////
526/// Set the path of the cache directory.
527
529{
530 fPathCache = path;
531
532 if (!gSystem->OpenDirectory(path)){
533 return (!gSystem->mkdir(path) ? true : false);
534 }
535
536 // Directory already exists
537 return true;
538}
539
#define SafeDelete(p)
Definition RConfig.hxx:547
#define c(i)
Definition RSha256.hxx:101
int Int_t
Definition RtypesCore.h:45
unsigned char UChar_t
Definition RtypesCore.h:38
const Bool_t kFALSE
Definition RtypesCore.h:92
long long Long64_t
Definition RtypesCore.h:73
const Bool_t kTRUE
Definition RtypesCore.h:91
#define ClassImp(name)
Definition Rtypes.h:364
static const int kMAX_READ_SIZE
int xtod(char c)
R__EXTERN TSystem * gSystem
Definition TSystem.h:559
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
#define gPerfStats
#define calloc
Definition civetweb.c:1537
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
This class represents the encapsulation of a block request.
Definition TFPBlock.h:22
char * GetPtrToPiece(Int_t index) const
Get block buffer.
Definition TFPBlock.h:109
void SetPos(Int_t, Long64_t)
Set pos value for index idx.
Definition TFPBlock.cxx:72
void SetBuffer(char *)
Set block buffer.
Definition TFPBlock.cxx:81
Long64_t GetDataSize() const
Return size of the data in the block.
Definition TFPBlock.h:73
void ReallocBlock(Long64_t *, Int_t *, Int_t)
Reallocate the block's buffer based on the length of the elements it will contain.
Definition TFPBlock.cxx:94
Int_t GetLen(Int_t) const
Get length of the element at index i.
Definition TFPBlock.h:97
Int_t GetNoElem() const
Return number of elements in the block.
Definition TFPBlock.h:85
Long64_t GetPos(Int_t) const
Get position of the element at index i.
Definition TFPBlock.h:91
char * GetBuffer() const
Get block buffer.
Definition TFPBlock.h:103
The prefetching mechanism uses two classes (TFilePrefetch and TFPBlock) to prefetch in advance a bloc...
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
TFilePrefetch(TFile *)
Constructor.
TSemaphore * fSemChangeFile
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.
TThread * fConsumer
std::condition_variable fNewBlockAdded
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
Long64_t GetWaitTime()
Return the time spent waiting for the buffer to be read, in microseconds.
Bool_t fThreadJoined
Bool_t SetCache(const char *)
Set the path of the cache directory.
TList * fReadBlocks
Int_t ThreadStart()
Used to start the consumer thread.
std::atomic< Bool_t > fPrefetchFinished
void ReadAsync(TFPBlock *, Bool_t &)
Read one block and insert it in prefetchBuffers list.
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
virtual ~TFilePrefetch()
Destructor.
TString fPathCache
std::mutex fMutexReadList
TStopwatch fWaitTime
std::mutex fMutexPendingList
std::condition_variable fReadBlockAdded
void WaitFinishPrefetch()
Killing the async prefetching thread.
TThread * GetThread() const
Return reference to the consumer thread.
TList * fPendingBlocks
void SetFile(TFile *file, TFile::ECacheAction action=TFile::kDisconnect)
Change the file.
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
Bool_t IsPrefetchFinished() const
A ROOT file is a suite of consecutive data records (TKey instances) with a well-defined format.
Definition TFile.h:54
static std::atomic< Long64_t > fgBytesRead
Number of bytes read by all TFile objects.
Definition TFile.h:130
Long64_t fBytesRead
Number of bytes read from this file.
Definition TFile.h:77
Long64_t GetArchiveOffset() const
Definition TFile.h:213
virtual Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
Read the nbuf blocks described in arrays pos and len.
Definition TFile.cxx:1737
virtual Int_t GetReadCalls() const
Definition TFile.h:236
virtual void SetReadCalls(Int_t readcalls=0)
Definition TFile.h:282
ECacheAction
TTreeCache flushing semantics.
Definition TFile.h:71
@ kDisconnect
Definition TFile.h:71
TArchiveFile * GetArchive() const
Definition TFile.h:212
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3997
static std::atomic< Int_t > fgReadCalls
Number of read calls from all TFile objects.
Definition TFile.h:132
TObject * Next()
A doubly linked list.
Definition TList.h:44
virtual void Add(TObject *obj)
Definition TList.h:87
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition TList.cxx:822
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition TList.cxx:659
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition TList.cxx:402
This code implements the MD5 message-digest algorithm.
Definition TMD5.h:44
const char * AsString() const
Return message digest as string.
Definition TMD5.cxx:220
void Update(const UChar_t *buf, UInt_t len)
Update TMD5 object to reflect the concatenation of another buffer full of bytes.
Definition TMD5.cxx:108
void Final()
MD5 finalization, ends an MD5 message-digest operation, writing the message digest and zeroizing ...
Definition TMD5.cxx:167
Int_t Post()
Increment the value of the semaphore.
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:136
Ssiz_t Length() const
Definition TString.h:410
const char * Data() const
Definition TString.h:369
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2309
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Definition TSystem.cxx:835
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
Definition TSystem.cxx:905
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition TSystem.cxx:1396
Long_t Join(void **ret=nullptr)
Join this thread.
Definition TThread.cxx:515
Int_t Run(void *arg=nullptr)
Start the thread.
Definition TThread.cxx:568
void *(* VoidRtnFunc_t)(void *)
Definition TThread.h:52
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition TTimeStamp.h:71
virtual Bool_t SendFileReadProgress(TFile *)
Definition file.py:1
Definition first.py:1