Logo ROOT   6.10/09
Reference Guide
TPacketizerAdaptive.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Jan Iwaszkiewicz 11/12/06
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11 
12 /** \class TPacketizerAdaptive
13 \ingroup proofkernel
14 
15 This packetizer is based on TPacketizer but uses different
16 load-balancing algorithms and data structures.
17 Two main improvements in the load-balancing strategy:
18  - First one was to change the order in which the files are assigned
19  to the computing nodes in such a way that network transfers are
20  evenly distributed in the query time. Transfer of the remote files
21  was often becoming a bottleneck at the end of a query.
22  - The other improvement is the use of time-based packet size. We
23  measure the processing rate of all the nodes and calculate the
24 packet size, so that it takes a certain amount of time. In this way
25 the packetizer prevents the situation where the query can't finish
26  because of one slow node.
27 
28 The data structures: TFileStat, TFileNode and TSlaveStat are
29 enriched + changed and TFileNode::Compare method is changed.
30 
31 */
32 
33 #include "TPacketizerAdaptive.h"
34 
35 #include "Riostream.h"
36 #include "TDSet.h"
37 #include "TError.h"
38 #include "TEnv.h"
39 #include "TEntryList.h"
40 #include "TEventList.h"
41 #include "TMap.h"
42 #include "TMessage.h"
43 #include "TMonitor.h"
44 #include "TNtupleD.h"
45 #include "TObject.h"
46 #include "TParameter.h"
47 #include "TPerfStats.h"
48 #include "TProofDebug.h"
49 #include "TProof.h"
50 #include "TProofServ.h"
51 #include "TSlave.h"
52 #include "TSocket.h"
53 #include "TSortedList.h"
54 #include "TUrl.h"
55 #include "TClass.h"
56 #include "TRandom.h"
57 #include "TMath.h"
58 #include "TObjString.h"
59 #include "TList.h"
60 
61 //
62 // The following three utility classes manage the state of the
63 // work to be performed and the slaves involved in the process.
64 // A list of TFileNode(s) describes the hosts with files, each
65 // has a list of TFileStat(s) keeping the state for each TDSet
66 // element (file).
67 //
68 // The list of TSlaveStat(s) keeps track of the work (being) done
69 // by each slave
70 //
71 
72 
73 //------------------------------------------------------------------------------
74 
75 class TPacketizerAdaptive::TFileStat : public TObject {
76 
77 private:
78  Bool_t fIsDone; // is this element processed
79  TFileNode *fNode; // my FileNode
80  TDSetElement *fElement; // location of the file and its range
81  Long64_t fNextEntry; // cursor in the range, -1 when done // needs changing
82 
83 public:
84  TFileStat(TFileNode *node, TDSetElement *elem, TList *file);
85 
86  Bool_t IsDone() const {return fIsDone;}
87  Bool_t IsSortable() const { return kTRUE; }
88  void SetDone() {fIsDone = kTRUE;}
89  TFileNode *GetNode() const {return fNode;}
90  TDSetElement *GetElement() const {return fElement;}
91  Long64_t GetNextEntry() const {return fNextEntry;}
92  void MoveNextEntry(Long64_t step) {fNextEntry += step;}
93 
94  // This method is used to keep a sorted list of remaining files to be processed
95  Int_t Compare(const TObject* obj) const
96  {
97  // Return -1 if elem.entries < obj.elem.entries, 0 if elem.entries equal
98  // and 1 if elem.entries < obj.elem.entries.
99  const TFileStat *fst = dynamic_cast<const TFileStat*>(obj);
100  if (fst && GetElement() && fst->GetElement()) {
101  Long64_t ent = GetElement()->GetNum();
102  Long64_t entfst = fst->GetElement()->GetNum();
103  if (ent > 0 && entfst > 0) {
104  if (ent > entfst) {
105  return 1;
106  } else if (ent < entfst) {
107  return -1;
108  } else {
109  return 0;
110  }
111  }
112  }
113  // No info: assume equal (no change in order)
114  return 0;
115  }
116  void Print(Option_t * = 0) const
117  { // Notify file name and entries
118  Printf("TFileStat: %s %lld", fElement ? fElement->GetName() : "---",
119  fElement ? fElement->GetNum() : -1);
120  }
121 };
122 
123 TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem, TList *files)
124  : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
125 {
126  // Constructor: add to the global list
127  if (files) files->Add(this);
128 }
129 
130 //------------------------------------------------------------------------------
131 
132 // a class describing a file node as a part of a session
133 class TPacketizerAdaptive::TFileNode : public TObject {
134 
135 private:
136  TString fNodeName; // FQDN of the node
137  TList *fFiles; // TDSetElements (files) stored on this node
138  TObject *fUnAllocFileNext; // cursor in fFiles
139  TList *fActFiles; // files with work remaining
140  TObject *fActFileNext; // cursor in fActFiles
141  Int_t fMySlaveCnt; // number of slaves running on this node
142  // (which can process remote files)
143  Int_t fExtSlaveCnt; // number of external slaves processing
144  // files on this node
145  Int_t fRunSlaveCnt; // total number of slaves processing files
146  // on this node
147  Long64_t fProcessed; // number of events processed on this node
148  Long64_t fEvents; // number of entries in files on this node
149 
150  Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
151 
152  TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed (owned by TPacketizer)
153 
154 public:
155  TFileNode(const char *name, Int_t strategy, TSortedList *files);
156  ~TFileNode() { delete fFiles; delete fActFiles; }
157 
158  void IncMySlaveCnt() { fMySlaveCnt++; }
159  Int_t GetMySlaveCnt() const { return fMySlaveCnt; }
160  void IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; }
161  void DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
162  Int_t GetSlaveCnt() const { return fMySlaveCnt + fExtSlaveCnt; }
163  void IncRunSlaveCnt() { fRunSlaveCnt++; }
164  void DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
165  Int_t GetRunSlaveCnt() const { return fRunSlaveCnt; }
166  Int_t GetExtSlaveCnt() const { return fExtSlaveCnt; }
167  Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
168  Bool_t IsSortable() const { return kTRUE; }
169  Int_t GetNumberOfFiles() { return fFiles->GetSize(); }
170  void IncProcessed(Long64_t nEvents)
171  { fProcessed += nEvents; }
172  Long64_t GetProcessed() const { return fProcessed; }
173  void DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; }
174  // this method is used by Compare() it adds 1, so it returns a number that
175  // would be true if one more slave is added.
176  Long64_t GetEventsLeftPerSlave() const
177  { return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
178  void IncEvents(Long64_t nEvents) { fEvents += nEvents; }
179  const char *GetName() const { return fNodeName.Data(); }
180  Long64_t GetNEvents() const { return fEvents; }
181 
182  void Print(Option_t * = 0) const
183  {
184  TFileStat *fs = 0;
185  TDSetElement *e = 0;
186  Int_t nn = 0;
187  Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
188  Printf("+++ TFileNode: %s +++", fNodeName.Data());
189  Printf("+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
190  Printf("+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
191  Printf("+++ Files: %d ", fFiles ? fFiles->GetSize() : 0);
192  if (fFiles && fFiles->GetSize() > 0) {
193  TIter nxf(fFiles);
194  while ((fs = (TFileStat *) nxf())) {
195  if ((e = fs->GetElement())) {
196  Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->GetName(),
197  e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
198  } else {
199  Printf("+++ #%d: no element! ", ++nn);
200  }
201  }
202  }
203  Printf("+++ Active files: %d ", fActFiles ? fActFiles->GetSize() : 0);
204  if (fActFiles && fActFiles->GetSize() > 0) {
205  TIter nxaf(fActFiles);
206  while ((fs = (TFileStat *) nxaf())) {
207  if ((e = fs->GetElement())) {
208  Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->GetName(),
209  e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
210  } else {
211  Printf("+++ #%d: no element! ", ++nn);
212  }
213  }
214  }
215  Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
216  }
217 
218  void Add(TDSetElement *elem, Bool_t tolist)
219  {
220  TList *files = tolist ? (TList *)fFilesToProcess : (TList *)0;
221  TFileStat *f = new TFileStat(this, elem, files);
222  fFiles->Add(f);
223  if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
224  }
225 
226  TFileStat *GetNextUnAlloc()
227  {
228  TObject *next = fUnAllocFileNext;
229 
230  if (next != 0) {
231  // make file active
232  fActFiles->Add(next);
233  if (fActFileNext == 0) fActFileNext = fActFiles->First();
234 
235  // move cursor
236  fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
237  }
238  return (TFileStat *) next;
239  }
240 
241  TFileStat *GetNextActive()
242  {
243  TObject *next = fActFileNext;
244 
245  if (fActFileNext != 0) {
246  fActFileNext = fActFiles->After(fActFileNext);
247  if (fActFileNext == 0) fActFileNext = fActFiles->First();
248  }
249 
250  return (TFileStat *) next;
251  }
252 
253  void RemoveActive(TFileStat *file)
254  {
255  if (fActFileNext == file) fActFileNext = fActFiles->After(file);
256  fActFiles->Remove(file);
257  if (fFilesToProcess) fFilesToProcess->Remove(file);
258  if (fActFileNext == 0) fActFileNext = fActFiles->First();
259  }
260 
261  Int_t Compare(const TObject *other) const
262  {
263  // Must return -1 if this is smaller than obj, 0 if objects are equal
264  // and 1 if this is larger than obj.
265  // smaller means more needing a new worker.
266  // Two cases are considered depending on
267  // relation between harddrive speed and network bandwidth.
268 
269  const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
270  if (!obj) {
271  Error("Compare", "input is not a TPacketizer::TFileNode object");
272  return 0;
273  }
274 
275  // how many more events it has than obj
276 
277  if (fStrategy == 1) {
278  // The default adaptive strategy.
279  Int_t myVal = GetRunSlaveCnt();
280  Int_t otherVal = obj->GetRunSlaveCnt();
281  if (myVal < otherVal) {
282  return -1;
283  } else if (myVal > otherVal) {
284  return 1;
285  } else {
286  // if this has more events to process than obj
287  if ((fEvents - fProcessed) >
288  (obj->GetNEvents() - obj->GetProcessed())) {
289  return -1;
290  } else {
291  return 1;
292  }
293  }
294  } else {
295  Int_t myVal = GetSlaveCnt();
296  Int_t otherVal = obj->GetSlaveCnt();
297  if (myVal < otherVal) {
298  return -1;
299  } else if (myVal > otherVal) {
300  return 1;
301  } else {
302  return 0;
303  }
304  }
305  }
306 
307  void Reset()
308  {
309  fUnAllocFileNext = fFiles->First();
310  fActFiles->Clear();
311  fActFileNext = 0;
312  fExtSlaveCnt = 0;
313  fMySlaveCnt = 0;
314  fRunSlaveCnt = 0;
315  }
316 };
317 
318 
319 TPacketizerAdaptive::TFileNode::TFileNode(const char *name, Int_t strategy, TSortedList *files)
320  : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
321  fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
322  fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
323  fStrategy(strategy), fFilesToProcess(files)
324 {
325  // Constructor
326 
327  fFiles->SetOwner();
328  fActFiles->SetOwner(kFALSE);
329 }
330 
331 //------------------------------------------------------------------------------
332 
333 class TPacketizerAdaptive::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
334 
335 friend class TPacketizerAdaptive;
336 
337 private:
338  TFileNode *fFileNode; // corresponding node or 0
339  TFileStat *fCurFile; // file currently being processed
340  TDSetElement *fCurElem; // TDSetElement currently being processed
341  Long64_t fCurProcessed; // events processed in the current file
342  Float_t fCurProcTime; // proc time spent on the current file
343  TList *fDSubSet; // packets processed by this worker
344 
345 public:
346  TSlaveStat(TSlave *slave);
347  ~TSlaveStat();
348  TFileNode *GetFileNode() const { return fFileNode; }
349  Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
350  Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
351  TFileStat *GetCurFile() { return fCurFile; }
352  void SetFileNode(TFileNode *node) { fFileNode = node; }
353  void UpdateRates(TProofProgressStatus *st);
354  Float_t GetAvgRate() { return fStatus->GetRate(); }
355  Float_t GetCurRate() {
356  return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
357  Int_t GetLocalEventsLeft() {
358  return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
359  TList *GetProcessedSubSet() { return fDSubSet; }
360  TProofProgressStatus *GetProgressStatus() { return fStatus; }
362 };
363 
364 ////////////////////////////////////////////////////////////////////////////////
365 /// Constructor
366 
367 TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave)
368  : fFileNode(0), fCurFile(0), fCurElem(0),
369  fCurProcessed(0), fCurProcTime(0)
370 {
371  fDSubSet = new TList();
372  fDSubSet->SetOwner();
373  fSlave = slave;
374  fStatus = new TProofProgressStatus();
375  // The slave name is a special one in PROOF-Lite: avoid blocking on the DNS
376  // for non existing names
377  fWrkFQDN = slave->GetName();
378  if (strcmp(slave->ClassName(), "TSlaveLite")) {
379  fWrkFQDN = TUrl(fWrkFQDN).GetHostFQDN();
380  // Get full name for local hosts
381  if (fWrkFQDN.Contains("localhost") || fWrkFQDN == "127.0.0.1")
382  fWrkFQDN = TUrl(gSystem->HostName()).GetHostFQDN();
383  }
384  PDB(kPacketizer, 2)
385  Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.Data());
386 }
387 
388 ////////////////////////////////////////////////////////////////////////////////
389 /// Cleanup
390 
391 TPacketizerAdaptive::TSlaveStat::~TSlaveStat()
392 {
393  SafeDelete(fDSubSet);
394  SafeDelete(fStatus);
395 }
396 
397 ////////////////////////////////////////////////////////////////////////////////
398 /// Update packetizer rates
399 
400 void TPacketizerAdaptive::TSlaveStat::UpdateRates(TProofProgressStatus *st)
401 {
402  if (!st) {
403  Error("UpdateRates", "no status object!");
404  return;
405  }
406  if (fCurFile->IsDone()) {
407  fCurProcTime = 0;
408  fCurProcessed = 0;
409  } else {
410  fCurProcTime += st->GetProcTime() - GetProcTime();
411  fCurProcessed += st->GetEntries() - GetEntriesProcessed();
412  }
413  fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed());
414  st->SetLastEntries(st->GetEntries() - fStatus->GetEntries());
415  SafeDelete(fStatus);
416  fStatus = st;
417 }
418 
419 ////////////////////////////////////////////////////////////////////////////////
420 /// Add the current element to the fDSubSet (subset processed by this worker)
421 /// and if the status arg is given, then change the size of the packet.
422 /// return the difference (*st - *fStatus)
423 
424 TProofProgressStatus *TPacketizerAdaptive::TSlaveStat::AddProcessed(TProofProgressStatus *st)
425 {
426  if (st && fDSubSet && fCurElem) {
427  if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed())
428  fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed());
429  fDSubSet->Add(fCurElem);
430  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
431  return diff;
432  } else {
433  Error("AddProcessed", "processed subset of current elem undefined");
434  return 0;
435  }
436 }
437 
438 //------------------------------------------------------------------------------
439 
441 
442 ////////////////////////////////////////////////////////////////////////////////
443 /// Constructor
444 
446  Long64_t first, Long64_t num,
447  TList *input, TProofProgressStatus *st)
448  : TVirtualPacketizer(input, st)
449 {
450  PDB(kPacketizer,1) Info("TPacketizerAdaptive",
451  "enter (first %lld, num %lld)", first, num);
452 
453  // Init pointer members
454  fSlaveStats = 0;
455  fUnAllocated = 0;
456  fActive = 0;
457  fFileNodes = 0;
458  fMaxPerfIdx = 1;
460  fMaxEntriesRatio = 2.;
461 
462  fMaxSlaveCnt = -1;
463  fPacketAsAFraction = 4;
464  fStrategy = 1;
465  fFilesToProcess = new TSortedList;
466  fFilesToProcess->SetOwner(kFALSE);
467 
468  if (!fProgressStatus) {
469  Error("TPacketizerAdaptive", "No progress status");
470  return;
471  }
472 
473  // Attempt to synchronize the packet size with the tree cache size
474  Int_t cpsync = -1;
475  if (TProof::GetParameter(input, "PROOF_PacketizerCachePacketSync", cpsync) != 0) {
476  // Check if there is a global cache-packet sync setting
477  cpsync = gEnv->GetValue("Packetizer.CachePacketSync", 1);
478  }
479  if (cpsync >= 0) fCachePacketSync = (cpsync > 0) ? kTRUE : kFALSE;
480 
481  // Max file entries to avg allowed ratio for cache-to-packet synchronization
482  // (applies only if fCachePacketSync is true; -1. disables the bound)
483  if (TProof::GetParameter(input, "PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio) != 0) {
484  // Check if there is a global ratio setting
485  fMaxEntriesRatio = gEnv->GetValue("Packetizer.MaxEntriesRatio", 2.);
486  }
487 
488  // The possibility to change packetizer strategy to the basic TPacketizer's
489  // one (in which workers always process their local data first).
490  Int_t strategy = -1;
491  if (TProof::GetParameter(input, "PROOF_PacketizerStrategy", strategy) != 0) {
492  // Check if there is a global strategy setting
493  strategy = gEnv->GetValue("Packetizer.Strategy", 1);
494  }
495  if (strategy == 0) {
496  fStrategy = 0;
497  Info("TPacketizerAdaptive", "using the basic strategy of TPacketizer");
498  } else if (strategy != 1) {
499  Warning("TPacketizerAdaptive", "unsupported strategy index (%d): ignore", strategy);
500  }
501 
502  Long_t maxSlaveCnt = 0;
503  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
504  if (maxSlaveCnt < 0) {
505  Info("TPacketizerAdaptive",
506  "The value of PROOF_MaxSlavesPerNode must be positive");
507  maxSlaveCnt = 0;
508  }
509  } else {
510  // Try also with Int_t (recently supported in TProof::SetParameter)
511  Int_t mxslcnt = -1;
512  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
513  if (mxslcnt < 0) {
514  Info("TPacketizerAdaptive",
515  "The value of PROOF_MaxSlavesPerNode must be positive");
516  mxslcnt = 0;
517  }
518  maxSlaveCnt = (Long_t) mxslcnt;
519  }
520  }
521 
522  if (!maxSlaveCnt)
523  maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 0);
524  if (maxSlaveCnt > 0) {
525  fMaxSlaveCnt = maxSlaveCnt;
526  Info("TPacketizerAdaptive", "Setting max number of workers per node to %ld",
527  fMaxSlaveCnt);
528  }
529 
530  // if forceLocal parameter is set to 1 then eliminate the cross-worker
531  // processing;
532  // This minimizes the network usage on the PROOF cluser at the expense of
533  // longer jobs processing times.
534  // To process successfully the session must have workers with all the data!
536  Int_t forceLocal = 0;
537  if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) {
538  if (forceLocal == 1)
539  fForceLocal = kTRUE;
540  else
541  Info("TPacketizerAdaptive",
542  "The only accepted value of PROOF_ForceLocal parameter is 1 !");
543  }
544 
545  // Below we provide a possibility to change the way packet size is
546  // calculated or define the packet time directly.
547  // fPacketAsAFraction can be interpreted as follows:
548  // packet time is (expected job proc. time) / fPacketSizeAsAFraction.
549  // It substitutes 20 in the old formula to calculate the fPacketSize:
550  // fPacketSize = fTotalEntries / (20 * nslaves)
551  Int_t packetAsAFraction = 0;
552  if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0) {
553  if (packetAsAFraction > 0) {
554  fPacketAsAFraction = packetAsAFraction;
555  Info("TPacketizerAdaptive",
556  "using alternate fraction of query time as a packet size: %d",
557  packetAsAFraction);
558  } else
559  Info("TPacketizerAdaptive", "packetAsAFraction parameter must be higher than 0");
560  }
561 
562  // Packet re-assignement
563  fTryReassign = 0;
564  Int_t tryReassign = 0;
565  if (TProof::GetParameter(input, "PROOF_TryReassign", tryReassign) != 0)
566  tryReassign = gEnv->GetValue("Packetizer.TryReassign", 0);
567  fTryReassign = tryReassign;
568  if (fTryReassign != 0)
569  Info("TPacketizerAdaptive", "failed packets will be re-assigned");
570 
571  // Save the config parameters in the dedicated list so that they will be saved
572  // in the outputlist and therefore in the relevant TQueryResult
573  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerCachePacketSync", (Int_t)fCachePacketSync));
574  fConfigParams->Add(new TParameter<Double_t>("PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio));
575  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerStrategy", fStrategy));
576  fConfigParams->Add(new TParameter<Int_t>("PROOF_MaxWorkersPerNode", (Int_t)fMaxSlaveCnt));
577  fConfigParams->Add(new TParameter<Int_t>("PROOF_ForceLocal", (Int_t)fForceLocal));
578  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketAsAFraction", fPacketAsAFraction));
579 
580  Double_t baseLocalPreference = 1.2;
581  fBaseLocalPreference = (Float_t)baseLocalPreference;
582  if (TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference) == 0)
583  fBaseLocalPreference = (Float_t)baseLocalPreference;
584 
585  fFileNodes = new TList;
586  fFileNodes->SetOwner();
587  fUnAllocated = new TList;
589  fActive = new TList;
591 
592  fValid = kTRUE;
593 
594  // Resolve end-point urls to optmize distribution
595  // dset->Lookup(); // moved to TProofPlayerRemote::Process
596 
597  // Read list of mounted disks
598  TObjArray *partitions = 0;
599  TString partitionsStr;
600  if (TProof::GetParameter(input, "PROOF_PacketizerPartitions", partitionsStr) != 0)
601  partitionsStr = gEnv->GetValue("Packetizer.Partitions", "");
602  if (!partitionsStr.IsNull()) {
603  Info("TPacketizerAdaptive", "Partitions: %s", partitionsStr.Data());
604  partitions = partitionsStr.Tokenize(",");
605  }
606 
607  // Split into per host and disk entries
608  dset->Reset();
609  TDSetElement *e;
610  while ((e = (TDSetElement*)dset->Next())) {
611 
612  if (e->GetValid()) continue;
613 
614  // The dataset name, if any
615  if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
616  fDataSet = e->GetDataSet();
617 
618  TUrl url = e->GetFileName();
619  PDB(kPacketizer,2)
620  Info("TPacketizerAdaptive", "element name: %s (url: %s)", e->GetFileName(), url.GetUrl());
621 
622  // Map non URL filenames to dummy host
623  TString host;
624  if ( !url.IsValid() ||
625  (strncmp(url.GetProtocol(),"root", 4) &&
626  strncmp(url.GetProtocol(),"rfio", 4) &&
627  strncmp(url.GetProtocol(),"file", 4)) ) {
628  host = "no-host";
629  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
630  host = "localhost";
631  url.SetProtocol("root");
632  } else {
633  host = url.GetHostFQDN();
634  }
635  // Get full name for local hosts
636  if (host.Contains("localhost") || host == "127.0.0.1") {
637  url.SetHost(gSystem->HostName());
638  host = url.GetHostFQDN();
639  }
640 
641  // Find on which disk is the file, if any
642  TString disk;
643  if (partitions) {
644  TIter iString(partitions);
645  TObjString* os = 0;
646  while ((os = (TObjString *)iString())) {
647  // Compare begining of the url with disk mountpoint
648  if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
649  disk = os->GetName();
650  break;
651  }
652  }
653  }
654  // Node's url
655  TString nodeStr;
656  if (disk.IsNull())
657  nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
658  else
659  nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
660  TFileNode *node = (TFileNode *) fFileNodes->FindObject(nodeStr);
661 
662  if (node == 0) {
663  node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
664  fFileNodes->Add(node);
665  PDB(kPacketizer,2)
666  Info("TPacketizerAdaptive", "creating new node '%s' or the element", nodeStr.Data());
667  } else {
668  PDB(kPacketizer,2)
669  Info("TPacketizerAdaptive", "adding element to existing node '%s'", nodeStr.Data());
670  }
671 
672  node->Add(e, kFALSE);
673  }
674 
675  fSlaveStats = new TMap;
677 
678  TSlave *slave;
679  TIter si(slaves);
680  while ((slave = (TSlave*) si.Next())) {
681  fSlaveStats->Add( slave, new TSlaveStat(slave) );
682  fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
683  slave->GetPerfIdx() : fMaxPerfIdx;
684  }
685 
686  // Setup file & filenode structure
687  Reset();
688  // Optimize the number of files to be open when running on subsample
689  Int_t validateMode = 0;
690  Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
691  Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
692  if (num > -1)
693  PDB(kPacketizer,2)
694  Info("TPacketizerAdaptive",
695  "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
696  ValidateFiles(dset, slaves, num, byfile);
697 
698 
699  if (!fValid) return;
700 
701  // apply global range (first,num) to dset and rebuild structure
702  // ommitting TDSet elements that are not needed
703 
704  Int_t files = 0;
705  fTotalEntries = 0;
706  fUnAllocated->Clear(); // avoid dangling pointers
707  fActive->Clear();
708  fFileNodes->Clear(); // then delete all objects
709  PDB(kPacketizer,2)
710  Info("TPacketizerAdaptive",
711  "processing range: first %lld, num %lld", first, num);
712 
713  dset->Reset();
714  Long64_t cur = 0;
715  while (( e = (TDSetElement*)dset->Next())) {
716 
717  // Skip invalid or missing file; It will be moved
718  // from the dset to the 'MissingFiles' list in the player.
719  if (!e->GetValid()) continue;
720 
721  TUrl url = e->GetFileName();
722  Long64_t eFirst = e->GetFirst();
723  Long64_t eNum = e->GetNum();
724  PDB(kPacketizer,2)
725  Info("TPacketizerAdaptive", "processing element '%s'", e->GetFileName());
726  PDB(kPacketizer,2)
727  Info("TPacketizerAdaptive",
728  " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, e->GetEntryList());
729 
730  if (!e->GetEntryList()) {
731  // This element is before the start of the global range, skip it
732  if (cur + eNum < first) {
733  cur += eNum;
734  PDB(kPacketizer,2)
735  Info("TPacketizerAdaptive", " --> skip element cur %lld", cur);
736  continue;
737  }
738 
739  // This element is after the end of the global range, skip it
740  if (num != -1 && (first+num <= cur)) {
741  cur += eNum;
742  PDB(kPacketizer,2)
743  Info("TPacketizerAdaptive", " --> drop element cur %lld", cur);
744  continue; // break ??
745  }
746 
747  Bool_t inRange = kFALSE;
748  if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
749 
750  if (cur <= first) {
751  // If this element contains the start of the global range
752  // adjust its start and number of entries
753  e->SetFirst( eFirst + (first - cur) );
754  e->SetNum( e->GetNum() - (first - cur) );
755  PDB(kPacketizer,2)
756  Info("TPacketizerAdaptive", " --> adjust start %lld and end %lld",
757  eFirst + (first - cur), first + num - cur);
758  inRange = kTRUE;
759  }
760  if (num != -1 && (first+num <= cur+eNum)) {
761  // If this element contains the end of the global range
762  // adjust its number of entries
763  e->SetNum( first + num - e->GetFirst() - cur );
764  PDB(kPacketizer,2)
765  Info("TPacketizerAdaptive", " --> adjust end %lld", first + num - cur);
766  inRange = kTRUE;
767  }
768 
769  } else {
770  // Increment the counter ...
771  PDB(kPacketizer,2)
772  Info("TPacketizerAdaptive", " --> increment 'cur' by %lld", eNum);
773  cur += eNum;
774  }
775  // Re-adjust eNum and cur, if needed
776  if (inRange) {
777  cur += eNum;
778  eNum = e->GetNum();
779  }
780 
781  } else {
782  TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
783  if (enl) {
784  eNum = enl->GetN();
785  PDB(kPacketizer,2)
786  Info("TPacketizerAdaptive", " --> entry-list element: %lld entries", eNum);
787  } else {
788  TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
789  eNum = evl ? evl->GetN() : eNum;
790  PDB(kPacketizer,2)
791  Info("TPacketizerAdaptive", " --> event-list element: %lld entries (evl:%p)", eNum, evl);
792  }
793  if (!eNum) {
794  PDB(kPacketizer,2)
795  Info("TPacketizerAdaptive", " --> empty entry- or event-list element!");
796  continue;
797  }
798  }
799  PDB(kPacketizer,2)
800  Info("TPacketizerAdaptive", " --> next cur %lld", cur);
801 
802  // Map non URL filenames to dummy host
803  TString host;
804  if ( !url.IsValid() ||
805  (strncmp(url.GetProtocol(),"root", 4) &&
806  strncmp(url.GetProtocol(),"rfio", 4) &&
807  strncmp(url.GetProtocol(),"file", 4)) ) {
808  host = "no-host";
809  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
810  host = "localhost";
811  url.SetProtocol("root");
812  } else {
813  host = url.GetHostFQDN();
814  }
815  // Get full name for local hosts
816  if (host.Contains("localhost") || host == "127.0.0.1") {
817  url.SetHost(gSystem->HostName());
818  host = url.GetHostFQDN();
819  }
820 
821  // Find, on which disk is the file
822  TString disk;
823  if (partitions) {
824  TIter iString(partitions);
825  TObjString* os = 0;
826  while ((os = (TObjString *)iString())) {
827  // Compare begining of the url with disk mountpoint
828  if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
829  disk = os->GetName();
830  break;
831  }
832  }
833  }
834  // Node's url
835  TString nodeStr;
836  if (disk.IsNull())
837  nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
838  else
839  nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
840  TFileNode *node = (TFileNode*) fFileNodes->FindObject(nodeStr);
841 
842 
843  if (node == 0) {
844  node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
845  fFileNodes->Add( node );
846  PDB(kPacketizer, 2)
847  Info("TPacketizerAdaptive", " --> creating new node '%s' for element", nodeStr.Data());
848  } else {
849  PDB(kPacketizer, 2)
850  Info("TPacketizerAdaptive", " --> adding element to exiting node '%s'", nodeStr.Data());
851  }
852 
853  ++files;
854  fTotalEntries += eNum;
855  node->Add(e, kTRUE);
856  node->IncEvents(eNum);
857  PDB(kPacketizer,2) e->Print("a");
858  }
859  PDB(kPacketizer,1)
860  Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
861  fTotalEntries, files, fFileNodes->GetSize());
862 
863  // Set the total number for monitoring
864  if (gPerfStats)
865  gPerfStats->SetNumEvents(fTotalEntries);
866 
867  Reset();
868 
869  InitStats();
870 
871  if (!fValid)
873 
874  PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
875 }
876 
877 ////////////////////////////////////////////////////////////////////////////////
878 /// Destructor.
879 
881 {
882  if (fSlaveStats) {
884  }
885 
890  SafeDelete(fFilesToProcess);
891 }
892 
893 ////////////////////////////////////////////////////////////////////////////////
894 /// (re)initialise the statistics
895 /// called at the beginning or after a worker dies.
896 
898 {
899  // calculating how many files from TDSet are not cached on
900  // any slave
901  Int_t noRemoteFiles = 0;
902  fNEventsOnRemLoc = 0;
903  Int_t totalNumberOfFiles = 0;
904  TIter next(fFileNodes);
905  while (TFileNode *fn = (TFileNode*)next()) {
906  totalNumberOfFiles += fn->GetNumberOfFiles();
907  if (fn->GetMySlaveCnt() == 0) {
908  noRemoteFiles += fn->GetNumberOfFiles();
909  fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
910  }
911  }
912 
913  if (totalNumberOfFiles == 0) {
914  Info("InitStats", "no valid or non-empty file found: setting invalid");
915  // No valid files: set invalid and return
916  fValid = kFALSE;
917  return;
918  }
919 
920  fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
921  Info("InitStats",
922  "fraction of remote files %f", fFractionOfRemoteFiles);
923 
924  if (!fValid)
926 
927  PDB(kPacketizer,1) Info("InitStats", "return");
928 }
929 
930 ////////////////////////////////////////////////////////////////////////////////
931 /// Get next unallocated file from 'node' or other nodes:
932 /// First try 'node'. If there is no more files, keep trying to
933 /// find an unallocated file on other nodes.
934 
935 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node, const char *nodeHostName)
936 {
937  TFileStat *file = 0;
938 
939  if (node != 0) {
940  PDB(kPacketizer, 2)
941  Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
942  file = node->GetNextUnAlloc();
943  if (file == 0) RemoveUnAllocNode(node);
944  } else {
945  if (nodeHostName && strlen(nodeHostName) > 0) {
946 
947  TFileNode *fn;
948  // Make sure that they are in the corrected order
949  fUnAllocated->Sort();
950  PDB(kPacketizer,2) fUnAllocated->Print();
951 
952  // Loop over unallocated fileNode list
953  for (int i = 0; i < fUnAllocated->GetSize(); i++) {
954 
955  if ((fn = (TFileNode *) fUnAllocated->At(i))) {
956  TUrl uu(fn->GetName());
957  PDB(kPacketizer, 2)
958  Info("GetNextUnAlloc", "comparing %s with %s...", nodeHostName, uu.GetHost());
959 
960  // Check, whether node's hostname is matching with current fileNode (fn)
961  if (!strcmp(nodeHostName, uu.GetHost())) {
962  node = fn;
963 
964  // Fetch next unallocated file from this node
965  if ((file = node->GetNextUnAlloc()) == 0) {
966  RemoveUnAllocNode(node);
967  node = 0;
968  } else {
969  PDB(kPacketizer, 2)
970  Info("GetNextUnAlloc", "found! (host: %s)", uu.GetHost());
971  break;
972  }
973  }
974  } else {
975  Warning("GetNextUnAlloc", "unallocate entry %d is empty!", i);
976  }
977  }
978 
979  if (node != 0 && fMaxSlaveCnt > 0 && node->GetExtSlaveCnt() >= fMaxSlaveCnt) {
980  // Unlike in TPacketizer we look at the number of ext slaves only.
981  PDB(kPacketizer,1)
982  Info("GetNextUnAlloc", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
983  node = 0;
984  }
985  }
986 
987  if (node == 0) {
988  while (file == 0 && ((node = NextNode()) != 0)) {
989  PDB(kPacketizer, 2)
990  Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
991  if ((file = node->GetNextUnAlloc()) == 0) RemoveUnAllocNode(node);
992  }
993  }
994  }
995 
996  if (file != 0) {
997  // if needed make node active
998  if (fActive->FindObject(node) == 0) {
999  fActive->Add(node);
1000  }
1001  }
1002 
1003  PDB(kPacketizer, 2) {
1004  if (!file) {
1005  Info("GetNextUnAlloc", "no file found!");
1006  } else {
1007  file->Print();
1008  }
1009  }
1010 
1011  return file;
1012 }
1013 
1014 ////////////////////////////////////////////////////////////////////////////////
1015 /// Get next node which has unallocated files.
1016 /// the order is determined by TFileNode::Compare
1017 
1018 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
1019 {
1020  fUnAllocated->Sort();
1021  PDB(kPacketizer,2) {
1022  fUnAllocated->Print();
1023  }
1024 
1025  TFileNode *fn = (TFileNode*) fUnAllocated->First();
1026  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1027  // unlike in TPacketizer we look at the number of ext slaves only.
1028  PDB(kPacketizer,1)
1029  Info("NextNode", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
1030  fn = 0;
1031  }
1032 
1033  return fn;
1034 }
1035 
1036 ////////////////////////////////////////////////////////////////////////////////
1037 /// Remove 'node' from the list of nodes with unallocated files (fUnAllocated).
/// NOTE(review): the signature (embedded line 1039) is absent from this listing.
1038 
1040 {
1041  fUnAllocated->Remove(node);
1042 }
1043 
1044 ////////////////////////////////////////////////////////////////////////////////
1045 /// Get next active file.
1046 
1047 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
1048 {
1049  TFileNode *node;
1050  TFileStat *file = 0;
1051 
1052  while (file == 0 && ((node = NextActiveNode()) != 0)) {
1053  file = node->GetNextActive();
1054  if (file == 0) RemoveActiveNode(node);
1055  }
1056 
1057  return file;
1058 }
1059 
1060 
1061 ////////////////////////////////////////////////////////////////////////////////
1062 /// Get next active node.
1063 
1064 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
1065 {
1066  fActive->Sort();
1067  PDB(kPacketizer,2) {
1068  Info("NextActiveNode", "enter");
1069  fActive->Print();
1070  }
1071 
1072  TFileNode *fn = (TFileNode*) fActive->First();
1073  // look at only ext slaves
1074  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1075  PDB(kPacketizer,1)
1076  Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fMaxSlaveCnt);
1077  fn = 0;
1078  }
1079 
1080  return fn;
1081 }
1082 
1083 ////////////////////////////////////////////////////////////////////////////////
1084 /// Remove 'file' from the list of active files of its node; if the node is
/// left without active files, the node itself is removed from the active list.
/// NOTE(review): the signature (embedded line 1086) is absent from this listing;
/// 'file' is dereferenced without a null check.
1085 
1087 {
1088  TFileNode *node = file->GetNode();
1089 
1090  node->RemoveActive(file);
 // Last active file of this node: the node is no longer active either
1091  if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
1092 }
1093 
1094 ////////////////////////////////////////////////////////////////////////////////
1095 /// Remove 'node' from the list of active nodes (fActive).
/// NOTE(review): the signature (embedded line 1097) is absent from this listing.
1096 
1098 {
1099  fActive->Remove(node);
1100 }
1101 
1102 ////////////////////////////////////////////////////////////////////////////////
1103 /// Reset the internal data structure for packet distribution.
/// Clears the unallocated and active node lists, resets every file node and
/// then, for each worker in fSlaveStats, assigns the file node located on the
/// worker's host that has the fewest workers assigned; the worker's current
/// file is cleared.
/// NOTE(review): embedded lines 1105 (signature) and 1108 (a statement between
/// the two Clear() calls) are absent from this listing - confirm against the
/// original source file.
1104 
1106 {
1107  fUnAllocated->Clear();
1109 
1110  fActive->Clear();
1111 
1112  TIter files(fFileNodes);
1113  TFileNode *fn;
1114  while ((fn = (TFileNode*) files.Next()) != 0) {
1115  fn->Reset();
1116  }
1117 
1118  TIter slaves(fSlaveStats);
1119  TObject *key;
1120  while ((key = slaves.Next()) != 0) {
1121  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
1122  if (!slstat) {
1123  Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
1124  continue;
1125  }
1126  // Find out which file nodes are on the worker machine and assign the
1127  // one with the fewest workers assigned
1128  TFileNode *fnmin = 0;
 // Start from the number of workers: only nodes with strictly fewer
 // assigned workers than this can be selected
1129  Int_t fncnt = fSlaveStats->GetSize();
1130  files.Reset();
1131  while ((fn = (TFileNode*) files.Next()) != 0) {
 // The node name is a URL; compare its host part with the worker name
1132  if (!strcmp(slstat->GetName(), TUrl(fn->GetName()).GetHost())) {
1133  if (fn->GetMySlaveCnt() < fncnt) {
1134  fnmin = fn;
1135  fncnt = fn->GetMySlaveCnt();
1136  }
1137  }
1138  }
1139  if (fnmin != 0 ) {
1140  slstat->SetFileNode(fnmin);
1141  fnmin->IncMySlaveCnt();
1142  PDB(kPacketizer, 2)
1143  Info("Reset","assigning node '%s' to '%s' (cnt: %d)",
1144  fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
1145  }
1146  slstat->fCurFile = 0;
1147  }
1148 }
1149 
1150 ////////////////////////////////////////////////////////////////////////////////
1151 /// Check existence of file/dir/tree and get number of entries.
1152 /// Assumes the files have been setup.
///
/// Each worker in 'slaves' is handed one unallocated file at a time. If
/// elem->GetEntries(kTRUE, kFALSE) is negative or the element title is empty,
/// a message with the file, directory and object names is sent to the worker
/// and the reply is awaited through a TMonitor; otherwise the value returned
/// by GetEntries is used directly. Elements for which a worker reply carries
/// no positive entry count are invalidated and TDSet::kSomeInvalid is set on
/// 'dset'. At the end, unless the packetizer became invalid, the TDSet offset
/// of each valid element is set to the sum of the entries of the valid
/// elements preceding it.
///
/// 'maxent': if negative, a worker is always re-queued after its reply;
///           otherwise only while fewer than 'maxent' entries were counted
///           and 'byfile' is false.
/// 'byfile': together with maxent > 0, workers are re-queued in batches whose
///           size is derived from the average entries per validated file.
///
/// NOTE(review): embedded lines 1154, 1232 and 1434 are absent from this
/// listing (the first line of the signature and, presumably, the declarations
/// of the two message objects named 'm') - confirm against the original source.
1153 
1155  Long64_t maxent, Bool_t byfile)
1156 {
1157  TMap slaves_by_sock;
1158  TMonitor mon;
1159  TList workers;
1160 
1161 
1162  // Setup the communication infrastructure
1163 
1164  workers.AddAll(slaves);
1165  TIter si(slaves);
1166  TSlave *slm;
1167  while ((slm = (TSlave*)si.Next()) != 0) {
1168  PDB(kPacketizer,3)
1169  Info("ValidateFiles","socket added to monitor: %p (%s)",
1170  slm->GetSocket(), slm->GetName());
1171  mon.Add(slm->GetSocket());
1172  slaves_by_sock.Add(slm->GetSocket(), slm);
1173  }
1174 
 // Sockets are activated one by one, only while a reply is pending
1175  mon.DeActivateAll();
1176 
1177  ((TProof*)gProof)->DeActivateAsyncInput();
1178 
1179  // Some monitoring systems (TXSocketHandler) need to know this
1180  ((TProof*)gProof)->fCurrentMonitor = &mon;
1181 
1182  // Identify the type
1183  if (!strcmp(dset->GetType(), "TTree")) SetBit(TVirtualPacketizer::kIsTree);
1184 
1185  // Preparing for client notification
1186  TString msg("Validating files");
1187  UInt_t n = 0;
1188  UInt_t tot = dset->GetListOfElements()->GetSize();
1189  Bool_t st = kTRUE;
1190 
 // totent: entries counted so far; nopenf: number of files counted so far
1191  Long64_t totent = 0, nopenf = 0;
1192  while (kTRUE) {
1193 
1194  // send work
1195  while (TSlave *s = (TSlave *)workers.First()) {
1196 
1197  workers.Remove(s);
1198 
1199  // find a file
1200 
1201  TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
1202  if (!slstat) {
1203  Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
1204  continue;
1205  }
1206 
1207  TFileNode *node = 0;
1208  TFileStat *file = 0;
1209 
1210  // try its own node first
1211  if ((node = slstat->GetFileNode()) != 0) {
1212  PDB(kPacketizer,3) node->Print();
1213  file = GetNextUnAlloc(node);
1214  if (file == 0)
1215  slstat->SetFileNode(0);
1216  }
1217 
1218  // look for a file on any other node if necessary
1219  if (file == 0)
1220  file = GetNextUnAlloc();
1221 
1222  if (file != 0) {
1223  // files are done right away
1224  RemoveActive(file);
1225 
1226  slstat->fCurFile = file;
1227  TDSetElement *elem = file->GetElement();
1228  Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
1229  if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
1230  // This is decremented when we get the reply
1231  file->GetNode()->IncExtSlaveCnt(slstat->GetName());
 // NOTE(review): the declaration of 'm' (embedded line 1232) is missing here
1233  m << dset->IsTree()
1234  << TString(elem->GetFileName())
1235  << TString(elem->GetDirectory())
1236  << TString(elem->GetObjName());
1237 
1238  s->GetSocket()->Send( m );
1239  mon.Activate(s->GetSocket());
1240  PDB(kPacketizer,2)
1241  Info("ValidateFiles",
1242  "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1243  s->GetOrdinal(), s->GetName(), s->GetSocket(),
1244  dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
1245  elem->GetDirectory(), elem->GetObjName());
1246  } else {
1247  // Fill the info
1248  elem->SetTDSetOffset(entries);
1249  if (entries > 0) {
1250  // Most likely valid
1251  elem->SetValid();
1252  if (!elem->GetEntryList()) {
1253  if (elem->GetFirst() > entries) {
1254  Error("ValidateFiles",
1255  "first (%lld) higher then number of entries (%lld) in %s",
1256  elem->GetFirst(), entries, elem->GetFileName());
1257  // disable element
1258  slstat->fCurFile->SetDone();
1259  elem->Invalidate();
1260  dset->SetBit(TDSet::kSomeInvalid);
1261  }
 // A requested number of -1 is replaced by everything from 'first' on;
 // a range running past the end is clipped
1262  if (elem->GetNum() == -1) {
1263  elem->SetNum(entries - elem->GetFirst());
1264  } else if (elem->GetFirst() + elem->GetNum() > entries) {
1265  Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
1266  " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
1267  entries, elem->GetFileName());
1268  elem->SetNum(entries - elem->GetFirst());
1269  }
1270  PDB(kPacketizer,2)
1271  Info("ValidateFiles",
1272  "found elem '%s' with %lld entries", elem->GetFileName(), entries);
1273  }
1274  }
1275  // Count
1276  totent += entries;
1277  nopenf++;
1278  // Notify the client
1279  n++;
1280  gProof->SendDataSetStatus(msg, n, tot, st);
1281 
1282  // This worker is ready for the next validation
1283  workers.Add(s);
1284  }
1285  }
1286  }
1287 
1288  // Check if there is anything to wait for
1289  if (mon.GetActive() == 0) {
1290  if (byfile && maxent > 0) {
1291  // How many files do we still need ?
 // NOTE(review): integer division by totent, which is still 0 if no
 // entries have been counted at this point - verify this cannot happen
1292  Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1293  if (nrestf <= 0 && maxent > totent) nrestf = 1;
1294  if (nrestf > 0) {
1295  PDB(kPacketizer,3)
1296  Info("ValidateFiles", "{%lld, %lld, %lld}: needs to validate %lld more files",
1297  maxent, totent, nopenf, nrestf);
1298  si.Reset();
1299  while ((slm = (TSlave *) si.Next()) && nrestf--) {
1300  workers.Add(slm);
1301  }
1302  continue;
1303  } else {
1304  PDB(kPacketizer,3)
1305  Info("ValidateFiles", "no need to validate more files");
1306  break;
1307  }
1308  } else {
1309  break;
1310  }
1311  }
1312 
1313  PDB(kPacketizer,3) {
1314  Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
1315  TList *act = mon.GetListOfActives();
1316  TIter next(act);
1317  while (TSocket *s = (TSocket*) next()) {
1318  TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
1319  if (sl)
1320  Info("ValidateFiles", " worker-%s (%s)",
1321  sl->GetOrdinal(), sl->GetName());
1322  }
1323  delete act;
1324  }
1325 
1326  TSocket *sock = mon.Select();
1327  // If we have been interrupted break
1328  if (!sock) {
1329  Error("ValidateFiles", "selection has been interrupted - STOP");
1330  mon.DeActivateAll();
1331  fValid = kFALSE;
1332  break;
1333  }
1334  mon.DeActivate(sock);
1335 
1336  PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
1337 
 // NOTE(review): 'slave' is dereferenced below without a null check
