ROOT  6.06/09
Reference Guide
TXProofServ.cxx
Go to the documentation of this file.
1 // @(#)root/proofx:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TXProofServ //
15 // //
16 // TXProofServ is the XRD version of the PROOF server. It differs from //
17 // TProofServ only for the underlying connection technology //
18 // //
19 //////////////////////////////////////////////////////////////////////////
20 
21 #include "RConfigure.h"
22 #include "RConfig.h"
23 #include "Riostream.h"
24 
25 #ifdef WIN32
26  #include <io.h>
27  typedef long off_t;
28 #endif
29 #include <sys/types.h>
30 #include <netinet/in.h>
31 #include <utime.h>
32 
33 #include "TXProofServ.h"
34 #include "TObjString.h"
35 #include "TEnv.h"
36 #include "TError.h"
37 #include "TException.h"
38 #include "THashList.h"
39 #include "TInterpreter.h"
40 #include "TParameter.h"
41 #include "TProofDebug.h"
42 #include "TProof.h"
43 #include "TProofPlayer.h"
44 #include "TQueryResultManager.h"
45 #include "TRegexp.h"
46 #include "TClass.h"
47 #include "TROOT.h"
48 #include "TSystem.h"
49 #include "TPluginManager.h"
50 #include "TXSocketHandler.h"
51 #include "TXUnixSocket.h"
52 #include "compiledata.h"
53 #include "TProofNodeInfo.h"
54 #include "XProofProtocol.h"
55 
56 #include <XrdClient/XrdClientConst.hh>
57 #include <XrdClient/XrdClientEnv.hh>
58 
59 
60 // debug hook
61 static volatile Int_t gProofServDebug = 1;
62 
63 //----- SigPipe signal handler -------------------------------------------------
64 ////////////////////////////////////////////////////////////////////////////////
65 
66 class TXProofServSigPipeHandler : public TSignalHandler {
67  TXProofServ *fServ;
68 public:
69  TXProofServSigPipeHandler(TXProofServ *s) : TSignalHandler(kSigInterrupt, kFALSE)
70  { fServ = s; }
71  Bool_t Notify();
72 };
73 
74 ////////////////////////////////////////////////////////////////////////////////
75 
77 {
 // NOTE(review): the header line of this definition is absent from this
 // text; the body follows the TXProofServSigPipeHandler declaration and is
 // presumably its Notify() -- confirm against the original file.
 // Delegates to the server's HandleSigPipe() and always returns kTRUE.
78  fServ->HandleSigPipe();
79  return kTRUE;
80 }
81 
82 //----- Termination signal handler ---------------------------------------------
83 ////////////////////////////////////////////////////////////////////////////////
84 
85 class TXProofServTerminationHandler : public TSignalHandler {
86  TXProofServ *fServ;
87 public:
88  TXProofServTerminationHandler(TXProofServ *s)
89  : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
90  Bool_t Notify();
91 };
92 
93 ////////////////////////////////////////////////////////////////////////////////
94 
96 {
 // NOTE(review): the header line of this definition is absent from this
 // text; presumably TXProofServTerminationHandler::Notify() -- confirm.
 // Prints a notice, asks the server to run HandleTermination() and then
 // returns kTRUE.
97  Printf("Received SIGTERM: terminating");
98 
99  fServ->HandleTermination();
100  return kTRUE;
101 }
102 
103 //----- Seg violation signal handler ---------------------------------------------
104 ////////////////////////////////////////////////////////////////////////////////
105 
// Signal handler holding a back-pointer to the TXProofServ it acts upon.
// NOTE(review): the constructor's initializer list and body (listing line 110)
// are not present in this text, so the signal this handler registers for
// cannot be confirmed from here.
106 class TXProofServSegViolationHandler : public TSignalHandler {
107  TXProofServ *fServ;
108 public:
109  TXProofServSegViolationHandler(TXProofServ *s)
111  Bool_t Notify();
112 };
113 
114 ////////////////////////////////////////////////////////////////////////////////
115 
117 {
 // NOTE(review): the header line of this definition is absent from this
 // text; presumably TXProofServSegViolationHandler::Notify() -- confirm.
 // Prints a three-line banner announcing the segmentation violation, then
 // runs the same HandleTermination() path used for the termination signal
 // and returns kTRUE.
118  Printf("**** ");
119  Printf("**** Segmentation violation: terminating ****");
120  Printf("**** ");
121  fServ->HandleTermination();
122  return kTRUE;
123 }
124 
125 //----- Input handler for messages from parent or master -----------------------
126 ////////////////////////////////////////////////////////////////////////////////
127 
128 class TXProofServInputHandler : public TFileHandler {
129  TXProofServ *fServ;
130 public:
131  TXProofServInputHandler(TXProofServ *s, Int_t fd) : TFileHandler(fd, 1)
132  { fServ = s; }
133  Bool_t Notify();
134  Bool_t ReadNotify() { return Notify(); }
135 };
136 
137 ////////////////////////////////////////////////////////////////////////////////
138 
140 {
 // NOTE(review): the header line of this definition is absent from this
 // text; presumably TXProofServInputHandler::Notify() -- confirm.
 // Lets the server process the pending socket input, then removes the
 // client ID on the server's socket (cast to TXUnixSocket) and returns kTRUE.
141  fServ->HandleSocketInput();
142  // This request has been completed: remove the client ID from the pipe
143  ((TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
144  return kTRUE;
145 }
146 
148 
149 // Hook to the constructor. This is needed to avoid using the plugin manager
150 // which may create problems in multi-threaded environments.
151 extern "C" {
152  TApplication *GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
153  { return new TXProofServ(argc, argv, flog); }
154 }
155 
156 ////////////////////////////////////////////////////////////////////////////////
157 /// Main constructor
158 
// Forwards argc, argv and the log file handle to the TProofServ base class
// and resets the handler pointers owned by this class.
// NOTE(review): listing line 164 is absent from this text, so one more
// statement may exist between the two resets and the TODO below.
159 TXProofServ::TXProofServ(Int_t *argc, char **argv, FILE *flog)
160  : TProofServ(argc, argv, flog)
161 {
 // No handlers are installed at construction time
162  fInterruptHandler = 0;
163  fInputHandler = 0;
165 
 // The FIFO option below is disabled (commented out); as written it refers
 // to an undeclared 'strategy' variable and would need fixing before use.
166  // TODO:
167  // Int_t useFIFO = 0;
168 /* if (GetParameter(fProof->GetInputList(), "PROOF_UseFIFO", useFIFO) != 0) {
169  if (useFIFO == 1)
170  Info("", "enablig use of FIFO (if allowed by the server)");
171  else
172  Warning("", "unsupported strategy index (%d): ignore", strategy);
173  }
174 */
175 }
176 
177 ////////////////////////////////////////////////////////////////////////////////
178 /// Finalize the server setup. If master, create the TProof instance to talk
179 /// to the worker or submaster nodes.
180 /// Return 0 on success, -1 on error
181 
183 {
184  Bool_t xtest = (Argc() > 3 && !strcmp(Argv(3), "test")) ? kTRUE : kFALSE;
185 
186  if (gProofDebugLevel > 0)
187  Info("CreateServer", "starting%s server creation", (xtest ? " test" : ""));
188 
189  // Get file descriptor for log file
190  if (fLogFile) {
191  // Use the file already open by pmain
192  if ((fLogFileDes = fileno(fLogFile)) < 0) {
193  Error("CreateServer", "resolving the log file description number");
194  return -1;
195  }
196  // Hide the session start-up logs unless we are in verbose mode
197  if (gProofDebugLevel <= 0)
198  lseek(fLogFileDes, (off_t) 0, SEEK_END);
199  }
200 
201  // Global location string in TXSocket
202  TXSocket::SetLocation((IsMaster()) ? "master" : "slave");
203 
204  // Set debug level in XrdClient
205  EnvPutInt(NAME_DEBUG, gEnv->GetValue("XNet.Debug", 0));
206 
207  // Get socket to be used to call back our xpd
208  if (xtest) {
209  // test session, just send the protocol version on the open pipe
210  // and exit
211  if (!(fSockPath = gSystem->Getenv("ROOTOPENSOCK"))) {
212  Error("CreateServer", "test: socket setup by xpd undefined");
213  return -1;
214  }
215  Int_t fpw = (Int_t) strtol(fSockPath.Data(), 0, 10);
216  int proto = htonl(kPROOF_Protocol);
217  fSockPath = "";
218  if (write(fpw, &proto, sizeof(proto)) != sizeof(proto)) {
219  Error("CreateServer", "test: sending protocol number");
220  return -1;
221  }
222  exit(0);
223  } else {
224  fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
225  if (fSockPath.Length() <= 0) {
226  Error("CreateServer", "socket setup by xpd undefined");
227  return -1;
228  }
229  TString entity = gEnv->GetValue("ProofServ.Entity", "");
230  if (entity.Length() > 0)
231  fSockPath.Insert(0,Form("%s/", entity.Data()));
232  }
233 
234  // Get open socket descriptor, if any
235  Int_t sockfd = -1;
236  const char *opensock = gSystem->Getenv("ROOTOPENSOCK");
237  if (opensock && strlen(opensock) > 0) {
239  sockfd = (Int_t) strtol(opensock, 0, 10);
240  if (TSystem::GetErrno() == ERANGE) {
241  sockfd = -1;
242  Warning("CreateServer", "socket descriptor: wrong conversion from '%s'", opensock);
243  }
244  if (sockfd > 0 && gProofDebugLevel > 0)
245  Info("CreateServer", "using open connection (descriptor %d)", sockfd);
246  }
247 
248  // Get the sessions ID
249  Int_t psid = gEnv->GetValue("ProofServ.SessionID", -1);
250  if (psid < 0) {
251  Error("CreateServer", "Session ID undefined");
252  return -1;
253  }
254 
255  // Call back the server
256  fSocket = new TXUnixSocket(fSockPath, psid, -1, this, sockfd);
257  if (!fSocket || !(fSocket->IsValid())) {
258  Error("CreateServer", "Failed to open connection to XrdProofd coordinator");
259  return -1;
260  }
261  // Set compression level, if any
263 
264  // Set the title for debugging
265  TString tgt("client");
266  if (fOrdinal != "0") {
267  tgt = fOrdinal;
268  if (tgt.Last('.') != kNPOS) tgt.Remove(tgt.Last('.'));
269  }
270  fSocket->SetTitle(tgt);
271 
272  // Set the this as reference of this socket
273  ((TXSocket *)fSocket)->fReference = this;
274 
275  // Get socket descriptor
276  Int_t sock = fSocket->GetDescriptor();
277 
278  // Install message input handlers
279  fInputHandler =
280  TXSocketHandler::GetSocketHandler(new TXProofServInputHandler(this, sock), fSocket);
282 
283  // Get the client ID
284  Int_t cid = gEnv->GetValue("ProofServ.ClientID", -1);
285  if (cid < 0) {
286  Error("CreateServer", "Client ID undefined");
287  SendLogFile();
288  return -1;
289  }
290  ((TXSocket *)fSocket)->SetClientID(cid);
291 
292  // debug hooks
293  if (IsMaster()) {
294  // wait (loop) in master to allow debugger to connect
295  if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
296  while (gProofServDebug)
297  ;
298  }
299  } else {
300  // wait (loop) in slave to allow debugger to connect
301  if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
302  while (gProofServDebug)
303  ;
304  }
305  }
306 
307  if (gProofDebugLevel > 0)
308  Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
310 
311  if (Setup() == -1) {
312  // Setup failure
313  LogToMaster();
314  SendLogFile();
315  Terminate(0);
316  return -1;
317  }
318 
319  if (!fLogFile) {
320  RedirectOutput();
321  // If for some reason we failed setting a redirection file for the logs
322  // we cannot continue
323  if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
324  LogToMaster();
325  SendLogFile(-98);
326  Terminate(0);
327  return -1;
328  }
329  }
330 
331  // Send message of the day to the client
332  if (IsMaster()) {
333  if (CatMotd() == -1) {
334  LogToMaster();
335  SendLogFile(-99);
336  Terminate(0);
337  return -1;
338  }
339  }
340 
341  // Everybody expects iostream to be available, so load it...
342  ProcessLine("#include <iostream>", kTRUE);
343  ProcessLine("#include <string>",kTRUE); // for std::string iostream.
344 
345  // Load user functions
346  const char *logon;
347  logon = gEnv->GetValue("Proof.Load", (char *)0);
348  if (logon) {
349  char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
350  if (mac)
351  ProcessLine(Form(".L %s", logon), kTRUE);
352  delete [] mac;
353  }
354 
355  // Execute logon macro
356  logon = gEnv->GetValue("Proof.Logon", (char *)0);
357  if (logon && !NoLogOpt()) {
358  char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
359  if (mac)
360  ProcessFile(logon);
361  delete [] mac;
362  }
363 
364  // Save current interpreter context
365  gInterpreter->SaveContext();
366  gInterpreter->SaveGlobalsContext();
367 
368  // if master, start slave servers
369  if (IsMaster()) {
370  TString master;
371 
372  if (fConfFile.BeginsWith("lite:")) {
373  master = "lite://";
374  } else {
375  master.Form("proof://%s@__master__", fUser.Data());
376 
377  // Add port, if defined
378  Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
379  if (port > -1) {
380  master += ":";
381  master += port;
382  }
383  }
384 
385  // Make sure that parallel startup via threads is not active
386  // (it is broken for xpd because of the locks on gInterpreterMutex)
387  gEnv->SetValue("Proof.ParallelStartup", 0);
388 
389  // Get plugin manager to load appropriate TProof from
390  TPluginManager *pm = gROOT->GetPluginManager();
391  if (!pm) {
392  Error("CreateServer", "no plugin manager found");
393  SendLogFile(-99);
394  Terminate(0);
395  return -1;
396  }
397 
398  // Find the appropriate handler
399  TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
400  if (!h) {
401  Error("CreateServer", "no plugin found for TProof with a"
402  " config file of '%s'", fConfFile.Data());
403  SendLogFile(-99);
404  Terminate(0);
405  return -1;
406  }
407 
408  // load the plugin
409  if (h->LoadPlugin() == -1) {
410  Error("CreateServer", "plugin for TProof could not be loaded");
411  SendLogFile(-99);
412  Terminate(0);
413  return -1;
414  }
415 
416  // Make instance of TProof
417  if (fConfFile.BeginsWith("lite:")) {
418  // Remove input and signal handlers to avoid spurious "signals"
419  // during startup
421  fProof = reinterpret_cast<TProof*>(h->ExecPlugin(6, master.Data(),
422  0, 0,
423  fLogLevel,
424  fSessionDir.Data(), 0));
425  // Re-enable input and signal handlers
427  } else {
428  fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
429  fConfFile.Data(),
430  fConfDir.Data(),
431  fLogLevel,
432  fTopSessionTag.Data()));
433  }
434 
435  // Save worker info
436  if (fProof) fProof->SaveWorkerInfo();
437 
438  if (!fProof || (fProof && !fProof->IsValid())) {
439  Error("CreateServer", "plugin for TProof could not be executed");
440  FlushLogFile();
441  delete fProof;
442  fProof = 0;
443  SendLogFile(-99);
444  Terminate(0);
445  return -1;
446  }
447  // Find out if we are a master in direct contact only with workers
449 
450  SendLogFile();
451  }
452 
453  // Setup the shutdown timer
454  if (!fShutdownTimer) {
455  // Check activity on socket every 5 mins
456  fShutdownTimer = new TShutdownTimer(this, 300000);
458  }
459 
460  // Check if schema evolution is effective: clients running versions <=17 do not
461  // support that: send a warning message
462  if (fProtocol <= 17) {
463  TString msg;
464  msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
465  " This may generate compatibility problems between streamed objects.\n"
466  " The advise is to move to ROOT >= 5.21/02 .");
467  SendAsynMessage(msg.Data());
468  }
469 
470  // Setup the idle timer
471  if (IsMaster() && !fIdleTOTimer) {
472  // Check activity on socket every 5 mins
473  Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
474  if (idle_to > 0) {
475  fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
476  fIdleTOTimer->Start(-1, kTRUE);
477  if (gProofDebugLevel > 0)
478  Info("CreateServer", " idle timer started (%d secs)", idle_to);
479  } else if (gProofDebugLevel > 0) {
480  Info("CreateServer", " idle timer not started (no idle timeout requested)");
481  }
482  }
483 
484  // Done
485  return 0;
486 }
487 
488 ////////////////////////////////////////////////////////////////////////////////
489 /// Cleanup. Not really necessary since after this dtor there is no
490 /// life anyway.
491 
493 {
 // NOTE(review): the header line of this definition is absent from this
 // text; per the preceding comment this is the destructor -- confirm.
 // Only the socket is released here.
494  delete fSocket;
495 }
496 
497 ////////////////////////////////////////////////////////////////////////////////
498 /// Handle high priority data sent by the master or client.
499 
501 {
502  // Real-time notification of messages
504 
505  // Get interrupt
506  Bool_t fw = kFALSE;
507  Int_t iLev = ((TXSocket *)fSocket)->GetInterrupt(fw);
508  if (iLev < 0) {
509  Error("HandleUrgentData", "error receiving interrupt");
510  return;
511  }
512 
513  PDB(kGlobal, 2)
514  Info("HandleUrgentData", "got interrupt: %d\n", iLev);
515 
516  if (fProof)
517  fProof->SetActive();
518 
519  switch (iLev) {
520 
521  case TProof::kPing:
522  PDB(kGlobal, 2)
523  Info("HandleUrgentData", "*** Ping");
524 
525  // If master server, propagate interrupt to slaves
526  if (fw && IsMaster()) {
527  Int_t nbad = fProof->fActiveSlaves->GetSize() - fProof->Ping();
528  if (nbad > 0) {
529  Info("HandleUrgentData","%d slaves did not reply to ping",nbad);
530  }
531  }
532 
533  // Touch the admin path to show we are alive
534  if (fAdminPath.IsNull()) {
535  fAdminPath = gEnv->GetValue("ProofServ.AdminPath", "");
536  }
537 
538  if (!fAdminPath.IsNull()) {
539  if (!fAdminPath.EndsWith(".status")) {
540  // Update file time stamps
541  if (utime(fAdminPath.Data(), 0) != 0)
542  Info("HandleUrgentData", "problems touching path: %s", fAdminPath.Data());
543  else
544  PDB(kGlobal, 2)
545  Info("HandleUrgentData", "touching path: %s", fAdminPath.Data());
546  } else {
547  // Update the status in the file
548  // 0 idle
549  // 1 running
550  // 2 being terminated (currently unused)
551  // 3 queued
552  // 4 idle timed-out
553  Int_t uss_rc = UpdateSessionStatus(-1);
554  if (uss_rc != 0)
555  Error("HandleUrgentData", "problems updating status path: %s (errno: %d)", fAdminPath.Data(), -uss_rc);
556  }
557  } else {
558  Info("HandleUrgentData", "admin path undefined");
559  }
560 
561  break;
562 
564  Info("HandleUrgentData", "*** Hard Interrupt");
565 
566  // If master server, propagate interrupt to slaves
567  if (fw && IsMaster())
569 
570  // Flush input socket
571  ((TXSocket *)fSocket)->Flush();
572 
573  if (IsMaster())
574  SendLogFile();
575 
576  break;
577 
579  Info("HandleUrgentData", "Soft Interrupt");
580 
581  // If master server, propagate interrupt to slaves
582  if (fw && IsMaster())
584 
585  Interrupt();
586 
587  if (IsMaster())
588  SendLogFile();
589 
590  break;
591 
592 
594  Info("HandleUrgentData", "Shutdown Interrupt");
595 
596 // When returning from here connections are closed
598 
599  break;
600 
601  default:
602  Error("HandleUrgentData", "unexpected type: %d", iLev);
603  break;
604  }
605 
606 
607  if (fProof) fProof->SetActive(kFALSE);
608 }
609 
610 ////////////////////////////////////////////////////////////////////////////////
611 /// Called on a broken-pipe notification; currently it only logs the event.
612 
614 {
 // NOTE(review): the header line of this definition is absent from this
 // text; the message below names it HandleSigPipe.
 // Nothing is done besides logging: no socket or session state is touched.
615  // Real-time notification of messages
616 
617  Info("HandleSigPipe","got sigpipe ... do nothing");
618 }
619 
620 ////////////////////////////////////////////////////////////////////////////////
621 /// Called when the client is not alive anymore; terminate the session.
622 
624 {
625  // If master server, propagate interrupt to slaves
626  // (shutdown interrupt send internally).
627  if (IsMaster()) {
628 
629  // If not idle, try first to stop processing
630  if (!fIdle) {
631  // Remove pending requests
633  // Interrupt the current monitor
635 // Do not wait forever, but at least 20 seconds
636  Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
637  timeout = (timeout > 20) ? timeout : 20;
638  // Processing will be aborted
639  fProof->StopProcess(kTRUE, (Long_t) (timeout / 2));
640  // Receive end-of-processing messages, but do not wait for ever
641  fProof->Collect(TProof::kActive, timeout);
642  // Still not idle
643  if (!fIdle)
644  Warning("HandleTermination","processing could not be stopped");
645  }
646  // Close the session
647  if (fProof)
648  fProof->Close("S");
649  }
650 
651  Terminate(0); // will not return from here....
652 }
653 
654 ////////////////////////////////////////////////////////////////////////////////
655 /// Send the startup greeting over the socket and set up the session environment.
656 /// Return 0 on success, -1 on error
657 
659 {
660  char str[512];
661 
662  if (IsMaster()) {
663  snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
664  } else {
665  snprintf(str, 512, "**** PROOF worker server @ %s started ****", gSystem->HostName());
666  }
667 
668  if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
669  Error("Setup", "failed to send proof server startup message");
670  return -1;
671  }
672 
673  // Get client protocol
674  if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
675  Error("Setup", "remote proof protocol missing");
676  return -1;
677  }
678 
679  // The local user
680  fUser = gEnv->GetValue("ProofServ.Entity", "");
681  if (fUser.Length() >= 0) {
682  if (fUser.Contains(":"))
683  fUser.Remove(fUser.Index(":"));
684  if (fUser.Contains("@"))
685  fUser.Remove(fUser.Index("@"));
686  } else {
688  if (pw) {
689  fUser = pw->fUser;
690  delete pw;
691  }
692  }
693 
694  // Work dir and ...
695  if (IsMaster()) {
696  TString cf = gEnv->GetValue("ProofServ.ProofConfFile", "");
697  if (cf.Length() > 0)
698  fConfFile = cf;
699  }
700  fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));
701 
702  // Get Session tag
703  if ((fSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1")) == "-1") {
704  Error("Setup", "Session tag missing");
705  return -1;
706  }
707  // Get top session tag, i.e. the tag of the PROOF session
708  if ((fTopSessionTag = gEnv->GetValue("ProofServ.TopSessionTag", "-1")) == "-1") {
709  fTopSessionTag = "";
710  // Try to extract it from log file path (for backward compatibility)
711  if (gSystem->Getenv("ROOTPROOFLOGFILE")) {
712  fTopSessionTag = gSystem->DirName(gSystem->Getenv("ROOTPROOFLOGFILE"));
713  Ssiz_t lstl;
714  if ((lstl = fTopSessionTag.Last('/')) != kNPOS) fTopSessionTag.Remove(0, lstl + 1);
715  if (fTopSessionTag.BeginsWith("session-")) {
716  fTopSessionTag.Remove(0, strlen("session-"));
717  } else {
718  fTopSessionTag = "";
719  }
720  }
721  if (fTopSessionTag.IsNull()) {
722  Error("Setup", "top session tag missing");
723  return -1;
724  }
725  }
726 
727  // Make sure the process ID is in the tag
728  TString spid = Form("-%d", gSystem->GetPid());
729  if (!fSessionTag.EndsWith(spid)) {
730  Int_t nd = 0;
731  if ((nd = fSessionTag.CountChar('-')) >= 2) {
732  Int_t id = fSessionTag.Index("-", fSessionTag.Index("-") + 1);
733  if (id != kNPOS) fSessionTag.Remove(id);
734  } else if (nd != 1) {
735  Warning("Setup", "Wrong number of '-' in session tag: protocol error? %s", fSessionTag.Data());
736  }
737  // Add this process ID
738  fSessionTag += spid;
739  }
740  if (gProofDebugLevel > 0)
741  Info("Setup", "session tags: %s, %s", fTopSessionTag.Data(), fSessionTag.Data());
742 
743  // Get Session dir (sandbox)
744  if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
745  Error("Setup", "Session dir missing");
746  return -1;
747  }
748 
749  // Goto to the main PROOF working directory
751  fWorkDir = workdir;
752  delete [] workdir;
753  if (gProofDebugLevel > 0)
754  Info("Setup", "working directory set to %s", fWorkDir.Data());
755 
756  // Common setup
757  if (SetupCommon() != 0) {
758  Error("Setup", "common setup failed");
759  return -1;
760  }
761 
762  // Send packages off immediately to reduce latency
764 
765  // Check every two hours if client is still alive
767 
768  // Install SigPipe handler to handle kKeepAlive failure
769  gSystem->AddSignalHandler(new TXProofServSigPipeHandler(this));
770 
771  // Install Termination handler
772  gSystem->AddSignalHandler(new TXProofServTerminationHandler(this));
773 
774  // Install seg violation handler
775  gSystem->AddSignalHandler(new TXProofServSegViolationHandler(this));
776 
777  if (gProofDebugLevel > 0)
778  Info("Setup", "successfully completed");
779 
780  // Done
781  return 0;
782 }
783 
784 ////////////////////////////////////////////////////////////////////////////////
785 /// Get list of workers to be used from now on.
786 /// The list must be provided by the caller.
787 
789  Int_t & /* prioritychange */,
790  Bool_t resume)
791 {
793 
794  // User config files, when enabled, override cluster-wide configuration
795  if (gEnv->GetValue("ProofServ.UseUserCfg", 0) != 0) {
796  Int_t pc = 1;
797  return TProofServ::GetWorkers(workers, pc);
798  }
799 
800  // seqnum of the query for which we call getworkers
801  Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
802  TString seqnum = (dynamicStartup) ? "" : XPD_GW_Static;
803  if (!fWaitingQueries->IsEmpty()) {
804  if (resume) {
805  seqnum += ((TProofQueryResult *)(fWaitingQueries->First()))->GetSeqNum();
806  } else {
807  seqnum += ((TProofQueryResult *)(fWaitingQueries->Last()))->GetSeqNum();
808  }
809  }
810  // Send request to the coordinator
811  TObjString *os = 0;
812  if (dynamicStartup) {
813  // We wait dynto seconds for the first worker to come; -1 means forever
814  Int_t dynto = gEnv->GetValue("Proof.DynamicStartupTimeout", -1);
815  Bool_t doto = (dynto > 0) ? kTRUE : kFALSE;
816  while (!(os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data()))) {
817  if (doto > 0 && --dynto < 0) break;
818  // Another second
819  gSystem->Sleep(1000);
820  }
821  } else {
822  os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data());
823  }
824 
825  // The reply contains some information about the master (image, workdir)
826  // followed by the information about the workers; the tokens for each node
827  // are separated by '&'
828  if (os) {
829  TString fl(os->GetName());
830  if (fl.BeginsWith(XPD_GW_QueryEnqueued)) {
831  SendAsynMessage("+++ Query cannot be processed now: enqueued");
832  return kQueryEnqueued;
833  }
834 
835  // Honour a max number of workers request (typically when running in valgrind)
836  Int_t nwrks = -1;
837  Bool_t pernode = kFALSE;
838  if (gSystem->Getenv("PROOF_NWORKERS")) {
839  TString s(gSystem->Getenv("PROOF_NWORKERS"));
840  if (s.EndsWith("x")) {
841  pernode = kTRUE;
842  s.ReplaceAll("x", "");
843  }
844  if (s.IsDigit()) {
845  nwrks = s.Atoi();
846  if (!dynamicStartup && (nwrks > 0)) {
847  // Notify, except in dynamic workers mode to avoid flooding
848  TString msg;
849  if (pernode) {
850  msg.Form("+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
851  } else {
852  msg.Form("+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
853  }
854  SendAsynMessage(msg);
855  } else {
856  nwrks = -1;
857  }
858  } else {
859  pernode = kFALSE;
860  }
861  }
862 
863  TString tok;
864  Ssiz_t from = 0;
865  TList *nodecnt = (pernode) ? new TList : 0 ;
866  if (fl.Tokenize(tok, from, "&")) {
867  if (!tok.IsNull()) {
868  TProofNodeInfo *master = new TProofNodeInfo(tok);
869  if (!master) {
870  Error("GetWorkers", "no appropriate master line got from coordinator");
871  return kQueryStop;
872  } else {
873  // Set image if not yet done and available
874  if (fImage.IsNull() && strlen(master->GetImage()) > 0)
875  fImage = master->GetImage();
876  SafeDelete(master);
877  }
878  // Now the workers
879  while (fl.Tokenize(tok, from, "&")) {
880  if (!tok.IsNull()) {
881  if (nwrks == -1 || nwrks > 0) {
882  // We have the minimal set of information to start
883  rc = kQueryOK;
884  if (pernode && nodecnt) {
885  TProofNodeInfo *ni = new TProofNodeInfo(tok);
886  TParameter<Int_t> *p = 0;
887  Int_t nw = 0;
888  if (!(p = (TParameter<Int_t> *) nodecnt->FindObject(ni->GetNodeName().Data()))) {
889  p = new TParameter<Int_t>(ni->GetNodeName().Data(), nw);
890  nodecnt->Add(p);
891  }
892  nw = p->GetVal();
893  if (gDebug > 0)
894  Info("GetWorkers","%p: name: %s (%s) val: %d (nwrks: %d)",
895  p, p->GetName(), ni->GetNodeName().Data(), nw, nwrks);
896  if (nw < nwrks) {
897  if (workers) workers->Add(ni);
898  nw++;
899  p->SetVal(nw);
900  } else {
901 // Too many workers on this machine already
902  SafeDelete(ni);
903  }
904  } else {
905  if (workers)
906  workers->Add(new TProofNodeInfo(tok));
907  // Count down
908  if (nwrks != -1) nwrks--;
909  }
910  } else {
911  // Release this worker (to cleanup the session list in the coordinator and get a fresh
912  // and correct list next call)
913  TProofNodeInfo *ni = new TProofNodeInfo(tok);
914  ReleaseWorker(ni->GetOrdinal().Data());
915  }
916  }
917  }
918  }
919  }
920  // Cleanup
921  if (nodecnt) {
922  nodecnt->SetOwner(kTRUE);
923  SafeDelete(nodecnt);
924  }
925  }
926 
927  // We are done
928  return rc;
929 }
930 
931 ////////////////////////////////////////////////////////////////////////////////
932 /// Handle error on the input socket
933 
935 {
936  // Try reconnection
937  if (fSocket && !fSocket->IsValid()) {
938 
939  fSocket->Reconnect();
940  if (fSocket && fSocket->IsValid()) {
941  if (gDebug > 0)
942  Info("HandleError",
943  "%p: connection to local coordinator re-established", this);
944  FlushLogFile();
945  return kFALSE;
946  }
947  }
948  Printf("TXProofServ::HandleError: %p: got called ...", this);
949 
950  // If master server, propagate interrupt to slaves
951  // (shutdown interrupt send internally).
952  if (IsMaster())
953  fProof->Close("S");
954 
955  // Avoid communicating back anything to the coordinator (it is gone)
956  if (fSocket) ((TXSocket *)fSocket)->SetSessionID(-1);
957 
958  Terminate(0);
959 
960  Printf("TXProofServ::HandleError: %p: DONE ... ", this);
961 
962  // We are done
963  return kTRUE;
964 }
965 
966 ////////////////////////////////////////////////////////////////////////////////
967 /// Handle asynchronous input on the input socket
968 
970 {
971  if (gDebug > 2)
972  Printf("TXProofServ::HandleInput %p, in: %p", this, in);
973 
974  XHandleIn_t *hin = (XHandleIn_t *) in;
975  Int_t acod = (hin) ? hin->fInt1 : kXPD_msg;
976 
977  // Act accordingly
978  if (acod == kXPD_ping || acod == kXPD_interrupt) {
979  // Interrupt or Ping
981 
982  } else if (acod == kXPD_flush) {
983  // Flush stdout, so that we can access the full log file
984  Info("HandleInput","kXPD_flush: flushing log file (stdout)");
985  fflush(stdout);
986 
987  } else if (acod == kXPD_urgent) {
988  // Get type
989  Int_t type = hin->fInt2;
990  switch (type) {
992  {
993  // Abort or Stop ?
994  Bool_t abort = (hin->fInt3 != 0) ? kTRUE : kFALSE;
995  // Timeout
996  Int_t timeout = hin->fInt4;
997  // Act now
998  if (fProof)
999  fProof->StopProcess(abort, timeout);
1000  else
1001  if (fPlayer)
1002  fPlayer->StopProcess(abort, timeout);
1003  }
1004  break;
1005  default:
1006  Info("HandleInput","kXPD_urgent: unknown type: %d", type);
1007  }
1008 
1009  } else if (acod == kXPD_inflate) {
1010 
1011  // Obsolete type
1012  Warning("HandleInput", "kXPD_inflate: obsolete message type");
1013 
1014  } else if (acod == kXPD_priority) {
1015 
1016  // The factor is the priority to be propagated
1017  fGroupPriority = hin->fInt2;
1018  if (fProof)
1020  // Notify
1021  Info("HandleInput", "kXPD_priority: group %s priority set to %f",
1022  fGroup.Data(), (Float_t) fGroupPriority / 100.);
1023 
1024  } else if (acod == kXPD_clusterinfo) {
1025 
1026  // Information about the cluster status
1027  fTotSessions = hin->fInt2;
1028  fActSessions = hin->fInt3;
1029  fEffSessions = (hin->fInt4)/1000.;
1030  // Notify
1031  Info("HandleInput", "kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
1033 
1034  } else {
1035  // Standard socket input
1037  // This request has been completed: remove the client ID from the pipe
1038  ((TXSocket *)fSocket)->RemoveClientID();
1039  }
1040 
1041  // We are done
1042  return kTRUE;
1043 }
1044 
1045 ////////////////////////////////////////////////////////////////////////////////
1046 /// Disable read timeout on the underlying socket
1047 
1049 {
 // NOTE(review): the header line of this definition is absent from this
 // text; presumably TXProofServ::DisableTimeout() -- confirm.
 // No-op when there is no socket; otherwise forwards to the socket, which
 // is accessed through a cast to TXSocket.
1050  if (fSocket)
1051  ((TXSocket *)fSocket)->DisableTimeout();
1052 }
1053 
1054 ////////////////////////////////////////////////////////////////////////////////
1055 /// Enable read timeout on the underlying socket
1056 
1058 {
 // NOTE(review): the header line of this definition is absent from this
 // text; presumably TXProofServ::EnableTimeout() -- confirm.
 // No-op when there is no socket; otherwise forwards to the socket, which
 // is accessed through a cast to TXSocket.
1059  if (fSocket)
1060  ((TXSocket *)fSocket)->EnableTimeout();
1061 }
1062 
1063 ////////////////////////////////////////////////////////////////////////////////
1064 /// Terminate the proof server.
1065 
1067 {
1068  if (fTerminated)
1069  // Avoid doubling the exit operations
1070  exit(1);
1071  fTerminated = kTRUE;
1072 
1073  // Notify
1074  Info("Terminate", "starting session termination operations ...");
1075  if (fgLogToSysLog > 0) {
1076  TString s;
1077  s.Form("%s -1 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
1078  gSystem->Syslog(kLogNotice, s.Data());
1079  }
1080 
1081  // Notify the memory footprint
1082  ProcInfo_t pi;
1083  if (!gSystem->GetProcInfo(&pi)){
1084  Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
1086  }
1087 
1088  // Deactivate current monitor, if any
1089  if (fProof)
1090  fProof->SetMonitor(0, kFALSE);
1091 
1092  // Cleanup session directory
1093  if (status == 0) {
1094  // make sure we remain in a "connected" directory
1095  gSystem->ChangeDirectory("/");
1096  // needed in case fSessionDir is on NFS ?!
1097  gSystem->MakeDirectory(fSessionDir+"/.delete");
1098  gSystem->Exec(Form("%s %s", kRM, fSessionDir.Data()));
1099  }
1100 
1101  // Cleanup queries directory if empty
1102  if (IsMaster()) {
1103  if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
1104  // make sure we remain in a "connected" directory
1105  gSystem->ChangeDirectory("/");
1106  // needed in case fQueryDir is on NFS ?!
1107  gSystem->MakeDirectory(fQueryDir+"/.delete");
1108  gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
1109  // Remove lock file
1110  if (fQueryLock)
1112  }
1113 
1114  // Unlock the query dir owned by this session
1115  if (fQueryLock)
1116  fQueryLock->Unlock();
1117  } else {
1118  // Try to stop processing if any
1119  Bool_t abort = (status == 0) ? kFALSE : kTRUE;
1120  if (!fIdle && fPlayer)
1121  fPlayer->StopProcess(abort,1);
1122  gSystem->Sleep(2000);
1123  }
1124 
1125  // Cleanup data directory if empty
1127  if (UnlinkDataDir(fDataDir))
1128  Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
1129  }
1130 
1131  // Remove input and signal handlers to avoid spurious "signals"
1132  // for closing activities executed upon exit()
1134 
1135  // Stop processing events (set a flag to exit the event loop)
1136  gSystem->ExitLoop();
1137 
1138  // We post the pipe once to wake up the main thread which is waiting for
1139  // activity on this socket; this fake activity will make it return and
1140  // eventually exit the loop.
1142 
1143  // Notify
1144  Printf("Terminate: termination operations ended: quitting!");
1145 }
1146 
1147 ////////////////////////////////////////////////////////////////////////////////
1148 /// Try locking query area of session tagged sessiontag.
1149 /// The lock object created for that area, if any, is returned in *lck
1150 /// (*lck is left at 0 when no lock file was found).
1151 
1152 Int_t TXProofServ::LockSession(const char *sessiontag, TProofLockPath **lck)
1153 {
1154  // We do not need to lock our own session
1155  if (strstr(sessiontag, fTopSessionTag))
1156  return 0;
1157 
1158  if (!lck) {
1159  Info("LockSession","locker space undefined");
1160  return -1;
1161  }
1162  *lck = 0;
1163 
1164  // Check the format
1165  TString stag = sessiontag;
1166  TRegexp re("session-.*-.*-.*");
1167  Int_t i1 = stag.Index(re);
1168  if (i1 == kNPOS) {
1169  Info("LockSession","bad format: %s", sessiontag);
1170  return -1;
1171  }
1172  stag.ReplaceAll("session-","");
1173 
1174  // Drop query number, if any
1175  Int_t i2 = stag.Index(":q");
1176  if (i2 != kNPOS)
1177  stag.Remove(i2);
1178 
1179  // Make sure that parent process does not exist anylonger
1180  TString parlog = fSessionDir;
1181  parlog = parlog.Remove(parlog.Index("master-")+strlen("master-"));
1182  parlog += stag;
1183  if (!gSystem->AccessPathName(parlog)) {
1184  Info("LockSession","parent still running: do nothing");
1185  return -1;
1186  }
1187 
1188  // Lock the query lock file
1189  TString qlock = fQueryLock->GetName();
1190  qlock.ReplaceAll(fTopSessionTag, stag);
1191 
1192  if (!gSystem->AccessPathName(qlock)) {
1193  *lck = new TProofLockPath(qlock);
1194  if (((*lck)->Lock()) < 0) {
1195  Info("LockSession","problems locking query lock file");
1196  SafeDelete(*lck);
1197  return -1;
1198  }
1199  }
1200 
1201  // We are done
1202  return 0;
1203 }
1204 
1205 ////////////////////////////////////////////////////////////////////////////////
1206 /// Send message to intermediate coordinator to release worker of last ordinal
1207 /// ord.
1208 
1210 {
 // NOTE(review): the header line of this definition is absent from this
 // text; the message below names it ReleaseWorker and 'ord' is its argument,
 // used as a C string.
 // Logs the ordinal at debug level > 2, then sends a kReleaseWorker request
 // carrying 'ord' to the coordinator. The returned object, if any, is not
 // inspected and fSocket is not checked for null here.
1211  if (gDebug > 2) Info("ReleaseWorker","releasing: %s", ord);
1212 
1213  ((TXSocket *)fSocket)->SendCoordinator(kReleaseWorker, ord);
1214 }
const char * GetName() const
Returns name of object.
Definition: TObjString.h:42
Bool_t fMasterServ
Definition: TProofServ.h:123
Float_t fEffSessions
Definition: TProofServ.h:134
FILE * fLogFile
Definition: TProofServ.h:111
Bool_t fRealTimeLog
Definition: TProofServ.h:148
TXSocketHandler * fInputHandler
Definition: TXProofServ.h:38
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1265
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
Definition: TProof.cxx:2271
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition: TString.cxx:864
Int_t CatMotd()
Print message of the day (in the file pointed by the env PROOFMOTD or from fConfDir/etc/proof/motd).
Int_t Setup()
Print the ProofServ logo on standard output.
static void SetLocation(const char *loc="")
Set location string.
Definition: TXSocket.cxx:254
virtual Long_t ProcessLine(const char *line, Bool_t sync=kFALSE, Int_t *error=0)
Process a single command line, either a C++ statement or an interpreter command starting with a "...
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:711
Int_t fActSessions
Definition: TProofServ.h:133
TString fConfFile
Definition: TProofServ.h:89
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:404
virtual void Syslog(ELogLevel level, const char *mess)
Send mess to syslog daemon.
Definition: TSystem.cxx:1648
void LogToMaster(Bool_t on=kTRUE)
Definition: TProofServ.h:334
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
Bool_t IsValid() const
Definition: TProof.h:970
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
static Long_t fgResMemMax
Definition: TProofServ.h:172
void SetCompressionSettings(Int_t settings=1)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level...
Definition: TSocket.cxx:1103
Int_t SetupCommon()
Common part (between TProofServ and TXProofServ) of the setup phase.
Ssiz_t Length() const
Definition: TString.h:390
const double pi
Int_t CreateServer()
Finalize the server setup.
Collectable string class.
Definition: TObjString.h:32
float Float_t
Definition: RtypesCore.h:53
virtual Bool_t IsValid() const
Definition: TSocket.h:162
void SetMonitor(TMonitor *mon=0, Bool_t on=kTRUE)
Activate (on == TRUE) or deactivate (on == FALSE) all sockets monitored by 'mon'. ...
Definition: TProof.cxx:2403
virtual Int_t Reconnect()
Definition: TSocket.h:168
void InterruptCurrentMonitor()
If active in a monitor, set the ready state.
Definition: TProof.cxx:11902
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
Definition: TSocket.cxx:1017
static TXSocketHandler * GetSocketHandler(TFileHandler *h=0, TSocket *s=0)
Get an instance of the input socket handler with 'h' as handler, connected to socket 's'...
Int_t Argc() const
Definition: TApplication.h:141
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:635
TString fSessionTag
Definition: TProofServ.h:92
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
TH1 * h
Definition: legend2.C:5
TVirtualProofPlayer * fPlayer
Definition: TProofServ.h:110
Bool_t UnlinkDataDir(const char *path)
Recursively scan the datadir and unlink it if empty. Returns kTRUE if it can be unlinked, kFALSE otherwise.
Bool_t HandleInput(const void *in=0)
Handle asynchronous input on the input socket.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
const char *const kPROOF_WorkDir
Definition: TProof.h:147
TString fGroup
Definition: TProofServ.h:87
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:821
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
Definition: TSystem.cxx:536
Regular expression class.
Definition: TRegexp.h:35
Int_t UpdateSessionStatus(Int_t xst=-1)
Update the session status in the relevant file.
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
Definition: TSystem.cxx:856
Bool_t Notify()
Definition: TTimer.cxx:65
const char *const XPD_GW_QueryEnqueued
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
#define gROOT
Definition: TROOT.h:340
void SetVal(const AParamType &val)
Definition: TParameter.h:79
Int_t LoadPlugin()
Load the plugin library for this handler.
Basic string class.
Definition: TString.h:137
virtual void SaveWorkerInfo()
Save information about the worker set in the file .workers in the working dir.
Definition: TProof.cxx:12375
TAlienJobStatus * status
Definition: TAlienJob.cxx:51
int Int_t
Definition: RtypesCore.h:41
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:996
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:496
const char *const kRM
Definition: TProof.h:165
#define gInterpreter
Definition: TInterpreter.h:502
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Definition: TSystem.cxx:1511
TString fImage
Definition: TProofServ.h:91
TString fService
Definition: TProofServ.h:85
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
Long_t ExecPlugin(int nargs, const T &...params)
void HandleUrgentData()
Handle high priority data sent by the master or client.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:558
void DisableTimeout()
Disable read timeout on the underlying socket.
TString & Insert(Ssiz_t pos, const char *s)
Definition: TString.h:592
void EnableTimeout()
Enable read timeout on the underlying socket.
Int_t fInt3
Definition: TXSocket.h:66
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
Definition: TSystem.cxx:2443
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
Definition: TSystem.cxx:568
virtual Int_t GetDescriptor() const
Definition: TSocket.h:142
static Long_t fgVirtMemMax
Definition: TProofServ.h:171
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal the availability of new messages.
Definition: TXSocket.cxx:2228
Int_t fInt2
Definition: TXSocket.h:65
virtual void HandleSocketInput()
Handle input coming from the client or from the master server.
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition: TROOT.cxx:2406
static Int_t GetErrno()
Static function returning system error number.
Definition: TSystem.cxx:264
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
Definition: TEnv.cxx:749
const char * Data() const
Definition: TString.h:349
virtual Bool_t Notify()
Notify when signal occurs.
#define SafeDelete(p)
Definition: RConfig.h:436
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
Definition: TSystem.cxx:1346
TSocket * fSocket
Definition: TProofServ.h:108
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
Definition: TProof.cxx:6211
#define PDB(mask, level)
Definition: TProofDebug.h:58
virtual void Sleep(UInt_t milliSec)
Sleep milliSec milli seconds.
Definition: TSystem.cxx:441
const char * ord
Definition: TXSlave.cxx:46
virtual Bool_t IsEmpty() const
Definition: TCollection.h:99
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Definition: TTimer.cxx:209
TXProofServInterruptHandler * fInterruptHandler
Definition: TXProofServ.h:37
void ReleaseWorker(const char *ord)
Send message to intermediate coordinator to release worker of last ordinal ord.
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1627
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Definition: TProof.cxx:2664
if(pyself &&pyself!=Py_None)
virtual void ExitLoop()
Exit from event loop.
Definition: TSystem.cxx:396
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
Definition: TSystem.cxx:1563
TString fDataDir
Definition: TProofServ.h:100
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
TProof * fProof
Definition: TProofServ.h:109
TString fConfDir
Definition: TProofServ.h:88
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
TString flog
Definition: pq2main.cxx:37
TString fOrdinal
Definition: TProofServ.h:116
EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
virtual Int_t SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asynchronous message to the master / client.
TString fAdminPath
Definition: TProofServ.h:102
Int_t fInt4
Definition: TXSocket.h:67
A doubly linked list.
Definition: TList.h:47
Int_t fGroupPriority
Definition: TProofServ.h:121
Float_t fCpuTime
Definition: TProofServ.h:126
char ** Argv() const
Definition: TApplication.h:142
const TString & GetOrdinal() const
const char Int_t const char TProof Int_t const char * workdir
Definition: TXSlave.cxx:46
void Terminate(Int_t status)
Terminate the proof server.
Named parameter, streamable and storable.
Definition: TParameter.h:49
Int_t fProtocol
Definition: TProofServ.h:115
TString fUser
Definition: TSystem.h:152
TList * fActiveSlaves
Definition: TProof.h:505
static double C[]
static Int_t fgLogToSysLog
Definition: TProofServ.h:184
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition: TString.cxx:2220
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
Int_t fCompressMsg
Definition: TProofServ.h:154
TString fSessionDir
Definition: TProofServ.h:94
const char * GetName() const
Returns name of object.
Definition: TParameter.h:76
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
virtual ~TXProofServ()
Cleanup.
Long_t fMemVirtual
Definition: TSystem.h:207
void SetActive(Bool_t=kTRUE)
Definition: TProof.h:1021
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2321
TIdleTOTimer * fIdleTOTimer
Definition: TProofServ.h:152
void HandleSigPipe()
Called when the client is not alive anymore; terminate the session.
char * Form(const char *fmt,...)
void HandleTermination()
Called when the client is not alive anymore; terminate the session.
This class implements a plugin library manager.
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
Int_t Ping(ESlaves list)
Ping PROOF slaves. Returns the number of slaves that responded.
Definition: TProof.cxx:4742
virtual Int_t Exec(const char *shellcmd)
Execute a command.
Definition: TSystem.cxx:657
const Int_t kPROOF_Protocol
Definition: TProof.h:143
void Interrupt()
Definition: TProofServ.h:303
TString fUser
Definition: TProofServ.h:86
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
Definition: TString.cxx:443
Bool_t fIdle
Definition: TProofServ.h:141
Bool_t IsNull() const
Definition: TString.h:387
TShutdownTimer * fShutdownTimer
Definition: TProofServ.h:150
#define Printf
Definition: TGeoToOCC.h:18
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:580
TString & Remove(Ssiz_t pos)
Definition: TString.h:616
long Long_t
Definition: RtypesCore.h:50
int Ssiz_t
Definition: RtypesCore.h:63
void Close(Option_t *option="")
Close all open slave servers.
Definition: TProof.cxx:1792
virtual Int_t GetSize() const
Definition: TCollection.h:95
virtual void SendLogFile(Int_t status=0, Int_t start=-1, Int_t end=-1)
Send log file to master.
TString fTopSessionTag
Definition: TProofServ.h:93
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:307
Bool_t IsMaster() const
Definition: TProofServ.h:305
Int_t fLogLevel
Definition: TProofServ.h:119
int type
Definition: TGX11.cxx:120
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
Definition: TProof.h:339
ClassImp(TXProofServ) extern"C"
Float_t fRealTime
Definition: TProofServ.h:125
Int_t fInt1
Definition: TXSocket.h:64
Int_t Unlock()
Unlock the directory.
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
Bool_t HandleError(const void *in=0)
Handle error on the input socket.
Bool_t fEndMaster
Definition: TProofServ.h:122
static TString fgSysLogEntity
Definition: TProofServ.h:186
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:556
const char *const XPD_GW_Static
TXProofServ(Int_t *argc, char **argv, FILE *flog=0)
Main constructor.
virtual void Add(TObject *obj)
Definition: TList.h:81
const Ssiz_t kNPOS
Definition: Rtypes.h:115
TList * Queries() const
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:567
R__EXTERN Int_t gProofDebugLevel
Definition: TProofDebug.h:56
const TString & GetNodeName() const
TString fWorkDir
Definition: TProofServ.h:90
static TXSockPipe fgPipe
Definition: TXSocket.h:126
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
void RedirectOutput(const char *dir=0, const char *mode="w")
Redirect stdout to a log file.
virtual void AddFileHandler(TFileHandler *fh)
Add a file handler to the list of system file handlers.
Definition: TSystem.cxx:558
static void ResetErrno()
Static function resetting system error number.
Definition: TSystem.cxx:280
This class creates the ROOT Application Environment that interfaces to the windowing system eventloop...
Definition: TApplication.h:45
const AParamType & GetVal() const
Definition: TParameter.h:77
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition: TSystem.cxx:1243
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list=kAllUnique)
Broadcast the group priority to all workers in the specified list.
Definition: TProof.cxx:2446
TString fSockPath
Definition: TXProofServ.h:39
TList * fWaitingQueries
Definition: TProofServ.h:140
TString fQueryDir
Definition: TProofServ.h:98
Int_t fTotSessions
Definition: TProofServ.h:132
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition: TString.h:582
const TString & GetImage() const
const Bool_t kTRUE
Definition: Rtypes.h:91
virtual void SetTitle(const char *title="")
Change (i.e. set) the title of the TNamed.
Definition: TNamed.cxx:152
Int_t fLogFileDes
Definition: TProofServ.h:112
Bool_t fTerminated
Definition: TXProofServ.h:41
Bool_t NoLogOpt() const
Definition: TApplication.h:144
static volatile Int_t gProofServDebug
Definition: TXProofServ.cxx:61
Long_t fMemResident
Definition: TSystem.h:206
Bool_t IsEndMaster() const
Definition: TProof.h:699
TQueryResultManager * fQMgr
Definition: TProofServ.h:138
virtual Long_t ProcessFile(const char *file, Int_t *error=0, Bool_t keep=kFALSE)
Process a file containing a C++ macro.
TProofLockPath * fQueryLock
Definition: TProofServ.h:106
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904