44#include <sys/socket.h>
91Bool_t TXSocketPingHandler::Notify()
93 fSocket->Ping(
"ping handler");
129 :
TSocket(), fMode(
m), fLogLevel(loglevel),
130 fBuffer(logbuf), fConn(0), fASem(0), fAsynProc(1),
131 fDontTimeout(
kFALSE), fRDInterrupt(
kFALSE), fXrdProofdVersion(-1)
168 Error(
"TXSocket",
"internal pipe is invalid");
190 Error(
"TXSocket",
"fatal error occurred while opening a connection"
205 Error(
"TXSocket",
"create or attach failed (%s)",
273 Info(
"DisconnectSession",
"not connected: nothing to do");
277 Bool_t shutdown = opt && (strchr(opt,
'S') || strchr(opt,
's'));
278 Bool_t all = opt && (strchr(opt,
'A') || strchr(opt,
'a'));
280 if (
id > -1 || all) {
283 memset(&Request, 0,
sizeof(Request) );
293 fConn->
SendReq(&Request, (
const void *)0, 0,
"DisconnectSession");
315 Warning(
"Close",
"could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
323 Info(
"Close",
"no connection: nothing to do");
337 if (o.Index(
"#") !=
kNPOS) {
338 o.Remove(0,o.Index(
"#")+1);
339 if (o.Index(
"#") !=
kNPOS) {
340 o.Remove(o.Index(
"#"));
341 sessID = o.IsDigit() ? o.Atoi() : sessID;
376 Error(
"ProcessUnsolicitedMsg",
"%p: async semaphore taken by Close()! Should not be here!",
this);
382 Info(
"ProcessUnsolicitedMsg",
"%p: got empty message: skipping",
this);
387 Info(
"ProcessUnsolicitedMsg",
"%p: got message with status: %d, len: %d bytes (ID: %d)",
388 this,
m->GetStatusCode(),
m->DataLen(),
m->HeaderSID());
395 Info(
"ProcessUnsolicitedMsg",
"%p: got error from underlying connection",
this);
399 Info(
"ProcessUnsolicitedMsg",
"%p: handler undefined or recovery failed",
this);
409 Info(
"ProcessUnsolicitedMsg",
"%p: underlying connection timed out",
this);
419 Info(
"ProcessUnsolicitedMsg",
"%p: IDs do not match: {%d, %d}",
this,
fConn->
fStreamid,
m->HeaderSID());
425 if ((len =
m->DataLen()) < (
int)
sizeof(kXR_int32)) {
426 Error(
"ProcessUnsolicitedMsg",
"empty or bad-formed message - disabling");
436 memcpy(&acod,
m->GetData(),
sizeof(kXR_int32));
438 Info(
"ProcessUnsolicitedMsg",
"%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
439 this, acod, acod,
m->GetStatusCode(),
m->DataLen(),
m->HeaderSID());
442 void *pdata = (
void *)((
char *)(
m->GetData()) +
sizeof(kXR_int32));
443 len -=
sizeof(kXR_int32);
445 Info(
"ProcessUnsolicitedMsg",
"%p: got action: %d (%d bytes) (ID: %d)",
446 this, acod, len,
m->HeaderSID());
464 lab = !lab ?
"kXPD_interrupt" : lab;
465 { std::lock_guard<std::recursive_mutex> lock(
fIMtx);
467 memcpy(&ilev, pdata,
sizeof(kXR_int32));
470 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
471 len -=
sizeof(kXR_int32);
476 memcpy(&ifw, pdata,
sizeof(kXR_int32));
479 Info(
"ProcessUnsolicitedMsg",
"%s: forwarding option: %d", lab, ifw);
492 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
503 memcpy(&opt, pdata,
sizeof(kXR_int32));
506 Info(
"ProcessUnsolicitedMsg",
"kXPD_timer: found opt: %d", opt);
508 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
509 len -=
sizeof(kXR_int32);
513 memcpy(&delay, pdata,
sizeof(kXR_int32));
516 Info(
"ProcessUnsolicitedMsg",
"kXPD_timer: found delay: %d", delay);
518 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
519 len -=
sizeof(kXR_int32);
528 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
535 kXR_int32 inflate = 1000;
537 memcpy(&inflate, pdata,
sizeof(kXR_int32));
540 Info(
"ProcessUnsolicitedMsg",
"kXPD_inflate: factor: %d", inflate);
542 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
543 len -=
sizeof(kXR_int32);
551 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
558 kXR_int32 priority = -1;
560 memcpy(&priority, pdata,
sizeof(kXR_int32));
563 Info(
"ProcessUnsolicitedMsg",
"kXPD_priority: priority: %d", priority);
565 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
566 len -=
sizeof(kXR_int32);
574 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
587 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
597 memcpy(&
type, pdata,
sizeof(kXR_int32));
600 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found type: %d",
type);
602 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
603 len -=
sizeof(kXR_int32);
608 memcpy(&int1, pdata,
sizeof(kXR_int32));
611 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found int1: %d", int1);
613 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
614 len -=
sizeof(kXR_int32);
619 memcpy(&int2, pdata,
sizeof(kXR_int32));
622 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found int2: %d", int2);
624 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
625 len -=
sizeof(kXR_int32);
634 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
640 { std::lock_guard<std::recursive_mutex> lock(
fAMtx);
645 Error(
"ProcessUnsolicitedMsg",
"could allocate spare buffer");
648 memcpy(
b->fBuf, pdata, len);
662 Info(
"ProcessUnsolicitedMsg",
"%p: %s: posting semaphore: %p (%d bytes)",
669 Info(
"ProcessUnsolicitedMsg",
670 "kXPD_feedback treatment not yet implemented");
678 memcpy(&opt, pdata,
sizeof(kXR_int32));
680 if (opt >= 0 && opt <= 4) {
682 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
683 len -=
sizeof(kXR_int32);
690 Printf(
"| %.*s", len, (
char *)pdata);
691 }
else if (opt == 2) {
693 Printf(
"%.*s", len, (
char *)pdata);
694 }
else if (opt == 3) {
696 fprintf(stderr,
"%.*s", len, (
char *)pdata);
697 }
else if (opt == 4) {
699 fprintf(stderr,
"%.*s\r", len, (
char *)pdata);
703 Printf(
"| Message from server:");
704 Printf(
"| %.*s", len, (
char *)pdata);
712 Printf(
"| Error condition occured: message from server:");
713 Printf(
"| %.*s", len, (
char *)pdata);
719 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
724 { std::lock_guard<std::recursive_mutex> lock(
fAMtx);
728 memcpy(&cid, pdata,
sizeof(kXR_int32));
732 Info(
"ProcessUnsolicitedMsg",
"found cid: %d", cid);
735 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
736 len -=
sizeof(kXR_int32);
741 Error(
"ProcessUnsolicitedMsg",
"could allocate spare buffer");
744 memcpy(
b->fBuf, pdata, len);
761 Info(
"ProcessUnsolicitedMsg",
"%p: cid: %d, posting semaphore: %p (%d bytes)",
762 this, cid, &
fASem, len);
771 if (what.BeginsWith(
"idle-timeout")) {
776 Printf(
"| %s", what.Data());
781 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
800 kXR_int32 nsess = -1, nacti = -1, neffs = -1;
803 memcpy(&nsess, pdata,
sizeof(kXR_int32));
805 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
806 len -=
sizeof(kXR_int32);
808 memcpy(&nacti, pdata,
sizeof(kXR_int32));
810 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
811 len -=
sizeof(kXR_int32);
813 memcpy(&neffs, pdata,
sizeof(kXR_int32));
815 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
816 len -=
sizeof(kXR_int32);
819 Info(
"ProcessUnsolicitedMsg",
"kXPD_clusterinfo: # sessions: %d,"
820 " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
827 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
831 Error(
"ProcessUnsolicitedMsg",
"%p: unknown action code: %d received from '%s' - disabling",
854 if (msg && strlen(msg) > 0)
861 char *mbuf =
m.Buffer();
863 if (
m.CompBuffer()) {
864 mbuf =
m.CompBuffer();
865 mlen =
m.CompLength();
870 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
875 Error(
"PostMsg",
"could allocate spare buffer");
880 memcpy(
b->fBuf, mbuf, mlen);
894 Info(
"PostMsg",
"%p: posting type %d to semaphore: %p (%d bytes)",
907 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
975 Info(
"GetInterrupt",
"%p: waiting to lock mutex",
this);
977 std::lock_guard<std::recursive_mutex> lock(
fIMtx);
985 Error(
"GetInterrupt",
"value is unset (%d) - protocol error",
fILev);
1007 list<TXSockBuf *> splist;
1008 list<TXSockBuf *>::iterator i;
1010 { std::lock_guard<std::recursive_mutex> lock(
fAMtx);
1013 if (
fAQue.size() > 0) {
1020 splist.push_back(*i);
1029 Printf(
"Warning in TXSocket::Flush: semaphore counter already 0 (sz: %d)", sz);
1036 { std::lock_guard<std::mutex> lock(
fgSMtx);
1037 if (splist.size() > 0) {
1038 for (i = splist.begin(); i != splist.end();) {
1040 i = splist.erase(i);
1058 Info(
"Create",
"not connected: nothing to do");
1064 while (retriesleft--) {
1069 memset( &reqhdr, 0,
sizeof(reqhdr));
1073 if (
fMode ==
'A' || attach) {
1084 const void *buf = (
const void *)(
fBuffer.Data());
1087 Info(
"Create",
"sending %d bytes to server", reqhdr.
header.dlen);
1091 Info(
"Create",
"creating session of server %s",
fUrl.Data());
1096 &answData,
"TXSocket::Create", 0);
1097 struct ServerResponseBody_Protocol *srvresp = (
struct ServerResponseBody_Protocol *)answData;
1105 void *pdata = (
void *)(xrsp->
GetData());
1108 if (len >= (
Int_t)
sizeof(kXR_int32)) {
1111 memcpy(&psid, pdata,
sizeof(kXR_int32));
1113 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
1114 len -=
sizeof(kXR_int32);
1116 Error(
"Create",
"session ID is undefined!");
1118 if (srvresp)
free(srvresp);
1122 if (len >= (
Int_t)
sizeof(kXR_int16)) {
1125 memcpy(&dver, pdata,
sizeof(kXR_int16));
1127 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int16));
1128 len -=
sizeof(kXR_int16);
1130 Warning(
"Create",
"protocol version of the remote PROOF undefined!");
1135 len +=
sizeof(kXR_int16);
1137 memcpy(&dver, pdata,
sizeof(kXR_int32));
1139 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
1140 len -=
sizeof(kXR_int32);
1142 if (len >= (
Int_t)
sizeof(kXR_int16)) {
1145 memcpy(&dver, pdata,
sizeof(kXR_int16));
1147 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int16));
1148 len -=
sizeof(kXR_int16);
1150 Warning(
"Create",
"version of the remote XrdProofdProtocol undefined!");
1156 char *url =
new char[len+1];
1157 memcpy(url, pdata, len);
1165 if (srvresp)
free(srvresp);
1172 if (retriesleft <= 0 && fConn->GetLastErr()) {
1180 if (srvresp)
free(srvresp);
1186 if ((ilog = emsg.Index(
"|log:")) !=
kNPOS) emsg.Remove(ilog);
1193 Info(
"Create",
"creation/attachment attempt failed: %d attempts left", retriesleft);
1194 if (retriesleft <= 0)
1195 Error(
"Create",
"%d creation/attachment attempts failed: no attempts left",
1198 if (srvresp)
free(srvresp);
1207 "problems creating or attaching to a remote server (%s)",
1228 memset( &Request, 0,
sizeof(Request) );
1243 Int_t nsent = length;
1261 Printf(
"%s: error occured but no message from server",
fHost.Data());
1265 Error(
"SendRaw",
"%s: problems sending %d bytes to server",
1266 fHost.Data(), length);
1280 Info(
"Ping",
"%p: %s: sid: %d",
this, ord ? ord :
"int",
fSessionID);
1284 Error(
"Ping",
"not connected: nothing to do");
1293 memset( &Request, 0,
sizeof(Request) );
1305 fConn->
SendReq(&Request, (
const void *)0, &pans,
"Ping");
1306 kXR_int32 *pres = (kXR_int32 *) pans;
1322 if (pans)
free(pans);
1329 Error(
"Ping",
"%p: int: problems marshalling request",
this);
1335 Error(
"Ping",
"%p: %s: problems sending ping to server",
this, ord ? ord :
"int");
1337 Info(
"Ping",
"%p: %s: sid: %d OK",
this, ord ? ord :
"int",
fSessionID);
1352 Info(
"RemoteTouch",
"%p: sending touch request to %s",
this,
GetName());
1356 Error(
"RemoteTouch",
"not connected: nothing to do");
1362 memset( &Request, 0,
sizeof(Request) );
1371 Error(
"Touch",
"%p: problems marshalling request ",
this);
1375 Error(
"Touch",
"%p: problems sending touch request to server",
this);
1390 Info(
"CtrlC",
"%p: sending ctrl-c request to %s",
this,
GetName());
1394 Error(
"CtrlC",
"not connected: nothing to do");
1400 memset( &Request, 0,
sizeof(Request) );
1408 Error(
"CtrlC",
"%p: problems marshalling request ",
this);
1412 Error(
"CtrlC",
"%p: problems sending ctrl-c request to server",
this);
1427 Info(
"PickUpReady",
"%p: %s: going to sleep",
this,
GetTitle());
1432 static Int_t dt = 2000;
1440 Error(
"PickUpReady",
"error waiting at semaphore");
1444 Info(
"PickUpReady",
"%p: %s: got timeout: retring (%d secs)",
1454 Info(
"PickUpReady",
"interrupted");
1463 Error(
"PickUpReady",
"error waiting at semaphore");
1470 Info(
"PickUpReady",
"%p: %s: waken up",
this,
GetTitle());
1472 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
1475 if (
fAQue.size() <= 0) {
1476 Error(
"PickUpReady",
"queue is empty - protocol error ?");
1480 Error(
"PickUpReady",
"got invalid buffer - protocol error ?");
1490 Info(
"PickUpReady",
"%p: %s: got message (%d bytes)",
1515 static Int_t nBuf = 0;
1517 std::lock_guard<std::mutex> lock(
fgSMtx);
1521 list<TXSockBuf *>::iterator i;
1523 maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
1524 if ((*i) && (*i)->fSiz >= size) {
1527 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
1528 size, (
int)
fgSQue.size(), nBuf, buf, buf->
fSiz);
1538 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
1539 size, (
int)
fgSQue.size(), nBuf, maxsz, buf, buf->
fSiz);
1550 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
1551 size, (
int)
fgSQue.size(), nBuf, maxsz, buf, buf->
fSiz);
1562 std::lock_guard<std::mutex> lock(
fgSMtx);
1565 Info(
"PushBackSpare",
"release buf %p, sz: %d (BuffMem: %lld)",
1584 if (!buffer || (length <= 0))
1607 while (tobecopied > 0) {
1644 memset(&Request, 0,
sizeof(Request) );
1656 fConn->
SendReq(&Request, (
const void *)0, 0,
"SendInterrupt");
1671 Error(
"SendInterrupt",
"problems sending interrupt to server");
1679 std::lock_guard<std::recursive_mutex> lock(
fAMtx);
1694 Error(
"Send",
"cannot send a message used for reading");
1712 char *mbuf = mess.
Buffer();
1720 kXR_int32 fSendOptDefault =
fSendOpt;
1721 switch (mess.
What()) {
1754 Info(
"Send",
"sending type %d (%d bytes) to '%s'", mess.
What(), mlen,
GetTitle());
1765 return nsent -
sizeof(
UInt_t);
1792 char *buf =
new char[len+
sizeof(
UInt_t)];
1832 const void *buf = 0;
1835 memset(&reqhdr, 0,
sizeof(reqhdr));
1847 vout = (
char **)&bout;
1854 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1855 buf = (msg) ? (
const void *)msg : buf;
1862 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1863 buf = (msg) ? (
const void *)msg : buf;
1864 vout = (
char **)&bout;
1867 vout = (
char **)&bout;
1875 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1876 buf = (msg) ? (
const void *)msg : buf;
1879 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1880 buf = (msg) ? (
const void *)msg : buf;
1884 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
1886 buf = (
const void *)msg;
1887 vout = (
char **)&bout;
1894 Info(
"SendCoordinator",
"kReadBuffer: old server (ver %d < 1003):"
1899 if (!msg || strlen(msg) <= 0) {
1900 Info(
"SendCoordinator",
"kReadBuffer: file path undefined");
1903 reqhdr.
header.dlen = strlen(msg);
1904 buf = (
const void *)msg;
1905 vout = (
char **)&bout;
1908 Info(
"SendCoordinator",
"unknown message kind: %d", kind);
1915 fConn->
SendReq(&reqhdr, buf, vout,
"TXSocket::SendCoordinator", noterr);
1920 if (bout && (xrsp->
DataLen() > 0))
1948 memset(&Request, 0,
sizeof(Request) );
1959 fConn->
SendReq(&Request, (
const void *)0, 0,
"SendUrgent");
1997 const char *cenv = 0;
2000 TString allowCO =
gEnv->
GetValue(
"XProof.ConnectDomainAllowRE",
"");
2001 if (allowCO.Length() > 0)
2005 TString denyCO =
gEnv->
GetValue(
"XProof.ConnectDomainDenyRE",
"");
2006 if (denyCO.Length() > 0)
2034 TString socks4Host =
gEnv->
GetValue(
"XNet.SOCKS4Host",
"");
2036 if (socks4Port > 0) {
2037 if (socks4Host.IsNull())
2039 socks4Host =
"127.0.0.1";
2045 TString autolog =
gEnv->
GetValue(
"XSec.Pwd.AutoLogin",
"1");
2046 if (autolog.Length() > 0 &&
2047 (!(cenv =
gSystem->
Getenv(
"XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
2055 TString alogfile =
gEnv->
GetValue(
"XSec.Pwd.ALogFile",
"");
2056 if (alogfile.Length() > 0)
2059 TString verisrv =
gEnv->
GetValue(
"XSec.Pwd.VerifySrv",
"1");
2060 if (verisrv.Length() > 0 &&
2061 (!(cenv =
gSystem->
Getenv(
"XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
2064 TString srvpuk =
gEnv->
GetValue(
"XSec.Pwd.ServerPuk",
"");
2065 if (srvpuk.Length() > 0)
2070 if (cadir.Length() > 0)
2074 if (crldir.Length() > 0)
2077 TString crlext =
gEnv->
GetValue(
"XSec.GSI.CRLextension",
"");
2078 if (crlext.Length() > 0)
2081 TString ucert =
gEnv->
GetValue(
"XSec.GSI.UserCert",
"");
2082 if (ucert.Length() > 0)
2086 if (ukey.Length() > 0)
2089 TString upxy =
gEnv->
GetValue(
"XSec.GSI.UserProxy",
"");
2090 if (upxy.Length() > 0)
2093 TString valid =
gEnv->
GetValue(
"XSec.GSI.ProxyValid",
"");
2094 if (valid.Length() > 0)
2097 TString deplen =
gEnv->
GetValue(
"XSec.GSI.ProxyForward",
"0");
2098 if (deplen.Length() > 0 &&
2099 (!(cenv =
gSystem->
Getenv(
"XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
2102 TString pxybits =
gEnv->
GetValue(
"XSec.GSI.ProxyKeyBits",
"");
2103 if (pxybits.Length() > 0)
2106 TString crlcheck =
gEnv->
GetValue(
"XSec.GSI.CheckCRL",
"1");
2107 if (crlcheck.Length() > 0 &&
2108 (!(cenv =
gSystem->
Getenv(
"XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
2111 TString delegpxy =
gEnv->
GetValue(
"XSec.GSI.DelegProxy",
"0");
2112 if (delegpxy.Length() > 0 &&
2113 (!(cenv =
gSystem->
Getenv(
"XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
2116 TString signpxy =
gEnv->
GetValue(
"XSec.GSI.SignProxy",
"1");
2117 if (signpxy.Length() > 0 &&
2118 (!(cenv =
gSystem->
Getenv(
"XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
2123 ::Info(
"TXSocket",
"(C) 2005 CERN TXSocket (XPROOF client) %s",
2124 gROOT->GetVersion());
2136 Info(
"Reconnect",
"%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
2143 if (tryreconnect == 0)
2144 Info(
"Reconnect",
"%p: reconnection attempts explicitly disabled!",
this);
2146 Info(
"Reconnect",
"%p: server does not support reconnections (protocol: %d < 1005)",
2161 Error(
"TXSocket",
"create or attach failed (%s)",
2172 Info(
"Reconnect",
"%p (c:%p): attempt %s (logid: %d)",
this,
fConn,
2176 Info(
"Reconnect",
"%p (c:0x0): attempt failed",
this);
2262 if (pipe(
fPipe) != 0) {
2263 Printf(
"TXSockPipe: problem initializing pipe for socket inputs");
2290 { std::lock_guard<std::recursive_mutex> lock(
fMutex);
2296 if (write(
fPipe[1],(
const void *)&
c,
sizeof(
Char_t)) < 1) {
2297 Printf(
"TXSockPipe::Post: %s: can't notify pipe",
fLoc.Data());
2304 Printf(
"TXSockPipe::Post: %s: %p: pipe posted (pending %d) (descriptor: %d)",
2321 { std::lock_guard<std::recursive_mutex> lock(
fMutex);
2323 Printf(
"TXSockPipe::Clean: %s: can't read from pipe",
fLoc.Data());
2333 Printf(
"TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d) (descriptor: %d)",
2351 { std::lock_guard<std::recursive_mutex> lock(
fMutex);
2361 Printf(
"TXSockPipe::Flush: %s: can't read from pipe",
fLoc.Data());
2369 Printf(
"TXSockPipe::Flush: %s: %p: pipe flushed",
fLoc.Data(),
s);
2380 std::lock_guard<std::recursive_mutex> lock(
fMutex);
2386 buf +=
Form(
" %p",o);
2387 Printf(
"TXSockPipe::DumpReadySock: %s: list content: %s",
fLoc.Data(), buf.Data());
2395 std::lock_guard<std::recursive_mutex> lock(
fMutex);
UShort_t net2host(UShort_t x)
unsigned long long ULong64_t
void ErrorHandler(int level, const char *location, const char *fmt, va_list va)
General error handler function. It calls the user set error handler.
const char *const kPROOF_WorkerIdleTO
char * Form(const char *fmt,...)
R__EXTERN TSystem * gSystem
static XrdSysError eDest(0, "Proofx")
static XrdSysLogger eLogger
XrdOucTrace * XrdProofdTrace
#define kXPD_startprocess
#define NAME_CONNECTTIMEOUT
#define NAME_REQUESTTIMEOUT
#define NAME_FIRSTCONNECTMAXCNT
#define NAME_RECONNECTWAIT
#define NAME_CONNECTDOMAINDENY_RE
#define NAME_CONNECTDOMAINALLOW_RE
#define NAME_KEEPSOCKOPENIFNOTXRD
#define DFLT_RECONNECTWAIT
#define EnvPutInt(name, val)
#define EnvPutString(name, val)
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
virtual void Add(TObject *obj)
virtual TObject * Remove(TObject *obj)
Remove object from the list.
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
void SetLength() const
Set the message length at the beginning of the message buffer.
char * CompBuffer() const
Int_t Compress()
Compress the message.
Int_t GetCompressionLevel() const
void SetWhat(UInt_t what)
Using this method one can change the message type a-posteriory.
virtual const char * GetTitle() const
Returns title of object.
virtual const char * GetName() const
Returns name of object.
Collectable string class.
Mother of all ROOT objects.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Int_t TryWait()
If the semaphore value is > 0 then decrement it and return 0.
Int_t Post()
Increment the value of the semaphore.
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
static ULong64_t fgBytesRecv
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
Int_t GetCompressionLevel() const
static ULong64_t fgBytesSent
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseGeneralPurpose)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString.
static void ResetErrno()
Static function resetting system error number.
virtual int GetPid()
Get process id.
virtual const char * Getenv(const char *env)
Get environment variable.
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
virtual void Setenv(const char *name, const char *value)
Set environment variable.
This class represents a WWW compatible URL.
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
const char * GetHost() const
Handler of asynchronous events for XProofD sockets.
virtual Bool_t HandleInput(const void *in=0)
Handler of asynchronous input events.
virtual Bool_t HandleError(const void *in=0)
Handler of asynchronous error events.
static void SetMemMax(Long64_t memmax)
Return the max allocated memory allowed.
static Long64_t GetMemMax()
Return the max allocated memory allowed.
static Long64_t BuffMem()
Return the currently allocated memory.
TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1)
constructor
static Long64_t fgBuffMem
void Resize(Int_t sz)
resize socket buffer
TXSockPipe(const char *loc="")
Constructor.
void SetLoc(const char *loc="")
Int_t Flush(TSocket *s)
Remove any reference to socket 's' from the global pipe and ready-socket queue.
std::recursive_mutex fMutex
Int_t Clean(TSocket *s)
Read a byte to the global pipe to synchronize message pickup.
virtual ~TXSockPipe()
Destructor.
void DumpReadySock()
Dump content of the ready socket list.
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal new availibility of new messages.
TXSocket * GetLastReady()
Return last ready socket.
High level handler of connections to XProofD.
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
static std::list< TXSockBuf * > fgSQue
Int_t GetSessionID() const
Getter for session ID.
virtual Int_t GetClientID() const
void PostSemAll()
Wake up all threads waiting for at the semaphore (used by TXSlave)
Int_t GetLogConnID() const
Getter for logical connection ID.
Int_t GetOpenError() const
Getter for last error.
std::list< TXSockBuf * > fAQue
Int_t GetServType() const
Getter for server type.
static void SetLocation(const char *loc="")
Set location string.
TXSocket(const char *url, Char_t mode='M', Int_t psid=-1, Char_t ver=-1, const char *logbuf=0, Int_t loglevel=-1, TXHandler *handler=0)
Constructor Open the connection to a remote XrdProofd instance and start a PROOF session.
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server Returns 0 or -1 in case of error.
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
std::recursive_mutex fAMtx
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
virtual Int_t GetClientIDSize() const
virtual ~TXSocket()
Destructor.
void PushBackSpare()
Release read buffer giving back to the spare list.
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn The response comes in the form of an...
static void InitEnvs()
Init environment variables for XrdClient.
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
std::recursive_mutex fIMtx
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list If none is found either one is reallocated...
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Int_t GetLowSocket() const
virtual void Close(Option_t *opt="")
Close connection.
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
void SetInterrupt(Bool_t i=kTRUE)
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
virtual Int_t Reconnect()
Try reconnection after failure.
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Int_t Flush()
Flush the asynchronous queue.
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
virtual void SetClientID(Int_t)
Int_t Send(const TMessage &mess)
Send a TMessage object.
void CtrlC()
Interrupt the remote protocol instance.
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
void SetAWait(Bool_t w=kTRUE)
Bool_t IsValid() const
Getter for validity status.
void RemoteTouch()
Remote touch functionality: contact the server to proof our vitality.
XrdClientPhyConnection * fPhyConn
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
bool IsValid() const
Test validity of this connection.
virtual void Close(const char *opt="")
Close connection.
short GetSessionID() const
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen)
const char * GetLastErr()
int GetLowSocket()
Return the socket descriptor of the underlying connection.
void ReConnect()
Perform a reconnection attempt when a connection is not valid any more.
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, numer of retries and wait time between attempts (in se...
void SetInterrupt()
Interrupt the underlying socket.
static constexpr double s
void forward(const LAYERDATA &prevLayerData, LAYERDATA &currLayerData)
apply the weights (and functions) in forward direction of the DNN
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer,...
struct ClientRequestHdr header
struct XPClientProofRequest proof
struct XPClientInterruptRequest interrupt
struct XPClientSendRcvRequest sendrcv
struct XPClientReadbufRequest readbuf