1338  TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
1339  if (!sock->IsValid()) {
1340  // A socket got invalid during validation
1341  Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
1342  slave->GetOrdinal(), slave->GetName());
1343  ((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation");
1344  fValid = kFALSE;
1345  break;
1346  }
1347 
 // NOTE(review): 'reply' is not deleted anywhere in this function - verify
 // who owns the message returned by Recv / passed to HandleInputMessage
1348  TMessage *reply;
1349 
1350  if (sock->Recv(reply) <= 0) {
1351  // Notify
1352  Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
1353  slave->GetOrdinal(), slave->GetName());
1354  // Help! lost a slave? ('slave' is deleted inside here ...)
1355  ((TProof*)gProof)->MarkBad(slave, "receive failed during validation");
1356  fValid = kFALSE;
1357  continue;
1358  }
1359 
1360  if (reply->What() != kPROOF_GETENTRIES) {
1361  // Not what we want: handover processing to the central machinery
 // Save the type first: 'reply' is not touched again after the handover
1362  Int_t what = reply->What();
1363  ((TProof*)gProof)->HandleInputMessage(slave, reply);
1364  if (what == kPROOF_FATAL) {
1365  Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
1366  slave->GetOrdinal(), slave->GetName());
1367  fValid = kFALSE;
1368  } else {
1369  // Reactivate the socket
1370  mon.Activate(sock);
1371  }
1372  // Get next message
1373  continue;
1374  }
1375 
 // NOTE(review): 'slavestat' and its fCurFile are used without null checks
