#ifndef ROOT_TProof
#define ROOT_TProof
#ifndef ROOT_TProofMgr
#include "TProofMgr.h"
#endif
#ifndef ROOT_TProofDebug
#include "TProofDebug.h"
#endif
#ifndef ROOT_TString
#include "TString.h"
#endif
#ifndef ROOT_TMacro
#include "TMacro.h"
#endif
#ifndef ROOT_MessageTypes
#include "MessageTypes.h"
#endif
#ifndef ROOT_TMD5
#include "TMD5.h"
#endif
#ifndef ROOT_TRegexp
#include "TRegexp.h"
#endif
#ifndef ROOT_TSysEvtHandler
#include "TSysEvtHandler.h"
#endif
#ifndef ROOT_TThread
#include "TThread.h"
#endif
#ifndef ROOT_TUrl
#include "TUrl.h"
#endif
#ifndef ROOT_TProofOutputList
#include "TProofOutputList.h"
#endif
#ifndef ROOT_TStopwatch
#include "TStopwatch.h"
#endif
#include <map>
#ifdef R__GLOBALSTL
namespace std { using ::map; }
#endif
// Convenience macro: expands to an unqualified Info(x, "...") call carrying a
// fixed "Not manager" message. x becomes Info()'s first argument. Because the
// call is unqualified, the macro only works where an Info(...) function is in
// scope. The expansion has no trailing semicolon; the caller supplies it.
#define CANNOTUSE(x) Info(x,"Not manager: cannot use this method")
// Forward declarations of classes that this header only refers to by pointer
// or reference (alphabetical order).
class TChain;
class TCollection;
class TCondor;
class TCondorSlave;
class TDataSetManager;
class TDataSetManagerFile;
class TDrawFeedback;
class TDSet;
class TEventList;
class TFileCollection;
class THashList;
class TList;
class TMacro;
class TMap;
class TMessage;
class TMonitor;
class TPluginHandler;
class TProof;
class TProofInputHandler;
class TProofInterruptHandler;
class TProofLockPath;
class TProofPlayer;
class TProofPlayerRemote;
class TProofProgressDialog;
class TProofServ;
class TQueryResult;
class TSelector;
class TSemaphore;
class TSignalHandler;
class TSlave;
class TSocket;
class TTree;
class TVirtualMutex;
class TVirtualProofPlayer;
// Global PROOF constants.
// kPROOF_Protocol is the value returned by TProof::GetClientProtocol().
const Int_t kPROOF_Protocol = 36;
// NOTE(review): presumably the default PROOF service port - confirm against the implementation.
const Int_t kPROOF_Port = 1093;
// Default values of the 'conffile' and 'confdir' arguments of the public TProof constructor.
const char* const kPROOF_ConfFile = "proof.conf";
const char* const kPROOF_ConfDir = "/usr/local/root";
// Directory names (by their names: working area, cache, packages, downloaded
// packages, queries, datasets, data). How they are combined into full paths is
// not visible in this header.
const char* const kPROOF_WorkDir = ".proof";
const char* const kPROOF_CacheDir = "cache";
const char* const kPROOF_PackDir = "packages";
const char* const kPROOF_PackDownloadDir = "downloaded";
const char* const kPROOF_QueryDir = "queries";
const char* const kPROOF_DataSetDir = "datasets";
const char* const kPROOF_DataDir = "data";
// Lock-file name prefixes (each ends with '-'; presumably a suffix is appended by the user code).
const char* const kPROOF_CacheLockFile = "proof-cache-lock-";
const char* const kPROOF_PackageLockFile = "proof-package-lock-";
const char* const kPROOF_QueryLockFile = "proof-query-lock-";
// Marker strings; by name, related to worker termination and idle time-out.
const char* const kPROOF_TerminateWorker = "+++ terminating +++";
const char* const kPROOF_WorkerIdleTO = "+++ idle-timeout +++";
// Fixed names for the input-data file and the missing-files object.
const char* const kPROOF_InputDataFile = "inputdata.root";
const char* const kPROOF_MissingFiles = "MissingFiles";
// NOTE(review): the "_s" suffix suggests a polling interval in seconds for dynamic workers - confirm.
const Long64_t kPROOF_DynWrkPollInt_s = 10;
// Platform-dependent shell command strings, selected on R__WIN32.
#ifndef R__WIN32
// Non-Windows: absolute-path copy/remove/list commands.
const char* const kCP = "/bin/cp -fp";
const char* const kRM = "/bin/rm -rf";
const char* const kLS = "/bin/ls -l";
// printf-style templates piping "<cmd> -c <file>" into "tar xf -":
//   kUNTAR  takes 4 strings (command, dir, file, target dir),
//   kUNTAR2 takes 3 strings (command, file, target dir),
//   kUNTAR3 takes 2 strings (command, file) and extracts in the current directory.
const char* const kUNTAR = "%s -c %s/%s | (cd %s; tar xf -)";
const char* const kUNTAR2 = "%s -c %s | (cd %s; tar xf -)";
const char* const kUNTAR3 = "%s -c %s | (tar xf -)";
const char* const kGUNZIP = "gunzip";
#else
// Windows variants.
// NOTE(review): "delete" and the "..." untar templates look like placeholders
// rather than working commands - confirm before relying on them.
const char* const kCP = "copy";
const char* const kRM = "delete";
const char* const kLS = "dir";
const char* const kUNTAR = "...";
const char* const kUNTAR2 = "...";
const char* const kUNTAR3 = "...";
const char* const kGUNZIP = "gunzip";
#endif
// Global mutex pointer declared here and defined elsewhere (R__EXTERN).
R__EXTERN TVirtualMutex *gProofMutex;
// Signature of a user-supplied progress-printing callback; an instance is
// installed via TProof::SetPrintProgress().
typedef void (*PrintProgress_t)(Long64_t tot, Long64_t proc, Float_t proctime, Long64_t bytes);
// Plain value object bundling the progress figures of a query. All fields are
// public and are set only by the constructor; no logic is attached.
class TProofProgressInfo : public TObject {
public:
   // Counters (by name: total, processed so far, bytes read).
   Long64_t  fTotal;
   Long64_t  fProcessed;
   Long64_t  fBytesRead;
   // Timing and rate figures; the constructor defaults them to -1.
   Float_t   fInitTime;
   Float_t   fProcTime;
   Float_t   fEvtRateI;
   Float_t   fMBRateI;
   // Worker / session figures; the constructor defaults them to 0.
   Int_t     fActWorkers;
   Int_t     fTotSessions;
   Float_t   fEffSessions;

