27 #include <XrdCl/XrdClURL.hh>
28 #include <XrdCl/XrdClFile.hh>
29 #include <XrdCl/XrdClXRootDResponses.hh>
30 #include <XrdCl/XrdClDefaultEnv.hh>
31 #include <XrdVersion.hh>
38 class TAsyncOpenHandler:
public XrdCl::ResponseHandler
55 virtual void HandleResponse(XrdCl::XRootDStatus *status,
56 XrdCl::AnyObject *response)
80 class TAsyncReadvHandler:
public XrdCl::ResponseHandler
87 TAsyncReadvHandler(std::vector<XrdCl::XRootDStatus*> *statuses,
90 fStatuses(statuses), fStatusIndex(statusIndex), fSemaphore(semaphore) {}
97 virtual void HandleResponse(XrdCl::XRootDStatus *status,
98 XrdCl::AnyObject *response)
100 fStatuses->at(fStatusIndex) =
status;
107 std::vector<XrdCl::XRootDStatus*> *fStatuses;
131 TFile(url,
"NET", title, compress)
133 using namespace XrdCl;
139 TUrl urlnoanchor(url);
141 fUrl =
new URL(std::string(urlnoanchor.
GetUrl()));
146 fUrl->SetProtocol(std::string(
"root"));
152 Error(
"Open",
"could not parse open mode %s", mode);
173 TAsyncOpenHandler *handler =
new TAsyncOpenHandler(
this);
175 if (!status.IsOK()) {
176 Error(
"Open",
"%s", status.ToStr().c_str());
184 if (!status.IsOK()) {
185 #if XrdVNUMBER >= 40000
186 if( status.code == errRedirect )
187 fNewUrl = status.GetErrorMessage().c_str();
189 Error(
"Open",
"%s", status.ToStr().c_str());
191 Error(
"Open",
"%s", status.ToStr().c_str());
197 if( (
fMode & OpenFlags::New) || (
fMode & OpenFlags::Delete) ||
198 (
fMode & OpenFlags::Update) )
203 if( (
fMode & OpenFlags::New) || (
fMode & OpenFlags::Delete) )
229 using namespace XrdCl;
232 if (
gDebug > 1)
Info(
"Init",
"TFile::Init already called once");
264 using namespace XrdCl;
270 bool forceStat =
true;
271 if(
fMode == XrdCl::OpenFlags::Read )
275 if( !
fFile->Stat( forceStat, info ).IsOK() )
287 return fFile->IsOpen();
310 XrdCl::XRootDStatus status =
fFile->Close();
311 if (!status.IsOK()) {
312 Error(
"Close",
"%s", status.ToStr().c_str());
328 using namespace XrdCl;
330 OpenFlags::Flags mode;
335 if (parseres<0 || (mode != OpenFlags::Read && mode != OpenFlags::Update)) {
336 Error(
"ReOpen",
"mode must be either READ or UPDATE, not %s", modestr);
341 if (mode ==
fMode || (mode == OpenFlags::Update
342 &&
fMode == OpenFlags::New)) {
352 Error(
"ReOpen",
"%s", st.ToStr().c_str());
381 using namespace XrdCl;
383 Info(
"ReadBuffer",
"offset: %lld length: %d", position, length);
402 uint32_t bytesRead = 0;
403 XRootDStatus st =
fFile->Read(
fOffset, length, buffer, bytesRead);
405 Info(
"ReadBuffer",
"%s bytes read: %d", st.ToStr().c_str(), bytesRead);
408 Error(
"ReadBuffer",
"%s", st.ToStr().c_str());
443 using namespace XrdCl;
449 std::vector<ChunkList> chunkLists;
451 std::vector<XRootDStatus*> *statuses;
453 Int_t totalBytes = 0;
455 char *cursor = buffer;
461 for (
Int_t i = 0; i < nbuffs; i++)
465 for (
Int_t i = 0; i < nbuffs; ++i) {
466 totalBytes += length[i];
475 for (j = 0; j < nsplit; ++j) {
477 chunks.push_back(ChunkInfo(offset,
fReadvIorMax, cursor));
483 chunks.push_back(ChunkInfo(offset, rem, cursor));
486 chunks.push_back(ChunkInfo(position[i], length[i], cursor));
492 chunkLists.push_back(chunks);
493 chunks = ChunkList();
495 chunkLists.push_back(ChunkList(chunks.begin(),
497 chunks = ChunkList(chunks.begin() +
fReadvIovMax, chunks.end());
502 if( !chunks.empty() )
503 chunkLists.push_back(chunks);
505 TAsyncReadvHandler *handler;
508 statuses =
new std::vector<XRootDStatus*>(chunkLists.size());
511 std::vector<ChunkList>::iterator it;
512 for (it = chunkLists.begin(); it != chunkLists.end(); ++it)
514 handler =
new TAsyncReadvHandler(statuses, it - chunkLists.begin(),
516 status =
fFile->VectorRead(*it, 0, handler);
518 if (!status.IsOK()) {
519 Error(
"ReadBuffers",
"%s", status.ToStr().c_str());
525 for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
530 for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
531 XRootDStatus *st = statuses->at(it - chunkLists.begin());
534 Error(
"ReadBuffers",
"%s", st->ToStr().c_str());
535 for( ; it != chunkLists.end(); ++it )
537 st = statuses->at( it - chunkLists.begin() );
556 gPerfStats->FileReadEvent(
this, totalBytes, start);
576 using namespace XrdCl;
584 Info(
"WriteBuffer",
"file not writable");
597 XRootDStatus st =
fFile->Write(
fOffset, length, buffer);
599 Error(
"WriteBuffer",
"%s", st.ToStr().c_str());
620 Info(
"Flush",
"file not writable - do nothing");
628 XrdCl::XRootDStatus status =
fFile->Sync();
630 Error(
"Flush",
"%s", status.ToStr().c_str());
633 Info(
"Flush",
"XrdClient::Sync succeeded.");
659 XrdCl::OpenFlags::Flags &mode,
662 using namespace XrdCl;
665 if (modestr ==
"NEW" || modestr ==
"CREATE") mode = OpenFlags::New;
666 else if (modestr ==
"RECREATE") mode = OpenFlags::Delete;
667 else if (modestr ==
"UPDATE") mode = OpenFlags::Update;
668 else if (modestr ==
"READ") mode = OpenFlags::Read;
674 mode = OpenFlags::Read;
686 Error(
"TNetXNGFile",
"Object is in 'zombie' state");
691 Error(
"TNetXNGFile",
"The remote file is not open");
704 using namespace XrdCl;
713 #if XrdVNUMBER >= 40000
714 std::string dataServerStr;
715 if( !
fFile->GetProperty(
"DataServer", dataServerStr ) )
717 URL dataServer(dataServerStr);
719 URL dataServer(
fFile->GetDataServer());
721 FileSystem fs(dataServer);
724 arg.FromString(std::string(
"readv_ior_max readv_iov_max"));
726 XRootDStatus status = fs.Query(QueryCode::Config, arg, response);
733 std::vector<TString> resps;
734 while (
TString(response->ToString()).Tokenize(token, from,
"\n"))
735 resps.push_back(token);
737 if (resps.size() != 2)
740 if (resps[0].IsDigit())
743 if (resps[1].IsDigit())
764 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
765 const char *cenv = 0;
770 || strlen(cenv) <= 0))
771 env->PutInt(
"ConnectionWindow", val.
Atoi());
775 || strlen(cenv) <= 0))
776 env->PutInt(
"RequestTimeout", val.
Atoi());
780 || strlen(cenv) <= 0))
781 env->PutInt(
"RequestTimeout", val.
Atoi());
783 val =
gEnv->
GetValue(
"NetXNG.SubStreamsPerChannel",
"");
785 || strlen(cenv) <= 0))
786 env->PutInt(
"SubStreamsPerChannel", val.
Atoi());
790 || strlen(cenv) <= 0))
791 env->PutInt(
"TimeoutResolution", val.
Atoi());
795 || strlen(cenv) <= 0))
796 env->PutInt(
"StreamErrorWindow", val.
Atoi());
800 || strlen(cenv) <= 0))
801 env->PutInt(
"RunForkHandler", val.
Atoi());
805 || strlen(cenv) <= 0))
806 env->PutInt(
"RedirectLimit", val.
Atoi());
810 || strlen(cenv) <= 0))
811 env->PutInt(
"WorkerThreads", val.
Atoi());
815 || strlen(cenv) <= 0))
816 env->PutInt(
"CPChunkSize", val.
Atoi());
820 || strlen(cenv) <= 0))
821 env->PutInt(
"CPParallelChunks", val.
Atoi());
825 || strlen(cenv) <= 0))
826 env->PutString(
"PollerPreference", val.
Data());
830 || strlen(cenv) <= 0))
831 env->PutString(
"ClientMonitor", val.
Data());
835 || strlen(cenv) <= 0))
836 env->PutString(
"ClientMonitorParam", val.
Data());
839 env->PutInt(
"MultiProtocol",
gEnv->
GetValue(
"TFile.CrossProtocolRedirects", 1));
889 || strlen(cenv) <= 0))
894 || strlen(cenv) <= 0))
899 || strlen(cenv) <= 0))
904 || strlen(cenv) <= 0))
909 || strlen(cenv) <= 0))
914 || strlen(cenv) <= 0))
virtual Long64_t GetSize() const
Get the file size.
virtual Bool_t WriteBuffer(const char *buffer, Int_t length)
Write a data chunk.
virtual void Init(Bool_t create)
Initialize the file.
Long64_t fBytesWrite
Number of bytes written to this file.
EAsyncOpenStatus
Asynchronous open request status.
Long64_t GetRelOffset() const
static std::atomic< Long64_t > fgBytesRead
Number of bytes read by all TFile objects.
virtual void SetOffset(Long64_t offset, ERelativeTo pos=kBeg)
Set position from where to start reading.
This class represents a WWW compatible URL.
virtual Bool_t SendFileOpenProgress(TFile *, TList *, const char *, Bool_t=kFALSE)
virtual void SetEnv()
Map ROOT and xrootd environment variables.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Int_t Wait(Int_t millisec=0)
If semaphore value is > 0 then decrement it and carry on.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
virtual Bool_t IsUseable() const
Check the file is open and isn't a zombie.
Int_t WriteBufferViaCache(const char *buf, Int_t len)
Write buffer via cache.
Int_t fReadCalls
Number of read calls ( not counting the cache calls )
virtual void Close(const Option_t *option="")
Close the file.
const char * Data() const
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
virtual const char * Getenv(const char *env)
Get environment variable.
Int_t Atoi() const
Return integer value of string.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Double_t length(const TVector2 &v)
R__EXTERN TSystem * gSystem
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
virtual ~TNetXNGFile()
Destructor.
virtual Int_t ReOpen(Option_t *modestr)
Reopen the file with the new access mode.
virtual void Seek(Long64_t offset, ERelativeTo position=kBeg)
Set the position within the file.
EAsyncOpenStatus fAsyncOpenStatus
!Status of an asynchronous open request
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
static std::atomic< Long64_t > fgBytesWrite
Number of bytes written by all TFile objects.
static std::atomic< Int_t > fgReadCalls
Number of bytes read from all TFile objects.
Int_t ParseOpenMode(Option_t *in, TString &modestr, XrdCl::OpenFlags::Flags &mode, Bool_t assumeRead)
Parse a file open mode given as a string into a canonically formatted output mode string and an integ...
virtual Bool_t ReadBuffer(char *buffer, Int_t length)
Read a data chunk of the given size.
virtual void Init(Bool_t create)
Initialize a TFile object.
XrdCl::OpenFlags::Flags fMode
Int_t ReadBufferViaCache(char *buf, Int_t len)
Read buffer via cache.
Bool_t fWritable
True if directory is writable.
virtual Bool_t IsOpen() const
Check if the file is open.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
virtual Bool_t ReadBuffers(char *buffer, Long64_t *position, Int_t *length, Int_t nbuffs)
Read scattered data chunks in one operation.
void SetAnchor(const char *anchor)
The TTimeStamp encapsulates seconds and ns since EPOCH.
TList * fOpenPhases
!Time info about open phases
virtual Bool_t GetVectorReadLimits()
Find the server-specific readv config params.
TString fOption
File options.
virtual Bool_t SendFileReadProgress(TFile *)
XrdSysCondVar * fInitCondVar
virtual void Flush()
Synchronize a file's in-memory and on-disk states.
Bool_t FlushWriteCache()
Flush the write cache if active.
virtual void SetAsyncOpenStatus(EAsyncOpenStatus status)
Set the status of an asynchronous file open.
TString ToUpper(const TString &s)
Return an upper-case version of str.
Long64_t fArchiveOffset
!Offset at which file starts in archive
Long64_t fOffset
!Seek offset cache
Bool_t fInitDone
!True if the file has been initialized
Long64_t fBytesRead
Number of bytes read from this file.
virtual void Close(Option_t *option="")
Close a file.