Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMPWorkerTree.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include "MPCode.h"
14#include "MPSendRecv.h"
15#include "TError.h"
16#include "TMPWorkerTree.h"
17#include "TEnv.h"
18#include <string>
19
20//////////////////////////////////////////////////////////////////////////
21///
22/// \class TMPWorkerTree
23///
24/// This class works in conjunction with TTreeProcessorMP, reacting to messages
25/// received from it as specified by the Notify and HandleInput methods.
26///
27/// \class TMPWorkerTreeFunc
28///
29/// Templated derivation of TMPWorkerTree handling generic function tree processing.
30///
31/// \class TMPWorkerTreeSel
32///
33/// Templated derivation of TMPWorkerTree handling selector tree processing.
34///
35//////////////////////////////////////////////////////////////////////////
36
37//////////////////////////////////////////////////////////////////////////
38/// Class constructors.
39/// Note that this does not set variables like fPid or fS (worker's socket).\n
40/// These operations are handled by the Init method, which is called after
41/// forking.\n
42/// This separation is in place because the instantiation of a worker
43/// must be done once _before_ forking, while the initialization of the
44/// members must be done _after_ forking by each of the children processes.
45
47 : TMPWorker(), fFileNames(), fTreeName(), fTree(nullptr), fFile(nullptr), fEntryList(nullptr), fFirstEntry(0),
48 fTreeCache(nullptr), fTreeCacheIsLearning(false), fUseTreeCache(true), fCacheSize(-1)
49{
50 Setup();
51}
52
53TMPWorkerTree::TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries,
54 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
55 : TMPWorker(nWorkers, maxEntries), fFileNames(fileNames), fTreeName(treeName), fTree(nullptr), fFile(nullptr),
56 fEntryList(entries), fFirstEntry(firstEntry), fTreeCache(nullptr), fTreeCacheIsLearning(false), fUseTreeCache(true),
57 fCacheSize(-1)
58{
59 Setup();
60}
61
62TMPWorkerTree::TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
63 ULong64_t firstEntry)
64 : TMPWorker(nWorkers, maxEntries), fTree(tree), fFile(nullptr), fEntryList(entries), fFirstEntry(firstEntry),
65 fTreeCache(nullptr), fTreeCacheIsLearning(false), fUseTreeCache(true), fCacheSize(-1)
66{
67 Setup();
68}
69
71{
72 // Properly close the open file, if any
73 CloseFile();
74}
75
76//////////////////////////////////////////////////////////////////////////
77/// Auxiliary method for common initialization
79{
80 Int_t uc = gEnv->GetValue("MultiProc.UseTreeCache", 1);
81 if (uc != 1) fUseTreeCache = false;
82 fCacheSize = gEnv->GetValue("MultiProc.CacheSize", -1);
83}
84
85//////////////////////////////////////////////////////////////////////////
86/// Handle file closing.
87
89{
90 // Avoid destroying the cache; must be placed before deleting the trees
91 if (fFile) {
92 if (fTree) fFile->SetCacheRead(nullptr, fTree);
93 delete fFile ;
94 fFile = nullptr;
95 }
96}
97
98//////////////////////////////////////////////////////////////////////////
99/// Handle file opening.
100
// Open `fileName` with TFile::Open.
// Returns the opened file, or nullptr if it could not be opened or is a zombie.
// Ownership of the returned file passes to the caller: LoadTree stores it in fFile,
// which CloseFile deletes.
101TFile *TMPWorkerTree::OpenFile(const std::string& fileName)
102{
103
104 TFile *fp = TFile::Open(fileName.c_str());
105 if (fp == nullptr || fp->IsZombie()) {
// NOTE(review): a non-null zombie `fp` is not deleted on this path. Unless TFile::Open already
// disposes of zombie files, this leaks the TFile object; confirm, and consider `delete fp;`.
// NOTE(review): std::stringstream needs <sstream>, which is not among the visible includes.
106 std::stringstream ss;
107 ss << "could not open file " << fileName;
108 std::string errmsg = ss.str();
// NOTE(review): source line 109 is absent from this listing, and `errmsg` is otherwise unused.
// It presumably reports the error to the client (cf. SendError); confirm against the original file.
110 return nullptr;
111 }
112
113 return fp;
114}
115
116//////////////////////////////////////////////////////////////////////////
117/// Retrieve a tree from an open file.
118
120{
// Return the tree named fTreeName from `fp`. If fTreeName is empty, return a TTree/TNtuple found
// by scanning the file's keys. Returns nullptr if no tree is found; the returned tree is owned by
// the file, not by the worker.
// NOTE(review): the declarator (source line 119) is missing from this listing. Per the member list
// it is `TTree *RetrieveTree(TFile *fp)`.
121 //retrieve the TTree with the specified name from file
122 //we are not the owner of the TTree object, the file is!
123 TTree *tree = nullptr;
124 if(fTreeName.empty()) {
125 // retrieve the first TTree
126 // (re-adapted from TEventIter.cxx)
// NOTE(review): the loop below has no `break`. With several TTree/TNtuple keys it keeps the LAST
// match rather than the first, and it reads every matching tree via fp->Get().
127 if (fp->GetListOfKeys()) {
128 for(auto k : *fp->GetListOfKeys()) {
129 TKey *key = static_cast<TKey*>(k);
130 if (!strcmp(key->GetClassName(), "TTree") || !strcmp(key->GetClassName(), "TNtuple"))
131 tree = static_cast<TTree*>(fp->Get(key->GetName()));
132 }
133 }
134 } else {
135 tree = static_cast<TTree*>(fp->Get(fTreeName.c_str()));
136 }
137 if (tree == nullptr) {
138 std::stringstream ss;
139 ss << "cannot find tree with name " << fTreeName << " in file " << fp->GetName();
140 std::string errmsg = ss.str();
// NOTE(review): source line 141 is missing from this listing, and `errmsg` is otherwise unused.
// It presumably reports the error to the client; confirm.
142 return nullptr;
143 }
144
145 return tree;
146}
147
148//////////////////////////////////////////////////////////////////////////
149/// Tree cache handling
150
152{
// Configure the read cache for `tree`.
// If fUseTreeCache is set and the tree has a current file:
//  - the first time (no fTreeCache yet), set the cache size fCacheSize on the tree and store the
//    resulting file cache in fTreeCache, adopting the tree's cache size if fCacheSize is negative;
//  - afterwards, re-attach the existing fTreeCache to the new file/tree pair.
// If caching is disabled, set the tree's cache size to 0.
// NOTE(review): the declarator (source line 151, `void SetupTreeCache(TTree *tree)` per the member
// list) and source lines 161-162 and 166 are missing from this listing. The member list mentions
// TTreeCache::UpdateBranches, ResetCache and IsLearning, which are presumably used there; confirm.
153 if (fUseTreeCache) {
154 TFile *curfile = tree->GetCurrentFile();
155 if (curfile) {
156 if (!fTreeCache) {
157 tree->SetCacheSize(fCacheSize);
158 fTreeCache = (TTreeCache *)curfile->GetCacheRead(tree);
159 if (fCacheSize < 0) fCacheSize = tree->GetCacheSize();
160 } else {
163 curfile->SetCacheRead(fTreeCache, tree);
164 }
165 if (fTreeCache) {
167 }
168 } else {
169 Warning("SetupTreeCache", "default tree does not have a file attached: corruption? Tree cache untouched");
170 }
171 } else {
172 // Disable the cache
173 tree->SetCacheSize(0);
174 }
175}
176
177//////////////////////////////////////////////////////////////////////////
178/// Init overload defining max entries
179
181{
// Per-process initialization, run in the child after forking. It first delegates to
// TMPWorker::Init and then, per this method's description, defines the max entries for this worker.
// NOTE(review): the declarator (source line 180, `void Init(int fd, UInt_t workerN) override` per the
// member list) and source line 184 are missing from this listing. Line 184 presumably applies
// EvalMaxEntries to fMaxNEntries; confirm.
182
183 TMPWorker::Init(fd, workerN);
185}
186
187//////////////////////////////////////////////////////////////////////////
188/// Max entries evaluation
189
191{
192 // E.g.: when dividing 10 entries between 3 workers, the first
193 // two will process 10/3 == 3 entries, the last one will process
194 // 10 - 2*(10/3) == 4 entries.
195 if(GetNWorker() < fNWorkers-1)
196 return maxEntries/fNWorkers;
197 else
198 return maxEntries - (fNWorkers-1)*(maxEntries/fNWorkers);
199}
200
201//////////////////////////////////////////////////////////////////////////
202/// Generic input handling
203
205{
206 UInt_t code = msg.first;
207
208 if (code == MPCode::kProcRange
209 || code == MPCode::kProcFile
210 || code == MPCode::kProcTree) {
211 //execute fProcFunc on a file or a range of entries in a file
212 Process(code, msg);
213 } else if (code == MPCode::kSendResult) {
214 //send back result
215 SendResult();
216 } else {
217 //unknown code received
218 std::string reply = "S" + std::to_string(GetNWorker());
219 reply += ": unknown code received: " + std::to_string(code);
220 MPSend(GetSocket(), MPCode::kError, reply.c_str());
221 }
222}
223
224
225
226//////////////////////////////////////////////////////////////////////////
227/// Selector processing SendResult and Process overload
228
230{
// Selector worker: send the processing result back to the client.
// NOTE(review): the declarator (source line 229) and the body lines 232-233 are missing from this
// listing. The member list mentions TSelector::SlaveTerminate, TSelector::GetOutputList and
// MPCode::kProcResult, which are presumably used here; confirm against the original file.
231 //send back result
234}
235
236//////////////////////////////////////////////////////////////////////////
237/// Selector specialization
238
240{
// Selector worker: process the entries assigned by message `code`.
// It loads the tree and the range [start, finish) via LoadTree, reporting failures with SendError.
// It calls fSelector.SlaveBegin the first time (guarded by fCallBegin), then loops over the range,
// mapping positions through the entry list when LoadTree returned one. Finally it updates
// fProcessedEntries.
// NOTE(review): the declarator (source line 239) is missing from this listing. Per the member list
// it is `void Process(UInt_t code, MPCodeBufPair &msg) override`.
241 //evaluate the index of the file to process in fFileNames
242 //(we actually don't need the parameter if code == kProcTree)
243
244 Long64_t start = 0;
245 Long64_t finish = 0;
246 TEntryList *enl = nullptr;
247 std::string errmsg;
248 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
249 SendError(errmsg);
250 return;
251 }
252
253 if (fCallBegin) {
254 fSelector.SlaveBegin(nullptr);
255 fCallBegin = false;
256 }
257
// NOTE(review): source lines 258-259 are missing from this listing. The member list mentions
// TSelector::Init and TSelector::Notify, presumably called on the loaded tree; confirm.
260 for (Long64_t entry = start; entry < finish; ++entry) {
261 Long64_t e = (enl) ? enl->GetEntry(entry) : entry;
// NOTE(review): source line 262 is missing, and `e` is otherwise unused. It presumably hands `e`
// to the selector; confirm.
263 }
264
265 // update the number of processed entries
// NOTE(review): fProcessedEntries is ULong64_t. If LoadTree ever yields finish < start, this adds a
// negative value and wraps the counter.
266 fProcessedEntries += finish - start;
267
// NOTE(review): source line 268 is missing. MPCode::kIdling appears in the member list, so it
// presumably tells the client this worker is ready for the next task; confirm.
269
270 return;
271}
272
273//////////////////////////////////////////////////////////////////////////
274/// Load the required tree and evaluate the processing range
275
277 std::string &errmsg)
278{
// Load the tree for message `code` and compute the range [start, finish) this worker must handle.
//  - kProcTree : uses the tree given at construction (fTree). The UInt_t payload is the number of
//                ranges processed so far; this worker takes range nProcessed % fNWorkers.
//  - kProcRange: same payload. The file index is nProcessed / fNWorkers and the range index is
//                nProcessed % fNWorkers.
//  - kProcFile : the payload is the index in fFileNames; the whole tree is processed.
// If fEntryList and enl are both non-null, *enl is set to fEntryList's list for the current
// tree/file, and the range then refers to positions in that list.
// The range is finally clipped so that fProcessedEntries does not exceed fMaxNEntries (0 = no limit).
// Returns 0 on success, or -1 with a description in errmsg.
// NOTE(review): the first declarator line (source line 276) is missing from this listing. Per the
// member list the signature is `Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start,
// Long64_t &finish, TEntryList **enl, std::string &errmsg)`.
279 // evaluate the index of the file to process in fFileNames
280 //(we actually don't need the parameter if code == kProcTree)
281
282 start = 0;
283 finish = 0;
284 errmsg = "";
285
286 UInt_t fileN = 0;
287 UInt_t nProcessed = 0;
288 bool setupcache = true;
289
290 std::string mgroot = "[S" + std::to_string(GetNWorker()) + "]: ";
291
292 TTree *tree = nullptr;
293 if (code == MPCode::kProcTree) {
294
295 mgroot += "MPCode::kProcTree: ";
296
297 // The tree must be defined at this level
298 if(fTree == nullptr) {
299 errmsg = mgroot + std::string("tree undefined!");
300 return -1;
301 }
302
303 //retrieve the total number of entries ranges processed so far by TPool
304 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
305
306 //create entries range
307 //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
308 //and this worker must take the rangeN-th range
309 Long64_t nEntries = fTree->GetEntries();
// NOTE(review): nBunch and rangeN are UInt_t, so `rangeN * nBunch` below is evaluated in 32-bit
// unsigned arithmetic. For very large trees it can wrap before being widened to Long64_t.
310 UInt_t nBunch = nEntries / fNWorkers;
311 UInt_t rangeN = nProcessed % fNWorkers;
312 start = rangeN * nBunch;
313 if (rangeN < (fNWorkers - 1)) {
314 finish = (rangeN+1)*nBunch;
315 } else {
316 finish = nEntries;
317 }
318
319 //process tree
320 tree = fTree;
// NOTE(review): an earlier kProcTree message may already have replaced fTree with a tree read from
// fFile (see `fTree = tree;` below). In that case CloseFile() deletes the file that owns that tree,
// and fTree is dereferenced right after. Confirm this path cannot dangle.
321 CloseFile(); // May not be needed
322 if (fTree->GetCurrentFile()) {
323 // We need to reopen the file locally (TODO: to understand and fix this)
// NOTE(review): source line 324 is missing from this listing. Given the matching `} else {` below,
// it presumably (re)opens fTree->GetCurrentFile()->GetName() into fFile; confirm against the original.
// NOTE(review): the two `delete fFile;` statements below do not reset fFile to nullptr, so a later
// CloseFile() (e.g. from the destructor) would delete it a second time.
325 if (!(tree = (TTree *) fFile->Get(fTree->GetName()))) {
326 errmsg = mgroot + std::string("unable to retrieve tree from open file ") +
327 std::string(fTree->GetCurrentFile()->GetName());
328 delete fFile;
329 return -1;
330 }
331 fTree = tree;
332 } else {
333 //errors are handled inside OpenFile
334 errmsg = mgroot + std::string("unable to open file ") + std::string(fTree->GetCurrentFile()->GetName());
335 if (fFile && fFile->IsZombie()) delete fFile;
336 return -1;
337 }
338 }
339
340 } else {
341
342 if (code == MPCode::kProcRange) {
343 mgroot += "MPCode::kProcRange: ";
344 //retrieve the total number of entries ranges processed so far by TPool
345 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
346 //evaluate the file and the entries range to process
347 fileN = nProcessed / fNWorkers;
348 } else if (code == MPCode::kProcFile) {
349 mgroot += "MPCode::kProcFile: ";
350 //evaluate the file and the entries range to process
351 fileN = ReadBuffer<UInt_t>(msg.second.get());
352 } else {
// NOTE(review): unlike the other error paths, this message is not prefixed with mgroot.
353 errmsg += "MPCode undefined!";
354 return -1;
355 }
356
// NOTE(review): fileN is derived from the message and is not checked against fFileNames.size()
// before indexing.
357 // Open the file if required
358 if (fFile && strcmp(fFileNames[fileN].c_str(), fFile->GetName())) CloseFile();
359 if (!fFile) {
360 fFile = OpenFile(fFileNames[fileN]);
361 if (fFile == nullptr) {
362 // errors are handled inside OpenFile
363 errmsg = mgroot + std::string("unable to open file ") + fFileNames[fileN];
364 return -1;
365 }
366 }
367
368 //retrieve the TTree with the specified name from file
369 //we are not the owner of the TTree object, the file is!
370 tree = RetrieveTree(fFile);
371 if (tree == nullptr) {
372 //errors are handled inside RetrieveTree
373 errmsg = mgroot + std::string("unable to retrieve tree from open file ") + fFileNames[fileN];
374 return -1;
375 }
376
377 // Prepare to setup the cache, if required
378 setupcache = (tree != fTree) ? true : false;
379
380 // Store as reference
381 fTree = tree;
382
383 //create entries range
384 if (code == MPCode::kProcRange) {
385 //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
386 //and this worker must take the rangeN-th range
// NOTE(review): unlike the kProcTree branch, nBunch is rounded up here, so the example above does not
// match (21 entries / 4 workers gives 0-6, 6-12, 12-18, 18-21). A non-last worker's range can also
// run past nEntries, and the last one can get start > finish
// (e.g. 5 entries / 4 workers: nBunch = 2, ranges 0-2, 2-4, 4-6, 6-5).
387 Long64_t nEntries = tree->GetEntries();
388 UInt_t nBunch = nEntries / fNWorkers;
389 if(nEntries % fNWorkers) nBunch++;
390 UInt_t rangeN = nProcessed % fNWorkers;
391 start = rangeN * nBunch;
392 if(rangeN < (fNWorkers-1))
393 finish = (rangeN+1)*nBunch;
394 else
395 finish = nEntries;
396 } else {
397 start = 0;
398 finish = tree->GetEntries();
399 }
400 }
401
402 // Setup the cache, if required
403 if (setupcache) SetupTreeCache(fTree);
404
405 // Get the entrylist, if required
// NOTE(review): fFile is null here when kProcTree handled a tree without a current file (CloseFile()
// above leaves fFile null). In that case fFile->GetName() below dereferences a null pointer.
406 if (fEntryList && enl) {
407 if ((*enl = fEntryList->GetEntryList(fTree->GetName(), TUrl(fFile->GetName()).GetFile()))) {
408 // create entries range
409 if (code == MPCode::kProcRange) {
410 // example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
411 // and this worker must take the rangeN-th range
// NOTE(review): this has the same rounding-up issue as the kProcRange tree range above.
412 ULong64_t nEntries = (*enl)->GetN();
413 UInt_t nBunch = nEntries / fNWorkers;
414 if (nEntries % fNWorkers) nBunch++;
415 UInt_t rangeN = nProcessed % fNWorkers;
416 start = rangeN * nBunch;
417 if (rangeN < (fNWorkers - 1))
418 finish = (rangeN + 1) * nBunch;
419 else
420 finish = nEntries;
421 } else {
422 start = 0;
423 finish = (*enl)->GetN();
424 }
425 } else {
426 Warning("LoadTree", "failed to get entry list for: %s %s", fTree->GetName(), TUrl(fFile->GetName()).GetFile());
427 }
428 }
429
430 //check if we are going to reach the max of entries
431 //change finish accordingly
432 if (fMaxNEntries)
433 if (fProcessedEntries + finish - start > fMaxNEntries)
434 finish = start + fMaxNEntries - fProcessedEntries;
435
436 if (gDebug > 0 && fFile)
437 Info("LoadTree", "%s %d %d file: %s %lld %lld", mgroot.c_str(), nProcessed, fileN, fFile->GetName(), start,
438 finish);
439
440 return 0;
441}
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
#define e(i)
Definition RSha256.hxx:103
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:218
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
Int_t gDebug
Definition TROOT.cxx:595
TList * GetListOfKeys() const override
TObject * Get(const char *namecycle) override
Return pointer to object identified by namecycle.
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual TEntryList * GetEntryList(const char *treename, const char *filename, Option_t *opt="")
Return the entry list, corresponding to treename and filename By default, the filename is first tried...
virtual Long64_t GetEntry(Long64_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
virtual void SetCacheRead(TFileCacheRead *cache, TObject *tree=nullptr, ECacheAction action=kDisconnect)
Set a pointer to the read cache.
Definition TFile.cxx:2358
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4082
TFileCacheRead * GetCacheRead(const TObject *tree=nullptr) const
Return a pointer to the current read cache.
Definition TFile.cxx:1255
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
virtual const char * GetClassName() const
Definition TKey.h:75
void SendResult() override
Selector processing SendResult and Process overload.
TSelector & fSelector
pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
void Process(UInt_t code, MPCodeBufPair &msg) override
Selector specialization.
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the required tree and evaluate the processing range.
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
TFile * fFile
last open file
~TMPWorkerTree() override
void Init(int fd, UInt_t workerN) override
Init overload defining max entries.
void HandleInput(MPCodeBufPair &msg) override
Execute instructions received from a MP client.
void Setup()
Auxiliary method for common initialization.
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
TEntryList * fEntryList
entrylist
TMPWorkerTree()
Class constructors.
bool fUseTreeCache
Control usage of the tree cache.
std::vector< std::string > fFileNames
the files to be processed by all workers
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SendResult()
virtual void Process(UInt_t, MPCodeBufPair &)
Long64_t fCacheSize
Cache size.
std::string fTreeName
the name of the tree to be processed
void CloseFile()
Handle file closing.
void SetupTreeCache(TTree *tree)
Tree cache handling.
TTreeCache * fTreeCache
instance of the tree cache for the tree
bool fTreeCacheIsLearning
Whether cache is in learning phase.
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition TMPWorker.h:25
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
unsigned GetNWorker() const
Definition TMPWorker.h:41
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Definition TMPWorker.h:46
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
Definition TMPWorker.h:47
TSocket * GetSocket()
Definition TMPWorker.h:39
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition TMPWorker.cxx:52
unsigned fNWorkers
the number of workers spawned
Definition TMPWorker.h:45
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
R__ALWAYS_INLINE Bool_t IsZombie() const
Definition TObject.h:153
virtual void Init(TTree *)
Definition TSelector.h:53
virtual bool Process(Long64_t)
The Process() function is called for each entry in the tree (or possibly keyed object in the case of ...
virtual void SlaveBegin(TTree *)
Definition TSelector.h:55
bool Notify() override
This method must be overridden to handle object notification (the base implementation is no-op).
Definition TSelector.h:56
virtual TList * GetOutputList() const
Definition TSelector.h:69
virtual void SlaveTerminate()
Definition TSelector.h:70
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
bool IsLearning() const override
Definition TTreeCache.h:152
virtual void ResetCache()
This will simply clear the cache.
A TTree represents a columnar dataset.
Definition TTree.h:79
TFile * GetCurrentFile() const
Return pointer to the current file.
Definition TTree.cxx:5479
virtual Long64_t GetEntries() const
Definition TTree.h:463
This class represents a WWW compatible URL.
Definition TUrl.h:33
const char * GetFile() const
Definition TUrl.h:69
@ kSendResult
Ask for a kFuncResult/kProcResult.
Definition MPCode.h:36
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
Definition MPCode.h:38
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kError
Error message.
Definition MPCode.h:47
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
Definition MPCode.h:39
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
Definition MPCode.h:40
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42