#include "TNetXNGFile.h"
#include "TEnv.h"
#include "TSystem.h"
#include "TTimeStamp.h"
#include "TVirtualPerfStats.h"
#include "TVirtualMonitoring.h"
#include <XrdCl/XrdClURL.hh>
#include <XrdCl/XrdClFile.hh>
#include <XrdCl/XrdClXRootDResponses.hh>
#include <XrdCl/XrdClDefaultEnv.hh>
#include <iostream>
////////////////////////////////////////////////////////////////////////////////
/// Response handler for an asynchronous open request. It records the outcome
/// of the open on the TNetXNGFile it was created for and then destroys itself.
class TAsyncOpenHandler: public XrdCl::ResponseHandler
{
public:
   /// Bind the handler to \p file and flag the file's open as in progress.
   TAsyncOpenHandler(TNetXNGFile *file)
      : fFile(file)
   {
      fFile->SetAsyncOpenStatus(TFile::kAOSInProgress);
   }

   /// Called with the result of the open request.
   ///
   /// \param status    result of the request; deleted here
   /// \param response  response object; deleted here
   virtual void HandleResponse(XrdCl::XRootDStatus *status,
                               XrdCl::AnyObject    *response)
   {
      // Translate the request status into the file's async-open state
      const TFile::EAsyncOpenStatus outcome =
         status->IsOK() ? TFile::kAOSSuccess : TFile::kAOSFailure;
      fFile->SetAsyncOpenStatus(outcome);

      // The handler releases everything it was handed, including itself
      delete response;
      delete status;
      delete this;
   }

private:
   TNetXNGFile *fFile;   // file whose open is being tracked (not owned)
};
////////////////////////////////////////////////////////////////////////////////
/// Response handler for one asynchronous vector-read request. It stores the
/// request status in a caller-provided slot, posts the caller's semaphore and
/// then destroys itself.
class TAsyncReadvHandler: public XrdCl::ResponseHandler
{
public:
   /// \param statuses     vector receiving the request status (not owned)
   /// \param statusIndex  slot of \p statuses written by this handler
   /// \param semaphore    posted once the status has been stored (not owned)
   TAsyncReadvHandler(std::vector<XrdCl::XRootDStatus*> *statuses,
                      Int_t                              statusIndex,
                      TSemaphore                        *semaphore)
      : fStatuses(statuses),
        fStatusIndex(statusIndex),
        fSemaphore(semaphore)
   {
   }

