ROOT  6.06/09
Reference Guide
TVirtualPacketizer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 9/7/2002
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TVirtualPacketizer //
15 // //
16 // The packetizer is a load balancing object created for each query. //
17 // It generates packets to be processed on PROOF worker servers. //
18 // A packet is an event range (begin entry and number of entries) or //
19 // object range (first object and number of objects) in a TTree //
20 // (entries) or a directory (objects) in a file. //
21 // Packets are generated taking into account the performance of the //
22 // remote machine, the time it took to process a previous packet on //
23 // the remote machine, the locality of the database files, etc. //
24 // //
25 // TVirtualPacketizer includes common parts of PROOF packetizers. //
26 // Look in subclasses for details. //
27 // The default packetizer is TPacketizerAdaptive. //
28 // To use an alternative one, for instance TPacketizer, call: //
29 // proof->SetParameter("PROOF_Packetizer", "TPacketizer"); //
30 // //
31 //////////////////////////////////////////////////////////////////////////
32 
33 
34 #include "TVirtualPacketizer.h"
35 #include "TEnv.h"
36 #include "TFile.h"
37 #include "TTree.h"
38 #include "TKey.h"
39 #include "TDSet.h"
40 #include "TError.h"
41 #include "TEventList.h"
42 #include "TEntryList.h"
43 #include "TMap.h"
44 #include "TMessage.h"
45 #include "TObjString.h"
46 #include "TParameter.h"
47 
48 #include "TProof.h"
49 #include "TProofDebug.h"
50 #include "TProofPlayer.h"
51 #include "TProofServ.h"
52 #include "TSlave.h"
53 #include "TSocket.h"
54 #include "TTimer.h"
55 #include "TUrl.h"
56 #include "TMath.h"
57 #include "TMonitor.h"
58 #include "TNtuple.h"
59 #include "TNtupleD.h"
60 #include "TPerfStats.h"
61 
63 
64 ////////////////////////////////////////////////////////////////////////////////
65 /// Constructor.
66 
68 {
69  fInput = input;
70  // General configuration parameters
71  fMinPacketTime = 3;
72  Double_t minPacketTime = 0;
73  if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) {
74  Info("TVirtualPacketizer", "setting minimum time for a packet to %f",
75  minPacketTime);
76  fMinPacketTime = (Int_t) minPacketTime;
77  }
78  fMaxPacketTime = 20;
79  Double_t maxPacketTime = 0;
80  if (TProof::GetParameter(input, "PROOF_MaxPacketTime", maxPacketTime) == 0) {
81  Info("TVirtualPacketizer", "setting maximum packet time for a packet to %f",
82  maxPacketTime);
83  fMaxPacketTime = (Int_t) maxPacketTime;
84  }
86 
87  // Create the list to save them in the query result (each derived packetizer is
88  // responsible to update this coherently)
89  fConfigParams = new TList;
90  fConfigParams->SetName("PROOF_PacketizerConfigParams");
91  fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
92  fConfigParams->Add(new TParameter<Double_t>("PROOF_MaxPacketTime", fMaxPacketTime));
93 
94  fProgressStatus = st;
95  if (!fProgressStatus) {
96  Error("TVirtualPacketizer", "No progress status");
97  return;
98  }
99  fTotalEntries = 0;
100  fValid = kTRUE;
101  fStop = kFALSE;
102  fFailedPackets = 0;
103  fDataSet = "";
104  fSlaveStats = 0;
105 
106  // Performance monitoring
107  fStartTime = gSystem->Now();
109  ResetBit(TVirtualPacketizer::kIsDone);
110  fInitTime = 0;
111  fProcTime = 0;
112  fTimeUpdt = -1.;
113 
114  // Init circularity ntple for performance calculations
115  fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb:rc:al");
116  fCircN = 5;
117  TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
118  fCircProg->SetCircular(fCircN);
119  fCircProg->SetDirectory(0);
120 
121  // Check if we need to start the progress timer (multi-packetizers do not want
122  // timers from the packetizers they control ...). Also submasters do not need
123  // that (the progress timer is the one at the top master).
124  TString startProgress("yes");
125  TProof::GetParameter(input, "PROOF_StartProgressTimer", startProgress);
126  // If we are on a submaster, check if there is something else to do
127  if (gProofServ && gProofServ->IsMaster() && !gProofServ->IsTopMaster()) startProgress = "no";
128 
129  // Init progress timer, if requested
130  // The timer is destroyed (and therefore stopped) by the relevant TPacketizer implementation
131  // in GetNextPacket when end of work is detected.
132  fProgress = 0;
133  if (startProgress == "yes") {
134  Long_t period = 500;
135  TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
136  fProgress = new TTimer;
137  fProgress->SetObject(this);
138  fProgress->Start(period, kFALSE);
139  }
140 
141  // Init ntple to store active workers vs processing time
142  fProgressPerf = 0;
143  TString saveProgressPerf("no");
144  if (TProof::GetParameter(input, "PROOF_SaveProgressPerf", saveProgressPerf) == 0) {
145  if (fProgress && saveProgressPerf == "yes")
146  fProgressPerf = new TNtuple("PROOF_ProgressPerfNtuple",
147  "{Active workers, evt rate, MB read} vs processing time", "tm:aw:er:mb:ns");
148  }
149  fProcTimeLast = -1.;
150  fActWrksLast = -1;
151  fEvtRateLast = -1.;
152  fMBsReadLast = -1.;
153  fEffSessLast = -1.;
154  fAWLastFill = kFALSE;
155  fReportPeriod = -1.;
156 
157  // Whether to send estimated values for the progress info
158  TString estopt;
159  if (TProof::GetParameter(input, "PROOF_RateEstimation", estopt) != 0 ||
160  estopt.IsNull()) {
161  // Parse option from the env
162  estopt = gEnv->GetValue("Proof.RateEstimation", "");
163  }
164  fUseEstOpt = kEstOff;
165  if (estopt == "current")
166  fUseEstOpt = kEstCurrent;
167  else if (estopt == "average")
168  fUseEstOpt = kEstAverage;
169 }
170 
171 ////////////////////////////////////////////////////////////////////////////////
172 /// Destructor.
173 
175 {
181  fProgressStatus = 0; // belongs to the player
182 }
183 
184 ////////////////////////////////////////////////////////////////////////////////
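185 /// Get the number of entries of the tree described by e (if tree is true) or the number of keys in its directory (otherwise); returns -1 on error.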
186 
188 {
189  Long64_t entries;
190  TFile *file = TFile::Open(e->GetFileName());
191 
192  if (!file || (file && file->IsZombie())) {
193  const char *emsg = (file) ? strerror(file->GetErrno()) : "<undef>";
194  Error("GetEntries","Cannot open file: %s (%s)", e->GetFileName(), emsg);
195  return -1;
196  }
197 
198  TDirectory *dirsave = gDirectory;
199  if ( ! file->cd(e->GetDirectory()) ) {
200  Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
201  delete file;
202  return -1;
203  }
204  TDirectory *dir = gDirectory;
205  dirsave->cd();
206 
207  if ( tree ) {
208  TKey *key = dir->GetKey(e->GetObjName());
209  if ( key == 0 ) {
210  Error("GetEntries","Cannot find tree \"%s\" in %s",
211  e->GetObjName(), e->GetFileName() );
212  delete file;
213  return -1;
214  }
215  TTree *t = (TTree *) key->ReadObj();
216  if ( t == 0 ) {
217  // Error always reported?
218  delete file;
219  return -1;
220  }
221  entries = (Long64_t) t->GetEntries();
222  delete t;
223 
224  } else {
225  TList *keys = dir->GetListOfKeys();
226  entries = keys->GetSize();
227  }
228 
229  delete file;
230 
231  return entries;
232 }
233 
234 ////////////////////////////////////////////////////////////////////////////////
235 /// Get next packet.
236 
238 {
239  AbstractMethod("GetNextPacket");
240  return 0;
241 }
242 
243 ////////////////////////////////////////////////////////////////////////////////
244 /// Stop processing: sets the stop flag and, if stoptimer is true, calls HandleTimer(0) once more.
245 
247 {
248  fStop = kTRUE;
249  if (stoptimer) HandleTimer(0);
250 }
251 
252 ////////////////////////////////////////////////////////////////////////////////
253 /// Creates a new TDSetElement from the base packet, starting at entry
254 /// 'first' and spanning 'num' entries (the base packet's friends are duplicated too).
255 /// The returned object is newly allocated and must be deleted by the caller.
256 
258  Long64_t first, Long64_t num)
259 {
260  TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
261  base->GetDirectory(), first, num,
262  0, fDataSet.Data());
263 
264  // create TDSetElements for all the friends of elem.
265  TList *friends = base->GetListOfFriends();
266  if (friends) {
267  TIter nxf(friends);
268  TDSetElement *fe = 0;
269  while ((fe = (TDSetElement *) nxf())) {
270  PDB(kLoop,2)
271  Info("CreateNewPacket", "friend: file '%s', obj:'%s'",
272  fe->GetFileName(), fe->GetObjName());
273  TDSetElement *xfe = new TDSetElement(fe->GetFileName(), fe->GetObjName(),
274  fe->GetDirectory(), first, num);
275  // The alias, if any, is in the element name options ('friend_alias=<alias>|')
276  elem->AddFriend(xfe, 0);
277  }
278  }
279 
280  return elem;
281 }
282 
283 ////////////////////////////////////////////////////////////////////////////////
284 /// Send progress message to client.
285 
287 {
288  PDB(kPacketizer,2)
289  Info("HandleTimer", "fProgress: %p, isDone: %d",
291 
293  // Make sure that the timer is stopped
294  if (fProgress) fProgress->Stop();
295  return kFALSE;
296  }
297 
298  // Prepare progress info
299  TTime tnow = gSystem->Now();
300  Float_t now = Long64_t(tnow - fStartTime) / (Float_t)1000.;
301  Long64_t estent = GetEntriesProcessed();
302  Long64_t estmb = GetBytesRead();
303  Long64_t estrc = GetReadCalls();
304 
305  // Times and counters
306  Float_t evtrti = -1., mbrti = -1.;
308  // Initialization
309  fInitTime = now;
310  } else {
311  // Fill the reference as first
312  if (fCircProg->GetEntries() <= 0) {
313  fCircProg->Fill((Double_t)0., 0., 0., 0., 0.);
314  }
315  // Time between updates
316  fTimeUpdt = now - fProcTime;
317  // Update proc time
318  fProcTime = now - fInitTime;
319  // Get the last entry
320  Double_t *ar = fCircProg->GetArgs();
322  // The current rate
323  Bool_t all = kTRUE;
324  evtrti = GetCurrentRate(all);
325  Double_t xall = (all) ? 1. : 0.;
326  GetEstEntriesProcessed(0, estent, estmb, estrc);
327  if (estent >= fTotalEntries) {
328  estent = GetEntriesProcessed();
329  estmb = GetBytesRead();
330  estrc = GetReadCalls();
331  }
332  // Fill entry
333  Double_t evts = (Double_t) estent;
334  Double_t mbs = (estmb > 0) ? estmb / TMath::Power(2.,20.) : 0.; //--> MB
335  Double_t rcs = (Double_t) estrc;
336  fCircProg->Fill((Double_t)fProcTime, evts, mbs, rcs, xall);
338  if (all) {
339  Double_t dt = (Double_t)fProcTime - ar[0];
340  Long64_t de = (evts > ar[1]) ? (Long64_t) (evts - ar[1]) : 0;
341  Long64_t db = (mbs > ar[2]) ? (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)) : 0;
342  if (gPerfStats)
343  gPerfStats->RateEvent((Double_t)fProcTime, dt, de, db);
344  // Get the last to spot the cache readings
345  Double_t rc = (Double_t)estrc - ar[3];
346  mbrti = (rc > 0 && mbs > ar[2]) ? (Float_t) (mbs - ar[2]) / rc : 0. ;
347  }
348  // Final report only once (to correctly determine the proc time)
351  PDB(kPacketizer,2)
352  Info("HandleTimer", "ent:%lld, bytes:%lld, proct:%f, evtrti:%f, mbrti:%f (%f,%f)",
353  estent, estmb, fProcTime, evtrti, mbrti, mbs, ar[2]);
354  }
355 
356  if (gProofServ) {
357  // Message to be sent over
359  if (gProofServ->GetProtocol() > 25) {
360  Int_t actw = GetActiveWorkers();
361  Int_t acts = gProofServ->GetActSessions();
363  if (fProgressPerf && estent > 0) {
364  // Estimated query time
365  if (fProcTime > 0.) {
366  fReportPeriod = (Float_t) fTotalEntries / (Double_t) estent * fProcTime / 100.;
367  if (fReportPeriod > 0. && fReportPeriod < 5.) fReportPeriod = 5.;
368  }
369 
370  if (fProgressPerf->GetEntries() <= 0) {
371  // Fill the first entry
372  fProgressPerf->Fill(fProcTime, (Float_t)actw, -1., -1., -1.);
373  } else {
374  // Fill only if changed since last entry filled
375  Float_t *far = fProgressPerf->GetArgs();
377  Bool_t doReport = (fReportPeriod > 0. &&
378  (fProcTime - far[0]) >= fReportPeriod) ? kTRUE : kFALSE;
379  Float_t mbsread = estmb / 1024. / 1024.;
380  if (TMath::Abs((Float_t)actw - far[1]) > 0.1) {
381  if (fAWLastFill)
384  fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
386  } else if (doReport) {
387  fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
389  } else {
390  fAWLastFill = kTRUE;
391  }
393  fActWrksLast = actw;
394  fEvtRateLast = evtrti;
395  fMBsReadLast = mbsread;
396  fEffSessLast = effs;
397  }
398  }
399  // Fill the message now
401  fProcTime, evtrti, mbrti, actw, acts, effs);
402  m << &pi;
403  } else if (gProofServ->GetProtocol() > 11) {
404  // Fill the message now
405  m << fTotalEntries << estent << estmb << fInitTime << fProcTime
406  << evtrti << mbrti;
407  } else {
408  // Old format
410  }
411  // send message to client;
412  gProofServ->GetSocket()->Send(m);
413 
414  } else {
415  if (gProof && gProof->GetPlayer()) {
416  // Log locally
417  gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
418  fInitTime, fProcTime, evtrti, mbrti);
419  }
420  }
421 
422  // Final report only once (to correctly determine the proc time)
425 
426  return kFALSE; // ignored?
427 }
428 
429 ////////////////////////////////////////////////////////////////////////////////
430 /// Set the initialization time
431 
433 {
435  fInitTime = Long64_t(gSystem->Now() - fStartTime) / (Float_t)1000.;
437  PDB(kPacketizer,2)
438  Info("SetInitTime","fInitTime set to %f s", fInitTime);
439  }
440 }
441 
442 ////////////////////////////////////////////////////////////////////////////////
443 /// Adds new workers. Must be implemented by each real packetizer properly.
444 /// Returns the number of workers added, or -1 on failure.
445 
447 {
448  Warning("AddWorkers", "Not implemented for this packetizer");
449 
450  return -1;
451 }
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
virtual Int_t Fill()
[fNvar] Array of variables
Definition: TNtupleD.cxx:147
virtual Float_t GetCurrentRate(Bool_t &all)
long long Long64_t
Definition: RtypesCore.h:69
virtual Int_t GetErrno() const
Method returning errno. Is overridden in TRFIOFile.
Definition: TFile.cxx:1180
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:30
virtual TList * GetListOfKeys() const
Definition: TDirectory.h:155
const double pi
float Float_t
Definition: RtypesCore.h:53
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
#define gDirectory
Definition: TDirectory.h:218
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:45
virtual Int_t GetEntry(Long64_t entry=0, Int_t getall=0)
Read all branches of entry and return total number of bytes read.
Definition: TTree.cxx:5163
Bool_t IsZombie() const
Definition: TObject.h:141
Basic string class.
Definition: TString.h:137
virtual void Progress(Long64_t total, Long64_t processed)=0
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
Basic time type with millisecond precision.
Definition: TTime.h:29
Int_t GetActSessions() const
Definition: TProofServ.h:275
virtual void SetInitTime()
Set the initialization time.
Short_t Abs(Short_t d)
Definition: TMathBase.h:110
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
LongDouble_t Power(LongDouble_t x, LongDouble_t y)
Definition: TMath.h:501
Long64_t GetEntriesProcessed() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:732
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3851
const char * GetObjName() const
Definition: TDSet.h:122
virtual void StopProcess(Bool_t abort, Bool_t stoptimer=kFALSE)
Stop process.
const char * Data() const
Definition: TString.h:349
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition: TDirectory.h:153
#define SafeDelete(p)
Definition: RConfig.h:436
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet starting from the first entry with num entries...
#define PDB(mask, level)
Definition: TProofDebug.h:58
Float_t GetEffSessions() const
Definition: TProofServ.h:276
Long64_t GetBytesRead() const
TVirtualProofPlayer * GetPlayer() const
Definition: TProof.h:751
Float_t * GetArgs() const
Definition: TNtuple.h:58
virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
void Info(const char *location, const char *msgfmt,...)
Book space in a file, create I/O buffers, fill them, (un)compress them.
Definition: TKey.h:30
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
void Error(const char *location, const char *msgfmt,...)
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:10485
TProofProgressStatus * fProgressStatus
A doubly linked list.
Definition: TList.h:47
Named parameter, streamable and storable.
Definition: TParameter.h:49
TSocket * GetSocket() const
Definition: TProofServ.h:269
virtual TList * GetListOfFriends() const
Definition: TDSet.h:110
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
Definition: TSystem.cxx:467
const char * GetFileName() const
Definition: TDSet.h:113
A simple TTree restricted to a list of float variables only.
Definition: TNtuple.h:30
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
Double_t * GetArgs() const
Definition: TNtupleD.h:55
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:173
TMarker * m
Definition: textangle.C:8
void AbstractMethod(const char *method) const
Use this method to implement an "abstract" method that you don't want to leave purely abstract...
Definition: TObject.cxx:960
Handles synchronous and a-synchronous timer events.
Definition: TTimer.h:57
Long64_t GetReadCalls() const
Int_t GetProtocol() const
Definition: TProofServ.h:264
Bool_t IsNull() const
Definition: TString.h:387
void SetName(const char *name)
Definition: TCollection.h:116
virtual ~TVirtualPacketizer()
Destructor.
Long64_t GetEntries(Bool_t tree, TDSetElement *e)
Get entries.
#define gPerfStats
long Long_t
Definition: RtypesCore.h:50
R__EXTERN TProof * gProof
Definition: TProof.h:1110
virtual Int_t GetSize() const
Definition: TCollection.h:95
double Double_t
Definition: RtypesCore.h:55
ClassImp(TVirtualPacketizer) TVirtualPacketizer
Constructor.
Bool_t IsMaster() const
Definition: TProofServ.h:305
Describe directory structure in memory.
Definition: TDirectory.h:41
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
virtual Int_t Fill()
[fNvar] Array of variables
Definition: TNtuple.cxx:167
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
virtual void AddFriend(TDSetElement *friendElement, const char *alias)
Add friend TDSetElement to this set. The friend element will be copied to this object.
Definition: TDSet.cxx:379
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition: TKey.cxx:727
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
Definition: TDirectory.cxx:433
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:359
void SetObject(TObject *object)
Set the object to be notified at time out.
Definition: TTimer.cxx:182
Bool_t IsTopMaster() const
Definition: TProofServ.h:307
virtual Int_t GetActiveWorkers()
virtual Long64_t GetEntries() const
Definition: TTree.h:382
virtual void Stop()
Definition: TTimer.h:99
A TTree object has a header with a name and a title.
Definition: TTree.h:94
void ResetBit(UInt_t f)
Definition: TObject.h:172
Definition: TSlave.h:50
const Bool_t kTRUE
Definition: Rtypes.h:91
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:256
virtual Int_t AddWorkers(TList *workers)
Adds new workers.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904