ROOT  6.06/09
Reference Guide
TPacketizerUnit.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Long Tran-Thanh 22/07/07
3 // Revised: G. Ganis, May 2011
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 //////////////////////////////////////////////////////////////////////////
14 // //
15 // TPacketizerUnit //
16 // //
17 // This packetizer generates packets of generic units, representing the //
18 // number of times an operation cycle has to be repeated by the worker //
19 // node, e.g. the number of Monte Carlo events to be generated. //
20 // Packet sizes are generated taking into account the performance of //
21 // worker nodes, based on the time needed to process previous packets, //
22 // with the goal of having all workers ending at the same time. //
23 // //
24 //////////////////////////////////////////////////////////////////////////
25 
26 
27 #include "TPacketizerUnit.h"
28 
29 #include "Riostream.h"
30 #include "TDSet.h"
31 #include "TError.h"
32 #include "TEventList.h"
33 #include "TMap.h"
34 #include "TMessage.h"
35 #include "TMonitor.h"
36 #include "TNtupleD.h"
37 #include "TObject.h"
38 #include "TParameter.h"
39 #include "TPerfStats.h"
40 #include "TProofDebug.h"
41 #include "TProof.h"
42 #include "TProofPlayer.h"
43 #include "TProofServ.h"
44 #include "TSlave.h"
45 #include "TSocket.h"
46 #include "TStopwatch.h"
47 #include "TTimer.h"
48 #include "TUrl.h"
49 #include "TClass.h"
50 #include "TMath.h"
51 #include "TObjString.h"
52 
53 
54 using namespace TMath;
55 //
56 // The following utility class manages the state of the
57 // work to be performed and the slaves involved in the process.
58 //
59 // The list of TSlaveStat(s) keeps track of the work (being) done
60 // by each slave
61 //
62 
63 //------------------------------------------------------------------------------
64 
65 class TPacketizerUnit::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
 // Per-worker bookkeeping for TPacketizerUnit: remembers the size of the last
 // packet handed out and keeps a short circular history of (time, events)
 // samples from which the worker processing rate is estimated.
 // NOTE(review): at least one declaration line between UpdatePerformance() and
 // the end of the class is absent from this listing (AddProcessed() is defined
 // for this class further down but no declaration is visible here).
66 
67 friend class TPacketizerUnit;
68 
69 private:
70  Long64_t fLastProcessed; // Size of the last packet assigned (set to fProcessing in GetNextPacket)
71  Double_t fRate; // Estimated rate: events / time over the span of the circular ntuple
72  Double_t fTimeInstant; // Stopwatch time at which the current packet was handed out
73  TNtupleD *fCircNtp; // Circular ntuple ("tm:ev") of cumulative time and processed events
74  Long_t fCircLvl; // Number of entries kept in fCircNtp (default 5)
75 
76 public:
77  TSlaveStat(TSlave *sl, TList *input);
78  ~TSlaveStat();
79 
80 // void GetCurrentTime();
81 
 // Add a (time, events) sample and refresh fRate
82  void UpdatePerformance(Double_t time);
84 
85 // ClassDef(TPacketizerUnit::TSlaveStat, 0);
86 };
87 
88 ////////////////////////////////////////////////////////////////////////////////
89 /// Main constructor
90 
91 TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
92  : fLastProcessed(0),
93  fRate(0), fTimeInstant(0), fCircLvl(5)
94 {
95  // Initialize the circularity ntple for speed calculations
96  fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
97  fCircNtp->SetDirectory(0);
98  TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
99  fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
100  fCircNtp->SetCircular(fCircLvl);
101  fSlave = slave;
102  fStatus = new TProofProgressStatus();
103 }
104 
105 ////////////////////////////////////////////////////////////////////////////////
106 /// Destructor
107 
108 TPacketizerUnit::TSlaveStat::~TSlaveStat()
109 {
110  SafeDelete(fCircNtp);
111 }
112 
113 ////////////////////////////////////////////////////////////////////////////////
114 /// Update the circular ntple
115 
116 void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
117 {
118  Double_t ttot = time;
119  Double_t *ar = fCircNtp->GetArgs();
120  Int_t ne = fCircNtp->GetEntries();
121  if (ne <= 0) {
122  // First call: just fill one ref entry and return
123  fCircNtp->Fill(0., 0);
124  fRate = 0.;
125  return;
126  }
127  // Fill the entry
128  fCircNtp->GetEntry(ne-1);
129  ttot = ar[0] + time;
130  fCircNtp->Fill(ttot, GetEntriesProcessed());
131 
132  // Calculate the speed
133  fCircNtp->GetEntry(0);
134  Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
135  Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
136  fRate = nevts / dtime;
137  PDB(kPacketizer,2)
138  Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
139  time, dtime, nevts, fRate);
140 
141 }
142 
143 ////////////////////////////////////////////////////////////////////////////////
144 /// Bring the stored status (fStatus) up to the values in 'st'.
145 /// Returns a newly allocated object holding the difference (*st - *fStatus),
 /// which the caller must delete, or 0 (after an Error message) if 'st' is null.
 /// NOTE(review): the line carrying this function's signature is absent from
 /// this listing.
