Logo ROOT   6.12/07
Reference Guide
TVirtualPacketizer.h
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 9/7/2002
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TVirtualPacketizer
13 #define ROOT_TVirtualPacketizer
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TVirtualPacketizer //
18 // //
19 // Packetizer is a load balancing object created for each query. //
20 // It generates packets to be processed on PROOF worker servers. //
21 // A packet is an event range (begin entry and number of entries) or //
22 // object range (first object and number of objects) in a TTree //
23 // (entries) or a directory (objects) in a file. //
24 // Packets are generated taking into account the performance of the //
25 // remote machine, the time it took to process a previous packet on //
26 // the remote machine, the locality of the database files, etc. //
27 // //
28 // TVirtualPacketizer includes common parts of PROOF packetizers. //
29 // Look in subclasses for details. //
30 // The default packetizer is TPacketizerAdaptive. //
31 // To use an alternative one, for instance - the TPacketizer, call: //
32 // proof->SetParameter("PROOF_Packetizer", "TPacketizer"); //
33 // //
34 //////////////////////////////////////////////////////////////////////////
35 
36 #include "TObject.h"
37 #include "TSlave.h"
38 #include "TProofProgressStatus.h"
39 #include "TTime.h"
40 
41 
42 class TDSet;
43 class TDSetElement;
44 class TList;
45 class TMap;
46 class TMessage;
47 class TNtuple;
48 class TNtupleD;
49 class TProofProgressInfo;
50 class TSlave;
51 
52 
// Common base for the PROOF packetizers: holds the shared configuration and
// progress-reporting members and supplies default implementations of the
// virtual hooks (AssignWork, MarkBad, AddProcessed, GetActiveWorkers).
// NOTE(review): this listing appears to have lost some declaration lines - the
// embedded line numbers skip 61-62, 106, 110, 121, 124, 128 and 144. Confirm
// against the real header before relying on it.
53 class TVirtualPacketizer : public TObject {
54 
55 public: // public because of Sun CC bug
56  class TVirtualSlaveStat;
57 
58 protected:
59  enum EUseEstOpt { // Option for usage of estimated values
60  kEstOff = 0,
63  };
64 
65  // General configuration parameters
66  Double_t fMinPacketTime; // minimum packet time
67  Double_t fMaxPacketTime; // maximum packet time
68  TList *fConfigParams; // List of configuration parameters
69 
70  TMap *fSlaveStats; // slave status, keyed by corresponding TSlave
71 
72  TProofProgressStatus *fProgressStatus; // pointer to status in the player.
73  TTimer *fProgress; // progress updates timer
74 
75  Long64_t fTotalEntries; // total number of entries to be distributed;
76  // not used in the progressive packetizer
77  TList *fFailedPackets;// a list of packets that failed while processing
78 
79  // Members for progress info
80  TTime fStartTime; // time offset
81  Float_t fInitTime; // time before processing
82  Float_t fProcTime; // time since start of processing
83  Float_t fTimeUpdt; // time between updates
84  TNtupleD *fCircProg; // Keeps circular info for "instantaneous"
85  // rate calculations
86  Long_t fCircN; // Circularity
87 
88  TNtuple *fProgressPerf; // {Active workers, evt rate, MBs read} as a function of processing time
89  Float_t fProcTimeLast; // Time of the last measurement
90  Int_t fActWrksLast; // Active workers at fProcTimeLast
91  Float_t fEvtRateLast; // Evt rate at fProcTimeLast
92  Float_t fMBsReadLast; // MBs read at fProcTimeLast
93  Float_t fEffSessLast; // Number of effective sessions at fProcTimeLast
94  Bool_t fAWLastFill; // Whether to fill the last measurement
95  Float_t fReportPeriod; // Time between reports if nothing changes (estimated proc time / 100)
96 
97  EUseEstOpt fUseEstOpt; // Control usage of estimated values for the progress info
98 
99  Bool_t fValid; // Constructed properly?
100  Bool_t fStop; // Termination of Process() requested?
101 
102  TString fDataSet; // Name of the dataset being processed (for dataset-driven runs)
103 
104  TList *fInput; // Input list
105 
    // Copy construction and assignment are declared but deliberately left unimplemented.
107  TVirtualPacketizer(const TVirtualPacketizer &); // no implementation, will generate
108  void operator=(const TVirtualPacketizer &); // error on accidental usage
109 
111  Long64_t GetEntries(Bool_t tree, TDSetElement *e); // Num of entries or objects
112  virtual Bool_t HandleTimer(TTimer *timer);
113 
114 public:
115  enum EStatusBits { kIsInitializing = BIT(16), kIsDone = BIT(17), kIsTree = BIT(18) };
116  virtual ~TVirtualPacketizer();
117 
    // Default implementation: ignores its arguments and returns -1.
118  virtual Int_t AssignWork(TDSet* /*dset*/, Long64_t /*first*/, Long64_t /*num*/) { return -1; }
119  Bool_t IsValid() const { return fValid; }
    // Returns 0 when no progress status object is attached.
120  Long64_t GetEntriesProcessed() const { return (fProgressStatus? fProgressStatus->GetEntries() : 0); }
    // NOTE(review): the declaration that opens the body below (embedded line 121) is
    // missing from this listing - presumably GetEstEntriesProcessed(); confirm.
    // The body copies the current counters into ent/bytes/calls and returns 0.
122  { ent = GetEntriesProcessed(); bytes = GetBytesRead(); calls = GetReadCalls(); return 0; }
    // Sets 'all' to kTRUE; returns 0. when no progress status object is attached.
123  virtual Float_t GetCurrentRate(Bool_t &all) { all = kTRUE; return (fProgressStatus? fProgressStatus->GetCurrentRate() : 0.); }
125  virtual TDSetElement *GetNextPacket(TSlave *sl, TMessage *r);
126  virtual void SetInitTime();
127  virtual void StopProcess(Bool_t abort, Bool_t stoptimer = kFALSE);
129  void SetFailedPackets(TList *list) { fFailedPackets = list; }
130  virtual Int_t AddWorkers(TList *workers);
131 
132  Long64_t GetBytesRead() const { return (fProgressStatus? fProgressStatus->GetBytesRead() : 0); }
133  Long64_t GetReadCalls() const { return (fProgressStatus? fProgressStatus->GetReadCalls() : 0); }
    // NOTE(review): unlike GetBytesRead()/GetReadCalls() above, this dereferences
    // fProgressStatus without a null check.
134  Double_t GetCumProcTime() const { return fProgressStatus->GetProcTime(); }
135  Float_t GetInitTime() const { return fInitTime; }
136  Float_t GetProcTime() const { return fProcTime; }
    // With steal == kTRUE the stored pointer is returned and the member is reset to 0
    // (presumably handing ownership to the caller - confirm); otherwise the member is
    // returned unchanged. GetConfigParams() below follows the same pattern.
137  TNtuple *GetProgressPerf(Bool_t steal = kFALSE) { if (steal) { TNtuple *n = fProgressPerf; fProgressPerf = 0; return n;
138  } else { return fProgressPerf;} }
139  TList *GetConfigParams(Bool_t steal = kFALSE) { if (steal) { TList *l = fConfigParams; fConfigParams = 0; return l;
140  } else { return fConfigParams;} }
    // Default implementations: MarkBad() does nothing, AddProcessed() returns 0.
141  virtual void MarkBad(TSlave * /*s*/, TProofProgressStatus * /*status*/, TList ** /*missingFiles*/) { return; }
142  virtual Int_t AddProcessed(TSlave * /*sl*/, TProofProgressStatus * /*st*/,
143  Double_t /*lat*/, TList ** /*missingFiles*/) { return 0; }
145  void SetProgressStatus(TProofProgressStatus *st) { fProgressStatus = st; }
146  void SetTotalEntries(Long64_t ent) { fTotalEntries = ent; }
147 
148  TMap *GetSlaveStats() const { return fSlaveStats; }
149 
    // Default implementation returns -1.
150  virtual Int_t GetActiveWorkers() { return -1; }
151 
152  ClassDef(TVirtualPacketizer,0) //Generate work packets for parallel processing
153 };
154 
155 //------------------------------------------------------------------------------
156 
158 
159 friend class TPacketizerAdaptive;
160 friend class TPacketizer;
161 
162 protected:
163  TString fWrkFQDN; // Worker FQDN
164  TSlave *fSlave; // corresponding TSlave record
165  TProofProgressStatus *fStatus; // status as of the last finished packet
166 
167 public:
168  const char *GetName() const { return fWrkFQDN.Data(); }
169  const char *GetOrdinal() const { return fSlave->GetOrdinal(); }
170  Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
171  Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
172  Float_t GetAvgRate() { return fStatus->GetRate(); }
175 };
176 
177 #endif
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
TNtuple * GetProgressPerf(Bool_t steal=kFALSE)
virtual Float_t GetCurrentRate(Bool_t &all)
long long Long64_t
Definition: RtypesCore.h:69
virtual void MarkBad(TSlave *, TProofProgressStatus *, TList **)
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:28
float Float_t
Definition: RtypesCore.h:53
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:151
Long64_t GetReadCalls() const
#define BIT(n)
Definition: Rtypes.h:78
Long64_t GetBytesRead() const
Double_t GetProcTime() const
Float_t GetProcTime() const
const char * GetOrdinal() const
Definition: TSlave.h:131
Basic string class.
Definition: TString.h:125
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
Basic time type with millisecond precision.
Definition: TTime.h:27
Long64_t GetEntries() const
virtual void SetInitTime()
Set the initialization time.
Double_t GetCumProcTime() const
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
virtual void StopProcess(Bool_t abort, Bool_t stoptimer=kFALSE)
Stop process.
This class generates packets to be processed on PROOF worker servers.
Definition: TPacketizer.h:39
Manages an element of a TDSet.
Definition: TDSet.h:66
const char * GetName() const
Returns name of object.
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting from the first entry, with num entries...
#define ClassDef(name, id)
Definition: Rtypes.h:320
void SetProgressStatus(TProofProgressStatus *st)
TVirtualPacketizer(TList *input, TProofProgressStatus *st=0)
Constructor.
TMap * GetSlaveStats() const
virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Double_t GetRate() const
TProofProgressStatus * fProgressStatus
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
Long64_t GetTotalEntries() const
A doubly linked list.
Definition: TList.h:44
Double_t GetCurrentRate() const
Get current rate. Return the average rate if the current is not defined.
void SetTotalEntries(Long64_t ent)
ROOT::R::TRInterface & r
Definition: Object.C:4
A simple TTree restricted to a list of float variables only.
Definition: TNtuple.h:28
virtual Int_t AssignWork(TDSet *, Long64_t, Long64_t)
Handles synchronous and asynchronous timer events.
Definition: TTimer.h:51
virtual ~TVirtualPacketizer()
Destructor.
Long64_t GetEntries(Bool_t tree, TDSetElement *e)
Get entries.
const Bool_t kFALSE
Definition: RtypesCore.h:88
long Long_t
Definition: RtypesCore.h:50
The packetizer is a load balancing object created for each query.
double Double_t
Definition: RtypesCore.h:55
Long64_t GetEntriesProcessed() const
EStatusBits
Definition: TObject.h:57
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
void SetFailedPackets(TList *list)
Bool_t IsValid() const
Long64_t GetReadCalls() const
Mother of all ROOT objects.
Definition: TObject.h:37
Long64_t GetBytesRead() const
auto * l
Definition: textangle.C:4
void operator=(const TVirtualPacketizer &)
virtual Int_t GetActiveWorkers()
Definition: tree.py:1
TProofProgressStatus * GetStatus()
Definition: first.py:1
TList * GetConfigParams(Bool_t steal=kFALSE)
TProofProgressStatus * GetProgressStatus()
Class describing a PROOF worker server.
Definition: TSlave.h:46
Container class for processing statistics.
const Bool_t kTRUE
Definition: RtypesCore.h:87
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
const Int_t n
Definition: legend1.C:16
Float_t GetInitTime() const
const char * Data() const
Definition: TString.h:345
virtual Int_t AddWorkers(TList *workers)
Adds new workers.