Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TVirtualPacketizer.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Maarten Ballintijn 9/7/2002
3
4/*************************************************************************
5 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TVirtualPacketizer
13\ingroup proofkernel
14
15The packetizer is a load balancing object created for each query.
16It generates packets to be processed on PROOF worker servers.
17A packet is an event range (begin entry and number of entries) or
18object range (first object and number of objects) in a TTree
19(entries) or a directory (objects) in a file.
20Packets are generated taking into account the performance of the
21remote machine, the time it took to process a previous packet on
22the remote machine, the locality of the database files, etc.
23
24TVirtualPacketizer includes common parts of PROOF packetizers.
25Look in subclasses for details.
26The default packetizer is TPacketizerAdaptive (TPacketizer for Proof-Lite).
27To use an alternative one, for instance the TPacketizer, call:
28proof->SetParameter("PROOF_Packetizer", "TPacketizer");
29
30*/
31
32#include "TVirtualPacketizer.h"
33#include "TEnv.h"
34#include "TFile.h"
35#include "TTree.h"
36#include "TKey.h"
37#include "TDSet.h"
38#include "TError.h"
39#include "TEventList.h"
40#include "TEntryList.h"
41#include "TMap.h"
42#include "TMessage.h"
43#include "TParameter.h"
44
45#include "TProof.h"
46#include "TProofDebug.h"
47#include "TVirtualProofPlayer.h"
48#include "TProofServ.h"
49#include "TSlave.h"
50#include "TSocket.h"
51#include "TTimer.h"
52#include "TUrl.h"
53#include "TMath.h"
54#include "TMonitor.h"
55#include "TNtuple.h"
56#include "TNtupleD.h"
57#include "TVirtualPerfStats.h"
58
60
61////////////////////////////////////////////////////////////////////////////////
62/// Constructor.
63
65{
66 fInput = input;
67 // General configuration parameters
69 Double_t minPacketTime = 0;
70 if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) {
71 Info("TVirtualPacketizer", "setting minimum time for a packet to %f",
72 minPacketTime);
73 fMinPacketTime = (Int_t) minPacketTime;
74 }
75 fMaxPacketTime = 20;
76 Double_t maxPacketTime = 0;
77 if (TProof::GetParameter(input, "PROOF_MaxPacketTime", maxPacketTime) == 0) {
78 Info("TVirtualPacketizer", "setting maximum packet time for a packet to %f",
79 maxPacketTime);
80 fMaxPacketTime = (Int_t) maxPacketTime;
81 }
83
84 // Create the list to save them in the query result (each derived packetizer is
85 // responsible to update this coherently)
86 fConfigParams = new TList;
87 fConfigParams->SetName("PROOF_PacketizerConfigParams");
88 fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
89 fConfigParams->Add(new TParameter<Double_t>("PROOF_MaxPacketTime", fMaxPacketTime));
90
91 fProgressStatus = st;
92 if (!fProgressStatus) {
93 Error("TVirtualPacketizer", "No progress status");
94 return;
95 }
96 fTotalEntries = 0;
97 fValid = kTRUE;
98 fStop = kFALSE;
100 fDataSet = "";
101 fSlaveStats = 0;
102
103 // Performance monitoring
107 fInitTime = 0;
108 fProcTime = 0;
109 fTimeUpdt = -1.;
110
111 // Init circular ntuple for performance calculations
112 fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb:rc:al");
113 fCircN = 5;
114 TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
117
118 // Check if we need to start the progress timer (multi-packetizers do not want
119 // timers from the packetizers they control ...). Also submasters do not need
120 // that (the progress timer is the one at the top master).
121 TString startProgress("yes");
122 TProof::GetParameter(input, "PROOF_StartProgressTimer", startProgress);
123 // If we are on a submaster, check if there is something else to do
124 if (gProofServ && gProofServ->IsMaster() && !gProofServ->IsTopMaster()) startProgress = "no";
125
126 // Init progress timer, if requested
127 // The timer is destroyed (and therefore stopped) by the relevant TPacketizer implementation
128 // in GetNextPacket when end of work is detected.
129 fProgress = 0;
130 if (startProgress == "yes") {
131 Long_t period = 500;
132 TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
133 fProgress = new TTimer;
134 fProgress->SetObject(this);
135 fProgress->Start(period, kFALSE);
136 }
137
138 // Init ntuple to store active workers vs processing time
139 fProgressPerf = 0;
140 TString saveProgressPerf("no");
141 if (TProof::GetParameter(input, "PROOF_SaveProgressPerf", saveProgressPerf) == 0) {
142 if (fProgress && saveProgressPerf == "yes")
143 fProgressPerf = new TNtuple("PROOF_ProgressPerfNtuple",
144 "{Active workers, evt rate, MB read} vs processing time", "tm:aw:er:mb:ns");
145 }
146 fProcTimeLast = -1.;
147 fActWrksLast = -1;
148 fEvtRateLast = -1.;
149 fMBsReadLast = -1.;
150 fEffSessLast = -1.;
152 fReportPeriod = -1.;
153
154 // Whether to send estimated values for the progress info
155 TString estopt;
156 if (TProof::GetParameter(input, "PROOF_RateEstimation", estopt) != 0 ||
157 estopt.IsNull()) {
158 // Parse option from the env
159 estopt = gEnv->GetValue("Proof.RateEstimation", "");
160 }
162 if (estopt == "current")
164 else if (estopt == "average")
166}
167
168////////////////////////////////////////////////////////////////////////////////
169/// Destructor.
170
172{
178 fProgressStatus = 0; // belongs to the player
179}
180
181////////////////////////////////////////////////////////////////////////////////
182/// Get entries.
183
185{
186 Long64_t entries;
187 TFile *file = TFile::Open(e->GetFileName());
188
189 if (!file || (file && file->IsZombie())) {
190 const char *emsg = (file) ? strerror(file->GetErrno()) : "<undef>";
191 Error("GetEntries","Cannot open file: %s (%s)", e->GetFileName(), emsg);
192 return -1;
193 }
194
195 TDirectory *dirsave = gDirectory;
196 if ( ! file->cd(e->GetDirectory()) ) {
197 Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
198 delete file;
199 return -1;
200 }
201 TDirectory *dir = gDirectory;
202 dirsave->cd();
203
204 if ( tree ) {
205 TKey *key = dir->GetKey(e->GetObjName());
206 if ( key == 0 ) {
207 Error("GetEntries","Cannot find tree \"%s\" in %s",
208 e->GetObjName(), e->GetFileName() );
209 delete file;
210 return -1;
211 }
212 TTree *t = (TTree *) key->ReadObj();
213 if ( t == 0 ) {
214 // Error always reported?
215 delete file;
216 return -1;
217 }
218 entries = (Long64_t) t->GetEntries();
219 delete t;
220
221 } else {
222 TList *keys = dir->GetListOfKeys();
223 entries = keys->GetSize();
224 }
225
226 delete file;
227
228 return entries;
229}
230
231////////////////////////////////////////////////////////////////////////////////
232/// Get next packet.
233
235{
236 AbstractMethod("GetNextPacket");
237 return 0;
238}
239
240////////////////////////////////////////////////////////////////////////////////
241/// Stop process.
242
244{
245 fStop = kTRUE;
246 if (stoptimer) HandleTimer(0);
247}
248
249////////////////////////////////////////////////////////////////////////////////
250/// Creates a new TDSetElement from the base packet, starting at entry
251/// 'first' and spanning 'num' entries.
252/// The function returns a newly created object which has to be deleted.
253
255 Long64_t first, Long64_t num)
256{
257 TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
258 base->GetDirectory(), first, num,
259 0, fDataSet.Data());
260
261 // create TDSetElements for all the friends of elem.
262 TList *friends = base->GetListOfFriends();
263 if (friends) {
264 TIter nxf(friends);
265 TDSetElement *fe = 0;
266 while ((fe = (TDSetElement *) nxf())) {
267 PDB(kLoop,2)
268 Info("CreateNewPacket", "friend: file '%s', obj:'%s'",
269 fe->GetFileName(), fe->GetObjName());
270 TDSetElement *xfe = new TDSetElement(fe->GetFileName(), fe->GetObjName(),
271 fe->GetDirectory(), first, num);
272 // The alias, if any, is in the element name options ('friend_alias=<alias>|')
273 elem->AddFriend(xfe, 0);
274 }
275 }
276
277 return elem;
278}
279
280////////////////////////////////////////////////////////////////////////////////
281/// Send progress message to client.
282
284{
285 PDB(kPacketizer,2)
286 Info("HandleTimer", "fProgress: %p, isDone: %d",
288
290 // Make sure that the timer is stopped
291 if (fProgress) fProgress->Stop();
292 return kFALSE;
293 }
294
295 // Prepare progress info
296 TTime tnow = gSystem->Now();
297 Float_t now = Long64_t(tnow - fStartTime) / (Float_t)1000.;
298 Long64_t estent = GetEntriesProcessed();
299 Long64_t estmb = GetBytesRead();
300 Long64_t estrc = GetReadCalls();
301
302 // Times and counters
303 Float_t evtrti = -1., mbrti = -1.;
305 // Initialization
306 fInitTime = now;
307 } else {
308 // Fill the reference as first
309 if (fCircProg->GetEntries() <= 0) {
310 fCircProg->Fill((Double_t)0., 0., 0., 0., 0.);
311 }
312 // Time between updates
313 fTimeUpdt = now - fProcTime;
314 // Update proc time
315 fProcTime = now - fInitTime;
316 // Get the last entry
317 Double_t *ar = fCircProg->GetArgs();
319 // The current rate
320 Bool_t all = kTRUE;
321 evtrti = GetCurrentRate(all);
322 Double_t xall = (all) ? 1. : 0.;
323 GetEstEntriesProcessed(0, estent, estmb, estrc);
324 if (estent >= fTotalEntries) {
325 estent = GetEntriesProcessed();
326 estmb = GetBytesRead();
327 estrc = GetReadCalls();
328 }
329 // Fill entry
330 Double_t evts = (Double_t) estent;
331 Double_t mbs = (estmb > 0) ? estmb / TMath::Power(2.,20.) : 0.; //--> MB
332 Double_t rcs = (Double_t) estrc;
333 fCircProg->Fill((Double_t)fProcTime, evts, mbs, rcs, xall);
335 if (all) {
336 Double_t dt = (Double_t)fProcTime - ar[0];
337 Long64_t de = (evts > ar[1]) ? (Long64_t) (evts - ar[1]) : 0;
338 Long64_t db = (mbs > ar[2]) ? (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)) : 0;
339 if (gPerfStats)
340 gPerfStats->RateEvent((Double_t)fProcTime, dt, de, db);
341 // Get the last to spot the cache readings
342 Double_t rc = (Double_t)estrc - ar[3];
343 mbrti = (rc > 0 && mbs > ar[2]) ? (Float_t) (mbs - ar[2]) / rc : 0. ;
344 }
345 // Final report only once (to correctly determine the proc time)
348 PDB(kPacketizer,2)
349 Info("HandleTimer", "ent:%lld, bytes:%lld, proct:%f, evtrti:%f, mbrti:%f (%f,%f)",
350 estent, estmb, fProcTime, evtrti, mbrti, mbs, ar[2]);
351 }
352
353 if (gProofServ) {
354 // Message to be sent over
356 if (gProofServ->GetProtocol() > 25) {
357 Int_t actw = GetActiveWorkers();
360 if (fProgressPerf && estent > 0) {
361 // Estimated query time
362 if (fProcTime > 0.) {
363 fReportPeriod = (Float_t) fTotalEntries / (Double_t) estent * fProcTime / 100.;
364 if (fReportPeriod > 0. && fReportPeriod < 5.) fReportPeriod = 5.;
365 }
366
367 if (fProgressPerf->GetEntries() <= 0) {
368 // Fill the first entry
369 fProgressPerf->Fill(fProcTime, (Float_t)actw, -1., -1., -1.);
370 } else {
371 // Fill only if changed since last entry filled
372 Float_t *far = fProgressPerf->GetArgs();
374 Bool_t doReport = (fReportPeriod > 0. &&
375 (fProcTime - far[0]) >= fReportPeriod) ? kTRUE : kFALSE;
376 Float_t mbsread = estmb / 1024. / 1024.;
377 if (TMath::Abs((Float_t)actw - far[1]) > 0.1) {
378 if (fAWLastFill)
381 fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
383 } else if (doReport) {
384 fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
386 } else {
388 }
390 fActWrksLast = actw;
391 fEvtRateLast = evtrti;
392 fMBsReadLast = mbsread;
393 fEffSessLast = effs;
394 }
395 }
396 // Fill the message now
397 TProofProgressInfo pi(fTotalEntries, estent, estmb, fInitTime,
398 fProcTime, evtrti, mbrti, actw, acts, effs);
399 m << &pi;
400 } else if (gProofServ->GetProtocol() > 11) {
401 // Fill the message now
402 m << fTotalEntries << estent << estmb << fInitTime << fProcTime
403 << evtrti << mbrti;
404 } else {
405 // Old format
407 }
408 // send message to client;
410
411 } else {
412 if (gProof && gProof->GetPlayer()) {
413 // Log locally
414 gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
415 fInitTime, fProcTime, evtrti, mbrti);
416 }
417 }
418
419 // Final report only once (to correctly determine the proc time)
422
423 return kFALSE; // ignored?
424}
425
426////////////////////////////////////////////////////////////////////////////////
427/// Set the initialization time
428
430{
434 PDB(kPacketizer,2)
435 Info("SetInitTime","fInitTime set to %f s", fInitTime);
436 }
437}
438
439////////////////////////////////////////////////////////////////////////////////
440/// Adds new workers. Must be implemented by each real packetizer properly.
441/// Returns the number of workers added, or -1 on failure.
442
444{
445 Warning("AddWorkers", "Not implemented for this packetizer");
446
447 return -1;
448}
@ kPROOF_PROGRESS
#define SafeDelete(p)
Definition RConfig.hxx:525
#define e(i)
Definition RSha256.hxx:103
int Int_t
Definition RtypesCore.h:45
long Long_t
Definition RtypesCore.h:54
float Float_t
Definition RtypesCore.h:57
constexpr Bool_t kFALSE
Definition RtypesCore.h:101
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:80
constexpr Bool_t kTRUE
Definition RtypesCore.h:100
#define ClassImp(name)
Definition Rtypes.h:377
#define gDirectory
Definition TDirectory.h:384
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
#define PDB(mask, level)
Definition TProofDebug.h:56
R__EXTERN TProofServ * gProofServ
Definition TProofServ.h:347
R__EXTERN TProof * gProof
Definition TProof.h:1077
R__EXTERN TSystem * gSystem
Definition TSystem.h:555
#define gPerfStats
void SetName(const char *name)
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Manages an element of a TDSet.
Definition TDSet.h:66
const char * GetObjName() const
Definition TDSet.h:120
virtual TList * GetListOfFriends() const
Definition TDSet.h:108
const char * GetDirectory() const
Return directory where to look for object.
Definition TDSet.cxx:253
virtual void AddFriend(TDSetElement *friendElement, const char *alias)
Add friend TDSetElement to this set. The friend element will be copied to this object.
Definition TDSet.cxx:376
const char * GetFileName() const
Definition TDSet.h:111
Bool_t cd() override
Change current directory to "this" directory.
Describe directory structure in memory.
Definition TDirectory.h:45
virtual Bool_t cd()
Change current directory to "this" directory.
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition TDirectory.h:221
virtual TList * GetListOfKeys() const
Definition TDirectory.h:223
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
virtual Int_t GetErrno() const
Method returning errno.
Definition TFile.cxx:1246
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4089
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition TKey.cxx:762
A doubly linked list.
Definition TList.h:38
void Add(TObject *obj) override
Definition TList.h:83
A simple TTree restricted to a list of double variables only.
Definition TNtupleD.h:28
Double_t * GetArgs() const
Definition TNtupleD.h:53
Int_t Fill() override
Fill a Ntuple with current values in fArgs.
Definition TNtupleD.cxx:150
A simple TTree restricted to a list of float variables only.
Definition TNtuple.h:28
Float_t * GetArgs() const
Definition TNtuple.h:56
Int_t Fill() override
Fill a Ntuple with current values in fArgs.
Definition TNtuple.cxx:169
void AbstractMethod(const char *method) const
Use this method to implement an "abstract" method that you don't want to leave purely abstract.
Definition TObject.cxx:1029
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition TObject.h:199
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:973
R__ALWAYS_INLINE Bool_t IsZombie() const
Definition TObject.h:153
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:780
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:987
void ResetBit(UInt_t f)
Definition TObject.h:198
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:961
Named parameter, streamable and storable.
Definition TParameter.h:35
Container class for processing statistics.
Int_t GetActSessions() const
Definition TProofServ.h:263
Float_t GetEffSessions() const
Definition TProofServ.h:264
TSocket * GetSocket() const
Definition TProofServ.h:257
Bool_t IsMaster() const
Definition TProofServ.h:293
Int_t GetProtocol() const
Definition TProofServ.h:252
Bool_t IsTopMaster() const
Definition TProofServ.h:295
TVirtualProofPlayer * GetPlayer() const
Definition TProof.h:716
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition TProof.cxx:9918
Class describing a PROOF worker server.
Definition TSlave.h:46
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:139
const char * Data() const
Definition TString.h:376
Bool_t IsNull() const
Definition TString.h:414
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
Definition TSystem.cxx:463
Basic time type with millisecond precision.
Definition TTime.h:27
Handles synchronous and asynchronous timer events.
Definition TTimer.h:51
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Definition TTimer.cxx:213
void SetObject(TObject *object)
Set the object to be notified at time out.
Definition TTimer.cxx:186
virtual void Stop()
Definition TTimer.h:94
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual Int_t GetEntry(Long64_t entry, Int_t getall=0)
Read all branches of entry and return total number of bytes read.
Definition TTree.cxx:5638
virtual void SetCircular(Long64_t maxEntries)
Enable/Disable circularity for this tree.
Definition TTree.cxx:8882
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition TTree.cxx:8956
virtual Long64_t GetEntries() const
Definition TTree.h:463
The packetizer is a load balancing object created for each query.
Long64_t GetEntries(Bool_t tree, TDSetElement *e)
Get entries.
virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
virtual Int_t AddWorkers(TList *workers)
Adds new workers.
~TVirtualPacketizer() override
Destructor.
TProofProgressStatus * fProgressStatus
Long64_t GetReadCalls() const
virtual Float_t GetCurrentRate(Bool_t &all)
Long64_t GetEntriesProcessed() const
virtual void StopProcess(Bool_t abort, Bool_t stoptimer=kFALSE)
Stop process.
Bool_t HandleTimer(TTimer *timer) override
Send progress message to client.
virtual void SetInitTime()
Set the initialization time.
TVirtualPacketizer(TList *input, TProofProgressStatus *st=0)
Constructor.
virtual Int_t GetActiveWorkers()
Long64_t GetBytesRead() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting at entry 'first' and spanning 'num' entries.
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
virtual void Progress(Long64_t total, Long64_t processed)=0
LongDouble_t Power(LongDouble_t x, LongDouble_t y)
Returns x raised to the power y.
Definition TMath.h:721
Short_t Abs(Short_t d)
Returns the absolute value of parameter Short_t d.
Definition TMathBase.h:123
TMarker m
Definition textangle.C:8