#include "TProofPlayerLite.h"
#include "MessageTypes.h"
#include "TDSet.h"
#include "TDSetProxy.h"
#include "TEntryList.h"
#include "TEventList.h"
#include "THashList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TObjString.h"
#include "TPerfStats.h"
#include "TProofLite.h"
#include "TProofDebug.h"
#include "TProofServ.h"
#include "TROOT.h"
#include "TSelector.h"
#include "TVirtualPacketizer.h"
////////////////////////////////////////////////////////////////////////////////
/// Create the selector object for the coming query.
///
/// Any selector held from a previous call is deleted first. If the base name
/// of 'selfile' contains a '.', it is treated as a source file and handed to
/// TProofLite::CopyMacroToCache, which is also given the address of fSelector.
/// Otherwise the name is passed to TSelector::GetSelector so that the
/// selector is loaded from a library.
///
/// \param selfile  selector file path or selector class name
/// \return 0 on success, -1 on failure

Int_t TProofPlayerLite::MakeSelector(const char *selfile)
{
   // Forget whatever was left over from the previous query
   fSelectorClass = 0;
   SafeDelete(fSelector);

   if (!selfile || selfile[0] == '\0') {
      Error("MakeSelector", "input file path or name undefined");
      return -1;
   }

   const Bool_t isFile = (strchr(gSystem->BaseName(selfile), '.') != 0);
   if (isFile) {
      // A real file: go through the macro cache
      if (((TProofLite*)fProof)->CopyMacroToCache(selfile, 1, &fSelector, TProof::kCp | TProof::kCpBin) < 0)
         return -1;
      return 0;
   }

   // Plain name: the selector must come from an already available library
   if (gDebug > 1)
      Info("MakeSelector", "selector name '%s' does not contain a '.':"
           " no file to check, it will be loaded from a library", selfile);
   fSelector = TSelector::GetSelector(selfile);
   if (!fSelector) {
      Error("MakeSelector", "could not create a %s selector", selfile);
      return -1;
   }
   return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// Process 'dset' with a selector object supplied by the caller.
///
/// The selector is installed as fSelector (a previously held selector is
/// deleted only if fCreateSelObj says this player created it) and the work is
/// delegated to the name-based Process overload using the selector's class
/// name. fCreateSelObj is kFALSE for the duration of that call and set back
/// to kTRUE afterwards.
///
/// \param dset      dataset to process
/// \param selector  selector instance to use; must not be null
/// \param option    processing option string
/// \param nentries  number of entries to process
/// \param first     first entry to process
/// \return the value returned by the name-based overload, or -1 if
///         'selector' is null

Long64_t TProofPlayerLite::Process(TDSet *dset, TSelector *selector,
                                   Option_t *option, Long64_t nentries,
                                   Long64_t first)
{
   if (selector == 0) {
      Error("Process", "selector object undefined");
      return -1;
   }

   // Switch to the caller's selector, dropping ours first if we own it
   if (fSelector != selector) {
      if (fCreateSelObj) {
         SafeDelete(fSelector);
      }
      fSelector = selector;
   }

   // Tell the name-based overload not to build a selector of its own
   fCreateSelObj = kFALSE;
   const Long64_t result = Process(dset, selector->ClassName(), option, nentries, first);
   fCreateSelObj = kTRUE;

   return result;
}
////////////////////////////////////////////////////////////////////////////////
/// Process the dataset 'dset' with the selector named by 'selector_file'.
///
/// The method resets the progress status and the output list, creates the
/// selector (unless fCreateSelObj is kFALSE, in which case the already
/// installed fSelector is used and its input objects are temporarily added to
/// fInput), calls Begin(0) on it, initializes the packetizer and broadcasts a
/// kPROOF_PROCESS message to the workers.
///
/// In synchronous mode the call blocks in fProof->Collect() and then runs
/// Finalize (unless the query was aborted). In asynchronous mode output is
/// redirected to fProof->fLogFileName while the query is being set up and the
/// method returns right after activating the input handlers.
///
/// \param dset           dataset to process
/// \param selector_file  selector file path or selector class name
/// \param option         processing option string; also used to determine the
///                       query mode via TProof::GetQueryMode
/// \param nentries       number of entries, forwarded to the packetizer
/// \param first          first entry, forwarded to the packetizer
/// \return -1 on setup failure or aborted query; fProof->fSeqNum in
///         asynchronous mode; the result of Finalize in synchronous mode

Long64_t TProofPlayerLite::Process(TDSet *dset, const char *selector_file,
                                   Option_t *option, Long64_t nentries,
                                   Long64_t first)
{
   PDB(kGlobal,1) Info("Process","Enter");
   fDSet = dset;
   fExitStatus = kFinished;

   if (!fProgressStatus) {
      Error("Process", "No progress status");
      return -1;
   }
   fProgressStatus->Reset();

   // Start from an empty output list
   if (!fOutput)
      fOutput = new THashList;
   else
      fOutput->Clear();

   TPerfStats::Setup(fInput);
   TPerfStats::Start(fInput, fOutput);

   // Stopped after Collect() to record the real time in the query result
   TStopwatch elapsed;

   TMessage mesg(kPROOF_PROCESS);
   TString fn(gSystem->BaseName(selector_file));

   Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);

   // Drop the per-worker output lists of the previous query
   if (fOutputLists) {
      fOutputLists->Delete();
      delete fOutputLists;
      fOutputLists = 0;
   }

   // Asynchronous queries log their setup phase to the session log file;
   // every return path below must undo this redirection.
   if (!sync) {
      gSystem->RedirectOutput(fProof->fLogFileName);
      Printf(" ");
      Info("Process","starting new query");
   }

   if (fCreateSelObj) {
      if (MakeSelector(selector_file) != 0) {
         if (!sync)
            gSystem->RedirectOutput(0);
         return -1;
      }
   }

   fSelectorClass = fSelector->IsA();

   // When the selector comes from the caller, make its own input objects
   // visible through fInput; remember which ones were added so that they can
   // be taken out again afterwards.
   TList *inputtmp = 0;
   if (!fCreateSelObj) {
      if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
         TIter nxi(fSelector->GetInputList());
         TObject *o = 0;
         while ((o = nxi())) {
            if (!fInput->FindObject(o)) {
               fInput->Add(o);
               if (!inputtmp) {
                  inputtmp = new TList;
                  inputtmp->SetOwner(kFALSE);
               }
               inputtmp->Add(o);
            }
         }
      }
      fInput->Add(fSelector);
   }

   fSelector->SetInputList(fInput);
   fSelector->SetOption(option);
   if (fSelector->GetOutputList()) fSelector->GetOutputList()->Clear();

   PDB(kLoop,1) Info("Process","Call Begin(0)");
   fSelector->Begin(0);

   // Send the input data file, if any (done after Begin(0))
   gProof->SendInputDataFile();

   // Attach to the packet histogram when statistics histograms are requested
   if (fInput->FindObject("PROOF_StatsHist") != 0) {
      if (!(fProcPackets = (TH1 *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
         Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
      } else {
         PDB(kLoop,1)
            Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
                            " packets being processed");
      }
   }

   PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
   // NOTE(review): 'set' is only deleted on the packetizer-failure path below;
   // confirm whether the normal paths are expected to release it as well.
   TDSet *set = new TDSetProxy(dset->GetType(), dset->GetObjName(),
                               dset->GetDirectory());
   if (dset->TestBit(TDSet::kEmpty))
      set->SetBit(TDSet::kEmpty);

   fProof->SetParameter("PROOF_MaxSlavesPerNode", (Long_t) 0);
   if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
      Error("Process", "cannot init the packetizer");
      fExitStatus = kAborted;
      // Nothing has been sent yet: undo the local setup before leaving.
      // Take the temporary inputs out of fInput again ...
      if (inputtmp) {
         TIter nxi(inputtmp);
         TObject *o = 0;
         while ((o = nxi())) fInput->Remove(o);
         SafeDelete(inputtmp);
      }
      // ... release the proxy dataset, which was never handed to anybody ...
      delete set;
      // ... and give the output streams back (as done on selector failure)
      if (!sync)
         gSystem->RedirectOutput(0);
      return -1;
   }
   // From here on the broadcast 'first' is 0
   first = 0;

   // Memory logging frequency: input parameter first, environment second
   Int_t mrc = -1;
   Long64_t memlogfreq = -1;
   Long64_t mlf = -1;
   if ((mrc = TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
   if (mrc != 0 && gSystem->Getenv("PROOF_MEMLOGFREQ")) {
      TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
      if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
   }
   if (memlogfreq == 0) {
      // A value of 0 requests a frequency derived from the total number of
      // entries and the number of parallel workers. Guard against a zero
      // worker count, which would otherwise be a division by zero.
      const Int_t npar = fProof->GetParallel();
      memlogfreq = (npar != 0) ? fPacketizer->GetTotalEntries()/(npar*100) : 0;
      if (memlogfreq <= 0) memlogfreq = 1;
   }
   if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);

   // Identify the query for the workers
   fProof->SetParameter("PROOF_QueryTag", fProof->GetName());
   fProof->SetParameter("PROOF_QuerySeqNum", fProof->fSeqNum);

   // Setup phase is over: stop redirecting
   if (!sync)
      gSystem->RedirectOutput(0);

   // NOTE(review): scope guard tied to this player; see TCleanup for what it
   // does when the function is left.
   TCleanup clean(this);
   SetupFeedback();

   TString opt = option;

   // In parallel mode the entry range is handled by the packetizer, so -1 is
   // sent for both values
   Long64_t num = (fProof->IsParallel()) ? -1 : nentries;
   Long64_t fst = (fProof->IsParallel()) ? -1 : first;

   // The entry list of the proxy set is sent either as TEntryList or as
   // TEventList, whichever type it actually is
   TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
                                           : (TEntryList *)0;
   TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
                                                   : (TEventList *)0;

   fProof->ResetMergePrg();

   PDB(kGlobal,1) Info("Process","Calling Broadcast");
   // Keep a copy of the process message in fProcessMessage
   if (fProcessMessage) delete fProcessMessage;
   fProcessMessage = new TMessage(kPROOF_PROCESS);
   mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
   (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
   Int_t nb = fProof->Broadcast(mesg);
   PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
   fProof->fNotIdle += nb;

   fProof->fRedirLog = kTRUE;

   if (!sync) {
      // Asynchronous: results are collected later through the input handlers.
      // NOTE(review): 'inputtmp' is not released on this path; the objects it
      // lists stay in fInput - confirm this is intended.
      PDB(kGlobal,1) Info("Process","Asynchronous processing:"
                                    " activating CollectInputFrom");
      fProof->Activate();
      return fProof->fSeqNum;
   } else {
      // Synchronous: wait for the workers
      PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
      fProof->Collect();
      fProof->fRedirLog = kFALSE;

      // One last feedback round, except for standard draw queries
      if (!TSelector::IsStandardDraw(fn))
         HandleTimer(0);

      if (fPacketizer) {
         fPacketizer->StopProcess(kFALSE, kTRUE);
         fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
         elapsed.Stop();
         if (fQuery)
            fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
                                          fPacketizer->GetInitTime(),
                                          elapsed.RealTime());
      }
      StopFeedback();

      // Finalize only queries that were not aborted
      Long64_t rc = -1;
      if (GetExitStatus() != TProofPlayer::kAborted)
         rc = Finalize(kFALSE, sync);

      // Take the selector's temporary inputs out of fInput again
      if (inputtmp) {
         TIter nxi(inputtmp);
         TObject *o = 0;
         while ((o = nxi())) fInput->Remove(o);
         SafeDelete(inputtmp);
      }
      return rc;
   }
}
////////////////////////////////////////////////////////////////////////////////
/// Merge the worker outputs and run the selector's Terminate().
///
/// If there are no per-worker output lists and 'force' is set for an existing
/// query, the request is forwarded to fProof->Finalize for that query.
/// Otherwise the outputs are merged and, unless the query was aborted, the
/// merged objects are handed to the selector's output list, Terminate() is
/// called, and the resulting list is attached to the query result.
///
/// A selector that was created by this player (fCreateSelObj is kTRUE) is
/// deleted at the end; a selector supplied by the caller is only detached
/// from the internal lists and never deleted here.
///
/// \param force  forward to fProof->Finalize when no output lists are present
/// \param sync   kFALSE makes the selector be re-initialized from the query
/// \return the selector status after Terminate(); 0 if the query was aborted;
///         -1 if there is no query or the selector cannot be re-initialized;
///         or the value of fProof->Finalize when the request is forwarded

