Logo ROOT   6.10/09
Reference Guide
TPacketizerMulti.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: G. Ganis Jan 2010
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11 
12 /** \class TPacketizerMulti
13 \ingroup proofkernel
14 
15 This class allows to do multiple runs in the same query; each run
16 can be a, for example, different dataset or the same dataset with
17 entry list.
18 The multiple packetizer contains a list of packetizers which are
19 processed in turn.
20 The bit TSelector::kNewRun is set in the TSelector object when a new
21 packetizer is used.
22 
23 */
24 
25 
26 #include "TPacketizerMulti.h"
27 
28 #include "TClass.h"
29 #include "TDSet.h"
30 #include "TError.h"
31 #include "TFileInfo.h"
32 #include "TList.h"
33 #include "TMap.h"
34 #include "TMethodCall.h"
35 #include "TProof.h"
36 #include "TProofDebug.h"
37 
39 
40 ////////////////////////////////////////////////////////////////////////////////
41 /// Constructor
42 
44  Long64_t first, Long64_t num,
45  TList *input, TProofProgressStatus *st)
46  : TVirtualPacketizer(input, st)
47 {
48  PDB(kPacketizer,1) Info("TPacketizerMulti",
49  "enter (first %lld, num %lld)", first, num);
50  fValid = kFALSE;
51  fPacketizersIter = 0;
52  fCurrent = 0;
53  fAssignedPack = 0;
54 
55  // Check inputs
56  if (!dset || !wrks || !input || !st) {
57  Error("TPacketizerMulti", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
58  dset, wrks, input, st);
59  return;
60  }
61  // Create the list
62  fPacketizers = new TList;
63 
64  // We do not want progress timers from the single packetizers
65  TNamed *progTimerFlag = new TNamed("PROOF_StartProgressTimer", "no");
66  input->Add(progTimerFlag);
67 
68  fTotalEntries = 0;
69  TVirtualPacketizer *packetizer = 0;
70  // Simple or multi?
71  if (!(dset->TestBit(TDSet::kMultiDSet))) {
72  if ((packetizer = CreatePacketizer(dset, wrks, first, num, input, st))) {
73  fPacketizers->Add(packetizer);
74  fTotalEntries = packetizer->GetTotalEntries();
75  } else {
76  Error("TPacketizerMulti", "problems initializing packetizer for single dataset");
77  input->Remove(progTimerFlag);
78  delete progTimerFlag;
79  return;
80  }
81  } else {
82  // Iterate on the datasets
83  TIter nxds(dset->GetListOfElements());
84  TDSet *ds = 0;
85  while ((ds = (TDSet *)nxds())) {
86  if ((packetizer = CreatePacketizer(ds, wrks, first, num, input, st))) {
87  fPacketizers->Add(packetizer);
88  fTotalEntries += packetizer->GetTotalEntries();
89  } else {
90  Error("TPacketizerMulti", "problems initializing packetizer for dataset '%s'", ds->GetName());
91  }
92  }
93  }
94  // Cleanup temporary additions to the input list
95  input->Remove(progTimerFlag);
96  delete progTimerFlag;
97 
98  // If no valid packetizer could be initialized we fail
99  if (fPacketizers->GetSize() <= 0) {
100  Error("TPacketizerMulti", "no valid packetizer could be initialized - aborting");
101  SafeDelete(fPacketizers);
102  return;
103  } else {
104  Info("TPacketizerMulti", "%d packetizer(s) have been successfully initialized (%lld events in total)",
105  fPacketizers->GetSize(), fTotalEntries);
106  // To avoid problems with counters we must set the total entries in each packetizer
107  TIter nxp(fPacketizers);
108  while ((packetizer = (TVirtualPacketizer *) nxp()))
109  packetizer->SetTotalEntries(fTotalEntries);
110  }
111 
112  // Create the interator
113  fPacketizersIter = new TIter(fPacketizers);
114 
115  // Set the current the first
116  if (!(fCurrent = (TVirtualPacketizer *) fPacketizersIter->Next())) {
117  // Weird
118  Error("TPacketizerMulti", "could not point to the first valid packetizer");
119  fPacketizers->SetOwner(kTRUE);
120  SafeDelete(fPacketizers);
121  SafeDelete(fPacketizersIter);
122  return;
123  }
124 
125  // Create map
126  fAssignedPack = new TMap;
127 
128  // Ok, everything went fine
129  fValid = kTRUE;
130 
131  PDB(kPacketizer,1) Info("TPacketizerMulti", "done");
132 }
133 
134 ////////////////////////////////////////////////////////////////////////////////
135 /// Destructor.
136 
138 {
139  if (fPacketizers) {
142  }
144  fCurrent = 0;
145  if (fAssignedPack) {
148  }
150 }
151 
152 ////////////////////////////////////////////////////////////////////////////////
153 /// Get next packet from the current packetizer.
154 /// If the current packetizer is done, move to next.
155 /// Retun null when all packetizers are done.
156 
158 {
159  TDSetElement *elem = 0;
160 
161  // Must be valid
162  if (!fValid) return elem;
163 
164  // Point to the packetizer last used for thsi worker
165  TVirtualPacketizer *lastPacketizer = dynamic_cast<TVirtualPacketizer *>(fAssignedPack->GetValue(wrk));
166  if (lastPacketizer && lastPacketizer != fCurrent) {
167  PDB(kPacketizer,2)
168  Info("GetNextPacket", "%s: asking old packetizer %p ... ", wrk->GetOrdinal(), lastPacketizer);
169  if ((elem = lastPacketizer->GetNextPacket(wrk, r))) return elem;
170  if (fCurrent) {
171  // Transfer the status info
172  TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(lastPacketizer->GetSlaveStats()->GetValue(wrk));
173  TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
174  if (oldstat && curstat)
175  *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
176  }
177  }
178 
179  // Need something to be processed
180  if (!fCurrent) {
181  HandleTimer(0); // Send last timer message
182  return elem;
183  }
184 
185  // Get the next packet from the current packetizer
186  PDB(kPacketizer,2)
187  Info("GetNextPacket", "%s: asking current packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
188  if (!(elem = fCurrent->GetNextPacket(wrk, r))) {
189  // We need to transfer the status info if we change packetizer now
190  TMap *oldStats = (lastPacketizer && lastPacketizer == fCurrent) ? lastPacketizer->GetSlaveStats() : 0;
191  // If the packetizer is done, move to next
193  if (fCurrent) {
194  // Transfer the status info
195  if (oldStats) {
196  TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(oldStats->GetValue(wrk));
197  TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
198  if (oldstat && curstat)
199  *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
200  }
201  PDB(kPacketizer,2)
202  Info("GetNextPacket", "%s: asking new packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
203  elem = fCurrent->GetNextPacket(wrk, r);
204  }
205  }
206  if (fCurrent) {
207  // Save the packetizer
208  TPair *pair = dynamic_cast<TPair *>(fAssignedPack->FindObject(wrk));
209  if (pair) {
210  pair->SetValue(fCurrent);
211  } else {
212  fAssignedPack->Add(wrk, fCurrent);
213  }
214  PDB(kPacketizer,2)
215  Info("GetNextPacket", "assigned packetizer %p to %s (check: %p)",
216  fCurrent, wrk->GetOrdinal(), fAssignedPack->GetValue(wrk));
217  }
218 
219  // Check the total number of entries
222  Error("GetNextPacket", "Processed too many entries!");
223  HandleTimer(0); // Send last timer message
225  }
226 
227  // Done
228  return elem;
229 }
230 
231 ////////////////////////////////////////////////////////////////////////////////
232 /// Create a packetizer for dataset 'dset'
233 /// Return null on failure.
234 
236  Long64_t first, Long64_t num,
237  TList *input, TProofProgressStatus *st)
238 {
239  TVirtualPacketizer *packetizer = 0;
240 
241  // Check inputs
242  if (!dset || !wrks || !input || !st) {
243  Error("CreatePacketizer", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
244  dset, wrks, input, st);
245  return packetizer;
246  }
247 
248  // This is for data-driven runs
249  if (dset->TestBit(TDSet::kEmpty)) {
250  Error("CreatePacketizer", "dataset is empty: protocol error?");
251  return packetizer;
252  }
253 
254  TString packetizername;
255  TList *listOfMissingFiles = 0;
256 
257  TMethodCall callEnv;
258  TClass *cl;
259 
260  // Lookup - resolve the end-point urls to optmize the distribution.
261  // The lookup was previously called in the packetizer's constructor.
262  // A list for the missing files may already have been added to the
263  // output list; otherwise, if needed it will be created inside
264  if (!(listOfMissingFiles = (TList *) input->FindObject("MissingFiles"))) {
265  // Create it
266  listOfMissingFiles = new TList;
267  // and add it to the input list; it will be later moved to the output list
268  input->Add(listOfMissingFiles);
269  }
270  dset->Lookup(kTRUE, &listOfMissingFiles);
271 
272  if (!(dset->GetListOfElements()) ||
273  !(dset->GetListOfElements()->GetSize())) {
274  Error("CreatePacketizer", "no files from the data set were found - skipping");
275  return packetizer;
276  }
277 
278  if (TProof::GetParameter(input, "PROOF_Packetizer", packetizername) != 0) {
279  // Using standard packetizer TPacketizer
280  packetizername = "TPacketizer";
281  } else {
282  Info("CreatePacketizer", "using alternate packetizer: %s", packetizername.Data());
283  }
284 
285  // Get linked to the related class
286  cl = TClass::GetClass(packetizername);
287  if (cl == 0) {
288  Error("CreatePacketizer", "class '%s' not found", packetizername.Data());
289  return packetizer;
290  }
291 
292  // Init the constructor
293  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
294  if (!callEnv.IsValid()) {
295  Error("CreatePacketizer", "cannot find correct constructor for '%s'", cl->GetName());
296  return packetizer;
297  }
298  callEnv.ResetParam();
299  callEnv.SetParam((Long_t) dset);
300  callEnv.SetParam((Long_t) wrks);
301  callEnv.SetParam((Long64_t) first);
302  callEnv.SetParam((Long64_t) num);
303  callEnv.SetParam((Long_t) input);
304  callEnv.SetParam((Long_t) st);
305 
306  // We are going to test validity during the packetizer initialization
307  dset->SetBit(TDSet::kValidityChecked);
308  dset->ResetBit(TDSet::kSomeInvalid);
309 
310  // Get an instance of the packetizer
311  Long_t ret = 0;
312  callEnv.Execute(ret);
313  if ((packetizer = (TVirtualPacketizer *)ret) == 0) {
314  Error("CreatePacketizer", "cannot construct '%s'", cl->GetName());
315  return packetizer;
316  }
317 
318  if (!packetizer->IsValid()) {
319  Error("CreatePacketizer",
320  "instantiated packetizer object '%s' is invalid", cl->GetName());
321  SafeDelete(packetizer);
322  }
323 
324  // Add invalid elements to the list of missing elements
325  TDSetElement *elem = 0;
326  if (dset->TestBit(TDSet::kSomeInvalid)) {
327  TIter nxe(dset->GetListOfElements());
328  while ((elem = (TDSetElement *)nxe())) {
329  if (!elem->GetValid()) {
330  listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
331  dset->Remove(elem, kFALSE);
332  }
333  }
334  // The invalid elements have been removed
335  dset->ResetBit(TDSet::kSomeInvalid);
336  }
337 
338  // Done
339  return packetizer;
340 }
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:212
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:847
long long Long64_t
Definition: RtypesCore.h:69
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:159
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:151
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Bool_t GetValid() const
Definition: TDSet.h:119
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:53
const char * GetOrdinal() const
Definition: TSlave.h:131
Basic string class.
Definition: TString.h:129
Long64_t GetEntries() const
TList * GetListOfElements() const
Definition: TDSet.h:229
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:501
Manages an element of a TDSet.
Definition: TDSet.h:66
#define SafeDelete(p)
Definition: RConfig.h:499
This class allows to do multiple runs in the same query; each run can be a, for example, different dataset or the same dataset with entry list.
#define PDB(mask, level)
Definition: TProofDebug.h:56
TVirtualPacketizer * CreatePacketizer(TDSet *dset, TList *wrks, Long64_t first, Long64_t num, TList *input, TProofProgressStatus *st)
Create a packetizer for dataset &#39;dset&#39; Return null on failure.
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:29
TMap * GetSlaveStats() const
TDSetElement * GetNextPacket(TSlave *wrk, TMessage *r)
Get next packet from the current packetizer.
void Info(const char *location, const char *msgfmt,...)
Method or function calling interface.
Definition: TMethodCall.h:37
void Error(const char *location, const char *msgfmt,...)
TProofProgressStatus * fProgressStatus
Long64_t GetTotalEntries() const
A doubly linked list.
Definition: TList.h:43
void SetValue(TObject *val)
Definition: TMap.h:122
void SetTotalEntries(Long64_t ent)
TRandom2 r(17)
TObject * Next()
Definition: TCollection.h:153
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:873
The ROOT global object gROOT contains a list of all defined classes.
Definition: TClass.h:71
Bool_t IsValid() const
Return true if the method call has been properly initialized and is usable.
void InitWithPrototype(TClass *cl, const char *method, const char *proto, Bool_t objectIsConst=kFALSE, ROOT::EFunctionMatchMode mode=ROOT::kConversionMatch)
Initialize the method invocation environment.
const Bool_t kFALSE
Definition: RtypesCore.h:92
long Long_t
Definition: RtypesCore.h:50
The packetizer is a load balancing object created for each query.
Class used by TMap to store (key,value) pairs.
Definition: TMap.h:102
#define ClassImp(name)
Definition: Rtypes.h:336
void ResetParam()
Reset parameter list. To be used before the first call the SetParam().
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2885
TVirtualPacketizer * fCurrent
Bool_t IsValid() const
void Lookup(Bool_t removeMissing=kFALSE, TList **missingFiles=0)
Resolve the end-point URL for the current elements of this data set If the removeMissing option is se...
Definition: TDSet.cxx:1587
virtual Bool_t Add(const char *file, const char *objname=0, const char *dir=0, Long64_t first=0, Long64_t num=-1, const char *msd=0)
Add file to list of files to be analyzed.
Definition: TDSet.cxx:1033
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Definition: TMap.cxx:214
virtual ~TPacketizerMulti()
Destructor.
virtual void Add(TObject *obj)
Definition: TList.h:77
void Execute(const char *, const char *, int *=0)
Execute method on this object with the given parameter string, e.g.
Definition: TMethodCall.h:64
void SetParam(Long_t l)
Add a long method parameter.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
Definition: first.py:1
TProofProgressStatus * GetProgressStatus()
virtual Int_t GetSize() const
Definition: TCollection.h:89
Class describing a PROOF worker server.
Definition: TSlave.h:46
Container class for processing statistics.
const Bool_t kTRUE
Definition: RtypesCore.h:91
const char * Data() const
Definition: TString.h:347