// End_Html
// End_Html
#include <errno.h>
#include "Bytes.h"
#include "NetErrors.h"
#include "TApplication.h"
#include "TEnv.h"
#include "TNetFile.h"
#include "TPSocket.h"
#include "TROOT.h"
#include "TSysEvtHandler.h"
#include "TSystem.h"
#include "TTimeStamp.h"
#include "TVirtualPerfStats.h"
// ClassImp invocations for the two classes implemented in this file.
// NOTE(review): ClassImp is a macro supplied by the ROOT headers included
// above; its expansion is not visible here.
ClassImp(TNetFile)
ClassImp(TNetSystem)
// Construct a TNetFile and immediately try to open the remote file.
//
// url      - URL of the remote file; also stored as the endpoint URL.
// option   - open option, interpreted by Create().
// ftitle   - file title forwarded to the TFile base class.
// compress - compression setting forwarded to the TFile base class.
// netopt   - network option forwarded to Create().
TNetFile::TNetFile(const char *url, Option_t *option, const char *ftitle,
                   Int_t compress, Int_t netopt)
   : TFile(url, "NET", ftitle, compress), fEndpointUrl(url)
{
   // Give all connection-state members a defined value before Create():
   // Create() can bail out (invalid URL, failed connection) before
   // ConnectServer() assigns fProtocol, and ReOpen()/SysStat() read it later.
   fSocket    = 0;
   fProtocol  = 0;
   fErrorCode = 0;
   fNetopt    = 0;
   Create(url, option, netopt);
}
// Construct a TNetFile without opening any connection.
//
// Only the connection-state members are reset here; the trailing unnamed
// Bool_t argument is not used by this constructor.
TNetFile::TNetFile(const char *url, const char *ftitle, Int_t compress, Bool_t)
   : TFile(url, "NET", ftitle, compress), fEndpointUrl(url)
{
   fNetopt    = 0;
   fErrorCode = 0;
   fProtocol  = 0;
   fSocket    = 0;   // no connection yet
}
// Destructor: closes the file (Close() is a no-op when no socket is open).
TNetFile::~TNetFile()
{
   Close();
}
// Open the remote file. The three arguments are ignored.
//
// Without a socket the whole connection is (re)created through Create();
// with an existing socket only a kROOTD_OPEN request is sent and its reply
// checked.
//
// Returns -1 on failure and -2 otherwise.
Int_t TNetFile::SysOpen(const char * , Int_t , UInt_t )
{
   if (!fSocket) {
      // No connection yet: build one from the stored URL and options.
      Create(fUrl.GetUrl(), fOption, fNetopt);
      return fSocket ? -2 : -1;
   }

   // For protocol versions up to 15 the file name is sent with a leading '/'.
   const char *requestFmt = (fProtocol > 15) ? "%s %s" : "/%s %s";
   fSocket->Send(Form(requestFmt, fUrl.GetFile(), ToLower(fOption).Data()),
                 kROOTD_OPEN);

   int           stat;
   EMessageTypes kind;
   Recv(stat, kind);
   if (kind != kROOTD_ERR)
      return -2;

   PrintError("SysOpen", stat);
   return -1;
}
// Send a kROOTD_CLOSE message if a socket exists. The descriptor argument
// is ignored. Always returns 0.
Int_t TNetFile::SysClose(Int_t )
{
   if (fSocket) {
      fSocket->Send(kROOTD_CLOSE);
   }
   return 0;
}
// Query the remote side for file status via kROOTD_FSTAT.
//
// The first (descriptor) argument is ignored. Each output pointer may be
// null, in which case that value is simply not returned:
//   id      - file identifier
//   size    - file size
//   flags   - bit 0: executable, bit 1: directory, bit 2: neither a regular
//             file nor a directory (bits are built locally for protocol > 12,
//             taken verbatim from the reply otherwise)
//   modtime - modification time
//
// Returns 0 on success and 1 on failure (protocol < 3, no socket, receive
// error, unparsable reply, or the remote side reporting -1). The outputs are
// only written on success.
Int_t TNetFile::SysStat(Int_t, Long_t *id, Long64_t *size, Long_t *flags, Long_t *modtime)
{
   if (fProtocol < 3) return 1;
   if (!fSocket) return 1;

   fSocket->Send(kROOTD_FSTAT);

   // Zero the buffer so that a failed/short receive cannot feed garbage to
   // sscanf, and make sure it is terminated whatever was received.
   char  msg[1024] = { 0 };
   Int_t kind;
   if (fSocket->Recv(msg, sizeof(msg), kind) <= 0)
      return 1;
   msg[sizeof(msg) - 1] = '\0';

   // Parse into locals: the caller's pointers may be null. The sentinel
   // fields start at -1 so that an unparsable reply is treated as a failure.
   Long64_t fileSize = 0;
   Long_t   fileTime = 0;

   if (fProtocol > 12) {
      Int_t  mode = 0, uid = 0, gid = 0, islink = 0;
      Long_t dev = -1, ino = 0;
#ifdef R__WIN32
      sscanf(msg, "%ld %ld %d %d %d %I64d %ld %d", &dev, &ino, &mode,
             &uid, &gid, &fileSize, &fileTime, &islink);
#else
      sscanf(msg, "%ld %ld %d %d %d %lld %ld %d", &dev, &ino, &mode,
             &uid, &gid, &fileSize, &fileTime, &islink);
#endif
      if (dev == -1)
         return 1;
      if (id)
         *id = (dev << 24) + ino;
      if (flags) {
         *flags = 0;
         if (mode & (kS_IXUSR|kS_IXGRP|kS_IXOTH))
            *flags |= 1;
         if (R_ISDIR(mode))
            *flags |= 2;
         if (!R_ISREG(mode) && !R_ISDIR(mode))
            *flags |= 4;
      }
   } else {
      Long_t fileId = -1, fileFlags = 0;
#ifdef R__WIN32
      sscanf(msg, "%ld %I64d %ld %ld", &fileId, &fileSize, &fileFlags, &fileTime);
#else
      sscanf(msg, "%ld %lld %ld %ld", &fileId, &fileSize, &fileFlags, &fileTime);
#endif
      if (fileId == -1)
         return 1;
      if (id)
         *id = fileId;
      if (flags)
         *flags = fileFlags;
   }

   if (size)
      *size = fileSize;
   if (modtime)
      *modtime = fileTime;
   return 0;
}
// Close the file and drop the connection. Does nothing when no socket is
// open. For protocol versions above 6 a kROOTD_BYE message is sent before
// the socket is deleted.
void TNetFile::Close(Option_t *opt)
{
   if (fSocket) {
      TFile::Close(opt);

      if (fProtocol > 6) {
         fSocket->Send(kROOTD_BYE);
      }

      SafeDelete(fSocket);
      fD = -1;
   }
}
void TNetFile::Flush()
{
FlushWriteCache();
if (fSocket && fWritable)
fSocket->Send(kROOTD_FLUSH);
}
// Initialise the file: rewind to offset 0, run the base-class
// initialisation and finally set fD to -2.
void TNetFile::Init(Bool_t create)
{
   Seek(0);
   TFile::Init(create);
   fD = -2;   // same value SysOpen() returns on success
}
// The file counts as open exactly when a socket exists.
Bool_t TNetFile::IsOpen() const
{
   return fSocket ? kTRUE : kFALSE;
}
// Print URL, remote file name, remote user, title, option and the byte
// counters of this file. The option argument is ignored.
void TNetFile::Print(Option_t *) const
{
   const char *fname = fUrl.GetFile();

   // The remote name is printed without its first character
   // (NOTE(review): presumably a leading '/' - confirm). Guard against an
   // empty file part so we never point past the string terminator.
   const char *remoteName = (fname && fname[0]) ? fname + 1 : "";

   // GetUrl() is called through a non-const reference, exactly as the
   // former C-style cast did.
   Printf("URL: %s", const_cast<TUrl &>(fUrl).GetUrl());
   Printf("Remote file: %s", remoteName);
   Printf("Remote user: %s", fUser.Data());
   Printf("Title: %s", fTitle.Data());
   Printf("Option: %s", fOption.Data());
   Printf("Bytes written: %g", fBytesWrite);
   Printf("Bytes read: %g", fBytesRead);
}
void TNetFile::PrintError(const char *where, Int_t err)
{
fErrorCode = err;
Error(where, gRootdErrStr[err]);
}
// Reopen the file in the given mode. Requires protocol version 7 or higher;
// otherwise an error is reported and 1 is returned. On supported protocols
// the result of TFile::ReOpen() is returned.
Int_t TNetFile::ReOpen(Option_t *mode)
{
   if (fProtocol >= 7)
      return TFile::ReOpen(mode);

   Error("ReOpen", "operation not supported by remote rootd (protocol = %d)",
         fProtocol);
   return 1;
}
// Read len bytes at the current offset into buf.
//
// Returns kTRUE on error and kFALSE on success. A zero-length request
// succeeds immediately, and the read cache is consulted before the network
// is used. On success the offset and the read counters are advanced.
Bool_t TNetFile::ReadBuffer(char *buf, Int_t len)
{
   if (!fSocket)
      return kTRUE;
   if (len == 0)
      return kFALSE;

   // Non-zero means the cache handled the request; 2 signals a failure.
   const Int_t cacheState = ReadBufferViaCache(buf, len);
   if (cacheState != 0)
      return (cacheState == 2) ? kTRUE : kFALSE;

   // Hold back signals while the request/response exchange is in flight.
   if (gApplication && gApplication->GetSignalHandler())
      gApplication->GetSignalHandler()->Delay();

   Double_t start = 0;
   if (gPerfStats != 0)
      start = TTimeStamp();

   Bool_t failed = kTRUE;   // cleared only once the whole read succeeded

   if (fSocket->Send(Form("%lld %d", fOffset, len), kROOTD_GET) < 0) {
      Error("ReadBuffer", "error sending kROOTD_GET command");
   } else {
      Int_t         stat;
      EMessageTypes kind;

      fErrorCode = -1;
      if (Recv(stat, kind) < 0 || kind == kROOTD_ERR) {
         PrintError("ReadBuffer", stat);
      } else {
         // Retry the raw receive for as long as it is interrupted (EINTR).
         Int_t nread;
         for (;;) {
            nread = fSocket->RecvRaw(buf, len);
            if (nread >= 0 || TSystem::GetErrno() != EINTR)
               break;
            TSystem::ResetErrno();
         }

         if (nread != len) {
            Error("ReadBuffer", "error receiving buffer of length %d, got %d", len, nread);
         } else {
            fOffset    += len;
            fBytesRead += len;
            fReadCalls++;
#ifdef R__WIN32
            SetFileBytesRead(GetFileBytesRead() + len);
            SetFileReadCalls(GetFileReadCalls() + 1);
#else
            fgBytesRead += len;
            fgReadCalls++;
#endif
            failed = kFALSE;
         }
      }
   }

   // Common epilogue, executed on success and on every failure above.
   if (gPerfStats != 0) {
      gPerfStats->FileReadEvent(this, len, double(TTimeStamp())-start);
   }

   if (gApplication && gApplication->GetSignalHandler())
      gApplication->GetSignalHandler()->HandleDelayedSignal();

   return failed;
}
// Read nbuf segments in one request (kROOTD_GETS).
//
// buf  - destination; the segments are stored back to back.
// pos  - nbuf file positions (fArchiveOffset is added to each).
// len  - nbuf segment lengths.
// nbuf - number of segments.
//
// For protocol versions below 17 the call is forwarded to
// TFile::ReadBuffers(). If the vectored request fails at any stage, the
// base-class implementation is used as a fallback and its result returned.
// Returns kTRUE on error, kFALSE on success.
Bool_t TNetFile::ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
{
   if (!fSocket) return kTRUE;

   if (fProtocol < 17)
      return TFile::ReadBuffers(buf, pos, len, nbuf);

   Int_t         stat;
   Int_t         blockSize = 262144;   // largest chunk requested per RecvRaw
   Bool_t        result = kFALSE;
   EMessageTypes kind;
   TString       data_buf;             // "pos-len/" entries, one per segment

   // Hold back signals while the exchange with the server is in flight.
   if (gApplication && gApplication->GetSignalHandler())
      gApplication->GetSignalHandler()->Delay();

   Double_t start = 0;
   if (gPerfStats != 0) start = TTimeStamp();

   // Declared before the first 'goto end' so the jumps do not cross an
   // initialisation.
   Long64_t total_len = 0;
   Long64_t actual_pos;

   for (Int_t i = 0; i < nbuf; i++) {
      data_buf += pos[i] + fArchiveOffset;
      data_buf += "-";
      data_buf += len[i];
      data_buf += "/";
      total_len += len[i];
   }

   // Header first (segment count, list length, block size), then the list.
   if (fSocket->Send(Form("%d %d %d", nbuf, data_buf.Length(), blockSize),
                     kROOTD_GETS) < 0) {
      Error("ReadBuffers", "error sending kROOTD_GETS command");
      result = kTRUE;
      goto end;
   }

   if (fSocket->SendRaw(data_buf, data_buf.Length()) < 0) {
      Error("ReadBuffers", "error sending buffer");
      result = kTRUE;
      goto end;
   }

   fErrorCode = -1;
   if (Recv(stat, kind) < 0 || kind == kROOTD_ERR) {
      PrintError("ReadBuffers", stat);
      result = kTRUE;
      goto end;
   }

   // Receive the payload in chunks of at most blockSize bytes.
   actual_pos = 0;
   while (actual_pos < total_len) {
      Long64_t left = total_len - actual_pos;
      if (left > blockSize)
         left = blockSize;

      // Retry the raw receive for as long as it is interrupted (EINTR).
      Int_t n;
      while ((n = fSocket->RecvRaw(buf + actual_pos, Int_t(left))) < 0 &&
             TSystem::GetErrno() == EINTR)
         TSystem::ResetErrno();

      if (n != Int_t(left)) {
         // Report under this method's own name (was "GetBuffers").
         Error("ReadBuffers", "error receiving buffer of length %d, got %d",
               Int_t(left), n);
         result = kTRUE;
         goto end;
      }
      actual_pos += left;
   }

   fBytesRead += total_len;
   fReadCalls++;
#ifdef R__WIN32
   SetFileBytesRead(GetFileBytesRead() + total_len);
   SetFileReadCalls(GetFileReadCalls() + 1);
#else
   fgBytesRead += total_len;
   fgReadCalls++;
#endif

end:
   if (gPerfStats != 0) {
      gPerfStats->FileReadEvent(this, total_len, double(TTimeStamp())-start);
   }

   if (gApplication && gApplication->GetSignalHandler())
      gApplication->GetSignalHandler()->HandleDelayedSignal();

   // Any failure above: retry through the generic base-class implementation.
   if (result) {
      if (gDebug > 0)
         Info("ReadBuffers", "Couldnt use the specific implementation, calling TFile::ReadBuffers");
      return TFile::ReadBuffers(buf, pos, len, nbuf);
   }

   return result;
}
// Write len bytes from buf at the current offset.
//
// Returns kTRUE on error and kFALSE on success. The write cache is tried
// first. Every network failure sets the kWriteError bit. On success the
// offset and the write counters are advanced.
Bool_t TNetFile::WriteBuffer(const char *buf, Int_t len)
{
   if (!fSocket || !fWritable)
      return kTRUE;

   // Non-zero means the cache handled the request; 2 signals a failure.
   const Int_t cacheState = WriteBufferViaCache(buf, len);
   if (cacheState != 0)
      return (cacheState == 2) ? kTRUE : kFALSE;

   // Interrupts are ignored for the duration of the exchange.
   gSystem->IgnoreInterrupt();

   Bool_t failed = kTRUE;   // cleared only once the server acknowledged

   if (fSocket->Send(Form("%lld %d", fOffset, len), kROOTD_PUT) < 0) {
      SetBit(kWriteError);
      Error("WriteBuffer", "error sending kROOTD_PUT command");
   } else if (fSocket->SendRaw(buf, len) < 0) {
      SetBit(kWriteError);
      Error("WriteBuffer", "error sending buffer");
   } else {
      Int_t         stat;
      EMessageTypes kind;

      fErrorCode = -1;
      if (Recv(stat, kind) < 0 || kind == kROOTD_ERR) {
         SetBit(kWriteError);
         PrintError("WriteBuffer", stat);
      } else {
         fOffset     += len;
         fBytesWrite += len;
#ifdef R__WIN32
         SetFileBytesWritten(GetFileBytesWritten() + len);
#else
         fgBytesWrite += len;
#endif
         failed = kFALSE;
      }
   }

   gSystem->IgnoreInterrupt(kFALSE);
   return failed;
}
// Receive a status/kind pair from the socket.
//
// status - set to 0 first, then filled by the socket call.
// kind   - set to kROOTD_ERR first, then replaced by the received kind.
//
// Returns -1 when there is no socket, otherwise whatever the socket's
// Recv() returned.
Int_t TNetFile::Recv(Int_t &status, EMessageTypes &kind)
{
   kind   = kROOTD_ERR;
   status = 0;

   if (!fSocket) return -1;

   // Start from kROOTD_ERR: should the socket call return without filling
   // in the message kind, the caller must still see an error kind instead
   // of an indeterminate value.
   Int_t what = kROOTD_ERR;
   Int_t n = fSocket->Recv(status, what);
   kind = static_cast<EMessageTypes>(what);
   return n;
}
// Move the logical file offset; no message is sent here.
//
// kBeg - offset is relative to the start (fArchiveOffset is added).
// kCur - offset is relative to the current position.
// kEnd - offset is relative to fEND; an error is reported when an archive
//        offset is in use, but the offset is still updated.
void TNetFile::Seek(Long64_t offset, ERelativeTo pos)
{
   if (pos == kBeg) {
      fOffset = offset + fArchiveOffset;
   } else if (pos == kCur) {
      fOffset += offset;
   } else if (pos == kEnd) {
      if (fArchiveOffset)
         Error("Seek", "seeking from end in archive is not (yet) supported");
      fOffset = fEND + offset;
   }
}
// Open an authenticated socket and send the kROOTD_OPEN request.
//
// stat, kind    - receive the status and message kind of the reply; on a
//                 failed connection they are set to kErrAuthNotOK and
//                 kROOTD_ERR.
// netopt        - values below -1 request -netopt parallel streams.
// tcpwindowsize - window size handed to the socket factory.
// forceOpen     - prefix the open option with "f".
// forceRead     - send "+read" as the open option (needs protocol >= 5).
//
// If the connection cannot be established the file is made a zombie, the
// socket is deleted and gDirectory is reset to gROOT.
void TNetFile::ConnectServer(Int_t *stat, EMessageTypes *kind, Int_t netopt,
                             Int_t tcpwindowsize, Bool_t forceOpen,
                             Bool_t forceRead)
{
   TString remoteFile = fUrl.GetFile();

   Int_t nStreams = 1;
   if (netopt < -1)
      nStreams = -netopt;

   // Build the socket URL: "dp" is inserted at index 4 of a protocol that
   // contains "root"; any other protocol is replaced by "rootdp".
   TString authUrl(fUrl.GetProtocol());
   if (authUrl.Contains("root"))
      authUrl.Insert(4,"dp");
   else
      authUrl = "rootdp";
   authUrl += TString(Form("://%s@%s:%d",
                           fUrl.GetUser(), fUrl.GetHost(), fUrl.GetPort()));

   fSocket = TSocket::CreateAuthSocket(authUrl, nStreams, tcpwindowsize, fSocket);

   const Bool_t connected = (fSocket && fSocket->IsAuthenticated());
   if (!connected) {
      if (nStreams > 1)
         Error("TNetFile", "can't open %d-stream connection to rootd on "
               "host %s at port %d", nStreams, fUrl.GetHost(), fUrl.GetPort());
      else
         Error("TNetFile", "can't open connection to rootd on "
               "host %s at port %d", fUrl.GetHost(), fUrl.GetPort());
      *kind = kROOTD_ERR;
      *stat = (Int_t)kErrAuthNotOK;

      // Failure clean-up.
      MakeZombie();
      SafeDelete(fSocket);
      gDirectory = gROOT;
      return;
   }

   fProtocol = fSocket->GetRemoteProtocol();

   if (forceRead && fProtocol < 5) {
      Warning("ConnectServer", "rootd does not support \"+read\" option");
      forceRead = kFALSE;
   }

   // For protocol versions below 16 the file name is sent with a leading '/'.
   if (fProtocol < 16)
      remoteFile.Insert(0,"/");

   // Select the option text that goes out with the open request.
   TString openOption;
   if (forceOpen)
      openOption = ToLower("f"+fOption);
   else if (forceRead)
      openOption = "+read";
   else
      openOption = ToLower(fOption);

   fSocket->Send(Form("%s %s", remoteFile.Data(), openOption.Data()), kROOTD_OPEN);

   // Hand the reply straight back to the caller.
   EMessageTypes replyKind;
   int           replyStat;
   Recv(replyStat, replyKind);
   *stat = replyStat;
   *kind = replyKind;
}
void TNetFile::Create(const char * , Option_t *option, Int_t netopt)
{
Int_t tcpwindowsize = 65535;
fErrorCode = -1;
fNetopt = netopt;
fOption = option;
Bool_t forceOpen = kFALSE;
if (option[0] == '-') {
fOption = &option[1];
forceOpen = kTRUE;
}
if (option[0] == 'F' || option[0] == 'f') {
fOption = &option[1];
forceOpen = kTRUE;
}
Bool_t forceRead = kFALSE;
if (!strcasecmp(option, "+read")) {
fOption = &option[1];
forceRead = kTRUE;
}
fOption.ToUpper();
if (fOption == "NEW")
fOption = "CREATE";
Bool_t create = (fOption == "CREATE") ? kTRUE : kFALSE;
Bool_t recreate = (fOption == "RECREATE") ? kTRUE : kFALSE;
Bool_t update = (fOption == "UPDATE") ? kTRUE : kFALSE;
Bool_t read = (fOption == "READ") ? kTRUE : kFALSE;
if (!create && !recreate && !update && !read) {
read = kTRUE;
fOption = "READ";
}
if (!fUrl.IsValid()) {
Error("Create", "invalid URL specified: %s", fUrl.GetUrl());
goto zombie;
}
if (netopt > tcpwindowsize)
tcpwindowsize = netopt;
EMessageTypes kind;
Int_t stat;
ConnectServer(&stat, &kind, netopt, tcpwindowsize, forceOpen, forceRead);
if (gDebug > 2) Info("Create", "got from host %d %d", stat, kind);
if (kind == kROOTD_ERR) {
PrintError("Create", stat);
Error("Create", "failing on file %s", fUrl.GetUrl());
goto zombie;
}
if (recreate) {
recreate = kFALSE;
create = kTRUE;
fOption = "CREATE";
}
if (update && stat > 1) {
update = kFALSE;
create = kTRUE;
stat = 1;
}
if (stat == 1)
fWritable = kTRUE;
else
fWritable = kFALSE;
Init(create);
return;
zombie:
MakeZombie();
SafeDelete(fSocket);
gDirectory = gROOT;
}
// Create the file on top of an existing socket: the socket is adopted as
// fSocket and the URL-based Create() is then run with the socket's URL.
void TNetFile::Create(TSocket *s, Option_t *option, Int_t netopt)
{
   fSocket = s;   // adopted before the URL-based Create() runs

   Create(s->GetUrl(), option, netopt);
}
// Tell whether url refers to this file.
//
// Returns kTRUE when the base-class check succeeds, or when file part, port
// and fully-qualified host name of url all equal those of the endpoint URL.
Bool_t TNetFile::Matches(const char *url)
{
   if (TFile::Matches(url))
      return kTRUE;

   TUrl candidate(url);
   if (strcmp(candidate.GetFile(), fEndpointUrl.GetFile()) != 0)
      return kFALSE;

   // The candidate's host name is resolved before the port comparison.
   TString candidateHost = candidate.GetHostFQDN();
   if (candidate.GetPort() != fEndpointUrl.GetPort())
      return kFALSE;

   TString endpointHost = fEndpointUrl.GetHostFQDN();
   return (candidateHost == endpointHost) ? kTRUE : kFALSE;
}
// Construct an unconnected helper system named "root".
//
// ftpowner - stored in fFTPOwner; the destructor only closes and deletes
//            the FTP object when it is set.
TNetSystem::TNetSystem(Bool_t ftpowner)
   : TSystem("-root", "Net file Helper System")
{
   SetName("root");

   // No remote entity yet.
   fUser = "";
   fHost = "";
   fPort = -1;

   // No connection and no open directory.
   fFTP      = 0;
   fFTPOwner = ftpowner;
   fDir      = kFALSE;
   fDirp     = 0;
   fIsLocal  = kFALSE;
}
// Construct a helper system named "root" and connect it to url through
// Create().
//
// ftpowner - stored in fFTPOwner; the destructor only closes and deletes
//            the FTP object when it is set.
TNetSystem::TNetSystem(const char *url, Bool_t ftpowner)
   : TSystem("-root", "Net file Helper System")
{
   SetName("root");

   fFTPOwner = ftpowner;
   fIsLocal = kFALSE;

   // Create() returns before InitRemoteEntity() when the path is local, so
   // give the remote-entity members the same defaults as the default
   // constructor; ConsistentWith() compares against them.
   fUser = "";
   fHost = "";
   fPort = -1;

   Create(url);
}
void TNetSystem::InitRemoteEntity(const char *url)
{
TUrl turl(url);
fUser = turl.GetUser();
if (!fUser.Length()) {
UserGroup_t *u = gSystem->GetUserInfo();
if (u)
fUser = u->fUser;
delete u;
}
fHost = turl.GetHostFQDN();
fPort = turl.GetPort();
}
// Set this helper up for url.
//
// url  - target URL; "//" is inserted after the first ':' when "://" is
//        missing.
// sock - socket handed on to the TFTP constructor.
//
// Unless remote access is forced (environment value "Path.ForceRemote" or
// URL option "remote=1"; "remote=0" switches it off), a path that
// gSystem->IsPathLocal() reports as local is served locally, with
// "Path.Localroot" as prefix. Otherwise a TFTP connection is opened. A
// remote daemon with a protocol version below 12 is rejected and the
// connection discarded.
void TNetSystem::Create(const char *url, TSocket *sock)
{
   // Make sure the protocol part is followed by "://".
   TString surl(url);
   if (!surl.Contains("://")) {
      surl.Insert(surl.Index(":")+1,"//");
   }
   TUrl turl(surl);

   fDir  = kFALSE;
   fDirp = 0;
   fFTP  = 0;

   fLocalPrefix = "";
   fIsLocal = kFALSE;

   // The URL option overrides the environment setting.
   Bool_t forceRemote = gEnv->GetValue("Path.ForceRemote", 0);
   TString opts = TUrl(url).GetOptions();
   if (opts.Contains("remote=1"))
      forceRemote = kTRUE;
   else if (opts.Contains("remote=0"))
      forceRemote = kFALSE;

   if (!forceRemote) {
      if ((fIsLocal = gSystem->IsPathLocal(url))) {
         fLocalPrefix = gEnv->GetValue("Path.Localroot","");
         return;
      }
   }

   InitRemoteEntity(surl);

   if (fHost.Length()) {
      // Rebuild the URL as protocol://[user@]host:port ("root" by default).
      TString eurl = "";
      if (strlen(turl.GetProtocol())) {
         eurl = turl.GetProtocol();
         eurl += "://";
      } else
         eurl = "root://";
      if (strlen(turl.GetUser())) {
         eurl += turl.GetUser();
         eurl += "@";
      }
      eurl += fHost;
      eurl += ":";
      eurl += turl.GetPort();

      fFTP = new TFTP(eurl, 1, TFTP::kDfltWindowSize, sock);
      if (fFTP && fFTP->IsOpen()) {
         if (fFTP->GetSocket()->GetRemoteProtocol() < 12) {
            Error("TNetSystem",
                  "remote daemon does not support 'system' functionality");
            fFTP->Close();
            delete fFTP;
            // Reset the pointer: every other method tests fFTP before use
            // and the destructor deletes it, so a dangling value would be
            // used after free and deleted twice.
            fFTP = 0;
         } else {
            // Adopt user and host as reported by the security context.
            fUser = fFTP->GetSocket()->GetSecContext()->GetUser();
            fHost = fFTP->GetSocket()->GetSecContext()->GetHost();
            // When this object owns the connection, take it out of the
            // global socket list.
            if (fFTPOwner)
               gROOT->GetListOfSockets()->Remove(fFTP);
         }
      }
   }
}
// Destructor. The FTP object is closed and deleted only when this object
// owns it (fFTPOwner); an open remote directory is released first. The
// pointers are cleared in every case.
TNetSystem::~TNetSystem()
{
   if (fFTPOwner && fFTP) {
      if (fFTP->IsOpen()) {
         if (fDir) {
            fFTP->FreeDirectory(kFALSE);
            fDir = kFALSE;
         }
         fFTP->Close();
      }
      delete fFTP;
   }

   fDirp = 0;
   fFTP  = 0;
}
// Create directory dir (only the file part of the URL is used).
//
// Local mode: delegates to gSystem with fLocalPrefix prepended.
// Remote mode: delegates to the FTP connection; returns -1 when there is no
// open connection.
Int_t TNetSystem::MakeDirectory(const char *dir)
{
   if (!fIsLocal) {
      if (!fFTP || !fFTP->IsOpen())
         return -1;

      TString remoteDir = TUrl(dir).GetFile();
      return fFTP->MakeDirectory(remoteDir,kFALSE);
   }

   TString localDir = TUrl(dir).GetFile();
   if (!fLocalPrefix.IsNull())
      localDir.Insert(0, fLocalPrefix);
   return gSystem->MakeDirectory(localDir);
}
void *TNetSystem::OpenDirectory(const char *dir)
{
if (fIsLocal) {
TString edir = TUrl(dir).GetFile();
if (!fLocalPrefix.IsNull())
edir.Insert(0, fLocalPrefix);
return gSystem->OpenDirectory(edir);
}
if (!fFTP || !fFTP->IsOpen())
return (void *)0;
if (fDir) {
if (gDebug > 0)
Info("OpenDirectory", "a directory is already open: close it first");
fFTP->FreeDirectory(kFALSE);
fDir = kFALSE;
}
TString edir = TUrl(dir).GetFile();
if (fFTP->OpenDirectory(edir,kFALSE)) {
fDir = kTRUE;
fDirp = (void *)&fDir;
return fDirp;
} else
return (void *)0;
}
void TNetSystem::FreeDirectory(void *dirp)
{
if (fIsLocal) {
gSystem->FreeDirectory(dirp);
return;
}
if (dirp != fDirp) {
Error("FreeDirectory", "invalid directory pointer (should never happen)");
return;
}
if (fFTP && fFTP->IsOpen()) {
if (fDir) {
fFTP->FreeDirectory(kFALSE);
fDir = kFALSE;
fDirp = 0;
}
}
}
// Return the next entry of an open directory.
//
// Local mode: delegates to gSystem. Remote mode: dirp must be the handle
// stored in fDirp. Returns 0 for an invalid handle or when no remote
// directory is open.
const char *TNetSystem::GetDirEntry(void *dirp)
{
   if (fIsLocal)
      return gSystem->GetDirEntry(dirp);

   if (dirp != fDirp) {
      Error("GetDirEntry", "invalid directory pointer (should never happen)");
      return 0;
   }

   if (!fFTP || !fFTP->IsOpen() || !fDir)
      return 0;

   return fFTP->GetDirEntry(kFALSE);
}
// Fill buf with status information about path (only the file part of the
// URL is used).
//
// Local mode: delegates to gSystem with fLocalPrefix prepended.
// Remote mode: delegates to the FTP connection and returns its status;
// returns 1 when there is no open connection.
Int_t TNetSystem::GetPathInfo(const char *path, FileStat_t &buf)
{
   if (fIsLocal) {
      TString epath = TUrl(path).GetFile();
      if (!fLocalPrefix.IsNull())
         epath.Insert(0, fLocalPrefix);
      return gSystem->GetPathInfo(epath, buf);
   }

   if (fFTP && fFTP->IsOpen()) {
      TString epath = TUrl(path).GetFile();
      // Propagate the remote result instead of always reporting success,
      // so a failed stat does not leave the caller trusting an unfilled buf.
      // NOTE(review): assumes TFTP::GetPathInfo returns 0 on success and
      // non-zero on failure, like the local call above - confirm.
      return fFTP->GetPathInfo(epath, buf, kFALSE);
   }
   return 1;
}
// Check access to path (only the file part of the URL is used) with the
// given mode.
//
// Local mode: delegates to gSystem with fLocalPrefix prepended.
// Remote mode: delegates to the FTP connection; returns kTRUE when there is
// no open connection.
Bool_t TNetSystem::AccessPathName(const char *path, EAccessMode mode)
{
   if (!fIsLocal) {
      if (!fFTP || !fFTP->IsOpen())
         return kTRUE;

      TString remotePath = TUrl(path).GetFile();
      return fFTP->AccessPathName(remotePath, mode, kFALSE);
   }

   TString localPath = TUrl(path).GetFile();
   if (!fLocalPrefix.IsNull())
      localPath.Insert(0, fLocalPrefix);
   return gSystem->AccessPathName(localPath, mode);
}
// Tell whether this helper can serve path / dirptr.
//
// The base-class check must pass first. A null path is then accepted and an
// empty path rejected; any other path must resolve to the same user,
// fully-qualified host and port as this object. When the URL carries no
// user name, the name from gSystem->GetUserInfo() is used for comparison.
Bool_t TNetSystem::ConsistentWith(const char *path, void *dirptr)
{
   if (!TSystem::ConsistentWith(path, dirptr))
      return kFALSE;

   if (!path)
      return kTRUE;
   if (strlen(path) == 0)
      return kFALSE;

   TUrl url(path);

   TString user = url.GetUser();
   if (user.Length() == 0) {
      UserGroup_t *info = gSystem->GetUserInfo();
      if (info)
         user = info->fUser;
      delete info;
   }

   TString host = url.GetHostFQDN();
   Int_t   port = url.GetPort();

   const Bool_t sameEntity = (user == fUser && host == fHost && port == fPort);
   return sameEntity ? kTRUE : kFALSE;
}
// Remove path (only the file part of the URL is used).
//
// Local mode: delegates to gSystem with fLocalPrefix prepended. Remote
// removal is not implemented: a warning is issued and -1 returned.
Int_t TNetSystem::Unlink(const char *path)
{
   if (!fIsLocal) {
      Warning("Unlink", "funtionality not implemented - ignored (path: %s)", path);
      return -1;
   }

   TString localPath = TUrl(path).GetFile();
   if (!fLocalPrefix.IsNull())
      localPath.Insert(0, fLocalPrefix);
   return gSystem->Unlink(localPath);
}
// Last change: Fri Jul 4 14:51:24 2008
// Last generated: 2008-07-04 14:51
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.