28#include "XrdOuc/XrdOucStream.hh"
30#include "XrdVersion.hh"
31#include "Xrd/XrdBuffer.hh"
32#include "Xrd/XrdScheduler.hh"
59 "xproofd protocol anchor");
81#define XPDCOND(n,ns) ((n == -1 && ns == -1) || (n > 0 && n >= ns))
84#define XPDSETSTRING(n,ns,c,s) \
85 { if (XPDCOND(n,ns)) { \
86 SafeFree(c); c = strdup(s.c_str()); ns = n; }}
90#define XPDADOPTSTRING(n,ns,c,s) \
92 XPDSETSTRING(n, ns, t, s); \
93 if (t && strlen(t)) { \
100#define XPDSETINT(n,ns,i,s) \
101 { if (XPDCOND(n,ns)) { \
102 i = strtol(s.c_str(),0,10); ns = n; }}
112typedef struct ResetCtrlcGuard {
116 ~ResetCtrlcGuard() {
if (xpd && type !=
kXP_ctrlc) xpd->ResetCtrlC(); }
124 XrdProofdProtCfg(
const char *cfg, XrdSysError *edest = 0);
132XrdProofdProtCfg::XrdProofdProtCfg(
const char *cfg, XrdSysError *edest)
136 RegisterDirectives();
142void XrdProofdProtCfg::RegisterDirectives()
152 char *val, XrdOucStream *cfg,
bool)
156 XrdOucString port(val);
157 if (
d->fName ==
"xrd.protocol") {
158 port = cfg->GetWord();
159 port.replace(
"xproofd:",
"");
160 }
else if (
d->fName !=
"port") {
163 if (port.length() > 0) {
164 fPort = strtol(port.c_str(), 0, 10);
170#if (ROOTXRDVERS >= 300030000)
201 XrdProofdProtCfg pcfg(pi->ConfigFN, pi->eDest);
206 if (pcfg.fPort > 0) {
219 :
XrdProtocol(
"xproofd protocol handler"), fProtLink(this)
228 fStdErrFD = (pi && pi->eDest) ? pi->eDest->baseFD() : fileno(stderr);
239 XPDLOC(ALL,
"Protocol::Response")
255 XPDLOC(ALL,
"Protocol::GetNewResponse")
265 msg +=
" new capacity: ";
273 msg +=
"; new size: ";
278 TRACE(XERR,
"wrong sid: "<<sid);
293 XPDLOC(ALL,
"Protocol::Match")
295 struct ClientInitHandShake hsdata;
296 char *hsbuff = (
char *)&hsdata;
298 static hs_response_t hsresp = {0, 0, kXR_int32(htonl(
XPROOFD_VERSBIN)), 0};
302 TRACE(HDBG,
"enter");
306 if ((dlen = lp->Peek(hsbuff,
sizeof(hsdata),
fgReadWait)) !=
sizeof(hsdata)) {
307 if (dlen <= 0) lp->setEtext(
"Match: handshake not received");
310 hsdata.first = ntohl(hsdata.first);
311 if (hsdata.first == 8) {
312 emsg =
"rootd-file serving not supported any-longer";
314 if (emsg.length() > 0) {
315 lp->setEtext(emsg.c_str());
317 lp->setEtext(
"link transfered");
321 TRACE(XERR,
"peeked incomplete or empty information! (dlen: "<<dlen<<
" bytes)");
326 hsdata.third = ntohl(hsdata.third);
327 if (dlen !=
sizeof(hsdata) || hsdata.first || hsdata.second
328 || !(hsdata.third == 1) || hsdata.fourth || hsdata.fifth) {
332 TRACE(ALL,
"matched xrootd protocol on link: serving a file");
334 TRACE(XERR,
"failed to match any known or enabled protocol");
340 if (!lp->Send((
char *)&hsresp,
sizeof(hsresp))) {
341 lp->setEtext(
"Match: handshake failed");
342 TRACE(XERR,
"handshake failed");
347 int len =
sizeof(hsdata);
348 if (lp->Recv(hsbuff, len) != len) {
349 lp->setEtext(
"Match: reread failed");
350 TRACE(XERR,
"reread failed");
362 xpp->
fSecEntity.host = strdup((
char *)lp->Host());
366 if (xpp->
GetData(
"dummy",(
char *)&dum[0],
sizeof(dum)) != 0) {
382 static char statfmt[] =
"<stats id=\"xproofd\"><num>%ld</num></stats>";
386 return sizeof(statfmt)+16;
417 std::vector<XrdProofdResponse *>::iterator ii =
fResponses.begin();
431 XPDLOC(ALL,
"Protocol::Configure")
481 mp =
"global manager created";
486 " build "<<XrdVERSION<<
" successfully loaded");
498 XPDLOC(ALL,
"Protocol::Process")
514 memcpy((
void *)&sid, (
const void *)&(
fRequest.
header.streamid[0]), 2);
518 TRACET(
TraceID(), XERR,
"could not get Response instance for rid: "<< sid);
533 response->
Send(kXR_ArgInvalid,
"Process: Invalid request data length");
534 return fLink->setEtext(
"Process: protocol data length error");
542 response->
Send(kXR_ArgTooLong,
"fRequest.argument is too long");
560 XPDLOC(ALL,
"Protocol::Process2")
577 response->Send(kXR_InvalidRequest,
"client undefined!!! ");
607 TRACE(XERR,
"link is undefined! ");
618 TRACE(XERR,
"link is undefined! ");
629 XPDLOC(ALL,
"Protocol::Recycle")
631 const char *srvtype[6] = {
"ANY",
"MasterWorker",
"MasterMaster",
632 "ClientMaster",
"Internal",
"Admin"};
655 TRACE(REQ,
"External disconnection of protocol associated with pid "<<
fPid);
659 discpath.replace(
"/cid",
"/disconnected");
660 FILE *fd = fopen(discpath.c_str(),
"w");
661 if (!fd && errno != ENOENT) {
662 TRACE(XERR,
"unable to create path: " <<discpath<<
" (errno: "<<errno<<
")");
675 TRACE(REQ,
"Non-destroyed proofserv processes attached to this protocol ("<<
this<<
676 "), setting reconnect time");
681 TRACE(XERR,
"No XrdProofdMgr ("<<
fgMgr<<
") or SessionMgr ("
723 XPDLOC(ALL,
"Protocol::GetBuff")
725 TRACE(HDBG,
"len: "<<quantum);
730 if (quantum >= argp->bsize / 2 && quantum <= argp->bsize)
740 if ((argp =
fgBPool->Obtain(quantum)) == 0) {
741 TRACE(XERR,
"could not get requested buffer (size: "<<quantum<<
742 ") = insufficient memory");
744 TRACE(HDBG,
"quantum: "<<quantum<<
745 ", buff: "<<(
void *)(argp->buff)<<
", bsize:"<<argp->bsize);
766 XPDLOC(ALL,
"Protocol::GetData")
772 TRACET(
TraceID(), HDBG,
"dtype: "<<(dtype ? dtype :
" - ")<<
", blen: "<<blen);
777 if (rlen != -ENOMSG && rlen != -ECONNRESET) {
778 XrdOucString emsg =
"link read error: errno: ";
781 return (
fLink ?
fLink->setEtext(emsg.c_str()) : -1);
783 TRACET(
TraceID(), HDBG,
"connection closed by peer (errno: "<<-rlen<<
")");
788 TRACET(
TraceID(), DBG, dtype <<
" timeout; read " <<rlen <<
" of " <<blen <<
" bytes - rescheduling");
802 XPDLOC(ALL,
"Protocol::SendData")
816 if (!argp)
return -1;
824 if ((rc =
GetData(
"data", argp->buff, quantum))) {
828 if (buf && !(*buf) && savebuf)
833 XPDFORM(msg,
"EXT: server ID: %d, sending: %d bytes", sid, quantum);
835 argp->buff, quantum) != 0) {
837 XPDFORM(msg,
"EXT: server ID: %d, problems sending: %d bytes to server",
847 XPDFORM(msg,
"INT: client ID: %d, sending: %d bytes", cid, quantum);
848 if (xps->
SendData(cid, argp->buff, quantum) != 0) {
850 XPDFORM(msg,
"INT: client ID: %d, problems sending: %d bytes to client",
878 XPDLOC(ALL,
"Protocol::SendDataN")
892 if (!argp)
return -1;
896 if ((rc =
GetData(
"data", argp->buff, quantum))) {
900 if (buf && !(*buf) && savebuf)
904 if (xps->
SendDataN(argp->buff, quantum) != 0) {
927 XPDLOC(ALL,
"Protocol::SendMsg")
929 static const char *crecv[5] = {
"master proofserv",
"top master",
930 "client",
"undefined",
"any"};
943 XPDFORM(msg,
"%s: session ID not found: %d", (
Internal() ?
"INT" :
"EXT"), psid);
945 response->Send(kXR_InvalidRequest, msg.c_str());
956 XPDFORM(msg,
"EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d,"
957 " cid: %d)", len, psid, xps, xps->
Status(),
fCID);
968 TRACET(
TraceID(), REQ,
"EXT: error sending message to proofserv");
980 XPDFORM(msg,
"INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)",
981 len, psid, xps, xps->
Status());
984 bool saveStartMsg = 0;
988 TRACET(
TraceID(), DBG,
"INT: setting proofserv in 'idle' state");
993 TRACET(
TraceID(), DBG,
"INT: got message with query number");
995 TRACET(
TraceID(), DBG,
"INT: setting proofserv in 'running' state");
1015 if (
SendData(xps, -1, &savedBuf, saveStartMsg) != 0) {
1017 "SendMsg: INT: session is reconnecting: retry later");
1022 if (
SendDataN(xps, &savedBuf, saveStartMsg) != 0) {
1024 "SendMsg: INT: session is reconnecting: retry later");
1036 XPDFORM(msg,
"INT: message sent to %s (%d bytes)", crecv[ii], len);
1052 XPDLOC(ALL,
"Protocol::Urgent")
1054 unsigned int rc = 0;
1070 response->Send(kXR_InvalidRequest,
"Urgent: session ID not found");
1077 if (!xps->
Match(psid)) {
1084 response->Send(
kXP_InvalidRequest,
"Urgent: session response object undefined - do nothing");
1089 int len = 3 *
sizeof(kXR_int32);
1090 char *buf =
new char[len];
1092 kXR_int32 itmp =
static_cast<kXR_int32
>(htonl(
type));
1093 memcpy(buf, &itmp,
sizeof(kXR_int32));
1095 itmp =
static_cast<kXR_int32
>(htonl(int1));
1096 memcpy(buf +
sizeof(kXR_int32), &itmp,
sizeof(kXR_int32));
1098 itmp =
static_cast<kXR_int32
>(htonl(int2));
1099 memcpy(buf + 2 *
sizeof(kXR_int32), &itmp,
sizeof(kXR_int32));
1103 "Urgent: could not propagate request to proofsrv");
1120 XPDLOC(ALL,
"Protocol::Interrupt")
1135 response->Send(kXR_InvalidRequest,
"Interrupt: session ID not found");
1142 if (!xps->
Match(psid)) {
1148 XPDFORM(msg,
"xps: %p, link ID: %s, proofsrv PID: %d",
1155 "Interrupt: could not propagate interrupt code to proofsrv");
1176 XPDLOC(ALL,
"Protocol::Ping")
1192 TRACET(
TraceID(), REQ,
"psid: "<<psid<<
", async: "<<asyncopt);
1199 response->Send(kXR_InvalidRequest,
"session ID not found");
1204 kXR_int32 pingres = (psid > -1) ? 0 : 1;
1205 if (psid > -1 && xps && xps->
IsValid()) {
1213 if (asyncopt == 1) {
1214 TRACET(
TraceID(), DBG,
"EXT: async: notifying timeout to client: "<<checkfq<<
" secs");
1215 response->Send(kXR_ok, checkfq);
1220 if (path.length() <= 0) {
1221 TRACET(
TraceID(), XERR,
"EXT: admin path is empty! - protocol error");
1223 response->Send(
kXP_ServerError,
"EXT: admin path is empty! - protocol error");
1233 if (stat(path.c_str(), &st0) != 0) {
1234 TRACET(
TraceID(), XERR,
"EXT: cannot stat admin path: "<<path);
1245 if ((now - st0.st_mtime) > checkfq - 5) {
1248 TRACET(
TraceID(), XERR,
"EXT: could not send verify request to proofsrv");
1250 response->Send(
kXP_ServerError,
"EXT: could not verify reuqest to proofsrv");
1257 if (stat(path.c_str(), &st1) == 0) {
1258 if (st1.st_mtime > st0.st_mtime) {
1264 TRACET(
TraceID(), DBG,
"EXT: waiting "<<ns<<
" secs for session "<<pid<<
1265 " to touch the admin path");
1279 TRACET(
TraceID(), DBG,
"EXT: notified the result to client: "<<pingres);
1280 if (asyncopt == 0) {
1281 response->Send(kXR_ok, pingres);
1284 int len =
sizeof(kXR_int32);
1285 char *buf =
new char[len];
1287 kXR_int32 ifw = (kXR_int32)0;
1288 ifw =
static_cast<kXR_int32
>(htonl(ifw));
1289 memcpy(buf, &ifw,
sizeof(kXR_int32));
1290 response->Send(kXR_attn,
kXPD_ping, buf, len);
1293 }
else if (psid > -1) {
1299 response->Send(kXR_ok, pingres);
1311 XPDLOC(ALL,
"Protocol::PostSession")
1315 int pid = (xps) ? xps->
SrvPID() : -1;
1317 TRACE(XERR,
"undefined session or process id");
1321 XPDFORM(buf,
"%d %s %s %d", on, u,
g, pid);
1324 buf.c_str()) != 0) {
1325 TRACE(XERR,
"problem posting the prority manager pipe");
1331 TRACE(DBG,
"posting the scheduler pipe");
1333 TRACE(XERR,
"problem posting the scheduler pipe");
1340 TRACE(XERR,
"problem posting the session manager pipe");
1352 XPDLOC(ALL,
"Protocol::TouchAdminPath")
1365 apath.replace(
"/activesessions/",
"/terminatedsessions/");
1366 apath.replace(
".status",
"");
1369 if (rc != 0 && rc != -ENOENT) {
1370 const char *
type =
Internal() ?
"internal" :
"external";
1384 XPDLOC(ALL,
"Protocol::CtrlC")
#define TRACE(Flag, Args)
R__EXTERN C unsigned int sleep(unsigned int seconds)
#define kXPD_ClientMaster
#define kXPD_startprocess
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
static XrdSysLogger gMainLogger
XrdProtocol * XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
This protocol is meant to live in a shared library.
int XrdgetProtocolPort(const char *, char *, XrdProtocol_Config *pi)
This function is called early on to determine the port we need to use.
struct ResetCtrlcGuard ResetCtrlcGuard_t
XrdOucTrace * XrdProofdTrace
#define XPD_SETRESPV(p, x)
#define XPD_SETRESP(p, x)
#define TRACESET(act, on)
R__EXTERN XrdOucTrace * XrdProofdTrace
#define TRACET(tid, act, x)
void Set(int inQMax, time_t agemax=1800)
Lock the data area and set the values.
void Push(XpdObject *Node)
Push back a protocol.
XrdProofdProtocol * Pop()
Pop up a protocol object.
XrdProofdProtocol * objectItem()
void setItem(XrdProofdProtocol *ival)
static int Touch(const char *path, int opt=0)
Set access (opt == 1), modify (opt =2 ) or access&modify (opt = 0, default) times of path to current ...
static int VerifyProcessByID(int pid, const char *pname="proofserv")
Check if a process named 'pname' and process 'pid' is still in the process table.
static const char * ProofRequestTypes(int type)
Translates the proof request type in a human readable string.
const char * User() const
int ResetClientSlot(int ic)
Reset slot at 'ic'.
XrdProofdProofServ * GetServer(int psid)
Get from the vector server instance with ID psid.
int Touch(bool reset=0)
Send a touch the connected clients: this will remotely touch the associated TSocket instance and sche...
virtual int DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool)
virtual void RegisterDirectives()
XrdProofdPriorityMgr * PriorityMgr() const
XrdProofSched * ProofSched() const
XrdProofdNetMgr * NetMgr() const
int Process(XrdProofdProtocol *p)
Process manager request.
XrdProtocol * Xrootd() const
XrdProofdProofServMgr * SessionMgr() const
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt Return 0 on success, -1 on error.
int Post(int type, const char *msg)
Post message on the pipe.
int MvSession(const char *fpid)
Move session file from the active to the terminated areas.
void DisconnectFromProofServ(int pid)
Change reconnecting status.
int CheckActiveSessions(bool verify=1)
Go through the active sessions admin path and make sure sessions are alive.
bool Alive(XrdProofdProtocol *p)
Check destroyed status.
int DeleteFromSessions(const char *pid)
Delete from the hash list the session with ID pid.
void SetReconnectTime(bool on=1)
Change reconnecting status.
int CheckFrequency() const
XrdProofdResponse * Response() const
const char * AdminPath() const
int SendData(int cid, void *buff, int len)
Send data to client cid.
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
bool Match(short int id) const
void SetStartMsg(XrdSrvBuffer *sm)
static int Configure(char *parms, XrdProtocol_Config *pi)
Protocol configuration tool Function: Establish configuration at load time.
void Recycle(XrdLink *lp, int x, const char *y)
Recycle call. Release the instance and give it back to the stack.
int SendMsg()
Handle a request to forward a message to another process.
int SendData(XrdProofdProofServ *xps, kXR_int32 sid=-1, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open link. Segmentation is done here, if required.
XrdProofdResponse * Response(kXR_unt16 rid)
Get response instance corresponding to stream ID 'sid'.
XrdProofdResponse * GetNewResponse(kXR_unt16 rid)
Create new response instance for stream ID 'sid'.
static XrdSysRecMutex fgBMutex
static void PostSession(int on, const char *u, const char *g, XrdProofdProofServ *xps)
Post change of session status.
static XrdSysError fgEDest
XrdProofdClient * fPClient
int SendDataN(XrdProofdProofServ *xps, XrdSrvBuffer **buf=0, bool sb=0)
Send data over the open client links of session 'xps'.
static XpdObjectQ fgProtStack
XrdProtocol * Match(XrdLink *lp)
Check whether the request matches this protocol.
XrdProofdProtocol(XrdProtocol_Config *pi=0)
Protocol constructor.
static XrdProofdManager * fgMgr
XrdProofdClient * Client() const
int Process2()
Local processing method: here the request is dispatched to the appropriate method.
int CtrlC()
Set and propagate a Ctrl-C request.
int Stats(char *buff, int blen, int do_sync)
Return statistics info about the protocol.
static XrdBuffManager * fgBPool
std::vector< XrdProofdResponse * > fResponses
XrdSysRecMutex fCtrlcMutex
XrdSecProtocol * fAuthProt
void Reset()
Reset static and local vars.
int Process(XrdLink *lp)
Process the information received on the active link.
static XrdSysLogger * fgLogger
int Interrupt()
Handle an interrupt request.
static XrdBuffer * GetBuff(int quantum, XrdBuffer *argp=0)
Allocate a buffer to handle quantum bytes; if argp points to an existing buffer, its size is checked ...
void TouchAdminPath()
Recording time of the last request on this instance.
int Urgent()
Handle generic request of a urgent message to be forwarded to the server.
static int fgEUidAtStartup
const char * TraceID() const
XrdSecEntity * fSecClient
unsigned char fClntCapVer
int GetData(const char *dtype, char *buff, int blen)
Get data from the open link.
static void ReleaseBuff(XrdBuffer *argp)
Release a buffer previously allocated via GetBuff.
int Ping()
Handle a ping request.
void Set(XrdLink *l)
Set the link to be used by this response.
int Send(void)
Auxilliary Send method.
const char * TraceID() const
static int ChangePerm(uid_t uid, gid_t gid)
struct ClientRequestHdr header
struct XPClientProofRequest proof
struct XPClientInterruptRequest interrupt
struct XPClientSendRcvRequest sendrcv