   /// Called with the result of the vector read.
   ///
   /// \param status    stored into the status vector; NOT deleted here
   /// \param response  response object; deleted here
   virtual void HandleResponse(XrdCl::XRootDStatus *status,
                               XrdCl::AnyObject    *response)
   {
      // Publish the status first, then wake up whoever waits on the semaphore
      std::vector<XrdCl::XRootDStatus*> &slots = *fStatuses;
      slots.at(fStatusIndex) = status;
      fSemaphore->Post();

      delete response;
      delete this;
   }

private:
   std::vector<XrdCl::XRootDStatus*> *fStatuses;     // destination for the status
   Int_t                              fStatusIndex;  // slot used in fStatuses
   TSemaphore                        *fSemaphore;    // signalled on completion
};
// ClassImp is a macro defined outside this file; it is applied here to
// TNetXNGFile. NOTE(review): presumably ROOT's class-implementation
// registration macro - confirm against the ROOT headers in use.
ClassImp(TNetXNGFile);
////////////////////////////////////////////////////////////////////////////////
/// Construct the file object and open the remote file.
///
/// \param url           URL of the file; its protocol is overwritten with "root"
/// \param mode          open mode string, translated by ParseOpenMode()
/// \param title         title forwarded to the TFile base constructor
/// \param compress      compression setting forwarded to the TFile base
/// \param parallelopen  when true the open is issued asynchronously and the
///                      constructor returns without initialising the file
///
/// The fifth (unnamed) parameter is not used. On an open error the object is
/// turned into a zombie.
TNetXNGFile::TNetXNGFile(const char *url,
                         Option_t   *mode,
                         const char *title,
                         Int_t       compress,
                         Int_t       ,
                         Bool_t      parallelopen) :
   TFile(url, "NET", title, compress)
{
   // Client-side objects; the destructor deletes all three
   fFile        = new XrdCl::File();
   fUrl         = new XrdCl::URL(std::string(url));
   fInitCondVar = new XrdSysCondVar();

   fUrl->SetProtocol(std::string("root"));
   fMode = ParseOpenMode(mode);
   SetEnv();

   // Announce the start of the open to the monitoring system, if any
   if (gMonitoringWriter) {
      if (!fOpenPhases) {
         fOpenPhases = new TList;
         fOpenPhases->SetOwner();
      }
      gMonitoringWriter->SendFileOpenProgress(this, fOpenPhases, "xrdopen",
                                              kFALSE);
   }

   if (parallelopen) {
      // Asynchronous open: the handler records the outcome on this object
      TAsyncOpenHandler *handler = new TAsyncOpenHandler(this);
      XrdCl::XRootDStatus asyncStatus =
         fFile->Open(fUrl->GetURL(), fMode, XrdCl::Access::None, handler);
      if (!asyncStatus.IsOK()) {
         Error("Open", "%s", asyncStatus.ToStr().c_str());
         MakeZombie();
      }
      return;
   }

   // Synchronous open
   XrdCl::XRootDStatus syncStatus = fFile->Open(fUrl->GetURL(), fMode);
   if (!syncStatus.IsOK()) {
      Error("Open", "%s", syncStatus.ToStr().c_str());
      MakeZombie();
      return;
   }

   TFile::Init(false);
   GetVectorReadLimits();
}
////////////////////////////////////////////////////////////////////////////////
/// Destructor: close the file if it is still open, then delete the client
/// file object, the URL and the initialisation condition variable.
TNetXNGFile::~TNetXNGFile()
{
   if (IsOpen()) {
      Close();
   }

   delete fFile;
   delete fUrl;
   delete fInitCondVar;
}
////////////////////////////////////////////////////////////////////////////////
/// Initialise the file, waiting first for a pending asynchronous open.
///
/// \param create  forwarded unchanged to TFile::Init()
///
/// Does nothing (apart from an optional debug message) when fInitDone is
/// already set. Monitoring progress is reported before and after the base
/// class initialisation, and the vector-read limits are queried at the end.
void TNetXNGFile::Init(Bool_t create)
{
   if (fInitDone) {
      if (gDebug > 1)
         Info("Init", "TFile::Init already called once");
      return;
   }

   // Block on the condition variable while an async open is still in flight
   const Bool_t openPending = !IsOpen() && fAsyncOpenStatus == kAOSInProgress;
   if (openPending) {
      fInitCondVar->Wait();
   }

   if (gMonitoringWriter) {
      gMonitoringWriter->SendFileOpenProgress(this, fOpenPhases, "rootinit",
                                              kFALSE);
   }

   TFile::Init(create);

   if (gMonitoringWriter) {
      gMonitoringWriter->SendFileOpenProgress(this, fOpenPhases, "endopen",
                                              kTRUE);
   }

   GetVectorReadLimits();
}
////////////////////////////////////////////////////////////////////////////////
/// Return the size of the remote file as reported by a stat request.
///
/// \return the size in bytes, or -1 when the file is not usable or the stat
///         request fails
Long64_t TNetXNGFile::GetSize() const
{
   if (!IsUseable())
      return -1;

   // The "force" argument of Stat() is true for every mode except plain Read
   const bool forceStat = (fMode != XrdCl::OpenFlags::Read);

   XrdCl::StatInfo     *info   = 0;
   XrdCl::XRootDStatus  result = fFile->Stat(forceStat, info);
   if (!result.IsOK())
      return -1;

   // The stat info object is handed to us; copy the size out and release it
   const Long64_t size = info->GetSize();
   delete info;
   return size;
}
////////////////////////////////////////////////////////////////////////////////
/// Report whether the underlying client file object considers itself open.
Bool_t TNetXNGFile::IsOpen() const
{
   const Bool_t open = fFile->IsOpen();
   return open;
}
////////////////////////////////////////////////////////////////////////////////
/// Store the asynchronous-open status and signal the initialisation condition
/// variable (the one Init() waits on while an open is in progress).
///
/// \param status  new value for fAsyncOpenStatus
void TNetXNGFile::SetAsyncOpenStatus(EAsyncOpenStatus status)
{
   // Order matters: publish the status before waking up a waiter
   this->fAsyncOpenStatus = status;
   this->fInitCondVar->Signal();
}
void TNetXNGFile::Close(const Option_t *)
{
TFile::Close();
XrdCl::XRootDStatus status = fFile->Close();
if (!status.IsOK()) {
Error("Close", "%s", status.ToStr().c_str());
MakeZombie();
}
}
////////////////////////////////////////////////////////////////////////////////
/// Close and reopen the remote file with a different open mode.
///
/// \param modestr  requested mode; only READ and UPDATE are accepted
/// \return 0 when the file was reopened in the new mode; 1 when the mode is
///         rejected, when nothing needs to change (same mode, or UPDATE
///         requested on a file opened as New), or when reopening fails
Int_t TNetXNGFile::ReOpen(Option_t *modestr)
{
   using namespace XrdCl;

   const OpenFlags::Flags requested   = ParseOpenMode(modestr);
   const bool             wantsRead   = (requested == OpenFlags::Read);
   const bool             wantsUpdate = (requested == OpenFlags::Update);

   if (!wantsRead && !wantsUpdate) {
      Error("ReOpen", "mode must be either READ or UPDATE, not %s", modestr);
      return 1;
   }

   // Nothing to do: already in that mode ...
   if (requested == fMode)
      return 1;

   // ... or UPDATE requested while the file is open as New
   if (wantsUpdate && fMode == OpenFlags::New)
      return 1;

   fFile->Close();
   fMode = requested;

   XRootDStatus reopenStatus = fFile->Open(fUrl->GetURL(), fMode);
   if (reopenStatus.IsOK())
      return 0;

   Error("ReOpen", "%s", reopenStatus.ToStr().c_str());
   return 1;
}
////////////////////////////////////////////////////////////////////////////////
/// Read \p length bytes at the current file position.
///
/// \param buffer  destination, must hold at least \p length bytes
/// \param length  number of bytes to read
/// \return kTRUE on error, kFALSE on success
Bool_t TNetXNGFile::ReadBuffer(char *buffer, Int_t length)
{
   // The positional overload re-applies the archive offset when it positions
   // the file (TFile::SetOffset with kBeg is expected to add fArchiveOffset -
   // defined in TFile, not visible in this file), so hand over the
   // archive-relative position rather than the raw fOffset.
   return ReadBuffer(buffer, fOffset - fArchiveOffset, length);
}

