28#include <XrdCl/XrdClURL.hh>
29#include <XrdCl/XrdClFile.hh>
30#include <XrdCl/XrdClXRootDResponses.hh>
31#include <XrdCl/XrdClDefaultEnv.hh>
32#include <XrdCl/XrdClFileSystem.hh>
33#include <XrdVersion.hh>
64 XrdCl::AnyObject *response)
override
106 XrdCl::AnyObject *response)
override
149 TNetXNGFile(url,0,mode,title,compress,netopt,parallelopen){}
153 :
TFile((lurl ? lurl : url),
154 strstr(mode,
"_WITHOUT_GLOBALREGISTRATION") != nullptr ?
"NET_WITHOUT_GLOBALREGISTRATION" :
"NET", title,
157 using namespace XrdCl;
161 if (val.
IsNull()) val =
gEnv->GetValue(
"NetXNG.Debug",
"");
162 if (!val.
IsNull()) XrdCl::DefaultEnv::SetLogLevel(val.
Data());
168 TUrl urlnoanchor(url);
169 urlnoanchor.SetAnchor(
"");
170 fUrl = new URL(std::string(urlnoanchor.GetUrl()));
181 Error(
"Open",
"could not parse open mode %s", mode);
203 status =
fFile->Open(
fUrl->GetURL(),
static_cast<XrdCl::OpenFlags::Flags
>(
fMode), Access::None, handler);
204 if (!status.IsOK()) {
205 Error(
"Open",
"%s", status.ToStr().c_str());
212 status =
fFile->Open(
fUrl->GetURL(),
static_cast<XrdCl::OpenFlags::Flags
>(
fMode));
213 if (!status.IsOK()) {
214#if XrdVNUMBER >= 40000
215 if( status.code == errRedirect )
216 fNewUrl = status.GetErrorMessage().c_str();
218 Error(
"Open",
"%s", status.ToStr().c_str());
220 Error(
"Open",
"%s", status.ToStr().c_str());
226 if( (
fMode & OpenFlags::New) || (
fMode & OpenFlags::Delete) ||
227 (
fMode & OpenFlags::Update) )
232 if( (
fMode & OpenFlags::New) || (
fMode & OpenFlags::Delete) )
257 using namespace XrdCl;
260 if (
gDebug > 1)
Info(
"Init",
"TFile::Init already called once");
293 return fArchive->GetMember()->GetDecompressedSize();
296 using namespace XrdCl;
302 bool forceStat =
true;
303 if(
fMode == XrdCl::OpenFlags::Read )
307 if( !
fFile->Stat( forceStat, info ).IsOK() )
344 XrdCl::XRootDStatus status =
fFile->Close();
345 if (!status.IsOK()) {
346 Error(
"Close",
"%s", status.ToStr().c_str());
364 using namespace XrdCl;
368 Int_t parseres = ParseOpenMode(modestr, newOpt, mode,
kFALSE);
371 if (parseres<0 || (mode != OpenFlags::Read && mode != OpenFlags::Update)) {
372 Error(
"ReOpen",
"mode must be either READ or UPDATE, not %s", modestr);
377 if (mode ==
fMode || (mode == OpenFlags::Update
378 &&
fMode == OpenFlags::New)) {
382 XRootDStatus st =
fFile->Close();
384 Error(
"ReOpen",
"%s", st.ToStr().c_str());
390 st =
fFile->Open(
fUrl->GetURL(),
static_cast<XrdCl::OpenFlags::Flags
>(
fMode));
392 Error(
"ReOpen",
"%s", st.ToStr().c_str());
421 using namespace XrdCl;
423 Info(
"ReadBuffer",
"offset: %lld length: %d", position, length);
442 uint32_t bytesRead = 0;
443 XRootDStatus st =
fFile->Read(
fOffset, length, buffer, bytesRead);
445 Info(
"ReadBuffer",
"%s bytes read: %u", st.ToStr().c_str(), bytesRead);
448 Error(
"ReadBuffer",
"%s", st.ToStr().c_str());
452 if ((
Int_t)bytesRead != length) {
453 Error(
"ReadBuffer",
"error reading all requested bytes, got %u of %d",
489 using namespace XrdCl;
495 std::vector<ChunkList> chunkLists;
497 std::vector<XRootDStatus*> *statuses;
499 Int_t totalBytes = 0;
501 char *cursor = buffer;
507 for (
Int_t i = 0; i < nbuffs; i++)
511 for (
Int_t i = 0; i < nbuffs; ++i) {
512 totalBytes += length[i];
521 for (j = 0; j < nsplit; ++j) {
523 chunks.push_back(ChunkInfo(offset,
fReadvIorMax, cursor));
529 chunks.push_back(ChunkInfo(offset, rem, cursor));
532 chunks.push_back(ChunkInfo(position[i], length[i], cursor));
538 chunkLists.push_back(chunks);
539 chunks = ChunkList();
541 chunkLists.push_back(ChunkList(chunks.begin(),
543 chunks = ChunkList(chunks.begin() +
fReadvIovMax, chunks.end());
548 if( !chunks.empty() )
549 chunkLists.push_back(chunks);
554 statuses =
new std::vector<XRootDStatus*>(chunkLists.size());
557 std::vector<ChunkList>::iterator it;
558 for (it = chunkLists.begin(); it != chunkLists.end(); ++it)
562 status =
fFile->VectorRead(*it, 0, handler);
564 if (!status.IsOK()) {
565 Error(
"ReadBuffers",
"%s", status.ToStr().c_str());
571 for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
576 for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
577 XRootDStatus *st = statuses->at(it - chunkLists.begin());
580 Error(
"ReadBuffers",
"%s", st->ToStr().c_str());
581 for( ; it != chunkLists.end(); ++it )
583 st = statuses->at( it - chunkLists.begin() );
622 using namespace XrdCl;
630 Info(
"WriteBuffer",
"file not writable");
643 XRootDStatus st =
fFile->Write(
fOffset, length, buffer);
645 Error(
"WriteBuffer",
"%s", st.ToStr().c_str());
666 Info(
"Flush",
"file not writable - do nothing");
674 XrdCl::XRootDStatus status =
fFile->Sync();
676 Error(
"Flush",
"%s", status.ToStr().c_str());
679 Info(
"Flush",
"XrdClient::Sync succeeded.");
708 using namespace XrdCl;
711 if (modestr ==
"NEW" || modestr ==
"CREATE") mode = OpenFlags::New;
712 else if (modestr ==
"RECREATE") mode = OpenFlags::Delete;
713 else if (modestr ==
"UPDATE") mode = OpenFlags::Update;
714 else if (modestr ==
"READ") mode = OpenFlags::Read;
720 mode = OpenFlags::Read;
734 Error(
"TNetXNGFile",
"Object is in 'zombie' state");
739 Error(
"TNetXNGFile",
"The remote file is not open");
752 using namespace XrdCl;
761#if XrdVNUMBER >= 40000
763 fFile->GetProperty(
"LastURL",lasturl);
767 if(lrl.GetProtocol().compare(
"file") == 0 &&
768 lrl.GetHostId().compare(
"localhost") == 0){
770 Info(
"GetVectorReadLimits",
"Local redirect, using default values");
774 std::string dataServerStr;
775 if( !
fFile->GetProperty(
"DataServer", dataServerStr ) )
777 URL dataServer(dataServerStr);
779 URL dataServer(
fFile->GetDataServer());
781 FileSystem fs(dataServer);
784 arg.FromString(std::string(
"readv_ior_max readv_iov_max"));
786 XRootDStatus status = fs.Query(QueryCode::Config, arg, response);
793 std::vector<TString> resps;
795 resps.push_back(token);
797 if (resps.size() != 2)
800 if (resps[0].IsDigit())
803 if (resps[1].IsDigit())
824 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
825 const char *cenv = 0;
828 val =
gEnv->GetValue(
"NetXNG.ConnectionWindow",
"");
829 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_CONNECTIONWINDOW"))
830 || strlen(cenv) <= 0))
831 env->PutInt(
"ConnectionWindow", val.
Atoi());
833 val =
gEnv->GetValue(
"NetXNG.ConnectionRetry",
"");
834 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_CONNECTIONRETRY"))
835 || strlen(cenv) <= 0))
836 env->PutInt(
"RequestTimeout", val.
Atoi());
838 val =
gEnv->GetValue(
"NetXNG.RequestTimeout",
"");
839 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_REQUESTTIMEOUT"))
840 || strlen(cenv) <= 0))
841 env->PutInt(
"RequestTimeout", val.
Atoi());
843 val =
gEnv->GetValue(
"NetXNG.SubStreamsPerChannel",
"");
844 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_SUBSTREAMSPERCHANNEL"))
845 || strlen(cenv) <= 0))
846 env->PutInt(
"SubStreamsPerChannel", val.
Atoi());
848 val =
gEnv->GetValue(
"NetXNG.TimeoutResolution",
"");
849 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_TIMEOUTRESOLUTION"))
850 || strlen(cenv) <= 0))
851 env->PutInt(
"TimeoutResolution", val.
Atoi());
853 val =
gEnv->GetValue(
"NetXNG.StreamErrorWindow",
"");
854 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_STREAMERRORWINDOW"))
855 || strlen(cenv) <= 0))
856 env->PutInt(
"StreamErrorWindow", val.
Atoi());
858 val =
gEnv->GetValue(
"NetXNG.RunForkHandler",
"");
859 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_RUNFORKHANDLER"))
860 || strlen(cenv) <= 0))
861 env->PutInt(
"RunForkHandler", val.
Atoi());
863 val =
gEnv->GetValue(
"NetXNG.RedirectLimit",
"");
864 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_REDIRECTLIMIT"))
865 || strlen(cenv) <= 0))
866 env->PutInt(
"RedirectLimit", val.
Atoi());
868 val =
gEnv->GetValue(
"NetXNG.WorkerThreads",
"");
869 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_WORKERTHREADS"))
870 || strlen(cenv) <= 0))
871 env->PutInt(
"WorkerThreads", val.
Atoi());
873 val =
gEnv->GetValue(
"NetXNG.CPChunkSize",
"");
874 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_CPCHUNKSIZE"))
875 || strlen(cenv) <= 0))
876 env->PutInt(
"CPChunkSize", val.
Atoi());
878 val =
gEnv->GetValue(
"NetXNG.CPParallelChunks",
"");
879 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_CPPARALLELCHUNKS"))
880 || strlen(cenv) <= 0))
881 env->PutInt(
"CPParallelChunks", val.
Atoi());
883 val =
gEnv->GetValue(
"NetXNG.PollerPreference",
"");
884 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_POLLERPREFERENCE"))
885 || strlen(cenv) <= 0))
886 env->PutString(
"PollerPreference", val.
Data());
888 val =
gEnv->GetValue(
"NetXNG.ClientMonitor",
"");
889 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_CLIENTMONITOR"))
890 || strlen(cenv) <= 0))
891 env->PutString(
"ClientMonitor", val.
Data());
893 val =
gEnv->GetValue(
"NetXNG.ClientMonitorParam",
"");
894 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XRD_CLIENTMONITORPARAM"))
895 || strlen(cenv) <= 0))
896 env->PutString(
"ClientMonitorParam", val.
Data());
899 env->PutInt(
"MultiProtocol",
gEnv->GetValue(
"TFile.CrossProtocolRedirects", 1));
903 netrc.
Form(
"%s/.rootnetrc",
gSystem->HomeDirectory());
907 val =
gEnv->GetValue(
"XSec.Pwd.ALogFile",
"");
911 val =
gEnv->GetValue(
"XSec.Pwd.ServerPuk",
"");
915 val =
gEnv->GetValue(
"XSec.GSI.CAdir",
"");
919 val =
gEnv->GetValue(
"XSec.GSI.CRLdir",
"");
923 val =
gEnv->GetValue(
"XSec.GSI.CRLextension",
"");
927 val =
gEnv->GetValue(
"XSec.GSI.UserCert",
"");
931 val =
gEnv->GetValue(
"XSec.GSI.UserKey",
"");
935 val =
gEnv->GetValue(
"XSec.GSI.UserProxy",
"");
939 val =
gEnv->GetValue(
"XSec.GSI.ProxyValid",
"");
941 gSystem->Setenv(
"XrdSecGSIPROXYVALID", val.
Data());
943 val =
gEnv->GetValue(
"XSec.GSI.ProxyKeyBits",
"");
945 gSystem->Setenv(
"XrdSecGSIPROXYKEYBITS", val.
Data());
947 val =
gEnv->GetValue(
"XSec.GSI.ProxyForward",
"0");
948 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XrdSecGSIPROXYDEPLEN"))
949 || strlen(cenv) <= 0))
950 gSystem->Setenv(
"XrdSecGSIPROXYDEPLEN", val.
Data());
952 val =
gEnv->GetValue(
"XSec.GSI.CheckCRL",
"1");
953 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XrdSecGSICRLCHECK"))
954 || strlen(cenv) <= 0))
957 val =
gEnv->GetValue(
"XSec.GSI.DelegProxy",
"0");
958 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XrdSecGSIDELEGPROXY"))
959 || strlen(cenv) <= 0))
960 gSystem->Setenv(
"XrdSecGSIDELEGPROXY", val.
Data());
962 val =
gEnv->GetValue(
"XSec.GSI.SignProxy",
"1");
963 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XrdSecGSISIGNPROXY"))
964 || strlen(cenv) <= 0))
967 val =
gEnv->GetValue(
"XSec.Pwd.AutoLogin",
"1");
968 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XrdSecPWDAUTOLOG"))
969 || strlen(cenv) <= 0))
972 val =
gEnv->GetValue(
"XSec.Pwd.VerifySrv",
"1");
973 if (val.
Length() > 0 && (!(cenv =
gSystem->Getenv(
"XrdSecPWDVERIFYSRV"))
974 || strlen(cenv) <= 0))
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
int Int_t
Signed integer 4 bytes (int).
int Ssiz_t
String size (currently int).
bool Bool_t
Boolean (0=false, 1=true) (bool).
double Double_t
Double 8 bytes.
long long Long64_t
Portable signed long integer 8 bytes.
const char Option_t
Option string (const char).
Error("WriteTObject","The current directory (%s) is not associated with a file. The object (%s) has not been written.", GetName(), objname)
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
TString ToUpper(const TString &s)
Return an upper-case version of str.
externTVirtualMonitoringWriter * gMonitoringWriter
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
TAsyncOpenHandler(TNetXNGFile *file)
std::vector< XrdCl::XRootDStatus * > * fStatuses
TAsyncReadvHandler(std::vector< XrdCl::XRootDStatus * > *statuses, Int_t statusIndex, TSemaphore *semaphore)
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override
Bool_t fWritable
True if directory is writable.
static std::atomic< Long64_t > fgBytesRead
Number of bytes read by all TFile objects.
Int_t fReadCalls
Number of read calls ( not counting the cache calls ).
Long64_t fBytesRead
Number of bytes read from this file.
TArchiveFile * fArchive
!Archive file from which we read this file
TList * fOpenPhases
!Time info about open phases
Int_t WriteBufferViaCache(const char *buf, Int_t len)
Write buffer via cache.
Int_t ReadBufferViaCache(char *buf, Int_t len)
Read buffer via cache.
Long64_t fArchiveOffset
!Offset at which file starts in archive
virtual void Init(Bool_t create)
Initialize a TFile object.
TFile(const TFile &)=delete
Long64_t GetRelOffset() const
TString fOption
File options.
EAsyncOpenStatus
Asynchronous open request status.
Bool_t FlushWriteCache()
Flush the write cache if active.
Long64_t fBytesWrite
Number of bytes written to this file.
Bool_t fInitDone
!True if the file has been initialized
virtual void SetOffset(Long64_t offset, ERelativeTo pos=kBeg)
Set position from where to start reading.
Long64_t fOffset
!Seek offset cache
static std::atomic< Long64_t > fgBytesWrite
Number of bytes written by all TFile objects.
EAsyncOpenStatus fAsyncOpenStatus
!Status of an asynchronous open request
void Close(Option_t *option="") override
Close a file.
static std::atomic< Int_t > fgReadCalls
Number of bytes read from all TFile objects.
Enables access to XRootD files using the new client.
void Seek(Long64_t offset, ERelativeTo position=kBeg) override
Set the position within the file.
virtual void SetEnv()
Map ROOT and xrootd environment variables.
virtual Bool_t IsUseable() const
Check the file is open and isn't a zombie.
virtual void SetAsyncOpenStatus(EAsyncOpenStatus status)
Set the status of an asynchronous file open.
Bool_t ReadBuffer(char *buffer, Int_t length) override
Read a data chunk of the given size.
void Close(const Option_t *option="") override
Close the file.
Bool_t ReadBuffers(char *buffer, Long64_t *position, Int_t *length, Int_t nbuffs) override
Read scattered data chunks in one operation.
void Flush() override
Synchronize a file's in-memory and on-disk states.
Bool_t WriteBuffer(const char *buffer, Int_t length) override
Write a data chunk.
Int_t ReOpen(Option_t *modestr) override
Reopen the file with the new access mode.
XrdSysCondVar * fInitCondVar
Long64_t GetSize() const override
Get the file size.
virtual ~TNetXNGFile()
Destructor.
void Init(Bool_t create) override
Initialize the file.
virtual Bool_t GetVectorReadLimits()
Find the server-specific readv config params.
Bool_t IsOpen() const override
Check if the file is open.
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
Int_t Atoi() const
Return integer value of string.
const char * Data() const
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
The TTimeStamp encapsulates seconds and ns since EPOCH.