#ifndef ROOT_TProofServ
#define ROOT_TProofServ
#ifndef ROOT_TApplication
#include "TApplication.h"
#endif
#ifndef ROOT_TString
#include "TString.h"
#endif
#ifndef ROOT_TSysEvtHandler
#include "TSysEvtHandler.h"
#endif
#ifndef ROOT_TStopwatch
#include "TStopwatch.h"
#endif
#ifndef ROOT_TTimer
#include "TTimer.h"
#endif
#ifndef ROOT_TProofQueryResult
#include "TProofQueryResult.h"
#endif
class TDataSetManager;
class TDataSetManagerFile;
class TDSet;
class TDSetElement;
class TFileCollection;
class TFileHandler;
class THashList;
class TIdleTOTimer;
class TList;
class TMap;
class TMessage;
class TMonitor;
class TMutex;
class TProof;
class TProofLockPath;
class TQueryResultManager;
class TReaperTimer;
class TServerSocket;
class TShutdownTimer;
class TSocket;
class TVirtualProofPlayer;
typedef Int_t (*OldProofServAuthSetup_t)(TSocket *, Bool_t, Int_t,
TString &, TString &, TString &);
class TProofServ : public TApplication {
friend class TProofServLite;
friend class TXProofServ;
public:
enum EStatusBits { kHighMemory = BIT(16) };
enum EQueryAction { kQueryOK, kQueryModify, kQueryStop, kQueryEnqueued };
private:
TString fService;
TString fUser;
TString fGroup;
TString fConfDir;
TString fConfFile;
TString fWorkDir;
TString fImage;
TString fSessionTag;
TString fTopSessionTag;
TString fSessionDir;
TString fPackageDir;
THashList *fGlobalPackageDirList;
TString fCacheDir;
TString fQueryDir;
TString fDataSetDir;
TString fDataDir;
TString fDataDirOpts;
TString fAdminPath;
TString fOutputFile;
TProofLockPath *fPackageLock;
TProofLockPath *fCacheLock;
TProofLockPath *fQueryLock;
TString fArchivePath;
TSocket *fSocket;
TProof *fProof;
TVirtualProofPlayer *fPlayer;
FILE *fLogFile;
Int_t fLogFileDes;
Long64_t fLogFileMaxSize;
TList *fEnabledPackages;
Int_t fProtocol;
TString fOrdinal;
Int_t fGroupId;
Int_t fGroupSize;
Int_t fLogLevel;
Int_t fNcmd;
Int_t fGroupPriority;
Bool_t fEndMaster;
Bool_t fMasterServ;
Bool_t fInterrupt;
Float_t fRealTime;
Float_t fCpuTime;
TStopwatch fLatency;
TStopwatch fCompute;
TStopwatch fSaveOutput;
Int_t fQuerySeqNum;
Int_t fTotSessions;
Int_t fActSessions;
Float_t fEffSessions;
TFileHandler *fInputHandler;
TQueryResultManager *fQMgr;
TList *fWaitingQueries;
Bool_t fIdle;
TMutex *fQMtx;
TList *fQueuedMsg;
TString fPrefix;
Bool_t fRealTimeLog;
TShutdownTimer *fShutdownTimer;
TReaperTimer *fReaperTimer;
TIdleTOTimer *fIdleTOTimer;
Int_t fCompressMsg;
TDataSetManager* fDataSetManager;
TDataSetManagerFile *fDataSetStgRepo;
Bool_t fSendLogToMaster;
TServerSocket *fMergingSocket;
TMonitor *fMergingMonitor;
Int_t fMergedWorkers;
Int_t fMaxQueries;
Long64_t fMaxBoxSize;
Long64_t fHWMBoxSize;
static Long_t fgVirtMemMax;
static Long_t fgResMemMax;
static Float_t fgMemHWM;
static Float_t fgMemStop;
Long64_t fMsgSizeHWM;
static FILE *fgErrorHandlerFile;
static Int_t fgRecursive;
static Int_t fgLogToSysLog;
static TString fgSysLogService;
static TString fgSysLogEntity;
Int_t GetCompressionLevel() const;
void RedirectOutput(const char *dir = 0, const char *mode = "w");
Int_t CatMotd();
Int_t UnloadPackage(const char *package);
Int_t UnloadPackages();
Int_t OldAuthSetup(TString &wconf);
Int_t GetPriority();
TProofQueryResult *MakeQueryResult(Long64_t nentries, const char *opt,
TList *inl, Long64_t first, TDSet *dset,
const char *selec, TObject *elist);
void SetQueryRunning(TProofQueryResult *pq);
Int_t SendResults(TSocket *sock, TList *outlist = 0, TQueryResult *pq = 0);
Bool_t AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer);
void SetIdle(Bool_t st = kTRUE);
Bool_t IsWaiting();
Int_t WaitingQueries();
Int_t QueueQuery(TProofQueryResult *pq);
TProofQueryResult *NextQuery();
Int_t CleanupWaitingQueries(Bool_t del = kTRUE, TList *qls = 0);
protected:
virtual void HandleArchive(TMessage *mess, TString *slb = 0);
virtual Int_t HandleCache(TMessage *mess, TString *slb = 0);
virtual void HandleCheckFile(TMessage *mess, TString *slb = 0);
virtual Int_t HandleDataSets(TMessage *mess, TString *slb = 0);
virtual void HandleSubmerger(TMessage *mess);
virtual void HandleFork(TMessage *mess);
virtual void HandleLibIncPath(TMessage *mess);
virtual void HandleProcess(TMessage *mess, TString *slb = 0);
virtual void HandleQueryList(TMessage *mess);
virtual void HandleRemove(TMessage *mess, TString *slb = 0);
virtual void HandleRetrieve(TMessage *mess, TString *slb = 0);
virtual Int_t HandleWorkerLists(TMessage *mess);
virtual void ProcessNext(TString *slb = 0);
virtual Int_t Setup();
Int_t SetupCommon();
virtual void MakePlayer();
virtual void DeletePlayer();
virtual Int_t Fork();
Int_t GetSessionStatus();
Bool_t IsIdle();
Bool_t UnlinkDataDir(const char *path);
static TString fgLastMsg;
static Long64_t fgLastEntry;
public:
TProofServ(Int_t *argc, char **argv, FILE *flog = 0);
virtual ~TProofServ();
virtual Int_t CreateServer();
TProof *GetProof() const { return fProof; }
const char *GetService() const { return fService; }
const char *GetConfDir() const { return fConfDir; }
const char *GetConfFile() const { return fConfFile; }
const char *GetUser() const { return fUser; }
const char *GetGroup() const { return fGroup; }
const char *GetWorkDir() const { return fWorkDir; }
const char *GetImage() const { return fImage; }
const char *GetSessionTag() const { return fSessionTag; }
const char *GetTopSessionTag() const { return fTopSessionTag; }
const char *GetSessionDir() const { return fSessionDir; }
const char *GetPackageDir() const { return fPackageDir; }
const char *GetDataDir() const { return fDataDir; }
const char *GetDataDirOpts() const { return fDataDirOpts; }
Int_t GetProtocol() const { return fProtocol; }
const char *GetOrdinal() const { return fOrdinal; }
Int_t GetGroupId() const { return fGroupId; }
Int_t GetGroupSize() const { return fGroupSize; }
Int_t GetLogLevel() const { return fLogLevel; }
TSocket *GetSocket() const { return fSocket; }
Float_t GetRealTime() const { return fRealTime; }
Float_t GetCpuTime() const { return fCpuTime; }
Int_t GetQuerySeqNum() const { return fQuerySeqNum; }
Int_t GetTotSessions() const { return fTotSessions; }
Int_t GetActSessions() const { return fActSessions; }
Float_t GetEffSessions() const { return fEffSessions; }
void GetOptions(Int_t *argc, char **argv);
TList *GetEnabledPackages() const { return fEnabledPackages; }
static Long_t GetVirtMemMax();
static Long_t GetResMemMax();
static Float_t GetMemHWM();
static Float_t GetMemStop();
Long64_t GetMsgSizeHWM() const { return fMsgSizeHWM; }
const char *GetPrefix() const { return fPrefix; }
void FlushLogFile();
void TruncateLogFile();
TProofLockPath *GetCacheLock() { return fCacheLock; }
Int_t CopyFromCache(const char *name, Bool_t cpbin);
Int_t CopyToCache(const char *name, Int_t opt = 0);
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange,
Bool_t resume = kFALSE);
virtual void HandleException(Int_t sig);
virtual Int_t HandleSocketInput(TMessage *mess, Bool_t all);
virtual void HandleSocketInput();
virtual void HandleUrgentData();
virtual void HandleSigPipe();
virtual void HandleTermination() { Terminate(0); }
void Interrupt() { fInterrupt = kTRUE; }
Bool_t IsEndMaster() const { return fEndMaster; }
Bool_t IsMaster() const { return fMasterServ; }
Bool_t IsParallel() const;
Bool_t IsTopMaster() const { return fOrdinal == "0"; }
void Run(Bool_t retrn = kFALSE);
void Print(Option_t *option="") const;
void RestartComputeTime();
TObject *Get(const char *namecycle);
TDSetElement *GetNextPacket(Long64_t totalEntries = -1);
virtual void ReleaseWorker(const char *) { }
void Reset(const char *dir);
Int_t ReceiveFile(const char *file, Bool_t bin, Long64_t size);
virtual Int_t SendAsynMessage(const char *msg, Bool_t lf = kTRUE);
virtual void SendLogFile(Int_t status = 0, Int_t start = -1, Int_t end = -1);
void SendStatistics();
void SendParallel(Bool_t async = kFALSE);
Int_t UpdateSessionStatus(Int_t xst = -1);
virtual void DisableTimeout() { }
virtual void EnableTimeout() { }
virtual void Terminate(Int_t status);
void LogToMaster(Bool_t on = kTRUE) { fSendLogToMaster = on; }
static FILE *SetErrorHandlerFile(FILE *ferr);
static void ErrorHandler(Int_t level, Bool_t abort, const char *location,
const char *msg);
static void ResolveKeywords(TString &fname, const char *path = 0);
static void SetLastMsg(const char *lastmsg);
static void SetLastEntry(Long64_t lastentry);
static void FilterLocalroot(TString &path, const char *url = "root://dum/");
static void GetLocalServer(TString &dsrv);
static TMap *GetDataSetNodeMap(TFileCollection *fc, TString &emsg);
static Int_t RegisterDataSets(TList *in, TList *out, TDataSetManager *dsm, TString &e);
static Bool_t IsActive();
static TProofServ *This();
ClassDef(TProofServ,0)
};
R__EXTERN TProofServ *gProofServ;
class TProofLockPath : public TNamed {
private:
Int_t fLockId;
public:
TProofLockPath(const char *path) : TNamed(path,path), fLockId(-1) { }
~TProofLockPath() { if (IsLocked()) Unlock(); }
Int_t Lock();
Int_t Unlock();
Bool_t IsLocked() const { return (fLockId > -1); }
};
class TProofLockPathGuard {
private:
TProofLockPath *fLocker;
public:
TProofLockPathGuard(TProofLockPath *l) { fLocker = l; if (fLocker) fLocker->Lock(); }
~TProofLockPathGuard() { if (fLocker) fLocker->Unlock(); }
};
class TProofServLogHandler : public TFileHandler {
private:
TSocket *fSocket;
FILE *fFile;
TString fPfx;
static TString fgPfx;
static Int_t fgCmdRtn;
public:
enum EStatusBits { kFileIsPipe = BIT(23) };
TProofServLogHandler(const char *cmd, TSocket *s, const char *pfx = "");
TProofServLogHandler(FILE *f, TSocket *s, const char *pfx = "");
virtual ~TProofServLogHandler();
Bool_t IsValid() { return ((fFile && fSocket) ? kTRUE : kFALSE); }
Bool_t Notify();
Bool_t ReadNotify() { return Notify(); }
static void SetDefaultPrefix(const char *pfx);
static Int_t GetCmdRtn();
};
class TProofServLogHandlerGuard {
private:
TProofServLogHandler *fExecHandler;
public:
TProofServLogHandlerGuard(const char *cmd, TSocket *s,
const char *pfx = "", Bool_t on = kTRUE);
TProofServLogHandlerGuard(FILE *f, TSocket *s,
const char *pfx = "", Bool_t on = kTRUE);
virtual ~TProofServLogHandlerGuard();
};
class TShutdownTimer : public TTimer {
private:
TProofServ *fProofServ;
Int_t fTimeout;
public:
TShutdownTimer(TProofServ *p, Int_t delay);
Bool_t Notify();
};
class TReaperTimer : public TTimer {
private:
TList *fChildren;
public:
TReaperTimer(Long_t frequency = 1000) : TTimer(frequency, kTRUE), fChildren(0) { }
virtual ~TReaperTimer();
void AddPid(Int_t pid);
Bool_t Notify();
};
class TIdleTOTimer : public TTimer {
private:
TProofServ *fProofServ;
public:
TIdleTOTimer(TProofServ *p, Int_t delay) : TTimer(delay, kTRUE), fProofServ(p) { }
Bool_t Notify();
};
class TIdleTOTimerGuard {
private:
TIdleTOTimer *fIdleTOTimer;
public:
TIdleTOTimerGuard(TIdleTOTimer *t) : fIdleTOTimer(t) { if (fIdleTOTimer) fIdleTOTimer->Stop(); }
virtual ~TIdleTOTimerGuard() { if (fIdleTOTimer) fIdleTOTimer->Start(-1, kTRUE); }
};
inline Int_t TProofServ::GetCompressionLevel() const
{
return (fCompressMsg < 0) ? -1 : fCompressMsg % 100;
}
#endif