Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TPacketizerFile.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: G. Ganis 2009
3
4/*************************************************************************
5 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TPacketizerFile
13\ingroup proofkernel
14
15This packetizer generates packets which contain a single file path
16to be used in process. Used for tasks generating files, like in
17PROOF bench.
18
19*/
20
21#include "TPacketizerFile.h"
22
23#include "Riostream.h"
24#include "TDSet.h"
25#include "TError.h"
26#include "TEventList.h"
27#include "TMap.h"
28#include "TMessage.h"
29#include "TMonitor.h"
30#include "TNtupleD.h"
31#include "TObject.h"
32#include "TParameter.h"
33#include "TPerfStats.h"
34#include "TProofDebug.h"
35#include "TProof.h"
36#include "TProofPlayer.h"
37#include "TProofServ.h"
38#include "TSlave.h"
39#include "TSocket.h"
40#include "TStopwatch.h"
41#include "TTimer.h"
42#include "TUrl.h"
43#include "TClass.h"
44#include "TMath.h"
45#include "TObjString.h"
46#include "TFileInfo.h"
47#include "TFileCollection.h"
48#include "THashList.h"
49
50//------------------------------------------------------------------------------
51
53
// Per-worker statistics kept by TPacketizerFile: besides the progress status
// handled through AddProcessed(), it keeps the information needed to estimate
// the processing speed of the worker (see UpdatePerformance()).
54friend class TPacketizerFile;
55
56private:
57 Long64_t fLastProcessed; // number of processed entries of the last packet
58 Double_t fSpeed; // estimated current average speed of the processing slave (set by UpdatePerformance)
59 Double_t fTimeInstant; // stores the time instant when the current packet started
60 TNtupleD *fCircNtp; // Keeps circular info for speed calculations (columns "tm:ev"; created in the constructor)
61 Long_t fCircLvl; // Circularity level: 5 by default, read from "PROOF_TPacketizerFileCircularity" in the constructor
62
63public:
// 'sl' is the worker described by this object; 'input' is the list searched
// for the circularity parameter.
64 TSlaveStat(TSlave *sl, TList *input);
66
68
// Record 'time' in the circular ntuple and refresh the speed estimate.
69 void UpdatePerformance(Double_t time);
71};
72
73// Iterator wrapper: gives a name to a TIter so that it can be stored in a
// list and retrieved with FindObject(name). The packetizer uses the worker
// host FQDN as name, or "*" for the list of not-assigned files.
75
76private:
77 TString fName; // Name of reference
78 TIter *fIter; // Iterator (owned: deleted in the destructor)
79
80public:
// Takes ownership of 'iter'
81 TIterObj(const char *n, TIter *iter) : fName(n), fIter(iter) { }
82 virtual ~TIterObj() { if (fIter) delete fIter; }
83
84 const char *GetName() const {return fName;}
85 TIter *GetIter() const {return fIter;}
86 void Print(Option_t* option = "") const;
87};
88
90
91////////////////////////////////////////////////////////////////////////////////
92/// Constructor.
/// Sets up the packetizer from the 'input' list, which must be non-empty and
/// contain a TMap named "PROOF_FilesToProcess". The values of the map must be
/// THashList or TFileCollection objects; the host FQDN is derived from the name
/// of each key. Lists whose host matches one of the workers get a dedicated
/// iterator registered under the host name; the files of the other lists are
/// collected in the not-assigned list, which is iterated under the name "*".
/// fValid is set to kTRUE only when the whole setup succeeds: on any error the
/// constructor returns early leaving it kFALSE.
93
96 : TVirtualPacketizer(input, st)
97{
98 PDB(kPacketizer,1) Info("TPacketizerFile", "enter");
// Invalid until the setup below has completed
100 fValid = kFALSE;
101 fAssigned = 0;
104
105 if (!input || (input && input->GetSize() <= 0)) {
106 Error("TPacketizerFile", "input file is undefined or empty!");
108 return;
109 }
110
111 // Check if the files not explicitly assigned have to be processed
// (parameter "PROOF_ProcessNotAssigned"; the local default is 1).
// NOTE(review): only the notification is visible here; the statement storing
// the choice in a data member is not part of this listing - confirm.
112 Int_t procnotass = 1;
113 if (TProof::GetParameter(input, "PROOF_ProcessNotAssigned", procnotass) == 0) {
114 if (procnotass == 0) {
115 Info("TPacketizerFile", "files not assigned to workers will not be processed");
117 }
118 }
119
120 // Check if the TFileInfo object has to be added to the packet
// (parameter "PROOF_IncludeFileInfoInPacket"; the local default is 0).
// NOTE(review): as above, the statement storing the choice is not visible here.
121 Int_t addfileinfo = 0;
122 if (TProof::GetParameter(input, "PROOF_IncludeFileInfoInPacket", addfileinfo) == 0) {
123 if (addfileinfo == 1) {
124 Info("TPacketizerFile",
125 "TFileInfo object will be included in the packet as associated object");
127 }
128 }
129
130 // These are the files to be created/processed per node; the information
// must be in the input list as a TMap named "PROOF_FilesToProcess"
131 if (!(fFiles = dynamic_cast<TMap *>(input->FindObject("PROOF_FilesToProcess")))) {
132 Error("TPacketizerFile", "map of files to be processed/created not found");
134 return;
135 }
136
137 // The worker stats: one TSlaveStat per worker, keyed by the worker object
138 fSlaveStats = new TMap;
140
// Unique list of the worker host names (FQDN); the list owns its strings
141 TList nodes;
142 nodes.SetOwner(kTRUE);
143 TSlave *wrk;
144 TIter si(workers);
145 while ((wrk = (TSlave *) si.Next())) {
146 fSlaveStats->Add(wrk, new TSlaveStat(wrk, input));
147 TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
148 Info("TPacketizerFile", "worker: %s", wrkname.Data());
149 if (!nodes.FindObject(wrkname)) nodes.Add(new TObjString(wrkname));
150 }
151
152 // The list of iterators
153 fIters = new TList;
155
156 // There must be something in the map
157 fTotalEntries = 0;
158 fNotAssigned = new TList;
159 fNotAssigned->SetName("*");
160 TIter nxl(fFiles);
161 TObject *key, *o = 0;
162 while ((key = nxl()) != 0) {
// The value can be either a THashList or a TFileCollection (its list is used)
163 THashList *wrklist = dynamic_cast<THashList *>(fFiles->GetValue(key));
164 if (!wrklist) {
165 TFileCollection *fc = dynamic_cast<TFileCollection *>(fFiles->GetValue(key));
166 if (fc) wrklist = fc->GetList();
167 }
168 if (wrklist) {
169 TString hname = TUrl(key->GetName()).GetHostFQDN();
170 if ((o = nodes.FindObject(hname))) {
// The host is one of the worker nodes: register a dedicated iterator
171 fTotalEntries += wrklist->GetSize();
172 fIters->Add(new TIterObj(hname, new TIter(wrklist)));
173 // Notify
174 PDB(kPacketizer,2)
175 Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') assigned to '%s'",
176 wrklist->GetSize(), key->GetName(), hname.Data(), o->GetName());
177 } else {
178 // We add all to the not assigned list so that they will be distributed
179 // according to the load
180 TIter nxf(wrklist);
181 while ((o = nxf()))
182 fNotAssigned->Add(o);
183 // Notify
184 PDB(kPacketizer,2)
185 Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') not assigned",
186 wrklist->GetSize(), key->GetName(), hname.Data());
187 }
188 }
189 }
// Register the iterator over the not-assigned files under the name "*".
// NOTE(review): in the statements visible here fTotalEntries is incremented
// only for the host-assigned lists - confirm how these files are counted.
190 if (fNotAssigned && fNotAssigned->GetSize() > 0) {
192 fIters->Add(new TIterObj("*", new TIter(fNotAssigned)));
193 Info("TPacketizerFile", "non-assigned files: %d", fNotAssigned->GetSize());
195 }
196 if (fTotalEntries <= 0) {
197 Error("TPacketizerFile", "no file path in the map!");
200 return;
201 } else {
202 Info("TPacketizerFile", "processing %lld files", fTotalEntries);
203 fIters->Print();
204 }
205
// Start measuring the processing time and declare the packetizer usable
206 fStopwatch = new TStopwatch();
207 fStopwatch->Start();
208 fValid = kTRUE;
209 PDB(kPacketizer,1) Info("TPacketizerFile", "return");
210
211 // Done
212 return;
213}
214
215////////////////////////////////////////////////////////////////////////////////
216/// Destructor.
/// NOTE(review): the statements of the body are not part of this listing;
/// presumably they release the objects allocated by the constructor
/// (fSlaveStats, fIters, fNotAssigned, fStopwatch) - TODO confirm.
217
219{
225}
226
227////////////////////////////////////////////////////////////////////////////////
228/// Get current time: the real time, in seconds, measured by fStopwatch, which
/// is started by the constructor once the setup has completed successfully.
229
231{
// NOTE(review): TStopwatch::RealTime() stops a running stopwatch; no call
// resuming it is visible in this listing - confirm that it keeps running.
232 Double_t retValue = fStopwatch->RealTime();
234 return retValue;
235}
236
237////////////////////////////////////////////////////////////////////////////////
238/// Get Estimation of the current rate; just summing the current rates of
239/// the active workers.
/// On return 'all' is kTRUE if every worker contributed to the sum; it is
/// kFALSE if at least one worker has no stat object, no progress status or no
/// processed entries yet. Returns 0 if there are no worker stats.
240
242{
// Assume that all workers contribute until one is found that does not
243 all = kTRUE;
244 // Loop over the workers
245 Float_t currate = 0.;
246 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
247 TIter nxw(fSlaveStats);
248 TObject *key;
249 while ((key = nxw()) != 0) {
250 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
251 if (wrkstat && wrkstat->GetProgressStatus() && wrkstat->GetEntriesProcessed() > 0) {
252 // Sum-up the current rates
253 currate += wrkstat->GetProgressStatus()->GetCurrentRate();
254 } else {
// This worker has not reported any progress yet
255 all = kFALSE;
256 }
257 }
258 }
259 // Done
260 return currate;
261}
262
263////////////////////////////////////////////////////////////////////////////////
264/// Get next packet.
/// Reads from the message 'r' the information about the last packet processed
/// by the worker 'wrk', updates the worker statistics and returns a new
/// TDSetElement describing the next file to be processed by the worker.
/// A null pointer is returned if the packetizer is not valid, if no stat
/// object is found for the worker, if all the files have been assigned, if
/// fStop is set, if no file is left for this worker or if the next object in
/// the list is neither a TObjString nor a TFileInfo.
265
267{
268 TDSetElement *elem = 0;
269 if (!fValid) return elem;
270
271 // Find slave
272 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(wrk);
273 if (!wrkstat) {
274 Error("GetNextPacket", "could not find stat object for worker '%s'!", wrk->GetName());
275 return elem;
276 }
277
278 PDB(kPacketizer,2)
279 Info("GetNextPacket","worker-%s: fAssigned %lld / %lld", wrk->GetOrdinal(), fAssigned, fTotalEntries);
280
281 // Update stats & free old element
282 Double_t latency = 0., proctime = 0., proccpu = 0.;
283 Long64_t bytesRead = -1;
284 Long64_t totalEntries = -1; // used only to read an old message type
285 Long64_t totev = 0;
286 Long64_t numev = -1;
287
288 TProofProgressStatus *status = 0;
// Workers with protocol > 18 send the latency followed by a progress status
// object; the others send the individual values
289 if (wrk->GetProtocol() > 18) {
290 (*r) >> latency;
291 (*r) >> status;
292
293 // Calculate the progress made in the last packet
294 TProofProgressStatus *progress = 0;
295 if (status) {
296 // update the worker status
297 numev = status->GetEntries() - wrkstat->GetEntriesProcessed();
298 progress = wrkstat->AddProcessed(status);
299 if (progress) {
300 // (*fProgressStatus) += *progress;
301 proctime = progress->GetProcTime();
302 proccpu = progress->GetCPUTime();
303 totev = status->GetEntries(); // for backward compatibility
304 bytesRead = progress->GetBytesRead();
// The difference object is allocated by AddProcessed
305 delete progress;
306 }
307 delete status;
308 } else
309 Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
310 } else {
311
312 (*r) >> latency >> proctime >> proccpu;
313
314 // only read new info if available
315 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
316 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
317 if (r->BufferSize() > r->Length()) (*r) >> totev;
318
319 numev = totev - wrkstat->GetEntriesProcessed();
320 wrkstat->GetProgressStatus()->IncEntries(numev);
321 wrkstat->GetProgressStatus()->SetLastUpdate();
322 }
323
326
327 PDB(kPacketizer,2)
328 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
329 wrk->GetOrdinal(), wrk->GetName(),
330 numev, latency, proctime, proccpu, bytesRead);
331
332 if (gPerfStats != 0) {
333 gPerfStats->PacketEvent(wrk->GetOrdinal(), wrk->GetName(), "", numev,
334 latency, proctime, proccpu, bytesRead);
335 }
336
// Nothing more to hand out: all the files have already been assigned
337 if (fAssigned == fTotalEntries) {
338 // Send last timer message
339 HandleTimer(0);
340 return 0;
341 }
342
// Processing has been stopped
343 if (fStop) {
344 // Send last timer message
345 HandleTimer(0);
346 return 0;
347 }
348
349 PDB(kPacketizer,2)
350 Info("GetNextPacket", "worker-%s (%s): getting next files ... ", wrk->GetOrdinal(),
351 wrk->GetName());
352
353 // Get next file now
354 TObject *nextfile = 0;
355
356 // Find iterator associated to the worker (registered under the host FQDN)
357 TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
358 TIterObj *io = dynamic_cast<TIterObj *>(fIters->FindObject(wrkname));
359 if (io) {
360 // Get next file to process in the list of the worker
361 if (io->GetIter())
362 nextfile = io->GetIter()->Next();
363 }
364
365 // If not found or all files already processed, check if a generic iterator
366 // has still some files to process
367 if (!nextfile && fProcNotAssigned) {
368 if ((io = dynamic_cast<TIterObj *>(fIters->FindObject("*")))) {
369 // Get next file to process in the list of the not-assigned files
370 if (io->GetIter())
371 nextfile = io->GetIter()->Next();
372 }
373 }
374
375 // Return if nothing to process
376 if (!nextfile) return elem;
377
378 // The file name: we support TObjString or TFileInfo
379 TString filename;
380 TObjString *os = 0;
381 TFileInfo *fi = 0;
382 if ((os = dynamic_cast<TObjString *>(nextfile))) {
383 filename = os->GetName();
384 } else {
385 if ((fi = dynamic_cast<TFileInfo *>(nextfile)))
386 filename = fi->GetCurrentUrl()->GetUrl();
387 }
388 // Nothing to process
389 if (filename.IsNull()) {
390 Warning("GetNextPacket", "found unsupported object of type '%s' in list: it must"
391 " be 'TObjString' or 'TFileInfo'", nextfile->ClassName());
392 return elem;
393 }
394 // Prepare the packet
395 PDB(kPacketizer,2)
396 Info("GetNextPacket", "worker-%s: assigning: '%s' (remaining %lld files)",
397 wrk->GetOrdinal(), filename.Data(), (fTotalEntries - fAssigned));
// presumably the last two arguments are the first entry (0) and the number of
// entries (1), i.e. one unit of work per file - TODO confirm against TDSetElement
398 elem = new TDSetElement(filename, "", "", 0, 1);
400
401 // Add the TFileInfo as associated object of the element, if required
402 if (fAddFileInfo && fi) {
403 elem->AddAssocObj(fi);
404 PDB(kPacketizer,2) fi->Print("L");
405 }
406
407 // Update the total counter
408 fAssigned += 1;
409
410 return elem;
411}
412
413//------------------------------------------------------------------------------
414
415////////////////////////////////////////////////////////////////////////////////
416/// Main constructor.
/// Creates the ntuple used for the speed calculations and reads the
/// circularity level from the parameter "PROOF_TPacketizerFileCircularity" of
/// the 'input' list; the default value 5 is used if the parameter is missing
/// or not positive.
417
419 : fLastProcessed(0),
420 fSpeed(0), fTimeInstant(0), fCircLvl(5)
421{
422 // Initialize the circularity ntuple for speed calculations
// (columns: 'tm' cumulative time, 'ev' processed entries; see UpdatePerformance)
423 fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
424 TProof::GetParameter(input, "PROOF_TPacketizerFileCircularity", fCircLvl);
// Fall back to the default if the value is not positive
425 fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
427 fSlave = slave;
429}
430
431////////////////////////////////////////////////////////////////////////////////
432/// Destructor: deletes the ntuple allocated by the constructor.
433
435{
436 SafeDelete(fCircNtp);
437}
438
439////////////////////////////////////////////////////////////////////////////////
440/// Update the circular ntuple and the speed estimate.
/// 'time' is added to the cumulative time of the last stored entry and a new
/// entry (cumulative time, entries processed so far) is filled. fSpeed is then
/// set to the entries processed since the first stored entry divided by the
/// time elapsed since then. The first call only fills a reference entry and
/// sets fSpeed to 0.
441
443{
444 Double_t ttot = time;
// assumes GetArgs() returns the buffer filled by GetEntry(), with
// ar[0] = 'tm' and ar[1] = 'ev' - TODO confirm against TNtupleD
445 Double_t *ar = fCircNtp->GetArgs();
446 Int_t ne = fCircNtp->GetEntries();
447 if (ne <= 0) {
448 // First call: just fill one ref entry and return
449 fCircNtp->Fill(0., 0);
450 fSpeed = 0.;
451 return;
452 }
453 // Fill the entry: the new cumulative time is the last one plus 'time'
454 fCircNtp->GetEntry(ne-1);
455 ttot = ar[0] + time;
456 fCircNtp->Fill(ttot, GetEntriesProcessed());
457
458 // Calculate the speed with respect to the first entry in the ntuple
459 fCircNtp->GetEntry(0);
// If no time has elapsed, use ne+1 as denominator to avoid a division by zero
460 Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
461 Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
462 fSpeed = nevts / dtime;
463 PDB(kPacketizer,2)
464 Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
465 time, dtime, nevts, fSpeed);
466
467}
468
469////////////////////////////////////////////////////////////////////////////////
470/// Update the status info to the 'st'.
471/// Return the difference (*st - *fStatus), computed before the update.
/// The returned object is allocated with 'new' and must be deleted by the
/// caller. If 'st' is null an error is issued and 0 is returned.
472
474{
475 if (st) {
476 // The entries value is not correct in 'st': compute the increment here
477 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
478 // The last proc time should not be added
479 fStatus->SetLastProcTime(0.);
480 // Get the diff
481 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
482 *fStatus += *diff;
483 // Set the correct value
484 fStatus->SetLastEntries(lastEntries);
485 return diff;
486 } else {
487 Error("AddProcessed", "status arg undefined");
488 return 0;
489 }
490}
491
492////////////////////////////////////////////////////////////////////////////////
493/// Print info: the name of the iterator and the size of the collection it
/// iterates on, or -1 if the iterator or its collection is not defined.
494
496{
497 Printf("Iterator '%s' controls %d units", GetName(),
498 ((GetIter() && GetIter()->GetCollection()) ? GetIter()->GetCollection()->GetSize()
499 : -1));
500}
ROOT::R::TRInterface & r
Definition Object.C:4
#define SafeDelete(p)
Definition RConfig.hxx:537
const Bool_t kFALSE
Definition RtypesCore.h:101
long Long_t
Definition RtypesCore.h:54
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:80
float Float_t
Definition RtypesCore.h:57
const Bool_t kTRUE
Definition RtypesCore.h:100
const char Option_t
Definition RtypesCore.h:66
#define ClassImp(name)
Definition Rtypes.h:364
#define PDB(mask, level)
Definition TProofDebug.h:56
void Printf(const char *fmt,...)
#define gPerfStats
static struct mg_connection * fc(struct mg_context *ctx)
Definition civetweb.c:3728
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
void SetName(const char *name)
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Manages an element of a TDSet.
Definition TDSet.h:66
void AddAssocObj(TObject *assocobj)
Add an associated object to the list.
Definition TDSet.cxx:634
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
Definition TFileInfo.h:39
void Print(Option_t *options="") const
Print information about this object.
TUrl * GetCurrentUrl() const
Return the current url.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition THashList.h:34
TObject * Next()
A doubly linked list.
Definition TList.h:38
virtual void Add(TObject *obj)
Definition TList.h:81
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition TList.cxx:578
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition TMap.h:40
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
Definition TMap.cxx:54
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition TMap.cxx:236
A simple TTree restricted to a list of double variables only.
Definition TNtupleD.h:28
Collectable string class.
Definition TObjString.h:28
const char * GetName() const
Returns name of object.
Definition TObjString.h:38
Mother of all ROOT objects.
Definition TObject.h:41
virtual const char * GetName() const
Returns name of object.
Definition TObject.cxx:429
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition TObject.cxx:200
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:949
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:766
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:963
void ResetBit(UInt_t f)
Definition TObject.h:200
@ kInvalidObject
if object ctor succeeded but object should not be used
Definition TObject.h:72
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:937
void Print(Option_t *option="") const
Printf info.
TIterObj(const char *n, TIter *iter)
const char * GetName() const
Returns name of object.
void UpdatePerformance(Double_t time)
Update the circular ntple.
TSlaveStat(TSlave *sl, TList *input)
Main constructor.
TProofProgressStatus * AddProcessed(TProofProgressStatus *st)
Update the status info to the 'st'.
This packetizer generates packets which contain a single file path to be used in process.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
virtual ~TPacketizerFile()
Destructor.
TDSetElement * GetNextPacket(TSlave *wrk, TMessage *r)
Get next packet.
Double_t GetCurrentTime()
Get current time.
TStopwatch * fStopwatch
Container class for processing statistics.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
Double_t GetProcTime() const
Long64_t GetEntries() const
Double_t GetCurrentRate() const
Get current rate. Return the average rate if the current is not defined.
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition TProof.cxx:9918
Class describing a PROOF worker server.
Definition TSlave.h:46
Int_t GetProtocol() const
Definition TSlave.h:133
const char * GetName() const
Returns name of object.
Definition TSlave.h:124
const char * GetOrdinal() const
Definition TSlave.h:131
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
void Continue()
Resume a stopped stopwatch.
Basic string class.
Definition TString.h:136
const char * Data() const
Definition TString.h:369
Bool_t IsNull() const
Definition TString.h:407
virtual void SetCircular(Long64_t maxEntries)
Enable/Disable circularity for this tree.
Definition TTree.cxx:8858
This class represents a WWW compatible URL.
Definition TUrl.h:33
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition TUrl.cxx:389
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition TUrl.cxx:471
The packetizer is a load balancing object created for each query.
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetEntriesProcessed() const
const Int_t n
Definition legend1.C:16