#include "TNetXNGFile.h"
#include "TEnv.h"
#include "TSystem.h"
#include "TTimeStamp.h"
#include "TVirtualPerfStats.h"
#include "TVirtualMonitoring.h"
#include <XrdCl/XrdClURL.hh>
#include <XrdCl/XrdClFile.hh>
#include <XrdCl/XrdClXRootDResponses.hh>
#include <XrdCl/XrdClDefaultEnv.hh>
#include <XrdVersion.hh>
#include <iostream>
////////////////////////////////////////////////////////////////////////////////
/// Response handler used for asynchronous file opens.
///
/// On construction it marks the owning TNetXNGFile as "open in progress";
/// when the response arrives it records success or failure on the file and
/// then destroys itself together with the objects it was handed.

class TAsyncOpenHandler: public XrdCl::ResponseHandler
{
public:
   /// Remember the file being opened and flag its open as in progress.
   ///
   /// \param[in] file  the file whose asynchronous open this handler tracks
   TAsyncOpenHandler(TNetXNGFile *file): fFile(file)
   {
      fFile->SetAsyncOpenStatus(TFile::kAOSInProgress);
   }

   /// Record the outcome of the open on the file and clean up.
   ///
   /// \param[in] status    status of the open request (deleted here)
   /// \param[in] response  response object (deleted here)
   virtual void HandleResponse(XrdCl::XRootDStatus *status,
                               XrdCl::AnyObject    *response)
   {
      const TFile::EAsyncOpenStatus outcome =
         status->IsOK() ? TFile::kAOSSuccess : TFile::kAOSFailure;
      fFile->SetAsyncOpenStatus(outcome);

      // This handler owns everything it was given, including itself.
      delete response;
      delete status;
      delete this;
   }

private:
   TNetXNGFile *fFile;   // file whose open is being tracked (not owned)
};
////////////////////////////////////////////////////////////////////////////////
/// Response handler used for asynchronous vector reads.
///
/// Each instance stores the status it receives into one slot of a shared
/// status vector, posts a shared semaphore, and then destroys itself.

class TAsyncReadvHandler: public XrdCl::ResponseHandler
{
public:
   /// \param[out] statuses     vector receiving the status pointer
   /// \param[in]  statusIndex  slot of `statuses` this handler fills
   /// \param[in]  semaphore    semaphore posted once the slot is filled
   TAsyncReadvHandler(std::vector<XrdCl::XRootDStatus*> *statuses,
                      Int_t                              statusIndex,
                      TSemaphore                        *semaphore)
      : fStatuses(statuses),
        fStatusIndex(statusIndex),
        fSemaphore(semaphore)
   {
   }

   /// Store the status, post the semaphore, then free the response and this
   /// handler. The status object itself is NOT deleted here; whoever reads
   /// the slot is expected to dispose of it.
   virtual void HandleResponse(XrdCl::XRootDStatus *status,
                               XrdCl::AnyObject    *response)
   {
      std::vector<XrdCl::XRootDStatus*> &slots = *fStatuses;
      slots.at(fStatusIndex) = status;   // at(): bounds-checked store
      fSemaphore->Post();

      delete response;
      delete this;
   }

private:
   std::vector<XrdCl::XRootDStatus*> *fStatuses;     // shared status slots (not owned)
   Int_t                              fStatusIndex;  // slot filled by this handler
   TSemaphore                        *fSemaphore;    // posted on completion (not owned)
};
// ROOT class-implementation macro for TNetXNGFile.
ClassImp(TNetXNGFile);
////////////////////////////////////////////////////////////////////////////////
/// Constructor: open the remote file, synchronously or asynchronously.
///
/// \param[in] url           URL of the file; any anchor is stripped and the
///                          protocol is forced to "root" before opening
/// \param[in] mode          open mode string (see ParseOpenMode); unknown
///                          modes are treated as READ
/// \param[in] title         passed on to the TFile constructor
/// \param[in] compress      passed on to the TFile constructor
/// \param[in] (unnamed)     ignored
/// \param[in] parallelopen  if true, start an asynchronous open and return
///                          immediately; initialisation is then left to a
///                          later call of Init()
///
/// On any failure the object is turned into a zombie.