146 
148 {
149  if (st) {
150  // The entries are not correct in 'st': compute the increment explicitly
151  Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
152  // The last proc time should not be added
153  fStatus->SetLastProcTime(0.);
154  // Get the diff
155  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
 // Accumulate the increment into the stored status
156  *fStatus += *diff;
157  // Set the correct value
158  fStatus->SetLastEntries(lastEntries);
159  return diff;
160  } else {
161  Error("AddProcessed", "status arg undefined");
162  return 0;
163  }
164 }
165 
166 //------------------------------------------------------------------------------
167 
169 
170 ////////////////////////////////////////////////////////////////////////////////
171 /// Constructor.
 /// Reads the packetizer settings from 'input', registers one TSlaveStat for
 /// each entry of 'slaves' reporting GetParallel() > 0 (the others are kept in
 /// fWrkExcluded), and, if 'num' > 0, assigns 'num' cycles via AssignWork().
 /// Input parameters read here: PROOF_PacketizerFixedNum,
 /// PROOF_PacketizerCalibFrac, PROOF_PacketizerTimeLimit, PROOF_MinPacketTime.
 /// NOTE(review): the second line of the signature (which declares 'st') and
 /// two statements of the body are absent from this listing.
172 
173 TPacketizerUnit::TPacketizerUnit(TList *slaves, Long64_t num, TList *input,
175  : TVirtualPacketizer(input, st)
176 {
177  PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
178 
179  // Init pointer members
180  fWrkStats = 0;
181  fPackets = 0;
182  fInput = input;
183 
 // Fixed-number mode is enabled only when the parameter is found (return
 // code 0) and its value is positive
184  fFixedNum = kFALSE;
185  Int_t fixednum = -1;
186  if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0) {
187  fFixedNum = kFALSE;
188  }
189  else {
190  Info("TPacketizerUnit", "forcing the same cycles on each worker");
191  fFixedNum = kTRUE;
192  }
193 
 // Fraction used to size the calibration packets; falls back to 0.01 when the
 // parameter is missing or non-positive
194  fCalibFrac = 0.01;
195  if (TProof::GetParameter(input, "PROOF_PacketizerCalibFrac", fCalibFrac) != 0 || fCalibFrac <= 0)
196  fCalibFrac = 0.01;
 // NOTE(review): fCalibFrac is printed as a percentage here without the
 // factor 100 applied in the calibration message of GetNextPacket
197  PDB(kPacketizer,1)
198  Info("TPacketizerUnit", "size of the calibration packets: %.2f %% of average number per worker", fCalibFrac);
199 
 // Upper limit on the time per packet: default 3, overridden by the deprecated
 // parameter below when present
200  fMaxPacketTime = 3.;
201  Double_t timeLimit = -1;
202  if (TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", timeLimit) == 0) {
203  fMaxPacketTime = timeLimit;
204  Warning("TPacketizerUnit", "PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
205  }
206  PDB(kPacketizer,1)
207  Info("TPacketizerUnit", "time limit is %lf", fMaxPacketTime);
208 
209  // Different default for min packet time
210  fMinPacketTime = 1;
211  Double_t minPacketTime = 0;
212  if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) fMinPacketTime = minPacketTime;
 // Record the effective value in the config list: update the existing entry if
 // there is one, otherwise add a new one
213  TParameter<Double_t> *mpt = (TParameter<Double_t> *) fConfigParams->FindObject("PROOF_MinPacketTime");
214  if (mpt) {
215  mpt->SetVal(fMinPacketTime);
216  } else {
217  fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
218  }
219 
 // Counters: size of the packet being handed out, cycles assigned so far,
 // packet sequence number
220  fProcessing = 0;
221  fAssigned = 0;
222  fPacketSeq = 0;
223 
224  fStopwatch = new TStopwatch();
225 
226  fPackets = new TList;
227  fPackets->SetOwner();
228 
229  fWrkStats = new TMap;
231  fWrkExcluded = 0;
232 
 // Build the worker map; nodes without active workers are set aside
233  TSlave *slave;
234  TIter si(slaves);
235  while ((slave = (TSlave*) si.Next())) {
236  if (slave->GetParallel() > 0) {
237  fWrkStats->Add(slave, new TSlaveStat(slave, input));
238  } else {
 // The exclusion list is created lazily, on the first excluded node
239  if (!fWrkExcluded) {
240  fWrkExcluded = new TList;
242  }
243  PDB(kPacketizer,2)
244  Info("TPacketizerUnit", "node '%s' has NO active worker: excluded from work distribution", slave->GetOrdinal());
245  fWrkExcluded->Add(slave);
246  }
247  }
248 
 // Nothing assigned yet; -1 means "no fixed number per worker"
249  fTotalEntries = 0;
250  fNumPerWorker = -1;
251  if (num > 0 && AssignWork(0,0,num) != 0)
252  Warning("TPacketizerUnit", "some problems assigning work");
253 
254  // Save the config parameters in the dedicated list so that they will be saved
255  // in the outputlist and therefore in the relevant TQueryResult
256  fConfigParams->Add(new TParameter<Float_t>("PROOF_PacketizerCalibFrac", fCalibFrac));
257 
258  fStopwatch->Start();
259  PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
260 }
261 
262 ////////////////////////////////////////////////////////////////////////////////
263 /// Assign work to be done to this packetizer.
 /// Adds 'num' cycles to fTotalEntries and records the per-worker number in the
 /// config list under "PROOF_PacketizerFixedNum".
 /// Returns 0 on success, -1 (after an Error message) if 'num' is negative.
 /// NOTE(review): the signature line and two statements of the body (the one
 /// computing fNumPerWorker and the one declaring 'fn') are absent from this
 /// listing.