Long64_t TProofPlayerLite::Finalize(Bool_t force, Bool_t sync)
{
   if (fOutputLists == 0) {
      if (force && fQuery)
         return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
                                               fQuery->GetName()), force);
   }

   Long64_t rv = 0;

   TPerfStats::Stop();

   if (!fQuery) {
      Info("Finalize", "query is undefined!");
      return -1;
   }

   // Combine what the workers sent back
   MergeOutput();

   if (fExitStatus != kAborted) {

      // Asynchronous queries need the selector to be set up again
      if (!sync) {
         if (ReinitSelector(fQuery) == -1) {
            Info("Finalize", "problems reinitializing selector \"%s\"",
                 fQuery->GetSelecImp()->GetName());
            return -1;
         }
      }

      fSelector->SetInputList(fInput);

      // Hand the merged objects over to the selector's output list
      TList *output = fSelector->GetOutputList();
      if (output) {
         TIter next(fOutput);
         while(TObject* obj = next()) {
            if (fProof->IsParallel() || DrawCanvas(obj) == 1)
               output->Add(obj);
         }
      } else {
         Warning("Finalize", "undefined output list in the selector! Protocol error?");
      }

      SetSelectorDataMembersFromOutputList();

      PDB(kLoop,1) Info("Finalize","Call Terminate()");
      // The objects now live in the selector's list: empty ours without
      // deleting them
      fOutput->Clear("nodelete");
      SetMerging(kFALSE);
      fProof->fQuerySTW.Reset();
      fSelector->Terminate();

      rv = fSelector->GetStatus();

      // Take back whatever is in the selector's output list after Terminate()
      TIter it(output);
      while(TObject* o = it()) {
         fOutput->Add(o);
      }

      if (fQuery) {
         fQuery->SetOutputList(fOutput);
         fQuery->SetFinalized();
      } else {
         Warning("Finalize","current TQueryResult object is undefined!");
      }

      // A caller-owned selector is only detached from the lists
      if (!fCreateSelObj) {
         fInput->Remove(fSelector);
         fOutput->Remove(fSelector);
         if (output) output->Remove(fSelector);
         fSelector = 0;
      }

      // Make sure deleting the selector does not delete the output objects
      if (output) output->SetOwner(kFALSE);

      if (fCreateSelObj) SafeDelete(fSelector);

      fOutput->SetOwner(kFALSE);
      SafeDelete(fOutput);

   } else {

      // Aborted query: the output list takes ownership of its content
      fOutput->SetOwner();

      // Same ownership rule as above: delete only a selector created here.
      // A caller-owned selector is detached from the lists and left alive,
      // otherwise the caller would end up with a dangling pointer.
      if (fCreateSelObj) {
         SafeDelete(fSelector);
      } else {
         if (fSelector) {
            fInput->Remove(fSelector);
            fOutput->Remove(fSelector);
         }
         fSelector = 0;
      }
   }

   PDB(kGlobal,1) Info("Finalize","exit");
   return rv;
}
////////////////////////////////////////////////////////////////////////////////
/// Feedback timer callback.
///
/// Clones every object of the output list whose name appears in the feedback
/// list, stores the clones through StoreFeedback, then - if feedback lists
/// exist - merges them and passes the result to Feedback(). The feedback
/// timer is restarted before returning. Does nothing when no feedback timer
/// is set.
///
/// \return always kFALSE