TNetXNGFile::TNetXNGFile(const char *url,
                         Option_t   *mode,
                         const char *title,
                         Int_t       compress,
                         Int_t       /*unused*/,
                         Bool_t      parallelopen) :
   TFile(url, "NET", title, compress)
{
   using namespace XrdCl;

   // Remove any anchor from the URL: it is not part of the remote file name
   {
      TUrl urlnoanchor(url);
      urlnoanchor.SetAnchor("");
      fUrl = new URL(std::string(urlnoanchor.GetUrl()));
   }

   fFile        = new File();
   fInitCondVar = new XrdSysCondVar();
   fUrl->SetProtocol(std::string("root"));

   // Defaults; may be replaced by GetVectorReadLimits() / SetEnv()
   fQueryReadVParams = 1;
   fReadvIorMax = 2097136;
   fReadvIovMax = 1024;

   if (ParseOpenMode(mode, fOption, fMode, kTRUE)<0) {
      Error("Open", "could not parse open mode %s", mode);
      MakeZombie();
      return;
   }

   // Apply ROOT configuration to the XrdCl environment
   SetEnv();

   // Init the monitoring system
   if (gMonitoringWriter) {
      if (!fOpenPhases) {
         fOpenPhases = new TList;
         fOpenPhases->SetOwner();
      }
      gMonitoringWriter->SendFileOpenProgress(this, fOpenPhases, "xrdopen",
                                              kFALSE);
   }

   XRootDStatus status;
   if (parallelopen) {
      // Asynchronous open: the handler sets the async status to "in progress"
      // and normally deletes itself once the response has arrived.
      TAsyncOpenHandler *handler = new TAsyncOpenHandler(this);
      status = fFile->Open(fUrl->GetURL(), fMode, Access::None, handler);
      if (!status.IsOK()) {
         Error("Open", "%s", status.ToStr().c_str());
         // The request was rejected before being issued, so the handler will
         // never be invoked: free it here and record the failure, otherwise
         // the status would stay "in progress" and Init() would wait forever.
         delete handler;
         SetAsyncOpenStatus(kAOSFailure);
         MakeZombie();
      }
      return;
   }

   // Synchronous open
   status = fFile->Open(fUrl->GetURL(), fMode);
   if (!status.IsOK()) {
#if XrdVNUMBER >= 40000
      // A redirect error carries the new URL in its message: remember it
      // instead of reporting an error
      if( status.code == errRedirect )
         fNewUrl = status.GetErrorMessage().c_str();
      else
         Error("Open", "%s", status.ToStr().c_str());
#else
      Error("Open", "%s", status.ToStr().c_str());
#endif
      MakeZombie();
      return;
   }

   if( (fMode & OpenFlags::New) || (fMode & OpenFlags::Delete) ||
       (fMode & OpenFlags::Update) )
      fWritable = true;

   // Only NEW/RECREATE create the file contents from scratch
   bool create = false;
   if( (fMode & OpenFlags::New) || (fMode & OpenFlags::Delete) )
      create = true;

   TFile::Init(create);

   GetVectorReadLimits();
}
////////////////////////////////////////////////////////////////////////////////
/// Destructor: close the file if it is still open, then release the objects
/// allocated by the constructor.

TNetXNGFile::~TNetXNGFile()
{
   if (IsOpen()) {
      Close();
   }

   delete fFile;
   delete fUrl;
   delete fInitCondVar;
}
////////////////////////////////////////////////////////////////////////////////
/// Initialize the file. Does nothing if initialisation was already done.
/// If an asynchronous open is still in progress, waits on the init condition
/// variable before calling TFile::Init and querying the vector read limits.
///
/// \param[in] create  passed on to TFile::Init

