Logo ROOT  
Reference Guide
TVirtualPacketizer.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Maarten Ballintijn 9/7/2002
3
4/*************************************************************************
5 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TVirtualPacketizer
13\ingroup proofkernel
14
15The packetizer is a load balancing object created for each query.
16It generates packets to be processed on PROOF worker servers.
17A packet is an event range (begin entry and number of entries) or
18object range (first object and number of objects) in a TTree
19(entries) or a directory (objects) in a file.
20Packets are generated taking into account the performance of the
21remote machine, the time it took to process a previous packet on
22the remote machine, the locality of the database files, etc.
23
24TVirtualPacketizer includes common parts of PROOF packetizers.
25Look in subclasses for details.
26The default packetizer is TPacketizerAdaptive (TPacketizer for Proof-Lite).
27To use a different one, for instance TPacketizer, call:
28proof->SetParameter("PROOF_Packetizer", "TPacketizer");
29
30*/
31
32#include "TVirtualPacketizer.h"
33#include "TEnv.h"
34#include "TFile.h"
35#include "TTree.h"
36#include "TKey.h"
37#include "TDSet.h"
38#include "TError.h"
39#include "TEventList.h"
40#include "TEntryList.h"
41#include "TMap.h"
42#include "TMessage.h"
43#include "TObjString.h"
44#include "TParameter.h"
45
46#include "TProof.h"
47#include "TProofDebug.h"
48#include "TProofPlayer.h"
49#include "TProofServ.h"
50#include "TSlave.h"
51#include "TSocket.h"
52#include "TTimer.h"
53#include "TUrl.h"
54#include "TMath.h"
55#include "TMonitor.h"
56#include "TNtuple.h"
57#include "TNtupleD.h"
58#include "TPerfStats.h"
59
61
62////////////////////////////////////////////////////////////////////////////////
63/// Constructor.
64
66{
67 fInput = input;
68 // General configuration parameters
70 Double_t minPacketTime = 0;
71 if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) {
72 Info("TVirtualPacketizer", "setting minimum time for a packet to %f",
73 minPacketTime);
74 fMinPacketTime = (Int_t) minPacketTime;
75 }
76 fMaxPacketTime = 20;
77 Double_t maxPacketTime = 0;
78 if (TProof::GetParameter(input, "PROOF_MaxPacketTime", maxPacketTime) == 0) {
79 Info("TVirtualPacketizer", "setting maximum packet time for a packet to %f",
80 maxPacketTime);
81 fMaxPacketTime = (Int_t) maxPacketTime;
82 }
84
85 // Create the list used to save the configuration parameters in the query result
86 // (each derived packetizer is responsible for updating it consistently)
87 fConfigParams = new TList;
88 fConfigParams->SetName("PROOF_PacketizerConfigParams");
89 fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
90 fConfigParams->Add(new TParameter<Double_t>("PROOF_MaxPacketTime", fMaxPacketTime));
91
92 fProgressStatus = st;
93 if (!fProgressStatus) {
94 Error("TVirtualPacketizer", "No progress status");
95 return;
96 }
97 fTotalEntries = 0;
98 fValid = kTRUE;
99 fStop = kFALSE;
100 fFailedPackets = 0;
101 fDataSet = "";
102 fSlaveStats = 0;
103
104 // Performance monitoring
108 fInitTime = 0;
109 fProcTime = 0;
110 fTimeUpdt = -1.;
111
112 // Init the circular ntuple used for performance calculations
113 fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb:rc:al");
114 fCircN = 5;
115 TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
118
119 // Check if we need to start the progress timer (multi-packetizers do not want
120 // timers from the packetizers they control ...). Also submasters do not need
121 // that (the progress timer is the one at the top master).
122 TString startProgress("yes");
123 TProof::GetParameter(input, "PROOF_StartProgressTimer", startProgress);
124 // On a submaster the progress timer is never started
125 if (gProofServ && gProofServ->IsMaster() && !gProofServ->IsTopMaster()) startProgress = "no";
126
127 // Init progress timer, if requested
128 // The timer is destroyed (and therefore stopped) by the relevant TPacketizer implementation
129 // in GetNextPacket when end of work is detected.
130 fProgress = 0;
131 if (startProgress == "yes") {
132 Long_t period = 500;
133 TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
134 fProgress = new TTimer;
135 fProgress->SetObject(this);
136 fProgress->Start(period, kFALSE);
137 }
138
139 // Init ntuple to store active workers vs processing time
140 fProgressPerf = 0;
141 TString saveProgressPerf("no");
142 if (TProof::GetParameter(input, "PROOF_SaveProgressPerf", saveProgressPerf) == 0) {
143 if (fProgress && saveProgressPerf == "yes")
144 fProgressPerf = new TNtuple("PROOF_ProgressPerfNtuple",
145 "{Active workers, evt rate, MB read} vs processing time", "tm:aw:er:mb:ns");
146 }
147 fProcTimeLast = -1.;
148 fActWrksLast = -1;
149 fEvtRateLast = -1.;
150 fMBsReadLast = -1.;
151 fEffSessLast = -1.;
153 fReportPeriod = -1.;
154
155 // Whether to send estimated values for the progress info
156 TString estopt;
157 if (TProof::GetParameter(input, "PROOF_RateEstimation", estopt) != 0 ||
158 estopt.IsNull()) {
159 // Parse option from the env
160 estopt = gEnv->GetValue("Proof.RateEstimation", "");
161 }
163 if (estopt == "current")
165 else if (estopt == "average")
167}
168
169////////////////////////////////////////////////////////////////////////////////
170/// Destructor.
171
173{
179 fProgressStatus = 0; // belongs to the player
180}
181
182////////////////////////////////////////////////////////////////////////////////
183/// Get entries.
184
186{
187 Long64_t entries;
188 TFile *file = TFile::Open(e->GetFileName());
189
190 if (!file || (file && file->IsZombie())) {
191 const char *emsg = (file) ? strerror(file->GetErrno()) : "<undef>";
192 Error("GetEntries","Cannot open file: %s (%s)", e->GetFileName(), emsg);
193 return -1;
194 }
195
196 TDirectory *dirsave = gDirectory;
197 if ( ! file->cd(e->GetDirectory()) ) {
198 Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
199 delete file;
200 return -1;
201 }
202 TDirectory *dir = gDirectory;
203 dirsave->cd();
204
205 if ( tree ) {
206 TKey *key = dir->GetKey(e->GetObjName());
207 if ( key == 0 ) {
208 Error("GetEntries","Cannot find tree \"%s\" in %s",
209 e->GetObjName(), e->GetFileName() );
210 delete file;
211 return -1;
212 }
213 TTree *t = (TTree *) key->ReadObj();
214 if ( t == 0 ) {
215 // Error always reported?
216 delete file;
217 return -1;
218 }
219 entries = (Long64_t) t->GetEntries();
220 delete t;
221
222 } else {
223 TList *keys = dir->GetListOfKeys();
224 entries = keys->GetSize();
225 }
226
227 delete file;
228
229 return entries;
230}
231
232////////////////////////////////////////////////////////////////////////////////
233/// Get next packet.
234
236{
237 AbstractMethod("GetNextPacket");
238 return 0;
239}
240
241////////////////////////////////////////////////////////////////////////////////
242/// Stop process.
243
245{
246 fStop = kTRUE;
247 if (stoptimer) HandleTimer(0);
248}
249
250////////////////////////////////////////////////////////////////////////////////
251/// Creates a new TDSetElement from the base packet, starting from
252/// the first entry with num entries.
253/// The function returns a newly created object, which has to be deleted.
254
257{
258 TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
259 base->GetDirectory(), first, num,
260 0, fDataSet.Data());
261
262 // create TDSetElements for all the friends of elem.
263 TList *friends = base->GetListOfFriends();
264 if (friends) {
265 TIter nxf(friends);
266 TDSetElement *fe = 0;
267 while ((fe = (TDSetElement *) nxf())) {
268 PDB(kLoop,2)
269 Info("CreateNewPacket", "friend: file '%s', obj:'%s'",
270 fe->GetFileName(), fe->GetObjName());
271 TDSetElement *xfe = new TDSetElement(fe->GetFileName(), fe->GetObjName(),
272 fe->GetDirectory(), first, num);
273 // The alias, if any, is in the element name options ('friend_alias=<alias>|')
274 elem->AddFriend(xfe, 0);
275 }
276 }
277
278 return elem;
279}
280
281////////////////////////////////////////////////////////////////////////////////
282/// Send progress message to client.
283
285{
286 PDB(kPacketizer,2)
287 Info("HandleTimer", "fProgress: %p, isDone: %d",
289
291 // Make sure that the timer is stopped
292 if (fProgress) fProgress->Stop();
293 return kFALSE;
294 }
295
296 // Prepare progress info
297 TTime tnow = gSystem->Now();
298 Float_t now = Long64_t(tnow - fStartTime) / (Float_t)1000.;
299 Long64_t estent = GetEntriesProcessed();
300 Long64_t estmb = GetBytesRead();
301 Long64_t estrc = GetReadCalls();
302
303 // Times and counters
304 Float_t evtrti = -1., mbrti = -1.;
306 // Initialization
307 fInitTime = now;
308 } else {
309 // Fill the reference as first
310 if (fCircProg->GetEntries() <= 0) {
311 fCircProg->Fill((Double_t)0., 0., 0., 0., 0.);
312 }
313 // Time between updates
314 fTimeUpdt = now - fProcTime;
315 // Update proc time
316 fProcTime = now - fInitTime;
317 // Get the last entry
318 Double_t *ar = fCircProg->GetArgs();
320 // The current rate
321 Bool_t all = kTRUE;
322 evtrti = GetCurrentRate(all);
323 Double_t xall = (all) ? 1. : 0.;
324 GetEstEntriesProcessed(0, estent, estmb, estrc);
325 if (estent >= fTotalEntries) {
326 estent = GetEntriesProcessed();
327 estmb = GetBytesRead();
328 estrc = GetReadCalls();
329 }
330 // Fill entry
331 Double_t evts = (Double_t) estent;
332 Double_t mbs = (estmb > 0) ? estmb / TMath::Power(2.,20.) : 0.; //--> MB
333 Double_t rcs = (Double_t) estrc;
334 fCircProg->Fill((Double_t)fProcTime, evts, mbs, rcs, xall);
336 if (all) {
337 Double_t dt = (Double_t)fProcTime - ar[0];
338 Long64_t de = (evts > ar[1]) ? (Long64_t) (evts - ar[1]) : 0;
339 Long64_t db = (mbs > ar[2]) ? (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)) : 0;
340 if (gPerfStats)
341 gPerfStats->RateEvent((Double_t)fProcTime, dt, de, db);
342 // Get the last to spot the cache readings
343 Double_t rc = (Double_t)estrc - ar[3];
344 mbrti = (rc > 0 && mbs > ar[2]) ? (Float_t) (mbs - ar[2]) / rc : 0. ;
345 }
346 // Final report only once (to correctly determine the proc time)
349 PDB(kPacketizer,2)
350 Info("HandleTimer", "ent:%lld, bytes:%lld, proct:%f, evtrti:%f, mbrti:%f (%f,%f)",
351 estent, estmb, fProcTime, evtrti, mbrti, mbs, ar[2]);
352 }
353
354 if (gProofServ) {
355 // Message to be sent over
357 if (gProofServ->GetProtocol() > 25) {
358 Int_t actw = GetActiveWorkers();
361 if (fProgressPerf && estent > 0) {
362 // Estimated query time
363 if (fProcTime > 0.) {
364 fReportPeriod = (Float_t) fTotalEntries / (Double_t) estent * fProcTime / 100.;
365 if (fReportPeriod > 0. && fReportPeriod < 5.) fReportPeriod = 5.;
366 }
367
368 if (fProgressPerf->GetEntries() <= 0) {
369 // Fill the first entry
370 fProgressPerf->Fill(fProcTime, (Float_t)actw, -1., -1., -1.);
371 } else {
372 // Fill only if changed since last entry filled
373 Float_t *far = fProgressPerf->GetArgs();
375 Bool_t doReport = (fReportPeriod > 0. &&
376 (fProcTime - far[0]) >= fReportPeriod) ? kTRUE : kFALSE;
377 Float_t mbsread = estmb / 1024. / 1024.;
378 if (TMath::Abs((Float_t)actw - far[1]) > 0.1) {
379 if (fAWLastFill)
382 fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
384 } else if (doReport) {
385 fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
387 } else {
389 }
391 fActWrksLast = actw;
392 fEvtRateLast = evtrti;
393 fMBsReadLast = mbsread;
394 fEffSessLast = effs;
395 }
396 }
397 // Fill the message now
399 fProcTime, evtrti, mbrti, actw, acts, effs);
400 m << &pi;
401 } else if (gProofServ->GetProtocol() > 11) {
402 // Fill the message now
403 m << fTotalEntries << estent << estmb << fInitTime << fProcTime
404 << evtrti << mbrti;
405 } else {
406 // Old format
408 }
409 // send message to client;
411
412 } else {
413 if (gProof && gProof->GetPlayer()) {
414 // Log locally
415 gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
416 fInitTime, fProcTime, evtrti, mbrti);
417 }
418 }
419
420 // Final report only once (to correctly determine the proc time)
423
424 return kFALSE; // ignored?
425}
426
427////////////////////////////////////////////////////////////////////////////////
428/// Set the initialization time
429
431{
435 PDB(kPacketizer,2)
436 Info("SetInitTime","fInitTime set to %f s", fInitTime);
437 }
438}
439
440////////////////////////////////////////////////////////////////////////////////
441/// Adds new workers. Must be implemented by each real packetizer properly.
442/// Returns the number of workers added, or -1 on failure.
443
445{
446 Warning("AddWorkers", "Not implemented for this packetizer");
447
448 return -1;
449}
@ kPROOF_PROGRESS
Definition: MessageTypes.h:61
#define SafeDelete(p)
Definition: RConfig.hxx:550
#define e(i)
Definition: RSha256.hxx:103
int Int_t
Definition: RtypesCore.h:41
const Bool_t kFALSE
Definition: RtypesCore.h:88
long Long_t
Definition: RtypesCore.h:50
bool Bool_t
Definition: RtypesCore.h:59
double Double_t
Definition: RtypesCore.h:55
long long Long64_t
Definition: RtypesCore.h:69
float Float_t
Definition: RtypesCore.h:53
const Bool_t kTRUE
Definition: RtypesCore.h:87
#define ClassImp(name)
Definition: Rtypes.h:365
#define gDirectory
Definition: TDirectory.h:223
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
#define PDB(mask, level)
Definition: TProofDebug.h:56
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
R__EXTERN TProof * gProof
Definition: TProof.h:1077
R__EXTERN TSystem * gSystem
Definition: TSystem.h:560
#define gPerfStats
void SetName(const char *name)
Definition: TCollection.h:204
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
Manages an element of a TDSet.
Definition: TDSet.h:66
const char * GetObjName() const
Definition: TDSet.h:120
virtual TList * GetListOfFriends() const
Definition: TDSet.h:108
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:253
virtual void AddFriend(TDSetElement *friendElement, const char *alias)
Add friend TDSetElement to this set. The friend element will be copied to this object.
Definition: TDSet.cxx:376
const char * GetFileName() const
Definition: TDSet.h:111
Describe directory structure in memory.
Definition: TDirectory.h:34
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition: TDirectory.h:158
virtual TList * GetListOfKeys() const
Definition: TDirectory.h:160
virtual Bool_t cd(const char *path=nullptr)
Change current directory to "this" directory.
Definition: TDirectory.cxx:497
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition: TFile.h:48
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3923
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:24
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition: TKey.cxx:729
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:28
Double_t * GetArgs() const
Definition: TNtupleD.h:53
virtual Int_t Fill()
Fill a Ntuple with current values in fArgs.
Definition: TNtupleD.cxx:149
A simple TTree restricted to a list of float variables only.
Definition: TNtuple.h:28
Float_t * GetArgs() const
Definition: TNtuple.h:56
virtual Int_t Fill()
Fill a Ntuple with current values in fArgs.
Definition: TNtuple.cxx:170
void AbstractMethod(const char *method) const
Use this method to implement an "abstract" method that you don't want to leave purely abstract.
Definition: TObject.cxx:922
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition: TObject.h:172
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
void ResetBit(UInt_t f)
Definition: TObject.h:171
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
Named parameter, streamable and storable.
Definition: TParameter.h:37
Container class for processing statistics.
Int_t GetActSessions() const
Definition: TProofServ.h:263
Float_t GetEffSessions() const
Definition: TProofServ.h:264
TSocket * GetSocket() const
Definition: TProofServ.h:257
Bool_t IsMaster() const
Definition: TProofServ.h:293
Int_t GetProtocol() const
Definition: TProofServ.h:252
Bool_t IsTopMaster() const
Definition: TProofServ.h:295
TVirtualProofPlayer * GetPlayer() const
Definition: TProof.h:716
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9891
Class describing a PROOF worker server.
Definition: TSlave.h:46
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:521
Basic string class.
Definition: TString.h:131
const char * Data() const
Definition: TString.h:364
Bool_t IsNull() const
Definition: TString.h:402
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
Definition: TSystem.cxx:473
Basic time type with millisecond precision.
Definition: TTime.h:27
Handles synchronous and a-synchronous timer events.
Definition: TTimer.h:51
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Definition: TTimer.cxx:211
void SetObject(TObject *object)
Set the object to be notified at time out.
Definition: TTimer.cxx:184
virtual void Stop()
Definition: TTimer.h:93
A TTree represents a columnar dataset.
Definition: TTree.h:72
virtual void SetCircular(Long64_t maxEntries)
Enable/Disable circularity for this tree.
Definition: TTree.cxx:8629
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition: TTree.cxx:8703
virtual Long64_t GetEntries() const
Definition: TTree.h:450
virtual Int_t GetEntry(Long64_t entry=0, Int_t getall=0)
Read all branches of entry and return total number of bytes read.
Definition: TTree.cxx:5497
The packetizer is a load balancing object created for each query.
Long64_t GetEntries(Bool_t tree, TDSetElement *e)
Get entries.
virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
virtual Int_t AddWorkers(TList *workers)
Adds new workers.
virtual ~TVirtualPacketizer()
Destructor.
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetReadCalls() const
virtual Float_t GetCurrentRate(Bool_t &all)
Long64_t GetEntriesProcessed() const
virtual void StopProcess(Bool_t abort, Bool_t stoptimer=kFALSE)
Stop process.
virtual void SetInitTime()
Set the initialization time.
TVirtualPacketizer(TList *input, TProofProgressStatus *st=0)
Constructor.
virtual Int_t GetActiveWorkers()
Long64_t GetBytesRead() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting from the first entry with num entries.
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
virtual void Progress(Long64_t total, Long64_t processed)=0
static constexpr double pi
LongDouble_t Power(LongDouble_t x, LongDouble_t y)
Definition: TMath.h:725
Short_t Abs(Short_t d)
Definition: TMathBase.h:120
Definition: file.py:1
Definition: first.py:1
Definition: tree.py:1
auto * m
Definition: textangle.C:8