ROOT  6.06/09
Reference Guide
TPacketizerMulti.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: G. Ganis Jan 2010
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TPacketizerMulti //
15 // //
16 // This class allows multiple runs in the same query; each run can //
17 // be, for example, a different dataset or the same dataset with an //
18 // entry list. //
19 // The multiple packetizer contains a list of packetizers which are //
20 // processed in turn. //
21 // The bit TSelector::kNewRun is set in the TSelector object when a new //
22 // packetizer is used. //
23 // //
24 //////////////////////////////////////////////////////////////////////////
25 
26 
27 #include "TPacketizerMulti.h"
28 
29 #include "TClass.h"
30 #include "TDSet.h"
31 #include "TError.h"
32 #include "TFileInfo.h"
33 #include "TList.h"
34 #include "TMap.h"
35 #include "TMethodCall.h"
36 #include "TProof.h"
37 #include "TProofDebug.h"
38 
40 
41 ////////////////////////////////////////////////////////////////////////////////
42 /// Constructor
43 
45  Long64_t first, Long64_t num,
46  TList *input, TProofProgressStatus *st)
47  : TVirtualPacketizer(input, st)
48 {
49  PDB(kPacketizer,1) Info("TPacketizerMulti",
50  "enter (first %lld, num %lld)", first, num);
51  fValid = kFALSE;
52  fPacketizersIter = 0;
53  fCurrent = 0;
54  fAssignedPack = 0;
55 
56  // Check inputs
57  if (!dset || !wrks || !input || !st) {
58  Error("TPacketizerMulti", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
59  dset, wrks, input, st);
60  return;
61  }
62  // Create the list
63  fPacketizers = new TList;
64 
65  // We do not want progress timers from the single packetizers
66  TNamed *progTimerFlag = new TNamed("PROOF_StartProgressTimer", "no");
67  input->Add(progTimerFlag);
68 
69  fTotalEntries = 0;
70  TVirtualPacketizer *packetizer = 0;
71  // Simple or multi?
72  if (!(dset->TestBit(TDSet::kMultiDSet))) {
73  if ((packetizer = CreatePacketizer(dset, wrks, first, num, input, st))) {
74  fPacketizers->Add(packetizer);
75  fTotalEntries = packetizer->GetTotalEntries();
76  } else {
77  Error("TPacketizerMulti", "problems initializing packetizer for single dataset");
78  input->Remove(progTimerFlag);
79  delete progTimerFlag;
80  return;
81  }
82  } else {
83  // Iterate on the datasets
84  TIter nxds(dset->GetListOfElements());
85  TDSet *ds = 0;
86  while ((ds = (TDSet *)nxds())) {
87  if ((packetizer = CreatePacketizer(ds, wrks, first, num, input, st))) {
88  fPacketizers->Add(packetizer);
89  fTotalEntries += packetizer->GetTotalEntries();
90  } else {
91  Error("TPacketizerMulti", "problems initializing packetizer for dataset '%s'", ds->GetName());
92  }
93  }
94  }
95  // Cleanup temporary additions to the input list
96  input->Remove(progTimerFlag);
97  delete progTimerFlag;
98 
99  // If no valid packetizer could be initialized we fail
100  if (fPacketizers->GetSize() <= 0) {
101  Error("TPacketizerMulti", "no valid packetizer could be initialized - aborting");
102  SafeDelete(fPacketizers);
103  return;
104  } else {
105  Info("TPacketizerMulti", "%d packetizer(s) have been successfully initialized (%lld events in total)",
106  fPacketizers->GetSize(), fTotalEntries);
107  // To avoid problems with counters we must set the total entries in each packetizer
108  TIter nxp(fPacketizers);
109  while ((packetizer = (TVirtualPacketizer *) nxp()))
110  packetizer->SetTotalEntries(fTotalEntries);
111  }
112 
113  // Create the iterator
114  fPacketizersIter = new TIter(fPacketizers);
115 
116  // Set the current packetizer to the first one
117  if (!(fCurrent = (TVirtualPacketizer *) fPacketizersIter->Next())) {
118  // Weird
119  Error("TPacketizerMulti", "could not point to the first valid packetizer");
120  fPacketizers->SetOwner(kTRUE);
121  SafeDelete(fPacketizers);
122  SafeDelete(fPacketizersIter);
123  return;
124  }
125 
126  // Create map
127  fAssignedPack = new TMap;
128 
129  // Ok, everything went fine
130  fValid = kTRUE;
131 
132  PDB(kPacketizer,1) Info("TPacketizerMulti", "done");
133 }
134 
135 ////////////////////////////////////////////////////////////////////////////////
136 /// Destructor.
137 
139 {
140  if (fPacketizers) {
143  }
145  fCurrent = 0;
146  if (fAssignedPack) {
149  }
151 }
152 
153 ////////////////////////////////////////////////////////////////////////////////
154 /// Get next packet from the current packetizer.
155 /// If the current packetizer is done, move to next.
156 /// Return null when all packetizers are done.
157 
159 {
160  TDSetElement *elem = 0;
161 
162  // Must be valid
163  if (!fValid) return elem;
164 
165  // Point to the packetizer last used for this worker
166  TVirtualPacketizer *lastPacketizer = dynamic_cast<TVirtualPacketizer *>(fAssignedPack->GetValue(wrk));
167  if (lastPacketizer && lastPacketizer != fCurrent) {
168  PDB(kPacketizer,2)
169  Info("GetNextPacket", "%s: asking old packetizer %p ... ", wrk->GetOrdinal(), lastPacketizer);
170  if ((elem = lastPacketizer->GetNextPacket(wrk, r))) return elem;
171  if (fCurrent) {
172  // Transfer the status info
173  TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(lastPacketizer->GetSlaveStats()->GetValue(wrk));
174  TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
175  if (oldstat && curstat)
176  *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
177  }
178  }
179 
180  // Need something to be processed
181  if (!fCurrent) {
182  HandleTimer(0); // Send last timer message
183  return elem;
184  }
185 
186  // Get the next packet from the current packetizer
187  PDB(kPacketizer,2)
188  Info("GetNextPacket", "%s: asking current packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
189  if (!(elem = fCurrent->GetNextPacket(wrk, r))) {
190  // We need to transfer the status info if we change packetizer now
191  TMap *oldStats = (lastPacketizer && lastPacketizer == fCurrent) ? lastPacketizer->GetSlaveStats() : 0;
192  // If the packetizer is done, move to next
194  if (fCurrent) {
195  // Transfer the status info
196  if (oldStats) {
197  TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(oldStats->GetValue(wrk));
198  TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
199  if (oldstat && curstat)
200  *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
201  }
202  PDB(kPacketizer,2)
203  Info("GetNextPacket", "%s: asking new packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
204  elem = fCurrent->GetNextPacket(wrk, r);
205  }
206  }
207  if (fCurrent) {
208  // Save the packetizer
209  TPair *pair = dynamic_cast<TPair *>(fAssignedPack->FindObject(wrk));
210  if (pair) {
211  pair->SetValue(fCurrent);
212  } else {
213  fAssignedPack->Add(wrk, fCurrent);
214  }
215  PDB(kPacketizer,2)
216  Info("GetNextPacket", "assigned packetizer %p to %s (check: %p)",
217  fCurrent, wrk->GetOrdinal(), fAssignedPack->GetValue(wrk));
218  }
219 
220  // Check the total number of entries
223  Error("GetNextPacket", "Processed too many entries!");
224  HandleTimer(0); // Send last timer message
226  }
227 
228  // Done
229  return elem;
230 }
231 
232 ////////////////////////////////////////////////////////////////////////////////
233 /// Create a packetizer for dataset 'dset'
234 /// Return null on failure.
235 
237  Long64_t first, Long64_t num,
238  TList *input, TProofProgressStatus *st)
239 {
240  TVirtualPacketizer *packetizer = 0;
241 
242  // Check inputs
243  if (!dset || !wrks || !input || !st) {
244  Error("CreatePacketizer", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
245  dset, wrks, input, st);
246  return packetizer;
247  }
248 
249  // This is for data-driven runs
250  if (dset->TestBit(TDSet::kEmpty)) {
251  Error("CreatePacketizer", "dataset is empty: protocol error?");
252  return packetizer;
253  }
254 
255  TString packetizername;
256  TList *listOfMissingFiles = 0;
257 
258  TMethodCall callEnv;
259  TClass *cl;
260 
261  // Lookup - resolve the end-point URLs to optimize the distribution.
262  // The lookup was previously called in the packetizer's constructor.
263  // A list for the missing files may already have been added to the
264  // output list; otherwise, if needed it will be created inside
265  if (!(listOfMissingFiles = (TList *) input->FindObject("MissingFiles"))) {
266  // Create it
267  listOfMissingFiles = new TList;
268  // and add it to the input list; it will be later moved to the output list
269  input->Add(listOfMissingFiles);
270  }
271  dset->Lookup(kTRUE, &listOfMissingFiles);
272 
273  if (!(dset->GetListOfElements()) ||
274  !(dset->GetListOfElements()->GetSize())) {
275  Error("CreatePacketizer", "no files from the data set were found - skipping");
276  return packetizer;
277  }
278 
279  if (TProof::GetParameter(input, "PROOF_Packetizer", packetizername) != 0) {
280  // Using standard packetizer TPacketizer
281  packetizername = "TPacketizer";
282  } else {
283  Info("CreatePacketizer", "using alternate packetizer: %s", packetizername.Data());
284  }
285 
286  // Get linked to the related class
287  cl = TClass::GetClass(packetizername);
288  if (cl == 0) {
289  Error("CreatePacketizer", "class '%s' not found", packetizername.Data());
290  return packetizer;
291  }
292 
293  // Init the constructor
294  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
295  if (!callEnv.IsValid()) {
296  Error("CreatePacketizer", "cannot find correct constructor for '%s'", cl->GetName());
297  return packetizer;
298  }
299  callEnv.ResetParam();
300  callEnv.SetParam((Long_t) dset);
301  callEnv.SetParam((Long_t) wrks);
302  callEnv.SetParam((Long64_t) first);
303  callEnv.SetParam((Long64_t) num);
304  callEnv.SetParam((Long_t) input);
305  callEnv.SetParam((Long_t) st);
306 
307  // We are going to test validity during the packetizer initialization
308  dset->SetBit(TDSet::kValidityChecked);
309  dset->ResetBit(TDSet::kSomeInvalid);
310 
311  // Get an instance of the packetizer
312  Long_t ret = 0;
313  callEnv.Execute(ret);
314  if ((packetizer = (TVirtualPacketizer *)ret) == 0) {
315  Error("CreatePacketizer", "cannot construct '%s'", cl->GetName());
316  return packetizer;
317  }
318 
319  if (!packetizer->IsValid()) {
320  Error("CreatePacketizer",
321  "instantiated packetizer object '%s' is invalid", cl->GetName());
322  SafeDelete(packetizer);
323  }
324 
325  // Add invalid elements to the list of missing elements
326  TDSetElement *elem = 0;
327  if (dset->TestBit(TDSet::kSomeInvalid)) {
328  TIter nxe(dset->GetListOfElements());
329  while ((elem = (TDSetElement *)nxe())) {
330  if (!elem->GetValid()) {
331  listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
332  dset->Remove(elem, kFALSE);
333  }
334  }
335  // The invalid elements have been removed
336  dset->ResetBit(TDSet::kSomeInvalid);
337  }
338 
339  // Done
340  return packetizer;
341 }
const char * GetOrdinal() const
Definition: TSlave.h:135
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:234
long long Long64_t
Definition: RtypesCore.h:69
void SetValue(TObject *val)
Definition: TMap.h:126
Definition: TDSet.h:153
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:52
Basic string class.
Definition: TString.h:137
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:496
Bool_t IsValid() const
Bool_t GetValid() const
Definition: TDSet.h:121
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
const char * Data() const
Definition: TString.h:349
#define SafeDelete(p)
Definition: RConfig.h:436
#define PDB(mask, level)
Definition: TProofDebug.h:58
TVirtualPacketizer * CreatePacketizer(TDSet *dset, TList *wrks, Long64_t first, Long64_t num, TList *input, TProofProgressStatus *st)
Create a packetizer for dataset 'dset' Return null on failure.
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:33
TList * GetListOfElements() const
Definition: TDSet.h:231
TDSetElement * GetNextPacket(TSlave *wrk, TMessage *r)
Get next packet from the current packetizer.
void Info(const char *location, const char *msgfmt,...)
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:234
Method or function calling interface.
Definition: TMethodCall.h:41
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
ClassImp(TPacketizerMulti) TPacketizerMulti
Constructor.
void Error(const char *location, const char *msgfmt,...)
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:10485
TMap * GetSlaveStats() const
TProofProgressStatus * fProgressStatus
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Definition: TMap.cxx:213
A doubly linked list.
Definition: TList.h:47
void SetTotalEntries(Long64_t ent)
ROOT::R::TRInterface & r
Definition: Object.C:4
Long64_t GetTotalEntries() const
TObject * Next()
Definition: TCollection.h:158
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:173
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
The ROOT global object gROOT contains a list of all defined classes.
Definition: TClass.h:81
Long64_t GetEntries() const
void InitWithPrototype(TClass *cl, const char *method, const char *proto, Bool_t objectIsConst=kFALSE, ROOT::EFunctionMatchMode mode=ROOT::kConversionMatch)
Initialize the method invocation environment.
long Long_t
Definition: RtypesCore.h:50
Class used by TMap to store (key,value) pairs.
Definition: TMap.h:106
virtual Int_t GetSize() const
Definition: TCollection.h:95
void ResetParam()
Reset parameter list. To be used before the first call the SetParam().
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2881
TVirtualPacketizer * fCurrent
void Lookup(Bool_t removeMissing=kFALSE, TList **missingFiles=0)
Resolve the end-point URL for the current elements of this data set If the removeMissing option is se...
Definition: TDSet.cxx:1577
virtual Bool_t Add(const char *file, const char *objname=0, const char *dir=0, Long64_t first=0, Long64_t num=-1, const char *msd=0)
Add file to list of files to be analyzed.
Definition: TDSet.cxx:1023
virtual ~TPacketizerMulti()
Destructor.
virtual void Add(TObject *obj)
Definition: TList.h:81
void Execute(const char *, const char *, int *=0)
Execute method on this object with the given parameter string, e.g.
Definition: TMethodCall.h:68
void SetParam(Long_t l)
Add a long method parameter.
TProofProgressStatus * GetProgressStatus()
Definition: TSlave.h:50
const Bool_t kTRUE
Definition: Rtypes.h:91
Bool_t IsValid() const
Return true if the method call has been properly initialized and is usable.