ROOT logo
ROOT » PROOF » PROOFPLAYER » TPacketizerAdaptive

class TPacketizerAdaptive: public TVirtualPacketizer


TPacketizerAdaptive

This packetizer is based on TPacketizer but uses different
load-balancing algorithms and data structures.
Two main improvements in the load-balancing strategy:
- The first one was to change the order in which the files are assigned
to the computing nodes in such a way that network transfers are
evenly distributed over the query time. Transfer of the remote files
was often becoming a bottleneck at the end of a query.
- The other improvement is the use of a time-based packet size. We
measure the processing rate of all the nodes and calculate the
packet size so that each packet takes a certain amount of time. In this way
the packetizer prevents the situation where the query can't finish
because of one slow node.

The data structures: TFileStat, TFileNode and TSlaveStat are
enriched + changed and TFileNode::Compare method is changed.


Function Members (Methods)

public:
TPacketizerAdaptive(TDSet* dset, TList* slaves, Long64_t first, Long64_t num, TList* input, TProofProgressStatus* st)
virtual~TPacketizerAdaptive()
voidTObject::AbstractMethod(const char* method) const
virtual Int_tAddProcessed(TSlave* sl, TProofProgressStatus* st, Double_t latency, TList** listOfMissingFiles = 0)
virtual voidTObject::AppendPad(Option_t* option = "")
virtual voidTObject::Browse(TBrowser* b)
Int_tCalculatePacketSize(TObject* slstat, Long64_t cachesz, Int_t learnent)
static TClass*Class()
virtual const char*TObject::ClassName() const
virtual voidTObject::Clear(Option_t* = "")
virtual TObject*TObject::Clone(const char* newname = "") const
virtual Int_tTObject::Compare(const TObject* obj) const
virtual voidTObject::Copy(TObject& object) const
virtual voidTObject::Delete(Option_t* option = "")MENU
virtual Int_tTObject::DistancetoPrimitive(Int_t px, Int_t py)
virtual voidTObject::Draw(Option_t* option = "")
virtual voidTObject::DrawClass() constMENU
virtual TObject*TObject::DrawClone(Option_t* option = "") constMENU
virtual voidTObject::Dump() constMENU
virtual voidTObject::Error(const char* method, const char* msgfmt) const
virtual voidTObject::Execute(const char* method, const char* params, Int_t* error = 0)
virtual voidTObject::Execute(TMethod* method, TObjArray* params, Int_t* error = 0)
virtual voidTObject::ExecuteEvent(Int_t event, Int_t px, Int_t py)
virtual voidTObject::Fatal(const char* method, const char* msgfmt) const
virtual TObject*TObject::FindObject(const char* name) const
virtual TObject*TObject::FindObject(const TObject* obj) const
virtual Int_tGetActiveWorkers()
Long64_tTVirtualPacketizer::GetBytesRead() const
TList*TVirtualPacketizer::GetConfigParams(Bool_t steal = kFALSE)
Double_tTVirtualPacketizer::GetCumProcTime() const
virtual Float_tGetCurrentRate(Bool_t& all)
virtual Option_t*TObject::GetDrawOption() const
static Long_tTObject::GetDtorOnly()
Long64_tTVirtualPacketizer::GetEntriesProcessed() const
virtual Int_tGetEstEntriesProcessed(Float_t, Long64_t& ent, Long64_t& bytes, Long64_t& calls)
TList*TVirtualPacketizer::GetFailedPackets()
virtual const char*TObject::GetIconName() const
Float_tTVirtualPacketizer::GetInitTime() const
virtual const char*TObject::GetName() const
virtual TDSetElement*GetNextPacket(TSlave* sl, TMessage* r)
virtual char*TObject::GetObjectInfo(Int_t px, Int_t py) const
static Bool_tTObject::GetObjectStat()
virtual Option_t*TObject::GetOption() const
Float_tTVirtualPacketizer::GetProcTime() const
TNtuple*TVirtualPacketizer::GetProgressPerf(Bool_t steal = kFALSE)
Long64_tTVirtualPacketizer::GetReadCalls() const
TMap*TVirtualPacketizer::GetSlaveStats() const
TProofProgressStatus*TVirtualPacketizer::GetStatus()
virtual const char*TObject::GetTitle() const
Long64_tTVirtualPacketizer::GetTotalEntries() const
virtual UInt_tTObject::GetUniqueID() const
virtual ULong_tTObject::Hash() const
virtual voidTObject::Info(const char* method, const char* msgfmt) const
virtual Bool_tTObject::InheritsFrom(const char* classname) const
virtual Bool_tTObject::InheritsFrom(const TClass* cl) const
virtual voidTObject::Inspect() constMENU
voidTObject::InvertBit(UInt_t f)
virtual TClass*IsA() const
virtual Bool_tTObject::IsEqual(const TObject* obj) const
virtual Bool_tTObject::IsFolder() const
Bool_tTObject::IsOnHeap() const
virtual Bool_tTObject::IsSortable() const
Bool_tTVirtualPacketizer::IsValid() const
Bool_tTObject::IsZombie() const
virtual voidTObject::ls(Option_t* option = "") const
virtual voidMarkBad(TSlave* s, TProofProgressStatus* status, TList** missingFiles)
voidTObject::MayNotUse(const char* method) const
virtual Bool_tTObject::Notify()
static voidTObject::operator delete(void* ptr)
static voidTObject::operator delete(void* ptr, void* vp)
static voidTObject::operator delete[](void* ptr)
static voidTObject::operator delete[](void* ptr, void* vp)
void*TObject::operator new(size_t sz)
void*TObject::operator new(size_t sz, void* vp)
void*TObject::operator new[](size_t sz)
void*TObject::operator new[](size_t sz, void* vp)
virtual voidTObject::Paint(Option_t* option = "")
virtual voidTObject::Pop()
virtual voidTObject::Print(Option_t* option = "") const
virtual Int_tTObject::Read(const char* name)
virtual voidTObject::RecursiveRemove(TObject* obj)
voidTObject::ResetBit(UInt_t f)
virtual voidTObject::SaveAs(const char* filename = "", Option_t* option = "") constMENU
virtual voidTObject::SavePrimitive(basic_ostream<char,char_traits<char> >& out, Option_t* option = "")
voidTObject::SetBit(UInt_t f)
voidTObject::SetBit(UInt_t f, Bool_t set)
virtual voidTObject::SetDrawOption(Option_t* option = "")MENU
static voidTObject::SetDtorOnly(void* obj)
voidTVirtualPacketizer::SetFailedPackets(TList* list)
virtual voidTVirtualPacketizer::SetInitTime()
static voidTObject::SetObjectStat(Bool_t stat)
voidTVirtualPacketizer::SetProgressStatus(TProofProgressStatus* st)
voidTVirtualPacketizer::SetTotalEntries(Long64_t ent)
virtual voidTObject::SetUniqueID(UInt_t uid)
virtual voidShowMembers(TMemberInspector& insp)
virtual voidTVirtualPacketizer::StopProcess(Bool_t abort)
virtual voidStreamer(TBuffer& b)
voidStreamerNVirtual(TBuffer& b)
virtual voidTObject::SysError(const char* method, const char* msgfmt) const
Bool_tTObject::TestBit(UInt_t f) const
Int_tTObject::TestBits(UInt_t f) const
virtual voidTObject::UseCurrentStyle()
virtual voidTObject::Warning(const char* method, const char* msgfmt) const
virtual Int_tTObject::Write(const char* name = 0, Int_t option = 0, Int_t bufsize = 0)
virtual Int_tTObject::Write(const char* name = 0, Int_t option = 0, Int_t bufsize = 0) const
protected:
TDSetElement*TVirtualPacketizer::CreateNewPacket(TDSetElement* base, Long64_t first, Long64_t num)
virtual voidTObject::DoError(int level, const char* location, const char* fmt, va_list va) const
Long64_tTVirtualPacketizer::GetEntries(Bool_t tree, TDSetElement* e)
virtual Bool_tTVirtualPacketizer::HandleTimer(TTimer* timer)
voidTObject::MakeZombie()
private:
TPacketizerAdaptive()
TPacketizerAdaptive(const TPacketizerAdaptive&)
TPacketizerAdaptive::TFileStat*GetNextActive()
TPacketizerAdaptive::TFileStat*GetNextUnAlloc(TPacketizerAdaptive::TFileNode* node = 0, const char* nodeHostName = 0)
voidInitStats()
TPacketizerAdaptive::TFileNode*NextActiveNode()
TPacketizerAdaptive::TFileNode*NextNode()
voidoperator=(const TPacketizerAdaptive&)
Int_tReassignPacket(TDSetElement* e, TList** listOfMissingFiles)
voidRemoveActive(TPacketizerAdaptive::TFileStat* file)
voidRemoveActiveNode(TPacketizerAdaptive::TFileNode*)
voidRemoveUnAllocNode(TPacketizerAdaptive::TFileNode*)
voidReset()
voidSplitPerHost(TList* elements, TList** listOfMissingFiles)
voidValidateFiles(TDSet* dset, TList* slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE)

