#include "RConfigure.h"
#include "TProofBench.h"
#include "Getline.h"
#include "TProofBenchRunCPU.h"
#include "TProofBenchRunDataRead.h"
#include "TProofBenchDataSet.h"
#include "TProofNodes.h"
#include "TClass.h"
#include "TFile.h"
#include "TFileCollection.h"
#include "TFileInfo.h"
#include "THashList.h"
#include "TKey.h"
#include "TObjString.h"
#include "TProof.h"
#include "TROOT.h"
#include "TUrl.h"
#include "TCanvas.h"
#include "TGraphErrors.h"
#include "TH1F.h"
#include "TMath.h"
#include "TProfile.h"
#include "TStyle.h"
ClassImp(TProofBench)
TProofBench::TProofBench(const char *url, const char *outfile, const char *proofopt)
: fUnlinkOutfile(kFALSE), fProofDS(0), fOutFile(0),
fNtries(4), fHistType(0), fNHist(16), fReadType(0),
fDataSet("BenchDataSet"), fNFilesWrk(4),
fDataGenSel(kPROOF_BenchSelDataGenDef),
fRunCPU(0), fRunDS(0), fDS(0), fDebug(kFALSE)
{
SetBit(kInvalidObject);
if (!url) {
Error("TProofBench", "specifying a PROOF master url is mandatory - cannot continue");
return;
}
if (!(fProof = TProof::Open(url, proofopt)) || (fProof && !fProof->IsValid())) {
Error("TProofBench", "could not open a valid PROOF session - cannot continue");
return;
}
fProofDS = fProof;
ResetBit(kInvalidObject);
if (SetOutFile(outfile, kFALSE) != 0)
Warning("TProofBench", "problems opening '%s' - ignoring: use SetOutFile to try"
" again or with another file", outfile);
}
TProofBench::~TProofBench()
{
CloseOutFile();
if (fUnlinkOutfile) gSystem->Unlink(fOutFileName);
SafeDelete(fReadType);
SafeDelete(fRunCPU);
SafeDelete(fRunDS);
}
Int_t TProofBench::OpenOutFile(Bool_t wrt, Bool_t verbose)
{
if (fOutFile && fOutFile->IsZombie()) SafeDelete(fOutFile);
Int_t rc = 0;
if (!fOutFile && fOutFileName.Length() > 0) {
const char *mode = 0;
if (wrt)
mode = gSystem->AccessPathName(fOutFileName) ? "RECREATE" : "UPDATE";
else
mode = "READ";
if (!(fOutFile = TFile::Open(fOutFileName, mode)) || (fOutFile && fOutFile->IsZombie())) {
if (verbose)
Warning("OpenOutFile", "problems opening '%s' - ignoring: use SetOutFile to try"
" again or with another file", fOutFileName.Data());
rc = -1;
}
if (fOutFile) gROOT->GetListOfFiles()->Remove(fOutFile);
}
return rc;
}
Int_t TProofBench::SetOutFile(const char *outfile, Bool_t verbose)
{
Int_t rc = 0;
if (fOutFile) {
if (!fOutFile->IsZombie()) fOutFile->Close();
SafeDelete(fOutFile);
}
fOutFileName = outfile;
if (fOutFileName == "<default>") {
TDatime dat;
const char *lite = (fProof->IsLite()) ? "-lite" : "";
fOutFileName.Form("proofbench-%s%s-%dw-%d-%.2d%.2d.root",
fProof->GetMaster(), lite, fProof->GetParallel(),
dat.GetDate(), dat.GetHour(), dat.GetMinute());
Info("SetOutFile", "using default output file: '%s'", fOutFileName.Data());
fUnlinkOutfile = kTRUE;
}
if (!fOutFileName.IsNull()) {
if ((rc = OpenOutFile(kTRUE, kFALSE)) != 0 && verbose)
Warning("SetOutFile", "problems opening '%s' - ignoring: use SetOutFile to try"
" again or with another file", outfile);
}
return rc;
}
void TProofBench::CloseOutFile()
{
if (SetOutFile(0) != 0)
Warning("CloseOutFile", "problems closing '%s'", fOutFileName.Data());
}
Int_t TProofBench::RunCPU(Long64_t nevents, Int_t start, Int_t stop, Int_t step)
{
if (OpenOutFile(kTRUE) != 0) {
Error("RunCPU", "problems opening '%s' to save the result", fOutFileName.Data());
return -1;
}
fUnlinkOutfile = kFALSE;
SafeDelete(fRunCPU);
TPBHistType htype(TPBHistType::kHist1D);
fRunCPU = new TProofBenchRunCPU(&htype, fNHist, fOutFile);
if (!fCPUSel.IsNull()) fRunCPU->SetSelName(fCPUSel);
if (!fCPUPar.IsNull()) fRunCPU->SetParList(fCPUPar);
fRunCPU->Run(nevents, start, stop, step, fNtries, fDebug, -1);
if (SetOutFile(0) != 0)
Warning("RunCPU", "problems closing '%s'", fOutFileName.Data());
return 0;
}
Int_t TProofBench::RunCPUx(Long64_t nevents, Int_t start, Int_t stop)
{
if (OpenOutFile(kTRUE) != 0) {
Error("RunCPUx", "problems opening '%s' to save the result", fOutFileName.Data());
return -1;
}
fUnlinkOutfile = kFALSE;
SafeDelete(fRunCPU);
TPBHistType htype(TPBHistType::kHist1D);
fRunCPU = new TProofBenchRunCPU(&htype, fNHist, fOutFile);
if (!fCPUSel.IsNull()) fRunCPU->SetSelName(fCPUSel);
if (!fCPUPar.IsNull()) fRunCPU->SetParList(fCPUPar);
fRunCPU->Run(nevents, start, stop, -2, fNtries, fDebug, -1);
if (SetOutFile(0) != 0)
Warning("RunCPUx", "problems closing '%s'", fOutFileName.Data());
return 0;
}
void TProofBench::DrawCPU(const char *outfile, const char *opt)
{
TFile *fout = TFile::Open(outfile, "READ");
if (!fout || (fout && fout->IsZombie())) {
::Error("DrawCPU", "could not open file '%s' ...", outfile);
return;
}
TString oo(opt);
const char *dirn = (oo.Contains("x:")) ? "RunCPUx" : "RunCPU";
TDirectory *d = (TDirectory *) fout->Get(dirn);
if (!d) {
::Error("DrawCPU", "could not find directory 'RunCPU' ...");
fout->Close();
delete fout;
return;
}
d->cd();
TString hprofn;
if (!strcmp(opt, "std:")) {
hprofn = "Prof_CPU_QR_Evts";
} else if (!strcmp(opt, "stdx:")) {
hprofn = "Prof_x_CPU_QR_Evts";
} else if (!strcmp(opt, "norm:")) {
hprofn = "Norm_CPU_QR_Evts";
} else if (!strcmp(opt, "normx:")) {
hprofn = "Norm_x_CPU_QR_Evts";
} else {
::Error("DrawCPU", "unknown option '%s'", opt);
fout->Close();
delete fout;
return;
}
TProfile *pf = 0;
TList *keylist = d->GetListOfKeys();
TKey *key = 0;
TIter nxk(keylist);
while ((key = (TKey *) nxk())) {
if (TString(key->GetName()).BeginsWith(hprofn)) {
pf = (TProfile *) d->Get(key->GetName());
break;
}
}
if (!pf) {
::Error("DrawCPU", "could not find '%s' ...", hprofn.Data());
fout->Close();
delete fout;
return;
}
Int_t nbins = pf->GetNbinsX();
TGraphErrors *gr = new TGraphErrors(nbins);
Double_t xx, ex, yy, ey, ymi = pf->GetBinContent(1), ymx = ymi;
Int_t k =1;
for (;k <= nbins; k++) {
xx = pf->GetBinCenter(k);
ex = pf->GetBinWidth(k) * .001;
yy = pf->GetBinContent(k);
ey = pf->GetBinError(k);
if (k == 1) {
ymi = yy;
ymx = yy;
} else {
if (yy < ymi) ymi = yy;
if (yy > ymx) ymx = yy;
}
gr->SetPoint(k-1, xx, yy);
gr->SetPointError(k-1, ex, ey);
}
TCanvas *cpu = new TCanvas("cpu", "Rate vs wrks",204,69,1050,502);
cpu->Range(-3.106332,0.7490716,28.1362,1.249867);
gStyle->SetOptTitle(0);
gr->SetFillColor(1);
gr->SetLineColor(13);
gr->SetMarkerStyle(21);
gr->SetMarkerSize(1.2);
TH1F *hgr = new TH1F("Graph-CPU"," CPU speed-up", nbins*4,0,nbins+1);
hgr->SetMaximum(ymx + (ymx-ymi)*0.2);
hgr->SetMinimum(0);
hgr->SetDirectory(0);
hgr->SetStats(0);
hgr->GetXaxis()->SetTitle(pf->GetXaxis()->GetTitle());
hgr->GetXaxis()->CenterTitle(true);
hgr->GetXaxis()->SetLabelSize(0.05);
hgr->GetXaxis()->SetTitleSize(0.06);
hgr->GetXaxis()->SetTitleOffset(0.62);
hgr->GetYaxis()->SetLabelSize(0.06);
gr->SetHistogram(hgr);
gr->Print();
gr->Draw("alp");
fout->Close();
}
Int_t TProofBench::RunDataSet(const char *dset,
Int_t start, Int_t stop, Int_t step)
{
if (OpenOutFile(kTRUE) != 0) {
Error("RunDataSet", "problems opening '%s' to save the result", fOutFileName.Data());
return -1;
}
fUnlinkOutfile = kFALSE;
ReleaseCache(dset);
SafeDelete(fRunDS);
TPBReadType *readType = fReadType;
if (!readType) readType = new TPBReadType(TPBReadType::kReadOpt);
fRunDS = new TProofBenchRunDataRead(fDS, readType, fOutFile);
if (!fDataSel.IsNull()) fRunDS->SetSelName(fDataSel);
if (!fDataPar.IsNull()) fRunDS->SetParList(fDataPar);
fRunDS->Run(dset, start, stop, step, fNtries, fDebug, -1);
if (!fReadType) SafeDelete(readType);
if (SetOutFile(0) != 0)
Warning("RunDataSet", "problems closing '%s'", fOutFileName.Data());
return 0;
}
Int_t TProofBench::RunDataSetx(const char *dset, Int_t start, Int_t stop)
{
if (OpenOutFile(kTRUE) != 0) {
Error("RunDataSetx", "problems opening '%s' to save the result", fOutFileName.Data());
return -1;
}
fUnlinkOutfile = kFALSE;
ReleaseCache(dset);
SafeDelete(fRunDS);
TPBReadType *readType = fReadType;
if (!readType) readType = new TPBReadType(TPBReadType::kReadOpt);
fRunDS = new TProofBenchRunDataRead(fDS, readType, fOutFile);
if (!fDataSel.IsNull()) fRunDS->SetSelName(fDataSel);
if (!fDataPar.IsNull()) fRunDS->SetParList(fDataPar);
fRunDS->Run(dset, start, stop, -2, fNtries, fDebug, -1);
if (!fReadType) SafeDelete(readType);
if (SetOutFile(0) != 0)
Warning("RunDataSetx", "problems closing '%s'", fOutFileName.Data());
return 0;
}
void TProofBench::DrawDataSet(const char *outfile, const char *opt, const char *type)
{
TFile *fout = TFile::Open(outfile, "READ");
if (!fout || (fout && fout->IsZombie())) {
::Error("DrawDataSet", "could not open file '%s' ...", outfile);
return;
}
TString oo(opt);
const char *dirn = (oo.Contains("x:")) ? "RunDataReadx" : "RunDataRead";
TDirectory *d = (TDirectory *) fout->Get(dirn);
if (!d) {
::Error("DrawDataSet", "could not find directory 'RunDataRead' ...");
fout->Close();
delete fout;
return;
}
d->cd();
TString hprofn, typ("QR_IO");
if (type && !strcmp(type, "evts")) typ = "QR_Evts";
if (!strcmp(opt, "std:")) {
hprofn.Form("Prof_DataRead_%s", typ.Data());
} else if (!strcmp(opt, "stdx:")) {
hprofn.Form("Prof_x_DataRead_%s", typ.Data());
} else if (!strcmp(opt, "norm:")) {
hprofn.Form("Norm_DataRead_%s", typ.Data());
} else if (!strcmp(opt, "absx:")) {
hprofn.Form("Norm_x_DataRead_%s", typ.Data());
} else {
::Error("DrawDataSet", "unknown option '%s'", opt);
fout->Close();
delete fout;
return;
}
TProfile *pf = 0;
TList *keylist = d->GetListOfKeys();
TKey *key = 0;
TIter nxk(keylist);
while ((key = (TKey *) nxk())) {
if (TString(key->GetName()).BeginsWith(hprofn)) {
pf = (TProfile *) d->Get(key->GetName());
break;
}
}
if (!pf) {
::Error("DrawDataSet", "could not find '%s' ...", hprofn.Data());
fout->Close();
delete fout;
return;
}
Int_t nbins = pf->GetNbinsX();
TGraphErrors *gr = new TGraphErrors(nbins);
Double_t xx, ex, yy, ey, ymi = pf->GetBinContent(1), ymx = ymi;
Int_t k =1;
for (;k <= nbins; k++) {
xx = pf->GetBinCenter(k);
ex = pf->GetBinWidth(k) * .001;
yy = pf->GetBinContent(k);
ey = pf->GetBinError(k);
if (k == 1) {
ymi = yy;
ymx = yy;
} else {
if (yy < ymi) ymi = yy;
if (yy > ymx) ymx = yy;
}
gr->SetPoint(k-1, xx, yy);
gr->SetPointError(k-1, ex, ey);
Printf("%d %f %f", (Int_t)xx, yy, ey);
}
TCanvas *cpu = new TCanvas("dataset", "Rate vs wrks",204,69,1050,502);
cpu->Range(-3.106332,0.7490716,28.1362,1.249867);
gStyle->SetOptTitle(0);
gr->SetFillColor(1);
gr->SetLineColor(13);
gr->SetMarkerStyle(21);
gr->SetMarkerSize(1.2);
TH1F *hgr = new TH1F("Graph-DataSet"," Data Read speed-up", nbins*4,0,nbins+1);
hgr->SetMaximum(ymx + (ymx-ymi)*0.2);
hgr->SetMinimum(0);
hgr->SetDirectory(0);
hgr->SetStats(0);
hgr->GetXaxis()->SetTitle(pf->GetXaxis()->GetTitle());
hgr->GetXaxis()->CenterTitle(true);
hgr->GetXaxis()->SetLabelSize(0.05);
hgr->GetXaxis()->SetTitleSize(0.06);
hgr->GetXaxis()->SetTitleOffset(0.62);
hgr->GetYaxis()->SetLabelSize(0.06);
gr->SetHistogram(hgr);
gr->Print();
gr->Draw("alp");
fout->Close();
}
Int_t TProofBench::ReleaseCache(const char *dset)
{
if (!fDS) fDS = new TProofBenchDataSet(fProofDS);
return fDS ? fDS->ReleaseCache(dset) : -1;
}
Int_t TProofBench::RemoveDataSet(const char *dset)
{
if (!fDS) fDS = new TProofBenchDataSet(fProofDS);
return fDS ? fDS->RemoveFiles(dset) : -1;
}
Int_t TProofBench::MakeDataSet(const char *dset, Long64_t nevt, const char *fnroot,
Bool_t regenerate)
{
if (dset && strlen(dset) > 0) fDataSet = dset;
if (!TClass::GetClass(fDataGenSel)) {
if (fDataGenSel == kPROOF_BenchSelDataGenDef) {
#ifdef R__HAVE_CONFIG
TString par = TString::Format("%s/%s%s.par", ROOTETCDIR, kPROOF_BenchParDir, kPROOF_BenchDataSelPar);
#else
TString par = TString::Format("$ROOTSYS/etc/%s%s.par", kPROOF_BenchParDir, kPROOF_BenchDataSelPar);
#endif
Info("MakeDataSet", "uploading '%s' ...", par.Data());
if (fProof->UploadPackage(par) != 0) {
Error("MakeDataSet", "problems uploading '%s' - cannot continue", par.Data());
return -1;
}
Info("MakeDataSet", "enabling '%s' ...", kPROOF_BenchDataSelPar);
if (fProof->EnablePackage(kPROOF_BenchDataSelPar) != 0) {
Error("MakeDataSet", "problems enabling '%s' - cannot continue", kPROOF_BenchDataSelPar);
return -1;
}
} else {
if (fDataGenPar.IsNull()) {
Error("MakeDataSet", "you should load the class '%s' before running the benchmark", fDataGenSel.Data());
return -1;
}
}
TString par;
Int_t from = 0;
while (fDataGenPar.Tokenize(par, from, ",")) {
Info("MakeDataSet", "Uploading '%s' ...", par.Data());
if (fProof->UploadPackage(par) != 0) {
Error("MakeDataSet", "problems uploading '%s' - cannot continue", par.Data());
return -1;
}
Info("MakeDataSet", "Enabling '%s' ...", par.Data());
if (fProof->EnablePackage(par) != 0) {
Error("MakeDataSet", "problems enabling '%s' - cannot continue", par.Data());
return -1;
}
}
if (!TClass::GetClass(fDataGenSel)) {
Error("MakeDataSet", "failed to load '%s'", fDataGenSel.Data());
return -1;
}
}
TString fn, fnr = (fnroot && strlen(fnroot) > 0) ? fnroot : "event";
TProofNodes pn(fProof);
TMap *filesmap = new TMap;
TMap *nodesmap = pn.GetMapOfNodes();
TIter nxnd(nodesmap);
TList *wli = 0;
TObject *obj = 0;
Int_t kf = 1;
while ((obj = nxnd()) != 0) {
if ((wli = dynamic_cast<TList *>(nodesmap->GetValue(obj)))) {
THashList *fli = new THashList;
Int_t nf = wli->GetSize() * fNFilesWrk;
TSlaveInfo *wi = (TSlaveInfo *) wli->First();
while (nf--) {
fn.Form("%s-%s-%d.root", fnr.Data(), wi->GetName(), kf++);
fli->Add(new TObjString(fn));
}
filesmap->Add(new TObjString(obj->GetName()), fli);
}
}
filesmap->Print();
filesmap->SetName("PROOF_FilesToProcess");
fProof->AddInput(filesmap);
TString oldpack;
if (TProof::GetParameter(fProof->GetInputList(), "PROOF_Packetizer", oldpack) != 0) oldpack = "";
fProof->SetParameter("PROOF_Packetizer", "TPacketizerFile");
Int_t oldnotass = -1;
if (TProof::GetParameter(fProof->GetInputList(), "PROOF_ProcessNotAssigned", oldnotass) != 0) oldnotass = -1;
fProof->SetParameter("PROOF_ProcessNotAssigned", (Int_t)0);
Long64_t ne = (nevt > 0) ? nevt : 30000;
fProof->SetParameter("PROOF_BenchmarkNEvents", ne);
fProof->SetParameter("PROOF_BenchmarkRegenerate", Int_t(regenerate));
fProof->Process(fDataGenSel, (Long64_t) 1);
fProof->DeleteParameters("PROOF_BenchmarkNEvents");
fProof->DeleteParameters("PROOF_BenchmarkRegenerate");
if (!oldpack.IsNull())
fProof->SetParameter("PROOF_Packetizer", oldpack);
else
fProof->DeleteParameters("PROOF_Packetizer");
if (oldnotass != -1)
fProof->SetParameter("PROOF_ProcessNotAssigned", oldnotass);
else
fProof->DeleteParameters("PROOF_ProcessNotAssigned");
if (fProof->GetInputList()) fProof->GetInputList()->Remove(filesmap);
filesmap->SetOwner(kTRUE);
delete filesmap;
TFileCollection *fc = new TFileCollection("dum", "dum");
if (fProof->GetOutputList()) {
fProof->GetOutputList()->Print();
TIter nxout(fProof->GetOutputList());
while ((obj = nxout())) {
TList *fli = dynamic_cast<TList *>(obj);
if (fli && TString(fli->GetName()).BeginsWith("PROOF_FilesGenerated_")) {
TIter nxfg(fli);
TFileInfo *fi = 0;
while ((fi = (TFileInfo *) nxfg()))
fc->Add(fi);
fli->SetOwner(kFALSE);
}
}
fc->Update();
if (fc->GetNFiles() > 0) {
if (!(fProof->RegisterDataSet(fDataSet, fc, "OT")))
Warning("MakeDataSet", "problems registering '%s'", dset);
} else {
Warning("MakeDataSet", "dataset '%s' is empty!", dset);
}
} else {
Warning("MakeDataSet", "PROOF output list is empty!");
}
SafeDelete(fc);
fc = fProof->GetDataSet(fDataSet);
fc->Print("F");
SafeDelete(fc);
return 0;
}
Int_t TProofBench::CopyDataSet(const char *dset, const char *dsetdst, const char *destdir)
{
if (!fProof) {
Error("CopyDataSet", "no PROOF found - cannot continue");
return -1;
}
if (!dset || (dset && !fProof->ExistsDataSet(dset))) {
Error("CopyDataSet", "dataset '%s' does not exist", dset);
return -1;
}
if (!dsetdst || (dsetdst && fProof->ExistsDataSet(dsetdst))) {
if (isatty(0) != 0 && isatty(1) != 0) {
Printf("Target dataset '%s' exists already:"
" do you want to remove it first?", dsetdst);
const char *a = Getline("[Y,n] ");
Printf("a: %s", a);
if (a[0] == 'Y' || a[0] == 'y' || a[0] == '\n') {
Info("CopyDataSet", "removing dataset '%s' ...", dsetdst);
RemoveDataSet(dsetdst);
} else {
return -1;
}
} else {
Error("CopyDataSet", "destination dataset '%s' does already exist: remove it first", dsetdst);
return -1;
}
}
TFileCollection *fc = fProof->GetDataSet(dset);
if (!fc) {
Error("CopyDataSet", "problems retrieving TFileCollection for dataset '%s'", dset);
return -1;
}
TFileCollection *fcn = new TFileCollection(dsetdst, "");
TString fn;
TFileInfo *fi = 0;
TIter nxfi(fc->GetList());
while ((fi = (TFileInfo *) nxfi())) {
fn.Form("%s/%s", destdir, gSystem->BaseName(fi->GetCurrentUrl()->GetFile()));
Info("CopyDataSet", "adding info for file '%s'", fn.Data());
fcn->Add(new TFileInfo(fn));
}
delete fc;
if (!fDS) fDS = new TProofBenchDataSet(fProofDS);
if (fDS->CopyFiles(dset, destdir) != 0) {
Error("CopyDataSet", "problems copying files of dataset '%s' to dest dir '%s'", dset, destdir);
delete fcn;
return -1;
}
Int_t rc = 0;
if (!(fProof->RegisterDataSet(dsetdst, fcn, "OT"))) {
Error("CopyDataSet", "problems registering and verifying '%s'", dsetdst);
rc = -1;
}
delete fcn;
return rc;
}
void TProofBench::SetProofDS(TProof *pds)
{
if (pds && !pds->IsValid()) {
Error("SetProofDS", "trying to set an invalid PROOF instance");
return;
}
fProofDS = pds ? pds : fProof;
if (fProofDS) {
SafeDelete(fDS);
fDS = new TProofBenchDataSet(fProofDS);
}
return;
}