Bool_t TProofPlayerLite::HandleTimer(TTimer *)
{
   PDB(kFeedback,2)
      Info("HandleTimer","Entry: %p", fFeedbackTimer);

   if (!fFeedbackTimer) return kFALSE;

   // Collect clones of the requested objects currently in the output list
   TList *snapshot = new TList;
   snapshot->SetOwner();

   TIter nextName(fFeedback);
   TObjString *wanted = 0;
   while ((wanted = (TObjString*) nextName())) {
      TObject *found = fOutput->FindObject(wanted->GetName());
      if (found != 0) snapshot->Add(found->Clone());
   }

   // StoreFeedback disposes of the list it is given; an empty one is
   // dropped right here
   if (snapshot->GetSize() <= 0)
      delete snapshot;
   else
      StoreFeedback(this, snapshot);

   // Merge and publish only if something has been stored so far
   if (fFeedbackLists != 0) {
      TList *merged = MergeFeedback();
      Feedback(merged);
      merged->SetOwner();
      delete merged;
   }

   fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
   return kFALSE;
}
////////////////////////////////////////////////////////////////////////////////
/// Prepare periodic feedback for the coming query.
///
/// Looks up "FeedbackList" in the input list. If it exists and is not empty,
/// a new timer bound to this object is started; the period is 2000 unless the
/// input parameter "PROOF_FeedbackPeriod" supplies another value.