////////////////////////////////////////////////////////////////////////////////
/// Read \p length bytes starting at \p position.
///
/// \param buffer    destination, must hold at least \p length bytes
/// \param position  where to start reading; interpreted like the positions
///                  given to ReadBuffers(), i.e. before fArchiveOffset is added
/// \param length    number of bytes to read
/// \return kTRUE on error (file unusable, cache error, read error),
///         kFALSE on success
///
/// The file offset is moved to \p position before anything else so that the
/// read cache, the remote read and the final offset update all refer to the
/// same location. Previously the cache was consulted at the stale fOffset and
/// fOffset was advanced from wherever it happened to be.
Bool_t TNetXNGFile::ReadBuffer(char *buffer, Long64_t position, Int_t length)
{
   using namespace XrdCl;

   if (gDebug > 0)
      Info("ReadBuffer", "offset: %lld length: %d", position, length);

   // Check the file isn't a zombie or closed
   if (!IsUseable())
      return kTRUE;

   // Position the file before consulting the cache
   SetOffset(position, kBeg);

   // Try the read cache first: 2 signals an error, any other non-zero value
   // means the request was served from the cache
   Int_t status;
   if ((status = ReadBufferViaCache(buffer, length))) {
      if (status == 2)
         return kTRUE;
      return kFALSE;
   }

   // Read the data from the remote file
   uint32_t bytesRead = 0;
   XRootDStatus st = fFile->Read(fOffset, length, buffer, bytesRead);
   if (gDebug > 0)
      Info("ReadBuffer", "%s bytes read: %u", st.ToStr().c_str(), bytesRead);

   if (!st.IsOK()) {
      Error("ReadBuffer", "%s", st.ToStr().c_str());
      return kTRUE;
   }

   // Advance the offset and bump the per-file and global counters
   fOffset     += length;
   fBytesRead  += bytesRead;
   fgBytesRead += bytesRead;
   fReadCalls  ++;
   fgReadCalls ++;

   if (gMonitoringWriter)
      gMonitoringWriter->SendFileReadProgress(this);

   return kFALSE;
}
////////////////////////////////////////////////////////////////////////////////
/// Read several buffers with asynchronous vector reads and wait for all of
/// them to complete.
///
/// \param buffer    destination; the data of the requested buffers is stored
///                  back to back, in request order
/// \param position  start position of each buffer; NOTE: when fArchiveOffset
///                  is non-zero the entries are modified in place (the archive
///                  offset is added to each of them)
/// \param length    length of each buffer
/// \param nbuffs    number of entries in \p position and \p length
/// \return kTRUE on error, kFALSE on success
///
/// Buffers longer than fReadvIorMax are split into several chunks, and the
/// chunks are grouped into lists of at most fReadvIovMax entries; one vector
/// read is issued per list.
Bool_t TNetXNGFile::ReadBuffers(char *buffer, Long64_t *position, Int_t *length,
                                Int_t nbuffs)
{
   using namespace XrdCl;

   // Check the file isn't a zombie or closed
   if (!IsUseable())
      return kTRUE;

   std::vector<ChunkList> chunkLists;
   ChunkList              chunks;
   Long64_t               totalBytes = 0;
   char                  *cursor     = buffer;  // destination of the next chunk

   Double_t start = 0;
   if (gPerfStats) start = TTimeStamp();

   if (fArchiveOffset)
      for (Int_t i = 0; i < nbuffs; i++)
         position[i] += fArchiveOffset;

   // Build the chunk lists. Offsets are kept in 64 bits (positions beyond
   // 2 GB must not be truncated) and every chunk gets its own destination.
   for (Int_t i = 0; i < nbuffs; ++i) {
      totalBytes += length[i];

      if (fReadvIorMax > 0 && length[i] > fReadvIorMax) {
         // Too long for a single chunk: split into max-size pieces ...
         const Int_t nsplit = length[i] / fReadvIorMax;
         const Int_t rem    = length[i] % fReadvIorMax;

         for (Int_t j = 0; j < nsplit; ++j) {
            const Long64_t offset = position[i] + (Long64_t) j * fReadvIorMax;
            chunks.push_back(ChunkInfo(offset, fReadvIorMax, cursor));
            cursor += fReadvIorMax;
         }

         // ... plus the remainder, if there is one
         if (rem > 0) {
            const Long64_t offset = position[i]
                                    + (Long64_t) nsplit * fReadvIorMax;
            chunks.push_back(ChunkInfo(offset, rem, cursor));
            cursor += rem;
         }
      } else {
         chunks.push_back(ChunkInfo(position[i], length[i], cursor));
         cursor += length[i];
      }

      // Cut off full lists; a loop is needed because one split buffer can add
      // more than fReadvIovMax chunks at once
      while (fReadvIovMax > 0 && (Int_t) chunks.size() >= fReadvIovMax) {
         const ChunkList::iterator cut = chunks.begin() + fReadvIovMax;
         chunkLists.push_back(ChunkList(chunks.begin(), cut));
         chunks.erase(chunks.begin(), cut);
      }
   }

   // Keep the last, partially filled list - but never submit an empty one
   if (!chunks.empty())
      chunkLists.push_back(chunks);

   // Both live on the stack, so no exit path can leak them. They must outlive
   // every submitted handler, which is why all submitted requests are always
   // waited for below.
   std::vector<XRootDStatus*> statuses(chunkLists.size(), (XRootDStatus *) 0);
   TSemaphore                 semaphore(0);
   size_t                     submitted = 0;
   Bool_t                     failed    = kFALSE;

   for (size_t i = 0; i < chunkLists.size(); ++i) {
      TAsyncReadvHandler *handler =
         new TAsyncReadvHandler(&statuses, (Int_t) i, &semaphore);

      // Pass the destination of the list's first chunk as the base buffer:
      // the chunk destinations are contiguous, so the data ends up in the
      // right place when the client fills the given buffer sequentially.
      // (The original passed the start of the whole buffer for every list, so
      // a second list overwrote the data of the first.)
      XRootDStatus status =
         fFile->VectorRead(chunkLists[i], chunkLists[i].front().buffer,
                           handler);

      if (!status.IsOK()) {
         Error("ReadBuffers", "%s", status.ToStr().c_str());
         // NOTE(review): assumes XrdCl does not invoke a handler whose
         // request was rejected synchronously - confirm for the XrdCl
         // version in use.
         delete handler;
         failed = kTRUE;
         break;
      }
      ++submitted;
   }

   // Wait for every request that was actually submitted
   for (size_t i = 0; i < submitted; ++i)
      semaphore.Wait();

   // Check the responses; report the first failure and release all statuses
   for (size_t i = 0; i < submitted; ++i) {
      XRootDStatus *st = statuses[i];
      if (!failed && !st->IsOK()) {
         Error("ReadBuffers", "%s", st->ToStr().c_str());
         failed = kTRUE;
      }
      delete st;
   }

   if (failed)
      return kTRUE;

   // Bump the per-file and global counters
   fBytesRead  += totalBytes;
   fgBytesRead += totalBytes;
   fReadCalls  ++;
   fgReadCalls ++;

   if (gPerfStats && nbuffs > 0) {
      fOffset = position[0];
      gPerfStats->FileReadEvent(this, position[nbuffs - 1] + length[nbuffs - 1]
                                - position[0], start);
   }

   if (gMonitoringWriter)
      gMonitoringWriter->SendFileReadProgress(this);

   return kFALSE;
}
////////////////////////////////////////////////////////////////////////////////
/// Write \p length bytes from \p buffer at the current file offset.
///
/// \return kTRUE on error (file unusable, cache error, write error),
///         kFALSE on success
Bool_t TNetXNGFile::WriteBuffer(const char *buffer, Int_t length)
{
   // Check the file isn't a zombie or closed
   if (!IsUseable())
      return kTRUE;

   // Give the write cache a chance first: 2 signals an error, any other
   // non-zero value means the cache took care of the data
   const Int_t cacheResult = WriteBufferViaCache(buffer, length);
   if (cacheResult != 0)
      return (cacheResult == 2) ? kTRUE : kFALSE;

   // Write the data to the remote file
   XrdCl::XRootDStatus writeStatus = fFile->Write(fOffset, length, buffer);
   if (!writeStatus.IsOK()) {
      Error("WriteBuffer", "%s", writeStatus.ToStr().c_str());
      return kTRUE;
   }

   // Advance the offset and bump the per-file and global counters
   fOffset      += length;
   fBytesWrite  += length;
   fgBytesWrite += length;

   return kFALSE;
}
////////////////////////////////////////////////////////////////////////////////
/// Move the file position; this only forwards both arguments to SetOffset().
///
/// \param offset    offset to apply
/// \param position  reference point the offset is relative to
void TNetXNGFile::Seek(Long64_t offset, ERelativeTo position)
{
   this->SetOffset(offset, position);
}
////////////////////////////////////////////////////////////////////////////////
/// Translate a mode string into XrdCl open flags (case-insensitively).
///
/// \param modestr  one of NEW, CREATE, RECREATE, UPDATE or READ
/// \return New for NEW/CREATE, Delete for RECREATE, Update for UPDATE, Read
///         for READ and None for anything else
XrdCl::OpenFlags::Flags TNetXNGFile::ParseOpenMode(Option_t *modestr)
{
   using namespace XrdCl;

   const TString mod = ToUpper(TString(modestr));

   if (mod == "NEW" || mod == "CREATE")
      return OpenFlags::New;
   if (mod == "RECREATE")
      return OpenFlags::Delete;
   if (mod == "UPDATE")
      return OpenFlags::Update;
   if (mod == "READ")
      return OpenFlags::Read;

   return OpenFlags::None;
}
////////////////////////////////////////////////////////////////////////////////
/// Check that the object can be used for I/O: it must not be a zombie and the
/// remote file must be open. An error is reported for whichever check fails.
///
/// \return kTRUE when usable, kFALSE otherwise
Bool_t TNetXNGFile::IsUseable() const
{
   const Bool_t zombie = IsZombie();
   if (zombie) {
      Error("TNetXNGFile", "Object is in 'zombie' state");
      return kFALSE;
   }

   // Only look at the open state once the zombie check has passed
   const Bool_t open = IsOpen();
   if (!open) {
      Error("TNetXNGFile", "The remote file is not open");
      return kFALSE;
   }

   return kTRUE;
}
////////////////////////////////////////////////////////////////////////////////
/// Ask the data server for its vector-read limits and store them in
/// fReadvIorMax (maximum chunk length) and fReadvIovMax (maximum number of
/// chunks per request).
///
/// \return kTRUE when the server answered the query, kFALSE when the file is
///         not usable (limits left untouched) or the query failed (defaults
///         are stored)
///
/// A limit that is missing from the reply, or does not parse to a positive
/// number, is replaced by its default so that ReadBuffers() never has to work
/// with a zero limit.
Bool_t TNetXNGFile::GetVectorReadLimits()
{
   using namespace XrdCl;

   // Fallback values used whenever the server does not provide a usable one
   const Int_t kDefaultReadvIorMax = 2097136;
   const Int_t kDefaultReadvIovMax = 1024;

   // Check the file isn't a zombie or closed
   if (!IsUseable())
      return kFALSE;

   // Query the data server for both limits at once
   URL         dataServer(fFile->GetDataServer());
   FileSystem  fs(dataServer);
   Buffer      arg;
   Buffer     *response = 0;
   arg.FromString(std::string("readv_ior_max readv_iov_max"));

   XRootDStatus status = fs.Query(QueryCode::Config, arg, response);
   if (!status.IsOK() || !response) {
      delete response;
      fReadvIorMax = kDefaultReadvIorMax;
      fReadvIovMax = kDefaultReadvIovMax;
      return kFALSE;
   }

   // Copy the reply once and release the response right away
   TString reply(response->ToString());
   delete response;

   // The reply is split on newlines: the first value that parses to non-zero
   // fills fReadvIorMax, the next one fills fReadvIovMax
   Ssiz_t  from = 0;
   TString token;
   fReadvIorMax = fReadvIovMax = 0;
   while (reply.Tokenize(token, from, "\n")) {
      if (fReadvIorMax == 0)      fReadvIorMax = token.Atoi();
      else if (fReadvIovMax == 0) fReadvIovMax = token.Atoi();
   }

   // Guard against unparsable or non-positive values
   if (fReadvIorMax <= 0) fReadvIorMax = kDefaultReadvIorMax;
   if (fReadvIovMax <= 0) fReadvIovMax = kDefaultReadvIovMax;

   return kTRUE;
}
void TNetXNGFile::SetEnv()
{
XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
const char *cenv = 0;
TString val;
val = gEnv->GetValue("NetXNG.ConnectionWindow", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CONNECTIONWINDOW"))
|| strlen(cenv) <= 0))
env->PutInt("ConnectionWindow", val.Atoi());
val = gEnv->GetValue("NetXNG.ConnectionRetry", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CONNECTIONRETRY"))
|| strlen(cenv) <= 0))
env->PutInt("RequestTimeout", val.Atoi());
val = gEnv->GetValue("NetXNG.RequestTimeout", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_REQUESTTIMEOUT"))
|| strlen(cenv) <= 0))
env->PutInt("RequestTimeout", val.Atoi());
val = gEnv->GetValue("NetXNG.SubStreamsPerChannel", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_SUBSTREAMSPERCHANNEL"))
|| strlen(cenv) <= 0))
env->PutInt("SubStreamsPerChannel", val.Atoi());
val = gEnv->GetValue("NetXNG.TimeoutResolution", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_TIMEOUTRESOLUTION"))
|| strlen(cenv) <= 0))
env->PutInt("TimeoutResolution", val.Atoi());
val = gEnv->GetValue("NetXNG.StreamErrorWindow", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_STREAMERRORWINDOW"))
|| strlen(cenv) <= 0))
env->PutInt("StreamErrorWindow", val.Atoi());
val = gEnv->GetValue("NetXNG.RunForkHandler", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_RUNFORKHANDLER"))
|| strlen(cenv) <= 0))
env->PutInt("RunForkHandler", val.Atoi());
val = gEnv->GetValue("NetXNG.RedirectLimit", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_REDIRECTLIMIT"))
|| strlen(cenv) <= 0))
env->PutInt("RedirectLimit", val.Atoi());
val = gEnv->GetValue("NetXNG.WorkerThreads", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_WORKERTHREADS"))
|| strlen(cenv) <= 0))
env->PutInt("WorkerThreads", val.Atoi());
val = gEnv->GetValue("NetXNG.CPChunkSize", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CPCHUNKSIZE"))
|| strlen(cenv) <= 0))
env->PutInt("CPChunkSize", val.Atoi());
val = gEnv->GetValue("NetXNG.CPParallelChunks", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CPPARALLELCHUNKS"))
|| strlen(cenv) <= 0))
env->PutInt("CPParallelChunks", val.Atoi());
val = gEnv->GetValue("NetXNG.PollerPreference", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_POLLERPREFERENCE"))
|| strlen(cenv) <= 0))
env->PutString("PollerPreference", val.Data());
val = gEnv->GetValue("NetXNG.ClientMonitor", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CLIENTMONITOR"))
|| strlen(cenv) <= 0))
env->PutString("ClientMonitor", val.Data());
val = gEnv->GetValue("NetXNG.ClientMonitorParam", "");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CLIENTMONITORPARAM"))
|| strlen(cenv) <= 0))
env->PutString("ClientMonitorParam", val.Data());
TString netrc;
netrc.Form("%s/.rootnetrc", gSystem->HomeDirectory());
gSystem->Setenv("XrdSecNETRC", netrc.Data());
val = gEnv->GetValue("XSec.Pwd.ALogFile", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecPWDALOGFILE", val.Data());
val = gEnv->GetValue("XSec.Pwd.ServerPuk", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecPWDSRVPUK", val.Data());
val = gEnv->GetValue("XSec.GSI.CAdir", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSICADIR", val.Data());
val = gEnv->GetValue("XSec.GSI.CRLdir", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSICRLDIR", val.Data());
val = gEnv->GetValue("XSec.GSI.CRLextension", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSICRLEXT", val.Data());
val = gEnv->GetValue("XSec.GSI.UserCert", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERCERT", val.Data());
val = gEnv->GetValue("XSec.GSI.UserKey", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERKEY", val.Data());
val = gEnv->GetValue("XSec.GSI.UserProxy", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERPROXY", val.Data());
val = gEnv->GetValue("XSec.GSI.ProxyValid", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSIPROXYVALID", val.Data());
val = gEnv->GetValue("XSec.GSI.ProxyKeyBits", "");
if (val.Length() > 0)
gSystem->Setenv("XrdSecGSIPROXYKEYBITS", val.Data());
val = gEnv->GetValue("XSec.GSI.ProxyForward", "0");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSIPROXYDEPLEN"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecGSIPROXYDEPLEN", val.Data());
val = gEnv->GetValue("XSec.GSI.CheckCRL", "1");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSICRLCHECK"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecGSICRLCHECK", val.Data());
val = gEnv->GetValue("XSec.GSI.DelegProxy", "0");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSIDELEGPROXY"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecGSIDELEGPROXY", val.Data());
val = gEnv->GetValue("XSec.GSI.SignProxy", "1");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSISIGNPROXY"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecGSISIGNPROXY", val.Data());
val = gEnv->GetValue("XSec.Pwd.AutoLogin", "1");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecPWDAUTOLOG"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecPWDAUTOLOG", val.Data());
val = gEnv->GetValue("XSec.Pwd.VerifySrv", "1");
if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecPWDVERIFYSRV"))
|| strlen(cenv) <= 0))
gSystem->Setenv("XrdSecPWDVERIFYSRV", val.Data());
}