   // Every argument is optional and is copied verbatim into the matching field.
   TProofProgressInfo(Long64_t tot = 0, Long64_t proc = 0, Long64_t bytes = 0,
                      Float_t initt = -1., Float_t proct = -1.,
                      Float_t evts = -1., Float_t mbs = -1.,
                      Int_t actw = 0, Int_t tsess = 0, Float_t esess = 0.)
      : fTotal(tot),
        fProcessed(proc),
        fBytesRead(bytes),
        fInitTime(initt),
        fProcTime(proct),
        fEvtRateI(evts),
        fMBRateI(mbs),
        fActWorkers(actw),
        fTotSessions(tsess),
        fEffSessions(esess)
   {
   }

   virtual ~TProofProgressInfo() { }

   ClassDef(TProofProgressInfo, 1);
};
// Signal handler registered for kSigInterrupt that keeps a back-pointer to the
// TProof instance it was created for. Notify() is implemented elsewhere.
class TProofInterruptHandler : public TSignalHandler {
private:
   TProof *fProof;   // TProof instance passed to the constructor

   // Copy operations are declared private and not defined in this header.
   TProofInterruptHandler(const TProofInterruptHandler&);
   TProofInterruptHandler& operator=(const TProofInterruptHandler&);

public:
   // Builds the base handler with (kSigInterrupt, kFALSE) and stores p.
   TProofInterruptHandler(TProof *p)
      : TSignalHandler(kSigInterrupt, kFALSE),
        fProof(p)
   {
   }

   Bool_t Notify();
};
// File handler tied to a socket and a TProof instance. The constructor and
// Notify() are implemented elsewhere; ReadNotify() simply forwards to Notify().
class TProofInputHandler : public TFileHandler {
private:
   TSocket *fSocket;   // socket associated with this handler
   TProof  *fProof;    // owning/associated TProof instance

   // Copy operations are declared private and not defined in this header.
   TProofInputHandler(const TProofInputHandler&);
   TProofInputHandler& operator=(const TProofInputHandler&);

public:
   TProofInputHandler(TProof *p, TSocket *s);

   Bool_t Notify();

   // Read-readiness notification: delegates to Notify() and returns its result.
   Bool_t ReadNotify()
   {
      return Notify();
   }
};
// Descriptive record for one worker ("slave"): identification strings, a
// performance index, a SysInfo_t block and a status flag. All data members are
// public. Compare/Print/IsEqual/SetSysInfo are implemented elsewhere.
class TSlaveInfo : public TObject {
public:
   enum ESlaveStatus { kActive, kNotActive, kBad };

   TString       fOrdinal;
   TString       fHostName;
   TString       fMsd;
   TString       fDataDir;
   Int_t         fPerfIndex;
   SysInfo_t     fSysInfo;
   ESlaveStatus  fStatus;

   // All arguments are optional; a freshly built object has status kNotActive
   // and a default-constructed SysInfo_t.
   TSlaveInfo(const char *ordinal = "", const char *host = "", Int_t perfidx = 0,
              const char *msd = "", const char *datadir = "")
      : fOrdinal(ordinal),
        fHostName(host),
        fMsd(msd),
        fDataDir(datadir),
        fPerfIndex(perfidx),
        fSysInfo(),
        fStatus(kNotActive)
   {
   }

   // String accessors; the returned pointers refer to the internal TString
   // buffers. GetName() returns the host name.
   const char *GetDataDir() const { return fDataDir.Data(); }
   const char *GetMsd() const     { return fMsd.Data(); }
   const char *GetName() const    { return fHostName.Data(); }
   const char *GetOrdinal() const { return fOrdinal.Data(); }

   // Returns a copy of the stored system information.
   SysInfo_t   GetSysInfo() const { return fSysInfo; }

   void        SetStatus(ESlaveStatus stat) { fStatus = stat; }
   void        SetSysInfo(SysInfo_t si);
   void        SetOrdinal(const char *ord) { fOrdinal = ord; }

   // Sorting / comparison support; IsSortable() always answers kTRUE.
   Int_t       Compare(const TObject *obj) const;
   Bool_t      IsSortable() const { return kTRUE; }
   void        Print(Option_t *option="") const;
   Bool_t      IsEqual(const TObject* obj) const;

   ClassDef(TSlaveInfo,4)
};
// Bookkeeping record for one merger: the TSlave acting as merger, its port,
// counters of merged objects/workers and an "active" flag. The destructor,
// AddWorker(), SetMergedWorker() and the AreAll...() predicates are
// implemented elsewhere.
class TMergerInfo : public TObject {
private:
   TSlave  *fMerger;          // slave passed to the constructor
   Int_t    fPort;            // port passed to the constructor
   Int_t    fMergedObjects;   // running sum fed by AddMergedObjects()
   Int_t    fWorkersToMerge;  // constructor argument 'forHowManyWorkers'
   Int_t    fMergedWorkers;   // starts at 0
   TList   *fWorkers;         // starts as a null pointer
   Bool_t   fIsActive;        // kTRUE until Deactivate() is called

   // Copy operations are declared private and not defined in this header.
   TMergerInfo(const TMergerInfo&);
   TMergerInfo& operator=(const TMergerInfo&);

public:
   // Counters start at zero, the worker list is null and the merger is active.
   TMergerInfo(TSlave *t, Int_t port, Int_t forHowManyWorkers)
      : fMerger(t),
        fPort(port),
        fMergedObjects(0),
        fWorkersToMerge(forHowManyWorkers),
        fMergedWorkers(0),
        fWorkers(0),
        fIsActive(kTRUE)
   {
   }

   virtual ~TMergerInfo();

   void    AddWorker(TSlave *sl);