void TProofPlayerLite::SetupFeedback()
{
   fFeedback = (TList*) fInput->FindObject("FeedbackList");

   PDB(kFeedback,1) {
      if (fFeedback)
         Info("SetupFeedback","\"FeedbackList\" found: %d objects", fFeedback->GetSize());
      else
         Info("SetupFeedback","\"FeedbackList\" NOT found");
   }

   // Nothing requested: leave any existing timer untouched
   if (!fFeedback || fFeedback->GetSize() == 0) return;

   // Replace a timer left from a previous query
   SafeDelete(fFeedbackTimer);

   // Default period, possibly overridden via the input list
   fFeedbackPeriod = 2000;
   TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);

   fFeedbackTimer = new TTimer;
   fFeedbackTimer->SetObject(this);
   fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
}
////////////////////////////////////////////////////////////////////////////////
/// Record the feedback objects in 'out' as coming from 'slave'.
///
/// fFeedbackLists holds one TMap per object name, keyed by the sender. For
/// each object in 'out' the map of that name is looked up (and created if
/// missing); a value previously stored for 'slave' is deleted and replaced by
/// the new object. The list 'out' itself is deleted before returning, while
/// its objects are kept by the maps.
///
/// \param slave  key under which the objects are stored
/// \param out    list of feedback objects; may be null, in which case nothing
///               is done

