Logo ROOT   6.07/09
Reference Guide
TPoolPlayer.cxx
Go to the documentation of this file.
1 #include <iostream>
2 
3 #include "TPoolPlayer.h"
4 #include "PoolUtils.h"
5 #include "MPCode.h"
6 
7 #include "TFile.h"
8 
9 void TPoolPlayer::Init(int fd, unsigned nWorkers) {
10  TMPWorker::Init(fd, nWorkers);
11 }
12 
// Execute instructions received from a TPool client by dispatching on the
// message code (msg.first). The signature line of this listing (line 13) did
// not survive extraction; per the cross-reference below it is
// `void TPoolPlayer::HandleInput(MPCodeBufPair &msg)`.
14 {
15  unsigned code = msg.first;
16 
// kProcTree -> ProcTree(msg); kProcRange / kProcFile -> ProcDataSet(code, msg).
17  if (code == PoolCode::kProcTree) {
18  ProcTree(msg);
19  } else if (code == PoolCode::kProcRange || code == PoolCode::kProcFile) {
20  ProcDataSet(code, msg);
21  } else if (code == PoolCode::kSendResult){
// NOTE(review): listing lines 22-23 (the body of this kSendResult branch) are
// missing from this page. Judging from the cross-references (SlaveTerminate,
// GetOutputList, MPSend, kProcResult) they presumably finish the selector and
// send its output list back to the pool -- confirm against the upstream source.
24  }else {
// Any other code is reported back through SendError().
25  //unknown code received
26  std::string errmsg = "unknown code received: " + std::to_string(code);
27  SendError(errmsg);
28  }
29 }
30 
31 void TPoolPlayer::ProcDataSet(unsigned int code, MPCodeBufPair& msg)
32 {
33  //evaluate the index of the file to process in fFileNames
34  //(we actually don't need the parameter if code == kProcTree)
35  unsigned fileN = 0;
36  unsigned nProcessed = 0;
37 
38  if (code == PoolCode::kProcRange) {
39  //retrieve the total number of entries ranges processed so far by TPool
40  nProcessed = ReadBuffer<unsigned>(msg.second.get());
41  //evaluate the file and the entries range to process
42  fileN = nProcessed / fNWorkers;
43  } else {
44  //evaluate the file and the entries range to process
45  fileN = ReadBuffer<unsigned>(msg.second.get());
46  }
47 
48  // Open the file
49  fFile = OpenFile(fFileNames[fileN]);
50  if (fFile == nullptr) {
51  //errors are handled inside OpenFile
52  std::string errmsg = "unable to open file " + fFileNames[fileN];
53  SendError(errmsg);
54  return;
55  }
56 
57  //retrieve the TTree with the specified name from file
58  //we are not the owner of the TTree object, the file is!
59  fTree = RetrieveTree(fFile);
60  if (fTree == nullptr) {
61  //errors are handled inside RetrieveTree
62  std::string errmsg = "unable to retrieve tree from open file " + fFileNames[fileN];
63  SendError(errmsg);
64  return;
65  }
66  TTree *tree = fTree;
67 
68  // Setup the cache, if required
70 
71  //create entries range
72  Long64_t start = 0;
73  Long64_t finish = 0;
74  if (code == PoolCode::kProcRange) {
75  //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
76  //and this worker must take the rangeN-th range
77  unsigned nEntries = tree->GetEntries();
78  unsigned nBunch = nEntries / fNWorkers;
79  if(nEntries % fNWorkers) nBunch++;
80  unsigned rangeN = nProcessed % fNWorkers;
81  start = rangeN*nBunch + 1;
82  if(rangeN < (fNWorkers-1))
83  finish = (rangeN+1)*nBunch;
84  else
85  finish = nEntries;
86  } else {
87  start = 0;
88  finish = tree->GetEntries();
89  }
90 
91  //check if we are going to reach the max of entries
92  //change finish accordingly
93  if (fMaxNEntries)
94  if (fProcessedEntries + finish - start > fMaxNEntries)
95  finish = start + fMaxNEntries - fProcessedEntries;
96 
97  if(fFirstEntry){
98  fSelector.SlaveBegin(nullptr);
99  fFirstEntry = false;
100  }
101 
102  fSelector.Init(tree);
103  fSelector.Notify();
104  for(Long64_t entry = start; entry<finish; ++entry) {
105  fSelector.Process(entry);
106  }
107 
108  //update the number of processed entries
109  fProcessedEntries += finish - start;
110 
112 
113  return;
114 }
115 
117 {
118 
119  // The tree must be defined at this level
120  if(fTree == nullptr) {
121  std::cout << "tree undefined!\n" ;
122  //errors are handled inside RetrieveTree
123  return;
124  }
125 
126  //evaluate the index of the file to process in fFileNames
127  //(we actually don't need the parameter if code == kProcTree)
128  unsigned nProcessed = 0;
129  //retrieve the total number of entries ranges processed so far by TPool
130  nProcessed = ReadBuffer<unsigned>(msg.second.get());
131 
132  //create entries range
133  Long64_t start = 0;
134  Long64_t finish = 0;
135  //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
136  //and this worker must take the rangeN-th range
137  unsigned nEntries = fTree->GetEntries();
138  unsigned nBunch = nEntries / fNWorkers;
139  unsigned rangeN = nProcessed % fNWorkers;
140  start = rangeN*nBunch + 1;
141  if(rangeN < (fNWorkers-1))
142  finish = (rangeN+1)*nBunch;
143  else
144  finish = nEntries;
145 
146  //check if we are going to reach the max of entries
147  //change finish accordingly
148  if (fMaxNEntries)
149  if (fProcessedEntries + finish - start > fMaxNEntries)
150  finish = start + fMaxNEntries - fProcessedEntries;
151 
152  //process tree
153  TTree *tree = fTree;
154  CloseFile(); // May not be needed
155  if (fTree->GetCurrentFile()) {
156  // We need to reopen the file locally (TODO: to understand and fix this)
157  if ((fFile = TFile::Open(fTree->GetCurrentFile()->GetName())) && !fFile->IsZombie()) {
158  if (!(tree = (TTree *) fFile->Get(fTree->GetName()))) {
159  std::string errmsg = "unable to retrieve tree from open file " +
160  std::string(fTree->GetCurrentFile()->GetName());
161  SendError(errmsg);
162  }
163  fTree = tree;
164  } else {
165  //errors are handled inside OpenFile
166  std::string errmsg = "unable to open file " + std::string(fTree->GetCurrentFile()->GetName());
167  SendError(errmsg);
168  }
169  }
170 
171  // Setup the cache, if required
173 
174  if(fFirstEntry){
175  fSelector.SlaveBegin(nullptr);
176  fFirstEntry = false;
177  }
178 
179  fSelector.Init(tree);
180  fSelector.Notify();
181 
182  for(Long64_t entry = start; entry<finish; ++entry) {
183  fSelector.Process(entry);
184  }
185 
186  //update the number of processed entries
187  fProcessedEntries += finish - start;
188 
190 
191  return;
192 }
long long Long64_t
Definition: RtypesCore.h:69
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
Definition: PoolUtils.h:40
virtual Bool_t Notify()
This method must be overridden to handle object notification.
Definition: TSelector.h:64
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
Definition: PoolUtils.h:39
virtual TList * GetOutputList() const
Definition: TSelector.h:76
TFile * OpenFile(const std::string &fileName)
Handle file opening.
Definition: TMPWorker.cxx:185
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
Bool_t IsZombie() const
Definition: TObject.h:127
TFile * GetCurrentFile() const
Return pointer to the current file.
Definition: TTree.cxx:5052
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
Definition: PoolUtils.h:38
We are ready for the next task.
Definition: PoolUtils.h:35
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
Definition: TMPWorker.cxx:268
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3871
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Definition: TPoolPlayer.h:48
void Init(int fd, unsigned nWorkers)
This method is called by children processes right after forking.
Definition: TPoolPlayer.cxx:9
void SetupTreeCache(TTree *tree)
Tree cache handling.
Definition: TMPWorker.cxx:236
unsigned fNWorkers
the number of workers spawned
Definition: TPoolPlayer.h:47
void CloseFile()
Handle file closing.
Definition: TMPWorker.cxx:172
TFile * fFile
last open file
Definition: TMPWorker.h:54
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a TPool client.
Definition: TPoolPlayer.cxx:13
virtual void SlaveBegin(TTree *)
Definition: TSelector.h:63
TSocket * GetSocket()
Definition: TMPWorker.h:45
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
bool fFirstEntry
Definition: TPoolPlayer.h:50
TTree * fTree
tree to be processed. It is only used if the tree is directly passed to TProcessExecutor::Process as ...
Definition: TPoolPlayer.h:46
The message contains the result of the processing of a TTree.
Definition: PoolUtils.h:42
virtual void SlaveTerminate()
Definition: TSelector.h:77
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
std::vector< std::string > fFileNames
the files to be processed by all workers
Definition: TPoolPlayer.h:44
Ask for a kFuncResult/kProcResult.
Definition: PoolUtils.h:36
virtual Bool_t Process(Long64_t)
Definition: TSelector.cxx:292
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:111
void ProcDataSet(unsigned int code, MPCodeBufPair &msg)
Run fSelector->Process over a data set.
Definition: TPoolPlayer.cxx:31
Definition: tree.py:1
virtual Long64_t GetEntries() const
Definition: TTree.h:392
A TTree object has a header with a name and a title.
Definition: TTree.h:98
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
Definition: TPoolPlayer.h:49
void ProcTree(MPCodeBufPair &msg)
Run fSelector->Process over the tree entries, send back result.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
Definition: TMPWorker.cxx:204
virtual void Init(TTree *)
Definition: TSelector.h:61
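TSelector & fSelector
the selector used to process the tree. Note: the member is declared as a reference, so it is always bound and cannot be null; the original description ("pointer to the selector to be used to process the tree. It is null if we are not using a TSelector...") does not match the declared type.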
Definition: TPoolPlayer.h:43