ROOT logo
ROOT » PROOF » PROOFPLAYER » TPacketizerAdaptive

class TPacketizerAdaptive: public TVirtualPacketizer


TPacketizerAdaptive

This packetizer is based on TPacketizer but uses different
load-balancing algorithms and data structures.
Two main improvements in the load-balancing strategy:
- First one was to change the order in which the files are assigned
to the computing nodes in such a way that network transfers are
evenly distributed over the query time. Transfer of the remote files
was often becoming a bottleneck at the end of a query.
- The other improvement is the use of time-based packet size. We
measure the processing rate of all the nodes and calculate the
packet size so that each packet takes a certain amount of time. In this way
the packetizer prevents the situation where the query can't finish
because of one slow node.

The data structures: TFileStat, TFileNode and TSlaveStat are
enriched + changed and TFileNode::Compare method is changed.


Function Members (Methods)

public:
TPacketizerAdaptive(TDSet* dset, TList* slaves, Long64_t first, Long64_t num, TList* input, TProofProgressStatus* st)
virtual~TPacketizerAdaptive()
voidTObject::AbstractMethod(const char* method) const
virtual voidTObject::AppendPad(Option_t* option = "")
virtual voidTObject::Browse(TBrowser* b)
Int_tCalculatePacketSize(TObject* slstat, Long64_t cachesz, Int_t learnent)
static TClass*Class()
virtual const char*TObject::ClassName() const
virtual voidTObject::Clear(Option_t* = "")
virtual TObject*TObject::Clone(const char* newname = "") const
virtual Int_tTObject::Compare(const TObject* obj) const
virtual voidTObject::Copy(TObject& object) const
virtual voidTObject::Delete(Option_t* option = "")MENU
virtual Int_tTObject::DistancetoPrimitive(Int_t px, Int_t py)
virtual voidTObject::Draw(Option_t* option = "")
virtual voidTObject::DrawClass() constMENU
virtual TObject*TObject::DrawClone(Option_t* option = "") constMENU
virtual voidTObject::Dump() constMENU
virtual voidTObject::Error(const char* method, const char* msgfmt) const
virtual voidTObject::Execute(const char* method, const char* params, Int_t* error = 0)
virtual voidTObject::Execute(TMethod* method, TObjArray* params, Int_t* error = 0)
virtual voidTObject::ExecuteEvent(Int_t event, Int_t px, Int_t py)
virtual voidTObject::Fatal(const char* method, const char* msgfmt) const
virtual TObject*TObject::FindObject(const char* name) const
virtual TObject*TObject::FindObject(const TObject* obj) const
virtual Int_tGetActiveWorkers()
Long64_tTVirtualPacketizer::GetBytesRead() const
Double_tTVirtualPacketizer::GetCumProcTime() const
virtual Float_tGetCurrentRate(Bool_t& all)
virtual Option_t*TObject::GetDrawOption() const
static Long_tTObject::GetDtorOnly()
Long64_tTVirtualPacketizer::GetEntriesProcessed() const
virtual Int_tGetEstEntriesProcessed(Float_t, Long64_t& ent, Long64_t& bytes, Long64_t& calls)
TList*TVirtualPacketizer::GetFailedPackets()
virtual const char*TObject::GetIconName() const
Float_tTVirtualPacketizer::GetInitTime() const
virtual const char*TObject::GetName() const
virtual TDSetElement*GetNextPacket(TSlave* sl, TMessage* r)
virtual char*TObject::GetObjectInfo(Int_t px, Int_t py) const
static Bool_tTObject::GetObjectStat()
virtual Option_t*TObject::GetOption() const
Float_tTVirtualPacketizer::GetProcTime() const
Long64_tTVirtualPacketizer::GetReadCalls() const
TProofProgressStatus*TVirtualPacketizer::GetStatus()
virtual const char*TObject::GetTitle() const
Long64_tTVirtualPacketizer::GetTotalEntries() const
virtual UInt_tTObject::GetUniqueID() const
virtual Bool_tTObject::HandleTimer(TTimer* timer)
virtual ULong_tTObject::Hash() const
virtual voidTObject::Info(const char* method, const char* msgfmt) const
virtual Bool_tTObject::InheritsFrom(const char* classname) const
virtual Bool_tTObject::InheritsFrom(const TClass* cl) const
virtual voidTObject::Inspect() constMENU
voidTObject::InvertBit(UInt_t f)
virtual TClass*IsA() const
virtual Bool_tTObject::IsEqual(const TObject* obj) const
virtual Bool_tTObject::IsFolder() const
Bool_tTObject::IsOnHeap() const
virtual Bool_tTObject::IsSortable() const
Bool_tTVirtualPacketizer::IsValid() const
Bool_tTObject::IsZombie() const
virtual voidTObject::ls(Option_t* option = "") const
virtual voidMarkBad(TSlave* s, TProofProgressStatus* status, TList** missingFiles)
voidTObject::MayNotUse(const char* method) const
virtual Bool_tTObject::Notify()
static voidTObject::operator delete(void* ptr)
static voidTObject::operator delete(void* ptr, void* vp)
static voidTObject::operator delete[](void* ptr)
static voidTObject::operator delete[](void* ptr, void* vp)
void*TObject::operator new(size_t sz)
void*TObject::operator new(size_t sz, void* vp)
void*TObject::operator new[](size_t sz)
void*TObject::operator new[](size_t sz, void* vp)
virtual voidTObject::Paint(Option_t* option = "")
virtual voidTObject::Pop()
virtual voidTObject::Print(Option_t* option = "") const
virtual Int_tTObject::Read(const char* name)
virtual voidTObject::RecursiveRemove(TObject* obj)
voidTObject::ResetBit(UInt_t f)
virtual voidTObject::SaveAs(const char* filename = "", Option_t* option = "") constMENU
virtual voidTObject::SavePrimitive(basic_ostream<char,char_traits<char> >& out, Option_t* option = "")
voidTObject::SetBit(UInt_t f)
voidTObject::SetBit(UInt_t f, Bool_t set)
virtual voidTObject::SetDrawOption(Option_t* option = "")MENU
static voidTObject::SetDtorOnly(void* obj)
voidTVirtualPacketizer::SetFailedPackets(TList* list)
virtual voidTVirtualPacketizer::SetInitTime()
static voidTObject::SetObjectStat(Bool_t stat)
voidTVirtualPacketizer::SetProgressStatus(TProofProgressStatus* st)
virtual voidTObject::SetUniqueID(UInt_t uid)
virtual voidShowMembers(TMemberInspector& insp, char* parent)
virtual voidTVirtualPacketizer::StopProcess(Bool_t abort)
virtual voidStreamer(TBuffer& b)
voidStreamerNVirtual(TBuffer& b)
virtual voidTObject::SysError(const char* method, const char* msgfmt) const
Bool_tTObject::TestBit(UInt_t f) const
Int_tTObject::TestBits(UInt_t f) const
virtual voidTObject::UseCurrentStyle()
virtual voidTObject::Warning(const char* method, const char* msgfmt) const
virtual Int_tTObject::Write(const char* name = 0, Int_t option = 0, Int_t bufsize = 0)
virtual Int_tTObject::Write(const char* name = 0, Int_t option = 0, Int_t bufsize = 0) const
protected:
virtual voidTObject::DoError(int level, const char* location, const char* fmt, va_list va) const
Long64_tTVirtualPacketizer::GetEntries(Bool_t tree, TDSetElement* e)
voidTObject::MakeZombie()
private:
TPacketizerAdaptive()
TPacketizerAdaptive(const TPacketizerAdaptive&)
virtual Int_tAddProcessed(TSlave* sl, TProofProgressStatus* st, Double_t latency, TList** listOfMissingFiles = 0)
TPacketizerAdaptive::TFileStat*GetNextActive()
TPacketizerAdaptive::TFileStat*GetNextUnAlloc(TPacketizerAdaptive::TFileNode* node = 0)
voidInitStats()
TPacketizerAdaptive::TFileNode*NextActiveNode()
TPacketizerAdaptive::TFileNode*NextNode()
voidoperator=(const TPacketizerAdaptive&)
Int_tReassignPacket(TDSetElement* e, TList** listOfMissingFiles)
voidRemoveActive(TPacketizerAdaptive::TFileStat* file)
voidRemoveActiveNode(TPacketizerAdaptive::TFileNode*)
voidRemoveUnAllocNode(TPacketizerAdaptive::TFileNode*)
voidReset()
voidSplitPerHost(TList* elements, TList** listOfMissingFiles)
voidValidateFiles(TDSet* dset, TList* slaves)