void TProofPlayerLite::StoreFeedback(TObject *slave, TList *out)
{
   PDB(kFeedback,1)
      Info("StoreFeedback","Enter (%p,%p,%d)", fFeedbackLists, out, (out ? out->GetSize() : -1));

   if (!out) {
      PDB(kFeedback,1)
         Info("StoreFeedback","Leave (empty)");
      return;
   }

   if (!fFeedbackLists) {
      PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
      fFeedbackLists = new TList;
      fFeedbackLists->SetOwner();
   }

   // The objects move into the maps, so 'out' must not delete them
   out->SetOwner(kFALSE);

   TIter nextItem(out);
   while (TObject *item = nextItem()) {
      PDB(kFeedback,2)
         Info("StoreFeedback","Find '%s'", item->GetName() );

      TMap *perName = (TMap*) fFeedbackLists->FindObject(item->GetName());
      if (perName != 0) {
         // Known name: discard what this sender stored before
         PDB(kFeedback,2)
            Info("StoreFeedback","removing previous value");
         TObject *previous = perName->GetValue(slave);
         if (previous)
            delete previous;
         perName->Remove(slave);
      } else {
         // First object of this name: open a new map for it
         PDB(kFeedback,2)
            Info("StoreFeedback", "map for '%s' not found (creating)", item->GetName());
         perName = new TMap;
         perName->SetName(item->GetName());
         fFeedbackLists->Add(perName);
      }
      perName->Add(slave, item);
   }

   delete out;

   PDB(kFeedback,1)
      Info("StoreFeedback","Leave");
}