1376  TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1377  TDSetElement *e = slavestat->fCurFile->GetElement();
 // Balances the IncExtSlaveCnt done when the request was sent
1378  slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
1379  Long64_t entries;
1380 
1381  (*reply) >> entries;
1382 
1383  // Extract object name, if there
1384  if ((reply->BufferSize() > reply->Length())) {
1385  TString objname;
1386  (*reply) >> objname;
1387  e->SetTitle(objname);
1388  }
1389 
1390  e->SetTDSetOffset(entries);
1391  if (entries > 0) {
1392 
1393  // This dataset element is most likely valid
1394  e->SetValid();
1395 
1396  if (!e->GetEntryList()) {
1397  if (e->GetFirst() > entries) {
1398  Error("ValidateFiles",
1399  "first (%lld) higher then number of entries (%lld) in %s",
1400  e->GetFirst(), entries, e->GetFileName());
1401 
1402  // Invalidate the element
1403  slavestat->fCurFile->SetDone();
1404  e->Invalidate();
1405  dset->SetBit(TDSet::kSomeInvalid);
1406  }
1407 
1408  if (e->GetNum() == -1) {
1409  e->SetNum(entries - e->GetFirst());
1410  } else if (e->GetFirst() + e->GetNum() > entries) {
1411  Error("ValidateFiles",
1412  "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1413  e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1414  e->SetNum(entries - e->GetFirst());
1415  }
1416  }
1417 
1418  // Count
1419  totent += entries;
1420  nopenf++;
1421 
1422  // Notify the client
1423  n++;
1424  gProof->SendDataSetStatus(msg, n, tot, st);
1425 
1426  } else {
1427 
1428  Error("ValidateFiles", "cannot get entries for file: %s - skipping", e->GetFileName() );
1429  //
1430  // Need to fix this with a user option to allow incomplete file sets (rdm)
1431  //
1432  //fValid = kFALSE; // all element must be readable!
1433  if (gProofServ) {
 // NOTE(review): the declaration of 'm' (embedded line 1434) is missing here
1435  m << TString(Form("Cannot get entries for file: %s - skipping",
1436  e->GetFileName()));
1437  gProofServ->GetSocket()->Send(m);
1438  }
1439 
1440  // invalidate element
1441  e->Invalidate();
1442  dset->SetBit(TDSet::kSomeInvalid);
1443  }
1444  PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1445 
1446  // Ready for the next job, unless we have enough files
1447  if (maxent < 0 || ((totent < maxent) && !byfile))
1448  workers.Add(slave);
1449  }
1450 
1451  // report std. output from slaves??
1452 
1453  ((TProof*)gProof)->ActivateAsyncInput();
1454 
1455  // This needs to be reset
1456  ((TProof*)gProof)->fCurrentMonitor = 0;
1457 
1458  // No reason to continue if invalid
1459  if (!fValid)
1460  return;
1461 
1462  // compute the offset for each file element
 // Up to here the TDSet offset of an element temporarily held its number of
 // entries; it is now replaced by the running sum over the preceding valid elements
