58 fNWorkers(0), fMaxNEntries(0),
59 fProcessedEntries(0), fS(), fPid(0), fNWorker(0),
60 fTreeCache(0), fTreeCacheIsLearning(
kFALSE),
61 fUseTreeCache(
kTRUE), fCacheSize(-1)
67 const std::string& treeName,
125 Error(
"TMPWorker::Run",
"Lost connection to client\n");
129 if (msg.first < 1000)
147 unsigned code = msg.first;
149 std::string reply =
fId;
163 reply +=
": unknown code received. code=" + std::to_string(code);
189 if (fp ==
nullptr || fp->
IsZombie()) {
190 std::stringstream ss;
191 ss <<
"could not open file " << fileName;
192 std::string errmsg = ss.str();
222 if (tree ==
nullptr) {
223 std::stringstream ss;
224 ss <<
"cannot find tree with name " <<
fTreeName <<
" in file " << fp->
GetName();
225 std::string errmsg = ss.str();
253 Info(
"SetupTreeCache",
"the tree cache is in learning phase");
256 Warning(
"SetupTreeCache",
"default tree does not have a file attached: corruption? Tree cache untouched");
270 std::string reply =
fId +
": " + errmsg;
virtual const char * GetName() const
Returns name of object.
Bool_t fTreeCacheIsLearning
virtual void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
pid_t fPid
the PID of the process in which this worker is running
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SetCacheRead(TFileCacheRead *cache, TObject *tree=0, ECacheAction action=kDisconnect)
Set a pointer to the read cache.
unsigned fNWorker
the ordinal number of this worker (0 to nWorkers-1)
virtual const char * GetClassName() const
A specialized TFileCacheRead object for a TTree.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Error while reading from the socket.
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
std::string fTreeName
the name of the tree to be processed
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
unsigned GetNWorker() const
virtual Long64_t GetCacheSize() const
void Info(const char *location, const char *msgfmt,...)
virtual void ResetCache()
This will simply clear the cache.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Used by the workers to notify client of shutdown.
void Error(const char *location, const char *msgfmt,...)
void SetupTreeCache(TTree *tree)
Tree cache handling.
std::unique_ptr< TSocket > fS
This worker's socket. The unique_ptr makes sure resources are released.
Fatal error: whoever sends this message is terminating execution.
void CloseFile()
Handle file closing.
TFile * fFile
last open file
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
R__EXTERN TSystem * gSystem
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
TMPWorker()
Class constructors.
unsigned fNWorkers
the number of workers spawned
TFile * GetCurrentFile() const
Return pointer to the current file.
void Warning(const char *location, const char *msgfmt,...)
Used by the client to tell servers to shutdown.
Tell the client there was an error while processing.
unsigned long long ULong64_t
virtual Int_t SetCacheSize(Long64_t cachesize=-1)
Set maximum size of the file cache .
std::vector< std::string > fFileNames
the files to be processed by all workers
virtual Bool_t IsLearning() const
std::string fId
identifier string in the form W<nwrk>|P<proc id>="">
virtual TList * GetListOfKeys() const
virtual void Exit(int code, Bool_t mode=kTRUE)
Exit the application.
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
TFileCacheRead * GetCacheRead(TObject *tree=0) const
Return a pointer to the current read cache.
A TTree object has a header with a name and a title.
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
void Setup()
Auxilliary method for common initializations.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.