Logo ROOT  
Reference Guide
TXProofServ.cxx
Go to the documentation of this file.
1// @(#)root/proofx:$Id$
2// Author: Gerardo Ganis 12/12/2005
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TXProofServ
13\ingroup proofx
14
15This class implements the XProofD version of TProofServ, with respect to which it differs only
16for the underlying connection technology.
17
18*/
19
20#include "RConfigure.h"
21#include <ROOT/RConfig.hxx>
22#include "Riostream.h"
23
24#ifdef WIN32
25 #include <io.h>
26 typedef long off_t;
27#endif
28#include <sys/types.h>
29#include <netinet/in.h>
30#include <utime.h>
31
32#include "TXProofServ.h"
33#include "TObjString.h"
34#include "TEnv.h"
35#include "TError.h"
36#include "TException.h"
37#include "THashList.h"
38#include "TInterpreter.h"
39#include "TParameter.h"
40#include "TProofDebug.h"
41#include "TProof.h"
42#include "TVirtualProofPlayer.h"
43#include "TQueryResultManager.h"
44#include "TRegexp.h"
45#include "TClass.h"
46#include "TROOT.h"
47#include "TSystem.h"
48#include "TPluginManager.h"
49#include "TXSocketHandler.h"
50#include "TXUnixSocket.h"
51#include "compiledata.h"
52#include "TProofNodeInfo.h"
53#include "XProofProtocol.h"
54
57
58
59// debug hook
60static volatile Int_t gProofServDebug = 1;
61
62//----- SigPipe signal handler -------------------------------------------------
63////////////////////////////////////////////////////////////////////////////////
64
65class TXProofServSigPipeHandler : public TSignalHandler {
66 TXProofServ *fServ;
67public:
68 TXProofServSigPipeHandler(TXProofServ *s) : TSignalHandler(kSigInterrupt, kFALSE)
69 { fServ = s; }
70 Bool_t Notify();
71};
72
73////////////////////////////////////////////////////////////////////////////////
74
75Bool_t TXProofServSigPipeHandler::Notify()
76{
77 fServ->HandleSigPipe();
78 return kTRUE;
79}
80
81//----- Termination signal handler ---------------------------------------------
82////////////////////////////////////////////////////////////////////////////////
83
84class TXProofServTerminationHandler : public TSignalHandler {
85 TXProofServ *fServ;
86public:
87 TXProofServTerminationHandler(TXProofServ *s)
88 : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
89 Bool_t Notify();
90};
91
92////////////////////////////////////////////////////////////////////////////////
93
94Bool_t TXProofServTerminationHandler::Notify()
95{
96 Printf("Received SIGTERM: terminating");
97
98 fServ->HandleTermination();
99 return kTRUE;
100}
101
102//----- Seg violation signal handler ---------------------------------------------
103////////////////////////////////////////////////////////////////////////////////
104
105class TXProofServSegViolationHandler : public TSignalHandler {
106 TXProofServ *fServ;
107public:
108 TXProofServSegViolationHandler(TXProofServ *s)
110 Bool_t Notify();
111};
112
113////////////////////////////////////////////////////////////////////////////////
114
115Bool_t TXProofServSegViolationHandler::Notify()
116{
117 Printf("**** ");
118 Printf("**** Segmentation violation: terminating ****");
119 Printf("**** ");
120 fServ->HandleTermination();
121 return kTRUE;
122}
123
124//----- Input handler for messages from parent or master -----------------------
125////////////////////////////////////////////////////////////////////////////////
126
127class TXProofServInputHandler : public TFileHandler {
128 TXProofServ *fServ;
129public:
130 TXProofServInputHandler(TXProofServ *s, Int_t fd) : TFileHandler(fd, 1)
131 { fServ = s; }
132 Bool_t Notify();
133 Bool_t ReadNotify() { return Notify(); }
134};
135
136////////////////////////////////////////////////////////////////////////////////
137
138Bool_t TXProofServInputHandler::Notify()
139{
140 fServ->HandleSocketInput();
141 // This request has been completed: remove the client ID from the pipe
142 ((TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
143 return kTRUE;
144}
145
147
148// Hook to the constructor. This is needed to avoid using the plugin manager
149// which may create problems in multi-threaded environments.
150extern "C" {
151 TApplication *GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
152 { return new TXProofServ(argc, argv, flog); }
153}
154
155////////////////////////////////////////////////////////////////////////////////
156/// Main constructor
157
158TXProofServ::TXProofServ(Int_t *argc, char **argv, FILE *flog)
159 : TProofServ(argc, argv, flog)
160{
162 fInputHandler = 0;
164
165 // TODO:
166 // Int_t useFIFO = 0;
167/* if (GetParameter(fProof->GetInputList(), "PROOF_UseFIFO", useFIFO) != 0) {
168 if (useFIFO == 1)
169 Info("", "enablig use of FIFO (if allowed by the server)");
170 else
171 Warning("", "unsupported strategy index (%d): ignore", strategy);
172 }
173*/
174}
175
176////////////////////////////////////////////////////////////////////////////////
177/// Finalize the server setup. If master, create the TProof instance to talk
178/// the worker or submaster nodes.
179/// Return 0 on success, -1 on error
180
182{
183 Bool_t xtest = (Argc() > 3 && !strcmp(Argv(3), "test")) ? kTRUE : kFALSE;
184
185 if (gProofDebugLevel > 0)
186 Info("CreateServer", "starting%s server creation", (xtest ? " test" : ""));
187
188 // Get file descriptor for log file
189 if (fLogFile) {
190 // Use the file already open by pmain
191 if ((fLogFileDes = fileno(fLogFile)) < 0) {
192 Error("CreateServer", "resolving the log file description number");
193 return -1;
194 }
195 // Hide the session start-up logs unless we are in verbose mode
196 if (gProofDebugLevel <= 0)
197 lseek(fLogFileDes, (off_t) 0, SEEK_END);
198 }
199
200 // Global location string in TXSocket
201 TXSocket::SetLocation((IsMaster()) ? "master" : "slave");
202
203 // Set debug level in XrdClient
204 EnvPutInt(NAME_DEBUG, gEnv->GetValue("XNet.Debug", 0));
205
206 // Get socket to be used to call back our xpd
207 if (xtest) {
208 // test session, just send the protocol version on the open pipe
209 // and exit
210 if (!(fSockPath = gSystem->Getenv("ROOTOPENSOCK"))) {
211 Error("CreateServer", "test: socket setup by xpd undefined");
212 return -1;
213 }
214 Int_t fpw = (Int_t) strtol(fSockPath.Data(), 0, 10);
215 int proto = htonl(kPROOF_Protocol);
216 fSockPath = "";
217 if (write(fpw, &proto, sizeof(proto)) != sizeof(proto)) {
218 Error("CreateServer", "test: sending protocol number");
219 return -1;
220 }
221 exit(0);
222 } else {
223 fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
224 if (fSockPath.Length() <= 0) {
225 Error("CreateServer", "socket setup by xpd undefined");
226 return -1;
227 }
228 TString entity = gEnv->GetValue("ProofServ.Entity", "");
229 if (entity.Length() > 0)
230 fSockPath.Insert(0,Form("%s/", entity.Data()));
231 }
232
233 // Get open socket descriptor, if any
234 Int_t sockfd = -1;
235 const char *opensock = gSystem->Getenv("ROOTOPENSOCK");
236 if (opensock && strlen(opensock) > 0) {
238 sockfd = (Int_t) strtol(opensock, 0, 10);
239 if (TSystem::GetErrno() == ERANGE) {
240 sockfd = -1;
241 Warning("CreateServer", "socket descriptor: wrong conversion from '%s'", opensock);
242 }
243 if (sockfd > 0 && gProofDebugLevel > 0)
244 Info("CreateServer", "using open connection (descriptor %d)", sockfd);
245 }
246
247 // Get the sessions ID
248 Int_t psid = gEnv->GetValue("ProofServ.SessionID", -1);
249 if (psid < 0) {
250 Error("CreateServer", "Session ID undefined");
251 return -1;
252 }
253
254 // Call back the server
255 fSocket = new TXUnixSocket(fSockPath, psid, -1, this, sockfd);
256 if (!fSocket || !(fSocket->IsValid())) {
257 Error("CreateServer", "Failed to open connection to XrdProofd coordinator");
258 return -1;
259 }
260 // Set compression level, if any
262
263 // Set the title for debugging
264 TString tgt("client");
265 if (fOrdinal != "0") {
266 tgt = fOrdinal;
267 if (tgt.Last('.') != kNPOS) tgt.Remove(tgt.Last('.'));
268 }
269 fSocket->SetTitle(tgt);
270
271 // Set the this as reference of this socket
272 ((TXSocket *)fSocket)->fReference = this;
273
274 // Get socket descriptor
275 Int_t sock = fSocket->GetDescriptor();
276
277 // Install message input handlers
279 TXSocketHandler::GetSocketHandler(new TXProofServInputHandler(this, sock), fSocket);
281
282 // Get the client ID
283 Int_t cid = gEnv->GetValue("ProofServ.ClientID", -1);
284 if (cid < 0) {
285 Error("CreateServer", "Client ID undefined");
286 SendLogFile();
287 return -1;
288 }
289 ((TXSocket *)fSocket)->SetClientID(cid);
290
291 // debug hooks
292 if (IsMaster()) {
293 // wait (loop) in master to allow debugger to connect
294 if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
295 while (gProofServDebug)
296 ;
297 }
298 } else {
299 // wait (loop) in slave to allow debugger to connect
300 if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
301 while (gProofServDebug)
302 ;
303 }
304 }
305
306 if (gProofDebugLevel > 0)
307 Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
309
310 if (Setup() == -1) {
311 // Setup failure
312 LogToMaster();
313 SendLogFile();
314 Terminate(0);
315 return -1;
316 }
317
318 if (!fLogFile) {
320 // If for some reason we failed setting a redirection file for the logs
321 // we cannot continue
322 if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
323 LogToMaster();
324 SendLogFile(-98);
325 Terminate(0);
326 return -1;
327 }
328 }
329
330 // Send message of the day to the client
331 if (IsMaster()) {
332 if (CatMotd() == -1) {
333 LogToMaster();
334 SendLogFile(-99);
335 Terminate(0);
336 return -1;
337 }
338 }
339
340 // Everybody expects iostream to be available, so load it...
341 ProcessLine("#include <iostream>", kTRUE);
342 ProcessLine("#include <string>",kTRUE); // for std::string iostream.
343
344 // Load user functions
345 const char *logon;
346 logon = gEnv->GetValue("Proof.Load", (char *)0);
347 if (logon) {
348 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
349 if (mac)
350 ProcessLine(Form(".L %s", logon), kTRUE);
351 delete [] mac;
352 }
353
354 // Execute logon macro
355 logon = gEnv->GetValue("Proof.Logon", (char *)0);
356 if (logon && !NoLogOpt()) {
357 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
358 if (mac)
359 ProcessFile(logon);
360 delete [] mac;
361 }
362
363 // Save current interpreter context
364 gInterpreter->SaveContext();
365 gInterpreter->SaveGlobalsContext();
366
367 // if master, start slave servers
368 if (IsMaster()) {
369 TString master;
370
371 if (fConfFile.BeginsWith("lite:")) {
372 master = "lite://";
373 } else {
374 master.Form("proof://%s@__master__", fUser.Data());
375
376 // Add port, if defined
377 Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
378 if (port > -1) {
379 master += ":";
380 master += port;
381 }
382 }
383
384 // Make sure that parallel startup via threads is not active
385 // (it is broken for xpd because of the locks on gInterpreterMutex)
386 gEnv->SetValue("Proof.ParallelStartup", 0);
387
388 // Get plugin manager to load appropriate TProof from
389 TPluginManager *pm = gROOT->GetPluginManager();
390 if (!pm) {
391 Error("CreateServer", "no plugin manager found");
392 SendLogFile(-99);
393 Terminate(0);
394 return -1;
395 }
396
397 // Find the appropriate handler
398 TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
399 if (!h) {
400 Error("CreateServer", "no plugin found for TProof with a"
401 " config file of '%s'", fConfFile.Data());
402 SendLogFile(-99);
403 Terminate(0);
404 return -1;
405 }
406
407 // load the plugin
408 if (h->LoadPlugin() == -1) {
409 Error("CreateServer", "plugin for TProof could not be loaded");
410 SendLogFile(-99);
411 Terminate(0);
412 return -1;
413 }
414
415 // Make instance of TProof
416 if (fConfFile.BeginsWith("lite:")) {
417 // Remove input and signal handlers to avoid spurious "signals"
418 // during startup
420 fProof = reinterpret_cast<TProof*>(h->ExecPlugin(6, master.Data(),
421 0, 0,
422 fLogLevel,
423 fSessionDir.Data(), 0));
424 // Re-enable input and signal handlers
426 } else {
427 fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
428 fConfFile.Data(),
429 fConfDir.Data(),
430 fLogLevel,
432 }
433
434 // Save worker info
436
437 if (!fProof || (fProof && !fProof->IsValid())) {
438 Error("CreateServer", "plugin for TProof could not be executed");
439 FlushLogFile();
440 delete fProof;
441 fProof = 0;
442 SendLogFile(-99);
443 Terminate(0);
444 return -1;
445 }
446 // Find out if we are a master in direct contact only with workers
448
449 SendLogFile();
450 }
451
452 // Setup the shutdown timer
453 if (!fShutdownTimer) {
454 // Check activity on socket every 5 mins
455 fShutdownTimer = new TShutdownTimer(this, 300000);
457 }
458
459 // Check if schema evolution is effective: clients running versions <=17 do not
460 // support that: send a warning message
461 if (fProtocol <= 17) {
462 TString msg;
463 msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
464 " This may generate compatibility problems between streamed objects.\n"
465 " The advise is to move to ROOT >= 5.21/02 .");
466 SendAsynMessage(msg.Data());
467 }
468
469 // Setup the idle timer
470 if (IsMaster() && !fIdleTOTimer) {
471 // Check activity on socket every 5 mins
472 Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
473 if (idle_to > 0) {
474 fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
476 if (gProofDebugLevel > 0)
477 Info("CreateServer", " idle timer started (%d secs)", idle_to);
478 } else if (gProofDebugLevel > 0) {
479 Info("CreateServer", " idle timer not started (no idle timeout requested)");
480 }
481 }
482
483 // Done
484 return 0;
485}
486
487////////////////////////////////////////////////////////////////////////////////
488/// Cleanup. Not really necessary since after this dtor there is no
489/// live anyway.
490
492{
493 delete fSocket;
494}
495
496////////////////////////////////////////////////////////////////////////////////
497/// Handle high priority data sent by the master or client.
498
500{
501 // Real-time notification of messages
503
504 // Get interrupt
505 Bool_t fw = kFALSE;
506 Int_t iLev = ((TXSocket *)fSocket)->GetInterrupt(fw);
507 if (iLev < 0) {
508 Error("HandleUrgentData", "error receiving interrupt");
509 return;
510 }
511
512 PDB(kGlobal, 2)
513 Info("HandleUrgentData", "got interrupt: %d\n", iLev);
514
515 if (fProof)
516 fProof->SetActive();
517
518 switch (iLev) {
519
520 case TProof::kPing:
521 PDB(kGlobal, 2)
522 Info("HandleUrgentData", "*** Ping");
523
524 // If master server, propagate interrupt to slaves
525 if (fw && IsMaster()) {
527 if (nbad > 0) {
528 Info("HandleUrgentData","%d slaves did not reply to ping",nbad);
529 }
530 }
531
532 // Touch the admin path to show we are alive
533 if (fAdminPath.IsNull()) {
534 fAdminPath = gEnv->GetValue("ProofServ.AdminPath", "");
535 }
536
537 if (!fAdminPath.IsNull()) {
538 if (!fAdminPath.EndsWith(".status")) {
539 // Update file time stamps
540 if (utime(fAdminPath.Data(), 0) != 0)
541 Info("HandleUrgentData", "problems touching path: %s", fAdminPath.Data());
542 else
543 PDB(kGlobal, 2)
544 Info("HandleUrgentData", "touching path: %s", fAdminPath.Data());
545 } else {
546 // Update the status in the file
547 // 0 idle
548 // 1 running
549 // 2 being terminated (currently unused)
550 // 3 queued
551 // 4 idle timed-out
552 Int_t uss_rc = UpdateSessionStatus(-1);
553 if (uss_rc != 0)
554 Error("HandleUrgentData", "problems updating status path: %s (errno: %d)", fAdminPath.Data(), -uss_rc);
555 }
556 } else {
557 Info("HandleUrgentData", "admin path undefined");
558 }
559
560 break;
561
563 Info("HandleUrgentData", "*** Hard Interrupt");
564
565 // If master server, propagate interrupt to slaves
566 if (fw && IsMaster())
568
569 // Flush input socket
570 ((TXSocket *)fSocket)->Flush();
571
572 if (IsMaster())
573 SendLogFile();
574
575 break;
576
578 Info("HandleUrgentData", "Soft Interrupt");
579
580 // If master server, propagate interrupt to slaves
581 if (fw && IsMaster())
583
584 Interrupt();
585
586 if (IsMaster())
587 SendLogFile();
588
589 break;
590
591
593 Info("HandleUrgentData", "Shutdown Interrupt");
594
595 // When returning for here connection are closed
597
598 break;
599
600 default:
601 Error("HandleUrgentData", "unexpected type: %d", iLev);
602 break;
603 }
604
605
607}
608
609////////////////////////////////////////////////////////////////////////////////
610/// Called when the client is not alive anymore; terminate the session.
611
613{
614 // Real-time notification of messages
615
616 Info("HandleSigPipe","got sigpipe ... do nothing");
617}
618
619////////////////////////////////////////////////////////////////////////////////
620/// Called when the client is not alive anymore; terminate the session.
621
623{
624 // If master server, propagate interrupt to slaves
625 // (shutdown interrupt send internally).
626 if (IsMaster()) {
627
628 // If not idle, try first to stop processing
629 if (!fIdle) {
630 // Remove pending requests
632 // Interrupt the current monitor
634 // Do not wait for ever, but al least 20 seconds
635 Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
636 timeout = (timeout > 20) ? timeout : 20;
637 // Processing will be aborted
638 fProof->StopProcess(kTRUE, (Long_t) (timeout / 2));
639 // Receive end-of-processing messages, but do not wait for ever
640 fProof->Collect(TProof::kActive, timeout);
641 // Still not idle
642 if (!fIdle)
643 Warning("HandleTermination","processing could not be stopped");
644 }
645 // Close the session
646 if (fProof)
647 fProof->Close("S");
648 }
649
650 Terminate(0); // will not return from here....
651}
652
653////////////////////////////////////////////////////////////////////////////////
654/// Print the ProofServ logo on standard output.
655/// Return 0 on success, -1 on error
656
658{
659 char str[512];
660
661 if (IsMaster()) {
662 snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
663 } else {
664 snprintf(str, 512, "**** PROOF worker server @ %s started ****", gSystem->HostName());
665 }
666
667 if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
668 Error("Setup", "failed to send proof server startup message");
669 return -1;
670 }
671
672 // Get client protocol
673 if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
674 Error("Setup", "remote proof protocol missing");
675 return -1;
676 }
677
678 // The local user
679 fUser = gEnv->GetValue("ProofServ.Entity", "");
680 if (fUser.Length() >= 0) {
681 if (fUser.Contains(":"))
682 fUser.Remove(fUser.Index(":"));
683 if (fUser.Contains("@"))
684 fUser.Remove(fUser.Index("@"));
685 } else {
687 if (pw) {
688 fUser = pw->fUser;
689 delete pw;
690 }
691 }
692
693 // Work dir and ...
694 if (IsMaster()) {
695 TString cf = gEnv->GetValue("ProofServ.ProofConfFile", "");
696 if (cf.Length() > 0)
697 fConfFile = cf;
698 }
699 fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));
700
701 // Get Session tag
702 if ((fSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1")) == "-1") {
703 Error("Setup", "Session tag missing");
704 return -1;
705 }
706 // Get top session tag, i.e. the tag of the PROOF session
707 if ((fTopSessionTag = gEnv->GetValue("ProofServ.TopSessionTag", "-1")) == "-1") {
708 fTopSessionTag = "";
709 // Try to extract it from log file path (for backward compatibility)
710 if (gSystem->Getenv("ROOTPROOFLOGFILE")) {
711 fTopSessionTag = gSystem->GetDirName(gSystem->Getenv("ROOTPROOFLOGFILE"));
712 Ssiz_t lstl;
713 if ((lstl = fTopSessionTag.Last('/')) != kNPOS) fTopSessionTag.Remove(0, lstl + 1);
714 if (fTopSessionTag.BeginsWith("session-")) {
715 fTopSessionTag.Remove(0, strlen("session-"));
716 } else {
717 fTopSessionTag = "";
718 }
719 }
720 if (fTopSessionTag.IsNull()) {
721 Error("Setup", "top session tag missing");
722 return -1;
723 }
724 }
725
726 // Make sure the process ID is in the tag
727 TString spid = Form("-%d", gSystem->GetPid());
728 if (!fSessionTag.EndsWith(spid)) {
729 Int_t nd = 0;
730 if ((nd = fSessionTag.CountChar('-')) >= 2) {
731 Int_t id = fSessionTag.Index("-", fSessionTag.Index("-") + 1);
732 if (id != kNPOS) fSessionTag.Remove(id);
733 } else if (nd != 1) {
734 Warning("Setup", "Wrong number of '-' in session tag: protocol error? %s", fSessionTag.Data());
735 }
736 // Add this process ID
737 fSessionTag += spid;
738 }
739 if (gProofDebugLevel > 0)
740 Info("Setup", "session tags: %s, %s", fTopSessionTag.Data(), fSessionTag.Data());
741
742 // Get Session dir (sandbox)
743 if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
744 Error("Setup", "Session dir missing");
745 return -1;
746 }
747
748 // Goto to the main PROOF working directory
750 if (gProofDebugLevel > 0)
751 Info("Setup", "working directory set to %s", fWorkDir.Data());
752
753 // Common setup
754 if (SetupCommon() != 0) {
755 Error("Setup", "common setup failed");
756 return -1;
757 }
758
759 // Send packages off immediately to reduce latency
761
762 // Check every two hours if client is still alive
764
765 // Install SigPipe handler to handle kKeepAlive failure
766 gSystem->AddSignalHandler(new TXProofServSigPipeHandler(this));
767
768 // Install Termination handler
769 gSystem->AddSignalHandler(new TXProofServTerminationHandler(this));
770
771 // Install seg violation handler
772 gSystem->AddSignalHandler(new TXProofServSegViolationHandler(this));
773
774 if (gProofDebugLevel > 0)
775 Info("Setup", "successfully completed");
776
777 // Done
778 return 0;
779}
780
781////////////////////////////////////////////////////////////////////////////////
782/// Get list of workers to be used from now on.
783/// The list must be provided by the caller.
784
786 Int_t & /* prioritychange */,
787 Bool_t resume)
788{
790
791 // User config files, when enabled, override cluster-wide configuration
792 if (gEnv->GetValue("ProofServ.UseUserCfg", 0) != 0) {
793 Int_t pc = 1;
794 return TProofServ::GetWorkers(workers, pc);
795 }
796
797 // seqnum of the query for which we call getworkers
798 Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
799 TString seqnum = (dynamicStartup) ? "" : XPD_GW_Static;
800 if (!fWaitingQueries->IsEmpty()) {
801 if (resume) {
802 seqnum += ((TProofQueryResult *)(fWaitingQueries->First()))->GetSeqNum();
803 } else {
804 seqnum += ((TProofQueryResult *)(fWaitingQueries->Last()))->GetSeqNum();
805 }
806 }
807 // Send request to the coordinator
808 TObjString *os = 0;
809 if (dynamicStartup) {
810 // We wait dynto seconds for the first worker to come; -1 means forever
811 Int_t dynto = gEnv->GetValue("Proof.DynamicStartupTimeout", -1);
812 Bool_t doto = (dynto > 0) ? kTRUE : kFALSE;
813 while (!(os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data()))) {
814 if (doto > 0 && --dynto < 0) break;
815 // Another second
816 gSystem->Sleep(1000);
817 }
818 } else {
819 os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data());
820 }
821
822 // The reply contains some information about the master (image, workdir)
823 // followed by the information about the workers; the tokens for each node
824 // are separated by '&'
825 if (os) {
826 TString fl(os->GetName());
828 SendAsynMessage("+++ Query cannot be processed now: enqueued");
829 return kQueryEnqueued;
830 }
831
832 // Honour a max number of workers request (typically when running in valgrind)
833 Int_t nwrks = -1;
834 Bool_t pernode = kFALSE;
835 if (gSystem->Getenv("PROOF_NWORKERS")) {
836 TString s(gSystem->Getenv("PROOF_NWORKERS"));
837 if (s.EndsWith("x")) {
838 pernode = kTRUE;
839 s.ReplaceAll("x", "");
840 }
841 if (s.IsDigit()) {
842 nwrks = s.Atoi();
843 if (!dynamicStartup && (nwrks > 0)) {
844 // Notify, except in dynamic workers mode to avoid flooding
845 TString msg;
846 if (pernode) {
847 msg.Form("+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
848 } else {
849 msg.Form("+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
850 }
851 SendAsynMessage(msg);
852 } else {
853 nwrks = -1;
854 }
855 } else {
856 pernode = kFALSE;
857 }
858 }
859
860 TString tok;
861 Ssiz_t from = 0;
862 TList *nodecnt = (pernode) ? new TList : 0 ;
863 if (fl.Tokenize(tok, from, "&")) {
864 if (!tok.IsNull()) {
865 TProofNodeInfo *master = new TProofNodeInfo(tok);
866 if (!master) {
867 Error("GetWorkers", "no appropriate master line got from coordinator");
868 return kQueryStop;
869 } else {
870 // Set image if not yet done and available
871 if (fImage.IsNull() && strlen(master->GetImage()) > 0)
872 fImage = master->GetImage();
873 SafeDelete(master);
874 }
875 // Now the workers
876 while (fl.Tokenize(tok, from, "&")) {
877 if (!tok.IsNull()) {
878 if (nwrks == -1 || nwrks > 0) {
879 // We have the minimal set of information to start
880 rc = kQueryOK;
881 if (pernode && nodecnt) {
882 TProofNodeInfo *ni = new TProofNodeInfo(tok);
883 TParameter<Int_t> *p = 0;
884 Int_t nw = 0;
885 if (!(p = (TParameter<Int_t> *) nodecnt->FindObject(ni->GetNodeName().Data()))) {
886 p = new TParameter<Int_t>(ni->GetNodeName().Data(), nw);
887 nodecnt->Add(p);
888 }
889 nw = p->GetVal();
890 if (gDebug > 0)
891 Info("GetWorkers","%p: name: %s (%s) val: %d (nwrks: %d)",
892 p, p->GetName(), ni->GetNodeName().Data(), nw, nwrks);
893 if (nw < nwrks) {
894 if (workers) workers->Add(ni);
895 nw++;
896 p->SetVal(nw);
897 } else {
898 // Two many workers on this machine already
899 SafeDelete(ni);
900 }
901 } else {
902 if (workers)
903 workers->Add(new TProofNodeInfo(tok));
904 // Count down
905 if (nwrks != -1) nwrks--;
906 }
907 } else {
908 // Release this worker (to cleanup the session list in the coordinator and get a fresh
909 // and correct list next call)
910 TProofNodeInfo *ni = new TProofNodeInfo(tok);
912 }
913 }
914 }
915 }
916 }
917 // Cleanup
918 if (nodecnt) {
919 nodecnt->SetOwner(kTRUE);
920 SafeDelete(nodecnt);
921 }
922 }
923
924 // We are done
925 return rc;
926}
927
928////////////////////////////////////////////////////////////////////////////////
929/// Handle error on the input socket
930
932{
933 // Try reconnection
934 if (fSocket && !fSocket->IsValid()) {
935
937 if (fSocket && fSocket->IsValid()) {
938 if (gDebug > 0)
939 Info("HandleError",
940 "%p: connection to local coordinator re-established", this);
941 FlushLogFile();
942 return kFALSE;
943 }
944 }
945 Printf("TXProofServ::HandleError: %p: got called ...", this);
946
947 // If master server, propagate interrupt to slaves
948 // (shutdown interrupt send internally).
949 if (IsMaster())
950 fProof->Close("S");
951
952 // Avoid communicating back anything to the coordinator (it is gone)
953 if (fSocket) ((TXSocket *)fSocket)->SetSessionID(-1);
954
955 Terminate(0);
956
957 Printf("TXProofServ::HandleError: %p: DONE ... ", this);
958
959 // We are done
960 return kTRUE;
961}
962
963////////////////////////////////////////////////////////////////////////////////
964/// Handle asynchronous input on the input socket
965
967{
968 if (gDebug > 2)
969 Printf("TXProofServ::HandleInput %p, in: %p", this, in);
970
971 XHandleIn_t *hin = (XHandleIn_t *) in;
972 Int_t acod = (hin) ? hin->fInt1 : kXPD_msg;
973
974 // Act accordingly
975 if (acod == kXPD_ping || acod == kXPD_interrupt) {
976 // Interrupt or Ping
978
979 } else if (acod == kXPD_flush) {
980 // Flush stdout, so that we can access the full log file
981 Info("HandleInput","kXPD_flush: flushing log file (stdout)");
982 fflush(stdout);
983
984 } else if (acod == kXPD_urgent) {
985 // Get type
986 Int_t type = hin->fInt2;
987 switch (type) {
989 {
990 // Abort or Stop ?
991 Bool_t abort = (hin->fInt3 != 0) ? kTRUE : kFALSE;
992 // Timeout
993 Int_t timeout = hin->fInt4;
994 // Act now
995 if (fProof)
996 fProof->StopProcess(abort, timeout);
997 else
998 if (fPlayer)
999 fPlayer->StopProcess(abort, timeout);
1000 }
1001 break;
1002 default:
1003 Info("HandleInput","kXPD_urgent: unknown type: %d", type);
1004 }
1005
1006 } else if (acod == kXPD_inflate) {
1007
1008 // Obsolete type
1009 Warning("HandleInput", "kXPD_inflate: obsolete message type");
1010
1011 } else if (acod == kXPD_priority) {
1012
1013 // The factor is the priority to be propagated
1014 fGroupPriority = hin->fInt2;
1015 if (fProof)
1017 // Notify
1018 Info("HandleInput", "kXPD_priority: group %s priority set to %f",
1019 fGroup.Data(), (Float_t) fGroupPriority / 100.);
1020
1021 } else if (acod == kXPD_clusterinfo) {
1022
1023 // Information about the cluster status
1024 fTotSessions = hin->fInt2;
1025 fActSessions = hin->fInt3;
1026 fEffSessions = (hin->fInt4)/1000.;
1027 // Notify
1028 Info("HandleInput", "kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
1030
1031 } else {
1032 // Standard socket input
1034 // This request has been completed: remove the client ID from the pipe
1035 ((TXSocket *)fSocket)->RemoveClientID();
1036 }
1037
1038 // We are done
1039 return kTRUE;
1040}
1041
1042////////////////////////////////////////////////////////////////////////////////
1043/// Disable read timeout on the underlying socket
1044
1046{
1047 if (fSocket)
1048 ((TXSocket *)fSocket)->DisableTimeout();
1049}
1050
1051////////////////////////////////////////////////////////////////////////////////
1052/// Enable read timeout on the underlying socket
1053
1055{
1056 if (fSocket)
1057 ((TXSocket *)fSocket)->EnableTimeout();
1058}
1059
1060////////////////////////////////////////////////////////////////////////////////
1061/// Terminate the proof server.
1062
1064{
1065 if (fTerminated)
1066 // Avoid doubling the exit operations
1067 exit(1);
1069
1070 // Notify
1071 Info("Terminate", "starting session termination operations ...");
1072 if (fgLogToSysLog > 0) {
1073 TString s;
1074 s.Form("%s -1 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
1075 gSystem->Syslog(kLogNotice, s.Data());
1076 }
1077
1078 // Notify the memory footprint
1079 ProcInfo_t pi;
1080 if (!gSystem->GetProcInfo(&pi)){
1081 Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
1082 pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
1083 }
1084
1085 // Deactivate current monitor, if any
1086 if (fProof)
1088
1089 // Cleanup session directory
1090 if (status == 0) {
1091 // make sure we remain in a "connected" directory
1093 // needed in case fSessionDir is on NFS ?!
1094 gSystem->MakeDirectory(fSessionDir+"/.delete");
1095 gSystem->Exec(Form("%s %s", kRM, fSessionDir.Data()));
1096 }
1097
1098 // Cleanup queries directory if empty
1099 if (IsMaster()) {
1100 if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
1101 // make sure we remain in a "connected" directory
1103 // needed in case fQueryDir is on NFS ?!
1104 gSystem->MakeDirectory(fQueryDir+"/.delete");
1105 gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
1106 // Remove lock file
1107 if (fQueryLock)
1109 }
1110
1111 // Unlock the query dir owned by this session
1112 if (fQueryLock)
1113 fQueryLock->Unlock();
1114 } else {
1115 // Try to stop processing if any
1116 Bool_t abort = (status == 0) ? kFALSE : kTRUE;
1117 if (!fIdle && fPlayer)
1118 fPlayer->StopProcess(abort,1);
1119 gSystem->Sleep(2000);
1120 }
1121
1122 // Cleanup data directory if empty
1125 Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
1126 }
1127
1128 // Remove input and signal handlers to avoid spurious "signals"
1129 // for closing activities executed upon exit()
1131
1132 // Stop processing events (set a flag to exit the event loop)
1133 gSystem->ExitLoop();
1134
1135 // We post the pipe once to wake up the main thread which is waiting for
1136 // activity on this socket; this fake activity will make it return and
1137 // eventually exit the loop.
1139
1140 // Notify
1141 Printf("Terminate: termination operations ended: quitting!");
1142}
1143
1144////////////////////////////////////////////////////////////////////////////////
1145/// Try locking query area of session tagged sessiontag.
1146/// The id of the locking file is returned in fid and must be
1147/// unlocked via UnlockQueryFile(fid).
1148
1149Int_t TXProofServ::LockSession(const char *sessiontag, TProofLockPath **lck)
1150{
1151 // We do not need to lock our own session
1152 if (strstr(sessiontag, fTopSessionTag))
1153 return 0;
1154
1155 if (!lck) {
1156 Info("LockSession","locker space undefined");
1157 return -1;
1158 }
1159 *lck = 0;
1160
1161 // Check the format
1162 TString stag = sessiontag;
1163 TRegexp re("session-.*-.*-.*");
1164 Int_t i1 = stag.Index(re);
1165 if (i1 == kNPOS) {
1166 Info("LockSession","bad format: %s", sessiontag);
1167 return -1;
1168 }
1169 stag.ReplaceAll("session-","");
1170
1171 // Drop query number, if any
1172 Int_t i2 = stag.Index(":q");
1173 if (i2 != kNPOS)
1174 stag.Remove(i2);
1175
1176 // Make sure that parent process does not exist anylonger
1177 TString parlog = fSessionDir;
1178 parlog = parlog.Remove(parlog.Index("master-")+strlen("master-"));
1179 parlog += stag;
1180 if (!gSystem->AccessPathName(parlog)) {
1181 Info("LockSession","parent still running: do nothing");
1182 return -1;
1183 }
1184
1185 // Lock the query lock file
1186 TString qlock = fQueryLock->GetName();
1187 qlock.ReplaceAll(fTopSessionTag, stag);
1188
1189 if (!gSystem->AccessPathName(qlock)) {
1190 *lck = new TProofLockPath(qlock);
1191 if (((*lck)->Lock()) < 0) {
1192 Info("LockSession","problems locking query lock file");
1193 SafeDelete(*lck);
1194 return -1;
1195 }
1196 }
1197
1198 // We are done
1199 return 0;
1200}
1201
1202////////////////////////////////////////////////////////////////////////////////
1203/// Send message to intermediate coordinator to release worker of last ordinal
1204/// ord.
1205
1206void TXProofServ::ReleaseWorker(const char *ord)
1207{
1208 if (gDebug > 2) Info("ReleaseWorker","releasing: %s", ord);
1209
1210 ((TXSocket *)fSocket)->SendCoordinator(kReleaseWorker, ord);
1211}
#define SafeDelete(p)
Definition: RConfig.hxx:543
#define h(i)
Definition: RSha256.hxx:106
const Ssiz_t kNPOS
Definition: RtypesCore.h:113
int Int_t
Definition: RtypesCore.h:43
const Bool_t kFALSE
Definition: RtypesCore.h:90
long Long_t
Definition: RtypesCore.h:52
bool Bool_t
Definition: RtypesCore.h:61
R__EXTERN Int_t gDebug
Definition: RtypesCore.h:117
float Float_t
Definition: RtypesCore.h:55
const Bool_t kTRUE
Definition: RtypesCore.h:89
#define ClassImp(name)
Definition: Rtypes.h:361
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
int type
Definition: TGX11.cxx:120
#define gInterpreter
Definition: TInterpreter.h:556
#define PDB(mask, level)
Definition: TProofDebug.h:56
R__EXTERN Int_t gProofDebugLevel
Definition: TProofDebug.h:54
const char *const kRM
Definition: TProof.h:142
const char *const kPROOF_WorkDir
Definition: TProof.h:124
const Int_t kPROOF_Protocol
Definition: TProof.h:120
#define gROOT
Definition: TROOT.h:406
char * Form(const char *fmt,...)
void Printf(const char *fmt,...)
@ kSigTermination
@ kSigInterrupt
@ kSigSegmentationViolation
@ kKeepAlive
Definition: TSystem.h:218
@ kNoDelay
Definition: TSystem.h:220
@ kReadPermission
Definition: TSystem.h:46
@ kWritePermission
Definition: TSystem.h:45
@ kLogNotice
Definition: TSystem.h:60
R__EXTERN TSystem * gSystem
Definition: TSystem.h:556
static volatile Int_t gProofServDebug
Definition: TXProofServ.cxx:60
TApplication * GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
@ kReleaseWorker
@ kGetWorkers
const char *const XPD_GW_QueryEnqueued
@ kXPD_flush
@ kXPD_msg
@ kXPD_urgent
@ kXPD_clusterinfo
@ kXPD_interrupt
@ kXPD_ping
@ kXPD_inflate
@ kXPD_priority
const char *const XPD_GW_Static
#define NAME_DEBUG
#define EnvPutInt(name, val)
Definition: XrdClientEnv.hh:47
const char * proto
Definition: civetweb.c:16604
#define snprintf
Definition: civetweb.c:1540
This class creates the ROOT Application Environment that interfaces to the windowing system eventloop...
Definition: TApplication.h:39
char ** Argv() const
Definition: TApplication.h:137
Bool_t NoLogOpt() const
Definition: TApplication.h:139
virtual Long_t ProcessLine(const char *line, Bool_t sync=kFALSE, Int_t *error=0)
Process a single command line, either a C++ statement or an interpreter command starting with a "....
virtual Long_t ProcessFile(const char *file, Int_t *error=0, Bool_t keep=kFALSE)
Process a file containing a C++ macro.
Int_t Argc() const
Definition: TApplication.h:136
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Bool_t IsEmpty() const
Definition: TCollection.h:186
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
Definition: TEnv.cxx:736
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:577
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:692
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:469
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:658
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition: TNamed.cxx:164
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
Collectable string class.
Definition: TObjString.h:28
const char * GetName() const
Returns name of object.
Definition: TObjString.h:38
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:877
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:891
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:865
Named parameter, streamable and storable.
Definition: TParameter.h:37
void SetVal(const AParamType &val)
Definition: TParameter.h:71
const AParamType & GetVal() const
Definition: TParameter.h:69
const char * GetName() const
Returns name of object.
Definition: TParameter.h:68
This class implements a plugin library manager.
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
Int_t Unlock()
Unlock the directory.
The purpose of this class is to provide a complete node description for masters, submasters and worke...
const TString & GetImage() const
const TString & GetOrdinal() const
const TString & GetNodeName() const
TQueryResult version adapted to PROOF neeeds.
Class providing the PROOF server.
Definition: TProofServ.h:66
Float_t fCpuTime
Definition: TProofServ.h:114
TString fQueryDir
Definition: TProofServ.h:88
Int_t fProtocol
Definition: TProofServ.h:103
void Interrupt()
Definition: TProofServ.h:291
Int_t CatMotd()
Print message of the day (in the file pointed by the env PROOFMOTD or from fConfDir/etc/proof/motd).
Int_t fLogLevel
Definition: TProofServ.h:107
FILE * fLogFile
Definition: TProofServ.h:100
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
TString fDataDir
Definition: TProofServ.h:90
static Long_t fgResMemMax
Definition: TProofServ.h:160
TString fSessionDir
Definition: TProofServ.h:85
TString fAdminPath
Definition: TProofServ.h:92
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
TString fService
Definition: TProofServ.h:76
Int_t fLogFileDes
Definition: TProofServ.h:101
TProofLockPath * fQueryLock
Definition: TProofServ.h:95
void RedirectOutput(const char *dir=0, const char *mode="w")
Redirect stdout to a log file.
Bool_t fMasterServ
Definition: TProofServ.h:111
TSocket * fSocket
Definition: TProofServ.h:97
Int_t fCompressMsg
Definition: TProofServ.h:142
Int_t SetupCommon()
Common part (between TProofServ and TXProofServ) of the setup phase.
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asychronous message to the master / client .
TVirtualProofPlayer * fPlayer
Definition: TProofServ.h:99
TString fSessionTag
Definition: TProofServ.h:83
Bool_t IsMaster() const
Definition: TProofServ.h:293
TProof * fProof
Definition: TProofServ.h:98
TString fOrdinal
Definition: TProofServ.h:104
Bool_t fEndMaster
Definition: TProofServ.h:110
virtual void HandleSocketInput()
Handle input coming from the client or from the master server.
TString fWorkDir
Definition: TProofServ.h:81
TShutdownTimer * fShutdownTimer
Definition: TProofServ.h:138
Int_t fActSessions
Definition: TProofServ.h:121
TString fGroup
Definition: TProofServ.h:78
Bool_t fRealTimeLog
Definition: TProofServ.h:136
static Long_t fgVirtMemMax
Definition: TProofServ.h:159
TString fImage
Definition: TProofServ.h:82
@ kQueryEnqueued
Definition: TProofServ.h:73
TString fTopSessionTag
Definition: TProofServ.h:84
Float_t fEffSessions
Definition: TProofServ.h:122
TString fConfDir
Definition: TProofServ.h:79
TIdleTOTimer * fIdleTOTimer
Definition: TProofServ.h:140
static Int_t fgLogToSysLog
Definition: TProofServ.h:172
Bool_t UnlinkDataDir(const char *path)
Scan recursively the datadir and unlink it if empty Return kTRUE if it can be unlinked,...
friend class TXProofServ
Definition: TProofServ.h:69
TString fConfFile
Definition: TProofServ.h:80
Int_t fTotSessions
Definition: TProofServ.h:120
TString fUser
Definition: TProofServ.h:77
TQueryResultManager * fQMgr
Definition: TProofServ.h:126
TList * fWaitingQueries
Definition: TProofServ.h:128
static TString fgSysLogEntity
Definition: TProofServ.h:174
void LogToMaster(Bool_t on=kTRUE)
Definition: TProofServ.h:322
Int_t fGroupPriority
Definition: TProofServ.h:109
virtual void SendLogFile(Int_t status=0, Int_t start=-1, Int_t end=-1)
Send log file to master.
Float_t fRealTime
Definition: TProofServ.h:113
Int_t UpdateSessionStatus(Int_t xst=-1)
Update the session status in the relevant file.
Bool_t fIdle
Definition: TProofServ.h:129
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:316
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list=kAllUnique)
Broadcast the group priority to all workers in the specified list.
Definition: TProof.cxx:2441
@ kShutdownInterrupt
Definition: TProof.h:398
@ kHardInterrupt
Definition: TProof.h:396
@ kPing
Definition: TProof.h:395
@ kSoftInterrupt
Definition: TProof.h:397
Bool_t IsEndMaster() const
Definition: TProof.h:664
void Close(Option_t *option="")
Close all open slave servers.
Definition: TProof.cxx:1788
TList * fActiveSlaves
Definition: TProof.h:477
void InterruptCurrentMonitor()
If in active in a monitor set ready state.
Definition: TProof.cxx:11325
Bool_t IsValid() const
Definition: TProof.h:937
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Definition: TProof.cxx:2659
void SetActive(Bool_t=kTRUE)
Definition: TProof.h:988
void SetMonitor(TMonitor *mon=0, Bool_t on=kTRUE)
Activate (on == TRUE) or deactivate (on == FALSE) all sockets monitored by 'mon'.
Definition: TProof.cxx:2398
@ kActive
Definition: TProof.h:564
Int_t Ping(ESlaves list)
Ping PROOF slaves. Returns the number of slaves that responded.
Definition: TProof.cxx:4742
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
Definition: TProof.cxx:6214
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
Definition: TProof.cxx:2266
virtual void SaveWorkerInfo()
Save information about the worker set in the file .workers in the working dir.
Definition: TProof.cxx:11798
TList * Queries() const
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition: TROOT.cxx:2693
Regular expression class.
Definition: TRegexp.h:31
virtual Bool_t Notify()
Notify when signal occurs.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
Definition: TSocket.cxx:1012
virtual Int_t GetDescriptor() const
Definition: TSocket.h:112
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
Definition: TSocket.cxx:1097
virtual Bool_t IsValid() const
Definition: TSocket.h:132
virtual Int_t Reconnect()
Definition: TSocket.h:138
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:522
Basic string class.
Definition: TString.h:131
Ssiz_t Length() const
Definition: TString.h:405
TString & Insert(Ssiz_t pos, const char *s)
Definition: TString.h:644
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition: TString.cxx:2177
const char * Data() const
Definition: TString.h:364
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:687
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition: TString.cxx:892
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition: TString.cxx:2197
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:610
Bool_t IsNull() const
Definition: TString.h:402
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
Definition: TString.cxx:476
TString & Remove(Ssiz_t pos)
Definition: TString.h:668
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2289
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:619
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition: TString.h:634
virtual void AddFileHandler(TFileHandler *fh)
Add a file handler to the list of system file handlers.
Definition: TSystem.cxx:552
virtual void Syslog(ELogLevel level, const char *mess)
Send mess to syslog daemon.
Definition: TSystem.cxx:1679
static void ResetErrno()
Static function resetting system error number.
Definition: TSystem.cxx:274
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition: TSystem.cxx:1269
static Int_t GetErrno()
Static function returning system error number.
Definition: TSystem.cxx:258
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:705
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1658
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:823
virtual Int_t Exec(const char *shellcmd)
Execute a command.
Definition: TSystem.cxx:651
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
Definition: TSystem.cxx:562
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1291
virtual void ExitLoop()
Exit from event loop.
Definition: TSystem.cxx:390
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
Definition: TSystem.cxx:858
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
Definition: TSystem.cxx:2490
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
Definition: TSystem.cxx:530
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:301
virtual void Sleep(UInt_t milliSec)
Sleep milliSec milli seconds.
Definition: TSystem.cxx:435
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Definition: TSystem.cxx:1541
virtual TString GetDirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:1027
virtual int Unlink(const char *name)
Unlink, i.e.
Definition: TSystem.cxx:1376
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
Definition: TSystem.cxx:1594
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Definition: TTimer.cxx:211
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
This class implements the XProofD version of TProofServ, with respect to which it differs only for th...
Definition: TXProofServ.h:30
void EnableTimeout()
Enable read timeout on the underlying socket.
TString fSockPath
Definition: TXProofServ.h:35
Int_t Setup()
Print the ProofServ logo on standard output.
void Terminate(Int_t status)
Terminate the proof server.
void HandleTermination()
Called when the client is not alive anymore; terminate the session.
Bool_t fTerminated
Definition: TXProofServ.h:37
EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
virtual ~TXProofServ()
Cleanup.
void DisableTimeout()
Disable read timeout on the underlying socket.
void HandleSigPipe()
Called when the client is not alive anymore; terminate the session.
Bool_t HandleError(const void *in=0)
Handle error on the input socket.
Bool_t HandleInput(const void *in=0)
Handle asynchronous input on the input socket.
TXProofServInterruptHandler * fInterruptHandler
Definition: TXProofServ.h:33
void ReleaseWorker(const char *ord)
Send message to intermediate coordinator to release worker of last ordinal ord.
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
TXSocketHandler * fInputHandler
Definition: TXProofServ.h:34
Int_t CreateServer()
Finalize the server setup.
void HandleUrgentData()
Handle high priority data sent by the master or client.
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal new availibility of new messages.
Definition: TXSocket.cxx:2284
static TXSocketHandler * GetSocketHandler(TFileHandler *h=0, TSocket *s=0)
Get an instance of the input socket handler with 'h' as handler, connected to socket 's'.
High level handler of connections to XProofD.
Definition: TXSocket.h:59
static void SetLocation(const char *loc="")
Set location string.
Definition: TXSocket.cxx:242
@ kStopProcess
Definition: TXSocket.h:139
static TXSockPipe fgPipe
Definition: TXSocket.h:111
Implementation of TXSocket using PF_UNIX sockets.
Definition: TXUnixSocket.h:29
static constexpr double s
static constexpr double pi
static constexpr double pc
TString fUser
Definition: TSystem.h:140
Int_t fInt2
Definition: TXSocket.h:50
Int_t fInt4
Definition: TXSocket.h:52
Int_t fInt1
Definition: TXSocket.h:49
Int_t fInt3
Definition: TXSocket.h:51