Logo ROOT   6.08/07
Reference Guide
TVirtualPacketizer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 9/7/2002
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TVirtualPacketizer
13 \ingroup proofkernel
14 
15 The packetizer is a load balancing object created for each query.
16 It generates packets to be processed on PROOF worker servers.
17 A packet is an event range (begin entry and number of entries) or
18 object range (first object and number of objects) in a TTree
19 (entries) or a directory (objects) in a file.
20 Packets are generated taking into account the performance of the
21 remote machine, the time it took to process a previous packet on
22 the remote machine, the locality of the database files, etc.
23 
24 TVirtualPacketizer includes common parts of PROOF packetizers.
25 Look in subclasses for details.
26 The default packetizer is TPacketizerAdaptive (TPacketizer for Proof-Lite).
27 To use an alternative one, for instance - the TPacketizer, call:
28 proof->SetParameter("PROOF_Packetizer", "TPacketizer");
29 
30 */
31 
32 #include "TVirtualPacketizer.h"
33 #include "TEnv.h"
34 #include "TFile.h"
35 #include "TTree.h"
36 #include "TKey.h"
37 #include "TDSet.h"
38 #include "TError.h"
39 #include "TEventList.h"
40 #include "TEntryList.h"
41 #include "TMap.h"
42 #include "TMessage.h"
43 #include "TObjString.h"
44 #include "TParameter.h"
45 
46 #include "TProof.h"
47 #include "TProofDebug.h"
48 #include "TProofPlayer.h"
49 #include "TProofServ.h"
50 #include "TSlave.h"
51 #include "TSocket.h"
52 #include "TTimer.h"
53 #include "TUrl.h"
54 #include "TMath.h"
55 #include "TMonitor.h"
56 #include "TNtuple.h"
57 #include "TNtupleD.h"
58 #include "TPerfStats.h"
59 
61 
62 ////////////////////////////////////////////////////////////////////////////////
63 /// Constructor.
64 
66 {
67  fInput = input;
68  // General configuration parameters
69  fMinPacketTime = 3;
70  Double_t minPacketTime = 0;
71  if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) {
72  Info("TVirtualPacketizer", "setting minimum time for a packet to %f",
73  minPacketTime);
74  fMinPacketTime = (Int_t) minPacketTime;
75  }
76  fMaxPacketTime = 20;
77  Double_t maxPacketTime = 0;
78  if (TProof::GetParameter(input, "PROOF_MaxPacketTime", maxPacketTime) == 0) {
79  Info("TVirtualPacketizer", "setting maximum packet time for a packet to %f",
80  maxPacketTime);
81  fMaxPacketTime = (Int_t) maxPacketTime;
82  }
84 
85  // Create the list to save them in the query result (each derived packetizer is
86  // responsible to update this coherently)
87  fConfigParams = new TList;
88  fConfigParams->SetName("PROOF_PacketizerConfigParams");
89  fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
90  fConfigParams->Add(new TParameter<Double_t>("PROOF_MaxPacketTime", fMaxPacketTime));
91 
92  fProgressStatus = st;
93  if (!fProgressStatus) {
94  Error("TVirtualPacketizer", "No progress status");
95  return;
96  }
97  fTotalEntries = 0;
98  fValid = kTRUE;
99  fStop = kFALSE;
100  fFailedPackets = 0;
101  fDataSet = "";
102  fSlaveStats = 0;
103 
104  // Performance monitoring
105  fStartTime = gSystem->Now();
107  ResetBit(TVirtualPacketizer::kIsDone);
108  fInitTime = 0;
109  fProcTime = 0;
110  fTimeUpdt = -1.;
111 
112  // Init circularity ntple for performance calculations
113  fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb:rc:al");
114  fCircN = 5;
115  TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
116  fCircProg->SetCircular(fCircN);
117  fCircProg->SetDirectory(0);
118 
119  // Check if we need to start the progress timer (multi-packetizers do not want
120  // timers from the packetizers they control ...). Also submasters do not need
121  // that (the progress timer is the one at the top master).
122  TString startProgress("yes");
123  TProof::GetParameter(input, "PROOF_StartProgressTimer", startProgress);
124  // If we are on a submaster, check if there is something else to do
125  if (gProofServ && gProofServ->IsMaster() && !gProofServ->IsTopMaster()) startProgress = "no";
126 
127  // Init progress timer, if requested
128  // The timer is destroyed (and therefore stopped) by the relevant TPacketizer implementation
129  // in GetNextPacket when end of work is detected.
130  fProgress = 0;
131  if (startProgress == "yes") {
132  Long_t period = 500;
133  TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
134  fProgress = new TTimer;
135  fProgress->SetObject(this);
136  fProgress->Start(period, kFALSE);
137  }
138 
139  // Init ntple to store active workers vs processing time
140  fProgressPerf = 0;
141  TString saveProgressPerf("no");
142  if (TProof::GetParameter(input, "PROOF_SaveProgressPerf", saveProgressPerf) == 0) {
143  if (fProgress && saveProgressPerf == "yes")
144  fProgressPerf = new TNtuple("PROOF_ProgressPerfNtuple",
145  "{Active workers, evt rate, MB read} vs processing time", "tm:aw:er:mb:ns");
146  }
147  fProcTimeLast = -1.;
148  fActWrksLast = -1;
149  fEvtRateLast = -1.;
150  fMBsReadLast = -1.;
151  fEffSessLast = -1.;
152  fAWLastFill = kFALSE;
153  fReportPeriod = -1.;
154 
155  // Whether to send estimated values for the progress info
156  TString estopt;
157  if (TProof::GetParameter(input, "PROOF_RateEstimation", estopt) != 0 ||
158  estopt.IsNull()) {
159  // Parse option from the env
160  estopt = gEnv->GetValue("Proof.RateEstimation", "");
161  }
162  fUseEstOpt = kEstOff;
163  if (estopt == "current")
164  fUseEstOpt = kEstCurrent;
165  else if (estopt == "average")
166  fUseEstOpt = kEstAverage;
167 }
168 
169 ////////////////////////////////////////////////////////////////////////////////
170 /// Destructor.
171 
173 {
179  fProgressStatus = 0; // belongs to the player
180 }
181 
182 ////////////////////////////////////////////////////////////////////////////////
183 /// Get entries.
184 
186 {
187  Long64_t entries;
189 
190  if (!file || (file && file->IsZombie())) {
191  const char *emsg = (file) ? strerror(file->GetErrno()) : "<undef>";
192  Error("GetEntries","Cannot open file: %s (%s)", e->GetFileName(), emsg);
193  return -1;
194  }
195 
196  TDirectory *dirsave = gDirectory;
197  if ( ! file->cd(e->GetDirectory()) ) {
198  Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
199  delete file;
200  return -1;
201  }
202  TDirectory *dir = gDirectory;
203  dirsave->cd();
204 
205  if ( tree ) {
206  TKey *key = dir->GetKey(e->GetObjName());
207  if ( key == 0 ) {
208  Error("GetEntries","Cannot find tree \"%s\" in %s",
209  e->GetObjName(), e->GetFileName() );
210  delete file;
211  return -1;
212  }
213  TTree *t = (TTree *) key->ReadObj();
214  if ( t == 0 ) {
215  // Error always reported?
216  delete file;
217  return -1;
218  }
219  entries = (Long64_t) t->GetEntries();
220  delete t;
221 
222  } else {
223  TList *keys = dir->GetListOfKeys();
224  entries = keys->GetSize();
225  }
226 
227  delete file;
228 
229  return entries;
230 }
231 
232 ////////////////////////////////////////////////////////////////////////////////
233 /// Get next packet.
234 
236 {
237  AbstractMethod("GetNextPacket");
238  return 0;
239 }
240 
241 ////////////////////////////////////////////////////////////////////////////////
242 /// Stop process.
243 
245 {
246  fStop = kTRUE;
247  if (stoptimer) HandleTimer(0);
248 }
249 
250 ////////////////////////////////////////////////////////////////////////////////
251 /// Creates a new TDSetElement from from base packet starting from
252 /// the first entry with num entries.
253 /// The function returns a new created objects which have to be deleted.
254 
256  Long64_t first, Long64_t num)
257 {
258  TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
259  base->GetDirectory(), first, num,
260  0, fDataSet.Data());
261 
262  // create TDSetElements for all the friends of elem.
263  TList *friends = base->GetListOfFriends();
264  if (friends) {
265  TIter nxf(friends);
266  TDSetElement *fe = 0;
267  while ((fe = (TDSetElement *) nxf())) {
268  PDB(kLoop,2)
269  Info("CreateNewPacket", "friend: file '%s', obj:'%s'",
270  fe->GetFileName(), fe->GetObjName());
271  TDSetElement *xfe = new TDSetElement(fe->GetFileName(), fe->GetObjName(),
272  fe->GetDirectory(), first, num);
273  // The alias, if any, is in the element name options ('friend_alias=<alias>|')
274  elem->AddFriend(xfe, 0);
275  }
276  }
277 
278  return elem;
279 }
280 
281 ////////////////////////////////////////////////////////////////////////////////
282 /// Send progress message to client.
283 
285 {
286  PDB(kPacketizer,2)
287  Info("HandleTimer", "fProgress: %p, isDone: %d",
289 
291  // Make sure that the timer is stopped
292  if (fProgress) fProgress->Stop();
293  return kFALSE;
294  }
295 
296  // Prepare progress info
297  TTime tnow = gSystem->Now();
298  Float_t now = Long64_t(tnow - fStartTime) / (Float_t)1000.;
299  Long64_t estent = GetEntriesProcessed();
300  Long64_t estmb = GetBytesRead();
301  Long64_t estrc = GetReadCalls();
302 
303  // Times and counters
304  Float_t evtrti = -1., mbrti = -1.;
306  // Initialization
307  fInitTime = now;
308  } else {
309  // Fill the reference as first
310  if (fCircProg->GetEntries() <= 0) {
311  fCircProg->Fill((Double_t)0., 0., 0., 0., 0.);
312  }
313  // Time between updates
314  fTimeUpdt = now - fProcTime;
315  // Update proc time
316  fProcTime = now - fInitTime;
317  // Get the last entry
318  Double_t *ar = fCircProg->GetArgs();
320  // The current rate
321  Bool_t all = kTRUE;
322  evtrti = GetCurrentRate(all);
323  Double_t xall = (all) ? 1. : 0.;
324  GetEstEntriesProcessed(0, estent, estmb, estrc);
325  if (estent >= fTotalEntries) {
326  estent = GetEntriesProcessed();
327  estmb = GetBytesRead();
328  estrc = GetReadCalls();
329  }
330  // Fill entry
331  Double_t evts = (Double_t) estent;
332  Double_t mbs = (estmb > 0) ? estmb / TMath::Power(2.,20.) : 0.; //--> MB
333  Double_t rcs = (Double_t) estrc;
334  fCircProg->Fill((Double_t)fProcTime, evts, mbs, rcs, xall);
336  if (all) {
337  Double_t dt = (Double_t)fProcTime - ar[0];
338  Long64_t de = (evts > ar[1]) ? (Long64_t) (evts - ar[1]) : 0;
339  Long64_t db = (mbs > ar[2]) ? (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)) : 0;
340  if (gPerfStats)
341  gPerfStats->RateEvent((Double_t)fProcTime, dt, de, db);
342  // Get the last to spot the cache readings
343  Double_t rc = (Double_t)estrc - ar[3];
344  mbrti = (rc > 0 && mbs > ar[2]) ? (Float_t) (mbs - ar[2]) / rc : 0. ;
345  }
346  // Final report only once (to correctly determine the proc time)
349  PDB(kPacketizer,2)
350  Info("HandleTimer", "ent:%lld, bytes:%lld, proct:%f, evtrti:%f, mbrti:%f (%f,%f)",
351  estent, estmb, fProcTime, evtrti, mbrti, mbs, ar[2]);
352  }
353 
354  if (gProofServ) {
355  // Message to be sent over
357  if (gProofServ->GetProtocol() > 25) {
358  Int_t actw = GetActiveWorkers();
359  Int_t acts = gProofServ->GetActSessions();
361  if (fProgressPerf && estent > 0) {
362  // Estimated query time
363  if (fProcTime > 0.) {
364  fReportPeriod = (Float_t) fTotalEntries / (Double_t) estent * fProcTime / 100.;
365  if (fReportPeriod > 0. && fReportPeriod < 5.) fReportPeriod = 5.;
366  }
367 
368  if (fProgressPerf->GetEntries() <= 0) {
369  // Fill the first entry
370  fProgressPerf->Fill(fProcTime, (Float_t)actw, -1., -1., -1.);
371  } else {
372  // Fill only if changed since last entry filled
373  Float_t *far = fProgressPerf->GetArgs();
375  Bool_t doReport = (fReportPeriod > 0. &&
376  (fProcTime - far[0]) >= fReportPeriod) ? kTRUE : kFALSE;
377  Float_t mbsread = estmb / 1024. / 1024.;
378  if (TMath::Abs((Float_t)actw - far[1]) > 0.1) {
379  if (fAWLastFill)
382  fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
384  } else if (doReport) {
385  fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
387  } else {
388  fAWLastFill = kTRUE;
389  }
391  fActWrksLast = actw;
392  fEvtRateLast = evtrti;
393  fMBsReadLast = mbsread;
394  fEffSessLast = effs;
395  }
396  }
397  // Fill the message now
399  fProcTime, evtrti, mbrti, actw, acts, effs);
400  m << &pi;
401  } else if (gProofServ->GetProtocol() > 11) {
402  // Fill the message now
403  m << fTotalEntries << estent << estmb << fInitTime << fProcTime
404  << evtrti << mbrti;
405  } else {
406  // Old format
408  }
409  // send message to client;
410  gProofServ->GetSocket()->Send(m);
411 
412  } else {
413  if (gProof && gProof->GetPlayer()) {
414  // Log locally
415  gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
416  fInitTime, fProcTime, evtrti, mbrti);
417  }
418  }
419 
420  // Final report only once (to correctly determine the proc time)
423 
424  return kFALSE; // ignored?
425 }
426 
427 ////////////////////////////////////////////////////////////////////////////////
428 /// Set the initialization time
429 
431 {
433  fInitTime = Long64_t(gSystem->Now() - fStartTime) / (Float_t)1000.;
435  PDB(kPacketizer,2)
436  Info("SetInitTime","fInitTime set to %f s", fInitTime);
437  }
438 }
439 
440 ////////////////////////////////////////////////////////////////////////////////
441 /// Adds new workers. Must be implemented by each real packetizer properly.
442 /// Returns the number of workers added, or -1 on failure.
443 
445 {
446  Warning("AddWorkers", "Not implemented for this packetizer");
447 
448  return -1;
449 }
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
virtual Int_t Fill()
Fill a Ntuple with current values in fArgs.
Definition: TNtupleD.cxx:149
virtual Float_t GetCurrentRate(Bool_t &all)
virtual TList * GetListOfKeys() const
Definition: TDirectory.h:158
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:899
long long Long64_t
Definition: RtypesCore.h:69
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:30
const double pi
float Float_t
Definition: RtypesCore.h:53
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:157
Long64_t GetReadCalls() const
Long64_t GetBytesRead() const
TSocket * GetSocket() const
Definition: TProofServ.h:271
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:50
virtual Int_t GetEntry(Long64_t entry=0, Int_t getall=0)
Read all branches of entry and return total number of bytes read.
Definition: TTree.cxx:5211
Basic string class.
Definition: TString.h:137
virtual void Progress(Long64_t total, Long64_t processed)=0
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
Int_t GetProtocol() const
Definition: TProofServ.h:266
virtual TList * GetListOfFriends() const
Definition: TDSet.h:110
Basic time type with millisecond precision.
Definition: TTime.h:29
Bool_t IsTopMaster() const
Definition: TProofServ.h:309
virtual void SetInitTime()
Set the initialization time.
Short_t Abs(Short_t d)
Definition: TMathBase.h:110
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
LongDouble_t Power(LongDouble_t x, LongDouble_t y)
Definition: TMath.h:501
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:739
Double_t * GetArgs() const
Definition: TNtupleD.h:55
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3907
virtual void StopProcess(Bool_t abort, Bool_t stoptimer=kFALSE)
Stop process.
virtual Int_t GetErrno() const
Method returning errno. Is overriden in TRFIOFile.
Definition: TFile.cxx:1186
Manages an element of a TDSet.
Definition: TDSet.h:68
#define SafeDelete(p)
Definition: RConfig.h:507
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from from base packet starting from the first entry with num entries...
#define PDB(mask, level)
Definition: TProofDebug.h:58
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:234
virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
void Info(const char *location, const char *msgfmt,...)
Int_t GetActSessions() const
Definition: TProofServ.h:277
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:30
void Error(const char *location, const char *msgfmt,...)
TProofProgressStatus * fProgressStatus
A doubly linked list.
Definition: TList.h:47
const char * GetObjName() const
Definition: TDSet.h:122
Named parameter, streamable and storable.
Definition: TParameter.h:49
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
Definition: TSystem.cxx:468
A simple TTree restricted to a list of float variables only.
Definition: TNtuple.h:30
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:496
TMarker * m
Definition: textangle.C:8
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:925
Handles synchronous and a-synchronous timer events.
Definition: TTimer.h:57
void SetName(const char *name)
Definition: TCollection.h:116
virtual ~TVirtualPacketizer()
Destructor.
Long64_t GetEntries(Bool_t tree, TDSetElement *e)
Get entries.
#define gPerfStats
Float_t * GetArgs() const
Definition: TNtuple.h:58
long Long_t
Definition: RtypesCore.h:50
Float_t GetEffSessions() const
Definition: TProofServ.h:278
The packetizer is a load balancing object created for each query.
R__EXTERN TProof * gProof
Definition: TProof.h:1107
#define ClassImp(name)
Definition: Rtypes.h:279
Bool_t IsZombie() const
Definition: TObject.h:120
TVirtualProofPlayer * GetPlayer() const
Definition: TProof.h:746
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition: TDirectory.h:156
double Double_t
Definition: RtypesCore.h:55
Long64_t GetEntriesProcessed() const
Describe directory structure in memory.
Definition: TDirectory.h:44
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
virtual Int_t Fill()
Fill a Ntuple with current values in fArgs.
Definition: TNtuple.cxx:170
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
const char * GetFileName() const
Definition: TDSet.h:113
virtual void AddFriend(TDSetElement *friendElement, const char *alias)
Add friend TDSetElement to this set. The friend element will be copied to this object.
Definition: TDSet.cxx:357
virtual Long64_t GetEntries() const
Definition: TTree.h:393
Bool_t IsNull() const
Definition: TString.h:387
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition: TKey.cxx:730
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
Definition: TDirectory.cxx:435
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:361
Definition: file.py:1
void SetObject(TObject *object)
Set the object to be notified at time out.
Definition: TTimer.cxx:184
virtual Int_t GetActiveWorkers()
Definition: tree.py:1
virtual void Stop()
Definition: TTimer.h:99
A TTree object has a header with a name and a title.
Definition: TTree.h:98
#define gDirectory
Definition: TDirectory.h:221
void ResetBit(UInt_t f)
Definition: TObject.h:156
Definition: first.py:1
virtual Int_t GetSize() const
Definition: TCollection.h:95
Class describing a PROOF worker server.
Definition: TSlave.h:50
Container class for processing statistics.
const Bool_t kTRUE
Definition: Rtypes.h:91
void AbstractMethod(const char *method) const
Use this method to implement an "abstract" method that you don&#39;t want to leave purely abstract...
Definition: TObject.cxx:967
Bool_t IsMaster() const
Definition: TProofServ.h:307
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:911
const char * Data() const
Definition: TString.h:349
virtual Int_t AddWorkers(TList *workers)
Adds new workers.