Logo ROOT   6.12/07
Reference Guide
TPacketizerUnit.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Long Tran-Thanh 22/07/07
3 // Revised: G. Ganis, May 2011
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 /** \class TPacketizerUnit
14 \ingroup proofkernel
15 
16 This packetizer generates packets of generic units, representing the
17 number of times an operation cycle has to be repeated by the worker
18 node, e.g. the number of Monte Carlo events to be generated.
19 Packet sizes are generated taking into account the performance of
20 worker nodes, based on the time needed to process previous packets,
21 with the goal of having all workers ending at the same time.
22 
23 */
24 
25 
26 #include "TPacketizerUnit.h"
27 
28 #include "Riostream.h"
29 #include "TDSet.h"
30 #include "TError.h"
31 #include "TEventList.h"
32 #include "TMap.h"
33 #include "TMessage.h"
34 #include "TMonitor.h"
35 #include "TNtupleD.h"
36 #include "TObject.h"
37 #include "TParameter.h"
38 #include "TPerfStats.h"
39 #include "TProofDebug.h"
40 #include "TProof.h"
41 #include "TProofPlayer.h"
42 #include "TProofServ.h"
43 #include "TSlave.h"
44 #include "TSocket.h"
45 #include "TStopwatch.h"
46 #include "TTimer.h"
47 #include "TUrl.h"
48 #include "TClass.h"
49 #include "TMath.h"
50 #include "TObjString.h"
51 
52 
53 using namespace TMath;
54 //
55 // The following utility class manages the state of the
56 // work to be performed and the slaves involved in the process.
57 //
58 // The list of TSlaveStat(s) keeps track of the work (being) done
59 // by each slave
60 //
61 
62 //------------------------------------------------------------------------------
63 
64 class TPacketizerUnit::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
65 
66 friend class TPacketizerUnit;
67 
68 private:
69  Long64_t fLastProcessed; // Number of processed entries of the last packet
70  Double_t fRate; // Estimated processing rate averaged over circularity
71  Double_t fTimeInstant; // Starting time of the current packet
72  TNtupleD *fCircNtp; // Keeps circular info for speed calculations
73  Long_t fCircLvl; // Circularity level
74 
75 public:
76  TSlaveStat(TSlave *sl, TList *input);
77  ~TSlaveStat();
78 
79 // void GetCurrentTime();
80 
81  void UpdatePerformance(Double_t time);
83 
84 // ClassDef(TPacketizerUnit::TSlaveStat, 0);
85 };
86 
87 ////////////////////////////////////////////////////////////////////////////////
88 /// Main constructor
89 
90 TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
91  : fLastProcessed(0),
92  fRate(0), fTimeInstant(0), fCircLvl(5)
93 {
94  // Initialize the circularity ntple for speed calculations
95  fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
96  fCircNtp->SetDirectory(0);
97  TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
98  fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
99  fCircNtp->SetCircular(fCircLvl);
100  fSlave = slave;
101  fStatus = new TProofProgressStatus();
102 }
103 
104 ////////////////////////////////////////////////////////////////////////////////
105 /// Destructor
106 
107 TPacketizerUnit::TSlaveStat::~TSlaveStat()
108 {
109  SafeDelete(fCircNtp);
110 }
111 
112 ////////////////////////////////////////////////////////////////////////////////
113 /// Update the circular ntple
114 
115 void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
116 {
117  Double_t ttot = time;
118  Double_t *ar = fCircNtp->GetArgs();
119  Int_t ne = fCircNtp->GetEntries();
120  if (ne <= 0) {
121  // First call: just fill one ref entry and return
122  fCircNtp->Fill(0., 0);
123  fRate = 0.;
124  return;
125  }
126  // Fill the entry
127  fCircNtp->GetEntry(ne-1);
128  ttot = ar[0] + time;
129  fCircNtp->Fill(ttot, GetEntriesProcessed());
130 
131  // Calculate the speed
132  fCircNtp->GetEntry(0);
133  Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
134  Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
135  fRate = nevts / dtime;
136  PDB(kPacketizer,2)
137  Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
138  time, dtime, nevts, fRate);
139 
140 }
141 
142 ////////////////////////////////////////////////////////////////////////////////
143 /// Update the status info to the 'st'.
144 /// return the difference (*st - *fStatus)
145 
147 {
148  if (st) {
149  // The entriesis not correct in 'st'
150  Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
151  // The last proc time should not be added
152  fStatus->SetLastProcTime(0.);
153  // Get the diff
154  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
155  *fStatus += *diff;
156  // Set the correct value
157  fStatus->SetLastEntries(lastEntries);
158  return diff;
159  } else {
160  Error("AddProcessed", "status arg undefined");
161  return 0;
162  }
163 }
164 
165 //------------------------------------------------------------------------------
166 
168 
169 ////////////////////////////////////////////////////////////////////////////////
170 /// Constructor
171 
174  : TVirtualPacketizer(input, st)
175 {
176  PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
177 
178  // Init pointer members
179  fWrkStats = 0;
180  fPackets = 0;
181  fInput = input;
182 
183  fFixedNum = kFALSE;
184  Int_t fixednum = -1;
185  if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0) {
186  fFixedNum = kFALSE;
187  }
188  else {
189  Info("TPacketizerUnit", "forcing the same cycles on each worker");
190  fFixedNum = kTRUE;
191  }
192 
193  fCalibFrac = 0.01;
194  if (TProof::GetParameter(input, "PROOF_PacketizerCalibFrac", fCalibFrac) != 0 || fCalibFrac <= 0)
195  fCalibFrac = 0.01;
196  PDB(kPacketizer,1)
197  Info("TPacketizerUnit", "size of the calibration packets: %.2f %% of average number per worker", fCalibFrac);
198 
199  fMaxPacketTime = 3.;
200  Double_t timeLimit = -1;
201  if (TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", timeLimit) == 0) {
202  fMaxPacketTime = timeLimit;
203  Warning("TPacketizerUnit", "PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
204  }
205  PDB(kPacketizer,1)
206  Info("TPacketizerUnit", "time limit is %lf", fMaxPacketTime);
207 
208  // Different default for min packet time
209  fMinPacketTime = 1;
210  Double_t minPacketTime = 0;
211  if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) fMinPacketTime = minPacketTime;
212  TParameter<Double_t> *mpt = (TParameter<Double_t> *) fConfigParams->FindObject("PROOF_MinPacketTime");
213  if (mpt) {
214  mpt->SetVal(fMinPacketTime);
215  } else {
216  fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
217  }
218 
219  fProcessing = 0;
220  fAssigned = 0;
221  fPacketSeq = 0;
222 
223  fStopwatch = new TStopwatch();
224 
225  fPackets = new TList;
226  fPackets->SetOwner();
227 
228  fWrkStats = new TMap;
230  fWrkExcluded = 0;
231 
232  TSlave *slave;
233  TIter si(slaves);
234  while ((slave = (TSlave*) si.Next())) {
235  if (slave->GetParallel() > 0) {
236  fWrkStats->Add(slave, new TSlaveStat(slave, input));
237  } else {
238  if (!fWrkExcluded) {
239  fWrkExcluded = new TList;
241  }
242  PDB(kPacketizer,2)
243  Info("TPacketizerUnit", "node '%s' has NO active worker: excluded from work distribution", slave->GetOrdinal());
244  fWrkExcluded->Add(slave);
245  }
246  }
247 
248  fTotalEntries = 0;
249  fNumPerWorker = -1;
250  if (num > 0 && AssignWork(0,0,num) != 0)
251  Warning("TPacketizerUnit", "some problems assigning work");
252 
253  // Save the config parameters in the dedicated list so that they will be saved
254  // in the outputlist and therefore in the relevant TQueryResult
255  fConfigParams->Add(new TParameter<Float_t>("PROOF_PacketizerCalibFrac", fCalibFrac));
256 
257  fStopwatch->Start();
258  PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
259 }
260 
261 ////////////////////////////////////////////////////////////////////////////////
262 /// Assign work to be done to this packetizer
263 
265 {
266  if (num < 0) {
267  Error("AssignWork", "assigned a negative number (%lld) of cycles - protocol error?", num);
268  return -1;
269  }
270 
271  fTotalEntries += num;
272  PDB(kPacketizer,1)
273  Info("AssignWork", "assigned %lld additional cycles (new total: %lld)", num, fTotalEntries);
274 
275  // Update fixed number counter
276  if (fFixedNum && fWrkStats->GetSize() > 0) {
277  // Approximate number: the exact number is determined in GetNextPacket
279  if (fNumPerWorker == 0) fNumPerWorker = 1;
280  }
281 
282  // Update/Save the config parameters in the dedicated list so that they will be saved
283  // in the outputlist and therefore in the relevant TQueryResult
285  (TParameter<Long64_t> *) fConfigParams->FindObject("PROOF_PacketizerFixedNum");
286  if (fn) {
287  fn->SetVal(fNumPerWorker);
288  } else {
289  fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
290  }
291 
292  // Done
293  return 0;
294 }
295 
296 ////////////////////////////////////////////////////////////////////////////////
297 /// Destructor.
298 
300 {
301  if (fWrkStats)
307 }
308 
309 ////////////////////////////////////////////////////////////////////////////////
310 /// Get current time
311 
313 {
314  Double_t retValue = fStopwatch->RealTime();
315  fStopwatch->Continue();
316  return retValue;
317 }
318 
319 ////////////////////////////////////////////////////////////////////////////////
320 /// Get Estimation of the current rate; just summing the current rates of
321 /// the active workers
322 
324 {
325  all = kTRUE;
326  // Loop over the workers
327  Float_t currate = 0.;
328  if (fWrkStats && fWrkStats->GetSize() > 0) {
329  TIter nxw(fWrkStats);
330  TObject *key;
331  while ((key = nxw()) != 0) {
332  TSlaveStat *slstat = (TSlaveStat *) fWrkStats->GetValue(key);
333  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
334  // Sum-up the current rates
335  currate += slstat->GetProgressStatus()->GetCurrentRate();
336  } else {
337  all = kFALSE;
338  }
339  }
340  }
341  // Done
342  return currate;
343 }
344 
345 ////////////////////////////////////////////////////////////////////////////////
346 /// Get next packet
347 
349 {
350  if (!fValid)
351  return 0;
352 
353  // Find slave
354  TSlaveStat *slstat = (TSlaveStat*) fWrkStats->GetValue(sl);
355  if (!slstat) {
356  Warning("GetNextPacket", "Received a packet request from an unknown slave: %s:%s",
357  sl->GetName(), sl->GetOrdinal());
358  return 0;
359  }
360 
361  PDB(kPacketizer,2)
362  Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned);
363 
364  // Update stats & free old element
365  Double_t latency = 0., proctime = 0., proccpu = 0.;
366  Long64_t bytesRead = -1;
367  Long64_t totalEntries = -1; // used only to read an old message type
368  Long64_t totev = 0;
369  Long64_t numev = -1;
370 
371  TProofProgressStatus *status = 0;
372  if (sl->GetProtocol() > 18) {
373  (*r) >> latency;
374  (*r) >> status;
375 
376  // Calculate the progress made in the last packet
377  TProofProgressStatus *progress = 0;
378  if (status) {
379  // update the worker status
380  numev = status->GetEntries() - slstat->GetEntriesProcessed();
381  progress = slstat->AddProcessed(status);
382  if (progress) {
383  // (*fProgressStatus) += *progress;
384  proctime = progress->GetProcTime();
385  proccpu = progress->GetCPUTime();
386  totev = status->GetEntries(); // for backward compatibility
387  bytesRead = progress->GetBytesRead();
388  delete progress;
389  }
390  delete status;
391  } else
392  Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
393  } else {
394 
395  (*r) >> latency >> proctime >> proccpu;
396 
397  // only read new info if available
398  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
399  if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
400  if (r->BufferSize() > r->Length()) (*r) >> totev;
401 
402  numev = totev - slstat->GetEntriesProcessed();
403  slstat->GetProgressStatus()->IncEntries(numev);
404  slstat->GetProgressStatus()->SetLastUpdate();
405  }
406 
407  fProgressStatus->IncEntries(numev);
409 
410  fProcessing = 0;
411 
412  PDB(kPacketizer,2)
413  Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
414  sl->GetOrdinal(), sl->GetName(),
415  numev, latency, proctime, proccpu, bytesRead);
416 
417  if (gPerfStats != 0) {
418  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
419  latency, proctime, proccpu, bytesRead);
420  }
421 
422  if (fNumPerWorker > 0 && slstat->GetEntriesProcessed() >= fNumPerWorker) {
423  PDB(kPacketizer,2)
424  Info("GetNextPacket","worker-%s (%s) is done (%lld cycles)",
425  sl->GetOrdinal(), sl->GetName(), slstat->GetEntriesProcessed());
426  return 0;
427  }
428 
429  if (fAssigned == fTotalEntries) {
430  Bool_t done = kTRUE;
431  // If we are on a submaster, check if there is something else to do
434  if (nxe) {
435  if (AssignWork(0,0,nxe->GetNum()) == 0) {
436  if (fAssigned < fTotalEntries) done = kFALSE;
437  } else {
438  Error("GetNextPacket", "problems assigning additional work: stop");
439  }
440  SafeDelete(nxe);
441  }
442  }
443  if (done) {
444  // Send last timer message
445  HandleTimer(0);
446  return 0;
447  }
448  }
449 
450  if (fStop) {
451  // Send last timer message
452  HandleTimer(0);
453  return 0;
454  }
455 
456 
457  Long64_t num;
458 
459  // Get the current time
460  Double_t cTime = GetCurrentTime();
461 
462  if (slstat->fCircNtp->GetEntries() <= 0) {
463  // The calibration phase
465  num = (Long64_t) (fCalibFrac * avg);
466  if (num < 1) num = (avg >= 1) ? avg : 1;
467  PDB(kPacketizer,2)
468  Info("GetNextPacket", "calibration: total entries %lld, workers %d, frac: %.1f %%, raw num: %lld",
469  fTotalEntries, fWrkStats->GetSize(), fCalibFrac * 100., num);
470 
471  // Create a reference entry
472  slstat->UpdatePerformance(0.);
473 
474  } else {
475 
476  if (fNumPerWorker < 0) {
477 
478  // Schedule tasks for workers based on the currently estimated processing speeds
479 
480  // Update performances
481  // slstat->fStatus was updated before;
482  slstat->UpdatePerformance(proctime);
483 
484  // We need to estimate the total instantaneous rate: for the workers not having yet
485  // one we assume the average of those having a measurement
486  // The optimal number for worker j is
487  //
488  // n_j = r_j / Sum r_i * N_left
489  //
490 
491  Int_t nrm = 0;
492  Double_t sumRate = 0.;
493  TIter nxwrk(fWrkStats);
494  TSlaveStat *wrkStat = 0;
495  TSlave *tmpWrk = 0;
496  while ((tmpWrk = (TSlave *)nxwrk())) {
497  if ((wrkStat = dynamic_cast<TSlaveStat *>(fWrkStats->GetValue(tmpWrk)))) {
498  if (wrkStat->fRate > 0) {
499  nrm++;
500  sumRate += wrkStat->fRate;
501  }
502  PDB(kPacketizer,3)
503  Info("GetNextPacket", "%d: worker-%s: rate %lf /s (sum: %lf /s)",
504  nrm, tmpWrk->GetOrdinal(), wrkStat->fRate, sumRate);
505  } else {
506  Warning("GetNextPacket", "dynamic_cast<TSlaveStat *> failing on value for '%s (%s)'! Skipping",
507  tmpWrk->GetName(), tmpWrk->GetOrdinal());
508  }
509  }
510 
511  // Check consistency
512  if (nrm <= 0) {
513  Error("GetNextPacket", "no worker has consistent information: stop processing!");
514  return (TDSetElement *)0;
515  }
516 
517  Double_t avgRate = sumRate / nrm;
518  // Check if all workers had meaningful rate information
519  if (nrm < fWrkStats->GetSize()) {
520  // For some workers the measurement is missing: use the average
521  sumRate += (fWrkStats->GetSize() - nrm) * avgRate;
522  }
523  PDB(kPacketizer,2)
524  Info("GetNextPacket", "rate: avg: %lf /s/wrk - sum: %lf /s (measurements %d out of %d)",
525  avgRate, sumRate, nrm, fWrkStats->GetSize());
526 
527  // Packet size for this worker
528  Double_t wrkRate = (slstat->fRate > 0.) ? slstat->fRate : avgRate ;
529  num = (Long64_t) ((fTotalEntries - fAssigned) * wrkRate / sumRate);
530  PDB(kPacketizer,2)
531  Info("GetNextPacket", "worker-%s (%s): raw packet size: %lld", sl->GetOrdinal(), sl->GetName(), num);
532 
533  // Apply time-per-packet limits
534  Double_t packTime = num / wrkRate;
535  if (fMaxPacketTime > 0. && packTime > fMaxPacketTime) {
536  num = (Long64_t) (fMaxPacketTime * wrkRate) ;
537  packTime = fMaxPacketTime;
538  PDB(kPacketizer,2)
539  Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (upper limit: %.2f secs)",
540  sl->GetOrdinal(), sl->GetName(), num, fMaxPacketTime);
541  }
542  if (fMinPacketTime > 0. && packTime < fMinPacketTime) {
543  num = (Long64_t) (fMinPacketTime * wrkRate);
544  PDB(kPacketizer,2)
545  Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (lower limit: %.2f secs)",
546  sl->GetOrdinal(), sl->GetName(), num, fMinPacketTime);
547  }
548 
549  } else {
550  // Fixed number of cycles per worker
551  num = fNumPerWorker - slstat->fLastProcessed;
552  if (num > 1 && slstat->fRate > 0 && num / slstat->fRate > fMaxPacketTime) {
553  num = (Long64_t) (slstat->fRate * fMaxPacketTime);
554  }
555  }
556  }
557  // Minimum packet size
558  num = (num > 1) ? num : 1;
559  fProcessing = (num < (fTotalEntries - fAssigned)) ? num
560  : (fTotalEntries - fAssigned);
561 
562  // Set the information of the current slave
563  slstat->fLastProcessed = fProcessing;
564  // Set the start time of the current packet
565  slstat->fTimeInstant = cTime;
566 
567  // Update the sequential number
568  fPacketSeq++;
569  TString sseq = TString::Format("p%lld", fPacketSeq);
570 
571  PDB(kPacketizer,2)
572  Info("GetNextPacket", "worker-%s: num %lld, processing %lld, remaining %lld",sl->GetOrdinal(),
574  TDSetElement *elem = new TDSetElement(sseq, sseq, "", fAssigned, fProcessing);
576 
577  // Update the total counter
578  fAssigned += slstat->fLastProcessed;
579 
580  return elem;
581 }
582 
583 ////////////////////////////////////////////////////////////////////////////////
584 /// Adds new workers. Returns the number of workers added, or -1 on failure.
585 
587 {
588  if (!workers) {
589  Error("AddWorkers", "Null list of new workers!");
590  return -1;
591  }
592 
593  Int_t curNumOfWrks = fWrkStats->GetEntries();
594 
595  TSlave *sl;
596  TIter next(workers);
597  while (( sl = dynamic_cast<TSlave*>(next()) ))
598  fWrkStats->Add(sl, new TSlaveStat(sl, fInput));
599 
600  fNumPerWorker = -1;
601  if (fFixedNum && fWrkStats->GetSize() > 0) {
602  // Approximate number: the exact number is determined in GetNextPacket
603  fNumPerWorker = (fNumPerWorker * curNumOfWrks) / fWrkStats->GetSize();
604  if (fNumPerWorker == 0) fNumPerWorker = 1;
605  }
606 
607  fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
608 
609  return fWrkStats->GetEntries();
610 }
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
long long Long64_t
Definition: RtypesCore.h:69
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Definition: TStopwatch.cxx:58
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:28
float Float_t
Definition: RtypesCore.h:53
Int_t GetParallel() const
Definition: TSlave.h:141
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:151
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Double_t GetProcTime() const
virtual Int_t GetEntries() const
Definition: TCollection.h:177
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:53
void SetVal(const AParamType &val)
Definition: TParameter.h:71
const char * GetOrdinal() const
Definition: TSlave.h:131
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const char * GetName() const
Returns name of object.
Definition: TSlave.h:124
Bool_t IsTopMaster() const
Definition: TProofServ.h:295
Long64_t GetEntries() const
Int_t GetProtocol() const
Definition: TSlave.h:133
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
Definition: TList.cxx:574
Int_t Length() const
Definition: TBuffer.h:96
Manages an element of a TDSet.
Definition: TDSet.h:66
#define SafeDelete(p)
Definition: RConfig.h:509
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
Long64_t GetNum() const
Definition: TDSet.h:114
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2365
#define PDB(mask, level)
Definition: TProofDebug.h:56
Long64_t fProcessing
Int_t AssignWork(TDSet *, Long64_t, Long64_t num)
Assign work to be done to this packetizer.
void IncEntries(Long64_t entries=1)
TDSetElement * GetNextPacket(Long64_t totalEntries=-1)
Get next range of entries to be processed on this server.
void Continue()
Resume a stopped stopwatch.
Definition: TStopwatch.cxx:93
This packetizer generates packets of generic units, representing the number of times an operation cyc...
TProofProgressStatus * fProgressStatus
A doubly linked list.
Definition: TList.h:44
void SetLastEntries(Long64_t entries)
Named parameter, streamable and storable.
Definition: TParameter.h:37
Long64_t fNumPerWorker
ROOT::R::TRInterface & r
Definition: Object.C:4
TObject * Next()
Definition: TCollection.h:247
virtual ~TPacketizerUnit()
Destructor.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:150
#define gPerfStats
const Bool_t kFALSE
Definition: RtypesCore.h:88
long Long_t
Definition: RtypesCore.h:50
The packetizer is a load balancing object created for each query.
TStopwatch * fStopwatch
#define ClassImp(name)
Definition: Rtypes.h:359
double Double_t
Definition: RtypesCore.h:55
Long64_t GetEntriesProcessed() const
Double_t GetCPUTime() const
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
Int_t BufferSize() const
Definition: TBuffer.h:94
Mother of all ROOT objects.
Definition: TObject.h:37
Long64_t GetBytesRead() const
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
virtual void Add(TObject *obj)
Definition: TList.h:87
Int_t AddProcessed(TSlave *wrk, TProofProgressStatus *st, Double_t lat, TList **missing)
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
virtual Int_t GetSize() const
Definition: TCollection.h:180
Class describing a PROOF worker server.
Definition: TSlave.h:46
Container class for processing statistics.
const Bool_t kTRUE
Definition: RtypesCore.h:87
Bool_t IsMaster() const
Definition: TProofServ.h:293
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
Double_t GetCurrentTime()
Get current time.
Stopwatch class.
Definition: TStopwatch.h:28