28 #include "XrdOuc/XrdOucStream.hh" 30 #include "XrdVersion.hh" 31 #include "Xrd/XrdBuffer.hh" 32 #include "Xrd/XrdScheduler.hh" 59 "xproofd protocol anchor");
81 #define XPDCOND(n,ns) ((n == -1 && ns == -1) || (n > 0 && n >= ns)) 84 #define XPDSETSTRING(n,ns,c,s) \ 85 { if (XPDCOND(n,ns)) { \ 86 SafeFree(c); c = strdup(s.c_str()); ns = n; }} 89 #ifndef XPDADOPTSTRING 90 #define XPDADOPTSTRING(n,ns,c,s) \ 92 XPDSETSTRING(n, ns, t, s); \ 93 if (t && strlen(t)) { \ 100 #define XPDSETINT(n,ns,i,s) \ 101 { if (XPDCOND(n,ns)) { \ 102 i = strtol(s.c_str(),0,10); ns = n; }} 112 typedef struct ResetCtrlcGuard {
116 ~ResetCtrlcGuard() {
if (xpd && type !=
kXP_ctrlc) xpd->ResetCtrlC(); }
124 XrdProofdProtCfg(
const char *cfg,
XrdSysError *edest = 0);
126 void RegisterDirectives();
132 XrdProofdProtCfg::XrdProofdProtCfg(
const char *cfg,
XrdSysError *edest)
136 RegisterDirectives();
142 void XrdProofdProtCfg::RegisterDirectives()
152 char *val, XrdOucStream *cfg,
bool)
156 XrdOucString port(val);
157 if (d->
fName ==
"xrd.protocol") {
158 port = cfg->GetWord();
159 port.replace(
"xproofd:",
"");
160 }
else if (d->
fName !=
"port") {
163 if (port.length() > 0) {
164 fPort = strtol(port.c_str(), 0, 10);
170 #if (ROOTXRDVERS >= 300030000) 188 return (XrdProtocol *)0;
201 XrdProofdProtCfg pcfg(pi->ConfigFN, pi->eDest);
206 if (pcfg.fPort > 0) {
219 : XrdProtocol(
"xproofd protocol handler"), fProtLink(this)
228 fStdErrFD = (pi && pi->eDest) ? pi->eDest->baseFD() : fileno(stderr);
239 XPDLOC(ALL,
"Protocol::Response")
255 XPDLOC(ALL,
"Protocol::GetNewResponse")
265 msg +=
" new capacity: ";
273 msg +=
"; new size: ";
278 TRACE(XERR,
"wrong sid: "<<sid);
293 XPDLOC(ALL,
"Protocol::Match")
295 struct ClientInitHandShake hsdata;
296 char *hsbuff = (
char *)&hsdata;
298 static hs_response_t hsresp = {0, 0, kXR_int32(htonl(
XPROOFD_VERSBIN)), 0};
300 XrdProtocol *xp =
nullptr;
302 TRACE(HDBG,
"enter");
306 if ((dlen = lp->Peek(hsbuff,
sizeof(hsdata),
fgReadWait)) !=
sizeof(hsdata)) {
307 if (dlen <= 0) lp->setEtext(
"Match: handshake not received");
310 hsdata.first = ntohl(hsdata.first);
311 if (hsdata.first == 8) {
317 emsg =
"rootd: failed to start daemon: ";
321 XPDFORM(emsg,
"rootd-file serving not authorized for host '%s'", lp->Host());
324 emsg =
"rootd-file serving not enabled";
327 if (emsg.length() > 0) {
328 lp->setEtext(emsg.c_str());
330 lp->setEtext(
"link transfered");
334 TRACE(XERR,
"peeked incomplete or empty information! (dlen: "<<dlen<<
" bytes)");
339 hsdata.third = ntohl(hsdata.third);
340 if (dlen !=
sizeof(hsdata) || hsdata.first || hsdata.second
341 || !(hsdata.third == 1) || hsdata.fourth || hsdata.fifth) {
345 TRACE(ALL,
"matched xrootd protocol on link: serving a file");
347 TRACE(XERR,
"failed to match any known or enabled protocol");
353 if (!lp->Send((
char *)&hsresp,
sizeof(hsresp))) {
354 lp->setEtext(
"Match: handshake failed");
355 TRACE(XERR,
"handshake failed");
360 int len =
sizeof(hsdata);
361 if (lp->Recv(hsbuff, len) != len) {
362 lp->setEtext(
"Match: reread failed");
363 TRACE(XERR,
"reread failed");
375 xpp->
fSecEntity.host = strdup((
char *)lp->Host());
379 if (xpp->
GetData(
"dummy",(
char *)&dum[0],
sizeof(dum)) != 0) {
383 xp = (XrdProtocol *) xpp;
395 XPDLOC(ALL,
"Protocol::StartRootd")
405 if ((pid =
fgMgr->
Sched()->Fork(lp->Name()))) {
407 emsg =
"rootd fork failed";
420 dup2(lp->FDnum(), STDIN_FILENO);
421 dup2(lp->FDnum(), STDOUT_FILENO);
424 execv((
const char *)prog, (
char *
const *)progArg);
425 TRACE(XERR,
"rootd: Oops! Exec(" <<prog <<
") failed; errno: " <<errno);
435 emsg =
"ROOT version undefined!";
441 if (access(pexe.c_str(), X_OK) != 0) {
442 XPDFORM(emsg,
"path '%s' does not exist or is not executable (errno: %d)",
443 pexe.c_str(), (int)errno);
448 XrdOucString cmd,
exp;
449 XPDFORM(cmd,
"export ROOTBINDIR=\"%s\"; %s 20 0 %s %s", roo->
BinDir(),
452 while (progArg[n] != 0) {
453 cmd +=
" "; cmd += progArg[
n]; n++;
457 if (system(cmd.c_str()) == -1) {
458 XPDFORM(emsg,
"failure from 'system' (errno: %d)", (
int)errno);
465 if (!uconn || !uconn->isvalid(0)) {
466 XPDFORM(emsg,
"failure accepting callback (errno: %d)", -err);
467 if (uconn)
delete uconn;
470 TRACE(HDBG,
"proofexecv connected!");
474 int fd = dup(lp->FDnum());
475 if (fd < 0 || (rcc = uconn->senddesc(fd)) != 0) {
476 XPDFORM(emsg,
"failure sending descriptor '%d' (original: %d); (errno: %d)", fd, lp->FDnum(), -rcc);
477 if (uconn)
delete uconn;
494 static char statfmt[] =
"<stats id=\"xproofd\"><num>%ld</num></stats>";
498 return sizeof(statfmt)+16;
529 std::vector<XrdProofdResponse *>::iterator ii =
fResponses.begin();
543 XPDLOC(ALL,
"Protocol::Configure")
593 mp =
"global manager created";
598 " build "<<XrdVERSION<<
" successfully loaded");
610 XPDLOC(ALL,
"Protocol::Process")
616 if ((rc =
GetData(
"request", (
char *)&
fRequest,
sizeof(fRequest))) != 0)
621 fRequest.header.requestid = ntohs(fRequest.header.requestid);
622 fRequest.header.dlen = ntohl(fRequest.header.dlen);
626 memcpy((
void *)&sid, (
const void *)&(fRequest.header.streamid[0]), 2);
630 TRACET(
TraceID(), XERR,
"could not get Response instance for rid: "<< sid);
635 response->
Set(fRequest.header.streamid);
638 TRACET(
TraceID(), REQ,
"sid: " << sid <<
", req id: " << fRequest.header.requestid <<
640 ")" <<
", dlen: " <<fRequest.header.dlen);
644 if (fRequest.header.dlen < 0) {
645 response->
Send(kXR_ArgInvalid,
"Process: Invalid request data length");
646 return fLink->setEtext(
"Process: protocol data length error");
652 if (fRequest.header.requestid !=
kXP_sendmsg && fRequest.header.dlen) {
654 response->
Send(kXR_ArgTooLong,
"fRequest.argument is too long");
657 if ((rc =
GetData(
"arg",
fArgp->buff, fRequest.header.dlen)))
659 fArgp->buff[fRequest.header.dlen] =
'\0';
672 XPDLOC(ALL,
"Protocol::Process2")
689 response->Send(kXR_InvalidRequest,
"client undefined!!! ");
719 TRACE(XERR,
"link is undefined! ");
730 TRACE(XERR,
"link is undefined! ");
741 XPDLOC(ALL,
"Protocol::Recycle")
743 const char *srvtype[6] = {
"ANY",
"MasterWorker",
"MasterMaster",
744 "ClientMaster",
"Internal",
"Admin"};
767 TRACE(REQ,
"External disconnection of protocol associated with pid "<<
fPid);
771 discpath.replace(
"/cid",
"/disconnected");
772 FILE *fd = fopen(discpath.c_str(),
"w");
773 if (!fd && errno != ENOENT) {
774 TRACE(XERR,
"unable to create path: " <<discpath<<
" (errno: "<<errno<<
")");
787 TRACE(REQ,
"Non-destroyed proofserv processes attached to this protocol ("<<
this<<
788 "), setting reconnect time");
793 TRACE(XERR,
"No XrdProofdMgr ("<<
fgMgr<<
") or SessionMgr (" 835 XPDLOC(ALL,
"Protocol::GetBuff")
837 TRACE(HDBG,
"len: "<<quantum);
842 if (quantum >= argp->bsize / 2 && quantum <= argp->
bsize)
852 if ((argp =
fgBPool->Obtain(quantum)) == 0) {
853 TRACE(XERR,
"could not get requested buffer (size: "<<quantum<<
854 ") = insufficient memory");
856 TRACE(HDBG,
"quantum: "<<quantum<<
857 ", buff: "<<(
void *)(argp->buff)<<
", bsize:"<<argp->bsize);
878 XPDLOC(ALL,
"Protocol::GetData")
884 TRACET(
TraceID(), HDBG,
"dtype: "<<(dtype ? dtype :
" - ")<<
", blen: "<<blen);
889 if (rlen != -ENOMSG && rlen != -ECONNRESET) {
890 XrdOucString emsg =
"link read error: errno: ";
893 return (
fLink ?
fLink->setEtext(emsg.c_str()) : -1);
895 TRACET(
TraceID(), HDBG,
"connection closed by peer (errno: "<<-rlen<<
")");
900 TRACET(
TraceID(), DBG, dtype <<
" timeout; read " <<rlen <<
" of " <<blen <<
" bytes - rescheduling");
914 XPDLOC(ALL,
"Protocol::SendData")
928 if (!argp)
return -1;
936 if ((rc =
GetData(
"data", argp->buff, quantum))) {
940 if (buf && !(*buf) && savebuf)
945 XPDFORM(msg,
"EXT: server ID: %d, sending: %d bytes", sid, quantum);
947 argp->buff, quantum) != 0) {
949 XPDFORM(msg,
"EXT: server ID: %d, problems sending: %d bytes to server",
959 XPDFORM(msg,
"INT: client ID: %d, sending: %d bytes", cid, quantum);
960 if (xps->
SendData(cid, argp->buff, quantum) != 0) {
962 XPDFORM(msg,
"INT: client ID: %d, problems sending: %d bytes to client",
990 XPDLOC(ALL,
"Protocol::SendDataN")
1004 if (!argp)
return -1;
1008 if ((rc =
GetData(
"data", argp->buff, quantum))) {
1012 if (buf && !(*buf) && savebuf)
1016 if (xps->
SendDataN(argp->buff, quantum) != 0) {
1039 XPDLOC(ALL,
"Protocol::SendMsg")
1041 static const char *crecv[5] = {
"master proofserv",
"top master",
1042 "client",
"undefined",
"any"};
1055 XPDFORM(msg,
"%s: session ID not found: %d", (
Internal() ?
"INT" :
"EXT"), psid);
1057 response->Send(kXR_InvalidRequest, msg.c_str());
1068 XPDFORM(msg,
"EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d," 1069 " cid: %d)", len, psid, xps, xps->
Status(),
fCID);
1080 TRACET(
TraceID(), REQ,
"EXT: error sending message to proofserv");
1092 XPDFORM(msg,
"INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)",
1093 len, psid, xps, xps->
Status());
1096 bool saveStartMsg = 0;
1100 TRACET(
TraceID(), DBG,
"INT: setting proofserv in 'idle' state");
1105 TRACET(
TraceID(), DBG,
"INT: got message with query number");
1107 TRACET(
TraceID(), DBG,
"INT: setting proofserv in 'running' state");
1127 if (
SendData(xps, -1, &savedBuf, saveStartMsg) != 0) {
1129 "SendMsg: INT: session is reconnecting: retry later");
1134 if (
SendDataN(xps, &savedBuf, saveStartMsg) != 0) {
1136 "SendMsg: INT: session is reconnecting: retry later");
1148 XPDFORM(msg,
"INT: message sent to %s (%d bytes)", crecv[ii], len);
1164 XPDLOC(ALL,
"Protocol::Urgent")
1166 unsigned int rc = 0;
1182 response->Send(kXR_InvalidRequest,
"Urgent: session ID not found");
1189 if (!xps->
Match(psid)) {
1196 response->Send(
kXP_InvalidRequest,
"Urgent: session response object undefined - do nothing");
1201 int len = 3 *
sizeof(kXR_int32);
1202 char *buf =
new char[len];
1204 kXR_int32 itmp =
static_cast<kXR_int32
>(htonl(type));
1205 memcpy(buf, &itmp,
sizeof(kXR_int32));
1207 itmp =
static_cast<kXR_int32
>(htonl(int1));
1208 memcpy(buf +
sizeof(kXR_int32), &itmp,
sizeof(kXR_int32));
1210 itmp =
static_cast<kXR_int32
>(htonl(int2));
1211 memcpy(buf + 2 *
sizeof(kXR_int32), &itmp,
sizeof(kXR_int32));
1215 "Urgent: could not propagate request to proofsrv");
1232 XPDLOC(ALL,
"Protocol::Interrupt")
1247 response->Send(kXR_InvalidRequest,
"Interrupt: session ID not found");
1254 if (!xps->
Match(psid)) {
1260 XPDFORM(msg,
"xps: %p, link ID: %s, proofsrv PID: %d",
1267 "Interrupt: could not propagate interrupt code to proofsrv");
1288 XPDLOC(ALL,
"Protocol::Ping")
1304 TRACET(
TraceID(), REQ,
"psid: "<<psid<<
", async: "<<asyncopt);
1311 response->Send(kXR_InvalidRequest,
"session ID not found");
1316 kXR_int32 pingres = (psid > -1) ? 0 : 1;
1317 if (psid > -1 && xps && xps->
IsValid()) {
1325 if (asyncopt == 1) {
1326 TRACET(
TraceID(), DBG,
"EXT: async: notifying timeout to client: "<<checkfq<<
" secs");
1327 response->Send(kXR_ok, checkfq);
1332 if (path.length() <= 0) {
1333 TRACET(
TraceID(), XERR,
"EXT: admin path is empty! - protocol error");
1335 response->Send(
kXP_ServerError,
"EXT: admin path is empty! - protocol error");
1345 if (stat(path.c_str(), &st0) != 0) {
1346 TRACET(
TraceID(), XERR,
"EXT: cannot stat admin path: "<<path);
1357 if ((now - st0.st_mtime) > checkfq - 5) {
1360 TRACET(
TraceID(), XERR,
"EXT: could not send verify request to proofsrv");
1362 response->Send(
kXP_ServerError,
"EXT: could not verify reuqest to proofsrv");
1369 if (stat(path.c_str(), &st1) == 0) {
1370 if (st1.st_mtime > st0.st_mtime) {
1376 TRACET(
TraceID(), DBG,
"EXT: waiting "<<ns<<
" secs for session "<<pid<<
1377 " to touch the admin path");
1391 TRACET(
TraceID(), DBG,
"EXT: notified the result to client: "<<pingres);
1392 if (asyncopt == 0) {
1393 response->Send(kXR_ok, pingres);
1396 int len =
sizeof(kXR_int32);
1397 char *buf =
new char[len];
1399 kXR_int32 ifw = (kXR_int32)0;
1400 ifw =
static_cast<kXR_int32
>(htonl(ifw));
1401 memcpy(buf, &ifw,
sizeof(kXR_int32));
1402 response->Send(kXR_attn,
kXPD_ping, buf, len);
1405 }
else if (psid > -1) {
1411 response->Send(kXR_ok, pingres);
1423 XPDLOC(ALL,
"Protocol::PostSession")
1427 int pid = (xps) ? xps->
SrvPID() : -1;
1429 TRACE(XERR,
"undefined session or process id");
1433 XPDFORM(buf,
"%d %s %s %d", on, u, g, pid);
1436 buf.c_str()) != 0) {
1437 TRACE(XERR,
"problem posting the prority manager pipe");
1443 TRACE(DBG,
"posting the scheduler pipe");
1445 TRACE(XERR,
"problem posting the scheduler pipe");
1452 TRACE(XERR,
"problem posting the session manager pipe");
1464 XPDLOC(ALL,
"Protocol::TouchAdminPath")
1477 apath.replace(
"/activesessions/",
"/terminatedsessions/");
1478 apath.replace(
".status",
"");
1481 if (rc != 0 && rc != -ENOENT) {
1482 const char *
type =
Internal() ?
"internal" :
"external";
1483 TRACET(
TraceID(), XERR, type<<
": problems touching "<<apath<<
"; errno: "<<-rc);
1496 XPDLOC(ALL,
"Protocol::CtrlC")
static void PostSession(int on, const char *u, const char *g, XrdProofdProofServ *xps)
Post change of session status.
XrdProofdProtocol * objectItem()
static XpdObjectQ fgProtStack
void SetReconnectTime(bool on=1)
Change reconnecting status.
const char * RootdExe() const
static XrdSysLogger * fgLogger
int ResetClientSlot(int ic)
Reset slot at 'ic'.
XrdProofdProofServMgr * SessionMgr() const
XrdSecEntity * fSecClient
XrdProtocol * Match(XrdLink *lp)
Check whether the request matches this protocol.
XrdROOT * DefaultVersion() const
XrdProofdNetMgr * NetMgr() const
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
struct XPClientInterruptRequest interrupt
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
#define TRACE(Flag, Args)
int Process(XrdProofdProtocol *p)
Process manager request.
int DeleteFromSessions(const char *pid)
Delete from the hash list the session with ID pid.
bool Match(short int id) const
XrdScheduler * Sched() const
void setItem(XrdProofdProtocol *ival)
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
static XrdSysError fgEDest
bool Alive(XrdProofdProtocol *p)
Check destroyed status.
static XrdSysRecMutex fgBMutex
#define kXPD_ClientMaster
static int ChangePerm(uid_t uid, gid_t gid)
static XrdBuffer * GetBuff(int quantum, XrdBuffer *argp=0)
Allocate a buffer to handle quantum bytes; if argp points to an existing buffer, its size is checked ...
rpdunixsrv * RootdUnixSrv() const
struct ClientRequestHdr header
you should not use this method at all Int_t Int_t Double_t Double_t em
static int Configure(char *parms, XrdProtocol_Config *pi)
Protocol configuration tool. Function: Establish configuration at load time.
XrdProofSched * ProofSched() const
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt. Return 0 on success, -1 on error.
const char * User() const
static XrdBuffManager * fgBPool
static XrdProofdManager * fgMgr
std::vector< XrdProofdResponse * > fResponses
int CheckActiveSessions(bool verify=1)
Go through the active sessions admin path and make sure sessions are alive.
void Reset()
Reset static and local vars.
XrdProofdClient * fPClient
void DisconnectFromProofServ(int pid)
Change reconnecting status.
XrdSecProtocol * fAuthProt
struct XPClientSendRcvRequest sendrcv
unsigned char fClntCapVer
#define TRACESET(act, on)
void Push(XpdObject *Node)
Push back a protocol.
int CheckFrequency() const
XrdProofdResponse * Response(kXR_unt16 rid)
Get response instance corresponding to stream ID 'sid'.
static const char * ProofRequestTypes(int type)
Translates the proof request type in a human readable string.
XrdProofdProtocol * Pop()
Pop up a protocol object.
#define XrdSysMutexHelper
const char ** RootdArgs() const
struct XPClientProofRequest proof
void Set(XrdLink *l)
Set the link to be used by this response.
void Recycle(XrdLink *lp, int x, const char *y)
Recycle call. Release the instance and give it back to the stack.
bool IsRootdAllowed(const char *host)
Check if 'host' is allowed to access files via rootd.
static int Touch(const char *path, int opt=0)
Set access (opt == 1), modify (opt == 2) or access & modify (opt == 0, default) times of path to current ...
int MvSession(const char *fpid)
Move session file from the active to the terminated areas.
const char * TraceID() const
int GetData(const char *dtype, char *buff, int blen)
Get data from the open link.
const char * TraceID() const
#define XPD_SETRESPV(p, x)
const char * AdminPath() const
int Touch(bool reset=0)
Send a touch to the connected clients: this will remotely touch the associated TSocket instance and sche...
XrdProofdProtocol(XrdProtocol_Config *pi=0)
Protocol constructor.
XrdROOTMgr * ROOTMgr() const
void TouchAdminPath()
Recording time of the last request on this instance.
int Process2()
Local processing method: here the request is dispatched to the appropriate method.
XrdProtocol * Xrootd() const
int SendData(int cid, void *buff, int len)
Send data to client cid.
XrdSysRecMutex fCtrlcMutex
XrdProofdResponse * GetNewResponse(kXR_unt16 rid)
Create new response instance for stream ID 'sid'.
int Ping()
Handle a ping request.
#define kXPD_startprocess
static int fgEUidAtStartup
int Process(XrdLink *lp)
Process the information received on the active link.
#define XPD_SETRESP(p, x)
R__EXTERN C unsigned int sleep(unsigned int seconds)
static XrdSysLogger gMainLogger
XrdProofdResponse * Response() const
int StartRootd(XrdLink *lp, XrdOucString &emsg)
Transfer the connection to a rootd daemon to serve a file access request. Return 0 on success...
XrdProofdPriorityMgr * PriorityMgr() const
int Urgent()
Handle generic request of a urgent message to be forwarded to the server.
#define TRACET(tid, act, x)
int XrdgetProtocolPort(const char *, char *, XrdProtocol_Config *pi)
This function is called early on to determine the port we need to use.
XrdProtocol * XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
This protocol is meant to live in a shared library.
static void ReleaseBuff(XrdBuffer *argp)
Release a buffer previously allocated via GetBuff.
int Stats(char *buff, int blen, int do_sync)
Return statistics info about the protocol.
struct ResetCtrlcGuard ResetCtrlcGuard_t
int Interrupt()
Handle an interrupt request.
int SendDataN(XrdProofdProofServ *xps, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open client links of session 'xps'.
XrdProofdClient * Client() const
int Post(int type, const char *msg)
Post message on the pipe.
void Set(int inQMax, time_t agemax=1800)
Lock the data area and set the values.
int SendMsg()
Handle a request to forward a message to another process.
static int VerifyProcessByID(int pid, const char *pname="proofserv")
Check if a process named 'pname' and process 'pid' is still in the process table. ...
XrdOucTrace * XrdProofdTrace
XrdProofdProofServ * GetServer(int psid)
Get from the vector server instance with ID psid.
int CtrlC()
Set and propagate a Ctrl-C request.
const char * BinDir() const
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
int Send(void)
Auxiliary Send method.
void SetStartMsg(XrdSrvBuffer *sm)
int SendData(XrdProofdProofServ *xps, kXR_int32 sid=-1, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open link. Segmentation is done here, if required.