Logo ROOT   6.16/01
Reference Guide
TXProofServ.cxx
Go to the documentation of this file.
1// @(#)root/proofx:$Id$
2// Author: Gerardo Ganis 12/12/2005
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TXProofServ
13\ingroup proofx
14
15This class implements the XProofD version of TProofServ, with respect to which it differs only
16for the underlying connection technology.
17
18*/
19
20#include "RConfigure.h"
21#include <ROOT/RConfig.hxx>
22#include "Riostream.h"
23
24#ifdef WIN32
25 #include <io.h>
26 typedef long off_t;
27#endif
28#include <sys/types.h>
29#include <netinet/in.h>
30#include <utime.h>
31
32#include "TXProofServ.h"
33#include "TObjString.h"
34#include "TEnv.h"
35#include "TError.h"
36#include "TException.h"
37#include "THashList.h"
38#include "TInterpreter.h"
39#include "TParameter.h"
40#include "TProofDebug.h"
41#include "TProof.h"
42#include "TProofPlayer.h"
43#include "TQueryResultManager.h"
44#include "TRegexp.h"
45#include "TClass.h"
46#include "TROOT.h"
47#include "TSystem.h"
48#include "TPluginManager.h"
49#include "TXSocketHandler.h"
50#include "TXUnixSocket.h"
51#include "compiledata.h"
52#include "TProofNodeInfo.h"
53#include "XProofProtocol.h"
54
57
58
59// debug hook
60static volatile Int_t gProofServDebug = 1;
61
62//----- SigPipe signal handler -------------------------------------------------
63////////////////////////////////////////////////////////////////////////////////
64
65class TXProofServSigPipeHandler : public TSignalHandler {
66 TXProofServ *fServ;
67public:
68 TXProofServSigPipeHandler(TXProofServ *s) : TSignalHandler(kSigInterrupt, kFALSE)
69 { fServ = s; }
70 Bool_t Notify();
71};
72
73////////////////////////////////////////////////////////////////////////////////
74
75Bool_t TXProofServSigPipeHandler::Notify()
76{
77 fServ->HandleSigPipe();
78 return kTRUE;
79}
80
81//----- Termination signal handler ---------------------------------------------
82////////////////////////////////////////////////////////////////////////////////
83
84class TXProofServTerminationHandler : public TSignalHandler {
85 TXProofServ *fServ;
86public:
87 TXProofServTerminationHandler(TXProofServ *s)
88 : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
89 Bool_t Notify();
90};
91
92////////////////////////////////////////////////////////////////////////////////
93
94Bool_t TXProofServTerminationHandler::Notify()
95{
96 Printf("Received SIGTERM: terminating");
97
98 fServ->HandleTermination();
99 return kTRUE;
100}
101
102//----- Seg violation signal handler ---------------------------------------------
103////////////////////////////////////////////////////////////////////////////////
104
105class TXProofServSegViolationHandler : public TSignalHandler {
106 TXProofServ *fServ;
107public:
108 TXProofServSegViolationHandler(TXProofServ *s)
110 Bool_t Notify();
111};
112
113////////////////////////////////////////////////////////////////////////////////
114
115Bool_t TXProofServSegViolationHandler::Notify()
116{
117 Printf("**** ");
118 Printf("**** Segmentation violation: terminating ****");
119 Printf("**** ");
120 fServ->HandleTermination();
121 return kTRUE;
122}
123
124//----- Input handler for messages from parent or master -----------------------
125////////////////////////////////////////////////////////////////////////////////
126
127class TXProofServInputHandler : public TFileHandler {
128 TXProofServ *fServ;
129public:
130 TXProofServInputHandler(TXProofServ *s, Int_t fd) : TFileHandler(fd, 1)
131 { fServ = s; }
132 Bool_t Notify();
133 Bool_t ReadNotify() { return Notify(); }
134};
135
136////////////////////////////////////////////////////////////////////////////////
137
138Bool_t TXProofServInputHandler::Notify()
139{
140 fServ->HandleSocketInput();
141 // This request has been completed: remove the client ID from the pipe
142 ((TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
143 return kTRUE;
144}
145
147
148// Hook to the constructor. This is needed to avoid using the plugin manager
149// which may create problems in multi-threaded environments.
150extern "C" {
151 TApplication *GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
152 { return new TXProofServ(argc, argv, flog); }
153}
154
155////////////////////////////////////////////////////////////////////////////////
156/// Main constructor
157
158TXProofServ::TXProofServ(Int_t *argc, char **argv, FILE *flog)
159 : TProofServ(argc, argv, flog)
160{
162 fInputHandler = 0;
164
165 // TODO:
166 // Int_t useFIFO = 0;
167/* if (GetParameter(fProof->GetInputList(), "PROOF_UseFIFO", useFIFO) != 0) {
168 if (useFIFO == 1)
169 Info("", "enablig use of FIFO (if allowed by the server)");
170 else
171 Warning("", "unsupported strategy index (%d): ignore", strategy);
172 }
173*/
174}
175
176////////////////////////////////////////////////////////////////////////////////
177/// Finalize the server setup. If master, create the TProof instance to talk
178/// the worker or submaster nodes.
179/// Return 0 on success, -1 on error
180
182{
183 Bool_t xtest = (Argc() > 3 && !strcmp(Argv(3), "test")) ? kTRUE : kFALSE;
184
185 if (gProofDebugLevel > 0)
186 Info("CreateServer", "starting%s server creation", (xtest ? " test" : ""));
187
188 // Get file descriptor for log file
189 if (fLogFile) {
190 // Use the file already open by pmain
191 if ((fLogFileDes = fileno(fLogFile)) < 0) {
192 Error("CreateServer", "resolving the log file description number");
193 return -1;
194 }
195 // Hide the session start-up logs unless we are in verbose mode
196 if (gProofDebugLevel <= 0)
197 lseek(fLogFileDes, (off_t) 0, SEEK_END);
198 }
199
200 // Global location string in TXSocket
201 TXSocket::SetLocation((IsMaster()) ? "master" : "slave");
202
203 // Set debug level in XrdClient
204 EnvPutInt(NAME_DEBUG, gEnv->GetValue("XNet.Debug", 0));
205
206 // Get socket to be used to call back our xpd
207 if (xtest) {
208 // test session, just send the protocol version on the open pipe
209 // and exit
210 if (!(fSockPath = gSystem->Getenv("ROOTOPENSOCK"))) {
211 Error("CreateServer", "test: socket setup by xpd undefined");
212 return -1;
213 }
214 Int_t fpw = (Int_t) strtol(fSockPath.Data(), 0, 10);
215 int proto = htonl(kPROOF_Protocol);
216 fSockPath = "";
217 if (write(fpw, &proto, sizeof(proto)) != sizeof(proto)) {
218 Error("CreateServer", "test: sending protocol number");
219 return -1;
220 }
221 exit(0);
222 } else {
223 fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
224 if (fSockPath.Length() <= 0) {
225 Error("CreateServer", "socket setup by xpd undefined");
226 return -1;
227 }
228 TString entity = gEnv->GetValue("ProofServ.Entity", "");
229 if (entity.Length() > 0)
230 fSockPath.Insert(0,Form("%s/", entity.Data()));
231 }
232
233 // Get open socket descriptor, if any
234 Int_t sockfd = -1;
235 const char *opensock = gSystem->Getenv("ROOTOPENSOCK");
236 if (opensock && strlen(opensock) > 0) {
238 sockfd = (Int_t) strtol(opensock, 0, 10);
239 if (TSystem::GetErrno() == ERANGE) {
240 sockfd = -1;
241 Warning("CreateServer", "socket descriptor: wrong conversion from '%s'", opensock);
242 }
243 if (sockfd > 0 && gProofDebugLevel > 0)
244 Info("CreateServer", "using open connection (descriptor %d)", sockfd);
245 }
246
247 // Get the sessions ID
248 Int_t psid = gEnv->GetValue("ProofServ.SessionID", -1);
249 if (psid < 0) {
250 Error("CreateServer", "Session ID undefined");
251 return -1;
252 }
253
254 // Call back the server
255 fSocket = new TXUnixSocket(fSockPath, psid, -1, this, sockfd);
256 if (!fSocket || !(fSocket->IsValid())) {
257 Error("CreateServer", "Failed to open connection to XrdProofd coordinator");
258 return -1;
259 }
260 // Set compression level, if any
262
263 // Set the title for debugging
264 TString tgt("client");
265 if (fOrdinal != "0") {
266 tgt = fOrdinal;
267 if (tgt.Last('.') != kNPOS) tgt.Remove(tgt.Last('.'));
268 }
269 fSocket->SetTitle(tgt);
270
271 // Set the this as reference of this socket
272 ((TXSocket *)fSocket)->fReference = this;
273
274 // Get socket descriptor
275 Int_t sock = fSocket->GetDescriptor();
276
277 // Install message input handlers
279 TXSocketHandler::GetSocketHandler(new TXProofServInputHandler(this, sock), fSocket);
281
282 // Get the client ID
283 Int_t cid = gEnv->GetValue("ProofServ.ClientID", -1);
284 if (cid < 0) {
285 Error("CreateServer", "Client ID undefined");
286 SendLogFile();
287 return -1;
288 }
289 ((TXSocket *)fSocket)->SetClientID(cid);
290
291 // debug hooks
292 if (IsMaster()) {
293 // wait (loop) in master to allow debugger to connect
294 if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
295 while (gProofServDebug)
296 ;
297 }
298 } else {
299 // wait (loop) in slave to allow debugger to connect
300 if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
301 while (gProofServDebug)
302 ;
303 }
304 }
305
306 if (gProofDebugLevel > 0)
307 Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
309
310 if (Setup() == -1) {
311 // Setup failure
312 LogToMaster();
313 SendLogFile();
314 Terminate(0);
315 return -1;
316 }
317
318 if (!fLogFile) {
320 // If for some reason we failed setting a redirection file for the logs
321 // we cannot continue
322 if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
323 LogToMaster();
324 SendLogFile(-98);
325 Terminate(0);
326 return -1;
327 }
328 }
329
330 // Send message of the day to the client
331 if (IsMaster()) {
332 if (CatMotd() == -1) {
333 LogToMaster();
334 SendLogFile(-99);
335 Terminate(0);
336 return -1;
337 }
338 }
339
340 // Everybody expects iostream to be available, so load it...
341 ProcessLine("#include <iostream>", kTRUE);
342 ProcessLine("#include <string>",kTRUE); // for std::string iostream.
343
344 // Load user functions
345 const char *logon;
346 logon = gEnv->GetValue("Proof.Load", (char *)0);
347 if (logon) {
348 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
349 if (mac)
350 ProcessLine(Form(".L %s", logon), kTRUE);
351 delete [] mac;
352 }
353
354 // Execute logon macro
355 logon = gEnv->GetValue("Proof.Logon", (char *)0);
356 if (logon && !NoLogOpt()) {
357 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
358 if (mac)
359 ProcessFile(logon);
360 delete [] mac;
361 }
362
363 // Save current interpreter context
364 gInterpreter->SaveContext();
365 gInterpreter->SaveGlobalsContext();
366
367 // if master, start slave servers
368 if (IsMaster()) {
369 TString master;
370
371 if (fConfFile.BeginsWith("lite:")) {
372 master = "lite://";
373 } else {
374 master.Form("proof://%s@__master__", fUser.Data());
375
376 // Add port, if defined
377 Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
378 if (port > -1) {
379 master += ":";
380 master += port;
381 }
382 }
383
384 // Make sure that parallel startup via threads is not active
385 // (it is broken for xpd because of the locks on gInterpreterMutex)
386 gEnv->SetValue("Proof.ParallelStartup", 0);
387
388 // Get plugin manager to load appropriate TProof from
389 TPluginManager *pm = gROOT->GetPluginManager();
390 if (!pm) {
391 Error("CreateServer", "no plugin manager found");
392 SendLogFile(-99);
393 Terminate(0);
394 return -1;
395 }
396
397 // Find the appropriate handler
398 TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
399 if (!h) {
400 Error("CreateServer", "no plugin found for TProof with a"
401 " config file of '%s'", fConfFile.Data());
402 SendLogFile(-99);
403 Terminate(0);
404 return -1;
405 }
406
407 // load the plugin
408 if (h->LoadPlugin() == -1) {
409 Error("CreateServer", "plugin for TProof could not be loaded");
410 SendLogFile(-99);
411 Terminate(0);
412 return -1;
413 }
414
415 // Make instance of TProof
416 if (fConfFile.BeginsWith("lite:")) {
417 // Remove input and signal handlers to avoid spurious "signals"
418 // during startup
420 fProof = reinterpret_cast<TProof*>(h->ExecPlugin(6, master.Data(),
421 0, 0,
422 fLogLevel,
423 fSessionDir.Data(), 0));
424 // Re-enable input and signal handlers
426 } else {
427 fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
428 fConfFile.Data(),
429 fConfDir.Data(),
430 fLogLevel,
432 }
433
434 // Save worker info
436
437 if (!fProof || (fProof && !fProof->IsValid())) {
438 Error("CreateServer", "plugin for TProof could not be executed");
439 FlushLogFile();
440 delete fProof;
441 fProof = 0;
442 SendLogFile(-99);
443 Terminate(0);
444 return -1;
445 }
446 // Find out if we are a master in direct contact only with workers
448
449 SendLogFile();
450 }
451
452 // Setup the shutdown timer
453 if (!fShutdownTimer) {
454 // Check activity on socket every 5 mins
455 fShutdownTimer = new TShutdownTimer(this, 300000);
457 }
458
459 // Check if schema evolution is effective: clients running versions <=17 do not
460 // support that: send a warning message
461 if (fProtocol <= 17) {
462 TString msg;
463 msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
464 " This may generate compatibility problems between streamed objects.\n"
465 " The advise is to move to ROOT >= 5.21/02 .");
466 SendAsynMessage(msg.Data());
467 }
468
469 // Setup the idle timer
470 if (IsMaster() && !fIdleTOTimer) {
471 // Check activity on socket every 5 mins
472 Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
473 if (idle_to > 0) {
474 fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
476 if (gProofDebugLevel > 0)
477 Info("CreateServer", " idle timer started (%d secs)", idle_to);
478 } else if (gProofDebugLevel > 0) {
479 Info("CreateServer", " idle timer not started (no idle timeout requested)");
480 }
481 }
482
483 // Done
484 return 0;
485}
486
487////////////////////////////////////////////////////////////////////////////////
488/// Cleanup. Not really necessary since after this dtor there is no
489/// live anyway.
490
492{
493 delete fSocket;
494}
495
496////////////////////////////////////////////////////////////////////////////////
497/// Handle high priority data sent by the master or client.
498
500{
501 // Real-time notification of messages
503
504 // Get interrupt
505 Bool_t fw = kFALSE;
506 Int_t iLev = ((TXSocket *)fSocket)->GetInterrupt(fw);
507 if (iLev < 0) {
508 Error("HandleUrgentData", "error receiving interrupt");
509 return;
510 }
511
512 PDB(kGlobal, 2)
513 Info("HandleUrgentData", "got interrupt: %d\n", iLev);
514
515 if (fProof)
516 fProof->SetActive();
517
518 switch (iLev) {
519
520 case TProof::kPing:
521 PDB(kGlobal, 2)
522 Info("HandleUrgentData", "*** Ping");
523
524 // If master server, propagate interrupt to slaves
525 if (fw && IsMaster()) {
527 if (nbad > 0) {
528 Info("HandleUrgentData","%d slaves did not reply to ping",nbad);
529 }
530 }
531
532 // Touch the admin path to show we are alive
533 if (fAdminPath.IsNull()) {
534 fAdminPath = gEnv->GetValue("ProofServ.AdminPath", "");
535 }
536
537 if (!fAdminPath.IsNull()) {
538 if (!fAdminPath.EndsWith(".status")) {
539 // Update file time stamps
540 if (utime(fAdminPath.Data(), 0) != 0)
541 Info("HandleUrgentData", "problems touching path: %s", fAdminPath.Data());
542 else
543 PDB(kGlobal, 2)
544 Info("HandleUrgentData", "touching path: %s", fAdminPath.Data());
545 } else {
546 // Update the status in the file
547 // 0 idle
548 // 1 running
549 // 2 being terminated (currently unused)
550 // 3 queued
551 // 4 idle timed-out
552 Int_t uss_rc = UpdateSessionStatus(-1);
553 if (uss_rc != 0)
554 Error("HandleUrgentData", "problems updating status path: %s (errno: %d)", fAdminPath.Data(), -uss_rc);
555 }
556 } else {
557 Info("HandleUrgentData", "admin path undefined");
558 }
559
560 break;
561
563 Info("HandleUrgentData", "*** Hard Interrupt");
564
565 // If master server, propagate interrupt to slaves
566 if (fw && IsMaster())
568
569 // Flush input socket
570 ((TXSocket *)fSocket)->Flush();
571
572 if (IsMaster())
573 SendLogFile();
574
575 break;
576
578 Info("HandleUrgentData", "Soft Interrupt");
579
580 // If master server, propagate interrupt to slaves
581 if (fw && IsMaster())
583
584 Interrupt();
585
586 if (IsMaster())
587 SendLogFile();
588
589 break;
590
591
593 Info("HandleUrgentData", "Shutdown Interrupt");
594
595 // When returning for here connection are closed
597
598 break;
599
600 default:
601 Error("HandleUrgentData", "unexpected type: %d", iLev);
602 break;
603 }
604
605
607}
608
609////////////////////////////////////////////////////////////////////////////////
610/// Called when the client is not alive anymore; terminate the session.
611
613{
614 // Real-time notification of messages
615
616 Info("HandleSigPipe","got sigpipe ... do nothing");
617}
618
619////////////////////////////////////////////////////////////////////////////////
620/// Called when the client is not alive anymore; terminate the session.
621
623{
624 // If master server, propagate interrupt to slaves
625 // (shutdown interrupt send internally).
626 if (IsMaster()) {
627
628 // If not idle, try first to stop processing
629 if (!fIdle) {
630 // Remove pending requests
632 // Interrupt the current monitor
634 // Do not wait for ever, but al least 20 seconds
635 Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
636 timeout = (timeout > 20) ? timeout : 20;
637 // Processing will be aborted
638 fProof->StopProcess(kTRUE, (Long_t) (timeout / 2));
639 // Receive end-of-processing messages, but do not wait for ever
640 fProof->Collect(TProof::kActive, timeout);
641 // Still not idle
642 if (!fIdle)
643 Warning("HandleTermination","processing could not be stopped");
644 }
645 // Close the session
646 if (fProof)
647 fProof->Close("S");
648 }
649
650 Terminate(0); // will not return from here....
651}
652
653////////////////////////////////////////////////////////////////////////////////
654/// Print the ProofServ logo on standard output.
655/// Return 0 on success, -1 on error
656
658{
659 char str[512];
660
661 if (IsMaster()) {
662 snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
663 } else {
664 snprintf(str, 512, "**** PROOF worker server @ %s started ****", gSystem->HostName());
665 }
666
667 if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
668 Error("Setup", "failed to send proof server startup message");
669 return -1;
670 }
671
672 // Get client protocol
673 if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
674 Error("Setup", "remote proof protocol missing");
675 return -1;
676 }
677
678 // The local user
679 fUser = gEnv->GetValue("ProofServ.Entity", "");
680 if (fUser.Length() >= 0) {
681 if (fUser.Contains(":"))
682 fUser.Remove(fUser.Index(":"));
683 if (fUser.Contains("@"))
684 fUser.Remove(fUser.Index("@"));
685 } else {
687 if (pw) {
688 fUser = pw->fUser;
689 delete pw;
690 }
691 }
692
693 // Work dir and ...
694 if (IsMaster()) {
695 TString cf = gEnv->GetValue("ProofServ.ProofConfFile", "");
696 if (cf.Length() > 0)
697 fConfFile = cf;
698 }
699 fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));
700
701 // Get Session tag
702 if ((fSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1")) == "-1") {
703 Error("Setup", "Session tag missing");
704 return -1;
705 }
706 // Get top session tag, i.e. the tag of the PROOF session
707 if ((fTopSessionTag = gEnv->GetValue("ProofServ.TopSessionTag", "-1")) == "-1") {
708 fTopSessionTag = "";
709 // Try to extract it from log file path (for backward compatibility)
710 if (gSystem->Getenv("ROOTPROOFLOGFILE")) {
711 fTopSessionTag = gSystem->DirName(gSystem->Getenv("ROOTPROOFLOGFILE"));
712 Ssiz_t lstl;
713 if ((lstl = fTopSessionTag.Last('/')) != kNPOS) fTopSessionTag.Remove(0, lstl + 1);
714 if (fTopSessionTag.BeginsWith("session-")) {
715 fTopSessionTag.Remove(0, strlen("session-"));
716 } else {
717 fTopSessionTag = "";
718 }
719 }
720 if (fTopSessionTag.IsNull()) {
721 Error("Setup", "top session tag missing");
722 return -1;
723 }
724 }
725
726 // Make sure the process ID is in the tag
727 TString spid = Form("-%d", gSystem->GetPid());
728 if (!fSessionTag.EndsWith(spid)) {
729 Int_t nd = 0;
730 if ((nd = fSessionTag.CountChar('-')) >= 2) {
731 Int_t id = fSessionTag.Index("-", fSessionTag.Index("-") + 1);
732 if (id != kNPOS) fSessionTag.Remove(id);
733 } else if (nd != 1) {
734 Warning("Setup", "Wrong number of '-' in session tag: protocol error? %s", fSessionTag.Data());
735 }
736 // Add this process ID
737 fSessionTag += spid;
738 }
739 if (gProofDebugLevel > 0)
740 Info("Setup", "session tags: %s, %s", fTopSessionTag.Data(), fSessionTag.Data());
741
742 // Get Session dir (sandbox)
743 if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
744 Error("Setup", "Session dir missing");
745 return -1;
746 }
747
748 // Goto to the main PROOF working directory
749 char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
750 fWorkDir = workdir;
751 delete [] workdir;
752 if (gProofDebugLevel > 0)
753 Info("Setup", "working directory set to %s", fWorkDir.Data());
754
755 // Common setup
756 if (SetupCommon() != 0) {
757 Error("Setup", "common setup failed");
758 return -1;
759 }
760
761 // Send packages off immediately to reduce latency
763
764 // Check every two hours if client is still alive
766
767 // Install SigPipe handler to handle kKeepAlive failure
768 gSystem->AddSignalHandler(new TXProofServSigPipeHandler(this));
769
770 // Install Termination handler
771 gSystem->AddSignalHandler(new TXProofServTerminationHandler(this));
772
773 // Install seg violation handler
774 gSystem->AddSignalHandler(new TXProofServSegViolationHandler(this));
775
776 if (gProofDebugLevel > 0)
777 Info("Setup", "successfully completed");
778
779 // Done
780 return 0;
781}
782
783////////////////////////////////////////////////////////////////////////////////
784/// Get list of workers to be used from now on.
785/// The list must be provided by the caller.
786
788 Int_t & /* prioritychange */,
789 Bool_t resume)
790{
792
793 // User config files, when enabled, override cluster-wide configuration
794 if (gEnv->GetValue("ProofServ.UseUserCfg", 0) != 0) {
795 Int_t pc = 1;
796 return TProofServ::GetWorkers(workers, pc);
797 }
798
799 // seqnum of the query for which we call getworkers
800 Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
801 TString seqnum = (dynamicStartup) ? "" : XPD_GW_Static;
802 if (!fWaitingQueries->IsEmpty()) {
803 if (resume) {
804 seqnum += ((TProofQueryResult *)(fWaitingQueries->First()))->GetSeqNum();
805 } else {
806 seqnum += ((TProofQueryResult *)(fWaitingQueries->Last()))->GetSeqNum();
807 }
808 }
809 // Send request to the coordinator
810 TObjString *os = 0;
811 if (dynamicStartup) {
812 // We wait dynto seconds for the first worker to come; -1 means forever
813 Int_t dynto = gEnv->GetValue("Proof.DynamicStartupTimeout", -1);
814 Bool_t doto = (dynto > 0) ? kTRUE : kFALSE;
815 while (!(os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data()))) {
816 if (doto > 0 && --dynto < 0) break;
817 // Another second
818 gSystem->Sleep(1000);
819 }
820 } else {
821 os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data());
822 }
823
824 // The reply contains some information about the master (image, workdir)
825 // followed by the information about the workers; the tokens for each node
826 // are separated by '&'
827 if (os) {
828 TString fl(os->GetName());
830 SendAsynMessage("+++ Query cannot be processed now: enqueued");
831 return kQueryEnqueued;
832 }
833
834 // Honour a max number of workers request (typically when running in valgrind)
835 Int_t nwrks = -1;
836 Bool_t pernode = kFALSE;
837 if (gSystem->Getenv("PROOF_NWORKERS")) {
838 TString s(gSystem->Getenv("PROOF_NWORKERS"));
839 if (s.EndsWith("x")) {
840 pernode = kTRUE;
841 s.ReplaceAll("x", "");
842 }
843 if (s.IsDigit()) {
844 nwrks = s.Atoi();
845 if (!dynamicStartup && (nwrks > 0)) {
846 // Notify, except in dynamic workers mode to avoid flooding
847 TString msg;
848 if (pernode) {
849 msg.Form("+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
850 } else {
851 msg.Form("+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
852 }
853 SendAsynMessage(msg);
854 } else {
855 nwrks = -1;
856 }
857 } else {
858 pernode = kFALSE;
859 }
860 }
861
862 TString tok;
863 Ssiz_t from = 0;
864 TList *nodecnt = (pernode) ? new TList : 0 ;
865 if (fl.Tokenize(tok, from, "&")) {
866 if (!tok.IsNull()) {
867 TProofNodeInfo *master = new TProofNodeInfo(tok);
868 if (!master) {
869 Error("GetWorkers", "no appropriate master line got from coordinator");
870 return kQueryStop;
871 } else {
872 // Set image if not yet done and available
873 if (fImage.IsNull() && strlen(master->GetImage()) > 0)
874 fImage = master->GetImage();
875 SafeDelete(master);
876 }
877 // Now the workers
878 while (fl.Tokenize(tok, from, "&")) {
879 if (!tok.IsNull()) {
880 if (nwrks == -1 || nwrks > 0) {
881 // We have the minimal set of information to start
882 rc = kQueryOK;
883 if (pernode && nodecnt) {
884 TProofNodeInfo *ni = new TProofNodeInfo(tok);
885 TParameter<Int_t> *p = 0;
886 Int_t nw = 0;
887 if (!(p = (TParameter<Int_t> *) nodecnt->FindObject(ni->GetNodeName().Data()))) {
888 p = new TParameter<Int_t>(ni->GetNodeName().Data(), nw);
889 nodecnt->Add(p);
890 }
891 nw = p->GetVal();
892 if (gDebug > 0)
893 Info("GetWorkers","%p: name: %s (%s) val: %d (nwrks: %d)",
894 p, p->GetName(), ni->GetNodeName().Data(), nw, nwrks);
895 if (nw < nwrks) {
896 if (workers) workers->Add(ni);
897 nw++;
898 p->SetVal(nw);
899 } else {
900 // Two many workers on this machine already
901 SafeDelete(ni);
902 }
903 } else {
904 if (workers)
905 workers->Add(new TProofNodeInfo(tok));
906 // Count down
907 if (nwrks != -1) nwrks--;
908 }
909 } else {
910 // Release this worker (to cleanup the session list in the coordinator and get a fresh
911 // and correct list next call)
912 TProofNodeInfo *ni = new TProofNodeInfo(tok);
914 }
915 }
916 }
917 }
918 }
919 // Cleanup
920 if (nodecnt) {
921 nodecnt->SetOwner(kTRUE);
922 SafeDelete(nodecnt);
923 }
924 }
925
926 // We are done
927 return rc;
928}
929
930////////////////////////////////////////////////////////////////////////////////
931/// Handle error on the input socket
932
934{
935 // Try reconnection
936 if (fSocket && !fSocket->IsValid()) {
937
939 if (fSocket && fSocket->IsValid()) {
940 if (gDebug > 0)
941 Info("HandleError",
942 "%p: connection to local coordinator re-established", this);
943 FlushLogFile();
944 return kFALSE;
945 }
946 }
947 Printf("TXProofServ::HandleError: %p: got called ...", this);
948
949 // If master server, propagate interrupt to slaves
950 // (shutdown interrupt send internally).
951 if (IsMaster())
952 fProof->Close("S");
953
954 // Avoid communicating back anything to the coordinator (it is gone)
955 if (fSocket) ((TXSocket *)fSocket)->SetSessionID(-1);
956
957 Terminate(0);
958
959 Printf("TXProofServ::HandleError: %p: DONE ... ", this);
960
961 // We are done
962 return kTRUE;
963}
964
965////////////////////////////////////////////////////////////////////////////////
966/// Handle asynchronous input on the input socket
967
969{
970 if (gDebug > 2)
971 Printf("TXProofServ::HandleInput %p, in: %p", this, in);
972
973 XHandleIn_t *hin = (XHandleIn_t *) in;
974 Int_t acod = (hin) ? hin->fInt1 : kXPD_msg;
975
976 // Act accordingly
977 if (acod == kXPD_ping || acod == kXPD_interrupt) {
978 // Interrupt or Ping
980
981 } else if (acod == kXPD_flush) {
982 // Flush stdout, so that we can access the full log file
983 Info("HandleInput","kXPD_flush: flushing log file (stdout)");
984 fflush(stdout);
985
986 } else if (acod == kXPD_urgent) {
987 // Get type
988 Int_t type = hin->fInt2;
989 switch (type) {
991 {
992 // Abort or Stop ?
993 Bool_t abort = (hin->fInt3 != 0) ? kTRUE : kFALSE;
994 // Timeout
995 Int_t timeout = hin->fInt4;
996 // Act now
997 if (fProof)
998 fProof->StopProcess(abort, timeout);
999 else
1000 if (fPlayer)
1001 fPlayer->StopProcess(abort, timeout);
1002 }
1003 break;
1004 default:
1005 Info("HandleInput","kXPD_urgent: unknown type: %d", type);
1006 }
1007
1008 } else if (acod == kXPD_inflate) {
1009
1010 // Obsolete type
1011 Warning("HandleInput", "kXPD_inflate: obsolete message type");
1012
1013 } else if (acod == kXPD_priority) {
1014
1015 // The factor is the priority to be propagated
1016 fGroupPriority = hin->fInt2;
1017 if (fProof)
1019 // Notify
1020 Info("HandleInput", "kXPD_priority: group %s priority set to %f",
1021 fGroup.Data(), (Float_t) fGroupPriority / 100.);
1022
1023 } else if (acod == kXPD_clusterinfo) {
1024
1025 // Information about the cluster status
1026 fTotSessions = hin->fInt2;
1027 fActSessions = hin->fInt3;
1028 fEffSessions = (hin->fInt4)/1000.;
1029 // Notify
1030 Info("HandleInput", "kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
1032
1033 } else {
1034 // Standard socket input
1036 // This request has been completed: remove the client ID from the pipe
1037 ((TXSocket *)fSocket)->RemoveClientID();
1038 }
1039
1040 // We are done
1041 return kTRUE;
1042}
1043
1044////////////////////////////////////////////////////////////////////////////////
1045/// Disable read timeout on the underlying socket
1046
1048{
1049 if (fSocket)
1050 ((TXSocket *)fSocket)->DisableTimeout();
1051}
1052
1053////////////////////////////////////////////////////////////////////////////////
1054/// Enable read timeout on the underlying socket
1055
1057{
1058 if (fSocket)
1059 ((TXSocket *)fSocket)->EnableTimeout();
1060}
1061
1062////////////////////////////////////////////////////////////////////////////////
1063/// Terminate the proof server.
1064
1066{
1067 if (fTerminated)
1068 // Avoid doubling the exit operations
1069 exit(1);
1071
1072 // Notify
1073 Info("Terminate", "starting session termination operations ...");
1074 if (fgLogToSysLog > 0) {
1075 TString s;
1076 s.Form("%s -1 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
1077 gSystem->Syslog(kLogNotice, s.Data());
1078 }
1079
1080 // Notify the memory footprint
1081 ProcInfo_t pi;
1082 if (!gSystem->GetProcInfo(&pi)){
1083 Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
1084 pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
1085 }
1086
1087 // Deactivate current monitor, if any
1088 if (fProof)
1090
1091 // Cleanup session directory
1092 if (status == 0) {
1093 // make sure we remain in a "connected" directory
1095 // needed in case fSessionDir is on NFS ?!
1096 gSystem->MakeDirectory(fSessionDir+"/.delete");
1097 gSystem->Exec(Form("%s %s", kRM, fSessionDir.Data()));
1098 }
1099
1100 // Cleanup queries directory if empty
1101 if (IsMaster()) {
1102 if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
1103 // make sure we remain in a "connected" directory
1105 // needed in case fQueryDir is on NFS ?!
1106 gSystem->MakeDirectory(fQueryDir+"/.delete");
1107 gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
1108 // Remove lock file
1109 if (fQueryLock)
1111 }
1112
1113 // Unlock the query dir owned by this session
1114 if (fQueryLock)
1115 fQueryLock->Unlock();
1116 } else {
1117 // Try to stop processing if any
1118 Bool_t abort = (status == 0) ? kFALSE : kTRUE;
1119 if (!fIdle && fPlayer)
1120 fPlayer->StopProcess(abort,1);
1121 gSystem->Sleep(2000);
1122 }
1123
1124 // Cleanup data directory if empty
1127 Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
1128 }
1129
1130 // Remove input and signal handlers to avoid spurious "signals"
1131 // for closing activities executed upon exit()
1133
1134 // Stop processing events (set a flag to exit the event loop)
1135 gSystem->ExitLoop();
1136
1137 // We post the pipe once to wake up the main thread which is waiting for
1138 // activity on this socket; this fake activity will make it return and
1139 // eventually exit the loop.
1141
1142 // Notify
1143 Printf("Terminate: termination operations ended: quitting!");
1144}
1145
1146////////////////////////////////////////////////////////////////////////////////
1147/// Try locking query area of session tagged sessiontag.
1148/// The id of the locking file is returned in fid and must be
1149/// unlocked via UnlockQueryFile(fid).
1150
1151Int_t TXProofServ::LockSession(const char *sessiontag, TProofLockPath **lck)
1152{
1153 // We do not need to lock our own session
1154 if (strstr(sessiontag, fTopSessionTag))
1155 return 0;
1156
1157 if (!lck) {
1158 Info("LockSession","locker space undefined");
1159 return -1;
1160 }
1161 *lck = 0;
1162
1163 // Check the format
1164 TString stag = sessiontag;
1165 TRegexp re("session-.*-.*-.*");
1166 Int_t i1 = stag.Index(re);
1167 if (i1 == kNPOS) {
1168 Info("LockSession","bad format: %s", sessiontag);
1169 return -1;
1170 }
1171 stag.ReplaceAll("session-","");
1172
1173 // Drop query number, if any
1174 Int_t i2 = stag.Index(":q");
1175 if (i2 != kNPOS)
1176 stag.Remove(i2);
1177
1178 // Make sure that parent process does not exist anylonger
1179 TString parlog = fSessionDir;
1180 parlog = parlog.Remove(parlog.Index("master-")+strlen("master-"));
1181 parlog += stag;
1182 if (!gSystem->AccessPathName(parlog)) {
1183 Info("LockSession","parent still running: do nothing");
1184 return -1;
1185 }
1186
1187 // Lock the query lock file
1188 TString qlock = fQueryLock->GetName();
1189 qlock.ReplaceAll(fTopSessionTag, stag);
1190
1191 if (!gSystem->AccessPathName(qlock)) {
1192 *lck = new TProofLockPath(qlock);
1193 if (((*lck)->Lock()) < 0) {
1194 Info("LockSession","problems locking query lock file");
1195 SafeDelete(*lck);
1196 return -1;
1197 }
1198 }
1199
1200 // We are done
1201 return 0;
1202}
1203
1204////////////////////////////////////////////////////////////////////////////////
1205/// Send message to intermediate coordinator to release worker of last ordinal
1206/// ord.
1207
1208void TXProofServ::ReleaseWorker(const char *ord)
1209{
1210 if (gDebug > 2) Info("ReleaseWorker","releasing: %s", ord);
1211
1212 ((TXSocket *)fSocket)->SendCoordinator(kReleaseWorker, ord);
1213}
#define SafeDelete(p)
Definition: RConfig.hxx:529
#define h(i)
Definition: RSha256.hxx:106
const Ssiz_t kNPOS
Definition: RtypesCore.h:111
int Int_t
Definition: RtypesCore.h:41
int Ssiz_t
Definition: RtypesCore.h:63
const Bool_t kFALSE
Definition: RtypesCore.h:88
long Long_t
Definition: RtypesCore.h:50
bool Bool_t
Definition: RtypesCore.h:59
float Float_t
Definition: RtypesCore.h:53
const Bool_t kTRUE
Definition: RtypesCore.h:87
#define ClassImp(name)
Definition: Rtypes.h:363
R__EXTERN Int_t gDebug
Definition: Rtypes.h:90
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
int type
Definition: TGX11.cxx:120
#define Printf
Definition: TGeoToOCC.h:18
#define gInterpreter
Definition: TInterpreter.h:538
#define PDB(mask, level)
Definition: TProofDebug.h:56
R__EXTERN Int_t gProofDebugLevel
Definition: TProofDebug.h:54
const char *const kRM
Definition: TProof.h:142
const char *const kPROOF_WorkDir
Definition: TProof.h:124
const Int_t kPROOF_Protocol
Definition: TProof.h:120
#define gROOT
Definition: TROOT.h:410
@ kKeepAlive
Definition: TSocket.h:41
@ kNoDelay
Definition: TSocket.h:43
char * Form(const char *fmt,...)
@ kSigTermination
@ kSigInterrupt
@ kSigSegmentationViolation
@ kReadPermission
Definition: TSystem.h:48
@ kWritePermission
Definition: TSystem.h:47
@ kLogNotice
Definition: TSystem.h:62
R__EXTERN TSystem * gSystem
Definition: TSystem.h:540
static volatile Int_t gProofServDebug
Definition: TXProofServ.cxx:60
TApplication * GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
@ kReleaseWorker
@ kGetWorkers
const char *const XPD_GW_QueryEnqueued
@ kXPD_flush
@ kXPD_msg
@ kXPD_urgent
@ kXPD_clusterinfo
@ kXPD_interrupt
@ kXPD_ping
@ kXPD_inflate
@ kXPD_priority
const char *const XPD_GW_Static
#define NAME_DEBUG
#define EnvPutInt(name, val)
Definition: XrdClientEnv.hh:47
const char * proto
Definition: civetweb.c:16604
#define snprintf
Definition: civetweb.c:1540
This class creates the ROOT Application Environment that interfaces to the windowing system eventloop...
Definition: TApplication.h:39
virtual Long_t ProcessFile(const char *file, Int_t *error=0, Bool_t keep=kFALSE)
char ** Argv() const
Definition: TApplication.h:136
Bool_t NoLogOpt() const
Definition: TApplication.h:138
virtual Long_t ProcessLine(const char *line, Bool_t sync=kFALSE, Int_t *error=0)
Int_t Argc() const
Definition: TApplication.h:135
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Bool_t IsEmpty() const
Definition: TCollection.h:186
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
Definition: TEnv.cxx:736
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
Definition: TList.cxx:574
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:689
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:467
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:655
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition: TNamed.cxx:164
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
Collectable string class.
Definition: TObjString.h:28
const char * GetName() const
Returns name of object.
Definition: TObjString.h:39
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
Named parameter, streamable and storable.
Definition: TParameter.h:37
void SetVal(const AParamType &val)
Definition: TParameter.h:71
const AParamType & GetVal() const
Definition: TParameter.h:69
const char * GetName() const
Returns name of object.
Definition: TParameter.h:68
This class implements a plugin library manager.
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
Int_t Unlock()
Unlock the directory.
The purpose of this class is to provide a complete node description for masters, submasters and worke...
const TString & GetImage() const
const TString & GetOrdinal() const
const TString & GetNodeName() const
TQueryResult version adapted to PROOF neeeds.
Class providing the PROOF server.
Definition: TProofServ.h:66
Float_t fCpuTime
Definition: TProofServ.h:114
TString fQueryDir
Definition: TProofServ.h:88
Int_t fProtocol
Definition: TProofServ.h:103
void Interrupt()
Definition: TProofServ.h:291
Int_t CatMotd()
Print message of the day (in the file pointed by the env PROOFMOTD or from fConfDir/etc/proof/motd).
Int_t fLogLevel
Definition: TProofServ.h:107
FILE * fLogFile
Definition: TProofServ.h:100
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
TString fDataDir
Definition: TProofServ.h:90
static Long_t fgResMemMax
Definition: TProofServ.h:160
TString fSessionDir
Definition: TProofServ.h:85
TString fAdminPath
Definition: TProofServ.h:92
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
TString fService
Definition: TProofServ.h:76
Int_t fLogFileDes
Definition: TProofServ.h:101
TProofLockPath * fQueryLock
Definition: TProofServ.h:95
void RedirectOutput(const char *dir=0, const char *mode="w")
Redirect stdout to a log file.
Bool_t fMasterServ
Definition: TProofServ.h:111
TSocket * fSocket
Definition: TProofServ.h:97
Int_t fCompressMsg
Definition: TProofServ.h:142
Int_t SetupCommon()
Common part (between TProofServ and TXProofServ) of the setup phase.
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asychronous message to the master / client .
TVirtualProofPlayer * fPlayer
Definition: TProofServ.h:99
TString fSessionTag
Definition: TProofServ.h:83
Bool_t IsMaster() const
Definition: TProofServ.h:293
TProof * fProof
Definition: TProofServ.h:98
TString fOrdinal
Definition: TProofServ.h:104
Bool_t fEndMaster
Definition: TProofServ.h:110
virtual void HandleSocketInput()
Handle input coming from the client or from the master server.
TString fWorkDir
Definition: TProofServ.h:81
TShutdownTimer * fShutdownTimer
Definition: TProofServ.h:138
Int_t fActSessions
Definition: TProofServ.h:121
TString fGroup
Definition: TProofServ.h:78
Bool_t fRealTimeLog
Definition: TProofServ.h:136
static Long_t fgVirtMemMax
Definition: TProofServ.h:159
TString fImage
Definition: TProofServ.h:82
@ kQueryEnqueued
Definition: TProofServ.h:73
TString fTopSessionTag
Definition: TProofServ.h:84
Float_t fEffSessions
Definition: TProofServ.h:122
TString fConfDir
Definition: TProofServ.h:79
TIdleTOTimer * fIdleTOTimer
Definition: TProofServ.h:140
static Int_t fgLogToSysLog
Definition: TProofServ.h:172
Bool_t UnlinkDataDir(const char *path)
Scan recursively the datadir and unlink it if empty Return kTRUE if it can be unlinked,...
friend class TXProofServ
Definition: TProofServ.h:69
TString fConfFile
Definition: TProofServ.h:80
Int_t fTotSessions
Definition: TProofServ.h:120
TString fUser
Definition: TProofServ.h:77
TQueryResultManager * fQMgr
Definition: TProofServ.h:126
TList * fWaitingQueries
Definition: TProofServ.h:128
static TString fgSysLogEntity
Definition: TProofServ.h:174
void LogToMaster(Bool_t on=kTRUE)
Definition: TProofServ.h:322
Int_t fGroupPriority
Definition: TProofServ.h:109
virtual void SendLogFile(Int_t status=0, Int_t start=-1, Int_t end=-1)
Send log file to master.
Float_t fRealTime
Definition: TProofServ.h:113
Int_t UpdateSessionStatus(Int_t xst=-1)
Update the session status in the relevant file.
Bool_t fIdle
Definition: TProofServ.h:129
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:316
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list=kAllUnique)
Broadcast the group priority to all workers in the specified list.
Definition: TProof.cxx:2429
@ kShutdownInterrupt
Definition: TProof.h:398
@ kHardInterrupt
Definition: TProof.h:396
@ kPing
Definition: TProof.h:395
@ kSoftInterrupt
Definition: TProof.h:397
Bool_t IsEndMaster() const
Definition: TProof.h:664
void Close(Option_t *option="")
Close all open slave servers.
Definition: TProof.cxx:1776
TList * fActiveSlaves
Definition: TProof.h:477
void InterruptCurrentMonitor()
If in active in a monitor set ready state.
Definition: TProof.cxx:11307
Bool_t IsValid() const
Definition: TProof.h:937
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Definition: TProof.cxx:2647
void SetActive(Bool_t=kTRUE)
Definition: TProof.h:988
void SetMonitor(TMonitor *mon=0, Bool_t on=kTRUE)
Activate (on == TRUE) or deactivate (on == FALSE) all sockets monitored by 'mon'.
Definition: TProof.cxx:2386
@ kActive
Definition: TProof.h:564
Int_t Ping(ESlaves list)
Ping PROOF slaves. Returns the number of slaves that responded.
Definition: TProof.cxx:4724
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
Definition: TProof.cxx:6196
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
Definition: TProof.cxx:2254
virtual void SaveWorkerInfo()
Save information about the worker set in the file .workers in the working dir.
Definition: TProof.cxx:11780
TList * Queries() const
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition: TROOT.cxx:2764
Regular expression class.
Definition: TRegexp.h:31
virtual Bool_t Notify()
Notify when signal occurs.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
Definition: TSocket.cxx:1012
virtual Int_t GetDescriptor() const
Definition: TSocket.h:132
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseGeneralPurpose)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
Definition: TSocket.cxx:1097
virtual Bool_t IsValid() const
Definition: TSocket.h:152
virtual Int_t Reconnect()
Definition: TSocket.h:158
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:522
Basic string class.
Definition: TString.h:131
Ssiz_t Length() const
Definition: TString.h:405
TString & Insert(Ssiz_t pos, const char *s)
Definition: TString.h:644
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition: TString.cxx:2152
const char * Data() const
Definition: TString.h:364
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:687
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition: TString.cxx:876
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition: TString.cxx:2172
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:610
Bool_t IsNull() const
Definition: TString.h:402
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
Definition: TString.cxx:464
TString & Remove(Ssiz_t pos)
Definition: TString.h:668
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2264
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:619
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition: TString.h:634
virtual void AddFileHandler(TFileHandler *fh)
Add a file handler to the list of system file handlers.
Definition: TSystem.cxx:563
virtual void Syslog(ELogLevel level, const char *mess)
Send mess to syslog daemon.
Definition: TSystem.cxx:1673
static void ResetErrno()
Static function resetting system error number.
Definition: TSystem.cxx:285
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition: TSystem.cxx:1264
static Int_t GetErrno()
Static function returning system error number.
Definition: TSystem.cxx:269
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:1013
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:716
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1652
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:834
virtual Int_t Exec(const char *shellcmd)
Execute a command.
Definition: TSystem.cxx:662
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
Definition: TSystem.cxx:573
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1286
virtual void ExitLoop()
Exit from event loop.
Definition: TSystem.cxx:401
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
Definition: TSystem.cxx:869
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
Definition: TSystem.cxx:2532
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
Definition: TSystem.cxx:541
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:312
virtual void Sleep(UInt_t milliSec)
Sleep milliSec milli seconds.
Definition: TSystem.cxx:446
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Definition: TSystem.cxx:1536
virtual int Unlink(const char *name)
Unlink, i.e.
Definition: TSystem.cxx:1371
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
Definition: TSystem.cxx:1588
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Definition: TTimer.cxx:211
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
This class implements the XProofD version of TProofServ, with respect to which it differs only for th...
Definition: TXProofServ.h:30
void EnableTimeout()
Enable read timeout on the underlying socket.
TString fSockPath
Definition: TXProofServ.h:35
Int_t Setup()
Print the ProofServ logo on standard output.
void Terminate(Int_t status)
Terminate the proof server.
void HandleTermination()
Called when the client is not alive anymore; terminate the session.
Bool_t fTerminated
Definition: TXProofServ.h:37
EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
virtual ~TXProofServ()
Cleanup.
void DisableTimeout()
Disable read timeout on the underlying socket.
void HandleSigPipe()
Called when the client is not alive anymore; terminate the session.
Bool_t HandleError(const void *in=0)
Handle error on the input socket.
Bool_t HandleInput(const void *in=0)
Handle asynchronous input on the input socket.
TXProofServInterruptHandler * fInterruptHandler
Definition: TXProofServ.h:33
void ReleaseWorker(const char *ord)
Send message to intermediate coordinator to release worker of last ordinal ord.
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
TXSocketHandler * fInputHandler
Definition: TXProofServ.h:34
Int_t CreateServer()
Finalize the server setup.
void HandleUrgentData()
Handle high priority data sent by the master or client.
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal new availibility of new messages.
Definition: TXSocket.cxx:2284
static TXSocketHandler * GetSocketHandler(TFileHandler *h=0, TSocket *s=0)
Get an instance of the input socket handler with 'h' as handler, connected to socket 's'.
High level handler of connections to XProofD.
Definition: TXSocket.h:59
static void SetLocation(const char *loc="")
Set location string.
Definition: TXSocket.cxx:242
@ kStopProcess
Definition: TXSocket.h:139
static TXSockPipe fgPipe
Definition: TXSocket.h:111
Implementation of TXSocket using PF_UNIX sockets.
Definition: TXUnixSocket.h:29
static constexpr double s
static constexpr double pc
static constexpr double pi
TString fUser
Definition: TSystem.h:142
Int_t fInt2
Definition: TXSocket.h:50
Int_t fInt4
Definition: TXSocket.h:52
Int_t fInt1
Definition: TXSocket.h:49
Int_t fInt3
Definition: TXSocket.h:51