   // Plain accessors. GetWorkers() returns the stored pointer as is, which is
   // null right after construction.
   TList  *GetWorkers()        { return fWorkers; }
   TSlave *GetMerger()         { return fMerger; }
   Int_t   GetPort()           { return fPort; }
   Int_t   GetWorkersToMerge() { return fWorkersToMerge; }
   Int_t   GetMergedWorkers()  { return fMergedWorkers; }
   Int_t   GetMergedObjects()  { return fMergedObjects; }

   void    SetMergedWorker();

   // Adds 'objects' to the merged-objects counter.
   void    AddMergedObjects(Int_t objects) { fMergedObjects += objects; }

   Bool_t  AreAllWorkersAssigned();
   Bool_t  AreAllWorkersMerged();

   // Activity flag handling.
   void    Deactivate() { fIsActive = kFALSE; }
   Bool_t  IsActive()   { return fIsActive; }

   ClassDef(TMergerInfo,0)
};
class TProofMergePrg {
private:
TString fExp;
Int_t fIdx;
Int_t fNWrks;
Int_t fLastNWrks;
static char fgCr[4];
public:
TProofMergePrg() : fExp(), fIdx(-1), fNWrks(-1), fLastNWrks(-1) { }
const char *Export(Bool_t &changed) {
fExp.Form("%c (%d workers still sending) ", fgCr[fIdx], fNWrks);
changed = (fLastNWrks != fNWrks || fLastNWrks == -1) ? kTRUE : kFALSE;
fLastNWrks = fNWrks;
return fExp.Data(); }
void DecreaseNWrks() { fNWrks--; }
void IncreaseNWrks() { fNWrks++; }
void IncreaseIdx() { fIdx++; if (fIdx == 4) fIdx = 0; }
void Reset(Int_t n = -1) { fIdx = -1; SetNWrks(n); }
void SetNWrks(Int_t n) { fNWrks = n; }
};
// TProof: main PROOF session class. Derives from TNamed and TQObject and
// declares the session state, the worker ("slave") lists and monitors, and the
// interfaces for processing, package handling, dataset handling, parameters,
// feedback, queries and logging. Only inline one-liners are defined in this
// header; everything else is implemented elsewhere.
class TProof : public TNamed, public TQObject {
// Classes granted access to the private and protected parts of TProof.
friend class TPacketizer;
friend class TPacketizerDev;
friend class TPacketizerAdaptive;
friend class TProofLite;
friend class TDataSetManager;
friend class TProofServ;
friend class TProofInputHandler;
friend class TProofInterruptHandler;
friend class TProofPlayer;
friend class TProofPlayerLite;
friend class TProofPlayerRemote;
friend class TProofProgressDialog;
friend class TSlave;
friend class TSlaveLite;
friend class TVirtualPacketizer;
friend class TXSlave;
friend class TXSocket;
friend class TXSocketHandler;
friend class TXProofMgr;
friend class TXProofServ;
public:
// Object status bits, occupying bits 14 to 19.
enum EStatusBits {
kUsingSessionGui = BIT(14),
kNewInputData = BIT(15),
kIsClient = BIT(16),
kIsMaster = BIT(17),
kIsTopMaster = BIT(18),
kUseProgressDialog = BIT(19)
};
// Query submission mode; the type of fQueryMode and of Get/SetQueryMode().
enum EQueryMode {
kSync = 0,
kAsync = 1
};
// Upload option flags; the non-zero values are distinct single bits, kAskUser is 0.
enum EUploadOpt {
kAppend = 0x1,
kOverwriteDataSet = 0x2,
kNoOverwriteDataSet = 0x4,
kOverwriteAllFiles = 0x8,
kOverwriteNoFiles = 0x10,
kAskUser = 0x0
};
// Dataset registration policies (by name: fail, overwrite or merge if it exists).
enum ERegisterOpt {
kFailIfExists = 0,
kOverwriteIfExists = 1,
kMergeIfExists = 2
};
// Negative result codes related to dataset upload.
enum EUploadDataSetAnswer {
kError = -1,
kDataSetExists = -2,
kFail = -3
};
// Package upload options; kUntar is the default of UploadPackage().
enum EUploadPackageOpt {
kUntar = 0x0,
kRemoveOld = 0x1
};
// Run status; the type of fRunStatus and of Get/SetRunStatus().
enum ERunStatus {
kRunning = 0,
kStopped = 1,
kAborted = 2
};
// Numeric codes 1..6; by name, message sub-types of the sub-merger protocol
// (see HandleSubmerger) - NOTE(review): confirm against the implementation.
enum ESubMerger {
kOutputSize = 1,
kSendOutput = 2,
kBeMerger = 3,
kMergerDown = 4,
kStopMerging = 5,
kOutputSent = 6
};
// Bit flags for ClearData(); kUnregistered is its default 'what' argument.
enum EProofClearData {
kPurge = 0x1,
kUnregistered = 0x2,
kDataset = 0x4,
kForceClear = 0x8
};
private:
// Interrupt kinds accepted by Interrupt(). kSoftInterrupt and
// kShutdownInterrupt take the implicit values 2 and 3.
enum EUrgent {
kLocalInterrupt = -1,
kPing = 0,
kHardInterrupt = 1,
kSoftInterrupt,
kShutdownInterrupt
};
// Numeric codes 1..21; by name, cache and package sub-commands.
enum EProofCacheCommands {
kShowCache = 1,
kClearCache = 2,
kShowPackages = 3,
kClearPackages = 4,
kClearPackage = 5,
kBuildPackage = 6,
kLoadPackage = 7,
kShowEnabledPackages = 8,
kShowSubCache = 9,
kClearSubCache = 10,
kShowSubPackages = 11,
kDisableSubPackages = 12,
kDisableSubPackage = 13,
kBuildSubPackage = 14,
kUnloadPackage = 15,
kDisablePackage = 16,
kUnloadPackages = 17,
kDisablePackages = 18,
kListPackages = 19,
kListEnabledPackages = 20,
kLoadMacro = 21
};
// Numeric codes 1..16; by name, dataset sub-commands.
enum EProofDataSetCommands {
kUploadDataSet = 1,
kCheckDataSetName = 2,
kGetDataSets = 3,
kRegisterDataSet = 4,
kGetDataSet = 5,
kVerifyDataSet = 6,
kRemoveDataSet = 7,
kMergeDataSet = 8,
kShowDataSets = 9,
kGetQuota = 10,
kShowQuota = 11,
kSetDefaultTreeName = 12,
kCache = 13,
kRequestStaging = 14,
kStagingStatus = 15,
kCancelStaging = 16
};
// Bit flags combined into the 'opt'/'cpopt' arguments of SendFile() and
// CheckFile(); kAscii is 0.
enum ESendFileOpt {
kAscii = 0x0,
kBinary = 0x1,
kForce = 0x2,
kForward = 0x4,
kCpBin = 0x8,
kCp = 0x10
};
// Worker-list actions (activate / deactivate).
enum EProofWrkListAction {
kActivateWorker = 1,
kDeactivateWorker = 2
};
// Options accepted by BuildPackage(); kBuildAll is its default.
enum EBuildPackageOpt {
kDontBuildOnClient = -2,
kBuildOnSlavesNoWait = -1,
kBuildAll = 0,
kCollectBuildResults = 1
};
// Version-check options for packages. NOTE(review): the 'chkveropt' defaults
// of BuildPackage()/BuildPackageOnClient() are the literal 2, which equals
// kCheckSVN - presumably intentional, confirm.
enum EParCheckVersionOpt {
kDontCheck = 0,
kCheckROOT = 1,
kCheckSVN = 2
};
// Bit flags for quota display (per group / per user).
enum EProofShowQuotaOpt {
kPerGroup = 0x1,
kPerUser = 0x2
};
// ---- Private data members -------------------------------------------------
// General session state. fValid, fTty, fMaster, fWorkDir, fGroup, fLogLevel
// and fStatus are exposed through the inline getters further below.
Bool_t fValid;
Bool_t fTty;
TString fMaster;
TString fWorkDir;
TString fGroup;
Int_t fLogLevel;
Int_t fStatus;
Int_t fCheckFileStatus;
TList *fRecvMessages;
TList *fSlaveInfo;
Bool_t fSendGroupView;
Bool_t fIsPollingWorkers;
Long64_t fLastPollWorkers_s;
// Worker lists and the monitors associated with them.
TList *fActiveSlaves;
TString fActiveSlavesSaved;
TList *fInactiveSlaves;
TList *fUniqueSlaves;
TList *fAllUniqueSlaves;
TList *fNonUniqueMasters;
TMonitor *fActiveMonitor;
TMonitor *fUniqueMonitor;
TMonitor *fAllUniqueMonitor;
TMonitor *fCurrentMonitor;
// Statistics returned by GetBytesRead(), GetRealTime() and GetCpuTime().
Long64_t fBytesRead;
Float_t fRealTime;
Float_t fCpuTime;
// Handlers, progress dialog and player.
TSignalHandler *fIntHandler;
TPluginHandler *fProgressDialog;
Bool_t fProgressDialogStarted;
TVirtualProofPlayer *fPlayer;
TList *fFeedback;
TList *fChains;
// Per-file record of an MD5 checksum plus a modification time, kept in a map
// keyed by a TString.
struct MD5Mod_t {
TMD5 fMD5;
Long_t fModtime;
};
typedef std::map<TString, MD5Mod_t> FileMap_t;
FileMap_t fFileMap;
TDSet *fDSet;
// Run state: IsIdle() is true when fNotIdle <= 0; fSync, fRunStatus and
// fIsWaiting back IsSync(), GetRunStatus() and IsWaiting().
Int_t fNotIdle;
Bool_t fSync;
ERunStatus fRunStatus;
Bool_t fIsWaiting;
// Log handling.
Bool_t fRedirLog;
TString fLogFileName;
FILE *fLogFileW;
FILE *fLogFileR;
Bool_t fLogToWindowOnly;
Bool_t fSaveLogToMacro;
TMacro fMacroLog;
TProofMergePrg fMergePrg;
// Query bookkeeping.
TList *fWaitingSlaves;
TList *fQueries;
Int_t fOtherQueries;
Int_t fDrawQueries;
Int_t fMaxDrawQueries;
Int_t fSeqNum;
Int_t fSessionID;
Bool_t fEndMaster;
// Package handling.
TString fPackageDir;
THashList *fGlobalPackageDirList;
TProofLockPath *fPackageLock;
TList *fEnabledPackagesOnClient;
TList *fEnabledPackagesOnCluster;
// Input data and output list.
TList *fInputData;
TString fInputDataFile;
TProofOutputList fOutputList;
// Callback installed by SetPrintProgress().
PrintProgress_t fPrintProgress;
TVirtualMutex *fCloseMutex;
TList *fLoadedMacros;
static TList *fgProofEnvList;
// Merger (sub-merging) state.
Bool_t fMergersSet;
Bool_t fMergersByHost;
Int_t fMergersCount;
Int_t fWorkersToMerge;
Int_t fLastAssignedMerger;
TList *fMergers;
Bool_t fFinalizationRunning;
Int_t fRedirectNext;
TString fPerfTree;
TList *fWrksOutputReady;
static TPluginHandler *fgLogViewer;
protected:
// Selector for which worker list an operation applies to. Note that kActive
// here is TProof::ESlaves::kActive, distinct from TSlaveInfo::kActive.
enum ESlaves { kAll, kActive, kUnique, kAllUnique };
// ---- Protected data members -----------------------------------------------
// IsMaster() returns fMasterServ.
Bool_t fMasterServ;
TUrl fUrl;
TString fConfFile;
TString fConfDir;
TString fImage;
// GetRemoteProtocol() returns fProtocol.
Int_t fProtocol;
TList *fSlaves;
TList *fTerminatedSlaveInfos;
TList *fBadSlaves;
TMonitor *fAllMonitor;
Bool_t fDataReady;
Long64_t fBytesReady;
Long64_t fTotalBytes;
TList *fAvailablePackages;
TList *fEnabledPackages;
TList *fRunningDSets;
Int_t fCollectTimeout;
// NOTE(review): Get/SetDataPoolUrl() below go through fManager and do not
// touch this member.
TString fDataPoolUrl;
// IsLite() and IsProofd() compare fServType against TProofMgr::kProofLite
// and TProofMgr::kProofd.
TProofMgr::EServType fServType;
TProofMgr *fManager;
EQueryMode fQueryMode;
Bool_t fDynamicStartup;
TSelector *fSelector;
TStopwatch fQuerySTW;
Float_t fPrepTime;
static TSemaphore *fgSemaphore;
private:
// Copy constructor and assignment are declared private and not defined in
// this header.
TProof(const TProof &);
void operator=(const TProof &);
// ---- Private methods: commands and state exchange with the workers ---------
void CleanGDirectory(TList *ol);
Int_t Exec(const char *cmd, ESlaves list, Bool_t plusMaster);
Int_t SendCommand(const char *cmd, ESlaves list = kActive);
Int_t SendCurrentState(ESlaves list = kActive);
Int_t SendCurrentState(TList *list);
Bool_t CheckFile(const char *file, TSlave *sl, Long_t modtime, Int_t cpopt = (kCp | kCpBin));
Int_t SendObject(const TObject *obj, ESlaves list = kActive);
Int_t SendGroupView();
Int_t SendInitialState();
Int_t SendPrint(Option_t *option="");
Int_t Ping(ESlaves list);
void Interrupt(EUrgent type, ESlaves list = kActive);
void AskStatistics();
void AskParallel();
Int_t GoParallel(Int_t nodes, Bool_t accept = kFALSE, Bool_t random = kFALSE);
Int_t GoMoreParallel(Int_t nWorkersToAdd);
Int_t SetParallelSilent(Int_t nodes, Bool_t random = kFALSE);
void RecvLogFile(TSocket *s, Int_t size);
void NotifyLogMsg(const char *msg, const char *sfx = "\n");
// Package build / load / unload / disable helpers.
Int_t BuildPackage(const char *package, EBuildPackageOpt opt = kBuildAll, Int_t chkveropt = 2, TList *workers = 0);
Int_t BuildPackageOnClient(const char *package, Int_t opt = 0, TString *path = 0, Int_t chkveropt = 2);
Int_t LoadPackage(const char *package, Bool_t notOnClient = kFALSE, TList *loadopts = 0, TList *workers = 0);
Int_t LoadPackageOnClient(const char *package, TList *loadopts = 0);
Int_t UnloadPackage(const char *package);
Int_t UnloadPackageOnClient(const char *package);
Int_t UnloadPackages();
Int_t UploadPackageOnClient(const char *package, EUploadPackageOpt opt, TMD5 *md5);
Int_t DisablePackage(const char *package);
Int_t DisablePackageOnClient(const char *package);
Int_t DisablePackages();
void Activate(TList *slaves = 0);
// Broadcast overloads: each exists in a TList* flavour and an ESlaves flavour.
Int_t Broadcast(const TMessage &mess, TList *slaves);
Int_t Broadcast(const TMessage &mess, ESlaves list = kActive);
Int_t Broadcast(const char *mess, Int_t kind, TList *slaves);
Int_t Broadcast(const char *mess, Int_t kind = kMESS_STRING, ESlaves list = kActive);
// Kind-only broadcasts: forward to the (const char *, Int_t, ...) overloads
// with a null message string.
Int_t Broadcast(Int_t kind, TList *slaves) { return Broadcast(0, kind, slaves); }
Int_t Broadcast(Int_t kind, ESlaves list = kActive) { return Broadcast(0, kind, list); }
Int_t BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks);
Int_t BroadcastFile(const char *file, Int_t opt, const char *rfile = 0, ESlaves list = kAllUnique);
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list = kAllUnique);
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers);
Int_t BroadcastObject(const TObject *obj, Int_t kind, TList *slaves);
Int_t BroadcastObject(const TObject *obj, Int_t kind = kMESS_OBJECT, ESlaves list = kActive);
Int_t BroadcastRaw(const void *buffer, Int_t length, TList *slaves);
Int_t BroadcastRaw(const void *buffer, Int_t length, ESlaves list = kActive);
// Collection of replies and handling of incoming messages.
Int_t Collect(const TSlave *sl, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
Int_t Collect(TMonitor *mon, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
Int_t CollectInputFrom(TSocket *s, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
Int_t HandleInputMessage(TSlave *wrk, TMessage *m, Bool_t deactonfail = kFALSE);
void HandleSubmerger(TMessage *mess, TSlave *sl);
void SetMonitor(TMonitor *mon = 0, Bool_t on = kTRUE);
void ReleaseMonitor(TMonitor *mon);
// Worker list access; the inline getters return the stored list pointers.
virtual void FindUniqueSlaves();
TSlave *FindSlave(TSocket *s) const;
TList *GetListOfSlaves() const { return fSlaves; }
TList *GetListOfInactiveSlaves() const { return fInactiveSlaves; }
TList *GetListOfUniqueSlaves() const { return fUniqueSlaves; }
TList *GetListOfBadSlaves() const { return fBadSlaves; }
Int_t GetNumberOfSlaves() const;
Int_t GetNumberOfActiveSlaves() const;
Int_t GetNumberOfInactiveSlaves() const;
Int_t GetNumberOfUniqueSlaves() const;
Int_t GetNumberOfBadSlaves() const;
Bool_t IsEndMaster() const { return fEndMaster; }
Int_t ModifyWorkerLists(const char *ord, Bool_t add, Bool_t save);
Int_t RestoreActiveList();
void SaveActiveList();
Bool_t IsSync() const { return fSync; }
void InterruptCurrentMonitor();
void SetRunStatus(ERunStatus rst) { fRunStatus = rst; }
void MarkBad(TSlave *wrk, const char *reason = 0);
void MarkBad(TSocket *s, const char *reason = 0);
void TerminateWorker(TSlave *wrk);
void TerminateWorker(const char *ord);
void ActivateAsyncInput();
void DeActivateAsyncInput();
Int_t GetQueryReference(Int_t qry, TString &ref);
void PrintProgress(Long64_t total, Long64_t processed,
Float_t procTime = -1., Long64_t bytesread = -1);
// Merger management; ResetMergers() and FinalizationDone() only clear a flag.
Bool_t CreateMerger(TSlave *sl, Int_t port);
void RedirectWorker(TSocket *s, TSlave * sl, Int_t output_size);
Int_t GetActiveMergersCount();
Int_t FindNextFreeMerger();
void ResetMergers() { fMergersSet = kFALSE; }
void AskForOutput(TSlave *sl);
void FinalizationDone() { fFinalizationRunning = kFALSE; }
void ResetMergePrg();
void ParseConfigField(const char *config);
Bool_t Prompt(const char *p);
void ClearDataProgress(Int_t r, Int_t t);
static TList *GetDataSetSrvMaps(const TString &srvmaps);
protected:
// ---- Protected interface (available to derived classes) -------------------
// Default constructor, member initialisation and session set-up.
TProof();
void InitMembers();
Int_t Init(const char *masterurl, const char *conffile,
const char *confdir, Int_t loglevel,
const char *alias = 0);
virtual Bool_t StartSlaves(Bool_t attach = kFALSE);
Int_t AddWorkers(TList *wrks);
Int_t RemoveWorkers(TList *wrks);
void SetupWorkersEnv(TList *wrks, Bool_t increasingpool = kFALSE);
void SetPlayer(TVirtualProofPlayer *player);
TVirtualProofPlayer *GetPlayer() const { return fPlayer; }
virtual TVirtualProofPlayer *MakePlayer(const char *player = 0, TSocket *s = 0);
void UpdateDialog();
void HandleLibIncPath(const char *what, Bool_t add, const char *dirs);
TList *GetListOfActiveSlaves() const { return fActiveSlaves; }
TSlave *CreateSlave(const char *url, const char *ord,
Int_t perf, const char *image, const char *workdir);
TSlave *CreateSubmaster(const char *url, const char *ord,
const char *image, const char *msd);
virtual Int_t PollForNewWorkers();
virtual void SaveWorkerInfo();
Int_t Collect(ESlaves list = kActive, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
Int_t Collect(TList *slaves, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
// Returns fEnabledPackagesOnCluster (not the protected fEnabledPackages member).
TList *GetEnabledPackages() const { return fEnabledPackagesOnCluster; }
void SetDSet(TDSet *dset) { fDSet = dset; }
virtual void ValidateDSet(TDSet *dset);
Int_t VerifyDataSetParallel(const char *uri, const char *optStr);
TPluginHandler *GetProgressDialog() const { return fProgressDialog; }
Int_t AssertPath(const char *path, Bool_t writable);
Int_t GetSandbox(TString &sb, Bool_t assert = kFALSE, const char *rc = 0);
void PrepareInputDataFile(TString &dataFile);
virtual void SendInputDataFile();
Int_t SendFile(const char *file, Int_t opt = (kBinary | kForward | kCp | kCpBin),
const char *rfile = 0, TSlave *sl = 0);
void SetFeedback(TString &opt, TString &optfb, Int_t action);
Int_t HandleOutputOptions(TString &opt, TString &target, Int_t action);
// Static helpers.
static void *SlaveStartupThread(void *arg);
static Int_t AssertDataSet(TDSet *dset, TList *input,
TDataSetManager *mgr, TString &emsg);
static Int_t GetInputData(TList *input, const char *cachedir, TString &emsg);
static Int_t SaveInputData(TQueryResult *qr, const char *cachedir, TString &emsg);
static Int_t SendInputData(TQueryResult *qr, TProof *p, TString &emsg);
static Bool_t GetFileInCmd(const char *cmd, TString &fn);
static void SystemCmd(const char *cmd, Int_t fdout);
public:
// ---- Public interface ------------------------------------------------------
// Main constructor; conffile and confdir default to kPROOF_ConfFile and
// kPROOF_ConfDir.
TProof(const char *masterurl, const char *conffile = kPROOF_ConfFile,
const char *confdir = kPROOF_ConfDir, Int_t loglevel = 0,
const char *alias = 0, TProofMgr *mgr = 0);
virtual ~TProof();
void cd(Int_t id = -1);
Int_t Ping();
void Touch();
Int_t Exec(const char *cmd, Bool_t plusMaster = kFALSE);
Int_t Exec(const char *cmd, const char *ord, Bool_t logtomacro = kFALSE);
// Environment / resource look-ups; 'ord' defaults to "0".
TString Getenv(const char *env, const char *ord = "0");
Int_t GetRC(const char *RCenv, Int_t &env, const char *ord = "0");
Int_t GetRC(const char *RCenv, Double_t &env, const char *ord = "0");
Int_t GetRC(const char *RCenv, TString &env, const char *ord = "0");
// Process() overloads: the data source is a TDSet, a TFileCollection, a
// dataset name or none; the selector is given either by name (const char *)
// or as a TSelector object.
virtual Long64_t Process(TDSet *dset, const char *selector,
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0);
virtual Long64_t Process(TFileCollection *fc, const char *selector,
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0);
virtual Long64_t Process(const char *dsetname, const char *selector,
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0, TObject *enl = 0);
virtual Long64_t Process(const char *selector, Long64_t nentries,
Option_t *option = "");
virtual Long64_t Process(TDSet *dset, TSelector *selector,
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0);
virtual Long64_t Process(TFileCollection *fc, TSelector *selector,
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0);
virtual Long64_t Process(const char *dsetname, TSelector *selector,
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0, TObject *enl = 0);
virtual Long64_t Process(TSelector *selector, Long64_t nentries,
Option_t *option = "");
virtual Long64_t DrawSelect(TDSet *dset, const char *varexp,
const char *selection = "",
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0);
Long64_t DrawSelect(const char *dsetname, const char *varexp,
const char *selection = "",
Option_t *option = "", Long64_t nentries = -1,
Long64_t firstentry = 0, TObject *enl = 0);
// Query management: each operation accepts either a query number or a query
// reference string.
Int_t Archive(Int_t query, const char *url);
Int_t Archive(const char *queryref, const char *url = 0);
Int_t CleanupSession(const char *sessiontag);
Long64_t Finalize(Int_t query = -1, Bool_t force = kFALSE);
Long64_t Finalize(const char *queryref, Bool_t force = kFALSE);
Int_t Remove(Int_t query, Bool_t all = kFALSE);
Int_t Remove(const char *queryref, Bool_t all = kFALSE);
Int_t Retrieve(Int_t query, const char *path = 0);
Int_t Retrieve(const char *queryref, const char *path = 0);
void DisableGoAsyn();
void GoAsynchronous();
void StopProcess(Bool_t abort, Int_t timeout = -1);
void Browse(TBrowser *b);
virtual Int_t Echo(const TObject *obj);
virtual Int_t Echo(const char *str);
Int_t SetParallel(Int_t nodes = -1, Bool_t random = kFALSE);
void SetLogLevel(Int_t level, UInt_t mask = TProofDebug::kAll);
void Close(Option_t *option="");
virtual void Print(Option_t *option="") const;
// Cache and package management.
virtual void ShowCache(Bool_t all = kFALSE);
virtual void ClearCache(const char *file = 0);
TList *GetListOfPackages();
TList *GetListOfEnabledPackages();
void ShowPackages(Bool_t all = kFALSE, Bool_t redirlog = kFALSE);
void ShowEnabledPackages(Bool_t all = kFALSE);
Int_t ClearPackages();
Int_t ClearPackage(const char *package);
Int_t DownloadPackage(const char *par, const char *dstdir = 0);
Int_t EnablePackage(const char *package, Bool_t notOnClient = kFALSE, TList *workers = 0);
Int_t EnablePackage(const char *package, const char *loadopts,
Bool_t notOnClient = kFALSE, TList *workers = 0);
Int_t EnablePackage(const char *package, TList *loadopts,
Bool_t notOnClient = kFALSE, TList *workers = 0);
Int_t UploadPackage(const char *par, EUploadPackageOpt opt = kUntar, TList *workers = 0);
virtual Int_t Load(const char *macro, Bool_t notOnClient = kFALSE, Bool_t uniqueOnly = kTRUE,
TList *wrks = 0);
// Library and include search paths.
Int_t AddDynamicPath(const char *libpath, Bool_t onClient = kFALSE, TList *wrks = 0, Bool_t doCollect = kTRUE);
Int_t AddIncludePath(const char *incpath, Bool_t onClient = kFALSE, TList *wrks = 0, Bool_t doCollect = kTRUE);
Int_t RemoveDynamicPath(const char *libpath, Bool_t onClient = kFALSE);
Int_t RemoveIncludePath(const char *incpath, Bool_t onClient = kFALSE);
// Dataset management. The three UploadDataSet* declarations leave their
// parameters unnamed; their meaning is not visible from this header.
Int_t UploadDataSet(const char *, TList *, const char * = 0, Int_t = 0, TList * = 0);
Int_t UploadDataSet(const char *, const char *, const char * = 0, Int_t = 0, TList * = 0);
Int_t UploadDataSetFromFile(const char *, const char *, const char * = 0, Int_t = 0, TList * = 0);
virtual Bool_t RegisterDataSet(const char *name,
TFileCollection *dataset, const char* optStr = "");
virtual TMap *GetDataSets(const char *uri = "", const char* optStr = "");
virtual void ShowDataSets(const char *uri = "", const char* optStr = "");
TMap *GetDataSetQuota(const char* optStr = "");
void ShowDataSetQuota(Option_t* opt = 0);
virtual Bool_t ExistsDataSet(const char *dataset);
void ShowDataSet(const char *dataset = "", const char* opt = "filter:SsCc");
virtual Int_t RemoveDataSet(const char *dataset, const char* optStr = "");
virtual Int_t VerifyDataSet(const char *dataset, const char* optStr = "");
virtual TFileCollection *GetDataSet(const char *dataset, const char* optStr = "");
TList *FindDataSets(const char *searchString, const char* optStr = "");
virtual Bool_t RequestStagingDataSet(const char *dataset);
virtual TFileCollection *GetStagingStatusDataSet(const char *dataset);
virtual void ShowStagingStatusDataSet(const char *dataset, const char *optStr = "filter:SsCc");
virtual Bool_t CancelStagingDataSet(const char *dataset);
virtual Int_t SetDataSetTreeName( const char *dataset, const char *treename);
virtual void ShowDataSetCache(const char *dataset = 0);
virtual void ClearDataSetCache(const char *dataset = 0);
virtual void ShowData();
void ClearData(UInt_t what = kUnregistered, const char *dsname = 0);
// Simple accessors. The const char * getters return pointers into the
// corresponding TString / TUrl members; GetSessionTag() is GetName().
const char *GetMaster() const { return fMaster; }
const char *GetConfDir() const { return fConfDir; }
const char *GetConfFile() const { return fConfFile; }
const char *GetUser() const { return fUrl.GetUser(); }
const char *GetGroup() const { return fGroup; }
const char *GetWorkDir() const { return fWorkDir; }
const char *GetSessionTag() const { return GetName(); }
const char *GetImage() const { return fImage; }
const char *GetUrl() { return fUrl.GetUrl(); }
Int_t GetPort() const { return fUrl.GetPort(); }
Int_t GetRemoteProtocol() const { return fProtocol; }
Int_t GetClientProtocol() const { return kPROOF_Protocol; }
Int_t GetStatus() const { return fStatus; }
Int_t GetLogLevel() const { return fLogLevel; }
Int_t GetParallel() const;
Int_t GetSeqNum() const { return fSeqNum; }
Int_t GetSessionID() const { return fSessionID; }
TList *GetListOfSlaveInfos();
Bool_t UseDynamicStartup() const { return fDynamicStartup; }
EQueryMode GetQueryMode(Option_t *mode = 0) const;
void SetQueryMode(EQueryMode mode);
void SetRealTimeLog(Bool_t on = kTRUE);
void GetStatistics(Bool_t verbose = kFALSE);
Long64_t GetBytesRead() const { return fBytesRead; }
Float_t GetRealTime() const { return fRealTime; }
Float_t GetCpuTime() const { return fCpuTime; }
// State predicates. IsParallel() is GetParallel() > 0; IsIdle() is
// fNotIdle <= 0; IsFolder() is always kTRUE.
Bool_t IsLite() const { return (fServType == TProofMgr::kProofLite) ? kTRUE : kFALSE; }
Bool_t IsProofd() const { return (fServType == TProofMgr::kProofd) ? kTRUE : kFALSE; }
Bool_t IsFolder() const { return kTRUE; }
Bool_t IsMaster() const { return fMasterServ; }
Bool_t IsValid() const { return fValid; }
Bool_t IsTty() const { return fTty; }
Bool_t IsParallel() const { return GetParallel() > 0 ? kTRUE : kFALSE; }
Bool_t IsIdle() const { return (fNotIdle <= 0) ? kTRUE : kFALSE; }
Bool_t IsWaiting() const { return fIsWaiting; }
ERunStatus GetRunStatus() const { return fRunStatus; }
TList *GetLoadedMacros() const { return fLoadedMacros; }
// Named parameters, with typed setter overloads.
void SetParameter(const char *par, const char *value);
void SetParameter(const char *par, Int_t value);
void SetParameter(const char *par, Long_t value);
void SetParameter(const char *par, Long64_t value);
void SetParameter(const char *par, Double_t value);
TObject *GetParameter(const char *par) const;
void DeleteParameters(const char *wildcard);
void ShowParameters(const char *wildcard = "PROOF_*") const;
// Input and output lists.
void AddInput(TObject *obj);
void ClearInput();
TList *GetInputList();
TObject *GetOutput(const char *name);
TList *GetOutputList();
static TObject *GetOutput(const char *name, TList *out);
void ShowMissingFiles(TQueryResult *qr = 0);
TFileCollection *GetMissingFiles(TQueryResult *qr = 0);
void AddInputData(TObject *obj, Bool_t push = kFALSE);
void SetInputDataFile(const char *datafile);
void ClearInputData(TObject *obj = 0);
void ClearInputData(const char *name);
// Feedback objects.
void AddFeedback(const char *name);
void RemoveFeedback(const char *name);
void ClearFeedback();
void ShowFeedback() const;
TList *GetFeedbackList() const;
// Query lists and results.
virtual TList *GetListOfQueries(Option_t *opt = "");
Int_t GetNumberOfQueries();
Int_t GetNumberOfDrawQueries() { return fDrawQueries; }
TList *GetQueryResults();
TQueryResult *GetQueryResult(const char *ref = 0);
void GetMaxQueries();
void SetMaxDrawQueries(Int_t max);
void ShowQueries(Option_t *opt = "");
Bool_t IsDataReady(Long64_t &totalbytes, Long64_t &bytesready);
// SetActive() has an empty body: it is a no-op here.
void SetActive(Bool_t = kTRUE) { }
// Notification-style methods. NOTE(review): given the TQObject base these are
// presumably emitted as signals - confirm in the implementation.
void LogMessage(const char *msg, Bool_t all);
void Progress(Long64_t total, Long64_t processed);
void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
Float_t initTime, Float_t procTime,
Float_t evtrti, Float_t mbrti);
void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
Float_t initTime, Float_t procTime,
Float_t evtrti, Float_t mbrti,
Int_t actw, Int_t tses, Float_t eses);
void Feedback(TList *objs);
void QueryResultReady(const char *ref);
void CloseProgressDialog();
void ResetProgressDialog(const char *sel, Int_t sz,
Long64_t fst, Long64_t ent);
void StartupMessage(const char *msg, Bool_t status, Int_t done,
Int_t total);
void DataSetStatus(const char *msg, Bool_t status,
Int_t done, Int_t total);
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st);
// Log retrieval and display.
void GetLog(Int_t start = -1, Int_t end = -1);
TMacro *GetLastLog();
void PutLog(TQueryResult *qr);
void ShowLog(Int_t qry = -1);
void ShowLog(const char *queryref);
Bool_t SendingLogToWindow() const { return fLogToWindowOnly; }
void SendLogToWindow(Bool_t mode) { fLogToWindowOnly = mode; }
// Returns the address of the embedded fMacroLog member (never null).
TMacro *GetMacroLog() { return &fMacroLog; }
void ResetProgressDialogStatus() { fProgressDialogStarted = kFALSE; }
virtual TTree *GetTreeHeader(TDSet *tdset);
TList *GetOutputNames();
void AddChain(TChain *chain);
void RemoveChain(TChain *chain);
TDrawFeedback *CreateDrawFeedback();
void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt);
void DeleteDrawFeedback(TDrawFeedback *f);
void Detach(Option_t *opt = "");
virtual void SetAlias(const char *alias="");
TProofMgr *GetManager() { return fManager; }
void SetManager(TProofMgr *mgr);
Int_t ActivateWorker(const char *ord, Bool_t save = kTRUE);
Int_t DeactivateWorker(const char *ord, Bool_t save = kTRUE);
// Both data-pool accessors delegate to the manager's MSS URL; without a
// manager the getter returns a null pointer and the setter does nothing.
const char *GetDataPoolUrl() const { return fManager ? fManager->GetMssUrl() : 0; }
void SetDataPoolUrl(const char *url) { if (fManager) fManager->SetMssUrl(url); }
void SetPrintProgress(PrintProgress_t pp) { fPrintProgress = pp; }
void SetProgressDialog(Bool_t on = kTRUE);
Int_t SavePerfTree(const char *pf = 0, const char *qref = 0);
void SetPerfTree(const char *pf = "perftree.root", Bool_t withWrks = kFALSE);
// Static entry points and environment-variable helpers.
static TProof *Open(const char *url = 0, const char *conffile = 0,
const char *confdir = 0, Int_t loglevel = 0);
static void LogViewer(const char *url = 0, Int_t sessionidx = 0);
static TProofMgr *Mgr(const char *url);
static void Reset(const char *url, Bool_t hard = kFALSE);
static void AddEnvVar(const char *name, const char *value);
static void DelEnvVar(const char *name);
static const TList *GetEnvVars();
static void ResetEnvVars();
// Typed look-ups of a named parameter inside an arbitrary collection.
static Int_t GetParameter(TCollection *c, const char *par, TString &value);
static Int_t GetParameter(TCollection *c, const char *par, Int_t &value);
static Int_t GetParameter(TCollection *c, const char *par, Long_t &value);
static Int_t GetParameter(TCollection *c, const char *par, Long64_t &value);
static Int_t GetParameter(TCollection *c, const char *par, Double_t &value);
ClassDef(TProof,0)
};
// Global TProof pointer declared here and defined elsewhere (R__EXTERN).
// NOTE(review): presumably points to the current/default session - confirm.
R__EXTERN TProof *gProof;
#endif