Logo ROOT   6.10/09
Reference Guide
TFilePrefetch.cxx
Go to the documentation of this file.
1 // @(#)root/io:$Id$
2 // Author: Elvin Sindrilaru 19/05/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "TFilePrefetch.h"
13 #include "TTimeStamp.h"
14 #include "TVirtualPerfStats.h"
15 #include "TVirtualMonitoring.h"
16 
17 #include <iostream>
18 #include <string>
19 #include <sstream>
20 #include <cstdio>
21 #include <cstdlib>
22 #include <cctype>
23 #include <cassert>
24 
25 static const int kMAX_READ_SIZE = 2; //maximum size of the read list of blocks
26 
inline int xtod(char c)
{
   // Convert a single hexadecimal digit character to its numeric value.
   // Characters that are not hex digits deliberately map to 0.
   if (c >= '0' && c <= '9')
      return c - '0';
   if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
   if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
   return 0;
}
28 
29 using namespace std;
30 
32 
33 /**
34 \class TFilePrefetch
35 \ingroup IO
36 
37 The prefetching mechanism uses two classes (TFilePrefetch and
38 TFPBlock) to prefetch in advance a block of tree entries. There is
39 a thread which takes care of actually transferring the blocks and
40 making them available to the main requesting thread. Therefore,
41 the time spent by the main thread waiting for the data before
42 processing considerably decreases. Besides the prefetching
43 mechanisms there is also a local caching option which can be
44 enabled by the user. Both capabilities are disabled by default
45 and must be explicitly enabled by the user.
46 */
47 
48 
49 ////////////////////////////////////////////////////////////////////////////////
50 /// Constructor.
51 
53  fFile(file),
54  fConsumer(0),
55  fThreadJoined(kTRUE),
56  fPrefetchFinished(kFALSE)
57 {
58  fPendingBlocks = new TList();
59  fReadBlocks = new TList();
60 
61  fPendingBlocks->SetOwner();
62  fReadBlocks->SetOwner();
63 
64  fSemChangeFile = new TSemaphore(0);
65 }
66 
67 ////////////////////////////////////////////////////////////////////////////////
68 /// Destructor
69 
71 {
72  if (!fThreadJoined) {
73  WaitFinishPrefetch();
74  }
75 
76  SafeDelete(fConsumer);
77  SafeDelete(fPendingBlocks);
78  SafeDelete(fReadBlocks);
79  SafeDelete(fSemChangeFile);
80 }
81 
82 
83 ////////////////////////////////////////////////////////////////////////////////
84 /// Killing the async prefetching thread
85 
87 {
88  // Inform the consumer thread that prefetching is over
89  {
90  std::lock_guard<std::mutex> lk(fMutexPendingList);
91  fPrefetchFinished = kTRUE;
92  }
93  fNewBlockAdded.notify_one();
94 
95  fConsumer->Join();
96  fThreadJoined = kTRUE;
97  fPrefetchFinished = kFALSE;
98 }
99 
100 
101 ////////////////////////////////////////////////////////////////////////////////
102 /// Read one block and insert it in prefetchBuffers list.
103 
105 {
106  char* path = 0;
107 
108  if (CheckBlockInCache(path, block)){
109  block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
110  inCache = kTRUE;
111  }
112  else{
113  fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
114  if (fFile->GetArchive()) {
115  for (Int_t i = 0; i < block->GetNoElem(); i++)
116  block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
117  }
118  inCache =kFALSE;
119  }
120  delete[] path;
121 }
122 
123 ////////////////////////////////////////////////////////////////////////////////
124 /// Get blocks specified in prefetchBlocks.
125 
127 {
128  Bool_t inCache = kFALSE;
129  TFPBlock* block = 0;
130 
131  while((block = GetPendingBlock())){
132  ReadAsync(block, inCache);
133  AddReadBlock(block);
134  if (!inCache)
135  SaveBlockInCache(block);
136  }
137 }
138 
139 ////////////////////////////////////////////////////////////////////////////////
140 /// Search for a requested element in a block and return the index.
141 
143 {
144  Int_t first = 0, last = -1, mid = -1;
145  last = (Int_t) blockObj->GetNoElem()-1;
146 
147  while (first <= last){
148  mid = first + (last - first) / 2;
149  if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
150  && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
151 
152  *index = mid;
153  return true;
154  }
155  else if (blockObj->GetPos(mid) < offset){
156  first = mid + 1;
157  }
158  else{
159  last = mid - 1;
160  }
161  }
162  return false;
163 }
164 
165 ////////////////////////////////////////////////////////////////////////////////
166 /// Return the time spent waiting for a buffer to be read, in microseconds.
167 
169 {
170  return Long64_t(fWaitTime.RealTime()*1.e+6);
171 }
172 
173 ////////////////////////////////////////////////////////////////////////////////
174 /// Return a prefetched element.
175 
177 {
178  Bool_t found = false;
179  TFPBlock* blockObj = 0;
180  Int_t index = -1;
181 
182  std::unique_lock<std::mutex> lk(fMutexReadList);
183  while (1){
184  TIter iter(fReadBlocks);
185  while ((blockObj = (TFPBlock*) iter.Next())){
186  index = -1;
187  if (BinarySearchReadList(blockObj, offset, len, &index)){
188  found = true;
189  break;
190  }
191  }
192  if (found)
193  break;
194  else{
195  fWaitTime.Start(kFALSE);
196  fReadBlockAdded.wait(lk); //wait for a new block to be added
197  fWaitTime.Stop();
198  }
199  }
200 
201  if (found){
202  char *pBuff = blockObj->GetPtrToPiece(index);
203  pBuff += (offset - blockObj->GetPos(index));
204  memcpy(buf, pBuff, len);
205  }
206  return found;
207 }
208 
209 ////////////////////////////////////////////////////////////////////////////////
210 /// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
211 
212 void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
213 {
214  TFPBlock* block = CreateBlockObj(offset, len, nblock);
215  AddPendingBlock(block);
216 }
217 
218 ////////////////////////////////////////////////////////////////////////////////
219 /// Safe method to add a block to the pendingList.
220 
222 {
223  fMutexPendingList.lock();
224  fPendingBlocks->Add(block);
225  fMutexPendingList.unlock();
226 
227  fNewBlockAdded.notify_one();
228 }
229 
230 ////////////////////////////////////////////////////////////////////////////////
231 /// Safe method to remove a block from the pendingList.
232 
234 {
235  TFPBlock* block = 0;
236 
237  // Use the semaphore to deal with the case when the file pointer
238  // is changed on the fly by TChain
239  fSemChangeFile->Post();
240  std::unique_lock<std::mutex> lk(fMutexPendingList);
241  // Wait unless there is a pending block or prefetching is over
242  fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
243  lk.unlock();
244  fSemChangeFile->Wait();
245 
246  lk.lock();
247  if (fPendingBlocks->GetSize()){
248  block = (TFPBlock*)fPendingBlocks->First();
249  block = (TFPBlock*)fPendingBlocks->Remove(block);
250  }
251  return block;
252 }
253 
254 ////////////////////////////////////////////////////////////////////////////////
255 /// Safe method to add a block to the readList.
256 
258 {
259  fMutexReadList.lock();
260 
261  if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
262  TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
263  movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
264  delete movedBlock;
265  movedBlock = 0;
266  }
267 
268  fReadBlocks->Add(block);
269  fMutexReadList.unlock();
270 
271  //signal the addition of a new block
272  fReadBlockAdded.notify_one();
273 }
274 
275 
276 ////////////////////////////////////////////////////////////////////////////////
277 /// Create a new block or recycle an old one.
278 
280 {
281  TFPBlock* blockObj = 0;
282 
283  fMutexReadList.lock();
284 
285  if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
286  blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
287  fReadBlocks->Remove(blockObj);
288  fMutexReadList.unlock();
289  blockObj->ReallocBlock(offset, len, noblock);
290  }
291  else{
292  fMutexReadList.unlock();
293  blockObj = new TFPBlock(offset, len, noblock);
294  }
295  return blockObj;
296 }
297 
298 ////////////////////////////////////////////////////////////////////////////////
299 /// Return reference to the consumer thread.
300 
302 {
303  return fConsumer;
304 }
305 
306 
307 ////////////////////////////////////////////////////////////////////////////////
308 /// Change the file
309 ///
310 /// When prefetching is enabled we also need to:
311 /// - make sure the async thread is not doing any work
312 /// - clear all blocks from prefetching and read list
313 /// - reset the file pointer
314 
316 {
317  if (action == TFile::kDisconnect) {
318  if (!fThreadJoined) {
319  fSemChangeFile->Wait();
320  }
321 
322  if (fFile) {
323  // Remove all pending and read blocks
324  fMutexPendingList.lock();
325  fPendingBlocks->Clear();
326  fMutexPendingList.unlock();
327 
328  fMutexReadList.lock();
329  fReadBlocks->Clear();
330  fMutexReadList.unlock();
331  }
332 
333  fFile = file;
334  if (!fThreadJoined) {
335  fSemChangeFile->Post();
336  }
337  } else {
338  // kDoNotDisconnect must reconnect to the same file
339  assert((fFile == file) && "kDoNotDisconnect must reattach to the same file");
340  }
341 }
342 
343 
344 ////////////////////////////////////////////////////////////////////////////////
345 /// Used to start the consumer thread.
346 
348 {
349  int rc;
350 
351  fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
352  rc = fConsumer->Run();
353  if ( !rc ) {
354  fThreadJoined = kFALSE;
355  }
356  return rc;
357 }
358 
359 
360 ////////////////////////////////////////////////////////////////////////////////
361 /// Execution loop of the consumer thread.
362 
364 {
365  TFilePrefetch* pClass = (TFilePrefetch*) arg;
366 
367  while (!pClass->IsPrefetchFinished()) {
368  pClass->ReadListOfBlocks();
369  }
370 
371  return (TThread::VoidRtnFunc_t) 1;
372 }
373 
374 //############################# CACHING PART ###################################
375 
376 ////////////////////////////////////////////////////////////////////////////////
377 /// Sum up individual hex values to obtain a decimal value.
378 
379 Int_t TFilePrefetch::SumHex(const char *hex)
380 {
381  Int_t result = 0;
382  const char* ptr = hex;
383 
384  for(Int_t i=0; i < (Int_t)strlen(hex); i++)
385  result += xtod(ptr[i]);
386 
387  return result;
388 }
389 
390 ////////////////////////////////////////////////////////////////////////////////
391 /// Test if the block is in cache.
392 
394 {
395  if (fPathCache == "")
396  return false;
397 
398  Bool_t found = false;
399  TString fullPath(fPathCache); // path of the cached files.
400 
401  Int_t value = 0;
402 
403  if (!gSystem->OpenDirectory(fullPath))
404  gSystem->mkdir(fullPath);
405 
406  //dir is the sum of the MD5 digest's hex digits modulo 16; filename is the MD5 digest of the block's element positions
407  TMD5* md = new TMD5();
408 
409  TString concatStr;
410  for (Int_t i=0; i < block->GetNoElem(); i++){
411  concatStr.Form("%lld", block->GetPos(i));
412  md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
413  }
414 
415  md->Final();
416  TString fileName( md->AsString() );
417  value = SumHex(fileName);
418  value = value % 16;
419  TString dirName;
420  dirName.Form("%i", value);
421 
422  fullPath += "/" + dirName + "/" + fileName;
423 
424  FileStat_t stat;
425  if (gSystem->GetPathInfo(fullPath, stat) == 0) {
426  path = new char[fullPath.Length() + 1];
427  strlcpy(path, fullPath,fullPath.Length() + 1);
428  found = true;
429  } else
430  found = false;
431 
432  delete md;
433  return found;
434 }
435 
436 ////////////////////////////////////////////////////////////////////////////////
437 /// Return a buffer from cache.
438 
// Read a previously cached block back from the file at `path`.
//
// path   - location of the cached file; "?filetype=raw" is appended before opening.
// length - number of bytes to allocate and to request from the cached file.
//
// Returns a buffer of `length` bytes obtained with calloc(). Ownership passes
// to the caller; presumably it must be released with free() - confirm against
// TFPBlock::SetBuffer, which receives this pointer in ReadAsync.
439 char* TFilePrefetch::GetBlockFromCache(const char* path, Int_t length)
440 {
441  char *buffer = 0;
442  TString strPath = path;
443 
444  strPath += "?filetype=raw";
445  TFile* file = new TFile(strPath);
446 
447  Double_t start = 0;
     // Only take a timestamp when performance statistics are being collected.
448  if (gPerfStats != 0) start = TTimeStamp();
449 
     // calloc zero-fills the buffer. The result of calloc and the return value
     // of ReadBuffer are not checked here.
450  buffer = (char*) calloc(length, sizeof(char));
451  file->ReadBuffer(buffer, 0, length);
452 
     // Account for the cache read on the file being prefetched (fFile), not on
     // the temporary cache file object.
453  fFile->fBytesRead += length;
454  fFile->fgBytesRead += length;
455  fFile->SetReadCalls(fFile->GetReadCalls() + 1);
456  fFile->fgReadCalls++;
457 
     // NOTE(review): listing line 459 is absent from this copy, so the statement
     // guarded by the `if` below is missing and the gPerfStats check appears
     // nested under it. Presumably the lost line reports read progress to
     // gMonitoringWriter - confirm against the upstream source.
458  if (gMonitoringWriter)
460  if (gPerfStats != 0) {
461  gPerfStats->FileReadEvent(fFile, length, start);
462  }
463 
464  file->Close();
465  delete file;
466  return buffer;
467 }
468 
469 ////////////////////////////////////////////////////////////////////////////////
470 /// Save the block content in cache.
471 
473 {
474  if (fPathCache == "")
475  return;
476 
477  //dir is the sum of the MD5 digest's hex digits modulo 16; filename is the MD5 digest of the block's element positions
478  TMD5* md = new TMD5();
479 
480  TString concatStr;
481  for(Int_t i=0; i< block->GetNoElem(); i++){
482  concatStr.Form("%lld", block->GetPos(i));
483  md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
484  }
485  md->Final();
486 
487  TString fileName( md->AsString() );
488  Int_t value = SumHex(fileName);
489  value = value % 16;
490 
491  TString fullPath( fPathCache );
492  TString dirName;
493  dirName.Form("%i", value);
494  fullPath += ("/" + dirName);
495 
496  if (!gSystem->OpenDirectory(fullPath))
497  gSystem->mkdir(fullPath);
498 
499  TFile* file = 0;
500  fullPath += ("/" + fileName);
501  FileStat_t stat;
502  if (gSystem->GetPathInfo(fullPath, stat) == 0) {
503  fullPath += "?filetype=raw";
504  file = TFile::Open(fullPath, "update");
505  } else{
506  fullPath += "?filetype=raw";
507  file = TFile::Open(fullPath, "new");
508  }
509 
510  if (file) {
511  // coverity[unchecked_value] We do not print error message, have not error
512  // return code and close the file anyway, not need to check the return value.
513  file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
514  file->Close();
515  delete file;
516  }
517  delete md;
518 }
519 
520 
521 ////////////////////////////////////////////////////////////////////////////////
522 /// Set the path of the cache directory.
523 
525 {
526  fPathCache = path;
527 
528  if (!gSystem->OpenDirectory(path)){
529  return (!gSystem->mkdir(path) ? true : false);
530  }
531 
532  // Directory already exists
533  return true;
534 }
535 
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
void *(* VoidRtnFunc_t)(void *)
Definition: TThread.h:49
void ReadAsync(TFPBlock *, Bool_t &)
Read one block and insert it in prefetchBuffers list.
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
long long Long64_t
Definition: RtypesCore.h:69
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t mid
Definition: TRolke.cxx:630
void ReallocBlock(Long64_t *, Int_t *, Int_t)
Reallocate the block's buffer based on the length of the elements it will contain.
Definition: TFPBlock.cxx:94
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
void Final()
MD5 finalization, ends an MD5 message-digest operation, writing the message digest and zeroizing ...
Definition: TMD5.cxx:167
Bool_t IsPrefetchFinished() const
Definition: TFilePrefetch.h:80
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition: TSystem.cxx:1370
int xtod(char c)
static const int kMAX_READ_SIZE
Long64_t GetWaitTime()
Return the time spent waiting for a buffer to be read, in microseconds.
const char * AsString() const
Return message digest as string.
Definition: TMD5.cxx:220
Int_t GetNoElem() const
Return number of elements in the block.
Definition: TFPBlock.h:85
char * GetPtrToPiece(Int_t index) const
Get block buffer.
Definition: TFPBlock.h:109
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
STL namespace.
Int_t ThreadStart()
Used to start the consumer thread.
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
Definition: TSystem.cxx:903
char * GetBuffer() const
Get block buffer.
Definition: TFPBlock.h:103
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3909
#define SafeDelete(p)
Definition: RConfig.h:499
void SetPos(Int_t, Long64_t)
Set pos value for index idx.
Definition: TFPBlock.cxx:72
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:44
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
void SetBuffer(char *)
Set block buffer.
Definition: TFPBlock.cxx:81
This class represents the encapsulation of a block request.
Definition: TFPBlock.h:22
A doubly linked list.
Definition: TList.h:43
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
void Update(const UChar_t *buf, UInt_t len)
Update TMD5 object to reflect the concatenation of another buffer full of bytes.
Definition: TMD5.cxx:108
R__EXTERN TSystem * gSystem
Definition: TSystem.h:539
void SetFile(TFile *file, TFile::ECacheAction action=TFile::kDisconnect)
Change the file.
void WaitFinishPrefetch()
Killing the async prefetching thread.
TObject * Next()
Definition: TCollection.h:153
#define calloc
Definition: civetweb.c:819
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
Long64_t GetPos(Int_t) const
Get position of the element at index i.
Definition: TFPBlock.h:91
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
#define gPerfStats
const Bool_t kFALSE
Definition: RtypesCore.h:92
Long64_t GetDataSize() const
Return size of the data in the block.
Definition: TFPBlock.h:73
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
virtual ~TFilePrefetch()
Destructor.
#define ClassImp(name)
Definition: Rtypes.h:336
double Double_t
Definition: RtypesCore.h:55
TThread * GetThread() const
Return reference to the consumer thread.
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:71
The prefetching mechanism uses two classes (TFilePrefetch and TFPBlock) to prefetch in advance a bloc...
Definition: TFilePrefetch.h:31
Int_t GetLen(Int_t) const
Get length of the element at index i.
Definition: TFPBlock.h:97
virtual Bool_t SendFileReadProgress(TFile *)
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
Definition: file.py:1
Bool_t SetCache(const char *)
Set the path of the cache directory.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Definition: TSystem.cxx:833
double result[121]
unsigned char UChar_t
Definition: RtypesCore.h:34
Definition: first.py:1
const Bool_t kTRUE
Definition: RtypesCore.h:91
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.
ECacheAction
TTreeCache flushing semantics.
Definition: TFile.h:63