Data Members

public:
enum TVirtualPacketizer::EUseEstOpt { kEstOff
kEstCurrent
kEstAverage
};
enum TVirtualPacketizer::EStatusBits { kIsInitializing
kIsDone
kIsTree
};
enum TObject::EStatusBits { kCanDelete
kMustCleanup
kObjInCanvas
kIsReferenced
kHasUUID
kCannotPick
kNoContextMenu
kInvalidObject
};
enum TObject::[unnamed] { kIsOnHeap
kNotDeleted
kZombie
kBitMask
kSingleKey
kOverwrite
kWriteDelete
};
protected:
Bool_tTVirtualPacketizer::fAWLastFillWhether to fill the last measurement
Int_tTVirtualPacketizer::fActWrksLastActive workers at fProcTimeLast
Long_tTVirtualPacketizer::fCircNCircularity
TNtupleD*TVirtualPacketizer::fCircProgKeeps circular info for "instantaneous"
TList*TVirtualPacketizer::fConfigParamsList of configuration parameters
TStringTVirtualPacketizer::fDataSetName of the dataset being processed (for dataset-driven runs)
Float_tTVirtualPacketizer::fEffSessLastNumber of effective sessions at fProcTimeLast
Float_tTVirtualPacketizer::fEvtRateLastEvt rate at fProcTimeLast
TList*TVirtualPacketizer::fFailedPacketsa list of packets that failed while processing
Float_tTVirtualPacketizer::fInitTimetime before processing
Float_tTVirtualPacketizer::fMBsReadLastMBs read at fProcTimeLast
Double_tTVirtualPacketizer::fMaxPacketTimemaximum packet time
Double_tTVirtualPacketizer::fMinPacketTimeminimum packet time
Float_tTVirtualPacketizer::fProcTimetime since start of processing
Float_tTVirtualPacketizer::fProcTimeLastTime of the last measurement
TTimer*TVirtualPacketizer::fProgressprogress updates timer
TNtuple*TVirtualPacketizer::fProgressPerf{Active workers, evt rate, MBs read} as a function of processing time
TProofProgressStatus*TVirtualPacketizer::fProgressStatuspointer to status in the player.
Float_tTVirtualPacketizer::fReportPeriodTime between reports if nothing changes (estimated proc time / 100)
TMap*TVirtualPacketizer::fSlaveStatsslave status, keyed by corresponding TSlave
TTimeTVirtualPacketizer::fStartTimetime offset
Bool_tTVirtualPacketizer::fStopTermination of Process() requested?
Float_tTVirtualPacketizer::fTimeUpdttime between updates
Long64_tTVirtualPacketizer::fTotalEntriestotal number of entries to be distributed;
TVirtualPacketizer::EUseEstOptTVirtualPacketizer::fUseEstOptControl usage of estimated values for the progress info
Bool_tTVirtualPacketizer::fValidConstructed properly?
private:
TList*fActivenodes with unfinished files
Float_tfBaseLocalPreferenceindicates how much more likely the nodes will be
Bool_tfCachePacketSynccontrol synchronization of cache and packet sizes
TList*fFileNodesnodes with files
TSortedList*fFilesToProcessGlobal list of files (TFileStat) to be processed
Bool_tfForceLocalif 1 - eliminate the remote processing
Float_tfFractionOfRemoteFilesfraction of TDSetElements that are on non-workers
Double_tfMaxEntriesRatiomax file entries to avg allowed ratio for cache-to-packet sync
Int_tfMaxPerfIdxmaximum of our slaves' performance index
Long_tfMaxSlaveCntmaximum number of workers per filenode (Long_t to avoid
Long64_tfNEventsOnRemLocnumber of events in currently
Int_tfPacketAsAFractionused to calculate the packet size
TList*fPartitionslist of partitions on nodes
Int_tfStrategy0 means the classic and 1 (default) - the adaptive strategy
TList*fUnAllocatednodes with unallocated files

Class Charts

Inheritance Inherited Members Includes Libraries
Class Charts

Function documentation

TPacketizerAdaptive(TDSet* dset, TList* slaves, Long64_t first, Long64_t num, TList* input, TProofProgressStatus* st)
 Constructor
~TPacketizerAdaptive()
 Destructor.
void InitStats()
 (Re)initialise the statistics;
 called at the beginning or after a worker dies.
void RemoveUnAllocNode(TPacketizerAdaptive::TFileNode* )
 Remove unallocated node.
void RemoveActive(TPacketizerAdaptive::TFileStat* file)
 Remove file from the list of actives.
void RemoveActiveNode(TPacketizerAdaptive::TFileNode* )
 Remove node from the list of actives.
void Reset()
 Reset the internal data structure for packet distribution.
void ValidateFiles(TDSet* dset, TList* slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE)
 Check existence of file/dir/tree and get the number of entries.
 Assumes the files have been setup.
Int_t CalculatePacketSize(TObject* slstat, Long64_t cachesz, Int_t learnent)
 The result depends on the fStrategy
Int_t AddProcessed(TSlave* sl, TProofProgressStatus* st, Double_t latency, TList** listOfMissingFiles = 0)
 To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS
 message (when the worker was asked to stop processing during a packet).
 returns the #entries intended in the last packet - #processed entries
TDSetElement * GetNextPacket(TSlave* sl, TMessage* r)
 Get next packet;
 A meaningful difference to TPacketizer is the fact that this
 packetizer, for each worker, tries to predict whether the worker
 will finish processing its local files before the end of the query.
 If yes, it allocates, to those workers, files from non-slave filenodes
 or from slaves that are overloaded. The check is done every time a new
 file needs to be assigned.
Int_t GetActiveWorkers()
 Return the number of workers still processing
Float_t GetCurrentRate(Bool_t& all)
 Get Estimation of the current rate; just summing the current rates of
 the active workers
Int_t GetEstEntriesProcessed(Float_t , Long64_t& ent, Long64_t& bytes, Long64_t& calls)
 Get an estimate of the number of processed entries and bytes read at time t,
 based on the numbers already processed and the latest measured worker speeds.
 If t <= 0 the current time is used.
 Only the estimation for the entries is currently implemented.
 This is needed to smooth the instantaneous rate plot.
void MarkBad(TSlave* s, TProofProgressStatus* status, TList** missingFiles)
 This method can be called at any time during processing
 as an effect of handling kPROOF_STOPPROCESS
 If the output list from this worker is going to be sent back to the master,
 the 'status' includes the number of entries processed by the slave.
 From this we calculate the remaining part of the packet.
 0 indicates that the results from that worker were lost completely.
 Assume that the filenodes for which we have a TFileNode object
 are still up and running.
Int_t ReassignPacket(TDSetElement* e, TList** listOfMissingFiles)
 The file in the listOfMissingFiles can appear several times;
 in order to fix that, a TDSetElement::Merge method is needed.
void SplitPerHost(TList* elements, TList** listOfMissingFiles)
 Split into per host entries
 The files in the listOfMissingFiles can appear several times;
 in order to fix that, a TDSetElement::Merge method is needed.
TPacketizerAdaptive()
TPacketizerAdaptive(const TPacketizerAdaptive& )
void operator=(const TPacketizerAdaptive& )
TFileNode * NextNode()
TFileNode * NextActiveNode()
TFileStat * GetNextUnAlloc(TPacketizerAdaptive::TFileNode* node = 0, const char* nodeHostName = 0)
TFileStat * GetNextActive()