Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
TFilePrefetch.cxx
Go to the documentation of this file.
1// @(#)root/io:$Id$
2// Author: Elvin Sindrilaru 19/05/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "TFilePrefetch.h"
13#include "TTimeStamp.h"
14#include "TSystem.h"
15#include "TMD5.h"
16#include "TVirtualPerfStats.h"
17#include "TVirtualMonitoring.h"
18#include "TSemaphore.h"
19#include "TFPBlock.h"
20#include "strlcpy.h"
21
22#include <string>
23#include <sstream>
24#include <cstdio>
25#include <cstdlib>
26#include <cctype>
27#include <cassert>
28
29static const int kMAX_READ_SIZE = 2; //maximum size of the read list of blocks
30
/// Convert a single hexadecimal digit character to its numeric value (0-15).
/// Any character that is not a hex digit yields 0.
inline int xtod(char c)
{
   if (c >= '0' && c <= '9')
      return c - '0';
   if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
   if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
   return 0;
}
32
33
34/**
35\class TFilePrefetch
36\ingroup io_files
37
38The prefetching mechanism uses two classes (TFilePrefetch and
39TFPBlock) to prefetch in advance a block of tree entries. There is
40a thread which takes care of actually transferring the blocks and
41making them available to the main requesting thread. Therefore,
42the time spent by the main thread waiting for the data before
43processing considerably decreases. Besides the prefetching
44mechanisms there is also a local caching option which can be
45enabled by the user. Both capabilities are disabled by default
46and must be explicitly enabled by the user.
47*/
48
49
50////////////////////////////////////////////////////////////////////////////////
51/// Constructor.
52
// NOTE(review): the constructor's signature line and listing lines 56, 57 and 65 are absent
// from this listing, so the remaining member initialisers are not visible here.
// Remembers the file to prefetch from; no consumer thread exists yet (fConsumer starts as 0).
54 fFile(file),
55 fConsumer(0),
58{
// List of blocks still waiting to be read, and list of blocks already read.
59 fPendingBlocks = new TList();
60 fReadBlocks = new TList();
61
// Mark both lists as owners of their elements (presumably so that clearing or deleting a
// list also deletes the blocks it holds - confirm against TCollection::SetOwner).
62 fPendingBlocks->SetOwner();
63 fReadBlocks->SetOwner();
64
66}
67
68////////////////////////////////////////////////////////////////////////////////
69/// Destructor
70
82
83
84////////////////////////////////////////////////////////////////////////////////
85/// Killing the async prefetching thread
86
// NOTE(review): the signature line and listing lines 92, 97 and 98 are absent from this
// listing, so the statement executed under the lock and whatever follows the join are not
// visible here.
// NOTE(review): fConsumer is dereferenced without a null check although the constructor
// initialises it to 0 - confirm this is only reached after ThreadStart().
88{
89 // Inform the consumer thread that prefetching is over
90 {
// Scoped guard: fMutexPendingList is released at the closing brace, before the notify.
91 std::lock_guard<std::mutex> lk(fMutexPendingList);
93 }
// Wake the consumer, which waits on fNewBlockAdded inside GetPendingBlock().
94 fNewBlockAdded.notify_one();
95
// Join the consumer thread.
96 fConsumer->Join();
99}
100
101
102////////////////////////////////////////////////////////////////////////////////
103/// Read one block and insert it in prefetchBuffers list.
104
// NOTE(review): the signature line is absent from this listing; the body reads `block`
// and writes `inCache`.
// Fills `block` either from the local cache or from fFile, and reports through `inCache`
// which of the two sources was used.
106{
// Stays null unless CheckBlockInCache() finds the block, in which case it receives a
// new[]-allocated copy of the cache file path.
107 char* path = 0;
108
109 if (CheckBlockInCache(path, block)){
// Cache hit: replace the block's buffer with the content of the cache file.
110 block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
111 inCache = kTRUE;
112 }
113 else{
// Cache miss: read all elements of the block from the file in one call.
114 fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
115 if (fFile->GetArchive()) {
// For archive members, subtract the archive offset from every stored position
// (presumably undoing an adjustment made by ReadBuffers - TODO confirm).
116 for (Int_t i = 0; i < block->GetNoElem(); i++)
117 block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
118 }
119 inCache =kFALSE;
120 }
// delete[] on a null pointer is a no-op, so this is safe on the cache-miss path too.
121 delete[] path;
122}
123
124////////////////////////////////////////////////////////////////////////////////
125/// Get blocks specified in prefetchBlocks.
126
// NOTE(review): the signature line is absent from this listing.
// Processes pending blocks one at a time until GetPendingBlock() returns null: each block
// is read, published to the read list and, when it did not come from the cache, saved there.
128{
129 Bool_t inCache = kFALSE;
130 TFPBlock* block = 0;
131
// GetPendingBlock() blocks until a block is queued or prefetching is flagged as finished.
132 while((block = GetPendingBlock())){
133 ReadAsync(block, inCache);
// NOTE(review): the block is published to the read list before it is written to the
// cache; CreateBlockObj() and AddReadBlock() take blocks off that list, so confirm the
// block cannot be recycled while SaveBlockInCache() is still reading it.
134 AddReadBlock(block);
135 if (!inCache)
136 SaveBlockInCache(block);
137 }
138}
139
140////////////////////////////////////////////////////////////////////////////////
141/// Search for a requested element in a block and return the index.
142
// NOTE(review): the signature line is absent from this listing; the tooltip index declares
// Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *) and the body names the
// arguments blockObj, offset, len and index.
// Binary search over the elements of `blockObj` for one that fully contains the byte range
// [offset, offset+len]. On success stores the element number in *index and returns true;
// otherwise returns false and leaves *index untouched.
// Assumes the elements are ordered by ascending GetPos() - TODO confirm.
144{
145 Int_t first = 0, last = -1, mid = -1;
146 last = (Int_t) blockObj->GetNoElem()-1;
147
148 while (first <= last){
// Midpoint computed as first + half the distance, which avoids overflow of first + last.
149 mid = first + (last - first) / 2;
// Match when the request starts inside element `mid` and also ends inside it.
150 if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
151 && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
152
153 *index = mid;
154 return true;
155 }
// Element starts before the requested offset: continue in the upper half.
156 else if (blockObj->GetPos(mid) < offset){
157 first = mid + 1;
158 }
// Element starts at or after the requested offset: continue in the lower half.
159 else{
160 last = mid - 1;
161 }
162 }
163 return false;
164}
165
166////////////////////////////////////////////////////////////////////////////////
167/// Return the time spent waiting for buffer to be read in microseconds.
168
// NOTE(review): the signature line is absent from this listing.
// Scales the accumulated real time of fWaitTime by 1e6 and truncates it to Long64_t
// (microseconds, assuming TStopwatch::RealTime() reports seconds - confirm).
170{
171 return Long64_t(fWaitTime.RealTime()*1.e+6);
172}
173
174////////////////////////////////////////////////////////////////////////////////
175/// Return a prefetched element.
176
// NOTE(review): the signature line is absent from this listing; the tooltip index declares
// Bool_t ReadBuffer(char *, Long64_t, Int_t) and the body names the arguments buf, offset
// and len.
// Copies `len` bytes starting at file position `offset` from an already-read block into
// `buf`, waiting for the consumer thread to deliver a suitable block if none is available.
178{
179 Bool_t found = false;
180 TFPBlock* blockObj = 0;
181 Int_t index = -1;
182
// fMutexReadList stays locked from here until the function returns, except while
// waiting on the condition variable below.
183 std::unique_lock<std::mutex> lk(fMutexReadList);
// The loop is only left through the `break` taken once a matching block is found.
// NOTE(review): if no block covering the range is ever added, this waits forever.
184 while (1){
// Scan every block currently in the read list for the requested range.
185 TIter iter(fReadBlocks);
186 while ((blockObj = (TFPBlock*) iter.Next())){
187 index = -1;
188 if (BinarySearchReadList(blockObj, offset, len, &index)){
189 found = true;
190 break;
191 }
192 }
193 if (found)
194 break;
195 else{
// Time the wait with fWaitTime (kFALSE presumably continues without resetting the
// stopwatch - confirm) and sleep until AddReadBlock() signals a new block.
196 fWaitTime.Start(kFALSE);
197 fReadBlockAdded.wait(lk); //wait for a new block to be added
198 fWaitTime.Stop();
199 }
200 }
201
// `found` is always true here because the loop above has no other exit.
202 if (found){
// Start of the matching element inside the block buffer, advanced to the requested offset.
203 char *pBuff = blockObj->GetPtrToPiece(index);
204 pBuff += (offset - blockObj->GetPos(index));
205 memcpy(buf, pBuff, len);
206 }
207 return found;
208}
209
210////////////////////////////////////////////////////////////////////////////////
211/// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
212
213void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
214{
215 TFPBlock* block = CreateBlockObj(offset, len, nblock);
216 AddPendingBlock(block);
217}
218
219////////////////////////////////////////////////////////////////////////////////
220/// Safe method to add a block to the pendingList.
221
// NOTE(review): the signature line is absent from this listing; the body names the
// argument `block`.
// Appends `block` to the pending list under fMutexPendingList, then wakes one thread
// waiting on fNewBlockAdded.
223{
// Manual lock/unlock pair around the list update; the mutex is released before notifying.
224 fMutexPendingList.lock();
225 fPendingBlocks->Add(block);
226 fMutexPendingList.unlock();
227
228 fNewBlockAdded.notify_one();
229}
230
231////////////////////////////////////////////////////////////////////////////////
232/// Safe method to remove a block from the pendingList.
233
// NOTE(review): the signature line is absent from this listing.
// Waits for work and returns the first pending block, removed from the list. Returns null
// when the list is empty after the wait (prefetching flagged as finished, or the list was
// cleared in the meantime).
235{
236 TFPBlock* block = 0;
237
238 // Use the semaphore to deal with the case when the file pointer
239 // is changed on the fly by TChain
// Post before going idle: SetFile() waits on this semaphore before it clears the lists
// and swaps the file pointer.
240 fSemChangeFile->Post();
241 std::unique_lock<std::mutex> lk(fMutexPendingList);
242 // Wait unless there is a pending block or prefetching is over
243 fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
// Release the list mutex before taking the semaphore back; SetFile() locks this mutex
// while it holds the semaphore.
244 lk.unlock();
245 fSemChangeFile->Wait();
246
// Re-lock and re-check: the list may have been emptied between the wake-up and this point.
247 lk.lock();
248 if (fPendingBlocks->GetSize()){
249 block = (TFPBlock*)fPendingBlocks->First();
250 block = (TFPBlock*)fPendingBlocks->Remove(block);
251 }
252 return block;
253}
254
255////////////////////////////////////////////////////////////////////////////////
256/// Safe method to add a block to the readList.
257
// NOTE(review): the signature line is absent from this listing; the body names the
// argument `block`.
// Adds `block` to the read list under fMutexReadList, first discarding the list's first
// block when the list already holds kMAX_READ_SIZE entries, then signals fReadBlockAdded.
259{
260 fMutexReadList.lock();
261
262 if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
// Take the first block out of the list and delete it explicitly.
263 TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
264 movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
265 delete movedBlock;
266 movedBlock = 0;
267 }
268
269 fReadBlocks->Add(block);
270 fMutexReadList.unlock();
271
272 //signal the addition of a new block
// Only one waiter is woken; ReadBuffer() is the waiter visible in this file.
273 fReadBlockAdded.notify_one();
274}
275
276
277////////////////////////////////////////////////////////////////////////////////
278/// Create a new block or recycle an old one.
279
// NOTE(review): the signature line is absent from this listing; the tooltip index declares
// TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t) and the body names the arguments
// offset, len and noblock.
// Returns a block set up for the given elements: the first block of the read list is taken
// out and reused when that list is full, otherwise a new TFPBlock is allocated.
281{
282 TFPBlock* blockObj = 0;
283
284 fMutexReadList.lock();
285
286 if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
// Detach the first read block under the lock, then resize it after unlocking.
287 blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
288 fReadBlocks->Remove(blockObj);
289 fMutexReadList.unlock();
290 blockObj->ReallocBlock(offset, len, noblock);
291 }
292 else{
// Read list not full yet: allocate a fresh block outside the lock.
293 fMutexReadList.unlock();
294 blockObj = new TFPBlock(offset, len, noblock);
295 }
296 return blockObj;
297}
298
299////////////////////////////////////////////////////////////////////////////////
300/// Return reference to the consumer thread.
301
// NOTE(review): the signature line is absent from this listing; the tooltip index declares
// TThread * GetThread() const.
// Returns the stored consumer-thread pointer; it is 0 until ThreadStart() assigns it.
303{
304 return fConsumer;
305}
306
307
308////////////////////////////////////////////////////////////////////////////////
309/// Change the file
310///
311/// When prefetching is enabled we also need to:
312/// - make sure the async thread is not doing any work
313/// - clear all blocks from prefetching and read list
314/// - reset the file pointer
315
// NOTE(review): the signature line is absent from this listing; the tooltip index declares
// void SetFile(TFile *file, TFile::ECacheAction action=TFile::kDisconnect).
// For kDisconnect: drops all pending and read blocks and switches fFile to `file`.
// For any other action only checks, via assert, that `file` is the file already attached.
317{
318 if (action == TFile::kDisconnect) {
// While the consumer thread is still running, take the semaphore that
// GetPendingBlock() posts before it waits for work (presumably this blocks until
// the consumer is idle - confirm against the semaphore's initial count).
319 if (!fThreadJoined) {
320 fSemChangeFile->Wait();
321 }
322
// The lists are cleared only when a file was attached before this call.
323 if (fFile) {
324 // Remove all pending and read blocks
325 fMutexPendingList.lock();
326 fPendingBlocks->Clear();
327 fMutexPendingList.unlock();
328
329 fMutexReadList.lock();
330 fReadBlocks->Clear();
331 fMutexReadList.unlock();
332 }
333
334 fFile = file;
// Hand the semaphore back so the consumer can continue in GetPendingBlock().
335 if (!fThreadJoined) {
336 fSemChangeFile->Post();
337 }
338 } else {
339 // kDoNotDisconnect must reconnect to the same file
// The check is an assert, so it disappears when NDEBUG is defined.
340 assert((fFile == file) && "kDoNotDisconnect must reattach to the same file");
341 }
342}
343
344
345////////////////////////////////////////////////////////////////////////////////
346/// Used to start the consumer thread.
347
// NOTE(review): the signature line and listing line 355 (the body of the `if` below) are
// absent from this listing.
// Creates the consumer thread with ThreadProc as entry point and this object as argument,
// starts it, and returns the value reported by TThread::Run().
349{
350 int rc;
351
352 fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
353 rc = fConsumer->Run();
// Branch taken when Run() returned 0; its statement is not visible in this listing.
354 if ( !rc ) {
356 }
357 return rc;
358}
359
360
361////////////////////////////////////////////////////////////////////////////////
362/// Execution loop of the consumer thread.
363
// NOTE(review): the signature line is absent from this listing; the tooltip index declares
// static TThread::VoidRtnFunc_t ThreadProc(void *) and the body names the argument `arg`.
// Thread entry point: `arg` is the TFilePrefetch instance passed in by ThreadStart().
365{
366 TFilePrefetch* pClass = (TFilePrefetch*) arg;
367
// Keep processing pending blocks until the instance reports that prefetching is finished.
368 while (!pClass->IsPrefetchFinished()) {
369 pClass->ReadListOfBlocks();
370 }
371
// The constant 1 is cast to the declared return type and returned.
372 return (TThread::VoidRtnFunc_t) 1;
373}
374
375//############################# CACHING PART ###################################
376
377////////////////////////////////////////////////////////////////////////////////
378/// Sum up individual hex values to obtain a decimal value.
379
// NOTE(review): the signature line is absent from this listing; the tooltip index declares
// Int_t SumHex(const char *) and the body names the argument `hex`.
// Adds up the values of the individual hex digits in `hex` via xtod(). This is a digit sum,
// not a base-16 to decimal conversion; characters that are not hex digits contribute 0.
381{
382 Int_t result = 0;
383 const char* ptr = hex;
384
// NOTE(review): strlen(hex) is evaluated in the loop condition on every iteration.
385 for(Int_t i=0; i < (Int_t)strlen(hex); i++)
386 result += xtod(ptr[i]);
387
388 return result;
389}
390
391////////////////////////////////////////////////////////////////////////////////
392/// Test if the block is in cache.
393
// NOTE(review): the signature line is absent from this listing; the tooltip index declares
// Bool_t CheckBlockInCache(char *&, TFPBlock *) and the body names the arguments path and
// block.
// Derives the cache file name for `block` and checks whether that file exists. On a hit,
// `path` receives a new[]-allocated copy of the full file path (ReadAsync() releases it
// with delete[]) and true is returned; otherwise `path` is left untouched and false is
// returned. Always returns false when no cache directory is configured.
395{
396 if (fPathCache == "")
397 return false;
398
399 Bool_t found = false;
400 TString fullPath(fPathCache); // path of the cached files.
401
402 Int_t value = 0;
403
// Create the cache root directory when it cannot be opened; otherwise release the handle.
404 void *fdir = gSystem->OpenDirectory(fullPath);
405 if (!fdir)
406 gSystem->mkdir(fullPath);
407 else {
408 gSystem->FreeDirectory(fdir);
409 }
410 //dir is the sum of the MD5 hex digits modulo 16; filename is the MD5 of the element offsets
411 TMD5* md = new TMD5();
412
// Feed the decimal text of every element position into the digest; lengths are not hashed.
413 TString concatStr;
414 for (Int_t i=0; i < block->GetNoElem(); i++){
415 concatStr.Form("%lld", block->GetPos(i));
416 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
417 }
418
419 md->Final();
420 TString fileName( md->AsString() );
// Sub-directory name is the digit sum of the digest string reduced modulo 16.
421 value = SumHex(fileName);
422 value = value % 16;
423 TString dirName;
424 dirName.Form("%i", value);
425
426 fullPath += "/" + dirName + "/" + fileName;
427
// The block counts as cached when GetPathInfo() returns 0 for the computed path.
428 FileStat_t stat;
429 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
430 path = new char[fullPath.Length() + 1];
431 strlcpy(path, fullPath,fullPath.Length() + 1);
432 found = true;
433 } else
434 found = false;
435
436 delete md;
437 return found;
438}
439
440////////////////////////////////////////////////////////////////////////////////
441/// Return a buffer from cache.
442
443char* TFilePrefetch::GetBlockFromCache(const char* path, Int_t length)
444{
445 char *buffer = 0;
446 TString strPath = path;
447
448 strPath += "?filetype=raw";
449 TFile* file = new TFile(strPath);
450
451 Double_t start = 0;
452 if (gPerfStats != 0) start = TTimeStamp();
453
454 buffer = (char*) calloc(length, sizeof(char));
455 file->ReadBuffer(buffer, 0, length);
456
457 fFile->fBytesRead += length;
458 fFile->fgBytesRead += length;
459 fFile->SetReadCalls(fFile->GetReadCalls() + 1);
460 fFile->fgReadCalls++;
461
463 gMonitoringWriter->SendFileReadProgress(fFile);
464 if (gPerfStats != 0) {
465 gPerfStats->FileReadEvent(fFile, length, start);
466 }
467
468 file->Close();
469 delete file;
470 return buffer;
471}
472
473////////////////////////////////////////////////////////////////////////////////
474/// Save the block content in cache.
475
// NOTE(review): the signature line is absent from this listing; the tooltip index declares
// void SaveBlockInCache(TFPBlock *) and the body names the argument `block`.
// Writes the buffer of `block` to its cache file, using the same naming scheme as
// CheckBlockInCache(). Does nothing when no cache directory is configured.
477{
478 if (fPathCache == "")
479 return;
480
481 //dir is the sum of the MD5 hex digits modulo 16; filename is the MD5 of the element offsets
482 TMD5* md = new TMD5();
483
// Feed the decimal text of every element position into the digest; lengths are not hashed.
484 TString concatStr;
485 for(Int_t i=0; i< block->GetNoElem(); i++){
486 concatStr.Form("%lld", block->GetPos(i));
487 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
488 }
489 md->Final();
490
491 TString fileName( md->AsString() );
// Sub-directory name is the digit sum of the digest string reduced modulo 16.
492 Int_t value = SumHex(fileName);
493 value = value % 16;
494
495 TString fullPath( fPathCache );
496 TString dirName;
497 dirName.Form("%i", value);
498 fullPath += ("/" + dirName);
499
// Create the sub-directory when it cannot be opened; otherwise release the handle.
500 void *fdir = gSystem->OpenDirectory(fullPath);
501 if (!fdir)
502 gSystem->mkdir(fullPath);
503 else {
504 gSystem->FreeDirectory(fdir);
505 }
506
507 TFile* file = 0;
508 fullPath += ("/" + fileName);
// Open an existing cache file in "update" mode, otherwise create it with "new"; in both
// cases the "filetype=raw" option is appended to the path.
509 FileStat_t stat;
510 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
511 fullPath += "?filetype=raw";
512 file = TFile::Open(fullPath, "update");
513 } else{
514 fullPath += "?filetype=raw";
515 file = TFile::Open(fullPath, "new");
516 }
517
// A failed open is silently skipped: nothing is written and no error is reported.
518 if (file) {
519 // coverity[unchecked_value] We do not print error message, have not error
520 // return code and close the file anyway, not need to check the return value.
521 file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
522 file->Close();
523 delete file;
524 }
525 delete md;
526}
527
528
529////////////////////////////////////////////////////////////////////////////////
530/// Set the path of the cache directory.
531
// NOTE(review): the signature line is absent from this listing; the tooltip index declares
// Bool_t SetCache(const char *) and the body names the argument `path`.
// Records `path` as the cache directory and makes sure it exists. Returns true when the
// directory could be opened, or when mkdir() returned 0 while creating it; false otherwise.
533{
// The path is stored before the directory check, so it stays set even if creation fails.
534 fPathCache = path;
535
536 void *fdir = gSystem->OpenDirectory(path);
537 if (!fdir) {
// Directory could not be opened: try to create it and report whether mkdir() returned 0.
538 return (!gSystem->mkdir(path) ? true : false);
539 } else {
540 gSystem->FreeDirectory(fdir);
541 }
542
543 // Directory already exists
544 return true;
545}
546
#define SafeDelete(p)
Definition RConfig.hxx:525
#define c(i)
Definition RSha256.hxx:101
start
Definition Rotated.cxx:223
int Int_t
Signed integer 4 bytes (int).
Definition RtypesCore.h:59
unsigned char UChar_t
Unsigned Character 1 byte (unsigned char).
Definition RtypesCore.h:52
bool Bool_t
Boolean (0=false, 1=true) (bool).
Definition RtypesCore.h:77
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
double Double_t
Double 8 bytes.
Definition RtypesCore.h:73
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
static const int kMAX_READ_SIZE
int xtod(char c)
externTSystem * gSystem
Definition TSystem.h:582
externTVirtualMonitoringWriter * gMonitoringWriter
#define gPerfStats
#define calloc
Definition civetweb.c:1576
This class represents the encapsulation of a block request.
Definition TFPBlock.h:22
char * GetPtrToPiece(Int_t index) const
Get block buffer.
Definition TFPBlock.h:108
void SetPos(Int_t, Long64_t)
Set pos value for index idx.
Definition TFPBlock.cxx:71
void SetBuffer(char *)
Set block buffer.
Definition TFPBlock.cxx:80
Long64_t GetDataSize() const
Return size of the data in the block.
Definition TFPBlock.h:72
void ReallocBlock(Long64_t *, Int_t *, Int_t)
Reallocate the block's buffer based on the length of the elements it will contain.
Definition TFPBlock.cxx:93
Int_t GetLen(Int_t) const
Get length of the element at index i.
Definition TFPBlock.h:96
Int_t GetNoElem() const
Return number of elements in the block.
Definition TFPBlock.h:84
Long64_t GetPos(Int_t) const
Get position of the element at index i.
Definition TFPBlock.h:90
char * GetBuffer() const
Get block buffer.
Definition TFPBlock.h:102
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
TFilePrefetch(TFile *)
Constructor.
TSemaphore * fSemChangeFile
semaphore used when changing a file in TChain
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.
TThread * fConsumer
consumer thread
std::condition_variable fNewBlockAdded
signal the addition of a new pending block
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
Long64_t GetWaitTime()
Return the time spent waiting for buffer to be read in microseconds.
Bool_t fThreadJoined
mark if async thread was joined
Bool_t SetCache(const char *)
Set the path of the cache directory.
TList * fReadBlocks
list of blocks read
Int_t ThreadStart()
Used to start the consumer thread.
std::atomic< Bool_t > fPrefetchFinished
true if prefetching is over
void ReadAsync(TFPBlock *, Bool_t &)
Read one block and insert it in prefetchBuffers list.
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
TString fPathCache
path to the cache directory
std::mutex fMutexReadList
mutex for the list of read blocks
TStopwatch fWaitTime
time waiting to prefetch a buffer (in usec)
std::mutex fMutexPendingList
mutex for the pending list
std::condition_variable fReadBlockAdded
signal the addition of a new read block
void WaitFinishPrefetch()
Killing the async prefetching thread.
TThread * GetThread() const
Return reference to the consumer thread.
TFile * fFile
reference to the file
TList * fPendingBlocks
list of pending blocks to be read
void SetFile(TFile *file, TFile::ECacheAction action=TFile::kDisconnect)
Change the file.
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
~TFilePrefetch() override
Destructor.
Bool_t IsPrefetchFinished() const
A file, usually with extension .root, that stores data and code in the form of serialized objects in ...
Definition TFile.h:130
virtual Bool_t WriteBuffer(const char *buf, Int_t len)
Write a buffer to the file.
Definition TFile.cxx:2528
ECacheAction
TTreeCache flushing semantics.
Definition TFile.h:148
@ kDisconnect
Definition TFile.h:148
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3787
void Close(Option_t *option="") override
Close a file.
Definition TFile.cxx:981
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition TFile.cxx:1823
TObject * Next()
A doubly linked list.
Definition TList.h:38
This code implements the MD5 message-digest algorithm.
Definition TMD5.h:44
const char * AsString() const
Return message digest as string.
Definition TMD5.cxx:219
void Update(const UChar_t *buf, UInt_t len)
Update TMD5 object to reflect the concatenation of another buffer full of bytes.
Definition TMD5.cxx:107
void Final()
MD5 finalization, ends an MD5 message-digest operation, writing the message digest and zeroizing ...
Definition TMD5.cxx:166
Basic string class.
Definition TString.h:138
Ssiz_t Length() const
Definition TString.h:425
const char * Data() const
Definition TString.h:384
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2363
<div class="legacybox"><h2>Legacy Code</h2> TThread is a legacy interface: there will be no bug fixes...
Definition TThread.h:40
void *(* VoidRtnFunc_t)(void *)
Definition TThread.h:52
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition TTimeStamp.h:45