27 #include <XrdCl/XrdClURL.hh> 28 #include <XrdCl/XrdClFile.hh> 29 #include <XrdCl/XrdClXRootDResponses.hh> 30 #include <XrdCl/XrdClDefaultEnv.hh> 31 #include <XrdVersion.hh> 38 class TAsyncOpenHandler:
public XrdCl::ResponseHandler
55 virtual void HandleResponse(XrdCl::XRootDStatus *status,
56 XrdCl::AnyObject *response)
80 class TAsyncReadvHandler:
public XrdCl::ResponseHandler
87 TAsyncReadvHandler(std::vector<XrdCl::XRootDStatus*> *statuses,
90 fStatuses(statuses), fStatusIndex(statusIndex), fSemaphore(semaphore) {}
97 virtual void HandleResponse(XrdCl::XRootDStatus *status,
98 XrdCl::AnyObject *response)
100 fStatuses->at(fStatusIndex) = status;
107 std::vector<XrdCl::XRootDStatus*> *fStatuses;
131 TFile(url,
"NET", title, compress)
133 using namespace XrdCl;
139 TUrl urlnoanchor(url);
141 fUrl =
new URL(std::string(urlnoanchor.
GetUrl()));
146 fUrl->SetProtocol(std::string(
"root"));
152 Error(
"Open",
"could not parse open mode %s", mode);
173 TAsyncOpenHandler *handler =
new TAsyncOpenHandler(
this);
175 if (!status.IsOK()) {
176 Error(
"Open",
"%s", status.ToStr().c_str());
184 if (!status.IsOK()) {
185 #if XrdVNUMBER >= 40000 186 if( status.code == errRedirect )
187 fNewUrl = status.GetErrorMessage().c_str();
189 Error(
"Open",
"%s", status.ToStr().c_str());
191 Error(
"Open",
"%s", status.ToStr().c_str());
197 if( (
fMode & OpenFlags::New) || (
fMode & OpenFlags::Delete) ||
198 (
fMode & OpenFlags::Update) )
203 if( (
fMode & OpenFlags::New) || (
fMode & OpenFlags::Delete) )
229 using namespace XrdCl;
232 if (
gDebug > 1)
Info(
"Init",
"TFile::Init already called once");
264 using namespace XrdCl;
270 bool forceStat =
true;
271 if(
fMode == XrdCl::OpenFlags::Read )
275 if( !
fFile->Stat( forceStat, info ).IsOK() )
287 return fFile->IsOpen();
310 XrdCl::XRootDStatus status =
fFile->Close();
311 if (!status.IsOK()) {
312 Error(
"Close",
"%s", status.ToStr().c_str());
328 using namespace XrdCl;
330 OpenFlags::Flags mode;
335 if (parseres<0 || (mode != OpenFlags::Read && mode != OpenFlags::Update)) {
336 Error(
"ReOpen",
"mode must be either READ or UPDATE, not %s", modestr);
341 if (mode ==
fMode || (mode == OpenFlags::Update
342 &&
fMode == OpenFlags::New)) {
346 XRootDStatus st =
fFile->Close();
348 Error(
"ReOpen",
"%s", st.ToStr().c_str());
356 Error(
"ReOpen",
"%s", st.ToStr().c_str());
385 using namespace XrdCl;
387 Info(
"ReadBuffer",
"offset: %lld length: %d", position, length);
406 uint32_t bytesRead = 0;
407 XRootDStatus st =
fFile->Read(
fOffset, length, buffer, bytesRead);
409 Info(
"ReadBuffer",
"%s bytes read: %u", st.ToStr().c_str(), bytesRead);
412 Error(
"ReadBuffer",
"%s", st.ToStr().c_str());
416 if ((
Int_t)bytesRead != length) {
417 Error(
"ReadBuffer",
"error reading all requested bytes, got %u of %d",
453 using namespace XrdCl;
459 std::vector<ChunkList> chunkLists;
461 std::vector<XRootDStatus*> *statuses;
463 Int_t totalBytes = 0;
465 char *cursor = buffer;
471 for (
Int_t i = 0; i < nbuffs; i++)
475 for (
Int_t i = 0; i < nbuffs; ++i) {
476 totalBytes += length[i];
485 for (j = 0; j < nsplit; ++j) {
487 chunks.push_back(ChunkInfo(offset,
fReadvIorMax, cursor));
493 chunks.push_back(ChunkInfo(offset, rem, cursor));
496 chunks.push_back(ChunkInfo(position[i], length[i], cursor));
502 chunkLists.push_back(chunks);
503 chunks = ChunkList();
505 chunkLists.push_back(ChunkList(chunks.begin(),
507 chunks = ChunkList(chunks.begin() +
fReadvIovMax, chunks.end());
512 if( !chunks.empty() )
513 chunkLists.push_back(chunks);
515 TAsyncReadvHandler *handler;
518 statuses =
new std::vector<XRootDStatus*>(chunkLists.size());
521 std::vector<ChunkList>::iterator it;
522 for (it = chunkLists.begin(); it != chunkLists.end(); ++it)
524 handler =
new TAsyncReadvHandler(statuses, it - chunkLists.begin(),
526 status =
fFile->VectorRead(*it, 0, handler);
528 if (!status.IsOK()) {
529 Error(
"ReadBuffers",
"%s", status.ToStr().c_str());
535 for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
540 for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
541 XRootDStatus *st = statuses->at(it - chunkLists.begin());
544 Error(
"ReadBuffers",
"%s", st->ToStr().c_str());
545 for( ; it != chunkLists.end(); ++it )
547 st = statuses->at( it - chunkLists.begin() );
566 gPerfStats->FileReadEvent(
this, totalBytes, start);
586 using namespace XrdCl;
594 Info(
"WriteBuffer",
"file not writable");
607 XRootDStatus st =
fFile->Write(
fOffset, length, buffer);
609 Error(
"WriteBuffer",
"%s", st.ToStr().c_str());
630 Info(
"Flush",
"file not writable - do nothing");
638 XrdCl::XRootDStatus status =
fFile->Sync();
640 Error(
"Flush",
"%s", status.ToStr().c_str());
643 Info(
"Flush",
"XrdClient::Sync succeeded.");
669 XrdCl::OpenFlags::Flags &mode,
672 using namespace XrdCl;
673 modestr =
ToUpper(TString(in));
675 if (modestr ==
"NEW" || modestr ==
"CREATE") mode = OpenFlags::New;
676 else if (modestr ==
"RECREATE") mode = OpenFlags::Delete;
677 else if (modestr ==
"UPDATE") mode = OpenFlags::Update;
678 else if (modestr ==
"READ") mode = OpenFlags::Read;
684 mode = OpenFlags::Read;
696 Error(
"TNetXNGFile",
"Object is in 'zombie' state");
701 Error(
"TNetXNGFile",
"The remote file is not open");
714 using namespace XrdCl;
723 #if XrdVNUMBER >= 40000 724 std::string dataServerStr;
725 if( !
fFile->GetProperty(
"DataServer", dataServerStr ) )
727 URL dataServer(dataServerStr);
729 URL dataServer(
fFile->GetDataServer());
731 FileSystem fs(dataServer);
734 arg.FromString(std::string(
"readv_ior_max readv_iov_max"));
736 XRootDStatus status = fs.Query(QueryCode::Config, arg, response);
743 std::vector<TString> resps;
744 while (TString(response->ToString()).Tokenize(token, from,
"\n"))
745 resps.push_back(token);
747 if (resps.size() != 2)
750 if (resps[0].IsDigit())
753 if (resps[1].IsDigit())
774 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
775 const char *cenv = 0;
779 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CONNECTIONWINDOW"))
780 || strlen(cenv) <= 0))
781 env->PutInt(
"ConnectionWindow", val.Atoi());
784 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CONNECTIONRETRY"))
785 || strlen(cenv) <= 0))
786 env->PutInt(
"RequestTimeout", val.Atoi());
789 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_REQUESTTIMEOUT"))
790 || strlen(cenv) <= 0))
791 env->PutInt(
"RequestTimeout", val.Atoi());
793 val =
gEnv->
GetValue(
"NetXNG.SubStreamsPerChannel",
"");
794 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_SUBSTREAMSPERCHANNEL"))
795 || strlen(cenv) <= 0))
796 env->PutInt(
"SubStreamsPerChannel", val.Atoi());
799 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_TIMEOUTRESOLUTION"))
800 || strlen(cenv) <= 0))
801 env->PutInt(
"TimeoutResolution", val.Atoi());
804 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_STREAMERRORWINDOW"))
805 || strlen(cenv) <= 0))
806 env->PutInt(
"StreamErrorWindow", val.Atoi());
809 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_RUNFORKHANDLER"))
810 || strlen(cenv) <= 0))
811 env->PutInt(
"RunForkHandler", val.Atoi());
814 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_REDIRECTLIMIT"))
815 || strlen(cenv) <= 0))
816 env->PutInt(
"RedirectLimit", val.Atoi());
819 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_WORKERTHREADS"))
820 || strlen(cenv) <= 0))
821 env->PutInt(
"WorkerThreads", val.Atoi());
824 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CPCHUNKSIZE"))
825 || strlen(cenv) <= 0))
826 env->PutInt(
"CPChunkSize", val.Atoi());
829 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CPPARALLELCHUNKS"))
830 || strlen(cenv) <= 0))
831 env->PutInt(
"CPParallelChunks", val.Atoi());
834 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_POLLERPREFERENCE"))
835 || strlen(cenv) <= 0))
836 env->PutString(
"PollerPreference", val.Data());
839 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CLIENTMONITOR"))
840 || strlen(cenv) <= 0))
841 env->PutString(
"ClientMonitor", val.Data());
844 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XRD_CLIENTMONITORPARAM"))
845 || strlen(cenv) <= 0))
846 env->PutString(
"ClientMonitorParam", val.Data());
849 env->PutInt(
"MultiProtocol",
gEnv->
GetValue(
"TFile.CrossProtocolRedirects", 1));
858 if (val.Length() > 0)
862 if (val.Length() > 0)
866 if (val.Length() > 0)
870 if (val.Length() > 0)
874 if (val.Length() > 0)
878 if (val.Length() > 0)
882 if (val.Length() > 0)
886 if (val.Length() > 0)
890 if (val.Length() > 0)
894 if (val.Length() > 0)
898 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecGSIPROXYDEPLEN"))
899 || strlen(cenv) <= 0))
903 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecGSICRLCHECK"))
904 || strlen(cenv) <= 0))
908 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecGSIDELEGPROXY"))
909 || strlen(cenv) <= 0))
913 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecGSISIGNPROXY"))
914 || strlen(cenv) <= 0))
918 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecPWDAUTOLOG"))
919 || strlen(cenv) <= 0))
923 if (val.Length() > 0 && (!(cenv =
gSystem->
Getenv(
"XrdSecPWDVERIFYSRV"))
924 || strlen(cenv) <= 0))
Long64_t GetRelOffset() const
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
virtual Bool_t WriteBuffer(const char *buffer, Int_t length)
Write a data chunk.
virtual void Init(Bool_t create)
Initialize the file.
Long64_t fBytesWrite
Number of bytes written to this file.
EAsyncOpenStatus
Asynchronous open request status.
static std::atomic< Long64_t > fgBytesRead
Number of bytes read by all TFile objects.
virtual Bool_t IsUseable() const
Check the file is open and isn't a zombie.
virtual void SetOffset(Long64_t offset, ERelativeTo pos=kBeg)
Set position from where to start reading.
This class represents a WWW compatible URL.
virtual Bool_t SendFileOpenProgress(TFile *, TList *, const char *, Bool_t=kFALSE)
virtual void SetEnv()
Map ROOT and xrootd environment variables.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
Int_t WriteBufferViaCache(const char *buf, Int_t len)
Write buffer via cache.
Int_t fReadCalls
Number of read calls ( not counting the cache calls )
virtual Bool_t IsOpen() const
Check if the file is open.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
virtual void Close(const Option_t *option="")
Close the file.
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
virtual const char * Getenv(const char *env)
Get environment variable.
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
virtual void Setenv(const char *name, const char *value)
Set environment variable.
R__EXTERN TSystem * gSystem
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
virtual ~TNetXNGFile()
Destructor.
virtual Int_t ReOpen(Option_t *modestr)
Reopen the file with the new access mode.
virtual void Seek(Long64_t offset, ERelativeTo position=kBeg)
Set the position within the file.
EAsyncOpenStatus fAsyncOpenStatus
!Status of an asynchronous open request
virtual Long64_t GetSize() const
Get the file size.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
static std::atomic< Long64_t > fgBytesWrite
Number of bytes written by all TFile objects.
static std::atomic< Int_t > fgReadCalls
Number of read calls made by all TFile objects.
Int_t ParseOpenMode(Option_t *in, TString &modestr, XrdCl::OpenFlags::Flags &mode, Bool_t assumeRead)
Parse a file open mode given as a string into a canonically formatted output mode string and an integ...
virtual Bool_t ReadBuffer(char *buffer, Int_t length)
Read a data chunk of the given size.
virtual void Init(Bool_t create)
Initialize a TFile object.
XrdCl::OpenFlags::Flags fMode
Int_t ReadBufferViaCache(char *buf, Int_t len)
Read buffer via cache.
Bool_t fWritable
True if directory is writable.
virtual Bool_t ReadBuffers(char *buffer, Long64_t *position, Int_t *length, Int_t nbuffs)
Read scattered data chunks in one operation.
void SetAnchor(const char *anchor)
The TTimeStamp encapsulates seconds and ns since EPOCH.
TList * fOpenPhases
!Time info about open phases
virtual Bool_t GetVectorReadLimits()
Find the server-specific readv config params.
TString fOption
File options.
virtual Bool_t SendFileReadProgress(TFile *)
XrdSysCondVar * fInitCondVar
virtual void Flush()
Synchronize a file's in-memory and on-disk states.
Bool_t FlushWriteCache()
Flush the write cache if active.
virtual void SetAsyncOpenStatus(EAsyncOpenStatus status)
Set the status of an asynchronous file open.
TString ToUpper(const TString &s)
Return an upper-case version of str.
Long64_t fArchiveOffset
!Offset at which file starts in archive
Long64_t fOffset
!Seek offset cache
Bool_t fInitDone
!True if the file has been initialized
Long64_t fBytesRead
Number of bytes read from this file.
virtual void Close(Option_t *option="")
Close a file.