Logo ROOT   6.14/05
Reference Guide
TPacketizerFile.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: G. Ganis 2009
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TPacketizerFile
13 \ingroup proofkernel
14 
15 This packetizer generates packets which contain a single file path
16 to be used in process. Used for tasks generating files, like in
17 PROOF bench.
18 
19 */
20 
21 #include "TPacketizerFile.h"
22 
23 #include "Riostream.h"
24 #include "TDSet.h"
25 #include "TError.h"
26 #include "TEventList.h"
27 #include "TMap.h"
28 #include "TMessage.h"
29 #include "TMonitor.h"
30 #include "TNtupleD.h"
31 #include "TObject.h"
32 #include "TParameter.h"
33 #include "TPerfStats.h"
34 #include "TProofDebug.h"
35 #include "TProof.h"
36 #include "TProofPlayer.h"
37 #include "TProofServ.h"
38 #include "TSlave.h"
39 #include "TSocket.h"
40 #include "TStopwatch.h"
41 #include "TTimer.h"
42 #include "TUrl.h"
43 #include "TClass.h"
44 #include "TMath.h"
45 #include "TObjString.h"
46 #include "TFileInfo.h"
47 #include "TFileCollection.h"
48 #include "THashList.h"
49 
50 //------------------------------------------------------------------------------
51 
52 class TPacketizerFile::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
53 
54 friend class TPacketizerFile;
55 
56 private:
57  Long64_t fLastProcessed; // number of processed entries of the last packet
58  Double_t fSpeed; // estimated current average speed of the processing slave
59  Double_t fTimeInstant; // stores the time instant when the current packet started
60  TNtupleD *fCircNtp; // Keeps circular info for speed calculations
61  Long_t fCircLvl; // Circularity level
62 
63 public:
64  TSlaveStat(TSlave *sl, TList *input);
65  ~TSlaveStat();
66 
67  void GetCurrentTime();
68 
69  void UpdatePerformance(Double_t time);
71 };
72 
73 // Iterator wrapper
74 class TPacketizerFile::TIterObj : public TObject {
75 
76 private:
77  TString fName; // Name of reference
78  TIter *fIter; // Iterator
79 
80 public:
81  TIterObj(const char *n, TIter *iter) : fName(n), fIter(iter) { }
82  virtual ~TIterObj() { if (fIter) delete fIter; }
83 
84  const char *GetName() const {return fName;}
85  TIter *GetIter() const {return fIter;}
86  void Print(Option_t* option = "") const;
87 };
88 
90 
91 ////////////////////////////////////////////////////////////////////////////////
92 /// Constructor
93 
96  : TVirtualPacketizer(input, st)
97 {
98  PDB(kPacketizer,1) Info("TPacketizerFile", "enter");
100  fValid = kFALSE;
101  fAssigned = 0;
104 
105  if (!input || (input && input->GetSize() <= 0)) {
106  Error("TPacketizerFile", "input file is undefined or empty!");
108  return;
109  }
110 
111  // Check if the files not explicitly assigned have to be processed
112  Int_t procnotass = 1;
113  if (TProof::GetParameter(input, "PROOF_ProcessNotAssigned", procnotass) == 0) {
114  if (procnotass == 0) {
115  Info("TPacketizerFile", "files not assigned to workers will not be processed");
117  }
118  }
119 
120  // Check if the TFileInfo object has to be added to the packet
121  Int_t addfileinfo = 0;
122  if (TProof::GetParameter(input, "PROOF_IncludeFileInfoInPacket", addfileinfo) == 0) {
123  if (addfileinfo == 1) {
124  Info("TPacketizerFile",
125  "TFileInfo object will be included in the packet as associated object");
127  }
128  }
129 
130  // These are the file to be created/processed per node; the information
131  if (!(fFiles = dynamic_cast<TMap *>(input->FindObject("PROOF_FilesToProcess")))) {
132  Error("TPacketizerFile", "map of files to be processed/created not found");
134  return;
135  }
136 
137  // The worker stats
138  fSlaveStats = new TMap;
140 
141  TList nodes;
142  nodes.SetOwner(kTRUE);
143  TSlave *wrk;
144  TIter si(workers);
145  while ((wrk = (TSlave *) si.Next())) {
146  fSlaveStats->Add(wrk, new TSlaveStat(wrk, input));
147  TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
148  Info("TPacketizerFile", "worker: %s", wrkname.Data());
149  if (!nodes.FindObject(wrkname)) nodes.Add(new TObjString(wrkname));
150  }
151 
152  // The list of iterators
153  fIters = new TList;
155 
156  // There must be something in
157  fTotalEntries = 0;
158  fNotAssigned = new TList;
159  fNotAssigned->SetName("*");
160  TIter nxl(fFiles);
161  TObject *key, *o = 0;
162  while ((key = nxl()) != 0) {
163  THashList *wrklist = dynamic_cast<THashList *>(fFiles->GetValue(key));
164  if (!wrklist) {
165  TFileCollection *fc = dynamic_cast<TFileCollection *>(fFiles->GetValue(key));
166  if (fc) wrklist = fc->GetList();
167  }
168  if (wrklist) {
169  TString hname = TUrl(key->GetName()).GetHostFQDN();
170  if ((o = nodes.FindObject(hname))) {
171  fTotalEntries += wrklist->GetSize();
172  fIters->Add(new TIterObj(hname, new TIter(wrklist)));
173  // Notify
174  PDB(kPacketizer,2)
175  Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') assigned to '%s'",
176  wrklist->GetSize(), key->GetName(), hname.Data(), o->GetName());
177  } else {
178  // We add all to the not assigned list so that they will be distributed
179  // according to the load
180  TIter nxf(wrklist);
181  while ((o = nxf()))
182  fNotAssigned->Add(o);
183  // Notify
184  PDB(kPacketizer,2)
185  Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') not assigned",
186  wrklist->GetSize(), key->GetName(), hname.Data());
187  }
188  }
189  }
190  if (fNotAssigned && fNotAssigned->GetSize() > 0) {
192  fIters->Add(new TIterObj("*", new TIter(fNotAssigned)));
193  Info("TPacketizerFile", "non-assigned files: %d", fNotAssigned->GetSize());
194  fNotAssigned->Print();
195  }
196  if (fTotalEntries <= 0) {
197  Error("TPacketizerFile", "no file path in the map!");
200  return;
201  } else {
202  Info("TPacketizerFile", "processing %lld files", fTotalEntries);
203  fIters->Print();
204  }
205 
206  fStopwatch = new TStopwatch();
207  fStopwatch->Start();
208  fValid = kTRUE;
209  PDB(kPacketizer,1) Info("TPacketizerFile", "return");
210 
211  // Done
212  return;
213 }
214 
215 ////////////////////////////////////////////////////////////////////////////////
216 /// Destructor.
217 
219 {
222  if (fIters) fIters->SetOwner(kTRUE);
225 }
226 
227 ////////////////////////////////////////////////////////////////////////////////
228 /// Get current time
229 
231 {
232  Double_t retValue = fStopwatch->RealTime();
233  fStopwatch->Continue();
234  return retValue;
235 }
236 
237 ////////////////////////////////////////////////////////////////////////////////
238 /// Get Estimation of the current rate; just summing the current rates of
239 /// the active workers
240 
242 {
243  all = kTRUE;
244  // Loop over the workers
245  Float_t currate = 0.;
246  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
247  TIter nxw(fSlaveStats);
248  TObject *key;
249  while ((key = nxw()) != 0) {
250  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
251  if (wrkstat && wrkstat->GetProgressStatus() && wrkstat->GetEntriesProcessed() > 0) {
252  // Sum-up the current rates
253  currate += wrkstat->GetProgressStatus()->GetCurrentRate();
254  } else {
255  all = kFALSE;
256  }
257  }
258  }
259  // Done
260  return currate;
261 }
262 
263 ////////////////////////////////////////////////////////////////////////////////
264 /// Get next packet
265 
267 {
268  TDSetElement *elem = 0;
269  if (!fValid) return elem;
270 
271  // Find slave
272  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(wrk);
273  if (!wrkstat) {
274  Error("GetNextPacket", "could not find stat object for worker '%s'!", wrk->GetName());
275  return elem;
276  }
277 
278  PDB(kPacketizer,2)
279  Info("GetNextPacket","worker-%s: fAssigned %lld / %lld", wrk->GetOrdinal(), fAssigned, fTotalEntries);
280 
281  // Update stats & free old element
282  Double_t latency = 0., proctime = 0., proccpu = 0.;
283  Long64_t bytesRead = -1;
284  Long64_t totalEntries = -1; // used only to read an old message type
285  Long64_t totev = 0;
286  Long64_t numev = -1;
287 
288  TProofProgressStatus *status = 0;
289  if (wrk->GetProtocol() > 18) {
290  (*r) >> latency;
291  (*r) >> status;
292 
293  // Calculate the progress made in the last packet
294  TProofProgressStatus *progress = 0;
295  if (status) {
296  // upadte the worker status
297  numev = status->GetEntries() - wrkstat->GetEntriesProcessed();
298  progress = wrkstat->AddProcessed(status);
299  if (progress) {
300  // (*fProgressStatus) += *progress;
301  proctime = progress->GetProcTime();
302  proccpu = progress->GetCPUTime();
303  totev = status->GetEntries(); // for backward compatibility
304  bytesRead = progress->GetBytesRead();
305  delete progress;
306  }
307  delete status;
308  } else
309  Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
310  } else {
311 
312  (*r) >> latency >> proctime >> proccpu;
313 
314  // only read new info if available
315  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
316  if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
317  if (r->BufferSize() > r->Length()) (*r) >> totev;
318 
319  numev = totev - wrkstat->GetEntriesProcessed();
320  wrkstat->GetProgressStatus()->IncEntries(numev);
321  wrkstat->GetProgressStatus()->SetLastUpdate();
322  }
323 
324  fProgressStatus->IncEntries(numev);
326 
327  PDB(kPacketizer,2)
328  Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
329  wrk->GetOrdinal(), wrk->GetName(),
330  numev, latency, proctime, proccpu, bytesRead);
331 
332  if (gPerfStats != 0) {
333  gPerfStats->PacketEvent(wrk->GetOrdinal(), wrk->GetName(), "", numev,
334  latency, proctime, proccpu, bytesRead);
335  }
336 
337  if (fAssigned == fTotalEntries) {
338  // Send last timer message
339  HandleTimer(0);
340  return 0;
341  }
342 
343  if (fStop) {
344  // Send last timer message
345  HandleTimer(0);
346  return 0;
347  }
348 
349  PDB(kPacketizer,2)
350  Info("GetNextPacket", "worker-%s (%s): getting next files ... ", wrk->GetOrdinal(),
351  wrk->GetName());
352 
353  // Get next file now
354  TObject *nextfile = 0;
355 
356  // Find iterator associated to the worker
357  TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
358  TIterObj *io = dynamic_cast<TIterObj *>(fIters->FindObject(wrkname));
359  if (io) {
360  // Get next file to process in the list of the worker
361  if (io->GetIter())
362  nextfile = io->GetIter()->Next();
363  }
364 
365  // If not found or all files already processed, check if a generic iterator
366  // has still some files to process
367  if (!nextfile && fProcNotAssigned) {
368  if ((io = dynamic_cast<TIterObj *>(fIters->FindObject("*")))) {
369  // Get next file to process in the list of the worker
370  if (io->GetIter())
371  nextfile = io->GetIter()->Next();
372  }
373  }
374 
375  // Return if nothing to process
376  if (!nextfile) return elem;
377 
378  // The file name: we support TObjString or TFileInfo
379  TString filename;
380  TObjString *os = 0;
381  TFileInfo *fi = 0;
382  if ((os = dynamic_cast<TObjString *>(nextfile))) {
383  filename = os->GetName();
384  } else {
385  if ((fi = dynamic_cast<TFileInfo *>(nextfile)))
386  filename = fi->GetCurrentUrl()->GetUrl();
387  }
388  // Nothing to process
389  if (filename.IsNull()) {
390  Warning("GetNextPacket", "found unsupported object of type '%s' in list: it must"
391  " be 'TObjString' or 'TFileInfo'", nextfile->ClassName());
392  return elem;
393  }
394  // Prepare the packet
395  PDB(kPacketizer,2)
396  Info("GetNextPacket", "worker-%s: assigning: '%s' (remaining %lld files)",
397  wrk->GetOrdinal(), filename.Data(), (fTotalEntries - fAssigned));
398  elem = new TDSetElement(filename, "", "", 0, 1);
400 
401  // Add the element, if required
402  if (fAddFileInfo && fi) {
403  elem->AddAssocObj(fi);
404  PDB(kPacketizer,2) fi->Print("L");
405  }
406 
407  // Update the total counter
408  fAssigned += 1;
409 
410  return elem;
411 }
412 
413 //------------------------------------------------------------------------------
414 
415 ////////////////////////////////////////////////////////////////////////////////
416 /// Main constructor
417 
418 TPacketizerFile::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
419  : fLastProcessed(0),
420  fSpeed(0), fTimeInstant(0), fCircLvl(5)
421 {
422  // Initialize the circularity ntple for speed calculations
423  fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
424  TProof::GetParameter(input, "PROOF_TPacketizerFileCircularity", fCircLvl);
425  fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
426  fCircNtp->SetCircular(fCircLvl);
427  fSlave = slave;
428  fStatus = new TProofProgressStatus();
429 }
430 
431 ////////////////////////////////////////////////////////////////////////////////
432 /// Destructor
433 
434 TPacketizerFile::TSlaveStat::~TSlaveStat()
435 {
436  SafeDelete(fCircNtp);
437 }
438 
439 ////////////////////////////////////////////////////////////////////////////////
440 /// Update the circular ntple
441 
442 void TPacketizerFile::TSlaveStat::UpdatePerformance(Double_t time)
443 {
444  Double_t ttot = time;
445  Double_t *ar = fCircNtp->GetArgs();
446  Int_t ne = fCircNtp->GetEntries();
447  if (ne <= 0) {
448  // First call: just fill one ref entry and return
449  fCircNtp->Fill(0., 0);
450  fSpeed = 0.;
451  return;
452  }
453  // Fill the entry
454  fCircNtp->GetEntry(ne-1);
455  ttot = ar[0] + time;
456  fCircNtp->Fill(ttot, GetEntriesProcessed());
457 
458  // Calculate the speed
459  fCircNtp->GetEntry(0);
460  Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
461  Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
462  fSpeed = nevts / dtime;
463  PDB(kPacketizer,2)
464  Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
465  time, dtime, nevts, fSpeed);
466 
467 }
468 
469 ////////////////////////////////////////////////////////////////////////////////
470 /// Update the status info to the 'st'.
471 /// return the difference (*st - *fStatus)
472 
473 TProofProgressStatus *TPacketizerFile::TSlaveStat::AddProcessed(TProofProgressStatus *st)
474 {
475  if (st) {
476  // The entriesis not correct in 'st'
477  Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
478  // The last proc time should not be added
479  fStatus->SetLastProcTime(0.);
480  // Get the diff
481  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
482  *fStatus += *diff;
483  // Set the correct value
484  fStatus->SetLastEntries(lastEntries);
485  return diff;
486  } else {
487  Error("AddProcessed", "status arg undefined");
488  return 0;
489  }
490 }
491 
492 ////////////////////////////////////////////////////////////////////////////////
493 /// Printf info
494 
496 {
497  Printf("Iterator '%s' controls %d units", GetName(),
498  ((GetIter() && GetIter()->GetCollection()) ? GetIter()->GetCollection()->GetSize()
499  : -1));
500 }
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
long long Long64_t
Definition: RtypesCore.h:69
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Definition: TStopwatch.cxx:58
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:28
Collectable string class.
Definition: TObjString.h:28
float Float_t
Definition: RtypesCore.h:53
const char Option_t
Definition: RtypesCore.h:62
void Print(Option_t *options="") const
Print information about this object.
Definition: TFileInfo.cxx:477
This class represents a WWW compatible URL.
Definition: TUrl.h:35
TUrl * GetCurrentUrl() const
Return the current url.
Definition: TFileInfo.cxx:248
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
virtual ~TPacketizerFile()
Destructor.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Double_t GetProcTime() const
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:53
void AddAssocObj(TObject *assocobj)
Add an associated object to the list.
Definition: TDSet.cxx:634
const char * GetOrdinal() const
Definition: TSlave.h:131
Basic string class.
Definition: TString.h:131
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
Definition: TObject.cxx:550
const char * GetName() const
Returns name of object.
Definition: TSlave.h:124
Long64_t GetEntries() const
Int_t GetProtocol() const
Definition: TSlave.h:133
Double_t GetCurrentTime()
Get current time.
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:387
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
Definition: TList.cxx:574
Int_t Length() const
Definition: TBuffer.h:96
Manages an element of a TDSet.
Definition: TDSet.h:66
static struct mg_connection * fc(struct mg_context *ctx)
Definition: civetweb.c:3352
#define PDB(mask, level)
Definition: TProofDebug.h:56
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition: THashList.h:34
void IncEntries(Long64_t entries=1)
void Continue()
Resume a stopped stopwatch.
Definition: TStopwatch.cxx:93
TProofProgressStatus * fProgressStatus
A doubly linked list.
Definition: TList.h:44
const char * GetName() const
Returns name of object.
Definition: TObjString.h:39
void SetLastEntries(Long64_t entries)
TDSetElement * GetNextPacket(TSlave *wrk, TMessage *r)
Get next packet.
ROOT::R::TRInterface & r
Definition: Object.C:4
if object ctor succeeded but object should not be used
Definition: TObject.h:68
THashList * GetList()
TObject * Next()
Definition: TCollection.h:249
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
void SetName(const char *name)
Definition: TCollection.h:204
#define Printf
Definition: TGeoToOCC.h:18
#define gPerfStats
const Bool_t kFALSE
Definition: RtypesCore.h:88
#define SafeDelete(p)
Definition: RConfig.h:529
long Long_t
Definition: RtypesCore.h:50
The packetizer is a load balancing object created for each query.
#define ClassImp(name)
Definition: Rtypes.h:359
void Print(std::ostream &os, const OptionType &opt)
double Double_t
Definition: RtypesCore.h:55
Long64_t GetEntriesProcessed() const
Double_t GetCPUTime() const
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
Int_t BufferSize() const
Definition: TBuffer.h:94
Bool_t IsNull() const
Definition: TString.h:402
Mother of all ROOT objects.
Definition: TObject.h:37
Long64_t GetBytesRead() const
TStopwatch * fStopwatch
virtual void Add(TObject *obj)
Definition: TList.h:87
Class that contains a list of TFileInfo&#39;s and accumulated meta data information about its entries...
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
Class describing a generic file including meta information.
Definition: TFileInfo.h:38
void ResetBit(UInt_t f)
Definition: TObject.h:171
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:357
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
Class describing a PROOF worker server.
Definition: TSlave.h:46
Container class for processing statistics.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
const Bool_t kTRUE
Definition: RtypesCore.h:87
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
This packetizer generates packets which contain a single file path to be used in process.
const Int_t n
Definition: legend1.C:16
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
const char * Data() const
Definition: TString.h:364
Stopwatch class.
Definition: TStopwatch.h:28