ROOT  6.06/09
Reference Guide
XrdProofdProtocol.cxx
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // XrdProofdProtocol //
15 // //
16 // Authors: G. Ganis, CERN, 2005 //
17 // //
18 // XrdProtocol implementation to coordinate 'proofserv' applications. //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 #include "XrdProofdPlatform.h"
23 
24 #include "XpdSysError.h"
25 #include "XpdSysLogger.h"
26 
27 #include "XrdSys/XrdSysPriv.hh"
28 #include "XrdOuc/XrdOucStream.hh"
29 
30 #include "XrdVersion.hh"
31 #include "Xrd/XrdBuffer.hh"
32 #include "Xrd/XrdScheduler.hh"
33 
34 #include "XrdProofdClient.h"
35 #include "XrdProofdClientMgr.h"
36 #include "XrdProofdConfig.h"
37 #include "XrdProofdManager.h"
38 #include "XrdProofdNetMgr.h"
39 #include "XrdProofdPriorityMgr.h"
40 #include "XrdProofdProofServMgr.h"
41 #include "XrdProofdProtocol.h"
42 #include "XrdProofdResponse.h"
43 #include "XrdProofdProofServ.h"
44 #include "XrdProofSched.h"
45 #include "XrdROOT.h"
46 #include "rpdconn.h"
47 
48 // Tracing utils
49 #include "XrdProofdTrace.h"
50 XrdOucTrace *XrdProofdTrace = 0;
51 
52 // Loggers: we need two to avoid deadlocks
54 
55 //
56 // Static area: general protocol managing section
59  "xproofd protocol anchor");
60 XrdSysRecMutex XrdProofdProtocol::fgBMutex; // Buffer management mutex
61 XrdBuffManager *XrdProofdProtocol::fgBPool = 0;
65 //
66 // Static area: protocol configuration section
68 //
70 // Cluster manager
71 XrdProofdManager *XrdProofdProtocol::fgMgr = 0;
72 
73 // Effective uid
75 
76 // Local definitions
77 #define MAX_ARGS 128
78 
79 // Macros used to set conditional options
#ifndef XPDCOND
// True when both 'n' and 'ns' are -1, or when 'n' is positive and not smaller
// than 'ns'. Arguments are parenthesized so that expressions can be passed;
// note that they are still evaluated more than once.
#define XPDCOND(n,ns) (((n) == -1 && (ns) == -1) || ((n) > 0 && (n) >= (ns)))
#endif
#ifndef XPDSETSTRING
// If XPDCOND(n,ns) holds: free 'c', set it to a strdup'ed copy of 's' (any
// object with c_str()) and record 'n' in 'ns'. 'c' and 'ns' must be lvalues.
#define XPDSETSTRING(n,ns,c,s) \
   { if (XPDCOND(n,ns)) { \
        SafeFree(c); (c) = strdup((s).c_str()); (ns) = (n); }}
#endif

#ifndef XPDADOPTSTRING
// Like XPDSETSTRING, but 'c' is replaced only when the new value is a
// non-empty string; otherwise the temporary copy is released and 'c' is
// left untouched. The temporary has a reserved-looking name to avoid clashes
// with the caller's variables.
#define XPDADOPTSTRING(n,ns,c,s) \
   { char *xpdadopttmp_ = 0; \
     XPDSETSTRING(n, ns, xpdadopttmp_, s); \
     if (xpdadopttmp_ && strlen(xpdadopttmp_)) { \
        SafeFree(c); (c) = xpdadopttmp_; \
     } else \
        SafeFree(xpdadopttmp_); }
#endif

#ifndef XPDSETINT
// If XPDCOND(n,ns) holds: parse 's' (any object with c_str()) as a base-10
// integer into 'i' and record 'n' in 'ns'. 'i' and 'ns' must be lvalues.
#define XPDSETINT(n,ns,i,s) \
   { if (XPDCOND(n,ns)) { \
        (i) = strtol((s).c_str(),0,10); (ns) = (n); }}
