Logo ROOT   6.12/07
Reference Guide
TProofLite.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id: 7735e42a1b96a9f40ae76bd884acac883a178dee $
2 // Author: G. Ganis March 2008
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TProofLite
13 \ingroup proofkernel
14 
15 This class starts a PROOF session on the local machine: no daemons,
16 client and master merged, communications via UNIX-like sockets.
17 By default the number of workers started is NumberOfCores+1; a
18 different number can be forced on construction.
19 
20 */
21 
22 #include "TProofLite.h"
23 
24 #ifdef WIN32
25 # include <io.h>
26 # include "snprintf.h"
27 #endif
28 #include "RConfigure.h"
29 #include "TDSet.h"
30 #include "TEnv.h"
31 #include "TError.h"
32 #include "TFile.h"
33 #include "TFileCollection.h"
34 #include "TFileInfo.h"
35 #include "THashList.h"
36 #include "TMessage.h"
37 #include "TMonitor.h"
38 #include "TObjString.h"
39 #include "TPluginManager.h"
40 #include "TDataSetManager.h"
41 #include "TDataSetManagerFile.h"
42 #include "TParameter.h"
43 #include "TPRegexp.h"
44 #include "TProofQueryResult.h"
45 #include "TProofServ.h"
46 #include "TQueryResultManager.h"
47 #include "TROOT.h"
48 #include "TServerSocket.h"
49 #include "TSlave.h"
50 #include "TSortedList.h"
51 #include "TTree.h"
52 #include "TVirtualProofPlayer.h"
53 #include "TSelector.h"
54 #include "TPackMgr.h"
55 
57 
58 Int_t TProofLite::fgWrksMax = -2; // Unitialized max number of workers
59 
60 ////////////////////////////////////////////////////////////////////////////////
61 /// Create a PROOF environment. Starting PROOF involves either connecting
62 /// to a master server, which in turn will start a set of slave servers, or
63 /// directly starting as master server (if master = ""). Masterurl is of
64 /// the form: [proof[s]://]host[:port]. Conffile is the name of the config
65 /// file describing the remote PROOF cluster (this argument alows you to
66 /// describe different cluster configurations).
67 /// The default is proof.conf. Confdir is the directory where the config
68 /// file and other PROOF related files are (like motd and noproof files).
69 /// Loglevel is the log level (default = 1). User specified custom config
70 /// files will be first looked for in $HOME/.conffile.
71 
72 TProofLite::TProofLite(const char *url, const char *conffile, const char *confdir,
73  Int_t loglevel, const char *alias, TProofMgr *mgr)
74 {
75  fUrl.SetUrl(url);
76 
77  // Default initializations
78  fServSock = 0;
79  fCacheLock = 0;
80  fQueryLock = 0;
81  fQMgr = 0;
82  fDataSetManager = 0;
83  fDataSetStgRepo = 0;
84  fReInvalid = new TPMERegexp("[^A-Za-z0-9._-]");
85  InitMembers();
86 
87  // This may be needed during init
88  fManager = mgr;
89 
90  // Default server type
92 
93  // Default query mode
94  fQueryMode = kSync;
95 
96  // Client and master are merged
100 
101  // Flag that we are a client
102  if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
103 
104  // Protocol and Host
105  fUrl.SetProtocol("proof");
106  fUrl.SetHost("__lite__");
107  fUrl.SetPort(1093);
108 
109  // User
110  if (strlen(fUrl.GetUser()) <= 0) {
111  // Get user logon name
113  if (pw) {
114  fUrl.SetUser(pw->fUser);
115  delete pw;
116  }
117  }
118  fMaster = gSystem->HostName();
119 
120  // Analysise the conffile field
121  ParseConfigField(conffile);
122 
123  // Determine the number of workers giving priority to users request.
124  // Otherwise use the system information, if available, or just start
125  // the minimal number, i.e. 2 .
126  if ((fNWorkers = GetNumberOfWorkers(url)) > 0) {
127 
128  TString stup;
129  if (gProofServ) {
130  Int_t port = gEnv->GetValue("ProofServ.XpdPort", 1093);
131  stup.Form("%s @ %s:%d ", gProofServ->GetOrdinal(), gSystem->HostName(), port);
132  }
133  Printf(" +++ Starting PROOF-Lite %swith %d workers +++", stup.Data(), fNWorkers);
134  // Init the session now
135  Init(url, conffile, confdir, loglevel, alias);
136  }
137 
138  // For final cleanup
139  if (!gROOT->GetListOfProofs()->FindObject(this))
140  gROOT->GetListOfProofs()->Add(this);
141 
142  // Still needed by the packetizers: needs to be changed
143  gProof = this;
144 }
145 
146 ////////////////////////////////////////////////////////////////////////////////
147 /// Start the PROOF environment. Starting PROOF involves either connecting
148 /// to a master server, which in turn will start a set of slave servers, or
149 /// directly starting as master server (if master = ""). For a description
150 /// of the arguments see the TProof ctor. Returns the number of started
151 /// master or slave servers, returns 0 in case of error, in which case
152 /// fValid remains false.
153 
154 Int_t TProofLite::Init(const char *, const char *conffile,
155  const char *confdir, Int_t loglevel, const char *)
156 {
158 
159  fValid = kFALSE;
160 
161  // Connected to terminal?
162  fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
163 
164  if (TestBit(TProof::kIsMaster)) {
165  // Fill default conf file and conf dir
166  if (!conffile || !conffile[0])
168  if (!confdir || !confdir[0])
170  } else {
171  fConfDir = confdir;
172  fConfFile = conffile;
173  }
174 
175  // The sandbox for this session
176  if (CreateSandbox() != 0) {
177  Error("Init", "could not create/assert sandbox for this session");
178  return 0;
179  }
180 
181  // UNIX path for communication with workers
182  TString sockpathdir = gEnv->GetValue("ProofLite.SockPathDir", gSystem->TempDirectory());
183  if (sockpathdir.IsNull()) sockpathdir = gSystem->TempDirectory();
184  if (sockpathdir(sockpathdir.Length()-1) == '/') sockpathdir.Remove(sockpathdir.Length()-1);
185  fSockPath.Form("%s/plite-%d", sockpathdir.Data(), gSystem->GetPid());
186  if (fSockPath.Length() > 104) {
187  // Sort of hardcoded limit length for Unix systems
188  Error("Init", "Unix socket path '%s' is too long (%d bytes):",
190  Error("Init", "use 'ProofLite.SockPathDir' to create it under a directory different"
191  " from '%s'", sockpathdir.Data());
192  return 0;
193  }
194 
195  fLogLevel = loglevel;
198  fImage = "<local>";
199  fIntHandler = 0;
200  fStatus = 0;
201  fRecvMessages = new TList;
203  fSlaveInfo = 0;
204  fChains = new TList;
205  fAvailablePackages = 0;
206  fEnabledPackages = 0;
208  fInputData = 0;
210 
213 
214  // Timeout for some collect actions
215  fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
216 
217  // Should the workers be started dynamically; default: no
219  fDynamicStartupStep = -1;
220  fDynamicStartupNMax = -1;
221  TString dynconf = gEnv->GetValue("Proof.SimulateDynamicStartup", "");
222  if (dynconf.Length() > 0) {
224  fLastPollWorkers_s = time(0);
225  // Extract parameters
226  Int_t from = 0;
227  TString p;
228  if (dynconf.Tokenize(p, from, ":"))
229  if (p.IsDigit()) fDynamicStartupStep = p.Atoi();
230  if (dynconf.Tokenize(p, from, ":"))
231  if (p.IsDigit()) fDynamicStartupNMax = p.Atoi();
232  }
233 
234 
235  fProgressDialog = 0;
237 
238  // Client logging of messages from the workers
239  fRedirLog = kFALSE;
240  if (TestBit(TProof::kIsClient)) {
241  fLogFileName = Form("%s/session-%s.log", fWorkDir.Data(), GetName());
242  if ((fLogFileW = fopen(fLogFileName.Data(), "w")) == 0)
243  Error("Init", "could not create temporary logfile %s", fLogFileName.Data());
244  if ((fLogFileR = fopen(fLogFileName.Data(), "r")) == 0)
245  Error("Init", "could not open logfile %s for reading", fLogFileName.Data());
246  }
248 
251  TString(fCacheDir).ReplaceAll("/","%").Data()));
252 
253  // Create 'queries' locker instance and lock it
256  TString(fQueryDir).ReplaceAll("/","%").Data()));
257  fQueryLock->Lock();
258  // Create the query manager
261 
262  // Apply quotas, if any
263  Int_t maxq = gEnv->GetValue("ProofLite.MaxQueriesSaved", 10);
264  if (fQMgr && fQMgr->ApplyMaxQueries(maxq) != 0)
265  Warning("Init", "problems applying fMaxQueries");
266 
267  if (InitDataSetManager() != 0)
268  Warning("Init", "problems initializing the dataset manager");
269 
270  // Status of cluster
271  fNotIdle = 0;
272 
273  // Query type
274  fSync = kTRUE;
275 
276  // List of queries
277  fQueries = 0;
278  fOtherQueries = 0;
279  fDrawQueries = 0;
280  fMaxDrawQueries = 1;
281  fSeqNum = 0;
282 
283  // Remote ID of the session
284  fSessionID = -1;
285 
286  // Part of active query
287  fWaitingSlaves = 0;
288 
289  // Make remote PROOF player
290  fPlayer = 0;
291  MakePlayer("lite");
292 
293  fFeedback = new TList;
294  fFeedback->SetOwner();
295  fFeedback->SetName("FeedbackList");
297 
298  // Sort workers by descending performance index
300  fActiveSlaves = new TList;
301  fInactiveSlaves = new TList;
302  fUniqueSlaves = new TList;
303  fAllUniqueSlaves = new TList;
304  fNonUniqueMasters = new TList;
305  fBadSlaves = new TList;
306  fAllMonitor = new TMonitor;
307  fActiveMonitor = new TMonitor;
308  fUniqueMonitor = new TMonitor;
310  fCurrentMonitor = 0;
311  fServSock = 0;
312 
315 
316  // Control how to start the workers; copy-on-write (fork) is *very*
317  // experimental and available on Unix only.
319  if (gEnv->GetValue("ProofLite.ForkStartup", 0) != 0) {
320 #ifndef WIN32
322 #else
323  Warning("Init", "fork-based workers startup is not available on Windows - ignoring");
324 #endif
325  }
326 
327  fLoadedMacros = 0;
328  if (TestBit(TProof::kIsClient)) {
329 
330  // List of directories where to look for global packages
331  TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
332  TProofServ::ResolveKeywords(globpack);
333  Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
334  if (gDebug > 0)
335  Info("Init", " %d global package directories registered", nglb);
336  }
337 
338  // Start workers
339  if (SetupWorkers(0) != 0) {
340  Error("Init", "problems setting up workers");
341  return 0;
342  }
343 
344  // we are now properly initialized
345  fValid = kTRUE;
346 
347  // De-activate monitor (will be activated in Collect)
349 
350  // By default go into parallel mode
351  GoParallel(-1, kFALSE);
352 
353  // Send relevant initial state to slaves
355 
356  SetActive(kFALSE);
357 
358  if (IsValid()) {
359  // Activate input handler
361  // Set PROOF to running state
363  }
364  // We register the session as a socket so that cleanup is done properly
366  gROOT->GetListOfSockets()->Add(this);
367 
368  AskParallel();
369 
370  return fActiveSlaves->GetSize();
371 }
372 ////////////////////////////////////////////////////////////////////////////////
373 /// Destructor
374 
376 {
377  // Shutdown the workers
378  RemoveWorkers(0);
379 
380  if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
381  // needed in case fQueryDir is on NFS ?!
382  gSystem->MakeDirectory(fQueryDir+"/.delete");
383  gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
384  }
385 
386  // Remove lock file
387  if (fQueryLock) {
389  fQueryLock->Unlock();
390  }
391 
395 
396  // Cleanup the socket
399 }
400 
401 ////////////////////////////////////////////////////////////////////////////////
402 /// Static method to determine the number of workers giving priority to users request.
403 /// Otherwise use the system information, if available, or just start
404 /// the minimal number, i.e. 2 .
405 
407 {
408  Bool_t notify = kFALSE;
409  if (fgWrksMax == -2) {
410  // Find the max number of workers, if any
411  TString sysname = "system.rootrc";
412  char *s = gSystem->ConcatFileName(TROOT::GetEtcDir(), sysname);
413  TEnv sysenv(0);
414  sysenv.ReadFile(s, kEnvGlobal);
415  fgWrksMax = sysenv.GetValue("ProofLite.MaxWorkers", -1);
416  // Notify once the user if its will is changed
417  notify = kTRUE;
418  if (s) delete[] s;
419  }
420  if (fgWrksMax == 0) {
421  ::Error("TProofLite::GetNumberOfWorkers",
422  "PROOF-Lite disabled by the system administrator: sorry!");
423  return 0;
424  }
425 
426  TString nw;
427  Int_t nWorkers = -1;
428  Bool_t urlSetting = kFALSE;
429  if (url && strlen(url)) {
430  nw = url;
431  Int_t in = nw.Index("workers=");
432  if (in != kNPOS) {
433  nw.Remove(0, in + strlen("workers="));
434  while (!nw.IsDigit())
435  nw.Remove(nw.Length()-1);
436  if (!nw.IsNull()) {
437  if ((nWorkers = nw.Atoi()) <= 0) {
438  ::Warning("TProofLite::GetNumberOfWorkers",
439  "number of workers specified by 'workers='"
440  " is non-positive: using default");
441  } else {
442  urlSetting = kFALSE;
443  }
444  }
445  }
446  }
447  if (!urlSetting && fgProofEnvList) {
448  // Check PROOF_NWORKERS
449  TNamed *nm = (TNamed *) fgProofEnvList->FindObject("PROOF_NWORKERS");
450  if (nm) {
451  nw = nm->GetTitle();
452  if (nw.IsDigit()) {
453  if ((nWorkers = nw.Atoi()) == 0) {
454  ::Warning("TProofLite::GetNumberOfWorkers",
455  "number of workers specified by 'workers='"
456  " is non-positive: using default");
457  }
458  }
459  }
460  }
461  if (nWorkers <= 0) {
462  nWorkers = gEnv->GetValue("ProofLite.Workers", -1);
463  if (nWorkers <= 0) {
464  SysInfo_t si;
465  if (gSystem->GetSysInfo(&si) == 0 && si.fCpus > 2) {
466  nWorkers = si.fCpus;
467  } else {
468  // Two workers by default
469  nWorkers = 2;
470  }
471  if (notify) notify = kFALSE;
472  }
473  }
474  // Apply the max, if any
475  if (fgWrksMax > 0 && fgWrksMax < nWorkers) {
476  if (notify)
477  ::Warning("TProofLite::GetNumberOfWorkers", "number of PROOF-Lite workers limited by"
478  " the system administrator to %d", fgWrksMax);
479  nWorkers = fgWrksMax;
480  }
481 
482  // Done
483  return nWorkers;
484 }
485 
486 ////////////////////////////////////////////////////////////////////////////////
487 /// Start up PROOF workers.
488 
490 {
491  // Create server socket on the assigned UNIX sock path
492  if (!fServSock) {
493  if ((fServSock = new TServerSocket(fSockPath))) {
495  // Remove from the list so that cleanup can be done in the correct order
496  gROOT->GetListOfSockets()->Remove(fServSock);
497  }
498  }
499  if (!fServSock || !fServSock->IsValid()) {
500  Error("SetupWorkers",
501  "unable to create server socket for internal communications");
503  return -1;
504  }
505 
506  // Create a monitor and add the socket to it
507  TMonitor *mon = new TMonitor;
508  mon->Add(fServSock);
509 
510  TList started;
511  TSlave *wrk = 0;
512  Int_t nWrksDone = 0, nWrksTot = -1;
513  TString fullord;
514 
515  if (opt == 0) {
516  nWrksTot = fForkStartup ? 1 : fNWorkers;
517  // Now we create the worker applications which will call us back to finalize
518  // the setup
519  Int_t ord = 0;
520  for (; ord < nWrksTot; ord++) {
521 
522  // Ordinal for this worker server
523  const char *o = (gProofServ) ? gProofServ->GetOrdinal() : "0";
524  fullord.Form("%s.%d", o, ord);
525 
526  // Create environment files
527  SetProofServEnv(fullord);
528 
529  // Create worker server and add to the list
530  if ((wrk = CreateSlave("lite", fullord, 100, fImage, fWorkDir)))
531  started.Add(wrk);
532 
533  // Notify
534  NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
535 
536  } //end of worker loop
537  } else {
538  if (!fForkStartup) {
539  Warning("SetupWorkers", "standard startup: workers already started");
540  return -1;
541  }
542  nWrksTot = fNWorkers - 1;
543  // Now we create the worker applications which will call us back to finalize
544  // the setup
545  TString clones;
546  Int_t ord = 0;
547  for (; ord < nWrksTot; ord++) {
548 
549  // Ordinal for this worker server
550  const char *o = (gProofServ) ? gProofServ->GetOrdinal() : "0";
551  fullord.Form("%s.%d", o, ord + 1);
552  if (!clones.IsNull()) clones += " ";
553  clones += fullord;
554 
555  // Create worker server and add to the list
556  if ((wrk = CreateSlave("lite", fullord, -1, fImage, fWorkDir)))
557  started.Add(wrk);
558 
559  // Notify
560  NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
561 
562  } //end of worker loop
563 
564  // Send the request
566  m << clones;
567  Broadcast(m, kActive);
568  }
569 
570  // Wait for call backs
571  nWrksDone = 0;
572  nWrksTot = started.GetSize();
573  Int_t nSelects = 0;
574  Int_t to = gEnv->GetValue("ProofLite.StartupTimeOut", 5) * 1000;
575  while (started.GetSize() > 0 && nSelects < nWrksTot) {
576 
577  // Wait for activity on the socket for max 5 secs
578  TSocket *xs = mon->Select(to);
579 
580  // Count attempts and check
581  nSelects++;
582  if (xs == (TSocket *) -1) continue;
583 
584  // Get the connection
585  TSocket *s = fServSock->Accept();
586  if (s && s->IsValid()) {
587  // Receive ordinal
588  TMessage *msg = 0;
589  if (s->Recv(msg) < 0) {
590  Warning("SetupWorkers", "problems receiving message from accepted socket!");
591  } else {
592  if (msg) {
593  TString ord;
594  *msg >> ord;
595  // Find who is calling back
596  if ((wrk = (TSlave *) started.FindObject(ord))) {
597  // Remove it from the started list
598  started.Remove(wrk);
599 
600  // Assign tis socket the selected worker
601  wrk->SetSocket(s);
602  // Remove socket from global TROOT socket list. Only the TProof object,
603  // representing all worker sockets, will be added to this list. This will
604  // ensure the correct termination of all proof servers in case the
605  // root session terminates.
607  gROOT->GetListOfSockets()->Remove(s);
608  }
609  if (wrk->IsValid()) {
610  // Set the input handler
611  wrk->SetInputHandler(new TProofInputHandler(this, wrk->GetSocket()));
612  // Set fParallel to 1 for workers since they do not
613  // report their fParallel with a LOG_DONE message
614  wrk->fParallel = 1;
615  // Finalize setup of the server
616  wrk->SetupServ(TSlave::kSlave, 0);
617  }
618 
619  // Monitor good workers
620  fSlaves->Add(wrk);
621  if (wrk->IsValid()) {
622  if (opt == 1) fActiveSlaves->Add(wrk);
623  fAllMonitor->Add(wrk->GetSocket());
624  // Record also in the list for termination
625  if (startedWorkers) startedWorkers->Add(wrk);
626  // Notify startup operations
627  NotifyStartUp("Setting up worker servers", ++nWrksDone, nWrksTot);
628  } else {
629  // Flag as bad
630  fBadSlaves->Add(wrk);
631  }
632  }
633  } else {
634  Warning("SetupWorkers", "received empty message from accepted socket!");
635  }
636  }
637  }
638  }
639 
640  // Cleanup the monitor and the server socket
641  mon->DeActivateAll();
642  delete mon;
643 
644  // Create Progress dialog, if needed
645  if (!gROOT->IsBatch() && !fProgressDialog) {
646  if ((fProgressDialog =
647  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
648  if (fProgressDialog->LoadPlugin() == -1)
649  fProgressDialog = 0;
650  }
651 
652  if (opt == 1) {
653  // Collect replies
654  Collect(kActive);
655  // Update group view
656  SendGroupView();
657  // By default go into parallel mode
658  SetParallel(-1, 0);
659  }
660  // Done
661  return 0;
662 }
663 
664 ////////////////////////////////////////////////////////////////////////////////
665 /// Notify setting-up operation message
666 
667 void TProofLite::NotifyStartUp(const char *action, Int_t done, Int_t tot)
668 {
669  Int_t frac = (Int_t) (done*100.)/tot;
670  char msg[512] = {0};
671  if (frac >= 100) {
672  snprintf(msg, 512, "%s: OK (%d workers) \n",
673  action, tot);
674  } else {
675  snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
676  action, done, tot, frac);
677  }
678  fprintf(stderr,"%s", msg);
679 }
680 
681 ////////////////////////////////////////////////////////////////////////////////
682 /// Create environment files for worker 'ord'
683 
685 {
686  // Check input
687  if (!ord || strlen(ord) <= 0) {
688  Error("SetProofServEnv", "ordinal string undefined");
689  return -1;
690  }
691 
692  // ROOT env file
693  TString rcfile(Form("%s/worker-%s.rootrc", fWorkDir.Data(), ord));
694  FILE *frc = fopen(rcfile.Data(), "w");
695  if (!frc) {
696  Error("SetProofServEnv", "cannot open rc file %s", rcfile.Data());
697  return -1;
698  }
699 
700  // The session working dir depends on the role
701  fprintf(frc,"# The session working dir\n");
702  fprintf(frc,"ProofServ.SessionDir: %s/worker-%s\n", fWorkDir.Data(), ord);
703 
704  // The session unique tag
705  fprintf(frc,"# Session tag\n");
706  fprintf(frc,"ProofServ.SessionTag: %s\n", GetName());
707 
708  // Log / Debug level
709  fprintf(frc,"# Proof Log/Debug level\n");
710  fprintf(frc,"Proof.DebugLevel: %d\n", gDebug);
711 
712  // Ordinal number
713  fprintf(frc,"# Ordinal number\n");
714  fprintf(frc,"ProofServ.Ordinal: %s\n", ord);
715 
716  // ROOT Version tag
717  fprintf(frc,"# ROOT Version tag\n");
718  fprintf(frc,"ProofServ.RootVersionTag: %s\n", gROOT->GetVersion());
719 
720  // Work dir
721  TString sandbox = fSandbox;
722  if (GetSandbox(sandbox, kFALSE, "ProofServ.Sandbox") != 0)
723  Warning("SetProofServEnv", "problems getting sandbox string for worker");
724  fprintf(frc,"# Users sandbox\n");
725  fprintf(frc, "ProofServ.Sandbox: %s\n", sandbox.Data());
726 
727  // Cache dir
728  fprintf(frc,"# Users cache\n");
729  fprintf(frc, "ProofServ.CacheDir: %s\n", fCacheDir.Data());
730 
731  // Package dir
732  fprintf(frc,"# Users packages\n");
733  fprintf(frc, "ProofServ.PackageDir: %s\n", fPackMgr->GetDir());
734 
735  // Image
736  fprintf(frc,"# Server image\n");
737  fprintf(frc, "ProofServ.Image: %s\n", fImage.Data());
738 
739  // Set Open socket
740  fprintf(frc,"# Open socket\n");
741  fprintf(frc, "ProofServ.OpenSock: %s\n", fSockPath.Data());
742 
743  // Client Protocol
744  fprintf(frc,"# Client Protocol\n");
745  fprintf(frc, "ProofServ.ClientVersion: %d\n", kPROOF_Protocol);
746 
747  // ROOT env file created
748  fclose(frc);
749 
750  // System env file
751  TString envfile(Form("%s/worker-%s.env", fWorkDir.Data(), ord));
752  FILE *fenv = fopen(envfile.Data(), "w");
753  if (!fenv) {
754  Error("SetProofServEnv", "cannot open env file %s", envfile.Data());
755  return -1;
756  }
757  // ROOTSYS
758  fprintf(fenv, "export ROOTSYS=%s\n", TROOT::GetRootSys().Data());
759  // Conf dir
760  fprintf(fenv, "export ROOTCONFDIR=%s\n", TROOT::GetRootSys().Data());
761  // TMPDIR
762  fprintf(fenv, "export TMPDIR=%s\n", gSystem->TempDirectory());
763  // Log file in the log dir
764  TString logfile(Form("%s/worker-%s.log", fWorkDir.Data(), ord));
765  fprintf(fenv, "export ROOTPROOFLOGFILE=%s\n", logfile.Data());
766  // RC file
767  fprintf(fenv, "export ROOTRCFILE=%s\n", rcfile.Data());
768  // ROOT version tag (needed in building packages)
769  fprintf(fenv, "export ROOTVERSIONTAG=%s\n", gROOT->GetVersion());
770  // This flag can be used to identify the type of worker; for example, in BUILD.sh or SETUP.C ...
771  fprintf(fenv, "export ROOTPROOFLITE=%d\n", fNWorkers);
772  // Local files are on the local file system
773  fprintf(fenv, "export LOCALDATASERVER=\"file://\"\n");
774  // Set the user envs
775  if (fgProofEnvList) {
776  TString namelist;
777  TIter nxenv(fgProofEnvList);
778  TNamed *env = 0;
779  while ((env = (TNamed *)nxenv())) {
780  TString senv(env->GetTitle());
781  ResolveKeywords(senv, ord, logfile.Data());
782  fprintf(fenv, "export %s=%s\n", env->GetName(), senv.Data());
783  if (namelist.Length() > 0)
784  namelist += ',';
785  namelist += env->GetName();
786  }
787  fprintf(fenv, "export PROOF_ALLVARS=%s\n", namelist.Data());
788  }
789 
790  // System env file created
791  fclose(fenv);
792 
793  // Done
794  return 0;
795 }
796 
797 ////////////////////////////////////////////////////////////////////////////////
798 /// Resolve some keywords in 's'
799 /// <logfilewrk>, <user>, <rootsys>, <cpupin>
800 
801 void TProofLite::ResolveKeywords(TString &s, const char *ord,
802  const char *logfile)
803 {
804  if (!logfile) return;
805 
806  // Log file
807  if (s.Contains("<logfilewrk>") && logfile) {
808  TString lfr(logfile);
809  if (lfr.EndsWith(".log")) lfr.Remove(lfr.Last('.'));
810  s.ReplaceAll("<logfilewrk>", lfr.Data());
811  }
812 
813  // user
814  if (gSystem->Getenv("USER") && s.Contains("<user>")) {
815  s.ReplaceAll("<user>", gSystem->Getenv("USER"));
816  }
817 
818  // rootsys
819  if (gSystem->Getenv("ROOTSYS") && s.Contains("<rootsys>")) {
820  s.ReplaceAll("<rootsys>", gSystem->Getenv("ROOTSYS"));
821  }
822 
823  // cpupin: pin to this CPU num (from 0 to ncpus-1)
824  if (s.Contains("<cpupin>")) {
825  TString o = ord;
826  Int_t n = o.Index('.');
827  if (n != kNPOS) {
828 
829  o.Remove(0, n+1);
830  n = o.Atoi(); // n is ord
831 
832  TString cpuPinList;
833  {
834  const TList *envVars = GetEnvVars();
835  TNamed *var;
836  if (envVars) {
837  var = dynamic_cast<TNamed *>(envVars->FindObject("PROOF_SLAVE_CPUPIN_ORDER"));
838  if (var) cpuPinList = var->GetTitle();
839  }
840  }
841 
842  UInt_t nCpus = 1;
843  {
844  SysInfo_t si;
845  if (gSystem->GetSysInfo(&si) == 0 && (si.fCpus > 0))
846  nCpus = si.fCpus;
847  else nCpus = 1; // fallback
848  }
849 
850  if (cpuPinList.IsNull() || (cpuPinList == "*")) {
851  // Use processors in order
852  n = n % nCpus;
853  }
854  else {
855  // Use processors in user's order
856  // n is now the ordinal, converting to idx
857  n = n % (cpuPinList.CountChar('+')+1);
858  TString tok;
859  Ssiz_t from = 0;
860  for (Int_t i=0; cpuPinList.Tokenize(tok, from, "\\+"); i++) {
861  if (i == n) {
862  n = (tok.Atoi() % nCpus);
863  break;
864  }
865  }
866  }
867 
868  o.Form("%d", n);
869  }
870  else {
871  o = "0"; // should not happen
872  }
873  s.ReplaceAll("<cpupin>", o);
874  }
875 }
876 
877 ////////////////////////////////////////////////////////////////////////////////
878 /// Create the sandbox for this session
879 
881 {
882  // Make sure the sandbox area exist and is writable
883  if (GetSandbox(fSandbox, kTRUE, "ProofLite.Sandbox") != 0) return -1;
884 
885  // Package Manager
886  TString packdir = gEnv->GetValue("Proof.PackageDir", "");
887  if (packdir.IsNull())
888  packdir.Form("%s/%s", fSandbox.Data(), kPROOF_PackDir);
889  if (AssertPath(packdir, kTRUE) != 0) return -1;
890  fPackMgr = new TPackMgr(packdir);
891 
892  // Cache Dir
893  fCacheDir = gEnv->GetValue("Proof.CacheDir", "");
894  if (fCacheDir.IsNull())
896  if (AssertPath(fCacheDir, kTRUE) != 0) return -1;
897 
898  // Data Set Dir
899  fDataSetDir = gEnv->GetValue("Proof.DataSetDir", "");
900  if (fDataSetDir.IsNull())
902  if (AssertPath(fDataSetDir, kTRUE) != 0) return -1;
903 
904  // Session unique tag (name of this TProof instance)
905  TString stag;
906  stag.Form("%s-%d-%d", gSystem->HostName(), (int)time(0), gSystem->GetPid());
907  SetName(stag.Data());
908 
909  Int_t subpath = gEnv->GetValue("ProofLite.SubPath", 1);
910  // Subpath for this session in the fSandbox (<sandbox>/path-to-working-dir)
911  TString sessdir;
912  if (subpath != 0) {
913  sessdir = gSystem->WorkingDirectory();
914  sessdir.ReplaceAll(gSystem->HomeDirectory(),"");
915  sessdir.ReplaceAll("/","-");
916  sessdir.Replace(0,1,"/",1);
917  sessdir.Insert(0, fSandbox.Data());
918  } else {
919  // USe the sandbox
920  sessdir = fSandbox;
921  }
922 
923  // Session working and queries dir
924  fWorkDir.Form("%s/session-%s", sessdir.Data(), stag.Data());
925  if (AssertPath(fWorkDir, kTRUE) != 0) return -1;
926 
927  // Create symlink to the last session
928  TString lastsess;
929  lastsess.Form("%s/last-lite-session", sessdir.Data());
930  gSystem->Unlink(lastsess);
931  gSystem->Symlink(fWorkDir, lastsess);
932 
933  // Queries Dir: local to the working dir, unless required differently
934  fQueryDir = gEnv->GetValue("Proof.QueryDir", "");
935  if (fQueryDir.IsNull())
936  fQueryDir.Form("%s/%s", sessdir.Data(), kPROOF_QueryDir);
937  if (AssertPath(fQueryDir, kTRUE) != 0) return -1;
938 
939  // Cleanup old sessions dirs
940  CleanupSandbox();
941 
942  // Done
943  return 0;
944 }
945 
946 ////////////////////////////////////////////////////////////////////////////////
947 /// Print status of PROOF-Lite cluster.
948 
949 void TProofLite::Print(Option_t *option) const
950 {
951  TString ord;
952  if (gProofServ) ord.Form("%s ", gProofServ->GetOrdinal());
953  if (IsParallel())
954  Printf("*** PROOF-Lite cluster %s(parallel mode, %d workers):", ord.Data(), GetParallel());
955  else
956  Printf("*** PROOF-Lite cluster %s(sequential mode)", ord.Data());
957 
958  if (gProofServ) {
959  TString url(gSystem->HostName());
960  // Add port to URL, if defined
961  Int_t port = gEnv->GetValue("ProofServ.XpdPort", 1093);
962  if (port > -1) url.Form("%s:%d",gSystem->HostName(), port);
963  Printf("URL: %s", url.Data());
964  } else {
965  Printf("Host name: %s", gSystem->HostName());
966  }
967  Printf("User: %s", GetUser());
968  TString ver(gROOT->GetVersion());
969  ver += TString::Format("|%s", gROOT->GetGitCommit());
970  if (gSystem->Getenv("ROOTVERSIONTAG"))
971  ver += TString::Format("|%s", gSystem->Getenv("ROOTVERSIONTAG"));
972  Printf("ROOT version|rev|tag: %s", ver.Data());
973  Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
975  Printf("Protocol version: %d", GetClientProtocol());
976  Printf("Working directory: %s", gSystem->WorkingDirectory());
977  Printf("Communication path: %s", fSockPath.Data());
978  Printf("Log level: %d", GetLogLevel());
979  Printf("Number of workers: %d", GetNumberOfSlaves());
980  Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
981  Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
982  Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
983  Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
984  Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
985  Printf("Total real time used (s): %.3f", GetRealTime());
986  Printf("Total CPU time used (s): %.3f", GetCpuTime());
987  if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
988  Printf("List of workers:");
989  TIter nextslave(fSlaves);
990  while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
991  if (sl->IsValid())
992  sl->Print(option);
993  }
994  }
995 }
996 
997 ////////////////////////////////////////////////////////////////////////////////
998 /// Create a TProofQueryResult instance for this query.
999 
1001  Long64_t fst, TDSet *dset,
1002  const char *selec)
1003 {
1004  // Increment sequential number
1005  Int_t seqnum = -1;
1006  if (fQMgr) {
1008  seqnum = fQMgr->SeqNum();
1009  }
1010 
1011  // Create the instance and add it to the list
1012  TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt,
1013  fPlayer->GetInputList(), nent,
1014  fst, dset, selec,
1015  (dset ? dset->GetEntryList() : 0));
1016  // Title is the session identifier
1017  pqr->SetTitle(GetName());
1018 
1019  return pqr;
1020 }
1021 
1022 ////////////////////////////////////////////////////////////////////////////////
1023 /// Set query in running state.
1024 
1026 {
1027  // Record current position in the log file at start
1028  fflush(fLogFileW);
1029  Int_t startlog = lseek(fileno(fLogFileW), (off_t) 0, SEEK_END);
1030 
1031  // Add some header to logs
1032  Printf(" ");
1033  Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
1034 
1035  // Build the list of loaded PAR packages
1036  TString parlist = "";
1037  fPackMgr->GetEnabledPackages(parlist);
1038 
1039  // Set in running state
1040  pq->SetRunning(startlog, parlist, GetParallel());
1041 
1042  // Bytes and CPU at start (we will calculate the differential at end)
1043  AskStatistics();
1045 }
1046 
1047 ////////////////////////////////////////////////////////////////////////////////
1048 /// Execute the specified drawing action on a data set (TDSet).
1049 /// Event- or Entry-lists should be set in the data set object using
1050 /// TDSet::SetEntryList.
1051 /// Returns -1 in case of error or number of selected events otherwise.
1052 
1053 Long64_t TProofLite::DrawSelect(TDSet *dset, const char *varexp,
1054  const char *selection, Option_t *option,
1056 {
1057  if (!IsValid()) return -1;
1058 
1059  // Make sure that asynchronous processing is not active
1060  if (!IsIdle()) {
1061  Info("DrawSelect","not idle, asynchronous Draw not supported");
1062  return -1;
1063  }
1064  TString opt(option);
1065  Int_t idx = opt.Index("ASYN", 0, TString::kIgnoreCase);
1066  if (idx != kNPOS)
1067  opt.Replace(idx,4,"");
1068 
1069  // Fill the internal variables
1070  fVarExp = varexp;
1071  fSelection = selection;
1072 
1073  return Process(dset, "draw:", opt, nentries, first);
1074 }
1075 
1076 ////////////////////////////////////////////////////////////////////////////////
1077 /// Process a data set (TDSet) using the specified selector (.C) file.
1078 /// Entry- or event-lists should be set in the data set object using
1079 /// TDSet::SetEntryList.
1080 /// The return value is -1 in case of error and TSelector::GetStatus() in
1081 /// in case of success.
1082 
1083 Long64_t TProofLite::Process(TDSet *dset, const char *selector, Option_t *option,
1085 {
1086  // For the time being cannot accept other queries if not idle, even if in async
1087  // mode; needs to set up an event handler to manage that
1088 
1089  TString opt(option), optfb, outfile;
1090  // Enable feedback, if required
1091  if (opt.Contains("fb=") || opt.Contains("feedback=")) SetFeedback(opt, optfb, 0);
1092  // Define output file, either from 'opt' or the default one
1093  if (HandleOutputOptions(opt, outfile, 0) != 0) return -1;
1094 
1095  // Resolve query mode
1096  fSync = (GetQueryMode(opt) == kSync);
1097  if (!fSync) {
1098  Info("Process","asynchronous mode not yet supported in PROOF-Lite");
1099  return -1;
1100  }
1101 
1102  if (!IsIdle()) {
1103  // Notify submission
1104  Info("Process", "not idle: cannot accept queries");
1105  return -1;
1106  }
1107 
1108  // Cleanup old temporary datasets
1109  if (IsIdle() && fRunningDSets && fRunningDSets->GetSize() > 0) {
1111  fRunningDSets->Delete();
1112  }
1113 
1114  if (!IsValid() || !fQMgr || !fPlayer) {
1115  Error("Process", "invalid sesion or query-result manager undefined!");
1116  return -1;
1117  }
1118 
1119  // Make sure that all enabled workers get some work, unless stated
1120  // differently
1121  if (!fPlayer->GetInputList()->FindObject("PROOF_MaxSlavesPerNode"))
1122  SetParameter("PROOF_MaxSlavesPerNode", (Long_t)0);
1123 
1124  Bool_t hasNoData = (!dset || (dset && dset->TestBit(TDSet::kEmpty))) ? kTRUE : kFALSE;
1125 
1126  // If just a name was given to identify the dataset, retrieve it from the
1127  // local files
1128  // Make sure the dataset contains the information needed
1129  TString emsg;
1130  if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
1131  if (TProof::AssertDataSet(dset, fPlayer->GetInputList(), fDataSetManager, emsg) != 0) {
1132  Error("Process", "from AssertDataSet: %s", emsg.Data());
1133  return -1;
1134  }
1135  if (dset->GetListOfElements()->GetSize() == 0) {
1136  Error("Process", "no files to process!");
1137  return -1;
1138  }
1139  } else if (hasNoData) {
1140  // Check if we are required to process with TPacketizerFile a registered dataset
1141  TNamed *ftp = dynamic_cast<TNamed *>(fPlayer->GetInputList()->FindObject("PROOF_FilesToProcess"));
1142  if (ftp) {
1143  TString dsn(ftp->GetTitle());
1144  if (!dsn.Contains(":") || dsn.BeginsWith("dataset:")) {
1145  dsn.ReplaceAll("dataset:", "");
1146  // Make sure we have something in input and a dataset manager
1147  if (!fDataSetManager) {
1148  emsg.Form("dataset manager not initialized!");
1149  } else {
1150  TFileCollection *fc = 0;
1151  // Get the dataset
1152  if (!(fc = fDataSetManager->GetDataSet(dsn))) {
1153  emsg.Form("requested dataset '%s' does not exists", dsn.Data());
1154  } else {
1155  TMap *fcmap = TProofServ::GetDataSetNodeMap(fc, emsg);
1156  if (fcmap) {
1157  fPlayer->GetInputList()->Remove(ftp);
1158  delete ftp;
1159  fcmap->SetOwner(kTRUE);
1160  fcmap->SetName("PROOF_FilesToProcess");
1161  fPlayer->GetInputList()->Add(fcmap);
1162  }
1163  }
1164  }
1165  if (!emsg.IsNull()) {
1166  Error("HandleProcess", "%s", emsg.Data());
1167  return -1;
1168  }
1169  }
1170  }
1171  }
1172 
1173  TString selec(selector), varexp, selection, objname;
1174  // If a draw query, extract the relevant info
1175  if (selec.BeginsWith("draw:")) {
1176  varexp = fVarExp;
1177  selection = fSelection;
1178  // Decode now the expression
1179  if (fPlayer->GetDrawArgs(varexp, selection, opt, selec, objname) != 0) {
1180  Error("Process", "draw query: error parsing arguments '%s', '%s', '%s'",
1181  varexp.Data(), selection.Data(), opt.Data());
1182  return -1;
1183  }
1184  }
1185 
1186  // Create instance of query results (the data set is added after Process)
1187  TProofQueryResult *pq = MakeQueryResult(nentries, opt, first, 0, selec);
1188 
1189  // Check if queries must be saved into files
1190  // Automatic saving is controlled by ProofLite.AutoSaveQueries
1191  Bool_t savequeries =
1192  (!strcmp(gEnv->GetValue("ProofLite.AutoSaveQueries", "off"), "on")) ? kTRUE : kFALSE;
1193 
1194  // Keep queries in memory and how many (-1 = all, 0 = none, ...)
1195  Int_t memqueries = gEnv->GetValue("ProofLite.MaxQueriesMemory", 1);
1196 
1197  // If not a draw action add the query to the main list
1198  if (!(pq->IsDraw())) {
1199  if (fQMgr->Queries()) {
1200  if (memqueries != 0) fQMgr->Queries()->Add(pq);
1201  if (memqueries >= 0 && fQMgr->Queries()->GetSize() > memqueries) {
1202  // Remove oldest
1203  TObject *qfst = fQMgr->Queries()->First();
1204  fQMgr->Queries()->Remove(qfst);
1205  delete qfst;
1206  }
1207  }
1208  // Also save it to queries dir
1209  if (savequeries) fQMgr->SaveQuery(pq);
1210  }
1211 
1212  // Set the query number
1213  fSeqNum = pq->GetSeqNum();
1214 
1215  // Set in running state
1216  SetQueryRunning(pq);
1217 
1218  // Save to queries dir, if not standard draw
1219  if (!(pq->IsDraw())) {
1220  if (savequeries) fQMgr->SaveQuery(pq);
1221  } else {
1223  }
1224 
1225  // Start or reset the progress dialog
1226  if (!gROOT->IsBatch()) {
1227  Int_t dsz = (dset && dset->GetListOfElements()) ? dset->GetListOfElements()->GetSize() : -1;
1228  if (fProgressDialog &&
1230  if (!fProgressDialogStarted) {
1231  fProgressDialog->ExecPlugin(5, this, selec.Data(), dsz,
1232  first, nentries);
1234  } else {
1235  ResetProgressDialog(selec.Data(), dsz, first, nentries);
1236  }
1237  }
1239  }
1240 
1241  // Add query results to the player lists
1242  if (!(pq->IsDraw()))
1243  fPlayer->AddQueryResult(pq);
1244 
1245  // Set query currently processed
1246  fPlayer->SetCurrentQuery(pq);
1247 
1248  // Make sure the unique query tag is available as TNamed object in the
1249  // input list so that it can be used in TSelectors for monitoring
1250  TNamed *qtag = (TNamed *) fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
1251  if (qtag) {
1252  qtag->SetTitle(Form("%s:%s",pq->GetTitle(),pq->GetName()));
1253  } else {
1254  TObject *o = fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
1255  if (o) fPlayer->GetInputList()->Remove(o);
1256  fPlayer->AddInput(new TNamed("PROOF_QueryTag",
1257  Form("%s:%s",pq->GetTitle(),pq->GetName())));
1258  }
1259 
1260  // Set PROOF to running state
1262 
1263  // deactivate the default application interrupt handler
1264  // ctrl-c's will be forwarded to PROOF to stop the processing
1265  TSignalHandler *sh = 0;
1266  if (fSync) {
1267  if (gApplication)
1269  }
1270 
1271  // Make sure we get a fresh result
1272  fOutputList.Clear();
1273 
1274  // Start the additional workers now if using fork-based startup
1275  TList *startedWorkers = 0;
1276  if (fForkStartup) {
1277  startedWorkers = new TList;
1278  startedWorkers->SetOwner(kFALSE);
1279  SetupWorkers(1, startedWorkers);
1280  }
1281 
1282  // This is the end of preparation
1283  fQuerySTW.Reset();
1284 
1285  Long64_t rv = 0;
1286  if (!(pq->IsDraw())) {
1287  if (selector && strlen(selector)) {
1288  rv = fPlayer->Process(dset, selec, opt, nentries, first);
1289  } else {
1290  rv = fPlayer->Process(dset, fSelector, opt, nentries, first);
1291  }
1292  } else {
1293  rv = fPlayer->DrawSelect(dset, varexp, selection, opt, nentries, first);
1294  }
1295 
1296  // This is the end of merging
1297  fQuerySTW.Stop();
1298  Float_t rt = fQuerySTW.RealTime();
1299  // Update the query content
1300  TQueryResult *qr = GetQueryResult();
1301  if (qr) {
1302  qr->SetTermTime(rt);
1303  // Preparation time is always null in PROOF-Lite
1304  }
1305 
1306  // Disable feedback, if required
1307  if (!optfb.IsNull()) SetFeedback(opt, optfb, 1);
1308 
1309  if (fSync) {
1310 
1311  // Terminate additional workers if using fork-based startup
1312  if (fForkStartup && startedWorkers) {
1313  RemoveWorkers(startedWorkers);
1314  SafeDelete(startedWorkers);
1315  }
1316 
1317  // reactivate the default application interrupt handler
1318  if (sh)
1320 
1321  // Return number of events processed
1324  ? kTRUE : kFALSE;
1325  if (abort) fPlayer->StopProcess(kTRUE);
1326  Emit("StopProcess(Bool_t)", abort);
1327  }
1328 
1329  // In PROOFLite this has to be done once only in TProofLite::Process
1331  // If the last object, notify the GUI that the result arrived
1332  QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
1333  // Processing is over
1334  UpdateDialog();
1335 
1336  // Save the data set into the TQueryResult (should be done after Process to avoid
1337  // improper deletion during collection)
1338  if (rv == 0 && dset && !dset->TestBit(TDSet::kEmpty) && pq->GetInputList()) {
1339  pq->GetInputList()->Add(dset);
1340  if (dset->GetEntryList())
1341  pq->GetInputList()->Add(dset->GetEntryList());
1342  }
1343 
1344  // Register any dataset produced during this processing, if required
1346  TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
1347  if (psr) {
1348  TString err;
1350  fPlayer->GetOutputList(), fDataSetManager, err) != 0)
1351  Warning("ProcessNext", "problems registering produced datasets: %s", err.Data());
1352  fPlayer->GetOutputList()->Remove(psr);
1353  delete psr;
1354  }
1355  }
1356 
1357  // Complete filling of the TQueryResult instance
1358  AskStatistics();
1359  if (!(pq->IsDraw())) {
1360  if (fQMgr->FinalizeQuery(pq, this, fPlayer)) {
1361  if (savequeries) fQMgr->SaveQuery(pq, -1);
1362  }
1363  }
1364 
1365  // Remove aborted queries from the list
1368  if (fQMgr) fQMgr->RemoveQuery(pq);
1369  } else {
1370  // If the last object, notify the GUI that the result arrived
1371  QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
1372  // Keep in memory only light info about a query
1373  if (!(pq->IsDraw()) && memqueries >= 0) {
1374  if (fQMgr && fQMgr->Queries()) {
1375  TQueryResult *pqr = pq->CloneInfo();
1376  if (pqr) fQMgr->Queries()->Add(pqr);
1377  // Remove from the fQueries list
1378  fQMgr->Queries()->Remove(pq);
1379  }
1380  }
1381  // To get the prompt back
1382  TString msg;
1383  msg.Form("Lite-0: all output objects have been merged ");
1384  fprintf(stderr, "%s\n", msg.Data());
1385  }
1386  // Save the performance info, if required
1387  if (!fPerfTree.IsNull()) {
1388  if (SavePerfTree() != 0) Error("Process", "saving performance info ...");
1389  // Must be re-enabled each time
1390  SetPerfTree(0);
1391  }
1392  }
1393  // Finalise output file settings (opt is ignored in here)
1394  if (HandleOutputOptions(opt, outfile, 1) != 0) return -1;
1395 
1396  // Retrieve status from the output list
1397  if (rv >= 0) {
1398  TParameter<Long64_t> *sst =
1399  (TParameter<Long64_t> *) fOutputList.FindObject("PROOF_SelectorStatus");
1400  if (sst) rv = sst->GetVal();
1401  }
1402 
1403 
1404  // Done
1405  return rv;
1406 }
1407 
1408 ////////////////////////////////////////////////////////////////////////////////
1409 /// Initialize the dataset manager from directives or from defaults
1410 /// Return 0 on success, -1 on failure
1411 
1413 {
1414  fDataSetManager = 0;
1415 
1416  // Default user and group
1417  TString user("???"), group("default");
1418  UserGroup_t *pw = gSystem->GetUserInfo();
1419  if (pw) {
1420  user = pw->fUser;
1421  delete pw;
1422  }
1423 
1424  // Dataset manager instance via plug-in
1425  TPluginHandler *h = 0;
1426  TString dsm = gEnv->GetValue("Proof.DataSetManager", "");
1427  if (!dsm.IsNull()) {
1428  // Get plugin manager to load the appropriate TDataSetManager
1429  if (gROOT->GetPluginManager()) {
1430  // Find the appropriate handler
1431  h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
1432  if (h && h->LoadPlugin() != -1) {
1433  // make instance of the dataset manager
1434  fDataSetManager =
1435  reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, group.Data(),
1436  user.Data(), dsm.Data()));
1437  }
1438  }
1439  }
1441  Warning("InitDataSetManager", "dataset manager plug-in initialization failed");
1443  }
1444 
1445  // If no valid dataset manager has been created we instantiate the default one
1446  if (!fDataSetManager) {
1447  TString opts("Av:");
1448  TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
1449  if (dsetdir.IsNull()) {
1450  // Use the default in the sandbox
1451  dsetdir = fDataSetDir;
1452  opts += "Sb:";
1453  }
1454  // Find the appropriate handler
1455  if (!h) {
1456  h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
1457  if (h && h->LoadPlugin() == -1) h = 0;
1458  }
1459  if (h) {
1460  // make instance of the dataset manager
1461  fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
1462  group.Data(), user.Data(),
1463  Form("dir:%s opt:%s", dsetdir.Data(), opts.Data())));
1464  }
1466  Warning("InitDataSetManager", "default dataset manager plug-in initialization failed");
1468  }
1469  }
1470 
1471  if (gDebug > 0 && fDataSetManager) {
1472  Info("InitDataSetManager", "datasetmgr Cq: %d, Ar: %d, Av: %d, Ti: %d, Sb: %d",
1478  }
1479 
1480  // Dataset manager for staging requests
1481  TString dsReqCfg = gEnv->GetValue("Proof.DataSetStagingRequests", "");
1482  if (!dsReqCfg.IsNull()) {
1483  TPMERegexp reReqDir("(^| )(dir:)?([^ ]+)( |$)");
1484 
1485  if (reReqDir.Match(dsReqCfg) == 5) {
1486  TString dsDirFmt;
1487  dsDirFmt.Form("dir:%s perms:open", reReqDir[3].Data());
1488  fDataSetStgRepo = new TDataSetManagerFile("_stage_", "_stage_", dsDirFmt);
1490  Warning("InitDataSetManager", "failed init of dataset staging requests repository");
1492  }
1493  } else {
1494  Warning("InitDataSetManager", "specify, with [dir:]<path>, a valid path for staging requests");
1495  }
1496  } else if (gDebug > 0) {
1497  Warning("InitDataSetManager", "no repository for staging requests available");
1498  }
1499 
1500  // Done
1501  return (fDataSetManager ? 0 : -1);
1502 }
1503 
1504 ////////////////////////////////////////////////////////////////////////////////
1505 /// List contents of file cache. If all is true show all caches also on
1506 /// slaves. If everything is ok all caches are to be the same.
1507 
1509 {
1510  if (!IsValid()) return;
1511 
1512  Printf("*** Local file cache %s ***", fCacheDir.Data());
1513  gSystem->Exec(Form("%s %s", kLS, fCacheDir.Data()));
1514 }
1515 
1516 ////////////////////////////////////////////////////////////////////////////////
1517 /// Remove files from all file caches.
1518 
1519 void TProofLite::ClearCache(const char *file)
1520 {
1521  if (!IsValid()) return;
1522 
1523  fCacheLock->Lock();
1524  if (!file || strlen(file) <= 0) {
1525  gSystem->Exec(Form("%s %s/*", kRM, fCacheDir.Data()));
1526  } else {
1527  gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), file));
1528  }
1529  fCacheLock->Unlock();
1530 }
1531 
1532 ////////////////////////////////////////////////////////////////////////////////
1533 /// Copy the specified macro in the cache directory. The macro file is
1534 /// uploaded if new or updated. If existing, the corresponding header
1535 /// basename(macro).h or .hh, is also uploaded. For the other arguments
1536 /// see TProof::Load().
1537 /// Returns 0 in case of success and -1 in case of error.
1538 
1539 Int_t TProofLite::Load(const char *macro, Bool_t notOnClient, Bool_t uniqueOnly,
1540  TList *wrks)
1541 {
1542  if (!IsValid()) return -1;
1543 
1544  if (!macro || !macro[0]) {
1545  Error("Load", "need to specify a macro name");
1546  return -1;
1547  }
1548 
1549  TString macs(macro), mac;
1550  Int_t from = 0;
1551  while (macs.Tokenize(mac, from, ",")) {
1552  if (IsIdle()) {
1553  if (CopyMacroToCache(mac) < 0) return -1;
1554  } else {
1555  // The name
1556  TString macn = gSystem->BaseName(mac);
1557  macn.Remove(macn.Last('.'));
1558  // Relevant pointers
1559  TList cachedFiles;
1560  TString cacheDir = fCacheDir;
1561  gSystem->ExpandPathName(cacheDir);
1562  void * dirp = gSystem->OpenDirectory(cacheDir);
1563  if (dirp) {
1564  const char *e = 0;
1565  while ((e = gSystem->GetDirEntry(dirp))) {
1566  if (!strncmp(e, macn.Data(), macn.Length())) {
1567  TString fncache = Form("%s/%s", cacheDir.Data(), e);
1568  cachedFiles.Add(new TObjString(fncache.Data()));
1569  }
1570  }
1571  gSystem->FreeDirectory(dirp);
1572  }
1573  }
1574  }
1575 
1576  return TProof::Load(macro, notOnClient, uniqueOnly, wrks);
1577 }
1578 
1579 ////////////////////////////////////////////////////////////////////////////////
1580 /// Copy a macro, and its possible associated .h[h] file,
1581 /// to the cache directory, from where the workers can get the file.
1582 /// If headerRequired is 1, return -1 in case the header is not found.
1583 /// If headerRequired is 0, try to copy header too.
1584 /// If headerRequired is -1, don't look for header, only copy macro.
1585 /// If the selector pionter is not 0, consider the macro to be a selector
1586 /// and try to load the selector and set it to the pointer.
1587 /// The mask 'opt' is an or of ESendFileOpt:
1588 /// kCpBin (0x8) Retrieve from the cache the binaries associated
1589 /// with the file
1590 /// kCp (0x10) Retrieve the files from the cache
1591 /// Return -1 in case of error, 0 otherwise.
1592 
1593 Int_t TProofLite::CopyMacroToCache(const char *macro, Int_t headerRequired,
1594  TSelector **selector, Int_t opt, TList *)
1595 {
1596  // Relevant pointers
1597  TString cacheDir = fCacheDir;
1598  gSystem->ExpandPathName(cacheDir);
1599  TProofLockPath *cacheLock = fCacheLock;
1600 
1601  // Split out the aclic mode, if any
1602  TString name = macro;
1603  TString acmode, args, io;
1604  name = gSystem->SplitAclicMode(name, acmode, args, io);
1605 
1606  PDB(kGlobal,1)
1607  Info("CopyMacroToCache", "enter: names: %s, %s", macro, name.Data());
1608 
1609  // Make sure that the file exists
1610  if (gSystem->AccessPathName(name, kReadPermission)) {
1611  Error("CopyMacroToCache", "file %s not found or not readable", name.Data());
1612  return -1;
1613  }
1614 
1615  // Update the macro path
1617  TString np(gSystem->DirName(name));
1618  if (!np.IsNull()) {
1619  np += ":";
1620  if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
1621  Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
1622  mp.Insert(ip, np);
1623  TROOT::SetMacroPath(mp);
1624  PDB(kGlobal,1)
1625  Info("CopyMacroToCache", "macro path set to '%s'", TROOT::GetMacroPath());
1626  }
1627  }
1628 
1629  // Check the header file
1630  Int_t dot = name.Last('.');
1631  const char *hext[] = { ".h", ".hh", "" };
1632  TString hname, checkedext;
1633  Int_t i = 0;
1634  while (strlen(hext[i]) > 0) {
1635  hname = name(0, dot);
1636  hname += hext[i];
1637  if (!gSystem->AccessPathName(hname, kReadPermission))
1638  break;
1639  if (!checkedext.IsNull()) checkedext += ",";
1640  checkedext += hext[i];
1641  hname = "";
1642  i++;
1643  }
1644  if (hname.IsNull() && headerRequired == 1) {
1645  Error("CopyMacroToCache", "header file for %s not found or not readable "
1646  "(checked extensions: %s)", name.Data(), checkedext.Data());
1647  return -1;
1648  }
1649  if (headerRequired < 0)
1650  hname = "";
1651 
1652  cacheLock->Lock();
1653 
1654  // Check these files with those in the cache (if any)
1655  Bool_t useCacheBinaries = kFALSE;
1656  TString cachedname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(name));
1657  TString cachedhname;
1658  if (!hname.IsNull())
1659  cachedhname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(hname));
1660  if (!gSystem->AccessPathName(cachedname, kReadPermission)) {
1661  TMD5 *md5 = TMD5::FileChecksum(name);
1662  TMD5 *md5cache = TMD5::FileChecksum(cachedname);
1663  if (md5 && md5cache && (*md5 == *md5cache))
1664  useCacheBinaries = kTRUE;
1665  if (!hname.IsNull()) {
1666  if (!gSystem->AccessPathName(cachedhname, kReadPermission)) {
1667  TMD5 *md5h = TMD5::FileChecksum(hname);
1668  TMD5 *md5hcache = TMD5::FileChecksum(cachedhname);
1669  if (md5h && md5hcache && (*md5h != *md5hcache))
1670  useCacheBinaries = kFALSE;
1671  SafeDelete(md5h);
1672  SafeDelete(md5hcache);
1673  }
1674  }
1675  SafeDelete(md5);
1676  SafeDelete(md5cache);
1677  }
1678 
1679  // Create version file name template
1680  TString vername(Form(".%s", name.Data()));
1681  dot = vername.Last('.');
1682  if (dot != kNPOS)
1683  vername.Remove(dot);
1684  vername += ".binversion";
1685  Bool_t savever = kFALSE;
1686 
1687  // Check binary version
1688  if (useCacheBinaries) {
1689  TString v, r;
1690  FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "r");
1691  if (f) {
1692  v.Gets(f);
1693  r.Gets(f);
1694  fclose(f);
1695  }
1696  if (!f || v != gROOT->GetVersion() || r != gROOT->GetGitCommit())
1697  useCacheBinaries = kFALSE;
1698  }
1699 
1700  // Create binary name template
1701  TString binname = gSystem->BaseName(name);
1702  dot = binname.Last('.');
1703  if (dot != kNPOS)
1704  binname.Replace(dot,1,"_");
1705  TString pcmname = TString::Format("%s_ACLiC_dict_rdict.pcm", binname.Data());
1706  binname += ".";
1707 
1708  FileStat_t stlocal, stcache;
1709  void *dirp = 0;
1710  if (useCacheBinaries) {
1711  // Loop over binaries in the cache and copy them locally if newer then the local
1712  // versions or there is no local version
1713  dirp = gSystem->OpenDirectory(cacheDir);
1714  if (dirp) {
1715  const char *e = 0;
1716  while ((e = gSystem->GetDirEntry(dirp))) {
1717  if (!strncmp(e, binname.Data(), binname.Length()) ||
1718  !strncmp(e, pcmname.Data(), pcmname.Length())) {
1719  TString fncache = Form("%s/%s", cacheDir.Data(), e);
1720  Bool_t docp = kTRUE;
1721  if (!gSystem->GetPathInfo(fncache, stcache)) {
1722  Int_t rc = gSystem->GetPathInfo(e, stlocal);
1723  if (rc == 0 && (stlocal.fMtime >= stcache.fMtime))
1724  docp = kFALSE;
1725  // Copy the file, if needed
1726  if (docp) {
1727  gSystem->Exec(Form("%s %s", kRM, e));
1728  PDB(kGlobal,2)
1729  Info("CopyMacroToCache",
1730  "retrieving %s from cache", fncache.Data());
1731  gSystem->Exec(Form("%s %s %s", kCP, fncache.Data(), e));
1732  }
1733  }
1734  }
1735  }
1736  gSystem->FreeDirectory(dirp);
1737  }
1738  }
1739  cacheLock->Unlock();
1740 
1741  if (selector) {
1742  // Now init the selector in optimized way
1743  if (!(*selector = TSelector::GetSelector(macro))) {
1744  Error("CopyMacroToCache", "could not create a selector from %s", macro);
1745  return -1;
1746  }
1747  }
1748 
1749  cacheLock->Lock();
1750 
1751  TList *cachedFiles = new TList;
1752  // Save information in the cache now for later usage
1753  dirp = gSystem->OpenDirectory(".");
1754  if (dirp) {
1755  const char *e = 0;
1756  while ((e = gSystem->GetDirEntry(dirp))) {
1757  if (!strncmp(e, binname.Data(), binname.Length()) ||
1758  !strncmp(e, pcmname.Data(), pcmname.Length())) {
1759  Bool_t docp = kTRUE;
1760  if (!gSystem->GetPathInfo(e, stlocal)) {
1761  TString fncache = Form("%s/%s", cacheDir.Data(), e);
1762  Int_t rc = gSystem->GetPathInfo(fncache, stcache);
1763  if (rc == 0 && (stlocal.fMtime <= stcache.fMtime))
1764  docp = kFALSE;
1765  // Copy the file, if needed
1766  if (docp) {
1767  gSystem->Exec(Form("%s %s", kRM, fncache.Data()));
1768  PDB(kGlobal,2)
1769  Info("CopyMacroToCache","caching %s ...", e);
1770  gSystem->Exec(Form("%s %s %s", kCP, e, fncache.Data()));
1771  savever = kTRUE;
1772  }
1773  if (opt & kCpBin)
1774  cachedFiles->Add(new TObjString(fncache.Data()));
1775  }
1776  }
1777  }
1778  gSystem->FreeDirectory(dirp);
1779  }
1780 
1781  // Save binary version if requested
1782  if (savever) {
1783  FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "w");
1784  if (f) {
1785  fputs(gROOT->GetVersion(), f);
1786  fputs(Form("\n%s", gROOT->GetGitCommit()), f);
1787  fclose(f);
1788  }
1789  }
1790 
1791  // Save also the selector info, if needed
1792  if (!useCacheBinaries) {
1793  gSystem->Exec(Form("%s %s", kRM, cachedname.Data()));
1794  PDB(kGlobal,2)
1795  Info("CopyMacroToCache","caching %s ...", name.Data());
1796  gSystem->Exec(Form("%s %s %s", kCP, name.Data(), cachedname.Data()));
1797  if (!hname.IsNull()) {
1798  gSystem->Exec(Form("%s %s", kRM, cachedhname.Data()));
1799  PDB(kGlobal,2)
1800  Info("CopyMacroToCache","caching %s ...", hname.Data());
1801  gSystem->Exec(Form("%s %s %s", kCP, hname.Data(), cachedhname.Data()));
1802  }
1803  }
1804  if (opt & kCp) {
1805  cachedFiles->Add(new TObjString(cachedname.Data()));
1806  if (!hname.IsNull())
1807  cachedFiles->Add(new TObjString(cachedhname.Data()));
1808  }
1809 
1810  cacheLock->Unlock();
1811 
1812  cachedFiles->SetOwner();
1813  delete cachedFiles;
1814 
1815  return 0;
1816 }
1817 
1818 ////////////////////////////////////////////////////////////////////////////////
1819 /// Remove old sessions dirs keep at most 'Proof.MaxOldSessions' (default 10)
1820 
1822 {
1823  Int_t maxold = gEnv->GetValue("Proof.MaxOldSessions", 1);
1824 
1825  if (maxold < 0) return 0;
1826 
1827  TSortedList *olddirs = new TSortedList(kFALSE);
1828 
1829  TString sandbox = gSystem->DirName(fWorkDir.Data());
1830 
1831  void *dirp = gSystem->OpenDirectory(sandbox);
1832  if (dirp) {
1833  const char *e = 0;
1834  while ((e = gSystem->GetDirEntry(dirp))) {
1835  if (!strncmp(e, "session-", 8) && !strstr(e, GetName())) {
1836  TString d(e);
1837  Int_t i = d.Last('-');
1838  if (i != kNPOS) d.Remove(i);
1839  i = d.Last('-');
1840  if (i != kNPOS) d.Remove(0,i+1);
1841  TString path = Form("%s/%s", sandbox.Data(), e);
1842  olddirs->Add(new TNamed(d, path));
1843  }
1844  }
1845  gSystem->FreeDirectory(dirp);
1846  }
1847 
1848  // Clean it up, if required
1849  Bool_t notify = kTRUE;
1850  while (olddirs->GetSize() > maxold) {
1851  if (notify && gDebug > 0)
1852  Printf("Cleaning sandbox at: %s", sandbox.Data());
1853  notify = kFALSE;
1854  TNamed *n = (TNamed *) olddirs->Last();
1855  if (n) {
1856  gSystem->Exec(Form("%s %s", kRM, n->GetTitle()));
1857  olddirs->Remove(n);
1858  delete n;
1859  }
1860  }
1861 
1862  // Cleanup
1863  olddirs->SetOwner();
1864  delete olddirs;
1865 
1866  // Done
1867  return 0;
1868 }
1869 
1870 ////////////////////////////////////////////////////////////////////////////////
1871 /// Get the list of queries.
1872 
1874 {
1875  Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
1876 
1877  TList *ql = new TList;
1878  Int_t ntot = 0, npre = 0, ndraw= 0;
1879  if (fQMgr) {
1880  if (all) {
1881  // Rescan
1882  TString qdir = fQueryDir;
1883  Int_t idx = qdir.Index("session-");
1884  if (idx != kNPOS)
1885  qdir.Remove(idx);
1886  fQMgr->ScanPreviousQueries(qdir);
1887  // Gather also information about previous queries, if any
1888  if (fQMgr->PreviousQueries()) {
1889  TIter nxq(fQMgr->PreviousQueries());
1890  TProofQueryResult *pqr = 0;
1891  while ((pqr = (TProofQueryResult *)nxq())) {
1892  ntot++;
1893  pqr->fSeqNum = ntot;
1894  ql->Add(pqr);
1895  }
1896  }
1897  }
1898 
1899  npre = ntot;
1900  if (fQMgr->Queries()) {
1901  // Add info about queries in this session
1902  TIter nxq(fQMgr->Queries());
1903  TProofQueryResult *pqr = 0;
1904  TQueryResult *pqm = 0;
1905  while ((pqr = (TProofQueryResult *)nxq())) {
1906  ntot++;
1907  if ((pqm = pqr->CloneInfo())) {
1908  pqm->fSeqNum = ntot;
1909  ql->Add(pqm);
1910  } else {
1911  Warning("GetListOfQueries", "unable to clone TProofQueryResult '%s:%s'",
1912  pqr->GetName(), pqr->GetTitle());
1913  }
1914  }
1915  }
1916  // Number of draw queries
1917  ndraw = fQMgr->DrawQueries();
1918  }
1919 
1920  fOtherQueries = npre;
1921  fDrawQueries = ndraw;
1922  if (fQueries) {
1923  fQueries->Delete();
1924  delete fQueries;
1925  fQueries = 0;
1926  }
1927  fQueries = ql;
1928 
1929  // This should have been filled by now
1930  return fQueries;
1931 }
1932 
1933 ////////////////////////////////////////////////////////////////////////////////
1934 /// Register the 'dataSet' on the cluster under the current
1935 /// user, group and the given 'dataSetName'.
1936 /// Fails if a dataset named 'dataSetName' already exists, unless 'optStr'
1937 /// contains 'O', in which case the old dataset is overwritten.
1938 /// If 'optStr' contains 'V' the dataset files are verified (default no
1939 /// verification).
1940 /// Returns kTRUE on success.
1941 
1943  TFileCollection *dataSet, const char* optStr)
1944 {
1945  if (!fDataSetManager) {
1946  Info("RegisterDataSet", "dataset manager not available");
1947  return kFALSE;
1948  }
1949 
1950  if (!uri || strlen(uri) <= 0) {
1951  Info("RegisterDataSet", "specifying a dataset name is mandatory");
1952  return kFALSE;
1953  }
1954 
1955  Bool_t parallelverify = kFALSE;
1956  TString sopt(optStr);
1957  if (sopt.Contains("V") && !sopt.Contains("S")) {
1958  // We do verification in parallel later on; just register for now
1959  parallelverify = kTRUE;
1960  sopt.ReplaceAll("V", "");
1961  }
1962  // This would screw up things remotely, make sure is not there
1963  sopt.ReplaceAll("S", "");
1964 
1965  Bool_t result = kTRUE;
1967  // Check the list
1968  if (!dataSet || dataSet->GetList()->GetSize() == 0) {
1969  Error("RegisterDataSet", "can not save an empty list.");
1970  result = kFALSE;
1971  }
1972  // Register the dataset (quota checks are done inside here)
1973  result = (fDataSetManager->RegisterDataSet(uri, dataSet, sopt) == 0)
1974  ? kTRUE : kFALSE;
1975  } else {
1976  Info("RegisterDataSet", "dataset registration not allowed");
1977  result = kFALSE;
1978  }
1979 
1980  if (!result)
1981  Error("RegisterDataSet", "dataset was not saved");
1982 
1983  // If old server or not verifying in parallel we are done
1984  if (!parallelverify) return result;
1985 
1986  // If we are here it means that we will verify in parallel
1987  sopt += "V";
1988  if (VerifyDataSet(uri, sopt) < 0){
1989  Error("RegisterDataSet", "problems verifying dataset '%s'", uri);
1990  return kFALSE;
1991  }
1992 
1993  // Done
1994  return kTRUE;
1995 }
1996 
1997 ////////////////////////////////////////////////////////////////////////////////
1998 /// Set/Change the name of the default tree. The tree name may contain
1999 /// subdir specification in the form "subdir/name".
2000 /// Returns 0 on success, -1 otherwise.
2001 
2002 Int_t TProofLite::SetDataSetTreeName(const char *dataset, const char *treename)
2003 {
2004  if (!fDataSetManager) {
2005  Info("ExistsDataSet", "dataset manager not available");
2006  return kFALSE;
2007  }
2008 
2009  if (!dataset || strlen(dataset) <= 0) {
2010  Info("SetDataSetTreeName", "specifying a dataset name is mandatory");
2011  return -1;
2012  }
2013 
2014  if (!treename || strlen(treename) <= 0) {
2015  Info("SetDataSetTreeName", "specifying a tree name is mandatory");
2016  return -1;
2017  }
2018 
2019  TUri uri(dataset);
2020  TString fragment(treename);
2021  if (!fragment.BeginsWith("/")) fragment.Insert(0, "/");
2022  uri.SetFragment(fragment);
2023 
2024  return fDataSetManager->ScanDataSet(uri.GetUri().Data(),
2026 }
2027 
2028 ////////////////////////////////////////////////////////////////////////////////
2029 /// Returns kTRUE if 'dataset' described by 'uri' exists, kFALSE otherwise
2030 
2032 {
2033  if (!fDataSetManager) {
2034  Info("ExistsDataSet", "dataset manager not available");
2035  return kFALSE;
2036  }
2037 
2038  if (!uri || strlen(uri) <= 0) {
2039  Error("ExistsDataSet", "dataset name missing");
2040  return kFALSE;
2041  }
2042 
2043  // Check if the dataset exists
2044  return fDataSetManager->ExistsDataSet(uri);
2045 }
2046 
2047 ////////////////////////////////////////////////////////////////////////////////
2048 /// lists all datasets that match given uri
2049 
2050 TMap *TProofLite::GetDataSets(const char *uri, const char *srvex)
2051 {
2052  if (!fDataSetManager) {
2053  Info("GetDataSets", "dataset manager not available");
2054  return (TMap *)0;
2055  }
2056 
2057  // Get the datasets and return the map
2058  if (srvex && strlen(srvex) > 0) {
2059  return fDataSetManager->GetSubDataSets(uri, srvex);
2060  } else {
2062  return fDataSetManager->GetDataSets(uri, opt);
2063  }
2064 }
2065 
2066 ////////////////////////////////////////////////////////////////////////////////
2067 /// Shows datasets in locations that match the uri
2068 /// By default shows the user's datasets and global ones
2069 
2070 void TProofLite::ShowDataSets(const char *uri, const char *opt)
2071 {
2072  if (!fDataSetManager) {
2073  Info("GetDataSet", "dataset manager not available");
2074  return;
2075  }
2076 
2077  fDataSetManager->ShowDataSets(uri, opt);
2078 }
2079 
2080 ////////////////////////////////////////////////////////////////////////////////
2081 /// Get a list of TFileInfo objects describing the files of the specified
2082 /// dataset.
2083 
2084 TFileCollection *TProofLite::GetDataSet(const char *uri, const char *)
2085 {
2086  if (!fDataSetManager) {
2087  Info("GetDataSet", "dataset manager not available");
2088  return (TFileCollection *)0;
2089  }
2090 
2091  if (!uri || strlen(uri) <= 0) {
2092  Info("GetDataSet", "specifying a dataset name is mandatory");
2093  return 0;
2094  }
2095 
2096  // Return the list
2097  return fDataSetManager->GetDataSet(uri);
2098 }
2099 
2100 ////////////////////////////////////////////////////////////////////////////////
2101 /// Remove the specified dataset from the PROOF cluster.
2102 /// Files are not deleted.
2103 
2104 Int_t TProofLite::RemoveDataSet(const char *uri, const char *)
2105 {
2106  if (!fDataSetManager) {
2107  Info("RemoveDataSet", "dataset manager not available");
2108  return -1;
2109  }
2110 
2112  if (!fDataSetManager->RemoveDataSet(uri)) {
2113  // Failure
2114  return -1;
2115  }
2116  } else {
2117  Info("RemoveDataSet", "dataset creation / removal not allowed");
2118  return -1;
2119  }
2120 
2121  // Done
2122  return 0;
2123 }
2124 
2125 ////////////////////////////////////////////////////////////////////////////////
2126 /// Allows users to request staging of a particular dataset. Requests are
2127 /// saved in a special dataset repository and must be honored by the endpoint.
2128 /// This is the special PROOF-Lite re-implementation of the TProof function
2129 /// and includes code originally implemented in TProofServ.
2130 
2132 {
2133  if (!dataset) {
2134  Error("RequestStagingDataSet", "invalid dataset specified");
2135  return kFALSE;
2136  }
2137 
2138  if (!fDataSetStgRepo) {
2139  Error("RequestStagingDataSet", "no dataset staging request repository available");
2140  return kFALSE;
2141  }
2142 
2143  TString dsUser, dsGroup, dsName, dsTree;
2144 
2145  // Transform input URI in a valid dataset name
2146  TString validUri = dataset;
2147  while (fReInvalid->Substitute(validUri, "_")) {}
2148 
2149  // Check if dataset exists beforehand: if it does, staging has already been requested
2150  if (fDataSetStgRepo->ExistsDataSet(validUri.Data())) {
2151  Warning("RequestStagingDataSet", "staging of %s already requested", dataset);
2152  return kFALSE;
2153  }
2154 
2155  // Try to get dataset from current manager
2157  if (!fc || (fc->GetNFiles() == 0)) {
2158  Error("RequestStagingDataSet", "empty dataset or no dataset returned");
2159  if (fc) delete fc;
2160  return kFALSE;
2161  }
2162 
2163  // Reset all staged bits and remove unnecessary URLs (all but last)
2164  TIter it(fc->GetList());
2165  TFileInfo *fi;
2166  while ((fi = dynamic_cast<TFileInfo *>(it.Next()))) {
2168  Int_t nToErase = fi->GetNUrls() - 1;
2169  for (Int_t i=0; i<nToErase; i++)
2170  fi->RemoveUrlAt(0);
2171  }
2172 
2173  fc->Update(); // absolutely necessary
2174 
2175  // Save request
2176  fDataSetStgRepo->ParseUri(validUri, &dsGroup, &dsUser, &dsName);
2177  if (fDataSetStgRepo->WriteDataSet(dsGroup, dsUser, dsName, fc) == 0) {
2178  // Error, can't save dataset
2179  Error("RequestStagingDataSet", "can't register staging request for %s", dataset);
2180  delete fc;
2181  return kFALSE;
2182  }
2183 
2184  Info("RequestStagingDataSet", "Staging request registered for %s", dataset);
2185  delete fc;
2186 
2187  return kTRUE;
2188 }
2189 
2190 ////////////////////////////////////////////////////////////////////////////////
2191 /// Cancels a dataset staging request. Returns kTRUE on success, kFALSE on
2192 /// failure. Dataset not found equals to a failure. PROOF-Lite
2193 /// re-implementation of the equivalent function in TProofServ.
2194 
2196 {
2197  if (!dataset) {
2198  Error("CancelStagingDataSet", "invalid dataset specified");
2199  return kFALSE;
2200  }
2201 
2202  if (!fDataSetStgRepo) {
2203  Error("CancelStagingDataSet", "no dataset staging request repository available");
2204  return kFALSE;
2205  }
2206 
2207  // Transform URI in a valid dataset name
2208  TString validUri = dataset;
2209  while (fReInvalid->Substitute(validUri, "_")) {}
2210 
2211  if (!fDataSetStgRepo->RemoveDataSet(validUri.Data()))
2212  return kFALSE;
2213 
2214  return kTRUE;
2215 }
2216 
2217 ////////////////////////////////////////////////////////////////////////////////
2218 /// Obtains a TFileCollection showing the staging status of the specified
2219 /// dataset. A valid dataset manager and dataset staging requests repository
2220 /// must be present on the endpoint. PROOF-Lite version of the equivalent
2221 /// function from TProofServ.
2222 
2224 {
2225  if (!dataset) {
2226  Error("GetStagingStatusDataSet", "invalid dataset specified");
2227  return 0;
2228  }
2229 
2230  if (!fDataSetStgRepo) {
2231  Error("GetStagingStatusDataSet", "no dataset staging request repository available");
2232  return 0;
2233  }
2234 
2235  // Transform URI in a valid dataset name
2236  TString validUri = dataset;
2237  while (fReInvalid->Substitute(validUri, "_")) {}
2238 
2239  // Get the list
2241  if (!fc) {
2242  // No such dataset (not an error)
2243  Info("GetStagingStatusDataSet", "no pending staging request for %s", dataset);
2244  return 0;
2245  }
2246 
2247  // Dataset found: return it (must be cleaned by caller)
2248  return fc;
2249 }
2250 
2251 ////////////////////////////////////////////////////////////////////////////////
2252 /// Verify if all files in the specified dataset are available.
2253 /// Print a list and return the number of missing files.
2254 
2255 Int_t TProofLite::VerifyDataSet(const char *uri, const char *optStr)
2256 {
2257  if (!fDataSetManager) {
2258  Info("VerifyDataSet", "dataset manager not available");
2259  return -1;
2260  }
2261 
2262  Int_t rc = -1;
2263  TString sopt(optStr);
2264  if (sopt.Contains("S")) {
2265 
2267  rc = fDataSetManager->ScanDataSet(uri);
2268  } else {
2269  Info("VerifyDataSet", "dataset verification not allowed");
2270  rc = -1;
2271  }
2272  return rc;
2273  }
2274 
2275  // Done
2276  return VerifyDataSetParallel(uri, optStr);
2277 }
2278 
2279 ////////////////////////////////////////////////////////////////////////////////
2280 /// Clear the content of the dataset cache, if any (matching 'dataset', if defined).
2281 
2282 void TProofLite::ClearDataSetCache(const char *dataset)
2283 {
2285  // Done
2286  return;
2287 }
2288 
2289 ////////////////////////////////////////////////////////////////////////////////
2290 /// Display the content of the dataset cache, if any (matching 'dataset', if defined).
2291 
2292 void TProofLite::ShowDataSetCache(const char *dataset)
2293 {
2294  // For PROOF-Lite act locally
2295  if (fDataSetManager) fDataSetManager->ShowCache(dataset);
2296  // Done
2297  return;
2298 }
2299 
2300 ////////////////////////////////////////////////////////////////////////////////
2301 /// Make sure that the input data objects are available to the workers in a
2302 /// dedicated file in the cache; the objects are taken from the dedicated list
2303 /// and / or the specified file.
2304 /// If the fInputData is empty the specified file is sent over.
2305 /// If there is no specified file, a file named "inputdata.root" is created locally
2306 /// with the content of fInputData and sent over to the master.
2307 /// If both fInputData and the specified file are not empty, a copy of the file
2308 /// is made locally and augmented with the content of fInputData.
2309 
2311 {
2312  // Prepare the file
2313  TString dataFile;
2314  PrepareInputDataFile(dataFile);
2315 
2316  // Make sure it is in the cache, if not empty
2317  if (dataFile.Length() > 0) {
2318 
2319  if (!dataFile.BeginsWith(fCacheDir)) {
2320  // Destination
2321  TString dst;
2322  dst.Form("%s/%s", fCacheDir.Data(), gSystem->BaseName(dataFile));
2323  // Remove it first if it exists
2324  if (!gSystem->AccessPathName(dst))
2325  gSystem->Unlink(dst);
2326  // Copy the file
2327  if (gSystem->CopyFile(dataFile, dst) != 0)
2328  Warning("SendInputDataFile", "problems copying '%s' to '%s'",
2329  dataFile.Data(), dst.Data());
2330  }
2331 
2332  // Set the name in the input list so that the workers can find it
2333  AddInput(new TNamed("PROOF_InputDataFile", Form("%s", gSystem->BaseName(dataFile))));
2334  }
2335 }
2336 
2337 ////////////////////////////////////////////////////////////////////////////////
2338 /// Handle remove request.
2339 
2340 Int_t TProofLite::Remove(const char *ref, Bool_t all)
2341 {
2342  PDB(kGlobal, 1)
2343  Info("Remove", "Enter: %s, %d", ref, all);
2344 
2345  if (all) {
2346  // Remove also local copies, if any
2347  if (fPlayer)
2348  fPlayer->RemoveQueryResult(ref);
2349  }
2350 
2351  TString queryref(ref);
2352 
2353  if (queryref == "cleanupdir") {
2354 
2355  // Cleanup previous sessions results
2356  Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
2357 
2358  // Notify
2359  Info("Remove", "%d directories removed", nd);
2360  // We are done
2361  return 0;
2362  }
2363 
2364 
2365  if (fQMgr) {
2366  TProofLockPath *lck = 0;
2367  if (fQMgr->LockSession(queryref, &lck) == 0) {
2368 
2369  // Remove query
2370  fQMgr->RemoveQuery(queryref, 0);
2371 
2372  // Unlock and remove the lock file
2373  if (lck) {
2374  gSystem->Unlink(lck->GetName());
2375  SafeDelete(lck);
2376  }
2377 
2378  // We are done
2379  return 0;
2380  }
2381  } else {
2382  Warning("Remove", "query result manager undefined!");
2383  }
2384 
2385  // Notify failure
2386  Info("Remove",
2387  "query %s could not be removed (unable to lock session)", queryref.Data());
2388 
2389  // Done
2390  return -1;
2391 }
2392 
2393 ////////////////////////////////////////////////////////////////////////////////
2394 /// Creates a tree header (a tree with nonexisting files) object for
2395 /// the DataSet.
2396 
2398 {
2399  TTree *t = 0;
2400  if (!dset) {
2401  Error("GetTreeHeader", "undefined TDSet");
2402  return t;
2403  }
2404 
2405  dset->Reset();
2406  TDSetElement *e = dset->Next();
2407  Long64_t entries = 0;
2408  TFile *f = 0;
2409  if (!e) {
2410  PDB(kGlobal, 1) Info("GetTreeHeader", "empty TDSet");
2411  } else {
2412  f = TFile::Open(e->GetFileName());
2413  t = 0;
2414  if (f) {
2415  t = (TTree*) f->Get(e->GetObjName());
2416  if (t) {
2417  t->SetMaxVirtualSize(0);
2418  t->DropBaskets();
2419  entries = t->GetEntries();
2420 
2421  // compute #entries in all the files
2422  while ((e = dset->Next()) != 0) {
2423  TFile *f1 = TFile::Open(e->GetFileName());
2424  if (f1) {
2425  TTree *t1 = (TTree*) f1->Get(e->GetObjName());
2426  if (t1) {
2427  entries += t1->GetEntries();
2428  delete t1;
2429  }
2430  delete f1;
2431  }
2432  }
2433  t->SetMaxEntryLoop(entries); // this field will hold the total number of entries ;)
2434  }
2435  }
2436  }
2437  // Done
2438  return t;
2439 }
2440 
2441 ////////////////////////////////////////////////////////////////////////////////
2442 /// Add to the fUniqueSlave list the active slaves that have a unique
2443 /// (user) file system image. This information is used to transfer files
2444 /// only once to nodes that share a file system (an image). Submasters
2445 /// which are not in fUniqueSlaves are put in the fNonUniqueMasters
2446 /// list. That list is used to trigger the transferring of files to
2447 /// the submaster's unique slaves without the need to transfer the file
2448 /// to the submaster.
2449 
2451 {
2452  fUniqueSlaves->Clear();
2457 
2458  if (fActiveSlaves->GetSize() <= 0) return;
2459 
2460  TSlave *wrk = dynamic_cast<TSlave*>(fActiveSlaves->First());
2461  if (!wrk) {
2462  Error("FindUniqueSlaves", "first object in fActiveSlaves not a TSlave: embarrasing!");
2463  return;
2464  }
2465  fUniqueSlaves->Add(wrk);
2466  fAllUniqueSlaves->Add(wrk);
2467  fUniqueMonitor->Add(wrk->GetSocket());
2468  fAllUniqueMonitor->Add(wrk->GetSocket());
2469 
2470  // will be actiavted in Collect()
2473 }
2474 
2475 ////////////////////////////////////////////////////////////////////////////////
2476 /// List contents of the data directory in the sandbox.
2477 /// This is the place where files produced by the client queries are kept
2478 
2480 {
2481  if (!IsValid()) return;
2482 
2483  // Get worker infos
2484  TList *wrki = GetListOfSlaveInfos();
2485  TSlaveInfo *wi = 0;
2486  TIter nxwi(wrki);
2487  while ((wi = (TSlaveInfo *) nxwi())) {
2488  ShowDataDir(wi->GetDataDir());
2489  }
2490 }
2491 
2492 ////////////////////////////////////////////////////////////////////////////////
2493 /// List contents of the data directory 'dirname'
2494 
2495 void TProofLite::ShowDataDir(const char *dirname)
2496 {
2497  if (!dirname) return;
2498 
2499  FileStat_t dirst;
2500  if (gSystem->GetPathInfo(dirname, dirst) != 0) return;
2501  if (!R_ISDIR(dirst.fMode)) return;
2502 
2503  void *dirp = gSystem->OpenDirectory(dirname);
2504  TString fn;
2505  const char *ent = 0;
2506  while ((ent = gSystem->GetDirEntry(dirp))) {
2507  fn.Form("%s/%s", dirname, ent);
2508  FileStat_t st;
2509  if (gSystem->GetPathInfo(fn.Data(), st) == 0) {
2510  if (R_ISREG(st.fMode)) {
2511  Printf("lite:0| %s", fn.Data());
2512  } else if (R_ISREG(st.fMode)) {
2513  ShowDataDir(fn.Data());
2514  }
2515  }
2516  }
2517  // Done
2518  return;
2519 }
2520 
2521 ////////////////////////////////////////////////////////////////////////////////
2522 /// Simulate dynamic addition, for test purposes.
2523 /// Here we decide how many workers to add, we create them and set the
2524 /// environment.
2525 /// This call is called regularly by Collect if the opton is enabled.
2526 /// Returns the number of new workers added, or <0 on errors.
2527 
2529 {
2530  // Max workers
2531  if (fDynamicStartupNMax <= 0) {
2532  SysInfo_t si;
2533  if (gSystem->GetSysInfo(&si) == 0 && si.fCpus > 2) {
2535  } else {
2536  fDynamicStartupNMax = 2;
2537  }
2538  }
2539  if (fNWorkers >= fDynamicStartupNMax) {
2540  // Max reached: disable
2541  Info("PollForNewWorkers", "max reached: %d workers started", fNWorkers);
2543  return 0;
2544  }
2545 
2546  // Number of new workers
2547  Int_t nAdd = (fDynamicStartupStep > 0) ? fDynamicStartupStep : 1;
2548 
2549  // Create a monitor and add the socket to it
2550  TMonitor *mon = new TMonitor;
2551  mon->Add(fServSock);
2552 
2553  TList started;
2554  TSlave *wrk = 0;
2555  Int_t nWrksDone = 0, nWrksTot = -1;
2556  TString fullord;
2557 
2558  nWrksTot = fNWorkers + nAdd;
2559  // Now we create the worker applications which will call us back to finalize
2560  // the setup
2561  Int_t ord = fNWorkers;
2562  for (; ord < nWrksTot; ord++) {
2563 
2564  // Ordinal for this worker server
2565  fullord = Form("0.%d", ord);
2566 
2567  // Create environment files
2568  SetProofServEnv(fullord);
2569 
2570  // Create worker server and add to the list
2571  if ((wrk = CreateSlave("lite", fullord, 100, fImage, fWorkDir)))
2572  started.Add(wrk);
2573 
2574  PDB(kGlobal, 3)
2575  Info("PollForNewWorkers", "additional worker '%s' started", fullord.Data());
2576 
2577  // Notify
2578  NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
2579 
2580  } //end of worker loop
2581  fNWorkers = nWrksTot;
2582 
2583  // A list of TSlave objects for workers that are being added
2584  TList *addedWorkers = new TList();
2585  addedWorkers->SetOwner(kFALSE);
2586 
2587  // Wait for call backs
2588  nWrksDone = 0;
2589  nWrksTot = started.GetSize();
2590  Int_t nSelects = 0;
2591  Int_t to = gEnv->GetValue("ProofLite.StartupTimeOut", 5) * 1000;
2592  while (started.GetSize() > 0 && nSelects < nWrksTot) {
2593 
2594  // Wait for activity on the socket for max 5 secs
2595  TSocket *xs = mon->Select(to);
2596 
2597  // Count attempts and check
2598  nSelects++;
2599  if (xs == (TSocket *) -1) continue;
2600 
2601  // Get the connection
2602  TSocket *s = fServSock->Accept();
2603  if (s && s->IsValid()) {
2604  // Receive ordinal
2605  TMessage *msg = 0;
2606  if (s->Recv(msg) < 0) {
2607  Warning("PollForNewWorkers", "problems receiving message from accepted socket!");
2608  } else {
2609  if (msg) {
2610  *msg >> fullord;
2611  // Find who is calling back
2612  if ((wrk = (TSlave *) started.FindObject(fullord))) {
2613  // Remove it from the started list
2614  started.Remove(wrk);
2615 
2616  // Assign tis socket the selected worker
2617  wrk->SetSocket(s);
2618  // Remove socket from global TROOT socket list. Only the TProof object,
2619  // representing all worker sockets, will be added to this list. This will
2620  // ensure the correct termination of all proof servers in case the
2621  // root session terminates.
2623  gROOT->GetListOfSockets()->Remove(s);
2624  }
2625  if (wrk->IsValid()) {
2626  // Set the input handler
2627  wrk->SetInputHandler(new TProofInputHandler(this, wrk->GetSocket()));
2628  // Set fParallel to 1 for workers since they do not
2629  // report their fParallel with a LOG_DONE message
2630  wrk->fParallel = 1;
2631  // Finalize setup of the server
2632  wrk->SetupServ(TSlave::kSlave, 0);
2633  }
2634 
2635  // Monitor good workers
2636  fSlaves->Add(wrk);
2637  if (wrk->IsValid()) {
2638  fActiveSlaves->Add(wrk); // Is this required? Check!
2639  fAllMonitor->Add(wrk->GetSocket());
2640  // Record also in the list for termination
2641  addedWorkers->Add(wrk);
2642  // Notify startup operations
2643  NotifyStartUp("Setting up added worker servers", ++nWrksDone, nWrksTot);
2644  } else {
2645  // Flag as bad
2646  fBadSlaves->Add(wrk);
2647  }
2648  }
2649  } else {
2650  Warning("PollForNewWorkers", "received empty message from accepted socket!");
2651  }
2652  }
2653  }
2654  }
2655 
2656  // Cleanup the monitor and the server socket
2657  mon->DeActivateAll();
2658  delete mon;
2659 
2660  Broadcast(kPROOF_GETSTATS, addedWorkers);
2661  Collect(addedWorkers, fCollectTimeout);
2662 
2663  // Update group view
2664  // SendGroupView();
2665 
2666  // By default go into parallel mode
2667  // SetParallel(-1, 0);
2668  SendCurrentState(addedWorkers);
2669 
2670  // Set worker processing environment
2671  SetupWorkersEnv(addedWorkers, kTRUE);
2672 
2673  // We are adding workers dynamically to an existing process, we
2674  // should invoke a special player's Process() to set only added workers
2675  // to the proper state
2676  if (fPlayer) {
2677  PDB(kGlobal, 3)
2678  Info("PollForNewWorkers", "Will send the PROCESS message to selected workers");
2679  fPlayer->JoinProcess(addedWorkers);
2680  }
2681 
2682  // Cleanup fwhat remained from startup
2683  Collect(addedWorkers);
2684 
2685  // Activate
2686  TIter naw(addedWorkers);
2687  while ((wrk = (TSlave *)naw())) {
2688  fActiveMonitor->Add(wrk->GetSocket());
2689  }
2690  // Cleanup
2691  delete addedWorkers;
2692 
2693  // Done
2694  return nWrksDone;
2695 }
Int_t SetProofServEnv(const char *ord)
Create environment files for worker &#39;ord&#39;.
Definition: TProofLite.cxx:684
void SetQueryRunning(TProofQueryResult *pq)
Set query in running state.
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
Definition: TSystem.cxx:932
virtual Int_t GetDrawArgs(const char *var, const char *sel, Option_t *opt, TString &selector, TString &objname)=0
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
Long64_t GetEntries() const
Definition: TQueryResult.h:122
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1276
Int_t VerifyDataSet(const char *uri, const char *=0)
Verify if all files in the specified dataset are available.
This class starts a PROOF session on the local machine: no daemons, client and master merged...
Definition: TProofLite.h:40
Bool_t RequestStagingDataSet(const char *dataset)
Allows users to request staging of a particular dataset.
virtual TList * GetInputList() const =0
The PROOF package manager contains tools to manage packages.
Definition: TPackMgr.h:37
virtual Bool_t IsValid() const
Definition: TSocket.h:151
void GetEnabledPackages(TString &packlist)
Method to get a semi-colon separated list with the names of the enabled packages. ...
Definition: TPackMgr.cxx:501
TQueryResultManager * fQMgr
Definition: TProofLite.h:62
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:715
TString fPerfTree
Definition: TProof.h:557
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:467
void ActivateAsyncInput()
Activate the a-sync input handler.
Definition: TProof.cxx:4382
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
Int_t GetNumberOfActiveSlaves() const
Return number of active slaves, i.e.
Definition: TProof.cxx:1965
void AskParallel()
Ask the for the number of parallel slaves.
Definition: TProof.cxx:2055
void SetPort(Int_t port)
Definition: TUrl.h:91
TPMERegexp * fReInvalid
Definition: TProofLite.h:67
Long64_t GetBytesRead() const
Definition: TProof.h:929
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
void ShowDataSetCache(const char *dataset=0)
Display the content of the dataset cache, if any (matching &#39;dataset&#39;, if defined).
virtual const char * GetBuildCompilerVersion() const
Return the build compiler version.
Definition: TSystem.cxx:3784
TMonitor * fAllUniqueMonitor
Definition: TProof.h:486
virtual Int_t ClearCache(const char *uri)
Clear cached information matching uri.
virtual void AddInput(TObject *inp)=0
long long Long64_t
Definition: RtypesCore.h:69
TFileCollection * GetDataSet(const char *uri, const char *=0)
Get a list of TFileInfo objects describing the files of the specified dataset.
auto * m
Definition: textangle.C:8
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition: TDSet.cxx:394
void PrepareInputDataFile(TString &dataFile)
Prepare the file with the input data objects to be sent the master; the objects are taken from the de...
Definition: TProof.cxx:9611
Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0)
Copy the specified macro in the cache directory.
void SetPerfTree(const char *pf="perftree.root", Bool_t withWrks=kFALSE)
Enable/Disable saving of the performance tree.
Definition: TProof.cxx:12597
virtual const char * WorkingDirectory()
Return working directory.
Definition: TSystem.cxx:869
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
Definition: TMD5.cxx:474
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:520
TString fConfDir
Definition: TProof.h:569
Bool_t ExistsDataSet(const char *uri)
Returns kTRUE if &#39;dataset&#39; described by &#39;uri&#39; exists, kFALSE otherwise.
const char *const kCP
Definition: TProof.h:141
const char *const kLS
Definition: TProof.h:143
Int_t fOtherQueries
Definition: TProof.h:523
Collectable string class.
Definition: TObjString.h:28
float Float_t
Definition: RtypesCore.h:53
virtual TVirtualProofPlayer * MakePlayer(const char *player=0, TSocket *s=0)
Construct a TProofPlayer object.
Definition: TProof.cxx:10183
const char Option_t
Definition: RtypesCore.h:62
virtual ~TProofLite()
Destructor.
Definition: TProofLite.cxx:375
Bool_t fSync
Definition: TProof.h:506
virtual TString SplitAclicMode(const char *filename, TString &mode, TString &args, TString &io) const
This method split a filename of the form: ~~~ {.cpp} [path/]macro.C[+|++[k|f|g|O|c|s|d|v|-]][(args)]...
Definition: TSystem.cxx:4124
Bool_t RegisterDataSet(const char *dsName, TFileCollection *ds, const char *opt="")
Register the &#39;dataSet&#39; on the cluster under the current user, group and the given &#39;dataSetName&#39;...
TFileCollection * GetStagingStatusDataSet(const char *dataset)
Obtains a TFileCollection showing the staging status of the specified dataset.
const Ssiz_t kNPOS
Definition: RtypesCore.h:111
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:638
void SetupWorkersEnv(TList *wrks, Bool_t increasingpool=kFALSE)
Set up packages, loaded macros, include and lib paths ...
Definition: TProof.cxx:1512
void SendInputDataFile()
Make sure that the input data objects are available to the workers in a dedicated file in the cache; ...
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition: TSystem.cxx:1374
virtual Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:822
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:151
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition: TNamed.cxx:140
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
TH1 * h
Definition: legend2.C:5
void SetParameter(const char *par, const char *value)
Set input list parameter.
Definition: TProof.cxx:9794
virtual Bool_t RemoveDataSet(const char *uri)
Removes the indicated dataset.
The PROOF manager interacts with the PROOF server coordinator to create or destroy a PROOF session...
Definition: TProofMgr.h:43
void SetUrl(const char *url, Bool_t defaultIsFile=kFALSE)
Parse url character string and split in its different subcomponents.
Definition: TUrl.cxx:110
static const TList * GetEnvVars()
Get environemnt variables.
Definition: TProof.cxx:11723
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:46
virtual EExitStatus GetExitStatus() const =0
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:689
TList * fAllUniqueSlaves
Definition: TProof.h:482
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:825
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
Definition: TSystem.cxx:540
virtual const char * HomeDirectory(const char *userName=0)
Return the user&#39;s home directory.
Definition: TSystem.cxx:885
TString fSelection
Definition: TProofLite.h:58
TString fImage
Definition: TProof.h:570
virtual TFileCollection * GetDataSet(const char *uri, const char *server=0)
Utility function used in various methods for user dataset upload.
Int_t PollForNewWorkers()
Simulate dynamic addition, for test purposes.
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
void SetSocket(TSocket *s)
Definition: TSlave.h:112
#define R__ASSERT(e)
Definition: TError.h:96
#define gROOT
Definition: TROOT.h:402
TString fLogFileName
Definition: TProof.h:511
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition: TString.h:585
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition: TObject.h:172
TProofLockPath * fCacheLock
Definition: TProofLite.h:60
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor&#39;s active list.
Definition: TMonitor.cxx:168
Int_t LoadPlugin()
Load the plugin library for this handler.
virtual void SetCurrentQuery(TQueryResult *q)=0
TList * Queries() const
TList * fQueries
Definition: TProof.h:522
The TEnv class reads config files, by default named .rootrc.
Definition: TEnv.h:125
Basic string class.
Definition: TString.h:125
void ClearDataSetCache(const char *dataset=0)
Clear the content of the dataset cache, if any (matching &#39;dataset&#39;, if defined).
Int_t GetNumberOfSlaves() const
Return number of slaves as described in the config file.
Definition: TProof.cxx:1956
Int_t GetClientProtocol() const
Definition: TProof.h:914
Int_t SetDataSetTreeName(const char *dataset, const char *treename)
Set/Change the name of the default tree.
int Int_t
Definition: RtypesCore.h:41
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:1004
bool Bool_t
Definition: RtypesCore.h:59
void NotifyStartUp(const char *action, Int_t done, Int_t tot)
Notify setting-up operation message.
Definition: TProofLite.cxx:667
TQueryResult * CloneInfo()
Return an instance of TQueryResult containing only the local info fields, i.e.
TList * fUniqueSlaves
Definition: TProof.h:481
virtual Bool_t JoinProcess(TList *workers)=0
R__EXTERN TVirtualMutex * gROOTMutex
Definition: TROOT.h:57
static const TString & GetRootSys()
Get the rootsys directory in the installation. Static utility function.
Definition: TROOT.cxx:2821
void SetUser(const char *user)
Definition: TUrl.h:85
TList * fWaitingSlaves
Definition: TProof.h:521
Int_t Broadcast(const TMessage &mess, TList *slaves)
Broadcast a message to all slaves in the specified list.
Definition: TProof.cxx:2453
const char *const kRM
Definition: TProof.h:142
virtual void RemoveAll()
Remove all sockets from the monitor.
Definition: TMonitor.cxx:241
virtual void ShowDataSets(const char *uri="*", const char *opt="")
Prints formatted information about the dataset &#39;uri&#39;.
This class represents a RFC 3986 compatible URI.
Definition: TUri.h:35
Long_t fMtime
Definition: TSystem.h:132
static Int_t fgWrksMax
Definition: TProofLite.h:69
Int_t GetSeqNum() const
Definition: TQueryResult.h:115
R__EXTERN TApplication * gApplication
Definition: TApplication.h:165
virtual void DeActivateAll()
De-activate all activated sockets.
Definition: TMonitor.cxx:302
void ShowData()
List contents of the data directory in the sandbox.
void ShowDataDir(const char *dirname)
List contents of the data directory &#39;dirname&#39;.
void ScanPreviousQueries(const char *dir)
Scan the queries directory for the results of previous queries.
static void ResolveKeywords(TString &fname, const char *path=0)
Replace <ord>, <user>, <u>, <group>, <stag>, <qnum>, <file>, <rver> and <build> placeholders in fname...
TString & Insert(Ssiz_t pos, const char *s)
Definition: TString.h:595
TList * fInputData
Definition: TProof.h:535
Int_t SendCurrentState(ESlaves list=kActive)
Transfer the current state of the master to the active slave servers.
Definition: TProof.cxx:6730
TList * GetListOfElements() const
Definition: TDSet.h:229
TVirtualProofPlayer * fPlayer
Definition: TProof.h:494
Bool_t R_ISREG(Int_t mode)
Definition: TSystem.h:119
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
void ResolveKeywords(TString &s, const char *ord, const char *logfile)
Resolve some keywords in &#39;s&#39; <logfilewrk>, <user>, <rootsys>, <cpupin>
Definition: TProofLite.cxx:801
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
Definition: TList.cxx:574
TString & Replace(Ssiz_t pos, Ssiz_t n, const char *s)
Definition: TString.h:628
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition: TROOT.cxx:2699
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3950
Int_t fMode
Definition: TSystem.h:128
Bool_t fForkStartup
Definition: TProofLite.h:52
Bool_t fMasterServ
Definition: TProof.h:566
virtual Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
Int_t SavePerfTree(const char *pf=0, const char *qref=0)
Save performance information from TPerfStats to file &#39;pf&#39;.
Definition: TProof.cxx:12619
TDataSetManagerFile * fDataSetStgRepo
Definition: TProofLite.h:65
TSignalHandler * fIntHandler
Definition: TProof.h:491
Manages an element of a TDSet.
Definition: TDSet.h:66
Int_t fDrawQueries
Definition: TProof.h:524
TList * fChains
Definition: TProof.h:496
static struct mg_connection * fc(struct mg_context *ctx)
Definition: civetweb.c:1956
Int_t Update(Long64_t avgsize=-1)
Update accumulated information about the elements of the collection (e.g.
virtual const char * GetDirEntry(void *dirp)
Get a directory entry. Returns 0 if no more entries.
Definition: TSystem.cxx:851
#define SafeDelete(p)
Definition: RConfig.h:509
TPluginHandler * fProgressDialog
Definition: TProof.h:492
TList * fBadSlaves
Definition: TProof.h:574
virtual TList * GetOutputList() const =0
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
Definition: TSystem.cxx:1357
TString fDataSetDir
Definition: TProofLite.h:49
Bool_t fSendGroupView
list returned by kPROOF_GETSLAVEINFO
Definition: TProof.h:474
void Stop()
Stop the stopwatch.
Definition: TStopwatch.cxx:77
virtual Bool_t IsValid() const
Definition: TSlave.h:150
TObject * GetEntryList() const
Definition: TDSet.h:249
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2365
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2487
#define PDB(mask, level)
Definition: TProofDebug.h:56
void UpdateDialog()
Final update of the progress dialog.
Definition: TProof.cxx:4325
TDataSetManager * fDataSetManager
Definition: TProofLite.h:64
TSlave * CreateSlave(const char *url, const char *ord, Int_t perf, const char *image, const char *workdir)
Create a new TSlave of type TSlave::kSlave.
Definition: TProof.cxx:1831
TSocket * GetSocket() const
Definition: TSlave.h:134
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:44
TString fCacheDir
Definition: TProofLite.h:47
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:29
Long64_t Process(TDSet *dset, const char *sel, Option_t *o="", Long64_t nent=-1, Long64_t fst=0)
Process a data set (TDSet) using the specified selector (.C) file.
TList * PreviousQueries() const
Int_t GetLogLevel() const
Definition: TProof.h:916
const char *const kPROOF_QueryDir
Definition: TProof.h:128
Int_t Init(const char *masterurl, const char *conffile, const char *confdir, Int_t loglevel, const char *alias=0)
Start the PROOF environment.
Definition: TProofLite.cxx:154
virtual Int_t RegisterDataSet(const char *uri, TFileCollection *dataSet, const char *opt)
Register a dataset, perfoming quota checkings, if needed.
Int_t fParallel
Definition: TSlave.h:97
Bool_t CancelStagingDataSet(const char *dataset)
Cancels a dataset staging request.
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1638
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Definition: TProof.cxx:2647
static Int_t RegisterDataSets(TList *in, TList *out, TDataSetManager *dsm, TString &e)
Register TFileCollections in &#39;out&#39; as datasets according to the rules in &#39;in&#39;.
Int_t ApplyMaxQueries(Int_t mxq)
Scan the queries directory and remove the oldest ones (and relative dirs, if empty) in such a way onl...
A sorted doubly linked list.
Definition: TSortedList.h:28
TString fConfFile
Definition: TProof.h:568
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition: TString.cxx:2231
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
Definition: TSystem.cxx:1574
TQueryResult * GetQueryResult(const char *ref=0)
Return pointer to the full TQueryResult instance owned by the player and referenced by &#39;ref&#39;...
Definition: TProof.cxx:2126
TList * fSlaves
Definition: TProof.h:572
Int_t fNotIdle
Definition: TProof.h:505
virtual void SetOutputList(TList *out, Bool_t adopt=kTRUE)
Set / change the output list.
virtual TSocket * Accept(UChar_t Opt=0)
Accept a connection on a server socket.
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Definition: TSystem.cxx:1458
const Bool_t kSortDescending
Definition: TList.h:38
friend class TProofInputHandler
Definition: TProof.h:324
const char * GetDataDir() const
Definition: TProof.h:229
TList * fInactiveSlaves
Definition: TProof.h:480
A container class for query results.
Definition: TQueryResult.h:36
Int_t fCollectTimeout
Definition: TProof.h:583
TList * GetListOfSlaveInfos()
Returns list of TSlaveInfo&#39;s. In case of error return 0.
Definition: TProof.cxx:2299
Int_t fDynamicStartupNMax
Definition: TProofLite.h:55
static TList * fgProofEnvList
Definition: TProof.h:545
Float_t GetRealTime() const
Definition: TProof.h:930
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
virtual void SetProcessInfo(Long64_t ent, Float_t cpu=0., Long64_t siz=-1, Float_t inittime=0., Float_t proctime=0.)
Set processing info.
TString fQueryDir
Definition: TProofLite.h:48
void SetTermTime(Float_t termtime)
Definition: TQueryResult.h:100
void ParseConfigField(const char *config)
The config file field may contain special instructions which need to be parsed at the beginning...
Definition: TProof.cxx:1031
TString fSandbox
Definition: TProofLite.h:46
Int_t GetNumberOfInactiveSlaves() const
Return number of inactive slaves, i.e.
Definition: TProof.cxx:1974
A doubly linked list.
Definition: TList.h:44
const char * GetObjName() const
Definition: TDSet.h:120
const char * GetUser() const
Definition: TUrl.h:68
Bool_t fRedirLog
Definition: TProof.h:510
TMonitor * fUniqueMonitor
Definition: TProof.h:485
const char *const kPROOF_ConfFile
Definition: TProof.h:122
Bool_t RemoveDataSet(const char *group, const char *user, const char *dsName)
Removes the indicated dataset.
Int_t RemoveDataSet(const char *uri, const char *=0)
Remove the specified dataset from the PROOF cluster.
virtual Int_t ReadFile(const char *fname, EEnvLevel level)
Read and parse the resource file for a certain level.
Definition: TEnv.cxx:592
const char *const kPROOF_QueryLockFile
Definition: TProof.h:133
TMonitor * fAllMonitor
Definition: TProof.h:575
virtual void AddQueryResult(TQueryResult *q)=0
void InitMembers()
Default initializations.
Definition: TProof.cxx:524
void ShowDataSets(const char *uri="", const char *=0)
Shows datasets in locations that match the uri By default shows the user&#39;s datasets and global ones...
const char * GetDir() const
Definition: TPackMgr.h:77
virtual TMap * GetSubDataSets(const char *uri, const char *excludeservers)
Partition dataset &#39;ds&#39; accordingly to the servers.
TString fUser
Definition: TSystem.h:142
virtual const char * GetBuildArch() const
Return the build architecture.
Definition: TSystem.cxx:3768
Int_t fLogLevel
Definition: TProof.h:469
TProofMgr * fManager
Definition: TProof.h:587
TList * fActiveSlaves
Definition: TProof.h:477
Long64_t DrawSelect(TDSet *dset, const char *varexp, const char *selection="", Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Execute the specified drawing action on a data set (TDSet).
void QueryResultReady(const char *ref)
Notify availability of a query result.
Definition: TProof.cxx:9341
const TString GetUri() const
Returns the whole URI - an implementation of chapter 5.3 component recomposition. ...
Definition: TUri.cxx:140
Int_t fNWorkers
Definition: TProofLite.h:45
Bool_t fValid
Definition: TProof.h:464
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Definition: TSystem.cxx:1622
Int_t fCpus
Definition: TSystem.h:155
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:655
ROOT::R::TRInterface & r
Definition: Object.C:4
Class managing the query-result area.
R__EXTERN TSystem * gSystem
Definition: TSystem.h:540
TString fWorkDir
Definition: TProof.h:467
SVector< double, 2 > v
Definition: Dict.h:5
if object ctor succeeded but object should not be used
Definition: TObject.h:68
const char *const kPROOF_DataSetDir
Definition: TProof.h:129
TMonitor * fCurrentMonitor
Definition: TProof.h:487
THashList * GetList()
Long_t ExecPlugin(int nargs, const T &... params)
Int_t SetupWorkers(Int_t opt=0, TList *wrks=0)
Start up PROOF workers.
Definition: TProofLite.cxx:489
Long64_t GetNFiles() const
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:818
const char *const kPROOF_ConfDir
Definition: TProof.h:123
Bool_t Gets(FILE *fp, Bool_t chop=kTRUE)
Read one line from the stream, including the , or until EOF.
Definition: Stringio.cxx:198
virtual Bool_t ExistsDataSet(const char *uri)
Checks if the indicated dataset exits.
Bool_t IsDraw() const
Definition: TQueryResult.h:144
Bool_t ParseUri(const char *uri, TString *dsGroup=0, TString *dsUser=0, TString *dsName=0, TString *dsTree=0, Bool_t onlyCurrent=kFALSE, Bool_t wildcards=kFALSE)
Parses a (relative) URI that describes a DataSet on the cluster.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:561
Int_t fSeqNum
Definition: TProof.h:526
void ClearCache(const char *file=0)
Remove files from all file caches.
Int_t fDynamicStartupStep
Definition: TProofLite.h:54
virtual Int_t ShowCache(const char *uri)
Show cached information matching uri.
void SetActive(Bool_t=kTRUE)
Definition: TProof.h:988
Int_t WriteDataSet(const char *group, const char *user, const char *dsName, TFileCollection *dataset, UInt_t option=0, TMD5 *checksum=0)
Writes indicated dataset.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2343
TList * fSlaveInfo
Definition: TProof.h:473
unsigned int UInt_t
Definition: RtypesCore.h:42
TList * GetInputList()
Definition: TQueryResult.h:120
const char *const kPROOF_CacheLockFile
Definition: TProof.h:131
Int_t ScanDataSet(const char *uri, const char *opt)
Scans the dataset indicated by &#39;uri&#39; following the &#39;opts&#39; directives.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
char * Form(const char *fmt,...)
Ssiz_t Length() const
Definition: TString.h:386
void SetRunning(Int_t startlog, const char *par, Int_t nwrks)
Call when running starts.
Int_t fSeqNum
query unique sequential number
Definition: TQueryResult.h:52
TServerSocket * fServSock
Definition: TProofLite.h:51
Bool_t SetFragment(const TString &fragment)
Set fragment component of URI: fragment = *( pchar / "/" / "?" ).
Definition: TUri.cxx:498
TList * GetListOfQueries(Option_t *opt="")
Get the list of queries.
Bool_t fProgressDialogStarted
Definition: TProof.h:493
virtual Int_t Exec(const char *shellcmd)
Execute a command.
Definition: TSystem.cxx:661
const Int_t kPROOF_Protocol
Definition: TProof.h:120
Int_t SendGroupView()
Send to all active slaves servers the current slave group size and their unique id.
Definition: TProof.cxx:6432
TStopwatch fQuerySTW
Definition: TProof.h:593
Bool_t FinalizeQuery(TProofQueryResult *pq, TProof *proof, TVirtualProofPlayer *player)
Final steps after Process() to complete the TQueryResult instance.
Int_t RemoveWorkers(TList *wrks)
Used for shuting down the workres after a query is finished.
Definition: TProof.cxx:1575
FILE * fLogFileR
Definition: TProof.h:513
void SetName(const char *name)
Definition: TCollection.h:202
Int_t fProtocol
Definition: TProof.h:571
static Int_t GetNumberOfWorkers(const char *url=0)
Static method to determine the number of workers giving priority to users request.
Definition: TProofLite.cxx:406
Bool_t fLogToWindowOnly
Definition: TProof.h:514
static constexpr double nm
TList * fFeedback
Definition: TProof.h:495
Bool_t IsIdle() const
Definition: TProof.h:940
virtual void FreeDirectory(void *dirp)
Free a directory.
Definition: TSystem.cxx:843
void AddInput(TObject *obj)
Add objects that might be needed during the processing of the selector (see Process()).
Definition: TProof.cxx:9706
#define Printf
Definition: TGeoToOCC.h:18
TMap * GetDataSets(const char *uri="", const char *=0)
lists all datasets that match given uri
TList * fRunningDSets
Definition: TProof.h:581
const Bool_t kFALSE
Definition: RtypesCore.h:88
Int_t VerifyDataSetParallel(const char *uri, const char *optStr)
Internal function for parallel dataset verification used TProof::VerifyDataSet and TProofLite::Verify...
Definition: TProof.cxx:11153
Bool_t fDynamicStartup
Definition: TProof.h:589
virtual void RemoveQueryResult(const char *ref)=0
static TSelector * GetSelector(const char *filename)
The code in filename is loaded (interpreted or compiled, see below), filename must contain a valid cl...
Definition: TSelector.cxx:142
Int_t AssertPath(const char *path, Bool_t writable)
Make sure that &#39;path&#39; exists; if &#39;writable&#39; is kTRUE, make also sure that the path is writable...
Definition: TProof.cxx:1253
void SetHost(const char *host)
Definition: TUrl.h:87
Int_t CreateSandbox()
Create the sandbox for this session.
Definition: TProofLite.cxx:880
TString & Remove(Ssiz_t pos)
Definition: TString.h:619
long Long_t
Definition: RtypesCore.h:50
int Ssiz_t
Definition: RtypesCore.h:63
TString fSockPath
Definition: TProofLite.h:50
Int_t HandleOutputOptions(TString &opt, TString &target, Int_t action)
Extract from opt information about output handling settings.
Definition: TProof.cxx:4909
R__EXTERN TProof * gProof
Definition: TProof.h:1077
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
Definition: TSystem.cxx:550
Bool_t fEndMaster
Definition: TProof.h:530
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition: TString.cxx:2251
void SetFeedback(TString &opt, TString &optfb, Int_t action)
Extract from opt in optfb information about wanted feedback settings.
Definition: TProof.cxx:5204
auto * t1
Definition: textangle.C:20
#define ClassImp(name)
Definition: Rtypes.h:359
Int_t GoParallel(Int_t nodes, Bool_t accept=kFALSE, Bool_t random=kFALSE)
Go in parallel mode with at most "nodes" slaves.
Definition: TProof.cxx:7245
TSignalHandler * GetSignalHandler() const
Definition: TApplication.h:106
virtual const char * HostName()
Return the system&#39;s host name.
Definition: TSystem.cxx:311
virtual int Symlink(const char *from, const char *to)
Create a symbolic link from file1 to file2.
Definition: TSystem.cxx:1348
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition: TString.cxx:875
Int_t Lock()
Locks the directory.
TList * fEnabledPackages
Definition: TProof.h:580
EQueryMode GetQueryMode(Option_t *mode=0) const
Find out the query mode based on the current setting and &#39;mode&#39;.
Definition: TProof.cxx:6091
Int_t GetParallel() const
Returns number of slaves active in parallel mode.
Definition: TProof.cxx:2282
TList * fNonUniqueMasters
Definition: TProof.h:483
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
Int_t CleanupSandbox()
Remove old sessions dirs keep at most &#39;Proof.MaxOldSessions&#39; (default 10)
TList * fRecvMessages
Definition: TProof.h:472
TNamed()
Definition: TNamed.h:36
Int_t GetNumberOfBadSlaves() const
Return number of bad slaves.
Definition: TProof.cxx:1992
static const TString & GetEtcDir()
Get the sysconfig directory in the installation. Static utility function.
Definition: TROOT.cxx:2905
TList * fEnabledPackagesOnCluster
Definition: TProof.h:533
int nentries
Definition: THbookFile.cxx:89
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:570
static constexpr double s
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
#define R__LOCKGUARD(mutex)
virtual void Reset()
Reset or initialize access to the elements.
Definition: TDSet.cxx:1350
void SetRunStatus(ERunStatus rst)
Definition: TProof.h:672
const char * GetFileName() const
Definition: TDSet.h:111
static Int_t RegisterGlobalPath(const char *paths)
Parse one or more paths as possible sources of packages Returns number of paths added; or -1 in case ...
Definition: TPackMgr.cxx:871
void RemoveQuery(TQueryResult *qr, Bool_t soft=kFALSE)
Remove everything about query qr.
Int_t Unlock()
Unlock the directory.
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:399
Int_t GetSandbox(TString &sb, Bool_t assert=kFALSE, const char *rc=0)
Set the sandbox path from &#39; Proof.Sandbox&#39; or the alternative var &#39;rc&#39;.
Definition: TProof.cxx:1004
TQueryResult version adapted to PROOF neeeds.
EQueryMode fQueryMode
Definition: TProof.h:588
static TMap * GetDataSetNodeMap(TFileCollection *fc, TString &emsg)
Get a map {server-name, list-of-files} for collection &#39;fc&#39; to be used in TPacketizerFile.
virtual Long64_t GetEntries() const
Definition: TTree.h:382
Bool_t IsNull() const
Definition: TString.h:383
void SaveQuery(TProofQueryResult *qr, const char *fout=0)
Save current status of query &#39;qr&#39; to file name fout.
Int_t Match(const TString &s, UInt_t start=0)
Runs a match on s against the regex &#39;this&#39; was created with.
Definition: TPRegexp.cxx:708
FILE * fLogFileW
Definition: TProof.h:512
Mother of all ROOT objects.
Definition: TObject.h:37
TPackMgr * fPackMgr
Definition: TProof.h:532
TSelector * fSelector
Definition: TProof.h:591
TString fVarExp
Definition: TProofLite.h:57
Bool_t ExistsDataSet(const char *group, const char *user, const char *dsName)
Checks if the indicated dataset exits.
Float_t GetCpuTime() const
Definition: TProof.h:931
Int_t InitDataSetManager()
Initialize the dataset manager from directives or from defaults Return 0 on success, -1 on failure.
Bool_t R_ISDIR(Int_t mode)
Definition: TSystem.h:116
static void SetMacroPath(const char *newpath)
Set or extend the macro search path.
Definition: TROOT.cxx:2725
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
virtual void Add(TObject *obj)
Definition: TList.h:87
Wrapper for PCRE library (Perl Compatible Regular Expressions).
Definition: TPRegexp.h:97
Class that contains a list of TFileInfo&#39;s and accumulated meta data information about its entries...
Definition: file.py:1
TProofOutputList fOutputList
Definition: TProof.h:538
TProofLockPath * fQueryLock
Definition: TProofLite.h:61
TFileCollection * GetDataSet(const char *uri, const char *srv=0)
Utility function used in various methods for user dataset upload.
Int_t CleanupQueriesDir()
Remove all queries results referring to previous sessions.
Int_t fSessionID
Definition: TProof.h:528
void AskStatistics()
Ask the for the statistics of the slaves.
Definition: TProof.cxx:2000
TF1 * f1
Definition: legend1.C:11
#define snprintf
Definition: civetweb.c:822
TTree * GetTreeHeader(TDSet *tdset)
Creates a tree header (a tree with nonexisting files) object for the DataSet.
Int_t fStatus
Definition: TProof.h:470
Int_t SetParallel(Int_t nodes=-1, Bool_t random=kFALSE)
Tell PROOF how many slaves to use in parallel.
Definition: TProof.cxx:7112
virtual int CopyFile(const char *from, const char *to, Bool_t overwrite=kFALSE)
Copy a file.
Definition: TSystem.cxx:1321
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Definition: TSystem.cxx:834
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
Definition: TString.cxx:454
R__EXTERN Int_t gDebug
Definition: Rtypes.h:86
Int_t Atoi() const
Return integer value of string.
Definition: TString.cxx:1975
void Reset()
Definition: TStopwatch.h:52
Bool_t IsParallel() const
Definition: TProof.h:939
const char * GetUser() const
Definition: TProof.h:906
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
Definition: TString.cxx:1817
TList * fAvailablePackages
Definition: TProof.h:579
A TTree object has a header with a name and a title.
Definition: TTree.h:70
const AParamType & GetVal() const
Definition: TParameter.h:69
Class describing a generic file including meta information.
Definition: TFileInfo.h:38
void Emit(const char *signal, const T &arg)
Activate signal with single parameter.
Definition: TQObject.h:165
Int_t SendInitialState()
Transfer the initial (i.e.
Definition: TProof.cxx:6746
void ResetBit(UInt_t f)
Definition: TObject.h:171
TProofMgr::EServType fServType
Definition: TProof.h:586
Bool_t IsValid() const
Definition: TProof.h:937
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition: TSystem.cxx:1254
TList * fTerminatedSlaveInfos
Definition: TProof.h:573
Definition: first.py:1
void Add(TObject *obj)
Add object in sorted list.
Definition: TSortedList.cxx:27
TProofQueryResult * MakeQueryResult(Long64_t nent, const char *opt, Long64_t fst, TDSet *dset, const char *selec)
Create a TProofQueryResult instance for this query.
TList * fLoadedMacros
Definition: TProof.h:544
Int_t Remove(const char *ref, Bool_t all)
Handle remove request.
virtual Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0)
Load the specified macro on master, workers and, if notOnClient is kFALSE, on the client...
Definition: TProof.cxx:8600
virtual Int_t GetSize() const
Definition: TCollection.h:180
Int_t Substitute(TString &s, const TString &r, Bool_t doDollarSubst=kTRUE)
Substitute matching part of s with r, dollar back-ref substitution is performed if doDollarSubst is t...
Definition: TPRegexp.cxx:874
Class describing a PROOF worker server.
Definition: TSlave.h:46
void FindUniqueSlaves()
Add to the fUniqueSlave list the active slaves that have a unique (user) file system image...
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:33
void ResetProgressDialog(const char *sel, Int_t sz, Long64_t fst, Long64_t ent)
Reset progress dialog.
Definition: TProof.cxx:9271
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition: TNamed.cxx:164
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
TString fMaster
Definition: TProof.h:466
Bool_t fTty
Definition: TProof.h:465
TUrl fUrl
Definition: TProof.h:567
const Bool_t kTRUE
Definition: RtypesCore.h:87
virtual TMap * GetDataSets(const char *uri, UInt_t=TDataSetManager::kExport)
Returns all datasets for the <group> and <user> specified by <uri>.
void ShowCache(Bool_t all=kFALSE)
List contents of file cache.
virtual char * ConcatFileName(const char *dir, const char *name)
Concatenate a directory and a file name. User must delete returned string.
Definition: TSystem.cxx:1052
void Print(Option_t *option="") const
Print status of PROOF-Lite cluster.
Definition: TProofLite.cxx:949
const char * GetOrdinal() const
Definition: TProofServ.h:253
const Int_t n
Definition: legend1.C:16
virtual Int_t SetupServ(Int_t stype, const char *conffile)
Init a PROOF slave object.
Definition: TSlave.cxx:179
TMonitor * fActiveMonitor
Definition: TProof.h:484
virtual TList * GetListOfResults() const =0
Int_t GetNumberOfUniqueSlaves() const
Return number of unique slaves, i.e.
Definition: TProof.cxx:1983
char name[80]
Definition: TGX11.cxx:109
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
Long64_t fLastPollWorkers_s
Definition: TProof.h:476
Int_t fMaxDrawQueries
Definition: TProof.h:525
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:48
Int_t CopyMacroToCache(const char *macro, Int_t headerRequired=0, TSelector **selector=0, Int_t opt=0, TList *wrks=0)
Copy a macro, and its possible associated .h[h] file, to the cache directory, from where the workers ...
const char *const kPROOF_CacheDir
Definition: TProof.h:125
const char * Data() const
Definition: TString.h:345
const char *const kPROOF_PackDir
Definition: TProof.h:126
void SetInputHandler(TFileHandler *ih)
Adopt and register input handler for this slave.
Definition: TSlave.cxx:394
static Int_t AssertDataSet(TDSet *dset, TList *input, TDataSetManager *mgr, TString &emsg)
Make sure that dataset is in the form to be processed.
Definition: TProof.cxx:11987