264 
266 {
267  if (num < 0) {
268  Error("AssignWork", "assigned a negative number (%lld) of cycles - protocol error?", num);
269  return -1;
270  }
271 
272  fTotalEntries += num;
273  PDB(kPacketizer,1)
274  Info("AssignWork", "assigned %lld additional cycles (new total: %lld)", num, fTotalEntries);
275 
276  // Update fixed number counter
277  if (fFixedNum && fWrkStats->GetSize() > 0) {
278  // Approximate number: the exact number is determined in GetNextPacket
 // Never let the per-worker number drop to zero
280  if (fNumPerWorker == 0) fNumPerWorker = 1;
281  }
282 
283  // Update/Save the config parameters in the dedicated list so that they will be saved
284  // in the outputlist and therefore in the relevant TQueryResult
286  (TParameter<Long64_t> *) fConfigParams->FindObject("PROOF_PacketizerFixedNum");
 // Update the existing entry if there is one, otherwise add a new one
287  if (fn) {
288  fn->SetVal(fNumPerWorker);
289  } else {
290  fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
291  }
292 
293  // Done
294  return 0;
295 }
296 
297 ////////////////////////////////////////////////////////////////////////////////
298 /// Destructor.
 /// NOTE(review): the signature line and everything following the fWrkStats
 /// check are absent from this listing, so the cleanup performed here cannot be
 /// documented from what is visible.
299 
301 {
302  if (fWrkStats)
308 }
309 
310 ////////////////////////////////////////////////////////////////////////////////
311 /// Get current time: the real time reported by fStopwatch.
 /// NOTE(review): the line carrying this function's signature is absent from
 /// this listing.