Data Members

public:
enum TVirtualPacketizer::EUseEstOpt { kEstOff
kEstCurrent
kEstAverage
};
enum TVirtualPacketizer::EStatusBits { kIsInitializing
kIsDone
};
enum TObject::EStatusBits { kCanDelete
kMustCleanup
kObjInCanvas
kIsReferenced
kHasUUID
kCannotPick
kNoContextMenu
kInvalidObject
};
enum TObject::[unnamed] { kIsOnHeap
kNotDeleted
kZombie
kBitMask
kSingleKey
kOverwrite
kWriteDelete
};
public:
static Double_tfgMaxPacketTimemaximum packet time
static Long_tfgMaxSlaveCntmaximum number of workers per filenode (Long_t to avoid
static Double_tfgMinPacketTimeminimum packet time
static Int_tfgPacketAsAFractionused to calculate the packet size
static Int_tfgStrategy0 means the classic and 1 (default) - the adaptive strategy
protected:
Bool_tTVirtualPacketizer::fStopTermination of Process() requested?
Bool_tTVirtualPacketizer::fValidConstructed properly?
private:
TList*fActivenodes with unfinished files
Float_tfBaseLocalPreferenceindicates how much more likely
TList*fFileNodesnodes with files
Bool_tfForceLocalif 1 - eliminate the remote processing
Float_tfFractionOfRemoteFilesfraction of TDSetElements
Int_tfMaxPerfIdxmaximum of our slaves' performance index
Long64_tfNEventsOnRemLocnumber of events in currently
TMap*fSlaveStatsslave status, keyed by corresponding TSlave
TList*fUnAllocatednodes with unallocated files

Class Charts

Inheritance Inherited Members Includes Libraries
Class Charts

Function documentation

TPacketizerAdaptive(TDSet* dset, TList* slaves, Long64_t first, Long64_t num, TList* input, TProofProgressStatus* st)
 Constructor
~TPacketizerAdaptive()
 Destructor.
void InitStats()
 (re)initialise the statistics
 Called at the beginning or after a worker dies.
void RemoveUnAllocNode(TPacketizerAdaptive::TFileNode* )
 Remove unallocated node.
void RemoveActive(TPacketizerAdaptive::TFileStat* file)
 Remove file from the list of actives.
void RemoveActiveNode(TPacketizerAdaptive::TFileNode* )
 Remove node from the list of actives.
void Reset()
 Reset the internal data structure for packet distribution.
void ValidateFiles(TDSet* dset, TList* slaves)
 Check existence of file/dir/tree and get number of entries.
 Assumes the files have been setup.
Int_t CalculatePacketSize(TObject* slstat, Long64_t cachesz, Int_t learnent)
 The result depends on the fgStrategy
Int_t AddProcessed(TSlave* sl, TProofProgressStatus* st, Double_t latency, TList** listOfMissingFiles = 0)
 To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS
 message (when the worker was asked to stop processing during a packet).
 returns the #entries intended in the last packet - #processed entries
TDSetElement * GetNextPacket(TSlave* sl, TMessage* r)
 Get next packet;
 A meaningful difference to TPacketizer is the fact that this
 packetizer, for each worker, tries to predict whether the worker
 will finish processing its local files before the end of the query.
 If yes, it allocates, to those workers, files from non-slave filenodes
 or from slaves that are overloaded. The check is done every time a new
 file needs to be assigned.
Int_t GetActiveWorkers()
 Return the number of workers still processing
Float_t GetCurrentRate(Bool_t& all)
 Get Estimation of the current rate; just summing the current rates of
 the active workers
Int_t GetEstEntriesProcessed(Float_t , Long64_t& ent, Long64_t& bytes, Long64_t& calls)
 Get estimation for the number of processed entries and bytes read at time t,
 based on the numbers already processed and the latest measured worker speeds.
 If t <= 0 the current time is used.
 Only the estimation for the entries is currently implemented.
 This is needed to smooth the instantaneous rate plot.
void MarkBad(TSlave* s, TProofProgressStatus* status, TList** missingFiles)
 This method can be called at any time during processing
 as an effect of handling kPROOF_STOPPROCESS
 If the output list from this worker is going to be sent back to the master,
 the 'status' includes the number of entries processed by the slave.
 From this we calculate the remaining part of the packet.
 0 indicates that the results from that worker were lost completely.
 Assume that the filenodes for which we have a TFileNode object
 are still up and running.
Int_t ReassignPacket(TDSetElement* e, TList** listOfMissingFiles)
 The file in the listOfMissingFiles can appear several times;
 in order to fix that, a TDSetElement::Merge method is needed.
void SplitPerHost(TList* elements, TList** listOfMissingFiles)
 Split into per host entries
 The files in the listOfMissingFiles can appear several times;
 in order to fix that, a TDSetElement::Merge method is needed.
TPacketizerAdaptive()
TPacketizerAdaptive(const TPacketizerAdaptive& )
void operator=(const TPacketizerAdaptive& )
TFileNode * NextNode()
TFileNode * NextActiveNode()
TFileStat * GetNextUnAlloc(TPacketizerAdaptive::TFileNode* node = 0)
TFileStat * GetNextActive()