void TNetXNGFile::Init(Bool_t create)
{
   using namespace XrdCl;

   if (fInitDone) {
      if (gDebug > 1) {
         Info("Init", "TFile::Init already called once");
      }
      return;
   }

   // Block until the asynchronous open handler signals the condition variable
   const Bool_t openPending = !IsOpen() && fAsyncOpenStatus == kAOSInProgress;
   if (openPending) {
      fInitCondVar->Wait();
   }

   // Notify the monitoring system
   if (gMonitoringWriter) {
      gMonitoringWriter->SendFileOpenProgress(this, fOpenPhases, "rootinit",
                                              kFALSE);
   }

   // Initialize the file
   TFile::Init(create);

   // Notify the monitoring system
   if (gMonitoringWriter) {
      gMonitoringWriter->SendFileOpenProgress(this, fOpenPhases, "endopen",
                                              kTRUE);
   }

   // Get the vector read limits
   GetVectorReadLimits();
}
////////////////////////////////////////////////////////////////////////////////
/// Get the file size via a stat on the open file.
///
/// \returns the size reported by the stat, or -1 if the file is not usable
///          or the stat fails

Long64_t TNetXNGFile::GetSize() const
{
   using namespace XrdCl;

   if (!IsUseable()) {
      return -1;
   }

   // The "force" flag is switched off only for files opened purely for
   // reading; every other mode forces the stat.
   const bool forceStat = !(fMode == XrdCl::OpenFlags::Read);

   StatInfo *info = 0;
   XRootDStatus statStatus = fFile->Stat(forceStat, info);
   if (!statStatus.IsOK()) {
      return -1;
   }

   const Long64_t fileSize = info->GetSize();
   delete info;
   return fileSize;
}
////////////////////////////////////////////////////////////////////////////////
/// Check whether the underlying XrdCl file object reports itself as open.
///
/// \returns the value of fFile->IsOpen()

Bool_t TNetXNGFile::IsOpen() const
{
   const Bool_t opened = fFile->IsOpen();
   return opened;
}
////////////////////////////////////////////////////////////////////////////////
/// Set the status of an asynchronous file open and signal the init condition
/// variable (the one Init() waits on while an open is in progress).
///
/// \param[in] status  the new asynchronous open status

void TNetXNGFile::SetAsyncOpenStatus(EAsyncOpenStatus status)
{
// Store the new status first, then signal the condition variable
fAsyncOpenStatus = status;
fInitCondVar->Signal();
}
void TNetXNGFile::Close(const Option_t *)
{
TFile::Close();
XrdCl::XRootDStatus status = fFile->Close();
if (!status.IsOK()) {
Error("Close", "%s", status.ToStr().c_str());
MakeZombie();
}
}
////////////////////////////////////////////////////////////////////////////////
/// Reopen the file with a new access mode.
///
/// \param[in] modestr  new mode; must parse to READ or UPDATE
///
/// \returns 0 if the file was closed and reopened in the new mode;
///          1 if the mode is invalid, if the file is already in the requested
///          mode (UPDATE requested on a file opened as NEW counts as
///          unchanged), or if closing or reopening the remote file fails

