This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structures.
Two main improvements in the load-balancing strategy:
The data structures TFileStat, TFileNode and TSlaveStat are enriched and changed, and the TFileNode::Compare method is changed.
Definition at line 48 of file TPacketizerAdaptive.h.
Classes | |
class | TFileNode |
class | TFileStat |
class | TSlaveStat |
Public Member Functions | |
TPacketizerAdaptive (TDSet *dset, TList *slaves, Long64_t first, Long64_t num, TList *input, TProofProgressStatus *st) | |
Constructor. | |
~TPacketizerAdaptive () override | |
Destructor. | |
Int_t | AddProcessed (TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0) override |
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was asked to stop processing during a packet). | |
Int_t | CalculatePacketSize (TObject *slstat, Long64_t cachesz, Int_t learnent) |
The result depends on the fStrategy. | |
Int_t | GetActiveWorkers () override |
Return the number of workers still processing. | |
Float_t | GetCurrentRate (Bool_t &all) override |
Get an estimate of the current rate, obtained by summing the current rates of the active workers. | |
Int_t | GetEstEntriesProcessed (Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls) override |
Get an estimate of the number of processed entries and bytes read at time t, based on the numbers already processed and the latest measured worker speeds. | |
TDSetElement * | GetNextPacket (TSlave *sl, TMessage *r) override |
Get next packet. A meaningful difference from TPacketizer is that this packetizer, for each worker, tries to predict whether the worker will finish processing its local files before the end of the query. | |
void | MarkBad (TSlave *s, TProofProgressStatus *status, TList **missingFiles) override |
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS. If the output list from this worker is going to be sent back to the master, the 'status' includes the number of entries processed by the slave. | |
Public Member Functions inherited from TVirtualPacketizer | |
~TVirtualPacketizer () override | |
Destructor. | |
virtual Int_t | AddWorkers (TList *workers) |
Adds new workers. | |
virtual Int_t | AssignWork (TDSet *, Long64_t, Long64_t) |
Long64_t | GetBytesRead () const |
TList * | GetConfigParams (Bool_t steal=kFALSE) |
Double_t | GetCumProcTime () const |
Long64_t | GetEntriesProcessed () const |
TList * | GetFailedPackets () |
Float_t | GetInitTime () const |
Float_t | GetProcTime () const |
TNtuple * | GetProgressPerf (Bool_t steal=kFALSE) |
Long64_t | GetReadCalls () const |
TMap * | GetSlaveStats () const |
TProofProgressStatus * | GetStatus () |
Long64_t | GetTotalEntries () const |
TClass * | IsA () const override |
Bool_t | IsValid () const |
void | SetFailedPackets (TList *list) |
virtual void | SetInitTime () |
Set the initialization time. | |
void | SetProgressStatus (TProofProgressStatus *st) |
void | SetTotalEntries (Long64_t ent) |
virtual void | StopProcess (Bool_t abort, Bool_t stoptimer=kFALSE) |
Stop process. | |
void | Streamer (TBuffer &) override |
Stream an object of class TObject. | |
void | StreamerNVirtual (TBuffer &ClassDef_StreamerNVirtual_b) |
Public Member Functions inherited from TObject | |
TObject () | |
TObject constructor. | |
TObject (const TObject &object) | |
TObject copy ctor. | |
virtual | ~TObject () |
TObject destructor. | |
void | AbstractMethod (const char *method) const |
Use this method to implement an "abstract" method that you don't want to leave purely abstract. | |
virtual void | AppendPad (Option_t *option="") |
Append graphics object to current pad. | |
virtual void | Browse (TBrowser *b) |
Browse object. May be overridden for another default action. | |
ULong_t | CheckedHash () |
Check and record whether this class has a consistent Hash/RecursiveRemove setup (*) and then return the regular Hash value for this object. | |
virtual const char * | ClassName () const |
Returns name of class to which the object belongs. | |
virtual void | Clear (Option_t *="") |
virtual TObject * | Clone (const char *newname="") const |
Make a clone of an object using the Streamer facility. | |
virtual Int_t | Compare (const TObject *obj) const |
Compare abstract method. | |
virtual void | Copy (TObject &object) const |
Copy this to obj. | |
virtual void | Delete (Option_t *option="") |
Delete this object. | |
virtual Int_t | DistancetoPrimitive (Int_t px, Int_t py) |
Computes distance from point (px,py) to the object. | |
virtual void | Draw (Option_t *option="") |
Default Draw method for all objects. | |
virtual void | DrawClass () const |
Draw class inheritance tree of the class to which this object belongs. | |
virtual TObject * | DrawClone (Option_t *option="") const |
Draw a clone of this object in the current selected pad with: gROOT->SetSelectedPad(c1) . | |
virtual void | Dump () const |
Dump contents of object on stdout. | |
virtual void | Error (const char *method, const char *msgfmt,...) const |
Issue error message. | |
virtual void | Execute (const char *method, const char *params, Int_t *error=nullptr) |
Execute method on this object with the given parameter string, e.g. | |
virtual void | Execute (TMethod *method, TObjArray *params, Int_t *error=nullptr) |
Execute method on this object with parameters stored in the TObjArray. | |
virtual void | ExecuteEvent (Int_t event, Int_t px, Int_t py) |
Execute action corresponding to an event at (px,py). | |
virtual void | Fatal (const char *method, const char *msgfmt,...) const |
Issue fatal error message. | |
virtual TObject * | FindObject (const char *name) const |
Must be redefined in derived classes. | |
virtual TObject * | FindObject (const TObject *obj) const |
Must be redefined in derived classes. | |
virtual Option_t * | GetDrawOption () const |
Get option used by the graphics system to draw this object. | |
virtual const char * | GetIconName () const |
Returns mime type name of object. | |
virtual const char * | GetName () const |
Returns name of object. | |
virtual char * | GetObjectInfo (Int_t px, Int_t py) const |
Returns string containing info about the object at position (px,py). | |
virtual Option_t * | GetOption () const |
virtual const char * | GetTitle () const |
Returns title of object. | |
virtual UInt_t | GetUniqueID () const |
Return the unique object id. | |
virtual ULong_t | Hash () const |
Return hash value for this object. | |
Bool_t | HasInconsistentHash () const |
Return true if the type of this object is known to have an inconsistent setup for Hash and RecursiveRemove (i.e. | |
virtual void | Info (const char *method, const char *msgfmt,...) const |
Issue info message. | |
virtual Bool_t | InheritsFrom (const char *classname) const |
Returns kTRUE if object inherits from class "classname". | |
virtual Bool_t | InheritsFrom (const TClass *cl) const |
Returns kTRUE if object inherits from TClass cl. | |
virtual void | Inspect () const |
Dump contents of this object in a graphics canvas. | |
void | InvertBit (UInt_t f) |
Bool_t | IsDestructed () const |
IsDestructed. | |
virtual Bool_t | IsEqual (const TObject *obj) const |
Default equal comparison (objects are equal if they have the same address in memory). | |
virtual Bool_t | IsFolder () const |
Returns kTRUE in case object contains browsable objects (like containers or lists of other objects). | |
R__ALWAYS_INLINE Bool_t | IsOnHeap () const |
virtual Bool_t | IsSortable () const |
R__ALWAYS_INLINE Bool_t | IsZombie () const |
virtual void | ls (Option_t *option="") const |
The ls function lists the contents of a class on stdout. | |
void | MayNotUse (const char *method) const |
Use this method to signal that a method (defined in a base class) may not be called in a derived class (in principle against good design since a child class should not provide less functionality than its parent, however, sometimes it is necessary). | |
virtual Bool_t | Notify () |
This method must be overridden to handle object notification (the base implementation is no-op). | |
void | Obsolete (const char *method, const char *asOfVers, const char *removedFromVers) const |
Use this method to declare a method obsolete. | |
void | operator delete (void *ptr) |
Operator delete. | |
void | operator delete (void *ptr, void *vp) |
Only called by placement new when throwing an exception. | |
void | operator delete[] (void *ptr) |
Operator delete []. | |
void | operator delete[] (void *ptr, void *vp) |
Only called by placement new[] when throwing an exception. | |
void * | operator new (size_t sz) |
void * | operator new (size_t sz, void *vp) |
void * | operator new[] (size_t sz) |
void * | operator new[] (size_t sz, void *vp) |
TObject & | operator= (const TObject &rhs) |
TObject assignment operator. | |
virtual void | Paint (Option_t *option="") |
This method must be overridden if a class wants to paint itself. | |
virtual void | Pop () |
Pop on object drawn in a pad to the top of the display list. | |
virtual void | Print (Option_t *option="") const |
This method must be overridden when a class wants to print itself. | |
virtual Int_t | Read (const char *name) |
Read contents of object with specified name from the current directory. | |
virtual void | RecursiveRemove (TObject *obj) |
Recursively remove this object from a list. | |
void | ResetBit (UInt_t f) |
virtual void | SaveAs (const char *filename="", Option_t *option="") const |
Save this object in the file specified by filename. | |
virtual void | SavePrimitive (std::ostream &out, Option_t *option="") |
Save a primitive as a C++ statement(s) on output stream "out". | |
void | SetBit (UInt_t f) |
void | SetBit (UInt_t f, Bool_t set) |
Set or unset the user status bits as specified in f. | |
virtual void | SetDrawOption (Option_t *option="") |
Set drawing option for object. | |
virtual void | SetUniqueID (UInt_t uid) |
Set the unique object id. | |
void | StreamerNVirtual (TBuffer &ClassDef_StreamerNVirtual_b) |
virtual void | SysError (const char *method, const char *msgfmt,...) const |
Issue system error message. | |
R__ALWAYS_INLINE Bool_t | TestBit (UInt_t f) const |
Int_t | TestBits (UInt_t f) const |
virtual void | UseCurrentStyle () |
Set current style settings in this object. This function is called when either TCanvas::UseCurrentStyle or TROOT::ForceStyle have been invoked. | |
virtual void | Warning (const char *method, const char *msgfmt,...) const |
Issue warning message. | |
virtual Int_t | Write (const char *name=nullptr, Int_t option=0, Int_t bufsize=0) |
Write this object to the current directory. | |
virtual Int_t | Write (const char *name=nullptr, Int_t option=0, Int_t bufsize=0) const |
Write this object to the current directory. | |
Private Member Functions | |
TPacketizerAdaptive () | |
TPacketizerAdaptive (const TPacketizerAdaptive &) | |
TFileStat * | GetNextActive () |
Get next active file. | |
TFileStat * | GetNextUnAlloc (TFileNode *node=0, const char *nodeHostName=0) |
Get next unallocated file from 'node' or other nodes: First try 'node'. | |
void | InitStats () |
(Re)initialise the statistics; called at the beginning or after a worker dies. | |
TFileNode * | NextActiveNode () |
Get next active node. | |
TFileNode * | NextNode () |
Get next node which has unallocated files. | |
void | operator= (const TPacketizerAdaptive &) |
Int_t | ReassignPacket (TDSetElement *e, TList **listOfMissingFiles) |
The file in the listOfMissingFiles can appear several times; in order to fix that, a TDSetElement::Merge method is needed. | |
void | RemoveActive (TFileStat *file) |
Remove file from the list of actives. | |
void | RemoveActiveNode (TFileNode *) |
Remove node from the list of actives. | |
void | RemoveUnAllocNode (TFileNode *) |
Remove unallocated node. | |
void | Reset () |
Reset the internal data structure for packet distribution. | |
void | SplitPerHost (TList *elements, TList **listOfMissingFiles) |
Split into per-host entries. The files in the listOfMissingFiles can appear several times; in order to fix that, a TDSetElement::Merge method is needed. | |
void | ValidateFiles (TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE) |
Check existence of file/dir/tree and get number of entries. | |
Additional Inherited Members | |
Public Types inherited from TVirtualPacketizer | |
enum | EStatusBits { kIsInitializing = (1ULL << ( 16 )) , kIsDone = (1ULL << ( 17 )) , kIsTree = (1ULL << ( 18 )) } |
Public Types inherited from TObject | |
enum | { kIsOnHeap = 0x01000000 , kNotDeleted = 0x02000000 , kZombie = 0x04000000 , kInconsistent = 0x08000000 , kBitMask = 0x00ffffff } |
enum | { kSingleKey = (1ULL << ( 0 )) , kOverwrite = (1ULL << ( 1 )) , kWriteDelete = (1ULL << ( 2 )) } |
enum | EDeprecatedStatusBits { kObjInCanvas = (1ULL << ( 3 )) } |
enum | EStatusBits { kCanDelete = (1ULL << ( 0 )) , kMustCleanup = (1ULL << ( 3 )) , kIsReferenced = (1ULL << ( 4 )) , kHasUUID = (1ULL << ( 5 )) , kCannotPick = (1ULL << ( 6 )) , kNoContextMenu = (1ULL << ( 8 )) , kInvalidObject = (1ULL << ( 13 )) } |
Static Public Member Functions inherited from TVirtualPacketizer | |
static TClass * | Class () |
static const char * | Class_Name () |
static constexpr Version_t | Class_Version () |
static const char * | DeclFileName () |
Static Public Member Functions inherited from TObject | |
static TClass * | Class () |
static const char * | Class_Name () |
static constexpr Version_t | Class_Version () |
static const char * | DeclFileName () |
static Longptr_t | GetDtorOnly () |
Return destructor only flag. | |
static Bool_t | GetObjectStat () |
Get status of object stat flag. | |
static void | SetDtorOnly (void *obj) |
Set destructor only flag. | |
static void | SetObjectStat (Bool_t stat) |
Turn on/off tracking of objects in the TObjectTable. | |
Protected Types inherited from TVirtualPacketizer | |
enum | EUseEstOpt { kEstOff = 0 , kEstCurrent = 1 , kEstAverage = 2 } |
Protected Types inherited from TObject | |
enum | { kOnlyPrepStep = (1ULL << ( 3 )) } |
Protected Member Functions inherited from TVirtualPacketizer | |
TVirtualPacketizer (const TVirtualPacketizer &) | |
TVirtualPacketizer (TList *input, TProofProgressStatus *st=0) | |
Constructor. | |
TDSetElement * | CreateNewPacket (TDSetElement *base, Long64_t first, Long64_t num) |
Creates a new TDSetElement from the base packet starting from the first entry with num entries. | |
Long64_t | GetEntries (Bool_t tree, TDSetElement *e) |
Get entries. | |
Bool_t | HandleTimer (TTimer *timer) override |
Send progress message to client. | |
void | operator= (const TVirtualPacketizer &) |
Protected Member Functions inherited from TObject | |
virtual void | DoError (int level, const char *location, const char *fmt, va_list va) const |
Interface to ErrorHandler (protected). | |
void | MakeZombie () |
Protected Attributes inherited from TVirtualPacketizer | |
Int_t | fActWrksLast |
Bool_t | fAWLastFill |
Long_t | fCircN |
TNtupleD * | fCircProg |
TList * | fConfigParams |
TString | fDataSet |
Float_t | fEffSessLast |
Float_t | fEvtRateLast |
TList * | fFailedPackets |
Float_t | fInitTime |
TList * | fInput |
Double_t | fMaxPacketTime |
Float_t | fMBsReadLast |
Double_t | fMinPacketTime |
Float_t | fProcTime |
Float_t | fProcTimeLast |
TTimer * | fProgress |
TNtuple * | fProgressPerf |
TProofProgressStatus * | fProgressStatus |
Float_t | fReportPeriod |
TMap * | fSlaveStats |
TTime | fStartTime |
Bool_t | fStop |
Float_t | fTimeUpdt |
Long64_t | fTotalEntries |
EUseEstOpt | fUseEstOpt |
Bool_t | fValid |
#include <TPacketizerAdaptive.h>
|
private |
|
private |
TPacketizerAdaptive::TPacketizerAdaptive | ( | TDSet * | dset, |
TList * | slaves, | ||
Long64_t | first, | ||
Long64_t | num, | ||
TList * | input, | ||
TProofProgressStatus * | st | ||
) |
Constructor.
Definition at line 445 of file TPacketizerAdaptive.cxx.
|
override |
Destructor.
Definition at line 878 of file TPacketizerAdaptive.cxx.
|
overridevirtual |
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was asked to stop processing during a packet).
Returns the number of entries intended in the last packet minus the number of processed entries.
Reimplemented from TVirtualPacketizer.
Definition at line 1567 of file TPacketizerAdaptive.cxx.
Int_t TPacketizerAdaptive::CalculatePacketSize | ( | TObject * | slstat, |
Long64_t | cachesz, | ||
Int_t | learnent | ||
) |
The result depends on the fStrategy.
Definition at line 1477 of file TPacketizerAdaptive.cxx.
|
overridevirtual |
Return the number of workers still processing.
Reimplemented from TVirtualPacketizer.
Definition at line 1947 of file TPacketizerAdaptive.cxx.
Get an estimate of the current rate, obtained by summing the current rates of the active workers.
Reimplemented from TVirtualPacketizer.
Definition at line 1964 of file TPacketizerAdaptive.cxx.
|
overridevirtual |
Get an estimate of the number of processed entries and bytes read at time t, based on the numbers already processed and the latest measured worker speeds.
If t <= 0 the current time is used. Only the estimation for the entries is currently implemented. This is needed to smooth the instantaneous rate plot.
Reimplemented from TVirtualPacketizer.
Definition at line 1993 of file TPacketizerAdaptive.cxx.
|
private |
Get next active file.
Definition at line 1045 of file TPacketizerAdaptive.cxx.
|
overridevirtual |
Get next packet. A meaningful difference from TPacketizer is that this packetizer, for each worker, tries to predict whether the worker will finish processing its local files before the end of the query.
If yes, it allocates, to those workers, files from non-slave filenodes or from slaves that are overloaded. The check is done every time a new file needs to be assigned.
Reimplemented from TVirtualPacketizer.
Definition at line 1664 of file TPacketizerAdaptive.cxx.
|
private |
Get next unallocated file from 'node' or other nodes: First try 'node'.
If there are no more files, keep trying to find an unallocated file on other nodes.
Definition at line 933 of file TPacketizerAdaptive.cxx.
|
private |
(Re)initialise the statistics; called at the beginning or after a worker dies.
Definition at line 895 of file TPacketizerAdaptive.cxx.
|
overridevirtual |
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS. If the output list from this worker is going to be sent back to the master, the 'status' includes the number of entries processed by the slave.
From this we calculate the remaining part of the packet. 0 indicates that the results from that worker were lost completely. Assume that the filenodes for which we have a TFileNode object are still up and running.
Reimplemented from TVirtualPacketizer.
Definition at line 2068 of file TPacketizerAdaptive.cxx.
|
private |
Get next active node.
Definition at line 1062 of file TPacketizerAdaptive.cxx.
|
private |
Get next node which has unallocated files.
The order is determined by TFileNode::Compare.
Definition at line 1016 of file TPacketizerAdaptive.cxx.
|
private |
|
private |
The file in the listOfMissingFiles can appear several times; in order to fix that, a TDSetElement::Merge method is needed.
Definition at line 2128 of file TPacketizerAdaptive.cxx.
|
private |
Remove file from the list of actives.
Definition at line 1084 of file TPacketizerAdaptive.cxx.
|
private |
Remove node from the list of actives.
Definition at line 1095 of file TPacketizerAdaptive.cxx.
|
private |
Remove unallocated node.
Definition at line 1037 of file TPacketizerAdaptive.cxx.
|
private |
Reset the internal data structure for packet distribution.
Definition at line 1103 of file TPacketizerAdaptive.cxx.
Split into per-host entries. The files in the listOfMissingFiles can appear several times; in order to fix that, a TDSetElement::Merge method is needed.
Definition at line 2171 of file TPacketizerAdaptive.cxx.
|
private |
Check existence of file/dir/tree and get number of entries.
Assumes the files have been setup.
Definition at line 1152 of file TPacketizerAdaptive.cxx.
|
private |
Definition at line 58 of file TPacketizerAdaptive.h.
|
private |
Definition at line 70 of file TPacketizerAdaptive.h.
|
private |
Definition at line 64 of file TPacketizerAdaptive.h.
|
private |
Definition at line 56 of file TPacketizerAdaptive.h.
|
private |
Definition at line 62 of file TPacketizerAdaptive.h.
|
private |
Definition at line 72 of file TPacketizerAdaptive.h.
|
private |
Definition at line 67 of file TPacketizerAdaptive.h.
|
private |
Definition at line 65 of file TPacketizerAdaptive.h.
|
private |
Definition at line 59 of file TPacketizerAdaptive.h.
|
private |
Definition at line 74 of file TPacketizerAdaptive.h.
|
private |
Definition at line 68 of file TPacketizerAdaptive.h.
|
private |
Definition at line 76 of file TPacketizerAdaptive.h.
|
private |
Definition at line 60 of file TPacketizerAdaptive.h.
|
private |
Definition at line 82 of file TPacketizerAdaptive.h.
|
private |
Definition at line 83 of file TPacketizerAdaptive.h.
|
private |
Definition at line 57 of file TPacketizerAdaptive.h.