#endif
104 
105 typedef struct {
106  kXR_int32 ptyp; // must be always 0 !
107  kXR_int32 rlen;
108  kXR_int32 pval;
109  kXR_int32 styp;
110 } hs_response_t;
111 
112 typedef struct ResetCtrlcGuard {
113  XrdProofdProtocol *xpd;
114  int type;
115  ResetCtrlcGuard(XrdProofdProtocol *p, int t) : xpd(p), type(t) { }
116  ~ResetCtrlcGuard() { if (xpd && type != kXP_ctrlc) xpd->ResetCtrlC(); }
118 
119 //
120 // Derivation of XrdProofdConfig to read the port from the config file
121 class XrdProofdProtCfg : public XrdProofdConfig {
122 public:
123  int fPort; // The port on which we listen
124  XrdProofdProtCfg(const char *cfg, XrdSysError *edest = 0);
125  int DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool);
126  void RegisterDirectives();
127 };
128 
129 ////////////////////////////////////////////////////////////////////////////////
130 /// Constructor
131 
132 XrdProofdProtCfg::XrdProofdProtCfg(const char *cfg, XrdSysError *edest)
133  : XrdProofdConfig(cfg, edest)
134 {
135  fPort = -1;
136  RegisterDirectives();
137 }
138 
139 ////////////////////////////////////////////////////////////////////////////////
140 /// Register directives for configuration
141 
142 void XrdProofdProtCfg::RegisterDirectives()
143 {
144  Register("port", new XrdProofdDirective("port", this, &DoDirectiveClass));
145  Register("xrd.protocol", new XrdProofdDirective("xrd.protocol", this, &DoDirectiveClass));
146 }
147 
148 ////////////////////////////////////////////////////////////////////////////////
149 /// Parse directives
150 
151 int XrdProofdProtCfg::DoDirective(XrdProofdDirective *d,
152  char *val, XrdOucStream *cfg, bool)
153 {
154  if (!d) return -1;
155 
156  XrdOucString port(val);
157  if (d->fName == "xrd.protocol") {
158  port = cfg->GetWord();
159  port.replace("xproofd:", "");
160  } else if (d->fName != "port") {
161  return -1;
162  }
163  if (port.length() > 0) {
164  fPort = strtol(port.c_str(), 0, 10);
165  }
166  fPort = (fPort < 0) ? XPD_DEF_PORT : fPort;
167  return 0;
168 }
169 
170 #if (ROOTXRDVERS >= 300030000)
171 XrdVERSIONINFO(XrdgetProtocol,xproofd);
172 XrdVERSIONINFO(XrdgetProtocolPort,xproofd);
173 #endif
174 
175 extern "C" {
176 ////////////////////////////////////////////////////////////////////////////////
177 /// This protocol is meant to live in a shared library. The interface below is
178 /// used by the server to obtain a copy of the protocol object that can be used
179 /// to decide whether or not a link is talking a particular protocol.
180 
181 XrdProtocol *XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
182 {
183  // Return the protocol object to be used if static init succeeds
184  if (XrdProofdProtocol::Configure(parms, pi)) {
185 
186  return (XrdProtocol *) new XrdProofdProtocol(pi);
187  }
188  return (XrdProtocol *)0;
189 }
190 
191 ////////////////////////////////////////////////////////////////////////////////
192 /// This function is called early on to determine the port we need to use.
193 /// The default is ostensibly 1093 but can be overridden, which we allow.
194 
195 int XrdgetProtocolPort(const char * /*pname*/, char * /*parms*/, XrdProtocol_Config *pi)
196 {
197  // Default XPD_DEF_PORT (1093)
198  int port = XPD_DEF_PORT;
199 
200  if (pi) {
201  XrdProofdProtCfg pcfg(pi->ConfigFN, pi->eDest);
202  // Init some relevant quantities for tracing
203  XrdProofdTrace = new XrdOucTrace(pi->eDest);
204  pcfg.Config(0);
205 
206  if (pcfg.fPort > 0) {
207  port = pcfg.fPort;
208  } else {
209  port = (pi && pi->Port > 0) ? pi->Port : XPD_DEF_PORT;
210  }
211  }
212  return port;
213 }}
214 
215 ////////////////////////////////////////////////////////////////////////////////
216 /// Protocol constructor
217 
219  : XrdProtocol("xproofd protocol handler"), fProtLink(this)
220 {
221  fLink = 0;
222  fArgp = 0;
223  fPClient = 0;
224  fSecClient = 0;
225  fAuthProt = 0;
226  fResponses.reserve(10);
227 
228  fStdErrFD = (pi && pi->eDest) ? pi->eDest->baseFD() : fileno(stderr);
229 
230  // Instantiate a Proofd protocol object
231  Reset();
232 }
233 
234 ////////////////////////////////////////////////////////////////////////////////
235 /// Get response instance corresponding to stream ID 'sid'
236 
238 {
239  XPDLOC(ALL, "Protocol::Response")
240 
241  TRACE(HDBG, "sid: "<<sid<<", size: "<<fResponses.size());
242 
243  if (sid > 0)
244  if (sid <= fResponses.size())
245  return fResponses[sid-1];
246 
247  return (XrdProofdResponse *)0;
248 }
249 
250 ////////////////////////////////////////////////////////////////////////////////
251 /// Create new response instance for stream ID 'sid'
252 
254 {
255  XPDLOC(ALL, "Protocol::GetNewResponse")
256 
257  XrdOucString msg;
258  XPDFORM(msg, "sid: %d", sid);
259  if (sid > 0) {
260  if (sid > fResponses.size()) {
261  if (sid > fResponses.capacity()) {
262  int newsz = (sid < 2 * fResponses.capacity()) ? 2 * fResponses.capacity() : sid+1 ;
263  fResponses.reserve(newsz);
264  if (TRACING(DBG)) {
265  msg += " new capacity: ";
266  msg += (int) fResponses.capacity();
267  }
268  }
269  int nnew = sid - fResponses.size();
270  while (nnew--)
271  fResponses.push_back(new XrdProofdResponse());
272  if (TRACING(DBG)) {
273  msg += "; new size: ";
274  msg += (int) fResponses.size();
275  }
276  }
277  } else {
278  TRACE(XERR,"wrong sid: "<<sid);
279  return (XrdProofdResponse *)0;
280  }
281 
282  TRACE(DBG, msg);
283 
284  // Done
285  return fResponses[sid-1];
286 }
287 
288 ////////////////////////////////////////////////////////////////////////////////
289 /// Check whether the request matches this protocol
290 
291 XrdProtocol *XrdProofdProtocol::Match(XrdLink *lp)
292 {
293  XPDLOC(ALL, "Protocol::Match")
294 
295  struct ClientInitHandShake hsdata;
296  char *hsbuff = (char *)&hsdata;
297 
298  static hs_response_t hsresp = {0, 0, kXR_int32(htonl(XPROOFD_VERSBIN)), 0};
299 
300  XrdProofdProtocol *xp;
301  int dlen;
302  TRACE(HDBG, "enter");
303 
304  XrdOucString emsg;
305  // Peek at the first 20 bytes of data
306  if ((dlen = lp->Peek(hsbuff,sizeof(hsdata),fgReadWait)) != sizeof(hsdata)) {
307  if (dlen <= 0) lp->setEtext("Match: handshake not received");
308  if (dlen == 12) {
309  // Check if it is a request to open a file via 'rootd'
310  hsdata.first = ntohl(hsdata.first);
311  if (hsdata.first == 8) {
312  if (strlen(fgMgr->RootdExe()) > 0) {
313  if (fgMgr->IsRootdAllowed((const char *)lp->Host())) {
314  TRACE(ALL, "matched rootd protocol on link: executing "<<fgMgr->RootdExe());
315  XrdOucString em;
316  if (StartRootd(lp, em) != 0) {
317  emsg = "rootd: failed to start daemon: ";
318  emsg += em;
319  }
320  } else {
321  XPDFORM(emsg, "rootd-file serving not authorized for host '%s'", lp->Host());
322  }
323  } else {
324  emsg = "rootd-file serving not enabled";
325  }
326  }
327  if (emsg.length() > 0) {
328  lp->setEtext(emsg.c_str());
329  } else {
330  lp->setEtext("link transfered");
331  }
332  return (XrdProtocol *)0;
333  }
334  TRACE(XERR, "peeked incomplete or empty information! (dlen: "<<dlen<<" bytes)");
335  return (XrdProtocol *)0;
336  }
337 
338  // Verify that this is our protocol
339  hsdata.third = ntohl(hsdata.third);
340  if (dlen != sizeof(hsdata) || hsdata.first || hsdata.second
341  || !(hsdata.third == 1) || hsdata.fourth || hsdata.fifth) return 0;
342 
343  // Respond to this request with the handshake response
344  if (!lp->Send((char *)&hsresp, sizeof(hsresp))) {
345  lp->setEtext("Match: handshake failed");
346  TRACE(XERR, "handshake failed");
347  return (XrdProtocol *)0;
348  }
349 
350  // We can now read all 20 bytes and discard them (no need to wait for it)
351  int len = sizeof(hsdata);
352  if (lp->Recv(hsbuff, len) != len) {
353  lp->setEtext("Match: reread failed");
354  TRACE(XERR, "reread failed");
355  return (XrdProtocol *)0;
356  }
357 
358  // Get a protocol object off the stack (if none, allocate a new one)
359  if (!(xp = fgProtStack.Pop()))
360  xp = new XrdProofdProtocol();
361 
362  // Bind the protocol to the link and return the protocol
363  xp->fLink = lp;
364  snprintf(xp->fSecEntity.prot, XrdSecPROTOIDSIZE, "host");
365  xp->fSecEntity.host = strdup((char *)lp->Host());
366 
367  // Dummy data used by 'proofd'
368  kXR_int32 dum[2];
369  if (xp->GetData("dummy",(char *)&dum[0],sizeof(dum)) != 0) {
370  xp->Recycle(0,0,0);
371  return (XrdProtocol *)0;
372  }
373 
374  // We are done
375  return (XrdProtocol *)xp;
376 }
377 
378 ////////////////////////////////////////////////////////////////////////////////
379 /// Transfer the connection to a rootd daemon to serve a file access request
380 /// Return 0 on success, -1 on failure
381 
382 int XrdProofdProtocol::StartRootd(XrdLink *lp, XrdOucString &emsg)
383 {
384  XPDLOC(ALL, "Protocol::StartRootd")
385 
386  const char *prog = fgMgr->RootdExe();
387  const char **progArg = fgMgr->RootdArgs();
388 
389  if (fgMgr->RootdFork()) {
390 
391  // Start rootd using fork()
392 
393  pid_t pid;
394  if ((pid = fgMgr->Sched()->Fork(lp->Name()))) {
395  if (pid < 0) {
396  emsg = "rootd fork failed";
397  return -1;
398  }
399  return 0;
400  }
401  // In the child ...
402 
403  // Restablish standard error for the program we will exec
404  dup2(fStdErrFD, STDERR_FILENO);
405  close(fStdErrFD);
406 
407  // Force stdin/out to point to the socket FD (this will also bypass the
408  // close on exec setting for the socket)
409  dup2(lp->FDnum(), STDIN_FILENO);
410  dup2(lp->FDnum(), STDOUT_FILENO);
411 
412  // Do the exec
413  execv((const char *)prog, (char * const *)progArg);
414  TRACE(XERR, "rootd: Oops! Exec(" <<prog <<") failed; errno: " <<errno);
415  _exit(17);
416 
417  } else {
418 
419  // Start rootd using system + proofexecv
420 
421  // ROOT version
422  XrdROOT *roo = fgMgr->ROOTMgr()->DefaultVersion();
423  if (!roo) {
424  emsg = "ROOT version undefined!";
425  return -1;
426  }
427  // The path to the executable
428  XrdOucString pexe;
429  XPDFORM(pexe, "%s/proofexecv", roo->BinDir());
430  if (access(pexe.c_str(), X_OK) != 0) {
431  XPDFORM(emsg, "path '%s' does not exist or is not executable (errno: %d)",
432  pexe.c_str(), (int)errno);
433  return -1;
434  }
435 
436  // Start the proofexecv
437  XrdOucString cmd, exp;
438  XPDFORM(cmd, "export ROOTBINDIR=\"%s\"; %s 20 0 %s %s", roo->BinDir(),
439  pexe.c_str(), fgMgr->RootdUnixSrv()->path(), prog);
440  int n = 1;
441  while (progArg[n] != 0) {
442  cmd += " "; cmd += progArg[n]; n++;
443  }
444  cmd += " &";
445  TRACE(HDBG, cmd);
446  if (system(cmd.c_str()) == -1) {
447  XPDFORM(emsg, "failure from 'system' (errno: %d)", (int)errno);
448  return -1;
449  }
450 
451  // Accept a connection from the second server
452  int err = 0;
453  rpdunix *uconn = fgMgr->RootdUnixSrv()->accept(-1, &err);
454  if (!uconn || !uconn->isvalid(0)) {
455  XPDFORM(emsg, "failure accepting callback (errno: %d)", -err);
456  if (uconn) delete uconn;
457  return -1;
458  }
459  TRACE(HDBG, "proofexecv connected!");
460 
461  int rcc = 0;
462  // Transfer the open descriptor to be used in rootd
463  int fd = dup(lp->FDnum());
464  if (fd < 0 || (rcc = uconn->senddesc(fd)) != 0) {
465  XPDFORM(emsg, "failure sending descriptor '%d' (original: %d); (errno: %d)", fd, lp->FDnum(), -rcc);
466  if (uconn) delete uconn;
467  return -1;
468  }
469  // Close the connection to the parent
470  delete uconn;
471  }
472 
473  // Done
474  return 0;
475 }
476 
477 ////////////////////////////////////////////////////////////////////////////////
478 /// Return statistics info about the protocol.
479 /// Not really implemented yet: this is a reduced XrdXrootd version.
480 
481 int XrdProofdProtocol::Stats(char *buff, int blen, int)
482 {
483  static char statfmt[] = "<stats id=\"xproofd\"><num>%ld</num></stats>";
484 
485  // If caller wants only size, give it to them
486  if (!buff)
487  return sizeof(statfmt)+16;
488 
489  // We have only one statistic -- number of successful matches
490  return snprintf(buff, blen, statfmt, fgCount);
491 }
492 
493 ////////////////////////////////////////////////////////////////////////////////
494 /// Reset static and local vars
495 
497 {
498  // Init local vars
499  fLink = 0;
500  fPid = -1;
501  fArgp = 0;
502  fStatus = 0;
503  fClntCapVer = 0;
505  fSuperUser = 0;
506  fPClient = 0;
507  fUserIn = "";
508  fGroupIn = "";
509  fCID = -1;
510  fTraceID = "";
511  fAdminPath = "";
512  if (fAuthProt) {
513  fAuthProt->Delete();
514  fAuthProt = 0;
515  }
516  memset(&fSecEntity, 0, sizeof(fSecEntity));
517  // Cleanup existing XrdProofdResponse objects
518  std::vector<XrdProofdResponse *>::iterator ii = fResponses.begin(); // One per each logical connection
519  while (ii != fResponses.end()) {
520  (*ii)->Reset();
521  ii++;
522  }
523 }
524 
525 ////////////////////////////////////////////////////////////////////////////////
526 /// Protocol configuration tool
527 /// Function: Establish configuration at load time.
528 /// Output: 1 upon success or 0 otherwise.
529 
530 int XrdProofdProtocol::Configure(char *, XrdProtocol_Config *pi)
531 {
532  XPDLOC(ALL, "Protocol::Configure")
533 
534  XrdOucString mp;
535 
536  // Only once
537  if (fgConfigDone)
538  return 1;
539  fgConfigDone = 1;
540 
541  // Copy out the special info we want to use at top level
542  fgLogger = pi->eDest->logger();
543  fgEDest.logger(fgLogger);
544  if (XrdProofdTrace) delete XrdProofdTrace; // It could have been initialized in XrdgetProtocolPort
545  XrdProofdTrace = new XrdOucTrace(&fgEDest);
546  fgBPool = pi->BPool;
547  fgReadWait = pi->readWait;
548 
549  // Pre-initialize some i/o values
550  fgMaxBuffsz = fgBPool->MaxSize();
551 
552  // Schedule protocol object cleanup; the maximum number of objects
553  // and the max age are taken from XrdXrootdProtocol: this may need
554  // some optimization in the future.
555 #if 1
556  fgProtStack.Set(pi->Sched, XrdProofdTrace, TRACE_MEM);
557  fgProtStack.Set((pi->ConnMax/3 ? pi->ConnMax/3 : 30), 60*60);
558 #else
559  fgProtStack.Set(pi->Sched, 3600);
560 #endif
561 
562  // Default tracing options: always trace logins and errors for all
563  // domains; if the '-d' option was specified on the command line then
564  // trace also REQ and FORM.
565  // NB: these are superseeded by settings in the config file (xpd.trace)
566  XrdProofdTrace->What = TRACE_DOMAINS;
567  TRACESET(XERR, 1);
568  TRACESET(LOGIN, 1);
569  TRACESET(RSP, 0);
570  if (pi->DebugON)
571  XrdProofdTrace->What |= (TRACE_REQ | TRACE_FORK);
572 
573  // Work as root to avoid contineous changes of the effective user
574  // (users are logged in their box after forking)
575  fgEUidAtStartup = geteuid();
576  if (!getuid()) XrdSysPriv::ChangePerm((uid_t)0, (gid_t)0);
577 
578  // Process the config file for directives meaningful to us
579  // Create and Configure the manager
580  fgMgr = new XrdProofdManager(pi, &fgEDest);
581  if (fgMgr->Config(0)) return 0;
582  mp = "global manager created";
583  TRACE(ALL, mp);
584 
585  // Issue herald indicating we configured successfully
586  TRACE(ALL, "xproofd protocol version "<<XPROOFD_VERSION<<
587  " build "<<XrdVERSION<<" successfully loaded");
588 
589  // Return success
590  return 1;
591 }
592 
593 ////////////////////////////////////////////////////////////////////////////////
594 /// Process the information received on the active link.
595 /// (We ignore the argument here)
596 
598 {
599  XPDLOC(ALL, "Protocol::Process")
600 
601  int rc = 0;
602  TRACEP(this, DBG, "instance: " << this);
603 
604  // Read the next request header
605  if ((rc = GetData("request", (char *)&fRequest, sizeof(fRequest))) != 0)
606  return rc;
607  TRACEP(this, HDBG, "after GetData: rc: " << rc);
608 
609  // Deserialize the data
610  fRequest.header.requestid = ntohs(fRequest.header.requestid);
611  fRequest.header.dlen = ntohl(fRequest.header.dlen);
612 
613  // Get response object
614  kXR_unt16 sid;
615  memcpy((void *)&sid, (const void *)&(fRequest.header.streamid[0]), 2);
616  XrdProofdResponse *response = 0;
617  if (!(response = Response(sid))) {
618  if (!(response = GetNewResponse(sid))) {
619  TRACEP(this, XERR, "could not get Response instance for rid: "<< sid);
620  return rc;
621  }
622  }
623  // Set the stream ID for the reply
624  response->Set(fRequest.header.streamid);
625  response->Set(fLink);
626 
627  TRACEP(this, REQ, "sid: " << sid << ", req id: " << fRequest.header.requestid <<
628  " (" << XrdProofdAux::ProofRequestTypes(fRequest.header.requestid)<<
629  ")" << ", dlen: " <<fRequest.header.dlen);
630 
631  // Every request has an associated data length. It better be >= 0 or we won't
632  // be able to know how much data to read.
633  if (fRequest.header.dlen < 0) {
634  response->Send(kXR_ArgInvalid, "Process: Invalid request data length");
635  return fLink->setEtext("Process: protocol data length error");
636  }
637 
638  // Read any argument data at this point, except when the request is to forward
639  // a buffer: the argument may have to be segmented and we're not prepared to do
640  // that here.
641  if (fRequest.header.requestid != kXP_sendmsg && fRequest.header.dlen) {
642  if ((fArgp = GetBuff(fRequest.header.dlen+1, fArgp)) == 0) {
643  response->Send(kXR_ArgTooLong, "fRequest.argument is too long");
644  return rc;
645  }
646  if ((rc = GetData("arg", fArgp->buff, fRequest.header.dlen)))
647  return rc;
648  fArgp->buff[fRequest.header.dlen] = '\0';
649  }
650 
651  // Continue with request processing at the resume point
652  return Process2();
653 }
654 
655 ////////////////////////////////////////////////////////////////////////////////
656 /// Local processing method: here the request is dispatched to the appropriate
657 /// method
658 
660 {
661  XPDLOC(ALL, "Protocol::Process2")
662 
663  int rc = 0;
664  XPD_SETRESP(this, "Process2");
665 
666  TRACEP(this, REQ, "req id: " << fRequest.header.requestid << " (" <<
668 
669  ResetCtrlcGuard_t ctrlcguard(this, fRequest.header.requestid);
670 
671  // If the user is logged in check if the wanted action is to be done by us
672  if (fStatus && (fStatus & XPD_LOGGEDIN)) {
673  // Record time of the last action
674  TouchAdminPath();
675  // We must have a client instance if here
676  if (!fPClient) {
677  TRACEP(this, XERR, "client undefined!!! ");
678  response->Send(kXR_InvalidRequest,"client undefined!!! ");
679  return 0;
680  }
681  bool formgr = 0;
682  switch(fRequest.header.requestid) {
683  case kXP_ctrlc:
684  rc = CtrlC();
685  break;
686  case kXP_touch:
687  // Reset the asked-to-touch flag, if it was never set
688  fPClient->Touch(1);
689  break;
690  case kXP_interrupt:
691  rc = Interrupt();
692  break;
693  case kXP_ping:
694  rc = Ping();
695  break;
696  case kXP_sendmsg:
697  rc = SendMsg();
698  break;
699  case kXP_urgent:
700  rc = Urgent();
701  break;
702  default:
703  formgr = 1;
704  }
705  if (!formgr) {
706  // Check the link
707  if (!fLink || (fLink->FDnum() <= 0)) {
708  TRACE(XERR, "link is undefined! ");
709  return -1;
710  }
711  return rc;
712  }
713  }
714 
715  // The request is for the manager
716  rc = fgMgr->Process(this);
717  // Check the link
718  if (!fLink || (fLink->FDnum() <= 0)) {
719  TRACE(XERR, "link is undefined! ");
720  return -1;
721  }
722  return rc;
723 }
724 
725 ////////////////////////////////////////////////////////////////////////////////
726 /// Recycle call. Release the instance and give it back to the stack.
727 
728 void XrdProofdProtocol::Recycle(XrdLink *, int, const char *)
729 {
730  XPDLOC(ALL, "Protocol::Recycle")
731 
732  const char *srvtype[6] = {"ANY", "MasterWorker", "MasterMaster",
733  "ClientMaster", "Internal", "Admin"};
734  XrdOucString buf;
735 
736  // Document the disconnect
737  if (fPClient)
738  XPDFORM(buf, "user %s disconnected; type: %s", fPClient->User(),
739  srvtype[fConnType+1]);
740  else
741  XPDFORM(buf, "user disconnected; type: %s", srvtype[fConnType+1]);
742  TRACEP(this, LOGIN, buf);
743 
744  // If we have a buffer, release it
745  if (fArgp) {
746  fgBPool->Release(fArgp);
747  fArgp = 0;
748  }
749 
750  // Locate the client instance
751  XrdProofdClient *pmgr = fPClient;
752 
753  if (pmgr) {
754  if (!Internal()) {
755 
756  TRACE(REQ,"External disconnection of protocol associated with pid "<<fPid);
757 
758  // Write disconnection file
759  XrdOucString discpath(fAdminPath);
760  discpath.replace("/cid", "/disconnected");
761  FILE *fd = fopen(discpath.c_str(), "w");
762  if (!fd && errno != ENOENT) {
763  TRACE(XERR, "unable to create path: " <<discpath<<" (errno: "<<errno<<")");
764  } else if (fd) {
765  fclose(fd);
766  }
767 
768  // Remove protocol and response from attached client/proofserv instances
769  // Set reconnect flag if proofserv instances attached to this client are still running
770  pmgr->ResetClientSlot(fCID);
771  if(fgMgr && fgMgr->SessionMgr()) {
773 
775  if((fConnType == 0) && fgMgr->SessionMgr()->Alive(this)) {
776  TRACE(REQ, "Non-destroyed proofserv processes attached to this protocol ("<<this<<
777  "), setting reconnect time");
779  }
781  } else {
782  TRACE(XERR, "No XrdProofdMgr ("<<fgMgr<<") or SessionMgr ("
783  <<(fgMgr ? fgMgr->SessionMgr() : (void *) -1)<<")")
784  }
785 
786  } else {
787 
788  // Internal connection: we need to remove this instance from the list
789  // of proxy servers and to notify the attached clients.
790  // Tell the session manager that this session has gone
791  if (fgMgr && fgMgr->SessionMgr()) {
793  TRACE(HDBG, "fAdminPath: "<<fAdminPath);
794  buf.assign(fAdminPath, fAdminPath.rfind('/') + 1, -1);
795  fgMgr->SessionMgr()->DeleteFromSessions(buf.c_str());
796  // Move the entry to the terminated sessions area
797  fgMgr->SessionMgr()->MvSession(buf.c_str());
798  }
799  else {
800  TRACE(XERR,"No XrdProofdMgr ("<<fgMgr<<") or SessionMgr ("<<fgMgr->SessionMgr()<<")")
801  }
802  }
803  }
804  // Set fields to starting point (debugging mostly)
805  Reset();
806 
807  // Push ourselves on the stack
809 #if 0
810  if(fgProtStack.Push(&fProtLink) != 0) {
812  fProtLink.setItem(0);
813  delete xp;
814  }
815 #endif
816 }
817 
818 ////////////////////////////////////////////////////////////////////////////////
819 /// Allocate a buffer to handle quantum bytes; if argp points to an existing
820 /// buffer, its size is checked and re-allocated if needed
821 
822 XrdBuffer *XrdProofdProtocol::GetBuff(int quantum, XrdBuffer *argp)
823 {
824  XPDLOC(ALL, "Protocol::GetBuff")
825 
826  TRACE(HDBG, "len: "<<quantum);
827 
828  // If we are given an existing buffer, we keep it if we use at least half
829  // of it; otherwise we take a smaller one
830  if (argp) {
831  if (quantum >= argp->bsize / 2 && quantum <= argp->bsize)
832  return argp;
833  }
834 
835  // Release the buffer if too small
837  if (argp)
838  fgBPool->Release(argp);
839 
840  // Obtain a new one
841  if ((argp = fgBPool->Obtain(quantum)) == 0) {
842  TRACE(XERR, "could not get requested buffer (size: "<<quantum<<
843  ") = insufficient memory");
844  } else {
845  TRACE(HDBG, "quantum: "<<quantum<<
846  ", buff: "<<(void *)(argp->buff)<<", bsize:"<<argp->bsize);
847  }
848 
849  // Done
850  return argp;
851 }
852 
853 ////////////////////////////////////////////////////////////////////////////////
854 /// Release a buffer previously allocated via GetBuff
855 
856 void XrdProofdProtocol::ReleaseBuff(XrdBuffer *argp)
857 {
859  fgBPool->Release(argp);
860 }
861 
862 ////////////////////////////////////////////////////////////////////////////////
863 /// Get data from the open link
864 
865 int XrdProofdProtocol::GetData(const char *dtype, char *buff, int blen)
866 {
867  XPDLOC(ALL, "Protocol::GetData")
868 
869  int rlen;
870 
871  // Read the data but reschedule the link if we have not received all of the
872  // data within the timeout interval.
873  TRACEP(this, HDBG, "dtype: "<<(dtype ? dtype : " - ")<<", blen: "<<blen);
874 
875  // No need to lock:the link is disable while we are here
876  rlen = fLink->Recv(buff, blen, fgReadWait);
877  if (rlen < 0) {
878  if (rlen != -ENOMSG && rlen != -ECONNRESET) {
879  XrdOucString emsg = "link read error: errno: ";
880  emsg += -rlen;
881  TRACEP(this, XERR, emsg.c_str());
882  return (fLink ? fLink->setEtext(emsg.c_str()) : -1);
883  } else {
884  TRACEP(this, HDBG, "connection closed by peer (errno: "<<-rlen<<")");
885  return -1;
886  }
887  }
888  if (rlen < blen) {
889  TRACEP(this, DBG, dtype << " timeout; read " <<rlen <<" of " <<blen <<" bytes - rescheduling");
890  return 1;
891  }
892  TRACEP(this, HDBG, "rlen: "<<rlen);
893 
894  return 0;
895 }
896 
897 ////////////////////////////////////////////////////////////////////////////////
898 /// Send data over the open link. Segmentation is done here, if required.
899 
901  kXR_int32 sid, XrdSrvBuffer **buf, bool savebuf)
902 {
903  XPDLOC(ALL, "Protocol::SendData")
904 
905  int rc = 0;
906 
907  TRACEP(this, HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
908 
909  // Buffer length
910  int len = fRequest.header.dlen;
911 
912  // Quantum size
913  int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
914 
915  // Get a buffer
916  XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
917  if (!argp) return -1;
918 
919  // Now send over all of the data as unsolicited messages
920  XrdOucString msg;
921  while (len > 0) {
922 
923  XrdProofdResponse *response = (sid > -1) ? xps->Response() : 0;
924 
925  if ((rc = GetData("data", argp->buff, quantum))) {
926  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
927  return -1;
928  }
929  if (buf && !(*buf) && savebuf)
930  *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
931  // Send
932  if (sid > -1) {
933  if (TRACING(HDBG))
934  XPDFORM(msg, "EXT: server ID: %d, sending: %d bytes", sid, quantum);
935  if (!response || response->Send(kXR_attn, kXPD_msgsid, sid,
936  argp->buff, quantum) != 0) {
937  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
938  XPDFORM(msg, "EXT: server ID: %d, problems sending: %d bytes to server",
939  sid, quantum);
940  TRACEP(this, XERR, msg);
941  return -1;
942  }
943  } else {
944 
945  // Get ID of the client
946  int cid = ntohl(fRequest.sendrcv.cid);
947  if (TRACING(HDBG))
948  XPDFORM(msg, "INT: client ID: %d, sending: %d bytes", cid, quantum);
949  if (xps->SendData(cid, argp->buff, quantum) != 0) {
950  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
951  XPDFORM(msg, "INT: client ID: %d, problems sending: %d bytes to client",
952  cid, quantum);
953  TRACEP(this, XERR, msg);
954  return -1;
955  }
956  }
957  TRACEP(this, HDBG, msg);
958  // Next segment
959  len -= quantum;
960  if (len < quantum)
961  quantum = len;
962  }
963 
964  // Release the buffer
965  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
966 
967  // Done
968  return 0;
969 }
970 
971 ////////////////////////////////////////////////////////////////////////////////
972 /// Send data over the open client links of session 'xps'.
973 /// Used when all the connected clients are eligible to receive the message.
974 /// Segmentation is done here, if required.
975 
977  XrdSrvBuffer **buf, bool savebuf)
978 {
979  XPDLOC(ALL, "Protocol::SendDataN")
980 
981  int rc = 0;
982 
983  TRACEP(this, HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
984 
985  // Buffer length
986  int len = fRequest.header.dlen;
987 
988  // Quantum size
989  int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
990 
991  // Get a buffer
992  XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
993  if (!argp) return -1;
994 
995  // Now send over all of the data as unsolicited messages
996  while (len > 0) {
997  if ((rc = GetData("data", argp->buff, quantum))) {
999  return -1;
1000  }
1001  if (buf && !(*buf) && savebuf)
1002  *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
1003 
1004  // Send to connected clients
1005  if (xps->SendDataN(argp->buff, quantum) != 0) {
1007  return -1;
1008  }
1009 
1010  // Next segment
1011  len -= quantum;
1012  if (len < quantum)
1013  quantum = len;
1014  }
1015 
1016  // Release the buffer
1018 
1019  // Done
1020  return 0;
1021 }
1022 
1023 ////////////////////////////////////////////////////////////////////////////////
1024 /// Handle a request to forward a message to another process
1025 
1027 {
   // NOTE(review): the signature line (listing line 1026) is not present in
   // this listing; the member index further down the page shows `int SendMsg()`.
   //
   // Forwards the payload of the current request (fRequest.header.dlen bytes)
   // to the other end of session 'psid':
   //  - external connection (!Internal()): the data goes to the proofserv,
   //    tagged with this connection's client id (fCID);
   //  - internal connection: the data goes to the client that required it or,
   //    when the kXPD_fb_prog bit is set, to all connected clients.
   // Every return statement in this body returns 0; failures are reported to
   // the requester through response->Send(<error code>, <text>).
1028  XPDLOC(ALL, "Protocol::SendMsg")
1029 
   // Receiver labels used only by the final debug trace; indexed by the
   // clamped value of xps->SrvType() (see the clamping near the end).
1030  static const char *crecv[5] = {"master proofserv", "top master",
1031  "client", "undefined", "any"};
1032  int rc = 0;
1033 
   // NOTE(review): presumably this macro declares the local 'response' used
   // below and may return 'rc' if it cannot be obtained - the macro is
   // defined elsewhere, confirm.
1034  XPD_SETRESP(this, "SendMsg");
1035 
1036  // Unmarshall the data
1037  int psid = ntohl(fRequest.sendrcv.sid);
1038  int opt = ntohl(fRequest.sendrcv.opt);
1039 
1040  XrdOucString msg;
1041  // Find server session
1042  XrdProofdProofServ *xps = 0;
1043  if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1044  XPDFORM(msg, "%s: session ID not found: %d", (Internal() ? "INT" : "EXT"), psid);
1045  TRACEP(this, XERR, msg.c_str());
1046  response->Send(kXR_InvalidRequest, msg.c_str());
1047  return 0;
1048  }
1049 
1050  // Message length
   // (used here only for the trace messages; the transfer itself is done by
   // SendData / SendDataN)
1051  int len = fRequest.header.dlen;
1052 
1053  if (!Internal()) {
1054 
1055  if (TRACING(HDBG)) {
1056  // Notify
1057  XPDFORM(msg, "EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d,"
1058  " cid: %d)", len, psid, xps, xps->Status(), fCID);
1059  TRACEP(this, HDBG, msg.c_str());
1060  }
1061 
1062  // Send to proofsrv our client ID
   // fCID == -1 is treated as "no client id available" and aborts the forward
1063  if (fCID == -1) {
1064  TRACEP(this, REQ, "EXT: error getting clientSID");
1065  response->Send(kXP_ServerError,"EXT: getting clientSID");
1066  return 0;
1067  }
1068  if (SendData(xps, fCID)) {
1069  TRACEP(this, REQ, "EXT: error sending message to proofserv");
1070  response->Send(kXP_reconnecting,"EXT: sending message to proofserv");
1071  return 0;
1072  }
1073 
1074  // Notify to user
1075  response->Send();
1076 
1077  } else {
1078 
1079  if (TRACING(HDBG)) {
1080  // Notify
1081  XPDFORM(msg, "INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)",
1082  len, psid, xps, xps->Status());
1083  TRACEP(this, HDBG, msg.c_str());
1084  }
1085  bool saveStartMsg = 0;
1086  XrdSrvBuffer *savedBuf = 0;
1087  // Additional info about the message
   // The option bits are tested in priority order (else-if chain): only the
   // first matching one is acted upon. The session status is updated and
   // posted BEFORE the data is forwarded.
1088  if (opt & kXPD_setidle) {
1089  TRACEP(this, DBG, "INT: setting proofserv in 'idle' state");
1090  xps->SetStatus(kXPD_idle);
1091  PostSession(-1, fPClient->UI().fUser.c_str(),
1092  fPClient->UI().fGroup.c_str(), xps);
1093  } else if (opt & kXPD_querynum) {
1094  TRACEP(this, DBG, "INT: got message with query number");
1095  } else if (opt & kXPD_startprocess) {
1096  TRACEP(this, DBG, "INT: setting proofserv in 'running' state");
1097  xps->SetStatus(kXPD_running);
1098  PostSession(1, fPClient->UI().fUser.c_str(),
1099  fPClient->UI().fGroup.c_str(), xps);
1100  // Save start processing message for later clients
   // The previously stored message is dropped here; the new one is captured
   // by the send call below (through &savedBuf) and stored afterwards.
1101  xps->DeleteStartMsg();
1102  saveStartMsg = 1;
1103  } else if (opt & kXPD_logmsg) {
1104  // We broadcast log messages only when not idle, to catch the
1105  // result from processing
1106  if (xps->Status() == kXPD_running) {
1107  TRACEP(this, DBG, "INT: broadcasting log message");
1108  opt |= kXPD_fb_prog;
1109  }
1110  }
   // Broadcast when the sender asked for it or when it was forced just above
1111  bool fbprog = (opt & kXPD_fb_prog);
1112 
1113  if (!fbprog) {
1114  //
1115  // The message is strictly for the client requiring it
1116  if (SendData(xps, -1, &savedBuf, saveStartMsg) != 0) {
1117  response->Send(kXP_reconnecting,
1118  "SendMsg: INT: session is reconnecting: retry later");
1119  return 0;
1120  }
1121  } else {
1122  // Send to all connected clients
1123  if (SendDataN(xps, &savedBuf, saveStartMsg) != 0) {
1124  response->Send(kXP_reconnecting,
1125  "SendMsg: INT: session is reconnecting: retry later");
1126  return 0;
1127  }
1128  }
1129  // Save start processing messages, if required
1130  if (saveStartMsg)
1131  xps->SetStartMsg(savedBuf);
1132 
1133  if (TRACING(DBG)) {
   // Clamp the server type to a valid crecv[] index:
   // above 3 -> "undefined" (3), negative -> "any" (4)
1134  int ii = xps->SrvType();
1135  if (ii > 3) ii = 3;
1136  if (ii < 0) ii = 4;
1137  XPDFORM(msg, "INT: message sent to %s (%d bytes)", crecv[ii], len);
1138  TRACEP(this, DBG, msg);
1139  }
1140  // Notify to proofsrv
1141  response->Send();
1142  }
1143 
1144  // Over
1145  return 0;
1146 }
1147 
1148 ////////////////////////////////////////////////////////////////////////////////
1149 /// Handle generic request of a urgent message to be forwarded to the server
1150 
1152 {
1153  XPDLOC(ALL, "Protocol::Urgent")
1154 
1155  unsigned int rc = 0;
1156 
1157  XPD_SETRESP(this, "Urgent");
1158 
1159  // Unmarshall the data
1160  int psid = ntohl(fRequest.proof.sid);
1161  int type = ntohl(fRequest.proof.int1);
1162  int int1 = ntohl(fRequest.proof.int2);
1163  int int2 = ntohl(fRequest.proof.int3);
1164 
1165  TRACEP(this, REQ, "psid: "<<psid<<", type: "<< type);
1166 
1167  // Find server session
1168  XrdProofdProofServ *xps = 0;
1169  if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1170  TRACEP(this, XERR, "session ID not found: "<<psid);
1171  response->Send(kXR_InvalidRequest,"Urgent: session ID not found");
1172  return 0;
1173  }
1174 
1175  TRACEP(this, DBG, "xps: "<<xps<<", status: "<<xps->Status());
1176 
1177  // Check ID matching
1178  if (!xps->Match(psid)) {
1179  response->Send(kXP_InvalidRequest,"Urgent: IDs do not match - do nothing");
1180  return 0;
1181  }
1182 
1183  // Check the link to the session
1184  if (!xps->Response()) {
1185  response->Send(kXP_InvalidRequest,"Urgent: session response object undefined - do nothing");
1186  return 0;
1187  }
1188 
1189  // Prepare buffer
1190  int len = 3 *sizeof(kXR_int32);
1191  char *buf = new char[len];
1192  // Type
1193  kXR_int32 itmp = static_cast<kXR_int32>(htonl(type));
1194  memcpy(buf, &itmp, sizeof(kXR_int32));
1195  // First info container
1196  itmp = static_cast<kXR_int32>(htonl(int1));
1197  memcpy(buf + sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
1198  // Second info container
1199  itmp = static_cast<kXR_int32>(htonl(int2));
1200  memcpy(buf + 2 * sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
1201  // Send over
1202  if (xps->Response()->Send(kXR_attn, kXPD_urgent, buf, len) != 0) {
1203  response->Send(kXP_ServerError,
1204  "Urgent: could not propagate request to proofsrv");
1205  return 0;
1206  }
1207 
1208  // Notify to user
1209  response->Send();
1210  TRACEP(this, DBG, "request propagated to proofsrv");
1211 
1212  // Over
1213  return 0;
1214 }
1215 
1216 ////////////////////////////////////////////////////////////////////////////////
1217 /// Handle an interrupt request
1218 
1220 {
1221  XPDLOC(ALL, "Protocol::Interrupt")
1222 
1223  int rc = 0;
1224 
1225  XPD_SETRESP(this, "Interrupt");
1226 
1227  // Unmarshall the data
1228  int psid = ntohl(fRequest.interrupt.sid);
1229  int type = ntohl(fRequest.interrupt.type);
1230  TRACEP(this, REQ, "psid: "<<psid<<", type:"<<type);
1231 
1232  // Find server session
1233  XrdProofdProofServ *xps = 0;
1234  if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1235  TRACEP(this, XERR, "session ID not found: "<<psid);
1236  response->Send(kXR_InvalidRequest,"Interrupt: session ID not found");
1237  return 0;
1238  }
1239 
1240  if (xps) {
1241 
1242  // Check ID matching
1243  if (!xps->Match(psid)) {
1244  response->Send(kXP_InvalidRequest,"Interrupt: IDs do not match - do nothing");
1245  return 0;
1246  }
1247 
1248  XrdOucString msg;
1249  XPDFORM(msg, "xps: %p, link ID: %s, proofsrv PID: %d",
1250  xps, xps->Response()->TraceID(), xps->SrvPID());
1251  TRACEP(this, DBG, msg.c_str());
1252 
1253  // Propagate the type as unsolicited
1254  if (xps->Response()->Send(kXR_attn, kXPD_interrupt, type) != 0) {
1255  response->Send(kXP_ServerError,
1256  "Interrupt: could not propagate interrupt code to proofsrv");
1257  return 0;
1258  }
1259 
1260  // Notify to user
1261  response->Send();
1262  TRACEP(this, DBG, "interrupt propagated to proofsrv");
1263  }
1264 
1265  // Over
1266  return 0;
1267 }
1268 
1269 ////////////////////////////////////////////////////////////////////////////////
1270 /// Handle a ping request.
1271 /// For internal connections, ping is done asynchronously to avoid locking
1272 /// problems; the session checker verifies that the admin file has been touched
1273 /// recently enough; touching is done in Process2, so we have nothing to do here
1274 
1276 {
1277  XPDLOC(ALL, "Protocol::Ping")
1278 
1279  int rc = 0;
1280  if (Internal()) {
1281  if (TRACING(HDBG)) {
1282  XPD_SETRESP(this, "Ping");
1283  TRACEP(this, HDBG, "INT: nothing to do ");
1284  }
1285  return 0;
1286  }
1287  XPD_SETRESP(this, "Ping");
1288 
1289  // Unmarshall the data
1290  int psid = ntohl(fRequest.sendrcv.sid);
1291  int asyncopt = ntohl(fRequest.sendrcv.opt);
1292 
1293  TRACEP(this, REQ, "psid: "<<psid<<", async: "<<asyncopt);
1294 
1295  // For connections to servers find the server session; manager connections
1296  // (psid == -1) do not have any session attached
1297  XrdProofdProofServ *xps = 0;
1298  if (!fPClient || (psid > -1 && !(xps = fPClient->GetServer(psid)))) {
1299  TRACEP(this, XERR, "session ID not found: "<<psid);
1300  response->Send(kXR_InvalidRequest,"session ID not found");
1301  return 0;
1302  }
1303 
1304  // For manager connections we are done
1305  kXR_int32 pingres = (psid > -1) ? 0 : 1;
1306  if (psid > -1 && xps && xps->IsValid()) {
1307 
1308  TRACEP(this, DBG, "EXT: psid: "<<psid);
1309 
1310  // This is the max time we will privide an answer
1311  kXR_int32 checkfq = fgMgr->SessionMgr()->CheckFrequency();
1312 
1313  // If asynchronous return the timeout for an answer
1314  if (asyncopt == 1) {
1315  TRACEP(this, DBG, "EXT: async: notifying timeout to client: "<<checkfq<<" secs");
1316  response->Send(kXR_ok, checkfq);
1317  }
1318 
1319  // Admin path
1320  XrdOucString path(xps->AdminPath());
1321  if (path.length() <= 0) {
1322  TRACEP(this, XERR, "EXT: admin path is empty! - protocol error");
1323  if (asyncopt == 0)
1324  response->Send(kXP_ServerError, "EXT: admin path is empty! - protocol error");
1325  return 0;
1326  }
1327  path += ".status";
1328 
1329  // Current time
1330  int now = time(0);
1331 
1332  // Stat the admin file
1333  struct stat st0;
1334  if (stat(path.c_str(), &st0) != 0) {
1335  TRACEP(this, XERR, "EXT: cannot stat admin path: "<<path);
1336  if (asyncopt == 0)
1337  response->Send(kXP_ServerError, "EXT: cannot stat admin path");
1338  return 0;
1339  }
1340 
1341  // Take the pid
1342  int pid = xps->SrvPID();
1343  // If the session is alive ...
1344  if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
1345  // If it as not touched during the last ~checkfq secs we ask for a refresh
1346  if ((now - st0.st_mtime) > checkfq - 5) {
1347  // Send the request (asking for further propagation)
1348  if (xps->VerifyProofServ(1) != 0) {
1349  TRACEP(this, XERR, "EXT: could not send verify request to proofsrv");
1350  if (asyncopt == 0)
1351  response->Send(kXP_ServerError, "EXT: could not verify reuqest to proofsrv");
1352  return 0;
1353  }
1354  // Wait for the action for checkfq secs, checking every 1 sec
1355  struct stat st1;
1356  int ns = checkfq;
1357  while (ns--) {
1358  if (stat(path.c_str(), &st1) == 0) {
1359  if (st1.st_mtime > st0.st_mtime) {
1360  pingres = 1;
1361  break;
1362  }
1363  }
1364  // Wait 1 sec
1365  TRACEP(this, DBG, "EXT: waiting "<<ns<<" secs for session "<<pid<<
1366  " to touch the admin path");
1367  sleep(1);
1368  }
1369 
1370  } else {
1371  // Session is alive
1372  pingres = 1;
1373  }
1374  } else {
1375  // Session is dead
1376  pingres = 0;
1377  }
1378 
1379  // Notify the client
1380  TRACEP(this, DBG, "EXT: notified the result to client: "<<pingres);
1381  if (asyncopt == 0) {
1382  response->Send(kXR_ok, pingres);
1383  } else {
1384  // Prepare buffer for asynchronous notification
1385  int len = sizeof(kXR_int32);
1386  char *buf = new char[len];
1387  // Option
1388  kXR_int32 ifw = (kXR_int32)0;
1389  ifw = static_cast<kXR_int32>(htonl(ifw));
1390  memcpy(buf, &ifw, sizeof(kXR_int32));
1391  response->Send(kXR_attn, kXPD_ping, buf, len);
1392  }
1393  return 0;
1394  } else if (psid > -1) {
1395  // This is a failure for connections to sessions
1396  TRACEP(this, XERR, "session ID not found: "<<psid);
1397  }
1398 
1399  // Send the result
1400  response->Send(kXR_ok, pingres);
1401 
1402  // Done
1403  return 0;
1404 }
1405 
1406 ////////////////////////////////////////////////////////////////////////////////
1407 /// Post change of session status
1408 
1409 void XrdProofdProtocol::PostSession(int on, const char *u, const char *g,
1410  XrdProofdProofServ *xps)
1411 {
   // Notify the managers reachable through fgMgr about a session status change.
   //   on   : status flag, written as first field of the priority-manager
   //          message; callers in this file pass 1 when the session starts
   //          running and -1 when it becomes idle
   //   u, g : user and group names, written in the same message
   //   xps  : the session; SrvPID() is written in the message and SrvType()
   //          gates the scheduler notification
   // NOTE(review): listing lines 1424 and 1440 are absent from this listing,
   // so the statements opening the two 'if (... != 0) {' checks on the
   // priority-manager and session-manager posts are not visible here.
1412  XPDLOC(ALL, "Protocol::PostSession")
1413 
1414  // Tell the priority manager
1415  if (fgMgr && fgMgr->PriorityMgr()) {
1416  int pid = (xps) ? xps->SrvPID() : -1;
1417  if (pid < 0) {
1418  TRACE(XERR, "undefined session or process id");
   // This leaves the whole function: the scheduler and the session manager
   // below are not notified either in this case
1419  return;
1420  }
   // Message layout: "<on> <user> <group> <pid>"
1421  XrdOucString buf;
1422  XPDFORM(buf, "%d %s %s %d", on, u, g, pid);
1423 
1425  buf.c_str()) != 0) {
1426  TRACE(XERR, "problem posting the prority manager pipe");
1427  }
1428  }
1429  // Tell the scheduler
   // Only for top-master sessions and only when 'on' is -1