312 
314 {
 // Reading the real time is followed by Continue() so that the stopwatch keeps
 // running after the reading
315  Double_t retValue = fStopwatch->RealTime();
316  fStopwatch->Continue();
317  return retValue;
318 }
319 
320 ////////////////////////////////////////////////////////////////////////////////
321 /// Get an estimate of the current rate, obtained by summing the current rates
322 /// of the workers that have already processed something.
 /// On return 'all' is kTRUE only if every worker in fWrkStats contributed to
 /// the sum; it is set to kFALSE as soon as one worker has no stat object, no
 /// progress status or no processed entries. Returns 0 when there are no workers.
 /// NOTE(review): the line carrying this function's signature is absent from
 /// this listing.
323 
325 {
326  all = kTRUE;
327  // Loop over the workers
328  Float_t currate = 0.;
329  if (fWrkStats && fWrkStats->GetSize() > 0) {
330  TIter nxw(fWrkStats);
331  TObject *key;
332  while ((key = nxw()) != 0) {
333  TSlaveStat *slstat = (TSlaveStat *) fWrkStats->GetValue(key);
334  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
335  // Sum-up the current rates
336  currate += slstat->GetProgressStatus()->GetCurrentRate();
337  } else {
 // This worker has no measurement yet: the sum is partial
338  all = kFALSE;
339  }
340  }
341  }
342  // Done
343  return currate;
344 }
345 
346 ////////////////////////////////////////////////////////////////////////////////
347 /// Get next packet.
 /// 'sl' is the worker asking for work and 'r' the message carrying the
 /// results of its previous packet. The results are folded into the worker and
 /// global statistics, then the size of the next packet is chosen: a
 /// calibration packet on the first request of a worker, afterwards either a
 /// rate-proportional share of the remaining cycles (bounded by the min/max
 /// packet times) or the remainder of the fixed per-worker quota.
 /// Returns a new TDSetElement describing the range [fAssigned, fAssigned +
 /// fProcessing), or 0 when the packetizer is invalid, the worker is unknown,
 /// the worker has completed its fixed quota, all the work has been assigned,
 /// a stop was requested, or no worker has a usable rate measurement.
 /// NOTE(review): the signature line and several statements of the body are
 /// absent from this listing (among them the declarations of 'status', 'nxe'
 /// and 'avg').
348 
350 {
351  if (!fValid)
352  return 0;
353 
354  // Find slave
355  TSlaveStat *slstat = (TSlaveStat*) fWrkStats->GetValue(sl);
356  if (!slstat) {
357  Warning("GetNextPacket", "Received a packet request from an unknown slave: %s:%s",
358  sl->GetName(), sl->GetOrdinal());
359  return 0;
360  }
361 
362  PDB(kPacketizer,2)
363  Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned);
364 
365  // Update stats & free old element
366  Double_t latency = 0., proctime = 0., proccpu = 0.;
367  Long64_t bytesRead = -1;
368  Long64_t totalEntries = -1; // used only to read an old message type
369  Long64_t totev = 0;
370  Long64_t numev = -1;
371 
 // Workers with protocol > 18 send the latency followed by a status object;
 // older ones send the individual numbers
373  if (sl->GetProtocol() > 18) {
374  (*r) >> latency;
375  (*r) >> status;
376 
377  // Calculate the progress made in the last packet
378  TProofProgressStatus *progress = 0;
379  if (status) {
380  // update the worker status
 // numev must be computed before AddProcessed() updates the stored status
381  numev = status->GetEntries() - slstat->GetEntriesProcessed();
382  progress = slstat->AddProcessed(status);
383  if (progress) {
384  // (*fProgressStatus) += *progress;
385  proctime = progress->GetProcTime();
386  proccpu = progress->GetCPUTime();
387  totev = status->GetEntries(); // for backward compatibility
388  bytesRead = progress->GetBytesRead();
 // The difference object returned by AddProcessed() is owned here
389  delete progress;
390  }
391  delete status;
392  } else
393  Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
394  } else {
395 
396  (*r) >> latency >> proctime >> proccpu;
397 
398  // only read new info if available
399  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
400  if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
401  if (r->BufferSize() > r->Length()) (*r) >> totev;
402 
403  numev = totev - slstat->GetEntriesProcessed();
404  slstat->GetProgressStatus()->IncEntries(numev);
405  slstat->GetProgressStatus()->SetLastUpdate();
406  }
407 
 // Global progress