Int_t TNetXNGFile::ReOpen(Option_t *modestr)
{
   using namespace XrdCl;
   TString newOpt;
   OpenFlags::Flags mode;

   Int_t parseres = ParseOpenMode(modestr, newOpt, mode, kFALSE);

   // Only Read and Update are valid modes
   if (parseres<0 || (mode != OpenFlags::Read && mode != OpenFlags::Update)) {
      Error("ReOpen", "mode must be either READ or UPDATE, not %s", modestr);
      return 1;
   }

   // The mode is not really changing
   if (mode == fMode || (mode == OpenFlags::Update
                         && fMode == OpenFlags::New)) {
      return 1;
   }

   // Do not silently ignore a failed close: report it and leave the recorded
   // option/mode untouched rather than attempting the reopen regardless.
   XRootDStatus st = fFile->Close();
   if (!st.IsOK()) {
      Error("ReOpen", "%s", st.ToStr().c_str());
      return 1;
   }

   fOption = newOpt;
   fMode = mode;

   st = fFile->Open(fUrl->GetURL(), fMode);
   if (!st.IsOK()) {
      Error("ReOpen", "%s", st.ToStr().c_str());
      return 1;
   }

   return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// Read a data chunk of the given size, starting at the offset returned by
/// GetRelOffset().
///
/// \param[out] buffer  destination for the data
/// \param[in]  length  number of bytes requested
///
/// \returns the result of the three-argument ReadBuffer overload

Bool_t TNetXNGFile::ReadBuffer(char *buffer, Int_t length)
{
   const Long64_t startOffset = GetRelOffset();
   return ReadBuffer(buffer, startOffset, length);
}
////////////////////////////////////////////////////////////////////////////////
/// Read a data chunk of the given size from the given position.
///
/// \param[out] buffer    destination for the data
/// \param[in]  position  file offset to read from
/// \param[in]  length    number of bytes requested
///
/// \returns kFALSE on success (including a read served by the cache);
///          kTRUE if the file is not usable, the cache reports status 2,
///          or the remote read fails

Bool_t TNetXNGFile::ReadBuffer(char *buffer, Long64_t position, Int_t length)
{
   using namespace XrdCl;

   if (gDebug > 0) {
      Info("ReadBuffer", "offset: %lld length: %d", position, length);
   }

   if (!IsUseable()) {
      return kTRUE;
   }

   // Move the file cursor to the requested position
   SetOffset(position);

   // Give the read cache the first go: any non-zero status ends the call,
   // and only status 2 is reported as a failure
   const Int_t cacheStatus = ReadBufferViaCache(buffer, length);
   if (cacheStatus != 0) {
      return (cacheStatus == 2) ? kTRUE : kFALSE;
   }

   // Read the data from the remote file
   uint32_t nread = 0;
   XRootDStatus readStatus = fFile->Read(fOffset, length, buffer, nread);

   if (gDebug > 0) {
      Info("ReadBuffer", "%s bytes read: %d", readStatus.ToStr().c_str(), nread);
   }

   if (!readStatus.IsOK()) {
      Error("ReadBuffer", "%s", readStatus.ToStr().c_str());
      return kTRUE;
   }

   // Bump the cursor and the per-file / global read statistics
   fOffset     += nread;
   fBytesRead  += nread;
   fgBytesRead += nread;
   fReadCalls  ++;
   fgReadCalls ++;

   if (gMonitoringWriter) {
      gMonitoringWriter->SendFileReadProgress(this);
   }

   return kFALSE;
}
////////////////////////////////////////////////////////////////////////////////
/// Read scattered data chunks with asynchronous vector reads.
///
/// The requests are packed into chunk lists that respect the per-chunk size
/// limit (fReadvIorMax) and the per-request chunk count limit (fReadvIovMax);
/// one asynchronous vector read is issued per list and the call waits for all
/// of them before returning.
///
/// \param[out] buffer    destination; the chunks are stored back to back in
///                       request order
/// \param[in]  position  offsets of the chunks; if fArchiveOffset is non-zero
///                       it is added to every entry IN PLACE
/// \param[in]  length    lengths of the chunks
/// \param[in]  nbuffs    number of entries in position[] and length[]
///
/// \returns kFALSE on success; kTRUE if the file is not usable, a request
///          could not be issued, or any request failed

Bool_t TNetXNGFile::ReadBuffers(char *buffer, Long64_t *position, Int_t *length,
                                Int_t nbuffs)
{
   using namespace XrdCl;

   if (!IsUseable())
      return kTRUE;

   // Guard against non-positive limits (they would cause a division by zero
   // or an endless split below); fall back to the constructor defaults.
   const Int_t iorMax = (fReadvIorMax > 0) ? fReadvIorMax : 2097136;
   const Int_t iovMax = (fReadvIovMax > 0) ? fReadvIovMax : 1024;

   std::vector<ChunkList> chunkLists;
   ChunkList              chunks;
   Long64_t               totalBytes = 0;   // 64 bit: a sum of Int_t lengths can overflow Int_t
   char                  *cursor     = buffer;

   Double_t start = 0;
   if (gPerfStats) start = TTimeStamp();

   if (fArchiveOffset)
      for (Int_t i = 0; i < nbuffs; i++)
         position[i] += fArchiveOffset;

   // Build a list of chunks. Put the buffers in the ChunkInfo's
   for (Int_t i = 0; i < nbuffs; ++i) {
      totalBytes += length[i];

      if (length[i] > iorMax) {
         // Too big for a single chunk: split into maximum-size pieces
         const Int_t nsplit = length[i] / iorMax;
         const Int_t rem    = length[i] % iorMax;
         Long64_t    offset = position[i];

         for (Int_t j = 0; j < nsplit; ++j) {
            chunks.push_back(ChunkInfo(offset, iorMax, cursor));
            offset += iorMax;
            cursor += iorMax;
         }

         // Add the remainder, but never a zero-length chunk
         if (rem > 0) {
            chunks.push_back(ChunkInfo(offset, rem, cursor));
            cursor += rem;
         }
      } else {
         chunks.push_back(ChunkInfo(position[i], length[i], cursor));
         cursor += length[i];
      }

      // Move every complete group of iovMax chunks into its own list. A loop
      // is needed because one split request can add more than iovMax chunks.
      while ((Int_t) chunks.size() >= iovMax) {
         chunkLists.push_back(ChunkList(chunks.begin(),
                                        chunks.begin() + iovMax));
         chunks.erase(chunks.begin(), chunks.begin() + iovMax);
      }
   }

   // Push back the last (partial) list
   if( !chunks.empty() )
      chunkLists.push_back(chunks);

   // The handlers write into these two objects. They live on the stack, which
   // is safe only because every path below waits for all issued requests.
   std::vector<XRootDStatus*> statuses(chunkLists.size(), (XRootDStatus*) 0);
   TSemaphore                 semaphore(0);
   Bool_t                     failed    = kFALSE;
   size_t                     submitted = 0;

   // Issue one asynchronous vector read per chunk list
   for (submitted = 0; submitted < chunkLists.size(); ++submitted) {
      TAsyncReadvHandler *handler =
         new TAsyncReadvHandler(&statuses, (Int_t) submitted, &semaphore);
      XRootDStatus status = fFile->VectorRead(chunkLists[submitted], 0, handler);

      if (!status.IsOK()) {
         Error("ReadBuffers", "%s", status.ToStr().c_str());
         // The request was not issued, so the handler will never run
         delete handler;
         failed = kTRUE;
         break;
      }
   }

   // Wait for every request that was actually issued, even after a failed
   // submission: their handlers still reference statuses and semaphore.
   for (size_t i = 0; i < submitted; ++i)
      semaphore.Wait();

   // Check the collected statuses (reporting the first failure only) and
   // release all of them.
   for (size_t i = 0; i < submitted; ++i) {
      XRootDStatus *st = statuses[i];
      if (!failed && !st->IsOK()) {
         Error("ReadBuffers", "%s", st->ToStr().c_str());
         failed = kTRUE;
      }
      delete st;
   }

   if (failed)
      return kTRUE;

   // Update the per-file / global read statistics
   fBytesRead  += totalBytes;
   fgBytesRead += totalBytes;
   fReadCalls  ++;
   fgReadCalls ++;

   // position[0] / position[nbuffs - 1] only exist for a non-empty request
   if (gPerfStats && nbuffs > 0) {
      fOffset = position[0];
      gPerfStats->FileReadEvent(this, position[nbuffs - 1] + length[nbuffs - 1]
                                - position[0], start);
   }

   if (gMonitoringWriter)
      gMonitoringWriter->SendFileReadProgress(this);

   return kFALSE;
}
////////////////////////////////////////////////////////////////////////////////
/// Write a data chunk at the current file offset.
///
/// \param[in] buffer  the data to write
/// \param[in] length  number of bytes to write
///
/// \returns kFALSE on success (including a write absorbed by the cache);
///          kTRUE if the file is not usable or not writable, the cache
///          reports status 2, or the remote write fails

Bool_t TNetXNGFile::WriteBuffer(const char *buffer, Int_t length)
{
   using namespace XrdCl;

   if (!IsUseable()) {
      return kTRUE;
   }

   if (!fWritable) {
      if (gDebug > 1) {
         Info("WriteBuffer", "file not writable");
      }
      return kTRUE;
   }

   // Give the write cache the first go: any non-zero status ends the call,
   // and only status 2 is reported as a failure
   const Int_t cacheStatus = WriteBufferViaCache(buffer, length);
   if (cacheStatus != 0) {
      return (cacheStatus == 2) ? kTRUE : kFALSE;
   }

   // Write the data to the remote file
   XRootDStatus writeStatus = fFile->Write(fOffset, length, buffer);
   if (!writeStatus.IsOK()) {
      Error("WriteBuffer", "%s", writeStatus.ToStr().c_str());
      return kTRUE;
   }

   // Bump the cursor and the per-file / global write statistics
   fOffset      += length;
   fBytesWrite  += length;
   fgBytesWrite += length;

   return kFALSE;
}
////////////////////////////////////////////////////////////////////////////////
/// Flush the write cache and sync the remote file.
///
/// Does nothing if the file is not usable or not writable. A failed sync is
/// reported through Error(); the success message is printed (at debug level
/// above 1) only when the sync actually succeeded.

void TNetXNGFile::Flush()
{
   if (!IsUseable())
      return;

   if (!fWritable) {
      if (gDebug > 1)
         Info("Flush", "file not writable - do nothing");
      return;
   }

   FlushWriteCache();

   XrdCl::XRootDStatus status = fFile->Sync();
   if( !status.IsOK() ) {
      Error("Flush", "%s", status.ToStr().c_str());
      return;   // do not claim success after a failed sync
   }

   if (gDebug > 1)
      Info("Flush", "XrdClient::Sync succeeded.");
}
////////////////////////////////////////////////////////////////////////////////
/// Set the position within the file. Only the local offset is updated (via
/// SetOffset); no remote request is issued here.
///
/// \param[in] offset    the new offset, interpreted relative to `position`
/// \param[in] position  the reference point for `offset`

void TNetXNGFile::Seek(Long64_t offset, ERelativeTo position)
{
   this->SetOffset(offset, position);
}
////////////////////////////////////////////////////////////////////////////////
/// Translate a ROOT open-mode string into XrdCl open flags.
///
/// \param[in]  in          mode string; compared case-insensitively
/// \param[out] modestr     upper-cased mode (or "READ" when falling back)
/// \param[out] mode        matching open flags; left untouched on failure
/// \param[in]  assumeRead  if true, an unrecognised mode is treated as READ
///
/// \returns 0 on success, -1 if the mode is unrecognised and assumeRead is
///          false
///
/// Mapping: NEW/CREATE -> New, RECREATE -> Delete, UPDATE -> Update,
/// READ -> Read.

Int_t TNetXNGFile::ParseOpenMode(Option_t *in, TString &modestr,
                                 XrdCl::OpenFlags::Flags &mode,
                                 Bool_t assumeRead)
{
   using namespace XrdCl;

   modestr = ToUpper(TString(in));

   if (modestr == "NEW" || modestr == "CREATE") {
      mode = OpenFlags::New;
      return 0;
   }
   if (modestr == "RECREATE") {
      mode = OpenFlags::Delete;
      return 0;
   }
   if (modestr == "UPDATE") {
      mode = OpenFlags::Update;
      return 0;
   }
   if (modestr == "READ") {
      mode = OpenFlags::Read;
      return 0;
   }

   // Unrecognised mode: either reject it or fall back to READ
   if (!assumeRead) {
      return -1;
   }

   modestr = "READ";
   mode    = OpenFlags::Read;
   return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// Check that the file can be operated on: it must be neither a zombie nor
/// closed. The zombie check comes first, and the first problem found is
/// reported through Error().
///
/// \returns kTRUE if the file is usable, kFALSE otherwise

Bool_t TNetXNGFile::IsUseable() const
{
   const char *problem = 0;

   if (IsZombie()) {
      problem = "Object is in 'zombie' state";
   } else if (!IsOpen()) {
      problem = "The remote file is not open";
   }

   if (problem) {
      Error("TNetXNGFile", "%s", problem);
      return kFALSE;
   }

   return kTRUE;
}
////////////////////////////////////////////////////////////////////////////////
/// Ask the data server for its vector read limits (readv_ior_max: maximum
/// size of one chunk, readv_iov_max: maximum number of chunks per request)
/// and store them in fReadvIorMax / fReadvIovMax.
///
/// \returns kTRUE if the limits were obtained, or if querying is disabled
///          (fQueryReadVParams is zero); kFALSE if the file is not usable,
///          the data server is unknown, the query fails, or the reply does
///          not consist of exactly two lines
///
/// A value is taken over only if it is purely numeric and positive, so a
/// server reply can never leave a zero limit behind. If the server reports
/// 0x7FFFFFFF as chunk count limit, both limits are reset to the defaults.

Bool_t TNetXNGFile::GetVectorReadLimits()
{
   using namespace XrdCl;

   // Check the file isn't a zombie or closed
   if (!IsUseable())
      return kFALSE;

   if (!fQueryReadVParams)
      return kTRUE;

#if XrdVNUMBER >= 40000
   std::string dataServerStr;
   if( !fFile->GetProperty( "DataServer", dataServerStr ) )
      return kFALSE;
   URL dataServer(dataServerStr);
#else
   URL dataServer(fFile->GetDataServer());
#endif
   FileSystem fs(dataServer);
   Buffer  arg;
   Buffer *response = 0;
   arg.FromString(std::string("readv_ior_max readv_iov_max"));

   XRootDStatus status = fs.Query(QueryCode::Config, arg, response);
   if (!status.IsOK())
      return kFALSE;
   if (!response)
      return kFALSE;

   // Copy the reply and release the response right away, so that no return
   // path below can leak it. Copying once also avoids rebuilding the string
   // for every token.
   TString reply(response->ToString());
   delete response;

   Ssiz_t from = 0;
   TString token;
   std::vector<TString> resps;
   while (reply.Tokenize(token, from, "\n"))
      resps.push_back(token);

   if (resps.size() != 2)
      return kFALSE;

   if (resps[0].IsDigit()) {
      const Int_t iorMax = resps[0].Atoi();
      if (iorMax > 0)
         fReadvIorMax = iorMax;
   }

   if (resps[1].IsDigit()) {
      const Int_t iovMax = resps[1].Atoi();
      if (iovMax > 0)
         fReadvIovMax = iovMax;
   }

   // A chunk count limit of 0x7FFFFFFF is not trusted: fall back to defaults
   if( fReadvIovMax == 0x7FFFFFFF )
   {
      fReadvIovMax = 1024;
      fReadvIorMax = 2097136;
   }

   return kTRUE;
}
void TNetXNGFile::SetEnv()
{
XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
const char *cenv = 0;
TString val;
val = gEnv->GetValue("NetXNG.ConnectionWindow", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CONNECTIONWINDOW"))
|| strlen(cenv) <= 0))
env->PutInt("ConnectionWindow", val.Atoi());
val = gEnv->GetValue("NetXNG.ConnectionRetry", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CONNECTIONRETRY"))
|| strlen(cenv) <= 0))
env->PutInt("RequestTimeout", val.Atoi());
val = gEnv->GetValue("NetXNG.RequestTimeout", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_REQUESTTIMEOUT"))
|| strlen(cenv) <= 0))
env->PutInt("RequestTimeout", val.Atoi());
val = gEnv->GetValue("NetXNG.SubStreamsPerChannel", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_SUBSTREAMSPERCHANNEL"))
|| strlen(cenv) <= 0))
env->PutInt("SubStreamsPerChannel", val.Atoi());
val = gEnv->GetValue("NetXNG.TimeoutResolution", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_TIMEOUTRESOLUTION"))
|| strlen(cenv) <= 0))
env->PutInt("TimeoutResolution", val.Atoi());
val = gEnv->GetValue("NetXNG.StreamErrorWindow", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_STREAMERRORWINDOW"))
|| strlen(cenv) <= 0))
env->PutInt("StreamErrorWindow", val.Atoi());
val = gEnv->GetValue("NetXNG.RunForkHandler", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_RUNFORKHANDLER"))
|| strlen(cenv) <= 0))
env->PutInt("RunForkHandler", val.Atoi());
val = gEnv->GetValue("NetXNG.RedirectLimit", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_REDIRECTLIMIT"))
|| strlen(cenv) <= 0))
env->PutInt("RedirectLimit", val.Atoi());
val = gEnv->GetValue("NetXNG.WorkerThreads", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_WORKERTHREADS"))
|| strlen(cenv) <= 0))
env->PutInt("WorkerThreads", val.Atoi());
val = gEnv->GetValue("NetXNG.CPChunkSize", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CPCHUNKSIZE"))
|| strlen(cenv) <= 0))
env->PutInt("CPChunkSize", val.Atoi());
val = gEnv->GetValue("NetXNG.CPParallelChunks", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CPPARALLELCHUNKS"))
|| strlen(cenv) <= 0))
env->PutInt("CPParallelChunks", val.Atoi());
val = gEnv->GetValue("NetXNG.PollerPreference", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_POLLERPREFERENCE"))
|| strlen(cenv) <= 0))
env->PutString("PollerPreference", val.Data());
val = gEnv->GetValue("NetXNG.ClientMonitor", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CLIENTMONITOR"))
|| strlen(cenv) <= 0))
env->PutString("ClientMonitor", val.Data());
val = gEnv->GetValue("NetXNG.ClientMonitorParam", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CLIENTMONITORPARAM"))
|| strlen(cenv) <= 0))
env->PutString("ClientMonitorParam", val.Data());
fQueryReadVParams = gEnv->GetValue("NetXNG.QueryReadVParams", 1);
env->PutInt( "MultiProtocol", gEnv->GetValue("TFile.CrossProtocolRedirects", 1));
TString netrc;
netrc.Form("%s/.rootnetrc", gSystem->HomeDirectory());
gSystem->Setenv("XrdSecNETRC", netrc.Data());
val = gEnv->GetValue("XSec.Pwd.ALogFile", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecPWDALOGFILE", val.Data());
val = gEnv->GetValue("XSec.Pwd.ServerPuk", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecPWDSRVPUK", val.Data());
val = gEnv->GetValue("XSec.GSI.CAdir", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSICADIR", val.Data());
val = gEnv->GetValue("XSec.GSI.CRLdir", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSICRLDIR", val.Data());
val = gEnv->GetValue("XSec.GSI.CRLextension", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSICRLEXT", val.Data());
val = gEnv->GetValue("XSec.GSI.UserCert", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERCERT", val.Data());
val = gEnv->GetValue("XSec.GSI.UserKey", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERKEY", val.Data());
val = gEnv->GetValue("XSec.GSI.UserProxy", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERPROXY", val.Data());
val = gEnv->GetValue("XSec.GSI.ProxyValid", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSIPROXYVALID", val.Data());
val = gEnv->GetValue("XSec.GSI.ProxyKeyBits", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSIPROXYKEYBITS", val.Data());
val = gEnv->GetValue("XSec.GSI.ProxyForward", "0");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSIPROXYDEPLEN"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecGSIPROXYDEPLEN", val.Data());
val = gEnv->GetValue("XSec.GSI.CheckCRL", "1");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSICRLCHECK"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecGSICRLCHECK", val.Data());
val = gEnv->GetValue("XSec.GSI.DelegProxy", "0");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSIDELEGPROXY"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecGSIDELEGPROXY", val.Data());
val = gEnv->GetValue("XSec.GSI.SignProxy", "1");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSISIGNPROXY"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecGSISIGNPROXY", val.Data());
val = gEnv->GetValue("XSec.Pwd.AutoLogin", "1");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecPWDAUTOLOG"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecPWDAUTOLOG", val.Data());
val = gEnv->GetValue("XSec.Pwd.VerifySrv", "1");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecPWDVERIFYSRV"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecPWDVERIFYSRV", val.Data());
}