1430  if (fgMgr && fgMgr->ProofSched()) {
1431  if (on == -1 && xps && xps->SrvType() == kXPD_TopMaster) {
1432  TRACE(DBG, "posting the scheduler pipe");
1433  if (fgMgr->ProofSched()->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
1434  TRACE(XERR, "problem posting the scheduler pipe");
1435  }
1436  }
1437  }
1438  // Tell the session manager
1439  if (fgMgr && fgMgr->SessionMgr()) {
1441  TRACE(XERR, "problem posting the session manager pipe");
1442  }
1443  }
1444  // Done
1445  return;
1446 }
1447 
1448 ////////////////////////////////////////////////////////////////////////////////
1449 /// Recording time of the last request on this instance
1450 
1452 {
1453  XPDLOC(ALL, "Protocol::TouchAdminPath")
1454 
1455  XPD_SETRESPV(this, "TouchAdminPath");
1456  TRACEP(this, HDBG, fAdminPath);
1457 
1458  if (fAdminPath.length() > 0) {
1459  int rc = 0;
1460  if ((rc = XrdProofdAux::Touch(fAdminPath.c_str())) != 0) {
1461  // In the case the file was not found and the connetion is internal
1462  // try also the terminated sessions, as the file could have been moved
1463  // in the meanwhile
1464  XrdOucString apath = fAdminPath;
1465  if (rc == -ENOENT && Internal()) {
1466  apath.replace("/activesessions/", "/terminatedsessions/");
1467  apath.replace(".status", "");
1468  rc = XrdProofdAux::Touch(apath.c_str());
1469  }
1470  if (rc != 0 && rc != -ENOENT) {
1471  const char *type = Internal() ? "internal" : "external";
1472  TRACEP(this, XERR, type<<": problems touching "<<apath<<"; errno: "<<-rc);
1473  }
1474  }
1475  }
1476  // Done
1477  return;
1478 }
1479 
1480 ////////////////////////////////////////////////////////////////////////////////
1481 /// Set and propagate a Ctrl-C request
1482 
1484 {
   // NOTE(review): the signature line (listing line 1483) is not present in
   // this listing; the member index further down the page shows `int CtrlC()`.
   // Flags the Ctrl-C request on this instance and, unless this daemon is a
   // worker, asks the network manager to broadcast it for the client's user.
   // The only return statement in the body returns 0.
1485  XPDLOC(ALL, "Protocol::CtrlC")
1486 
1487  TRACEP(this, ALL, "handling request");
1488 
   // NOTE(review): listing line 1489 is absent; the closing brace after the
   // assignment ends a scope whose opening line is not visible - presumably
   // a scoped lock on fCtrlcMutex, confirm against the original file.
1490  fIsCtrlC = 1;
1491  }
1492 
1493  // Propagate now
   // NOTE(review): Client() is dereferenced without a null check - confirm
   // it cannot be null when this handler runs.
