54 using namespace TMath;
82 void UpdatePerformance(
Double_t time);
91 TPacketizerUnit::TSlaveStat::TSlaveStat(
TSlave *slave,
TList *input)
93 fRate(0), fTimeInstant(0), fCircLvl(5)
96 fCircNtp =
new TNtupleD(
"Speed Circ Ntp",
"Circular process info",
"tm:ev");
97 fCircNtp->SetDirectory(0);
99 fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
100 fCircNtp->SetCircular(fCircLvl);
108 TPacketizerUnit::TSlaveStat::~TSlaveStat()
116 void TPacketizerUnit::TSlaveStat::UpdatePerformance(
Double_t time)
120 Int_t ne = fCircNtp->GetEntries();
123 fCircNtp->Fill(0., 0);
128 fCircNtp->GetEntry(ne-1);
133 fCircNtp->GetEntry(0);
134 Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
136 fRate = nevts / dtime;
138 Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
139 time, dtime, nevts, fRate);
151 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
153 fStatus->SetLastProcTime(0.);
161 Error(
"AddProcessed",
"status arg undefined");
177 PDB(kPacketizer,1)
Info(
"TPacketizerUnit",
"enter (num %lld)", num);
190 Info(
"TPacketizerUnit",
"forcing the same cycles on each worker");
198 Info(
"TPacketizerUnit",
"size of the calibration packets: %.2f %% of average number per worker",
fCalibFrac);
204 Warning(
"TPacketizerUnit",
"PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
244 Info(
"TPacketizerUnit",
"node '%s' has NO active worker: excluded from work distribution", slave->
GetOrdinal());
252 Warning(
"TPacketizerUnit",
"some problems assigning work");
259 PDB(kPacketizer,1)
Info(
"TPacketizerUnit",
"return");
268 Error(
"AssignWork",
"assigned a negative number (%lld) of cycles - protocol error?", num);
274 Info(
"AssignWork",
"assigned %lld additional cycles (new total: %lld)", num,
fTotalEntries);
332 while ((key = nxw()) != 0) {
334 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
336 currate += slstat->GetProgressStatus()->GetCurrentRate();
357 Warning(
"GetNextPacket",
"Received a packet request from an unknown slave: %s:%s",
366 Double_t latency = 0., proctime = 0., proccpu = 0.;
381 numev = status->
GetEntries() - slstat->GetEntriesProcessed();
382 progress = slstat->AddProcessed(status);
387 totev = status->GetEntries();
393 Error(
"GetNextPacket",
"no status came in the kPROOF_GETPACKET message");
396 (*r) >> latency >> proctime >> proccpu;
403 numev = totev - slstat->GetEntriesProcessed();
404 slstat->GetProgressStatus()->IncEntries(numev);
405 slstat->GetProgressStatus()->SetLastUpdate();
414 Info(
"GetNextPacket",
"worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
416 numev, latency, proctime, proccpu, bytesRead);
420 latency, proctime, proccpu, bytesRead);
425 Info(
"GetNextPacket",
"worker-%s (%s) is done (%lld cycles)",
439 Error(
"GetNextPacket",
"problems assigning additional work: stop");
463 if (slstat->fCircNtp->GetEntries() <= 0) {
467 if (num < 1) num = (avg >= 1) ? avg : 1;
469 Info(
"GetNextPacket",
"calibration: total entries %lld, workers %d, frac: %.1f %%, raw num: %lld",
473 slstat->UpdatePerformance(0.);
483 slstat->UpdatePerformance(proctime);
495 TSlaveStat *wrkStat = 0;
497 while ((tmpWrk = (
TSlave *)nxwrk())) {
499 if (wrkStat->fRate > 0) {
501 sumRate += wrkStat->fRate;
504 Info(
"GetNextPacket",
"%d: worker-%s: rate %lf /s (sum: %lf /s)",
505 nrm, tmpWrk->
GetOrdinal(), wrkStat->fRate, sumRate);
507 Warning(
"GetNextPacket",
"dynamic_cast<TSlaveStat *> failing on value for '%s (%s)'! Skipping",
514 Error(
"GetNextPacket",
"no worker has consistent information: stop processing!");
520 if (nrm < fWrkStats->GetSize()) {
525 Info(
"GetNextPacket",
"rate: avg: %lf /s/wrk - sum: %lf /s (measurements %d out of %d)",
529 Double_t wrkRate = (slstat->fRate > 0.) ? slstat->fRate : avgRate ;
532 Info(
"GetNextPacket",
"worker-%s (%s): raw packet size: %lld", sl->
GetOrdinal(), sl->
GetName(), num);
540 Info(
"GetNextPacket",
"worker-%s (%s): time-limited packet size: %lld (upper limit: %.2f secs)",
546 Info(
"GetNextPacket",
"worker-%s (%s): time-limited packet size: %lld (lower limit: %.2f secs)",
553 if (num > 1 && slstat->fRate > 0 && num / slstat->fRate >
fMaxPacketTime) {
559 num = (num > 1) ? num : 1;
566 slstat->fTimeInstant = cTime;
573 Info(
"GetNextPacket",
"worker-%s: num %lld, processing %lld, remaining %lld",sl->
GetOrdinal(),
590 Error(
"AddWorkers",
"Null list of new workers!");
598 while (( sl = dynamic_cast<TSlave*>(
next()) ))
virtual Int_t GetEntries() const
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
const char * GetOrdinal() const
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start and stop events.
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
A simple TTree restricted to a list of double variables only.
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Long64_t GetBytesRead() const
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
void SetVal(const AParamType &val)
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
Long64_t GetEntriesProcessed() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Double_t GetProcTime() const
Double_t GetCPUTime() const
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
static TString Format(const char *fmt,...)
Static method which formats a string using a printf-style format descriptor and returns a TString.
Int_t AssignWork(TDSet *, Long64_t, Long64_t num)
Assign work to be done to this packetizer.
void IncEntries(Long64_t entries=1)
TDSetElement * GetNextPacket(Long64_t totalEntries=-1)
Get next range of entries to be processed on this server.
void Info(const char *location, const char *msgfmt,...)
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
void Continue()
Resume a stopped stopwatch.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
void Error(const char *location, const char *msgfmt,...)
TObject * GetParameter(const char *par) const
Get specified parameter.
TProofProgressStatus * fProgressStatus
ClassImp(TPacketizerUnit) TPacketizerUnit
Constructor.
const char * GetName() const
Returns name of object.
void SetLastEntries(Long64_t entries)
Named parameter, streamable and storable.
virtual ~TPacketizerUnit()
Destructor.
void Warning(const char *location, const char *msgfmt,...)
Long64_t GetEntries() const
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
virtual Int_t GetSize() const
Int_t GetParallel() const
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval (therefore TMap does not conserve the order of the entries).
Int_t GetProtocol() const
Mother of all ROOT objects.
R__EXTERN TProofServ * gProofServ
virtual void Add(TObject *obj)
Bool_t IsTopMaster() const
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Double_t GetCurrentTime()
Get current time.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.