408  fProgressStatus->IncEntries(numev);
410 
411  fProcessing = 0;
412 
413  PDB(kPacketizer,2)
414  Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
415  sl->GetOrdinal(), sl->GetName(),
416  numev, latency, proctime, proccpu, bytesRead);
417 
418  if (gPerfStats != 0) {
419  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
420  latency, proctime, proccpu, bytesRead);
421  }
422 
 // Fixed-number mode: a worker that has reached its quota gets nothing more
423  if (fNumPerWorker > 0 && slstat->GetEntriesProcessed() >= fNumPerWorker) {
424  PDB(kPacketizer,2)
425  Info("GetNextPacket","worker-%s (%s) is done (%lld cycles)",
426  sl->GetOrdinal(), sl->GetName(), slstat->GetEntriesProcessed());
427  return 0;
428  }
429 
 // Everything has been assigned: finish unless additional work can be obtained
430  if (fAssigned == fTotalEntries) {
431  Bool_t done = kTRUE;
432  // If we are on a submaster, check if there is something else to do
435  if (nxe) {
436  if (AssignWork(0,0,nxe->GetNum()) == 0) {
437  if (fAssigned < fTotalEntries) done = kFALSE;
438  } else {
439  Error("GetNextPacket", "problems assigning additional work: stop");
440  }
441  SafeDelete(nxe);
442  }
443  }
444  if (done) {
445  // Send last timer message
446  HandleTimer(0);
447  return 0;
448  }
449  }
450 
451  if (fStop) {
452  // Send last timer message
453  HandleTimer(0);
454  return 0;
455  }
456 
457 
458  Long64_t num;
459 
460  // Get the current time
461  Double_t cTime = GetCurrentTime();
462 
 // An empty circular ntuple means this is the first request from this worker
463  if (slstat->fCircNtp->GetEntries() <= 0) {
464  // The calibration phase
466  num = (Long64_t) (fCalibFrac * avg);
 // Fall back to the full average (or to 1) when the fraction rounds down to 0
467  if (num < 1) num = (avg >= 1) ? avg : 1;
468  PDB(kPacketizer,2)
469  Info("GetNextPacket", "calibration: total entries %lld, workers %d, frac: %.1f %%, raw num: %lld",
470  fTotalEntries, fWrkStats->GetSize(), fCalibFrac * 100., num);
471 
472  // Create a reference entry
473  slstat->UpdatePerformance(0.);
474 
475  } else {
476 
477  if (fNumPerWorker < 0) {
478 
479  // Schedule tasks for workers based on the currently estimated processing speeds
480 
481  // Update performances
482  // slstat->fStatus was updated before;
483  slstat->UpdatePerformance(proctime);
484 
485  // We need to estimate the total instantaneous rate: for the workers not having yet
486  // one we assume the average of those having a measurement
487  // The optimal number for worker j is
488  //
489  // n_j = r_j / Sum r_i * N_left
490  //
491 
 // nrm counts the workers with a positive rate, sumRate adds up their rates
492  Int_t nrm = 0;
493  Double_t sumRate = 0.;
494  TIter nxwrk(fWrkStats);
495  TSlaveStat *wrkStat = 0;
496  TSlave *tmpWrk = 0;
497  while ((tmpWrk = (TSlave *)nxwrk())) {
498  if ((wrkStat = dynamic_cast<TSlaveStat *>(fWrkStats->GetValue(tmpWrk)))) {
499  if (wrkStat->fRate > 0) {
500  nrm++;
501  sumRate += wrkStat->fRate;
502  }
503  PDB(kPacketizer,3)
504  Info("GetNextPacket", "%d: worker-%s: rate %lf /s (sum: %lf /s)",
505  nrm, tmpWrk->GetOrdinal(), wrkStat->fRate, sumRate);
506  } else {
507  Warning("GetNextPacket", "dynamic_cast<TSlaveStat *> failing on value for '%s (%s)'! Skipping",
508  tmpWrk->GetName(), tmpWrk->GetOrdinal());
509  }
510  }
511 
512  // Check consistency
513  if (nrm <= 0) {
514  Error("GetNextPacket", "no worker has consistent information: stop processing!");
515  return (TDSetElement *)0;
516  }
517 
 // nrm > 0 here, so avgRate (and hence wrkRate and sumRate below) is positive
518  Double_t avgRate = sumRate / nrm;
519  // Check if all workers had meaningful rate information
520  if (nrm < fWrkStats->GetSize()) {
521  // For some workers the measurement is missing: use the average
522  sumRate += (fWrkStats->GetSize() - nrm) * avgRate;
523  }
524  PDB(kPacketizer,2)
525  Info("GetNextPacket", "rate: avg: %lf /s/wrk - sum: %lf /s (measurements %d out of %d)",
526  avgRate, sumRate, nrm, fWrkStats->GetSize());
527 
528  // Packet size for this worker
529  Double_t wrkRate = (slstat->fRate > 0.) ? slstat->fRate : avgRate ;
530  num = (Long64_t) ((fTotalEntries - fAssigned) * wrkRate / sumRate);
531  PDB(kPacketizer,2)
532  Info("GetNextPacket", "worker-%s (%s): raw packet size: %lld", sl->GetOrdinal(), sl->GetName(), num);
533 
534  // Apply time-per-packet limits
535  Double_t packTime = num / wrkRate;
536  if (fMaxPacketTime > 0. && packTime > fMaxPacketTime) {
537  num = (Long64_t) (fMaxPacketTime * wrkRate) ;
538  packTime = fMaxPacketTime;
539  PDB(kPacketizer,2)
540  Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (upper limit: %.2f secs)",
541  sl->GetOrdinal(), sl->GetName(), num, fMaxPacketTime);
542  }
543  if (fMinPacketTime > 0. && packTime < fMinPacketTime) {
544  num = (Long64_t) (fMinPacketTime * wrkRate);
545  PDB(kPacketizer,2)
546  Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (lower limit: %.2f secs)",
547  sl->GetOrdinal(), sl->GetName(), num, fMinPacketTime);
548  }
549 
550  } else {
551  // Fixed number of cycles per worker
 // NOTE(review): UpdatePerformance() is not called on this path, so fRate
 // looks like it stays at the 0 set by the calibration call and the time cap
 // below would never apply - confirm
552  num = fNumPerWorker - slstat->fLastProcessed;
553  if (num > 1 && slstat->fRate > 0 && num / slstat->fRate > fMaxPacketTime) {
554  num = (Long64_t) (slstat->fRate * fMaxPacketTime);
555  }
556  }
557  }
558  // Minimum packet size
559  num = (num > 1) ? num : 1;
 // Never hand out more than what is left to assign
