#include "Bytes.h"
#include "Compression.h"
#include "NetErrors.h"
#include "TEnv.h"
#include "TError.h"
#include "TMessage.h"
#include "TUDPSocket.h"
#include "TPluginManager.h"
#include "TROOT.h"
#include "TString.h"
#include "TSystem.h"
#include "TUrl.h"
#include "TVirtualAuth.h"
#include "TStreamerInfo.h"
#include "TProcessID.h"
// Class-wide byte counters. The send/receive paths in this file add to these
// in parallel with the per-object fBytesSent / fBytesRecv members.
ULong64_t TUDPSocket::fgBytesSent = 0;
ULong64_t TUDPSocket::fgBytesRecv = 0;
// ROOT class-implementation macro for TUDPSocket (presumably dictionary /
// RTTI bookkeeping -- see the ROOT documentation for ClassImp).
ClassImp(TUDPSocket)
/// Create a UDP socket to the host described by `addr`, on the port that
/// gSystem resolves for the named `service`.
///
/// The object is named after the host and titled after the service. The
/// server type is derived from the service name ("proof" takes precedence
/// over "root"; anything else is a plain socket). If the service cannot be
/// resolved to a port (port == -1) no connection is attempted and fSocket is
/// set to -1. When a connection is opened successfully the object is added to
/// gROOT->GetListOfSockets().
///
/// \param addr     address of the remote host
/// \param service  service name used to look up the port
TUDPSocket::TUDPSocket(TInetAddress addr, const char *service)
   : TNamed(addr.GetHostName(), service)
{
   R__ASSERT(gROOT);
   R__ASSERT(gSystem);

   fService        = service;
   fSecContext     = 0;
   fRemoteProtocol = -1;

   // Later matches override earlier ones in the original sequence, so
   // "proof" wins over "root", which wins over the default.
   if (fService.Contains("proof"))
      fServType = kPROOFD;
   else if (fService.Contains("root"))
      fServType = kROOTD;
   else
      fServType = kSOCKD;

   fAddress       = addr;
   fAddress.fPort = gSystem->GetServiceByName(service);

   fBytesSent    = 0;
   fBytesRecv    = 0;
   fCompress     = 0;
   fUUIDs        = 0;
   fLastUsageMtx = 0;
   ResetBit(TUDPSocket::kBrokenConn);

   fSocket = -1;
   if (fAddress.GetPort() != -1) {
      // The protocol name must be spelled "udp", as in the other
      // constructors of this class; the former "upd" spelling would
      // presumably not be recognised as UDP by OpenConnection.
      fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
                                        -1, "udp");
      if (fSocket != -1) {
         R__LOCKGUARD2(gROOTMutex);
         gROOT->GetListOfSockets()->Add(this);
      }
   }
}
/// Create a UDP socket to the host described by `addr` on the given `port`.
///
/// The service name is looked up from the port and becomes the object title.
/// The server type is derived from that service name ("proof" takes
/// precedence over "root"; anything else is a plain socket). If the
/// connection cannot be opened, fSocket is -1 and the stored port is reset to
/// -1; otherwise the object is added to gROOT->GetListOfSockets().
///
/// \param addr  address of the remote host
/// \param port  remote port number
TUDPSocket::TUDPSocket(TInetAddress addr, Int_t port)
   : TNamed(addr.GetHostName(), "")
{
   R__ASSERT(gROOT);
   R__ASSERT(gSystem);

   fService        = gSystem->GetServiceByPort(port);
   fSecContext     = 0;
   fRemoteProtocol = -1;

   // "proof" wins over "root", which wins over the default.
   if (fService.Contains("proof"))
      fServType = kPROOFD;
   else if (fService.Contains("root"))
      fServType = kROOTD;
   else
      fServType = kSOCKD;

   fAddress       = addr;
   fAddress.fPort = port;
   SetTitle(fService);

   fBytesSent    = 0;
   fBytesRecv    = 0;
   fCompress     = 0;
   fUUIDs        = 0;
   fLastUsageMtx = 0;
   ResetBit(TUDPSocket::kBrokenConn);

   // The protocol name must be spelled "udp", as in the other constructors
   // of this class; the former "upd" spelling would presumably not be
   // recognised as UDP by OpenConnection.
   fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
                                     -1, "udp");
   if (fSocket != -1) {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Add(this);
   } else {
      // Mark the address as unusable when no connection could be opened.
      fAddress.fPort = -1;
   }
}
/// Create a UDP socket to `host`, on the port that gSystem resolves for the
/// named `service`.
///
/// The host is resolved through gSystem->GetHostByName() and the object is
/// renamed to the resolved host name. The server type is derived from the
/// service name ("proof" takes precedence over "root"; anything else is a
/// plain socket). If the service cannot be resolved to a port (port == -1) no
/// connection is attempted and fSocket is set to -1. When a connection is
/// opened successfully the object is added to gROOT->GetListOfSockets().
///
/// \param host     remote host name
/// \param service  service name used to look up the port
TUDPSocket::TUDPSocket(const char *host, const char *service)
   : TNamed(host, service)
{
   R__ASSERT(gROOT);
   R__ASSERT(gSystem);

   fService        = service;
   fSecContext     = 0;
   fRemoteProtocol = -1;

   // "proof" wins over "root", which wins over the default.
   if (fService.Contains("proof"))
      fServType = kPROOFD;
   else if (fService.Contains("root"))
      fServType = kROOTD;
   else
      fServType = kSOCKD;

   fAddress       = gSystem->GetHostByName(host);
   fAddress.fPort = gSystem->GetServiceByName(service);
   SetName(fAddress.GetHostName());

   fBytesSent    = 0;
   fBytesRecv    = 0;
   fCompress     = 0;
   fUUIDs        = 0;
   fLastUsageMtx = 0;
   ResetBit(TUDPSocket::kBrokenConn);

   fSocket = -1;
   if (fAddress.GetPort() != -1) {
      // The protocol name must be spelled "udp", as in the other
      // constructors of this class; the former "upd" spelling would
      // presumably not be recognised as UDP by OpenConnection.
      fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "udp");
      if (fSocket != -1) {
         R__LOCKGUARD2(gROOTMutex);
         gROOT->GetListOfSockets()->Add(this);
      }
   }
}
/// Create a UDP socket to the host part of `url` on the given `port`.
///
/// The full url string is kept in fUrl; only its host component is used to
/// resolve the address. The service name is looked up from the port and used
/// as the object title. Note that the server type is derived from the url
/// text here (not from the service name): "proof" takes precedence over
/// "root", anything else is a plain socket. If the connection cannot be
/// opened the stored port is reset to -1; otherwise the object is added to
/// gROOT->GetListOfSockets().
///
/// \param url   url (or plain host name) of the remote end
/// \param port  remote port number
TUDPSocket::TUDPSocket(const char *url, Int_t port)
   : TNamed(TUrl(url).GetHost(), "")
{
   R__ASSERT(gROOT);
   R__ASSERT(gSystem);

   fUrl = TString(url);
   const TString host = TUrl(fUrl).GetHost();

   fService        = gSystem->GetServiceByPort(port);
   fSecContext     = 0;
   fRemoteProtocol = -1;

   if (fUrl.Contains("proof"))
      fServType = kPROOFD;
   else if (fUrl.Contains("root"))
      fServType = kROOTD;
   else
      fServType = kSOCKD;

   fAddress       = gSystem->GetHostByName(host);
   fAddress.fPort = port;
   SetName(fAddress.GetHostName());
   SetTitle(fService);

   fBytesSent    = 0;
   fBytesRecv    = 0;
   fCompress     = 0;
   fUUIDs        = 0;
   fLastUsageMtx = 0;
   ResetBit(TUDPSocket::kBrokenConn);

   fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "udp");
   if (fSocket != -1) {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Add(this);
   } else {
      fAddress.fPort = -1;
   }
}
/// Create a socket connected to the path `sockpath`.
///
/// The service is fixed to "unix", the port is -1 and the object is named
/// "unix:<sockpath>". The connection is opened through
/// gSystem->OpenConnection() with port -1; on success the object is added to
/// gROOT->GetListOfSockets().
///
/// \param sockpath  path passed to OpenConnection as the server name
TUDPSocket::TUDPSocket(const char *sockpath) : TNamed(sockpath, "")
{
   R__ASSERT(gROOT);
   R__ASSERT(gSystem);

   fUrl            = sockpath;
   fService        = "unix";
   fServType       = kSOCKD;
   fSecContext     = 0;
   fRemoteProtocol = -1;

   fAddress.fPort = -1;
   fName.Form("unix:%s", sockpath);
   SetTitle(fService);

   fBytesSent    = 0;
   fBytesRecv    = 0;
   fCompress     = 0;
   fUUIDs        = 0;
   fLastUsageMtx = 0;
   ResetBit(TUDPSocket::kBrokenConn);

   fSocket = gSystem->OpenConnection(sockpath, -1, -1, "udp");

   // NOTE(review): this registers only strictly positive descriptors, while
   // the other constructors test against -1 -- confirm whether descriptor 0
   // is meant to be excluded here.
   const Bool_t opened = (fSocket > 0);
   if (opened) {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Add(this);
   }
}
/// Wrap an already open socket descriptor.
///
/// For a non-negative `desc` the peer address is queried through
/// gSystem->GetPeerName() and the object is added to
/// gROOT->GetListOfSockets(). A negative `desc` yields an invalid socket
/// (fSocket == -1) that is not registered.
///
/// \param desc  existing socket descriptor, or a negative value
TUDPSocket::TUDPSocket(Int_t desc) : TNamed("", "")
{
   R__ASSERT(gROOT);
   R__ASSERT(gSystem);

   fSecContext     = 0;
   fRemoteProtocol = 0;
   fService        = (char *)kSOCKD;
   fServType       = kSOCKD;

   fBytesSent    = 0;
   fBytesRecv    = 0;
   fCompress     = 0;
   fUUIDs        = 0;
   fLastUsageMtx = 0;
   ResetBit(TUDPSocket::kBrokenConn);

   if (desc < 0) {
      fSocket = -1;
      return;
   }

   fSocket  = desc;
   fAddress = gSystem->GetPeerName(fSocket);
   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Add(this);
   }
}
/// Wrap an already open descriptor that is associated with `sockpath`.
///
/// The service is fixed to "unix", the port is -1 and the object is named
/// "unix:<sockpath>". No connection is opened here: a non-negative `desc` is
/// adopted as fSocket and the object is added to gROOT->GetListOfSockets();
/// a negative `desc` yields an invalid socket (fSocket == -1).
///
/// \param desc      existing socket descriptor, or a negative value
/// \param sockpath  path stored in fUrl and used for the object name
TUDPSocket::TUDPSocket(Int_t desc, const char *sockpath) : TNamed(sockpath, "")
{
   R__ASSERT(gROOT);
   R__ASSERT(gSystem);

   fUrl            = sockpath;
   fService        = "unix";
   fServType       = kSOCKD;
   fSecContext     = 0;
   fRemoteProtocol = -1;

   fAddress.fPort = -1;
   fName.Form("unix:%s", sockpath);
   SetTitle(fService);

   fBytesSent    = 0;
   fBytesRecv    = 0;
   fCompress     = 0;
   fUUIDs        = 0;
   fLastUsageMtx = 0;
   ResetBit(TUDPSocket::kBrokenConn);

   if (desc < 0) {
      fSocket = -1;
      return;
   }

   fSocket = desc;
   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Add(this);
   }
}
/// Copy constructor.
///
/// Copies the descriptor, addresses, service information, byte counters,
/// compression setting, security context pointer and remote protocol from
/// `s`. The copy therefore refers to the same descriptor value as `s`.
/// fUUIDs and fLastUsageMtx are not copied but start out as 0, and the
/// kBrokenConn bit is cleared. A copy with a descriptor other than -1 is
/// added to gROOT->GetListOfSockets().
///
/// \param s  socket to copy from
TUDPSocket::TUDPSocket(const TUDPSocket &s) : TNamed(s)
{
   // Connection identity.
   fSocket         = s.fSocket;
   fAddress        = s.fAddress;
   fLocalAddress   = s.fLocalAddress;
   fService        = s.fService;
   fServType       = s.fServType;
   fRemoteProtocol = s.fRemoteProtocol;
   fSecContext     = s.fSecContext;

   // Statistics and settings.
   fBytesSent = s.fBytesSent;
   fBytesRecv = s.fBytesRecv;
   fCompress  = s.fCompress;

   // Per-object state that is never shared with the source.
   fUUIDs        = 0;
   fLastUsageMtx = 0;
   ResetBit(TUDPSocket::kBrokenConn);

   const Bool_t hasDescriptor = (fSocket != -1);
   if (hasDescriptor) {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Add(this);
   }
}
/// Close the socket.
///
/// If the descriptor is valid the connection is closed through
/// gSystem->CloseConnection() and the object is removed from
/// gROOT->GetListOfSockets(). In all cases fSocket becomes -1 and the
/// fUUIDs and fLastUsageMtx helpers are deleted.
///
/// \param option  when equal to "force" the force flag is passed to
///                CloseConnection; any other value (or a null pointer)
///                passes kFALSE
void TUDPSocket::Close(Option_t *option)
{
   const Bool_t force = (option != 0) && (strcmp(option, "force") == 0);

   if (fSocket != -1) {
      gSystem->CloseConnection(fSocket, force);
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(this);
   }
   fSocket = -1;

   SafeDelete(fUUIDs);
   SafeDelete(fLastUsageMtx);
}
/// Return the local address of this socket.
///
/// The address is fetched from gSystem->GetSockName() the first time it is
/// needed (while the cached port is still -1) and cached in fLocalAddress.
/// An invalid socket yields a default-constructed TInetAddress.
TInetAddress TUDPSocket::GetLocalInetAddress()
{
   if (!IsValid())
      return TInetAddress();

   const Bool_t notCachedYet = (fLocalAddress.GetPort() == -1);
   if (notCachedYet)
      fLocalAddress = gSystem->GetSockName(fSocket);

   return fLocalAddress;
}
/// Return the local port of this socket, or -1 when the socket is invalid.
///
/// If the cached local address has no port yet, GetLocalInetAddress() is
/// called first to populate it.
Int_t TUDPSocket::GetLocalPort()
{
   if (!IsValid())
      return -1;

   if (fLocalAddress.GetPort() == -1) {
      // Called only for its side effect of filling fLocalAddress.
      GetLocalInetAddress();
   }
   return fLocalAddress.GetPort();
}
/// Wait on this socket through gSystem->Select().
///
/// A temporary TFileHandler is built from fSocket and `interest`, and the
/// value returned by gSystem->Select() is passed back unchanged.
///
/// \param interest  interest mask handed to the TFileHandler
/// \param timeout   timeout handed to gSystem->Select()
Int_t TUDPSocket::Select(Int_t interest, Long_t timeout)
{
   TFileHandler handler(fSocket, interest);
   return gSystem->Select(&handler, timeout);
}
/// Send an empty message of the given `kind`.
///
/// \return the value returned by Send(const TMessage&), or -1 if that value
///         is negative
Int_t TUDPSocket::Send(Int_t kind)
{
   TMessage mess(kind);

   const Int_t nsent = Send(mess);
   return (nsent < 0) ? -1 : nsent;
}
/// Send a message of the given `kind` carrying the integer `status`.
///
/// \return the value returned by Send(const TMessage&), or -1 if that value
///         is negative
Int_t TUDPSocket::Send(Int_t status, Int_t kind)
{
   TMessage mess(kind);
   mess << status;

   const Int_t nsent = Send(mess);
   return (nsent < 0) ? -1 : nsent;
}
/// Send a message of the given `kind` carrying the string `str`.
///
/// A null `str` results in a message without string payload.
///
/// \return -1 if Send(const TMessage&) reports a negative value; otherwise
///         that value minus sizeof(Int_t) (presumably discounting the length
///         word written by WriteString -- confirm)
Int_t TUDPSocket::Send(const char *str, Int_t kind)
{
   TMessage mess(kind);
   if (str)
      mess.WriteString(str);

   const Int_t nsent = Send(mess);
   if (nsent < 0)
      return -1;

   return nsent - sizeof(Int_t);
}
/// Send a TMessage over the socket.
///
/// Before the message itself goes out, SendStreamerInfos() and
/// SendProcessIDs() are given the chance to send their own messages. The
/// message is compressed when either the message or the socket has a
/// compression level above 0. If the message kind has the kMESS_ACK bit set,
/// a two byte "ok" acknowledgement is awaited after sending.
///
/// \param mess  message to send; must not be in reading mode
/// \return on success the number of bytes sent minus sizeof(UInt_t);
///         -1 if the socket is invalid, the message is in reading mode, the
///         acknowledgement is wrong, or receiving the acknowledgement failed
///         with a code other than -5; otherwise the non-positive value
///         reported by gSystem->SendRaw() (or -5 from the acknowledgement
///         read). A -5 code marks the connection as broken (kBrokenConn)
///         and closes the socket.
Int_t TUDPSocket::Send(const TMessage &mess)
{
   TSystem::ResetErrno();

   if (fSocket == -1)
      return -1;

   if (mess.IsReading()) {
      Error("Send", "cannot send a message used for reading");
      return -1;
   }

   // These may each send a separate message ahead of `mess`.
   SendStreamerInfos(mess);
   SendProcessIDs(mess);

   mess.SetLength();

   // Let the message inherit the socket's compression settings if it has
   // none of its own.
   if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
      const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);

   if (mess.GetCompressionLevel() > 0)
      const_cast<TMessage&>(mess).Compress();

   // Prefer the compressed buffer when one exists.
   char *mbuf = mess.Buffer();
   Int_t mlen = mess.Length();
   if (mess.CompBuffer()) {
      mbuf = mess.CompBuffer();
      mlen = mess.CompLength();
   }

   ResetBit(TUDPSocket::kBrokenConn);
   Int_t nsent;
   if ((nsent = gSystem->SendRaw(fSocket, mbuf, mlen, 0)) <= 0) {
      if (nsent == -5) {
         // -5 is treated as a broken connection.
         SetBit(TUDPSocket::kBrokenConn);
         Close();
      }
      return nsent;
   }

   fBytesSent  += nsent;
   fgBytesSent += nsent;

   // Wait for the acknowledgement if the sender asked for one.
   if (mess.What() & kMESS_ACK) {
      TSystem::ResetErrno();
      ResetBit(TUDPSocket::kBrokenConn);
      // Zero-initialised so that the comparison below never reads
      // indeterminate bytes when RecvRaw returns without filling the buffer.
      char buf[2] = { 0, 0 };
      Int_t n = 0;
      if ((n = gSystem->RecvRaw(fSocket, buf, sizeof(buf), 0)) < 0) {
         if (n == -5) {
            SetBit(TUDPSocket::kBrokenConn);
            Close();
         } else
            n = -1;
         return n;
      }
      if (strncmp(buf, "ok", 2)) {
         Error("Send", "bad acknowledgement");
         return -1;
      }
      fBytesRecv  += 2;
      fgBytesRecv += 2;
   }

   Touch();

   return nsent - sizeof(UInt_t);
}
/// Send `obj` wrapped in a message of the given `kind`.
///
/// \return the value returned by Send(const TMessage&), or -1 if that value
///         is negative
Int_t TUDPSocket::SendObject(const TObject *obj, Int_t kind)
{
   TMessage mess(kind);
   mess.WriteObject(obj);

   const Int_t nsent = Send(mess);
   return (nsent < 0) ? -1 : nsent;
}
/// Send `length` bytes from `buffer` without any message framing.
///
/// \param buffer  data to send
/// \param length  number of bytes to send
/// \param opt     option forwarded (as int) to gSystem->SendRaw()
/// \return -1 if the socket is invalid; otherwise the value returned by
///         gSystem->SendRaw(). A positive value is also added to the byte
///         counters. A value of -5 marks the connection as broken
///         (kBrokenConn) and closes the socket.
Int_t TUDPSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
{
   TSystem::ResetErrno();

   if (fSocket == -1)
      return -1;

   ResetBit(TUDPSocket::kBrokenConn);

   const Int_t nsent = gSystem->SendRaw(fSocket, buffer, length, (int) opt);
   if (nsent > 0) {
      fBytesSent  += nsent;
      fgBytesSent += nsent;
      Touch();
      return nsent;
   }

   if (nsent == -5) {
      SetBit(TUDPSocket::kBrokenConn);
      Close();
   }
   return nsent;
}
void TUDPSocket::SendStreamerInfos(const TMessage &mess)
{
if (mess.fInfos && mess.fInfos->GetEntries()) {
TIter next(mess.fInfos);
TStreamerInfo *info;
TList *minilist = 0;
while ((info = (TStreamerInfo*)next())) {
Int_t uid = info->GetNumber();
if (fBitsInfo.TestBitNumber(uid))
continue;
fBitsInfo.SetBitNumber(uid);
if (!minilist)
minilist = new TList();
if (gDebug > 0)
Info("SendStreamerInfos", "sending TStreamerInfo: %s, version = %d",
info->GetName(),info->GetClassVersion());
minilist->Add(info);
}
if (minilist) {
TMessage messinfo(kMESS_STREAMERINFO);
messinfo.WriteObject(minilist);
delete minilist;
if (messinfo.fInfos)
messinfo.fInfos->Clear();
if (Send(messinfo) < 0)
Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
}
}
}
void TUDPSocket::SendProcessIDs(const TMessage &mess)
{
if (mess.TestBitNumber(0)) {
TObjArray *pids = TProcessID::GetPIDs();
Int_t npids = pids->GetEntries();
TProcessID *pid;
TList *minilist = 0;
for (Int_t ipid = 0; ipid < npids; ipid++) {
pid = (TProcessID*)pids->At(ipid);
if (!pid || !mess.TestBitNumber(pid->GetUniqueID()+1))
continue;
if (!fUUIDs) {
fUUIDs = new TList();
} else {
if (fUUIDs->FindObject(pid->GetTitle()))
continue;
}
fUUIDs->Add(new TObjString(pid->GetTitle()));
if (!minilist)
minilist = new TList();
if (gDebug > 0)
Info("SendProcessIDs", "sending TProcessID: %s", pid->GetTitle());
minilist->Add(pid);
}
if (minilist) {
TMessage messpid(kMESS_PROCESSID);
messpid.WriteObject(minilist);
delete minilist;
if (Send(messpid) < 0)
Warning("SendProcessIDs", "problems sending TProcessID's ...");
}
}
}
/// Receive a string message into `str` (at most `max` characters).
///
/// \return the value of Recv(str, max, kind) when positive and the message
///         kind is kMESS_STRING; -1 when the kind is wrong or the underlying
///         call reported -5 (kBrokenConn is set in that case); otherwise the
///         non-positive value from the underlying call
Int_t TUDPSocket::Recv(char *str, Int_t max)
{
   Int_t kind;

   ResetBit(TUDPSocket::kBrokenConn);
   Int_t n = Recv(str, max, kind);

   if (n > 0) {
      if (kind == kMESS_STRING)
         return n;
      Error("Recv", "got message of wrong kind (expected %d, got %d)",
            kMESS_STRING, kind);
      return -1;
   }

   if (n == -5) {
      SetBit(TUDPSocket::kBrokenConn);
      n = -1;
   }
   return n;
}
/// Receive a message and extract its kind and, optionally, a string.
///
/// When `str` is non-null it receives the string read from the message, or
/// an empty string when the message buffer is no larger than sizeof(Int_t).
///
/// \param str   destination buffer, may be null
/// \param max   maximum length passed to ReadString
/// \param kind  set to the kind of the received message on success
/// \return the value of Recv(TMessage*&) when positive; -1 when that call
///         reported -5 (kBrokenConn is set in that case); otherwise its
///         non-positive value
Int_t TUDPSocket::Recv(char *str, Int_t max, Int_t &kind)
{
   TMessage *mess;

   ResetBit(TUDPSocket::kBrokenConn);
   Int_t n = Recv(mess);
   if (n <= 0) {
      if (n == -5) {
         SetBit(TUDPSocket::kBrokenConn);
         n = -1;
      }
      return n;
   }

   kind = mess->What();

   if (str) {
      const Bool_t hasPayload = mess->BufferSize() > (Int_t)sizeof(Int_t);
      if (hasPayload)
         mess->ReadString(str, max);
      else
         str[0] = 0;
   }

   delete mess;
   return n;
}
/// Receive a message carrying an integer status.
///
/// \param status  set to the integer read from the message on success
/// \param kind    set to the kind of the received message on success
/// \return the value of Recv(TMessage*&) when positive; -1 when that call
///         reported -5 (kBrokenConn is set in that case); otherwise its
///         non-positive value
Int_t TUDPSocket::Recv(Int_t &status, Int_t &kind)
{
   TMessage *mess;

   ResetBit(TUDPSocket::kBrokenConn);
   Int_t n = Recv(mess);
   if (n <= 0) {
      if (n == -5) {
         SetBit(TUDPSocket::kBrokenConn);
         n = -1;
      }
      return n;
   }

   kind = mess->What();
   (*mess) >> status;

   delete mess;
   return n;
}
/// Receive one TMessage from the socket.
///
/// The message is read in two steps: a UInt_t length word followed by that
/// many bytes of payload. Messages of kind kMESS_STREAMERINFO and
/// kMESS_PROCESSID are consumed internally (RecvStreamerInfos /
/// RecvProcessIDs) and the function then waits for the next message. If the
/// message kind carries the kMESS_ACK bit, a two byte "ok" is sent back and
/// the bit is cleared from the message kind.
///
/// \param mess  set to the newly allocated message on success (the caller
///              deletes it, as the other Recv overloads in this file do);
///              set to 0 on failure
/// \return the number of payload bytes read on success; -1 if the socket is
///         invalid; otherwise the non-positive value reported by
///         gSystem->RecvRaw(), or the negative value from sending the
///         acknowledgement. A return of 0 or -5 while reading (or -5 while
///         acknowledging) marks the connection as broken and closes it.
Int_t TUDPSocket::Recv(TMessage *&mess)
{
TSystem::ResetErrno();
if (fSocket == -1) {
mess = 0;
return -1;
}
// Restart point, taken after a streamer-info or process-ID message has been
// consumed below.
oncemore:
ResetBit(TUDPSocket::kBrokenConn);
Int_t n;
UInt_t len;
// Step 1: read the length word.
if ((n = gSystem->RecvRaw(fSocket, &len, sizeof(UInt_t), 0)) <= 0) {
// Both 0 and -5 are treated as a broken connection.
if (n == 0 || n == -5) {
SetBit(TUDPSocket::kBrokenConn);
Close();
}
mess = 0;
return n;
}
len = net2host(len);
ResetBit(TUDPSocket::kBrokenConn);
// Step 2: read the payload, leaving room for the length word at the front.
// NOTE(review): len comes straight from the peer and is not bounds-checked
// before this allocation -- confirm whether an upper limit is needed.
char *buf = new char[len+sizeof(UInt_t)];
if ((n = gSystem->RecvRaw(fSocket, buf+sizeof(UInt_t), len, 0)) <= 0) {
if (n == 0 || n == -5) {
SetBit(TUDPSocket::kBrokenConn);
Close();
}
delete [] buf;
mess = 0;
return n;
}
fBytesRecv += n + sizeof(UInt_t);
fgBytesRecv += n + sizeof(UInt_t);
// buf is not freed here on the success path; presumably the TMessage takes
// ownership of it -- confirm against TMessage.
mess = new TMessage(buf, len+sizeof(UInt_t));
// Internal messages: each handler deletes mess when it returns kTRUE.
if (RecvStreamerInfos(mess))
goto oncemore;
if (RecvProcessIDs(mess))
goto oncemore;
// Acknowledge the message if the sender asked for it.
if (mess->What() & kMESS_ACK) {
ResetBit(TUDPSocket::kBrokenConn);
char ok[2] = { 'o', 'k' };
Int_t n2 = 0;
if ((n2 = gSystem->SendRaw(fSocket, ok, sizeof(ok), 0)) < 0) {
if (n2 == -5) {
SetBit(TUDPSocket::kBrokenConn);
Close();
}
delete mess;
mess = 0;
return n2;
}
// Hide the ack request from the caller.
mess->SetWhat(mess->What() & ~kMESS_ACK);
fBytesSent += 2;
fgBytesSent += 2;
}
Touch();
return n;
}
/// Receive up to `length` raw bytes into `buffer`.
///
/// \param buffer  destination buffer
/// \param length  number of bytes requested; 0 returns immediately with 0
/// \param opt     option forwarded (as int) to gSystem->RecvRaw()
/// \return -1 if the socket is invalid; otherwise the value returned by
///         gSystem->RecvRaw(). A positive value is also added to the byte
///         counters. A value of 0 or -5 marks the connection as broken
///         (kBrokenConn) and closes the socket.
Int_t TUDPSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt)
{
   TSystem::ResetErrno();

   if (fSocket == -1)
      return -1;
   if (length == 0)
      return 0;

   ResetBit(TUDPSocket::kBrokenConn);

   const Int_t n = gSystem->RecvRaw(fSocket, buffer, length, (int) opt);
   if (n > 0) {
      fBytesRecv  += n;
      fgBytesRecv += n;
      Touch();
      return n;
   }

   if (n == 0 || n == -5) {
      SetBit(TUDPSocket::kBrokenConn);
      Close();
   }
   return n;
}
/// Handle an incoming kMESS_STREAMERINFO message.
///
/// The message carries a TList of TStreamerInfo objects. Each one is imported
/// with BuildCheck(), in two passes: first the infos whose first element is
/// not named "This", then the ones whose first element is named "This" (the
/// `isstl` case in the code).
///
/// \param mess  received message; deleted here when it is of kind
///              kMESS_STREAMERINFO
/// \return kTRUE if the message was a streamer-info message (and has been
///         consumed and deleted), kFALSE otherwise (message left untouched)
Bool_t TUDPSocket::RecvStreamerInfos(TMessage *mess)
{
   if (mess->What() != kMESS_STREAMERINFO)
      return kFALSE;

   TList *list = (TList*)mess->ReadObject(TList::Class());
   if (!list) {
      // Nothing could be read from the message: do not dereference the null
      // list, but still consume the message since it is not meant for the
      // caller.
      Warning("RecvStreamerInfos", "could not read the list of TStreamerInfo's");
      delete mess;
      return kTRUE;
   }

   // pass 0: non-"This" infos, pass 1: "This" infos.
   for (Int_t pass = 0; pass < 2; pass++) {
      const Bool_t wantStl = (pass == 1);
      for (TObjLink *lnk = list->FirstLink(); lnk; lnk = lnk->Next()) {
         TStreamerInfo *info = (TStreamerInfo*)lnk->GetObject();
         TObject *element = info->GetElements()->UncheckedAt(0);
         const Bool_t isstl = element && strcmp("This",element->GetName())==0;
         if (isstl != wantStl)
            continue;
         info->BuildCheck();
         if (gDebug > 0)
            Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
                 info->GetName(), info->GetClassVersion());
      }
   }

   delete list;
   delete mess;
   return kTRUE;
}
/// Handle an incoming kMESS_PROCESSID message.
///
/// The message carries a TList of TProcessID objects. An incoming ID whose
/// title matches one already in TProcessID::GetPIDs() is deleted; any other
/// ID has its count incremented, is appended to that array and gets its
/// index in the array as unique id.
///
/// \param mess  received message; deleted here when it is of kind
///              kMESS_PROCESSID
/// \return kTRUE if the message was a process-ID message (and has been
///         consumed and deleted), kFALSE otherwise (message left untouched)
Bool_t TUDPSocket::RecvProcessIDs(TMessage *mess)
{
   if (mess->What() != kMESS_PROCESSID)
      return kFALSE;

   TList *list = (TList*)mess->ReadObject(TList::Class());

   TIter nextIncoming(list);
   TProcessID *incoming;
   while ((incoming = (TProcessID*)nextIncoming())) {
      TObjArray *known = TProcessID::GetPIDs();

      // Look for an already registered process ID with the same title.
      Bool_t duplicate = kFALSE;
      TIter nextKnown(known);
      while (TProcessID *p = (TProcessID*)nextKnown()) {
         if (strcmp(p->GetTitle(), incoming->GetTitle()) == 0) {
            duplicate = kTRUE;
            break;
         }
      }

      if (duplicate) {
         delete incoming;
         continue;
      }

      if (gDebug > 0)
         Info("RecvProcessIDs", "importing TProcessID: %s", incoming->GetTitle());
      incoming->IncrementCount();
      known->Add(incoming);
      const Int_t ind = known->IndexOf(incoming);
      incoming->SetUniqueID((UInt_t)ind);
   }

   delete list;
   delete mess;
   return kTRUE;
}
/// Set socket option `opt` to `val` through gSystem->SetSockOpt().
///
/// \return -1 if the socket is invalid, otherwise the value returned by
///         gSystem->SetSockOpt()
Int_t TUDPSocket::SetOption(ESockOptions opt, Int_t val)
{
   return (fSocket == -1) ? -1 : gSystem->SetSockOpt(fSocket, opt, val);
}
/// Read socket option `opt` into `val` through gSystem->GetSockOpt().
///
/// \return -1 if the socket is invalid (`val` is left untouched), otherwise
///         the value returned by gSystem->GetSockOpt()
Int_t TUDPSocket::GetOption(ESockOptions opt, Int_t &val)
{
   return (fSocket == -1) ? -1 : gSystem->GetSockOpt(fSocket, opt, &val);
}
/// Return an error code for this socket: 0 when the socket is valid,
/// otherwise the current value of fSocket.
Int_t TUDPSocket::GetErrorCode() const
{
   return IsValid() ? 0 : fSocket;
}
/// Change the compression algorithm while keeping the compression level.
///
/// fCompress is encoded as 100 * algorithm + level. An `algorithm` outside
/// [0, ROOT::kUndefinedCompressionAlgorithm) is replaced by 0. If fCompress
/// is currently negative, level 1 is used.
void TUDPSocket::SetCompressionAlgorithm(Int_t algorithm)
{
   const Bool_t outOfRange =
      (algorithm < 0 || algorithm >= ROOT::kUndefinedCompressionAlgorithm);
   if (outOfRange)
      algorithm = 0;

   const int level = (fCompress < 0) ? 1 : (fCompress % 100);
   fCompress = 100 * algorithm + level;
}
/// Change the compression level while keeping the compression algorithm.
///
/// fCompress is encoded as 100 * algorithm + level. `level` is clamped to
/// [0, 99]. If fCompress is currently negative it simply becomes the level;
/// otherwise the stored algorithm is kept, except that a value at or above
/// ROOT::kUndefinedCompressionAlgorithm is replaced by 0.
void TUDPSocket::SetCompressionLevel(Int_t level)
{
   level = (level < 0) ? 0 : ((level > 99) ? 99 : level);

   if (fCompress < 0) {
      fCompress = level;
      return;
   }

   int algorithm = fCompress / 100;
   if (algorithm >= ROOT::kUndefinedCompressionAlgorithm)
      algorithm = 0;
   fCompress = 100 * algorithm + level;
}
/// Set the combined compression settings directly. The value is stored
/// unchecked in fCompress; the sibling setters in this file treat it as
/// 100 * algorithm + level.
void TUDPSocket::SetCompressionSettings(Int_t settings)
{
fCompress = settings;
}
/// Print the message associated with network error code `err`.
///
/// `err` is first clamped: values of kErrError or above become kErrError and
/// negative values become 0. The matching entry of gRootdErrStr is printed
/// through ::Error() only when gDebug is greater than 0.
///
/// \param where  location string passed to ::Error()
/// \param err    error code used to index gRootdErrStr
void TUDPSocket::NetError(const char *where, Int_t err)
{
   if (err >= kErrError)
      err = kErrError;
   else if (err <= -1)
      err = 0;

   if (gDebug > 0)
      ::Error(where, "%s", gRootdErrStr[err]);
}
/// Return the class-wide counter fgBytesSent, which the send paths in this
/// file increment alongside each object's own fBytesSent.
ULong64_t TUDPSocket::GetSocketBytesSent()
{
return fgBytesSent;
}
/// Return the class-wide counter fgBytesRecv, which the receive paths in this
/// file increment alongside each object's own fBytesRecv.
ULong64_t TUDPSocket::GetSocketBytesRecv()
{
return fgBytesRecv;
}