1463  Long64_t offset = 0;
1464  Long64_t newOffset = 0;
1465  TIter next(dset->GetListOfElements());
1466  TDSetElement *el;
1467  while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1468  if (el->GetValid()) {
1469  newOffset = offset + el->GetTDSetOffset();
1470  el->SetTDSetOffset(offset);
1471  offset = newOffset;
1472  }
1473  }
1474 }
1475 
1476 ////////////////////////////////////////////////////////////////////////////////
1477 /// Compute the number of entries for the next packet; the result depends on
/// fStrategy:
///  - fStrategy == 0: constant size, fTotalEntries / (fPacketAsAFraction * #workers);
///  - otherwise: time-based size, i.e. the worker's processing rate multiplied
///    by a target packet time derived from the global average rate and the
///    entries left, optionally stretched to cover 'cachesz' bytes and clamped
///    to [fMinPacketTime, fMaxPacketTime]. A worker with no measured rate
///    gets 5 * 'learnent' entries (1000 if 'learnent' is not positive).
/// The returned value is never smaller than 1.
/// NOTE(review): the signature (embedded line 1479) is absent from this
/// listing; 'slStatPtr', 'cachesz' and 'learnent' are its parameters as used
/// in the body.
1478 
1480 {
1481  Long64_t num;
1482  if (fStrategy == 0) {
1483  // TPacketizer's heuristic for starting packet size
1484  // Constant packet size;
1485  Int_t nslaves = fSlaveStats->GetSize();
1486  if (nslaves > 0) {
1487  num = fTotalEntries / (fPacketAsAFraction * nslaves);
1488  } else {
1489  num = 1;
1490  }
1491  } else {
1492  // The dynamic heuristic for setting the packet size (default)
1493  // Calculates the packet size based on performance of this slave
1494  // and estimated time left until the end of the query.
1495  TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
 // Prefer the current rate; fall back to the average one
1496  Float_t rate = slstat->GetCurRate();
1497  if (!rate)
1498  rate = slstat->GetAvgRate();
1499  if (rate) {
1500 
1501  // Global average rate
 // NOTE(review): divides by GetCumProcTime() / #workers without checking
 // for zero - presumably non-zero once a rate is available; verify
1502  Float_t avgProcRate = (GetEntriesProcessed()/(GetCumProcTime() / fSlaveStats->GetSize()));
1503  Float_t packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fPacketAsAFraction;
1504 
1505  // Bytes-to-Event conversion
1506  Float_t bevt = (GetEntriesProcessed() > 0) ? GetBytesRead() / GetEntriesProcessed() : -1.;
1507 
1508  // Make sure it is not smaller than the cache, if the info is available and the size
1509  // synchronization is required. But apply the cache-packet size synchronization only if there
1510  // are enough left files to process and the files are all of similar sizes. Otherwise we risk
1511  // to not exploit optimally all potentially active workers.
1512  Bool_t cpsync = fCachePacketSync;
1513  if (fMaxEntriesRatio > 0. && cpsync) {
1514  if (fFilesToProcess && fFilesToProcess->GetSize() <= fSlaveStats->GetSize()) {
1515  Long64_t remEntries = fTotalEntries - GetEntriesProcessed();
1516  Long64_t maxEntries = -1;
 // The last element of fFilesToProcess is taken as the largest file
 // NOTE(review): assumes the list is ordered by size - confirm
1517  if (fFilesToProcess->Last()) {
1518  TDSetElement *elem = (TDSetElement *) ((TPacketizerAdaptive::TFileStat *) fFilesToProcess->Last())->GetElement();
1519  if (elem) maxEntries = elem->GetNum();
1520  }
1521  if (maxEntries > remEntries / fSlaveStats->GetSize() * fMaxEntriesRatio) {
1522  PDB(kPacketizer,3) {
1523  Info("CalculatePacketSize", "%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
1524  Info("CalculatePacketSize", "%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1525  slstat->GetOrdinal(), fFilesToProcess->GetSize(),
1526  (Double_t)maxEntries / remEntries * fSlaveStats->GetSize(), fMaxEntriesRatio);
1527  }
1528  cpsync = kFALSE;
1529  }
1530  }
1531  }
 // Stretch the packet time so that the packet covers at least 'cachesz' bytes
1532  if (bevt > 0. && cachesz > 0 && cpsync) {
1533  if ((Long64_t)(rate * packetTime * bevt) < cachesz)
1534  packetTime = cachesz / bevt / rate;
1535  }
1536 
1537  // Apply min-max again, if required
1538  if (fMaxPacketTime > 0. && packetTime > fMaxPacketTime) packetTime = fMaxPacketTime;
1539  if (fMinPacketTime > 0. && packetTime < fMinPacketTime) packetTime = fMinPacketTime;
1540 
1541  // Translate the packet length in number of entries
1542  num = (Long64_t)(rate * packetTime);
1543 
1544  // Notify
1545  PDB(kPacketizer,2)
1546  Info("CalculatePacketSize","%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1547  slstat->GetOrdinal(), avgProcRate, rate, fTotalEntries - GetEntriesProcessed(),
1548  packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1549 
1550  } else {
1551  // First packet for this worker in this query
1552  // Five times the learning phase, or 1000 entries if its length is not known
1553  num = (learnent > 0) ? 5 * learnent : 1000;
1554 
1555  // Notify
1556  PDB(kPacketizer,2)
1557  Info("CalculatePacketSize","%s: num: %lld", slstat->GetOrdinal(), num);
1558  }
1559  }
 // Never hand out an empty packet
1560  if (num < 1) num = 1;
1561  return num;
1562 }
1563 
1564 ////////////////////////////////////////////////////////////////////////////////
1565 /// To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS
1566 /// message (when the worker was asked to stop processing during a packet).
1567 /// Returns the #entries intended in the last packet - #processed entries,
/// or -1 if no TSlaveStat is registered for 'sl' or the worker has no
/// current packet.
/// If fewer entries than intended were processed, a copy of the packet
/// starting after the processed part is handed to ReassignPacket.
/// NOTE(review): the first line of the signature (embedded line 1569) is
/// absent from this listing; 'sl' is its first parameter as used in the body.
1568 
1570  TProofProgressStatus *status,
1571  Double_t latency,
1572  TList **listOfMissingFiles)
1573 {
1574  // find slave
1575  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1576  if (!slstat) {
1577  Error("AddProcessed", "%s: TSlaveStat instance for worker %s not found!",
1578  (sl ? sl->GetOrdinal() : "x.x"),
1579  (sl ? sl->GetName() : "**undef**"));
1580  return -1;
1581  }
1582 
1583  // update stats & free old element
1584 
1585  if ( slstat->fCurElem != 0 ) {
1586  Long64_t expectedNumEv = slstat->fCurElem->GetNum();
1587  // Calculate the number of events processed in the last packet
 // ('status' carries the worker's cumulative count, hence the subtraction)
1588  Long64_t numev;
1589  if (status && status->GetEntries() > 0)
1590  numev = status->GetEntries() - slstat->GetEntriesProcessed();
1591  else
1592  numev = 0;
1593 
1594  // Calculate the progress made in the last packet
1595  TProofProgressStatus *progress = 0;
1596  if (numev > 0) {
1597  // This also moves the pointer in the corresponding TFileInfo
1598  progress = slstat->AddProcessed(status);
1599  if (progress) {
1600  (*fProgressStatus) += *progress;
1601  // update processing rate
1602  slstat->UpdateRates(status);
1603  }
1604  } else {
 // Nothing processed: report an empty progress record
1605  progress = new TProofProgressStatus();
1606  }
1607  if (progress) {
1608  PDB(kPacketizer,2)
1609  Info("AddProcessed", "%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1610  sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency,
1611  progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead());
1612 
1613  if (gPerfStats)
1614  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
1615  slstat->fCurElem->GetFileName(),
1616  progress->GetEntries(),
1617  latency,
1618  progress->GetProcTime(),
1619  progress->GetCPUTime(),
1620  progress->GetBytesRead());
1621  delete progress;
1622  }
1623  if (numev != expectedNumEv) {
1624  // The last packet was not fully processed
1625  // and will be split in two:
1626  // - The completed part was marked as done.
1627  // - Create a new packet with the part to be resubmitted.
1628  TDSetElement *newPacket = new TDSetElement(*(slstat->fCurElem));
1629  if (newPacket && numev < expectedNumEv) {
1630  Long64_t first = newPacket->GetFirst();
1631  newPacket->SetFirst(first + numev);
 // The copy is deleted here only when the reassignment fails
1632  if (ReassignPacket(newPacket, listOfMissingFiles) == -1)
1633  SafeDelete(newPacket);
1634  } else
 // NOTE(review): on this branch (numev > expectedNumEv) 'newPacket' is
 // neither reassigned nor deleted - looks like a leak; confirm
1635  Error("AddProcessed", "%s: processed too much? (%lld, %lld)",
1636  sl->GetOrdinal(), numev, expectedNumEv);
1637 
1638  // TODO: a signal handler which will send info from the worker
1639  // after a packet fails.
1640  /* Add it to the failed packets list.
1641  if (!fFailedPackets) {
1642  fFailedPackets = new TList();
1643  }
1644  fFailedPackets->Add(slstat->fCurElem);
1645  */
1646  }
1647 
1648  slstat->fCurElem = 0;
1649  return (expectedNumEv - numev);
1650  } else {
1651  // the kPROOF_STOPPROCESS message is sent after the worker receives zero
1652  // as the reply to kPROOF_GETNEXTPACKET
1653  return -1;
1654  }
1655 }
1656 
1657 ////////////////////////////////////////////////////////////////////////////////
1658 /// Get next packet;
1659 /// A meaningful difference to TPacketizer is the fact that this
1660 /// packetizer, for each worker, tries to predict whether the worker
1661 /// will finish processing its local files before the end of the query.
1662 /// If yes, it allocates, to those workers, files from non-slave filenodes
1663 /// or from slaves that are overloaded. The check is done every time a new
1664 /// file needs to be assigned.
1665 
1667 {
1668  if ( !fValid ) {
1669  return 0;
1670  }
1671 
1672  // find slave
1673 
1674  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1675  if (!slstat) {
1676  Error("GetNextPacket", "TSlaveStat instance for worker %s not found!",
1677  (sl ? sl->GetName() : "**undef**"));
1678  return 0;
1679  }
1680 
1681  // Attach to current file
1682  TFileStat *file = slstat->fCurFile;
1683 
1684  // Update stats & free old element
1685 
1686  Bool_t firstPacket = kFALSE;
1687  Long64_t cachesz = -1;
1688  Int_t learnent = -1;
1689  if ( slstat->fCurElem != 0 ) {
1690 
1691  Long64_t restEntries = 0;
1692  Double_t latency, proctime, proccpu;
1693  TProofProgressStatus *status = 0;
1694  Bool_t fileNotOpen = kFALSE, fileCorrupted = kFALSE;
1695 
1696  if (sl->GetProtocol() > 18) {
1697 
1698  (*r) >> latency;
1699  (*r) >> status;
1700 
1701  if (sl->GetProtocol() > 25) {
1702  (*r) >> cachesz >> learnent;
1703  if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1704  }
1705  fileNotOpen = status->TestBit(TProofProgressStatus::kFileNotOpen) ? kTRUE : kFALSE;
1706  fileCorrupted = status->TestBit(TProofProgressStatus::kFileCorrupted) ? kTRUE : kFALSE;
1707 
1708  } else {
1709 
1710  Long64_t bytesRead = -1;
1711 
1712  (*r) >> latency >> proctime >> proccpu;
1713  // only read new info if available
1714  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1715  if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1716  Long64_t totev = 0;
1717  if (r->BufferSize() > r->Length()) (*r) >> totev;
1718 
1719  status = new TProofProgressStatus(totev, bytesRead, -1, proctime, proccpu);
1720  fileNotOpen = (restEntries < 0) ? kTRUE : kFALSE;
1721  }
1722 
1723  if (!fileNotOpen && !fileCorrupted) {
1724  if (AddProcessed(sl, status, latency) != 0)
1725  Error("GetNextPacket", "%s: the worker processed a different # of entries", sl->GetOrdinal());
1728  Error("GetNextPacket", "%s: processed too many entries! (%lld, %lld)",
1730  // Send last timer message and stop the timer
1731  HandleTimer(0);
1733  }
1734  } else {
1735  if (file) {
1736  if (file->GetElement()) {
1737  if (fileCorrupted) {
1738  Info("GetNextPacket", "%s: file '%s' turned corrupted: invalidating file (%lld)",
1739  sl->GetOrdinal(), file->GetElement()->GetName(), restEntries);
1740  Int_t nunproc = AddProcessed(sl, status, latency);
1741  PDB(kPacketizer,1)
1742  Info("GetNextPacket", "%s: %d entries un-processed", sl->GetOrdinal(), nunproc);
1743  // Remaining to be processed
1744  Long64_t num = 0;
1745  if (file->GetElement()->TestBit(TDSetElement::kCorrupted)) {
1746  // Add the remainign entries in the packet to the ones already registered
1747  num = file->GetElement()->GetEntries() + restEntries;
1748  } else {
1749  // First call: add the remaining entries in the packet and those of the file
1750  // not yet assigned
1751  Long64_t rest = file->GetElement()->GetEntries() - file->GetNextEntry();
1752  num = restEntries + rest;
1753  }
1754  file->GetElement()->SetEntries(num);
1755  PDB(kPacketizer,1)
1756  Info("GetNextPacket", "%s: removed file: %s, entries left: %lld", sl->GetOrdinal(),
1757  file->GetElement()->GetName(), file->GetElement()->GetEntries());
1758  // Flag as corrupted
1759  file->GetElement()->SetBit(TDSetElement::kCorrupted);
1760  } else {
1761  Info("GetNextPacket", "%s: file '%s' could not be open: invalidating related element",
1762  sl->GetOrdinal(), file->GetElement()->GetName());
1763  }
1764  // Invalidate the element
1765  file->GetElement()->Invalidate();
1766  // Add it to the failed packets list
1767  if (!fFailedPackets) fFailedPackets = new TList();
1768  if (!fFailedPackets->FindObject(file->GetElement()))
1769  fFailedPackets->Add(file->GetElement());
1770  }
1771  // Deactivate this TFileStat
1772  file->SetDone();
1773  RemoveActive(file);
1774  } else {
1775  Info("GetNextPacket", "%s: error raised by worker, but TFileStat object invalid:"
1776  " protocol error?", sl->GetOrdinal());
1777  }
1778  }
1779  } else {
1780  firstPacket = kTRUE;
1781  }
1782 
1783  if ( fStop ) {
1784  HandleTimer(0);
1785  return 0;
1786  }
1787 
1788  TString nodeName;
1789  if (file != 0) nodeName = file->GetNode()->GetName();
1790  TString nodeHostName(slstat->GetName());
1791 
1792  PDB(kPacketizer,3)
1793  Info("GetNextPacket", "%s: entries processed: %lld - looking for a packet from node '%s'",
1794  sl->GetOrdinal(), fProgressStatus->GetEntries(), nodeName.Data());
1795 
1796  // If current file is just finished
1797  if ( file != 0 && file->IsDone() ) {
1798  file->GetNode()->DecExtSlaveCnt(slstat->GetName());
1799  file->GetNode()->DecRunSlaveCnt();
1800  if (gPerfStats)
1801  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1802  file->GetElement()->GetFileName(), kFALSE);
1803  file = 0;
1804  }
1805  // Reset the current file field
1806  slstat->fCurFile = file;
1807 
1808  Long64_t avgEventsLeftPerSlave =
1811  return 0;
1812  // Get a file if needed
1813  if ( file == 0) {
1814  // Needs a new file
1815  Bool_t openLocal;
1816  // Aiming for localPreference == 1 when #local == #remote events left
1817  Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
1818  (0.4 *(fTotalEntries - fProgressStatus->GetEntries())));
1819  if ( slstat->GetFileNode() != 0 ) {
1820  // Local file node exists and has more events to process.
1821  fUnAllocated->Sort();
1822  TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
1823  Bool_t nonLocalNodePossible;
1824  if (fForceLocal)
1825  nonLocalNodePossible = 0;
1826  else
1827  nonLocalNodePossible = firstNonLocalNode ?
1828  (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() < fMaxSlaveCnt))
1829  : 0;
1830  openLocal = !nonLocalNodePossible;
1831  Float_t slaveRate = slstat->GetAvgRate();
1832  if ( nonLocalNodePossible && fStrategy == 1) {
1833  // OpenLocal is set to kFALSE
1834  if ( slstat->GetFileNode()->GetRunSlaveCnt() >
1835  slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1836  // External slaves help slstat -> don't open nonlocal files
1837  // -1 because, at this point slstat is not running
1838  openLocal = kTRUE;
1839  else if ( slaveRate == 0 ) { // first file for this slave
1840  // GetLocalEventsLeft() counts the potential slave
1841  // as running on its fileNode.
1842  if ( slstat->GetLocalEventsLeft() * localPreference
1843  > (avgEventsLeftPerSlave))
1844  openLocal = kTRUE;
1845  else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
1846  < slstat->GetLocalEventsLeft() * localPreference )
1847  openLocal = kTRUE;
1848  else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
1849  openLocal = kTRUE;
1850  else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
1851  openLocal = kTRUE;
1852  } else {
1853  // At this point slstat has a non zero avg rate > 0
1854  Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
1855  // And thus fCumProcTime, fProcessed > 0
1856  Float_t avgTime = avgEventsLeftPerSlave
1858  if (slaveTime * localPreference > avgTime)
1859  openLocal = kTRUE;
1860  else if ((firstNonLocalNode->GetEventsLeftPerSlave())
1861  < slstat->GetLocalEventsLeft() * localPreference)
1862  openLocal = kTRUE;
1863  }
1864  }
1865  if (openLocal || fStrategy == 0) {
1866  // Try its own node
1867  file = slstat->GetFileNode()->GetNextUnAlloc();
1868  if (!file)
1869  file = slstat->GetFileNode()->GetNextActive();
1870  if ( file == 0 ) {
1871  // No more files on this worker
1872  slstat->SetFileNode(0);
1873  }
1874  }
1875  }
1876 
1877  // Try to find an unused filenode first
1878  if(file == 0 && !fForceLocal)
1879  file = GetNextUnAlloc(0, nodeHostName);
1880 
1881  // Then look at the active filenodes
1882  if(file == 0 && !fForceLocal)
1883  file = GetNextActive();
1884 
1885  if (file == 0) return 0;
1886 
1887  PDB(kPacketizer,3) if (fFilesToProcess) fFilesToProcess->Print();
1888 
1889  slstat->fCurFile = file;
1890  // if remote and unallocated file
1891  if (file->GetNode()->GetMySlaveCnt() == 0 &&
1892  file->GetElement()->GetFirst() == file->GetNextEntry()) {
1893  fNEventsOnRemLoc -= file->GetElement()->GetNum();
1894  if (fNEventsOnRemLoc < 0) {
1895  Error("GetNextPacket",
1896  "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1898  return 0;
1899  }
1900  }
1901  file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1902  file->GetNode()->IncRunSlaveCnt();
1903  if (gPerfStats)
1904  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1905  file->GetNode()->GetName(),
1906  file->GetElement()->GetFileName(), kTRUE);
1907  }
1908 
1909  Long64_t num = CalculatePacketSize(slstat, cachesz, learnent);
1910 
1911  // Get a packet
1912 
1913  TDSetElement *base = file->GetElement();
1914  Long64_t first = file->GetNextEntry();
1915  Long64_t last = base->GetFirst() + base->GetNum();
1916 
1917  // If the remaining part is smaller than the (packetsize * 1.5)
1918  // then increase the packetsize
1919 
1920  if ( first + num * 1.5 >= last ) {
1921  num = last - first;
1922  file->SetDone(); // done
1923  // Delete file from active list (unalloc list is single pass, no delete needed)
1924  RemoveActive(file);
1925  }
1926 
1927  // Update NextEntry in the file object
1928  file->MoveNextEntry(num);
1929 
1930  slstat->fCurElem = CreateNewPacket(base, first, num);
1931  if (base->GetEntryList())
1932  slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1933 
1934  // Flag the first packet of a new run (dataset)
1935  if (firstPacket)
1936  slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1937  else
1938  slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1939 
1940  PDB(kPacketizer,2)
1941  Info("GetNextPacket","%s: %s %lld %lld (%lld)", sl->GetOrdinal(), base->GetFileName(), first, first + num - 1, num);
1942 
1943  return slstat->fCurElem;
1944 }
1945 
1946 ////////////////////////////////////////////////////////////////////////////////
1947 /// Return the number of workers still processing
1948 
1950 {
1951  Int_t actw = 0;
1952  TIter nxw(fSlaveStats);
1953  TObject *key;
1954  while ((key = nxw())) {
1955  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1956  if (wrkstat && wrkstat->fCurFile) actw++;
1957  }
1958  // Done
1959  return actw;
1960 }
1961 
1962 ////////////////////////////////////////////////////////////////////////////////
1963 /// Get Estimation of the current rate; just summing the current rates of
1964 /// the active workers
1965 
1967 {
1968  all = kTRUE;
1969  // Loop over the workers
1970  Float_t currate = 0.;
1971  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1972  TIter nxw(fSlaveStats);
1973  TObject *key;
1974  while ((key = nxw()) != 0) {
1975  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1976  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1977  // Sum-up the current rates
1978  currate += slstat->GetProgressStatus()->GetCurrentRate();
1979  } else {
1980  all = kFALSE;
1981  }
1982  }
1983  }
1984  // Done
1985  return currate;
1986 }
1987 
1988 ////////////////////////////////////////////////////////////////////////////////
1989 /// Get estimation for the number of processed entries and bytes read at time t,
1990 /// based on the numbers already processed and the latests worker measured speeds.
1991 /// If t <= 0 the current time is used.
1992 /// Only the estimation for the entries is currently implemented.
1993 /// This is needed to smooth the instantaneous rate plot.
1994 
1996  Long64_t &bytes, Long64_t &calls)
1997 {
1998  // Default value
1999  ent = GetEntriesProcessed();
2000  bytes = GetBytesRead();
2001  calls = GetReadCalls();
2002 
2003  // Parse option
2004  if (fUseEstOpt == kEstOff)
2005  // Do not use estimation
2006  return 0;
2007  Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE;
2008 
2009  TTime tnow = gSystem->Now();
2010  Double_t now = (t > 0) ? (Double_t)t : Long64_t(tnow) / (Double_t)1000.;
2011  Double_t dt = -1;
2012 
2013  // Loop over the workers
2014  Bool_t all = kTRUE;
2015  Float_t trate = 0.;
2016  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
2017  ent = 0;
2018  TIter nxw(fSlaveStats);
2019  TObject *key;
2020  while ((key = nxw()) != 0) {
2021  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
2022  if (slstat) {
2023  // Those surely processed
2024  Long64_t e = slstat->GetEntriesProcessed();
2025  if (e <= 0) all = kFALSE;
2026  // Time elapsed since last update
2027  dt = now - slstat->GetProgressStatus()->GetLastUpdate();
2028  // Add estimated entries processed since last update
2029  Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
2030  : slstat->GetAvgRate();
2031  trate += rate;
2032  // Add estimated entries processed since last update
2033  e += (Long64_t) (dt * rate);
2034  // Add to the total
2035  ent += e;
2036  // Notify
2037  PDB(kPacketizer,3)
2038  Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
2039  slstat->fSlave->GetOrdinal(),
2040  slstat->GetEntriesProcessed(), rate, dt, e);
2041  }
2042  }
2043  }
2044  // Notify
2045  dt = now - fProgressStatus->GetLastUpdate();
2046  PDB(kPacketizer,2)
2047  Info("GetEstEntriesProcessed",
2048  "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2049  dt, ent, GetEntriesProcessed(), bytes, trate, all);
2050 
2051  // Check values
2052  ent = (ent > 0) ? ent : fProgressStatus->GetEntries();
2053  ent = (ent <= fTotalEntries) ? ent : fTotalEntries;
2054  bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead();
2055 
2056  // Done
2057  return ((all) ? 0 : 1);
2058 }
2059 
2060 ////////////////////////////////////////////////////////////////////////////////
2061 /// This method can be called at any time during processing
2062 /// as an effect of handling kPROOF_STOPPROCESS
2063 /// If the output list from this worker is going to be sent back to the master,
2064 /// the 'status' includes the number of entries processed by the slave.
2065 /// From this we calculate the remaining part of the packet.
2066 /// 0 indicates that the results from that worker were lost completely.
2067 /// Assume that the filenodes for which we have a TFileNode object
2068 /// are still up and running.
2069 
2071  TList **listOfMissingFiles)
2072 {
2073  TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s));
2074  if (!slaveStat) {
2075  Error("MarkBad", "Worker does not exist");
2076  return;
2077  }
2078  // Update worker counters
2079  if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
2080  slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
2081  slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
2082  }
2083 
2084  // If status is defined, the remaining part of the last packet is
2085  // reassigned in AddProcessed called from handling kPROOF_STOPPROCESS
2086  if (!status) {
2087  // Get the subset processed by the bad worker.
2088  TList *subSet = slaveStat->GetProcessedSubSet();
2089  if (subSet) {
2090  // Take care of the current packet
2091  if (slaveStat->fCurElem) {
2092  subSet->Add(slaveStat->fCurElem);
2093  }
2094  // Merge overlapping or subsequent elements
2095  Int_t nmg = 0, ntries = 100;
2096  TDSetElement *e = 0, *enxt = 0;
2097  do {
2098  nmg = 0;
2099  e = (TDSetElement *) subSet->First();
2100  while ((enxt = (TDSetElement *) subSet->After(e))) {
2101  if (e->MergeElement(enxt) >= 0) {
2102  nmg++;
2103  subSet->Remove(enxt);
2104  delete enxt;
2105  } else {
2106  e = enxt;
2107  }
2108  }
2109  } while (nmg > 0 && --ntries > 0);
2110  // reassign the packets assigned to the bad slave and save the size;
2111  SplitPerHost(subSet, listOfMissingFiles);
2112  // the elements were reassigned so should not be deleted
2113  subSet->SetOwner(0);
2114  } else {
2115  Warning("MarkBad", "subset processed by bad worker not found!");
2116  }
2117  (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
2118  }
2119  // remove slavestat from the map
2120  fSlaveStats->Remove(s);
2121  delete slaveStat;
2122  // recalculate fNEventsOnRemLoc and others
2123  InitStats();
2124 }
2125 
2126 ////////////////////////////////////////////////////////////////////////////////
2127 /// The file in the listOfMissingFiles can appear several times;
2128 /// in order to fix that, a TDSetElement::Merge method is needed.
2129 
2131  TList **listOfMissingFiles)
2132 {
2133  if (!e) {
2134  Error("ReassignPacket", "empty packet!");
2135  return -1;
2136  }
2137  // Check the old filenode
2138  TUrl url = e->GetFileName();
2139  // Check the host from which 'e' was previously read.
2140  // Map non URL filenames to dummy host
2141  TString host;
2142  if ( !url.IsValid() ||
2143  (strncmp(url.GetProtocol(),"root", 4) &&
2144  strncmp(url.GetProtocol(),"rfio", 4))) {
2145  host = "no-host";
2146  } else {
2147  host = url.GetHost();
2148  }
2149 
2150  // If accessible add it back to the old node
2151  // and do DecProcessed
2152  TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
2153  if (node && fTryReassign) {
2154  // The packet 'e' was processing data from this node.
2155  node->DecreaseProcessed(e->GetNum());
2156  // The file should be already in fFilesToProcess ...
2157  node->Add(e, kFALSE);
2158  if (!fUnAllocated->FindObject(node))
2159  fUnAllocated->Add(node);
2160  return 0;
2161  } else {
2162  // Add to the list of missing files
2163  TFileInfo *fi = e->GetFileInfo();
2164  if (listOfMissingFiles && *listOfMissingFiles)
2165  (*listOfMissingFiles)->Add((TObject *)fi);
2166  return -1;
2167  }
2168 }
2169 
2170 ////////////////////////////////////////////////////////////////////////////////
2171 /// Split into per host entries
2172 /// The files in the listOfMissingFiles can appear several times;
2173 /// in order to fix that, a TDSetElement::Merge method is needed.
2174 
2176  TList **listOfMissingFiles)
2177 {
2178  if (!elements) {
2179  Error("SplitPerHost", "Empty list of packets!");
2180  return;
2181  }
2182  if (elements->GetSize() <= 0) {
2183  Error("SplitPerHost", "The input list contains no elements");
2184  return;
2185  }
2186  TIter subSetIter(elements);
2187  TDSetElement *e;
2188  while ((e = (TDSetElement*) subSetIter.Next())) {
2189  if (ReassignPacket(e, listOfMissingFiles) == -1) {
2190  // Remove from the list in order to delete it.
2191  if (elements->Remove(e))
2192  Error("SplitPerHost", "Error removing a missing file");
2193  delete e;
2194  }
2195 
2196  }
2197 }
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
virtual Bool_t IsValid() const
Definition: TSocket.h:146
An array of TObjects.
Definition: TObjArray.h:37
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:212
Long64_t GetTDSetOffset() const
Definition: TDSet.h:128
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:847
void Reset()
Reset the internal data structure for packet distribution.
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Definition: TDSet.cxx:410
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
long long Long64_t
Definition: RtypesCore.h:69
virtual Long64_t GetN() const
Definition: TEntryList.h:75
Int_t fPacketAsAFraction
Definition: TPacketizer.h:60
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:520
Collectable string class.
Definition: TObjString.h:28
float Float_t
Definition: RtypesCore.h:53
const char Option_t
Definition: RtypesCore.h:62
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per host entries The files in the listOfMissingFiles can appear several times; in order to...
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
void SetTDSetOffset(Long64_t offset)
Definition: TDSet.h:129
This class represents a WWW compatible URL.
Definition: TUrl.h:35
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:159
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
const char * GetProtocol() const
Definition: TUrl.h:67
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:151
Long64_t GetReadCalls() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Bool_t GetValid() const
Definition: TDSet.h:119
Long64_t GetBytesRead() const
TSocket * GetSocket() const
Definition: TProofServ.h:257
Double_t GetProcTime() const
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
Float_t GetProcTime() const
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:585
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
Definition: TCollection.cxx:58
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:53
#define R__ASSERT(e)
Definition: TError.h:96
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Long64_t GetFirst() const
Definition: TDSet.h:112
const char * GetOrdinal() const
Definition: TSlave.h:131
Basic string class.
Definition: TString.h:129
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
Definition: TObject.cxx:543
const char * GetName() const
Returns name of object.
Definition: TSlave.h:124
Basic time type with millisecond precision.
Definition: TTime.h:27
virtual void DeActivateAll()
De-activate all activated sockets.
Definition: TMonitor.cxx:302
Long64_t GetEntries() const
Int_t GetProtocol() const
Definition: TSlave.h:133
Bool_t IsValid() const
Definition: TUrl.h:82
TList * GetListOfElements() const
Definition: TDSet.h:229
void Reset()
Definition: TCollection.h:156
Double_t GetCumProcTime() const
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition: TUrl.cxx:469
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:687
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:387
virtual ~TPacketizerAdaptive()
Destructor.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:501
Int_t GetActiveWorkers()
Return the number of workers still processing.
Double_t GetLastUpdate() const
TFileNode * NextNode()
Get next node which has unallocated files.
Int_t Length() const
Definition: TBuffer.h:94
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
Definition: TList.cxx:775
const char * GetFile() const
Definition: TUrl.h:72
Manages an element of a TDSet.
Definition: TDSet.h:66
const char * GetHost() const
Definition: TUrl.h:70
#define SafeDelete(p)
Definition: RConfig.h:499
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition: TObject.cxx:135
TFileStat * GetNextActive()
Get next active file.
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet starting from the first entry with num entries...
Long64_t GetNum() const
Definition: TDSet.h:114
#define PDB(mask, level)
Definition: TProofDebug.h:56
Int_t GetPerfIdx() const
Definition: TSlave.h:132
TSocket * GetSocket() const
Definition: TSlave.h:134
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
virtual Int_t GetN() const
Definition: TEventList.h:56
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:234
A sorted doubly linked list.
Definition: TSortedList.h:28
std::vector< std::vector< double > > Data
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet; A meaningful difference to TPacketizer is the fact that this packetizer, for each worker, tries to predict whether the worker will finish processing its local files before the end of the query.
TList * fFileNodes
Definition: TPacketizer.h:49
void Reset()
Reset the internal datastructure for packet distribution.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
TProofProgressStatus * fProgressStatus
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
A doubly linked list.
Definition: TList.h:43
const char * GetObjName() const
Definition: TDSet.h:120
const char * GetName() const
Returns name of object.
Definition: TObjString.h:39
TList * fUnAllocated
Definition: TPacketizer.h:50
void SetLastEntries(Long64_t entries)
const int nEvents
Definition: testRooFit.cxx:42
Named parameter, streamable and storable.
Definition: TParameter.h:37
const TString & GetString() const
Definition: TObjString.h:47
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
Definition: TSystem.cxx:470
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Definition: TProof.cxx:9308
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Bool_t IsTree() const
Definition: TDSet.h:223
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:561
TRandom2 r(17)
virtual Bool_t IsSortable() const
Definition: TObject.h:120
R__EXTERN TSystem * gSystem
Definition: TSystem.h:539
Int_t fMaxPerfIdx
Definition: TPacketizer.h:56
TObject * Remove(TObject *key)
Remove the (key,value) pair with key from the map.
Definition: TMap.cxx:295
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:482
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:679
TObject * Next()
Definition: TCollection.h:153
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
TFileNode * NextActiveNode()
Get next active node.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2332
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
unsigned int UInt_t
Definition: RtypesCore.h:42
TMarker * m
Definition: textangle.C:8
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:873
char * Form(const char *fmt,...)
Ssiz_t Length() const
Definition: TString.h:388
void Invalidate()
Definition: TDSet.h:134
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that, a TDSetElement::Merge method is needed.
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
TFileStat * GetNextUnAlloc(TFileNode *node=0)
Get next unallocated file.
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
Definition: TList.cxx:293
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
Definition: TObject.cxx:166
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Definition: TList.cxx:315
Int_t MergeElement(TDSetElement *elem)
Check if 'elem' is overlapping or subsequent and, if the case, return a merged element.
Definition: TDSet.cxx:163
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
Definition: TMonitor.cxx:250
void Print(Option_t *options="") const
Print a TDSetElement. When option="a" print full data.
Definition: TDSet.cxx:242
Long_t fMaxSlaveCnt
Definition: TPacketizer.h:58
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:150
#define Printf
Definition: TGeoToOCC.h:18
#define gPerfStats
const Bool_t kFALSE
Definition: RtypesCore.h:92
void InitStats()
(re)initialise the statistics called at the beginning or after a worker dies.
UInt_t What() const
Definition: TMessage.h:74
void SetHost(const char *host)
Definition: TUrl.h:87
long Long_t
Definition: RtypesCore.h:50
The packetizer is a load balancing object created for each query.
R__EXTERN TProof * gProof
Definition: TProof.h:1081
void Add(THist< DIMENSIONS, PRECISION_TO, STAT_TO... > &to, const THist< DIMENSIONS, PRECISION_FROM, STAT_FROM... > &from)
Add two histograms.
Definition: THist.hxx:336
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition: TString.cxx:2251
#define ClassImp(name)
Definition: Rtypes.h:336
TObject * GetEntryList() const
Definition: TDSet.h:131
double f(double x)
void SetValid()
Definition: TDSet.h:135
double Double_t
Definition: RtypesCore.h:55
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:310
Long64_t GetEntriesProcessed() const
Double_t GetCPUTime() const
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
R__EXTERN TEnv * gEnv
Definition: TEnv.h:170
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:320
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:572
void SetNum(Long64_t num)
Definition: TDSet.h:118
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
const char * GetFileName() const
Definition: TDSet.h:111
Int_t BufferSize() const
Definition: TBuffer.h:92
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:353
const char * GetType() const
Definition: TDSet.h:226
Bool_t IsNull() const
Definition: TString.h:385
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
Mother of all ROOT objects.
Definition: TObject.h:37
Long64_t GetBytesRead() const
const char * GetDataSet() const
Definition: TDSet.h:122
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
virtual void Add(TObject *obj)
Definition: TList.h:77
Definition: file.py:1
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
TList * fActive
Definition: TPacketizer.h:51
Class describing a generic file including meta information.
Definition: TFileInfo.h:38
const int strategy
Definition: testNdimFit.cxx:46
Definition: first.py:1
const int nn
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:364
virtual Int_t GetSize() const
Definition: TCollection.h:89
Class describing a PROOF worker server.
Definition: TSlave.h:46
TFileStat * GetNextActive()
Get next active file.
Container class for processing statistics.
void SetFirst(Long64_t first)
Definition: TDSet.h:113
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition: TNamed.cxx:155
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:25
const Bool_t kTRUE
Definition: RtypesCore.h:91
const Int_t n
Definition: legend1.C:16
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:859
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:48
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
const char * Data() const
Definition: TString.h:347
TFileNode * NextActiveNode()
Get next active node.