Logo ROOT  
Reference Guide
TPacketizerUnit.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Long Tran-Thanh 22/07/07
3// Revised: G. Ganis, May 2011
4
5/*************************************************************************
6 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13/** \class TPacketizerUnit
14\ingroup proofkernel
15
16This packetizer generates packets of generic units, representing the
17number of times an operation cycle has to be repeated by the worker
18node, e.g. the number of Monte Carlo events to be generated.
19Packet sizes are generated taking into account the performance of
20worker nodes, based on the time needed to process previous packets,
21with the goal of having all workers ending at the same time.
22
23*/
24
25
26#include "TPacketizerUnit.h"
27
28#include "Riostream.h"
29#include "TDSet.h"
30#include "TError.h"
31#include "TEventList.h"
32#include "TMap.h"
33#include "TMessage.h"
34#include "TMonitor.h"
35#include "TNtupleD.h"
36#include "TObject.h"
37#include "TParameter.h"
38#include "TPerfStats.h"
39#include "TProofDebug.h"
40#include "TProof.h"
41#include "TProofPlayer.h"
42#include "TProofServ.h"
43#include "TSlave.h"
44#include "TSocket.h"
45#include "TStopwatch.h"
46#include "TTimer.h"
47#include "TUrl.h"
48#include "TClass.h"
49#include "TMath.h"
50
51
52using namespace TMath;
53//
54// The following utility class manages the state of the
55// work to be performed and the slaves involved in the process.
56//
57// The list of TSlaveStat(s) keeps track of the work (being) done
58// by each slave
59//
60
61//------------------------------------------------------------------------------
62
63class TPacketizerUnit::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
64
65friend class TPacketizerUnit;
66
67private:
68 Long64_t fLastProcessed; // Number of processed entries of the last packet
69 Double_t fRate; // Estimated processing rate averaged over circularity
70 Double_t fTimeInstant; // Starting time of the current packet
71 TNtupleD *fCircNtp; // Keeps circular info for speed calculations
72 Long_t fCircLvl; // Circularity level
73
74public:
75 TSlaveStat(TSlave *sl, TList *input);
76 ~TSlaveStat();
77
78// void GetCurrentTime();
79
80 void UpdatePerformance(Double_t time);
82
83// ClassDef(TPacketizerUnit::TSlaveStat, 0);
84};
85
86////////////////////////////////////////////////////////////////////////////////
87/// Main constructor
88
89TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
90 : fLastProcessed(0),
91 fRate(0), fTimeInstant(0), fCircLvl(5)
92{
93 // Initialize the circularity ntple for speed calculations
94 fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
95 fCircNtp->SetDirectory(0);
96 TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
97 fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
98 fCircNtp->SetCircular(fCircLvl);
99 fSlave = slave;
100 fStatus = new TProofProgressStatus();
101}
102
103////////////////////////////////////////////////////////////////////////////////
104/// Destructor
105
106TPacketizerUnit::TSlaveStat::~TSlaveStat()
107{
108 SafeDelete(fCircNtp);
109}
110
111////////////////////////////////////////////////////////////////////////////////
112/// Update the circular ntple
113
114void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
115{
116 Double_t ttot = time;
117 Double_t *ar = fCircNtp->GetArgs();
118 Int_t ne = fCircNtp->GetEntries();
119 if (ne <= 0) {
120 // First call: just fill one ref entry and return
121 fCircNtp->Fill(0., 0);
122 fRate = 0.;
123 return;
124 }
125 // Fill the entry
126 fCircNtp->GetEntry(ne-1);
127 ttot = ar[0] + time;
128 fCircNtp->Fill(ttot, GetEntriesProcessed());
129
130 // Calculate the speed
131 fCircNtp->GetEntry(0);
132 Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
133 Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
134 fRate = nevts / dtime;
135 PDB(kPacketizer,2)
136 Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
137 time, dtime, nevts, fRate);
138
139}
140
141////////////////////////////////////////////////////////////////////////////////
142/// Update the status info to the 'st'.
143/// return the difference (*st - *fStatus)
144
146{
147 if (st) {
148 // The entriesis not correct in 'st'
149 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
150 // The last proc time should not be added
151 fStatus->SetLastProcTime(0.);
152 // Get the diff
153 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
154 *fStatus += *diff;
155 // Set the correct value
156 fStatus->SetLastEntries(lastEntries);
157 return diff;
158 } else {
159 Error("AddProcessed", "status arg undefined");
160 return 0;
161 }
162}
163
164//------------------------------------------------------------------------------
165
167
168////////////////////////////////////////////////////////////////////////////////
169/// Constructor
170
173 : TVirtualPacketizer(input, st)
174{
175 PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
176
177 // Init pointer members
178 fWrkStats = 0;
179 fPackets = 0;
180 fInput = input;
181
183 Int_t fixednum = -1;
184 if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0) {
186 }
187 else {
188 Info("TPacketizerUnit", "forcing the same cycles on each worker");
190 }
191
192 fCalibFrac = 0.01;
193 if (TProof::GetParameter(input, "PROOF_PacketizerCalibFrac", fCalibFrac) != 0 || fCalibFrac <= 0)
194 fCalibFrac = 0.01;
195 PDB(kPacketizer,1)
196 Info("TPacketizerUnit", "size of the calibration packets: %.2f %% of average number per worker", fCalibFrac);
197
198 fMaxPacketTime = 3.;
199 Double_t timeLimit = -1;
200 if (TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", timeLimit) == 0) {
201 fMaxPacketTime = timeLimit;
202 Warning("TPacketizerUnit", "PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
203 }
204 PDB(kPacketizer,1)
205 Info("TPacketizerUnit", "time limit is %lf", fMaxPacketTime);
206
207 // Different default for min packet time
208 fMinPacketTime = 1;
209 Double_t minPacketTime = 0;
210 if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) fMinPacketTime = minPacketTime;
211 TParameter<Double_t> *mpt = (TParameter<Double_t> *) fConfigParams->FindObject("PROOF_MinPacketTime");
212 if (mpt) {
214 } else {
215 fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
216 }
217
218 fProcessing = 0;
219 fAssigned = 0;
220 fPacketSeq = 0;
221
222 fStopwatch = new TStopwatch();
223
224 fPackets = new TList;
226
227 fWrkStats = new TMap;
229 fWrkExcluded = 0;
230
231 TSlave *slave;
232 TIter si(slaves);
233 while ((slave = (TSlave*) si.Next())) {
234 if (slave->GetParallel() > 0) {
235 fWrkStats->Add(slave, new TSlaveStat(slave, input));
236 } else {
237 if (!fWrkExcluded) {
238 fWrkExcluded = new TList;
240 }
241 PDB(kPacketizer,2)
242 Info("TPacketizerUnit", "node '%s' has NO active worker: excluded from work distribution", slave->GetOrdinal());
243 fWrkExcluded->Add(slave);
244 }
245 }
246
247 fTotalEntries = 0;
248 fNumPerWorker = -1;
249 if (num > 0 && AssignWork(0,0,num) != 0)
250 Warning("TPacketizerUnit", "some problems assigning work");
251
252 // Save the config parameters in the dedicated list so that they will be saved
253 // in the outputlist and therefore in the relevant TQueryResult
254 fConfigParams->Add(new TParameter<Float_t>("PROOF_PacketizerCalibFrac", fCalibFrac));
255
256 fStopwatch->Start();
257 PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
258}
259
260////////////////////////////////////////////////////////////////////////////////
261/// Assign work to be done to this packetizer
262
264{
265 if (num < 0) {
266 Error("AssignWork", "assigned a negative number (%lld) of cycles - protocol error?", num);
267 return -1;
268 }
269
270 fTotalEntries += num;
271 PDB(kPacketizer,1)
272 Info("AssignWork", "assigned %lld additional cycles (new total: %lld)", num, fTotalEntries);
273
274 // Update fixed number counter
275 if (fFixedNum && fWrkStats->GetSize() > 0) {
276 // Approximate number: the exact number is determined in GetNextPacket
278 if (fNumPerWorker == 0) fNumPerWorker = 1;
279 }
280
281 // Update/Save the config parameters in the dedicated list so that they will be saved
282 // in the outputlist and therefore in the relevant TQueryResult
284 (TParameter<Long64_t> *) fConfigParams->FindObject("PROOF_PacketizerFixedNum");
285 if (fn) {
287 } else {
288 fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
289 }
290
291 // Done
292 return 0;
293}
294
295////////////////////////////////////////////////////////////////////////////////
296/// Destructor.
297
299{
300 if (fWrkStats)
306}
307
308////////////////////////////////////////////////////////////////////////////////
309/// Get current time
310
312{
313 Double_t retValue = fStopwatch->RealTime();
315 return retValue;
316}
317
318////////////////////////////////////////////////////////////////////////////////
319/// Get Estimation of the current rate; just summing the current rates of
320/// the active workers
321
323{
324 all = kTRUE;
325 // Loop over the workers
326 Float_t currate = 0.;
327 if (fWrkStats && fWrkStats->GetSize() > 0) {
328 TIter nxw(fWrkStats);
329 TObject *key;
330 while ((key = nxw()) != 0) {
331 TSlaveStat *slstat = (TSlaveStat *) fWrkStats->GetValue(key);
332 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
333 // Sum-up the current rates
334 currate += slstat->GetProgressStatus()->GetCurrentRate();
335 } else {
336 all = kFALSE;
337 }
338 }
339 }
340 // Done
341 return currate;
342}
343
344////////////////////////////////////////////////////////////////////////////////
345/// Get next packet
346
348{
349 if (!fValid)
350 return 0;
351
352 // Find slave
353 TSlaveStat *slstat = (TSlaveStat*) fWrkStats->GetValue(sl);
354 if (!slstat) {
355 Warning("GetNextPacket", "Received a packet request from an unknown slave: %s:%s",
356 sl->GetName(), sl->GetOrdinal());
357 return 0;
358 }
359
360 PDB(kPacketizer,2)
361 Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned);
362
363 // Update stats & free old element
364 Double_t latency = 0., proctime = 0., proccpu = 0.;
365 Long64_t bytesRead = -1;
366 Long64_t totalEntries = -1; // used only to read an old message type
367 Long64_t totev = 0;
368 Long64_t numev = -1;
369
370 TProofProgressStatus *status = 0;
371 if (sl->GetProtocol() > 18) {
372 (*r) >> latency;
373 (*r) >> status;
374
375 // Calculate the progress made in the last packet
376 TProofProgressStatus *progress = 0;
377 if (status) {
378 // update the worker status
379 numev = status->GetEntries() - slstat->GetEntriesProcessed();
380 progress = slstat->AddProcessed(status);
381 if (progress) {
382 // (*fProgressStatus) += *progress;
383 proctime = progress->GetProcTime();
384 proccpu = progress->GetCPUTime();
385 totev = status->GetEntries(); // for backward compatibility
386 bytesRead = progress->GetBytesRead();
387 delete progress;
388 }
389 delete status;
390 } else
391 Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
392 } else {
393
394 (*r) >> latency >> proctime >> proccpu;
395
396 // only read new info if available
397 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
398 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
399 if (r->BufferSize() > r->Length()) (*r) >> totev;
400
401 numev = totev - slstat->GetEntriesProcessed();
402 slstat->GetProgressStatus()->IncEntries(numev);
403 slstat->GetProgressStatus()->SetLastUpdate();
404 }
405
408
409 fProcessing = 0;
410
411 PDB(kPacketizer,2)
412 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
413 sl->GetOrdinal(), sl->GetName(),
414 numev, latency, proctime, proccpu, bytesRead);
415
416 if (gPerfStats != 0) {
417 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
418 latency, proctime, proccpu, bytesRead);
419 }
420
421 if (fNumPerWorker > 0 && slstat->GetEntriesProcessed() >= fNumPerWorker) {
422 PDB(kPacketizer,2)
423 Info("GetNextPacket","worker-%s (%s) is done (%lld cycles)",
424 sl->GetOrdinal(), sl->GetName(), slstat->GetEntriesProcessed());
425 return 0;
426 }
427
428 if (fAssigned == fTotalEntries) {
429 Bool_t done = kTRUE;
430 // If we are on a submaster, check if there is something else to do
433 if (nxe) {
434 if (AssignWork(0,0,nxe->GetNum()) == 0) {
435 if (fAssigned < fTotalEntries) done = kFALSE;
436 } else {
437 Error("GetNextPacket", "problems assigning additional work: stop");
438 }
439 SafeDelete(nxe);
440 }
441 }
442 if (done) {
443 // Send last timer message
444 HandleTimer(0);
445 return 0;
446 }
447 }
448
449 if (fStop) {
450 // Send last timer message
451 HandleTimer(0);
452 return 0;
453 }
454
455
456 Long64_t num;
457
458 // Get the current time
459 Double_t cTime = GetCurrentTime();
460
461 if (slstat->fCircNtp->GetEntries() <= 0) {
462 // The calibration phase
464 num = (Long64_t) (fCalibFrac * avg);
465 if (num < 1) num = (avg >= 1) ? avg : 1;
466 PDB(kPacketizer,2)
467 Info("GetNextPacket", "calibration: total entries %lld, workers %d, frac: %.1f %%, raw num: %lld",
468 fTotalEntries, fWrkStats->GetSize(), fCalibFrac * 100., num);
469
470 // Create a reference entry
471 slstat->UpdatePerformance(0.);
472
473 } else {
474
475 if (fNumPerWorker < 0) {
476
477 // Schedule tasks for workers based on the currently estimated processing speeds
478
479 // Update performances
480 // slstat->fStatus was updated before;
481 slstat->UpdatePerformance(proctime);
482
483 // We need to estimate the total instantaneous rate: for the workers not having yet
484 // one we assume the average of those having a measurement
485 // The optimal number for worker j is
486 //
487 // n_j = r_j / Sum r_i * N_left
488 //
489
490 Int_t nrm = 0;
491 Double_t sumRate = 0.;
492 TIter nxwrk(fWrkStats);
493 TSlaveStat *wrkStat = 0;
494 TSlave *tmpWrk = 0;
495 while ((tmpWrk = (TSlave *)nxwrk())) {
496 if ((wrkStat = dynamic_cast<TSlaveStat *>(fWrkStats->GetValue(tmpWrk)))) {
497 if (wrkStat->fRate > 0) {
498 nrm++;
499 sumRate += wrkStat->fRate;
500 }
501 PDB(kPacketizer,3)
502 Info("GetNextPacket", "%d: worker-%s: rate %lf /s (sum: %lf /s)",
503 nrm, tmpWrk->GetOrdinal(), wrkStat->fRate, sumRate);
504 } else {
505 Warning("GetNextPacket", "dynamic_cast<TSlaveStat *> failing on value for '%s (%s)'! Skipping",
506 tmpWrk->GetName(), tmpWrk->GetOrdinal());
507 }
508 }
509
510 // Check consistency
511 if (nrm <= 0) {
512 Error("GetNextPacket", "no worker has consistent information: stop processing!");
513 return (TDSetElement *)0;
514 }
515
516 Double_t avgRate = sumRate / nrm;
517 // Check if all workers had meaningful rate information
518 if (nrm < fWrkStats->GetSize()) {
519 // For some workers the measurement is missing: use the average
520 sumRate += (fWrkStats->GetSize() - nrm) * avgRate;
521 }
522 PDB(kPacketizer,2)
523 Info("GetNextPacket", "rate: avg: %lf /s/wrk - sum: %lf /s (measurements %d out of %d)",
524 avgRate, sumRate, nrm, fWrkStats->GetSize());
525
526 // Packet size for this worker
527 Double_t wrkRate = (slstat->fRate > 0.) ? slstat->fRate : avgRate ;
528 num = (Long64_t) ((fTotalEntries - fAssigned) * wrkRate / sumRate);
529 PDB(kPacketizer,2)
530 Info("GetNextPacket", "worker-%s (%s): raw packet size: %lld", sl->GetOrdinal(), sl->GetName(), num);
531
532 // Apply time-per-packet limits
533 Double_t packTime = num / wrkRate;
534 if (fMaxPacketTime > 0. && packTime > fMaxPacketTime) {
535 num = (Long64_t) (fMaxPacketTime * wrkRate) ;
536 packTime = fMaxPacketTime;
537 PDB(kPacketizer,2)
538 Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (upper limit: %.2f secs)",
539 sl->GetOrdinal(), sl->GetName(), num, fMaxPacketTime);
540 }
541 if (fMinPacketTime > 0. && packTime < fMinPacketTime) {
542 num = (Long64_t) (fMinPacketTime * wrkRate);
543 PDB(kPacketizer,2)
544 Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (lower limit: %.2f secs)",
545 sl->GetOrdinal(), sl->GetName(), num, fMinPacketTime);
546 }
547
548 } else {
549 // Fixed number of cycles per worker
550 num = fNumPerWorker - slstat->fLastProcessed;
551 if (num > 1 && slstat->fRate > 0 && num / slstat->fRate > fMaxPacketTime) {
552 num = (Long64_t) (slstat->fRate * fMaxPacketTime);
553 }
554 }
555 }
556 // Minimum packet size
557 num = (num > 1) ? num : 1;
558 fProcessing = (num < (fTotalEntries - fAssigned)) ? num
560
561 // Set the information of the current slave
562 slstat->fLastProcessed = fProcessing;
563 // Set the start time of the current packet
564 slstat->fTimeInstant = cTime;
565
566 // Update the sequential number
567 fPacketSeq++;
568 TString sseq = TString::Format("p%lld", fPacketSeq);
569
570 PDB(kPacketizer,2)
571 Info("GetNextPacket", "worker-%s: num %lld, processing %lld, remaining %lld",sl->GetOrdinal(),
573 TDSetElement *elem = new TDSetElement(sseq, sseq, "", fAssigned, fProcessing);
575
576 // Update the total counter
577 fAssigned += slstat->fLastProcessed;
578
579 return elem;
580}
581
582////////////////////////////////////////////////////////////////////////////////
583/// Adds new workers. Returns the number of workers added, or -1 on failure.
584
586{
587 if (!workers) {
588 Error("AddWorkers", "Null list of new workers!");
589 return -1;
590 }
591
592 Int_t curNumOfWrks = fWrkStats->GetEntries();
593
594 TSlave *sl;
595 TIter next(workers);
596 while (( sl = dynamic_cast<TSlave*>(next()) ))
597 fWrkStats->Add(sl, new TSlaveStat(sl, fInput));
598
599 fNumPerWorker = -1;
600 if (fFixedNum && fWrkStats->GetSize() > 0) {
601 // Approximate number: the exact number is determined in GetNextPacket
602 fNumPerWorker = (fNumPerWorker * curNumOfWrks) / fWrkStats->GetSize();
603 if (fNumPerWorker == 0) fNumPerWorker = 1;
604 }
605
606 fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
607
608 return fWrkStats->GetEntries();
609}
ROOT::R::TRInterface & r
Definition: Object.C:4
#define SafeDelete(p)
Definition: RConfig.hxx:543
#define f(i)
Definition: RSha256.hxx:104
int Int_t
Definition: RtypesCore.h:43
const Bool_t kFALSE
Definition: RtypesCore.h:90
long Long_t
Definition: RtypesCore.h:52
double Double_t
Definition: RtypesCore.h:57
long long Long64_t
Definition: RtypesCore.h:71
const Bool_t kTRUE
Definition: RtypesCore.h:89
#define ClassImp(name)
Definition: Rtypes.h:361
void Info(const char *location, const char *msgfmt,...)
void Error(const char *location, const char *msgfmt,...)
#define PDB(mask, level)
Definition: TProofDebug.h:56
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
#define gPerfStats
virtual Int_t GetEntries() const
Definition: TCollection.h:177
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
Manages an element of a TDSet.
Definition: TDSet.h:66
Long64_t GetNum() const
Definition: TDSet.h:114
@ kEmpty
Definition: TDSet.h:73
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
TObject * Next()
Definition: TCollection.h:249
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:577
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
Definition: TMap.cxx:54
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
Definition: TMap.cxx:151
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:236
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:28
Mother of all ROOT objects.
Definition: TObject.h:37
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:877
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:891
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:865
This packetizer generates packets of generic units, representing the number of times an operation cyc...
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
virtual ~TPacketizerUnit()
Destructor.
Long64_t fProcessing
Double_t GetCurrentTime()
Get current time.
Long64_t fNumPerWorker
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TStopwatch * fStopwatch
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
Int_t AssignWork(TDSet *, Long64_t, Long64_t num)
Assign work to be done to this packetizer.
Named parameter, streamable and storable.
Definition: TParameter.h:37
void SetVal(const AParamType &val)
Definition: TParameter.h:71
Container class for processing statistics.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
Double_t GetProcTime() const
Long64_t GetEntries() const
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
TDSetElement * GetNextPacket(Long64_t totalEntries=-1)
Get next range of entries to be processed on this server.
Bool_t IsMaster() const
Definition: TProofServ.h:293
Bool_t IsTopMaster() const
Definition: TProofServ.h:295
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9908
Class describing a PROOF worker server.
Definition: TSlave.h:46
Int_t GetParallel() const
Definition: TSlave.h:141
Int_t GetProtocol() const
Definition: TSlave.h:133
const char * GetName() const
Returns name of object.
Definition: TSlave.h:124
const char * GetOrdinal() const
Definition: TSlave.h:131
Stopwatch class.
Definition: TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Definition: TStopwatch.cxx:58
void Continue()
Resume a stopped stopwatch.
Definition: TStopwatch.cxx:93
Basic string class.
Definition: TString.h:131
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString.
Definition: TString.cxx:2311
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
The packetizer is a load balancing object created for each query.
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
Long64_t GetEntriesProcessed() const
TMath.
Definition: TMathBase.h:35