560  fProcessing = (num < (fTotalEntries - fAssigned)) ? num
561  : (fTotalEntries - fAssigned);
562 
563  // Set the information of the current slave
564  slstat->fLastProcessed = fProcessing;
565  // Set the start time of the current packet
566  slstat->fTimeInstant = cTime;
567 
568  // Update the sequential number
569  fPacketSeq++;
570  TString sseq = TString::Format("p%lld", fPacketSeq);
571 
572  PDB(kPacketizer,2)
573  Info("GetNextPacket", "worker-%s: num %lld, processing %lld, remaining %lld",sl->GetOrdinal(),
 // The packet starts at fAssigned and spans fProcessing cycles; its name and
 // title are the sequence tag
575  TDSetElement *elem = new TDSetElement(sseq, sseq, "", fAssigned, fProcessing);
577 
578  // Update the total counter
579  fAssigned += slstat->fLastProcessed;
580 
581  return elem;
582 }
583 
584 ////////////////////////////////////////////////////////////////////////////////
585 /// Adds new workers: every TSlave found in 'workers' gets a TSlaveStat and is
 /// inserted in fWrkStats. Returns -1 if 'workers' is null, otherwise the value
 /// of fWrkStats->GetEntries() after the insertions.
 /// NOTE(review): the returned value is taken from the whole map, not from the
 /// list passed in, so it looks like the total number of workers rather than
 /// the number just added - confirm which one callers expect.
 /// NOTE(review): the line carrying this function's signature is absent from
 /// this listing.
