Logo ROOT  
Reference Guide
TVirtualPacketizer.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Maarten Ballintijn 9/7/2002
3
4/*************************************************************************
5 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TVirtualPacketizer
13\ingroup proofkernel
14
15The packetizer is a load balancing object created for each query.
16It generates packets to be processed on PROOF worker servers.
17A packet is an event range (begin entry and number of entries) or
18object range (first object and number of objects) in a TTree
19(entries) or a directory (objects) in a file.
20Packets are generated taking into account the performance of the
21remote machine, the time it took to process a previous packet on
22the remote machine, the locality of the database files, etc.
23
24TVirtualPacketizer includes common parts of PROOF packetizers.
25Look in subclasses for details.
26The default packetizer is TPacketizerAdaptive (TPacketizer for Proof-Lite).
27To use an alternative one (for instance TPacketizer), call:
28proof->SetParameter("PROOF_Packetizer", "TPacketizer");
29
30*/
31
32#include "TVirtualPacketizer.h"
33#include "TEnv.h"
34#include "TFile.h"
35#include "TTree.h"
36#include "TKey.h"
37#include "TDSet.h"
38#include "TError.h"
39#include "TEventList.h"
40#include "TEntryList.h"
41#include "TMap.h"
42#include "TMessage.h"
43#include "TParameter.h"
44
45#include "TProof.h"
46#include "TProofDebug.h"
47#include "TVirtualProofPlayer.h"
48#include "TProofServ.h"
49#include "TSlave.h"
50#include "TSocket.h"
51#include "TTimer.h"
52#include "TUrl.h"
53#include "TMath.h"
54#include "TMonitor.h"
55#include "TNtuple.h"
56#include "TNtupleD.h"
57#include "TVirtualPerfStats.h"
58
60
61////////////////////////////////////////////////////////////////////////////////
62/// Constructor.
63
// Constructor body: reads the packetizer configuration from the 'input' list,
// records it in fConfigParams, resets the progress/performance members and,
// when requested, starts the progress timer.
// NOTE(review): the signature line (listing line 64) is missing from this
// extract; the cross-reference at the end of the page lists it as
// TVirtualPacketizer(TList *input, TProofProgressStatus *st=0).
65{
66 fInput = input;
67 // General configuration parameters
// NOTE(review): listing line 68 is missing here; presumably it sets the
// default for fMinPacketTime -- confirm against the real source.
69 Double_t minPacketTime = 0;
70 if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) {
71 Info("TVirtualPacketizer", "setting minimum time for a packet to %f",
72 minPacketTime);
// The (Int_t) cast drops any fractional part of the requested value
73 fMinPacketTime = (Int_t) minPacketTime;
74 }
// Default maximum packet time, used unless PROOF_MaxPacketTime is given
75 fMaxPacketTime = 20;
76 Double_t maxPacketTime = 0;
77 if (TProof::GetParameter(input, "PROOF_MaxPacketTime", maxPacketTime) == 0) {
78 Info("TVirtualPacketizer", "setting maximum packet time for a packet to %f",
79 maxPacketTime);
// Same truncation to an integer value as for the minimum above
80 fMaxPacketTime = (Int_t) maxPacketTime;
81 }
83
84 // Create the list to save them in the query result (each derived packetizer is
85 // responsible for updating this coherently)
86 fConfigParams = new TList;
87 fConfigParams->SetName("PROOF_PacketizerConfigParams");
88 fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
89 fConfigParams->Add(new TParameter<Double_t>("PROOF_MaxPacketTime", fMaxPacketTime));
90
91 fProgressStatus = st;
92 if (!fProgressStatus) {
93 Error("TVirtualPacketizer", "No progress status");
// Early exit: nothing below runs, so fValid is never set to kTRUE on this path
94 return;
95 }
96 fTotalEntries = 0;
97 fValid = kTRUE;
98 fStop = kFALSE;
100 fDataSet = "";
101 fSlaveStats = 0;
102
103 // Performance monitoring
// NOTE(review): listing lines 104-106 are missing from this extract
107 fInitTime = 0;
108 fProcTime = 0;
109 fTimeUpdt = -1.;
110
111 // Init circularity ntuple for performance calculations
// Columns "tm:ev:mb:rc:al" match the five values passed to fCircProg->Fill() in HandleTimer
112 fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb:rc:al");
// Default circularity of 5, overridden by PROOF_ProgressCircularity when present
113 fCircN = 5;
114 TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
// NOTE(review): listing lines 115-116 are missing from this extract
117
118 // Check if we need to start the progress timer (multi-packetizers do not want
119 // timers from the packetizers they control ...). Also submasters do not need
120 // that (the progress timer is the one at the top master).
121 TString startProgress("yes");
122 TProof::GetParameter(input, "PROOF_StartProgressTimer", startProgress);
123 // If we are on a submaster, check if there is something else to do
// A master that is not the top master never starts its own timer
124 if (gProofServ && gProofServ->IsMaster() && !gProofServ->IsTopMaster()) startProgress = "no";
125
126 // Init progress timer, if requested
127 // The timer is destroyed (and therefore stopped) by the relevant TPacketizer implementation
128 // in GetNextPacket when end of work is detected.
129 fProgress = 0;
130 if (startProgress == "yes") {
// Default period of 500, passed to TTimer::Start() as milliseconds
131 Long_t period = 500;
132 TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
133 fProgress = new TTimer;
// Timeouts are delivered to this object
134 fProgress->SetObject(this);
// kFALSE: not single-shot, i.e. the timer keeps firing
135 fProgress->Start(period, kFALSE);
136 }
137
138 // Init ntuple to store active workers vs processing time
139 fProgressPerf = 0;
140 TString saveProgressPerf("no");
141 if (TProof::GetParameter(input, "PROOF_SaveProgressPerf", saveProgressPerf) == 0) {
// Only created when the progress timer exists and the parameter is exactly "yes"
142 if (fProgress && saveProgressPerf == "yes")
143 fProgressPerf = new TNtuple("PROOF_ProgressPerfNtuple",
144 "{Active workers, evt rate, MB read} vs processing time", "tm:aw:er:mb:ns");
145 }
// Negative initial values; presumably they mean "no previous report yet" -- TODO confirm
146 fProcTimeLast = -1.;
147 fActWrksLast = -1;
148 fEvtRateLast = -1.;
149 fMBsReadLast = -1.;
150 fEffSessLast = -1.;
152 fReportPeriod = -1.;
153
154 // Whether to send estimated values for the progress info
155 TString estopt;
// Fall back to the environment when the input list has no (or an empty) value
156 if (TProof::GetParameter(input, "PROOF_RateEstimation", estopt) != 0 ||
157 estopt.IsNull()) {
158 // Parse option from the env
159 estopt = gEnv->GetValue("Proof.RateEstimation", "");
160 }
// NOTE(review): listing lines 161, 163 and 165 are missing from this extract,
// so the statements executed for "current" and "average" are not visible here.
162 if (estopt == "current")
164 else if (estopt == "average")
166}
167
168////////////////////////////////////////////////////////////////////////////////
169/// Destructor.
170
// Destructor body.
// NOTE(review): the signature line (listing line 171) and listing lines 173-177
// are missing from this extract; only the final statement is visible.
172{
178 fProgressStatus = 0; // not deleted here: it belongs to the player
179}
180
181////////////////////////////////////////////////////////////////////////////////
182/// Get entries.
183
185{
186 Long64_t entries;
187 TFile *file = TFile::Open(e->GetFileName());
188
189 if (!file || (file && file->IsZombie())) {
190 const char *emsg = (file) ? strerror(file->GetErrno()) : "<undef>";
191 Error("GetEntries","Cannot open file: %s (%s)", e->GetFileName(), emsg);
192 return -1;
193 }
194
195 TDirectory *dirsave = gDirectory;
196 if ( ! file->cd(e->GetDirectory()) ) {
197 Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
198 delete file;
199 return -1;
200 }
201 TDirectory *dir = gDirectory;
202 dirsave->cd();
203
204 if ( tree ) {
205 TKey *key = dir->GetKey(e->GetObjName());
206 if ( key == 0 ) {
207 Error("GetEntries","Cannot find tree \"%s\" in %s",
208 e->GetObjName(), e->GetFileName() );
209 delete file;
210 return -1;
211 }
212 TTree *t = (TTree *) key->ReadObj();
213 if ( t == 0 ) {
214 // Error always reported?
215 delete file;
216 return -1;
217 }
218 entries = (Long64_t) t->GetEntries();
219 delete t;
220
221 } else {
222 TList *keys = dir->GetListOfKeys();
223 entries = keys->GetSize();
224 }
225
226 delete file;
227
228 return entries;
229}
230
231////////////////////////////////////////////////////////////////////////////////
232/// Get next packet.
233
235{
236 AbstractMethod("GetNextPacket");
237 return 0;
238}
239
240////////////////////////////////////////////////////////////////////////////////
241/// Stop process.
242
244{
245 fStop = kTRUE;
246 if (stoptimer) HandleTimer(0);
247}
248
249////////////////////////////////////////////////////////////////////////////////
250/// Creates a new TDSetElement from from base packet starting from
251/// the first entry with num entries.
252/// The function returns a new created objects which have to be deleted.
253
256{
257 TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
258 base->GetDirectory(), first, num,
259 0, fDataSet.Data());
260
261 // create TDSetElements for all the friends of elem.
262 TList *friends = base->GetListOfFriends();
263 if (friends) {
264 TIter nxf(friends);
265 TDSetElement *fe = 0;
266 while ((fe = (TDSetElement *) nxf())) {
267 PDB(kLoop,2)
268 Info("CreateNewPacket", "friend: file '%s', obj:'%s'",
269 fe->GetFileName(), fe->GetObjName());
270 TDSetElement *xfe = new TDSetElement(fe->GetFileName(), fe->GetObjName(),
271 fe->GetDirectory(), first, num);
272 // The alias, if any, is in the element name options ('friend_alias=<alias>|')
273 elem->AddFriend(xfe, 0);
274 }
275 }
276
277 return elem;
278}
279
280////////////////////////////////////////////////////////////////////////////////
281/// Send progress message to client.
282
// Body of the progress-timer handler: gathers the processed-entries / bytes /
// read-calls counters, derives rates, and either streams them into a message
// (when gProofServ is set) or reports them to the local player.
// NOTE(review): the signature line (listing line 283) is missing from this
// extract; the cross-reference at the end of the page lists it as
// virtual Bool_t HandleTimer(TTimer *timer). Many interior lines are missing
// as well (gaps in the listing numbers), so several conditions, declarations
// (e.g. 'm', 'pi', 'acts', 'effs') and the send call are not visible here.
284{
285 PDB(kPacketizer,2)
286 Info("HandleTimer", "fProgress: %p, isDone: %d",
// NOTE(review): listing lines 287 and 289 (rest of the Info() call and the
// condition opening the block below) are missing from this extract.
288
290 // Make sure that the timer is stopped
291 if (fProgress) fProgress->Stop();
292 return kFALSE;
293 }
294
295 // Prepare progress info
296 TTime tnow = gSystem->Now();
// Elapsed time since fStartTime, converted from milliseconds to seconds
297 Float_t now = Long64_t(tnow - fStartTime) / (Float_t)1000.;
// Start from the actual counters; they may be replaced by estimates below
298 Long64_t estent = GetEntriesProcessed();
299 Long64_t estmb = GetBytesRead();
300 Long64_t estrc = GetReadCalls();
301
302 // Times and counters
303 Float_t evtrti = -1., mbrti = -1.;
// NOTE(review): listing line 304 (the condition selecting the initialization
// branch) is missing from this extract.
305 // Initialization
306 fInitTime = now;
307 } else {
308 // Fill the reference as first
309 if (fCircProg->GetEntries() <= 0) {
310 fCircProg->Fill((Double_t)0., 0., 0., 0., 0.);
311 }
312 // Time between updates
313 fTimeUpdt = now - fProcTime;
314 // Update proc time
315 fProcTime = now - fInitTime;
316 // Get the last entry
// 'ar' points at the ntuple's argument buffer; which entry it holds when read
// below depends on listing lines 318 and 334, which are missing from this extract.
317 Double_t *ar = fCircProg->GetArgs();
319 // The current rate
320 Bool_t all = kTRUE;
321 evtrti = GetCurrentRate(all);
// 'all' is passed by reference to GetCurrentRate(); store it as 1./0. for the ntuple
322 Double_t xall = (all) ? 1. : 0.;
323 GetEstEntriesProcessed(0, estent, estmb, estrc);
// If the estimate reaches the total, fall back to the actual counters
324 if (estent >= fTotalEntries) {
325 estent = GetEntriesProcessed();
326 estmb = GetBytesRead();
327 estrc = GetReadCalls();
328 }
329 // Fill entry
330 Double_t evts = (Double_t) estent;
331 Double_t mbs = (estmb > 0) ? estmb / TMath::Power(2.,20.) : 0.; //--> MB
332 Double_t rcs = (Double_t) estrc;
333 fCircProg->Fill((Double_t)fProcTime, evts, mbs, rcs, xall);
335 if (all) {
// Differences with respect to the values in 'ar': time, events, and bytes
// (the MB difference is converted back to bytes); negative differences become 0
336 Double_t dt = (Double_t)fProcTime - ar[0];
337 Long64_t de = (evts > ar[1]) ? (Long64_t) (evts - ar[1]) : 0;
338 Long64_t db = (mbs > ar[2]) ? (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)) : 0;
339 if (gPerfStats)
340 gPerfStats->RateEvent((Double_t)fProcTime, dt, de, db);
341 // Get the last to spot the cache readings
342 Double_t rc = (Double_t)estrc - ar[3];
// MB read per read call over the interval; 0 when either difference is not positive
343 mbrti = (rc > 0 && mbs > ar[2]) ? (Float_t) (mbs - ar[2]) / rc : 0. ;
344 }
345 // Final report only once (to correctly determine the proc time)
// NOTE(review): listing lines 346-347 are missing from this extract
348 PDB(kPacketizer,2)
349 Info("HandleTimer", "ent:%lld, bytes:%lld, proct:%f, evtrti:%f, mbrti:%f (%f,%f)",
350 estent, estmb, fProcTime, evtrti, mbrti, mbs, ar[2]);
351 }
352
353 if (gProofServ) {
354 // Message to be sent over
// NOTE(review): listing line 355 (presumably the declaration of 'm') is missing
// The message layout depends on the protocol version: > 25, > 11, or older
356 if (gProofServ->GetProtocol() > 25) {
357 Int_t actw = GetActiveWorkers();
// NOTE(review): listing lines 358-359 (presumably defining 'acts' and 'effs')
// are missing from this extract.
360 if (fProgressPerf && estent > 0) {
361 // Estimated query time
362 if (fProcTime > 0.) {
// One hundredth of the extrapolated total processing time, raised to 5 when
// it is positive but smaller than 5
363 fReportPeriod = (Float_t) fTotalEntries / (Double_t) estent * fProcTime / 100.;
364 if (fReportPeriod > 0. && fReportPeriod < 5.) fReportPeriod = 5.;
365 }
366
367 if (fProgressPerf->GetEntries() <= 0) {
368 // Fill the first entry
369 fProgressPerf->Fill(fProcTime, (Float_t)actw, -1., -1., -1.);
370 } else {
371 // Fill only if changed since last entry filled
372 Float_t *far = fProgressPerf->GetArgs();
// A periodic report is due when at least fReportPeriod has elapsed since far[0]
374 Bool_t doReport = (fReportPeriod > 0. &&
375 (fProcTime - far[0]) >= fReportPeriod) ? kTRUE : kFALSE;
// Bytes converted to MB
376 Float_t mbsread = estmb / 1024. / 1024.;
// The number of active workers differs from the one in far[1]
377 if (TMath::Abs((Float_t)actw - far[1]) > 0.1) {
378 if (fAWLastFill)
// NOTE(review): listing lines 379-380 and 382 are missing from this extract
381 fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
383 } else if (doReport) {
384 fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
386 } else {
// NOTE(review): listing line 387 (the body of this branch) is missing
388 }
// Remember the values of this pass
390 fActWrksLast = actw;
391 fEvtRateLast = evtrti;
392 fMBsReadLast = mbsread;
393 fEffSessLast = effs;
394 }
395 }
396 // Fill the message now
// NOTE(review): listing line 397 (start of the statement continued below,
// presumably constructing 'pi') is missing from this extract.
398 fProcTime, evtrti, mbrti, actw, acts, effs);
399 m << &pi;
400 } else if (gProofServ->GetProtocol() > 11) {
401 // Fill the message now
402 m << fTotalEntries << estent << estmb << fInitTime << fProcTime
403 << evtrti << mbrti;
404 } else {
405 // Old format
// NOTE(review): listing line 406 (the old-format fill) is missing
407 }
408 // send message to client;
// NOTE(review): listing line 409 (the send call itself) is missing
410
411 } else {
// No gProofServ: report through the local player, if there is one
412 if (gProof && gProof->GetPlayer()) {
413 // Log locally
414 gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
415 fInitTime, fProcTime, evtrti, mbrti);
416 }
417 }
418
419 // Final report only once (to correctly determine the proc time)
// NOTE(review): listing lines 420-421 are missing from this extract
422
423 return kFALSE; // ignored?
424}
425
426////////////////////////////////////////////////////////////////////////////////
427/// Set the initialization time
428
// Body of SetInitTime().
// NOTE(review): the signature line (listing line 429) and listing lines 431-433
// are missing from this extract; they include the statement opening the block
// closed at listing line 436, so only the debug print of fInitTime is visible.
430{
434 PDB(kPacketizer,2)
435 Info("SetInitTime","fInitTime set to %f s", fInitTime);
436 }
437}
438
439////////////////////////////////////////////////////////////////////////////////
440/// Adds new workers. Must be implemented by each real packetizer properly.
441/// Returns the number of workers added, or -1 on failure.
442
444{
445 Warning("AddWorkers", "Not implemented for this packetizer");
446
447 return -1;
448}
@ kPROOF_PROGRESS
Definition: MessageTypes.h:61
#define SafeDelete(p)
Definition: RConfig.hxx:543
#define e(i)
Definition: RSha256.hxx:103
int Int_t
Definition: RtypesCore.h:43
const Bool_t kFALSE
Definition: RtypesCore.h:90
long Long_t
Definition: RtypesCore.h:52
double Double_t
Definition: RtypesCore.h:57
long long Long64_t
Definition: RtypesCore.h:71
float Float_t
Definition: RtypesCore.h:55
const Bool_t kTRUE
Definition: RtypesCore.h:89
#define ClassImp(name)
Definition: Rtypes.h:361
#define gDirectory
Definition: TDirectory.h:229
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
#define PDB(mask, level)
Definition: TProofDebug.h:56
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
R__EXTERN TProof * gProof
Definition: TProof.h:1077
R__EXTERN TSystem * gSystem
Definition: TSystem.h:556
#define gPerfStats
void SetName(const char *name)
Definition: TCollection.h:204
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
Manages an element of a TDSet.
Definition: TDSet.h:66
const char * GetObjName() const
Definition: TDSet.h:120
virtual TList * GetListOfFriends() const
Definition: TDSet.h:108
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:253
virtual void AddFriend(TDSetElement *friendElement, const char *alias)
Add friend TDSetElement to this set. The friend element will be copied to this object.
Definition: TDSet.cxx:376
const char * GetFileName() const
Definition: TDSet.h:111
Describe directory structure in memory.
Definition: TDirectory.h:40
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition: TDirectory.h:164
virtual TList * GetListOfKeys() const
Definition: TDirectory.h:166
virtual Bool_t cd(const char *path=nullptr)
Change current directory to "this" directory.
Definition: TDirectory.cxx:498
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition: TFile.h:53
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3942
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:28
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition: TKey.cxx:738
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:28
Double_t * GetArgs() const
Definition: TNtupleD.h:53
virtual Int_t Fill()
Fill a Ntuple with current values in fArgs.
Definition: TNtupleD.cxx:150
A simple TTree restricted to a list of float variables only.
Definition: TNtuple.h:28
Float_t * GetArgs() const
Definition: TNtuple.h:56
virtual Int_t Fill()
Fill a Ntuple with current values in fArgs.
Definition: TNtuple.cxx:170
void AbstractMethod(const char *method) const
Use this method to implement an "abstract" method that you don't want to leave purely abstract.
Definition: TObject.cxx:933
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition: TObject.h:187
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:877
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:891
void ResetBit(UInt_t f)
Definition: TObject.h:186
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:865
Named parameter, streamable and storable.
Definition: TParameter.h:37
Container class for processing statistics.
Int_t GetActSessions() const
Definition: TProofServ.h:263
Float_t GetEffSessions() const
Definition: TProofServ.h:264
TSocket * GetSocket() const
Definition: TProofServ.h:257
Bool_t IsMaster() const
Definition: TProofServ.h:293
Int_t GetProtocol() const
Definition: TProofServ.h:252
Bool_t IsTopMaster() const
Definition: TProofServ.h:295
TVirtualProofPlayer * GetPlayer() const
Definition: TProof.h:716
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9908
Class describing a PROOF worker server.
Definition: TSlave.h:46
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:522
Basic string class.
Definition: TString.h:131
const char * Data() const
Definition: TString.h:364
Bool_t IsNull() const
Definition: TString.h:402
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
Definition: TSystem.cxx:461
Basic time type with millisecond precision.
Definition: TTime.h:27
Handles synchronous and a-synchronous timer events.
Definition: TTimer.h:51
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Definition: TTimer.cxx:211
void SetObject(TObject *object)
Set the object to be notified at time out.
Definition: TTimer.cxx:184
virtual void Stop()
Definition: TTimer.h:93
A TTree represents a columnar dataset.
Definition: TTree.h:78
virtual void SetCircular(Long64_t maxEntries)
Enable/Disable circularity for this tree.
Definition: TTree.cxx:8739
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition: TTree.cxx:8813
virtual Long64_t GetEntries() const
Definition: TTree.h:457
virtual Int_t GetEntry(Long64_t entry=0, Int_t getall=0)
Read all branches of entry and return total number of bytes read.
Definition: TTree.cxx:5542
The packetizer is a load balancing object created for each query.
Long64_t GetEntries(Bool_t tree, TDSetElement *e)
Get entries.
virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
virtual Int_t AddWorkers(TList *workers)
Adds new workers.
virtual ~TVirtualPacketizer()
Destructor.
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetReadCalls() const
virtual Float_t GetCurrentRate(Bool_t &all)
Long64_t GetEntriesProcessed() const
virtual void StopProcess(Bool_t abort, Bool_t stoptimer=kFALSE)
Stop process.
virtual void SetInitTime()
Set the initialization time.
TVirtualPacketizer(TList *input, TProofProgressStatus *st=0)
Constructor.
virtual Int_t GetActiveWorkers()
Long64_t GetBytesRead() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting from the first entry, with num entries.
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
virtual void Progress(Long64_t total, Long64_t processed)=0
static constexpr double pi
LongDouble_t Power(LongDouble_t x, LongDouble_t y)
Definition: TMath.h:725
Short_t Abs(Short_t d)
Definition: TMathBase.h:120
Definition: file.py:1
Definition: first.py:1
Definition: tree.py:1
auto * m
Definition: textangle.C:8