69 void UpdatePerformance(
Double_t time);
74 class TPacketizerFile::TIterObj :
public TObject {
81 TIterObj(
const char *
n,
TIter *iter) : fName(n), fIter(iter) { }
82 virtual ~TIterObj() {
if (fIter)
delete fIter; }
84 const char *
GetName()
const {
return fName;}
85 TIter *GetIter()
const {
return fIter;}
98 PDB(kPacketizer,1)
Info(
"TPacketizerFile",
"enter");
105 if (!input || (input && input->
GetSize() <= 0)) {
106 Error(
"TPacketizerFile",
"input file is undefined or empty!");
112 Int_t procnotass = 1;
114 if (procnotass == 0) {
115 Info(
"TPacketizerFile",
"files not assigned to workers will not be processed");
121 Int_t addfileinfo = 0;
123 if (addfileinfo == 1) {
124 Info(
"TPacketizerFile",
125 "TFileInfo object will be included in the packet as associated object");
131 if (!(
fFiles = dynamic_cast<TMap *>(input->
FindObject(
"PROOF_FilesToProcess")))) {
132 Error(
"TPacketizerFile",
"map of files to be processed/created not found");
148 Info(
"TPacketizerFile",
"worker: %s", wrkname.
Data());
162 while ((key = nxl()) != 0) {
166 if (fc) wrklist = fc->
GetList();
175 Info(
"TPacketizerFile",
"%d files of '%s' (fqdn: '%s') assigned to '%s'",
185 Info(
"TPacketizerFile",
"%d files of '%s' (fqdn: '%s') not assigned",
197 Error(
"TPacketizerFile",
"no file path in the map!");
209 PDB(kPacketizer,1)
Info(
"TPacketizerFile",
"return");
249 while ((key = nxw()) != 0) {
251 if (wrkstat && wrkstat->GetProgressStatus() && wrkstat->GetEntriesProcessed() > 0) {
253 currate += wrkstat->GetProgressStatus()->GetCurrentRate();
274 Error(
"GetNextPacket",
"could not find stat object for worker '%s'!", wrk->
GetName());
282 Double_t latency = 0., proctime = 0., proccpu = 0.;
297 numev = status->
GetEntries() - wrkstat->GetEntriesProcessed();
298 progress = wrkstat->AddProcessed(status);
303 totev = status->GetEntries();
309 Error(
"GetNextPacket",
"no status came in the kPROOF_GETPACKET message");
312 (*r) >> latency >> proctime >> proccpu;
319 numev = totev - wrkstat->GetEntriesProcessed();
320 wrkstat->GetProgressStatus()->IncEntries(numev);
321 wrkstat->GetProgressStatus()->SetLastUpdate();
328 Info(
"GetNextPacket",
"worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
330 numev, latency, proctime, proccpu, bytesRead);
334 latency, proctime, proccpu, bytesRead);
350 Info(
"GetNextPacket",
"worker-%s (%s): getting next files ... ", wrk->
GetOrdinal(),
362 nextfile = io->GetIter()->Next();
371 nextfile = io->GetIter()->Next();
376 if (!nextfile)
return elem;
382 if ((os = dynamic_cast<TObjString *>(nextfile))) {
385 if ((fi = dynamic_cast<TFileInfo *>(nextfile)))
390 Warning(
"GetNextPacket",
"found unsupported object of type '%s' in list: it must" 391 " be 'TObjString' or 'TFileInfo'", nextfile->ClassName());
396 Info(
"GetNextPacket",
"worker-%s: assigning: '%s' (remaining %lld files)",
418 TPacketizerFile::TSlaveStat::TSlaveStat(
TSlave *slave,
TList *input)
420 fSpeed(0), fTimeInstant(0), fCircLvl(5)
423 fCircNtp =
new TNtupleD(
"Speed Circ Ntp",
"Circular process info",
"tm:ev");
425 fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
426 fCircNtp->SetCircular(fCircLvl);
434 TPacketizerFile::TSlaveStat::~TSlaveStat()
442 void TPacketizerFile::TSlaveStat::UpdatePerformance(
Double_t time)
446 Int_t ne = fCircNtp->GetEntries();
449 fCircNtp->Fill(0., 0);
454 fCircNtp->GetEntry(ne-1);
459 fCircNtp->GetEntry(0);
460 Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
462 fSpeed = nevts / dtime;
464 Info(
"UpdatePerformance",
"time:%f, dtime:%f, nevts:%lld, speed: %f",
465 time, dtime, nevts, fSpeed);
479 fStatus->SetLastProcTime(0.);
487 Error(
"AddProcessed",
"status arg undefined");
498 ((GetIter() && GetIter()->GetCollection()) ? GetIter()->GetCollection()->GetSize()
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start and stop events.
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
A simple TTree restricted to a list of double variables only.
Collectable string class.
void Print(Option_t *options="") const
Print information about this object.
This class represents a WWW compatible URL.
TUrl * GetCurrentUrl() const
Return the current url.
TObject * GetParameter(const char *par) const
Get specified parameter.
virtual ~TPacketizerFile()
Destructor.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Double_t GetProcTime() const
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
void AddAssocObj(TObject *assocobj)
Add an associated object to the list.
const char * GetOrdinal() const
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
const char * GetName() const
Returns name of object.
Long64_t GetEntries() const
Int_t GetProtocol() const
Double_t GetCurrentTime()
Get current time.
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Manages an element of a TDSet.
static struct mg_connection * fc(struct mg_context *ctx)
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject's.
void IncEntries(Long64_t entries=1)
void Continue()
Resume a stopped stopwatch.
TProofProgressStatus * fProgressStatus
const char * GetName() const
Returns name of object.
void SetLastEntries(Long64_t entries)
TDSetElement * GetNextPacket(TSlave *wrk, TMessage *r)
Get next packet.
if object ctor succeeded but object should not be used
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
void SetName(const char *name)
The packetizer is a load balancing object created for each query.
void Print(std::ostream &os, const OptionType &opt)
Long64_t GetEntriesProcessed() const
Double_t GetCPUTime() const
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval (therefore TMap does not conserve the order of the entries).
Mother of all ROOT objects.
Long64_t GetBytesRead() const
virtual void Add(TObject *obj)
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Class describing a generic file including meta information.
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual const char * GetName() const
Returns name of object.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e. the current total amount of space that has been allocated so far.
Class describing a PROOF worker server.
Container class for processing statistics.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
This packetizer generates packets which contain a single file path to be used in process.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
const char * Data() const