586 
588 {
589  if (!workers) {
590  Error("AddWorkers", "Null list of new workers!");
591  return -1;
592  }
593 
 // Number of workers known before the additions, used for the rescaling below
594  Int_t curNumOfWrks = fWrkStats->GetEntries();
595 
 // Entries of 'workers' that are not TSlave objects end the loop
596  TSlave *sl;
597  TIter next(workers);
598  while (( sl = dynamic_cast<TSlave*>(next()) ))
599  fWrkStats->Add(sl, new TSlaveStat(sl, fInput));
600 
 // NOTE(review): fNumPerWorker is reset to -1 right before being used as the
 // input of the rescaling, so the previous per-worker number is lost and the
 // expression below evaluates (-1 * curNumOfWrks) / size - this looks like a
 // bug; the reset probably belongs to the non-fixed case only
601  fNumPerWorker = -1;
602  if (fFixedNum && fWrkStats->GetSize() > 0) {
603  // Approximate number: the exact number is determined in GetNextPacket
604  fNumPerWorker = (fNumPerWorker * curNumOfWrks) / fWrkStats->GetSize();
605  if (fNumPerWorker == 0) fNumPerWorker = 1;
606  }
607 
 // NOTE(review): unlike AssignWork, this adds a new parameter object without
 // checking whether "PROOF_PacketizerFixedNum" is already in the list
608  fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
609 
610  return fWrkStats->GetEntries();
611 }
virtual Int_t GetEntries() const
Definition: TCollection.h:92
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
const char * GetOrdinal() const
Definition: TSlave.h:135
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:108
long long Long64_t
Definition: RtypesCore.h:69
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Definition: TStopwatch.cxx:56
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:30
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
float Float_t
Definition: RtypesCore.h:53
Definition: TDSet.h:153
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
Long64_t GetBytesRead() const
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:52
void SetVal(const AParamType &val)
Definition: TParameter.h:79
Basic string class.
Definition: TString.h:137
TAlienJobStatus * status
Definition: TAlienJob.cxx:51
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:496
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
Long64_t GetEntriesProcessed() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:732
Double_t GetProcTime() const
Long64_t GetNum() const
Definition: TDSet.h:116
Double_t GetCPUTime() const
#define SafeDelete(p)
Definition: RConfig.h:436
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2334
#define PDB(mask, level)
Definition: TProofDebug.h:58
Long64_t fProcessing
Int_t AssignWork(TDSet *, Long64_t, Long64_t num)
Assign work to be done to this packetizer.
void IncEntries(Long64_t entries=1)
TDSetElement * GetNextPacket(Long64_t totalEntries=-1)
Get next range of entries to be processed on this server.
void Info(const char *location, const char *msgfmt,...)
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:234
void Continue()
Resume a stopped stopwatch.
Definition: TStopwatch.cxx:91
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
void Error(const char *location, const char *msgfmt,...)
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:10485
TProofProgressStatus * fProgressStatus
A doubly linked list.
Definition: TList.h:47
ClassImp(TPacketizerUnit) TPacketizerUnit
Constructor.
const char * GetName() const
Returns name of object.
Definition: TSlave.h:128
void SetLastEntries(Long64_t entries)
Named parameter, streamable and storable.
Definition: TParameter.h:49
Long64_t fNumPerWorker
ROOT::R::TRInterface & r
Definition: Object.C:4
TObject * Next()
Definition: TCollection.h:158
virtual ~TPacketizerUnit()
Destructor.
void Warning(const char *location, const char *msgfmt,...)
Long64_t GetEntries() const
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:149
#define gPerfStats
long Long_t
Definition: RtypesCore.h:50
TStopwatch * fStopwatch
virtual Int_t GetSize() const
Definition: TCollection.h:95
double Double_t
Definition: RtypesCore.h:55
Int_t GetParallel() const
Definition: TSlave.h:145
Bool_t IsMaster() const
Definition: TProofServ.h:305
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
Int_t GetProtocol() const
Definition: TSlave.h:137
Mother of all ROOT objects.
Definition: TObject.h:58
Int_t BufferSize() const
Definition: TBuffer.h:92
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:359
virtual void Add(TObject *obj)
Definition: TList.h:81
Int_t Length() const
Definition: TBuffer.h:94
Bool_t IsTopMaster() const
Definition: TProofServ.h:307
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Definition: TProof.cxx:167
Definition: TSlave.h:50
const Bool_t kTRUE
Definition: Rtypes.h:91
Double_t GetCurrentTime()
Get current time.
Stopwatch class.
Definition: TStopwatch.h:30
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904