Logo ROOT  
Reference Guide
TPacketizerMulti.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: G. Ganis Jan 2010
3
4/*************************************************************************
5 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10*************************************************************************/
11
12/** \class TPacketizerMulti
13\ingroup proofkernel
14
15This class allows to do multiple runs in the same query; each run
16can be a, for example, different dataset or the same dataset with
17entry list.
18The multiple packetizer contains a list of packetizers which are
19processed in turn.
20The bit TSelector::kNewRun is set in the TSelector object when a new
21packetizer is used.
22
23*/
24
25
26#include "TPacketizerMulti.h"
27
28#include "TClass.h"
29#include "TDSet.h"
30#include "TError.h"
31#include "TFileInfo.h"
32#include "TList.h"
33#include "TMap.h"
34#include "TMethodCall.h"
35#include "TProof.h"
36#include "TProofDebug.h"
37
39
40////////////////////////////////////////////////////////////////////////////////
41/// Constructor
42
45 TList *input, TProofProgressStatus *st)
46 : TVirtualPacketizer(input, st)
47{
48 PDB(kPacketizer,1) Info("TPacketizerMulti",
49 "enter (first %lld, num %lld)", first, num);
50 fValid = kFALSE;
52 fCurrent = 0;
53 fAssignedPack = 0;
54
55 // Check inputs
56 if (!dset || !wrks || !input || !st) {
57 Error("TPacketizerMulti", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
58 dset, wrks, input, st);
59 return;
60 }
61 // Create the list
62 fPacketizers = new TList;
63
64 // We do not want progress timers from the single packetizers
65 TNamed *progTimerFlag = new TNamed("PROOF_StartProgressTimer", "no");
66 input->Add(progTimerFlag);
67
68 fTotalEntries = 0;
69 TVirtualPacketizer *packetizer = 0;
70 // Simple or multi?
71 if (!(dset->TestBit(TDSet::kMultiDSet))) {
72 if ((packetizer = CreatePacketizer(dset, wrks, first, num, input, st))) {
73 fPacketizers->Add(packetizer);
74 fTotalEntries = packetizer->GetTotalEntries();
75 } else {
76 Error("TPacketizerMulti", "problems initializing packetizer for single dataset");
77 input->Remove(progTimerFlag);
78 delete progTimerFlag;
79 return;
80 }
81 } else {
82 // Iterate on the datasets
83 TIter nxds(dset->GetListOfElements());
84 TDSet *ds = 0;
85 while ((ds = (TDSet *)nxds())) {
86 if ((packetizer = CreatePacketizer(ds, wrks, first, num, input, st))) {
87 fPacketizers->Add(packetizer);
88 fTotalEntries += packetizer->GetTotalEntries();
89 } else {
90 Error("TPacketizerMulti", "problems initializing packetizer for dataset '%s'", ds->GetName());
91 }
92 }
93 }
94 // Cleanup temporary additions to the input list
95 input->Remove(progTimerFlag);
96 delete progTimerFlag;
97
98 // If no valid packetizer could be initialized we fail
99 if (fPacketizers->GetSize() <= 0) {
100 Error("TPacketizerMulti", "no valid packetizer could be initialized - aborting");
102 return;
103 } else {
104 Info("TPacketizerMulti", "%d packetizer(s) have been successfully initialized (%lld events in total)",
106 // To avoid problems with counters we must set the total entries in each packetizer
107 TIter nxp(fPacketizers);
108 while ((packetizer = (TVirtualPacketizer *) nxp()))
109 packetizer->SetTotalEntries(fTotalEntries);
110 }
111
112 // Create the interator
114
115 // Set the current the first
117 // Weird
118 Error("TPacketizerMulti", "could not point to the first valid packetizer");
122 return;
123 }
124
125 // Create map
126 fAssignedPack = new TMap;
127
128 // Ok, everything went fine
129 fValid = kTRUE;
130
131 PDB(kPacketizer,1) Info("TPacketizerMulti", "done");
132}
133
134////////////////////////////////////////////////////////////////////////////////
135/// Destructor.
136
138{
139 if (fPacketizers) {
142 }
144 fCurrent = 0;
145 if (fAssignedPack) {
148 }
150}
151
152////////////////////////////////////////////////////////////////////////////////
153/// Get next packet from the current packetizer.
154/// If the current packetizer is done, move to next.
155/// Retun null when all packetizers are done.
156
158{
159 TDSetElement *elem = 0;
160
161 // Must be valid
162 if (!fValid) return elem;
163
164 // Point to the packetizer last used for thsi worker
165 TVirtualPacketizer *lastPacketizer = dynamic_cast<TVirtualPacketizer *>(fAssignedPack->GetValue(wrk));
166 if (lastPacketizer && lastPacketizer != fCurrent) {
167 PDB(kPacketizer,2)
168 Info("GetNextPacket", "%s: asking old packetizer %p ... ", wrk->GetOrdinal(), lastPacketizer);
169 if ((elem = lastPacketizer->GetNextPacket(wrk, r))) return elem;
170 if (fCurrent) {
171 // Transfer the status info
172 TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(lastPacketizer->GetSlaveStats()->GetValue(wrk));
173 TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
174 if (oldstat && curstat)
175 *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
176 }
177 }
178
179 // Need something to be processed
180 if (!fCurrent) {
181 HandleTimer(0); // Send last timer message
182 return elem;
183 }
184
185 // Get the next packet from the current packetizer
186 PDB(kPacketizer,2)
187 Info("GetNextPacket", "%s: asking current packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
188 if (!(elem = fCurrent->GetNextPacket(wrk, r))) {
189 // We need to transfer the status info if we change packetizer now
190 TMap *oldStats = (lastPacketizer && lastPacketizer == fCurrent) ? lastPacketizer->GetSlaveStats() : 0;
191 // If the packetizer is done, move to next
193 if (fCurrent) {
194 // Transfer the status info
195 if (oldStats) {
196 TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(oldStats->GetValue(wrk));
197 TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
198 if (oldstat && curstat)
199 *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
200 }
201 PDB(kPacketizer,2)
202 Info("GetNextPacket", "%s: asking new packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
203 elem = fCurrent->GetNextPacket(wrk, r);
204 }
205 }
206 if (fCurrent) {
207 // Save the packetizer
208 TPair *pair = dynamic_cast<TPair *>(fAssignedPack->FindObject(wrk));
209 if (pair) {
210 pair->SetValue(fCurrent);
211 } else {
213 }
214 PDB(kPacketizer,2)
215 Info("GetNextPacket", "assigned packetizer %p to %s (check: %p)",
217 }
218
219 // Check the total number of entries
222 Error("GetNextPacket", "Processed too many entries!");
223 HandleTimer(0); // Send last timer message
225 }
226
227 // Done
228 return elem;
229}
230
231////////////////////////////////////////////////////////////////////////////////
232/// Create a packetizer for dataset 'dset'
233/// Return null on failure.
234
237 TList *input, TProofProgressStatus *st)
238{
239 TVirtualPacketizer *packetizer = 0;
240
241 // Check inputs
242 if (!dset || !wrks || !input || !st) {
243 Error("CreatePacketizer", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
244 dset, wrks, input, st);
245 return packetizer;
246 }
247
248 // This is for data-driven runs
249 if (dset->TestBit(TDSet::kEmpty)) {
250 Error("CreatePacketizer", "dataset is empty: protocol error?");
251 return packetizer;
252 }
253
254 TString packetizername;
255 TList *listOfMissingFiles = 0;
256
257 TMethodCall callEnv;
258 TClass *cl;
259
260 // Lookup - resolve the end-point urls to optmize the distribution.
261 // The lookup was previously called in the packetizer's constructor.
262 // A list for the missing files may already have been added to the
263 // output list; otherwise, if needed it will be created inside
264 if (!(listOfMissingFiles = (TList *) input->FindObject("MissingFiles"))) {
265 // Create it
266 listOfMissingFiles = new TList;
267 // and add it to the input list; it will be later moved to the output list
268 input->Add(listOfMissingFiles);
269 }
270 dset->Lookup(kTRUE, &listOfMissingFiles);
271
272 if (!(dset->GetListOfElements()) ||
273 !(dset->GetListOfElements()->GetSize())) {
274 Error("CreatePacketizer", "no files from the data set were found - skipping");
275 return packetizer;
276 }
277
278 if (TProof::GetParameter(input, "PROOF_Packetizer", packetizername) != 0) {
279 // Using standard packetizer TPacketizer
280 packetizername = "TPacketizer";
281 } else {
282 Info("CreatePacketizer", "using alternate packetizer: %s", packetizername.Data());
283 }
284
285 // Get linked to the related class
286 cl = TClass::GetClass(packetizername);
287 if (cl == 0) {
288 Error("CreatePacketizer", "class '%s' not found", packetizername.Data());
289 return packetizer;
290 }
291
292 // Init the constructor
293 callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
294 if (!callEnv.IsValid()) {
295 Error("CreatePacketizer", "cannot find correct constructor for '%s'", cl->GetName());
296 return packetizer;
297 }
298 callEnv.ResetParam();
299 callEnv.SetParam((Long_t) dset);
300 callEnv.SetParam((Long_t) wrks);
301 callEnv.SetParam((Long64_t) first);
302 callEnv.SetParam((Long64_t) num);
303 callEnv.SetParam((Long_t) input);
304 callEnv.SetParam((Long_t) st);
305
306 // We are going to test validity during the packetizer initialization
309
310 // Get an instance of the packetizer
311 Long_t ret = 0;
312 callEnv.Execute(ret);
313 if ((packetizer = (TVirtualPacketizer *)ret) == 0) {
314 Error("CreatePacketizer", "cannot construct '%s'", cl->GetName());
315 return packetizer;
316 }
317
318 if (!packetizer->IsValid()) {
319 Error("CreatePacketizer",
320 "instantiated packetizer object '%s' is invalid", cl->GetName());
321 SafeDelete(packetizer);
322 }
323
324 // Add invalid elements to the list of missing elements
325 TDSetElement *elem = 0;
326 if (dset->TestBit(TDSet::kSomeInvalid)) {
327 TIter nxe(dset->GetListOfElements());
328 while ((elem = (TDSetElement *)nxe())) {
329 if (!elem->GetValid()) {
330 listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
331 dset->Remove(elem, kFALSE);
332 }
333 }
334 // The invalid elements have been removed
336 }
337
338 // Done
339 return packetizer;
340}
ROOT::R::TRInterface & r
Definition: Object.C:4
#define SafeDelete(p)
Definition: RConfig.hxx:550
const Bool_t kFALSE
Definition: RtypesCore.h:88
long Long_t
Definition: RtypesCore.h:50
long long Long64_t
Definition: RtypesCore.h:69
const Bool_t kTRUE
Definition: RtypesCore.h:87
#define ClassImp(name)
Definition: Rtypes.h:365
#define PDB(mask, level)
Definition: TProofDebug.h:56
TClass instances represent classes, structs and namespaces in the ROOT type system.
Definition: TClass.h:75
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2906
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
Manages an element of a TDSet.
Definition: TDSet.h:66
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:231
Bool_t GetValid() const
Definition: TDSet.h:119
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
Int_t Remove(TDSetElement *elem, Bool_t deleteElem=kTRUE)
Remove TDSetElement 'elem' from the list.
Definition: TDSet.cxx:1577
void Lookup(Bool_t removeMissing=kFALSE, TList **missingFiles=0)
Resolve the end-point URL for the current elements of this data set If the removeMissing option is se...
Definition: TDSet.cxx:1606
const char * GetType() const
Definition: TDSet.h:228
TList * GetListOfElements() const
Definition: TDSet.h:231
@ kSomeInvalid
Definition: TDSet.h:161
@ kEmpty
Definition: TDSet.h:159
@ kValidityChecked
Definition: TDSet.h:160
@ kMultiDSet
Definition: TDSet.h:162
TObject * Next()
Definition: TCollection.h:249
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:819
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:575
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
Definition: TMap.cxx:53
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Definition: TMap.cxx:214
Method or function calling interface.
Definition: TMethodCall.h:37
void ResetParam()
Reset parameter list. To be used before the first call the SetParam().
Bool_t IsValid() const
Return true if the method call has been properly initialized and is usable.
void Execute(const char *, const char *, int *=0)
Execute method on this object with the given parameter string, e.g.
Definition: TMethodCall.h:64
void InitWithPrototype(TClass *cl, const char *method, const char *proto, Bool_t objectIsConst=kFALSE, ROOT::EFunctionMatchMode mode=ROOT::kConversionMatch)
Initialize the method invocation environment.
void SetParam(Long_t l)
Add a long method parameter.
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:29
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition: TObject.h:172
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
void ResetBit(UInt_t f)
Definition: TObject.h:171
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
This class allows to do multiple runs in the same query; each run can be a, for example,...
TVirtualPacketizer * CreatePacketizer(TDSet *dset, TList *wrks, Long64_t first, Long64_t num, TList *input, TProofProgressStatus *st)
Create a packetizer for dataset 'dset' Return null on failure.
TVirtualPacketizer * fCurrent
TDSetElement * GetNextPacket(TSlave *wrk, TMessage *r)
Get next packet from the current packetizer.
virtual ~TPacketizerMulti()
Destructor.
Class used by TMap to store (key,value) pairs.
Definition: TMap.h:102
void SetValue(TObject *val)
Definition: TMap.h:122
Container class for processing statistics.
Long64_t GetEntries() const
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9891
Class describing a PROOF worker server.
Definition: TSlave.h:46
const char * GetOrdinal() const
Definition: TSlave.h:131
Basic string class.
Definition: TString.h:131
const char * Data() const
Definition: TString.h:364
TProofProgressStatus * GetProgressStatus()
The packetizer is a load balancing object created for each query.
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
TMap * GetSlaveStats() const
Long64_t GetTotalEntries() const
Bool_t IsValid() const
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
void SetTotalEntries(Long64_t ent)
Definition: first.py:1