1494  if (fgMgr) {
1495  if (fgMgr->SrvType() != kXPD_Worker) {
1496  if (fgMgr->NetMgr()) {
1497  fgMgr->NetMgr()->BroadcastCtrlC(Client()->User());
1498  }
1499  }
1500  }
1501 
1502  // Over
1503  return 0;
1504 }
static void PostSession(int on, const char *u, const char *g, XrdProofdProofServ *xps)
Post change of session status.
XrdProofdResponse * Response() const
#define XrdSysLogger
Definition: XpdSysLogger.h:8
#define kXPD_querynum
XrdProofdProtocol * objectItem()
Definition: XpdObject.h:43
static XpdObjectQ fgProtStack
#define XPD_LOGGEDIN
void SetReconnectTime(bool on=1)
Change reconnecting status.
#define TRACE_FORK
#define kXPD_TopMaster
static XrdSysLogger * fgLogger
int ResetClientSlot(int ic)
Reset slot at 'ic'.
#define TRACING(x)
XrdSecEntity * fSecClient
#define XPROOFD_VERSBIN
const double pi
XrdProtocol * Match(XrdLink *lp)
Check whether the request matches this protocol.
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
const char ** RootdArgs() const
#define XrdSysRecMutex
Definition: XrdSysToOuc.h:18
struct XPClientInterruptRequest interrupt
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
bool Match(short int id) const
#define TRACE(Flag, Args)
Definition: TGHtml.h:124
int SrvType() const
int Process(XrdProofdProtocol *p)
Process manager request.
int DeleteFromSessions(const char *pid)
Delete from the hash list the session with ID pid.
const char * RootdExe() const
void setItem(XrdProofdProtocol *ival)
Definition: XpdObject.h:49
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
static XrdSysError fgEDest
XrdProofdProofServMgr * SessionMgr() const
XrdProofUI UI() const
bool Alive(XrdProofdProtocol *p)
Check destroyed status.
XPClientRequest fRequest
static XrdSysRecMutex fgBMutex
#define kXPD_ClientMaster
virtual void RegisterDirectives()
static int ChangePerm(uid_t uid, gid_t gid)
static XrdBuffer * GetBuff(int quantum, XrdBuffer *argp=0)
Allocate a buffer to handle quantum bytes; if argp points to an existing buffer, its size is checked ...
struct ClientRequestHdr header
XrdROOT * DefaultVersion() const
Definition: XrdROOT.h:118
const char * BinDir() const
Definition: XrdROOT.h:66
#define kXPD_setidle
Int_t bsize[]
Definition: SparseFit4.cxx:31
static int Configure(char *parms, XrdProtocol_Config *pi)
Protocol configuration tool Function: Establish configuration at load time.
#define XPROOFD_VERSION
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt Return 0 on success, -1 on error.
XrdSecEntity fSecEntity
static XrdBuffManager * fgBPool
static XrdProofdManager * fgMgr
std::vector< XrdProofdResponse * > fResponses
int CheckActiveSessions(bool verify=1)
Go through the active sessions admin path and make sure sessions are alive.
void Reset()
Reset static and local vars.
XrdProofdClient * fPClient
XrdOucString fUser
Definition: XrdProofdAux.h:40
void DisconnectFromProofServ(int pid)
Change reconnecting status.
XrdSecProtocol * fAuthProt
struct XPClientSendRcvRequest sendrcv
#define XPDLOC(d, x)
XrdProofSched * ProofSched() const
XrdProofdPipe * Pipe()
const char * TraceID() const
virtual int DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool)
unsigned char fClntCapVer
#define TRACESET(act, on)
void Push(XpdObject *Node)
Push back a protocol.
Definition: XpdObject.cxx:47
XrdProofdResponse * Response(kXR_unt16 rid)
Get response instance corresponding to stream ID 'sid'.
XrdProofdPipe * Pipe()
static const char * ProofRequestTypes(int type)
Translates the proof request type in a human readable string.
XrdProofdProtocol * Pop()
Pop up a protocol object.
Definition: XpdObject.cxx:31
XrdOucString fGroup
Definition: XrdProofdAux.h:41
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
struct XPClientProofRequest proof
void Set(XrdLink *l)
Set the link to be used by this response.
void Recycle(XrdLink *lp, int x, const char *y)
Recycle call. Release the instance and give it back to the stack.
bool IsRootdAllowed(const char *host)
Check if 'host' is allowed to access files via rootd.
XrdScheduler * Sched() const
#define XrdSysError
Definition: XpdSysError.h:8
static int Touch(const char *path, int opt=0)
Set access (opt == 1), modify (opt =2 ) or access&modify (opt = 0, default) times of path to current ...
const char * AdminPath() const
#define TRACEP(p, act, x)
int MvSession(const char *fpid)
Move session file from the active to the terminated areas.
#define TRACE_REQ
#define TRACE_DOMAINS
XrdProofdNetMgr * NetMgr() const
int GetData(const char *dtype, char *buff, int blen)
Get data from the open link.
#define kXPD_logmsg
#define XPD_SETRESPV(p, x)
int Touch(bool reset=0)
Send a touch to the connected clients: this will remotely touch the associated TSocket instance and sche...
XrdProofdProtocol(XrdProtocol_Config *pi=0)
Protocol constructor.
rpdunixsrv * RootdUnixSrv() const
#define XPDFORM
Definition: XrdProofdAux.h:381
#define kXPD_Worker
void TouchAdminPath()
Recording time of the last request on this instance.
int Process2()
Local processing method: here the request is dispatched to the appropriate method.
#define TRACE_MEM
int type
Definition: TGX11.cxx:120
int SendData(int cid, void *buff, int len)
Send data to client cid.
#define kXPD_fb_prog
XrdSysRecMutex fCtrlcMutex
XrdProofdResponse * GetNewResponse(kXR_unt16 rid)
Create new response instance for stream ID 'sid'.
int Ping()
Handle a ping request.
#define kXPD_startprocess
static int fgEUidAtStartup
int Process(XrdLink *lp)
Process the information received on the active link.
#define XPD_SETRESP(p, x)
R__EXTERN C unsigned int sleep(unsigned int seconds)
static XrdSysLogger gMainLogger
int StartRootd(XrdLink *lp, XrdOucString &emsg)
Transfer the connection to a rootd daemon to serve a file access request Return 0 on success...
XrdProofdPriorityMgr * PriorityMgr() const
int Urgent()
Handle generic request of a urgent message to be forwarded to the server.
int XrdgetProtocolPort(const char *, char *, XrdProtocol_Config *pi)
This function is called early on to determine the port we need to use.
XrdProtocol * XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
This protocol is meant to live in a shared library.
static void ReleaseBuff(XrdBuffer *argp)
Release a buffer previously allocated via GetBuff.
XrdOucString fName
Definition: XrdProofdAux.h:111
int Stats(char *buff, int blen, int do_sync)
Return statistics info about the protocol.
#define XPD_DEF_PORT
struct ResetCtrlcGuard ResetCtrlcGuard_t
const char * User() const
XrdROOTMgr * ROOTMgr() const
int Interrupt()
Handle an interrupt request.
int SendDataN(XrdProofdProofServ *xps, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open client links of session 'xps'.
int Post(int type, const char *msg)
Post message on the pipe.
void Set(int inQMax, time_t agemax=1800)
Lock the data area and set the values.
Definition: XpdObject.cxx:64
int SendMsg()
Handle a request to forward a message to another process.
static int VerifyProcessByID(int pid, const char *pname="proofserv")
Check if a process named 'pname' and process 'pid' is still in the process table. ...
XrdOucTrace * XrdProofdTrace
XrdProofdProofServ * GetServer(int psid)
Get from the vector server instance with ID psid.
double exp(double)
int CtrlC()
Set and propagate a Ctrl-C request.
XrdOucString fAdminPath
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
int Send(void)
Auxiliary Send method.
const Int_t n
Definition: legend1.C:16
void SetStartMsg(XrdSrvBuffer *sm)
int SendData(XrdProofdProofServ *xps, kXR_int32 sid=-1, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open link. Segmentation is done here, if required.
bool RootdFork() const
XrdProofdClient * Client() const
static bool fgConfigDone