#ifndef ROOT_TVirtualPacketizer
#define ROOT_TVirtualPacketizer
#ifndef ROOT_TObject
#include "TObject.h"
#endif
#ifndef ROOT_TSlave
#include "TSlave.h"
#endif
#ifndef ROOT_TProofProgressStatus
#include "TProofProgressStatus.h"
#endif
#ifndef ROOT_TTime
#include "TTime.h"
#endif
class TDSet;
class TDSetElement;
class TList;
class TMap;
class TMessage;
class TNtuple;
class TNtupleD;
class TProofProgressInfo;
class TSlave;
class TVirtualPacketizer : public TObject {
public:
class TVirtualSlaveStat;
protected:
enum EUseEstOpt {
kEstOff = 0,
kEstCurrent = 1,
kEstAverage = 2
};
Double_t fMinPacketTime;
Double_t fMaxPacketTime;
TList *fConfigParams;
TMap *fSlaveStats;
TProofProgressStatus *fProgressStatus;
TTimer *fProgress;
Long64_t fTotalEntries;
TList *fFailedPackets;
TTime fStartTime;
Float_t fInitTime;
Float_t fProcTime;
Float_t fTimeUpdt;
TNtupleD *fCircProg;
Long_t fCircN;
TNtuple *fProgressPerf;
Float_t fProcTimeLast;
Int_t fActWrksLast;
Float_t fEvtRateLast;
Float_t fMBsReadLast;
Float_t fEffSessLast;
Bool_t fAWLastFill;
Float_t fReportPeriod;
EUseEstOpt fUseEstOpt;
Bool_t fValid;
Bool_t fStop;
TString fDataSet;
TVirtualPacketizer(TList *input, TProofProgressStatus *st = 0);
TVirtualPacketizer(const TVirtualPacketizer &);
void operator=(const TVirtualPacketizer &);
TDSetElement *CreateNewPacket(TDSetElement* base, Long64_t first, Long64_t num);
Long64_t GetEntries(Bool_t tree, TDSetElement *e);
virtual Bool_t HandleTimer(TTimer *timer);
public:
enum EStatusBits { kIsInitializing = BIT(16), kIsDone = BIT(17), kIsTree = BIT(18) };
virtual ~TVirtualPacketizer();
virtual Int_t AssignWork(TDSet* , Long64_t , Long64_t ) { return -1; }
Bool_t IsValid() const { return fValid; }
Long64_t GetEntriesProcessed() const { return (fProgressStatus? fProgressStatus->GetEntries() : 0); }
virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
{ ent = GetEntriesProcessed(); bytes = GetBytesRead(); calls = GetReadCalls(); return 0; }
virtual Float_t GetCurrentRate(Bool_t &all) { all = kTRUE; return (fProgressStatus? fProgressStatus->GetCurrentRate() : 0.); }
Long64_t GetTotalEntries() const { return fTotalEntries; }
virtual TDSetElement *GetNextPacket(TSlave *sl, TMessage *r);
virtual void SetInitTime();
virtual void StopProcess(Bool_t abort, Bool_t stoptimer = kFALSE);
TList *GetFailedPackets() { return fFailedPackets; }
void SetFailedPackets(TList *list) { fFailedPackets = list; }
virtual Int_t AddWorkers(TList *workers);
Long64_t GetBytesRead() const { return (fProgressStatus? fProgressStatus->GetBytesRead() : 0); }
Long64_t GetReadCalls() const { return (fProgressStatus? fProgressStatus->GetReadCalls() : 0); }
Double_t GetCumProcTime() const { return fProgressStatus->GetProcTime(); }
Float_t GetInitTime() const { return fInitTime; }
Float_t GetProcTime() const { return fProcTime; }
TNtuple *GetProgressPerf(Bool_t steal = kFALSE) { if (steal) { TNtuple *n = fProgressPerf; fProgressPerf = 0; return n;
} else { return fProgressPerf;} }
TList *GetConfigParams(Bool_t steal = kFALSE) { if (steal) { TList *l = fConfigParams; fConfigParams = 0; return l;
} else { return fConfigParams;} }
virtual void MarkBad(TSlave * , TProofProgressStatus * , TList ** ) { return; }
virtual Int_t AddProcessed(TSlave * , TProofProgressStatus * ,
Double_t , TList ** ) { return 0; }
TProofProgressStatus *GetStatus() { return fProgressStatus; }
void SetProgressStatus(TProofProgressStatus *st) { fProgressStatus = st; }
void SetTotalEntries(Long64_t ent) { fTotalEntries = ent; }
TMap *GetSlaveStats() const { return fSlaveStats; }
virtual Int_t GetActiveWorkers() { return -1; }
ClassDef(TVirtualPacketizer,0)
};
class TVirtualPacketizer::TVirtualSlaveStat : public TObject {
friend class TPacketizerAdaptive;
friend class TPacketizer;
protected:
TString fWrkFQDN;
TSlave *fSlave;
TProofProgressStatus *fStatus;
public:
const char *GetName() const { return fWrkFQDN.Data(); }
const char *GetOrdinal() const { return fSlave->GetOrdinal(); }
Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
Float_t GetAvgRate() { return fStatus->GetRate(); }
TProofProgressStatus *GetProgressStatus() { return fStatus; }
virtual TProofProgressStatus *AddProcessed(TProofProgressStatus *st) = 0;
};
#endif