Logo ROOT  
Reference Guide
TProofPlayer.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Maarten Ballintijn 07/01/02
3
4/*************************************************************************
5 * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TProofPlayer
13\ingroup proofkernel
14
15Internal class steering processing in PROOF.
16Instances of the TProofPlayer class are created on the worker nodes
17per session and do the processing.
18Instances of its subclass, TProofPlayerRemote, are created for each
19query on the master(s) and on the client. On the master(s),
20TProofPlayerRemote coordinates processing, checks the dataset, creates
21the packetizer and takes care of merging the results of the workers.
22The instance on the client collects information on the input
23(dataset and selector), it invokes the Begin() method and finalizes
24the query by calling Terminate().
25
26*/
27
28#include "TProofDraw.h"
29#include "TProofPlayer.h"
30#include "THashList.h"
31#include "TEnv.h"
32#include "TEventIter.h"
33#include "TVirtualPacketizer.h"
34#include "TSelector.h"
35#include "TSocket.h"
36#include "TProofServ.h"
37#include "TProof.h"
38#include "TProofOutputFile.h"
39#include "TProofSuperMaster.h"
40#include "TSlave.h"
41#include "TClass.h"
42#include "TROOT.h"
43#include "TError.h"
44#include "TException.h"
45#include "MessageTypes.h"
46#include "TMessage.h"
47#include "TDSetProxy.h"
48#include "TString.h"
49#include "TSystem.h"
50#include "TFile.h"
51#include "TFileCollection.h"
52#include "TFileInfo.h"
53#include "TFileMerger.h"
54#include "TProofDebug.h"
55#include "TTimer.h"
56#include "TMap.h"
57#include "TPerfStats.h"
58#include "TStatus.h"
59#include "TEventList.h"
60#include "TProofLimitsFinder.h"
61#include "TSortedList.h"
62#include "TTree.h"
63#include "TEntryList.h"
64#include "TDSet.h"
65#include "TDrawFeedback.h"
66#include "TNamed.h"
67#include "TObjString.h"
68#include "TQueryResult.h"
69#include "TMD5.h"
70#include "TMethodCall.h"
71#include "TObjArray.h"
72#include "TH1.h"
73#include "TVirtualMonitoring.h"
74#include "TParameter.h"
76#include "TStopwatch.h"
77
78// Timeout exception
79#define kPEX_STOPPED 1001
80#define kPEX_ABORTED 1002
81
82// To flag an abort condition: use a local static variable to avoid
83// warnings about problems with longjumps
85
86class TAutoBinVal : public TNamed {
87private:
89
90public:
92 Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
93 {
94 fXmin = xmin; fXmax = xmax;
95 fYmin = ymin; fYmax = ymax;
96 fZmin = zmin; fZmax = zmax;
97 }
99 Double_t& ymax, Double_t& zmin, Double_t& zmax)
100 {
101 xmin = fXmin; xmax = fXmax;
102 ymin = fYmin; ymax = fYmax;
103 zmin = fZmin; zmax = fZmax;
104 }
105
106};
107
108//
109// Special timer to dispatch pending events while processing
110////////////////////////////////////////////////////////////////////////////////
111
112class TDispatchTimer : public TTimer {
113private:
115
116public:
118
119 Bool_t Notify();
120};
121////////////////////////////////////////////////////////////////////////////////
122/// Handle expiration of the timer associated with dispatching pending
123/// events while processing. We must act as fast as possible here, so
124/// we just set a flag submitting a request for dispatching pending events
125
127{
128 if (gDebug > 0) printf("TDispatchTimer::Notify: called!\n");
129
131
132 // Needed for the next shot
133 Reset();
134 return kTRUE;
135}
136
137//
138// Special timer to notify reach of max packet proc time
139////////////////////////////////////////////////////////////////////////////////
140
141class TProctimeTimer : public TTimer {
142private:
144
145public:
147
148 Bool_t Notify();
149};
150////////////////////////////////////////////////////////////////////////////////
151/// Handle expiration of the timer associated with the maximum packet
152/// processing time. We must act as fast as possible here, so
153/// we just set a flag signalling that the max proc time has been reached
154
156{
157 if (gDebug > 0) printf("TProctimeTimer::Notify: called!\n");
158
160
161 // One shot only
162 return kTRUE;
163}
164
165//
166// Special timer to handle stop/abort request via exception raising
167////////////////////////////////////////////////////////////////////////////////
168
169class TStopTimer : public TTimer {
170private:
173
174public:
175 TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
176
177 Bool_t Notify();
178};
179
180////////////////////////////////////////////////////////////////////////////////
181/// Constructor for the timer to stop/abort processing.
182/// The 'timeout' is in seconds.
183/// Make sure that 'to' makes sense, i.e. not larger than 10 days;
184/// the minimum value is 10 ms (0 does not seem to start the timer ...).
185
187 : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
188{
189 if (gDebug > 0)
190 Info ("TStopTimer","enter: %d, timeout: %d", abort, to);
191
192 fPlayer = p;
193 fAbort = abort;
194
195 if (gDebug > 1)
196 Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
197}
198
199////////////////////////////////////////////////////////////////////////////////
200/// Handle the signal coming from the expiration of the timer
201/// associated with an abort or stop request.
202/// We raise an exception which will be processed in the
203/// event loop.
204
206{
207 if (gDebug > 0) printf("TStopTimer::Notify: called!\n");
208
209 if (fAbort)
211 else
213
214 return kTRUE;
215}
216
217//------------------------------------------------------------------------------
218
220
222
223////////////////////////////////////////////////////////////////////////////////
224/// Default ctor.
225
227 : fAutoBins(0), fOutput(0), fSelector(0), fCreateSelObj(kTRUE), fSelectorClass(0),
228 fFeedbackTimer(0), fFeedbackPeriod(2000),
229 fEvIter(0), fSelStatus(0),
230 fTotalEvents(0), fReadBytesRun(0), fReadCallsRun(0), fProcessedRun(0),
231 fQueryResults(0), fQuery(0), fPreviousQuery(0), fDrawQueries(0),
232 fMaxDrawQueries(1), fStopTimer(0), fDispatchTimer(0),
233 fProcTimeTimer(0), fProcTime(0),
234 fOutputFile(0),
235 fSaveMemThreshold(-1), fSavePartialResults(kFALSE), fSaveResultsPerPacket(kFALSE)
236{
237 fInput = new TList;
244
245 static Bool_t initLimitsFinder = kFALSE;
246 if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
248 initLimitsFinder = kTRUE;
249 }
250
251}
252
253////////////////////////////////////////////////////////////////////////////////
254/// Destructor.
255
257{
258 fInput->Clear("nodelete");
260 // The output list is owned by fSelector and destroyed in there
269}
270
271////////////////////////////////////////////////////////////////////////////////
272/// Set processing bit according to 'on'
273
275{
276 if (on)
278 else
280}
281
282////////////////////////////////////////////////////////////////////////////////
283/// Stop the process after this event. If timeout is positive, start
284/// a timer firing after timeout seconds to hard-stop time-expensive
285/// events.
286
288{
289 if (gDebug > 0)
290 Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
291
292 if (fEvIter != 0)
293 fEvIter->StopProcess(abort);
294 Long_t to = 1;
295 if (abort == kTRUE) {
297 } else {
299 to = timeout;
300 }
301 // Start countdown, if needed
302 if (to > 0)
303 SetStopTimer(kTRUE, abort, to);
304}
305
306////////////////////////////////////////////////////////////////////////////////
307/// Enable/disable the timer to dispatch pending events while processing.
308
310{
313 if (on) {
314 fDispatchTimer = new TDispatchTimer(this);
316 }
317}
318
319////////////////////////////////////////////////////////////////////////////////
320/// Enable/disable the timer to stop/abort processing.
321/// The 'timeout' is in seconds.
322
324{
325 std::lock_guard<std::mutex> lock(fStopTimerMtx);
326
327 // Clean-up the timer
329 if (on) {
330 // create timer
331 fStopTimer = new TStopTimer(this, abort, timeout);
332 // Start the countdown
333 fStopTimer->Start();
334 if (gDebug > 0)
335 Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
336 (abort ? "ABORT" : "STOP"), timeout);
337 } else {
338 if (gDebug > 0)
339 Info ("SetStopTimer", "timer STOPPED");
340 }
341}
342
343////////////////////////////////////////////////////////////////////////////////
344/// Add query result to the list, making sure that there are no
345/// duplicates.
346
348{
349 if (!q) {
350 Warning("AddQueryResult","query undefined - do nothing");
351 return;
352 }
353
354 // Treat differently normal and draw queries
355 if (!(q->IsDraw())) {
356 if (!fQueryResults) {
357 fQueryResults = new TList;
359 } else {
360 TIter nxr(fQueryResults);
361 TQueryResult *qr = 0;
362 TQueryResult *qp = 0;
363 while ((qr = (TQueryResult *) nxr())) {
364 // If same query, remove old version and break
365 if (*qr == *q) {
367 delete qr;
368 break;
369 }
370 // Record position according to start time
371 if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
372 qp = qr;
373 }
374
375 if (!qp) {
377 } else {
379 }
380 }
381 } else if (IsClient()) {
382 // If max reached, eliminate first the oldest one
384 TIter nxr(fQueryResults);
385 TQueryResult *qr = 0;
386 while ((qr = (TQueryResult *) nxr())) {
387 // If same query, remove old version and break
388 if (qr->IsDraw()) {
389 fDrawQueries--;
391 delete qr;
392 break;
393 }
394 }
395 }
396 // Add new draw query
398 fDrawQueries++;
399 if (!fQueryResults)
400 fQueryResults = new TList;
402 }
403 }
404}
405
406////////////////////////////////////////////////////////////////////////////////
407/// Remove all query result instances referenced by 'ref' from
408/// the list of results.
409
411{
412 if (fQueryResults) {
413 TIter nxq(fQueryResults);
414 TQueryResult *qr = 0;
415 while ((qr = (TQueryResult *) nxq())) {
416 if (qr->Matches(ref)) {
418 delete qr;
419 }
420 }
421 }
422}
423
424////////////////////////////////////////////////////////////////////////////////
425/// Get query result instances referenced by 'ref' from
426/// the list of results.
427
429{
430 if (fQueryResults) {
431 if (ref && strlen(ref) > 0) {
432 TIter nxq(fQueryResults);
433 TQueryResult *qr = 0;
434 while ((qr = (TQueryResult *) nxq())) {
435 if (qr->Matches(ref))
436 return qr;
437 }
438 } else {
439 // Get last
440 return (TQueryResult *) fQueryResults->Last();
441 }
442 }
443
444 // Nothing found
445 return (TQueryResult *)0;
446}
447
448////////////////////////////////////////////////////////////////////////////////
449/// Set current query and save previous value.
450
452{
454 fQuery = q;
455}
456
457////////////////////////////////////////////////////////////////////////////////
458/// Add object to input list.
459
461{
462 fInput->Add(inp);
463}
464
465////////////////////////////////////////////////////////////////////////////////
466/// Clear input list.
467
469{
470 fInput->Clear();
471}
472
473////////////////////////////////////////////////////////////////////////////////
474/// Get output object by name.
475
477{
478 if (fOutput)
479 return fOutput->FindObject(name);
480 return 0;
481}
482
483////////////////////////////////////////////////////////////////////////////////
484/// Get output list.
485
487{
488 TList *ol = fOutput;
489 if (!ol && fQuery)
490 ol = fQuery->GetOutputList();
491 return ol;
492}
493
494////////////////////////////////////////////////////////////////////////////////
495/// Reinitialize fSelector using the selector files in the query result.
496/// Needed when Finalize is called after a Process execution for the same
497/// selector name.
498
500{
501 Int_t rc = 0;
502
503 // Make sure we have a query
504 if (!qr) {
505 Info("ReinitSelector", "query undefined - do nothing");
506 return -1;
507 }
508
509 // Selector name
510 TString selec = qr->GetSelecImp()->GetName();
511 if (selec.Length() <= 0) {
512 Info("ReinitSelector", "selector name undefined - do nothing");
513 return -1;
514 }
515
516 // Find out if this is a standard selection used for Draw actions
517 Bool_t stdselec = TSelector::IsStandardDraw(selec);
518
519 // Find out if this is a precompiled selector: in such a case we do not
520 // have the code in TMacros, so we must rely on local libraries
521 Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
522
523 // If not, find out if it needs to be expanded
524 TString ipathold;
525 if (!stdselec && !compselec) {
526 // Check checksums for the versions of the selector files
527 Bool_t expandselec = kTRUE;
528 TString dir, ipath;
529 char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
530 if (selc) {
531 // Check checksums
532 TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
533 // Implementation files
534 md5icur = TMD5::FileChecksum(selc);
535 md5iold = qr->GetSelecImp()->Checksum();
536 // Header files
537 TString selh(selc);
538 Int_t dot = selh.Last('.');
539 if (dot != kNPOS) selh.Remove(dot);
540 selh += ".h";
542 md5hcur = TMD5::FileChecksum(selh);
543 md5hold = qr->GetSelecHdr()->Checksum();
544
545 // If nothing has changed nothing to do
546 if (md5hcur && md5hold && md5icur && md5iold)
547 if (*md5hcur == *md5hold && *md5icur == *md5iold)
548 expandselec = kFALSE;
549
550 SafeDelete(md5icur);
551 SafeDelete(md5hcur);
552 SafeDelete(md5iold);
553 SafeDelete(md5hold);
554 if (selc) delete [] selc;
555 }
556
557 Bool_t ok = kTRUE;
558 // Expand selector files, if needed
559 if (expandselec) {
560
561 ok = kFALSE;
562 // Expand files in a temporary directory
563 TUUID u;
564 dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
565 if (!(gSystem->MakeDirectory(dir))) {
566
567 // Export implementation file
568 selec = Form("%s/%s",dir.Data(),selec.Data());
569 qr->GetSelecImp()->SaveSource(selec);
570
571 // Export header file
572 TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
573 qr->GetSelecHdr()->SaveSource(seleh);
574
575 // Adjust include path
576 ipathold = gSystem->GetIncludePath();
577 ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
578 gSystem->SetIncludePath(ipath.Data());
579
580 ok = kTRUE;
581 }
582 }
583 TString opt(qr->GetOptions());
584 Ssiz_t id = opt.Last('#');
585 if (id != kNPOS && id < opt.Length() - 1)
586 selec += opt(id + 1, opt.Length());
587
588 if (!ok) {
589 Info("ReinitSelector", "problems locating or exporting selector files");
590 return -1;
591 }
592 }
593
594 // Cleanup previous stuff
596 fSelectorClass = 0;
597
598 // Init the selector now
599 Int_t iglevelsave = gErrorIgnoreLevel;
600 if (compselec)
601 // Silent error printout on first attempt
603
604 if ((fSelector = TSelector::GetSelector(selec))) {
605 if (compselec)
606 gErrorIgnoreLevel = iglevelsave; // restore ignore level
607 fSelectorClass = fSelector->IsA();
609
610 } else {
611 if (compselec) {
612 gErrorIgnoreLevel = iglevelsave; // restore ignore level
613 // Retry by loading first the libraries listed in TQueryResult, if any
614 if (strlen(qr->GetLibList()) > 0) {
615 TString sl(qr->GetLibList());
616 TObjArray *oa = sl.Tokenize(" ");
617 if (oa) {
618 Bool_t retry = kFALSE;
619 TIter nxl(oa);
620 TObjString *os = 0;
621 while ((os = (TObjString *) nxl())) {
622 TString lib = gSystem->BaseName(os->GetName());
623 if (lib != "lib") {
624 lib.ReplaceAll("-l", "lib");
625 if (gSystem->Load(lib) == 0)
626 retry = kTRUE;
627 }
628 }
629 // Retry now, if the case
630 if (retry)
632 }
633 }
634 }
635 if (!fSelector) {
636 if (compselec)
637 Info("ReinitSelector", "compiled selector re-init failed:"
638 " automatic reload unsuccessful:"
639 " please load manually the correct library");
640 rc = -1;
641 }
642 }
643 if (fSelector) {
644 // Draw needs to reinit temp histos
646 if (stdselec) {
647 ((TProofDraw *)fSelector)->DefVar();
648 } else {
649 // variables may have been initialized in Begin()
650 fSelector->Begin(0);
651 }
652 }
653
654 // Restore original include path, if needed
655 if (ipathold.Length() > 0)
656 gSystem->SetIncludePath(ipathold.Data());
657
658 return rc;
659}
660
661////////////////////////////////////////////////////////////////////////////////
662/// Incorporate output object (may not be used in this class).
663
665{
666 MayNotUse("AddOutputObject");
667 return -1;
668}
669
670////////////////////////////////////////////////////////////////////////////////
671/// Incorporate output list (may not be used in this class).
672
674{
675 MayNotUse("AddOutput");
676}
677
678////////////////////////////////////////////////////////////////////////////////
679/// Store output list (may not be used in this class).
680
682{
683 MayNotUse("StoreOutput");
684}
685
686////////////////////////////////////////////////////////////////////////////////
687/// Store feedback list (may not be used in this class).
688
690{
691 MayNotUse("StoreFeedback");
692}
693
694////////////////////////////////////////////////////////////////////////////////
695/// Report progress (may not be used in this class).
696
697void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/)
698{
699 MayNotUse("Progress"); // both arguments are unnamed and ignored; the body only passes the method name to MayNotUse()
700}
701
702////////////////////////////////////////////////////////////////////////////////
703/// Report progress (may not be used in this class).
704
705void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/,
706 Long64_t /*bytesread*/,
707 Float_t /*evtRate*/, Float_t /*mbRate*/,
708 Float_t /*evtrti*/, Float_t /*mbrti*/)
709{
710 MayNotUse("Progress"); // all seven arguments are unnamed and ignored; the body only passes the method name to MayNotUse()
711}
712
713////////////////////////////////////////////////////////////////////////////////
714/// Report progress (may not be used in this class).
715
717{
718 MayNotUse("Progress");
719}
720
721////////////////////////////////////////////////////////////////////////////////
722/// Set feedback list (may not be used in this class).
723
725{
726 MayNotUse("Feedback");
727}
728
729////////////////////////////////////////////////////////////////////////////////
730/// Draw feedback creation proxy. When accessed via TProof avoids
731/// link dependency on libProofPlayer.
732
734{
735 return new TDrawFeedback(p);
736}
737
738////////////////////////////////////////////////////////////////////////////////
739/// Set draw feedback option.
740
742{
743 if (f)
744 f->SetOption(opt);
745}
746
747////////////////////////////////////////////////////////////////////////////////
748/// Delete draw feedback object.
749
751{
752 delete f;
753}
754
755////////////////////////////////////////////////////////////////////////////////
756/// Save the partial results of this query to a dedicated file under the user
757/// data directory. The file name has the form
758/// <session_tag>.q<query_seq_num>.root
759/// The file path and the file are created if not existing already.
760/// Only objects in the outputlist not being TProofOutputFile are saved.
761/// The packets list 'packets' is saved if given.
762/// Trees not attached to any file are attached to the open file.
763/// If 'queryend' is kTRUE everything is written out (TTrees included).
764/// The actual saving action is controlled by 'force' and by fSavePartialResults /
765/// fSaveResultsPerPacket:
766///
767/// fSavePartialResults = kFALSE/kTRUE no-saving/saving
768/// fSaveResultsPerPacket = kFALSE/kTRUE save-per-query/save-per-packet
769///
770/// The function CheckMemUsage sets fSavePartialResults = 1 if fSaveMemThreshold > 0 and
771/// ProcInfo_t::fMemResident >= fSaveMemThreshold: from that point on partial results
772/// are always saved and expensive calls to TSystem::GetProcInfo are avoided.
773/// The switch fSaveResultsPerPacket is instead controlled by the user or admin
774/// who can also force saving in all cases; parameter PROOF_SavePartialResults or
775/// RC env ProofPlayer.SavePartialResults .
776/// However, if 'force' is kTRUE, fSavePartialResults and fSaveResultsPerPacket
777/// are ignored.
778/// Return -1 in case of problems, 0 otherwise.
779
781{
782 Bool_t save = (force || (fSavePartialResults &&
783 (queryend || fSaveResultsPerPacket))) ? kTRUE : kFALSE;
784 if (!save) {
785 PDB(kOutput, 2)
786 Info("SavePartialResults", "partial result saving disabled");
787 return 0;
788 }
789
790 // Sanity check
791 if (!gProofServ) {
792 Error("SavePartialResults", "gProofServ undefined: something really wrong going on!!!");
793 return -1;
794 }
795 if (!fOutput) {
796 Error("SavePartialResults", "fOutput undefined: something really wrong going on!!!");
797 return -1;
798 }
799
800 PDB(kOutput, 1)
801 Info("SavePartialResults", "start saving partial results {%d,%d,%d,%d}",
803
804 // Get list of processed packets from the iterator
805 PDB(kOutput, 2) Info("SavePartialResults", "fEvIter: %p", fEvIter);
806
807 TList *packets = (fEvIter) ? fEvIter->GetPackets() : 0;
808 PDB(kOutput, 2) Info("SavePartialResults", "list of packets: %p, sz: %d",
809 packets, (packets ? packets->GetSize(): -1));
810
811 // Open the file
812 const char *oopt = "UPDATE";
813 // Check if the file has already been defined
814 TString baseName(fOutputFilePath);
815 if (fOutputFilePath.IsNull()) {
816 baseName.Form("output-%s.q%d.root", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
817 if (gProofServ->GetDataDirOpts() && strlen(gProofServ->GetDataDirOpts()) > 0) {
818 fOutputFilePath.Form("%s/%s?%s", gProofServ->GetDataDir(), baseName.Data(),
820 } else {
821 fOutputFilePath.Form("%s/%s", gProofServ->GetDataDir(), baseName.Data());
822 }
823 Info("SavePartialResults", "file with (partial) output: '%s'", fOutputFilePath.Data());
824 oopt = "RECREATE";
825 }
826 // Open the file in write mode
827 if (!(fOutputFile = TFile::Open(fOutputFilePath, oopt)) ||
829 Error("SavePartialResults", "cannot open '%s' for writing", fOutputFilePath.Data());
831 return -1;
832 }
833
834 // Save current directory
835 TDirectory *curdir = gDirectory;
836 fOutputFile->cd();
837
838 // Write first the packets list, if required
839 if (packets) {
840 TDirectory *packetsDir = fOutputFile->mkdir("packets");
841 if (packetsDir) packetsDir->cd();
843 fOutputFile->cd();
844 }
845
846 Bool_t notempty = kFALSE;
847 // Write out the output list
848 TList torm;
849 TIter nxo(fOutput);
850 TObject *o = 0;
851 while ((o = nxo())) {
852 // Skip output file drivers
853 if (o->InheritsFrom(TProofOutputFile::Class())) continue;
854 // Skip control objets
855 if (!strncmp(o->GetName(), "PROOF_", 6)) continue;
856 // Skip data members mapping
858 // Skip missing file info
859 if (!strcmp(o->GetName(), "MissingFiles")) continue;
860 // Trees need a special treatment
861 if (o->InheritsFrom("TTree")) {
862 TTree *t = (TTree *) o;
863 TDirectory *d = t->GetDirectory();
864 // If the tree is not attached to any file ...
865 if (!d || (d && !d->InheritsFrom("TFile"))) {
866 // ... we attach it
868 }
869 if (t->GetDirectory() == fOutputFile) {
870 if (queryend) {
871 // ... we write it out
873 // At least something in the file
874 notempty = kTRUE;
875 // Flag for removal from the outputlist
876 torm.Add(o);
877 // Prevent double-deletion attempts
878 t->SetDirectory(0);
879 } else {
880 // ... or we set in automatic flush mode
881 t->SetAutoFlush();
882 }
883 }
884 } else if (queryend || fSaveResultsPerPacket) {
885 // Save overwriting what's already there
887 // At least something in the file
888 notempty = kTRUE;
889 // Flag for removal from the outputlist
890 if (queryend) torm.Add(o);
891 }
892 }
893
894 // Restore previous directory
895 gDirectory = curdir;
896
897 // Close the file if required
898 if (notempty) {
899 if (!fOutput->FindObject(baseName)) {
900 TProofOutputFile *po = 0;
901 // Get directions
902 TNamed *nm = (TNamed *) fInput->FindObject("PROOF_DefaultOutputOption");
903 TString oname = (nm) ? nm->GetTitle() : fOutputFilePath.Data();
904 if (nm && oname.BeginsWith("ds:")) {
905 oname.Replace(0, 3, "");
906 TString qtag =
908 oname.ReplaceAll("<qtag>", qtag);
909 // Create the TProofOutputFile for dataset creation
910 po = new TProofOutputFile(baseName, "DRO", oname.Data());
911 } else {
912 Bool_t hasddir = kFALSE;
913 // Create the TProofOutputFile for automatic merging
914 po = new TProofOutputFile(baseName, "M");
915 if (oname.BeginsWith("of:")) oname.Replace(0, 3, "");
916 if (gProofServ->IsTopMaster()) {
917 if (!strcmp(TUrl(oname, kTRUE).GetProtocol(), "file")) {
918 TString dsrv;
920 TProofServ::FilterLocalroot(oname, dsrv);
921 oname.Insert(0, dsrv);
922 }
923 } else {
924 if (nm) {
925 // The name has been sent by the client: resolve local place holders
926 oname.ReplaceAll("<file>", baseName);
927 } else {
928 // We did not get any indication; the final file will be in the datadir on
929 // the top master and it will be resolved there
930 oname.Form("<datadir>/%s", baseName.Data());
931 hasddir = kTRUE;
932 }
933 }
934 po->SetOutputFileName(oname.Data());
935 if (hasddir)
936 // Reset the bit, so that <datadir> has a chance to be resolved in AddOutputObject
938 po->SetName(gSystem->BaseName(oname.Data()));
939 }
941 fOutput->Add(po);
942 // Flag the nature of this file
944 }
945 }
948
949 // If last call, cleanup the output list from objects saved to file
950 if (queryend && torm.GetSize() > 0) {
951 TIter nxrm(&torm);
952 while ((o = nxrm())) { fOutput->Remove(o); }
953 }
954 torm.SetOwner(kFALSE);
955
956 PDB(kOutput, 1)
957 Info("SavePartialResults", "partial results saved to file");
958 // We are done
959 return 0;
960}
961
962////////////////////////////////////////////////////////////////////////////////
963/// Make sure that a valid selector object exists.
964/// Return -1 in case of problems, 0 otherwise
965
966Int_t TProofPlayer::AssertSelector(const char *selector_file)
967{
968 if (selector_file && strlen(selector_file)) {
970
971 // Get selector files from cache
973 if (gProofServ) {
976 }
977
978 fSelector = TSelector::GetSelector(selector_file);
979
980 if (gProofServ) {
983 }
984
985 if (!fSelector) {
986 Error("AssertSelector", "cannot load: %s", selector_file );
987 return -1;
988 }
989
991 Info("AssertSelector", "Processing via filename (%s)", selector_file);
992 } else if (!fSelector) {
993 Error("AssertSelector", "no TSelector object define : cannot continue!");
994 return -1;
995 } else {
996 Info("AssertSelector", "Processing via TSelector object");
997 }
998 // Done
999 return 0;
1000}
1001////////////////////////////////////////////////////////////////////////////////
1002/// Update fProgressStatus
1003
1005{
1006 if (fProgressStatus) {
1014 fProcessedRun = 0;
1015 }
1016}
1017
1018////////////////////////////////////////////////////////////////////////////////
1019/// Process specified TDSet on PROOF worker.
1020/// The return value is -1 in case of error and TSelector::GetStatus()
1021/// in case of success.
1022
1023Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
1024 Option_t *option, Long64_t nentries,
1026{
1027 PDB(kGlobal,1) Info("Process","Enter");
1028
1030 fOutput = 0;
1031
1032 TCleanup clean(this);
1033
1034 fSelectorClass = 0;
1035 TString wmsg;
1036 TRY {
1037 if (AssertSelector(selector_file) != 0 || !fSelector) {
1038 Error("Process", "cannot assert the selector object");
1039 return -1;
1040 }
1041
1042 fSelectorClass = fSelector->IsA();
1043 Int_t version = fSelector->Version();
1044 if (version == 0 && IsClient()) fSelector->GetOutputList()->Clear();
1045
1047
1048 if (gProofServ)
1050
1051 fSelStatus = new TStatus;
1053
1054 fSelector->SetOption(option);
1056
1057 // If in sequential (0-PROOF) mode validate the data set to get
1058 // the number of entries
1060 if (fTotalEvents < 0 && gProofServ &&
1062 dset->Validate();
1063 dset->Reset();
1064 TDSetElement *e = 0;
1065 while ((e = dset->Next())) {
1066 fTotalEvents += e->GetNum();
1067 }
1068 }
1069
1070 dset->Reset();
1071
1072 // Set parameters controlling the iterator behaviour
1073 Int_t useTreeCache = 1;
1074 if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
1075 if (useTreeCache > -1 && useTreeCache < 2)
1076 gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
1077 }
1078 Long64_t cacheSize = -1;
1079 if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
1080 TString sz = TString::Format("%lld", cacheSize);
1081 gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
1082 }
1083 // Parallel unzipping
1084 Int_t useParallelUnzip = 0;
1085 if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
1086 if (useParallelUnzip > -1 && useParallelUnzip < 2)
1087 gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
1088 }
1089 // OS file caching (Mac Os X only)
1090 Int_t dontCacheFiles = 0;
1091 if (TProof::GetParameter(fInput, "PROOF_DontCacheFiles", dontCacheFiles) == 0) {
1092 if (dontCacheFiles == 1)
1093 gEnv->SetValue("ProofPlayer.DontCacheFiles", 1);
1094 }
1096
1097 // Control file object swap
1098 // <how>*10 + <force>
1099 // <how> = 0 end of run
1100 // 1 after each packet
1101 // <force> = 0 no, swap only if memory threshold is reached
1102 // 1 swap in all cases, accordingly to <how>
1103 Int_t opt = 0;
1104 if (TProof::GetParameter(fInput, "PROOF_SavePartialResults", opt) != 0) {
1105 opt = gEnv->GetValue("ProofPlayer.SavePartialResults", 0);
1106 }
1107 fSaveResultsPerPacket = (opt >= 10) ? kTRUE : kFALSE;
1108 fSavePartialResults = (opt%10 > 0) ? kTRUE : kFALSE;
1109 Info("Process", "save partial results? %d per-packet? %d", fSavePartialResults, fSaveResultsPerPacket);
1110
1111 // Memory threshold for file object swap
1112 Float_t memfrac = gEnv->GetValue("ProofPlayer.SaveMemThreshold", -1.);
1113 if (memfrac > 0.) {
1114 // The threshold is per core
1115 SysInfo_t si;
1116 if (gSystem->GetSysInfo(&si) == 0) {
1117 fSaveMemThreshold = (Long_t) ((memfrac * si.fPhysRam * 1024.) / si.fCpus);
1118 Info("Process", "memory threshold for saving objects to file set to %ld kB",
1120 } else {
1121 Error("Process", "cannot get SysInfo_t (!)");
1122 }
1123 }
1124
1125 if (version == 0) {
1126 PDB(kLoop,1) Info("Process","Call Begin(0)");
1127 fSelector->Begin(0);
1128 } else {
1129 if (IsClient()) {
1130 // on client (for local run)
1131 PDB(kLoop,1) Info("Process","Call Begin(0)");
1132 fSelector->Begin(0);
1133 }
1135 PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
1136 fSelector->SlaveBegin(0); // Init is called explicitly
1137 // from GetNextEvent()
1138 }
1139 }
1140
1141 } CATCH(excode) {
1143 Error("Process","exception %d caught", excode);
1145 return -1;
1146 } ENDTRY;
1147
1148 // Save the results, if needed, closing the file
1149 if (SavePartialResults(kFALSE) < 0)
1150 Warning("Process", "problems seetting up file-object swapping");
1151
1152 // Create feedback lists, if required
1153 SetupFeedback();
1154
1157
1158 PDB(kLoop,1)
1159 Info("Process","Looping over Process()");
1160
1161 // get the byte read counter at the beginning of processing
1164 fProcessedRun = 0;
1165 // force the first monitoring info
1168
1169 // Start asynchronous timer to dispatch pending events
1171
1172 // Loop over range
1173 gAbort = kFALSE;
1174 Long64_t entry;
1177
1178 TRY {
1179
1180 Int_t mrc = -1;
1181 // Get the frequency for checking memory consumption and logging information
1182 Long64_t memlogfreq = -1;
1183 if (((mrc = TProof::GetParameter(fInput, "PROOF_MemLogFreq", memlogfreq))) != 0) memlogfreq = -1;
1184 Long64_t singleshot = 1;
1185 Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1186 TString lastMsg("(unfortunately no detailed info is available about current packet)");
1187
1188 // Initial memory footprint
1189 if (!CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg)) {
1190 Error("Process", "%s", wmsg.Data());
1191 wmsg.Insert(0, TString::Format("ERROR:%s, after SlaveBegin(), ", gProofServ ? gProofServ->GetOrdinal() : "gProofServ is nullptr"));
1192 fSelStatus->Add(wmsg.Data());
1193 if (gProofServ) {
1196 }
1199 } else if (!wmsg.IsNull()) {
1200 Warning("Process", "%s", wmsg.Data());
1201 }
1202
1203 TPair *currentElem = 0;
1204 // The event loop on the worker
1205 Long64_t fst = -1, num;
1206 Long_t maxproctime = -1;
1207 Bool_t newrun = kFALSE;
1208 while ((fEvIter->GetNextPacket(fst, num) != -1) &&
1211 // This is needed by the inflate infrastructure to calculate
1212 // sleeping times
1214
1215 // Give the possibility to the selector to access additional info in the
1216 // incoming packet
1217 if (dset->Current()) {
1218 if (!currentElem) {
1219 currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
1220 fInput->Add(currentElem);
1221 } else {
1222 if (currentElem->Value() != dset->Current()) {
1223 currentElem->SetValue(dset->Current());
1224 } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
1226 }
1227 }
1229 if (dset->TestBit(TDSet::kEmpty)) {
1230 lastMsg = "check logs for possible stacktrace - last cycle:";
1231 } else {
1232 TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
1233 TString fn = (elem) ? elem->GetFileName() : "<undef>";
1234 lastMsg.Form("while processing dset:'%s', file:'%s'"
1235 " - check logs for possible stacktrace - last event:", dset->GetName(), fn.Data());
1236 }
1237 TProofServ::SetLastMsg(lastMsg);
1238 }
1239 // Set the max proc time, if any
1240 if (dset->Current()->GetMaxProcTime() >= 0.)
1241 maxproctime = (Long_t) (1000 * dset->Current()->GetMaxProcTime());
1242 newrun = (dset->Current()->TestBit(TDSetElement::kNewPacket)) ? kTRUE : kFALSE;
1243 }
1244
1247 // Setup packet proc time measurement
1248 if (maxproctime > 0) {
1249 if (!fProcTimeTimer) fProcTimeTimer = new TProctimeTimer(this, maxproctime);
1250 fProcTimeTimer->Start(maxproctime, kTRUE); // One shot
1251 if (!fProcTime) fProcTime = new TStopwatch();
1252 fProcTime->Reset(); // Reset counters
1253 }
1254 Long64_t refnum = num;
1255 if (refnum < 0 && maxproctime <= 0) {
1256 wmsg.Form("neither entries nor max proc time specified:"
1257 " risk of infinite loop: processing aborted");
1258 Error("Process", "%s", wmsg.Data());
1259 if (gProofServ) {
1260 wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1263 }
1266 break;
1267 }
1268 while (refnum < 0 || num--) {
1269
1270 // Did we use all our time?
1272 fProcTime->Stop();
1273 if (!newrun && !TestBit(TProofPlayer::kMaxProcTimeExtended) && refnum > 0) {
1274 // How much are we left with?
1275 Float_t xleft = (refnum > num) ? (Float_t) num / (Float_t) (refnum) : 1.;
1276 if (xleft < 0.2) {
1277 // Give another try, 1.5 times the remaining measured expected time
1278 Long_t mpt = (Long_t) (1500 * num / ((Double_t)(refnum - num) / fProcTime->RealTime()));
1280 fProcTimeTimer->Start(mpt, kTRUE); // One shot
1282 }
1283 }
1285 Info("Process", "max proc time reached (%ld msecs): packet processing stopped:\n%s",
1286 maxproctime, lastMsg.Data());
1287
1288 break;
1289 }
1290 }
1291
1294
1295 // Get the netry number, taking into account entry or event lists
1296 entry = fEvIter->GetEntryNumber(fst);
1297 fst++;
1298
1299 // Set the last entry
1301
1302 if (fSelector->Version() == 0) {
1303 PDB(kLoop,3)
1304 Info("Process","Call ProcessCut(%lld)", entry);
1305 if (fSelector->ProcessCut(entry)) {
1306 PDB(kLoop,3)
1307 Info("Process","Call ProcessFill(%lld)", entry);
1308 fSelector->ProcessFill(entry);
1309 }
1310 } else {
1311 PDB(kLoop,3)
1312 Info("Process","Call Process(%lld)", entry);
1313 fSelector->Process(entry);
1316 break;
1317 } else if (fSelector->GetAbort() == TSelector::kAbortFile) {
1318 Info("Process", "packet processing aborted following the"
1319 " selector settings:\n%s", lastMsg.Data());
1322 }
1323 }
1325
1326 // Check the memory footprint, if required
1327 if (memlogfreq > 0 && (GetEventsProcessed() + fProcessedRun)%memlogfreq == 0) {
1328 if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
1329 Error("Process", "%s", wmsg.Data());
1330 if (gProofServ) {
1331 wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1332 gProofServ->GetOrdinal(), entry));
1334 }
1338 break;
1339 } else {
1340 if (!wmsg.IsNull()) {
1341 Warning("Process", "%s", wmsg.Data());
1342 if (gProofServ) {
1343 wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ",
1344 gProofServ->GetOrdinal(), entry));
1346 }
1347 }
1348 }
1349 }
1353 }
1355 if (fSelStatus->TestBit(TStatus::kNotOk) || gROOT->IsInterrupted()) break;
1356
1357 // Make sure that the selector abort status is reset
1359 fSelector->Abort("status reset", TSelector::kContinue);
1360 }
1361 }
1362
1363 } CATCH(excode) {
1364 if (excode == kPEX_STOPPED) {
1365 Info("Process","received stop-process signal");
1367 } else if (excode == kPEX_ABORTED) {
1368 gAbort = kTRUE;
1369 Info("Process","received abort-process signal");
1371 } else {
1372 Error("Process","exception %d caught", excode);
1373 // Perhaps we need a dedicated status code here ...
1374 gAbort = kTRUE;
1376 }
1378 } ENDTRY;
1379
1380 // Clean-up the envelop for the current element
1381 TPair *currentElem = 0;
1382 if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
1383 if ((currentElem = (TPair *) fInput->Remove(currentElem))) {
1384 delete currentElem->Key();
1385 delete currentElem;
1386 }
1387 }
1388
1389 // Final memory footprint
1390 Long64_t singleshot = 1;
1391 Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1392 Bool_t shrc = CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg);
1393 if (!wmsg.IsNull()) Warning("Process", "%s (%s)", wmsg.Data(), shrc ? "warn" : "hwm");
1394
1395 PDB(kGlobal,2)
1396 Info("Process","%lld events processed", fProgressStatus->GetEntries());
1397
1398 if (gMonitoringWriter) {
1402 }
1403
1404 // Stop active timers
1406 if (fStopTimer != 0)
1408 if (fFeedbackTimer != 0)
1409 HandleTimer(0);
1410
1411 StopFeedback();
1412
1413 // Save the results, if needed, closing the file
1414 if (SavePartialResults(kTRUE) < 0)
1415 Warning("Process", "problems saving the results to file");
1416
1418
1419 // Finalize
1420
1421 if (fExitStatus != kAborted) {
1422
1423 TIter nxo(GetOutputList());
1424 TObject *o = 0;
1425 while ((o = nxo())) {
1426 // Special treatment for files
1427 if (o->IsA() == TProofOutputFile::Class()) {
1429 of->Print();
1431 const char *dir = of->GetDir();
1432 if (!dir || (dir && strlen(dir) <= 0)) {
1434 } else if (dir && strlen(dir) > 0) {
1435 TUrl u(dir);
1436 if (!strcmp(u.GetHost(), "localhost") || !strcmp(u.GetHost(), "127.0.0.1") ||
1437 !strcmp(u.GetHost(), "localhost.localdomain")) {
1439 of->SetDir(u.GetUrl(kTRUE));
1440 }
1441 of->Print();
1442 }
1443 }
1444 }
1445
1447
1449 if (fSelector->Version() == 0) {
1450 PDB(kLoop,1) Info("Process","Call Terminate()");
1452 } else {
1453 PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
1456 PDB(kLoop,1) Info("Process","Call Terminate()");
1458 }
1459 }
1460 }
1461
1462 // Add Selector status in the output list so it can be returned to the client as done
1463 // by Tree::Process (see ROOT-748). The status from the various workers will be added.
1464 fOutput->Add(new TParameter<Long64_t>("PROOF_SelectorStatus", (Long64_t) fSelector->GetStatus()));
1465
1466 if (gProofServ && !gProofServ->IsParallel()) { // put all the canvases onto the output list
1467 TIter nxc(gROOT->GetListOfCanvases());
1468 while (TObject *c = nxc())
1469 fOutput->Add(c);
1470 }
1471 }
1472
1473 if (gProofServ)
1475
1476 return 0;
1477}
1478
1479////////////////////////////////////////////////////////////////////////////////
1480/// Process specified TDSet on PROOF worker with TSelector object
1481/// The return value is -1 in case of error and TSelector::GetStatus()
1482/// in case of success.
/// A null selector is rejected with an error message and -1. Otherwise the
/// pointer is stored in fSelector and the call is forwarded to the Process()
/// overload whose second argument is a `const char *`, passed as null here.
1483
1485 Option_t *option, Long64_t nentries,
1487{
1488 if (!selector) {
1489 Error("Process", "selector object undefiend!");
1490 return -1;
1491 }
1492
1494 fSelector = selector;
1496 return Process(dset, (const char *)0, option, nentries, first);
1497}
1498
1499////////////////////////////////////////////////////////////////////////////////
1500/// Not implemented: meaningful only in the remote player.
/// Always returns kFALSE and performs no other action.
1501
1503{
1504 return kFALSE;
1505}
1506
1507////////////////////////////////////////////////////////////////////////////////
1508/// Check the memory usage, if requested (mfreq > 0 and the `processed`
1509/// counter is a multiple of mfreq).
1510/// Return kFALSE if the virtual or resident memory in use exceeds the fraction
/// TProofServ::GetMemStop() of the respective maximum; return kTRUE otherwise.
/// Above the fraction TProofServ::GetMemHWM() a warning text is stored in wmsg,
/// mfreq is set to 1 and the matching flag (w80v / w80r) is set to kFALSE, which
/// disables that warning on later calls passing the same flag.
/// wmsg is reset to an empty string each time GetProcInfo() returns 0.
1511
1513 Bool_t &w80v, TString &wmsg)
1514{
1516 if (mfreq > 0 && processed%mfreq == 0) {
1517 // Record the memory information (only when GetProcInfo() returns 0)
1518 ProcInfo_t pi;
1519 if (!gSystem->GetProcInfo(&pi)){
1520 wmsg = "";
1521 if (gProofServ)
1522 Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
1523 pi.fMemVirtual, pi.fMemResident, processed);
1524 // Save info in TStatus
1525 fSelStatus->SetMemValues(pi.fMemVirtual, pi.fMemResident);
1526 // Apply limit on virtual memory, if any: warn above GetMemHWM(), stop above GetMemStop()
// of the maximum (presumably 80% and 95% by default - TODO confirm)
1527 if (TProofServ::GetVirtMemMax() > 0) {
1528 if (pi.fMemVirtual > TProofServ::GetMemStop() * TProofServ::GetVirtMemMax()) {
1529 wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
1530 " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
1531 return kFALSE;
1532 } else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
1533 // Refine monitoring: from now on check at every call
1534 mfreq = 1;
1535 wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
1536 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
1537 w80v = kFALSE;
1538 }
1539 }
1540 // Apply limit on resident memory, if any: warn above GetMemHWM(), stop above GetMemStop()
// of the maximum (presumably 80% and 95% by default - TODO confirm)
1541 if (TProofServ::GetResMemMax() > 0) {
1542 if (pi.fMemResident > TProofServ::GetMemStop() * TProofServ::GetResMemMax()) {
1543 wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
1544 " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
1545 return kFALSE;
1546 } else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
1547 // Refine monitoring: from now on check at every call
1548 mfreq = 1;
// wmsg was cleared above, so it is non-empty here only if the
// virtual-memory warning has just been written
1549 if (wmsg.Length() > 0) {
1550 wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
1551 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
1552 } else {
1553 wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
1554 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
1555 }
1556 w80r = kFALSE;
1557 }
1558 }
1559 // In saving-partial-results mode flag the saving regime when reached to save expensive calls
1560 // to TSystem::GetProcInfo in SavePartialResults
1561 if (fSaveMemThreshold > 0 && pi.fMemResident >= fSaveMemThreshold) fSavePartialResults = kTRUE;
1562 }
1563 }
1564 // Done: no stop threshold exceeded (or no check performed)
1565 return kTRUE;
1566}
1567
1568////////////////////////////////////////////////////////////////////////////////
1569/// Finalize query (may not be used in this class).
/// Reports the call through MayNotUse() and returns -1.
1570
1572{
1573 MayNotUse("Finalize");
1574 return -1;
1575}
1576
1577////////////////////////////////////////////////////////////////////////////////
1578/// Finalize query (may not be used in this class).
/// Reports the call through MayNotUse() and returns -1.
1579
1581{
1582 MayNotUse("Finalize");
1583 return -1;
1584}
1585////////////////////////////////////////////////////////////////////////////////
1586/// Merge output (may not be used in this class).
/// Only reports the call through MayNotUse(); nothing is merged.
1587
1589{
1590 MayNotUse("MergeOutput");
1591 return;
1592}
1593
1594////////////////////////////////////////////////////////////////////////////////
/// Add the object `olsdm` to the output list fOutput.
/// NOTE(review): the creation of `olsdm` is not visible in this listing -
/// confirm what it holds against the full source.
1595
1597{
1599 fOutput->Add(olsdm);
1600}
1601
1602////////////////////////////////////////////////////////////////////////////////
1603/// Update automatic binning parameters for given object "name".
/// The list fAutoBins is created on first use. When no entry is available for
/// "name" a new TAutoBinVal built from the passed limits is added to fAutoBins;
/// otherwise the limits stored in the existing entry are copied into the
/// arguments via GetAll().
1604
1608 Double_t& zmin, Double_t& zmax)
1609{
1610 if ( fAutoBins == 0 ) {
1611 fAutoBins = new THashList;
1612 }
1613
1615
1616 if ( val == 0 ) {
1617 // Look for info in higher master (only when running in a server which is not the top master)
1618 if (gProofServ && !gProofServ->IsTopMaster()) {
1619 TString key = name;
1621 }
1622
// Record the (possibly updated) limits for the next requests
1623 val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
1624 fAutoBins->Add(val);
1625 } else {
// Already known: return the recorded limits to the caller
1626 val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
1627 }
1628}
1629
1630////////////////////////////////////////////////////////////////////////////////
1631/// Get next packet (may not be used in this class).
/// Reports the call through MayNotUse() and returns 0.
1632
1634{
1635 MayNotUse("GetNextPacket");
1636 return 0;
1637}
1638
1639////////////////////////////////////////////////////////////////////////////////
1640/// Set up feedback (may not be used in this class).
/// Only reports the call through MayNotUse().
1641
1643{
1644 MayNotUse("SetupFeedback");
1645}
1646
1647////////////////////////////////////////////////////////////////////////////////
1648/// Stop feedback (may not be used in this class).
/// Only reports the call through MayNotUse().
1649
1651{
1652 MayNotUse("StopFeedback");
1653}
1654
1655////////////////////////////////////////////////////////////////////////////////
1656/// Draw (may not be used in this class).
1657
1658Long64_t TProofPlayer::DrawSelect(TDSet * /*set*/, const char * /*varexp*/,
1659 const char * /*selection*/, Option_t * /*option*/,
1660 Long64_t /*nentries*/, Long64_t /*firstentry*/)
1661{
1662 MayNotUse("DrawSelect");
1663 return -1;
1664}
1665
1666////////////////////////////////////////////////////////////////////////////////
1667/// Handle tree header request (may not be used in this class).
/// Only reports the call through MayNotUse().
1668
1670{
1671 MayNotUse("HandleGetTreeHeader|");
1672}
1673
1674////////////////////////////////////////////////////////////////////////////////
1675/// Receive histo from slave.
/// The object is read from the message `mess`. If it inherits from TH1 it is
/// detached from any directory; then, if a histogram with the same name is
/// found in the list of gDirectory, the received one is added to it, otherwise
/// the received histogram is attached to gDirectory.
/// Objects not inheriting from TH1 are ignored.
1676
1678{
// NOTE(review): obj is used below without a null check - confirm that
// ReadObject() cannot return a null pointer here.
1679 TObject *obj = mess->ReadObject(mess->GetClass());
1680 if (obj->InheritsFrom(TH1::Class())) {
1681 TH1 *h = (TH1*)obj;
1682 h->SetDirectory(0);
1683 TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
// NOTE(review): h is not deleted after being added to horg, and a non-TH1
// obj is not deleted either - confirm the intended ownership.
1684 if (horg)
1685 horg->Add(h);
1686 else
1687 h->SetDirectory(gDirectory);
1688 }
1689}
1690
1691////////////////////////////////////////////////////////////////////////////////
1692/// Draw the object if it is a canvas.
1693/// Return 0 in case of success, 1 if it is not a canvas or libProofDraw
1694/// is not available.
/// The drawing is delegated to the function "DrawCanvas" looked up in
/// libProofDraw; its address is kept in a function-local static, so the library
/// lookup is repeated only while the hook is still unset. When the hook is
/// available and obj is not null, the value returned by the hook is returned.
1695
1697{
1698 static Int_t (*gDrawCanvasHook)(TObject *) = 0;
1699
1700 // Load the library the first time (and again at each call until the hook is found)
1701 if (!gDrawCanvasHook) {
1702 // Load library needed for graphics ...
1703 TString drawlib = "libProofDraw";
1704 char *p = 0;
1705 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
// The path string is only needed as an existence check: release it
1706 delete[] p;
1707 if (gSystem->Load(drawlib) != -1) {
1708 // Locate DrawCanvas
1709 Func_t f = 0;
1710 if ((f = gSystem->DynFindSymbol(drawlib,"DrawCanvas")))
1711 gDrawCanvasHook = (Int_t (*)(TObject *))(f);
1712 else
1713 Warning("DrawCanvas", "can't find DrawCanvas");
1714 } else
1715 Warning("DrawCanvas", "can't load %s", drawlib.Data());
1716 } else
1717 Warning("DrawCanvas", "can't locate %s", drawlib.Data());
1718 }
1719 if (gDrawCanvasHook && obj)
1720 return (*gDrawCanvasHook)(obj);
1721 // No drawing hook or object undefined
1722 return 1;
1723}
1724
1725////////////////////////////////////////////////////////////////////////////////
1726/// Parse the arguments from var, sel and opt and fill the selector and
1727/// object name accordingly.
1728/// Return 0 in case of success, 1 if libProofDraw is not available.
1729
1730Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
1731 TString &selector, TString &objname)
1732{
1733 static Int_t (*gGetDrawArgsHook)(const char *, const char *, Option_t *,
1734 TString &, TString &) = 0;
1735
1736 // Load the library the first time
1737 if (!gGetDrawArgsHook) {
1738 // Load library needed for graphics ...
1739 TString drawlib = "libProofDraw";
1740 char *p = 0;
1741 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1742 delete[] p;
1743 if (gSystem->Load(drawlib) != -1) {
1744 // Locate GetDrawArgs
1745 Func_t f = 0;
1746 if ((f = gSystem->DynFindSymbol(drawlib,"GetDrawArgs")))
1747 gGetDrawArgsHook = (Int_t (*)(const char *, const char *, Option_t *,
1748 TString &, TString &))(f);
1749 else
1750 Warning("GetDrawArgs", "can't find GetDrawArgs");
1751 } else
1752 Warning("GetDrawArgs", "can't load %s", drawlib.Data());
1753 } else
1754 Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
1755 }
1756 if (gGetDrawArgsHook)
1757 return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
1758 // No parser hook or object undefined
1759 return 1;
1760}
1761
1762////////////////////////////////////////////////////////////////////////////////
1763/// Create/destroy a named canvas for feedback
/// The action is delegated to the function "FeedBackCanvas" looked up in
/// libProofDraw, called with (name, create); its address is kept in a
/// function-local static, so the library lookup is repeated only while the
/// hook is still unset. If the hook cannot be resolved only warnings are issued.
1764
1766{
1767 static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
1768
1769 // Load the library the first time (and again at each call until the hook is found)
1770 if (!gFeedBackCanvasHook) {
1771 // Load library needed for graphics ...
1772 TString drawlib = "libProofDraw";
1773 char *p = 0;
1774 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
// The path string is only needed as an existence check: release it
1775 delete[] p;
1776 if (gSystem->Load(drawlib) != -1) {
1777 // Locate FeedBackCanvas
1778 Func_t f = 0;
1779 if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
1780 gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
1781 else
1782 Warning("FeedBackCanvas", "can't find FeedBackCanvas");
1783 } else
1784 Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
1785 } else
1786 Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
1787 }
1788 if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
1789 // Nothing else to do: without the hook no canvas action is taken
1790 return;
1791}
1792
1793////////////////////////////////////////////////////////////////////////////////
1794/// Return the size in bytes of the cache
/// The value is obtained from the event iterator fEvIter; -1 is returned when
/// no event iterator is set.
1795
1797{
1798 if (fEvIter) return fEvIter->GetCacheSize();
1799 return -1;
1800}
1801
1802////////////////////////////////////////////////////////////////////////////////
1803/// Return the number of entries in the learning phase
/// The value is obtained from the event iterator fEvIter; -1 is returned when
/// no event iterator is set.
1804
1806{
1807 if (fEvIter) return fEvIter->GetLearnEntries();
1808 return -1;
1809}
1810
1811////////////////////////////////////////////////////////////////////////////////
1812/// Switch on/off merge timer
/// When switching on, the stopwatch fMergeSTW is created if needed.
/// When switching off (and the stopwatch exists), the stopwatch is stopped and
/// its real time is saved in the current query fQuery, if any, either as merge
/// time or as receive time.
1813
1815{
1816 if (on) {
1817 if (!fMergeSTW) fMergeSTW = new TStopwatch();
1818 PDB(kGlobal,1)
1819 Info("SetMerging", "ON: mergers: %d", fProof->fMergersCount);
1820 if (fNumMergers <= 0 && fProof->fMergersCount > 0)
1822 } else if (fMergeSTW) {
1823 fMergeSTW->Stop();
1824 Float_t rt = fMergeSTW->RealTime();
1825 PDB(kGlobal,1)
1826 Info("SetMerging", "OFF: rt: %f, mergers: %d", rt, fNumMergers);
1827 if (fQuery) {
1829 // On the master (or in Lite()) we set the merging time and the number of mergers
1830 fQuery->SetMergeTime(rt);
1832 } else {
1833 // In a standard client we save the transfer-to-client time
1834 fQuery->SetRecvTime(rt);
1835 }
1836 PDB(kGlobal,2) fQuery->Print("F");
1837 }
1838 }
1839}
1840
1841//------------------------------------------------------------------------------
1842
1844
1845////////////////////////////////////////////////////////////////////////////////
1846/// Process the specified TSelector object 'nentries' times.
1847/// Used to test the PROOF iterator mechanism for cycle-driven selectors in a
1848/// local session.
/// A null selector is rejected with an error message. Otherwise a temporary
/// TDSetProxy flagged as empty (TDSet::kEmpty) and local (TDSet::kIsLocal) is
/// created, passed to Process(set, selector, option, nentries) and deleted
/// afterwards.
1849/// The return value is -1 in case of error and TSelector::GetStatus()
1850/// in case of success.
1851
1853 Long64_t nentries, Option_t *option)
1854{
1855 if (!selector) {
1856 Error("Process", "selector object undefiend!");
1857 return -1;
1858 }
1859
// Placeholder dataset: no data, processed locally
1860 TDSetProxy *set = new TDSetProxy("", "", "");
1861 set->SetBit(TDSet::kEmpty);
1862 set->SetBit(TDSet::kIsLocal);
1863 Long64_t rc = Process(set, selector, option, nentries);
1864 SafeDelete(set);
1865
1866 // Done
1867 return rc;
1868}
1869
1870////////////////////////////////////////////////////////////////////////////////
1871/// Process the specified TSelector file 'nentries' times.
1872/// Used to test the PROOF iterator mechanism for cycle-driven selectors in a
1873/// local session.
1874/// A temporary TDSetProxy flagged as empty (TDSet::kEmpty) and local
/// (TDSet::kIsLocal) is created, passed to
/// Process(set, selector, option, nentries) and deleted afterwards.
1875/// The return value is -1 in case of error and TSelector::GetStatus()
1876/// in case of success.
1877
1879 Long64_t nentries, Option_t *option)
1880{
// Placeholder dataset: no data, processed locally
1881 TDSetProxy *set = new TDSetProxy("", "", "");
1882 set->SetBit(TDSet::kEmpty);
1883 set->SetBit(TDSet::kIsLocal);
1884 Long64_t rc = Process(set, selector, option, nentries);
1885 SafeDelete(set);
1886
1887 // Done
1888 return rc;
1889}
1890
1891
1892//------------------------------------------------------------------------------
1893
1895
1896////////////////////////////////////////////////////////////////////////////////
1897/// Destructor.
/// Deletes the output list fOutput, which is owned by this object.
1898
1900{
1901 SafeDelete(fOutput); // owns the output list
1903
1904 // Objects stored in maps are already deleted when merging the feedback
1907
1909}
1910
1911////////////////////////////////////////////////////////////////////////////////
1912/// Init the packetizer
1913/// Return 0 on success (fPacketizer is correctly initialized), -1 on failure.
/// The packetizer class is chosen as follows:
///  - empty dataset (TDSet::kEmpty): the input parameter "PROOF_Packetizer" if
///    set, otherwise `defpackunit`;
///  - multi-dataset (TDSet::kMultiDSet): "TPacketizerMulti";
///  - otherwise: the input parameter "PROOF_Packetizer" if set, otherwise
///    `defpackdata`; in this case the dataset lookup is run first unless the
///    input parameter "PROOF_LookupOpt" is set to "none".
/// The class is instantiated through TMethodCall using the constructor
/// prototype given in each branch. For non-empty datasets the missing or
/// invalid elements are collected in a list which, if not empty, is added to
/// the output as "MissingFiles" together with a summary message in the
/// "PROOF_Status" object.
1914
1916 Long64_t first, const char *defpackunit,
1917 const char *defpackdata)
1918{
1920 PDB(kGlobal,1) Info("Process","Enter");
1921 fDSet = dset;
1923
1924 // This is done here to pickup on the fly changes
1925 Int_t honebyone = 1;
1926 if (TProof::GetParameter(fInput, "PROOF_MergeTH1OneByOne", honebyone) != 0)
1927 honebyone = gEnv->GetValue("ProofPlayer.MergeTH1OneByOne", 1);
1928 fMergeTH1OneByOne = (honebyone == 1) ? kTRUE : kFALSE;
1929
1930 Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1931
1932 TString packetizer;
1933 TList *listOfMissingFiles = 0;
1934
1935 TMethodCall callEnv;
1936 TClass *cl;
// Same evaluation as a few lines above
1937 noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1938
1939 if (noData) {
1940
// Empty dataset: use the packetizer requested via the input list, if any,
// otherwise the default passed by the caller for this case
1941 if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
1942 packetizer = defpackunit;
1943 else
1944 Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
1945
1946 // Get linked to the related class
1947 cl = TClass::GetClass(packetizer);
1948 if (cl == 0) {
1949 Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1951 return -1;
1952 }
1953
1954 // Init the constructor
1955 callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
1956 if (!callEnv.IsValid()) {
1957 Error("InitPacketizer",
1958 "cannot find correct constructor for '%s'", cl->GetName());
1960 return -1;
1961 }
1962 callEnv.ResetParam();
1964 callEnv.SetParam((Long64_t) nentries);
1965 callEnv.SetParam((Longptr_t) fInput);
1967
1968 } else if (dset->TestBit(TDSet::kMultiDSet)) {
1969
1970 // We have to process many datasets in one go, keeping them separate
1972 // We have been asked to stop
1973 Error("InitPacketizer", "received stop/abort request");
1975 return -1;
1976 }
1977
1978 // The multi packetizer
1979 packetizer = "TPacketizerMulti";
1980
1981 // Get linked to the related class
1982 cl = TClass::GetClass(packetizer);
1983 if (cl == 0) {
1984 Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1986 return -1;
1987 }
1988
1989 // Init the constructor
1990 callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
1991 if (!callEnv.IsValid()) {
1992 Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
1994 return -1;
1995 }
1996 callEnv.ResetParam();
1997 callEnv.SetParam((Longptr_t) dset);
1999 callEnv.SetParam((Long64_t) first);
2000 callEnv.SetParam((Long64_t) nentries);
2001 callEnv.SetParam((Longptr_t) fInput);
2003
2004 // We are going to test validity during the packetizer initialization
2005 dset->SetBit(TDSet::kValidityChecked);
2006 dset->ResetBit(TDSet::kSomeInvalid);
2007
2008 } else {
2009
2010 // Lookup - resolve the end-point urls to optimize the distribution.
2011 // The lookup was previously called in the packetizer's constructor.
2012 // A list for the missing files may already have been added to the
2013 // input list; otherwise it is created here
2014 if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
2015 // Take it out of the input list: it is recorded in the output list further below
2016 fInput->Remove(listOfMissingFiles);
2017 } else {
2018 listOfMissingFiles = new TList;
2019 }
2020 // Do the lookup; we only skip it if explicitly requested so.
2021 TString lkopt;
2022 if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
2023 dset->Lookup(kTRUE, &listOfMissingFiles);
2024
2026 // We have been asked to stop
2027 Error("InitPacketizer", "received stop/abort request");
2029 return -1;
2030 }
2031
2032 if (!(dset->GetListOfElements()) ||
2033 !(dset->GetListOfElements()->GetSize())) {
2034 if (gProofServ)
2035 gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
2036 Error("InitPacketizer", "No files from the data set were found - Aborting");
2038 if (listOfMissingFiles) {
// Own the entries so that they are deleted together with the list
2039 listOfMissingFiles->SetOwner();
2040 fOutput->Remove(listOfMissingFiles);
2041 SafeDelete(listOfMissingFiles);
2042 }
2043 return -1;
2044 }
2045
2046 if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
2047 // Using the default data packetizer passed by the caller
2048 packetizer = defpackdata;
2049 else
2050 Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
2051
2052 // Get linked to the related class
2053 cl = TClass::GetClass(packetizer);
2054 if (cl == 0) {
2055 Error("InitPacketizer", "class '%s' not found", packetizer.Data());
2057 return -1;
2058 }
2059
2060 // Init the constructor
2061 callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
2062 if (!callEnv.IsValid()) {
2063 Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
2065 return -1;
2066 }
2067 callEnv.ResetParam();
2068 callEnv.SetParam((Longptr_t) dset);
2070 callEnv.SetParam((Long64_t) first);
2071 callEnv.SetParam((Long64_t) nentries);
2072 callEnv.SetParam((Longptr_t) fInput);
2074
2075 // We are going to test validity during the packetizer initialization
2076 dset->SetBit(TDSet::kValidityChecked);
2077 dset->ResetBit(TDSet::kSomeInvalid);
2078 }
2079
2080 // Get an instance of the packetizer (the constructor call returns its address in 'ret')
2081 Longptr_t ret = 0;
2082 callEnv.Execute(ret);
2083 if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
2084 Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
2086 return -1;
2087 }
2088
2089 if (!fPacketizer->IsValid()) {
2090 Error("InitPacketizer",
2091 "instantiated packetizer object '%s' is invalid", cl->GetName());
2094 return -1;
2095 }
2096
2097 // In multi mode retrieve the list of missing files
2098 if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
2099 if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
2100 // Remove it; it will be added to the output list
2101 fInput->Remove(listOfMissingFiles);
2102 }
2103 }
2104
2105 if (!noData) {
2106 // Add invalid elements to the list of missing elements
2107 TDSetElement *elem = 0;
2108 if (dset->TestBit(TDSet::kSomeInvalid)) {
2109 TIter nxe(dset->GetListOfElements());
2110 while ((elem = (TDSetElement *)nxe())) {
2111 if (!elem->GetValid()) {
2112 if (!listOfMissingFiles)
2113 listOfMissingFiles = new TList;
2114 listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
2115 dset->Remove(elem, kFALSE);
2116 }
2117 }
2118 // The invalid elements have been removed
2120 }
2121
2122 // Record the list of missing or invalid elements in the output list
2123 if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
2124 TIter missingFiles(listOfMissingFiles);
2125 TString msg;
// Per-file messages are built only in debug mode
2126 if (gDebug > 0) {
2127 TFileInfo *fi = 0;
2128 while ((fi = (TFileInfo *) missingFiles.Next())) {
2129 if (fi->GetCurrentUrl()) {
2130 msg = Form("File not found: %s - skipping!",
2131 fi->GetCurrentUrl()->GetUrl());
2132 } else {
2133 msg = Form("File not found: %s - skipping!", fi->GetName());
2134 }
2136 }
2137 }
2138 // Make sure it will be sent back
2139 if (!GetOutput("MissingFiles")) {
2140 listOfMissingFiles->SetName("MissingFiles");
2141 AddOutputObject(listOfMissingFiles);
2142 }
// Get (or create) the status object to which the summary is attached
2143 TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
2144 if (!tmpStatus) AddOutputObject((tmpStatus = new TStatus()));
2145
2146 // Estimate how much data are missing (nbad > 0 here, so the denominator is not zero)
2147 Int_t ngood = dset->GetListOfElements()->GetSize();
2148 Int_t nbad = listOfMissingFiles->GetSize();
2149 Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
2150 msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
2151 " the 'missingFiles' list", xb * 100., '%', nbad, nbad + ngood);
2152 tmpStatus->Add(msg.Data());
2153 msg = Form(" +++\n"
2154 " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
2155 " the 'MissingFiles' list\n"
2156 " +++", xb * 100., '%', nbad, nbad + ngood);
2158 } else {
2159 // Cleanup: nothing is missing, the (possibly empty) list is not needed
2160 SafeDelete(listOfMissingFiles);
2161 }
2162 }
2163
2164 // Done
2165 return 0;
2166}
2167
2168////////////////////////////////////////////////////////////////////////////////
2169/// Process specified TDSet on PROOF.
2170/// This method is called on client and on the PROOF master.
2171/// The return value is -1 in case of an error and TSelector::GetStatus() in
2172/// in case of success.
2173
2174Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
2175 Option_t *option, Long64_t nentries,
2177{
2178 PDB(kGlobal,1) Info("Process", "Enter");
2179
2180 fDSet = dset;
2182
2183 if (!fProgressStatus) {
2184 Error("Process", "No progress status");
2185 return -1;
2186 }
2188
2189 // delete fOutput;
2190 if (!fOutput)
2191 fOutput = new THashList;
2192 else
2193 fOutput->Clear();
2194
2196
2197 if (fProof->IsMaster()){
2199 } else {
2201 }
2202
2203 TStopwatch elapsed;
2204
2205 // Define filename
2206 TString fn;
2207 fSelectorFileName = selector_file;
2208
2209 if (fCreateSelObj) {
2210 if(!SendSelector(selector_file)) return -1;
2211 fn = gSystem->BaseName(selector_file);
2212 } else {
2213 fn = selector_file;
2214 }
2215
2217
2218 // Parse option
2219 Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
2220
2221 TList *inputtmp = 0; // List of temporary input objects
2222 TDSet *set = dset;
2223 if (fProof->IsMaster()) {
2224
2225 PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
2226 set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
2227 dset->GetDirectory() );
2228 if (dset->TestBit(TDSet::kEmpty))
2229 set->SetBit(TDSet::kEmpty);
2230
2231 if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
2232 Error("Process", "cannot init the packetizer");
2234 return -1;
2235 }
2236
2237 // Reset start, this is now managed by the packetizer
2238 first = 0;
2239
2240 // Negative memlogfreq disable checks.
2241 // If 0 is passed we try to have 100 messages about memory
2242 // Otherwise we use the frequency passed.
2243 Int_t mrc = -1;
2244 Long64_t memlogfreq = -1, mlf;
2245 if (gSystem->Getenv("PROOF_MEMLOGFREQ")) {
2246 TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
2247 if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
2248 }
2249 if ((mrc = TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
2250 if (memlogfreq == 0) {
2251 memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
2252 if (memlogfreq <= 0) memlogfreq = 1;
2253 }
2254 if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
2255
2256
2257 // Send input data, if any
2258 TString emsg;
2259 if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
2260 Warning("Process", "could not forward input data: %s", emsg.Data());
2261
2262 // Attach to the transient histogram with the assigned packets, if required
2263 if (fInput->FindObject("PROOF_StatsHist") != 0) {
2264 if (!(fProcPackets = (TH1I *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
2265 Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
2266 } else {
2267 PDB(kLoop,1)
2268 Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
2269 " packets being processed");
2270 }
2271 }
2272
2273 } else {
2274
2275 // Check whether we have to enforce the use of submergers
2276 if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
2277 Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
2278 if (smg >= 0) {
2279 fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
2280 if (gEnv->Lookup("Proof.MergersByHost")) {
2281 Int_t mbh = gEnv->GetValue("Proof.MergersByHost",0);
2282 if (mbh != 0) {
2283 // Administrator settings have the priority
2284 TObject *o = 0;
2285 if ((o = fInput->FindObject("PROOF_MergersByHost"))) { fInput->Remove(o); delete o; }
2286 fInput->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
2287 }
2288 }
2289 }
2290 }
2291
2292 // For a new query clients should make sure that the temporary
2293 // output list is empty
2294 if (fOutputLists) {
2296 delete fOutputLists;
2297 fOutputLists = 0;
2298 }
2299
2300 if (!sync) {
2302 Printf(" ");
2303 Info("Process","starting new query");
2304 }
2305
2306 // Define fSelector in Client if processing with filename
2307 if (fCreateSelObj) {
2309 if (!(fSelector = TSelector::GetSelector(selector_file))) {
2310 if (!sync)
2312 return -1;
2313 }
2314 }
2315
2316 fSelectorClass = 0;
2317 fSelectorClass = fSelector->IsA();
2318
2319 // Add fSelector to inputlist if processing with object
2320 if (!fCreateSelObj) {
2321 // In any input list was set into the selector move it to the PROOF
2322 // input list, because we do not want to stream the selector one
2323 if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
2325 TObject *o = 0;
2326 while ((o = nxi())) {
2327 if (!fInput->FindObject(o)) {
2328 fInput->Add(o);
2329 if (!inputtmp) {
2330 inputtmp = new TList;
2331 inputtmp->SetOwner(kFALSE);
2332 }
2333 inputtmp->Add(o);
2334 }
2335 }
2336 }
2338 }
2339 // Set the input list for initialization
2341 fSelector->SetOption(option);
2343
2344 PDB(kLoop,1) Info("Process","Call Begin(0)");
2345 fSelector->Begin(0);
2346
2347 // Reset the input list to avoid double streaming and related problems (saving
2348 // the TQueryResult)
2350
2351 // Send large input data objects, if any
2353
2354 if (!sync)
2356 }
2357
2358 TCleanup clean(this);
2359 SetupFeedback();
2360
2361 TString opt = option;
2362
2363 // Old servers need a dedicated streamer
2364 if (fProof->fProtocol < 13)
2365 dset->SetWriteV3(kTRUE);
2366
2367 // Workers will get the entry ranges from the packetizer
2370
2371 // Entry- or Event- list ?
2372 TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
2373 : (TEntryList *)0;
2374 TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
2375 : (TEventList *)0;
2376 if (fProof->fProtocol > 14) {
2377 if (fProcessMessage) delete fProcessMessage;
2379 mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
2380 (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
2381 } else {
2382 mesg << set << fn << fInput << opt << num << fst << evl << sync;
2383 if (enl)
2384 // Not supported remotely
2385 Warning("Process","entry lists not supported by the server");
2386 }
2387
2388 // Reset the merging progress information
2390
2391 Int_t nb = fProof->Broadcast(mesg);
2392 PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
2393 if (fProof->IsLite()) fProof->fNotIdle += nb;
2394
2395 // Reset streamer choice
2396 if (fProof->fProtocol < 13)
2397 dset->SetWriteV3(kFALSE);
2398
2399 // Redirect logs from master to special log frame
2400 if (IsClient())
2402
2403 if (!IsClient()){
2404 // Signal the start of finalize for the memory log grepping
2405 Info("Process|Svc", "Start merging Memory information");
2406 }
2407
2408 if (!sync) {
2409 if (IsClient()) {
2410 // Asynchronous query: just make sure that asynchronous input
2411 // is enabled and return the prompt
2412 PDB(kGlobal,1) Info("Process","Asynchronous processing:"
2413 " activating CollectInputFrom");
2414 fProof->Activate();
2415
2416 // Receive the acknowledgement and query sequential number
2417 fProof->Collect();
2418
2419 return fProof->fSeqNum;
2420
2421 } else {
2422 PDB(kGlobal,1) Info("Process","Calling Collect");
2423 fProof->Collect();
2424
2425 HandleTimer(0); // force an update of final result
2426 // This forces a last call to TPacketizer::HandleTimer via the second argument
2427 // (the first is ignored). This is needed when some events were skipped so that
2428 // the total number of entries is not the one requested. The packetizer has no
2429 // way in such a case to understand that processing is finished: it must be told.
2430 if (fPacketizer) {
2432 // The progress timer will now stop itself at the next call
2434 // Store process info
2435 elapsed.Stop();
2436 if (fQuery)
2439 elapsed.RealTime());
2440 }
2441 StopFeedback();
2442
2443 return Finalize(kFALSE,sync);
2444 }
2445 } else {
2446
2447 PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
2448 fProof->Collect();
2449 if (!(fProof->IsSync())) {
2450 // The server required to switch to asynchronous mode
2451 Info("Process", "switching to the asynchronous mode ...");
2452 return fProof->fSeqNum;
2453 }
2454
2455 // Restore prompt logging, for clients (Collect leaves things as they were
2456 // at the time it was called)
2457 if (IsClient())
2459
2460 if (!IsClient()) {
2461 // Force an update of final result
2462 HandleTimer(0);
2463 // This forces a last call to TPacketizer::HandleTimer via the second argument
2464 // (the first is ignored). This is needed when some events were skipped so that
2465 // the total number of entries is not the one requested. The packetizer has no
2466 // way in such a case to understand that processing is finished: it must be told.
2467 if (fPacketizer) {
2469 // The progress timer will now stop itself at the next call
2471 // Store process info
2472 if (fQuery)
2476 }
2477 } else {
2478 // Set the input list: maybe required at termination
2480 }
2481 StopFeedback();
2482
2483 Long64_t rc = -1;
2485 rc = Finalize(kFALSE,sync);
2486
2487 // Remove temporary input objects, if any
2488 if (inputtmp) {
2489 TIter nxi(inputtmp);
2490 TObject *o = 0;
2491 while ((o = nxi())) fInput->Remove(o);
2492 SafeDelete(inputtmp);
2493 }
2494
2495 // Done
2496 return rc;
2497 }
2498}
2499
2500////////////////////////////////////////////////////////////////////////////////
2501/// Process specified TDSet on PROOF.
2502/// This method is called on client and on the PROOF master.
2503/// The return value is -1 in case of an error and TSelector::GetStatus() in
2504/// in case of success.
2505
2507 Option_t *option, Long64_t nentries,
2509{
2510 if (!selector) {
2511 Error("Process", "selector object undefined");
2512 return -1;
2513 }
2514
2515 // Define fSelector in Client
2516 if (IsClient() && (selector != fSelector)) {
2518 fSelector = selector;
2519 }
2520
2522 Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
2524
2525 // Done
2526 return rc;
2527}
2528
2529////////////////////////////////////////////////////////////////////////////////
2530/// Prepares the given list of new workers to join a progressing process.
2531/// Returns kTRUE on success, kFALSE otherwise.
2532
2534{
2535 if (!fProcessMessage || !fProof || !fPacketizer) {
2536 Error("Process", "Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p",
2538 return kFALSE;
2539 }
2540
2541 if (!workers || !fProof->IsMaster()) {
2542 Error("Process", "Invalid call");
2543 return kFALSE;
2544 }
2545
2546 PDB(kGlobal, 1)
2547 Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries());
2548
2549 // Sends the file associated to the TSelector, if necessary
2550 if (fCreateSelObj) {
2551 PDB(kGlobal, 2)
2552 Info("Process", "Sending selector file %s", fSelectorFileName.Data());
2554 Error("Process", "Problems in sending selector file %s", fSelectorFileName.Data());
2555 return kFALSE;
2556 }
2557 }
2558
2559 if (fProof->IsLite()) fProof->fNotIdle += workers->GetSize();
2560
2561 PDB(kGlobal, 2)
2562 Info("Process", "Adding new workers to the packetizer");
2563 if (fPacketizer->AddWorkers(workers) == -1) {
2564 Error("Process", "Cannot add new workers to the packetizer!");
2565 return kFALSE; // TODO: make new wrks inactive
2566 }
2567
2568 PDB(kGlobal, 2)
2569 Info("Process", "Broadcasting process message to new workers");
2570 fProof->Broadcast(*fProcessMessage, workers);
2571
2572 // Don't call Collect(): we came here from a global Collect() already which
2573 // will take care of new workers as well
2574
2575 return kTRUE;
2576
2577}
2578
2579////////////////////////////////////////////////////////////////////////////////
2580/// Merge output in files
2581
2583{
2584 PDB(kOutput,1) Info("MergeOutputFiles", "enter: fOutput size: %d", fOutput->GetSize());
2585 PDB(kOutput,2) fOutput->ls();
2586
2587 TList *rmList = 0;
2588 if (fMergeFiles) {
2589 TIter nxo(fOutput);
2590 TObject *o = 0;
2591 TProofOutputFile *pf = 0;
2592 while ((o = nxo())) {
2593 if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
2594
2595 PDB(kOutput,2) pf->Print();
2596
2597 if (pf->IsMerge()) {
2598
2599 // Point to the merger
2600 Bool_t localMerge = (pf->GetTypeOpt() == TProofOutputFile::kLocal) ? kTRUE : kFALSE;
2601 TFileMerger *filemerger = pf->GetFileMerger(localMerge);
2602 if (!filemerger) {
2603 Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
2604 pf->Print();
2605 continue;
2606 }
2607 // If only one instance the list in the merger is not yet created: do it now
2608 if (!pf->IsMerged()) {
2609 PDB(kOutput,2) pf->Print();
2610 TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
2611 filemerger->AddFile(fileLoc);
2612 }
2613 // Datadir
2614 TString ddir, ddopts;
2615 if (gProofServ) {
2616 ddir.Form("%s/", gProofServ->GetDataDir());
2618 }
2619 // Set the output file
2620 TString outfile(pf->GetOutputFileName());
2621 if (outfile.Contains("<datadir>/")) {
2622 outfile.ReplaceAll("<datadir>/", ddir.Data());
2623 if (!ddopts.IsNull())
2624 outfile += TString::Format("?%s", ddopts.Data());
2625 pf->SetOutputFileName(outfile);
2626 }
2627 if ((gProofServ && gProofServ->IsTopMaster()) || (fProof && fProof->IsLite())) {
2629 TString srv;
2631 TUrl usrv(srv);
2632 Bool_t localFile = kFALSE;
2633 if (pf->IsRetrieve()) {
2634 // This file will be retrieved by the client: we created it in the data dir
2635 // and save the file URL on the client in the title
2636 if (outfile.BeginsWith("client:")) outfile.Replace(0, 7, "");
2637 TString bn = gSystem->BaseName(TUrl(outfile.Data(), kTRUE).GetFile());
2638 // The output file path on the master
2639 outfile.Form("%s%s", ddir.Data(), bn.Data());
2640 // Save the client path in the title if not defined yet
2641 if (strlen(pf->GetTitle()) <= 0) pf->SetTitle(bn);
2642 // The file is local
2643 localFile = kTRUE;
2644 } else {
2645 // Check if the file is on the master or elsewhere
2646 if (outfile.BeginsWith("master:")) outfile.Replace(0, 7, "");
2647 // Check locality
2648 TUrl uof(outfile.Data(), kTRUE);
2649 TString lfn;
2650 ftyp = TFile::GetType(uof.GetUrl(), "RECREATE", &lfn);
2651 if (ftyp == TFile::kLocal && !srv.IsNull()) {
2652 // Check if is a different server
2653 if (uof.GetPort() > 0 && usrv.GetPort() > 0 &&
2654 usrv.GetPort() != uof.GetPort()) ftyp = TFile::kNet;
2655 }
2656 // If it is really local set the file name
2657 if (ftyp == TFile::kLocal) outfile = lfn;
2658 // The file maybe local
2659 if (ftyp == TFile::kLocal || ftyp == TFile::kFile) localFile = kTRUE;
2660 }
2661 // The remote output file name (the one to be used by the client)
2662 TString outfilerem(outfile);
2663 // For local files we add the local server
2664 if (localFile) {
2665 // Remove prefix, if any, if included and if Xrootd
2666 TProofServ::FilterLocalroot(outfilerem, srv);
2667 outfilerem.Insert(0, srv);
2668 }
2669 // Save the new remote output filename
2670 pf->SetOutputFileName(outfilerem);
2671 // Align the filename
2672 pf->SetFileName(gSystem->BaseName(outfilerem));
2673 }
2674 if (!filemerger->OutputFile(outfile)) {
2675 Error("MergeOutputFiles", "cannot open the output file");
2676 continue;
2677 }
2678 // Merge
2679 PDB(kSubmerger,2) filemerger->PrintFiles("");
2680 if (!filemerger->Merge()) {
2681 Error("MergeOutputFiles", "cannot merge the output files");
2682 continue;
2683 }
2684 // Remove the files
2685 TList *fileList = filemerger->GetMergeList();
2686 if (fileList) {
2687 TIter next(fileList);
2688 TObjString *url = 0;
2689 while((url = (TObjString*)next())) {
2690 TUrl u(url->GetName());
2691 if (!strcmp(u.GetProtocol(), "file")) {
2692 gSystem->Unlink(u.GetFile());
2693 } else {
2694 gSystem->Unlink(url->GetName());
2695 }
2696 }
2697 }
2698 // Reset the merger
2699 filemerger->Reset();
2700
2701 } else {
2702
2703 // If not yet merged (for example when having only 1 active worker,
2704 // we need to create the dataset by calling Merge on an effectively empty list
2705 if (!pf->IsMerged()) {
2706 TList dumlist;
2707 dumlist.Add(new TNamed("dum", "dum"));
2708 dumlist.SetOwner(kTRUE);
2709 pf->Merge(&dumlist);
2710 }
2711 // Point to the dataset
2713 if (!fc) {
2714 Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
2715 pf->Print();
2716 continue;
2717 }
2718 // Add the collection to the output list for registration and/or to be returned
2719 // to the client
2720 fOutput->Add(fc);
2721 // Do not cleanup at destruction
2722 pf->ResetFileCollection();
2723 // Tell the main thread to register this dataset, if needed
2724 if (pf->IsRegister()) {
2725 TString opt;
2726 if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
2727 if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
2728 if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
2729 fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
2730 TString tag = TString::Format("DATASET_%s", pf->GetTitle());
2731 fOutput->Add(new TNamed(tag, opt));
2732 }
2733 // Remove this object from the output list and schedule it for distruction
2734 fOutput->Remove(pf);
2735 if (!rmList) rmList = new TList;
2736 rmList->Add(pf);
2737 PDB(kOutput,2) fOutput->Print();
2738 }
2739 }
2740 }
2741 }
2742
2743 // Remove objects scheduled for removal
2744 if (rmList && rmList->GetSize() > 0) {
2745 TIter nxo(rmList);
2746 TObject *o = 0;
2747 while((o = nxo())) {
2748 fOutput->Remove(o);
2749 }
2750 rmList->SetOwner(kTRUE);
2751 delete rmList;
2752 }
2753
2754 PDB(kOutput,1) Info("MergeOutputFiles", "done!");
2755
2756 // Done
2757 return kTRUE;
2758}
2759
2760
2761////////////////////////////////////////////////////////////////////////////////
2762/// Set the selector's data members:
2763/// find the mapping of data members to otuput list entries in the output list
2764/// and apply it.
2765
2767{
2770 if (!olsdm) {
2771 PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList",
2772 "failed to find map object in output list!");
2773 return;
2774 }
2775
2776 olsdm->SetDataMembers(fSelector);
2777}
2778
2779////////////////////////////////////////////////////////////////////////////////
2780
2782{
2783 // Finalize a query.
2784 // Returns -1 in case of an error, 0 otherwise.
2785
2786 if (IsClient()) {
2787 if (fOutputLists == 0) {
2788 if (force)
2789 if (fQuery)
2790 return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
2791 fQuery->GetName()), force);
2792 } else {
2793 // Make sure the all objects are in the output list
2794 PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
2795 MergeOutput();
2796 }
2797 }
2798
2799 Long64_t rv = 0;
2800 if (fProof->IsMaster()) {
2801
2802 // Fill information for monitoring and stop it
2803 TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
2804 if (!status) {
2805 // The query was aborted: let's add some info in the output list
2806 status = new TStatus();
2807 fOutput->Add(status);
2808 TString emsg = TString::Format("Query aborted after %lld entries", GetEventsProcessed());
2809 status->Add(emsg);
2810 }
2811 status->SetExitStatus((Int_t) GetExitStatus());
2812
2813 PDB(kOutput,1) Info("Finalize","Calling Merge Output");
2814 // Some objects (e.g. histos in autobin) may not have been merged yet
2815 // do it now
2816 MergeOutput();
2817
2818 fOutput->SetOwner();
2819
2820 // Add the active-wrks-vs-proctime info from the packetizer
2821 if (fPacketizer) {
2823 if (pperf) fOutput->Add(pperf);
2825 if (parms) {
2826 TIter nxo(parms);
2827 TObject *o = 0;
2828 while ((o = nxo())) fOutput->Add(o);
2829 }
2830
2831 // If other invalid elements were found during processing, add them to the
2832 // list of missing elements
2833 TDSetElement *elem = 0;
2836 TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
2837 if (!listOfMissingFiles) {
2838 listOfMissingFiles = new TList;
2839 listOfMissingFiles->SetName("MissingFiles");
2840 }
2842 while ((elem = (TDSetElement *)nxe()))
2843 listOfMissingFiles->Add(elem->GetFileInfo(type));
2844 if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
2845 }
2846 }
2847
2849 // Save memory usage on master
2850 Long_t vmaxmst, rmaxmst;
2851 TPerfStats::GetMemValues(vmaxmst, rmaxmst);
2852 status->SetMemValues(vmaxmst, rmaxmst, kTRUE);
2853
2855
2856 } else {
2857 if (fExitStatus != kAborted) {
2858
2859 if (!sync) {
2860 // Reinit selector (with multi-sessioning we must do this until
2861 // TSelector::GetSelector() is optimized to i) avoid reloading of an
2862 // unchanged selector and ii) invalidate existing instances of
2863 // reloaded selector)
2864 if (ReinitSelector(fQuery) == -1) {
2865 Info("Finalize", "problems reinitializing selector \"%s\"",
2866 fQuery->GetSelecImp()->GetName());
2867 return -1;
2868 }
2869 }
2870
2871 if (fPacketizer)
2872 if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
2874 failedPackets->SetName("FailedPackets");
2875 AddOutputObject(failedPackets);
2876
2877 TStatus *status = (TStatus *)GetOutput("PROOF_Status");
2878 if (!status) AddOutputObject((status = new TStatus()));
2879 status->Add("Some packets were not processed! Check the the"
2880 " 'FailedPackets' list in the output list");
2881 }
2882
2883 // Some input parameters may be needed in Terminate
2885
2887 if (output) {
2888 TIter next(fOutput);
2889 while(TObject* obj = next()) {
2890 if (fProof->IsParallel() || DrawCanvas(obj) == 1)
2891 // Either parallel or not a canvas or not able to display it:
2892 // just add to the list
2893 output->Add(obj);
2894 }
2895 } else {
2896 Warning("Finalize", "undefined output list in the selector! Protocol error?");
2897 }
2898
2899 // We need to do this because the output list can be modified in TSelector::Terminate
2900 // in a way to invalidate existing objects; so we clean the links when still valid and
2901 // we re-copy back later
2903 fOutput->Clear("nodelete");
2904
2905 // Map output objects to selector members
2907
2908 PDB(kLoop,1) Info("Finalize","Call Terminate()");
2909 // This is the end of merging
2911 // We measure the merge time
2913 // Call Terminate now
2915
2916 rv = fSelector->GetStatus();
2917
2918 // Copy the output list back and clean the selector's list
2919 TIter it(output);
2920 while(TObject* o = it()) {
2921 fOutput->Add(o);
2922 }
2923
2924 // Save the output list in the current query, if any
2925 if (fQuery) {
2927 // Set in finalized state (cannot be done twice)
2929 } else {
2930 Warning("Finalize","current TQueryResult object is undefined!");
2931 }
2932
2933 if (!fCreateSelObj) {
2936 if (output) output->Remove(fSelector);
2937 fSelector = 0;
2938 }
2939
2940 // We have transferred copy of the output objects in TQueryResult,
2941 // so now we can cleanup the selector, making sure that we do not
2942 // touch the output objects
2943 if (output) { output->SetOwner(kFALSE); output->Clear("nodelete"); }
2945
2946 // Delete fOutput (not needed anymore, cannot be finalized twice),
2947 // making sure that the objects saved in TQueryResult are not deleted
2949 fOutput->Clear("nodelete");
2951
2952 } else {
2953
2954 // Cleanup
2955 fOutput->SetOwner();
2957 if (!fCreateSelObj) fSelector = 0;
2958 }
2959 }
2960 PDB(kGlobal,1) Info("Process","exit");
2961
2962 if (!IsClient()) {
2963 Info("Finalize", "finalization on %s finished", gProofServ->GetPrefix());
2964 }
2966
2967 return rv;
2968}
2969
2970////////////////////////////////////////////////////////////////////////////////
2971/// Finalize the results of a query already processed.
2972
2974{
2975 PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
2976
2977 if (!IsClient()) {
2978 Info("Finalize(TQueryResult *)",
2979 "method to be executed only on the clients");
2980 return -1;
2981 }
2982
2983 if (!qr) {
2984 Info("Finalize(TQueryResult *)", "query undefined");
2985 return -1;
2986 }
2987
2988 if (qr->IsFinalized()) {
2989 Info("Finalize(TQueryResult *)", "query already finalized");
2990 return -1;
2991 }
2992
2993 // Reset the list
2994 if (!fOutput)
2995 fOutput = new THashList;
2996 else
2997 fOutput->Clear();
2998
2999 // Make sure that the temporary output list is empty
3000 if (fOutputLists) {
3002 delete fOutputLists;
3003 fOutputLists = 0;
3004 }
3005
3006 // Re-init the selector
3008
3009 // Import the output list
3010 TList *tmp = (TList *) qr->GetOutputList();
3011 if (!tmp) {
3013 Info("Finalize(TQueryResult *)", "outputlist is empty");
3014 return -1;
3015 }
3016 TList *out = fOutput;
3017 if (fProof->fProtocol < 11)
3018 out = new TList;
3019 TIter nxo(tmp);
3020 TObject *o = 0;
3021 while ((o = nxo()))
3022 out->Add(o->Clone());
3023
3024 // Adopts the list
3025 if (fProof->fProtocol < 11) {
3026 out->SetOwner();
3027 StoreOutput(out);
3028 }
3030
3032
3033 // Finalize it
3034 SetCurrentQuery(qr);
3035 Long64_t rc = Finalize();
3037
3038 return rc;
3039}
3040
3041////////////////////////////////////////////////////////////////////////////////
3042/// Send the selector file(s) to master or worker nodes.
3043
3045{
3046 // Check input
3047 if (!selector_file) {
3048 Info("SendSelector", "Invalid input: selector (file) name undefined");
3049 return kFALSE;
3050 }
3051
3052 if (!strchr(gSystem->BaseName(selector_file), '.')) {
3053 if (gDebug > 1)
3054 Info("SendSelector", "selector name '%s' does not contain a '.':"
3055 " nothing to send, it will be loaded from a library", selector_file);
3056 return kTRUE;
3057 }
3058
3059 // Extract the fine name first
3060 TString selec = selector_file;
3061 TString aclicMode;
3062 TString arguments;
3063 TString io;
3064 selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
3065
3066 // Expand possible envs or '~'
3067 gSystem->ExpandPathName(selec);
3068
3069 // Update the macro path
3071 TString np = gSystem->GetDirName(selec);
3072 if (!np.IsNull()) {
3073 np += ":";
3074 if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
3075 Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
3076 mp.Insert(ip, np);
3078 if (gDebug > 0)
3079 Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
3080 }
3081 }
3082
3083 // Header file
3084 TString header = selec;
3085 header.Remove(header.Last('.'));
3086 header += ".h";
3087 if (gSystem->AccessPathName(header, kReadPermission)) {
3088 TString h = header;
3089 header.Remove(header.Last('.'));
3090 header += ".hh";
3091 if (gSystem->AccessPathName(header, kReadPermission)) {
3092 Info("SendSelector",
3093 "header file not found: tried: %s %s", h.Data(), header.Data());
3094 return kFALSE;
3095 }
3096 }
3097
3098 // Send files now
3100 Info("SendSelector", "problems sending implementation file %s", selec.Data());
3101 return kFALSE;
3102 }
3103 if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
3104 Info("SendSelector", "problems sending header file %s", header.Data());
3105 return kFALSE;
3106 }
3107
3108 return kTRUE;
3109}
3110
3111////////////////////////////////////////////////////////////////////////////////
3112/// Merge objects in output the lists.
3113
3115{
3116 PDB(kOutput,1) Info("MergeOutput","Enter");
3117
3118 TObject *obj = 0;
3119 if (fOutputLists) {
3120
3121 TIter next(fOutputLists);
3122
3123 TList *list;
3124 while ( (list = (TList *) next()) ) {
3125
3126 if (!(obj = fOutput->FindObject(list->GetName()))) {
3127 obj = list->First();
3128 list->Remove(obj);
3129 fOutput->Add(obj);
3130 }
3131
3132 if ( list->IsEmpty() ) continue;
3133
3134 TMethodCall callEnv;
3135 if (obj->IsA())
3136 callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3137 if (callEnv.IsValid()) {
3138 callEnv.SetParam((Longptr_t) list);
3139 callEnv.Execute(obj);
3140 } else {
3141 // No Merge interface, return individual objects
3142 while ( (obj = list->First()) ) {
3143 fOutput->Add(obj);
3144 list->Remove(obj);
3145 }
3146 }
3147 }
3149
3150 } else {
3151
3152 PDB(kOutput,1) Info("MergeOutput","fOutputLists empty");
3153 }
3154
3155 if (!IsClient() || fProof->IsLite()) {
3156 // Merge the output files created on workers, if any
3158 }
3159
3160 // If there are TProofOutputFile objects we have to make sure that the internal
3161 // information is consistent for the cases where this object is going to be merged
3162 // again (e.g. when using submergers or in a multi-master setup). This may not be
3163 // the case because the first coming in is taken as reference and it has the
3164 // internal dir and raw dir of the originating worker.
3165 TString key;
3166 TNamed *nm = 0;
3167 TList rmlist;
3168 TIter nxo(fOutput);
3169 while ((obj = nxo())) {
3170 TProofOutputFile *pf = dynamic_cast<TProofOutputFile *>(obj);
3171 if (pf) {
3172 if (gProofServ) {
3173 PDB(kOutput,2) Info("MergeOutput","found TProofOutputFile '%s'", obj->GetName());
3174 TString dir(pf->GetOutputFileName());
3175 PDB(kOutput,2) Info("MergeOutput","outputfilename: '%s'", dir.Data());
3176 // The dir
3177 if (dir.Last('/') != kNPOS) dir.Remove(dir.Last('/')+1);
3178 PDB(kOutput,2) Info("MergeOutput","dir: '%s'", dir.Data());
3179 pf->SetDir(dir);
3180 // The raw dir; for xrootd based system we include the 'localroot', if any
3181 TUrl u(dir);
3182 dir = u.GetFile();
3183 TString pfx = gEnv->GetValue("Path.Localroot","");
3184 if (!pfx.IsNull() &&
3185 (!strcmp(u.GetProtocol(), "root") || !strcmp(u.GetProtocol(), "xrd")))
3186 dir.Insert(0, pfx);
3187 PDB(kOutput,2) Info("MergeOutput","rawdir: '%s'", dir.Data());
3188 pf->SetDir(dir, kTRUE);
3189 // The worker ordinal
3191 // The saved output file name, if any
3192 key.Form("PROOF_OutputFileName_%s", pf->GetFileName());
3193 if ((nm = (TNamed *) fOutput->FindObject(key.Data()))) {
3194 pf->SetOutputFileName(nm->GetTitle());
3195 rmlist.Add(nm);
3197 pf->SetOutputFileName(0);
3199 }
3200 // The filename (order is important to exclude '.merger' from the key)
3201 dir = pf->GetFileName();
3203 dir += ".merger";
3204 pf->SetMerged(kFALSE);
3205 } else {
3206 if (dir.EndsWith(".merger")) dir.Remove(dir.Last('.'));
3207 }
3208 pf->SetFileName(dir);
3209 } else if (fProof->IsLite()) {
3210 // The ordinal
3211 pf->SetWorkerOrdinal("0");
3212 // The dir
3214 // The filename and raw dir
3215 TUrl u(pf->GetOutputFileName(), kTRUE);
3218 // Notify the output path
3219 Printf("\nOutput file: %s", pf->GetOutputFileName());
3220 }
3221 } else {
3222 PDB(kOutput,2) Info("MergeOutput","output object '%s' is not a TProofOutputFile", obj->GetName());
3223 }
3224 }
3225
3226 // Remove temporary objects from fOutput
3227 if (rmlist.GetSize() > 0) {
3228 TIter nxrm(&rmlist);
3229 while ((obj = nxrm()))
3230 fOutput->Remove(obj);
3231 rmlist.SetOwner(kTRUE);
3232 }
3233
3234 // If requested (typically in case of submerger to count possible side-effects in that process)
3235 // save the measured memory usage
3236 if (saveMemValues) {
3238 // Save memory usage on master
3239 Long_t vmaxmst, rmaxmst;
3240 TPerfStats::GetMemValues(vmaxmst, rmaxmst);
3241 TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
3242 if (status) status->SetMemValues(vmaxmst, rmaxmst, kFALSE);
3243 }
3244
3245 PDB(kOutput,1) fOutput->Print();
3246 PDB(kOutput,1) Info("MergeOutput","leave (%d object(s))", fOutput->GetSize());
3247}
3248
3249////////////////////////////////////////////////////////////////////////////////
3250/// Progress signal.
3251
3253{
3254 if (IsClient()) {
3255 fProof->Progress(total, processed);
3256 } else {
3257 // Send to the previous tier
3259 m << total << processed;
3261 }
3262}
3263
3264////////////////////////////////////////////////////////////////////////////////
3265/// Progress signal.
3266
3268 Long64_t bytesread,
3269 Float_t initTime, Float_t procTime,
3270 Float_t evtrti, Float_t mbrti)
3271{
3272 PDB(kGlobal,1)
3273 Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
3274 initTime, procTime, evtrti, mbrti);
3275
3276 if (IsClient()) {
3277 fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
3278 } else {
3279 // Send to the previous tier
3281 m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
3283 }
3284}
3285
3286////////////////////////////////////////////////////////////////////////////////
3287/// Progress signal.
3288
3290{
3291 if (pi) {
3292 PDB(kGlobal,1)
3293 Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
3294 pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
3295 pi->fActWorkers, pi->fEffSessions);
3296
3297 if (IsClient()) {
3298 fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
3299 pi->fInitTime, pi->fProcTime,
3300 pi->fEvtRateI, pi->fMBRateI,
3301 pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
3302 } else {
3303 // Send to the previous tier
3305 m << pi;
3307 }
3308 } else {
3309 Warning("Progress","TProofProgressInfo object undefined!");
3310 }
3311}
3312
3313
3314////////////////////////////////////////////////////////////////////////////////
3315/// Feedback signal.
3316
3318{
3319 fProof->Feedback(objs);
3320}
3321
3322////////////////////////////////////////////////////////////////////////////////
3323/// Stop process after this event.
3324
3326{
3327 if (fPacketizer != 0)
3329 if (abort == kTRUE)
3331 else
3333}
3334
3335////////////////////////////////////////////////////////////////////////////////
3336/// Incorporate the received object 'obj' into the output list fOutput.
3337/// The latter is created if not existing.
3338/// This method short cuts 'StoreOutput + MergeOutput' optimizing the memory
3339/// consumption.
3340/// Returns -1 in case of error, 1 if the object has been merged into another
3341/// one (so that its ownership has not been taken and can be deleted), and 0
3342/// otherwise.
3343
3345{
3346 PDB(kOutput,1)
3347 Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
3348
3349 // We must something to process
3350 if (!obj) {
3351 PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
3352 return -1;
3353 }
3354
3355 // Create the output list, if not yet done
3356 if (!fOutput)
3357 fOutput = new THashList;
3358
3359 // Flag about merging
3360 Bool_t merged = kTRUE;
3361
3362 // Process event lists first
3363 TList *elists = dynamic_cast<TList *> (obj);
3364 if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
3365
3366 // Create a global event list, result of merging the event lists
3367 // coresponding to the various data set elements
3368 TEventList *evlist = new TEventList("PROOF_EventList");
3369
3370 // Iterate the list of event list segments
3371 TIter nxevl(elists);
3372 TEventList *evl = 0;
3373 while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3374
3375 // Find the file offset (fDSet is the current TDSet instance)
3376 // locating the element by name
3377 TIter nxelem(fDSet->GetListOfElements());
3378 TDSetElement *elem = 0;
3379 while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3380 if (!strcmp(elem->GetFileName(), evl->GetName()))
3381 break;
3382 }
3383 if (!elem) {
3384 Error("AddOutputObject", "Found an event list for %s, but no object with"
3385 " the same name in the TDSet", evl->GetName());
3386 continue;
3387 }
3388 Long64_t offset = elem->GetTDSetOffset();
3389
3390 // Shift the list by the number of first event in that file
3391 Long64_t *arr = evl->GetList();
3392 Int_t num = evl->GetN();
3393 if (arr && offset > 0)
3394 for (Int_t i = 0; i < num; i++)
3395 arr[i] += offset;
3396
3397 // Add to the global event list
3398 evlist->Add(evl);
3399 }
3400
3401 // Incorporate the resulting global list in fOutput
3402 SetLastMergingMsg(evlist);
3403 Incorporate(evlist, fOutput, merged);
3404 NotifyMemory(evlist);
3405
3406 // Delete the global list if merged
3407 if (merged)
3408 SafeDelete(evlist);
3409
3410 // The original object has been transformed in something else; we do
3411 // not have ownership on it
3412 return 1;
3413 }
3414
3415 // Check if we need to merge files
3416 TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
3417 if (pf) {
3419 if (!IsClient() || fProof->IsLite()) {
3420 if (pf->IsMerge()) {
3421 Bool_t hasfout = (pf->GetOutputFileName() &&
3422 strlen(pf->GetOutputFileName()) > 0 &&
3424 Bool_t setfout = (!hasfout || TestBit(TVirtualProofPlayer::kIsSubmerger)) ? kTRUE : kFALSE;
3425 if (setfout) {
3426
3427 TString ddir, ddopts;
3428 if (gProofServ) {
3429 ddir.Form("%s/", gProofServ->GetDataDir());
3431 }
3432 // Set the output file
3433 TString outfile(pf->GetOutputFileName());
3434 outfile.ReplaceAll("<datadir>/", ddir.Data());
3435 if (!ddopts.IsNull()) outfile += TString::Format("?%s", ddopts.Data());
3436 pf->SetOutputFileName(outfile);
3437
3438 if (gProofServ) {
3439 // If submerger, save first the existing filename, if any
3441 TString key = TString::Format("PROOF_OutputFileName_%s", pf->GetFileName());
3442 if (!fOutput->FindObject(key.Data()))
3443 fOutput->Add(new TNamed(key.Data(), pf->GetOutputFileName()));
3444 }
3445 TString of;
3447 if (of.IsNull()) {
3448 // Assume an xroot server running on the machine
3449 of.Form("root://%s/", gSystem->HostName());
3450 if (gSystem->Getenv("XRDPORT")) {
3451 TString sp(gSystem->Getenv("XRDPORT"));
3452 if (sp.IsDigit())
3453 of.Form("root://%s:%s/", gSystem->HostName(), sp.Data());
3454 }
3455 }
3456 TString sessionPath(gProofServ->GetSessionDir());
3457 TProofServ::FilterLocalroot(sessionPath, of);
3458 of += TString::Format("%s/%s", sessionPath.Data(), pf->GetFileName());
3460 if (!of.EndsWith(".merger")) of += ".merger";
3461 } else {
3462 if (of.EndsWith(".merger")) of.Remove(of.Last('.'));
3463 }
3464 pf->SetOutputFileName(of);
3465 }
3466 }
3467 // Notify
3468 PDB(kOutput, 1) pf->Print();
3469 }
3470 } else {
3471 // On clients notify the output path
3472 Printf("Output file: %s", pf->GetOutputFileName());
3473 }
3474 }
3475
3476 // For other objects we just run the incorporation procedure
3477 SetLastMergingMsg(obj);
3478 Incorporate(obj, fOutput, merged);
3479 NotifyMemory(obj);
3480
3481 // We are done
3482 return (merged ? 1 : 0);
3483}
3484
3485////////////////////////////////////////////////////////////////////////////////
3486/// Control output redirection to TProof::fLogFileW
3487
3489{
3490 if (on && fProof && fProof->fLogFileW) {
3493 } else if (!on) {
3494 if (fErrorHandler) {
3497 }
3498 }
3499}
3500
3501////////////////////////////////////////////////////////////////////////////////
3502/// Incorporate the content of the received output list 'out' into the final
3503/// output list fOutput. The latter is created if not existing.
3504/// This method short cuts 'StoreOutput + MergeOutput' limiting the memory
3505/// consumption.
3506
3508{
3509 PDB(kOutput,1) Info("AddOutput","Enter");
3510
3511 // We must have something to process
3512 if (!out) {
3513 PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
3514 return;
3515 }
3516
3517 // Create the output list, if not yet done
3518 if (!fOutput)
3519 fOutput = new THashList;
3520
3521 // Process event lists first
3522 Bool_t merged = kTRUE;
3523 TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
3524 if (elists) {
3525
3526 // Create a global event list, result of merging the event lists
3527 // corresponding to the various data set elements
3528 TEventList *evlist = new TEventList("PROOF_EventList");
3529
3530 // Iterate the list of event list segments
3531 TIter nxevl(elists);
3532 TEventList *evl = 0;
3533 while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3534
3535 // Find the file offset (fDSet is the current TDSet instance)
3536 // locating the element by name
3537 TIter nxelem(fDSet->GetListOfElements());
3538 TDSetElement *elem = 0;
3539 while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3540 if (!strcmp(elem->GetFileName(), evl->GetName()))
3541 break;
3542 }
3543 if (!elem) {
3544 Error("AddOutput", "Found an event list for %s, but no object with"
3545 " the same name in the TDSet", evl->GetName());
3546 continue;
3547 }
3548 Long64_t offset = elem->GetTDSetOffset();
3549
3550 // Shift the list by the number of first event in that file
3551 Long64_t *arr = evl->GetList();
3552 Int_t num = evl->GetN();
3553 if (arr && offset > 0)
3554 for (Int_t i = 0; i < num; i++)
3555 arr[i] += offset;
3556
3557 // Add to the global event list
3558 evlist->Add(evl);
3559 }
3560
3561 // Remove and delete the events lists object to avoid spoiling iteration
3562 // during next steps
3563 out->Remove(elists);
3564 delete elists;
3565
3566 // Incorporate the resulting global list in fOutput
3567 SetLastMergingMsg(evlist);
3568 Incorporate(evlist, fOutput, merged);
3569 NotifyMemory(evlist);
3570 }
3571
3572 // Iterate on the remaining objects in the received list
3573 TIter nxo(out);
3574 TObject *obj = 0;
3575 while ((obj = nxo())) {
3576 SetLastMergingMsg(obj);
3577 Incorporate(obj, fOutput, merged);
3578 // If not merged, drop from the temporary list, as the ownership
3579 // passes to fOutput
3580 if (!merged)
3581 out->Remove(obj);
3582 NotifyMemory(obj);
3583 }
3584
3585 // Done
3586 return;
3587}
3588
3589////////////////////////////////////////////////////////////////////////////////
3590/// Printout the memory record after merging object 'obj'
3591/// This record is used by the memory monitor
3592
3594{
3595 if (fProof && (!IsClient() || fProof->IsLite())){
3596 ProcInfo_t pi;
3597 if (!gSystem->GetProcInfo(&pi)){
3598 // For PROOF-Lite we redirect this output to the open log file so that the
3599 // memory monitor can pick these messages up
3601 Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
3602 pi.fMemVirtual, pi.fMemResident, obj->GetName());
3603 RedirectOutput(0);
3604 }
3605 // Record also values for monitoring
3607 }
3608}
3609
3610////////////////////////////////////////////////////////////////////////////////
3611/// Set the message to be notified in case of exception
3612
3614{
3615 TString lastMsg = TString::Format("while merging object '%s'", obj->GetName());
3616 TProofServ::SetLastMsg(lastMsg);
3617}
3618
3619////////////////////////////////////////////////////////////////////////////////
3620/// Incorporate object 'newobj' in the list 'outlist'.
3621/// The object is merged with an object of the same name already existing in
3622/// the list, or just added.
3623/// The boolean merged is set to kFALSE when the object is just added to 'outlist';
3624/// this happens if the Merge() method does not exist or if an object with the same
3625/// name as 'newobj' is not already in the list. If the object is not 'merged' then it
3626/// should not be deleted, unless 'outlist' is not the owner of its objects.
3627/// Return 0 on success, -1 on error.
3628
3630{
3631 merged = kTRUE;
3632
3633 PDB(kOutput,1)
3634 Info("Incorporate", "enter: obj: %p (%s), list: %p",
3635 newobj, newobj ? newobj->ClassName() : "undef", outlist);
3636
3637 // The object and list must exist
3638 if (!newobj || !outlist) {
3639 Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
3640 return -1;
3641 }
3642
3643 // Special treatment for histograms in autobin mode
3644 Bool_t specialH =
3646 if (specialH && newobj->InheritsFrom(TH1::Class())) {
3647 if (!HandleHistogram(newobj, merged)) {
3648 if (merged) {
3649 PDB(kOutput,1) Info("Incorporate", "histogram object '%s' merged", newobj->GetName());
3650 } else {
3651 PDB(kOutput,1) Info("Incorporate", "histogram object '%s' added to the"
3652 " appropriate list for delayed merging", newobj->GetName());
3653 }
3654 return 0;
3655 }
3656 }
3657
3658 // Check if an object with the same name exists already
3659 TObject *obj = outlist->FindObject(newobj->GetName());
3660
3661 // If no, add the new object and return
3662 if (!obj) {
3663 outlist->Add(newobj);
3664 merged = kFALSE;
3665 // Done
3666 return 0;
3667 }
3668
3669 // Locate the Merge(TCollection *) method
3670 TMethodCall callEnv;
3671 if (obj->IsA())
3672 callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3673 if (callEnv.IsValid()) {
3674 // Found: put the object in a one-element list
3675 static TList *xlist = new TList;
3676 xlist->Add(newobj);
3677 // Call the method
3678 callEnv.SetParam((Longptr_t) xlist);
3679 callEnv.Execute(obj);
3680 // Ready for next call
3681 xlist->Clear();
3682 } else {
3683 // Not found: return individual objects
3684 outlist->Add(newobj);
3685 merged = kFALSE;
3686 }
3687
3688 // Done
3689 return 0;
3690}
3691
3692////////////////////////////////////////////////////////////////////////////////
3693/// Low statistic histograms need a special treatment when using autobin
3694
3696{
3697 TH1 *h = dynamic_cast<TH1 *>(obj);
3698 if (!h) {
3699 // Not a histogram
3700 return obj;
3701 }
3702
3703 // This is only used if we return (TObject *)0 and there is only one case
3704 // when we set this to kTRUE
3705 merged = kFALSE;
3706
3707 // Does it still need binning?
3708 Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
3709
3710 // Number of entries
3711 Int_t nent = h->GetBufferLength();
3712 PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
3713 h->GetName(), nent, h->GetBufferSize());
3714
3715 // Attach to the list in the outputlists, if any
3716 TList *list = 0;
3717 if (!fOutputLists) {
3718 PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
3719 fOutputLists = new TList;
3721 }
3722 list = (TList *) fOutputLists->FindObject(h->GetName());
3723
3724 TH1 *href = 0;
3725 if (tobebinned) {
3726
3727 // The histogram needs to be projected in a reasonable range: we
3728 // do this at the end with all the histos, so we need to create
3729 // a list here
3730 if (!list) {
3731 // Create the list
3732 list = new TList;
3733 list->SetName(h->GetName());
3734 list->SetOwner();
3735 fOutputLists->Add(list);
3736 // Move in it any previously merged object from the output list
3737 if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
3738 fOutput->Remove(href);
3739 list->Add(href);
3740 }
3741 }
3742 TIter nxh(list);
3743 while ((href = (TH1 *) nxh())) {
3744 if (href->GetBuffer() && href->GetBufferLength() < nent) break;
3745 }
3746 if (href) {
3747 list->AddBefore(href, h);
3748 } else {
3749 list->Add(h);
3750 }
3751 // Done
3752 return (TObject *)0;
3753
3754 } else {
3755
3756 if (list) {
3757 TIter nxh(list);
3758 while ((href = (TH1 *) nxh())) {
3759 if (href->GetBuffer() || href->GetEntries() < nent) break;
3760 }
3761 if (href) {
3762 list->AddBefore(href, h);
3763 } else {
3764 list->Add(h);
3765 }
3766 // Done
3767 return (TObject *)0;
3768
3769 } else {
3770 // Check if we can 'Add' the histogram to an existing one; this is more efficient
3771 // than using Merge
3772 TH1 *hout = (TH1*) fOutput->FindObject(h->GetName());
3773 if (hout) {
3774 // Remove the existing histo from the output list ...
3775 fOutput->Remove(hout);
3776 // ... and create either the list to merge in one-go at the end
3777 // (more efficient than merging one by one) or, if too big, merge
3778 // these two and start the 'one-by-one' technology
3779 Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
3781 list = new TList;
3782 list->Add(hout);
3783 h->Merge(list);
3784 list->SetOwner();
3785 delete list;
3786 return h;
3787 } else {
3788 list = new TList;
3789 list->SetName(h->GetName());
3790 list->SetOwner();
3791 fOutputLists->Add(list);
3792 // Add the existing and the incoming histos
3793 list->Add(hout);
3794 list->Add(h);
3795 // Done
3796 return (TObject *)0;
3797 }
3798 } else {
3799 // This is the first one; add it to the output list
3800 fOutput->Add(h);
3801 return (TObject *)0;
3802 }
3803 }
3804 }
3805}
3806
3807////////////////////////////////////////////////////////////////////////////////
3808/// Return kTRUE if the histograms 'h0' and 'h1' have the same binning and ranges
3809/// on the axes (i.e. if they can be just Add-ed for merging).
3810
3812{
3813 Bool_t rc = kFALSE;
3814 if (!h0 || !h1) return rc;
3815
3816 TAxis *a0 = 0, *a1 = 0;
3817
3818 // Check X
3819 a0 = h0->GetXaxis();
3820 a1 = h1->GetXaxis();
3821 if (a0->GetNbins() == a1->GetNbins())
3822 if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3823 if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3824
3825 // Check Y, if needed
3826 if (h0->GetDimension() > 1) {
3827 rc = kFALSE;
3828 a0 = h0->GetYaxis();
3829 a1 = h1->GetYaxis();
3830 if (a0->GetNbins() == a1->GetNbins())
3831 if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3832 if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3833 }
3834
3835 // Check Z, if needed
3836 if (h0->GetDimension() > 2) {
3837 rc = kFALSE;
3838 a0 = h0->GetZaxis();
3839 a1 = h1->GetZaxis();
3840 if (a0->GetNbins() == a1->GetNbins())
3841 if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3842 if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3843 }
3844
3845 // Done
3846 return rc;
3847}
3848
3849////////////////////////////////////////////////////////////////////////////////
3850/// Store received output list.
3851
3853{
3854 PDB(kOutput,1) Info("StoreOutput","Enter");
3855
3856 if ( out == 0 ) {
3857 PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
3858 return;
3859 }
3860
3861 TIter next(out);
3862 out->SetOwner(kFALSE); // take ownership of the contents
3863
3864 if (fOutputLists == 0) {
3865 PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
3866 fOutputLists = new TList;
3868 }
3869 // process eventlists first
3870 TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
3871 if (lists) {
3872 out->Remove(lists);
3873 TEventList *mainList = new TEventList("PROOF_EventList");
3874 out->Add(mainList);
3875 TIter it(lists);
3876 TEventList *aList;
3877 while ( (aList = dynamic_cast<TEventList*> (it())) ) {
3878 // find file offset
3880 TDSetElement *elem;
3881 while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
3882 if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
3883 break;
3884 }
3885 if (!elem) {
3886 Error("StoreOutput", "found the EventList for %s, but no object with that name "
3887 "in the TDSet", aList->GetName());
3888 continue;
3889 }
3890 Long64_t offset = elem->GetTDSetOffset();
3891
3892 // shift the list by the number of first event in that file
3893 Long64_t *arr = aList->GetList();
3894 Int_t num = aList->GetN();
3895 if (arr && offset)
3896 for (int i = 0; i < num; i++)
3897 arr[i] += offset;
3898
3899 mainList->Add(aList); // add to the main list
3900 }
3901 delete lists;
3902 }
3903
3904 TObject *obj;
3905 while( (obj = next()) ) {
3906 PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
3907
3908 TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
3909 if ( list == 0 ) {
3910 PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
3911 list = new TList;
3912 list->SetName( obj->GetName() );
3913 list->SetOwner();
3914 fOutputLists->Add( list );
3915 }
3916 list->Add( obj );
3917 }
3918
3919 delete out;
3920 PDB(kOutput,1) Info("StoreOutput", "leave");
3921}
3922
3923////////////////////////////////////////////////////////////////////////////////
3924/// Merge feedback lists.
3925
3927{
3928 PDB(kFeedback,1)
3929 Info("MergeFeedback","Enter");
3930
3931 if ( fFeedbackLists == 0 ) {
3932 PDB(kFeedback,1)
3933 Info("MergeFeedback","Leave (no output)");
3934 return 0;
3935 }
3936
3937 TList *fb = new TList; // collection of feedback objects
3938 fb->SetOwner();
3939
3940 TIter next(fFeedbackLists);
3941
3942 TMap *map;
3943 while ( (map = (TMap*) next()) ) {
3944
3945 PDB(kFeedback,2)
3946 Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
3947
3948 // turn map into list ...
3949
3950 TList *list = new TList;
3951 TIter keys(map);
3952
3953#ifndef R__TH1MERGEFIXED
3954 Int_t nbmx = -1;
3955 TObject *oref = 0;
3956#endif
3957 while ( TObject *key = keys() ) {
3958 TObject *o = map->GetValue(key);
3959 TH1 *h = dynamic_cast<TH1 *>(o);
3960#ifndef R__TH1MERGEFIXED
3961 // Temporary fix to cope with the problem in TH1::Merge.
3962 // We need to use as reference histo the one with the largest number
3963 // of bins so that the histos from all submasters can be correctly
3964 // fit in
3965 if (h && !strncmp(o->GetName(),"PROOF_",6)) {
3966 if (h->GetNbinsX() > nbmx) {
3967 nbmx= h->GetNbinsX();
3968 oref = o;
3969 }
3970 }
3971#endif
3972 if (h) {
3973 TIter nxh(list);
3974 TH1 *href= 0;
3975 while ((href = (TH1 *)nxh())) {
3976 if (h->GetBuffer()) {
3977 if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
3978 } else {
3979 if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
3980 }
3981 }
3982 if (href) {
3983 list->AddBefore(href, h);
3984 } else {
3985 list->Add(h);
3986 }
3987 } else {
3988 list->Add(o);
3989 }
3990 }
3991
3992 // clone first object, remove from list
3993#ifdef R__TH1MERGEFIXED
3994 TObject *obj = list->First();
3995#else
3996 TObject *obj = (oref) ? oref : list->First();
3997#endif
3998 list->Remove(obj);
3999 obj = obj->Clone();
4000 fb->Add(obj);
4001
4002 if ( list->IsEmpty() ) {
4003 delete list;
4004 continue;
4005 }
4006
4007 // merge list with clone
4008 TMethodCall callEnv;
4009 if (obj->IsA())
4010 callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
4011 if (callEnv.IsValid()) {
4012 callEnv.SetParam((Longptr_t) list);
4013 callEnv.Execute(obj);
4014 } else {
4015 // No Merge interface, return copy of individual objects
4016 while ( (obj = list->First()) ) {
4017 fb->Add(obj->Clone());
4018 list->Remove(obj);
4019 }
4020 }
4021
4022 delete list;
4023 }
4024
4025 PDB(kFeedback,1)
4026 Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
4027
4028 return fb;
4029}
4030
4031////////////////////////////////////////////////////////////////////////////////
4032/// Store feedback results from the specified slave.
4033
4035{
4036 PDB(kFeedback,1)
4037 Info("StoreFeedback","Enter");
4038
4039 if ( out == 0 ) {
4040 PDB(kFeedback,1)
4041 Info("StoreFeedback","Leave (empty)");
4042 return;
4043 }
4044
4045 if ( IsClient() ) {
4046 // in client
4047 Feedback(out);
4048 delete out;
4049 return;
4050 }
4051
4052 if (fFeedbackLists == 0) {
4053 PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
4054 fFeedbackLists = new TList;
4056 }
4057
4058 TIter next(out);
4059 out->SetOwner(kFALSE); // take ownership of the contents
4060
4061 const char *ord = ((TSlave*) slave)->GetOrdinal();
4062
4063 TObject *obj;
4064 while( (obj = next()) ) {
4065 PDB(kFeedback,2)
4066 Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
4067 TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
4068 if ( map == 0 ) {
4069 PDB(kFeedback,2)
4070 Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
4071 // Map must not be owner (ownership is with regards to the keys (only))
4072 map = new TMap;
4073 map->SetName(obj->GetName());
4074 fFeedbackLists->Add(map);
4075 } else {
4076 PDB(kFeedback,2)
4077 Info("StoreFeedback","%s: removing previous value", ord);
4078 if (map->GetValue(slave))
4079 delete map->GetValue(slave);
4080 map->Remove(slave);
4081 }
4082 map->Add(slave, obj);
4083 PDB(kFeedback,2)
4084 Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
4085 }
4086
4087 delete out;
4088 PDB(kFeedback,1)
4089 Info("StoreFeedback","Leave");
4090}
4091
4092////////////////////////////////////////////////////////////////////////////////
4093/// Setup reporting of feedback objects.
4094
4096{
4097 if (IsClient()) return; // Client does not need timer
4098
4099 fFeedback = (TList*) fInput->FindObject("FeedbackList");
4100
4101 PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
4102 fFeedback == 0 ? "NOT ":"");
4103
4104 if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
4105
4106 // OK, feedback was requested, setup the timer
4108 fFeedbackPeriod = 2000;
4109 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4110 fFeedbackTimer = new TTimer;
4113}
4114
4115////////////////////////////////////////////////////////////////////////////////
4116/// Stop reporting of feedback objects.
4117
4119{
4120 if (fFeedbackTimer == 0) return;
4121
4122 PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4123
4125}
4126
4127////////////////////////////////////////////////////////////////////////////////
4128/// Send feedback objects to client.
4129
4131{
4132 PDB(kFeedback,2) Info("HandleTimer","Entry");
4133
4134 if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
4135
4136 // process local feedback objects
4137
4138 TList *fb = new TList;
4139 fb->SetOwner();
4140
4141 TIter next(fFeedback);
4142 while( TObjString *name = (TObjString*) next() ) {
4143 TObject *o = fOutput->FindObject(name->GetName());
4144 if (o != 0) {
4145 fb->Add(o->Clone());
4146 // remove the corresponding entry from the feedback list
4147 TMap *m = 0;
4148 if (fFeedbackLists &&
4149 (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
4151 m->DeleteValues();
4152 delete m;
4153 }
4154 }
4155 }
4156
4157 if (fb->GetSize() > 0) {
4158 StoreFeedback(this, fb); // adopts fb
4159 } else {
4160 delete fb;
4161 }
4162
4163 if (fFeedbackLists == 0) {
4164 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE); // maybe next time
4165 return kFALSE;
4166 }
4167
4168 fb = MergeFeedback();
4169
4170 PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4171
4173 m << fb;
4174
4175 // send message to client;
4177
4178 delete fb;
4179
4181
4182 return kFALSE; // ignored?
4183}
4184
4185////////////////////////////////////////////////////////////////////////////////
4186/// Get next packet for specified slave.
4187
4189{
4190 // The first call to this determines the end of initialization
4191 SetInitTime();
4192
4193 if (fProcPackets) {
4194 Int_t bin = fProcPackets->GetXaxis()->FindBin(slave->GetOrdinal());
4195 if (bin >= 0) {
4196 if (fProcPackets->GetBinContent(bin) > 0)
4197 fProcPackets->Fill(slave->GetOrdinal(), -1);
4198 }
4199 }
4200
4202
4203 if (e == 0) {
4204 PDB(kPacketizer,2)
4205 Info("GetNextPacket","%s: done!", slave->GetOrdinal());
4206 } else if (e == (TDSetElement*) -1) {
4207 PDB(kPacketizer,2)
4208 Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
4209 } else {
4210 PDB(kPacketizer,2)
4211 Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
4212 slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
4213 e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
4214 if (fProcPackets) fProcPackets->Fill(slave->GetOrdinal(), 1);
4215 }
4216
4217 return e;
4218}
4219
4220////////////////////////////////////////////////////////////////////////////////
4221/// Is the player running on the client?
4222
4224{
4226}
4227
4228////////////////////////////////////////////////////////////////////////////////
4229/// Draw (support for TChain::Draw()).
4230/// Returns -1 in case of error or number of selected events in case of success.
4231
4233 const char *selection, Option_t *option,
4234 Long64_t nentries, Long64_t firstentry)
4235{
4236 if (!fgDrawInputPars) {
4238 fgDrawInputPars->Add(new TObjString("FeedbackList"));
4239 fgDrawInputPars->Add(new TObjString("PROOF_ChainWeight"));
4240 fgDrawInputPars->Add(new TObjString("PROOF_LineColor"));
4241 fgDrawInputPars->Add(new TObjString("PROOF_LineStyle"));
4242 fgDrawInputPars->Add(new TObjString("PROOF_LineWidth"));
4243 fgDrawInputPars->Add(new TObjString("PROOF_MarkerColor"));
4244 fgDrawInputPars->Add(new TObjString("PROOF_MarkerStyle"));
4245 fgDrawInputPars->Add(new TObjString("PROOF_MarkerSize"));
4246 fgDrawInputPars->Add(new TObjString("PROOF_FillColor"));
4247 fgDrawInputPars->Add(new TObjString("PROOF_FillStyle"));
4248 fgDrawInputPars->Add(new TObjString("PROOF_ListOfAliases"));
4249 }
4250
4251 TString selector, objname;
4252 if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
4253 Error("DrawSelect", "parsing arguments");
4254 return -1;
4255 }
4256
4257 TNamed *varexpobj = new TNamed("varexp", varexp);
4258 TNamed *selectionobj = new TNamed("selection", selection);
4259
4260 // Save the current input list
4261 TObject *o = 0;
4262 TList *savedInput = new TList;
4263 TIter nxi(fInput);
4264 while ((o = nxi())) {
4265 savedInput->Add(o);
4266 TString n(o->GetName());
4267 if (fgDrawInputPars &&
4269 !n.BeginsWith("alias:")) fInput->Remove(o);
4270 }
4271
4272 fInput->Add(varexpobj);
4273 fInput->Add(selectionobj);
4274
4275 // Make sure we have an object name
4276 if (objname == "") objname = "htemp";
4277
4278 fProof->AddFeedback(objname);
4279 Long64_t r = Process(set, selector, option, nentries, firstentry);
4280 fProof->RemoveFeedback(objname);
4281
4282 fInput->Remove(varexpobj);
4283 fInput->Remove(selectionobj);
4284 if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
4285 fInput->Remove(opt);
4286 delete opt;
4287 }
4288
4289 delete varexpobj;
4290 delete selectionobj;
4291
4292 // Restore the input list
4293 fInput->Clear();
4294 TIter nxsi(savedInput);
4295 while ((o = nxsi()))
4296 fInput->Add(o);
4297 savedInput->SetOwner(kFALSE);
4298 delete savedInput;
4299
4300 return r;
4301}
4302
4303////////////////////////////////////////////////////////////////////////////////
4304/// Set init time
4305
4307{
4308 if (fPacketizer)
4310}
4311
4312//------------------------------------------------------------------------------
4313
4314
4316
4317////////////////////////////////////////////////////////////////////////////////
4318/// Setup feedback.
4319
4321{
4322 TList *fb = (TList*) fInput->FindObject("FeedbackList");
4323 if (fb) {
4324 PDB(kFeedback,1)
4325 Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
4326 } else {
4327 PDB(kFeedback,1)
4328 Info("SetupFeedback","\"FeedbackList\" NOT found");
4329 }
4330
4331 if (fb == 0 || fb->GetSize() == 0) return;
4332
4333 // OK, feedback was requested, setup the timer
4334
4336 fFeedbackPeriod = 2000;
4337 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4338 fFeedbackTimer = new TTimer;
4341
4342 fFeedback = fb;
4343}
4344
4345////////////////////////////////////////////////////////////////////////////////
4346/// Stop feedback.
4347
4349{
4350 if (fFeedbackTimer == 0) return;
4351
4352 PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4353
4355}
4356
4357////////////////////////////////////////////////////////////////////////////////
4358/// Handle timer event.
4359
4361{
4362 PDB(kFeedback,2) Info("HandleTimer","Entry");
4363
4364 // If in sequential (0-slave-PROOF) mode we do not have a packetizer
4365 // so we also send the info to update the progress bar.
4366 if (gProofServ) {
4367 Bool_t sendm = kFALSE;
4369 if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
4370 sendm = kTRUE;
4371 if (gProofServ->GetProtocol() > 25) {
4372 m << GetProgressStatus();
4373 } else if (gProofServ->GetProtocol() > 11) {
4375 m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
4376 << (Float_t) -1. << (Float_t) ps->GetProcTime()
4377 << (Float_t) ps->GetRate() << (Float_t) -1.;
4378 } else {
4380 }
4381 }
4382 if (sendm) gProofServ->GetSocket()->Send(m);
4383 }
4384
4385 if (fFeedback == 0) return kFALSE;
4386
4387 TList *fb = new TList;
4388 fb->SetOwner(kFALSE);
4389
4390 if (fOutput == 0) {
4392 }
4393
4394 if (fOutput) {
4395 TIter next(fFeedback);
4396 while( TObjString *name = (TObjString*) next() ) {
4397 // TODO: find object in memory ... maybe allow only in fOutput ?
4398 TObject *o = fOutput->FindObject(name->GetName());
4399 if (o != 0) fb->Add(o);
4400 }
4401 }
4402
4403 PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4404
4406 m << fb;
4407
4408 // send message to client;
4410
4411 delete fb;
4412
4414
4415 return kFALSE; // ignored?
4416}
4417
4418////////////////////////////////////////////////////////////////////////////////
4419/// Handle tree header request.
4420
4422{
4424
4425 TDSet *dset;
4426 (*mess) >> dset;
4427 dset->Reset();
4428 TDSetElement *e = dset->Next();
4429 Long64_t entries = 0;
4430 TFile *f = 0;
4431 TTree *t = 0;
4432 if (!e) {
4433 PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
4434 } else {
4435 f = TFile::Open(e->GetFileName());
4436 t = 0;
4437 if (f) {
4438 t = (TTree*) f->Get(e->GetObjName());
4439 if (t) {
4440 t->SetMaxVirtualSize(0);
4441 t->DropBaskets();
4442 entries = t->GetEntries();
4443
4444 // compute #entries in all the files
4445 while ((e = dset->Next()) != 0) {
4446 TFile *f1 = TFile::Open(e->GetFileName());
4447 if (f1) {
4448 TTree *t1 = (TTree*) f1->Get(e->GetObjName());
4449 if (t1) {
4450 entries += t1->GetEntries();
4451 delete t1;
4452 }
4453 delete f1;
4454 }
4455 }
4456 t->SetMaxEntryLoop(entries); // this field will hold the total number of entries ;)
4457 }
4458 }
4459 }
4460 if (t)
4461 answ << TString("Success") << t;
4462 else
4463 answ << TString("Failed") << t;
4464
4465 fSocket->Send(answ);
4466
4467 SafeDelete(t);
4468 SafeDelete(f);
4469}
4470
4471
4472//------------------------------------------------------------------------------
4473
4475
4476////////////////////////////////////////////////////////////////////////////////
4477/// Process specified TDSet on PROOF. Runs on super master.
4478/// The return value is -1 in case of error and TSelector::GetStatus() in
4479/// case of success.
4480
4481Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
4482 Option_t *option, Long64_t nentries, // NOTE(review): the rest of the parameter list is missing from this listing; the body reads `first`, so presumably it is declared there - confirm
4484{
4486 PDB(kGlobal,1) Info("Process","Enter");
4487 // This player only works with a TProofSuperMaster session; anything else is an error.
4488 TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
4489 if (!proof) return -1;
4490 // Drop any previous output list and start from an empty one.
4491 delete fOutput;
4492 fOutput = new THashList;
4493
4495 // A false return from SendSelector() is treated as fatal for the query.
4496 if (!SendSelector(selector_file)) {
4497 Error("Process", "sending selector %s", selector_file);
4498 return -1;
4499 }
4500 // `clean` is a local TCleanup bound to this player and lives until the function returns.
4501 TCleanup clean(this);
4502 SetupFeedback();
4503
4504 if (proof->IsMaster()) {
4505 // Distribution of the data set and collection of results happen only inside this branch.
4506 // make sure the DSet is valid
4507 if (!dset->ElementsValid()) {
4508 proof->ValidateDSet(dset);
4509 if (!dset->ElementsValid()) {
4510 Error("Process", "could not validate TDSet");
4511 return -1;
4512 }
4513 }
4514 // msds: one TPair per mass storage domain (msd); key = list of TSlave submasters, value = list of TDSetElements.
4515 TList msds;
4516 msds.SetOwner(); // This will delete TPairs
4517
4518 TList keyholder; // List to clean up key part of the pairs
4519 keyholder.SetOwner();
4520 TList valueholder; // List to clean up value part of the pairs
4521 valueholder.SetOwner();
4522 // Group the active slaves by msd name; the pair for an msd is created the first time that name is seen.
4523 // Construct msd list using the slaves
4524 TIter nextslave(proof->GetListOfActiveSlaves());
4525 while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
4526 TList *submasters = 0;
4527 TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
4528 if (!msd) {
4529 submasters = new TList;
4530 submasters->SetName(sl->GetMsd()); // the key list carries the msd name, which is what FindObject() above is asked for
4531 keyholder.Add(submasters);
4532 TList *setelements = new TSortedList(kSortDescending);
4533 setelements->SetName(TString(sl->GetMsd())+"_Elements");
4534 valueholder.Add(setelements);
4535 msds.Add(new TPair(submasters, setelements));
4536 } else {
4537 submasters = dynamic_cast<TList*>(msd->Key());
4538 }
4539 if (submasters) submasters->Add(sl);
4540 }
4541 // Keep only the entries in [first, first+nentries), trimming elements that straddle either edge; nentries == -1 disables the upper bound.
4542 // Add TDSetElements to msd list
4543 Long64_t cur = 0; //start of next element
4544 TIter nextelement(dset->GetListOfElements());
4545 while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
4546
4547 if (elem->GetNum()<1) continue; // get rid of empty elements
4548
4549 if (nentries !=-1 && cur>=first+nentries) {
4550 // we are done
4551 break;
4552 }
4553
4554 if (cur+elem->GetNum()-1<first) {
4555 //element is before first requested entry
4556 cur+=elem->GetNum();
4557 continue;
4558 }
4559
4560 if (cur<first) {
4561 //modify element to get proper start
4562 elem->SetNum(elem->GetNum()-(first-cur));
4563 elem->SetFirst(elem->GetFirst()+first-cur);
4564 cur=first;
4565 }
4566
4567 if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
4568 cur+=elem->GetNum();
4569 } else {
4570 //modify element to get proper end
4571 elem->SetNum(first+nentries-cur);
4572 cur=first+nentries;
4573 }
4574 // File the (possibly trimmed) element under its msd; an msd with no pair in msds aborts the query.
4575 TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
4576 if (!msd) {
4577 Error("Process", "data requires mass storage domain '%s'"
4578 " which is not accessible in this proof session",
4579 elem->GetMsd());
4580 return -1;
4581 } else {
4582 TList *elements = dynamic_cast<TList*>(msd->Value());
4583 if (elements) elements->Add(elem);
4584 }
4585 }
4586 // usedmasters collects the submasters that were actually sent a sub-dataset; it is what Collect() waits on below.
4587 TList usedmasters;
4588 TIter nextmsd(msds.MakeIterator());
4589 while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
4590 TList *submasters = dynamic_cast<TList*>(msd->Key());
4591 TList *setelements = dynamic_cast<TList*>(msd->Value());
4592
4593 // distribute elements over the masters
4594 Int_t nmasters = submasters ? submasters->GetSize() : -1; // -1 when the key is not a TList: the loop below then does not run
4595 Int_t nelements = setelements ? setelements->GetSize() : -1;
4596 for (Int_t i=0; i<nmasters; i++) {
4597 // Submaster i gets elements [i*nelements/nmasters, (i+1)*nelements/nmasters), copied into a fresh TDSet.
4598 Long64_t nent = 0; // sum of GetNum() over the elements given to this submaster
4599 TDSet set(dset->GetType(), dset->GetObjName(),
4600 dset->GetDirectory());
4601 for (Int_t j = (i*nelements)/nmasters;
4602 j < ((i+1)*nelements)/nmasters;
4603 j++) {
4604 TDSetElement *elem = setelements ?
4605 dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
4606 if (elem) {
4607 set.Add(elem->GetFileName(), elem->GetObjName(),
4608 elem->GetDirectory(), elem->GetFirst(),
4609 elem->GetNum(), elem->GetMsd());
4610 nent += elem->GetNum();
4611 } else {
4612 Warning("Process", "not a TDSetElement object");
4613 }
4614 }
4615 // Submasters whose slice is empty are not sent anything.
4616 if (set.GetListOfElements()->GetSize()>0) {
4618 TString fn(gSystem->BaseName(selector_file));
4619 TString opt = option;
4620 mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
4621 // NOTE(review): the declaration of `mesg` is missing from this listing - confirm its message type. The trailing -1 and 0 are fixed values, presumably the nentries/first fields, since the elements were already trimmed above - confirm.
4622 TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
4623 if (sl) {
4624 PDB(kGlobal,1) Info("Process",
4625 "Sending TDSet with %d elements to submaster %s",
4626 set.GetListOfElements()->GetSize(),
4627 sl->GetOrdinal());
4628 sl->GetSocket()->Send(mesg);
4629 usedmasters.Add(sl);
4630
4631 // setup progress info: register the slave and initialise the last slot of each per-slave array (NOTE(review): the statements between these writes are missing from this listing, presumably they resize the arrays - confirm)
4632 fSlaves.AddLast(sl);
4636 fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
4646 fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
4648 fSlaveActW[fSlaveActW.GetSize()-1] = 0;
4650 fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
4652 fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
4653 } else {
4654 Warning("Process", "not a TSlave object");
4655 }
4656 }
4657 }
4658 }
4659 // HandleTimer(0) is called before Collect() (unless this is the client) and again after it.
4660 if ( !IsClient() ) HandleTimer(0);
4661 PDB(kGlobal,1) Info("Process","Calling Collect");
4662 proof->Collect(&usedmasters);
4663 HandleTimer(0);
4664
4665 }
4666 // The remaining steps run whether or not the IsMaster() branch was taken.
4667 StopFeedback();
4668
4669 PDB(kGlobal,1) Info("Process","Calling Merge Output");
4670 MergeOutput();
4671
4673 // Reaching this point is reported as success (0); every error path above returned -1.
4674 return 0;
4675}
4676
4677////////////////////////////////////////////////////////////////////////////////
4678/// Report progress.
4679
4681{ // NOTE(review): the signature line is missing from this listing; the body reads `sl`, `total` and `processed`
4682 Int_t idx = fSlaves.IndexOf(sl); // position of this slave in fSlaves; NOTE(review): used as an array index below without a "not found" check - confirm IndexOf()'s failure value
4683 fSlaveProgress[idx] = processed;
4684 if (fSlaveTotals[idx] != total)
4685 Warning("Progress", "total events has changed for slave %s", sl->GetName());
4686 fSlaveTotals[idx] = total;
4687 // Sum the totals and the processed counts over all slots and forward the aggregate.
4688 Long64_t tot = 0;
4689 Int_t i;
4690 for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
4691 Long64_t proc = 0;
4692 for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
4693 // Hand the summed values to the two-argument Progress() overload.
4694 Progress(tot, proc);
4695}
4696
4697////////////////////////////////////////////////////////////////////////////////
4698/// Report progress.
4699
4701 Long64_t processed, Long64_t bytesread,
4702 Float_t initTime, Float_t procTime,
4703 Float_t evtrti, Float_t mbrti)
4704{
4705 PDB(kGlobal,2)