ROOT  6.06/09
Reference Guide
TProofPlayer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 07/01/02
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TProofPlayer //
15 // //
16 // This internal class and its subclasses steer the processing in PROOF.//
17 // Instances of the TProofPlayer class are created on the worker nodes //
18 // per session and do the processing. //
19 // Instances of its subclass - TProofPlayerRemote are created per each //
20 // query on the master(s) and on the client. On the master(s), //
21 // TProofPlayerRemote coordinate processing, check the dataset, create //
22 // the packetizer and take care of merging the results of the workers. //
23 // The instance on the client collects information on the input //
24 // (dataset and selector), it invokes the Begin() method and finalizes //
25 // the query by calling Terminate(). //
26 // //
27 //////////////////////////////////////////////////////////////////////////
28 
29 #include "TProofDraw.h"
30 #include "TProofPlayer.h"
31 #include "THashList.h"
32 #include "TEnv.h"
33 #include "TEventIter.h"
34 #include "TVirtualPacketizer.h"
35 #include "TSelector.h"
36 #include "TSocket.h"
37 #include "TProofServ.h"
38 #include "TProof.h"
39 #include "TProofOutputFile.h"
40 #include "TProofSuperMaster.h"
41 #include "TSlave.h"
42 #include "TClass.h"
43 #include "TROOT.h"
44 #include "TError.h"
45 #include "TException.h"
46 #include "MessageTypes.h"
47 #include "TMessage.h"
48 #include "TDSetProxy.h"
49 #include "TString.h"
50 #include "TSystem.h"
51 #include "TFile.h"
52 #include "TFileCollection.h"
53 #include "TFileInfo.h"
54 #include "TFileMerger.h"
55 #include "TProofDebug.h"
56 #include "TTimer.h"
57 #include "TMap.h"
58 #include "TPerfStats.h"
59 #include "TStatus.h"
60 #include "TEventList.h"
61 #include "TProofLimitsFinder.h"
62 #include "THashList.h"
63 #include "TSortedList.h"
64 #include "TTree.h"
65 #include "TEntryList.h"
66 #include "TDSet.h"
67 #include "TDrawFeedback.h"
68 #include "TNamed.h"
69 #include "TObjString.h"
70 #include "TQueryResult.h"
71 #include "TMD5.h"
72 #include "TMethodCall.h"
73 #include "TObjArray.h"
74 #include "TMutex.h"
75 #include "TH1.h"
76 #include "TVirtualMonitoring.h"
77 #include "TParameter.h"
79 #include "TStopwatch.h"
80 
81 // Timeout exception
82 #define kPEX_STOPPED 1001
83 #define kPEX_ABORTED 1002
84 
85 // To flag an abort condition: use a local static variable to avoid
86 // warnings about problems with longjumps
87 static Bool_t gAbort = kFALSE;
88 
89 class TAutoBinVal : public TNamed {
90 private:
91  Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
92 
93 public:
94  TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
95  Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
96  {
97  fXmin = xmin; fXmax = xmax;
98  fYmin = ymin; fYmax = ymax;
99  fZmin = zmin; fZmax = zmax;
100  }
101  void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
102  Double_t& ymax, Double_t& zmin, Double_t& zmax)
103  {
104  xmin = fXmin; xmax = fXmax;
105  ymin = fYmin; ymax = fYmax;
106  zmin = fZmin; zmax = fZmax;
107  }
108 
109 };
110 
111 //
112 // Special timer to dispatch pending events while processing
113 ////////////////////////////////////////////////////////////////////////////////
114 
115 class TDispatchTimer : public TTimer {
116 private:
117  TProofPlayer *fPlayer;
118 
119 public:
120  TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }
121 
122  Bool_t Notify();
123 };
124 ////////////////////////////////////////////////////////////////////////////////
125 /// Handle expiration of the timer associated with dispatching pending
126 /// events while processing. We must act as fast as possible here, so
127 /// we just set a flag submitting a request for dispatching pending events
128 
130 {
131  if (gDebug > 0) printf("TDispatchTimer::Notify: called!\n");
132 
133  fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);
134 
135  // Needed for the next shot
136  Reset();
137  return kTRUE;
138 }
139 
140 //
141 // Special timer to notify reach of max packet proc time
142 ////////////////////////////////////////////////////////////////////////////////
143 
144 class TProctimeTimer : public TTimer {
145 private:
146  TProofPlayer *fPlayer;
147 
148 public:
149  TProctimeTimer(TProofPlayer *p, Long_t to) : TTimer(to, kFALSE), fPlayer(p) { }
150 
151  Bool_t Notify();
152 };
153 ////////////////////////////////////////////////////////////////////////////////
154 /// Handle expiration of the timer associated with dispatching pending
155 /// events while processing. We must act as fast as possible here, so
156 /// we just set a flag submitting a request for dispatching pending events
157 
159 {
160  if (gDebug > 0) printf("TProctimeTimer::Notify: called!\n");
161 
162  fPlayer->SetBit(TProofPlayer::kMaxProcTimeReached);
163 
164  // One shot only
165  return kTRUE;
166 }
167 
168 //
169 // Special timer to handle stop/abort request via exception raising
170 ////////////////////////////////////////////////////////////////////////////////
171 
172 class TStopTimer : public TTimer {
173 private:
174  Bool_t fAbort;
175  TProofPlayer *fPlayer;
176 
177 public:
178  TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
179 
180  Bool_t Notify();
181 };
182 
183 ////////////////////////////////////////////////////////////////////////////////
184 /// Constructor for the timer to stop/abort processing.
185 /// The 'timeout' is in seconds.
186 /// Make sure that 'to' make sense, i.e. not larger than 10 days;
187 /// the minimum value is 10 ms (0 does not seem to start the timer ...).
188 
189 TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
190  : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
191 {
192  if (gDebug > 0)
193  Info ("TStopTimer","enter: %d, timeout: %d", abort, to);
194 
195  fPlayer = p;
196  fAbort = abort;
197 
198  if (gDebug > 1)
199  Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
200 }
201 
202 ////////////////////////////////////////////////////////////////////////////////
203 /// Handle the signal coming from the expiration of the timer
204 /// associated with an abort or stop request.
205 /// We raise an exception which will be processed in the
206 /// event loop.
207 
209 {
210  if (gDebug > 0) printf("TStopTimer::Notify: called!\n");
211 
212  if (fAbort)
214  else
216 
217  return kTRUE;
218 }
219 
220 //------------------------------------------------------------------------------
221 
223 
224 THashList *TProofPlayer::fgDrawInputPars = 0;
225 
226 ////////////////////////////////////////////////////////////////////////////////
227 /// Default ctor.
228 
230  : fAutoBins(0), fOutput(0), fSelector(0), fCreateSelObj(kTRUE), fSelectorClass(0),
231  fFeedbackTimer(0), fFeedbackPeriod(2000),
232  fEvIter(0), fSelStatus(0),
233  fTotalEvents(0), fReadBytesRun(0), fReadCallsRun(0), fProcessedRun(0),
234  fQueryResults(0), fQuery(0), fPreviousQuery(0), fDrawQueries(0),
235  fMaxDrawQueries(1), fStopTimer(0), fStopTimerMtx(0), fDispatchTimer(0),
236  fProcTimeTimer(0), fProcTime(0),
237  fOutputFile(0),
238  fSaveMemThreshold(-1), fSavePartialResults(kFALSE), fSaveResultsPerPacket(kFALSE)
239 {
240  fInput = new TList;
241  fExitStatus = kFinished;
242  fProgressStatus = new TProofProgressStatus();
244  ResetBit(TProofPlayer::kIsProcessing);
247 
248  static Bool_t initLimitsFinder = kFALSE;
249  if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
251  initLimitsFinder = kTRUE;
252  }
253 
254 }
255 
256 ////////////////////////////////////////////////////////////////////////////////
257 /// Destructor.
258 
260 {
261  fInput->Clear("nodelete");
263  // The output list is owned by fSelector and destroyed in there
264  SafeDelete(fSelector);
265  SafeDelete(fFeedbackTimer);
266  SafeDelete(fEvIter);
267  SafeDelete(fQueryResults);
268  SafeDelete(fDispatchTimer);
269  SafeDelete(fProcTimeTimer);
270  SafeDelete(fProcTime);
271  SafeDelete(fStopTimer);
272 }
273 
274 ////////////////////////////////////////////////////////////////////////////////
275 /// Set processing bit according to 'on'
276 
278 {
279  if (on)
281  else
283 }
284 
285 ////////////////////////////////////////////////////////////////////////////////
286 /// Stop the process after this event. If timeout is positive, start
287 /// a timer firing after timeout seconds to hard-stop time-expensive
288 /// events.
289 
291 {
292  if (gDebug > 0)
293  Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
294 
295  if (fEvIter != 0)
296  fEvIter->StopProcess(abort);
297  Long_t to = 1;
298  if (abort == kTRUE) {
300  } else {
302  to = timeout;
303  }
304  // Start countdown, if needed
305  if (to > 0)
306  SetStopTimer(kTRUE, abort, to);
307 }
308 
309 ////////////////////////////////////////////////////////////////////////////////
310 /// Enable/disable the timer to dispatch pening events while processing.
311 
313 {
314  SafeDelete(fDispatchTimer);
316  if (on) {
317  fDispatchTimer = new TDispatchTimer(this);
318  fDispatchTimer->Start();
319  }
320 }
321 
322 ////////////////////////////////////////////////////////////////////////////////
323 /// Enable/disable the timer to stop/abort processing.
324 /// The 'timeout' is in seconds.
325 
327 {
328  fStopTimerMtx = (fStopTimerMtx) ? fStopTimerMtx : new TMutex(kTRUE);
329  R__LOCKGUARD(fStopTimerMtx);
330 
331  // Clean-up the timer
332  SafeDelete(fStopTimer);
333  if (on) {
334  // create timer
335  fStopTimer = new TStopTimer(this, abort, timeout);
336  // Start the countdown
337  fStopTimer->Start();
338  if (gDebug > 0)
339  Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
340  (abort ? "ABORT" : "STOP"), timeout);
341  } else {
342  if (gDebug > 0)
343  Info ("SetStopTimer", "timer STOPPED");
344  }
345 }
346 
347 ////////////////////////////////////////////////////////////////////////////////
348 /// Add query result to the list, making sure that there are no
349 /// duplicates.
350 
352 {
353  if (!q) {
354  Warning("AddQueryResult","query undefined - do nothing");
355  return;
356  }
357 
358  // Treat differently normal and draw queries
359  if (!(q->IsDraw())) {
360  if (!fQueryResults) {
361  fQueryResults = new TList;
362  fQueryResults->Add(q);
363  } else {
364  TIter nxr(fQueryResults);
365  TQueryResult *qr = 0;
366  TQueryResult *qp = 0;
367  while ((qr = (TQueryResult *) nxr())) {
368  // If same query, remove old version and break
369  if (*qr == *q) {
370  fQueryResults->Remove(qr);
371  delete qr;
372  break;
373  }
374  // Record position according to start time
375  if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
376  qp = qr;
377  }
378 
379  if (!qp) {
380  fQueryResults->AddFirst(q);
381  } else {
382  fQueryResults->AddAfter(qp, q);
383  }
384  }
385  } else if (IsClient()) {
386  // If max reached, eliminate first the oldest one
387  if (fDrawQueries == fMaxDrawQueries && fMaxDrawQueries > 0) {
388  TIter nxr(fQueryResults);
389  TQueryResult *qr = 0;
390  while ((qr = (TQueryResult *) nxr())) {
391  // If same query, remove old version and break
392  if (qr->IsDraw()) {
393  fDrawQueries--;
394  fQueryResults->Remove(qr);
395  delete qr;
396  break;
397  }
398  }
399  }
400  // Add new draw query
401  if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
402  fDrawQueries++;
403  if (!fQueryResults)
404  fQueryResults = new TList;
405  fQueryResults->Add(q);
406  }
407  }
408 }
409 
410 ////////////////////////////////////////////////////////////////////////////////
411 /// Remove all query result instances referenced 'ref' from
412 /// the list of results.
413 
414 void TProofPlayer::RemoveQueryResult(const char *ref)
415 {
416  if (fQueryResults) {
417  TIter nxq(fQueryResults);
418  TQueryResult *qr = 0;
419  while ((qr = (TQueryResult *) nxq())) {
420  if (qr->Matches(ref)) {
421  fQueryResults->Remove(qr);
422  delete qr;
423  }
424  }
425  }
426 }
427 
428 ////////////////////////////////////////////////////////////////////////////////
429 /// Get query result instances referenced 'ref' from
430 /// the list of results.
431 
433 {
434  if (fQueryResults) {
435  if (ref && strlen(ref) > 0) {
436  TIter nxq(fQueryResults);
437  TQueryResult *qr = 0;
438  while ((qr = (TQueryResult *) nxq())) {
439  if (qr->Matches(ref))
440  return qr;
441  }
442  } else {
443  // Get last
444  return (TQueryResult *) fQueryResults->Last();
445  }
446  }
447 
448  // Nothing found
449  return (TQueryResult *)0;
450 }
451 
452 ////////////////////////////////////////////////////////////////////////////////
453 /// Set current query and save previous value.
454 
456 {
457  fPreviousQuery = fQuery;
458  fQuery = q;
459 }
460 
461 ////////////////////////////////////////////////////////////////////////////////
462 /// Add object to input list.
463 
465 {
466  fInput->Add(inp);
467 }
468 
469 ////////////////////////////////////////////////////////////////////////////////
470 /// Clear input list.
471 
473 {
474  fInput->Clear();
475 }
476 
477 ////////////////////////////////////////////////////////////////////////////////
478 /// Get output object by name.
479 
481 {
482  if (fOutput)
483  return fOutput->FindObject(name);
484  return 0;
485 }
486 
487 ////////////////////////////////////////////////////////////////////////////////
488 /// Get output list.
489 
491 {
492  TList *ol = fOutput;
493  if (!ol && fQuery)
494  ol = fQuery->GetOutputList();
495  return ol;
496 }
497 
498 ////////////////////////////////////////////////////////////////////////////////
499 /// Reinitialize fSelector using the selector files in the query result.
500 /// Needed when Finalize is called after a Process execution for the same
501 /// selector name.
502 
504 {
505  Int_t rc = 0;
506 
507  // Make sure we have a query
508  if (!qr) {
509  Info("ReinitSelector", "query undefined - do nothing");
510  return -1;
511  }
512 
513  // Selector name
514  TString selec = qr->GetSelecImp()->GetName();
515  if (selec.Length() <= 0) {
516  Info("ReinitSelector", "selector name undefined - do nothing");
517  return -1;
518  }
519 
520  // Find out if this is a standard selection used for Draw actions
521  Bool_t stdselec = TSelector::IsStandardDraw(selec);
522 
523  // Find out if this is a precompiled selector: in such a case we do not
524  // have the code in TMacros, so we must rely on local libraries
525  Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
526 
527  // If not, find out if it needs to be expanded
528  TString ipathold;
529  if (!stdselec && !compselec) {
530  // Check checksums for the versions of the selector files
531  Bool_t expandselec = kTRUE;
532  TString dir, ipath;
533  char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
534  if (selc) {
535  // Check checksums
536  TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
537  // Implementation files
538  md5icur = TMD5::FileChecksum(selc);
539  md5iold = qr->GetSelecImp()->Checksum();
540  // Header files
541  TString selh(selc);
542  Int_t dot = selh.Last('.');
543  if (dot != kNPOS) selh.Remove(dot);
544  selh += ".h";
546  md5hcur = TMD5::FileChecksum(selh);
547  md5hold = qr->GetSelecHdr()->Checksum();
548 
549  // If nothing has changed nothing to do
550  if (md5hcur && md5hold && md5icur && md5iold)
551  if (*md5hcur == *md5hold && *md5icur == *md5iold)
552  expandselec = kFALSE;
553 
554  SafeDelete(md5icur);
555  SafeDelete(md5hcur);
556  SafeDelete(md5iold);
557  SafeDelete(md5hold);
558  if (selc) delete [] selc;
559  }
560 
561  Bool_t ok = kTRUE;
562  // Expand selector files, if needed
563  if (expandselec) {
564 
565  ok = kFALSE;
566  // Expand files in a temporary directory
567  TUUID u;
568  dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
569  if (!(gSystem->MakeDirectory(dir))) {
570 
571  // Export implementation file
572  selec = Form("%s/%s",dir.Data(),selec.Data());
573  qr->GetSelecImp()->SaveSource(selec);
574 
575  // Export header file
576  TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
577  qr->GetSelecHdr()->SaveSource(seleh);
578 
579  // Adjust include path
580  ipathold = gSystem->GetIncludePath();
581  ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
582  gSystem->SetIncludePath(ipath.Data());
583 
584  ok = kTRUE;
585  }
586  }
587  TString opt(qr->GetOptions());
588  Ssiz_t id = opt.Last('#');
589  if (id != kNPOS && id < opt.Length() - 1)
590  selec += opt(id + 1, opt.Length());
591 
592  if (!ok) {
593  Info("ReinitSelector", "problems locating or exporting selector files");
594  return -1;
595  }
596  }
597 
598  // Cleanup previous stuff
599  SafeDelete(fSelector);
600  fSelectorClass = 0;
601 
602  // Init the selector now
603  Int_t iglevelsave = gErrorIgnoreLevel;
604  if (compselec)
605  // Silent error printout on first attempt
607 
608  if ((fSelector = TSelector::GetSelector(selec))) {
609  if (compselec)
610  gErrorIgnoreLevel = iglevelsave; // restore ignore level
611  fSelectorClass = fSelector->IsA();
612  fSelector->SetOption(qr->GetOptions());
613 
614  } else {
615  if (compselec) {
616  gErrorIgnoreLevel = iglevelsave; // restore ignore level
617  // Retry by loading first the libraries listed in TQueryResult, if any
618  if (strlen(qr->GetLibList()) > 0) {
619  TString sl(qr->GetLibList());
620  TObjArray *oa = sl.Tokenize(" ");
621  if (oa) {
622  Bool_t retry = kFALSE;
623  TIter nxl(oa);
624  TObjString *os = 0;
625  while ((os = (TObjString *) nxl())) {
626  TString lib = gSystem->BaseName(os->GetName());
627  if (lib != "lib") {
628  lib.ReplaceAll("-l", "lib");
629  if (gSystem->Load(lib) == 0)
630  retry = kTRUE;
631  }
632  }
633  // Retry now, if the case
634  if (retry)
635  fSelector = TSelector::GetSelector(selec);
636  }
637  }
638  }
639  if (!fSelector) {
640  if (compselec)
641  Info("ReinitSelector", "compiled selector re-init failed:"
642  " automatic reload unsuccessful:"
643  " please load manually the correct library");
644  rc = -1;
645  }
646  }
647  if (fSelector) {
648  // Draw needs to reinit temp histos
649  fSelector->SetInputList(qr->GetInputList());
650  if (stdselec) {
651  ((TProofDraw *)fSelector)->DefVar();
652  } else {
653  // variables may have been initialized in Begin()
654  fSelector->Begin(0);
655  }
656  }
657 
658  // Restore original include path, if needed
659  if (ipathold.Length() > 0)
660  gSystem->SetIncludePath(ipathold.Data());
661 
662  return rc;
663 }
664 
665 ////////////////////////////////////////////////////////////////////////////////
666 /// Incorporate output object (may not be used in this class).
667 
669 {
670  MayNotUse("AddOutputObject");
671  return -1;
672 }
673 
674 ////////////////////////////////////////////////////////////////////////////////
675 /// Incorporate output list (may not be used in this class).
676 
678 {
679  MayNotUse("AddOutput");
680 }
681 
682 ////////////////////////////////////////////////////////////////////////////////
683 /// Store output list (may not be used in this class).
684 
686 {
687  MayNotUse("StoreOutput");
688 }
689 
690 ////////////////////////////////////////////////////////////////////////////////
691 /// Store feedback list (may not be used in this class).
692 
694 {
695  MayNotUse("StoreFeedback");
696 }
697 
698 ////////////////////////////////////////////////////////////////////////////////
699 /// Report progress (may not be used in this class).
700 
701 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/)
702 {
703  MayNotUse("Progress");
704 }
705 
706 ////////////////////////////////////////////////////////////////////////////////
707 /// Report progress (may not be used in this class).
708 
709 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/,
710  Long64_t /*bytesread*/,
711  Float_t /*evtRate*/, Float_t /*mbRate*/,
712  Float_t /*evtrti*/, Float_t /*mbrti*/)
713 {
714  MayNotUse("Progress");
715 }
716 
717 ////////////////////////////////////////////////////////////////////////////////
718 /// Report progress (may not be used in this class).
719 
721 {
722  MayNotUse("Progress");
723 }
724 
725 ////////////////////////////////////////////////////////////////////////////////
726 /// Set feedback list (may not be used in this class).
727 
729 {
730  MayNotUse("Feedback");
731 }
732 
733 ////////////////////////////////////////////////////////////////////////////////
734 /// Draw feedback creation proxy. When accessed via TProof avoids
735 /// link dependency on libProofPlayer.
736 
738 {
739  return new TDrawFeedback(p);
740 }
741 
742 ////////////////////////////////////////////////////////////////////////////////
743 /// Set draw feedback option.
744 
746 {
747  if (f)
748  f->SetOption(opt);
749 }
750 
751 ////////////////////////////////////////////////////////////////////////////////
752 /// Delete draw feedback object.
753 
755 {
756  delete f;
757 }
758 
759 ////////////////////////////////////////////////////////////////////////////////
760 /// Save the partial results of this query to a dedicated file under the user
761 /// data directory. The file name has the form
762 /// <session_tag>.q<query_seq_num>.root
763 /// The file pat and the file are created if not existing already.
764 /// Only objects in the outputlist not being TProofOutputFile are saved.
765 /// The packets list 'packets' is saved if given.
766 /// Trees not attached to any file are attached to the open file.
767 /// If 'queryend' is kTRUE evrything is written out (TTrees included).
768 /// The actual saving action is controlled by 'force' and by fSavePartialResults /
769 /// fSaveResultsPerPacket:
770 ///
771 /// fSavePartialResults = kFALSE/kTRUE no-saving/saving
772 /// fSaveResultsPerPacket = kFALSE/kTRUE save-per-query/save-per-packet
773 ///
774 /// The function CheckMemUsage sets fSavePartialResults = 1 if fSaveMemThreshold > 0 and
775 /// ProcInfo_t::fMemResident >= fSaveMemThreshold: from that point on partial results
776 /// are always saved and expensive calls to TSystem::GetProcInfo saved.
777 /// The switch fSaveResultsPerPacket is instead controlled by the user or admin
778 /// who can also force saving in all cases; parameter PROOF_SavePartialResults or
779 /// RC env ProofPlayer.SavePartialResults .
780 /// However, if 'force' is kTRUE, fSavePartialResults and fSaveResultsPerPacket
781 /// are ignored.
782 /// Return -1 in case of problems, 0 otherwise.
783 
785 {
786  Bool_t save = (force || (fSavePartialResults &&
787  (queryend || fSaveResultsPerPacket))) ? kTRUE : kFALSE;
788  if (!save) {
789  PDB(kOutput, 2)
790  Info("SavePartialResults", "partial result saving disabled");
791  return 0;
792  }
793 
794  // Sanity check
795  if (!gProofServ) {
796  Error("SavePartialResults", "gProofServ undefined: something really wrong going on!!!");
797  return -1;
798  }
799  if (!fOutput) {
800  Error("SavePartialResults", "fOutput undefined: something really wrong going on!!!");
801  return -1;
802  }
803 
804  PDB(kOutput, 1)
805  Info("SavePartialResults", "start saving partial results {%d,%d,%d,%d}",
806  queryend, force, fSavePartialResults, fSaveResultsPerPacket);
807 
808  // Get list of processed packets from the iterator
809  PDB(kOutput, 2) Info("SavePartialResults", "fEvIter: %p", fEvIter);
810 
811  TList *packets = (fEvIter) ? fEvIter->GetPackets() : 0;
812  PDB(kOutput, 2) Info("SavePartialResults", "list of packets: %p, sz: %d",
813  packets, (packets ? packets->GetSize(): -1));
814 
815  // Open the file
816  const char *oopt = "UPDATE";
817  // Check if the file has already been defined
818  TString baseName(fOutputFilePath);
819  if (fOutputFilePath.IsNull()) {
820  baseName.Form("output-%s.q%d.root", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
821  if (gProofServ->GetDataDirOpts() && strlen(gProofServ->GetDataDirOpts()) > 0) {
822  fOutputFilePath.Form("%s/%s?%s", gProofServ->GetDataDir(), baseName.Data(),
824  } else {
825  fOutputFilePath.Form("%s/%s", gProofServ->GetDataDir(), baseName.Data());
826  }
827  Info("SavePartialResults", "file with (partial) output: '%s'", fOutputFilePath.Data());
828  oopt = "RECREATE";
829  }
830  // Open the file in write mode
831  if (!(fOutputFile = TFile::Open(fOutputFilePath, oopt)) ||
832  (fOutputFile && fOutputFile->IsZombie())) {
833  Error("SavePartialResults", "cannot open '%s' for writing", fOutputFilePath.Data());
834  SafeDelete(fOutputFile);
835  return -1;
836  }
837 
838  // Save current directory
839  TDirectory *curdir = gDirectory;
840  fOutputFile->cd();
841 
842  // Write first the packets list, if required
843  if (packets) {
844  TDirectory *packetsDir = fOutputFile->mkdir("packets");
845  if (packetsDir) packetsDir->cd();
847  fOutputFile->cd();
848  }
849 
850  Bool_t notempty = kFALSE;
851  // Write out the output list
852  TList torm;
853  TIter nxo(fOutput);
854  TObject *o = 0;
855  while ((o = nxo())) {
856  // Skip output file drivers
857  if (o->InheritsFrom(TProofOutputFile::Class())) continue;
858  // Skip control objets
859  if (!strncmp(o->GetName(), "PROOF_", 6)) continue;
860  // Skip data members mapping
862  // Skip missing file info
863  if (!strcmp(o->GetName(), "MissingFiles")) continue;
864  // Trees need a special treatment
865  if (o->InheritsFrom("TTree")) {
866  TTree *t = (TTree *) o;
867  TDirectory *d = t->GetDirectory();
868  // If the tree is not attached to any file ...
869  if (!d || (d && !d->InheritsFrom("TFile"))) {
870  // ... we attach it
871  t->SetDirectory(fOutputFile);
872  }
873  if (t->GetDirectory() == fOutputFile) {
874  if (queryend) {
875  // ... we write it out
876  o->Write(0, TObject::kOverwrite);
877  // At least something in the file
878  notempty = kTRUE;
879  // Flag for removal from the outputlist
880  torm.Add(o);
881  // Prevent double-deletion attempts
882  t->SetDirectory(0);
883  } else {
884  // ... or we set in automatic flush mode
885  t->SetAutoFlush();
886  }
887  }
888  } else if (queryend || fSaveResultsPerPacket) {
889  // Save overwriting what's already there
890  o->Write(0, TObject::kOverwrite);
891  // At least something in the file
892  notempty = kTRUE;
893  // Flag for removal from the outputlist
894  if (queryend) torm.Add(o);
895  }
896  }
897 
898  // Restore previous directory
899  gDirectory = curdir;
900 
901  // Close the file if required
902  if (notempty) {
903  if (!fOutput->FindObject(baseName)) {
904  TProofOutputFile *po = 0;
905  // Get directions
906  TNamed *nm = (TNamed *) fInput->FindObject("PROOF_DefaultOutputOption");
907  TString oname = (nm) ? nm->GetTitle() : fOutputFilePath.Data();
908  if (nm && oname.BeginsWith("ds:")) {
909  oname.Replace(0, 3, "");
910  TString qtag =
912  oname.ReplaceAll("<qtag>", qtag);
913  // Create the TProofOutputFile for dataset creation
914  po = new TProofOutputFile(baseName, "DRO", oname.Data());
915  } else {
916  Bool_t hasddir = kFALSE;
917  // Create the TProofOutputFile for automatic merging
918  po = new TProofOutputFile(baseName, "M");
919  if (oname.BeginsWith("of:")) oname.Replace(0, 3, "");
920  if (gProofServ->IsTopMaster()) {
921  if (!strcmp(TUrl(oname, kTRUE).GetProtocol(), "file")) {
922  TString dsrv;
924  TProofServ::FilterLocalroot(oname, dsrv);
925  oname.Insert(0, dsrv);
926  }
927  } else {
928  if (nm) {
929  // The name has been sent by the client: resolve local place holders
930  oname.ReplaceAll("<file>", baseName);
931  } else {
932  // We did not get any indication; the final file will be in the datadir on
933  // the top master and it will be resolved there
934  oname.Form("<datadir>/%s", baseName.Data());
935  hasddir = kTRUE;
936  }
937  }
938  po->SetOutputFileName(oname.Data());
939  if (hasddir)
940  // Reset the bit, so that <datadir> has a chance to be resolved in AddOutputObject
942  po->SetName(gSystem->BaseName(oname.Data()));
943  }
944  po->AdoptFile(fOutputFile);
945  fOutput->Add(po);
946  // Flag the nature of this file
948  }
949  }
950  fOutputFile->Close();
951  SafeDelete(fOutputFile);
952 
953  // If last call, cleanup the output list from objects saved to file
954  if (queryend && torm.GetSize() > 0) {
955  TIter nxrm(&torm);
956  while ((o = nxrm())) { fOutput->Remove(o); }
957  }
958  torm.SetOwner(kFALSE);
959 
960  PDB(kOutput, 1)
961  Info("SavePartialResults", "partial results saved to file");
962  // We are done
963  return 0;
964 }
965 
966 ////////////////////////////////////////////////////////////////////////////////
967 /// Make sure that a valid selector object
968 /// Return -1 in case of problems, 0 otherwise
969 
970 Int_t TProofPlayer::AssertSelector(const char *selector_file)
971 {
972  if (selector_file && strlen(selector_file)) {
973  if (fCreateSelObj) SafeDelete(fSelector);
974 
975  // Get selector files from cache
976  if (gProofServ) {
978  TString ocwd = gSystem->WorkingDirectory();
980 
981  fSelector = TSelector::GetSelector(selector_file);
982 
983  gSystem->ChangeDirectory(ocwd);
985 
986  if (!fSelector) {
987  Error("AssertSelector", "cannot load: %s", selector_file );
988  return -1;
989  }
990  }
991 
993  Info("AssertSelector", "Processing via filename");
994  } else if (!fSelector) {
995  Error("AssertSelector", "no TSelector object define : cannot continue!");
996  return -1;
997  } else {
998  Info("AssertSelector", "Processing via TSelector object");
999  }
1000  // Done
1001  return 0;
1002 }
1003 ////////////////////////////////////////////////////////////////////////////////
1004 /// Update fProgressStatus
1005 
1007 {
1008  if (fProgressStatus) {
1009  fProgressStatus->IncEntries(fProcessedRun);
1013  if (gMonitoringWriter)
1016  fProcessedRun = 0;
1017  }
1018 }
1019 
1020 ////////////////////////////////////////////////////////////////////////////////
1021 /// Process specified TDSet on PROOF worker.
1022 /// The return value is -1 in case of error and TSelector::GetStatus()
1023 /// in case of success.
1024 
1025 Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
1026  Option_t *option, Long64_t nentries,
1027  Long64_t first)
1028 {
1029  PDB(kGlobal,1) Info("Process","Enter");
1030 
1032  fOutput = 0;
1033 
1034  TCleanup clean(this);
1035 
1036  fSelectorClass = 0;
1037  TString wmsg;
1038  TRY {
1039  if (AssertSelector(selector_file) != 0 || !fSelector) {
1040  Error("Process", "cannot assert the selector object");
1041  return -1;
1042  }
1043 
1044  fSelectorClass = fSelector->IsA();
1045  Int_t version = fSelector->Version();
1046  if (version == 0 && IsClient()) fSelector->GetOutputList()->Clear();
1047 
1048  fOutput = (THashList *) fSelector->GetOutputList();
1049 
1050  if (gProofServ)
1051  TPerfStats::Start(fInput, fOutput);
1052 
1053  fSelStatus = new TStatus;
1054  fOutput->Add(fSelStatus);
1055 
1056  fSelector->SetOption(option);
1057  fSelector->SetInputList(fInput);
1058 
1059  // If in sequential (0-PROOF) mode validate the data set to get
1060  // the number of entries
1061  fTotalEvents = nentries;
1062  if (fTotalEvents < 0 && gProofServ &&
1064  dset->Validate();
1065  dset->Reset();
1066  TDSetElement *e = 0;
1067  while ((e = dset->Next())) {
1068  fTotalEvents += e->GetNum();
1069  }
1070  }
1071 
1072  dset->Reset();
1073 
1074  // Set parameters controlling the iterator behaviour
1075  Int_t useTreeCache = 1;
1076  if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
1077  if (useTreeCache > -1 && useTreeCache < 2)
1078  gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
1079  }
1080  Long64_t cacheSize = -1;
1081  if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
1082  TString sz = TString::Format("%lld", cacheSize);
1083  gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
1084  }
1085  // Parallel unzipping
1086  Int_t useParallelUnzip = 0;
1087  if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
1088  if (useParallelUnzip > -1 && useParallelUnzip < 2)
1089  gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
1090  }
1091  // OS file caching (Mac Os X only)
1092  Int_t dontCacheFiles = 0;
1093  if (TProof::GetParameter(fInput, "PROOF_DontCacheFiles", dontCacheFiles) == 0) {
1094  if (dontCacheFiles == 1)
1095  gEnv->SetValue("ProofPlayer.DontCacheFiles", 1);
1096  }
1097  fEvIter = TEventIter::Create(dset, fSelector, first, nentries);
1098 
1099  // Control file object swap
1100  // <how>*10 + <force>
1101  // <how> = 0 end of run
1102  // 1 after each packet
1103  // <force> = 0 no, swap only if memory threshold is reached
1104  // 1 swap in all cases, accordingly to <how>
1105  Int_t opt = 0;
1106  if (TProof::GetParameter(fInput, "PROOF_SavePartialResults", opt) != 0) {
1107  opt = gEnv->GetValue("ProofPlayer.SavePartialResults", 0);
1108  }
1109  fSaveResultsPerPacket = (opt >= 10) ? kTRUE : kFALSE;
1110  fSavePartialResults = (opt%10 > 0) ? kTRUE : kFALSE;
1111  Info("Process", "save partial results? %d per-packet? %d", fSavePartialResults, fSaveResultsPerPacket);
1112 
1113  // Memory threshold for file object swap
1114  Float_t memfrac = gEnv->GetValue("ProofPlayer.SaveMemThreshold", -1.);
1115  if (memfrac > 0.) {
1116  // The threshold is per core
1117  SysInfo_t si;
1118  if (gSystem->GetSysInfo(&si) == 0) {
1119  fSaveMemThreshold = (Long_t) ((memfrac * si.fPhysRam * 1024.) / si.fCpus);
1120  Info("Process", "memory threshold for saving objects to file set to %ld kB",
1121  fSaveMemThreshold);
1122  } else {
1123  Error("Process", "cannot get SysInfo_t (!)");
1124  }
1125  }
1126 
1127  if (version == 0) {
1128  PDB(kLoop,1) Info("Process","Call Begin(0)");
1129  fSelector->Begin(0);
1130  } else {
1131  if (IsClient()) {
1132  // on client (for local run)
1133  PDB(kLoop,1) Info("Process","Call Begin(0)");
1134  fSelector->Begin(0);
1135  }
1136  if (!fSelStatus->TestBit(TStatus::kNotOk)) {
1137  PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
1138  fSelector->SlaveBegin(0); // Init is called explicitly
1139  // from GetNextEvent()
1140  }
1141  }
1142 
1143  } CATCH(excode) {
1145  Error("Process","exception %d caught", excode);
1147  return -1;
1148  } ENDTRY;
1149 
1150  // Save the results, if needed, closing the file
1151  if (SavePartialResults(kFALSE) < 0)
1152  Warning("Process", "problems seetting up file-object swapping");
1153 
1154  // Create feedback lists, if required
1155  SetupFeedback();
1156 
1157  if (gMonitoringWriter)
1158  gMonitoringWriter->SendProcessingStatus("STARTED",kTRUE);
1159 
1160  PDB(kLoop,1)
1161  Info("Process","Looping over Process()");
1162 
1163  // get the byte read counter at the beginning of processing
1164  fReadBytesRun = TFile::GetFileBytesRead();
1165  fReadCallsRun = TFile::GetFileReadCalls();
1166  fProcessedRun = 0;
1167  // force the first monitoring info
1168  if (gMonitoringWriter)
1170 
1171  // Start asynchronous timer to dispatch pending events
1172  SetDispatchTimer(kTRUE);
1173 
1174  // Loop over range
1175  gAbort = kFALSE;
1176  Long64_t entry;
1179 
1180  TRY {
1181 
1182  Int_t mrc = -1;
1183  // Get the frequency for checking memory consumption and logging information
1184  Long64_t memlogfreq = -1;
1185  if (((mrc = TProof::GetParameter(fInput, "PROOF_MemLogFreq", memlogfreq))) != 0) memlogfreq = -1;
1186  Long64_t singleshot = 1;
1187  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1188  TString lastMsg("(unfortunately no detailed info is available about current packet)");
1189 
1190  // Initial memory footprint
1191  if (!CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg)) {
1192  Error("Process", "%s", wmsg.Data());
1193  wmsg.Insert(0, TString::Format("ERROR:%s, after SlaveBegin(), ", gProofServ->GetOrdinal()));
1194  fSelStatus->Add(wmsg.Data());
1195  if (gProofServ) {
1196  gProofServ->SendAsynMessage(wmsg.Data());
1198  }
1201  } else if (!wmsg.IsNull()) {
1202  Warning("Process", "%s", wmsg.Data());
1203  }
1204 
1205  TPair *currentElem = 0;
1206  // The event loop on the worker
1207  Long64_t fst = -1, num;
1208  Long_t maxproctime = -1;
1209  Bool_t newrun = kFALSE;
1210  while ((fEvIter->GetNextPacket(fst, num) != -1) &&
1211  !fSelStatus->TestBit(TStatus::kNotOk) &&
1212  fSelector->GetAbort() == TSelector::kContinue) {
1213  // This is needed by the inflate infrastructure to calculate
1214  // sleeping times
1216 
1217  // Give the possibility to the selector to access additional info in the
1218  // incoming packet
1219  if (dset->Current()) {
1220  if (!currentElem) {
1221  currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
1222  fInput->Add(currentElem);
1223  } else {
1224  if (currentElem->Value() != dset->Current()) {
1225  currentElem->SetValue(dset->Current());
1226  } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
1228  }
1229  }
1230  if (dset->Current()->TestBit(TDSetElement::kNewPacket)) {
1231  if (dset->TestBit(TDSet::kEmpty)) {
1232  lastMsg = "check logs for possible stacktrace - last cycle:";
1233  } else {
1234  TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
1235  TString fn = (elem) ? elem->GetFileName() : "<undef>";
1236  lastMsg.Form("while processing dset:'%s', file:'%s'"
1237  " - check logs for possible stacktrace - last event:", dset->GetName(), fn.Data());
1238  }
1239  TProofServ::SetLastMsg(lastMsg);
1240  }
1241  // Set the max proc time, if any
1242  if (dset->Current()->GetMaxProcTime() >= 0.)
1243  maxproctime = (Long_t) (1000 * dset->Current()->GetMaxProcTime());
1244  newrun = (dset->Current()->TestBit(TDSetElement::kNewPacket)) ? kTRUE : kFALSE;
1245  }
1246 
1249  // Setup packet proc time measurement
1250  if (maxproctime > 0) {
1251  if (!fProcTimeTimer) fProcTimeTimer = new TProctimeTimer(this, maxproctime);
1252  fProcTimeTimer->Start(maxproctime, kTRUE); // One shot
1253  if (!fProcTime) fProcTime = new TStopwatch();
1254  fProcTime->Reset(); // Reset counters
1255  }
1256  Long64_t refnum = num;
1257  if (refnum < 0 && maxproctime <= 0) {
1258  wmsg.Form("neither entries nor max proc time specified:"
1259  " risk of infinite loop: processing aborted");
1260  Error("Process", "%s", wmsg.Data());
1261  if (gProofServ) {
1262  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1264  gProofServ->SendAsynMessage(wmsg.Data());
1265  }
1268  break;
1269  }
1270  while (refnum < 0 || num--) {
1271 
1272  // Did we use all our time?
1274  fProcTime->Stop();
1275  if (!newrun && !TestBit(TProofPlayer::kMaxProcTimeExtended) && refnum > 0) {
1276  // How much are we left with?
1277  Float_t xleft = (refnum > num) ? (Float_t) num / (Float_t) (refnum) : 1.;
1278  if (xleft < 0.2) {
1279  // Give another try, 1.5 times the remaining measured expected time
1280  Long_t mpt = (Long_t) (1500 * num / ((Double_t)(refnum - num) / fProcTime->RealTime()));
1282  fProcTimeTimer->Start(mpt, kTRUE); // One shot
1284  }
1285  }
1287  Info("Process", "max proc time reached (%ld msecs): packet processing stopped:\n%s",
1288  maxproctime, lastMsg.Data());
1289 
1290  break;
1291  }
1292  }
1293 
1294  if (!(!fSelStatus->TestBit(TStatus::kNotOk) &&
1295  fSelector->GetAbort() == TSelector::kContinue)) break;
1296 
1297  // Get the netry number, taking into account entry or event lists
1298  entry = fEvIter->GetEntryNumber(fst);
1299  fst++;
1300 
1301  // Set the last entry
1302  TProofServ::SetLastEntry(entry);
1303 
1304  if (fSelector->Version() == 0) {
1305  PDB(kLoop,3)
1306  Info("Process","Call ProcessCut(%lld)", entry);
1307  if (fSelector->ProcessCut(entry)) {
1308  PDB(kLoop,3)
1309  Info("Process","Call ProcessFill(%lld)", entry);
1310  fSelector->ProcessFill(entry);
1311  }
1312  } else {
1313  PDB(kLoop,3)
1314  Info("Process","Call Process(%lld)", entry);
1315  fSelector->Process(entry);
1316  if (fSelector->GetAbort() == TSelector::kAbortProcess) {
1318  break;
1319  } else if (fSelector->GetAbort() == TSelector::kAbortFile) {
1320  Info("Process", "packet processing aborted following the"
1321  " selector settings:\n%s", lastMsg.Data());
1322  fEvIter->InvalidatePacket();
1324  }
1325  }
1326  if (!fSelStatus->TestBit(TStatus::kNotOk)) fProcessedRun++;
1327 
1328  // Check the memory footprint, if required
1329  if (memlogfreq > 0 && (GetEventsProcessed() + fProcessedRun)%memlogfreq == 0) {
1330  if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
1331  Error("Process", "%s", wmsg.Data());
1332  if (gProofServ) {
1333  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1334  gProofServ->GetOrdinal(), entry));
1335  gProofServ->SendAsynMessage(wmsg.Data());
1336  }
1340  break;
1341  } else {
1342  if (!wmsg.IsNull()) {
1343  Warning("Process", "%s", wmsg.Data());
1344  if (gProofServ) {
1345  wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ",
1346  gProofServ->GetOrdinal(), entry));
1347  gProofServ->SendAsynMessage(wmsg.Data());
1348  }
1349  }
1350  }
1351  }
1353  gSystem->DispatchOneEvent(kTRUE);
1355  }
1357  if (fSelStatus->TestBit(TStatus::kNotOk) || gROOT->IsInterrupted()) break;
1358 
1359  // Make sure that the selector abort status is reset
1360  if (fSelector->GetAbort() == TSelector::kAbortFile)
1361  fSelector->Abort("status reset", TSelector::kContinue);
1362  }
1363  }
1364 
1365  } CATCH(excode) {
1366  if (excode == kPEX_STOPPED) {
1367  Info("Process","received stop-process signal");
1369  } else if (excode == kPEX_ABORTED) {
1370  gAbort = kTRUE;
1371  Info("Process","received abort-process signal");
1373  } else {
1374  Error("Process","exception %d caught", excode);
1375  // Perhaps we need a dedicated status code here ...
1376  gAbort = kTRUE;
1378  }
1380  } ENDTRY;
1381 
1382  // Clean-up the envelop for the current element
1383  TPair *currentElem = 0;
1384  if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
1385  if ((currentElem = (TPair *) fInput->Remove(currentElem))) {
1386  delete currentElem->Key();
1387  delete currentElem;
1388  }
1389  }
1390 
1391  // Final memory footprint
1392  Long64_t singleshot = 1;
1393  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1394  Bool_t shrc = CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg);
1395  if (!wmsg.IsNull()) Warning("Process", "%s (%s)", wmsg.Data(), shrc ? "warn" : "hwm");
1396 
1397  PDB(kGlobal,2)
1398  Info("Process","%lld events processed", fProgressStatus->GetEntries());
1399 
1400  if (gMonitoringWriter) {
1404  }
1405 
1406  // Stop active timers
1407  SetDispatchTimer(kFALSE);
1408  if (fStopTimer != 0)
1409  SetStopTimer(kFALSE, gAbort);
1410  if (fFeedbackTimer != 0)
1411  HandleTimer(0);
1412 
1413  StopFeedback();
1414 
1415  // Save the results, if needed, closing the file
1416  if (SavePartialResults(kTRUE) < 0)
1417  Warning("Process", "problems saving the results to file");
1418 
1419  SafeDelete(fEvIter);
1420 
1421  // Finalize
1422 
1423  if (fExitStatus != kAborted) {
1424 
1425  TIter nxo(GetOutputList());
1426  TObject *o = 0;
1427  while ((o = nxo())) {
1428  // Special treatment for files
1429  if (o->IsA() == TProofOutputFile::Class()) {
1431  of->Print();
1433  const char *dir = of->GetDir();
1434  if (!dir || (dir && strlen(dir) <= 0)) {
1436  } else if (dir && strlen(dir) > 0) {
1437  TUrl u(dir);
1438  if (!strcmp(u.GetHost(), "localhost") || !strcmp(u.GetHost(), "127.0.0.1") ||
1439  !strcmp(u.GetHost(), "localhost.localdomain")) {
1440  u.SetHost(TUrl(gSystem->HostName()).GetHostFQDN());
1441  of->SetDir(u.GetUrl(kTRUE));
1442  }
1443  of->Print();
1444  }
1445  }
1446  }
1447 
1449 
1450  if (!fSelStatus->TestBit(TStatus::kNotOk)) {
1451  if (fSelector->Version() == 0) {
1452  PDB(kLoop,1) Info("Process","Call Terminate()");
1453  fSelector->Terminate();
1454  } else {
1455  PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
1456  fSelector->SlaveTerminate();
1457  if (IsClient() && !fSelStatus->TestBit(TStatus::kNotOk)) {
1458  PDB(kLoop,1) Info("Process","Call Terminate()");
1459  fSelector->Terminate();
1460  }
1461  }
1462  }
1463 
1464  // Add Selector status in the output list so it can be returned to the client as done
1465  // by Tree::Process (see ROOT-748). The status from the various workers will be added.
1466  fOutput->Add(new TParameter<Long64_t>("PROOF_SelectorStatus", (Long64_t) fSelector->GetStatus()));
1467 
1468  if (gProofServ && !gProofServ->IsParallel()) { // put all the canvases onto the output list
1469  TIter nxc(gROOT->GetListOfCanvases());
1470  while (TObject *c = nxc())
1471  fOutput->Add(c);
1472  }
1473  }
1474 
1475  if (gProofServ)
1476  TPerfStats::Stop();
1477 
1478  return 0;
1479 }
1480 
1481 ////////////////////////////////////////////////////////////////////////////////
1482 /// Process specified TDSet on PROOF worker with TSelector object
1483 /// The return value is -1 in case of error and TSelector::GetStatus()
1484 /// in case of success.
1485 
1487  Option_t *option, Long64_t nentries,
1488  Long64_t first)
1489 {
1490  if (!selector) {
1491  Error("Process", "selector object undefiend!");
1492  return -1;
1493  }
1494 
1495  if (fCreateSelObj) SafeDelete(fSelector);
1496  fSelector = selector;
1498  return Process(dset, (const char *)0, option, nentries, first);
1499 }
1500 
1501 ////////////////////////////////////////////////////////////////////////////////
1502 /// Not implemented: meaningful only in the remote player. Returns kFALSE.
1503 
1505 {
1506  return kFALSE;
1507 }
1508 
1509 ////////////////////////////////////////////////////////////////////////////////
1510 /// Check the memory usage, if requested.
1511 /// Return kTRUE if OK, kFALSE if above 95% of at least one between virtual or
1512 /// resident limits are depassed.
1513 
1515  Bool_t &w80v, TString &wmsg)
1516 {
1517  Long64_t processed = GetEventsProcessed() + fProcessedRun;
1518  if (mfreq > 0 && processed%mfreq == 0) {
1519  // Record the memory information
1520  ProcInfo_t pi;
1521  if (!gSystem->GetProcInfo(&pi)){
1522  wmsg = "";
1523  if (gProofServ)
1524  Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
1525  pi.fMemVirtual, pi.fMemResident, processed);
1526  // Save info in TStatus
1527  fSelStatus->SetMemValues(pi.fMemVirtual, pi.fMemResident);
1528  // Apply limit on virtual memory, if any: warn if above 80%, stop if above 95% of max
1529  if (TProofServ::GetVirtMemMax() > 0) {
1531  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
1532  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
1533  return kFALSE;
1534  } else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
1535  // Refine monitoring
1536  mfreq = 1;
1537  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
1538  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
1539  w80v = kFALSE;
1540  }
1541  }
1542  // Apply limit on resident memory, if any: warn if above 80%, stop if above 95% of max
1543  if (TProofServ::GetResMemMax() > 0) {
1545  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
1546  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
1547  return kFALSE;
1548  } else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
1549  // Refine monitoring
1550  mfreq = 1;
1551  if (wmsg.Length() > 0) {
1552  wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
1553  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
1554  } else {
1555  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
1556  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
1557  }
1558  w80r = kFALSE;
1559  }
1560  }
1561  // In saving-partial-results mode flag the saving regime when reached to save expensive calls
1562  // to TSystem::GetProcInfo in SavePartialResults
1563  if (fSaveMemThreshold > 0 && pi.fMemResident >= fSaveMemThreshold) fSavePartialResults = kTRUE;
1564  }
1565  }
1566  // Done
1567  return kTRUE;
1568 }
1569 
1570 ////////////////////////////////////////////////////////////////////////////////
1571 /// Finalize query (may not be used in this class).
1572 
1574 {
1575  MayNotUse("Finalize");
1576  return -1;
1577 }
1578 
1579 ////////////////////////////////////////////////////////////////////////////////
1580 /// Finalize query (may not be used in this class).
1581 
1583 {
1584  MayNotUse("Finalize");
1585  return -1;
1586 }
1587 ////////////////////////////////////////////////////////////////////////////////
1588 /// Merge output (may not be used in this class).
1589 
1591 {
1592  MayNotUse("MergeOutput");
1593  return;
1594 }
1595 
1596 ////////////////////////////////////////////////////////////////////////////////
1597 
1599 {
1601  fOutput->Add(olsdm);
1602 }
1603 
1604 ////////////////////////////////////////////////////////////////////////////////
1605 /// Update automatic binning parameters for given object "name".
1606 
1610  Double_t& zmin, Double_t& zmax)
1611 {
1612  if ( fAutoBins == 0 ) {
1613  fAutoBins = new THashList;
1614  }
1615 
1616  TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
1617 
1618  if ( val == 0 ) {
1619  //look for info in higher master
1620  if (gProofServ && !gProofServ->IsTopMaster()) {
1621  TString key = name;
1622  TProofLimitsFinder::AutoBinFunc(key,xmin,xmax,ymin,ymax,zmin,zmax);
1623  }
1624 
1625  val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
1626  fAutoBins->Add(val);
1627  } else {
1628  val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
1629  }
1630 }
1631 
1632 ////////////////////////////////////////////////////////////////////////////////
1633 /// Get next packet (may not be used in this class).
1634 
1636 {
1637  MayNotUse("GetNextPacket");
1638  return 0;
1639 }
1640 
1641 ////////////////////////////////////////////////////////////////////////////////
1642 /// Set up feedback (may not be used in this class).
1643 
1645 {
1646  MayNotUse("SetupFeedback");
1647 }
1648 
1649 ////////////////////////////////////////////////////////////////////////////////
1650 /// Stop feedback (may not be used in this class).
1651 
1653 {
1654  MayNotUse("StopFeedback");
1655 }
1656 
1657 ////////////////////////////////////////////////////////////////////////////////
1658 /// Draw (may not be used in this class).
1659 
1660 Long64_t TProofPlayer::DrawSelect(TDSet * /*set*/, const char * /*varexp*/,
1661  const char * /*selection*/, Option_t * /*option*/,
1662  Long64_t /*nentries*/, Long64_t /*firstentry*/)
1663 {
1664  MayNotUse("DrawSelect");
1665  return -1;
1666 }
1667 
1668 ////////////////////////////////////////////////////////////////////////////////
1669 /// Handle tree header request.
1670 
1672 {
1673  MayNotUse("HandleGetTreeHeader|");
1674 }
1675 
1676 ////////////////////////////////////////////////////////////////////////////////
1677 /// Receive histo from slave.
1678 
1680 {
1681  TObject *obj = mess->ReadObject(mess->GetClass());
1682  if (obj->InheritsFrom(TH1::Class())) {
1683  TH1 *h = (TH1*)obj;
1684  h->SetDirectory(0);
1685  TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
1686  if (horg)
1687  horg->Add(h);
1688  else
1690  }
1691 }
1692 
1693 ////////////////////////////////////////////////////////////////////////////////
1694 /// Draw the object if it is a canvas.
1695 /// Return 0 in case of success, 1 if it is not a canvas or libProofDraw
1696 /// is not available.
1697 
1699 {
1700  static Int_t (*gDrawCanvasHook)(TObject *) = 0;
1701 
1702  // Load the library the first time
1703  if (!gDrawCanvasHook) {
1704  // Load library needed for graphics ...
1705  TString drawlib = "libProofDraw";
1706  char *p = 0;
1707  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1708  delete[] p;
1709  if (gSystem->Load(drawlib) != -1) {
1710  // Locate DrawCanvas
1711  Func_t f = 0;
1712  if ((f = gSystem->DynFindSymbol(drawlib,"DrawCanvas")))
1713  gDrawCanvasHook = (Int_t (*)(TObject *))(f);
1714  else
1715  Warning("DrawCanvas", "can't find DrawCanvas");
1716  } else
1717  Warning("DrawCanvas", "can't load %s", drawlib.Data());
1718  } else
1719  Warning("DrawCanvas", "can't locate %s", drawlib.Data());
1720  }
1721  if (gDrawCanvasHook && obj)
1722  return (*gDrawCanvasHook)(obj);
1723  // No drawing hook or object undefined
1724  return 1;
1725 }
1726 
1727 ////////////////////////////////////////////////////////////////////////////////
1728 /// Parse the arguments from var, sel and opt and fill the selector and
1729 /// object name accordingly.
1730 /// Return 0 in case of success, 1 if libProofDraw is not available.
1731 
1732 Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
1733  TString &selector, TString &objname)
1734 {
1735  static Int_t (*gGetDrawArgsHook)(const char *, const char *, Option_t *,
1736  TString &, TString &) = 0;
1737 
1738  // Load the library the first time
1739  if (!gGetDrawArgsHook) {
1740  // Load library needed for graphics ...
1741  TString drawlib = "libProofDraw";
1742  char *p = 0;
1743  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1744  delete[] p;
1745  if (gSystem->Load(drawlib) != -1) {
1746  // Locate GetDrawArgs
1747  Func_t f = 0;
1748  if ((f = gSystem->DynFindSymbol(drawlib,"GetDrawArgs")))
1749  gGetDrawArgsHook = (Int_t (*)(const char *, const char *, Option_t *,
1750  TString &, TString &))(f);
1751  else
1752  Warning("GetDrawArgs", "can't find GetDrawArgs");
1753  } else
1754  Warning("GetDrawArgs", "can't load %s", drawlib.Data());
1755  } else
1756  Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
1757  }
1758  if (gGetDrawArgsHook)
1759  return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
1760  // No parser hook or object undefined
1761  return 1;
1762 }
1763 
1764 ////////////////////////////////////////////////////////////////////////////////
1765 /// Create/destroy a named canvas for feedback
1766 
1767 void TProofPlayer::FeedBackCanvas(const char *name, Bool_t create)
1768 {
1769  static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
1770 
1771  // Load the library the first time
1772  if (!gFeedBackCanvasHook) {
1773  // Load library needed for graphics ...
1774  TString drawlib = "libProofDraw";
1775  char *p = 0;
1776  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1777  delete[] p;
1778  if (gSystem->Load(drawlib) != -1) {
1779  // Locate FeedBackCanvas
1780  Func_t f = 0;
1781  if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
1782  gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
1783  else
1784  Warning("FeedBackCanvas", "can't find FeedBackCanvas");
1785  } else
1786  Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
1787  } else
1788  Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
1789  }
1790  if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
1791  // No parser hook or object undefined
1792  return;
1793 }
1794 
1795 ////////////////////////////////////////////////////////////////////////////////
1796 /// Return the size in bytes of the cache
1797 
1799 {
1800  if (fEvIter) return fEvIter->GetCacheSize();
1801  return -1;
1802 }
1803 
1804 ////////////////////////////////////////////////////////////////////////////////
1805 /// Return the number of entries in the learning phase
1806 
1808 {
1809  if (fEvIter) return fEvIter->GetLearnEntries();
1810  return -1;
1811 }
1812 
1813 ////////////////////////////////////////////////////////////////////////////////
1814 /// Switch on/off merge timer
1815 
1817 {
1818  if (on) {
1819  if (!fMergeSTW) fMergeSTW = new TStopwatch();
1820  PDB(kGlobal,1)
1821  Info("SetMerging", "ON: mergers: %d", fProof->fMergersCount);
1822  if (fNumMergers <= 0 && fProof->fMergersCount > 0)
1824  } else if (fMergeSTW) {
1825  fMergeSTW->Stop();
1826  Float_t rt = fMergeSTW->RealTime();
1827  PDB(kGlobal,1)
1828  Info("SetMerging", "OFF: rt: %f, mergers: %d", rt, fNumMergers);
1829  if (fQuery) {
1830  if (!fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) {
1831  // On the master (or in Lite()) we set the merging time and the numebr of mergers
1832  fQuery->SetMergeTime(rt);
1833  fQuery->SetNumMergers(fNumMergers);
1834  } else {
1835  // In a standard client we save the transfer-to-client time
1836  fQuery->SetRecvTime(rt);
1837  }
1838  PDB(kGlobal,2) fQuery->Print("F");
1839  }
1840  }
1841 }
1842 
1843 //------------------------------------------------------------------------------
1844 
1846 
1847 ////////////////////////////////////////////////////////////////////////////////
1848 /// Process the specified TSelector object 'nentries' times.
1849 /// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1850 /// local session.
1851 /// The return value is -1 in case of error and TSelector::GetStatus()
1852 /// in case of success.
1853 
1854 Long64_t TProofPlayerLocal::Process(TSelector *selector,
1855  Long64_t nentries, Option_t *option)
1856 {
1857  if (!selector) {
1858  Error("Process", "selector object undefiend!");
1859  return -1;
1860  }
1861 
1862  TDSetProxy *set = new TDSetProxy("", "", "");
1863  set->SetBit(TDSet::kEmpty);
1864  set->SetBit(TDSet::kIsLocal);
1865  Long64_t rc = Process(set, selector, option, nentries);
1866  SafeDelete(set);
1867 
1868  // Done
1869  return rc;
1870 }
1871 
1872 ////////////////////////////////////////////////////////////////////////////////
1873 /// Process the specified TSelector file 'nentries' times.
1874 /// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1875 /// local session.
1876 /// Process specified TDSet on PROOF worker with TSelector object
1877 /// The return value is -1 in case of error and TSelector::GetStatus()
1878 /// in case of success.
1879 
1881  Long64_t nentries, Option_t *option)
1882 {
1883  TDSetProxy *set = new TDSetProxy("", "", "");
1884  set->SetBit(TDSet::kEmpty);
1885  set->SetBit(TDSet::kIsLocal);
1886  Long64_t rc = Process(set, selector, option, nentries);
1887  SafeDelete(set);
1888 
1889  // Done
1890  return rc;
1891 }
1892 
1893 
1894 //------------------------------------------------------------------------------
1895 
1897 
1898 ////////////////////////////////////////////////////////////////////////////////
1899 /// Destructor.
1900 
1902 {
1903  SafeDelete(fOutput); // owns the output list
1904  SafeDelete(fOutputLists);
1905 
1906  // Objects stored in maps are already deleted when merging the feedback
1907  SafeDelete(fFeedbackLists);
1908  SafeDelete(fPacketizer);
1909 
1910  if (fProcessMessage)
1911  SafeDelete(fProcessMessage);
1912 }
1913 
1914 ////////////////////////////////////////////////////////////////////////////////
1915 /// Init the packetizer
1916 /// Return 0 on success (fPacketizer is correctly initialized), -1 on failure.
1917 
1919  Long64_t first, const char *defpackunit,
1920  const char *defpackdata)
1921 {
1923  PDB(kGlobal,1) Info("Process","Enter");
1924  fDSet = dset;
1926 
1927  // This is done here to pickup on the fly changes
1928  Int_t honebyone = 1;
1929  if (TProof::GetParameter(fInput, "PROOF_MergeTH1OneByOne", honebyone) != 0)
1930  honebyone = gEnv->GetValue("ProofPlayer.MergeTH1OneByOne", 1);
1931  fMergeTH1OneByOne = (honebyone == 1) ? kTRUE : kFALSE;
1932 
1933  Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1934 
1935  TString packetizer;
1936  TList *listOfMissingFiles = 0;
1937 
1938  TMethodCall callEnv;
1939  TClass *cl;
1940  noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1941 
1942  if (noData) {
1943 
1944  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
1945  packetizer = defpackunit;
1946  else
1947  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
1948 
1949  // Get linked to the related class
1950  cl = TClass::GetClass(packetizer);
1951  if (cl == 0) {
1952  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1954  return -1;
1955  }
1956 
1957  // Init the constructor
1958  callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
1959  if (!callEnv.IsValid()) {
1960  Error("InitPacketizer",
1961  "cannot find correct constructor for '%s'", cl->GetName());
1963  return -1;
1964  }
1965  callEnv.ResetParam();
1967  callEnv.SetParam((Long64_t) nentries);
1968  callEnv.SetParam((Long_t) fInput);
1969  callEnv.SetParam((Long_t) fProgressStatus);
1970 
1971  } else if (dset->TestBit(TDSet::kMultiDSet)) {
1972 
1973  // We have to process many datasets in one go, keeping them separate
1974  if (fProof->GetRunStatus() != TProof::kRunning) {
1975  // We have been asked to stop
1976  Error("InitPacketizer", "received stop/abort request");
1978  return -1;
1979  }
1980 
1981  // The multi packetizer
1982  packetizer = "TPacketizerMulti";
1983 
1984  // Get linked to the related class
1985  cl = TClass::GetClass(packetizer);
1986  if (cl == 0) {
1987  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1989  return -1;
1990  }
1991 
1992  // Init the constructor
1993  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
1994  if (!callEnv.IsValid()) {
1995  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
1997  return -1;
1998  }
1999  callEnv.ResetParam();
2000  callEnv.SetParam((Long_t) dset);
2002  callEnv.SetParam((Long64_t) first);
2003  callEnv.SetParam((Long64_t) nentries);
2004  callEnv.SetParam((Long_t) fInput);
2005  callEnv.SetParam((Long_t) fProgressStatus);
2006 
2007  // We are going to test validity during the packetizer initialization
2008  dset->SetBit(TDSet::kValidityChecked);
2009  dset->ResetBit(TDSet::kSomeInvalid);
2010 
2011  } else {
2012 
2013  // Lookup - resolve the end-point urls to optmize the distribution.
2014  // The lookup was previously called in the packetizer's constructor.
2015  // A list for the missing files may already have been added to the
2016  // output list; otherwise, if needed it will be created inside
2017  if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
2018  // Move it to the output list
2019  fInput->Remove(listOfMissingFiles);
2020  } else {
2021  listOfMissingFiles = new TList;
2022  }
2023  // Do the lookup; we only skip it if explicitly requested so.
2024  TString lkopt;
2025  if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
2026  dset->Lookup(kTRUE, &listOfMissingFiles);
2027 
2028  if (fProof->GetRunStatus() != TProof::kRunning) {
2029  // We have been asked to stop
2030  Error("InitPacketizer", "received stop/abort request");
2032  return -1;
2033  }
2034 
2035  if (!(dset->GetListOfElements()) ||
2036  !(dset->GetListOfElements()->GetSize())) {
2037  if (gProofServ)
2038  gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
2039  Error("InitPacketizer", "No files from the data set were found - Aborting");
2041  if (listOfMissingFiles) {
2042  listOfMissingFiles->SetOwner();
2043  fOutput->Remove(listOfMissingFiles);
2044  SafeDelete(listOfMissingFiles);
2045  }
2046  return -1;
2047  }
2048 
2049  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
2050  // Using standard packetizer TAdaptivePacketizer
2051  packetizer = defpackdata;
2052  else
2053  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
2054 
2055  // Get linked to the related class
2056  cl = TClass::GetClass(packetizer);
2057  if (cl == 0) {
2058  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
2060  return -1;
2061  }
2062 
2063  // Init the constructor
2064  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
2065  if (!callEnv.IsValid()) {
2066  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
2068  return -1;
2069  }
2070  callEnv.ResetParam();
2071  callEnv.SetParam((Long_t) dset);
2073  callEnv.SetParam((Long64_t) first);
2074  callEnv.SetParam((Long64_t) nentries);
2075  callEnv.SetParam((Long_t) fInput);
2076  callEnv.SetParam((Long_t) fProgressStatus);
2077 
2078  // We are going to test validity during the packetizer initialization
2079  dset->SetBit(TDSet::kValidityChecked);
2080  dset->ResetBit(TDSet::kSomeInvalid);
2081  }
2082 
2083  // Get an instance of the packetizer
2084  Long_t ret = 0;
2085  callEnv.Execute(ret);
2086  if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
2087  Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
2089  return -1;
2090  }
2091 
2092  if (!fPacketizer->IsValid()) {
2093  Error("InitPacketizer",
2094  "instantiated packetizer object '%s' is invalid", cl->GetName());
2097  return -1;
2098  }
2099 
2100  // In multi mode retrieve the list of missing files
2101  if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
2102  if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
2103  // Remove it; it will be added to the output list
2104  fInput->Remove(listOfMissingFiles);
2105  }
2106  }
2107 
2108  if (!noData) {
2109  // Add invalid elements to the list of missing elements
2110  TDSetElement *elem = 0;
2111  if (dset->TestBit(TDSet::kSomeInvalid)) {
2112  TIter nxe(dset->GetListOfElements());
2113  while ((elem = (TDSetElement *)nxe())) {
2114  if (!elem->GetValid()) {
2115  if (!listOfMissingFiles)
2116  listOfMissingFiles = new TList;
2117  listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
2118  dset->Remove(elem, kFALSE);
2119  }
2120  }
2121  // The invalid elements have been removed
2123  }
2124 
2125  // Record the list of missing or invalid elements in the output list
2126  if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
2127  TIter missingFiles(listOfMissingFiles);
2128  TString msg;
2129  if (gDebug > 0) {
2130  TFileInfo *fi = 0;
2131  while ((fi = (TFileInfo *) missingFiles.Next())) {
2132  if (fi->GetCurrentUrl()) {
2133  msg = Form("File not found: %s - skipping!",
2134  fi->GetCurrentUrl()->GetUrl());
2135  } else {
2136  msg = Form("File not found: %s - skipping!", fi->GetName());
2137  }
2139  }
2140  }
2141  // Make sure it will be sent back
2142  if (!GetOutput("MissingFiles")) {
2143  listOfMissingFiles->SetName("MissingFiles");
2144  AddOutputObject(listOfMissingFiles);
2145  }
2146  TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
2147  if (!tmpStatus) AddOutputObject((tmpStatus = new TStatus()));
2148 
2149  // Estimate how much data are missing
2150  Int_t ngood = dset->GetListOfElements()->GetSize();
2151  Int_t nbad = listOfMissingFiles->GetSize();
2152  Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
2153  msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
2154  " the 'missingFiles' list", xb * 100., '%', nbad, nbad + ngood);
2155  tmpStatus->Add(msg.Data());
2156  msg = Form(" +++\n"
2157  " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
2158  " the 'MissingFiles' list\n"
2159  " +++", xb * 100., '%', nbad, nbad + ngood);
2161  } else {
2162  // Cleanup
2163  SafeDelete(listOfMissingFiles);
2164  }
2165  }
2166 
2167  // Done
2168  return 0;
2169 }
2170 
2171 ////////////////////////////////////////////////////////////////////////////////
2172 /// Process specified TDSet on PROOF.
2173 /// This method is called on client and on the PROOF master.
2174 /// The return value is -1 in case of an error and TSelector::GetStatus() in
2175 /// in case of success.
2176 
2177 Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
2178  Option_t *option, Long64_t nentries,
2179  Long64_t first)
2180 {
2181  PDB(kGlobal,1) Info("Process", "Enter");
2182 
2183  fDSet = dset;
2185 
2186  if (!fProgressStatus) {
2187  Error("Process", "No progress status");
2188  return -1;
2189  }
2191 
2192  // delete fOutput;
2193  if (!fOutput)
2194  fOutput = new THashList;
2195  else
2196  fOutput->Clear();
2197 
2199 
2200  if (fProof->IsMaster()){
2201  TPerfStats::Start(fInput, fOutput);
2202  } else {
2204  }
2205 
2206  TStopwatch elapsed;
2207 
2208  // Define filename
2209  TString fn;
2210  fSelectorFileName = selector_file;
2211 
2212  if (fCreateSelObj) {
2213  if(!SendSelector(selector_file)) return -1;
2214  fn = gSystem->BaseName(selector_file);
2215  } else {
2216  fn = selector_file;
2217  }
2218 
2219  TMessage mesg(kPROOF_PROCESS);
2220 
2221  // Parse option
2222  Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
2223 
2224  TList *inputtmp = 0; // List of temporary input objects
2225  TDSet *set = dset;
2226  if (fProof->IsMaster()) {
2227 
2228  PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
2229  set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
2230  dset->GetDirectory() );
2231  if (dset->TestBit(TDSet::kEmpty))
2232  set->SetBit(TDSet::kEmpty);
2233 
2234  if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
2235  Error("Process", "cannot init the packetizer");
2237  return -1;
2238  }
2239 
2240  // Reset start, this is now managed by the packetizer
2241  first = 0;
2242 
2243  // Negative memlogfreq disable checks.
2244  // If 0 is passed we try to have 100 messages about memory
2245  // Otherwise we use the frequency passed.
2246  Int_t mrc = -1;
2247  Long64_t memlogfreq = -1, mlf;
2248  if (gSystem->Getenv("PROOF_MEMLOGFREQ")) {
2249  TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
2250  if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
2251  }
2252  if ((mrc = TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
2253  if (memlogfreq == 0) {
2254  memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
2255  if (memlogfreq <= 0) memlogfreq = 1;
2256  }
2257  if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
2258 
2259 
2260  // Send input data, if any
2261  TString emsg;
2262  if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
2263  Warning("Process", "could not forward input data: %s", emsg.Data());
2264 
2265  // Attach to the transient histogram with the assigned packets, if required
2266  if (fInput->FindObject("PROOF_StatsHist") != 0) {
2267  if (!(fProcPackets = (TH1I *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
2268  Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
2269  } else {
2270  PDB(kLoop,1)
2271  Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
2272  " packets being processed");
2273  }
2274  }
2275 
2276  } else {
2277 
2278  // Check whether we have to enforce the use of submergers
2279  if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
2280  Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
2281  if (smg >= 0) {
2282  fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
2283  if (gEnv->Lookup("Proof.MergersByHost")) {
2284  Int_t mbh = gEnv->GetValue("Proof.MergersByHost",0);
2285  if (mbh != 0) {
2286  // Administrator settings have the priority
2287  TObject *o = 0;
2288  if ((o = fInput->FindObject("PROOF_MergersByHost"))) { fInput->Remove(o); delete o; }
2289  fInput->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
2290  }
2291  }
2292  }
2293  }
2294 
2295  // For a new query clients should make sure that the temporary
2296  // output list is empty
2297  if (fOutputLists) {
2298  fOutputLists->Delete();
2299  delete fOutputLists;
2300  fOutputLists = 0;
2301  }
2302 
2303  if (!sync) {
2305  Printf(" ");
2306  Info("Process","starting new query");
2307  }
2308 
2309  // Define fSelector in Client if processing with filename
2310  if (fCreateSelObj) {
2311  SafeDelete(fSelector);
2312  if (!(fSelector = TSelector::GetSelector(selector_file))) {
2313  if (!sync)
2314  gSystem->RedirectOutput(0);
2315  return -1;
2316  }
2317  }
2318 
2319  fSelectorClass = 0;
2320  fSelectorClass = fSelector->IsA();
2321 
2322  // Add fSelector to inputlist if processing with object
2323  if (!fCreateSelObj) {
2324  // In any input list was set into the selector move it to the PROOF
2325  // input list, because we do not want to stream the selector one
2326  if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
2327  TIter nxi(fSelector->GetInputList());
2328  TObject *o = 0;
2329  while ((o = nxi())) {
2330  if (!fInput->FindObject(o)) {
2331  fInput->Add(o);
2332  if (!inputtmp) {
2333  inputtmp = new TList;
2334  inputtmp->SetOwner(kFALSE);
2335  }
2336  inputtmp->Add(o);
2337  }
2338  }
2339  }
2340  fInput->Add(fSelector);
2341  }
2342  // Set the input list for initialization
2343  fSelector->SetInputList(fInput);
2344  fSelector->SetOption(option);
2345  if (fSelector->GetOutputList()) fSelector->GetOutputList()->Clear();
2346 
2347  PDB(kLoop,1) Info("Process","Call Begin(0)");
2348  fSelector->Begin(0);
2349 
2350  // Reset the input list to avoid double streaming and related problems (saving
2351  // the TQueryResult)
2352  if (!fCreateSelObj) fSelector->SetInputList(0);
2353 
2354  // Send large input data objects, if any
2356 
2357  if (!sync)
2358  gSystem->RedirectOutput(0);
2359  }
2360 
2361  TCleanup clean(this);
2362  SetupFeedback();
2363 
2364  TString opt = option;
2365 
2366  // Old servers need a dedicated streamer
2367  if (fProof->fProtocol < 13)
2368  dset->SetWriteV3(kTRUE);
2369 
2370  // Workers will get the entry ranges from the packetizer
2371  Long64_t num = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : nentries;
2372  Long64_t fst = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : first;
2373 
2374  // Entry- or Event- list ?
2375  TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
2376  : (TEntryList *)0;
2377  TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
2378  : (TEventList *)0;
2379  if (fProof->fProtocol > 14) {
2380  if (fProcessMessage) delete fProcessMessage;
2382  mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
2383  (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
2384  } else {
2385  mesg << set << fn << fInput << opt << num << fst << evl << sync;
2386  if (enl)
2387  // Not supported remotely
2388  Warning("Process","entry lists not supported by the server");
2389  }
2390 
2391  // Reset the merging progress information
2392  fProof->ResetMergePrg();
2393 
2394  Int_t nb = fProof->Broadcast(mesg);
2395  PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
2396  if (fProof->IsLite()) fProof->fNotIdle += nb;
2397 
2398  // Reset streamer choice
2399  if (fProof->fProtocol < 13)
2400  dset->SetWriteV3(kFALSE);
2401 
2402  // Redirect logs from master to special log frame
2403  if (IsClient())
2404  fProof->fRedirLog = kTRUE;
2405 
2406  if (!IsClient()){
2407  // Signal the start of finalize for the memory log grepping
2408  Info("Process|Svc", "Start merging Memory information");
2409  }
2410 
2411  if (!sync) {
2412  if (IsClient()) {
2413  // Asynchronous query: just make sure that asynchronous input
2414  // is enabled and return the prompt
2415  PDB(kGlobal,1) Info("Process","Asynchronous processing:"
2416  " activating CollectInputFrom");
2417  fProof->Activate();
2418 
2419  // Receive the acknowledgement and query sequential number
2420  fProof->Collect();
2421 
2422  return fProof->fSeqNum;
2423 
2424  } else {
2425  PDB(kGlobal,1) Info("Process","Calling Collect");
2426  fProof->Collect();
2427 
2428  HandleTimer(0); // force an update of final result
2429  // This forces a last call to TPacketizer::HandleTimer via the second argument
2430  // (the first is ignored). This is needed when some events were skipped so that
2431  // the total number of entries is not the one requested. The packetizer has no
2432  // way in such a case to understand that processing is finished: it must be told.
2433  if (fPacketizer) {
2434  fPacketizer->StopProcess(kFALSE, kTRUE);
2435  // The progress timer will now stop itself at the next call
2437  // Store process info
2438  elapsed.Stop();
2439  if (fQuery)
2440  fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
2442  elapsed.RealTime());
2443  }
2444  StopFeedback();
2445 
2446  return Finalize(kFALSE,sync);
2447  }
2448  } else {
2449 
2450  PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
2451  fProof->Collect();
2452  if (!(fProof->IsSync())) {
2453  // The server required to switch to asynchronous mode
2454  Info("Process", "switching to the asynchronous mode ...");
2455  return fProof->fSeqNum;
2456  }
2457 
2458  // Restore prompt logging, for clients (Collect leaves things as they were
2459  // at the time it was called)
2460  if (IsClient())
2461  fProof->fRedirLog = kFALSE;
2462 
2463  if (!IsClient()) {
2464  // Force an update of final result
2465  HandleTimer(0);
2466  // This forces a last call to TPacketizer::HandleTimer via the second argument
2467  // (the first is ignored). This is needed when some events were skipped so that
2468  // the total number of entries is not the one requested. The packetizer has no
2469  // way in such a case to understand that processing is finished: it must be told.
2470  if (fPacketizer) {
2471  fPacketizer->StopProcess(kFALSE, kTRUE);
2472  // The progress timer will now stop itself at the next call
2474  // Store process info
2475  if (fQuery)
2476  fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
2479  }
2480  } else {
2481  // Set the input list: maybe required at termination
2482  if (!fCreateSelObj) fSelector->SetInputList(fInput);
2483  }
2484  StopFeedback();
2485 
2486  Long64_t rc = -1;
2488  rc = Finalize(kFALSE,sync);
2489 
2490  // Remove temporary input objects, if any
2491  if (inputtmp) {
2492  TIter nxi(inputtmp);
2493  TObject *o = 0;
2494  while ((o = nxi())) fInput->Remove(o);
2495  SafeDelete(inputtmp);
2496  }
2497 
2498  // Done
2499  return rc;
2500  }
2501 }
2502 
2503 ////////////////////////////////////////////////////////////////////////////////
2504 /// Process specified TDSet on PROOF.
2505 /// This method is called on client and on the PROOF master.
2506 /// The return value is -1 in case of an error and TSelector::GetStatus() in
2507 /// in case of success.
2508 
2510  Option_t *option, Long64_t nentries,
2511  Long64_t first)
2512 {
2513  if (!selector) {
2514  Error("Process", "selector object undefined");
2515  return -1;
2516  }
2517 
2518  // Define fSelector in Client
2519  if (IsClient() && (selector != fSelector)) {
2520  if (fCreateSelObj) SafeDelete(fSelector);
2521  fSelector = selector;
2522  }
2523 
2525  Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
2526  fCreateSelObj = kTRUE;
2527 
2528  // Done
2529  return rc;
2530 }
2531 
2532 ////////////////////////////////////////////////////////////////////////////////
2533 /// Prepares the given list of new workers to join a progressing process.
2534 /// Returns kTRUE on success, kFALSE otherwise.
2535 
2537 {
2538  if (!fProcessMessage || !fProof || !fPacketizer) {
2539  Error("Process", "Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p",
2541  return kFALSE;
2542  }
2543 
2544  if (!workers || !fProof->IsMaster()) {
2545  Error("Process", "Invalid call");
2546  return kFALSE;
2547  }
2548 
2549  PDB(kGlobal, 1)
2550  Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries());
2551 
2552  // Sends the file associated to the TSelector, if necessary
2553  if (fCreateSelObj) {
2554  PDB(kGlobal, 2)
2555  Info("Process", "Sending selector file %s", fSelectorFileName.Data());
2557  Error("Process", "Problems in sending selector file %s", fSelectorFileName.Data());
2558  return kFALSE;
2559  }
2560  }
2561 
2562  if (fProof->IsLite()) fProof->fNotIdle += workers->GetSize();
2563 
2564  PDB(kGlobal, 2)
2565  Info("Process", "Adding new workers to the packetizer");
2566  if (fPacketizer->AddWorkers(workers) == -1) {
2567  Error("Process", "Cannot add new workers to the packetizer!");
2568  return kFALSE; // TODO: make new wrks inactive
2569  }
2570 
2571  PDB(kGlobal, 2)
2572  Info("Process", "Broadcasting process message to new workers");
2573  fProof->Broadcast(*fProcessMessage, workers);
2574 
2575  // Don't call Collect(): we came here from a global Collect() already which
2576  // will take care of new workers as well
2577 
2578  return kTRUE;
2579 
2580 }
2581 
2582 ////////////////////////////////////////////////////////////////////////////////
2583 /// Merge output in files
2584 
2586 {
2587  PDB(kOutput,1) Info("MergeOutputFiles", "enter: fOutput size: %d", fOutput->GetSize());
2588  PDB(kOutput,2) fOutput->ls();
2589 
2590  TList *rmList = 0;
2591  if (fMergeFiles) {
2592  TIter nxo(fOutput);
2593  TObject *o = 0;
2594  TProofOutputFile *pf = 0;
2595  while ((o = nxo())) {
2596  if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
2597 
2598  PDB(kOutput,2) pf->Print();
2599 
2600  if (pf->IsMerge()) {
2601 
2602  // Point to the merger
2603  Bool_t localMerge = (pf->GetTypeOpt() == TProofOutputFile::kLocal) ? kTRUE : kFALSE;
2604  TFileMerger *filemerger = pf->GetFileMerger(localMerge);
2605  if (!filemerger) {
2606  Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
2607  pf->Print();
2608  continue;
2609  }
2610  // If only one instance the list in the merger is not yet created: do it now
2611  if (!pf->IsMerged()) {
2612  PDB(kOutput,2) pf->Print();
2613  TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
2614  filemerger->AddFile(fileLoc);
2615  }
2616  // Datadir
2617  TString ddir, ddopts;
2618  if (gProofServ) {
2619  ddir.Form("%s/", gProofServ->GetDataDir());
2621  }
2622  // Set the output file
2623  TString outfile(pf->GetOutputFileName());
2624  if (outfile.Contains("<datadir>/")) {
2625  outfile.ReplaceAll("<datadir>/", ddir.Data());
2626  if (!ddopts.IsNull())
2627  outfile += TString::Format("?%s", ddopts.Data());
2628  pf->SetOutputFileName(outfile);
2629  }
2630  if ((gProofServ && gProofServ->IsTopMaster()) || (fProof && fProof->IsLite())) {
2632  TString srv;
2634  TUrl usrv(srv);
2635  Bool_t localFile = kFALSE;
2636  if (pf->IsRetrieve()) {
2637  // This file will be retrieved by the client: we created it in the data dir
2638  // and save the file URL on the client in the title
2639  if (outfile.BeginsWith("client:")) outfile.Replace(0, 7, "");
2640  TString bn = gSystem->BaseName(TUrl(outfile.Data(), kTRUE).GetFile());
2641  // The output file path on the master
2642  outfile.Form("%s%s", ddir.Data(), bn.Data());
2643  // Save the client path in the title if not defined yet
2644  if (strlen(pf->GetTitle()) <= 0) pf->SetTitle(bn);
2645  // The file is local
2646  localFile = kTRUE;
2647  } else {
2648  // Check if the file is on the master or elsewhere
2649  if (outfile.BeginsWith("master:")) outfile.Replace(0, 7, "");
2650  // Check locality
2651  TUrl uof(outfile.Data(), kTRUE);
2652  TString lfn;
2653  ftyp = TFile::GetType(uof.GetUrl(), "RECREATE", &lfn);
2654  if (ftyp == TFile::kLocal && !srv.IsNull()) {
2655  // Check if is a different server
2656  if (uof.GetPort() > 0 && usrv.GetPort() > 0 &&
2657  usrv.GetPort() != uof.GetPort()) ftyp = TFile::kNet;
2658  }
2659  // If it is really local set the file name
2660  if (ftyp == TFile::kLocal) outfile = lfn;
2661  // The file maybe local
2662  if (ftyp == TFile::kLocal || ftyp == TFile::kFile) localFile = kTRUE;
2663  }
2664  // The remote output file name (the one to be used by the client)
2665  TString outfilerem(outfile);
2666  // For local files we add the local server
2667  if (localFile) {
2668  // Remove prefix, if any, if included and if Xrootd
2669  TProofServ::FilterLocalroot(outfilerem, srv);
2670  outfilerem.Insert(0, srv);
2671  }
2672  // Save the new remote output filename
2673  pf->SetOutputFileName(outfilerem);
2674  // Align the filename
2675  pf->SetFileName(gSystem->BaseName(outfilerem));
2676  }
2677  if (!filemerger->OutputFile(outfile)) {
2678  Error("MergeOutputFiles", "cannot open the output file");
2679  continue;
2680  }
2681  // Merge
2682  PDB(kSubmerger,2) filemerger->PrintFiles("");
2683  if (!filemerger->Merge()) {
2684  Error("MergeOutputFiles", "cannot merge the output files");
2685  continue;
2686  }
2687  // Remove the files
2688  TList *fileList = filemerger->GetMergeList();
2689  if (fileList) {
2690  TIter next(fileList);
2691  TObjString *url = 0;
2692  while((url = (TObjString*)next())) {
2693  TUrl u(url->GetName());
2694  if (!strcmp(u.GetProtocol(), "file")) {
2695  gSystem->Unlink(u.GetFile());
2696  } else {
2697  gSystem->Unlink(url->GetName());
2698  }
2699  }
2700  }
2701  // Reset the merger
2702  filemerger->Reset();
2703 
2704  } else {
2705 
2706  // If not yet merged (for example when having only 1 active worker,
2707  // we need to create the dataset by calling Merge on an effectively empty list
2708  if (!pf->IsMerged()) {
2709  TList dumlist;
2710  dumlist.Add(new TNamed("dum", "dum"));
2711  dumlist.SetOwner(kTRUE);
2712  pf->Merge(&dumlist);
2713  }
2714  // Point to the dataset
2716  if (!fc) {
2717  Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
2718  pf->Print();
2719  continue;
2720  }
2721  // Add the collection to the output list for registration and/or to be returned
2722  // to the client
2723  fOutput->Add(fc);
2724  // Do not cleanup at destruction
2725  pf->ResetFileCollection();
2726  // Tell the main thread to register this dataset, if needed
2727  if (pf->IsRegister()) {
2728  TString opt;
2729  if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
2730  if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
2731  if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
2732  fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
2733  TString tag = TString::Format("DATASET_%s", pf->GetTitle());
2734  fOutput->Add(new TNamed(tag, opt));
2735  }
2736  // Remove this object from the output list and schedule it for distruction
2737  fOutput->Remove(pf);
2738  if (!rmList) rmList = new TList;
2739  rmList->Add(pf);
2740  PDB(kOutput,2) fOutput->Print();
2741  }
2742  }
2743  }
2744  }
2745 
2746  // Remove objects scheduled for removal
2747  if (rmList && rmList->GetSize() > 0) {
2748  TIter nxo(rmList);
2749  TObject *o = 0;
2750  while((o = nxo())) {
2751  fOutput->Remove(o);
2752  }
2753  rmList->SetOwner(kTRUE);
2754  delete rmList;
2755  }
2756 
2757  PDB(kOutput,1) Info("MergeOutputFiles", "done!");
2758 
2759  // Done
2760  return kTRUE;
2761 }
2762 
2763 
2764 ////////////////////////////////////////////////////////////////////////////////
2765 /// Set the selector's data members:
2766 /// find the mapping of data members to otuput list entries in the output list
2767 /// and apply it.
2768 
2770 {
2773  if (!olsdm) {
2774  PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList",
2775  "failed to find map object in output list!");
2776  return;
2777  }
2778 
2779  olsdm->SetDataMembers(fSelector);
2780 }
2781 
2782 ////////////////////////////////////////////////////////////////////////////////
2783 
2785 {
2786  // Finalize a query.
2787  // Returns -1 in case of an error, 0 otherwise.
2788 
2789  if (IsClient()) {
2790  if (fOutputLists == 0) {
2791  if (force)
2792  if (fQuery)
2793  return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
2794  fQuery->GetName()), force);
2795  } else {
2796  // Make sure the all objects are in the output list
2797  PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
2798  MergeOutput();
2799  }
2800  }
2801 
2802  Long64_t rv = 0;
2803  if (fProof->IsMaster()) {
2804 
2805  // Fill information for monitoring and stop it
2806  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
2807  if (!status) {
2808  // The query was aborted: let's add some info in the output list
2809  status = new TStatus();
2810  fOutput->Add(status);
2811  TString emsg = TString::Format("Query aborted after %lld entries", GetEventsProcessed());
2812  status->Add(emsg);
2813  }
2814  status->SetExitStatus((Int_t) GetExitStatus());
2815 
2816  PDB(kOutput,1) Info("Finalize","Calling Merge Output");
2817  // Some objects (e.g. histos in autobin) may not have been merged yet
2818  // do it now
2819  MergeOutput();
2820 
2821  fOutput->SetOwner();
2822 
2823  // Add the active-wrks-vs-proctime info from the packetizer
2824  if (fPacketizer) {
2825  TObject *pperf = (TObject *) fPacketizer->GetProgressPerf(kTRUE);
2826  if (pperf) fOutput->Add(pperf);
2827  TList *parms = fPacketizer->GetConfigParams(kTRUE);
2828  if (parms) {
2829  TIter nxo(parms);
2830  TObject *o = 0;
2831  while ((o = nxo())) fOutput->Add(o);
2832  }
2833 
2834  // If other invalid elements were found during processing, add them to the
2835  // list of missing elements
2836  TDSetElement *elem = 0;
2837  if (fPacketizer->GetFailedPackets()) {
2839  TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
2840  if (!listOfMissingFiles) {
2841  listOfMissingFiles = new TList;
2842  listOfMissingFiles->SetName("MissingFiles");
2843  }
2845  while ((elem = (TDSetElement *)nxe()))
2846  listOfMissingFiles->Add(elem->GetFileInfo(type));
2847  if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
2848  }
2849  }
2850 
2851  TPerfStats::Stop();
2852  // Save memory usage on master
2853  Long_t vmaxmst, rmaxmst;
2854  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
2855  status->SetMemValues(vmaxmst, rmaxmst, kTRUE);
2856 
2857  SafeDelete(fSelector);
2858 
2859  } else {
2860  if (fExitStatus != kAborted) {
2861 
2862  if (!sync) {
2863  // Reinit selector (with multi-sessioning we must do this until
2864  // TSelector::GetSelector() is optimized to i) avoid reloading of an
2865  // unchanged selector and ii) invalidate existing instances of
2866  // reloaded selector)
2867  if (ReinitSelector(fQuery) == -1) {
2868  Info("Finalize", "problems reinitializing selector \"%s\"",
2869  fQuery->GetSelecImp()->GetName());
2870  return -1;
2871  }
2872  }
2873 
2874  if (fPacketizer)
2875  if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
2877  failedPackets->SetName("FailedPackets");
2878  AddOutputObject(failedPackets);
2879 
2880  TStatus *status = (TStatus *)GetOutput("PROOF_Status");
2881  if (!status) AddOutputObject((status = new TStatus()));
2882  status->Add("Some packets were not processed! Check the the"
2883  " 'FailedPackets' list in the output list");
2884  }
2885 
2886  // Some input parameters may be needed in Terminate
2887  fSelector->SetInputList(fInput);
2888 
2889  TList *output = fSelector->GetOutputList();
2890  if (output) {
2891  TIter next(fOutput);
2892  while(TObject* obj = next()) {
2893  if (fProof->IsParallel() || DrawCanvas(obj) == 1)
2894  // Either parallel or not a canvas or not able to display it:
2895  // just add to the list
2896  output->Add(obj);
2897  }
2898  } else {
2899  Warning("Finalize", "undefined output list in the selector! Protocol error?");
2900  }
2901 
2902  // We need to do this because the output list can be modified in TSelector::Terminate
2903  // in a way to invalidate existing objects; so we clean the links when still valid and
2904  // we re-copy back later
2905  fOutput->SetOwner(kFALSE);
2906  fOutput->Clear("nodelete");
2907 
2908  // Map output objects to selector members
2910 
2911  PDB(kLoop,1) Info("Finalize","Call Terminate()");
2912  // This is the end of merging
2913  SetMerging(kFALSE);
2914  // We measure the merge time
2915  fProof->fQuerySTW.Reset();
2916  // Call Terminate now
2917  fSelector->Terminate();
2918 
2919  rv = fSelector->GetStatus();
2920 
2921  // Copy the output list back and clean the selector's list
2922  TIter it(output);
2923  while(TObject* o = it()) {
2924  fOutput->Add(o);
2925  }
2926 
2927  // Save the output list in the current query, if any
2928  if (fQuery) {
2929  fQuery->SetOutputList(fOutput);
2930  // Set in finalized state (cannot be done twice)
2931  fQuery->SetFinalized();
2932  } else {
2933  Warning("Finalize","current TQueryResult object is undefined!");
2934  }
2935 
2936  if (!fCreateSelObj) {
2937  fInput->Remove(fSelector);
2938  fOutput->Remove(fSelector);
2939  if (output) output->Remove(fSelector);
2940  fSelector = 0;
2941  }
2942 
2943  // We have transferred copy of the output objects in TQueryResult,
2944  // so now we can cleanup the selector, making sure that we do not
2945  // touch the output objects
2946  if (output) { output->SetOwner(kFALSE); output->Clear("nodelete"); }
2947  if (fCreateSelObj) SafeDelete(fSelector);
2948 
2949  // Delete fOutput (not needed anymore, cannot be finalized twice),
2950  // making sure that the objects saved in TQueryResult are not deleted
2951  fOutput->SetOwner(kFALSE);
2952  fOutput->Clear("nodelete");
2953  SafeDelete(fOutput);
2954 
2955  } else {
2956 
2957  // Cleanup
2958  fOutput->SetOwner();
2959  SafeDelete(fSelector);
2960  if (!fCreateSelObj) fSelector = 0;
2961  }
2962  }
2963  PDB(kGlobal,1) Info("Process","exit");
2964 
2965  if (!IsClient()) {
2966  Info("Finalize", "finalization on %s finished", gProofServ->GetPrefix());
2967  }
2969 
2970  return rv;
2971 }
2972 
2973 ////////////////////////////////////////////////////////////////////////////////
2974 /// Finalize the results of a query already processed.
2975 
2977 {
2978  PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
2979 
2980  if (!IsClient()) {
2981  Info("Finalize(TQueryResult *)",
2982  "method to be executed only on the clients");
2983  return -1;
2984  }
2985 
2986  if (!qr) {
2987  Info("Finalize(TQueryResult *)", "query undefined");
2988  return -1;
2989  }
2990 
2991  if (qr->IsFinalized()) {
2992  Info("Finalize(TQueryResult *)", "query already finalized");
2993  return -1;
2994  }
2995 
2996  // Reset the list
2997  if (!fOutput)
2998  fOutput = new THashList;
2999  else
3000  fOutput->Clear();
3001 
3002  // Make sure that the temporary output list is empty
3003  if (fOutputLists) {
3004  fOutputLists->Delete();
3005  delete fOutputLists;
3006  fOutputLists = 0;
3007  }
3008 
3009  // Re-init the selector
3011 
3012  // Import the output list
3013  TList *tmp = (TList *) qr->GetOutputList();
3014  if (!tmp) {
3015  gSystem->RedirectOutput(0);
3016  Info("Finalize(TQueryResult *)", "outputlist is empty");
3017  return -1;
3018  }
3019  TList *out = fOutput;
3020  if (fProof->fProtocol < 11)
3021  out = new TList;
3022  TIter nxo(tmp);
3023  TObject *o = 0;
3024  while ((o = nxo()))
3025  out->Add(o->Clone());
3026 
3027  // Adopts the list
3028  if (fProof->fProtocol < 11) {
3029  out->SetOwner();
3030  StoreOutput(out);
3031  }
3032  gSystem->RedirectOutput(0);
3033 
3035 
3036  // Finalize it
3037  SetCurrentQuery(qr);
3038  Long64_t rc = Finalize();
3040 
3041  return rc;
3042 }
3043 
3044 ////////////////////////////////////////////////////////////////////////////////
3045 /// Send the selector file(s) to master or worker nodes.
3046 
3047 Bool_t TProofPlayerRemote::SendSelector(const char* selector_file)
3048 {
3049  // Check input
3050  if (!selector_file) {
3051  Info("SendSelector", "Invalid input: selector (file) name undefined");
3052  return kFALSE;
3053  }
3054 
3055  if (!strchr(gSystem->BaseName(selector_file), '.')) {
3056  if (gDebug > 1)
3057  Info("SendSelector", "selector name '%s' does not contain a '.':"
3058  " nothing to send, it will be loaded from a library", selector_file);
3059  return kTRUE;
3060  }
3061 
3062  // Extract the fine name first
3063  TString selec = selector_file;
3064  TString aclicMode;
3065  TString arguments;
3066  TString io;
3067  selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
3068 
3069  // Expand possible envs or '~'
3070  gSystem->ExpandPathName(selec);
3071 
3072  // Update the macro path
3074  TString np(gSystem->DirName(selec));
3075  if (!np.IsNull()) {
3076  np += ":";
3077  if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
3078  Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
3079  mp.Insert(ip, np);
3080  TROOT::SetMacroPath(mp);
3081  if (gDebug > 0)
3082  Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
3083  }
3084  }
3085 
3086  // Header file
3087  TString header = selec;
3088  header.Remove(header.Last('.'));
3089  header += ".h";
3090  if (gSystem->AccessPathName(header, kReadPermission)) {
3091  TString h = header;
3092  header.Remove(header.Last('.'));
3093  header += ".hh";
3094  if (gSystem->AccessPathName(header, kReadPermission)) {
3095  Info("SendSelector",
3096  "header file not found: tried: %s %s", h.Data(), header.Data());
3097  return kFALSE;
3098  }
3099  }
3100 
3101  // Send files now
3103  Info("SendSelector", "problems sending implementation file %s", selec.Data());
3104  return kFALSE;
3105  }
3106  if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
3107  Info("SendSelector", "problems sending header file %s", header.Data());
3108  return kFALSE;
3109  }
3110 
3111  return kTRUE;
3112 }
3113 
3114 ////////////////////////////////////////////////////////////////////////////////
3115 /// Merge objects in output the lists.
3116 
3118 {
3119  PDB(kOutput,1) Info("MergeOutput","Enter");
3120 
3121  TObject *obj = 0;
3122  if (fOutputLists) {
3123 
3125 
3126  TList *list;
3127  while ( (list = (TList *) next()) ) {
3128 
3129  if (!(obj = fOutput->FindObject(list->GetName()))) {
3130  obj = list->First();
3131  list->Remove(obj);
3132  fOutput->Add(obj);
3133  }
3134 
3135  if ( list->IsEmpty() ) continue;
3136 
3137  TMethodCall callEnv;
3138  if (obj->IsA())
3139  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3140  if (callEnv.IsValid()) {
3141  callEnv.SetParam((Long_t) list);
3142  callEnv.Execute(obj);
3143  } else {
3144  // No Merge interface, return individual objects
3145  while ( (obj = list->First()) ) {
3146  fOutput->Add(obj);
3147  list->Remove(obj);
3148  }
3149  }
3150  }
3152 
3153  } else {
3154 
3155  PDB(kOutput,1) Info("MergeOutput","fOutputLists empty");
3156  }
3157 
3158  if (!IsClient() || fProof->IsLite()) {
3159  // Merge the output files created on workers, if any
3160  MergeOutputFiles();
3161  }
3162 
3163  // If there are TProofOutputFile objects we have to make sure that the internal
3164  // information is consistent for the cases where this object is going to be merged
3165  // again (e.g. when using submergers or in a multi-master setup). This may not be
3166  // the case because the first coming in is taken as reference and it has the
3167  // internal dir and raw dir of the originating worker.
3168  TString key;
3169  TNamed *nm = 0;
3170  TList rmlist;
3171  TIter nxo(fOutput);
3172  while ((obj = nxo())) {
3173  TProofOutputFile *pf = dynamic_cast<TProofOutputFile *>(obj);
3174  if (pf) {
3175  if (gProofServ) {
3176  PDB(kOutput,2) Info("MergeOutput","found TProofOutputFile '%s'", obj->GetName());
3177  TString dir(pf->GetOutputFileName());
3178  PDB(kOutput,2) Info("MergeOutput","outputfilename: '%s'", dir.Data());
3179  // The dir
3180  if (dir.Last('/') != kNPOS) dir.Remove(dir.Last('/')+1);
3181  PDB(kOutput,2) Info("MergeOutput","dir: '%s'", dir.Data());
3182  pf->SetDir(dir);
3183  // The raw dir; for xrootd based system we include the 'localroot', if any
3184  TUrl u(dir);
3185  dir = u.GetFile();
3186  TString pfx = gEnv->GetValue("Path.Localroot","");
3187  if (!pfx.IsNull() &&
3188  (!strcmp(u.GetProtocol(), "root") || !strcmp(u.GetProtocol(), "xrd")))
3189  dir.Insert(0, pfx);
3190  PDB(kOutput,2) Info("MergeOutput","rawdir: '%s'", dir.Data());
3191  pf->SetDir(dir, kTRUE);
3192  // The worker ordinal
3194  // The saved output file name, if any
3195  key.Form("PROOF_OutputFileName_%s", pf->GetFileName());
3196  if ((nm = (TNamed *) fOutput->FindObject(key.Data()))) {
3197  pf->SetOutputFileName(nm->GetTitle());
3198  rmlist.Add(nm);
3200  pf->SetOutputFileName(0);
3202  }
3203  // The filename (order is important to exclude '.merger' from the key)
3204  dir = pf->GetFileName();
3206  dir += ".merger";
3207  pf->SetMerged(kFALSE);
3208  } else {
3209  if (dir.EndsWith(".merger")) dir.Remove(dir.Last('.'));
3210  }
3211  pf->SetFileName(dir);
3212  } else if (fProof->IsLite()) {
3213  // The ordinal
3214  pf->SetWorkerOrdinal("0");
3215  // The dir
3216  pf->SetDir(gSystem->DirName(pf->GetOutputFileName()));
3217  // The filename and raw dir
3218  TUrl u(pf->GetOutputFileName(), kTRUE);
3219  pf->SetFileName(gSystem->BaseName(u.GetFile()));
3220  pf->SetDir(gSystem->DirName(u.GetFile()), kTRUE);
3221  // Notify the output path
3222  Printf("\nOutput file: %s", pf->GetOutputFileName());
3223  }
3224  } else {
3225  PDB(kOutput,2) Info("MergeOutput","output object '%s' is not a TProofOutputFile", obj->GetName());
3226  }
3227  }
3228 
3229  // Remove temporary objects from fOutput
3230  if (rmlist.GetSize() > 0) {
3231  TIter nxrm(&rmlist);
3232  while ((obj = nxrm()))
3233  fOutput->Remove(obj);
3234  rmlist.SetOwner(kTRUE);
3235  }
3236 
3237  // If requested (typically in case of submerger to count possible side-effects in that process)
3238  // save the measured memory usage
3239  if (saveMemValues) {
3240  TPerfStats::Stop();
3241  // Save memory usage on master
3242  Long_t vmaxmst, rmaxmst;
3243  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
3244  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
3245  if (status) status->SetMemValues(vmaxmst, rmaxmst, kFALSE);
3246  }
3247 
3248  PDB(kOutput,1) fOutput->Print();
3249  PDB(kOutput,1) Info("MergeOutput","leave (%d object(s))", fOutput->GetSize());
3250 }
3251 
3252 ////////////////////////////////////////////////////////////////////////////////
3253 /// Progress signal.
3254 
3256 {
3257  if (IsClient()) {
3258  fProof->Progress(total, processed);
3259  } else {
3260  // Send to the previous tier
3262  m << total << processed;
3263  gProofServ->GetSocket()->Send(m);
3264  }
3265 }
3266 
3267 ////////////////////////////////////////////////////////////////////////////////
3268 /// Progress signal.
3269 
3271  Long64_t bytesread,
3272  Float_t initTime, Float_t procTime,
3273  Float_t evtrti, Float_t mbrti)
3274 {
3275  PDB(kGlobal,1)
3276  Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
3277  initTime, procTime, evtrti, mbrti);
3278 
3279  if (IsClient()) {
3280  fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
3281  } else {
3282  // Send to the previous tier
3284  m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
3285  gProofServ->GetSocket()->Send(m);
3286  }
3287 }
3288 
3289 ////////////////////////////////////////////////////////////////////////////////
3290 /// Progress signal.
3291 
3293 {
3294  if (pi) {
3295  PDB(kGlobal,1)
3296  Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
3297  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
3298  pi->fActWorkers, pi->fEffSessions);
3299 
3300  if (IsClient()) {
3301  fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
3302  pi->fInitTime, pi->fProcTime,
3303  pi->fEvtRateI, pi->fMBRateI,
3304  pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
3305  } else {
3306  // Send to the previous tier
3308  m << pi;
3309  gProofServ->GetSocket()->Send(m);
3310  }
3311  } else {
3312  Warning("Progress","TProofProgressInfo object undefined!");
3313  }
3314 }
3315 
3316 
3317 ////////////////////////////////////////////////////////////////////////////////
3318 /// Feedback signal.
3319 
3321 {
3322  fProof->Feedback(objs);
3323 }
3324 
3325 ////////////////////////////////////////////////////////////////////////////////
3326 /// Stop process after this event.
3327 
3329 {
3330  if (fPacketizer != 0)
3331  fPacketizer->StopProcess(abort, kFALSE);
3332  if (abort == kTRUE)
3334  else
3336 }
3337 
3338 ////////////////////////////////////////////////////////////////////////////////
3339 /// Incorporate the received object 'obj' into the output list fOutput.
3340 /// The latter is created if not existing.
3341 /// This method short cuts 'StoreOutput + MergeOutput' optimizing the memory
3342 /// consumption.
3343 /// Returns -1 in case of error, 1 if the object has been merged into another
3344 /// one (so that its ownership has not been taken and can be deleted), and 0
3345 /// otherwise.
3346 
3348 {
3349  PDB(kOutput,1)
3350  Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
3351 
3352  // We must something to process
3353  if (!obj) {
3354  PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
3355  return -1;
3356  }
3357 
3358  // Create the output list, if not yet done
3359  if (!fOutput)
3360  fOutput = new THashList;
3361 
3362  // Flag about merging
3363  Bool_t merged = kTRUE;
3364 
3365  // Process event lists first
3366  TList *elists = dynamic_cast<TList *> (obj);
3367  if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
3368 
3369  // Create a global event list, result of merging the event lists
3370  // coresponding to the various data set elements
3371  TEventList *evlist = new TEventList("PROOF_EventList");
3372 
3373  // Iterate the list of event list segments
3374  TIter nxevl(elists);
3375  TEventList *evl = 0;
3376  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3377 
3378  // Find the file offset (fDSet is the current TDSet instance)
3379  // locating the element by name
3380  TIter nxelem(fDSet->GetListOfElements());
3381  TDSetElement *elem = 0;
3382  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3383  if (!strcmp(elem->GetFileName(), evl->GetName()))
3384  break;
3385  }
3386  if (!elem) {
3387  Error("AddOutputObject", "Found an event list for %s, but no object with"
3388  " the same name in the TDSet", evl->GetName());
3389  continue;
3390  }
3391  Long64_t offset = elem->GetTDSetOffset();
3392 
3393  // Shift the list by the number of first event in that file
3394  Long64_t *arr = evl->GetList();
3395  Int_t num = evl->GetN();
3396  if (arr && offset > 0)
3397  for (Int_t i = 0; i < num; i++)
3398  arr[i] += offset;
3399 
3400  // Add to the global event list
3401  evlist->Add(evl);
3402  }
3403 
3404  // Incorporate the resulting global list in fOutput
3405  SetLastMergingMsg(evlist);
3406  Incorporate(evlist, fOutput, merged);
3407  NotifyMemory(evlist);
3408 
3409  // Delete the global list if merged
3410  if (merged)
3411  SafeDelete(evlist);
3412 
3413  // The original object has been transformed in something else; we do
3414  // not have ownership on it
3415  return 1;
3416  }
3417 
3418  // Check if we need to merge files
3419  TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
3420  if (pf) {
3421  fMergeFiles = kTRUE;
3422  if (!IsClient() || fProof->IsLite()) {
3423  if (pf->IsMerge()) {
3424  Bool_t hasfout = (pf->GetOutputFileName() &&
3425  strlen(pf->GetOutputFileName()) > 0 &&
3426  pf->TestBit(TProofOutputFile::kOutputFileNameSet)) ? kTRUE : kFALSE;
3427  Bool_t setfout = (!hasfout || TestBit(TVirtualProofPlayer::kIsSubmerger)) ? kTRUE : kFALSE;
3428  if (setfout) {
3429 
3430  TString ddir, ddopts;
3431  if (gProofServ) {
3432  ddir.Form("%s/", gProofServ->GetDataDir());
3433  if (gProofServ->GetDataDirOpts()) ddopts = gProofServ->GetDataDirOpts();
3434  }
3435  // Set the output file
3436  TString outfile(pf->GetOutputFileName());
3437  outfile.ReplaceAll("<datadir>/", ddir.Data());
3438  if (!ddopts.IsNull()) outfile += TString::Format("?%s", ddopts.Data());
3439  pf->SetOutputFileName(outfile);
3440 
3441  if (gProofServ) {
3442  // If submerger, save first the existing filename, if any
3443  if (TestBit(TVirtualProofPlayer::kIsSubmerger) && hasfout) {
3444  TString key = TString::Format("PROOF_OutputFileName_%s", pf->GetFileName());
3445  if (!fOutput->FindObject(key.Data()))
3446  fOutput->Add(new TNamed(key.Data(), pf->GetOutputFileName()));
3447  }
3448  TString of;
3450  if (of.IsNull()) {
3451  // Assume an xroot server running on the machine
3452  of.Form("root://%s/", gSystem->HostName());
3453  if (gSystem->Getenv("XRDPORT")) {
3454  TString sp(gSystem->Getenv("XRDPORT"));
3455  if (sp.IsDigit())
3456  of.Form("root://%s:%s/", gSystem->HostName(), sp.Data());
3457  }
3458  }
3459  TString sessionPath(gProofServ->GetSessionDir());
3460  TProofServ::FilterLocalroot(sessionPath, of);
3461  of += TString::Format("%s/%s", sessionPath.Data(), pf->GetFileName());
3463  if (!of.EndsWith(".merger")) of += ".merger";
3464  } else {
3465  if (of.EndsWith(".merger")) of.Remove(of.Last('.'));
3466  }
3467  pf->SetOutputFileName(of);
3468  }
3469  }
3470  // Notify
3471  PDB(kOutput, 1) pf->Print();
3472  }
3473  } else {
3474  // On clients notify the output path
3475  Printf("Output file: %s", pf->GetOutputFileName());
3476  }
3477  }
3478 
3479  // For other objects we just run the incorporation procedure
3480  SetLastMergingMsg(obj);
3481  Incorporate(obj, fOutput, merged);
3482  NotifyMemory(obj);
3483 
3484  // We are done
3485  return (merged ? 1 : 0);
3486 }
3487 
3488 ////////////////////////////////////////////////////////////////////////////////
3489 /// Control output redirection to TProof::fLogFileW
3490 
3492 {
3493  if (on && fProof && fProof->fLogFileW) {
3496  } else if (!on) {
3497  if (fErrorHandler) {
3500  }
3501  }
3502 }
3503 
3504 ////////////////////////////////////////////////////////////////////////////////
3505 /// Incorporate the content of the received output list 'out' into the final
3506 /// output list fOutput. The latter is created if not existing.
3507 /// This method short cuts 'StoreOutput + MergeOutput' limiting the memory
3508 /// consumption.
3509 
3511 {
3512  PDB(kOutput,1) Info("AddOutput","Enter");
3513 
3514  // We must something to process
3515  if (!out) {
3516  PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
3517  return;
3518  }
3519 
3520  // Create the output list, if not yet done
3521  if (!fOutput)
3522  fOutput = new THashList;
3523 
3524  // Process event lists first
3525  Bool_t merged = kTRUE;
3526  TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
3527  if (elists) {
3528 
3529  // Create a global event list, result of merging the event lists
3530  // corresponding to the various data set elements
3531  TEventList *evlist = new TEventList("PROOF_EventList");
3532 
3533  // Iterate the list of event list segments
3534  TIter nxevl(elists);
3535  TEventList *evl = 0;
3536  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3537 
3538  // Find the file offset (fDSet is the current TDSet instance)
3539  // locating the element by name
3540  TIter nxelem(fDSet->GetListOfElements());
3541  TDSetElement *elem = 0;
3542  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3543  if (!strcmp(elem->GetFileName(), evl->GetName()))
3544  break;
3545  }
3546  if (!elem) {
3547  Error("AddOutput", "Found an event list for %s, but no object with"
3548  " the same name in the TDSet", evl->GetName());
3549  continue;
3550  }
3551  Long64_t offset = elem->GetTDSetOffset();
3552 
3553  // Shift the list by the number of first event in that file
3554  Long64_t *arr = evl->GetList();
3555  Int_t num = evl->GetN();
3556  if (arr && offset > 0)
3557  for (Int_t i = 0; i < num; i++)
3558  arr[i] += offset;
3559 
3560  // Add to the global event list
3561  evlist->Add(evl);
3562  }
3563 
3564  // Remove and delete the events lists object to avoid spoiling iteration
3565  // during next steps
3566  out->Remove(elists);
3567  delete elists;
3568 
3569  // Incorporate the resulting global list in fOutput
3570  SetLastMergingMsg(evlist);
3571  Incorporate(evlist, fOutput, merged);
3572  NotifyMemory(evlist);
3573  }
3574 
3575  // Iterate on the remaining objects in the received list
3576  TIter nxo(out);
3577  TObject *obj = 0;
3578  while ((obj = nxo())) {
3579  SetLastMergingMsg(obj);
3580  Incorporate(obj, fOutput, merged);
3581  // If not merged, drop from the temporary list, as the ownership
3582  // passes to fOutput
3583  if (!merged)
3584  out->Remove(obj);
3585  NotifyMemory(obj);
3586  }
3587 
3588  // Done
3589  return;
3590 }
3591 
3592 ////////////////////////////////////////////////////////////////////////////////
3593 /// Printout the memory record after merging object 'obj'
3594 /// This record is used by the memory monitor
3595 
3597 {
3598  if (fProof && (!IsClient() || fProof->IsLite())){
3599  ProcInfo_t pi;
3600  if (!gSystem->GetProcInfo(&pi)){
3601  // For PROOF-Lite we redirect this output to a the open log file so that the
3602  // memory monitor can pick these messages up
3604  Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
3605  pi.fMemVirtual, pi.fMemResident, obj->GetName());
3606  RedirectOutput(0);
3607  }
3608  // Record also values for monitoring
3610  }
3611 }
3612 
3613 ////////////////////////////////////////////////////////////////////////////////
3614 /// Set the message to be notified in case of exception
3615 
3617 {
3618  TString lastMsg = TString::Format("while merging object '%s'", obj->GetName());
3619  TProofServ::SetLastMsg(lastMsg);
3620 }
3621 
3622 ////////////////////////////////////////////////////////////////////////////////
3623 /// Incorporate object 'newobj' in the list 'outlist'.
3624 /// The object is merged with an object of the same name already existing in
3625 /// the list, or just added.
3626 /// The boolean merged is set to kFALSE when the object is just added to 'outlist';
3627 /// this happens if the Merge() method does not exist or if a object named as 'obj'
3628 /// is not already in the list. If the obj is not 'merged' than it should not be
3629 /// deleted, unless outlist is not owner of its objects.
3630 /// Return 0 on success, -1 on error.
3631 
3633 {
3634  merged = kTRUE;
3635 
3636  PDB(kOutput,1)
3637  Info("Incorporate", "enter: obj: %p (%s), list: %p",
3638  newobj, newobj ? newobj->ClassName() : "undef", outlist);
3639 
3640  // The object and list must exist
3641  if (!newobj || !outlist) {
3642  Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
3643  return -1;
3644  }
3645 
3646  // Special treatment for histograms in autobin mode
3647  Bool_t specialH =
3648  (!fProof || !fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) ? kTRUE : kFALSE;
3649  if (specialH && newobj->InheritsFrom(TH1::Class())) {
3650  if (!HandleHistogram(newobj, merged)) {
3651  if (merged) {
3652  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' merged", newobj->GetName());
3653  } else {
3654  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' added to the"
3655  " appropriate list for delayed merging", newobj->GetName());
3656  }
3657  return 0;
3658  }
3659  }
3660 
3661  // Check if an object with the same name exists already
3662  TObject *obj = outlist->FindObject(newobj->GetName());
3663 
3664  // If no, add the new object and return
3665  if (!obj) {
3666  outlist->Add(newobj);
3667  merged = kFALSE;
3668  // Done
3669  return 0;
3670  }
3671 
3672  // Locate the Merge(TCollection *) method
3673  TMethodCall callEnv;
3674  if (obj->IsA())
3675  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3676  if (callEnv.IsValid()) {
3677  // Found: put the object in a one-element list
3678  static TList *xlist = new TList;
3679  xlist->Add(newobj);
3680  // Call the method
3681  callEnv.SetParam((Long_t) xlist);
3682  callEnv.Execute(obj);
3683  // Ready for next call
3684  xlist->Clear();
3685  } else {
3686  // Not found: return individual objects
3687  outlist->Add(newobj);
3688  merged = kFALSE;
3689  }
3690 
3691  // Done
3692  return 0;
3693 }
3694 
3695 ////////////////////////////////////////////////////////////////////////////////
3696 /// Low statistic histograms need a special treatment when using autobin
3697 
3699 {
3700  TH1 *h = dynamic_cast<TH1 *>(obj);
3701  if (!h) {
3702  // Not an histo
3703  return obj;
3704  }
3705 
3706  // This is only used if we return (TObject *)0 and there is only one case
3707  // when we set this to kTRUE
3708  merged = kFALSE;
3709 
3710  // Does is still needs binning ?
3711  Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
3712 
3713  // Number of entries
3714  Int_t nent = h->GetBufferLength();
3715  PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
3716  h->GetName(), nent, h->GetBufferSize());
3717 
3718  // Attach to the list in the outputlists, if any
3719  TList *list = 0;
3720  if (!fOutputLists) {
3721  PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
3722  fOutputLists = new TList;
3724  }
3725  list = (TList *) fOutputLists->FindObject(h->GetName());
3726 
3727  TH1 *href = 0;
3728  if (tobebinned) {
3729 
3730  // The histogram needs to be projected in a reasonable range: we
3731  // do this at the end with all the histos, so we need to create
3732  // a list here
3733  if (!list) {
3734  // Create the list
3735  list = new TList;
3736  list->SetName(h->GetName());
3737  list->SetOwner();
3738  fOutputLists->Add(list);
3739  // Move in it any previously merged object from the output list
3740  if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
3741  fOutput->Remove(href);
3742  list->Add(href);
3743  }
3744  }
3745  TIter nxh(list);
3746  while ((href = (TH1 *) nxh())) {
3747  if (href->GetBuffer() && href->GetBufferLength() < nent) break;
3748  }
3749  if (href) {
3750  list->AddBefore(href, h);
3751  } else {
3752  list->Add(h);
3753  }
3754  // Done
3755  return (TObject *)0;
3756 
3757  } else {
3758 
3759  if (list) {
3760  TIter nxh(list);
3761  while ((href = (TH1 *) nxh())) {
3762  if (href->GetBuffer() || href->GetEntries() < nent) break;
3763  }
3764  if (href) {
3765  list->AddBefore(href, h);
3766  } else {
3767  list->Add(h);
3768  }
3769  // Done
3770  return (TObject *)0;
3771 
3772  } else {
3773  // Check if we can 'Add' the histogram to an existing one; this is more efficient
3774  // then using Merge
3775  TH1 *hout = (TH1*) fOutput->FindObject(h->GetName());
3776  if (hout) {
3777  // Remove the existing histo from the output list ...
3778  fOutput->Remove(hout);
3779  // ... and create either the list to merge in one-go at the end
3780  // (more efficient than merging one by one) or, if too big, merge
3781  // these two and start the 'one-by-one' technology
3782  Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
3783  if (fMergeTH1OneByOne || (gProofServ && hsz > gProofServ->GetMsgSizeHWM())) {
3784  list = new TList;
3785  list->Add(hout);
3786  h->Merge(list);
3787  list->SetOwner();
3788  delete list;
3789  return h;
3790  } else {
3791  list = new TList;
3792  list->SetName(h->GetName());
3793  list->SetOwner();
3794  fOutputLists->Add(list);
3795  // Add the existing and the incoming histos
3796  list->Add(hout);
3797  list->Add(h);
3798  // Done
3799  return (TObject *)0;
3800  }
3801  } else {
3802  // This is the first one; add it to the output list
3803  fOutput->Add(h);
3804  return (TObject *)0;
3805  }
3806  }
3807  }
3808  PDB(kOutput,1) Info("HandleHistogram", "leaving");
3809 }
3810 
3811 ////////////////////////////////////////////////////////////////////////////////
3812 /// Return kTRUE is the histograms 'h0' and 'h1' have the same binning and ranges
3813 /// on the axis (i.e. if they can be just Add-ed for merging).
3814 
3816 {
3817  Bool_t rc = kFALSE;
3818  if (!h0 || !h1) return rc;
3819 
3820  TAxis *a0 = 0, *a1 = 0;
3821 
3822  // Check X
3823  a0 = h0->GetXaxis();
3824  a1 = h1->GetXaxis();
3825  if (a0->GetNbins() == a1->GetNbins())
3826  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3827  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3828 
3829  // Check Y, if needed
3830  if (h0->GetDimension() > 1) {
3831  rc = kFALSE;
3832  a0 = h0->GetYaxis();
3833  a1 = h1->GetYaxis();
3834  if (a0->GetNbins() == a1->GetNbins())
3835  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3836  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3837  }
3838 
3839  // Check Z, if needed
3840  if (h0->GetDimension() > 2) {
3841  rc = kFALSE;
3842  a0 = h0->GetZaxis();
3843  a1 = h1->GetZaxis();
3844  if (a0->GetNbins() == a1->GetNbins())
3845  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3846  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3847  }
3848 
3849  // Done
3850  return rc;
3851 }
3852 
3853 ////////////////////////////////////////////////////////////////////////////////
3854 /// Store received output list.
3855 
3857 {
3858  PDB(kOutput,1) Info("StoreOutput","Enter");
3859 
3860  if ( out == 0 ) {
3861  PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
3862  return;
3863  }
3864 
3865  TIter next(out);
3866  out->SetOwner(kFALSE); // take ownership of the contents
3867 
3868  if (fOutputLists == 0) {
3869  PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
3870  fOutputLists = new TList;
3872  }
3873  // process eventlists first
3874  TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
3875  if (lists) {
3876  out->Remove(lists);
3877  TEventList *mainList = new TEventList("PROOF_EventList");
3878  out->Add(mainList);
3879  TIter it(lists);
3880  TEventList *aList;
3881  while ( (aList = dynamic_cast<TEventList*> (it())) ) {
3882  // find file offset
3883  TIter nxe(fDSet->GetListOfElements());
3884  TDSetElement *elem;
3885  while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
3886  if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
3887  break;
3888  }
3889  if (!elem) {
3890  Error("StoreOutput", "found the EventList for %s, but no object with that name "
3891  "in the TDSet", aList->GetName());
3892  continue;
3893  }
3894  Long64_t offset = elem->GetTDSetOffset();
3895 
3896  // shift the list by the number of first event in that file
3897  Long64_t *arr = aList->GetList();
3898  Int_t num = aList->GetN();
3899  if (arr && offset)
3900  for (int i = 0; i < num; i++)
3901  arr[i] += offset;
3902 
3903  mainList->Add(aList); // add to the main list
3904  }
3905  delete lists;
3906  }
3907 
3908  TObject *obj;
3909  while( (obj = next()) ) {
3910  PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
3911 
3912  TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
3913  if ( list == 0 ) {
3914  PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
3915  list = new TList;
3916  list->SetName( obj->GetName() );
3917  list->SetOwner();
3918  fOutputLists->Add( list );
3919  }
3920  list->Add( obj );
3921  }
3922 
3923  delete out;
3924  PDB(kOutput,1) Info("StoreOutput", "leave");
3925 }
3926 
3927 ////////////////////////////////////////////////////////////////////////////////
3928 /// Merge feedback lists.
3929 
3931 {
3932  PDB(kFeedback,1)
3933  Info("MergeFeedback","Enter");
3934 
3935  if ( fFeedbackLists == 0 ) {
3936  PDB(kFeedback,1)
3937  Info("MergeFeedback","Leave (no output)");
3938  return 0;
3939  }
3940 
3941  TList *fb = new TList; // collection of feedback objects
3942  fb->SetOwner();
3943 
3945 
3946  TMap *map;
3947  while ( (map = (TMap*) next()) ) {
3948 
3949  PDB(kFeedback,2)
3950  Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
3951 
3952  // turn map into list ...
3953 
3954  TList *list = new TList;
3955  TIter keys(map);
3956 
3957 #ifndef R__TH1MERGEFIXED
3958  Int_t nbmx = -1;
3959  TObject *oref = 0;
3960 #endif
3961  while ( TObject *key = keys() ) {
3962  TObject *o = map->GetValue(key);
3963  TH1 *h = dynamic_cast<TH1 *>(o);
3964 #ifndef R__TH1MERGEFIXED
3965  // Temporary fix for to cope with the problem in TH1::Merge.
3966  // We need to use a reference histo the one with the largest number
3967  // of bins so that the histos from all submasters can be correctly
3968  // fit in
3969  if (h && !strncmp(o->GetName(),"PROOF_",6)) {
3970  if (h->GetNbinsX() > nbmx) {
3971  nbmx= h->GetNbinsX();
3972  oref = o;
3973  }
3974  }
3975 #endif
3976  if (h) {
3977  TIter nxh(list);
3978  TH1 *href= 0;
3979  while ((href = (TH1 *)nxh())) {
3980  if (h->GetBuffer()) {
3981  if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
3982  } else {
3983  if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
3984  }
3985  }
3986  if (href) {
3987  list->AddBefore(href, h);
3988  } else {
3989  list->Add(h);
3990  }
3991  } else {
3992  list->Add(o);
3993  }
3994  }
3995 
3996  // clone first object, remove from list
3997 #ifdef R__TH1MERGEFIXED
3998  TObject *obj = list->First();
3999 #else
4000  TObject *obj = (oref) ? oref : list->First();
4001 #endif
4002  list->Remove(obj);
4003  obj = obj->Clone();
4004  fb->Add(obj);
4005 
4006  if ( list->IsEmpty() ) {
4007  delete list;
4008  continue;
4009  }
4010 
4011  // merge list with clone
4012  TMethodCall callEnv;
4013  if (obj->IsA())
4014  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
4015  if (callEnv.IsValid()) {
4016  callEnv.SetParam((Long_t) list);
4017  callEnv.Execute(obj);
4018  } else {
4019  // No Merge interface, return copy of individual objects
4020  while ( (obj = list->First()) ) {
4021  fb->Add(obj->Clone());
4022  list->Remove(obj);
4023  }
4024  }
4025 
4026  delete list;
4027  }
4028 
4029  PDB(kFeedback,1)
4030  Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
4031 
4032  return fb;
4033 }
4034 
4035 ////////////////////////////////////////////////////////////////////////////////
4036 /// Store feedback results from the specified slave.
4037 
4039 {
4040  PDB(kFeedback,1)
4041  Info("StoreFeedback","Enter");
4042 
4043  if ( out == 0 ) {
4044  PDB(kFeedback,1)
4045  Info("StoreFeedback","Leave (empty)");
4046  return;
4047  }
4048 
4049  if ( IsClient() ) {
4050  // in client
4051  Feedback(out);
4052  delete out;
4053  return;
4054  }
4055 
4056  if (fFeedbackLists == 0) {
4057  PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
4058  fFeedbackLists = new TList;
4060  }
4061 
4062  TIter next(out);
4063  out->SetOwner(kFALSE); // take ownership of the contents
4064 
4065  const char *ord = ((TSlave*) slave)->GetOrdinal();
4066 
4067  TObject *obj;
4068  while( (obj = next()) ) {
4069  PDB(kFeedback,2)
4070  Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
4071  TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
4072  if ( map == 0 ) {
4073  PDB(kFeedback,2)
4074  Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
4075  // Map must not be owner (ownership is with regards to the keys (only))
4076  map = new TMap;
4077  map->SetName(obj->GetName());
4078  fFeedbackLists->Add(map);
4079  } else {
4080  PDB(kFeedback,2)
4081  Info("StoreFeedback","%s: removing previous value", ord);
4082  if (map->GetValue(slave))
4083  delete map->GetValue(slave);
4084  map->Remove(slave);
4085  }
4086  map->Add(slave, obj);
4087  PDB(kFeedback,2)
4088  Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
4089  }
4090 
4091  delete out;
4092  PDB(kFeedback,1)
4093  Info("StoreFeedback","Leave");
4094 }
4095 
4096 ////////////////////////////////////////////////////////////////////////////////
4097 /// Setup reporting of feedback objects.
4098 
4100 {
4101  if (IsClient()) return; // Client does not need timer
4102 
4103  fFeedback = (TList*) fInput->FindObject("FeedbackList");
4104 
4105  PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
4106  fFeedback == 0 ? "NOT ":"");
4107 
4108  if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
4109 
4110  // OK, feedback was requested, setup the timer
4111  SafeDelete(fFeedbackTimer);
4112  fFeedbackPeriod = 2000;
4113  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4114  fFeedbackTimer = new TTimer;
4115  fFeedbackTimer->SetObject(this);
4116  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4117 }
4118 
4119 ////////////////////////////////////////////////////////////////////////////////
4120 /// Stop reporting of feedback objects.
4121 
4123 {
4124  if (fFeedbackTimer == 0) return;
4125 
4126  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4127 
4128  SafeDelete(fFeedbackTimer);
4129 }
4130 
4131 ////////////////////////////////////////////////////////////////////////////////
4132 /// Send feedback objects to client.
4133 
4135 {
4136  PDB(kFeedback,2) Info("HandleTimer","Entry");
4137 
4138  if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
4139 
4140  // process local feedback objects
4141 
4142  TList *fb = new TList;
4143  fb->SetOwner();
4144 
4145  TIter next(fFeedback);
4146  while( TObjString *name = (TObjString*) next() ) {
4147  TObject *o = fOutput->FindObject(name->GetName());
4148  if (o != 0) {
4149  fb->Add(o->Clone());
4150  // remove the corresponding entry from the feedback list
4151  TMap *m = 0;
4152  if (fFeedbackLists &&
4153  (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
4154  fFeedbackLists->Remove(m);
4155  m->DeleteValues();
4156  delete m;
4157  }
4158  }
4159  }
4160 
4161  if (fb->GetSize() > 0) {
4162  StoreFeedback(this, fb); // adopts fb
4163  } else {
4164  delete fb;
4165  }
4166 
4167  if (fFeedbackLists == 0) {
4168  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE); // maybe next time
4169  return kFALSE;
4170  }
4171 
4172  fb = MergeFeedback();
4173 
4174  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4175 
4177  m << fb;
4178 
4179  // send message to client;
4180  gProofServ->GetSocket()->Send(m);
4181 
4182  delete fb;
4183 
4184  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4185 
4186  return kFALSE; // ignored?
4187 }
4188 
4189 ////////////////////////////////////////////////////////////////////////////////
4190 /// Get next packet for specified slave.
4191 
4193 {
4194  // The first call to this determines the end of initialization
4195  SetInitTime();
4196 
4197  if (fProcPackets) {
4198  Int_t bin = fProcPackets->GetXaxis()->FindBin(slave->GetOrdinal());
4199  if (bin >= 0) {
4200  if (fProcPackets->GetBinContent(bin) > 0)
4201  fProcPackets->Fill(slave->GetOrdinal(), -1);
4202  }
4203  }
4204 
4205  TDSetElement *e = fPacketizer->GetNextPacket( slave, r );
4206 
4207  if (e == 0) {
4208  PDB(kPacketizer,2)
4209  Info("GetNextPacket","%s: done!", slave->GetOrdinal());
4210  } else if (e == (TDSetElement*) -1) {
4211  PDB(kPacketizer,2)
4212  Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
4213  } else {
4214  PDB(kPacketizer,2)
4215  Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
4216  slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
4217  e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
4218  if (fProcPackets) fProcPackets->Fill(slave->GetOrdinal(), 1);
4219  }
4220 
4221  return e;
4222 }
4223 
4224 ////////////////////////////////////////////////////////////////////////////////
4225 /// Is the player running on the client?
4226 
4228 {
4230 }
4231 
4232 ////////////////////////////////////////////////////////////////////////////////
4233 /// Draw (support for TChain::Draw()).
4234 /// Returns -1 in case of error or number of selected events in case of success.
4235 
4237  const char *selection, Option_t *option,
4238  Long64_t nentries, Long64_t firstentry)
4239 {
4240  if (!fgDrawInputPars) {
4241  fgDrawInputPars = new THashList;
4242  fgDrawInputPars->Add(new TObjString("FeedbackList"));
4243  fgDrawInputPars->Add(new TObjString("PROOF_ChainWeight"));
4244  fgDrawInputPars->Add(new TObjString("PROOF_LineColor"));
4245  fgDrawInputPars->Add(new TObjString("PROOF_LineStyle"));
4246  fgDrawInputPars->Add(new TObjString("PROOF_LineWidth"));
4247  fgDrawInputPars->Add(new TObjString("PROOF_MarkerColor"));
4248  fgDrawInputPars->Add(new TObjString("PROOF_MarkerStyle"));
4249  fgDrawInputPars->Add(new TObjString("PROOF_MarkerSize"));
4250  fgDrawInputPars->Add(new TObjString("PROOF_FillColor"));
4251  fgDrawInputPars->Add(new TObjString("PROOF_FillStyle"));
4252  fgDrawInputPars->Add(new TObjString("PROOF_ListOfAliases"));
4253  }
4254 
4255  TString selector, objname;
4256  if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
4257  Error("DrawSelect", "parsing arguments");
4258  return -1;
4259  }
4260 
4261  TNamed *varexpobj = new TNamed("varexp", varexp);
4262  TNamed *selectionobj = new TNamed("selection", selection);
4263 
4264  // Save the current input list
4265  TObject *o = 0;
4266  TList *savedInput = new TList;
4267  TIter nxi(fInput);
4268  while ((o = nxi())) {
4269  savedInput->Add(o);
4270  TString n(o->GetName());
4271  if (fgDrawInputPars &&
4272  !fgDrawInputPars->FindObject(o->GetName()) &&
4273  !n.BeginsWith("alias:")) fInput->Remove(o);
4274  }
4275 
4276  fInput->Add(varexpobj);
4277  fInput->Add(selectionobj);
4278 
4279  // Make sure we have an object name
4280  if (objname == "") objname = "htemp";
4281 
4282  fProof->AddFeedback(objname);
4283  Long64_t r = Process(set, selector, option, nentries, firstentry);
4284  fProof->RemoveFeedback(objname);
4285 
4286  fInput->Remove(varexpobj);
4287  fInput->Remove(selectionobj);
4288  if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
4289  fInput->Remove(opt);
4290  delete opt;
4291  }
4292 
4293  delete varexpobj;
4294  delete selectionobj;
4295 
4296  // Restore the input list
4297  fInput->Clear();
4298  TIter nxsi(savedInput);
4299  while ((o = nxsi()))
4300  fInput->Add(o);
4301  savedInput->SetOwner(kFALSE);
4302  delete savedInput;
4303 
4304  return r;
4305 }
4306 
4307 ////////////////////////////////////////////////////////////////////////////////
4308 /// Set init time
4309 
4311 {
4312  if (fPacketizer)
4314 }
4315 
4316 //------------------------------------------------------------------------------
4317 
4318 
4320 
4321 ////////////////////////////////////////////////////////////////////////////////
4322 /// Setup feedback.
4323 
4324 void TProofPlayerSlave::SetupFeedback()
4325 {
4326  TList *fb = (TList*) fInput->FindObject("FeedbackList");
4327  if (fb) {
4328  PDB(kFeedback,1)
4329  Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
4330  } else {
4331  PDB(kFeedback,1)
4332  Info("SetupFeedback","\"FeedbackList\" NOT found");
4333  }
4334 
4335  if (fb == 0 || fb->GetSize() == 0) return;
4336 
4337  // OK, feedback was requested, setup the timer
4338 
4339  SafeDelete(fFeedbackTimer);
4340  fFeedbackPeriod = 2000;
4341  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4342  fFeedbackTimer = new TTimer;
4343  fFeedbackTimer->SetObject(this);
4344  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4345 
4346  fFeedback = fb;
4347 }
4348 
4349 ////////////////////////////////////////////////////////////////////////////////
4350 /// Stop feedback.
4351 
4353 {
4354  if (fFeedbackTimer == 0) return;
4355 
4356  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4357 
4358  SafeDelete(fFeedbackTimer);
4359 }
4360 
4361 ////////////////////////////////////////////////////////////////////////////////
4362 /// Handle timer event.
4363 
4365 {
4366  PDB(kFeedback,2) Info("HandleTimer","Entry");
4367 
4368  // If in sequential (0-slave-PROOF) mode we do not have a packetizer
4369  // so we also send the info to update the progress bar.
4370  if (gProofServ) {
4371  Bool_t sendm = kFALSE;
4373  if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
4374  sendm = kTRUE;
4375  if (gProofServ->GetProtocol() > 25) {
4376  m << GetProgressStatus();
4377  } else if (gProofServ->GetProtocol() > 11) {
4379  m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
4380  << (Float_t) -1. << (Float_t) ps->GetProcTime()
4381  << (Float_t) ps->GetRate() << (Float_t) -1.;
4382  } else {
4383  m << fTotalEvents << GetEventsProcessed();
4384  }
4385  }
4386  if (sendm) gProofServ->GetSocket()->Send(m);
4387  }
4388 
4389  if (fFeedback == 0) return kFALSE;
4390 
4391  TList *fb = new TList;
4392  fb->SetOwner(kFALSE);
4393 
4394  if (fOutput == 0) {
4395  fOutput = (THashList *) fSelector->GetOutputList();
4396  }
4397 
4398  if (fOutput) {
4399  TIter next(fFeedback);
4400  while( TObjString *name = (TObjString*) next() ) {
4401  // TODO: find object in memory ... maybe allow only in fOutput ?
4402  TObject *o = fOutput->FindObject(name->GetName());
4403  if (o != 0) fb->Add(o);
4404  }
4405  }
4406 
4407  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4408 
4410  m << fb;
4411 
4412  // send message to client;
4413  gProofServ->GetSocket()->Send(m);
4414 
4415  delete fb;
4416 
4417  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4418 
4419  return kFALSE; // ignored?
4420 }
4421 
4422 ////////////////////////////////////////////////////////////////////////////////
4423 /// Handle tree header request.
4424 
4426 {
4428 
4429  TDSet *dset;
4430  (*mess) >> dset;
4431  dset->Reset();
4432  TDSetElement *e = dset->Next();
4433  Long64_t entries = 0;
4434  TFile *f = 0;
4435  TTree *t = 0;
4436  if (!e) {
4437  PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
4438  } else {
4439  f = TFile::Open(e->GetFileName());
4440  t = 0;
4441  if (f) {
4442  t = (TTree*) f->Get(e->GetObjName());
4443  if (t) {
4444  t->SetMaxVirtualSize(0);
4445  t->DropBaskets();
4446  entries = t->GetEntries();
4447 
4448  // compute #entries in all the files
4449  while ((e = dset->Next()) != 0) {
4450  TFile *f1 = TFile::Open(e->GetFileName());
4451  if (f1) {
4452  TTree *t1 = (TTree*) f1->Get(e->GetObjName());
4453  if (t1) {
4454  entries += t1->GetEntries();
4455  delete t1;
4456  }
4457  delete f1;
4458  }
4459  }
4460  t->SetMaxEntryLoop(entries); // this field will hold the total number of entries ;)
4461  }
4462  }
4463  }
4464  if (t)
4465  answ << TString("Success") << t;
4466  else
4467  answ << TString("Failed") << t;
4468 
4469  fSocket->Send(answ);
4470 
4471  SafeDelete(t);
4472  SafeDelete(f);
4473 }
4474 
4475 
4476 //------------------------------------------------------------------------------
4477 
4479 
4480 ////////////////////////////////////////////////////////////////////////////////
4481 /// Process specified TDSet on PROOF. Runs on super master.
4482 /// The return value is -1 in case of error and TSelector::GetStatus() in
4483 /// in case of success.
4484 
4485 Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
4486  Option_t *option, Long64_t nentries,
4487  Long64_t first)
4488 {
4489  fProgressStatus->Reset();
4490  PDB(kGlobal,1) Info("Process","Enter");
4491 
4492  TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
4493  if (!proof) return -1;
4494 
4495  delete fOutput;
4496  fOutput = new THashList;
4497 
4498  TPerfStats::Start(fInput, fOutput);
4499 
4500  if (!SendSelector(selector_file)) {
4501  Error("Process", "sending selector %s", selector_file);
4502  return -1;
4503  }
4504 
4505  TCleanup clean(this);
4506  SetupFeedback();
4507 
4508  if (proof->IsMaster()) {
4509 
4510  // make sure the DSet is valid
4511  if (!dset->ElementsValid()) {
4512  proof->ValidateDSet(dset);
4513  if (!dset->ElementsValid()) {
4514  Error("Process", "could not validate TDSet");
4515  return -1;
4516  }
4517  }
4518 
4519  TList msds;
4520  msds.SetOwner(); // This will delete TPairs
4521 
4522  TList keyholder; // List to clean up key part of the pairs
4523  keyholder.SetOwner();
4524  TList valueholder; // List to clean up value part of the pairs
4525  valueholder.SetOwner();
4526 
4527  // Construct msd list using the slaves
4528  TIter nextslave(proof->GetListOfActiveSlaves());
4529  while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
4530  TList *submasters = 0;
4531  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
4532  if (!msd) {
4533  submasters = new TList;
4534  submasters->SetName(sl->GetMsd());
4535  keyholder.Add(submasters);
4536  TList *setelements = new TSortedList(kSortDescending);
4537  setelements->SetName(TString(sl->GetMsd())+"_Elements");
4538  valueholder.Add(setelements);
4539  msds.Add(new TPair(submasters, setelements));
4540  } else {
4541  submasters = dynamic_cast<TList*>(msd->Key());
4542  }
4543  if (submasters) submasters->Add(sl);
4544  }
4545 
4546  // Add TDSetElements to msd list
4547  Long64_t cur = 0; //start of next element
4548  TIter nextelement(dset->GetListOfElements());
4549  while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
4550 
4551  if (elem->GetNum()<1) continue; // get rid of empty elements
4552 
4553  if (nentries !=-1 && cur>=first+nentries) {
4554  // we are done
4555  break;
4556  }
4557 
4558  if (cur+elem->GetNum()-1<first) {
4559  //element is before first requested entry
4560  cur+=elem->GetNum();
4561  continue;
4562  }
4563 
4564  if (cur<first) {
4565  //modify element to get proper start
4566  elem->SetNum(elem->GetNum()-(first-cur));
4567  elem->SetFirst(elem->GetFirst()+first-cur);
4568  cur=first;
4569  }
4570 
4571  if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
4572  cur+=elem->GetNum();
4573  } else {
4574  //modify element to get proper end
4575  elem->SetNum(first+nentries-cur);
4576  cur=first+nentries;
4577  }
4578 
4579  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
4580  if (!msd) {
4581  Error("Process", "data requires mass storage domain '%s'"
4582  " which is not accessible in this proof session",
4583  elem->GetMsd());
4584  return -1;
4585  } else {
4586  TList *elements = dynamic_cast<TList*>(msd->Value());
4587  if (elements) elements->Add(elem);
4588  }
4589  }
4590 
4591  TList usedmasters;
4592  TIter nextmsd(msds.MakeIterator());
4593  while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
4594  TList *submasters = dynamic_cast<TList*>(msd->Key());
4595  TList *setelements = dynamic_cast<TList*>(msd->Value());
4596 
4597  // distribute elements over the masters
4598  Int_t nmasters = submasters ? submasters->GetSize() : -1;
4599  Int_t nelements = setelements ? setelements->GetSize() : -1;
4600  for (Int_t i=0; i<nmasters; i++) {
4601 
4602  Long64_t nent = 0;
4603  TDSet set(dset->GetType(), dset->GetObjName(),
4604  dset->GetDirectory());
4605  for (Int_t j = (i*nelements)/nmasters;
4606  j < ((i+1)*nelements)/nmasters;
4607  j++) {
4608  TDSetElement *elem = setelements ?
4609  dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
4610  if (elem) {
4611  set.Add(elem->GetFileName(), elem->GetObjName(),
4612  elem->GetDirectory(), elem->GetFirst(),
4613  elem->GetNum(), elem->GetMsd());
4614  nent += elem->GetNum();
4615  } else {
4616  Warning("Process", "not a TDSetElement object");
4617  }
4618  }
4619 
4620  if (set.GetListOfElements()->GetSize()>0) {
4621  TMessage mesg(kPROOF_PROCESS);
4622  TString fn(gSystem->BaseName(selector_file));
4623  TString opt = option;
4624  mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
4625 
4626  TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
4627  if (sl) {
4628  PDB(kGlobal,1) Info("Process",
4629  "Sending TDSet with %d elements to submaster %s",
4630  set.GetListOfElements()->GetSize(),
4631  sl->GetOrdinal());
4632  sl->GetSocket()->Send(mesg);
4633  usedmasters.Add(sl);
4634 
4635  // setup progress info
4636  fSlaves.AddLast(sl);
4637  fSlaveProgress.Set(fSlaveProgress.GetSize()+1);
4638  fSlaveProgress[fSlaveProgress.GetSize()-1] = 0;
4639  fSlaveTotals.Set(fSlaveTotals.GetSize()+1);
4640  fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
4641  fSlaveBytesRead.Set(fSlaveBytesRead.GetSize()+1);
4642  fSlaveBytesRead[fSlaveBytesRead.GetSize()-1] = 0;
4643  fSlaveInitTime.Set(fSlaveInitTime.GetSize()+1);
4644  fSlaveInitTime[fSlaveInitTime.GetSize()-1] = -1.;
4645  fSlaveProcTime.Set(fSlaveProcTime.GetSize()+1);
4646  fSlaveProcTime[fSlaveProcTime.GetSize()-1] = -1.;
4647  fSlaveEvtRti.Set(fSlaveEvtRti.GetSize()+1);
4648  fSlaveEvtRti[fSlaveEvtRti.GetSize()-1] = -1.;
4649  fSlaveMBRti.Set(fSlaveMBRti.GetSize()+1);
4650  fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
4651  fSlaveActW.Set(fSlaveActW.GetSize()+1);
4652  fSlaveActW[fSlaveActW.GetSize()-1] = 0;
4653  fSlaveTotS.Set(fSlaveTotS.GetSize()+1);
4654  fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
4655  fSlaveEffS.Set(fSlaveEffS.GetSize()+1);
4656  fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
4657  } else {
4658  Warning("Process", "not a TSlave object");
4659  }
4660  }
4661  }
4662  }
4663 
4664  if ( !IsClient() ) HandleTimer(0);
4665  PDB(kGlobal,1) Info("Process","Calling Collect");
4666  proof->Collect(&usedmasters);
4667  HandleTimer(0);
4668 
4669  }
4670 
4671  StopFeedback();
4672 
4673  PDB(kGlobal,1) Info("Process","Calling Merge Output");
4674  MergeOutput();
4675 
4676  TPerfStats::Stop();
4677 
4678  return 0;
4679 }
4680 
4681 ////////////////////////////////////////////////////////////////////////////////
4682 /// Report progress.
4683 
4685 {
4686  Int_t idx = fSlaves.IndexOf(sl);
4687  fSlaveProgress[idx] = processed;
4688  if (fSlaveTotals[idx] != total)
4689  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4690  fSlaveTotals[idx] = total;
4691 
4692  Long64_t tot = 0;
4693  Int_t i;
4694  for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
4695  Long64_t proc = 0;
4696  for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
4697 
4698  Progress(tot, proc);
4699 }
4700 
4701 ////////////////////////////////////////////////////////////////////////////////
4702 /// Report progress.
4703 
4705  Long64_t processed, Long64_t bytesread,
4706  Float_t initTime, Float_t procTime,
4707  Float_t evtrti, Float_t mbrti)
4708 {
4709  PDB(kGlobal,2)
4710  Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
4711  processed, bytesread, initTime, procTime, evtrti, mbrti);
4712 
4713  Int_t idx = fSlaves.IndexOf(sl);
4714  if (fSlaveTotals[idx] != total)
4715  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4716  fSlaveTotals[idx] = total;
4717  fSlaveProgress[idx] = processed;
4718  fSlaveBytesRead[idx] = bytesread;
4719  fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
4720  fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
4721  fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
4722  fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];
4723 
4724  Int_t i;
4725  Long64_t tot = 0;
4726  Long64_t proc = 0;
4727  Long64_t bytes = 0;
4728  Float_t init = -1.;
4729  Float_t ptime = -1.;
4730  Float_t erti = 0.;
4731  Float_t srti = 0.;
4732  Int_t nerti = 0;
4733  Int_t nsrti = 0;
4734  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4735  tot += fSlaveTotals[i];
4736  if (i < fSlaveProgress.GetSize())
4737  proc += fSlaveProgress[i];
4738  if (i < fSlaveBytesRead.GetSize())
4739  bytes += fSlaveBytesRead[i];
4740  if (i < fSlaveInitTime.GetSize())
4741  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4742  init = fSlaveInitTime[i];
4743  if (i < fSlaveProcTime.GetSize())
4744  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4745  ptime = fSlaveProcTime[i];
4746  if (i < fSlaveEvtRti.GetSize())
4747  if (fSlaveEvtRti[i] > -1.) {
4748  erti += fSlaveEvtRti[i];
4749  nerti++;
4750  }
4751  if (i < fSlaveMBRti.GetSize())
4752  if (fSlaveMBRti[i] > -1.) {
4753  srti += fSlaveMBRti[i];
4754  nsrti++;
4755  }
4756  }
4757  srti = (nsrti > 0) ? srti / nerti : 0.;
4758 
4759  Progress(tot, proc, bytes, init, ptime, erti, srti);
4760 }
4761 
4762 ////////////////////////////////////////////////////////////////////////////////
4763 /// Progress signal.
4764 
4766 {
4767  if (pi) {
4768  PDB(kGlobal,2)
4769  Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
4770  pi->fTotal, pi->fProcessed, pi->fBytesRead,
4771  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
4772  pi->fActWorkers, pi->fEffSessions);
4773 
4774  Int_t idx = fSlaves.IndexOf(wrk);
4775  if (fSlaveTotals[idx] != pi->fTotal)
4776  Warning("Progress", "total events has changed for worker %s", wrk->GetName());
4777  fSlaveTotals[idx] = pi->fTotal;
4778  fSlaveProgress[idx] = pi->fProcessed;
4779  fSlaveBytesRead[idx] = pi->fBytesRead;
4780  fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
4781  fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
4782  fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
4783  fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
4784  fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
4785  fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
4786  fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];
4787 
4788  Int_t i;
4789  Int_t nerti = 0;
4790  Int_t nsrti = 0;
4791  TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
4792  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4793  pisum.fTotal += fSlaveTotals[i];
4794  if (i < fSlaveProgress.GetSize())
4795  pisum.fProcessed += fSlaveProgress[i];
4796  if (i < fSlaveBytesRead.GetSize())
4797  pisum.fBytesRead += fSlaveBytesRead[i];
4798  if (i < fSlaveInitTime.GetSize())
4799  if (fSlaveInitTime[i] > -1. && (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
4800  pisum.fInitTime = fSlaveInitTime[i];
4801  if (i < fSlaveProcTime.GetSize())
4802  if (fSlaveProcTime[i] > -1. && (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
4803  pisum.fProcTime = fSlaveProcTime[i];
4804  if (i < fSlaveEvtRti.GetSize())
4805  if (fSlaveEvtRti[i] > -1.) {
4806  pisum.fEvtRateI += fSlaveEvtRti[i];
4807  nerti++;
4808  }
4809  if (i < fSlaveMBRti.GetSize())
4810  if (fSlaveMBRti[i] > -1.) {
4811  pisum.fMBRateI += fSlaveMBRti[i];
4812  nsrti++;
4813  }
4814  if (i < fSlaveActW.GetSize())
4815  pisum.fActWorkers += fSlaveActW[i];
4816  if (i < fSlaveTotS.GetSize())
4817  if (fSlaveTotS[i] > -1 && (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
4818  pisum.fTotSessions = fSlaveTotS[i];
4819  if (i < fSlaveEffS.GetSize())
4820  if (fSlaveEffS[i] > -1. && (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
4821  pisum.fEffSessions = fSlaveEffS[i];
4822  }
4823  pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nerti : 0.;
4824 
4825  Progress(&pisum);
4826  }
4827 }
4828 
4829 ////////////////////////////////////////////////////////////////////////////////
4830 /// Send progress and feedback to client.
4831 
4833 {
4834  if (fFeedbackTimer == 0) return kFALSE; // timer stopped already
4835 
4836  Int_t i;
4837  Long64_t tot = 0;
4838  Long64_t proc = 0;
4839  Long64_t bytes = 0;
4840  Float_t init = -1.;
4841  Float_t ptime = -1.;
4842  Float_t erti = 0.;
4843  Float_t srti = 0.;
4844  Int_t nerti = 0;
4845  Int_t nsrti = 0;
4846  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4847  tot += fSlaveTotals[i];
4848  if (i < fSlaveProgress.GetSize())
4849  proc += fSlaveProgress[i];
4850  if (i < fSlaveBytesRead.GetSize())
4851  bytes += fSlaveBytesRead[i];
4852  if (i < fSlaveInitTime.GetSize())
4853  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4854  init = fSlaveInitTime[i];
4855  if (i < fSlaveProcTime.GetSize())
4856  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4857  ptime = fSlaveProcTime[i];
4858  if (i < fSlaveEvtRti.GetSize())
4859  if (fSlaveEvtRti[i] > -1.) {
4860  erti += fSlaveEvtRti[i];
4861  nerti++;
4862  }
4863  if (i < fSlaveMBRti.GetSize())
4864  if (fSlaveMBRti[i] > -1.) {
4865  srti += fSlaveMBRti[i];
4866  nsrti++;
4867  }
4868  }
4869  erti = (nerti > 0) ? erti / nerti : 0.;
4870  srti = (nsrti > 0) ? srti / nerti : 0.;
4871 
4873  if (gProofServ->GetProtocol() > 25) {
4874  // Fill the message now
4875  TProofProgressInfo pi(tot, proc, bytes, init, ptime,
4876  erti, srti, -1,
4878  m << &pi;
4879  } else {
4880 
4881  m << tot << proc << bytes << init << ptime << erti << srti;
4882  }
4883 
4884  // send message to client;
4885  gProofServ->GetSocket()->Send(m);
4886 
4887  if (fReturnFeedback)
4889  else
4890  return kFALSE;
4891 }
4892 
4893 ////////////////////////////////////////////////////////////////////////////////
4894 /// Setup reporting of feedback objects and progress messages.
4895 
4897 {
4898  if (IsClient()) return; // Client does not need timer
4899 
4901 
4902  if (fFeedbackTimer) {
4904  return;
4905  } else {
4907  }
4908 
4909  // setup the timer for progress message
4910  SafeDelete(fFeedbackTimer);
4911  fFeedbackPeriod = 2000;
4912  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4913  fFeedbackTimer = new TTimer;
4914  fFeedbackTimer->SetObject(this);
4915  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4916 }
const char * GetHost() const
Definition: TUrl.h:76
const char * GetName() const
Returns name of object.
Definition: TObjString.h:42
Int_t fTotSessions
Definition: TProof.h:194
const char * GetSessionDir() const
Definition: TProofServ.h:259
Bool_t IsRetrieve() const
TList * GetOutputList()
Definition: TQueryResult.h:139
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
Definition: TSystem.cxx:928
virtual Int_t GetEntries() const
Definition: TCollection.h:92
Long64_t Process(const char *selector, Long64_t nentries=-1, Option_t *option="")
Process the specified TSelector file 'nentries' times.
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write this object to the current directory.
Definition: TObject.cxx:823
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52
void AddFeedback(const char *name)
Add object to feedback list.
Definition: TProof.cxx:10556
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1265
Long64_t fTotal
Definition: TProof.h:186
TNtuple * GetProgressPerf(Bool_t steal=kFALSE)
static FILE * SetErrorHandlerFile(FILE *ferr)
Set the file stream where to log (default stderr).
Definition: TMutex.h:37
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition: TString.cxx:864
void Progress(Long64_t total, Long64_t processed)
Get query progress information.
Definition: TProof.cxx:9768
ErrorHandlerFunc_t SetErrorHandler(ErrorHandlerFunc_t newhandler)
Set an errorhandler function. Returns the old handler.
Definition: TError.cxx:106
virtual Int_t Fill(Double_t x)
Increment bin with abscissa X by 1.
Definition: TH1.cxx:3159
const char * GetOutputFileName() const
virtual TString SplitAclicMode(const char *filename, TString &mode, TString &args, TString &io) const
This method split a filename of the form: ~~~ {.cpp} [path/]macro.C[+|++[k|f|g|O|c|s|d|v|-]][(args)]...
Definition: TSystem.cxx:4071
void SetMerging(Bool_t on=kTRUE)
Switch on/off merge timer.
An array of TObjects.
Definition: TObjArray.h:39
const char * GetOrdinal() const
Definition: TSlave.h:135
TProofProgressStatus * fProgressStatus
Definition: TProofPlayer.h:94
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:234
float xmin
Definition: THbookFile.cxx:93
Bool_t IsDraw() const
Definition: TQueryResult.h:152
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:404
UInt_t Convert(Bool_t toGMT=kFALSE) const
Convert fDatime from TDatime format to the standard time_t format.
Definition: TDatime.cxx:179
void ValidateDSet(TDSet *dset)
Validate a TDSet.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:108
void SetWorkerOrdinal(const char *ordinal)
virtual Double_t GetBinContent(Int_t bin) const
Return content of bin number bin.
Definition: TH1.cxx:4629
long long Long64_t
Definition: RtypesCore.h:69
EExitStatus fExitStatus
status of query in progress
Definition: TProofPlayer.h:92
void Activate(TList *slaves=0)
Activate slave server list.
Definition: TProof.cxx:2384
TSocket * GetSocket() const
Definition: TSlave.h:138
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition: TDSet.cxx:416
Int_t fPhysRam
Definition: TSystem.h:169
Int_t AssertSelector(const char *selector_file)
Make sure that a valid selector object Return -1 in case of problems, 0 otherwise.
void FinalizationDone()
Definition: TProof.h:729
R__EXTERN Int_t gErrorIgnoreLevel
Definition: TError.h:107
void StoreOutput(TList *out)
Store output list (may not be used in this class).
virtual const char * WorkingDirectory()
Return working directory.
Definition: TSystem.cxx:865
virtual const char * GetName() const
Return name of this collection.
Int_t GetLearnEntries()
Return the number of entries in the learning phase.
virtual Bool_t InheritsFrom(const char *classname) const
Returns kTRUE if object inherits from class "classname".
Definition: TObject.cxx:487
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
Definition: TMD5.cxx:472
Long64_t GetEventsProcessed() const
Definition: TProofPlayer.h:223
Bool_t IsFinalized() const
Definition: TQueryResult.h:153
virtual Bool_t SendProcessingProgress(Double_t, Double_t, Bool_t=kFALSE)
void SetMemValues(Long_t vmem=-1, Long_t rmem=-1, Bool_t master=kFALSE)
Set max memory values.
Definition: TStatus.cxx:159
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
Ssiz_t Length() const
Definition: TString.h:390
const double pi
const char Int_t const char TProof Int_t const char const char * msd
Definition: TXSlave.cxx:46
void Print(Option_t *option="") const
Dump the class content.
Collectable string class.
Definition: TObjString.h:32
#define kPEX_ABORTED
float Float_t
Definition: RtypesCore.h:53
Long64_t fBytesRead
Definition: TProof.h:188
virtual void SetDirectory(TDirectory *dir)
By default when an histogram is created, it is added to the list of histogram objects in the current ...
Definition: TH1.cxx:8266
void SetDir(const char *dir, Bool_t raw=kFALSE)
TProofLockPath * GetCacheLock()
Definition: TProofServ.h:293
const char Option_t
Definition: RtypesCore.h:62
float ymin
Definition: THbookFile.cxx:93
const char * GetObjName() const
Definition: TDSet.h:229
TObject * FindObject(const char *name) const
Find object using its name.
Definition: THashList.cxx:212
void SetValue(TObject *val)
Definition: TMap.h:126
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
This class represents a WWW compatible URL.
Definition: TUrl.h:41
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:635
const char * GetDataDir() const
Definition: TProofServ.h:262
void SetWriteV3(Bool_t on=kTRUE)
Set/Reset the 'OldStreamer' bit in this instance and its elements.
Definition: TDSet.cxx:1846
virtual Int_t GetDimension() const
Definition: TH1.h:283
static void FilterLocalroot(TString &path, const char *url="root://dum/")
If 'path' is local and 'dsrv' is Xrootd, apply 'path.Localroot' settings, if any. ...
TMacro * GetSelecHdr() const
Definition: TQueryResult.h:135
TQueryResult * GetQueryResult(const char *ref)
Get query result instances referenced 'ref' from the list of results.
virtual Bool_t JoinProcess(TList *workers)
Not implemented: meaningful only in the remote player. Returns kFALSE.
Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg)
Check the memory usage, if requested.
#define gDirectory
Definition: TDirectory.h:218
Definition: TDSet.h:153
virtual Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)
Finalize query (may not be used in this class).
Bool_t fSaveResultsPerPacket
Definition: TProofPlayer.h:118
virtual void SetName(const char *name)
Change (i.e.
Definition: TNamed.cxx:128
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
void StopProcess(Bool_t abort, Int_t timeout=-1)
Stop process after this event.
void MayNotUse(const char *method) const
Use this method to signal that a method (defined in a base class) may not be called in a derived clas...
Definition: TObject.cxx:971
const char * GetProtocol() const
Definition: TUrl.h:73
TH1 * h
Definition: legend2.C:5
void SetParameter(const char *par, const char *value)
Set input list parameter.
Definition: TProof.cxx:10389
void Add(const char *mesg)
Add an error message.
Definition: TStatus.cxx:46
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
Long64_t GetBytesRead() const
virtual Bool_t Merge(Bool_t=kTRUE)
Merge the files.
void AddOutput(TList *out)
Incorporate output list (may not be used in this class).
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:45
TString fSelectorFileName
Definition: TProofPlayer.h:310
void SetupFeedback()
Setup reporting of feedback objects.
ERunStatus GetRunStatus() const
Definition: TProof.h:976
static Long_t GetVirtMemMax()
VirtMemMax getter.
const char * GetOptions() const
Definition: TQueryResult.h:127
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:821
virtual void MergeOutput(Bool_t savememvalues=kFALSE)
Merge objects in output the lists.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
Definition: TSystem.cxx:856
Bool_t Notify()
Definition: TTimer.cxx:65
void RemoveQueryResult(const char *ref)
Remove all query result instances referenced 'ref' from the list of results.
Bool_t HandleTimer(TTimer *timer)
Send progress and feedback to client.
#define gROOT
Definition: TROOT.h:340
TString fLogFileName
Definition: TProof.h:539
Int_t AdoptFile(TFile *f)
Adopt a file already open.
TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)
Get next packet for specified slave.
Float_t fEvtRateI
Definition: TProof.h:191
const char * GetFileName() const
UInt_t GetTypeOpt() const
virtual int Load(const char *module, const char *entry="", Bool_t system=kFALSE)
Load a shared library.
Definition: TSystem.cxx:1818
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Definition: TSystem.cxx:1447
Basic string class.
Definition: TString.h:137
void SetReadCalls(Long64_t readCalls)
TAlienJobStatus * status
Definition: TAlienJob.cxx:51
int Int_t
Definition: RtypesCore.h:41
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:996
virtual TDirectory * mkdir(const char *name, const char *title="")
Create a sub-directory and return a pointer to the created directory.
Definition: TDirectory.cxx:955
bool Bool_t
Definition: RtypesCore.h:59
void RemoveFeedback(const char *name)
Remove object from feedback list.
Definition: TProof.cxx:10567
const Bool_t kFALSE
Definition: Rtypes.h:92
#define ENDTRY
Definition: TException.h:79
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:496
Int_t Broadcast(const TMessage &mess, TList *slaves)
Broadcast a message to all slaves in the specified list.
Definition: TProof.cxx:2470
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Definition: TSystem.cxx:1511
Bool_t HistoSameAxis(TH1 *h0, TH1 *h1)
Return kTRUE is the histograms 'h0' and 'h1' have the same binning and ranges on the axis (i...
Bool_t IsValid() const
virtual Int_t GetNbinsX() const
Definition: TH1.h:296
void SetOption(Option_t *option)
Definition: TDrawFeedback.h:56
TMacro * GetSelecImp() const
Definition: TQueryResult.h:136
const char * GetTopSessionTag() const
Definition: TProofServ.h:258
virtual Double_t GetEntries() const
return the current number of entries
Definition: TH1.cxx:4051
Bool_t IsSync() const
Definition: TProof.h:704
virtual void SetInitTime()
Set the initialization time.
TLatex * t1
Definition: textangle.C:20
TQueryResult * fQuery
Definition: TProofPlayer.h:101
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:558
void SetDispatchTimer(Bool_t on=kTRUE)
Enable/disable the timer to dispatch pening events while processing.
TString & Insert(Ssiz_t pos, const char *s)
Definition: TString.h:592
Bool_t GetValid() const
Definition: TDSet.h:121
Short_t Abs(Short_t d)
Definition: TMathBase.h:110
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Definition: TList.cxx:310
Int_t fActWorkers
Definition: TProof.h:193
static Float_t GetMemHWM()
MemHWM getter.
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
Definition: TSystem.cxx:2443
const Int_t kBreak
Definition: TError.h:42
void MapOutputListToDataMembers() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:732
static TOutputListSelectorDataMap * FindInList(TCollection *coll)
Find a TOutputListSelectorDataMap in a collection.
Int_t SavePartialResults(Bool_t queryend=kFALSE, Bool_t force=kFALSE)
Save the partial results of this query to a dedicated file under the user data directory.
TList * fInput
Definition: TProofPlayer.h:83
TString & Replace(Ssiz_t pos, Ssiz_t n, const char *s)
Definition: TString.h:625
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition: TROOT.cxx:2406
Double_t GetProcTime() const
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
Definition: TEnv.cxx:749
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3851
const char * GetObjName() const
Definition: TDSet.h:122
TMessage * fProcessMessage
Histogram with packets being processed (owned by TPerfStats)
Definition: TProofPlayer.h:309
This class defines a UUID (Universally Unique IDentifier), also known as GUIDs (Globally Unique IDent...
Definition: TUUID.h:44
virtual Int_t GetN() const
Definition: TEventList.h:58
virtual void StopProcess(Bool_t abort, Bool_t stoptimer=kFALSE)
Stop process.
Long64_t GetNum() const
Definition: TDSet.h:116
const char * Data() const
Definition: TString.h:349
void SetStopTimer(Bool_t on=kTRUE, Bool_t abort=kFALSE, Int_t timeout=0)
Enable/disable the timer to stop/abort processing.
virtual TObject * ReadObject(const TClass *cl)
Read object from I/O buffer.
static void SetLastEntry(Long64_t lastentry)
Set the last entry before exception.
const char * GetDirectory() const
Definition: TDSet.h:230
static struct mg_connection * fc(struct mg_context *ctx)
Definition: civetweb.c:839
#define SafeDelete(p)
Definition: RConfig.h:436
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
Definition: TSystem.cxx:1346
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
Definition: TObject.cxx:203
Double_t dot(const TVector2 &v1, const TVector2 &v2)
Definition: CsgOps.cxx:333
void Stop()
Stop the stopwatch.
Definition: TStopwatch.cxx:75
TMutex * fStopTimerMtx
Definition: TProofPlayer.h:107
virtual void SetAutoFlush(Long64_t autof=-30000000)
This function may be called at the start of a program to change the default value for fAutoFlush...
Definition: TTree.cxx:7401
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2334
#define PDB(mask, level)
Definition: TProofDebug.h:58
void Class()
Definition: Class.C:29
Long64_t fReadBytesRun
Definition: TProofPlayer.h:96
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition: THashList.h:36
TEventIter * fEvIter
period (ms) for sending intermediate results
Definition: TProofPlayer.h:90
const char * ord
Definition: TXSlave.cxx:46
Float_t GetEffSessions() const
Definition: TProofServ.h:276
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:46
const char * GetMsd() const
Definition: TDSet.h:119
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:33
EQueryMode GetQueryMode(Option_t *mode=0) const
Find out the query mode based on the current setting and 'mode'.
Definition: TProof.cxx:6106
virtual Bool_t IsEmpty() const
Definition: TCollection.h:99
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
static Long64_t GetFileBytesRead()
Static function returning the total number of bytes read from all files.
Definition: TFile.cxx:4319
void StopFeedback()
Stop reporting of feedback objects.
Int_t GetQuerySeqNum() const
Definition: TProofServ.h:272
static EFileType GetType(const char *name, Option_t *option="", TString *prefix=0)
Resolve the file type as a function of the protocol field in 'name'.
Definition: TFile.cxx:4566
void AddQueryResult(TQueryResult *q)
Add query result to the list, making sure that there are no duplicates.
virtual TObject * FindObject(const char *name) const
search object named name in the list of functions
Definition: TH1.cxx:3578
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
virtual Bool_t HandleTimer(TTimer *timer)
Execute action in response of a timer timing out.
Definition: TObject.cxx:469
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1627
void DeleteDrawFeedback(TDrawFeedback *f)
Delete draw feedback object.
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Definition: TProof.cxx:2664
TList * GetListOfElements() const
Definition: TDSet.h:231
if(pyself &&pyself!=Py_None)
void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
Set draw feedback option.
void ResetMergePrg()
Reset the merge progress notificator.
Definition: TProof.cxx:2460
A sorted doubly linked list.
Definition: TSortedList.h:30
void Info(const char *location, const char *msgfmt,...)
TList * MergeFeedback()
Merge feedback lists.
Long64_t fProcessedRun
Read calls in this run.
Definition: TProofPlayer.h:98
virtual Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process specified TDSet on PROOF.
TProofProgressStatus * GetProgressStatus() const
Definition: TProofPlayer.h:241
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:234
void Validate()
Validate the TDSet by opening files.
Definition: TDSet.cxx:1561
Int_t Atoi() const
Return integer value of string.
Definition: TString.cxx:1964
Bool_t IsParallel() const
Definition: TProof.h:972
void SetOutputFileName(const char *name)
Set the name of the output file; in the form of an Url.
static void GetMemValues(Long_t &vmax, Long_t &rmax)
Get memory usage.
Definition: TPerfStats.cxx:792
virtual void SetupFeedback()
Set up feedback (may not be used in this class).
Int_t fNotIdle
Definition: TProof.h:533
Long64_t GetMsgSizeHWM() const
Definition: TProofServ.h:286
static Long_t GetResMemMax()
ResMemMax getter.
void SetBytesRead(Long64_t bytesRead)
TList * GetOutputList() const
Get output list.
Method or function calling interface.
Definition: TMethodCall.h:41
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
void SetCurrentQuery(TQueryResult *q)
Set current query and save previous value.
TH1F * h1
Definition: legend1.C:5
const Bool_t kSortDescending
Definition: TList.h:41
A container class for query results.
Definition: TQueryResult.h:44
Double_t GetXmin() const
Definition: TAxis.h:137
TDSetElement * Current() const
Definition: TDSet.h:238
#define kPEX_STOPPED
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
void Error(const char *location, const char *msgfmt,...)
THashList * fOutput
Definition: TProofPlayer.h:84
static void SetLimitsFinder(THLimitsFinder *finder)
This static function can be used to specify a finder derived from THLimitsFinder. ...
char * out
Definition: TBase64.cxx:29
virtual Int_t SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asychronous message to the master / client .
static void Start(TList *input, TList *output)
Initialize PROOF statistics run.
Definition: TPerfStats.cxx:746
void SetInitTime()
Set init time.
Bool_t IsLite() const
Definition: TProof.h:966
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:10485
Long64_t GetFirst() const
Definition: TDSet.h:114
void SetFileName(const char *name)
Float_t fProcTime
Definition: TProof.h:190
void Feedback(TList *objs)
Feedback signal.
Int_t GetBufferSize() const
Definition: TH1.h:238
TFileMerger * GetFileMerger(Bool_t local=kFALSE)
Get instance of the file merger to be used in 'merge' mode.
virtual Bool_t Notify()
Notify when timer times out.
Definition: TTimer.cxx:141
A doubly linked list.
Definition: TList.h:47
TObject * GetEntryList() const
Definition: TDSet.h:251
Bool_t fRedirLog
Definition: TProof.h:538
void ClearInput()
Clear input list.
void UpdateAutoBin(const char *name, Double_t &xmin, Double_t &xmax, Double_t &ymin, Double_t &ymax, Double_t &zmin, Double_t &zmax)
Update automatic binning parameters for given object "name".
static Float_t GetMemStop()
MemStop getter.
#define CATCH(n)
Definition: TException.h:73
Int_t GetPort() const
Definition: TUrl.h:87
static void Setup(TList *input)
Setup the PROOF input list with requested statistics and tracing options.
Definition: TPerfStats.cxx:729
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void HandleGetTreeHeader(TMessage *mess)
Handle tree header request.
const char * GetName() const
Returns name of object.
Definition: TSlave.h:128
TSocket * GetSocket() const
Definition: TProofServ.h:269
EFileType
File type.
Definition: TFile.h:163
void Clear(Option_t *option="")
Remove all objects from the list.
Definition: THashList.cxx:167
float ymax
Definition: THbookFile.cxx:93
TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)
Get next packet (may not be used in this class).
static Bool_t gAbort
const char * GetFileName() const
Definition: TDSet.h:113
Float_t fInitTime
Definition: TProof.h:189
Int_t fCpus
Definition: TSystem.h:165
const char * GetPrefix() const
Definition: TProofServ.h:288
ROOT::R::TRInterface & r
Definition: Object.C:4
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition: TString.cxx:2220
Class to manage histogram axis.
Definition: TAxis.h:36
Float_t GetProcTime() const
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
Int_t GetNbins() const
Definition: TAxis.h:125
static void GetLocalServer(TString &dsrv)
Extract LOCALDATASERVER info in 'dsrv'.
const char * GetLibList() const
Definition: TQueryResult.h:137
1-D histogram with a int per channel (see TH1 documentation)}
Definition: TH1.h:529
static Bool_t IsStandardDraw(const char *selec)
Find out if this is a standard selection used for Draw actions (either TSelectorDraw, TProofDraw or deriving from them).
Definition: TSelector.cxx:235
TDirectory * GetDirectory() const
Definition: TTree.h:381
This class provides file copy and merging services.
Definition: TFileMerger.h:30
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition: TObject.cxx:187
void AddInput(TObject *inp)
Add object to input list.
virtual void StopFeedback()
Stop feedback (may not be used in this class).
void SetupFeedback()
Setup reporting of feedback objects and progress messages.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:674
Long64_t GetTotalEntries() const
Long_t fMemVirtual
Definition: TSystem.h:207
TObject * Next()
Definition: TCollection.h:158
Int_t fSeqNum
Definition: TProof.h:554
virtual TEnvRec * Lookup(const char *n)
Loop over all resource records and return the one with name.
Definition: TEnv.cxx:550
void AddOutput(TList *out)
Incorporate the content of the received output list 'out' into the final output list fOutput...
TObject * Value() const
Definition: TMap.h:125
void SetMerged(Bool_t merged=kTRUE)
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2321
virtual Bool_t JoinProcess(TList *workers)
Prepares the given list of new workers to join a progressing process.
TList * GetInputList()
Definition: TQueryResult.h:128
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:173
TMarker * m
Definition: textangle.C:8
Bool_t SetDataMembers(TSelector *sel) const
Given an output list, set the data members of a TSelector.
char * Form(const char *fmt,...)
Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process specified TDSet on PROOF worker.
virtual Bool_t SendProcessingStatus(const char *, Bool_t=kFALSE)
const char * AsString() const
Return UUID as string. Copy string immediately since it will be reused.
Definition: TUUID.cxx:531
void SaveSource(FILE *fp)
Save macro source in file pointer fp.
Definition: TMacro.cxx:379
virtual Int_t GetNbinsZ() const
Definition: TH1.h:298
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
Handles synchronous and a-synchronous timer events.
Definition: TTimer.h:57
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
void SetLastMergingMsg(TObject *obj)
Set the message to be notified in case of exception.
Long64_t fProcessed
Definition: TProof.h:187
Int_t GetProtocol() const
Definition: TProofServ.h:264
The ROOT global object gROOT contains a list of all defined classes.
Definition: TClass.h:81
TAxis * GetYaxis()
Definition: TH1.h:320
virtual Bool_t HandleTimer(TTimer *timer)
Send feedback objects to client.
float xmax
Definition: THbookFile.cxx:93
TStopwatch fQuerySTW
Definition: TProof.h:624
Int_t ReinitSelector(TQueryResult *qr)
Reinitialize fSelector using the selector files in the query result.
static Int_t GetFileReadCalls()
Static function returning the total number of read calls from all files.
Definition: TFile.cxx:4336
const char * GetCacheDir() const
Definition: TProofServ.h:261
Bool_t IsNull() const
Definition: TString.h:387
void SetName(const char *name)
Definition: TCollection.h:116
Int_t fProtocol
Definition: TProof.h:602
void Reset(Detail::TBranchProxy *x)
void HandleRecvHisto(TMessage *mess)
Receive histo from slave.
Int_t SendFile(const char *file, Int_t opt=(kBinary|kForward|kCp|kCpBin), const char *rfile=0, TSlave *sl=0)
Send a file to master or slave servers.
Definition: TProof.cxx:6881
void Warning(const char *location, const char *msgfmt,...)
Float_t fEffSessions
Definition: TProof.h:195
Long64_t GetEntries() const
Long64_t entry
Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)
Finalize query (may not be used in this class).
void FeedBackCanvas(const char *name, Bool_t create)
Create/destroy a named canvas for feedback.
const Double_t * GetBuffer() const
Definition: TH1.h:239
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:149
#define Printf
Definition: TGeoToOCC.h:18
Bool_t Matches(const char *ref)
Return TRUE if reference ref matches.
Int_t AddOutputObject(TObject *obj)
Incorporate the received object 'obj' into the output list fOutput.
TDatime GetStartTime() const
Definition: TQueryResult.h:125
void InitWithPrototype(TClass *cl, const char *method, const char *proto, Bool_t objectIsConst=kFALSE, ROOT::EFunctionMatchMode mode=ROOT::kConversionMatch)
Initialize the method invocation environment.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:385
Int_t GetSize() const
Definition: TArray.h:49
Long64_t Merge(TCollection *list)
Merge objects from the list into this object.
static TEventIter * Create(TDSet *dset, TSelector *sel, Long64_t first, Long64_t num)
Create and instance of the appropriate iterator.
Definition: TEventIter.cxx:151
virtual Int_t FindBin(Double_t x)
Find bin number corresponding to abscissa x.
Definition: TAxis.cxx:264
void UpdateProgressInfo()
Update fProgressStatus.
static TSelector * GetSelector(const char *filename)
The code in filename is loaded (interpreted or compiled, see below), filename must contain a valid cl...
Definition: TSelector.cxx:140
static unsigned int total
virtual Int_t RedirectOutput(const char *name, const char *mode="a", RedirectHandle_t *h=0)
Redirect standard output (stdout, stderr) to the specified file.
Definition: TSystem.cxx:1677
void SetHost(const char *host)
Definition: TUrl.h:93
TString & Remove(Ssiz_t pos)
Definition: TString.h:616
long Long_t
Definition: RtypesCore.h:50
int Ssiz_t
Definition: RtypesCore.h:63
Bool_t IsClient() const
Definition: TProofPlayer.h:219
TList * GetInputList()
Get input list.
Definition: TProof.cxx:10320
Bool_t IsClient() const
Is the player running on the client?
TString fOutputFilePath
Definition: TProofPlayer.h:114
Class used by TMap to store (key,value) pairs.
Definition: TMap.h:106
Int_t GetBufferLength() const
Definition: TH1.h:237
TFile * fOutputFile
Definition: TProofPlayer.h:115
TList * GetListOfActiveSlaves() const
Definition: TProof.h:758
virtual const char * GetIncludePath()
Get the list of include path.
Definition: TSystem.cxx:3805
virtual Int_t GetSize() const
Definition: TCollection.h:95
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition: TTree.cxx:8095
virtual void MergeOutput(Bool_t savememvalues=kFALSE)
Merge output (may not be used in this class).
void RestorePreviousQuery()
Definition: TProofPlayer.h:186
double f(double x)
void ResetParam()
Reset parameter list. To be used before the first call the SetParam().
TList * GetMergeList() const
Definition: TFileMerger.h:81
static Int_t init()
virtual Func_t DynFindSymbol(const char *module, const char *entry)
Find specific entry point in specified library.
Definition: TSystem.cxx:1982
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:415
Int_t fMergersCount
Definition: TProof.h:581
double Double_t
Definition: RtypesCore.h:55
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:307
void RedirectOutput(Bool_t on=kTRUE)
Control output redirection to TProof::fLogFileW.
char * DynamicPathName(const char *lib, Bool_t quiet=kFALSE)
Find a dynamic library called lib using the system search paths.
Definition: TSystem.cxx:1958
Int_t Lock()
Locks the directory.
TObject * Key() const
Definition: TMap.h:124
#define TRY
Definition: TException.h:66
Bool_t IsMaster() const
Definition: TProofServ.h:305
Describe directory structure in memory.
Definition: TDirectory.h:41
void Feedback(TList *objs)
Get list of feedback objects.
Definition: TProof.cxx:9835
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
int type
Definition: TGX11.cxx:120
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
void HandleGetTreeHeader(TMessage *mess)
Handle tree header request.
const char * GetOrdinal() const
Definition: TProofServ.h:265
TNamed()
Definition: TNamed.h:40
virtual void SendInputDataFile()
Send the input data objects to the master; the objects are taken from the dedicated list and / or the...
Definition: TProof.cxx:10179
Definition: TProof.h:339
EExitStatus GetExitStatus() const
Definition: TProofPlayer.h:222
Double_t GetXmax() const
Definition: TAxis.h:138
int nentries
Definition: THbookFile.cxx:89
ErrorHandlerFunc_t fErrorHandler
tdset for current processing
Definition: TProofPlayer.h:306
The TH1 histogram class.
Definition: TH1.h:80
virtual TMD5 * Checksum()
Returns checksum of the current content.
Definition: TMacro.cxx:192
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
Bool_t IsMaster() const
Definition: TProof.h:969
#define R__LOCKGUARD(mutex)
virtual void Reset()
Reset or initialize access to the elements.
Definition: TDSet.cxx:1340
Bool_t MergeOutputFiles()
Merge output in files.
virtual void DispatchOneEvent(Bool_t pendingOnly=kFALSE)
Dispatch a single event.
Definition: TSystem.cxx:433
Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Draw (may not be used in this class).
Bool_t IsMerged() const
Int_t InitPacketizer(TDSet *dset, Long64_t nentries, Long64_t first, const char *defpackunit, const char *defpackdata)
Init the packetizer Return 0 on success (fPacketizer is correctly initialized), -1 on failure...
Int_t Unlock()
Unlock the directory.
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2881
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:348
void SetFailedPackets(TList *list)
Bool_t IsRegister() const
void SetSelectorDataMembersFromOutputList()
Set the selector's data members: find the mapping of data members to otuput list entries in the outpu...
Long64_t Finalize(Int_t query=-1, Bool_t force=kFALSE)
Finalize the qry-th query in fQueries.
Definition: TProof.cxx:5870
virtual Bool_t SendSelector(const char *selector_file)
Send the selector file(s) to master or worker nodes.
#define name(a, b)
Definition: linkTestLib0.cpp:5
Bool_t HandleTimer(TTimer *timer)
Handle timer event.
virtual Bool_t Add(TF1 *h1, Double_t c1=1, Option_t *option="")
Performs the operation: this = this + c1*f1 if errors are defined (see TH1::Sumw2), errors are also recalculated.
Definition: TH1.cxx:780
TAxis * GetZaxis()
Definition: TH1.h:321
virtual void Add(const TEventList *list)
Merge contents of alist with this list.
Definition: TEventList.cxx:114
void Throw(int code)
If an exception context has been set (using the TRY and RETRY macros) jump back to where it was set...
Definition: TException.cxx:27
virtual Int_t DrawCanvas(TObject *obj)
Draw the object if it is a canvas.
FILE * fLogFileW
Definition: TProof.h:540
Mother of all ROOT objects.
Definition: TObject.h:58
void Lookup(Bool_t removeMissing=kFALSE, TList **missingFiles=0)
Resolve the end-point URL for the current elements of this data set If the removeMissing option is se...
Definition: TDSet.cxx:1577
const char Int_t const char TProof * proof
Definition: TXSlave.cxx:46
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:556
void StopFeedback()
Stop feedback.
virtual Int_t GetNbinsY() const
Definition: TH1.h:297
virtual Long64_t * GetList() const
Definition: TEventList.h:57
typedef void((*Func_t)())
TObject * HandleHistogram(TObject *obj, Bool_t &merged)
Low statistic histograms need a special treatment when using autobin.
TUrl * GetCurrentUrl() const
Return the current url.
Definition: TFileInfo.cxx:246
Int_t Collect(ESlaves list=kActive, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from the slave servers.
Definition: TProof.cxx:2722
void Progress(Long64_t total, Long64_t processed)
Report progress (may not be used in this class).
static void SetMacroPath(const char *newpath)
Set or extend the macro search path.
Definition: TROOT.cxx:2440
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
Definition: TDirectory.cxx:433
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:359
Int_t AddOutputObject(TObject *obj)
Incorporate output object (may not be used in this class).
virtual void Add(TObject *obj)
Definition: TList.h:81
const Ssiz_t kNPOS
Definition: Rtypes.h:115
Int_t Incorporate(TObject *obj, TList *out, Bool_t &merged)
Incorporate object 'newobj' in the list 'outlist'.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
void Execute(const char *, const char *, int *=0)
Execute method on this object with the given parameter string, e.g.
Definition: TMethodCall.h:68
void SetObject(TObject *object)
Set the object to be notified at time out.
Definition: TTimer.cxx:182
void Progress(Long64_t total, Long64_t processed)
Progress signal.
Definition: TProofPlayer.h:445
TFileCollection * GetFileCollection()
Get instance of the file collection to be used in 'dataset' mode.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:567
TF1 * f1
Definition: legend1.C:11
void SetParam(Long_t l)
Add a long method parameter.
Double_t GetRate() const
Bool_t IsTopMaster() const
Definition: TProofServ.h:307
const char * GetDir(Bool_t raw=kFALSE) const
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Definition: TProof.cxx:167
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
void StopProcess(Bool_t abort, Int_t timeout=-1)
Stop the process after this event.
TVirtualPacketizer * fPacketizer
Definition: TProofPlayer.h:303
virtual TIterator * MakeIterator(Bool_t dir=kIterForward) const
Return a list iterator.
Definition: TList.cxx:603
Int_t GetDrawArgs(const char *var, const char *sel, Option_t *opt, TString &selector, TString &objname)
Parse the arguments from var, sel and opt and fill the selector and object name accordingly.
virtual void StoreFeedback(TObject *slave, TList *out)
Store feedback results from the specified slave.
const char * GetType() const
Definition: TDSet.h:228
void Reset()
Definition: TStopwatch.h:54
TObject * GetOutput(const char *name) const
Get output object by name.
Int_t GetTotSessions() const
Definition: TProofServ.h:274
Float_t GetMaxProcTime() const
Definition: TDSet.h:144
void SetExitStatus(Int_t est)
Definition: TStatus.h:73
static void Stop()
Terminate the PROOF statistics run.
Definition: TPerfStats.cxx:766
virtual Long64_t GetEntries() const
Definition: TTree.h:382
void StoreFeedback(TObject *slave, TList *out)
Store feedback list (may not be used in this class).
Long64_t GetCacheSize()
Return the size in bytes of the cache.
A TTree object has a header with a name and a title.
Definition: TTree.h:94
Float_t fMBRateI
Definition: TProof.h:192
static void SetLastMsg(const char *lastmsg)
Set the message to be sent back in case of exceptions.
Class describing a generic file including meta information.
Definition: TFileInfo.h:50
Bool_t IsMerge() const
void ResetBit(UInt_t f)
Definition: TObject.h:172
Bool_t fCreateSelObj
the latest selector
Definition: TProofPlayer.h:86
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition: TSystem.cxx:1243
TList * GetConfigParams(Bool_t steal=kFALSE)
virtual void PrintFiles(Option_t *options)
Print list of files being merged.
virtual Int_t IndexOf(const TObject *obj) const
TClass * GetClass() const
Definition: TMessage.h:76
Bool_t IsParallel() const
True if in parallel mode.
Float_t GetInitTime() const
Definition: TSlave.h:50
static void output(int code)
Definition: gifencode.c:226
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:39
static void ErrorHandler(Int_t level, Bool_t abort, const char *location, const char *msg)
The PROOF error handler function.
const Bool_t kTRUE
Definition: Rtypes.h:91
float * q
Definition: THbookFile.cxx:87
virtual void Reset()
Reset merger file list.
virtual void SetTitle(const char *title="")
Change (i.e. set) the title of the TNamed.
Definition: TNamed.cxx:152
TObject * obj
virtual TObject * FindObject(const char *name) const
Must be redefined in derived classes.
Definition: TObject.cxx:379
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:27
static void AutoBinFunc(TString &key, Double_t &xmin, Double_t &xmax, Double_t &ymin, Double_t &ymax, Double_t &zmin, Double_t &zmax)
Int_t GetParallel() const
Returns number of slaves active in parallel mode.
Definition: TProof.cxx:2299
static void SetMemValues()
Record memory usage.
Definition: TPerfStats.cxx:780
const Int_t n
Definition: legend1.C:16
Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Draw (support for TChain::Draw()).
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write all objects in this collection.
void NotifyMemory(TObject *obj)
Printout the memory record after merging object 'obj' This record is used by the memory monitor...
Bool_t IsValid() const
Return true if the method call has been properly initialized and is usable.
void StoreOutput(TList *out)
Store received output list.
TStopwatch * fMergeSTW
Definition: TProofPlayer.h:312
Long_t fMemResident
Definition: TSystem.h:206
const char * GetFile() const
Definition: TUrl.h:78
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2412
void SetProcessing(Bool_t on=kTRUE)
Set processing bit according to 'on'.
TAxis * GetXaxis()
Definition: TH1.h:319
void Progress(Long64_t total, Long64_t processed)
Progress signal.
virtual ~TProofPlayer()
Destructor.
virtual Long64_t Merge(TCollection *list)
Add all histograms in the collection to this histogram.
Definition: TH1.cxx:5305
virtual void SetIncludePath(const char *includePath)
IncludePath should contain the list of compiler flags to indicate where to find user defined header f...
Definition: TSystem.cxx:4007
ClassImp(TProofPlayerLocal) Long64_t TProofPlayerLocal
Process the specified TSelector object 'nentries' times.
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:256
static Int_t SendInputData(TQueryResult *qr, TProof *p, TString &emsg)
Send the input data file to the workers.
Definition: TProof.cxx:12948
void Feedback(TList *objs)
Set feedback list (may not be used in this class).
TDrawFeedback * CreateDrawFeedback(TProof *p)
Draw feedback creation proxy.
Bool_t fSavePartialResults
Definition: TProofPlayer.h:117
Int_t Remove(TDSetElement *elem, Bool_t deleteElem=kTRUE)
Remove TDSetElement 'elem' from the list.
Definition: TDSet.cxx:1548
const char * GetDataDirOpts() const
Definition: TProofServ.h:263
Stopwatch class.
Definition: TStopwatch.h:30
virtual Int_t AddWorkers(TList *workers)
Adds new workers.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904