Logo ROOT   6.07/09
Reference Guide
TProofPlayer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 07/01/02
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TProofPlayer
13 \ingroup proofkernel
14 
15 Internal class steering processing in PROOF.
16 Instances of the TProofPlayer class are created on the worker nodes
17 per session and do the processing.
18 Instances of its subclass - TProofPlayerRemote are created per each
19 query on the master(s) and on the client. On the master(s),
20 TProofPlayerRemote coordinate processing, check the dataset, create
21 the packetizer and take care of merging the results of the workers.
22 The instance on the client collects information on the input
23 (dataset and selector), it invokes the Begin() method and finalizes
24 the query by calling Terminate().
25 
26 */
27 
28 #include "TProofDraw.h"
29 #include "TProofPlayer.h"
30 #include "THashList.h"
31 #include "TEnv.h"
32 #include "TEventIter.h"
33 #include "TVirtualPacketizer.h"
34 #include "TSelector.h"
35 #include "TSocket.h"
36 #include "TProofServ.h"
37 #include "TProof.h"
38 #include "TProofOutputFile.h"
39 #include "TProofSuperMaster.h"
40 #include "TSlave.h"
41 #include "TClass.h"
42 #include "TROOT.h"
43 #include "TError.h"
44 #include "TException.h"
45 #include "MessageTypes.h"
46 #include "TMessage.h"
47 #include "TDSetProxy.h"
48 #include "TString.h"
49 #include "TSystem.h"
50 #include "TFile.h"
51 #include "TFileCollection.h"
52 #include "TFileInfo.h"
53 #include "TFileMerger.h"
54 #include "TProofDebug.h"
55 #include "TTimer.h"
56 #include "TMap.h"
57 #include "TPerfStats.h"
58 #include "TStatus.h"
59 #include "TEventList.h"
60 #include "TProofLimitsFinder.h"
61 #include "THashList.h"
62 #include "TSortedList.h"
63 #include "TTree.h"
64 #include "TEntryList.h"
65 #include "TDSet.h"
66 #include "TDrawFeedback.h"
67 #include "TNamed.h"
68 #include "TObjString.h"
69 #include "TQueryResult.h"
70 #include "TMD5.h"
71 #include "TMethodCall.h"
72 #include "TObjArray.h"
73 #include "TH1.h"
74 #include "TVirtualMonitoring.h"
75 #include "TParameter.h"
77 #include "TStopwatch.h"
78 
79 // Timeout exception
80 #define kPEX_STOPPED 1001
81 #define kPEX_ABORTED 1002
82 
83 // To flag an abort condition: use a local static variable to avoid
84 // warnings about problems with longjumps
85 static Bool_t gAbort = kFALSE;
86 
87 class TAutoBinVal : public TNamed {
88 private:
89  Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
90 
91 public:
92  TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
93  Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
94  {
95  fXmin = xmin; fXmax = xmax;
96  fYmin = ymin; fYmax = ymax;
97  fZmin = zmin; fZmax = zmax;
98  }
99  void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
100  Double_t& ymax, Double_t& zmin, Double_t& zmax)
101  {
102  xmin = fXmin; xmax = fXmax;
103  ymin = fYmin; ymax = fYmax;
104  zmin = fZmin; zmax = fZmax;
105  }
106 
107 };
108 
109 //
110 // Special timer to dispatch pending events while processing
111 ////////////////////////////////////////////////////////////////////////////////
112 
113 class TDispatchTimer : public TTimer {
114 private:
115  TProofPlayer *fPlayer;
116 
117 public:
118  TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }
119 
120  Bool_t Notify();
121 };
122 ////////////////////////////////////////////////////////////////////////////////
123 /// Handle expiration of the timer associated with dispatching pending
124 /// events while processing. We must act as fast as possible here, so
125 /// we just set a flag submitting a request for dispatching pending events
126 
127 Bool_t TDispatchTimer::Notify()
128 {
129  if (gDebug > 0) printf("TDispatchTimer::Notify: called!\n");
130 
131  fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);
132 
133  // Needed for the next shot
134  Reset();
135  return kTRUE;
136 }
137 
138 //
139 // Special timer to notify reach of max packet proc time
140 ////////////////////////////////////////////////////////////////////////////////
141 
142 class TProctimeTimer : public TTimer {
143 private:
144  TProofPlayer *fPlayer;
145 
146 public:
147  TProctimeTimer(TProofPlayer *p, Long_t to) : TTimer(to, kFALSE), fPlayer(p) { }
148 
149  Bool_t Notify();
150 };
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Handle expiration of the timer associated with dispatching pending
153 /// events while processing. We must act as fast as possible here, so
154 /// we just set a flag submitting a request for dispatching pending events
155 
156 Bool_t TProctimeTimer::Notify()
157 {
158  if (gDebug > 0) printf("TProctimeTimer::Notify: called!\n");
159 
160  fPlayer->SetBit(TProofPlayer::kMaxProcTimeReached);
161 
162  // One shot only
163  return kTRUE;
164 }
165 
166 //
167 // Special timer to handle stop/abort request via exception raising
168 ////////////////////////////////////////////////////////////////////////////////
169 
170 class TStopTimer : public TTimer {
171 private:
172  Bool_t fAbort;
173  TProofPlayer *fPlayer;
174 
175 public:
176  TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
177 
178  Bool_t Notify();
179 };
180 
181 ////////////////////////////////////////////////////////////////////////////////
182 /// Constructor for the timer to stop/abort processing.
183 /// The 'timeout' is in seconds.
184 /// Make sure that 'to' make sense, i.e. not larger than 10 days;
185 /// the minimum value is 10 ms (0 does not seem to start the timer ...).
186 
187 TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
188  : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
189 {
190  if (gDebug > 0)
191  Info ("TStopTimer","enter: %d, timeout: %d", abort, to);
192 
193  fPlayer = p;
194  fAbort = abort;
195 
196  if (gDebug > 1)
197  Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
198 }
199 
200 ////////////////////////////////////////////////////////////////////////////////
201 /// Handle the signal coming from the expiration of the timer
202 /// associated with an abort or stop request.
203 /// We raise an exception which will be processed in the
204 /// event loop.
205 
206 Bool_t TStopTimer::Notify()
207 {
208  if (gDebug > 0) printf("TStopTimer::Notify: called!\n");
209 
210  if (fAbort)
212  else
214 
215  return kTRUE;
216 }
217 
218 //------------------------------------------------------------------------------
219 
221 
223 
224 ////////////////////////////////////////////////////////////////////////////////
225 /// Default ctor.
226 
228  : fAutoBins(0), fOutput(0), fSelector(0), fCreateSelObj(kTRUE), fSelectorClass(0),
229  fFeedbackTimer(0), fFeedbackPeriod(2000),
230  fEvIter(0), fSelStatus(0),
231  fTotalEvents(0), fReadBytesRun(0), fReadCallsRun(0), fProcessedRun(0),
232  fQueryResults(0), fQuery(0), fPreviousQuery(0), fDrawQueries(0),
233  fMaxDrawQueries(1), fStopTimer(0), fDispatchTimer(0),
234  fProcTimeTimer(0), fProcTime(0),
235  fOutputFile(0),
236  fSaveMemThreshold(-1), fSavePartialResults(kFALSE), fSaveResultsPerPacket(kFALSE)
237 {
238  fInput = new TList;
245 
246  static Bool_t initLimitsFinder = kFALSE;
247  if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
249  initLimitsFinder = kTRUE;
250  }
251 
252 }
253 
254 ////////////////////////////////////////////////////////////////////////////////
255 /// Destructor.
256 
258 {
259  fInput->Clear("nodelete");
261  // The output list is owned by fSelector and destroyed in there
270 }
271 
272 ////////////////////////////////////////////////////////////////////////////////
273 /// Set processing bit according to 'on'
274 
276 {
277  if (on)
279  else
281 }
282 
283 ////////////////////////////////////////////////////////////////////////////////
284 /// Stop the process after this event. If timeout is positive, start
285 /// a timer firing after timeout seconds to hard-stop time-expensive
286 /// events.
287 
289 {
290  if (gDebug > 0)
291  Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
292 
293  if (fEvIter != 0)
294  fEvIter->StopProcess(abort);
295  Long_t to = 1;
296  if (abort == kTRUE) {
298  } else {
300  to = timeout;
301  }
302  // Start countdown, if needed
303  if (to > 0)
304  SetStopTimer(kTRUE, abort, to);
305 }
306 
307 ////////////////////////////////////////////////////////////////////////////////
308 /// Enable/disable the timer to dispatch pening events while processing.
309 
311 {
314  if (on) {
315  fDispatchTimer = new TDispatchTimer(this);
317  }
318 }
319 
320 ////////////////////////////////////////////////////////////////////////////////
321 /// Enable/disable the timer to stop/abort processing.
322 /// The 'timeout' is in seconds.
323 
325 {
326  std::lock_guard<std::mutex> lock(fStopTimerMtx);
327 
328  // Clean-up the timer
330  if (on) {
331  // create timer
332  fStopTimer = new TStopTimer(this, abort, timeout);
333  // Start the countdown
334  fStopTimer->Start();
335  if (gDebug > 0)
336  Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
337  (abort ? "ABORT" : "STOP"), timeout);
338  } else {
339  if (gDebug > 0)
340  Info ("SetStopTimer", "timer STOPPED");
341  }
342 }
343 
344 ////////////////////////////////////////////////////////////////////////////////
345 /// Add query result to the list, making sure that there are no
346 /// duplicates.
347 
349 {
350  if (!q) {
351  Warning("AddQueryResult","query undefined - do nothing");
352  return;
353  }
354 
355  // Treat differently normal and draw queries
356  if (!(q->IsDraw())) {
357  if (!fQueryResults) {
358  fQueryResults = new TList;
359  fQueryResults->Add(q);
360  } else {
361  TIter nxr(fQueryResults);
362  TQueryResult *qr = 0;
363  TQueryResult *qp = 0;
364  while ((qr = (TQueryResult *) nxr())) {
365  // If same query, remove old version and break
366  if (*qr == *q) {
367  fQueryResults->Remove(qr);
368  delete qr;
369  break;
370  }
371  // Record position according to start time
372  if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
373  qp = qr;
374  }
375 
376  if (!qp) {
378  } else {
379  fQueryResults->AddAfter(qp, q);
380  }
381  }
382  } else if (IsClient()) {
383  // If max reached, eliminate first the oldest one
385  TIter nxr(fQueryResults);
386  TQueryResult *qr = 0;
387  while ((qr = (TQueryResult *) nxr())) {
388  // If same query, remove old version and break
389  if (qr->IsDraw()) {
390  fDrawQueries--;
391  fQueryResults->Remove(qr);
392  delete qr;
393  break;
394  }
395  }
396  }
397  // Add new draw query
398  if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
399  fDrawQueries++;
400  if (!fQueryResults)
401  fQueryResults = new TList;
402  fQueryResults->Add(q);
403  }
404  }
405 }
406 
407 ////////////////////////////////////////////////////////////////////////////////
408 /// Remove all query result instances referenced 'ref' from
409 /// the list of results.
410 
411 void TProofPlayer::RemoveQueryResult(const char *ref)
412 {
413  if (fQueryResults) {
414  TIter nxq(fQueryResults);
415  TQueryResult *qr = 0;
416  while ((qr = (TQueryResult *) nxq())) {
417  if (qr->Matches(ref)) {
418  fQueryResults->Remove(qr);
419  delete qr;
420  }
421  }
422  }
423 }
424 
425 ////////////////////////////////////////////////////////////////////////////////
426 /// Get query result instances referenced 'ref' from
427 /// the list of results.
428 
430 {
431  if (fQueryResults) {
432  if (ref && strlen(ref) > 0) {
433  TIter nxq(fQueryResults);
434  TQueryResult *qr = 0;
435  while ((qr = (TQueryResult *) nxq())) {
436  if (qr->Matches(ref))
437  return qr;
438  }
439  } else {
440  // Get last
441  return (TQueryResult *) fQueryResults->Last();
442  }
443  }
444 
445  // Nothing found
446  return (TQueryResult *)0;
447 }
448 
449 ////////////////////////////////////////////////////////////////////////////////
450 /// Set current query and save previous value.
451 
453 {
455  fQuery = q;
456 }
457 
458 ////////////////////////////////////////////////////////////////////////////////
459 /// Add object to input list.
460 
462 {
463  fInput->Add(inp);
464 }
465 
466 ////////////////////////////////////////////////////////////////////////////////
467 /// Clear input list.
468 
470 {
471  fInput->Clear();
472 }
473 
474 ////////////////////////////////////////////////////////////////////////////////
475 /// Get output object by name.
476 
478 {
479  if (fOutput)
480  return fOutput->FindObject(name);
481  return 0;
482 }
483 
484 ////////////////////////////////////////////////////////////////////////////////
485 /// Get output list.
486 
488 {
489  TList *ol = fOutput;
490  if (!ol && fQuery)
491  ol = fQuery->GetOutputList();
492  return ol;
493 }
494 
495 ////////////////////////////////////////////////////////////////////////////////
496 /// Reinitialize fSelector using the selector files in the query result.
497 /// Needed when Finalize is called after a Process execution for the same
498 /// selector name.
499 
501 {
502  Int_t rc = 0;
503 
504  // Make sure we have a query
505  if (!qr) {
506  Info("ReinitSelector", "query undefined - do nothing");
507  return -1;
508  }
509 
510  // Selector name
511  TString selec = qr->GetSelecImp()->GetName();
512  if (selec.Length() <= 0) {
513  Info("ReinitSelector", "selector name undefined - do nothing");
514  return -1;
515  }
516 
517  // Find out if this is a standard selection used for Draw actions
518  Bool_t stdselec = TSelector::IsStandardDraw(selec);
519 
520  // Find out if this is a precompiled selector: in such a case we do not
521  // have the code in TMacros, so we must rely on local libraries
522  Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
523 
524  // If not, find out if it needs to be expanded
525  TString ipathold;
526  if (!stdselec && !compselec) {
527  // Check checksums for the versions of the selector files
528  Bool_t expandselec = kTRUE;
529  TString dir, ipath;
530  char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
531  if (selc) {
532  // Check checksums
533  TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
534  // Implementation files
535  md5icur = TMD5::FileChecksum(selc);
536  md5iold = qr->GetSelecImp()->Checksum();
537  // Header files
538  TString selh(selc);
539  Int_t dot = selh.Last('.');
540  if (dot != kNPOS) selh.Remove(dot);
541  selh += ".h";
543  md5hcur = TMD5::FileChecksum(selh);
544  md5hold = qr->GetSelecHdr()->Checksum();
545 
546  // If nothing has changed nothing to do
547  if (md5hcur && md5hold && md5icur && md5iold)
548  if (*md5hcur == *md5hold && *md5icur == *md5iold)
549  expandselec = kFALSE;
550 
551  SafeDelete(md5icur);
552  SafeDelete(md5hcur);
553  SafeDelete(md5iold);
554  SafeDelete(md5hold);
555  if (selc) delete [] selc;
556  }
557 
558  Bool_t ok = kTRUE;
559  // Expand selector files, if needed
560  if (expandselec) {
561 
562  ok = kFALSE;
563  // Expand files in a temporary directory
564  TUUID u;
565  dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
566  if (!(gSystem->MakeDirectory(dir))) {
567 
568  // Export implementation file
569  selec = Form("%s/%s",dir.Data(),selec.Data());
570  qr->GetSelecImp()->SaveSource(selec);
571 
572  // Export header file
573  TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
574  qr->GetSelecHdr()->SaveSource(seleh);
575 
576  // Adjust include path
577  ipathold = gSystem->GetIncludePath();
578  ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
579  gSystem->SetIncludePath(ipath.Data());
580 
581  ok = kTRUE;
582  }
583  }
584  TString opt(qr->GetOptions());
585  Ssiz_t id = opt.Last('#');
586  if (id != kNPOS && id < opt.Length() - 1)
587  selec += opt(id + 1, opt.Length());
588 
589  if (!ok) {
590  Info("ReinitSelector", "problems locating or exporting selector files");
591  return -1;
592  }
593  }
594 
595  // Cleanup previous stuff
597  fSelectorClass = 0;
598 
599  // Init the selector now
600  Int_t iglevelsave = gErrorIgnoreLevel;
601  if (compselec)
602  // Silent error printout on first attempt
604 
605  if ((fSelector = TSelector::GetSelector(selec))) {
606  if (compselec)
607  gErrorIgnoreLevel = iglevelsave; // restore ignore level
608  fSelectorClass = fSelector->IsA();
610 
611  } else {
612  if (compselec) {
613  gErrorIgnoreLevel = iglevelsave; // restore ignore level
614  // Retry by loading first the libraries listed in TQueryResult, if any
615  if (strlen(qr->GetLibList()) > 0) {
616  TString sl(qr->GetLibList());
617  TObjArray *oa = sl.Tokenize(" ");
618  if (oa) {
619  Bool_t retry = kFALSE;
620  TIter nxl(oa);
621  TObjString *os = 0;
622  while ((os = (TObjString *) nxl())) {
623  TString lib = gSystem->BaseName(os->GetName());
624  if (lib != "lib") {
625  lib.ReplaceAll("-l", "lib");
626  if (gSystem->Load(lib) == 0)
627  retry = kTRUE;
628  }
629  }
630  // Retry now, if the case
631  if (retry)
633  }
634  }
635  }
636  if (!fSelector) {
637  if (compselec)
638  Info("ReinitSelector", "compiled selector re-init failed:"
639  " automatic reload unsuccessful:"
640  " please load manually the correct library");
641  rc = -1;
642  }
643  }
644  if (fSelector) {
645  // Draw needs to reinit temp histos
647  if (stdselec) {
648  ((TProofDraw *)fSelector)->DefVar();
649  } else {
650  // variables may have been initialized in Begin()
651  fSelector->Begin(0);
652  }
653  }
654 
655  // Restore original include path, if needed
656  if (ipathold.Length() > 0)
657  gSystem->SetIncludePath(ipathold.Data());
658 
659  return rc;
660 }
661 
662 ////////////////////////////////////////////////////////////////////////////////
663 /// Incorporate output object (may not be used in this class).
664 
666 {
667  MayNotUse("AddOutputObject");
668  return -1;
669 }
670 
671 ////////////////////////////////////////////////////////////////////////////////
672 /// Incorporate output list (may not be used in this class).
673 
675 {
676  MayNotUse("AddOutput");
677 }
678 
679 ////////////////////////////////////////////////////////////////////////////////
680 /// Store output list (may not be used in this class).
681 
683 {
684  MayNotUse("StoreOutput");
685 }
686 
687 ////////////////////////////////////////////////////////////////////////////////
688 /// Store feedback list (may not be used in this class).
689 
691 {
692  MayNotUse("StoreFeedback");
693 }
694 
695 ////////////////////////////////////////////////////////////////////////////////
696 /// Report progress (may not be used in this class).
697 
698 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/)
699 {
700  MayNotUse("Progress");
701 }
702 
703 ////////////////////////////////////////////////////////////////////////////////
704 /// Report progress (may not be used in this class).
705 
706 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/,
707  Long64_t /*bytesread*/,
708  Float_t /*evtRate*/, Float_t /*mbRate*/,
709  Float_t /*evtrti*/, Float_t /*mbrti*/)
710 {
711  MayNotUse("Progress");
712 }
713 
714 ////////////////////////////////////////////////////////////////////////////////
715 /// Report progress (may not be used in this class).
716 
718 {
719  MayNotUse("Progress");
720 }
721 
722 ////////////////////////////////////////////////////////////////////////////////
723 /// Set feedback list (may not be used in this class).
724 
726 {
727  MayNotUse("Feedback");
728 }
729 
730 ////////////////////////////////////////////////////////////////////////////////
731 /// Draw feedback creation proxy. When accessed via TProof avoids
732 /// link dependency on libProofPlayer.
733 
735 {
736  return new TDrawFeedback(p);
737 }
738 
739 ////////////////////////////////////////////////////////////////////////////////
740 /// Set draw feedback option.
741 
743 {
744  if (f)
745  f->SetOption(opt);
746 }
747 
748 ////////////////////////////////////////////////////////////////////////////////
749 /// Delete draw feedback object.
750 
752 {
753  delete f;
754 }
755 
756 ////////////////////////////////////////////////////////////////////////////////
757 /// Save the partial results of this query to a dedicated file under the user
758 /// data directory. The file name has the form
759 /// <session_tag>.q<query_seq_num>.root
760 /// The file pat and the file are created if not existing already.
761 /// Only objects in the outputlist not being TProofOutputFile are saved.
762 /// The packets list 'packets' is saved if given.
763 /// Trees not attached to any file are attached to the open file.
764 /// If 'queryend' is kTRUE evrything is written out (TTrees included).
765 /// The actual saving action is controlled by 'force' and by fSavePartialResults /
766 /// fSaveResultsPerPacket:
767 ///
768 /// fSavePartialResults = kFALSE/kTRUE no-saving/saving
769 /// fSaveResultsPerPacket = kFALSE/kTRUE save-per-query/save-per-packet
770 ///
771 /// The function CheckMemUsage sets fSavePartialResults = 1 if fSaveMemThreshold > 0 and
772 /// ProcInfo_t::fMemResident >= fSaveMemThreshold: from that point on partial results
773 /// are always saved and expensive calls to TSystem::GetProcInfo saved.
774 /// The switch fSaveResultsPerPacket is instead controlled by the user or admin
775 /// who can also force saving in all cases; parameter PROOF_SavePartialResults or
776 /// RC env ProofPlayer.SavePartialResults .
777 /// However, if 'force' is kTRUE, fSavePartialResults and fSaveResultsPerPacket
778 /// are ignored.
779 /// Return -1 in case of problems, 0 otherwise.
780 
782 {
783  Bool_t save = (force || (fSavePartialResults &&
784  (queryend || fSaveResultsPerPacket))) ? kTRUE : kFALSE;
785  if (!save) {
786  PDB(kOutput, 2)
787  Info("SavePartialResults", "partial result saving disabled");
788  return 0;
789  }
790 
791  // Sanity check
792  if (!gProofServ) {
793  Error("SavePartialResults", "gProofServ undefined: something really wrong going on!!!");
794  return -1;
795  }
796  if (!fOutput) {
797  Error("SavePartialResults", "fOutput undefined: something really wrong going on!!!");
798  return -1;
799  }
800 
801  PDB(kOutput, 1)
802  Info("SavePartialResults", "start saving partial results {%d,%d,%d,%d}",
803  queryend, force, fSavePartialResults, fSaveResultsPerPacket);
804 
805  // Get list of processed packets from the iterator
806  PDB(kOutput, 2) Info("SavePartialResults", "fEvIter: %p", fEvIter);
807 
808  TList *packets = (fEvIter) ? fEvIter->GetPackets() : 0;
809  PDB(kOutput, 2) Info("SavePartialResults", "list of packets: %p, sz: %d",
810  packets, (packets ? packets->GetSize(): -1));
811 
812  // Open the file
813  const char *oopt = "UPDATE";
814  // Check if the file has already been defined
815  TString baseName(fOutputFilePath);
816  if (fOutputFilePath.IsNull()) {
817  baseName.Form("output-%s.q%d.root", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
818  if (gProofServ->GetDataDirOpts() && strlen(gProofServ->GetDataDirOpts()) > 0) {
819  fOutputFilePath.Form("%s/%s?%s", gProofServ->GetDataDir(), baseName.Data(),
821  } else {
822  fOutputFilePath.Form("%s/%s", gProofServ->GetDataDir(), baseName.Data());
823  }
824  Info("SavePartialResults", "file with (partial) output: '%s'", fOutputFilePath.Data());
825  oopt = "RECREATE";
826  }
827  // Open the file in write mode
828  if (!(fOutputFile = TFile::Open(fOutputFilePath, oopt)) ||
829  (fOutputFile && fOutputFile->IsZombie())) {
830  Error("SavePartialResults", "cannot open '%s' for writing", fOutputFilePath.Data());
832  return -1;
833  }
834 
835  // Save current directory
836  TDirectory *curdir = gDirectory;
837  fOutputFile->cd();
838 
839  // Write first the packets list, if required
840  if (packets) {
841  TDirectory *packetsDir = fOutputFile->mkdir("packets");
842  if (packetsDir) packetsDir->cd();
844  fOutputFile->cd();
845  }
846 
847  Bool_t notempty = kFALSE;
848  // Write out the output list
849  TList torm;
850  TIter nxo(fOutput);
851  TObject *o = 0;
852  while ((o = nxo())) {
853  // Skip output file drivers
854  if (o->InheritsFrom(TProofOutputFile::Class())) continue;
855  // Skip control objets
856  if (!strncmp(o->GetName(), "PROOF_", 6)) continue;
857  // Skip data members mapping
859  // Skip missing file info
860  if (!strcmp(o->GetName(), "MissingFiles")) continue;
861  // Trees need a special treatment
862  if (o->InheritsFrom("TTree")) {
863  TTree *t = (TTree *) o;
864  TDirectory *d = t->GetDirectory();
865  // If the tree is not attached to any file ...
866  if (!d || (d && !d->InheritsFrom("TFile"))) {
867  // ... we attach it
869  }
870  if (t->GetDirectory() == fOutputFile) {
871  if (queryend) {
872  // ... we write it out
873  o->Write(0, TObject::kOverwrite);
874  // At least something in the file
875  notempty = kTRUE;
876  // Flag for removal from the outputlist
877  torm.Add(o);
878  // Prevent double-deletion attempts
879  t->SetDirectory(0);
880  } else {
881  // ... or we set in automatic flush mode
882  t->SetAutoFlush();
883  }
884  }
885  } else if (queryend || fSaveResultsPerPacket) {
886  // Save overwriting what's already there
887  o->Write(0, TObject::kOverwrite);
888  // At least something in the file
889  notempty = kTRUE;
890  // Flag for removal from the outputlist
891  if (queryend) torm.Add(o);
892  }
893  }
894 
895  // Restore previous directory
896  gDirectory = curdir;
897 
898  // Close the file if required
899  if (notempty) {
900  if (!fOutput->FindObject(baseName)) {
901  TProofOutputFile *po = 0;
902  // Get directions
903  TNamed *nm = (TNamed *) fInput->FindObject("PROOF_DefaultOutputOption");
904  TString oname = (nm) ? nm->GetTitle() : fOutputFilePath.Data();
905  if (nm && oname.BeginsWith("ds:")) {
906  oname.Replace(0, 3, "");
907  TString qtag =
909  oname.ReplaceAll("<qtag>", qtag);
910  // Create the TProofOutputFile for dataset creation
911  po = new TProofOutputFile(baseName, "DRO", oname.Data());
912  } else {
913  Bool_t hasddir = kFALSE;
914  // Create the TProofOutputFile for automatic merging
915  po = new TProofOutputFile(baseName, "M");
916  if (oname.BeginsWith("of:")) oname.Replace(0, 3, "");
917  if (gProofServ->IsTopMaster()) {
918  if (!strcmp(TUrl(oname, kTRUE).GetProtocol(), "file")) {
919  TString dsrv;
921  TProofServ::FilterLocalroot(oname, dsrv);
922  oname.Insert(0, dsrv);
923  }
924  } else {
925  if (nm) {
926  // The name has been sent by the client: resolve local place holders
927  oname.ReplaceAll("<file>", baseName);
928  } else {
929  // We did not get any indication; the final file will be in the datadir on
930  // the top master and it will be resolved there
931  oname.Form("<datadir>/%s", baseName.Data());
932  hasddir = kTRUE;
933  }
934  }
935  po->SetOutputFileName(oname.Data());
936  if (hasddir)
937  // Reset the bit, so that <datadir> has a chance to be resolved in AddOutputObject
939  po->SetName(gSystem->BaseName(oname.Data()));
940  }
941  po->AdoptFile(fOutputFile);
942  fOutput->Add(po);
943  // Flag the nature of this file
945  }
946  }
947  fOutputFile->Close();
949 
950  // If last call, cleanup the output list from objects saved to file
951  if (queryend && torm.GetSize() > 0) {
952  TIter nxrm(&torm);
953  while ((o = nxrm())) { fOutput->Remove(o); }
954  }
955  torm.SetOwner(kFALSE);
956 
957  PDB(kOutput, 1)
958  Info("SavePartialResults", "partial results saved to file");
959  // We are done
960  return 0;
961 }
962 
963 ////////////////////////////////////////////////////////////////////////////////
964 /// Make sure that a valid selector object
965 /// Return -1 in case of problems, 0 otherwise
966 
967 Int_t TProofPlayer::AssertSelector(const char *selector_file)
968 {
969  if (selector_file && strlen(selector_file)) {
971 
972  // Get selector files from cache
973  if (gProofServ) {
975  TString ocwd = gSystem->WorkingDirectory();
977 
978  fSelector = TSelector::GetSelector(selector_file);
979 
980  gSystem->ChangeDirectory(ocwd);
982 
983  if (!fSelector) {
984  Error("AssertSelector", "cannot load: %s", selector_file );
985  return -1;
986  }
987  }
988 
990  Info("AssertSelector", "Processing via filename");
991  } else if (!fSelector) {
992  Error("AssertSelector", "no TSelector object define : cannot continue!");
993  return -1;
994  } else {
995  Info("AssertSelector", "Processing via TSelector object");
996  }
997  // Done
998  return 0;
999 }
1000 ////////////////////////////////////////////////////////////////////////////////
1001 /// Update fProgressStatus
1002 
1004 {
1005  if (fProgressStatus) {
1010  if (gMonitoringWriter)
1013  fProcessedRun = 0;
1014  }
1015 }
1016 
1017 ////////////////////////////////////////////////////////////////////////////////
1018 /// Process specified TDSet on PROOF worker.
1019 /// The return value is -1 in case of error and TSelector::GetStatus()
1020 /// in case of success.
1021 
1022 Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
1023  Option_t *option, Long64_t nentries,
1024  Long64_t first)
1025 {
1026  PDB(kGlobal,1) Info("Process","Enter");
1027 
1029  fOutput = 0;
1030 
1031  TCleanup clean(this);
1032 
1033  fSelectorClass = 0;
1034  TString wmsg;
1035  TRY {
1036  if (AssertSelector(selector_file) != 0 || !fSelector) {
1037  Error("Process", "cannot assert the selector object");
1038  return -1;
1039  }
1040 
1041  fSelectorClass = fSelector->IsA();
1042  Int_t version = fSelector->Version();
1043  if (version == 0 && IsClient()) fSelector->GetOutputList()->Clear();
1044 
1046 
1047  if (gProofServ)
1049 
1050  fSelStatus = new TStatus;
1052 
1053  fSelector->SetOption(option);
1055 
1056  // If in sequential (0-PROOF) mode validate the data set to get
1057  // the number of entries
1059  if (fTotalEvents < 0 && gProofServ &&
1061  dset->Validate();
1062  dset->Reset();
1063  TDSetElement *e = 0;
1064  while ((e = dset->Next())) {
1065  fTotalEvents += e->GetNum();
1066  }
1067  }
1068 
1069  dset->Reset();
1070 
1071  // Set parameters controlling the iterator behaviour
1072  Int_t useTreeCache = 1;
1073  if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
1074  if (useTreeCache > -1 && useTreeCache < 2)
1075  gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
1076  }
1077  Long64_t cacheSize = -1;
1078  if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
1079  TString sz = TString::Format("%lld", cacheSize);
1080  gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
1081  }
1082  // Parallel unzipping
1083  Int_t useParallelUnzip = 0;
1084  if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
1085  if (useParallelUnzip > -1 && useParallelUnzip < 2)
1086  gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
1087  }
1088  // OS file caching (Mac Os X only)
1089  Int_t dontCacheFiles = 0;
1090  if (TProof::GetParameter(fInput, "PROOF_DontCacheFiles", dontCacheFiles) == 0) {
1091  if (dontCacheFiles == 1)
1092  gEnv->SetValue("ProofPlayer.DontCacheFiles", 1);
1093  }
1094  fEvIter = TEventIter::Create(dset, fSelector, first, nentries);
1095 
1096  // Control file object swap
1097  // <how>*10 + <force>
1098  // <how> = 0 end of run
1099  // 1 after each packet
1100  // <force> = 0 no, swap only if memory threshold is reached
1101  // 1 swap in all cases, accordingly to <how>
1102  Int_t opt = 0;
1103  if (TProof::GetParameter(fInput, "PROOF_SavePartialResults", opt) != 0) {
1104  opt = gEnv->GetValue("ProofPlayer.SavePartialResults", 0);
1105  }
1106  fSaveResultsPerPacket = (opt >= 10) ? kTRUE : kFALSE;
1107  fSavePartialResults = (opt%10 > 0) ? kTRUE : kFALSE;
1108  Info("Process", "save partial results? %d per-packet? %d", fSavePartialResults, fSaveResultsPerPacket);
1109 
1110  // Memory threshold for file object swap
1111  Float_t memfrac = gEnv->GetValue("ProofPlayer.SaveMemThreshold", -1.);
1112  if (memfrac > 0.) {
1113  // The threshold is per core
1114  SysInfo_t si;
1115  if (gSystem->GetSysInfo(&si) == 0) {
1116  fSaveMemThreshold = (Long_t) ((memfrac * si.fPhysRam * 1024.) / si.fCpus);
1117  Info("Process", "memory threshold for saving objects to file set to %ld kB",
1119  } else {
1120  Error("Process", "cannot get SysInfo_t (!)");
1121  }
1122  }
1123 
1124  if (version == 0) {
1125  PDB(kLoop,1) Info("Process","Call Begin(0)");
1126  fSelector->Begin(0);
1127  } else {
1128  if (IsClient()) {
1129  // on client (for local run)
1130  PDB(kLoop,1) Info("Process","Call Begin(0)");
1131  fSelector->Begin(0);
1132  }
1134  PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
1135  fSelector->SlaveBegin(0); // Init is called explicitly
1136  // from GetNextEvent()
1137  }
1138  }
1139 
1140  } CATCH(excode) {
1142  Error("Process","exception %d caught", excode);
1144  return -1;
1145  } ENDTRY;
1146 
1147  // Save the results, if needed, closing the file
1148  if (SavePartialResults(kFALSE) < 0)
1149  Warning("Process", "problems seetting up file-object swapping");
1150 
1151  // Create feedback lists, if required
1152  SetupFeedback();
1153 
1154  if (gMonitoringWriter)
1156 
1157  PDB(kLoop,1)
1158  Info("Process","Looping over Process()");
1159 
1160  // get the byte read counter at the beginning of processing
1163  fProcessedRun = 0;
1164  // force the first monitoring info
1165  if (gMonitoringWriter)
1167 
1168  // Start asynchronous timer to dispatch pending events
1170 
1171  // Loop over range
1172  gAbort = kFALSE;
1173  Long64_t entry;
1176 
1177  TRY {
1178 
1179  Int_t mrc = -1;
1180  // Get the frequency for checking memory consumption and logging information
1181  Long64_t memlogfreq = -1;
1182  if (((mrc = TProof::GetParameter(fInput, "PROOF_MemLogFreq", memlogfreq))) != 0) memlogfreq = -1;
1183  Long64_t singleshot = 1;
1184  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1185  TString lastMsg("(unfortunately no detailed info is available about current packet)");
1186 
1187  // Initial memory footprint
1188  if (!CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg)) {
1189  Error("Process", "%s", wmsg.Data());
1190  wmsg.Insert(0, TString::Format("ERROR:%s, after SlaveBegin(), ", gProofServ->GetOrdinal()));
1191  fSelStatus->Add(wmsg.Data());
1192  if (gProofServ) {
1193  gProofServ->SendAsynMessage(wmsg.Data());
1195  }
1198  } else if (!wmsg.IsNull()) {
1199  Warning("Process", "%s", wmsg.Data());
1200  }
1201 
1202  TPair *currentElem = 0;
1203  // The event loop on the worker
1204  Long64_t fst = -1, num;
1205  Long_t maxproctime = -1;
1206  Bool_t newrun = kFALSE;
1207  while ((fEvIter->GetNextPacket(fst, num) != -1) &&
1210  // This is needed by the inflate infrastructure to calculate
1211  // sleeping times
1213 
1214  // Give the possibility to the selector to access additional info in the
1215  // incoming packet
1216  if (dset->Current()) {
1217  if (!currentElem) {
1218  currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
1219  fInput->Add(currentElem);
1220  } else {
1221  if (currentElem->Value() != dset->Current()) {
1222  currentElem->SetValue(dset->Current());
1223  } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
1225  }
1226  }
1227  if (dset->Current()->TestBit(TDSetElement::kNewPacket)) {
1228  if (dset->TestBit(TDSet::kEmpty)) {
1229  lastMsg = "check logs for possible stacktrace - last cycle:";
1230  } else {
1231  TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
1232  TString fn = (elem) ? elem->GetFileName() : "<undef>";
1233  lastMsg.Form("while processing dset:'%s', file:'%s'"
1234  " - check logs for possible stacktrace - last event:", dset->GetName(), fn.Data());
1235  }
1236  TProofServ::SetLastMsg(lastMsg);
1237  }
1238  // Set the max proc time, if any
1239  if (dset->Current()->GetMaxProcTime() >= 0.)
1240  maxproctime = (Long_t) (1000 * dset->Current()->GetMaxProcTime());
1241  newrun = (dset->Current()->TestBit(TDSetElement::kNewPacket)) ? kTRUE : kFALSE;
1242  }
1243 
1246  // Setup packet proc time measurement
1247  if (maxproctime > 0) {
1248  if (!fProcTimeTimer) fProcTimeTimer = new TProctimeTimer(this, maxproctime);
1249  fProcTimeTimer->Start(maxproctime, kTRUE); // One shot
1250  if (!fProcTime) fProcTime = new TStopwatch();
1251  fProcTime->Reset(); // Reset counters
1252  }
1253  Long64_t refnum = num;
1254  if (refnum < 0 && maxproctime <= 0) {
1255  wmsg.Form("neither entries nor max proc time specified:"
1256  " risk of infinite loop: processing aborted");
1257  Error("Process", "%s", wmsg.Data());
1258  if (gProofServ) {
1259  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1261  gProofServ->SendAsynMessage(wmsg.Data());
1262  }
1265  break;
1266  }
1267  while (refnum < 0 || num--) {
1268 
1269  // Did we use all our time?
1271  fProcTime->Stop();
1272  if (!newrun && !TestBit(TProofPlayer::kMaxProcTimeExtended) && refnum > 0) {
1273  // How much are we left with?
1274  Float_t xleft = (refnum > num) ? (Float_t) num / (Float_t) (refnum) : 1.;
1275  if (xleft < 0.2) {
1276  // Give another try, 1.5 times the remaining measured expected time
1277  Long_t mpt = (Long_t) (1500 * num / ((Double_t)(refnum - num) / fProcTime->RealTime()));
1279  fProcTimeTimer->Start(mpt, kTRUE); // One shot
1281  }
1282  }
1284  Info("Process", "max proc time reached (%ld msecs): packet processing stopped:\n%s",
1285  maxproctime, lastMsg.Data());
1286 
1287  break;
1288  }
1289  }
1290 
1291  if (!(!fSelStatus->TestBit(TStatus::kNotOk) &&
1292  fSelector->GetAbort() == TSelector::kContinue)) break;
1293 
1294  // Get the netry number, taking into account entry or event lists
1295  entry = fEvIter->GetEntryNumber(fst);
1296  fst++;
1297 
1298  // Set the last entry
1299  TProofServ::SetLastEntry(entry);
1300 
1301  if (fSelector->Version() == 0) {
1302  PDB(kLoop,3)
1303  Info("Process","Call ProcessCut(%lld)", entry);
1304  if (fSelector->ProcessCut(entry)) {
1305  PDB(kLoop,3)
1306  Info("Process","Call ProcessFill(%lld)", entry);
1307  fSelector->ProcessFill(entry);
1308  }
1309  } else {
1310  PDB(kLoop,3)
1311  Info("Process","Call Process(%lld)", entry);
1312  fSelector->Process(entry);
1315  break;
1316  } else if (fSelector->GetAbort() == TSelector::kAbortFile) {
1317  Info("Process", "packet processing aborted following the"
1318  " selector settings:\n%s", lastMsg.Data());
1321  }
1322  }
1324 
1325  // Check the memory footprint, if required
1326  if (memlogfreq > 0 && (GetEventsProcessed() + fProcessedRun)%memlogfreq == 0) {
1327  if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
1328  Error("Process", "%s", wmsg.Data());
1329  if (gProofServ) {
1330  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1331  gProofServ->GetOrdinal(), entry));
1332  gProofServ->SendAsynMessage(wmsg.Data());
1333  }
1337  break;
1338  } else {
1339  if (!wmsg.IsNull()) {
1340  Warning("Process", "%s", wmsg.Data());
1341  if (gProofServ) {
1342  wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ",
1343  gProofServ->GetOrdinal(), entry));
1344  gProofServ->SendAsynMessage(wmsg.Data());
1345  }
1346  }
1347  }
1348  }
1352  }
1354  if (fSelStatus->TestBit(TStatus::kNotOk) || gROOT->IsInterrupted()) break;
1355 
1356  // Make sure that the selector abort status is reset
1358  fSelector->Abort("status reset", TSelector::kContinue);
1359  }
1360  }
1361 
1362  } CATCH(excode) {
1363  if (excode == kPEX_STOPPED) {
1364  Info("Process","received stop-process signal");
1366  } else if (excode == kPEX_ABORTED) {
1367  gAbort = kTRUE;
1368  Info("Process","received abort-process signal");
1370  } else {
1371  Error("Process","exception %d caught", excode);
1372  // Perhaps we need a dedicated status code here ...
1373  gAbort = kTRUE;
1375  }
1377  } ENDTRY;
1378 
1379  // Clean-up the envelop for the current element
1380  TPair *currentElem = 0;
1381  if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
1382  if ((currentElem = (TPair *) fInput->Remove(currentElem))) {
1383  delete currentElem->Key();
1384  delete currentElem;
1385  }
1386  }
1387 
1388  // Final memory footprint
1389  Long64_t singleshot = 1;
1390  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1391  Bool_t shrc = CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg);
1392  if (!wmsg.IsNull()) Warning("Process", "%s (%s)", wmsg.Data(), shrc ? "warn" : "hwm");
1393 
1394  PDB(kGlobal,2)
1395  Info("Process","%lld events processed", fProgressStatus->GetEntries());
1396 
1397  if (gMonitoringWriter) {
1401  }
1402 
1403  // Stop active timers
1405  if (fStopTimer != 0)
1407  if (fFeedbackTimer != 0)
1408  HandleTimer(0);
1409 
1410  StopFeedback();
1411 
1412  // Save the results, if needed, closing the file
1413  if (SavePartialResults(kTRUE) < 0)
1414  Warning("Process", "problems saving the results to file");
1415 
1417 
1418  // Finalize
1419 
1420  if (fExitStatus != kAborted) {
1421 
1422  TIter nxo(GetOutputList());
1423  TObject *o = 0;
1424  while ((o = nxo())) {
1425  // Special treatment for files
1426  if (o->IsA() == TProofOutputFile::Class()) {
1428  of->Print();
1430  const char *dir = of->GetDir();
1431  if (!dir || (dir && strlen(dir) <= 0)) {
1433  } else if (dir && strlen(dir) > 0) {
1434  TUrl u(dir);
1435  if (!strcmp(u.GetHost(), "localhost") || !strcmp(u.GetHost(), "127.0.0.1") ||
1436  !strcmp(u.GetHost(), "localhost.localdomain")) {
1437  u.SetHost(TUrl(gSystem->HostName()).GetHostFQDN());
1438  of->SetDir(u.GetUrl(kTRUE));
1439  }
1440  of->Print();
1441  }
1442  }
1443  }
1444 
1446 
1448  if (fSelector->Version() == 0) {
1449  PDB(kLoop,1) Info("Process","Call Terminate()");
1450  fSelector->Terminate();
1451  } else {
1452  PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
1455  PDB(kLoop,1) Info("Process","Call Terminate()");
1456  fSelector->Terminate();
1457  }
1458  }
1459  }
1460 
1461  // Add Selector status in the output list so it can be returned to the client as done
1462  // by Tree::Process (see ROOT-748). The status from the various workers will be added.
1463  fOutput->Add(new TParameter<Long64_t>("PROOF_SelectorStatus", (Long64_t) fSelector->GetStatus()));
1464 
1465  if (gProofServ && !gProofServ->IsParallel()) { // put all the canvases onto the output list
1466  TIter nxc(gROOT->GetListOfCanvases());
1467  while (TObject *c = nxc())
1468  fOutput->Add(c);
1469  }
1470  }
1471 
1472  if (gProofServ)
1473  TPerfStats::Stop();
1474 
1475  return 0;
1476 }
1477 
1478 ////////////////////////////////////////////////////////////////////////////////
1479 /// Process specified TDSet on PROOF worker with TSelector object
1480 /// The return value is -1 in case of error and TSelector::GetStatus()
1481 /// in case of success.
1482 
1484  Option_t *option, Long64_t nentries,
1485  Long64_t first)
1486 {
1487  if (!selector) {
1488  Error("Process", "selector object undefiend!");
1489  return -1;
1490  }
1491 
1493  fSelector = selector;
1495  return Process(dset, (const char *)0, option, nentries, first);
1496 }
1497 
1498 ////////////////////////////////////////////////////////////////////////////////
1499 /// Not implemented: meaningful only in the remote player. Returns kFALSE.
1500 
1502 {
1503  return kFALSE;
1504 }
1505 
1506 ////////////////////////////////////////////////////////////////////////////////
1507 /// Check the memory usage, if requested.
1508 /// Return kTRUE if OK, kFALSE if above 95% of at least one between virtual or
1509 /// resident limits are depassed.
1510 
1512  Bool_t &w80v, TString &wmsg)
1513 {
1514  Long64_t processed = GetEventsProcessed() + fProcessedRun;
1515  if (mfreq > 0 && processed%mfreq == 0) {
1516  // Record the memory information
1517  ProcInfo_t pi;
1518  if (!gSystem->GetProcInfo(&pi)){
1519  wmsg = "";
1520  if (gProofServ)
1521  Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
1522  pi.fMemVirtual, pi.fMemResident, processed);
1523  // Save info in TStatus
1525  // Apply limit on virtual memory, if any: warn if above 80%, stop if above 95% of max
1526  if (TProofServ::GetVirtMemMax() > 0) {
1528  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
1529  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
1530  return kFALSE;
1531  } else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
1532  // Refine monitoring
1533  mfreq = 1;
1534  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
1535  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
1536  w80v = kFALSE;
1537  }
1538  }
1539  // Apply limit on resident memory, if any: warn if above 80%, stop if above 95% of max
1540  if (TProofServ::GetResMemMax() > 0) {
1542  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
1543  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
1544  return kFALSE;
1545  } else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
1546  // Refine monitoring
1547  mfreq = 1;
1548  if (wmsg.Length() > 0) {
1549  wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
1550  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
1551  } else {
1552  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
1553  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
1554  }
1555  w80r = kFALSE;
1556  }
1557  }
1558  // In saving-partial-results mode flag the saving regime when reached to save expensive calls
1559  // to TSystem::GetProcInfo in SavePartialResults
1561  }
1562  }
1563  // Done
1564  return kTRUE;
1565 }
1566 
1567 ////////////////////////////////////////////////////////////////////////////////
1568 /// Finalize query (may not be used in this class).
1569 
1571 {
1572  MayNotUse("Finalize");
1573  return -1;
1574 }
1575 
1576 ////////////////////////////////////////////////////////////////////////////////
1577 /// Finalize query (may not be used in this class).
1578 
1580 {
1581  MayNotUse("Finalize");
1582  return -1;
1583 }
1584 ////////////////////////////////////////////////////////////////////////////////
1585 /// Merge output (may not be used in this class).
1586 
1588 {
1589  MayNotUse("MergeOutput");
1590  return;
1591 }
1592 
1593 ////////////////////////////////////////////////////////////////////////////////
1594 
1596 {
1598  fOutput->Add(olsdm);
1599 }
1600 
1601 ////////////////////////////////////////////////////////////////////////////////
1602 /// Update automatic binning parameters for given object "name".
1603 
1607  Double_t& zmin, Double_t& zmax)
1608 {
1609  if ( fAutoBins == 0 ) {
1610  fAutoBins = new THashList;
1611  }
1612 
1613  TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
1614 
1615  if ( val == 0 ) {
1616  //look for info in higher master
1617  if (gProofServ && !gProofServ->IsTopMaster()) {
1618  TString key = name;
1619  TProofLimitsFinder::AutoBinFunc(key,xmin,xmax,ymin,ymax,zmin,zmax);
1620  }
1621 
1622  val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
1623  fAutoBins->Add(val);
1624  } else {
1625  val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
1626  }
1627 }
1628 
1629 ////////////////////////////////////////////////////////////////////////////////
1630 /// Get next packet (may not be used in this class).
1631 
1633 {
1634  MayNotUse("GetNextPacket");
1635  return 0;
1636 }
1637 
1638 ////////////////////////////////////////////////////////////////////////////////
1639 /// Set up feedback (may not be used in this class).
1640 
1642 {
1643  MayNotUse("SetupFeedback");
1644 }
1645 
1646 ////////////////////////////////////////////////////////////////////////////////
1647 /// Stop feedback (may not be used in this class).
1648 
1650 {
1651  MayNotUse("StopFeedback");
1652 }
1653 
1654 ////////////////////////////////////////////////////////////////////////////////
1655 /// Draw (may not be used in this class).
1656 
1657 Long64_t TProofPlayer::DrawSelect(TDSet * /*set*/, const char * /*varexp*/,
1658  const char * /*selection*/, Option_t * /*option*/,
1659  Long64_t /*nentries*/, Long64_t /*firstentry*/)
1660 {
1661  MayNotUse("DrawSelect");
1662  return -1;
1663 }
1664 
1665 ////////////////////////////////////////////////////////////////////////////////
1666 /// Handle tree header request.
1667 
1669 {
1670  MayNotUse("HandleGetTreeHeader|");
1671 }
1672 
1673 ////////////////////////////////////////////////////////////////////////////////
1674 /// Receive histo from slave.
1675 
1677 {
1678  TObject *obj = mess->ReadObject(mess->GetClass());
1679  if (obj->InheritsFrom(TH1::Class())) {
1680  TH1 *h = (TH1*)obj;
1681  h->SetDirectory(0);
1682  TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
1683  if (horg)
1684  horg->Add(h);
1685  else
1687  }
1688 }
1689 
1690 ////////////////////////////////////////////////////////////////////////////////
1691 /// Draw the object if it is a canvas.
1692 /// Return 0 in case of success, 1 if it is not a canvas or libProofDraw
1693 /// is not available.
1694 
1696 {
1697  static Int_t (*gDrawCanvasHook)(TObject *) = 0;
1698 
1699  // Load the library the first time
1700  if (!gDrawCanvasHook) {
1701  // Load library needed for graphics ...
1702  TString drawlib = "libProofDraw";
1703  char *p = 0;
1704  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1705  delete[] p;
1706  if (gSystem->Load(drawlib) != -1) {
1707  // Locate DrawCanvas
1708  Func_t f = 0;
1709  if ((f = gSystem->DynFindSymbol(drawlib,"DrawCanvas")))
1710  gDrawCanvasHook = (Int_t (*)(TObject *))(f);
1711  else
1712  Warning("DrawCanvas", "can't find DrawCanvas");
1713  } else
1714  Warning("DrawCanvas", "can't load %s", drawlib.Data());
1715  } else
1716  Warning("DrawCanvas", "can't locate %s", drawlib.Data());
1717  }
1718  if (gDrawCanvasHook && obj)
1719  return (*gDrawCanvasHook)(obj);
1720  // No drawing hook or object undefined
1721  return 1;
1722 }
1723 
1724 ////////////////////////////////////////////////////////////////////////////////
1725 /// Parse the arguments from var, sel and opt and fill the selector and
1726 /// object name accordingly.
1727 /// Return 0 in case of success, 1 if libProofDraw is not available.
1728 
1729 Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
1730  TString &selector, TString &objname)
1731 {
1732  static Int_t (*gGetDrawArgsHook)(const char *, const char *, Option_t *,
1733  TString &, TString &) = 0;
1734 
1735  // Load the library the first time
1736  if (!gGetDrawArgsHook) {
1737  // Load library needed for graphics ...
1738  TString drawlib = "libProofDraw";
1739  char *p = 0;
1740  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1741  delete[] p;
1742  if (gSystem->Load(drawlib) != -1) {
1743  // Locate GetDrawArgs
1744  Func_t f = 0;
1745  if ((f = gSystem->DynFindSymbol(drawlib,"GetDrawArgs")))
1746  gGetDrawArgsHook = (Int_t (*)(const char *, const char *, Option_t *,
1747  TString &, TString &))(f);
1748  else
1749  Warning("GetDrawArgs", "can't find GetDrawArgs");
1750  } else
1751  Warning("GetDrawArgs", "can't load %s", drawlib.Data());
1752  } else
1753  Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
1754  }
1755  if (gGetDrawArgsHook)
1756  return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
1757  // No parser hook or object undefined
1758  return 1;
1759 }
1760 
1761 ////////////////////////////////////////////////////////////////////////////////
1762 /// Create/destroy a named canvas for feedback
1763 
1764 void TProofPlayer::FeedBackCanvas(const char *name, Bool_t create)
1765 {
1766  static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
1767 
1768  // Load the library the first time
1769  if (!gFeedBackCanvasHook) {
1770  // Load library needed for graphics ...
1771  TString drawlib = "libProofDraw";
1772  char *p = 0;
1773  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1774  delete[] p;
1775  if (gSystem->Load(drawlib) != -1) {
1776  // Locate FeedBackCanvas
1777  Func_t f = 0;
1778  if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
1779  gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
1780  else
1781  Warning("FeedBackCanvas", "can't find FeedBackCanvas");
1782  } else
1783  Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
1784  } else
1785  Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
1786  }
1787  if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
1788  // No parser hook or object undefined
1789  return;
1790 }
1791 
1792 ////////////////////////////////////////////////////////////////////////////////
1793 /// Return the size in bytes of the cache
1794 
1796 {
1797  if (fEvIter) return fEvIter->GetCacheSize();
1798  return -1;
1799 }
1800 
1801 ////////////////////////////////////////////////////////////////////////////////
1802 /// Return the number of entries in the learning phase
1803 
1805 {
1806  if (fEvIter) return fEvIter->GetLearnEntries();
1807  return -1;
1808 }
1809 
1810 ////////////////////////////////////////////////////////////////////////////////
1811 /// Switch on/off merge timer
1812 
1814 {
1815  if (on) {
1816  if (!fMergeSTW) fMergeSTW = new TStopwatch();
1817  PDB(kGlobal,1)
1818  Info("SetMerging", "ON: mergers: %d", fProof->fMergersCount);
1819  if (fNumMergers <= 0 && fProof->fMergersCount > 0)
1820  fNumMergers = fProof->fMergersCount;
1821  } else if (fMergeSTW) {
1822  fMergeSTW->Stop();
1823  Float_t rt = fMergeSTW->RealTime();
1824  PDB(kGlobal,1)
1825  Info("SetMerging", "OFF: rt: %f, mergers: %d", rt, fNumMergers);
1826  if (fQuery) {
1827  if (!fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) {
1828  // On the master (or in Lite()) we set the merging time and the numebr of mergers
1829  fQuery->SetMergeTime(rt);
1830  fQuery->SetNumMergers(fNumMergers);
1831  } else {
1832  // In a standard client we save the transfer-to-client time
1833  fQuery->SetRecvTime(rt);
1834  }
1835  PDB(kGlobal,2) fQuery->Print("F");
1836  }
1837  }
1838 }
1839 
1840 //------------------------------------------------------------------------------
1841 
1843 
1844 ////////////////////////////////////////////////////////////////////////////////
1845 /// Process the specified TSelector object 'nentries' times.
1846 /// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1847 /// local session.
1848 /// The return value is -1 in case of error and TSelector::GetStatus()
1849 /// in case of success.
1850 
1852  Long64_t nentries, Option_t *option)
1853 {
1854  if (!selector) {
1855  Error("Process", "selector object undefiend!");
1856  return -1;
1857  }
1858 
1859  TDSetProxy *set = new TDSetProxy("", "", "");
1860  set->SetBit(TDSet::kEmpty);
1861  set->SetBit(TDSet::kIsLocal);
1862  Long64_t rc = Process(set, selector, option, nentries);
1863  SafeDelete(set);
1864 
1865  // Done
1866  return rc;
1867 }
1868 
1869 ////////////////////////////////////////////////////////////////////////////////
1870 /// Process the specified TSelector file 'nentries' times.
1871 /// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1872 /// local session.
1873 /// Process specified TDSet on PROOF worker with TSelector object
1874 /// The return value is -1 in case of error and TSelector::GetStatus()
1875 /// in case of success.
1876 
1878  Long64_t nentries, Option_t *option)
1879 {
1880  TDSetProxy *set = new TDSetProxy("", "", "");
1881  set->SetBit(TDSet::kEmpty);
1882  set->SetBit(TDSet::kIsLocal);
1883  Long64_t rc = Process(set, selector, option, nentries);
1884  SafeDelete(set);
1885 
1886  // Done
1887  return rc;
1888 }
1889 
1890 
1891 //------------------------------------------------------------------------------
1892 
1894 
1895 ////////////////////////////////////////////////////////////////////////////////
1896 /// Destructor.
1897 
1899 {
1900  SafeDelete(fOutput); // owns the output list
1901  SafeDelete(fOutputLists);
1902 
1903  // Objects stored in maps are already deleted when merging the feedback
1904  SafeDelete(fFeedbackLists);
1905  SafeDelete(fPacketizer);
1906 
1907  SafeDelete(fProcessMessage);
1908 }
1909 
1910 ////////////////////////////////////////////////////////////////////////////////
1911 /// Init the packetizer
1912 /// Return 0 on success (fPacketizer is correctly initialized), -1 on failure.
1913 
1915  Long64_t first, const char *defpackunit,
1916  const char *defpackdata)
1917 {
1918  SafeDelete(fPacketizer);
1919  PDB(kGlobal,1) Info("Process","Enter");
1920  fDSet = dset;
1922 
1923  // This is done here to pickup on the fly changes
1924  Int_t honebyone = 1;
1925  if (TProof::GetParameter(fInput, "PROOF_MergeTH1OneByOne", honebyone) != 0)
1926  honebyone = gEnv->GetValue("ProofPlayer.MergeTH1OneByOne", 1);
1927  fMergeTH1OneByOne = (honebyone == 1) ? kTRUE : kFALSE;
1928 
1929  Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1930 
1931  TString packetizer;
1932  TList *listOfMissingFiles = 0;
1933 
1934  TMethodCall callEnv;
1935  TClass *cl;
1936  noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1937 
1938  if (noData) {
1939 
1940  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
1941  packetizer = defpackunit;
1942  else
1943  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
1944 
1945  // Get linked to the related class
1946  cl = TClass::GetClass(packetizer);
1947  if (cl == 0) {
1948  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1950  return -1;
1951  }
1952 
1953  // Init the constructor
1954  callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
1955  if (!callEnv.IsValid()) {
1956  Error("InitPacketizer",
1957  "cannot find correct constructor for '%s'", cl->GetName());
1959  return -1;
1960  }
1961  callEnv.ResetParam();
1962  callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
1963  callEnv.SetParam((Long64_t) nentries);
1964  callEnv.SetParam((Long_t) fInput);
1965  callEnv.SetParam((Long_t) fProgressStatus);
1966 
1967  } else if (dset->TestBit(TDSet::kMultiDSet)) {
1968 
1969  // We have to process many datasets in one go, keeping them separate
1970  if (fProof->GetRunStatus() != TProof::kRunning) {
1971  // We have been asked to stop
1972  Error("InitPacketizer", "received stop/abort request");
1974  return -1;
1975  }
1976 
1977  // The multi packetizer
1978  packetizer = "TPacketizerMulti";
1979 
1980  // Get linked to the related class
1981  cl = TClass::GetClass(packetizer);
1982  if (cl == 0) {
1983  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1985  return -1;
1986  }
1987 
1988  // Init the constructor
1989  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
1990  if (!callEnv.IsValid()) {
1991  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
1993  return -1;
1994  }
1995  callEnv.ResetParam();
1996  callEnv.SetParam((Long_t) dset);
1997  callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
1998  callEnv.SetParam((Long64_t) first);
1999  callEnv.SetParam((Long64_t) nentries);
2000  callEnv.SetParam((Long_t) fInput);
2001  callEnv.SetParam((Long_t) fProgressStatus);
2002 
2003  // We are going to test validity during the packetizer initialization
2004  dset->SetBit(TDSet::kValidityChecked);
2005  dset->ResetBit(TDSet::kSomeInvalid);
2006 
2007  } else {
2008 
2009  // Lookup - resolve the end-point urls to optmize the distribution.
2010  // The lookup was previously called in the packetizer's constructor.
2011  // A list for the missing files may already have been added to the
2012  // output list; otherwise, if needed it will be created inside
2013  if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
2014  // Move it to the output list
2015  fInput->Remove(listOfMissingFiles);
2016  } else {
2017  listOfMissingFiles = new TList;
2018  }
2019  // Do the lookup; we only skip it if explicitly requested so.
2020  TString lkopt;
2021  if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
2022  dset->Lookup(kTRUE, &listOfMissingFiles);
2023 
2024  if (fProof->GetRunStatus() != TProof::kRunning) {
2025  // We have been asked to stop
2026  Error("InitPacketizer", "received stop/abort request");
2028  return -1;
2029  }
2030 
2031  if (!(dset->GetListOfElements()) ||
2032  !(dset->GetListOfElements()->GetSize())) {
2033  if (gProofServ)
2034  gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
2035  Error("InitPacketizer", "No files from the data set were found - Aborting");
2037  if (listOfMissingFiles) {
2038  listOfMissingFiles->SetOwner();
2039  fOutput->Remove(listOfMissingFiles);
2040  SafeDelete(listOfMissingFiles);
2041  }
2042  return -1;
2043  }
2044 
2045  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
2046  // Using standard packetizer TAdaptivePacketizer
2047  packetizer = defpackdata;
2048  else
2049  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
2050 
2051  // Get linked to the related class
2052  cl = TClass::GetClass(packetizer);
2053  if (cl == 0) {
2054  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
2056  return -1;
2057  }
2058 
2059  // Init the constructor
2060  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
2061  if (!callEnv.IsValid()) {
2062  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
2064  return -1;
2065  }
2066  callEnv.ResetParam();
2067  callEnv.SetParam((Long_t) dset);
2068  callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
2069  callEnv.SetParam((Long64_t) first);
2070  callEnv.SetParam((Long64_t) nentries);
2071  callEnv.SetParam((Long_t) fInput);
2072  callEnv.SetParam((Long_t) fProgressStatus);
2073 
2074  // We are going to test validity during the packetizer initialization
2075  dset->SetBit(TDSet::kValidityChecked);
2076  dset->ResetBit(TDSet::kSomeInvalid);
2077  }
2078 
2079  // Get an instance of the packetizer
2080  Long_t ret = 0;
2081  callEnv.Execute(ret);
2082  if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
2083  Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
2085  return -1;
2086  }
2087 
2088  if (!fPacketizer->IsValid()) {
2089  Error("InitPacketizer",
2090  "instantiated packetizer object '%s' is invalid", cl->GetName());
2092  SafeDelete(fPacketizer);
2093  return -1;
2094  }
2095 
2096  // In multi mode retrieve the list of missing files
2097  if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
2098  if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
2099  // Remove it; it will be added to the output list
2100  fInput->Remove(listOfMissingFiles);
2101  }
2102  }
2103 
2104  if (!noData) {
2105  // Add invalid elements to the list of missing elements
2106  TDSetElement *elem = 0;
2107  if (dset->TestBit(TDSet::kSomeInvalid)) {
2108  TIter nxe(dset->GetListOfElements());
2109  while ((elem = (TDSetElement *)nxe())) {
2110  if (!elem->GetValid()) {
2111  if (!listOfMissingFiles)
2112  listOfMissingFiles = new TList;
2113  listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
2114  dset->Remove(elem, kFALSE);
2115  }
2116  }
2117  // The invalid elements have been removed
2119  }
2120 
2121  // Record the list of missing or invalid elements in the output list
2122  if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
2123  TIter missingFiles(listOfMissingFiles);
2124  TString msg;
2125  if (gDebug > 0) {
2126  TFileInfo *fi = 0;
2127  while ((fi = (TFileInfo *) missingFiles.Next())) {
2128  if (fi->GetCurrentUrl()) {
2129  msg = Form("File not found: %s - skipping!",
2130  fi->GetCurrentUrl()->GetUrl());
2131  } else {
2132  msg = Form("File not found: %s - skipping!", fi->GetName());
2133  }
2135  }
2136  }
2137  // Make sure it will be sent back
2138  if (!GetOutput("MissingFiles")) {
2139  listOfMissingFiles->SetName("MissingFiles");
2140  AddOutputObject(listOfMissingFiles);
2141  }
2142  TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
2143  if (!tmpStatus) AddOutputObject((tmpStatus = new TStatus()));
2144 
2145  // Estimate how much data are missing
2146  Int_t ngood = dset->GetListOfElements()->GetSize();
2147  Int_t nbad = listOfMissingFiles->GetSize();
2148  Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
2149  msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
2150  " the 'missingFiles' list", xb * 100., '%', nbad, nbad + ngood);
2151  tmpStatus->Add(msg.Data());
2152  msg = Form(" +++\n"
2153  " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
2154  " the 'MissingFiles' list\n"
2155  " +++", xb * 100., '%', nbad, nbad + ngood);
2157  } else {
2158  // Cleanup
2159  SafeDelete(listOfMissingFiles);
2160  }
2161  }
2162 
2163  // Done
2164  return 0;
2165 }
2166 
2167 ////////////////////////////////////////////////////////////////////////////////
2168 /// Process specified TDSet on PROOF.
2169 /// This method is called on client and on the PROOF master.
2170 /// The return value is -1 in case of an error and TSelector::GetStatus() in
2171 /// in case of success.
2172 
2173 Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
2174  Option_t *option, Long64_t nentries,
2175  Long64_t first)
2176 {
2177  PDB(kGlobal,1) Info("Process", "Enter");
2178 
2179  fDSet = dset;
2181 
2182  if (!fProgressStatus) {
2183  Error("Process", "No progress status");
2184  return -1;
2185  }
2187 
2188  // delete fOutput;
2189  if (!fOutput)
2190  fOutput = new THashList;
2191  else
2192  fOutput->Clear();
2193 
2194  SafeDelete(fFeedbackLists);
2195 
2196  if (fProof->IsMaster()){
2198  } else {
2200  }
2201 
2202  TStopwatch elapsed;
2203 
2204  // Define filename
2205  TString fn;
2206  fSelectorFileName = selector_file;
2207 
2208  if (fCreateSelObj) {
2209  if(!SendSelector(selector_file)) return -1;
2210  fn = gSystem->BaseName(selector_file);
2211  } else {
2212  fn = selector_file;
2213  }
2214 
2215  TMessage mesg(kPROOF_PROCESS);
2216 
2217  // Parse option
2218  Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
2219 
2220  TList *inputtmp = 0; // List of temporary input objects
2221  TDSet *set = dset;
2222  if (fProof->IsMaster()) {
2223 
2224  PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
2225  set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
2226  dset->GetDirectory() );
2227  if (dset->TestBit(TDSet::kEmpty))
2228  set->SetBit(TDSet::kEmpty);
2229 
2230  if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
2231  Error("Process", "cannot init the packetizer");
2233  return -1;
2234  }
2235 
2236  // Reset start, this is now managed by the packetizer
2237  first = 0;
2238 
2239  // Negative memlogfreq disable checks.
2240  // If 0 is passed we try to have 100 messages about memory
2241  // Otherwise we use the frequency passed.
2242  Int_t mrc = -1;
2243  Long64_t memlogfreq = -1, mlf;
2244  if (gSystem->Getenv("PROOF_MEMLOGFREQ")) {
2245  TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
2246  if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
2247  }
2248  if ((mrc = TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
2249  if (memlogfreq == 0) {
2250  memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
2251  if (memlogfreq <= 0) memlogfreq = 1;
2252  }
2253  if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
2254 
2255 
2256  // Send input data, if any
2257  TString emsg;
2258  if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
2259  Warning("Process", "could not forward input data: %s", emsg.Data());
2260 
2261  // Attach to the transient histogram with the assigned packets, if required
2262  if (fInput->FindObject("PROOF_StatsHist") != 0) {
2263  if (!(fProcPackets = (TH1I *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
2264  Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
2265  } else {
2266  PDB(kLoop,1)
2267  Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
2268  " packets being processed");
2269  }
2270  }
2271 
2272  } else {
2273 
2274  // Check whether we have to enforce the use of submergers
2275  if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
2276  Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
2277  if (smg >= 0) {
2278  fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
2279  if (gEnv->Lookup("Proof.MergersByHost")) {
2280  Int_t mbh = gEnv->GetValue("Proof.MergersByHost",0);
2281  if (mbh != 0) {
2282  // Administrator settings have the priority
2283  TObject *o = 0;
2284  if ((o = fInput->FindObject("PROOF_MergersByHost"))) { fInput->Remove(o); delete o; }
2285  fInput->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
2286  }
2287  }
2288  }
2289  }
2290 
2291  // For a new query clients should make sure that the temporary
2292  // output list is empty
2293  if (fOutputLists) {
2294  fOutputLists->Delete();
2295  delete fOutputLists;
2296  fOutputLists = 0;
2297  }
2298 
2299  if (!sync) {
2300  gSystem->RedirectOutput(fProof->fLogFileName);
2301  Printf(" ");
2302  Info("Process","starting new query");
2303  }
2304 
2305  // Define fSelector in Client if processing with filename
2306  if (fCreateSelObj) {
2308  if (!(fSelector = TSelector::GetSelector(selector_file))) {
2309  if (!sync)
2310  gSystem->RedirectOutput(0);
2311  return -1;
2312  }
2313  }
2314 
2315  fSelectorClass = 0;
2316  fSelectorClass = fSelector->IsA();
2317 
2318  // Add fSelector to inputlist if processing with object
2319  if (!fCreateSelObj) {
2320  // In any input list was set into the selector move it to the PROOF
2321  // input list, because we do not want to stream the selector one
2322  if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
2323  TIter nxi(fSelector->GetInputList());
2324  TObject *o = 0;
2325  while ((o = nxi())) {
2326  if (!fInput->FindObject(o)) {
2327  fInput->Add(o);
2328  if (!inputtmp) {
2329  inputtmp = new TList;
2330  inputtmp->SetOwner(kFALSE);
2331  }
2332  inputtmp->Add(o);
2333  }
2334  }
2335  }
2336  fInput->Add(fSelector);
2337  }
2338  // Set the input list for initialization
2340  fSelector->SetOption(option);
2342 
2343  PDB(kLoop,1) Info("Process","Call Begin(0)");
2344  fSelector->Begin(0);
2345 
2346  // Reset the input list to avoid double streaming and related problems (saving
2347  // the TQueryResult)
2349 
2350  // Send large input data objects, if any
2351  fProof->SendInputDataFile();
2352 
2353  if (!sync)
2354  gSystem->RedirectOutput(0);
2355  }
2356 
2357  TCleanup clean(this);
2358  SetupFeedback();
2359 
2360  TString opt = option;
2361 
2362  // Old servers need a dedicated streamer
2363  if (fProof->fProtocol < 13)
2364  dset->SetWriteV3(kTRUE);
2365 
2366  // Workers will get the entry ranges from the packetizer
2367  Long64_t num = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : nentries;
2368  Long64_t fst = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : first;
2369 
2370  // Entry- or Event- list ?
2371  TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
2372  : (TEntryList *)0;
2373  TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
2374  : (TEventList *)0;
2375  if (fProof->fProtocol > 14) {
2376  if (fProcessMessage) delete fProcessMessage;
2377  fProcessMessage = new TMessage(kPROOF_PROCESS);
2378  mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
2379  (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
2380  } else {
2381  mesg << set << fn << fInput << opt << num << fst << evl << sync;
2382  if (enl)
2383  // Not supported remotely
2384  Warning("Process","entry lists not supported by the server");
2385  }
2386 
2387  // Reset the merging progress information
2388  fProof->ResetMergePrg();
2389 
2390  Int_t nb = fProof->Broadcast(mesg);
2391  PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
2392  if (fProof->IsLite()) fProof->fNotIdle += nb;
2393 
2394  // Reset streamer choice
2395  if (fProof->fProtocol < 13)
2396  dset->SetWriteV3(kFALSE);
2397 
2398  // Redirect logs from master to special log frame
2399  if (IsClient())
2400  fProof->fRedirLog = kTRUE;
2401 
2402  if (!IsClient()){
2403  // Signal the start of finalize for the memory log grepping
2404  Info("Process|Svc", "Start merging Memory information");
2405  }
2406 
2407  if (!sync) {
2408  if (IsClient()) {
2409  // Asynchronous query: just make sure that asynchronous input
2410  // is enabled and return the prompt
2411  PDB(kGlobal,1) Info("Process","Asynchronous processing:"
2412  " activating CollectInputFrom");
2413  fProof->Activate();
2414 
2415  // Receive the acknowledgement and query sequential number
2416  fProof->Collect();
2417 
2418  return fProof->fSeqNum;
2419 
2420  } else {
2421  PDB(kGlobal,1) Info("Process","Calling Collect");
2422  fProof->Collect();
2423 
2424  HandleTimer(0); // force an update of final result
2425  // This forces a last call to TPacketizer::HandleTimer via the second argument
2426  // (the first is ignored). This is needed when some events were skipped so that
2427  // the total number of entries is not the one requested. The packetizer has no
2428  // way in such a case to understand that processing is finished: it must be told.
2429  if (fPacketizer) {
2430  fPacketizer->StopProcess(kFALSE, kTRUE);
2431  // The progress timer will now stop itself at the next call
2432  fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
2433  // Store process info
2434  elapsed.Stop();
2435  if (fQuery)
2436  fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
2437  fPacketizer->GetInitTime(),
2438  elapsed.RealTime());
2439  }
2440  StopFeedback();
2441 
2442  return Finalize(kFALSE,sync);
2443  }
2444  } else {
2445 
2446  PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
2447  fProof->Collect();
2448  if (!(fProof->IsSync())) {
2449  // The server required to switch to asynchronous mode
2450  Info("Process", "switching to the asynchronous mode ...");
2451  return fProof->fSeqNum;
2452  }
2453 
2454  // Restore prompt logging, for clients (Collect leaves things as they were
2455  // at the time it was called)
2456  if (IsClient())
2457  fProof->fRedirLog = kFALSE;
2458 
2459  if (!IsClient()) {
2460  // Force an update of final result
2461  HandleTimer(0);
2462  // This forces a last call to TPacketizer::HandleTimer via the second argument
2463  // (the first is ignored). This is needed when some events were skipped so that
2464  // the total number of entries is not the one requested. The packetizer has no
2465  // way in such a case to understand that processing is finished: it must be told.
2466  if (fPacketizer) {
2467  fPacketizer->StopProcess(kFALSE, kTRUE);
2468  // The progress timer will now stop itself at the next call
2469  fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
2470  // Store process info
2471  if (fQuery)
2472  fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
2473  fPacketizer->GetInitTime(),
2474  fPacketizer->GetProcTime());
2475  }
2476  } else {
2477  // Set the input list: maybe required at termination
2479  }
2480  StopFeedback();
2481 
2482  Long64_t rc = -1;
2484  rc = Finalize(kFALSE,sync);
2485 
2486  // Remove temporary input objects, if any
2487  if (inputtmp) {
2488  TIter nxi(inputtmp);
2489  TObject *o = 0;
2490  while ((o = nxi())) fInput->Remove(o);
2491  SafeDelete(inputtmp);
2492  }
2493 
2494  // Done
2495  return rc;
2496  }
2497 }
2498 
2499 ////////////////////////////////////////////////////////////////////////////////
2500 /// Process specified TDSet on PROOF.
2501 /// This method is called on client and on the PROOF master.
2502 /// The return value is -1 in case of an error and TSelector::GetStatus() in
2503 /// in case of success.
2504 
2506  Option_t *option, Long64_t nentries,
2507  Long64_t first)
2508 {
2509  if (!selector) {
2510  Error("Process", "selector object undefined");
2511  return -1;
2512  }
2513 
2514  // Define fSelector in Client
2515  if (IsClient() && (selector != fSelector)) {
2517  fSelector = selector;
2518  }
2519 
2521  Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
2522  fCreateSelObj = kTRUE;
2523 
2524  // Done
2525  return rc;
2526 }
2527 
2528 ////////////////////////////////////////////////////////////////////////////////
2529 /// Prepares the given list of new workers to join a progressing process.
2530 /// Returns kTRUE on success, kFALSE otherwise.
2531 
2533 {
2534  if (!fProcessMessage || !fProof || !fPacketizer) {
2535  Error("Process", "Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p",
2536  fProcessMessage, fProof, fPacketizer);
2537  return kFALSE;
2538  }
2539 
2540  if (!workers || !fProof->IsMaster()) {
2541  Error("Process", "Invalid call");
2542  return kFALSE;
2543  }
2544 
2545  PDB(kGlobal, 1)
2546  Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries());
2547 
2548  // Sends the file associated to the TSelector, if necessary
2549  if (fCreateSelObj) {
2550  PDB(kGlobal, 2)
2551  Info("Process", "Sending selector file %s", fSelectorFileName.Data());
2552  if(!SendSelector(fSelectorFileName.Data())) {
2553  Error("Process", "Problems in sending selector file %s", fSelectorFileName.Data());
2554  return kFALSE;
2555  }
2556  }
2557 
2558  if (fProof->IsLite()) fProof->fNotIdle += workers->GetSize();
2559 
2560  PDB(kGlobal, 2)
2561  Info("Process", "Adding new workers to the packetizer");
2562  if (fPacketizer->AddWorkers(workers) == -1) {
2563  Error("Process", "Cannot add new workers to the packetizer!");
2564  return kFALSE; // TODO: make new wrks inactive
2565  }
2566 
2567  PDB(kGlobal, 2)
2568  Info("Process", "Broadcasting process message to new workers");
2569  fProof->Broadcast(*fProcessMessage, workers);
2570 
2571  // Don't call Collect(): we came here from a global Collect() already which
2572  // will take care of new workers as well
2573 
2574  return kTRUE;
2575 
2576 }
2577 
2578 ////////////////////////////////////////////////////////////////////////////////
2579 /// Merge output in files
2580 
2582 {
2583  PDB(kOutput,1) Info("MergeOutputFiles", "enter: fOutput size: %d", fOutput->GetSize());
2584  PDB(kOutput,2) fOutput->ls();
2585 
2586  TList *rmList = 0;
2587  if (fMergeFiles) {
2588  TIter nxo(fOutput);
2589  TObject *o = 0;
2590  TProofOutputFile *pf = 0;
2591  while ((o = nxo())) {
2592  if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
2593 
2594  PDB(kOutput,2) pf->Print();
2595 
2596  if (pf->IsMerge()) {
2597 
2598  // Point to the merger
2599  Bool_t localMerge = (pf->GetTypeOpt() == TProofOutputFile::kLocal) ? kTRUE : kFALSE;
2600  TFileMerger *filemerger = pf->GetFileMerger(localMerge);
2601  if (!filemerger) {
2602  Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
2603  pf->Print();
2604  continue;
2605  }
2606  // If only one instance the list in the merger is not yet created: do it now
2607  if (!pf->IsMerged()) {
2608  PDB(kOutput,2) pf->Print();
2609  TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
2610  filemerger->AddFile(fileLoc);
2611  }
2612  // Datadir
2613  TString ddir, ddopts;
2614  if (gProofServ) {
2615  ddir.Form("%s/", gProofServ->GetDataDir());
2617  }
2618  // Set the output file
2619  TString outfile(pf->GetOutputFileName());
2620  if (outfile.Contains("<datadir>/")) {
2621  outfile.ReplaceAll("<datadir>/", ddir.Data());
2622  if (!ddopts.IsNull())
2623  outfile += TString::Format("?%s", ddopts.Data());
2624  pf->SetOutputFileName(outfile);
2625  }
2626  if ((gProofServ && gProofServ->IsTopMaster()) || (fProof && fProof->IsLite())) {
2628  TString srv;
2630  TUrl usrv(srv);
2631  Bool_t localFile = kFALSE;
2632  if (pf->IsRetrieve()) {
2633  // This file will be retrieved by the client: we created it in the data dir
2634  // and save the file URL on the client in the title
2635  if (outfile.BeginsWith("client:")) outfile.Replace(0, 7, "");
2636  TString bn = gSystem->BaseName(TUrl(outfile.Data(), kTRUE).GetFile());
2637  // The output file path on the master
2638  outfile.Form("%s%s", ddir.Data(), bn.Data());
2639  // Save the client path in the title if not defined yet
2640  if (strlen(pf->GetTitle()) <= 0) pf->SetTitle(bn);
2641  // The file is local
2642  localFile = kTRUE;
2643  } else {
2644  // Check if the file is on the master or elsewhere
2645  if (outfile.BeginsWith("master:")) outfile.Replace(0, 7, "");
2646  // Check locality
2647  TUrl uof(outfile.Data(), kTRUE);
2648  TString lfn;
2649  ftyp = TFile::GetType(uof.GetUrl(), "RECREATE", &lfn);
2650  if (ftyp == TFile::kLocal && !srv.IsNull()) {
2651  // Check if is a different server
2652  if (uof.GetPort() > 0 && usrv.GetPort() > 0 &&
2653  usrv.GetPort() != uof.GetPort()) ftyp = TFile::kNet;
2654  }
2655  // If it is really local set the file name
2656  if (ftyp == TFile::kLocal) outfile = lfn;
2657  // The file maybe local
2658  if (ftyp == TFile::kLocal || ftyp == TFile::kFile) localFile = kTRUE;
2659  }
2660  // The remote output file name (the one to be used by the client)
2661  TString outfilerem(outfile);
2662  // For local files we add the local server
2663  if (localFile) {
2664  // Remove prefix, if any, if included and if Xrootd
2665  TProofServ::FilterLocalroot(outfilerem, srv);
2666  outfilerem.Insert(0, srv);
2667  }
2668  // Save the new remote output filename
2669  pf->SetOutputFileName(outfilerem);
2670  // Align the filename
2671  pf->SetFileName(gSystem->BaseName(outfilerem));
2672  }
2673  if (!filemerger->OutputFile(outfile)) {
2674  Error("MergeOutputFiles", "cannot open the output file");
2675  continue;
2676  }
2677  // Merge
2678  PDB(kSubmerger,2) filemerger->PrintFiles("");
2679  if (!filemerger->Merge()) {
2680  Error("MergeOutputFiles", "cannot merge the output files");
2681  continue;
2682  }
2683  // Remove the files
2684  TList *fileList = filemerger->GetMergeList();
2685  if (fileList) {
2686  TIter next(fileList);
2687  TObjString *url = 0;
2688  while((url = (TObjString*)next())) {
2689  TUrl u(url->GetName());
2690  if (!strcmp(u.GetProtocol(), "file")) {
2691  gSystem->Unlink(u.GetFile());
2692  } else {
2693  gSystem->Unlink(url->GetName());
2694  }
2695  }
2696  }
2697  // Reset the merger
2698  filemerger->Reset();
2699 
2700  } else {
2701 
2702  // If not yet merged (for example when having only 1 active worker,
2703  // we need to create the dataset by calling Merge on an effectively empty list
2704  if (!pf->IsMerged()) {
2705  TList dumlist;
2706  dumlist.Add(new TNamed("dum", "dum"));
2707  dumlist.SetOwner(kTRUE);
2708  pf->Merge(&dumlist);
2709  }
2710  // Point to the dataset
2712  if (!fc) {
2713  Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
2714  pf->Print();
2715  continue;
2716  }
2717  // Add the collection to the output list for registration and/or to be returned
2718  // to the client
2719  fOutput->Add(fc);
2720  // Do not cleanup at destruction
2721  pf->ResetFileCollection();
2722  // Tell the main thread to register this dataset, if needed
2723  if (pf->IsRegister()) {
2724  TString opt;
2725  if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
2726  if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
2727  if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
2728  fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
2729  TString tag = TString::Format("DATASET_%s", pf->GetTitle());
2730  fOutput->Add(new TNamed(tag, opt));
2731  }
2732  // Remove this object from the output list and schedule it for distruction
2733  fOutput->Remove(pf);
2734  if (!rmList) rmList = new TList;
2735  rmList->Add(pf);
2736  PDB(kOutput,2) fOutput->Print();
2737  }
2738  }
2739  }
2740  }
2741 
2742  // Remove objects scheduled for removal
2743  if (rmList && rmList->GetSize() > 0) {
2744  TIter nxo(rmList);
2745  TObject *o = 0;
2746  while((o = nxo())) {
2747  fOutput->Remove(o);
2748  }
2749  rmList->SetOwner(kTRUE);
2750  delete rmList;
2751  }
2752 
2753  PDB(kOutput,1) Info("MergeOutputFiles", "done!");
2754 
2755  // Done
2756  return kTRUE;
2757 }
2758 
2759 
2760 ////////////////////////////////////////////////////////////////////////////////
2761 /// Set the selector's data members:
2762 /// find the mapping of data members to otuput list entries in the output list
2763 /// and apply it.
2764 
2766 {
2769  if (!olsdm) {
2770  PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList",
2771  "failed to find map object in output list!");
2772  return;
2773  }
2774 
2775  olsdm->SetDataMembers(fSelector);
2776 }
2777 
2778 ////////////////////////////////////////////////////////////////////////////////
2779 
2781 {
2782  // Finalize a query.
2783  // Returns -1 in case of an error, 0 otherwise.
2784 
2785  if (IsClient()) {
2786  if (fOutputLists == 0) {
2787  if (force)
2788  if (fQuery)
2789  return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
2790  fQuery->GetName()), force);
2791  } else {
2792  // Make sure the all objects are in the output list
2793  PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
2794  MergeOutput();
2795  }
2796  }
2797 
2798  Long64_t rv = 0;
2799  if (fProof->IsMaster()) {
2800 
2801  // Fill information for monitoring and stop it
2802  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
2803  if (!status) {
2804  // The query was aborted: let's add some info in the output list
2805  status = new TStatus();
2806  fOutput->Add(status);
2807  TString emsg = TString::Format("Query aborted after %lld entries", GetEventsProcessed());
2808  status->Add(emsg);
2809  }
2810  status->SetExitStatus((Int_t) GetExitStatus());
2811 
2812  PDB(kOutput,1) Info("Finalize","Calling Merge Output");
2813  // Some objects (e.g. histos in autobin) may not have been merged yet
2814  // do it now
2815  MergeOutput();
2816 
2817  fOutput->SetOwner();
2818 
2819  // Add the active-wrks-vs-proctime info from the packetizer
2820  if (fPacketizer) {
2821  TObject *pperf = (TObject *) fPacketizer->GetProgressPerf(kTRUE);
2822  if (pperf) fOutput->Add(pperf);
2823  TList *parms = fPacketizer->GetConfigParams(kTRUE);
2824  if (parms) {
2825  TIter nxo(parms);
2826  TObject *o = 0;
2827  while ((o = nxo())) fOutput->Add(o);
2828  }
2829 
2830  // If other invalid elements were found during processing, add them to the
2831  // list of missing elements
2832  TDSetElement *elem = 0;
2833  if (fPacketizer->GetFailedPackets()) {
2834  TString type = (fPacketizer->TestBit(TVirtualPacketizer::kIsTree)) ? "TTree" : "";
2835  TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
2836  if (!listOfMissingFiles) {
2837  listOfMissingFiles = new TList;
2838  listOfMissingFiles->SetName("MissingFiles");
2839  }
2840  TIter nxe(fPacketizer->GetFailedPackets());
2841  while ((elem = (TDSetElement *)nxe()))
2842  listOfMissingFiles->Add(elem->GetFileInfo(type));
2843  if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
2844  }
2845  }
2846 
2847  TPerfStats::Stop();
2848  // Save memory usage on master
2849  Long_t vmaxmst, rmaxmst;
2850  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
2851  status->SetMemValues(vmaxmst, rmaxmst, kTRUE);
2852 
2854 
2855  } else {
2856  if (fExitStatus != kAborted) {
2857 
2858  if (!sync) {
2859  // Reinit selector (with multi-sessioning we must do this until
2860  // TSelector::GetSelector() is optimized to i) avoid reloading of an
2861  // unchanged selector and ii) invalidate existing instances of
2862  // reloaded selector)
2863  if (ReinitSelector(fQuery) == -1) {
2864  Info("Finalize", "problems reinitializing selector \"%s\"",
2865  fQuery->GetSelecImp()->GetName());
2866  return -1;
2867  }
2868  }
2869 
2870  if (fPacketizer)
2871  if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
2872  fPacketizer->SetFailedPackets(0);
2873  failedPackets->SetName("FailedPackets");
2874  AddOutputObject(failedPackets);
2875 
2876  TStatus *status = (TStatus *)GetOutput("PROOF_Status");
2877  if (!status) AddOutputObject((status = new TStatus()));
2878  status->Add("Some packets were not processed! Check the the"
2879  " 'FailedPackets' list in the output list");
2880  }
2881 
2882  // Some input parameters may be needed in Terminate
2884 
2886  if (output) {
2887  TIter next(fOutput);
2888  while(TObject* obj = next()) {
2889  if (fProof->IsParallel() || DrawCanvas(obj) == 1)
2890  // Either parallel or not a canvas or not able to display it:
2891  // just add to the list
2892  output->Add(obj);
2893  }
2894  } else {
2895  Warning("Finalize", "undefined output list in the selector! Protocol error?");
2896  }
2897 
2898  // We need to do this because the output list can be modified in TSelector::Terminate
2899  // in a way to invalidate existing objects; so we clean the links when still valid and
2900  // we re-copy back later
2902  fOutput->Clear("nodelete");
2903 
2904  // Map output objects to selector members
2905  SetSelectorDataMembersFromOutputList();
2906 
2907  PDB(kLoop,1) Info("Finalize","Call Terminate()");
2908  // This is the end of merging
2909  SetMerging(kFALSE);
2910  // We measure the merge time
2911  fProof->fQuerySTW.Reset();
2912  // Call Terminate now
2913  fSelector->Terminate();
2914 
2915  rv = fSelector->GetStatus();
2916 
2917  // Copy the output list back and clean the selector's list
2918  TIter it(output);
2919  while(TObject* o = it()) {
2920  fOutput->Add(o);
2921  }
2922 
2923  // Save the output list in the current query, if any
2924  if (fQuery) {
2926  // Set in finalized state (cannot be done twice)
2927  fQuery->SetFinalized();
2928  } else {
2929  Warning("Finalize","current TQueryResult object is undefined!");
2930  }
2931 
2932  if (!fCreateSelObj) {
2935  if (output) output->Remove(fSelector);
2936  fSelector = 0;
2937  }
2938 
2939  // We have transferred copy of the output objects in TQueryResult,
2940  // so now we can cleanup the selector, making sure that we do not
2941  // touch the output objects
2942  if (output) { output->SetOwner(kFALSE); output->Clear("nodelete"); }
2944 
2945  // Delete fOutput (not needed anymore, cannot be finalized twice),
2946  // making sure that the objects saved in TQueryResult are not deleted
2948  fOutput->Clear("nodelete");
2950 
2951  } else {
2952 
2953  // Cleanup
2954  fOutput->SetOwner();
2956  if (!fCreateSelObj) fSelector = 0;
2957  }
2958  }
2959  PDB(kGlobal,1) Info("Process","exit");
2960 
2961  if (!IsClient()) {
2962  Info("Finalize", "finalization on %s finished", gProofServ->GetPrefix());
2963  }
2964  fProof->FinalizationDone();
2965 
2966  return rv;
2967 }
2968 
2969 ////////////////////////////////////////////////////////////////////////////////
2970 /// Finalize the results of a query already processed.
2971 
2973 {
2974  PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
2975 
2976  if (!IsClient()) {
2977  Info("Finalize(TQueryResult *)",
2978  "method to be executed only on the clients");
2979  return -1;
2980  }
2981 
2982  if (!qr) {
2983  Info("Finalize(TQueryResult *)", "query undefined");
2984  return -1;
2985  }
2986 
2987  if (qr->IsFinalized()) {
2988  Info("Finalize(TQueryResult *)", "query already finalized");
2989  return -1;
2990  }
2991 
2992  // Reset the list
2993  if (!fOutput)
2994  fOutput = new THashList;
2995  else
2996  fOutput->Clear();
2997 
2998  // Make sure that the temporary output list is empty
2999  if (fOutputLists) {
3000  fOutputLists->Delete();
3001  delete fOutputLists;
3002  fOutputLists = 0;
3003  }
3004 
3005  // Re-init the selector
3006  gSystem->RedirectOutput(fProof->fLogFileName);
3007 
3008  // Import the output list
3009  TList *tmp = (TList *) qr->GetOutputList();
3010  if (!tmp) {
3011  gSystem->RedirectOutput(0);
3012  Info("Finalize(TQueryResult *)", "outputlist is empty");
3013  return -1;
3014  }
3015  TList *out = fOutput;
3016  if (fProof->fProtocol < 11)
3017  out = new TList;
3018  TIter nxo(tmp);
3019  TObject *o = 0;
3020  while ((o = nxo()))
3021  out->Add(o->Clone());
3022 
3023  // Adopts the list
3024  if (fProof->fProtocol < 11) {
3025  out->SetOwner();
3026  StoreOutput(out);
3027  }
3028  gSystem->RedirectOutput(0);
3029 
3030  SetSelectorDataMembersFromOutputList();
3031 
3032  // Finalize it
3033  SetCurrentQuery(qr);
3034  Long64_t rc = Finalize();
3036 
3037  return rc;
3038 }
3039 
3040 ////////////////////////////////////////////////////////////////////////////////
3041 /// Send the selector file(s) to master or worker nodes.
3042 
3043 Bool_t TProofPlayerRemote::SendSelector(const char* selector_file)
3044 {
3045  // Check input
3046  if (!selector_file) {
3047  Info("SendSelector", "Invalid input: selector (file) name undefined");
3048  return kFALSE;
3049  }
3050 
3051  if (!strchr(gSystem->BaseName(selector_file), '.')) {
3052  if (gDebug > 1)
3053  Info("SendSelector", "selector name '%s' does not contain a '.':"
3054  " nothing to send, it will be loaded from a library", selector_file);
3055  return kTRUE;
3056  }
3057 
3058  // Extract the fine name first
3059  TString selec = selector_file;
3060  TString aclicMode;
3061  TString arguments;
3062  TString io;
3063  selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
3064 
3065  // Expand possible envs or '~'
3066  gSystem->ExpandPathName(selec);
3067 
3068  // Update the macro path
3070  TString np(gSystem->DirName(selec));
3071  if (!np.IsNull()) {
3072  np += ":";
3073  if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
3074  Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
3075  mp.Insert(ip, np);
3076  TROOT::SetMacroPath(mp);
3077  if (gDebug > 0)
3078  Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
3079  }
3080  }
3081 
3082  // Header file
3083  TString header = selec;
3084  header.Remove(header.Last('.'));
3085  header += ".h";
3086  if (gSystem->AccessPathName(header, kReadPermission)) {
3087  TString h = header;
3088  header.Remove(header.Last('.'));
3089  header += ".hh";
3090  if (gSystem->AccessPathName(header, kReadPermission)) {
3091  Info("SendSelector",
3092  "header file not found: tried: %s %s", h.Data(), header.Data());
3093  return kFALSE;
3094  }
3095  }
3096 
3097  // Send files now
3098  if (fProof->SendFile(selec, (TProof::kBinary | TProof::kForward | TProof::kCp | TProof::kCpBin)) == -1) {
3099  Info("SendSelector", "problems sending implementation file %s", selec.Data());
3100  return kFALSE;
3101  }
3102  if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
3103  Info("SendSelector", "problems sending header file %s", header.Data());
3104  return kFALSE;
3105  }
3106 
3107  return kTRUE;
3108 }
3109 
3110 ////////////////////////////////////////////////////////////////////////////////
3111 /// Merge objects in output the lists.
3112 
3114 {
3115  PDB(kOutput,1) Info("MergeOutput","Enter");
3116 
3117  TObject *obj = 0;
3118  if (fOutputLists) {
3119 
3120  TIter next(fOutputLists);
3121 
3122  TList *list;
3123  while ( (list = (TList *) next()) ) {
3124 
3125  if (!(obj = fOutput->FindObject(list->GetName()))) {
3126  obj = list->First();
3127  list->Remove(obj);
3128  fOutput->Add(obj);
3129  }
3130 
3131  if ( list->IsEmpty() ) continue;
3132 
3133  TMethodCall callEnv;
3134  if (obj->IsA())
3135  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3136  if (callEnv.IsValid()) {
3137  callEnv.SetParam((Long_t) list);
3138  callEnv.Execute(obj);
3139  } else {
3140  // No Merge interface, return individual objects
3141  while ( (obj = list->First()) ) {
3142  fOutput->Add(obj);
3143  list->Remove(obj);
3144  }
3145  }
3146  }
3147  SafeDelete(fOutputLists);
3148 
3149  } else {
3150 
3151  PDB(kOutput,1) Info("MergeOutput","fOutputLists empty");
3152  }
3153 
3154  if (!IsClient() || fProof->IsLite()) {
3155  // Merge the output files created on workers, if any
3156  MergeOutputFiles();
3157  }
3158 
3159  // If there are TProofOutputFile objects we have to make sure that the internal
3160  // information is consistent for the cases where this object is going to be merged
3161  // again (e.g. when using submergers or in a multi-master setup). This may not be
3162  // the case because the first coming in is taken as reference and it has the
3163  // internal dir and raw dir of the originating worker.
3164  TString key;
3165  TNamed *nm = 0;
3166  TList rmlist;
3167  TIter nxo(fOutput);
3168  while ((obj = nxo())) {
3169  TProofOutputFile *pf = dynamic_cast<TProofOutputFile *>(obj);
3170  if (pf) {
3171  if (gProofServ) {
3172  PDB(kOutput,2) Info("MergeOutput","found TProofOutputFile '%s'", obj->GetName());
3173  TString dir(pf->GetOutputFileName());
3174  PDB(kOutput,2) Info("MergeOutput","outputfilename: '%s'", dir.Data());
3175  // The dir
3176  if (dir.Last('/') != kNPOS) dir.Remove(dir.Last('/')+1);
3177  PDB(kOutput,2) Info("MergeOutput","dir: '%s'", dir.Data());
3178  pf->SetDir(dir);
3179  // The raw dir; for xrootd based system we include the 'localroot', if any
3180  TUrl u(dir);
3181  dir = u.GetFile();
3182  TString pfx = gEnv->GetValue("Path.Localroot","");
3183  if (!pfx.IsNull() &&
3184  (!strcmp(u.GetProtocol(), "root") || !strcmp(u.GetProtocol(), "xrd")))
3185  dir.Insert(0, pfx);
3186  PDB(kOutput,2) Info("MergeOutput","rawdir: '%s'", dir.Data());
3187  pf->SetDir(dir, kTRUE);
3188  // The worker ordinal
3190  // The saved output file name, if any
3191  key.Form("PROOF_OutputFileName_%s", pf->GetFileName());
3192  if ((nm = (TNamed *) fOutput->FindObject(key.Data()))) {
3193  pf->SetOutputFileName(nm->GetTitle());
3194  rmlist.Add(nm);
3196  pf->SetOutputFileName(0);
3198  }
3199  // The filename (order is important to exclude '.merger' from the key)
3200  dir = pf->GetFileName();
3202  dir += ".merger";
3203  pf->SetMerged(kFALSE);
3204  } else {
3205  if (dir.EndsWith(".merger")) dir.Remove(dir.Last('.'));
3206  }
3207  pf->SetFileName(dir);
3208  } else if (fProof->IsLite()) {
3209  // The ordinal
3210  pf->SetWorkerOrdinal("0");
3211  // The dir
3212  pf->SetDir(gSystem->DirName(pf->GetOutputFileName()));
3213  // The filename and raw dir
3214  TUrl u(pf->GetOutputFileName(), kTRUE);
3215  pf->SetFileName(gSystem->BaseName(u.GetFile()));
3216  pf->SetDir(gSystem->DirName(u.GetFile()), kTRUE);
3217  // Notify the output path
3218  Printf("\nOutput file: %s", pf->GetOutputFileName());
3219  }
3220  } else {
3221  PDB(kOutput,2) Info("MergeOutput","output object '%s' is not a TProofOutputFile", obj->GetName());
3222  }
3223  }
3224 
3225  // Remove temporary objects from fOutput
3226  if (rmlist.GetSize() > 0) {
3227  TIter nxrm(&rmlist);
3228  while ((obj = nxrm()))
3229  fOutput->Remove(obj);
3230  rmlist.SetOwner(kTRUE);
3231  }
3232 
3233  // If requested (typically in case of submerger to count possible side-effects in that process)
3234  // save the measured memory usage
3235  if (saveMemValues) {
3236  TPerfStats::Stop();
3237  // Save memory usage on master
3238  Long_t vmaxmst, rmaxmst;
3239  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
3240  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
3241  if (status) status->SetMemValues(vmaxmst, rmaxmst, kFALSE);
3242  }
3243 
3244  PDB(kOutput,1) fOutput->Print();
3245  PDB(kOutput,1) Info("MergeOutput","leave (%d object(s))", fOutput->GetSize());
3246 }
3247 
3248 ////////////////////////////////////////////////////////////////////////////////
3249 /// Progress signal.
3250 
3252 {
3253  if (IsClient()) {
3254  fProof->Progress(total, processed);
3255  } else {
3256  // Send to the previous tier
3258  m << total << processed;
3259  gProofServ->GetSocket()->Send(m);
3260  }
3261 }
3262 
3263 ////////////////////////////////////////////////////////////////////////////////
3264 /// Progress signal.
3265 
3267  Long64_t bytesread,
3268  Float_t initTime, Float_t procTime,
3269  Float_t evtrti, Float_t mbrti)
3270 {
3271  PDB(kGlobal,1)
3272  Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
3273  initTime, procTime, evtrti, mbrti);
3274 
3275  if (IsClient()) {
3276  fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
3277  } else {
3278  // Send to the previous tier
3280  m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
3281  gProofServ->GetSocket()->Send(m);
3282  }
3283 }
3284 
3285 ////////////////////////////////////////////////////////////////////////////////
3286 /// Progress signal.
3287 
3289 {
3290  if (pi) {
3291  PDB(kGlobal,1)
3292  Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
3293  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
3294  pi->fActWorkers, pi->fEffSessions);
3295 
3296  if (IsClient()) {
3297  fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
3298  pi->fInitTime, pi->fProcTime,
3299  pi->fEvtRateI, pi->fMBRateI,
3300  pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
3301  } else {
3302  // Send to the previous tier
3304  m << pi;
3305  gProofServ->GetSocket()->Send(m);
3306  }
3307  } else {
3308  Warning("Progress","TProofProgressInfo object undefined!");
3309  }
3310 }
3311 
3312 
3313 ////////////////////////////////////////////////////////////////////////////////
3314 /// Feedback signal.
3315 
3317 {
3318  fProof->Feedback(objs);
3319 }
3320 
3321 ////////////////////////////////////////////////////////////////////////////////
3322 /// Stop process after this event.
3323 
3325 {
3326  if (fPacketizer != 0)
3327  fPacketizer->StopProcess(abort, kFALSE);
3328  if (abort == kTRUE)
3330  else
3332 }
3333 
3334 ////////////////////////////////////////////////////////////////////////////////
3335 /// Incorporate the received object 'obj' into the output list fOutput.
3336 /// The latter is created if not existing.
3337 /// This method short cuts 'StoreOutput + MergeOutput' optimizing the memory
3338 /// consumption.
3339 /// Returns -1 in case of error, 1 if the object has been merged into another
3340 /// one (so that its ownership has not been taken and can be deleted), and 0
3341 /// otherwise.
3342 
3344 {
3345  PDB(kOutput,1)
3346  Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
3347 
3348  // We must something to process
3349  if (!obj) {
3350  PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
3351  return -1;
3352  }
3353 
3354  // Create the output list, if not yet done
3355  if (!fOutput)
3356  fOutput = new THashList;
3357 
3358  // Flag about merging
3359  Bool_t merged = kTRUE;
3360 
3361  // Process event lists first
3362  TList *elists = dynamic_cast<TList *> (obj);
3363  if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
3364 
3365  // Create a global event list, result of merging the event lists
3366  // coresponding to the various data set elements
3367  TEventList *evlist = new TEventList("PROOF_EventList");
3368 
3369  // Iterate the list of event list segments
3370  TIter nxevl(elists);
3371  TEventList *evl = 0;
3372  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3373 
3374  // Find the file offset (fDSet is the current TDSet instance)
3375  // locating the element by name
3376  TIter nxelem(fDSet->GetListOfElements());
3377  TDSetElement *elem = 0;
3378  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3379  if (!strcmp(elem->GetFileName(), evl->GetName()))
3380  break;
3381  }
3382  if (!elem) {
3383  Error("AddOutputObject", "Found an event list for %s, but no object with"
3384  " the same name in the TDSet", evl->GetName());
3385  continue;
3386  }
3387  Long64_t offset = elem->GetTDSetOffset();
3388 
3389  // Shift the list by the number of first event in that file
3390  Long64_t *arr = evl->GetList();
3391  Int_t num = evl->GetN();
3392  if (arr && offset > 0)
3393  for (Int_t i = 0; i < num; i++)
3394  arr[i] += offset;
3395 
3396  // Add to the global event list
3397  evlist->Add(evl);
3398  }
3399 
3400  // Incorporate the resulting global list in fOutput
3401  SetLastMergingMsg(evlist);
3402  Incorporate(evlist, fOutput, merged);
3403  NotifyMemory(evlist);
3404 
3405  // Delete the global list if merged
3406  if (merged)
3407  SafeDelete(evlist);
3408 
3409  // The original object has been transformed in something else; we do
3410  // not have ownership on it
3411  return 1;
3412  }
3413 
3414  // Check if we need to merge files
3415  TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
3416  if (pf) {
3417  fMergeFiles = kTRUE;
3418  if (!IsClient() || fProof->IsLite()) {
3419  if (pf->IsMerge()) {
3420  Bool_t hasfout = (pf->GetOutputFileName() &&
3421  strlen(pf->GetOutputFileName()) > 0 &&
3423  Bool_t setfout = (!hasfout || TestBit(TVirtualProofPlayer::kIsSubmerger)) ? kTRUE : kFALSE;
3424  if (setfout) {
3425 
3426  TString ddir, ddopts;
3427  if (gProofServ) {
3428  ddir.Form("%s/", gProofServ->GetDataDir());
3429  if (gProofServ->GetDataDirOpts()) ddopts = gProofServ->GetDataDirOpts();
3430  }
3431  // Set the output file
3432  TString outfile(pf->GetOutputFileName());
3433  outfile.ReplaceAll("<datadir>/", ddir.Data());
3434  if (!ddopts.IsNull()) outfile += TString::Format("?%s", ddopts.Data());
3435  pf->SetOutputFileName(outfile);
3436 
3437  if (gProofServ) {
3438  // If submerger, save first the existing filename, if any
3439  if (TestBit(TVirtualProofPlayer::kIsSubmerger) && hasfout) {
3440  TString key = TString::Format("PROOF_OutputFileName_%s", pf->GetFileName());
3441  if (!fOutput->FindObject(key.Data()))
3442  fOutput->Add(new TNamed(key.Data(), pf->GetOutputFileName()));
3443  }
3444  TString of;
3446  if (of.IsNull()) {
3447  // Assume an xroot server running on the machine
3448  of.Form("root://%s/", gSystem->HostName());
3449  if (gSystem->Getenv("XRDPORT")) {
3450  TString sp(gSystem->Getenv("XRDPORT"));
3451  if (sp.IsDigit())
3452  of.Form("root://%s:%s/", gSystem->HostName(), sp.Data());
3453  }
3454  }
3455  TString sessionPath(gProofServ->GetSessionDir());
3456  TProofServ::FilterLocalroot(sessionPath, of);
3457  of += TString::Format("%s/%s", sessionPath.Data(), pf->GetFileName());
3459  if (!of.EndsWith(".merger")) of += ".merger";
3460  } else {
3461  if (of.EndsWith(".merger")) of.Remove(of.Last('.'));
3462  }
3463  pf->SetOutputFileName(of);
3464  }
3465  }
3466  // Notify
3467  PDB(kOutput, 1) pf->Print();
3468  }
3469  } else {
3470  // On clients notify the output path
3471  Printf("Output file: %s", pf->GetOutputFileName());
3472  }
3473  }
3474 
3475  // For other objects we just run the incorporation procedure
3476  SetLastMergingMsg(obj);
3477  Incorporate(obj, fOutput, merged);
3478  NotifyMemory(obj);
3479 
3480  // We are done
3481  return (merged ? 1 : 0);
3482 }
3483 
3484 ////////////////////////////////////////////////////////////////////////////////
3485 /// Control output redirection to TProof::fLogFileW
3486 
3488 {
3489  if (on && fProof && fProof->fLogFileW) {
3490  TProofServ::SetErrorHandlerFile(fProof->fLogFileW);
3491  fErrorHandler = SetErrorHandler(TProofServ::ErrorHandler);
3492  } else if (!on) {
3493  if (fErrorHandler) {
3495  SetErrorHandler(fErrorHandler);
3496  }
3497  }
3498 }
3499 
3500 ////////////////////////////////////////////////////////////////////////////////
3501 /// Incorporate the content of the received output list 'out' into the final
3502 /// output list fOutput. The latter is created if not existing.
3503 /// This method short cuts 'StoreOutput + MergeOutput' limiting the memory
3504 /// consumption.
3505 
3507 {
3508  PDB(kOutput,1) Info("AddOutput","Enter");
3509 
3510  // We must something to process
3511  if (!out) {
3512  PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
3513  return;
3514  }
3515 
3516  // Create the output list, if not yet done
3517  if (!fOutput)
3518  fOutput = new THashList;
3519 
3520  // Process event lists first
3521  Bool_t merged = kTRUE;
3522  TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
3523  if (elists) {
3524 
3525  // Create a global event list, result of merging the event lists
3526  // corresponding to the various data set elements
3527  TEventList *evlist = new TEventList("PROOF_EventList");
3528 
3529  // Iterate the list of event list segments
3530  TIter nxevl(elists);
3531  TEventList *evl = 0;
3532  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3533 
3534  // Find the file offset (fDSet is the current TDSet instance)
3535  // locating the element by name
3536  TIter nxelem(fDSet->GetListOfElements());
3537  TDSetElement *elem = 0;
3538  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3539  if (!strcmp(elem->GetFileName(), evl->GetName()))
3540  break;
3541  }
3542  if (!elem) {
3543  Error("AddOutput", "Found an event list for %s, but no object with"
3544  " the same name in the TDSet", evl->GetName());
3545  continue;
3546  }
3547  Long64_t offset = elem->GetTDSetOffset();
3548 
3549  // Shift the list by the number of first event in that file
3550  Long64_t *arr = evl->GetList();
3551  Int_t num = evl->GetN();
3552  if (arr && offset > 0)
3553  for (Int_t i = 0; i < num; i++)
3554  arr[i] += offset;
3555 
3556  // Add to the global event list
3557  evlist->Add(evl);
3558  }
3559 
3560  // Remove and delete the events lists object to avoid spoiling iteration
3561  // during next steps
3562  out->Remove(elists);
3563  delete elists;
3564 
3565  // Incorporate the resulting global list in fOutput
3566  SetLastMergingMsg(evlist);
3567  Incorporate(evlist, fOutput, merged);
3568  NotifyMemory(evlist);
3569  }
3570 
3571  // Iterate on the remaining objects in the received list
3572  TIter nxo(out);
3573  TObject *obj = 0;
3574  while ((obj = nxo())) {
3575  SetLastMergingMsg(obj);
3576  Incorporate(obj, fOutput, merged);
3577  // If not merged, drop from the temporary list, as the ownership
3578  // passes to fOutput
3579  if (!merged)
3580  out->Remove(obj);
3581  NotifyMemory(obj);
3582  }
3583 
3584  // Done
3585  return;
3586 }
3587 
3588 ////////////////////////////////////////////////////////////////////////////////
3589 /// Printout the memory record after merging object 'obj'
3590 /// This record is used by the memory monitor
3591 
3593 {
3594  if (fProof && (!IsClient() || fProof->IsLite())){
3595  ProcInfo_t pi;
3596  if (!gSystem->GetProcInfo(&pi)){
3597  // For PROOF-Lite we redirect this output to a the open log file so that the
3598  // memory monitor can pick these messages up
3599  RedirectOutput(fProof->IsLite());
3600  Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
3601  pi.fMemVirtual, pi.fMemResident, obj->GetName());
3602  RedirectOutput(0);
3603  }
3604  // Record also values for monitoring
3606  }
3607 }
3608 
3609 ////////////////////////////////////////////////////////////////////////////////
3610 /// Set the message to be notified in case of exception
3611 
3613 {
3614  TString lastMsg = TString::Format("while merging object '%s'", obj->GetName());
3615  TProofServ::SetLastMsg(lastMsg);
3616 }
3617 
3618 ////////////////////////////////////////////////////////////////////////////////
3619 /// Incorporate object 'newobj' in the list 'outlist'.
3620 /// The object is merged with an object of the same name already existing in
3621 /// the list, or just added.
3622 /// The boolean merged is set to kFALSE when the object is just added to 'outlist';
3623 /// this happens if the Merge() method does not exist or if a object named as 'obj'
3624 /// is not already in the list. If the obj is not 'merged' than it should not be
3625 /// deleted, unless outlist is not owner of its objects.
3626 /// Return 0 on success, -1 on error.
3627 
3629 {
3630  merged = kTRUE;
3631 
3632  PDB(kOutput,1)
3633  Info("Incorporate", "enter: obj: %p (%s), list: %p",
3634  newobj, newobj ? newobj->ClassName() : "undef", outlist);
3635 
3636  // The object and list must exist
3637  if (!newobj || !outlist) {
3638  Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
3639  return -1;
3640  }
3641 
3642  // Special treatment for histograms in autobin mode
3643  Bool_t specialH =
3644  (!fProof || !fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) ? kTRUE : kFALSE;
3645  if (specialH && newobj->InheritsFrom(TH1::Class())) {
3646  if (!HandleHistogram(newobj, merged)) {
3647  if (merged) {
3648  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' merged", newobj->GetName());
3649  } else {
3650  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' added to the"
3651  " appropriate list for delayed merging", newobj->GetName());
3652  }
3653  return 0;
3654  }
3655  }
3656 
3657  // Check if an object with the same name exists already
3658  TObject *obj = outlist->FindObject(newobj->GetName());
3659 
3660  // If no, add the new object and return
3661  if (!obj) {
3662  outlist->Add(newobj);
3663  merged = kFALSE;
3664  // Done
3665  return 0;
3666  }
3667 
3668  // Locate the Merge(TCollection *) method
3669  TMethodCall callEnv;
3670  if (obj->IsA())
3671  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3672  if (callEnv.IsValid()) {
3673  // Found: put the object in a one-element list
3674  static TList *xlist = new TList;
3675  xlist->Add(newobj);
3676  // Call the method
3677  callEnv.SetParam((Long_t) xlist);
3678  callEnv.Execute(obj);
3679  // Ready for next call
3680  xlist->Clear();
3681  } else {
3682  // Not found: return individual objects
3683  outlist->Add(newobj);
3684  merged = kFALSE;
3685  }
3686 
3687  // Done
3688  return 0;
3689 }
3690 
3691 ////////////////////////////////////////////////////////////////////////////////
3692 /// Low statistic histograms need a special treatment when using autobin
3693 
3695 {
3696  TH1 *h = dynamic_cast<TH1 *>(obj);
3697  if (!h) {
3698  // Not an histo
3699  return obj;
3700  }
3701 
3702  // This is only used if we return (TObject *)0 and there is only one case
3703  // when we set this to kTRUE
3704  merged = kFALSE;
3705 
3706  // Does is still needs binning ?
3707  Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
3708 
3709  // Number of entries
3710  Int_t nent = h->GetBufferLength();
3711  PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
3712  h->GetName(), nent, h->GetBufferSize());
3713 
3714  // Attach to the list in the outputlists, if any
3715  TList *list = 0;
3716  if (!fOutputLists) {
3717  PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
3718  fOutputLists = new TList;
3719  fOutputLists->SetOwner();
3720  }
3721  list = (TList *) fOutputLists->FindObject(h->GetName());
3722 
3723  TH1 *href = 0;
3724  if (tobebinned) {
3725 
3726  // The histogram needs to be projected in a reasonable range: we
3727  // do this at the end with all the histos, so we need to create
3728  // a list here
3729  if (!list) {
3730  // Create the list
3731  list = new TList;
3732  list->SetName(h->GetName());
3733  list->SetOwner();
3734  fOutputLists->Add(list);
3735  // Move in it any previously merged object from the output list
3736  if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
3737  fOutput->Remove(href);
3738  list->Add(href);
3739  }
3740  }
3741  TIter nxh(list);
3742  while ((href = (TH1 *) nxh())) {
3743  if (href->GetBuffer() && href->GetBufferLength() < nent) break;
3744  }
3745  if (href) {
3746  list->AddBefore(href, h);
3747  } else {
3748  list->Add(h);
3749  }
3750  // Done
3751  return (TObject *)0;
3752 
3753  } else {
3754 
3755  if (list) {
3756  TIter nxh(list);
3757  while ((href = (TH1 *) nxh())) {
3758  if (href->GetBuffer() || href->GetEntries() < nent) break;
3759  }
3760  if (href) {
3761  list->AddBefore(href, h);
3762  } else {
3763  list->Add(h);
3764  }
3765  // Done
3766  return (TObject *)0;
3767 
3768  } else {
3769  // Check if we can 'Add' the histogram to an existing one; this is more efficient
3770  // then using Merge
3771  TH1 *hout = (TH1*) fOutput->FindObject(h->GetName());
3772  if (hout) {
3773  // Remove the existing histo from the output list ...
3774  fOutput->Remove(hout);
3775  // ... and create either the list to merge in one-go at the end
3776  // (more efficient than merging one by one) or, if too big, merge
3777  // these two and start the 'one-by-one' technology
3778  Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
3779  if (fMergeTH1OneByOne || (gProofServ && hsz > gProofServ->GetMsgSizeHWM())) {
3780  list = new TList;
3781  list->Add(hout);
3782  h->Merge(list);
3783  list->SetOwner();
3784  delete list;
3785  return h;
3786  } else {
3787  list = new TList;
3788  list->SetName(h->GetName());
3789  list->SetOwner();
3790  fOutputLists->Add(list);
3791  // Add the existing and the incoming histos
3792  list->Add(hout);
3793  list->Add(h);
3794  // Done
3795  return (TObject *)0;
3796  }
3797  } else {
3798  // This is the first one; add it to the output list
3799  fOutput->Add(h);
3800  return (TObject *)0;
3801  }
3802  }
3803  }
3804  PDB(kOutput,1) Info("HandleHistogram", "leaving");
3805 }
3806 
3807 ////////////////////////////////////////////////////////////////////////////////
3808 /// Return kTRUE is the histograms 'h0' and 'h1' have the same binning and ranges
3809 /// on the axis (i.e. if they can be just Add-ed for merging).
3810 
3812 {
3813  Bool_t rc = kFALSE;
3814  if (!h0 || !h1) return rc;
3815 
3816  TAxis *a0 = 0, *a1 = 0;
3817 
3818  // Check X
3819  a0 = h0->GetXaxis();
3820  a1 = h1->GetXaxis();
3821  if (a0->GetNbins() == a1->GetNbins())
3822  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3823  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3824 
3825  // Check Y, if needed
3826  if (h0->GetDimension() > 1) {
3827  rc = kFALSE;
3828  a0 = h0->GetYaxis();
3829  a1 = h1->GetYaxis();
3830  if (a0->GetNbins() == a1->GetNbins())
3831  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3832  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3833  }
3834 
3835  // Check Z, if needed
3836  if (h0->GetDimension() > 2) {
3837  rc = kFALSE;
3838  a0 = h0->GetZaxis();
3839  a1 = h1->GetZaxis();
3840  if (a0->GetNbins() == a1->GetNbins())
3841  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3842  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3843  }
3844 
3845  // Done
3846  return rc;
3847 }
3848 
3849 ////////////////////////////////////////////////////////////////////////////////
3850 /// Store received output list.
3851 
3853 {
3854  PDB(kOutput,1) Info("StoreOutput","Enter");
3855 
3856  if ( out == 0 ) {
3857  PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
3858  return;
3859  }
3860 
3861  TIter next(out);
3862  out->SetOwner(kFALSE); // take ownership of the contents
3863 
3864  if (fOutputLists == 0) {
3865  PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
3866  fOutputLists = new TList;
3867  fOutputLists->SetOwner();
3868  }
3869  // process eventlists first
3870  TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
3871  if (lists) {
3872  out->Remove(lists);
3873  TEventList *mainList = new TEventList("PROOF_EventList");
3874  out->Add(mainList);
3875  TIter it(lists);
3876  TEventList *aList;
3877  while ( (aList = dynamic_cast<TEventList*> (it())) ) {
3878  // find file offset
3879  TIter nxe(fDSet->GetListOfElements());
3880  TDSetElement *elem;
3881  while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
3882  if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
3883  break;
3884  }
3885  if (!elem) {
3886  Error("StoreOutput", "found the EventList for %s, but no object with that name "
3887  "in the TDSet", aList->GetName());
3888  continue;
3889  }
3890  Long64_t offset = elem->GetTDSetOffset();
3891 
3892  // shift the list by the number of first event in that file
3893  Long64_t *arr = aList->GetList();
3894  Int_t num = aList->GetN();
3895  if (arr && offset)
3896  for (int i = 0; i < num; i++)
3897  arr[i] += offset;
3898 
3899  mainList->Add(aList); // add to the main list
3900  }
3901  delete lists;
3902  }
3903 
3904  TObject *obj;
3905  while( (obj = next()) ) {
3906  PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
3907 
3908  TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
3909  if ( list == 0 ) {
3910  PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
3911  list = new TList;
3912  list->SetName( obj->GetName() );
3913  list->SetOwner();
3914  fOutputLists->Add( list );
3915  }
3916  list->Add( obj );
3917  }
3918 
3919  delete out;
3920  PDB(kOutput,1) Info("StoreOutput", "leave");
3921 }
3922 
3923 ////////////////////////////////////////////////////////////////////////////////
3924 /// Merge feedback lists.
3925 
3927 {
3928  PDB(kFeedback,1)
3929  Info("MergeFeedback","Enter");
3930 
3931  if ( fFeedbackLists == 0 ) {
3932  PDB(kFeedback,1)
3933  Info("MergeFeedback","Leave (no output)");
3934  return 0;
3935  }
3936 
3937  TList *fb = new TList; // collection of feedback objects
3938  fb->SetOwner();
3939 
3940  TIter next(fFeedbackLists);
3941 
3942  TMap *map;
3943  while ( (map = (TMap*) next()) ) {
3944 
3945  PDB(kFeedback,2)
3946  Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
3947 
3948  // turn map into list ...
3949 
3950  TList *list = new TList;
3951  TIter keys(map);
3952 
3953 #ifndef R__TH1MERGEFIXED
3954  Int_t nbmx = -1;
3955  TObject *oref = 0;
3956 #endif
3957  while ( TObject *key = keys() ) {
3958  TObject *o = map->GetValue(key);
3959  TH1 *h = dynamic_cast<TH1 *>(o);
3960 #ifndef R__TH1MERGEFIXED
3961  // Temporary fix for to cope with the problem in TH1::Merge.
3962  // We need to use a reference histo the one with the largest number
3963  // of bins so that the histos from all submasters can be correctly
3964  // fit in
3965  if (h && !strncmp(o->GetName(),"PROOF_",6)) {
3966  if (h->GetNbinsX() > nbmx) {
3967  nbmx= h->GetNbinsX();
3968  oref = o;
3969  }
3970  }
3971 #endif
3972  if (h) {
3973  TIter nxh(list);
3974  TH1 *href= 0;
3975  while ((href = (TH1 *)nxh())) {
3976  if (h->GetBuffer()) {
3977  if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
3978  } else {
3979  if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
3980  }
3981  }
3982  if (href) {
3983  list->AddBefore(href, h);
3984  } else {
3985  list->Add(h);
3986  }
3987  } else {
3988  list->Add(o);
3989  }
3990  }
3991 
3992  // clone first object, remove from list
3993 #ifdef R__TH1MERGEFIXED
3994  TObject *obj = list->First();
3995 #else
3996  TObject *obj = (oref) ? oref : list->First();
3997 #endif
3998  list->Remove(obj);
3999  obj = obj->Clone();
4000  fb->Add(obj);
4001 
4002  if ( list->IsEmpty() ) {
4003  delete list;
4004  continue;
4005  }
4006 
4007  // merge list with clone
4008  TMethodCall callEnv;
4009  if (obj->IsA())
4010  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
4011  if (callEnv.IsValid()) {
4012  callEnv.SetParam((Long_t) list);
4013  callEnv.Execute(obj);
4014  } else {
4015  // No Merge interface, return copy of individual objects
4016  while ( (obj = list->First()) ) {
4017  fb->Add(obj->Clone());
4018  list->Remove(obj);
4019  }
4020  }
4021 
4022  delete list;
4023  }
4024 
4025  PDB(kFeedback,1)
4026  Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
4027 
4028  return fb;
4029 }
4030 
4031 ////////////////////////////////////////////////////////////////////////////////
4032 /// Store feedback results from the specified slave.
4033 
4035 {
4036  PDB(kFeedback,1)
4037  Info("StoreFeedback","Enter");
4038 
4039  if ( out == 0 ) {
4040  PDB(kFeedback,1)
4041  Info("StoreFeedback","Leave (empty)");
4042  return;
4043  }
4044 
4045  if ( IsClient() ) {
4046  // in client
4047  Feedback(out);
4048  delete out;
4049  return;
4050  }
4051 
4052  if (fFeedbackLists == 0) {
4053  PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
4054  fFeedbackLists = new TList;
4055  fFeedbackLists->SetOwner();
4056  }
4057 
4058  TIter next(out);
4059  out->SetOwner(kFALSE); // take ownership of the contents
4060 
4061  const char *ord = ((TSlave*) slave)->GetOrdinal();
4062 
4063  TObject *obj;
4064  while( (obj = next()) ) {
4065  PDB(kFeedback,2)
4066  Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
4067  TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
4068  if ( map == 0 ) {
4069  PDB(kFeedback,2)
4070  Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
4071  // Map must not be owner (ownership is with regards to the keys (only))
4072  map = new TMap;
4073  map->SetName(obj->GetName());
4074  fFeedbackLists->Add(map);
4075  } else {
4076  PDB(kFeedback,2)
4077  Info("StoreFeedback","%s: removing previous value", ord);
4078  if (map->GetValue(slave))
4079  delete map->GetValue(slave);
4080  map->Remove(slave);
4081  }
4082  map->Add(slave, obj);
4083  PDB(kFeedback,2)
4084  Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
4085  }
4086 
4087  delete out;
4088  PDB(kFeedback,1)
4089  Info("StoreFeedback","Leave");
4090 }
4091 
4092 ////////////////////////////////////////////////////////////////////////////////
4093 /// Setup reporting of feedback objects.
4094 
4096 {
4097  if (IsClient()) return; // Client does not need timer
4098 
4099  fFeedback = (TList*) fInput->FindObject("FeedbackList");
4100 
4101  PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
4102  fFeedback == 0 ? "NOT ":"");
4103 
4104  if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
4105 
4106  // OK, feedback was requested, setup the timer
4108  fFeedbackPeriod = 2000;
4109  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4110  fFeedbackTimer = new TTimer;
4111  fFeedbackTimer->SetObject(this);
4113 }
4114 
4115 ////////////////////////////////////////////////////////////////////////////////
4116 /// Stop reporting of feedback objects.
4117 
4119 {
4120  if (fFeedbackTimer == 0) return;
4121 
4122  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4123 
4125 }
4126 
4127 ////////////////////////////////////////////////////////////////////////////////
4128 /// Send feedback objects to client.
4129 
4131 {
4132  PDB(kFeedback,2) Info("HandleTimer","Entry");
4133 
4134  if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
4135 
4136  // process local feedback objects
4137 
4138  TList *fb = new TList;
4139  fb->SetOwner();
4140 
4141  TIter next(fFeedback);
4142  while( TObjString *name = (TObjString*) next() ) {
4143  TObject *o = fOutput->FindObject(name->GetName());
4144  if (o != 0) {
4145  fb->Add(o->Clone());
4146  // remove the corresponding entry from the feedback list
4147  TMap *m = 0;
4148  if (fFeedbackLists &&
4149  (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
4150  fFeedbackLists->Remove(m);
4151  m->DeleteValues();
4152  delete m;
4153  }
4154  }
4155  }
4156 
4157  if (fb->GetSize() > 0) {
4158  StoreFeedback(this, fb); // adopts fb
4159  } else {
4160  delete fb;
4161  }
4162 
4163  if (fFeedbackLists == 0) {
4164  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE); // maybe next time
4165  return kFALSE;
4166  }
4167 
4168  fb = MergeFeedback();
4169 
4170  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4171 
4173  m << fb;
4174 
4175  // send message to client;
4176  gProofServ->GetSocket()->Send(m);
4177 
4178  delete fb;
4179 
4181 
4182  return kFALSE; // ignored?
4183 }
4184 
4185 ////////////////////////////////////////////////////////////////////////////////
4186 /// Get next packet for specified slave.
4187 
4189 {
4190  // The first call to this determines the end of initialization
4191  SetInitTime();
4192 
4193  if (fProcPackets) {
4194  Int_t bin = fProcPackets->GetXaxis()->FindBin(slave->GetOrdinal());
4195  if (bin >= 0) {
4196  if (fProcPackets->GetBinContent(bin) > 0)
4197  fProcPackets->Fill(slave->GetOrdinal(), -1);
4198  }
4199  }
4200 
4201  TDSetElement *e = fPacketizer->GetNextPacket( slave, r );
4202 
4203  if (e == 0) {
4204  PDB(kPacketizer,2)
4205  Info("GetNextPacket","%s: done!", slave->GetOrdinal());
4206  } else if (e == (TDSetElement*) -1) {
4207  PDB(kPacketizer,2)
4208  Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
4209  } else {
4210  PDB(kPacketizer,2)
4211  Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
4212  slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
4213  e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
4214  if (fProcPackets) fProcPackets->Fill(slave->GetOrdinal(), 1);
4215  }
4216 
4217  return e;
4218 }
4219 
4220 ////////////////////////////////////////////////////////////////////////////////
4221 /// Is the player running on the client?
4222 
4224 {
4225  return fProof ? fProof->TestBit(TProof::kIsClient) : kFALSE;
4226 }
4227 
4228 ////////////////////////////////////////////////////////////////////////////////
4229 /// Draw (support for TChain::Draw()).
4230 /// Returns -1 in case of error or number of selected events in case of success.
4231 
4233  const char *selection, Option_t *option,
4234  Long64_t nentries, Long64_t firstentry)
4235 {
4236  if (!fgDrawInputPars) {
4237  fgDrawInputPars = new THashList;
4238  fgDrawInputPars->Add(new TObjString("FeedbackList"));
4239  fgDrawInputPars->Add(new TObjString("PROOF_ChainWeight"));
4240  fgDrawInputPars->Add(new TObjString("PROOF_LineColor"));
4241  fgDrawInputPars->Add(new TObjString("PROOF_LineStyle"));
4242  fgDrawInputPars->Add(new TObjString("PROOF_LineWidth"));
4243  fgDrawInputPars->Add(new TObjString("PROOF_MarkerColor"));
4244  fgDrawInputPars->Add(new TObjString("PROOF_MarkerStyle"));
4245  fgDrawInputPars->Add(new TObjString("PROOF_MarkerSize"));
4246  fgDrawInputPars->Add(new TObjString("PROOF_FillColor"));
4247  fgDrawInputPars->Add(new TObjString("PROOF_FillStyle"));
4248  fgDrawInputPars->Add(new TObjString("PROOF_ListOfAliases"));
4249  }
4250 
4251  TString selector, objname;
4252  if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
4253  Error("DrawSelect", "parsing arguments");
4254  return -1;
4255  }
4256 
4257  TNamed *varexpobj = new TNamed("varexp", varexp);
4258  TNamed *selectionobj = new TNamed("selection", selection);
4259 
4260  // Save the current input list
4261  TObject *o = 0;
4262  TList *savedInput = new TList;
4263  TIter nxi(fInput);
4264  while ((o = nxi())) {
4265  savedInput->Add(o);
4266  TString n(o->GetName());
4267  if (fgDrawInputPars &&
4269  !n.BeginsWith("alias:")) fInput->Remove(o);
4270  }
4271 
4272  fInput->Add(varexpobj);
4273  fInput->Add(selectionobj);
4274 
4275  // Make sure we have an object name
4276  if (objname == "") objname = "htemp";
4277 
4278  fProof->AddFeedback(objname);
4279  Long64_t r = Process(set, selector, option, nentries, firstentry);
4280  fProof->RemoveFeedback(objname);
4281 
4282  fInput->Remove(varexpobj);
4283  fInput->Remove(selectionobj);
4284  if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
4285  fInput->Remove(opt);
4286  delete opt;
4287  }
4288 
4289  delete varexpobj;
4290  delete selectionobj;
4291 
4292  // Restore the input list
4293  fInput->Clear();
4294  TIter nxsi(savedInput);
4295  while ((o = nxsi()))
4296  fInput->Add(o);
4297  savedInput->SetOwner(kFALSE);
4298  delete savedInput;
4299 
4300  return r;
4301 }
4302 
4303 ////////////////////////////////////////////////////////////////////////////////
4304 /// Set init time
4305 
4307 {
4308  if (fPacketizer)
4309  fPacketizer->SetInitTime();
4310 }
4311 
4312 //------------------------------------------------------------------------------
4313 
4314 
4316 
4317 ////////////////////////////////////////////////////////////////////////////////
4318 /// Setup feedback.
4319 
4321 {
4322  TList *fb = (TList*) fInput->FindObject("FeedbackList");
4323  if (fb) {
4324  PDB(kFeedback,1)
4325  Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
4326  } else {
4327  PDB(kFeedback,1)
4328  Info("SetupFeedback","\"FeedbackList\" NOT found");
4329  }
4330 
4331  if (fb == 0 || fb->GetSize() == 0) return;
4332 
4333  // OK, feedback was requested, setup the timer
4334 
4336  fFeedbackPeriod = 2000;
4337  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4338  fFeedbackTimer = new TTimer;
4339  fFeedbackTimer->SetObject(this);
4341 
4342  fFeedback = fb;
4343 }
4344 
4345 ////////////////////////////////////////////////////////////////////////////////
4346 /// Stop feedback.
4347 
4349 {
4350  if (fFeedbackTimer == 0) return;
4351 
4352  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4353 
4355 }
4356 
4357 ////////////////////////////////////////////////////////////////////////////////
4358 /// Handle timer event.
4359 
4361 {
4362  PDB(kFeedback,2) Info("HandleTimer","Entry");
4363 
4364  // If in sequential (0-slave-PROOF) mode we do not have a packetizer
4365  // so we also send the info to update the progress bar.
4366  if (gProofServ) {
4367  Bool_t sendm = kFALSE;
4369  if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
4370  sendm = kTRUE;
4371  if (gProofServ->GetProtocol() > 25) {
4372  m << GetProgressStatus();
4373  } else if (gProofServ->GetProtocol() > 11) {
4375  m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
4376  << (Float_t) -1. << (Float_t) ps->GetProcTime()
4377  << (Float_t) ps->GetRate() << (Float_t) -1.;
4378  } else {
4379  m << fTotalEvents << GetEventsProcessed();
4380  }
4381  }
4382  if (sendm) gProofServ->GetSocket()->Send(m);
4383  }
4384 
4385  if (fFeedback == 0) return kFALSE;
4386 
4387  TList *fb = new TList;
4388  fb->SetOwner(kFALSE);
4389 
4390  if (fOutput == 0) {
4392  }
4393 
4394  if (fOutput) {
4395  TIter next(fFeedback);
4396  while( TObjString *name = (TObjString*) next() ) {
4397  // TODO: find object in memory ... maybe allow only in fOutput ?
4398  TObject *o = fOutput->FindObject(name->GetName());
4399  if (o != 0) fb->Add(o);
4400  }
4401  }
4402 
4403  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4404 
4406  m << fb;
4407 
4408  // send message to client;
4409  gProofServ->GetSocket()->Send(m);
4410 
4411  delete fb;
4412 
4414 
4415  return kFALSE; // ignored?
4416 }
4417 
4418 ////////////////////////////////////////////////////////////////////////////////
4419 /// Handle tree header request.
4420 
4422 {
4424 
4425  TDSet *dset;
4426  (*mess) >> dset;
4427  dset->Reset();
4428  TDSetElement *e = dset->Next();
4429  Long64_t entries = 0;
4430  TFile *f = 0;
4431  TTree *t = 0;
4432  if (!e) {
4433  PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
4434  } else {
4435  f = TFile::Open(e->GetFileName());
4436  t = 0;
4437  if (f) {
4438  t = (TTree*) f->Get(e->GetObjName());
4439  if (t) {
4440  t->SetMaxVirtualSize(0);
4441  t->DropBaskets();
4442  entries = t->GetEntries();
4443 
4444  // compute #entries in all the files
4445  while ((e = dset->Next()) != 0) {
4446  TFile *f1 = TFile::Open(e->GetFileName());
4447  if (f1) {
4448  TTree *t1 = (TTree*) f1->Get(e->GetObjName());
4449  if (t1) {
4450  entries += t1->GetEntries();
4451  delete t1;
4452  }
4453  delete f1;
4454  }
4455  }
4456  t->SetMaxEntryLoop(entries); // this field will hold the total number of entries ;)
4457  }
4458  }
4459  }
4460  if (t)
4461  answ << TString("Success") << t;
4462  else
4463  answ << TString("Failed") << t;
4464 
4465  fSocket->Send(answ);
4466 
4467  SafeDelete(t);
4468  SafeDelete(f);
4469 }
4470 
4471 
4472 //------------------------------------------------------------------------------
4473 
4475 
4476 ////////////////////////////////////////////////////////////////////////////////
4477 /// Process specified TDSet on PROOF. Runs on super master.
4478 /// The return value is -1 in case of error and TSelector::GetStatus() in
4479 /// in case of success.
4480 
4481 Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
4482  Option_t *option, Long64_t nentries,
4483  Long64_t first)
4484 {
4486  PDB(kGlobal,1) Info("Process","Enter");
4487 
4488  TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
4489  if (!proof) return -1;
4490 
4491  delete fOutput;
4492  fOutput = new THashList;
4493 
4495 
4496  if (!SendSelector(selector_file)) {
4497  Error("Process", "sending selector %s", selector_file);
4498  return -1;
4499  }
4500 
4501  TCleanup clean(this);
4502  SetupFeedback();
4503 
4504  if (proof->IsMaster()) {
4505 
4506  // make sure the DSet is valid
4507  if (!dset->ElementsValid()) {
4508  proof->ValidateDSet(dset);
4509  if (!dset->ElementsValid()) {
4510  Error("Process", "could not validate TDSet");
4511  return -1;
4512  }
4513  }
4514 
4515  TList msds;
4516  msds.SetOwner(); // This will delete TPairs
4517 
4518  TList keyholder; // List to clean up key part of the pairs
4519  keyholder.SetOwner();
4520  TList valueholder; // List to clean up value part of the pairs
4521  valueholder.SetOwner();
4522 
4523  // Construct msd list using the slaves
4524  TIter nextslave(proof->GetListOfActiveSlaves());
4525  while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
4526  TList *submasters = 0;
4527  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
4528  if (!msd) {
4529  submasters = new TList;
4530  submasters->SetName(sl->GetMsd());
4531  keyholder.Add(submasters);
4532  TList *setelements = new TSortedList(kSortDescending);
4533  setelements->SetName(TString(sl->GetMsd())+"_Elements");
4534  valueholder.Add(setelements);
4535  msds.Add(new TPair(submasters, setelements));
4536  } else {
4537  submasters = dynamic_cast<TList*>(msd->Key());
4538  }
4539  if (submasters) submasters->Add(sl);
4540  }
4541 
4542  // Add TDSetElements to msd list
4543  Long64_t cur = 0; //start of next element
4544  TIter nextelement(dset->GetListOfElements());
4545  while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
4546 
4547  if (elem->GetNum()<1) continue; // get rid of empty elements
4548 
4549  if (nentries !=-1 && cur>=first+nentries) {
4550  // we are done
4551  break;
4552  }
4553 
4554  if (cur+elem->GetNum()-1<first) {
4555  //element is before first requested entry
4556  cur+=elem->GetNum();
4557  continue;
4558  }
4559 
4560  if (cur<first) {
4561  //modify element to get proper start
4562  elem->SetNum(elem->GetNum()-(first-cur));
4563  elem->SetFirst(elem->GetFirst()+first-cur);
4564  cur=first;
4565  }
4566 
4567  if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
4568  cur+=elem->GetNum();
4569  } else {
4570  //modify element to get proper end
4571  elem->SetNum(first+nentries-cur);
4572  cur=first+nentries;
4573  }
4574 
4575  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
4576  if (!msd) {
4577  Error("Process", "data requires mass storage domain '%s'"
4578  " which is not accessible in this proof session",
4579  elem->GetMsd());
4580  return -1;
4581  } else {
4582  TList *elements = dynamic_cast<TList*>(msd->Value());
4583  if (elements) elements->Add(elem);
4584  }
4585  }
4586 
4587  TList usedmasters;
4588  TIter nextmsd(msds.MakeIterator());
4589  while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
4590  TList *submasters = dynamic_cast<TList*>(msd->Key());
4591  TList *setelements = dynamic_cast<TList*>(msd->Value());
4592 
4593  // distribute elements over the masters
4594  Int_t nmasters = submasters ? submasters->GetSize() : -1;
4595  Int_t nelements = setelements ? setelements->GetSize() : -1;
4596  for (Int_t i=0; i<nmasters; i++) {
4597 
4598  Long64_t nent = 0;
4599  TDSet set(dset->GetType(), dset->GetObjName(),
4600  dset->GetDirectory());
4601  for (Int_t j = (i*nelements)/nmasters;
4602  j < ((i+1)*nelements)/nmasters;
4603  j++) {
4604  TDSetElement *elem = setelements ?
4605  dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
4606  if (elem) {
4607  set.Add(elem->GetFileName(), elem->GetObjName(),
4608  elem->GetDirectory(), elem->GetFirst(),
4609  elem->GetNum(), elem->GetMsd());
4610  nent += elem->GetNum();
4611  } else {
4612  Warning("Process", "not a TDSetElement object");
4613  }
4614  }
4615 
4616  if (set.GetListOfElements()->GetSize()>0) {
4617  TMessage mesg(kPROOF_PROCESS);
4618  TString fn(gSystem->BaseName(selector_file));
4619  TString opt = option;
4620  mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
4621 
4622  TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
4623  if (sl) {
4624  PDB(kGlobal,1) Info("Process",
4625  "Sending TDSet with %d elements to submaster %s",
4626  set.GetListOfElements()->GetSize(),
4627  sl->GetOrdinal());
4628  sl->GetSocket()->Send(mesg);
4629  usedmasters.Add(sl);
4630 
4631  // setup progress info
4632  fSlaves.AddLast(sl);
4633  fSlaveProgress.Set(fSlaveProgress.GetSize()+1);
4634  fSlaveProgress[fSlaveProgress.GetSize()-1] = 0;
4635  fSlaveTotals.Set(fSlaveTotals.GetSize()+1);
4636  fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
4637  fSlaveBytesRead.Set(fSlaveBytesRead.GetSize()+1);
4638  fSlaveBytesRead[fSlaveBytesRead.GetSize()-1] = 0;
4639  fSlaveInitTime.Set(fSlaveInitTime.GetSize()+1);
4640  fSlaveInitTime[fSlaveInitTime.GetSize()-1] = -1.;
4641  fSlaveProcTime.Set(fSlaveProcTime.GetSize()+1);
4642  fSlaveProcTime[fSlaveProcTime.GetSize()-1] = -1.;
4643  fSlaveEvtRti.Set(fSlaveEvtRti.GetSize()+1);
4644  fSlaveEvtRti[fSlaveEvtRti.GetSize()-1] = -1.;
4645  fSlaveMBRti.Set(fSlaveMBRti.GetSize()+1);
4646  fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
4647  fSlaveActW.Set(fSlaveActW.GetSize()+1);
4648  fSlaveActW[fSlaveActW.GetSize()-1] = 0;
4649  fSlaveTotS.Set(fSlaveTotS.GetSize()+1);
4650  fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
4651  fSlaveEffS.Set(fSlaveEffS.GetSize()+1);
4652  fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
4653  } else {
4654  Warning("Process", "not a TSlave object");
4655  }
4656  }
4657  }
4658  }
4659 
4660  if ( !IsClient() ) HandleTimer(0);
4661  PDB(kGlobal,1) Info("Process","Calling Collect");
4662  proof->Collect(&usedmasters);
4663  HandleTimer(0);
4664 
4665  }
4666 
4667  StopFeedback();
4668 
4669  PDB(kGlobal,1) Info("Process","Calling Merge Output");
4670  MergeOutput();
4671 
4672  TPerfStats::Stop();
4673 
4674  return 0;
4675 }
4676 
4677 ////////////////////////////////////////////////////////////////////////////////
4678 /// Report progress.
4679 
4681 {
4682  Int_t idx = fSlaves.IndexOf(sl);
4683  fSlaveProgress[idx] = processed;
4684  if (fSlaveTotals[idx] != total)
4685  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4686  fSlaveTotals[idx] = total;
4687 
4688  Long64_t tot = 0;
4689  Int_t i;
4690  for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
4691  Long64_t proc = 0;
4692  for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
4693 
4694  Progress(tot, proc);
4695 }
4696 
4697 ////////////////////////////////////////////////////////////////////////////////
4698 /// Report progress.
4699 
4701  Long64_t processed, Long64_t bytesread,
4702  Float_t initTime, Float_t procTime,
4703  Float_t evtrti, Float_t mbrti)
4704 {
4705  PDB(kGlobal,2)
4706  Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
4707  processed, bytesread, initTime, procTime, evtrti, mbrti);
4708 
4709  Int_t idx = fSlaves.IndexOf(sl);
4710  if (fSlaveTotals[idx] != total)
4711  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4712  fSlaveTotals[idx] = total;
4713  fSlaveProgress[idx] = processed;
4714  fSlaveBytesRead[idx] = bytesread;
4715  fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
4716  fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
4717  fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
4718  fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];
4719 
4720  Int_t i;
4721  Long64_t tot = 0;
4722  Long64_t proc = 0;
4723  Long64_t bytes = 0;
4724  Float_t init = -1.;
4725  Float_t ptime = -1.;
4726  Float_t erti = 0.;
4727  Float_t srti = 0.;
4728  Int_t nerti = 0;
4729  Int_t nsrti = 0;
4730  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4731  tot += fSlaveTotals[i];
4732  if (i < fSlaveProgress.GetSize())
4733  proc += fSlaveProgress[i];
4734  if (i < fSlaveBytesRead.GetSize())
4735  bytes += fSlaveBytesRead[i];
4736  if (i < fSlaveInitTime.GetSize())
4737  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4738  init = fSlaveInitTime[i];
4739  if (i < fSlaveProcTime.GetSize())
4740  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4741  ptime = fSlaveProcTime[i];
4742  if (i < fSlaveEvtRti.GetSize())
4743  if (fSlaveEvtRti[i] > -1.) {
4744  erti += fSlaveEvtRti[i];
4745  nerti++;
4746  }
4747  if (i < fSlaveMBRti.GetSize())
4748  if (fSlaveMBRti[i] > -1.) {
4749  srti += fSlaveMBRti[i];
4750  nsrti++;
4751  }
4752  }
4753  srti = (nsrti > 0) ? srti / nerti : 0.;
4754 
4755  Progress(tot, proc, bytes, init, ptime, erti, srti);
4756 }
4757 
4758 ////////////////////////////////////////////////////////////////////////////////
4759 /// Progress signal.
4760 
4762 {
4763  if (pi) {
4764  PDB(kGlobal,2)
4765  Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
4766  pi->fTotal, pi->fProcessed, pi->fBytesRead,
4767  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
4768  pi->fActWorkers, pi->fEffSessions);
4769 
4770  Int_t idx = fSlaves.IndexOf(wrk);
4771  if (fSlaveTotals[idx] != pi->fTotal)
4772  Warning("Progress", "total events has changed for worker %s", wrk->GetName());
4773  fSlaveTotals[idx] = pi->fTotal;
4774  fSlaveProgress[idx] = pi->fProcessed;
4775  fSlaveBytesRead[idx] = pi->fBytesRead;
4776  fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
4777  fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
4778  fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
4779  fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
4780  fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
4781  fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
4782  fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];
4783 
4784  Int_t i;
4785  Int_t nerti = 0;
4786  Int_t nsrti = 0;
4787  TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
4788  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4789  pisum.fTotal += fSlaveTotals[i];
4790  if (i < fSlaveProgress.GetSize())
4791  pisum.fProcessed += fSlaveProgress[i];
4792  if (i < fSlaveBytesRead.GetSize())
4793  pisum.fBytesRead += fSlaveBytesRead[i];
4794  if (i < fSlaveInitTime.GetSize())
4795  if (fSlaveInitTime[i] > -1. && (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
4796  pisum.fInitTime = fSlaveInitTime[i];
4797  if (i < fSlaveProcTime.GetSize())
4798  if (fSlaveProcTime[i] > -1. && (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
4799  pisum.fProcTime = fSlaveProcTime[i];
4800  if (i < fSlaveEvtRti.GetSize())
4801  if (fSlaveEvtRti[i] > -1.) {
4802  pisum.fEvtRateI += fSlaveEvtRti[i];
4803  nerti++;
4804  }
4805  if (i < fSlaveMBRti.GetSize())
4806  if (fSlaveMBRti[i] > -1.) {
4807  pisum.fMBRateI += fSlaveMBRti[i];
4808  nsrti++;
4809  }
4810  if (i < fSlaveActW.GetSize())
4811  pisum.fActWorkers += fSlaveActW[i];
4812  if (i < fSlaveTotS.GetSize())
4813  if (fSlaveTotS[i] > -1 && (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
4814  pisum.fTotSessions = fSlaveTotS[i];
4815  if (i < fSlaveEffS.GetSize())
4816  if (fSlaveEffS[i] > -1. && (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
4817  pisum.fEffSessions = fSlaveEffS[i];
4818  }
4819  pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nerti : 0.;
4820 
4821  Progress(&pisum);
4822  }
4823 }
4824 
4825 ////////////////////////////////////////////////////////////////////////////////
4826 /// Send progress and feedback to client.
4827 
4829 {
4830  if (fFeedbackTimer == 0) return kFALSE; // timer stopped already
4831 
4832  Int_t i;
4833  Long64_t tot = 0;
4834  Long64_t proc = 0;
4835  Long64_t bytes = 0;
4836  Float_t init = -1.;
4837  Float_t ptime = -1.;
4838  Float_t erti = 0.;
4839  Float_t srti = 0.;
4840  Int_t nerti = 0;
4841  Int_t nsrti = 0;
4842  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4843  tot += fSlaveTotals[i];
4844  if (i < fSlaveProgress.GetSize())
4845  proc += fSlaveProgress[i];
4846  if (i < fSlaveBytesRead.GetSize())
4847  bytes += fSlaveBytesRead[i];
4848  if (i < fSlaveInitTime.GetSize())
4849  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4850  init = fSlaveInitTime[i];
4851  if (i < fSlaveProcTime.GetSize())
4852  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4853  ptime = fSlaveProcTime[i];
4854  if (i < fSlaveEvtRti.GetSize())
4855  if (fSlaveEvtRti[i] > -1.) {
4856  erti += fSlaveEvtRti[i];
4857  nerti++;
4858  }
4859  if (i < fSlaveMBRti.GetSize())
4860  if (fSlaveMBRti[i] > -1.) {
4861  srti += fSlaveMBRti[i];
4862  nsrti++;
4863  }
4864  }
4865  erti = (nerti > 0) ? erti / nerti : 0.;
4866  srti = (nsrti > 0) ? srti / nerti : 0.;
4867 
4869  if (gProofServ->GetProtocol() > 25) {
4870  // Fill the message now
4871  TProofProgressInfo pi(tot, proc, bytes, init, ptime,
4872  erti, srti, -1,
4874  m << &pi;
4875  } else {
4876 
4877  m << tot << proc << bytes << init << ptime << erti << srti;
4878  }
4879 
4880  // send message to client;
4881  gProofServ->GetSocket()->Send(m);
4882 
4883  if (fReturnFeedback)
4885  else
4886  return kFALSE;
4887 }
4888 
4889 ////////////////////////////////////////////////////////////////////////////////
4890 /// Setup reporting of feedback objects and progress messages.
4891 
4893 {
4894  if (IsClient()) return; // Client does not need timer
4895 
4897 
4898  if (fFeedbackTimer) {
4899  fReturnFeedback = kTRUE;
4900  return;
4901  } else {
4902  fReturnFeedback = kFALSE;
4903  }
4904 
4905  // setup the timer for progress message
4907  fFeedbackPeriod = 2000;
4908  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4909  fFeedbackTimer = new TTimer;
4910  fFeedbackTimer->SetObject(this);
4912 }
const char * GetHost() const
Definition: TUrl.h:76
const char * GetName() const
Returns name of object.
Definition: TObjString.h:42
Int_t fTotSessions
Definition: TProof.h:201
virtual void SetMerging(Bool_t=kTRUE)
Definition: TProofPlayer.h:233
const char * GetSessionDir() const
Definition: TProofServ.h:261
Bool_t IsRetrieve() const
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
TList * GetOutputList()
Definition: TQueryResult.h:139
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
Definition: TSystem.cxx:929
virtual Int_t GetEntries() const
Definition: TCollection.h:92
Long64_t Process(const char *selector, Long64_t nentries=-1, Option_t *option="")
Process the specified TSelector file &#39;nentries&#39; times.
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write this object to the current directory.
Definition: TObject.cxx:830
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1265
Long64_t fTotal
Definition: TProof.h:193
static FILE * SetErrorHandlerFile(FILE *ferr)
Set the file stream where to log (default stderr).
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition: TString.cxx:865
virtual TDirectory * mkdir(const char *name, const char *title="")
Create a sub-directory and return a pointer to the created directory.
ErrorHandlerFunc_t SetErrorHandler(ErrorHandlerFunc_t newhandler)
Set an errorhandler function. Returns the old handler.
Definition: TError.cxx:106
virtual int Version() const
Definition: TSelector.h:60
const char * GetOutputFileName() const
virtual TString SplitAclicMode(const char *filename, TString &mode, TString &args, TString &io) const
This method split a filename of the form: ~~~ {.cpp} [path/]macro.C[+|++[k|f|g|O|c|s|d|v|-]][(args)]...
Definition: TSystem.cxx:4081
void SetMerging(Bool_t on=kTRUE)
Switch on/off merge timer.
An array of TObjects.
Definition: TObjArray.h:39
const char * GetOrdinal() const
Definition: TSlave.h:135
TProofProgressStatus * fProgressStatus
Definition: TProofPlayer.h:95
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:212
float xmin
Definition: THbookFile.cxx:93
Bool_t IsDraw() const
Definition: TQueryResult.h:152
UInt_t Convert(Bool_t toGMT=kFALSE) const
Convert fDatime from TDatime format to the standard time_t format.
Definition: TDatime.cxx:181
void ValidateDSet(TDSet *dset)
Validate a TDSet.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
Internal class steering processing in PROOF.
Definition: TProofPlayer.h:78
void SetWorkerOrdinal(const char *ordinal)
long long Long64_t
Definition: RtypesCore.h:69
EExitStatus fExitStatus
status of query in progress
Definition: TProofPlayer.h:93
void SetNumMergers(Int_t nmergers)
Definition: TQueryResult.h:109
TSocket * GetSocket() const
Definition: TSlave.h:138
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition: TDSet.cxx:394
Int_t fPhysRam
Definition: TSystem.h:169
Int_t AssertSelector(const char *selector_file)
Make sure that a valid selector object Return -1 in case of problems, 0 otherwise.
R__EXTERN Int_t gErrorIgnoreLevel
Definition: TError.h:107
void StoreOutput(TList *out)
Store output list (may not be used in this class).
virtual const char * WorkingDirectory()
Return working directory.
Definition: TSystem.cxx:866
virtual const char * GetName() const
Return name of this collection.
Int_t GetLearnEntries()
Return the number of entries in the learning phase.
virtual Bool_t InheritsFrom(const char *classname) const
Returns kTRUE if object inherits from class "classname".
Definition: TObject.cxx:488
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
Definition: TMD5.cxx:474
Long64_t GetEventsProcessed() const
Definition: TProofPlayer.h:224
Bool_t IsFinalized() const
Definition: TQueryResult.h:153
virtual Bool_t SendProcessingProgress(Double_t, Double_t, Bool_t=kFALSE)
void SetMemValues(Long_t vmem=-1, Long_t rmem=-1, Bool_t master=kFALSE)
Set max memory values.
Definition: TStatus.cxx:159
Ssiz_t Length() const
Definition: TString.h:390
Long_t fFeedbackPeriod
timer for sending intermediate results
Definition: TProofPlayer.h:90
const double pi
virtual TList * GetOutputList() const
Definition: TSelector.h:76
void Print(Option_t *option="") const
Dump the class content.
Collectable string class.
Definition: TObjString.h:32
#define kPEX_ABORTED
float Float_t
Definition: RtypesCore.h:53
virtual void ls(Option_t *option="") const
List (ls) all objects in this collection.
Long64_t fBytesRead
Definition: TProof.h:195
virtual void SetDirectory(TDirectory *dir)
By default when an histogram is created, it is added to the list of histogram objects in the current ...
Definition: TH1.cxx:8008
void SetDir(const char *dir, Bool_t raw=kFALSE)
TProofLockPath * GetCacheLock()
Definition: TProofServ.h:295
return c
const char Option_t
Definition: RtypesCore.h:62
float ymin
Definition: THbookFile.cxx:93
const char * GetObjName() const
Definition: TDSet.h:229
TObject * FindObject(const char *name) const
Find object using its name.
Definition: THashList.cxx:213
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
This class represents a WWW compatible URL.
Definition: TUrl.h:41
virtual Bool_t ProcessCut(Long64_t)
Definition: TSelector.cxx:262
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:635
const char * GetDataDir() const
Definition: TProofServ.h:264
void SetWriteV3(Bool_t on=kTRUE)
Set/Reset the &#39;OldStreamer&#39; bit in this instance and its elements.
Definition: TDSet.cxx:1856
virtual Int_t GetDimension() const
Definition: TH1.h:287
static void FilterLocalroot(TString &path, const char *url="root://dum/")
If &#39;path&#39; is local and &#39;dsrv&#39; is Xrootd, apply &#39;path.Localroot&#39; settings, if any. ...
TMacro * GetSelecHdr() const
Definition: TQueryResult.h:135
TQueryResult * GetQueryResult(const char *ref)
Get query result instances referenced &#39;ref&#39; from the list of results.
virtual Bool_t JoinProcess(TList *workers)
Not implemented: meaningful only in the remote player. Returns kFALSE.
Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg)
Check the memory usage, if requested.
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
virtual Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)
Finalize query (may not be used in this class).
Bool_t fSaveResultsPerPacket
Definition: TProofPlayer.h:119
TProofPlayer(TProof *proof=0)
Default ctor.
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition: TNamed.cxx:131
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
void StopProcess(Bool_t abort, Int_t timeout=-1)
Stop process after this event.
void MayNotUse(const char *method) const
Use this method to signal that a method (defined in a base class) may not be called in a derived clas...
Definition: TObject.cxx:978
const char * GetProtocol() const
Definition: TUrl.h:73
TH1 * h
Definition: legend2.C:5
TQueryResult * fPreviousQuery
Definition: TProofPlayer.h:103
TObject * Key() const
Definition: TMap.h:124
void Add(const char *mesg)
Add an error message.
Definition: TStatus.cxx:46
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:899
Long64_t GetBytesRead() const
virtual Bool_t Merge(Bool_t=kTRUE)
Merge the files.
void AddOutput(TList *out)
Incorporate output list (may not be used in this class).
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:50
void SetupFeedback()
Setup reporting of feedback objects.
static Long_t GetVirtMemMax()
VirtMemMax getter.
const char * GetOptions() const
Definition: TQueryResult.h:127
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:822
virtual void AddFirst(TObject *obj)
Add object at the beginning of the list.
Definition: TList.cxx:93
virtual void MergeOutput(Bool_t savememvalues=kFALSE)
Merge objects in output the lists.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
Definition: TSystem.cxx:857
void RemoveQueryResult(const char *ref)
Remove all query result instances referenced &#39;ref&#39; from the list of results.
TTimer * fProcTimeTimer
Definition: TProofPlayer.h:112
Bool_t HandleTimer(TTimer *timer)
Send progress and feedback to client.
#define gROOT
Definition: TROOT.h:364
Int_t AdoptFile(TFile *f)
Adopt a file already open.
TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)
Get next packet for specified slave.
Float_t fEvtRateI
Definition: TProof.h:198
const char * GetFileName() const
UInt_t GetTypeOpt() const
virtual int Load(const char *module, const char *entry="", Bool_t system=kFALSE)
Load a shared library.
Definition: TSystem.cxx:1818
Implement Tree drawing using PROOF.
Definition: TProofDraw.h:57
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Definition: TSystem.cxx:1447
Bool_t IsZombie() const
Definition: TObject.h:127
Basic string class.
Definition: TString.h:137
virtual Int_t GetNextPacket(Long64_t &first, Long64_t &num)=0
void SetReadCalls(Long64_t readCalls)
int Int_t
Definition: RtypesCore.h:41
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:997
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
#define ENDTRY
Definition: TException.h:73
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:497
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Definition: TSystem.cxx:1511
Bool_t HistoSameAxis(TH1 *h0, TH1 *h1)
Return kTRUE is the histograms &#39;h0&#39; and &#39;h1&#39; have the same binning and ranges on the axis (i...
virtual void StopProcess(Bool_t abort)
Set flag to stop the process.
Definition: TEventIter.cxx:142
virtual Long64_t GetCacheSize()=0
virtual Int_t GetNbinsX() const
Definition: TH1.h:301
virtual void InvalidatePacket()
Invalidated the current packet (if any) by setting the TDSetElement::kCorrupted bit.
Definition: TEventIter.cxx:134
void SetOption(Option_t *option)
Definition: TDrawFeedback.h:56
const char * Class
Definition: TXMLSetup.cxx:64
void SetRecvTime(Float_t recvtime)
Definition: TQueryResult.h:107
TMacro * GetSelecImp() const
Definition: TQueryResult.h:136
const char * GetTopSessionTag() const
Definition: TProofServ.h:260
virtual Double_t GetEntries() const
Return the current number of entries.
Definition: TH1.cxx:4055
TLatex * t1
Definition: textangle.C:20
TQueryResult * fQuery
Definition: TProofPlayer.h:102
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:558
void SetDispatchTimer(Bool_t on=kTRUE)
Enable/disable the timer to dispatch pening events while processing.
Implementation of TProof controlling PROOF federated clusters.
TString & Insert(Ssiz_t pos, const char *s)
Definition: TString.h:592
virtual void SetInputList(TList *input)
Definition: TSelector.h:73
Bool_t GetValid() const
Definition: TDSet.h:121
Short_t Abs(Short_t d)
Definition: TMathBase.h:110
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Definition: TList.cxx:311
Int_t fActWorkers
Definition: TProof.h:200
static Float_t GetMemHWM()
MemHWM getter.
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
Definition: TSystem.cxx:2443
const Int_t kBreak
Definition: TError.h:42
void MapOutputListToDataMembers() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:739
overwrite existing object with same name
Definition: TObject.h:84
static TOutputListSelectorDataMap * FindInList(TCollection *coll)
Find a TOutputListSelectorDataMap in a collection.
Int_t SavePartialResults(Bool_t queryend=kFALSE, Bool_t force=kFALSE)
Save the partial results of this query to a dedicated file under the user data directory.
TList * fInput
Definition: TProofPlayer.h:84
TString & Replace(Ssiz_t pos, Ssiz_t n, const char *s)
Definition: TString.h:625
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition: TROOT.cxx:2559
Double_t GetProcTime() const
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
Definition: TEnv.cxx:751
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3871
const char * GetObjName() const
Definition: TDSet.h:122
This class defines a UUID (Universally Unique IDentifier), also known as GUIDs (Globally Unique IDent...
Definition: TUUID.h:44
virtual Int_t GetN() const
Definition: TEventList.h:58
TList * fAutoBins
Definition: TProofPlayer.h:81
Long64_t GetNum() const
Definition: TDSet.h:116
const char * Data() const
Definition: TString.h:349
void SetStopTimer(Bool_t on=kTRUE, Bool_t abort=kFALSE, Int_t timeout=0)
Enable/disable the timer to stop/abort processing.
Manages an element of a TDSet.
Definition: TDSet.h:68
virtual TObject * ReadObject(const TClass *cl)
Read object from I/O buffer.
static void SetLastEntry(Long64_t lastentry)
Set the last entry before exception.
const char * GetDirectory() const
Definition: TDSet.h:230
static struct mg_connection * fc(struct mg_context *ctx)
Definition: civetweb.c:1956
virtual void ProcessFill(Long64_t)
Definition: TSelector.cxx:279
#define SafeDelete(p)
Definition: RConfig.h:499
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
Definition: TSystem.cxx:1346
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
Definition: TObject.cxx:204
void Stop()
Stop the stopwatch.
Definition: TStopwatch.cxx:77
void Print(Option_t *opt="") const
Print query content. Use opt = "F" for a full listing.
virtual void SetAutoFlush(Long64_t autof=-30000000)
This function may be called at the start of a program to change the default value for fAutoFlush...
Definition: TTree.cxx:7591
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2335
#define PDB(mask, level)
Definition: TProofDebug.h:58
Long64_t fReadBytesRun
Definition: TProofPlayer.h:97
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition: THashList.h:36
TEventIter * fEvIter
period (ms) for sending intermediate results
Definition: TProofPlayer.h:91
Float_t GetEffSessions() const
Definition: TProofServ.h:278
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:46
const char * GetMsd() const
Definition: TDSet.h:119
This class holds the status of an ongoing operation and collects error messages.
Definition: TStatus.h:39
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:33
virtual Bool_t IsEmpty() const
Definition: TCollection.h:99
void IncEntries(Long64_t entries=1)
static Long64_t GetFileBytesRead()
Static function returning the total number of bytes read from all files.
Definition: TFile.cxx:4339
void StopFeedback()
Stop reporting of feedback objects.
Int_t GetQuerySeqNum() const
Definition: TProofServ.h:274
static EFileType GetType(const char *name, Option_t *option="", TString *prefix=0)
Resolve the file type as a function of the protocol field in &#39;name&#39;.
Definition: TFile.cxx:4586
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Definition: TTimer.cxx:211
void SetMergeTime(Float_t mergetime)
Definition: TQueryResult.h:106
void AddQueryResult(TQueryResult *q)
Add query result to the list, making sure that there are no duplicates.
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
virtual Bool_t HandleTimer(TTimer *timer)
Execute action in response of a timer timing out.
Definition: TObject.cxx:470
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1627
void DeleteDrawFeedback(TDrawFeedback *f)
Delete draw feedback object.
TList * GetListOfElements() const
Definition: TDSet.h:231
void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
Set draw feedback option.
A sorted doubly linked list.
Definition: TSortedList.h:30
void Info(const char *location, const char *msgfmt,...)
TList * MergeFeedback()
Merge feedback lists.
Long64_t fProcessedRun
Read calls in this run.
Definition: TProofPlayer.h:99
virtual Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process specified TDSet on PROOF.
TProofProgressStatus * GetProgressStatus() const
Definition: TProofPlayer.h:242
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
void Validate()
Validate the TDSet by opening files.
Definition: TDSet.cxx:1571
Int_t Atoi() const
Return integer value of string.
Definition: TString.cxx:1965
Int_t fMaxDrawQueries
Definition: TProofPlayer.h:105
virtual void Begin(TTree *)
Definition: TSelector.h:62
void SetOutputFileName(const char *name)
Set the name of the output file; in the form of an Url.
static void GetMemValues(Long_t &vmax, Long_t &rmax)
Get memory usage.
Definition: TPerfStats.cxx:790
virtual void SetupFeedback()
Set up feedback (may not be used in this class).
TClass * fSelectorClass
kTRUE when fSelector has been created locally
Definition: TProofPlayer.h:88
Long64_t GetMsgSizeHWM() const
Definition: TProofServ.h:288
static Long_t GetResMemMax()
ResMemMax getter.
virtual void SetOutputList(TList *out, Bool_t adopt=kTRUE)
Set / change the output list.
void SetBytesRead(Long64_t bytesRead)
TList * GetOutputList() const
Get output list.
Method or function calling interface.
Definition: TMethodCall.h:41
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:925
void SetCurrentQuery(TQueryResult *q)
Set current query and save previous value.
TH1F * h1
Definition: legend1.C:5
virtual Long64_t GetEntryNumber(Long64_t)
Definition: TEventIter.cxx:230
const Bool_t kSortDescending
Definition: TList.h:41
TList * GetPackets()
Definition: TEventIter.h:93
A container class for query results.
Definition: TQueryResult.h:44
Double_t GetXmin() const
Definition: TAxis.h:139
TDSetElement * Current() const
Definition: TDSet.h:238
#define kPEX_STOPPED
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
THashList * fOutput
Definition: TProofPlayer.h:85
static void SetLimitsFinder(THLimitsFinder *finder)
This static function can be used to specify a finder derived from THLimitsFinder. ...
static void AutoBinFunc(TString &key, Double_t &xmin, Double_t &xmax, Double_t &ymin, Double_t &ymax, Double_t &zmin, Double_t &zmax)
Get bining information.
static void Start(TList *input, TList *output)
Initialize PROOF statistics run.
Definition: TPerfStats.cxx:744
virtual void SetProcessInfo(Long64_t ent, Float_t cpu=0., Long64_t siz=-1, Float_t inittime=0., Float_t proctime=0.)
Set processing info.
void SetInitTime()
Set init time.
TTimer * fFeedbackTimer
class of the latest selector
Definition: TProofPlayer.h:89
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
Long64_t GetFirst() const
Definition: TDSet.h:114
void SetFileName(const char *name)
Float_t fProcTime
Definition: TProof.h:197
void Feedback(TList *objs)
Feedback signal.
Int_t GetBufferSize() const
Definition: TH1.h:242
TFileMerger * GetFileMerger(Bool_t local=kFALSE)
Get instance of the file merger to be used in &#39;merge&#39; mode.
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Definition: TMap.cxx:214
A doubly linked list.
Definition: TList.h:47
TStopwatch * fProcTime
Definition: TProofPlayer.h:113
void ClearInput()
Clear input list.
void UpdateAutoBin(const char *name, Double_t &xmin, Double_t &xmax, Double_t &ymin, Double_t &ymax, Double_t &zmin, Double_t &zmax)
Update automatic binning parameters for given object "name".
static Float_t GetMemStop()
MemStop getter.
virtual TList * GetInputList() const
Definition: TSelector.h:75
#define CATCH(n)
Definition: TException.h:67
Int_t GetPort() const
Definition: TUrl.h:87
static void Setup(TList *input)
Setup the PROOF input list with requested statistics and tracing options.
Definition: TPerfStats.cxx:727
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asychronous message to the master / client .
void HandleGetTreeHeader(TMessage *mess)
Handle tree header request.
const char * GetName() const
Returns name of object.
Definition: TSlave.h:128
TSocket * GetSocket() const
Definition: TProofServ.h:271
EFileType
File type.
Definition: TFile.h:172
void Clear(Option_t *option="")
Remove all objects from the list.
Definition: THashList.cxx:168
float ymax
Definition: THbookFile.cxx:93
TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)
Get next packet (may not be used in this class).
static Bool_t gAbort
const char * GetFileName() const
Definition: TDSet.h:113
Float_t fInitTime
Definition: TProof.h:196
Int_t fCpus
Definition: TSystem.h:165
TObject * Value() const
Definition: TMap.h:125
const char * GetPrefix() const
Definition: TProofServ.h:290
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition: TString.cxx:2221
TRandom2 r(17)
Class to manage histogram axis.
Definition: TAxis.h:36
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
Int_t GetNbins() const
Definition: TAxis.h:127
Long64_t fReadCallsRun
Bytes read in this run.
Definition: TProofPlayer.h:98
static void GetLocalServer(TString &dsrv)
Extract LOCALDATASERVER info in &#39;dsrv&#39;.
virtual void Abort(const char *why, EAbort what=kAbortProcess)
Abort processing.
Definition: TSelector.cxx:116
const char * GetLibList() const
Definition: TQueryResult.h:137
tomato 1-D histogram with an int per channel (see TH1 documentation)}
Definition: TH1.h:534
static Bool_t IsStandardDraw(const char *selec)
Find out if this is a standard selection used for Draw actions (either TSelectorDraw, TProofDraw or deriving from them).
Definition: TSelector.cxx:237
TDirectory * GetDirectory() const
Definition: TTree.h:391
This class provides file copy and merging services.
Definition: TFileMerger.h:30
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:496
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition: TObject.cxx:188
void AddInput(TObject *inp)
Add object to input list.
virtual void StopFeedback()
Stop feedback (may not be used in this class).
void SetupFeedback()
Setup reporting of feedback objects and progress messages.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:675
virtual void SlaveBegin(TTree *)
Definition: TSelector.h:63
Long_t fMemVirtual
Definition: TSystem.h:207
TObject * Next()
Definition: TCollection.h:158
virtual TEnvRec * Lookup(const char *n)
Loop over all resource records and return the one with name.
Definition: TEnv.cxx:552
TObject * Remove(TObject *obj)
Remove object from the list.
Definition: THashList.cxx:285
void AddOutput(TList *out)
Incorporate the content of the received output list &#39;out&#39; into the final output list fOutput...
void SetMerged(Bool_t merged=kTRUE)
Long_t fSaveMemThreshold
Definition: TProofPlayer.h:117
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2322
virtual Bool_t JoinProcess(TList *workers)
Prepares the given list of new workers to join a progressing process.
TList * GetInputList()
Definition: TQueryResult.h:128
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:159
TMarker * m
Definition: textangle.C:8
Bool_t SetDataMembers(TSelector *sel) const
Given an output list, set the data members of a TSelector.
char * Form(const char *fmt,...)
Class to steer the merging of files produced on the workers.
Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process specified TDSet on PROOF worker.
virtual void SetInitTime()
Definition: TProofPlayer.h:231
virtual Bool_t SendProcessingStatus(const char *, Bool_t=kFALSE)
const char * AsString() const
Return UUID as string. Copy string immediately since it will be reused.
Definition: TUUID.cxx:537
void SaveSource(FILE *fp)
Save macro source in file pointer fp.
Definition: TMacro.cxx:380
virtual Int_t GetNbinsZ() const
Definition: TH1.h:303
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
Handles synchronous and a-synchronous timer events.
Definition: TTimer.h:57
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
void SetLastMergingMsg(TObject *obj)
Set the message to be notified in case of exception.
Long64_t fProcessed
Definition: TProof.h:194
virtual void SetFinalized()
Definition: TQueryResult.h:99
Int_t GetProtocol() const
Definition: TProofServ.h:266
The ROOT global object gROOT contains a list of all defined classes.
Definition: TClass.h:81
virtual Bool_t HandleTimer(TTimer *timer)
Send feedback objects to client.
TAxis * GetYaxis()
Definition: TH1.h:325
float xmax
Definition: THbookFile.cxx:93
Int_t ReinitSelector(TQueryResult *qr)
Reinitialize fSelector using the selector files in the query result.
static Int_t GetFileReadCalls()
Static function returning the total number of read calls from all files.
Definition: TFile.cxx:4356
const char * GetCacheDir() const
Definition: TProofServ.h:262
write collection with single key
Definition: TObject.h:83
Bool_t IsNull() const
Definition: TString.h:387
void SetName(const char *name)
Definition: TCollection.h:116
void Reset(Detail::TBranchProxy *x)
void HandleRecvHisto(TMessage *mess)
Receive histo from slave.
Float_t fEffSessions
Definition: TProof.h:202
Long64_t GetEntries() const
TList * fQueryResults
Events processed in this run.
Definition: TProofPlayer.h:101
Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)
Finalize query (may not be used in this class).
TStatus * fSelStatus
iterator on events or objects
Definition: TProofPlayer.h:92
void FeedBackCanvas(const char *name, Bool_t create)
Create/destroy a named canvas for feedback.
const Double_t * GetBuffer() const
Definition: TH1.h:243
virtual void Terminate()
Definition: TSelector.h:78
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:150
#define Printf
Definition: TGeoToOCC.h:18
Bool_t Matches(const char *ref)
Return TRUE if reference ref matches.
Int_t AddOutputObject(TObject *obj)
Incorporate the received object &#39;obj&#39; into the output list fOutput.
TDatime GetStartTime() const
Definition: TQueryResult.h:125
void InitWithPrototype(TClass *cl, const char *method, const char *proto, Bool_t objectIsConst=kFALSE, ROOT::EFunctionMatchMode mode=ROOT::kConversionMatch)
Initialize the method invocation environment.
virtual void SlaveTerminate()
Definition: TSelector.h:77
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:387
Long64_t Merge(TCollection *list)
Merge objects from the list into this object.
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
static TEventIter * Create(TDSet *dset, TSelector *sel, Long64_t first, Long64_t num)
Create and instance of the appropriate iterator.
Definition: TEventIter.cxx:150
void UpdateProgressInfo()
Update fProgressStatus.
static TSelector * GetSelector(const char *filename)
The code in filename is loaded (interpreted or compiled, see below), filename must contain a valid cl...
Definition: TSelector.cxx:142
static unsigned int total
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:581
virtual Int_t RedirectOutput(const char *name, const char *mode="a", RedirectHandle_t *h=0)
Redirect standard output (stdout, stderr) to the specified file.
Definition: TSystem.cxx:1677
void SetHost(const char *host)
Definition: TUrl.h:93
TString & Remove(Ssiz_t pos)
Definition: TString.h:616
long Long_t
Definition: RtypesCore.h:50
int Ssiz_t
Definition: RtypesCore.h:63
Bool_t IsClient() const
Definition: TProofPlayer.h:220
The packetizer is a load balancing object created for each query.
Bool_t IsClient() const
Is the player running on the client?
TString fOutputFilePath
Definition: TProofPlayer.h:115
Class used by TMap to store (key,value) pairs.
Definition: TMap.h:106
Int_t GetBufferLength() const
Definition: TH1.h:241
TFile * fOutputFile
Definition: TProofPlayer.h:116
TList * GetListOfActiveSlaves() const
Definition: TProof.h:753
virtual const char * GetIncludePath()
Get the list of include path.
Definition: TSystem.cxx:3815
virtual Int_t GetSize() const
Definition: TCollection.h:95
virtual void SetDirectory(TDirectory *dir)
Change the tree&#39;s directory.
Definition: TTree.cxx:8285
virtual void MergeOutput(Bool_t savememvalues=kFALSE)
Merge output (may not be used in this class).
#define ClassImp(name)
Definition: Rtypes.h:279
void RestorePreviousQuery()
Definition: TProofPlayer.h:187
double f(double x)
TSelector * fSelector
Definition: TProofPlayer.h:86
void ResetParam()
Reset parameter list. To be used before the first call the SetParam().
TList * GetMergeList() const
Definition: TFileMerger.h:81
static Int_t init()
virtual Func_t DynFindSymbol(const char *module, const char *entry)
Find specific entry point in specified library.
Definition: TSystem.cxx:1982
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:416
double Double_t
Definition: RtypesCore.h:55
virtual const char * HostName()
Return the system&#39;s host name.
Definition: TSystem.cxx:308
void RedirectOutput(Bool_t on=kTRUE)
Control output redirection to TProof::fLogFileW.
char * DynamicPathName(const char *lib, Bool_t quiet=kFALSE)
Find a dynamic library called lib using the system search paths.
Definition: TSystem.cxx:1958
Int_t Lock()
Locks the directory.
virtual EAbort GetAbort() const
Definition: TSelector.h:80
TTimer * fDispatchTimer
Definition: TProofPlayer.h:110
#define TRY
Definition: TException.h:60
Bool_t IsMaster() const
Definition: TProofServ.h:307
Describe directory structure in memory.
Definition: TDirectory.h:44
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
int type
Definition: TGX11.cxx:120
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
void HandleGetTreeHeader(TMessage *mess)
Handle tree header request.
const char * GetOrdinal() const
Definition: TProofServ.h:267
TNamed()
Definition: TNamed.h:40
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:346
EExitStatus GetExitStatus() const
Definition: TProofPlayer.h:223
Double_t GetXmax() const
Definition: TAxis.h:140
static THashList * fgDrawInputPars
Definition: TProofPlayer.h:121
int nentries
Definition: THbookFile.cxx:89
Set the selector&#39;s data members to the corresponding elements of the output list. ...
The TH1 histogram class.
Definition: TH1.h:80
virtual TMD5 * Checksum()
Returns checksum of the current content.
Definition: TMacro.cxx:193
Class to find axis limits and synchronize them between workers.
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
Bool_t IsMaster() const
Definition: TProof.h:966
virtual void Reset()
Reset or initialize access to the elements.
Definition: TDSet.cxx:1350
virtual Bool_t Process(Long64_t)
Definition: TSelector.cxx:292
Bool_t MergeOutputFiles()
Merge output in files.
virtual void DispatchOneEvent(Bool_t pendingOnly=kFALSE)
Dispatch a single event.
Definition: TSystem.cxx:434
Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Draw (may not be used in this class).
virtual Int_t GetLearnEntries()=0
Bool_t IsMerged() const
Int_t InitPacketizer(TDSet *dset, Long64_t nentries, Long64_t first, const char *defpackunit, const char *defpackdata)
Init the packetizer Return 0 on success (fPacketizer is correctly initialized), -1 on failure...
Int_t Unlock()
Unlock the directory.
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2882
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:349
Bool_t IsRegister() const
void SetSelectorDataMembersFromOutputList()
Set the selector&#39;s data members: find the mapping of data members to otuput list entries in the outpu...
virtual void SetOption(const char *option)
Definition: TSelector.h:71
virtual Bool_t SendSelector(const char *selector_file)
Send the selector file(s) to master or worker nodes.
virtual void AddAfter(const TObject *after, TObject *obj)
Insert object after object after in the list.
Definition: TList.cxx:221
Bool_t HandleTimer(TTimer *timer)
Handle timer event.
virtual Bool_t Add(TF1 *h1, Double_t c1=1, Option_t *option="")
Performs the operation: this = this + c1*f1 if errors are defined (see TH1::Sumw2), errors are also recalculated.
Definition: TH1.cxx:770
TAxis * GetZaxis()
Definition: TH1.h:326
virtual void Add(const TEventList *list)
Merge contents of alist with this list.
Definition: TEventList.cxx:116
void Throw(int code)
If an exception context has been set (using the TRY and RETRY macros) jump back to where it was set...
Definition: TException.cxx:27
virtual Int_t DrawCanvas(TObject *obj)
Draw the object if it is a canvas.
Mother of all ROOT objects.
Definition: TObject.h:44
void Lookup(Bool_t removeMissing=kFALSE, TList **missingFiles=0)
Resolve the end-point URL for the current elements of this data set If the removeMissing option is se...
Definition: TDSet.cxx:1587
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:557
void StopFeedback()
Stop feedback.
virtual Int_t GetNbinsY() const
Definition: TH1.h:302
virtual Long64_t * GetList() const
Definition: TEventList.h:57
typedef void((*Func_t)())
TObject * HandleHistogram(TObject *obj, Bool_t &merged)
Low statistic histograms need a special treatment when using autobin.
TUrl * GetCurrentUrl() const
Return the current url.
Definition: TFileInfo.cxx:248
Int_t Collect(ESlaves list=kActive, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from the slave servers.
Definition: TProof.cxx:2705
void Progress(Long64_t total, Long64_t processed)
Report progress (may not be used in this class).
static void SetMacroPath(const char *newpath)
Set or extend the macro search path.
Definition: TROOT.cxx:2593
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
Definition: TDirectory.cxx:435
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:361
Int_t AddOutputObject(TObject *obj)
Incorporate output object (may not be used in this class).
virtual void Add(TObject *obj)
Definition: TList.h:81
const Ssiz_t kNPOS
Definition: Rtypes.h:115
Int_t Incorporate(TObject *obj, TList *out, Bool_t &merged)
Incorporate object &#39;newobj&#39; in the list &#39;outlist&#39;.
Class that contains a list of TFileInfo&#39;s and accumulated meta data information about its entries...
void Execute(const char *, const char *, int *=0)
Execute method on this object with the given parameter string, e.g.
Definition: TMethodCall.h:68
Utility class to draw objects in the feedback list during queries.
Definition: TDrawFeedback.h:39
void SetObject(TObject *object)
Set the object to be notified at time out.
Definition: TTimer.cxx:184
void Progress(Long64_t total, Long64_t processed)
Progress signal.
Definition: TProofPlayer.h:446
TFileCollection * GetFileCollection()
Get instance of the file collection to be used in &#39;dataset&#39; mode.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:567
TF1 * f1
Definition: legend1.C:11
void SetParam(Long_t l)
Add a long method parameter.
Double_t GetRate() const
Bool_t IsTopMaster() const
Definition: TProofServ.h:309
const char * GetDir(Bool_t raw=kFALSE) const
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
void StopProcess(Bool_t abort, Int_t timeout=-1)
Stop the process after this event.
virtual TIterator * MakeIterator(Bool_t dir=kIterForward) const
Return a list iterator.
Definition: TList.cxx:604
Int_t GetDrawArgs(const char *var, const char *sel, Option_t *opt, TString &selector, TString &objname)
Parse the arguments from var, sel and opt and fill the selector and object name accordingly.
virtual void StoreFeedback(TObject *slave, TList *out)
Store feedback results from the specified slave.
const char * GetType() const
Definition: TDSet.h:228
void Reset()
Definition: TStopwatch.h:54
TObject * GetOutput(const char *name) const
Get output object by name.
Int_t GetTotSessions() const
Definition: TProofServ.h:276
Float_t GetMaxProcTime() const
Definition: TDSet.h:144
void SetExitStatus(Int_t est)
Definition: TStatus.h:73
static void Stop()
Terminate the PROOF statistics run.
Definition: TPerfStats.cxx:764
Long64_t fTotalEvents
Definition: TProofPlayer.h:94
virtual Long64_t GetEntries() const
Definition: TTree.h:392
void StoreFeedback(TObject *slave, TList *out)
Store feedback list (may not be used in this class).
Long64_t GetCacheSize()
Return the size in bytes of the cache.
A TTree object has a header with a name and a title.
Definition: TTree.h:98
#define gDirectory
Definition: TDirectory.h:221
Float_t fMBRateI
Definition: TProof.h:199
static void SetLastMsg(const char *lastmsg)
Set the message to be sent back in case of exceptions.
std::mutex fStopTimerMtx
Definition: TProofPlayer.h:108
Class describing a generic file including meta information.
Definition: TFileInfo.h:50
Bool_t IsMerge() const
void ResetBit(UInt_t f)
Definition: TObject.h:158
Bool_t fCreateSelObj
the latest selector
Definition: TProofPlayer.h:87
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition: TSystem.cxx:1243
Definition: first.py:1
virtual void PrintFiles(Option_t *options)
Print list of files being merged.
TClass * GetClass() const
Definition: TMessage.h:76
Int_t fDrawQueries
Definition: TProofPlayer.h:104
Bool_t IsParallel() const
True if in parallel mode.
Class describing a PROOF worker server.
Definition: TSlave.h:50
static void output(int code)
Definition: gifencode.c:226
Container class for processing statistics.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:39
static void ErrorHandler(Int_t level, Bool_t abort, const char *location, const char *msg)
The PROOF error handler function.
const Bool_t kTRUE
Definition: Rtypes.h:91
float * q
Definition: THbookFile.cxx:87
virtual void Reset()
Reset merger file list.
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition: TNamed.cxx:155
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:27
static void SetMemValues()
Record memory usage.
Definition: TPerfStats.cxx:778
const Int_t n
Definition: legend1.C:16
Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Draw (support for TChain::Draw()).
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write all objects in this collection.
void NotifyMemory(TObject *obj)
Printout the memory record after merging object &#39;obj&#39; This record is used by the memory monitor...
Bool_t IsValid() const
Return true if the method call has been properly initialized and is usable.
void StoreOutput(TList *out)
Store received output list.
char name[80]
Definition: TGX11.cxx:109
Long_t fMemResident
Definition: TSystem.h:206
const char * GetFile() const
Definition: TUrl.h:78
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2412
virtual Long64_t GetStatus() const
Definition: TSelector.h:66
void SetProcessing(Bool_t on=kTRUE)
Set processing bit according to &#39;on&#39;.
TAxis * GetXaxis()
Definition: TH1.h:324
void Progress(Long64_t total, Long64_t processed)
Progress signal.
virtual ~TProofPlayer()
Destructor.
TTimer * fStopTimer
Definition: TProofPlayer.h:107
virtual Long64_t Merge(TCollection *list)
Add all histograms in the collection to this histogram.
Definition: TH1.cxx:5313
if(line.BeginsWith("/*"))
Definition: HLFactory.cxx:443
virtual void SetIncludePath(const char *includePath)
IncludePath should contain the list of compiler flags to indicate where to find user defined header f...
Definition: TSystem.cxx:4017
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:234
static Int_t SendInputData(TQueryResult *qr, TProof *p, TString &emsg)
Send the input data file to the workers.
Definition: TProof.cxx:12353
void Feedback(TList *objs)
Set feedback list (may not be used in this class).
virtual void Close(Option_t *option="")
Close a file.
Definition: TFile.cxx:904
TDrawFeedback * CreateDrawFeedback(TProof *p)
Draw feedback creation proxy.
Bool_t fSavePartialResults
Definition: TProofPlayer.h:118
Int_t Remove(TDSetElement *elem, Bool_t deleteElem=kTRUE)
Remove TDSetElement &#39;elem&#39; from the list.
Definition: TDSet.cxx:1558
const char * GetDataDirOpts() const
Definition: TProofServ.h:265
Stopwatch class.
Definition: TStopwatch.h:30
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:911