27 #include "XrdNet/XrdNetAddr.hh"
39 #include "XrdClient/XrdClientConst.hh"
40 #include "XrdClient/XrdClientEnv.hh"
44 #include "XrdClient/XrdClientUrlInfo.hh"
45 #include "XrdOuc/XrdOucErrInfo.hh"
46 #include "XrdOuc/XrdOucString.hh"
47 #include "XrdSec/XrdSecInterface.hh"
48 #include "XrdSys/XrdSysLogger.hh"
49 #include "XrdSys/XrdSysPlatform.hh"
53 #if (defined(SUNCC) || defined(SUN))
54 #include <sys/isa_defs.h>
55 #if defined(_ILP32) && (_FILE_OFFSET_BITS != 32)
56 #undef _FILE_OFFSET_BITS
57 #define _FILE_OFFSET_BITS 32
58 #undef _LARGEFILE_SOURCE
64 #if !defined(__APPLE__)
74 # include <sys/socket.h>
76 #include <sys/types.h>
85 typedef XrdSecProtocol *(*secGetProt_t)(
const char *,
const struct sockaddr &,
86 const XrdSecParameters &, XrdOucErrInfo *);
97 #define URLTAG "["<<fUrl.Host<<":"<<fUrl.Port<<"]"
113 XrdClientAbsUnsolMsgHandler *uh,
const char *logbuf)
114 : fMode(m), fConnected(0), fLogConnID(-1), fStreamid(0), fRemoteProtocol(-1),
115 fServerProto(-1), fServerType(
kSTNone), fSessionID(psid), fPort(-1),
116 fLastErr(kXR_noErrorYet), fCapVer(capver), fLoginBuffer(logbuf), fMutex(0),
117 fConnectInterruptMtx(0), fConnectInterrupt(0), fPhyConn(0),
118 fOpenSockFD(-1), fUnsolMsgHandler(uh), fSender(0), fSenderArg(0)
120 XPDLOC(ALL,
"XrdProofConn")
127 if (url && !
Init(url)) {
129 TRACE(XERR,
"XrdProofConn: severe error occurred while opening a"
130 " connection" <<
" to server "<<
URLTAG);
166 TRACE(XERR,
"error initializing connection manager");
172 fUrl.TakeUrl(XrdOucString(url));
175 if (
fUser.length() <= 0) {
178 struct passwd *pw = getpwuid(getuid());
183 ::GetUserName(name, &length);
184 if (strlen(name) > 1)
203 XPDLOC(ALL,
"Conn::Connect")
206 int maxTry = (
fgMaxTry > -1) ?
fgMaxTry : EnvGetLong(NAME_FIRSTCONNECTMAXCNT);
219 TRACE(ALL,
"got an interrupt while connecting - aborting attempts");
232 TRACE(DBG,
"new logical connection ID: "<<logid);
243 if (
fLastErr == kXR_InvalidRequest) {
245 msg.erase(msg.rfind(
":"));
246 TRACE(XERR,
"failure: " << msg);
258 TRACE(DBG,
"connection successfully created");
264 TRACE(REQ,
"disconnecting");
268 if (i < maxTry - 1) {
269 TRACE(DBG,
"connection attempt failed: sleep " << timeWait <<
" secs");
270 if (
fUrl.Host ==
"lite" ||
fUrl.Host ==
"pod") {
271 const char *cdef = (
fUrl.Host ==
"lite") ?
" (or \"\": check 'Proof.LocalDefault')" :
"";
272 const char *cnow = (
fUrl.Host ==
"lite") ?
"now " :
"";
273 const char *cses = (
fUrl.Host ==
"lite") ?
"PROOF-Lite" :
"PoD";
274 TRACE(ALL,
"connection attempt to server \""<<
fUrl.Host<<
"\" failed. We are going to retry after some sleep,");
275 TRACE(ALL,
"but if you intended to start a "<<cses<<
" session instead, please note that you must");
276 TRACE(ALL, cnow<<
"use \""<<
fUrl.Host<<
"://\" as connection string"<<cdef);
285 TRACE(XERR,
"failed to connect to " <<
fUrl.GetUrl());
318 XPDLOC(ALL,
"Conn::ReConnect")
327 int maxtry, timewait;
334 TRACE(DBG,
"server does not support reconnections (protocol: %d" <<
345 XPDLOC(ALL,
"Conn::TryConnect")
351 static int servdef = -1;
353 struct servent *ent = getservbyname(
"proofd",
"tcp");
354 servdef = (ent) ? (
int)ntohs(ent->s_port) : 1093;
359 char *haddr[10] = {0}, *hname[10] = {0};
360 int naddr = XrdSysDNS::getAddrName(
fUrl.Host.c_str(), 10, haddr, hname);
363 for (; i < naddr; i++ ) {
365 fUrl.HostAddr = (
const char *) haddr[i];
367 fUrl.Host = (
const char *) hname[i];
369 TRACE(HDBG,
"found host "<<
fUrl.Host<<
" with addr " <<
fUrl.HostAddr);
374 aNA.Set(
fUrl.Host.c_str());
375 fUrl.Host = (
const char *) aNA.Name();
377 if (aNA.Format(ha, 256) <= 0) {
378 TRACE(DBG,
"failure resolving address name " <<
URLTAG);
383 fUrl.HostAddr = (
const char *) ha;
385 TRACE(HDBG,
"found host "<<
fUrl.Host<<
" with addr " <<
fUrl.HostAddr);
393 TRACE(DBG,
"failure creating logical connection to " <<
URLTAG);
424 XPDLOC(ALL,
"Conn::Close")
431 bool closephys = (opt[0] ==
'P') ? 1 : 0;
432 TRACE(DBG,
URLTAG <<
": closing also physical connection ? "<< closephys);
455 XPDLOC(ALL,
"Conn::ProcessUnsolicitedMsg")
457 TRACE(DBG,
"processing unsolicited response");
460 TRACE(XERR,
"Got empty or error unsolicited message");
464 if ((len = m->
DataLen()) < (
int)
sizeof(kXR_int32)) {
465 TRACE(XERR,
"empty or bad-formed message - ignoring");
470 memcpy(&acod, m->
GetData(),
sizeof(kXR_int32));
473 void *pdata = (
void *)((
char *)(m->
GetData()) +
sizeof(kXR_int32));
479 memcpy(&opt, pdata,
sizeof(kXR_int32));
481 if (opt == 0 || opt == 1 || opt == 2) {
483 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
484 len -=
sizeof(kXR_int32);
490 (*fSender)((
const char *)pdata, len,
fSenderArg);
531 XPDLOC(ALL,
"Conn::SendRecv")
546 int reqDataLen = req->
header.dlen;
552 TRACE(XERR,
"problems sending request to server "<<
URLTAG);
557 bool needalloc = (answData && !(*answData));
562 size_t dataRecvSize = 0;
567 kXR_int16 xst = kXR_error;
569 TRACE(XERR,
"reading msg from connmgr (server "<<
URLTAG<<
")");
580 if ((xst == kXR_ok) || (xst == kXR_oksofar) || (xst == kXR_authmore)) {
581 if (answData && xmsg->
DataLen() > 0) {
583 *answData = (
char *) realloc(*answData, dataRecvSize + xmsg->
DataLen());
586 TRACE(XERR,
"reallocating "<<dataRecvSize<<
" bytes");
587 free((
void *) *answData);
595 memcpy((*answData)+dataRecvSize,
600 TRACE(DBG,
"dumping read data ...");
601 for (
int jj = 0; jj < xmsg->
DataLen(); jj++) {
603 if (!(jj%10))
printf(
"\n");
608 dataRecvSize += xmsg->
DataLen();
610 }
else if (xst != kXR_error) {
613 TRACE(XERR,
"status in reply is unknown ["<<
615 "] (server "<<
URLTAG<<
") - Abort");
621 if (xmsg && (xst == kXR_oksofar) && (xmsg->
DataLen() == 0))
624 }
while (xmsg && (xmsg->
HeaderStatus() == kXR_oksofar));
628 xmsg->
fHdr.dlen = dataRecvSize;
637 char **answData,
const char *CmdName,
640 XPDLOC(ALL,
"Conn::SendReq")
647 bool resp = 0, abortcmd = 0;
654 while (!abortcmd && !resp) {
666 TRACE(DBG,
"calling SendRecv");
667 answMex =
SendRecv(req, reqData, answData);
672 if (!answMex || answMex->
IsError()) {
674 TRACE(DBG,
"communication error detected with "<<
URLTAG);
675 if (retry > maxTry) {
676 TRACE(XERR,
"max number of retries reached - Abort");
683 TRACE(XERR,
"not connected: nothing to do");
702 if (retry > maxTry) {
703 TRACE(XERR,
"max number of retries reached - Abort");
713 TRACE(DBG,
"sleep "<<sleeptime<<
" secs ...");
728 const char *method,
bool notifyerr)
730 XPDLOC(ALL,
"Conn::CheckResp")
734 if (resp->status != kXR_ok && resp->status != kXR_authmore &&
735 resp->status != kXR_wait) {
738 " did not return OK replying to last request");
746 TRACE(XERR, method <<
" return message not belonging to this client"
747 " - protocol error");
763 return (memcmp(ServerResponse->streamid, sid,
sizeof(sid)) == 0 );
770 memcpy((
void *)sid, (
const void*)&
fStreamid, 2);
780 XPDLOC(ALL,
"Conn::LowWrite")
788 int len =
sizeof(req->
header);
789 if ((wc =
WriteRaw(req, len)) != len) {
790 TRACE(XERR,
"sending header to server "<<
URLTAG<<
" (rc="<<wc<<
")");
796 if (reqDataLen > 0) {
798 if ((wc =
WriteRaw(reqData, reqDataLen)) != reqDataLen) {
799 TRACE(XERR,
"sending data ("<<reqDataLen<<
" bytes) to server "<<
URLTAG<<
812 const char *CmdName,
bool notifyerr)
814 XPDLOC(ALL,
"Conn::CheckErrorStatus")
823 struct ServerResponseBody_Error *body_err;
825 body_err = (
struct ServerResponseBody_Error *)mex->
GetData();
828 fLastErr = (XErrorCode)ntohl(body_err->errnum);
849 struct ServerResponseBody_Wait *body_wait;
851 body_wait = (
struct ServerResponseBody_Wait *)mex->
GetData();
854 int sleeptime = ntohl(body_wait->seconds);
856 TRACE(DBG,
"wait request ("<<sleeptime<<
857 " secs); message: "<<(
const char*)body_wait->infomsg);
859 TRACE(DBG,
"wait request ("<<sleeptime<<
" secs)");
870 TRACE(XERR,
"after: "<<CmdName<<
": server reply not recognized - protocol error");
881 XPDLOC(ALL,
"Conn::GetAccessToSrv")
904 dum[0] = (kXR_int32)htonl(0);
905 dum[1] = (kXR_int32)htonl(2034);
911 TRACE(XERR,
"handshake failed with server "<<
URLTAG);
936 if (phyconn && phyconn->
IsValid()) {
951 if (phyconn && phyconn->
IsValid()) {
967 XPDLOC(ALL,
"Conn::DoHandShake")
974 TRACE(DBG,
"already connected to a PROOF server "<<
URLTAG);
979 struct ClientInitHandShake initHS;
980 memset(&initHS, 0,
sizeof(initHS));
981 initHS.third = (kXR_int32)htonl((
int)1);
985 int len =
sizeof(initHS);
986 TRACE(HDBG,
"step 1: sending "<<len<<
" bytes to server "<<
URLTAG);
988 int writeCount =
WriteRaw(&initHS, len, p);
989 if (writeCount != len) {
990 TRACE(XERR,
"sending "<<len<<
" bytes to server "<<
URLTAG);
996 dum[0] = (kXR_int32)htonl(4);
997 dum[1] = (kXR_int32)htonl(2012);
998 writeCount =
WriteRaw(&dum[0],
sizeof(dum), p);
999 if (writeCount !=
sizeof(dum)) {
1000 TRACE(XERR,
"sending "<<
sizeof(dum)<<
" bytes to server "<<
URLTAG);
1005 ServerResponseType
type;
1007 TRACE(HDBG,
"step 2: reading "<<len<<
" bytes from server "<<
URLTAG);
1011 int readCount =
ReadRaw(&type, len, p);
1012 if (readCount != len) {
1013 if (readCount == (
int)TXSOCK_ERR_TIMEOUT) {
1014 TRACE(ALL,
"-----------------------");
1015 TRACE(ALL,
"TimeOut condition reached reading from remote server.");
1016 TRACE(ALL,
"This may indicate that the server is a 'proofd', version <= 12");
1017 TRACE(ALL,
"Retry commenting the 'Plugin.TSlave' line in system.rootrc or adding");
1018 TRACE(ALL,
"Plugin.TSlave: ^xpd TSlave Proof \"TSlave(const char *,const char"
1019 " *,int,const char *, TProof *,ESlaveType,const char *,const char *)\"");
1020 TRACE(ALL,
"to your $HOME/.rootrc .");
1021 TRACE(ALL,
"-----------------------");
1023 TRACE(XERR,
"reading "<<len<<
" bytes from server "<<
URLTAG);
1034 struct ServerInitHandShake xbody;
1037 len =
sizeof(xbody);
1038 TRACE(HDBG,
"step 3: reading "<<len<<
" bytes from server "<<
URLTAG);
1040 readCount =
ReadRaw(&xbody, len, p);
1041 if (readCount != len) {
1042 TRACE(XERR,
"reading "<<len<<
" bytes from server "<<
URLTAG);
1054 }
else if (type == 8) {
1059 TRACE(XERR,
"unknown server type ("<<type<<
")");
1078 XPDLOC(ALL,
"Conn::Login")
1083 memset( &reqhdr, 0,
sizeof(reqhdr));
1088 XrdOucString ug =
fUser;
1089 if (
fUrl.Passwd.length() > 0) {
1095 if (ug.length() > 8) {
1105 }
else if (ug.length() >= 0) {
1106 memcpy((
void *)reqhdr.
login.
username, (
void *)(ug.c_str()), ug.length());
1107 if (ug.length() < 8) reqhdr.
login.
username[ug.length()] =
'\0';
1114 const void *buf = (
const void *)(
fLoginBuffer.c_str());
1132 XrdOucString usr((
const char *)&reqhdr.
login.
username[0], 8);
1151 XrdSecProtocol *secp = 0;
1161 &pltmp,
"XrdProofConn::Login", 0);
1164 char *plref = pltmp;
1169 if (len >= (
int)
sizeof(kXR_int32)) {
1172 memcpy(&vers, pltmp,
sizeof(kXR_int32));
1174 pltmp = (
char *)((
char *)pltmp +
sizeof(kXR_int32));
1175 len -=
sizeof(kXR_int32);
1178 if (pltmp && (len > 0)) {
1185 if (EnvGetLong(NAME_DEBUG) > 0) {
1186 s =
new char [strlen(
"XrdSecDEBUG")+20];
1187 sprintf(s,
"XrdSecDEBUG=%ld", EnvGetLong(NAME_DEBUG));
1191 s =
new char [strlen(
"XrdSecUSER")+
fUser.length()+2];
1192 sprintf(s,
"XrdSecUSER=%s",
fUser.c_str());
1195 s =
new char [strlen(
"XrdSecHOST")+
fHost.length()+2];
1196 sprintf(s,
"XrdSecHOST=%s",
fHost.c_str());
1201 struct passwd *pw = getpwuid(getuid());
1204 netrc +=
"/.rootnetrc";
1207 if (netrc.length() > 0) {
1208 s =
new char [strlen(
"XrdSecNETRC")+netrc.length()+2];
1209 sprintf(s,
"XrdSecNETRC=%s", netrc.c_str());
1214 char *plist =
new char[len+1];
1215 memcpy(plist, pltmp, len);
1217 TRACE(DBG,
"server requires authentication");
1220 resp = (secp != 0) ? 1 : 0;
1266 XPDLOC(ALL,
"Conn::Authenticate")
1268 XrdSecProtocol *protocol = (XrdSecProtocol *)0;
1270 if (!plist || plsiz <= 0)
1273 TRACE(DBG,
"host "<<
URLTAG<<
" sent a list of "<<plsiz<<
" bytes");
1277 struct sockaddr_in netaddr;
1278 #ifndef ROOT_XrdFour
1279 char **hosterrmsg = 0;
1280 if (XrdSysDNS::getHostAddr((
char *)
fUrl.HostAddr.c_str(),
1281 (
struct sockaddr &)netaddr, hosterrmsg) <= 0) {
1282 TRACE(XERR,
"getHostAddr: "<< *hosterrmsg);
1287 aNA.Set(
fUrl.HostAddr.c_str());
1288 memcpy(&netaddr, aNA.NetAddr(),
sizeof(
struct sockaddr_in));
1290 netaddr.sin_port =
fUrl.Port;
1293 XrdSecParameters *secToken = 0;
1294 XrdSecCredentials *credentials = 0;
1298 char *bpar = (
char *)
malloc(plsiz + 1);
1300 memcpy(bpar, plist, plsiz);
1302 XrdSecParameters Parms(bpar, plsiz + 1);
1309 XrdOucString libsec;
1311 #if !defined(ROOT_XrdNoUtils)
1312 libsec =
"libXrdSec";
1313 libsec += LT_MODULE_EXT;
1315 libsec =
"libXrdSec.so";
1322 TRACE(XERR,
"unable to load XrdSecGetProtocol()");
1329 (
const struct sockaddr &)netaddr, Parms, 0))) {
1332 XrdOucString protname = protocol->Entity.prot;
1336 credentials = protocol->getCredentials(0, &ei);
1338 TRACE(XERR,
"cannot obtain credentials (protocol: "<<protname<<
")");
1342 fLastErrMsg +=
"cannot obtain credentials for protocol: ";
1348 TRACE(HDBG,
"credentials size: " << credentials->size);
1353 memset(reqhdr.
auth.reserved, 0, 12);
1354 memset(reqhdr.
auth.credtype, 0, 4);
1355 memcpy(reqhdr.
auth.credtype, protname.c_str(), protname.length());
1358 int status = kXR_authmore;
1362 while (status == kXR_authmore) {
1367 reqhdr.
header.dlen = (credentials) ? credentials->size : 0;
1368 char *credbuf = (credentials) ? credentials->buffer : 0;
1369 xrsp =
SendReq(&reqhdr, credbuf, &srvans,
"XrdProofConn::Authenticate");
1372 dlen = (xrsp) ? xrsp->
DataLen() : 0;
1373 TRACE(HDBG,
"server reply: status: "<<status<<
" dlen: "<<dlen);
1375 if (xrsp && (status == kXR_authmore)) {
1380 secToken =
new XrdSecParameters(srvans, dlen);
1383 credentials = protocol->getCredentials(secToken, &ei);
1387 TRACE(XERR,
"cannot obtain credentials");
1402 TRACE(HDBG,
"credentials size " << credentials->size);
1404 }
else if (status != kXR_ok) {
1428 TRACE(XERR,
"unable to get protocol object.");
virtual bool GetAccessToSrv(XrdClientPhyConnection *p=0)
Gets access to the connected server.
virtual ~XrdProofConn()
Destructor.
virtual XrdClientMessage * ReadMsg()
Pickup message from the queue.
virtual void Connect(int=-1)
Run the connection attempts: the result is stored in fConnected.
void SetLogged(ELoginState status)
bool ConnectInterrupt()
Check if interrupted during connect.
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, numer of retries and wait time between attempts (in se...
#define TRACE(Flag, Args)
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
XrdOucString fLoginBuffer
ServerResponseHeader fHdr
char * convertRespStatusToChar(kXR_int16 status)
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
ESrvType DoHandShake(XrdClientPhyConnection *p=0)
Performs initial hand-shake with the server in order to understand which kind of server is there at t...
void SetInterrupt()
Interrupt the underlying socket.
struct ClientRequestHdr header
virtual int WriteRaw(const void *buf, int len, XrdClientPhyConnection *p=0)
Low level write call.
Vc_ALWAYS_INLINE void free(T *p)
Frees memory that was allocated with Vc::malloc.
bool MatchStreamID(struct ServerResponseHeader *resp)
Check stream ID matching.
XrdClientMessage * SendRecv(XPClientRequest *req, const void *reqData, char **answData)
SendRecv sends a command to the server and to get a response.
XrdProofConn(const char *url, char mode= 'M', int psid=-1, char ver=-1, XrdClientAbsUnsolMsgHandler *uh=0, const char *logbuf=0)
Constructor.
static XrdClientConnectionMgr * fgConnMgr
const char * GetLastErr()
if(pyself &&pyself!=Py_None)
int(* XrdProofConnSender_t)(const char *, int, void *)
bool Login()
This method perform the loggin-in into the server just after the hand-shake.
XrdProofConnSender_t fSender
int ReadRaw(void *buffer, int BufferLength, int substreamid=-1, int *usedsubstreamid=0)
virtual int TryConnect(int=-1)
Connect to remote server.
struct XPClientSendRcvRequest sendrcv
XrdSecProtocol * Authenticate(char *plist, int lsiz)
Negotiate authentication with the remote server.
XrdSysRecMutex * fConnectInterruptMtx
bool CheckErrorStatus(XrdClientMessage *, int &, const char *, bool)
Check error status.
ERemoteServerType fServerType
virtual bool Init(const char *url=0, int=-1)
Initialization.
Double_t length(const TVector2 &v)
static XrdSysPlugin * fgSecPlugin
#define XrdSysMutexHelper
void smartPrintServerHeader(struct ServerResponseHeader *hdr)
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer, only if it is composed by some binary part Return 0 if OK, -1 in case the ID is unknown.
void SetConnectInterrupt()
Interrupt connection attempts.
struct ClientAuthRequest auth
bool CheckResp(struct ServerResponseHeader *resp, const char *met, bool)
Checks if the server's response is ours.
int WriteRaw(int LogConnectionID, const void *buffer, int BufferLength, int substreamid)
static void GetRetryParam(int &maxtry, int &timewait)
Retrieve current values of the retry control parameters, numer of retries and wait time between attem...
int GetLowSocket()
Return the socket descriptor of the underlying connection.
bool IsValid() const
Test validity of this connection.
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
virtual void Close(const char *opt="")
Close connection.
XrdClientPhyConnection * fPhyConn
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
void SetSecProtocol(XrdSecProtocol *sp)
XrdClientMessage * ReadMsg(int LogConnectionID)
void ReConnect()
Perform a reconnection attempt when a connection is not valid any more.
struct XPClientLoginRequest login
XrdClientPhyConnection * GetPhyConnection()
int ReadRaw(int LogConnectionID, void *buffer, int BufferLength)
R__EXTERN C unsigned int sleep(unsigned int seconds)
void ServerInitHandShake2HostFmt(struct ServerInitHandShake *srh)
void smartPrintClientHeader(XPClientRequest *hdr)
int WriteRaw(const void *buffer, int BufferLength, int substreamid=0)
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen) ...
int Connect(XrdClientUrlInfo RemoteAddress)
static void * fgSecGetProtocol
XrdClientLogConnection * GetConnection(int LogConnectionID)
void Disconnect(int LogConnectionID, bool ForcePhysicalDisc)
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
virtual int ReadRaw(void *buf, int len, XrdClientPhyConnection *p=0)
Low level receive call.
XrdClientAbsUnsolMsgHandler * fUnsolMsgHandler
XrdSecProtocol *(* secGetProt_t)(const char *, const struct sockaddr &, const XrdSecParameters &, XrdOucErrInfo *)
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *m)
We are here if an unsolicited response comes from a logical conn The response comes in the form of an...