// Refresh the processing-speed estimate kept by this statistics object: the
// definition further down records a sample in fCircNtp and recomputes fSpeed.
// NOTE(review): `time` is presumably the processing time, in seconds, of the
// packet just completed -- confirm against the caller.
70 void UpdatePerformance(
Double_t time);
75 class TPacketizerFile::TIterObj :
public TObject {
82 TIterObj(
const char *
n,
TIter *
iter) : fName(n), fIter(iter) { }
83 virtual ~TIterObj() {
if (fIter)
delete fIter; }
85 const char *
GetName()
const {
return fName;}
86 TIter *GetIter()
const {
return fIter;}
99 PDB(kPacketizer,1)
Info(
"TPacketizerFile",
"enter");
103 fProcNotAssigned =
kTRUE;
106 if (!input || (input && input->GetSize() <= 0)) {
107 Error(
"TPacketizerFile",
"input file is undefined or empty!");
113 Int_t procnotass = 1;
115 if (procnotass == 0) {
116 Info(
"TPacketizerFile",
"files not assigned to workers will not be processed");
117 fProcNotAssigned =
kFALSE;
122 Int_t addfileinfo = 0;
124 if (addfileinfo == 1) {
125 Info(
"TPacketizerFile",
126 "TFileInfo object will be included in the packet as associated object");
127 fAddFileInfo =
kTRUE;
132 if (!(fFiles = dynamic_cast<TMap *>(input->FindObject(
"PROOF_FilesToProcess")))) {
133 Error(
"TPacketizerFile",
"map of files to be processed/created not found");
139 fSlaveStats =
new TMap;
147 fSlaveStats->Add(wrk,
new TSlaveStat(wrk, input));
149 Info(
"TPacketizerFile",
"worker: %s", wrkname.
Data());
159 fNotAssigned =
new TList;
163 while ((key = nxl()) != 0) {
167 if (fc) wrklist = fc->
GetList();
172 fTotalEntries += wrklist->
GetSize();
173 fIters->Add(
new TIterObj(hname,
new TIter(wrklist)));
176 Info(
"TPacketizerFile",
"%d files of '%s' (fqdn: '%s') assigned to '%s'",
183 fNotAssigned->Add(o);
186 Info(
"TPacketizerFile",
"%d files of '%s' (fqdn: '%s') not assigned",
191 if (fNotAssigned && fNotAssigned->GetSize() > 0) {
192 fTotalEntries += fNotAssigned->GetSize();
193 fIters->Add(
new TIterObj(
"*",
new TIter(fNotAssigned)));
194 Info(
"TPacketizerFile",
"non-assigned files: %d", fNotAssigned->GetSize());
195 fNotAssigned->Print();
197 if (fTotalEntries <= 0) {
198 Error(
"TPacketizerFile",
"no file path in the map!");
203 Info(
"TPacketizerFile",
"processing %lld files", fTotalEntries);
210 PDB(kPacketizer,1)
Info(
"TPacketizerFile",
"return");
250 while ((key = nxw()) != 0) {
252 if (wrkstat && wrkstat->GetProgressStatus() && wrkstat->GetEntriesProcessed() > 0) {
254 currate += wrkstat->GetProgressStatus()->GetCurrentRate();
275 Error(
"GetNextPacket",
"could not find stat object for worker '%s'!", wrk->
GetName());
283 Double_t latency = 0., proctime = 0., proccpu = 0.;
298 numev = status->
GetEntries() - wrkstat->GetEntriesProcessed();
299 progress = wrkstat->AddProcessed(status);
304 totev = status->GetEntries();
310 Error(
"GetNextPacket",
"no status came in the kPROOF_GETPACKET message");
313 (*r) >> latency >> proctime >> proccpu;
320 numev = totev - wrkstat->GetEntriesProcessed();
321 wrkstat->GetProgressStatus()->IncEntries(numev);
322 wrkstat->GetProgressStatus()->SetLastUpdate();
329 Info(
"GetNextPacket",
"worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
331 numev, latency, proctime, proccpu, bytesRead);
335 latency, proctime, proccpu, bytesRead);
351 Info(
"GetNextPacket",
"worker-%s (%s): getting next files ... ", wrk->
GetOrdinal(),
363 nextfile = io->GetIter()->Next();
372 nextfile = io->GetIter()->Next();
377 if (!nextfile)
return elem;
383 if ((os = dynamic_cast<TObjString *>(nextfile))) {
386 if ((fi = dynamic_cast<TFileInfo *>(nextfile)))
391 Warning(
"GetNextPacket",
"found unsupported object of type '%s' in list: it must"
392 " be 'TObjString' or 'TFileInfo'", nextfile->ClassName());
397 Info(
"GetNextPacket",
"worker-%s: assigning: '%s' (remaining %lld files)",
419 TPacketizerFile::TSlaveStat::TSlaveStat(
TSlave *slave,
TList *input)
421 fSpeed(0), fTimeInstant(0), fCircLvl(5)
424 fCircNtp =
new TNtupleD(
"Speed Circ Ntp",
"Circular process info",
"tm:ev");
426 fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
427 fCircNtp->SetCircular(fCircLvl);
435 TPacketizerFile::TSlaveStat::~TSlaveStat()
443 void TPacketizerFile::TSlaveStat::UpdatePerformance(
Double_t time)
447 Int_t ne = fCircNtp->GetEntries();
450 fCircNtp->Fill(0., 0);
455 fCircNtp->GetEntry(ne-1);
457 fCircNtp->Fill(ttot, GetEntriesProcessed());
460 fCircNtp->GetEntry(0);
461 Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
463 fSpeed = nevts / dtime;
465 Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
466 time, dtime, nevts, fSpeed);
480 fStatus->SetLastProcTime(0.);
488 Error(
"AddProcessed",
"status arg undefined");
498 Printf(
"Iterator '%s' controls %d units", GetName(),
499 ((GetIter() && GetIter()->GetCollection()) ? GetIter()->GetCollection()->GetSize()
const char * GetName() const
Returns name of object.
const char * GetOrdinal() const
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Print(Option_t *options="") const
Print information about this object.
A simple TTree restricted to a list of double variables only.
Collectable string class.
This class represents a WWW compatible URL.
virtual ~TPacketizerFile()
Destructor.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Long64_t GetBytesRead() const
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
static const char * filename()
void AddAssocObj(TObject *assocobj)
Add an associated object to the list.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Double_t GetCurrentTime()
Get current time.
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Double_t GetProcTime() const
const char * Data() const
Double_t GetCPUTime() const
static struct mg_connection * fc(struct mg_context *ctx)
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
void IncEntries(Long64_t entries=1)
std::map< std::string, std::string >::const_iterator iter
void Info(const char *location, const char *msgfmt,...)
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
void Continue()
Resume a stopped stopwatch.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
void Error(const char *location, const char *msgfmt,...)
TObject * GetParameter(const char *par) const
Get specified parameter.
TProofProgressStatus * fProgressStatus
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
const char * GetName() const
Returns name of object.
void SetLastEntries(Long64_t entries)
TDSetElement * GetNextPacket(TSlave *wrk, TMessage *r)
Get next packet.
void SetName(const char *name)
Long64_t GetEntries() const
Long64_t GetEntries(Bool_t tree, TDSetElement *e)
Get entries.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
ClassImp(TPacketizerFile) TPacketizerFile
Constructor.
virtual Int_t GetSize() const
void Print(std::ostream &os, const OptionType &opt)
virtual const char * GetName() const
Returns name of object.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Int_t GetProtocol() const
Mother of all ROOT objects.
TUrl * GetCurrentUrl() const
Return the current url.
virtual void Add(TObject *obj)
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Class describing a generic file including meta information.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.