Logo ROOT  
Reference Guide
TPacketizerUnit.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Long Tran-Thanh 22/07/07
3// Revised: G. Ganis, May 2011
4
5/*************************************************************************
6 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13/** \class TPacketizerUnit
14\ingroup proofkernel
15
16This packetizer generates packets of generic units, representing the
17number of times an operation cycle has to be repeated by the worker
18node, e.g. the number of Monte Carlo events to be generated.
19Packet sizes are generated taking into account the performance of
20worker nodes, based on the time needed to process previous packets,
21with the goal of having all workers ending at the same time.
22
23*/
24
25
26#include "TPacketizerUnit.h"
27
28#include "Riostream.h"
29#include "TDSet.h"
30#include "TError.h"
31#include "TEventList.h"
32#include "TMap.h"
33#include "TMessage.h"
34#include "TMonitor.h"
35#include "TNtupleD.h"
36#include "TObject.h"
37#include "TParameter.h"
38#include "TPerfStats.h"
39#include "TProofDebug.h"
40#include "TProof.h"
41#include "TProofPlayer.h"
42#include "TProofServ.h"
43#include "TSlave.h"
44#include "TSocket.h"
45#include "TStopwatch.h"
46#include "TTimer.h"
47#include "TUrl.h"
48#include "TClass.h"
49#include "TMath.h"
50#include "TObjString.h"
51
52
53using namespace TMath;
54//
55// The following utility class manages the state of the
56// work to be performed and the slaves involved in the process.
57//
58// The list of TSlaveStat(s) keeps track of the work (being) done
59// by each slave
60//
61
62//------------------------------------------------------------------------------
63
64class TPacketizerUnit::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
65
66friend class TPacketizerUnit;
67
68private:
69 Long64_t fLastProcessed; // Number of processed entries of the last packet
70 Double_t fRate; // Estimated processing rate averaged over circularity
71 Double_t fTimeInstant; // Starting time of the current packet
72 TNtupleD *fCircNtp; // Keeps circular info for speed calculations
73 Long_t fCircLvl; // Circularity level
74
75public:
76 TSlaveStat(TSlave *sl, TList *input);
77 ~TSlaveStat();
78
79// void GetCurrentTime();
80
81 void UpdatePerformance(Double_t time);
83
84// ClassDef(TPacketizerUnit::TSlaveStat, 0);
85};
86
87////////////////////////////////////////////////////////////////////////////////
88/// Main constructor
89
90TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
91 : fLastProcessed(0),
92 fRate(0), fTimeInstant(0), fCircLvl(5)
93{
94 // Initialize the circularity ntple for speed calculations
95 fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
96 fCircNtp->SetDirectory(0);
97 TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
98 fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
99 fCircNtp->SetCircular(fCircLvl);
100 fSlave = slave;
101 fStatus = new TProofProgressStatus();
102}
103
104////////////////////////////////////////////////////////////////////////////////
105/// Destructor
106
107TPacketizerUnit::TSlaveStat::~TSlaveStat()
108{
109 SafeDelete(fCircNtp);
110}
111
112////////////////////////////////////////////////////////////////////////////////
113/// Update the circular ntuple and re-estimate the processing rate
114
115void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
116{
117 Double_t ttot = time;
118 Double_t *ar = fCircNtp->GetArgs();
119 Int_t ne = fCircNtp->GetEntries();
120 if (ne <= 0) {
121 // First call: just fill one ref entry and return
122 fCircNtp->Fill(0., 0);
123 fRate = 0.;
124 return;
125 }
126 // Fill the entry
127 fCircNtp->GetEntry(ne-1);
128 ttot = ar[0] + time;
129 fCircNtp->Fill(ttot, GetEntriesProcessed());
130
131 // Calculate the speed
132 fCircNtp->GetEntry(0);
133 Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
134 Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
135 fRate = nevts / dtime;
136 PDB(kPacketizer,2)
137 Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
138 time, dtime, nevts, fRate);
139
140}
141
142////////////////////////////////////////////////////////////////////////////////
143/// Update the status info to the 'st'.
144/// return the difference (*st - *fStatus)
145
147{
148 if (st) {
149 // The entriesis not correct in 'st'
150 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
151 // The last proc time should not be added
152 fStatus->SetLastProcTime(0.);
153 // Get the diff
154 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
155 *fStatus += *diff;
156 // Set the correct value
157 fStatus->SetLastEntries(lastEntries);
158 return diff;
159 } else {
160 Error("AddProcessed", "status arg undefined");
161 return 0;
162 }
163}
164
165//------------------------------------------------------------------------------
166
168
169////////////////////////////////////////////////////////////////////////////////
170/// Constructor
171
174 : TVirtualPacketizer(input, st)
175{
176 PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
177
178 // Init pointer members
179 fWrkStats = 0;
180 fPackets = 0;
181 fInput = input;
182
184 Int_t fixednum = -1;
185 if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0) {
187 }
188 else {
189 Info("TPacketizerUnit", "forcing the same cycles on each worker");
191 }
192
193 fCalibFrac = 0.01;
194 if (TProof::GetParameter(input, "PROOF_PacketizerCalibFrac", fCalibFrac) != 0 || fCalibFrac <= 0)
195 fCalibFrac = 0.01;
196 PDB(kPacketizer,1)
197 Info("TPacketizerUnit", "size of the calibration packets: %.2f %% of average number per worker", fCalibFrac);
198
199 fMaxPacketTime = 3.;
200 Double_t timeLimit = -1;
201 if (TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", timeLimit) == 0) {
202 fMaxPacketTime = timeLimit;
203 Warning("TPacketizerUnit", "PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
204 }
205 PDB(kPacketizer,1)
206 Info("TPacketizerUnit", "time limit is %lf", fMaxPacketTime);
207
208 // Different default for min packet time
209 fMinPacketTime = 1;
210 Double_t minPacketTime = 0;
211 if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) fMinPacketTime = minPacketTime;
212 TParameter<Double_t> *mpt = (TParameter<Double_t> *) fConfigParams->FindObject("PROOF_MinPacketTime");
213 if (mpt) {
215 } else {
216 fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
217 }
218
219 fProcessing = 0;
220 fAssigned = 0;
221 fPacketSeq = 0;
222
223 fStopwatch = new TStopwatch();
224
225 fPackets = new TList;
227
228 fWrkStats = new TMap;
230 fWrkExcluded = 0;
231
232 TSlave *slave;
233 TIter si(slaves);
234 while ((slave = (TSlave*) si.Next())) {
235 if (slave->GetParallel() > 0) {
236 fWrkStats->Add(slave, new TSlaveStat(slave, input));
237 } else {
238 if (!fWrkExcluded) {
239 fWrkExcluded = new TList;
241 }
242 PDB(kPacketizer,2)
243 Info("TPacketizerUnit", "node '%s' has NO active worker: excluded from work distribution", slave->GetOrdinal());
244 fWrkExcluded->Add(slave);
245 }
246 }
247
248 fTotalEntries = 0;
249 fNumPerWorker = -1;
250 if (num > 0 && AssignWork(0,0,num) != 0)
251 Warning("TPacketizerUnit", "some problems assigning work");
252
253 // Save the config parameters in the dedicated list so that they will be saved
254 // in the outputlist and therefore in the relevant TQueryResult
255 fConfigParams->Add(new TParameter<Float_t>("PROOF_PacketizerCalibFrac", fCalibFrac));
256
257 fStopwatch->Start();
258 PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
259}
260
261////////////////////////////////////////////////////////////////////////////////
262/// Assign work to be done to this packetizer
263
265{
266 if (num < 0) {
267 Error("AssignWork", "assigned a negative number (%lld) of cycles - protocol error?", num);
268 return -1;
269 }
270
271 fTotalEntries += num;
272 PDB(kPacketizer,1)
273 Info("AssignWork", "assigned %lld additional cycles (new total: %lld)", num, fTotalEntries);
274
275 // Update fixed number counter
276 if (fFixedNum && fWrkStats->GetSize() > 0) {
277 // Approximate number: the exact number is determined in GetNextPacket
279 if (fNumPerWorker == 0) fNumPerWorker = 1;
280 }
281
282 // Update/Save the config parameters in the dedicated list so that they will be saved
283 // in the outputlist and therefore in the relevant TQueryResult
285 (TParameter<Long64_t> *) fConfigParams->FindObject("PROOF_PacketizerFixedNum");
286 if (fn) {
288 } else {
289 fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
290 }
291
292 // Done
293 return 0;
294}
295
296////////////////////////////////////////////////////////////////////////////////
297/// Destructor.
298
300{
301 if (fWrkStats)
307}
308
309////////////////////////////////////////////////////////////////////////////////
310/// Get current time
311
313{
314 Double_t retValue = fStopwatch->RealTime();
316 return retValue;
317}
318
319////////////////////////////////////////////////////////////////////////////////
320/// Get Estimation of the current rate; just summing the current rates of
321/// the active workers
322
324{
325 all = kTRUE;
326 // Loop over the workers
327 Float_t currate = 0.;
328 if (fWrkStats && fWrkStats->GetSize() > 0) {
329 TIter nxw(fWrkStats);
330 TObject *key;
331 while ((key = nxw()) != 0) {
332 TSlaveStat *slstat = (TSlaveStat *) fWrkStats->GetValue(key);
333 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
334 // Sum-up the current rates
335 currate += slstat->GetProgressStatus()->GetCurrentRate();
336 } else {
337 all = kFALSE;
338 }
339 }
340 }
341 // Done
342 return currate;
343}
344
345////////////////////////////////////////////////////////////////////////////////
346/// Get next packet
347
349{
350 if (!fValid)
351 return 0;
352
353 // Find slave
354 TSlaveStat *slstat = (TSlaveStat*) fWrkStats->GetValue(sl);
355 if (!slstat) {
356 Warning("GetNextPacket", "Received a packet request from an unknown slave: %s:%s",
357 sl->GetName(), sl->GetOrdinal());
358 return 0;
359 }
360
361 PDB(kPacketizer,2)
362 Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned);
363
364 // Update stats & free old element
365 Double_t latency = 0., proctime = 0., proccpu = 0.;
366 Long64_t bytesRead = -1;
367 Long64_t totalEntries = -1; // used only to read an old message type
368 Long64_t totev = 0;
369 Long64_t numev = -1;
370
371 TProofProgressStatus *status = 0;
372 if (sl->GetProtocol() > 18) {
373 (*r) >> latency;
374 (*r) >> status;
375
376 // Calculate the progress made in the last packet
377 TProofProgressStatus *progress = 0;
378 if (status) {
379 // update the worker status
380 numev = status->GetEntries() - slstat->GetEntriesProcessed();
381 progress = slstat->AddProcessed(status);
382 if (progress) {
383 // (*fProgressStatus) += *progress;
384 proctime = progress->GetProcTime();
385 proccpu = progress->GetCPUTime();
386 totev = status->GetEntries(); // for backward compatibility
387 bytesRead = progress->GetBytesRead();
388 delete progress;
389 }
390 delete status;
391 } else
392 Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
393 } else {
394
395 (*r) >> latency >> proctime >> proccpu;
396
397 // only read new info if available
398 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
399 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
400 if (r->BufferSize() > r->Length()) (*r) >> totev;
401
402 numev = totev - slstat->GetEntriesProcessed();
403 slstat->GetProgressStatus()->IncEntries(numev);
404 slstat->GetProgressStatus()->SetLastUpdate();
405 }
406
409
410 fProcessing = 0;
411
412 PDB(kPacketizer,2)
413 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
414 sl->GetOrdinal(), sl->GetName(),
415 numev, latency, proctime, proccpu, bytesRead);
416
417 if (gPerfStats != 0) {
418 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
419 latency, proctime, proccpu, bytesRead);
420 }
421
422 if (fNumPerWorker > 0 && slstat->GetEntriesProcessed() >= fNumPerWorker) {
423 PDB(kPacketizer,2)
424 Info("GetNextPacket","worker-%s (%s) is done (%lld cycles)",
425 sl->GetOrdinal(), sl->GetName(), slstat->GetEntriesProcessed());
426 return 0;
427 }
428
429 if (fAssigned == fTotalEntries) {
430 Bool_t done = kTRUE;
431 // If we are on a submaster, check if there is something else to do
434 if (nxe) {
435 if (AssignWork(0,0,nxe->GetNum()) == 0) {
436 if (fAssigned < fTotalEntries) done = kFALSE;
437 } else {
438 Error("GetNextPacket", "problems assigning additional work: stop");
439 }
440 SafeDelete(nxe);
441 }
442 }
443 if (done) {
444 // Send last timer message
445 HandleTimer(0);
446 return 0;
447 }
448 }
449
450 if (fStop) {
451 // Send last timer message
452 HandleTimer(0);
453 return 0;
454 }
455
456
457 Long64_t num;
458
459 // Get the current time
460 Double_t cTime = GetCurrentTime();
461
462 if (slstat->fCircNtp->GetEntries() <= 0) {
463 // The calibration phase
465 num = (Long64_t) (fCalibFrac * avg);
466 if (num < 1) num = (avg >= 1) ? avg : 1;
467 PDB(kPacketizer,2)
468 Info("GetNextPacket", "calibration: total entries %lld, workers %d, frac: %.1f %%, raw num: %lld",
469 fTotalEntries, fWrkStats->GetSize(), fCalibFrac * 100., num);
470
471 // Create a reference entry
472 slstat->UpdatePerformance(0.);
473
474 } else {
475
476 if (fNumPerWorker < 0) {
477
478 // Schedule tasks for workers based on the currently estimated processing speeds
479
480 // Update performances
481 // slstat->fStatus was updated before;
482 slstat->UpdatePerformance(proctime);
483
484 // We need to estimate the total instantaneous rate: for the workers not having yet
485 // one we assume the average of those having a measurement
486 // The optimal number for worker j is
487 //
488 // n_j = r_j / Sum r_i * N_left
489 //
490
491 Int_t nrm = 0;
492 Double_t sumRate = 0.;
493 TIter nxwrk(fWrkStats);
494 TSlaveStat *wrkStat = 0;
495 TSlave *tmpWrk = 0;
496 while ((tmpWrk = (TSlave *)nxwrk())) {
497 if ((wrkStat = dynamic_cast<TSlaveStat *>(fWrkStats->GetValue(tmpWrk)))) {
498 if (wrkStat->fRate > 0) {
499 nrm++;
500 sumRate += wrkStat->fRate;
501 }
502 PDB(kPacketizer,3)
503 Info("GetNextPacket", "%d: worker-%s: rate %lf /s (sum: %lf /s)",
504 nrm, tmpWrk->GetOrdinal(), wrkStat->fRate, sumRate);
505 } else {
506 Warning("GetNextPacket", "dynamic_cast<TSlaveStat *> failing on value for '%s (%s)'! Skipping",
507 tmpWrk->GetName(), tmpWrk->GetOrdinal());
508 }
509 }
510
511 // Check consistency
512 if (nrm <= 0) {
513 Error("GetNextPacket", "no worker has consistent information: stop processing!");
514 return (TDSetElement *)0;
515 }
516
517 Double_t avgRate = sumRate / nrm;
518 // Check if all workers had meaningful rate information
519 if (nrm < fWrkStats->GetSize()) {
520 // For some workers the measurement is missing: use the average
521 sumRate += (fWrkStats->GetSize() - nrm) * avgRate;
522 }
523 PDB(kPacketizer,2)
524 Info("GetNextPacket", "rate: avg: %lf /s/wrk - sum: %lf /s (measurements %d out of %d)",
525 avgRate, sumRate, nrm, fWrkStats->GetSize());
526
527 // Packet size for this worker
528 Double_t wrkRate = (slstat->fRate > 0.) ? slstat->fRate : avgRate ;
529 num = (Long64_t) ((fTotalEntries - fAssigned) * wrkRate / sumRate);
530 PDB(kPacketizer,2)
531 Info("GetNextPacket", "worker-%s (%s): raw packet size: %lld", sl->GetOrdinal(), sl->GetName(), num);
532
533 // Apply time-per-packet limits
534 Double_t packTime = num / wrkRate;
535 if (fMaxPacketTime > 0. && packTime > fMaxPacketTime) {
536 num = (Long64_t) (fMaxPacketTime * wrkRate) ;
537 packTime = fMaxPacketTime;
538 PDB(kPacketizer,2)
539 Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (upper limit: %.2f secs)",
540 sl->GetOrdinal(), sl->GetName(), num, fMaxPacketTime);
541 }
542 if (fMinPacketTime > 0. && packTime < fMinPacketTime) {
543 num = (Long64_t) (fMinPacketTime * wrkRate);
544 PDB(kPacketizer,2)
545 Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (lower limit: %.2f secs)",
546 sl->GetOrdinal(), sl->GetName(), num, fMinPacketTime);
547 }
548
549 } else {
550 // Fixed number of cycles per worker
551 num = fNumPerWorker - slstat->fLastProcessed;
552 if (num > 1 && slstat->fRate > 0 && num / slstat->fRate > fMaxPacketTime) {
553 num = (Long64_t) (slstat->fRate * fMaxPacketTime);
554 }
555 }
556 }
557 // Minimum packet size
558 num = (num > 1) ? num : 1;
559 fProcessing = (num < (fTotalEntries - fAssigned)) ? num
561
562 // Set the information of the current slave
563 slstat->fLastProcessed = fProcessing;
564 // Set the start time of the current packet
565 slstat->fTimeInstant = cTime;
566
567 // Update the sequential number
568 fPacketSeq++;
569 TString sseq = TString::Format("p%lld", fPacketSeq);
570
571 PDB(kPacketizer,2)
572 Info("GetNextPacket", "worker-%s: num %lld, processing %lld, remaining %lld",sl->GetOrdinal(),
574 TDSetElement *elem = new TDSetElement(sseq, sseq, "", fAssigned, fProcessing);
576
577 // Update the total counter
578 fAssigned += slstat->fLastProcessed;
579
580 return elem;
581}
582
583////////////////////////////////////////////////////////////////////////////////
584/// Adds new workers. Returns the total number of workers known to the packetizer after the addition, or -1 on failure.
585
587{
588 if (!workers) {
589 Error("AddWorkers", "Null list of new workers!");
590 return -1;
591 }
592
593 Int_t curNumOfWrks = fWrkStats->GetEntries();
594
595 TSlave *sl;
596 TIter next(workers);
597 while (( sl = dynamic_cast<TSlave*>(next()) ))
598 fWrkStats->Add(sl, new TSlaveStat(sl, fInput));
599
600 fNumPerWorker = -1;
601 if (fFixedNum && fWrkStats->GetSize() > 0) {
602 // Approximate number: the exact number is determined in GetNextPacket
603 fNumPerWorker = (fNumPerWorker * curNumOfWrks) / fWrkStats->GetSize();
604 if (fNumPerWorker == 0) fNumPerWorker = 1;
605 }
606
607 fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
608
609 return fWrkStats->GetEntries();
610}
ROOT::R::TRInterface & r
Definition: Object.C:4
#define SafeDelete(p)
Definition: RConfig.hxx:550
#define f(i)
Definition: RSha256.hxx:104
int Int_t
Definition: RtypesCore.h:41
const Bool_t kFALSE
Definition: RtypesCore.h:88
long Long_t
Definition: RtypesCore.h:50
bool Bool_t
Definition: RtypesCore.h:59
double Double_t
Definition: RtypesCore.h:55
long long Long64_t
Definition: RtypesCore.h:69
float Float_t
Definition: RtypesCore.h:53
const Bool_t kTRUE
Definition: RtypesCore.h:87
#define ClassImp(name)
Definition: Rtypes.h:365
void Info(const char *location, const char *msgfmt,...)
void Error(const char *location, const char *msgfmt,...)
#define PDB(mask, level)
Definition: TProofDebug.h:56
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
#define gPerfStats
virtual Int_t GetEntries() const
Definition: TCollection.h:177
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
Manages an element of a TDSet.
Definition: TDSet.h:66
Long64_t GetNum() const
Definition: TDSet.h:114
@ kEmpty
Definition: TDSet.h:73
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
TObject * Next()
Definition: TCollection.h:249
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:575
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
Definition: TMap.cxx:53
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
Definition: TMap.cxx:150
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:28
Mother of all ROOT objects.
Definition: TObject.h:37
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
This packetizer generates packets of generic units, representing the number of times an operation cyc...
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
virtual ~TPacketizerUnit()
Destructor.
Long64_t fProcessing
Double_t GetCurrentTime()
Get current time.
Long64_t fNumPerWorker
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TStopwatch * fStopwatch
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
Int_t AssignWork(TDSet *, Long64_t, Long64_t num)
Assign work to be done to this packetizer.
Named parameter, streamable and storable.
Definition: TParameter.h:37
void SetVal(const AParamType &val)
Definition: TParameter.h:71
Container class for processing statistics.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
Double_t GetProcTime() const
Long64_t GetEntries() const
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
TDSetElement * GetNextPacket(Long64_t totalEntries=-1)
Get next range of entries to be processed on this server.
Bool_t IsMaster() const
Definition: TProofServ.h:293
Bool_t IsTopMaster() const
Definition: TProofServ.h:295
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9891
Class describing a PROOF worker server.
Definition: TSlave.h:46
Int_t GetParallel() const
Definition: TSlave.h:141
Int_t GetProtocol() const
Definition: TSlave.h:133
const char * GetName() const
Returns name of object.
Definition: TSlave.h:124
const char * GetOrdinal() const
Definition: TSlave.h:131
Stopwatch class.
Definition: TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Definition: TStopwatch.cxx:58
void Continue()
Resume a stopped stopwatch.
Definition: TStopwatch.cxx:93
Basic string class.
Definition: TString.h:131
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString.
Definition: TString.cxx:2311
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
The packetizer is a load balancing object created for each query.
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
Long64_t GetEntriesProcessed() const
TMath.
Definition: TMathBase.h:35