#include "TVirtualPacketizer.h"
#include "TFile.h"
#include "TTree.h"
#include "TKey.h"
#include "TDSet.h"
#include "TError.h"
#include "TEventList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TObjString.h"
#include "TProofPlayer.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TMath.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TPerfStats.h"
// ROOT class-implementation macro for TVirtualPacketizer.
// NOTE(review): presumably hooks the class into ROOT's dictionary/RTTI
// machinery -- confirm against the macro definition in Rtypes.h.
ClassImp(TVirtualPacketizer)
//______________________________________________________________________________
TVirtualPacketizer::TVirtualPacketizer()
{
   // Default constructor: a freshly built packetizer has no stop request
   // pending (fStop == kFALSE) and is flagged as valid (fValid == kTRUE).

   fStop  = kFALSE;
   fValid = kTRUE;
}
//______________________________________________________________________________
Long64_t TVirtualPacketizer::GetEntries(Bool_t tree, TDSetElement *e)
{
   // Count the entries described by the data-set element 'e'.
   //
   // The file named by e->GetFileName() is opened and the directory
   // e->GetDirectory() inside it is located. Then:
   //   - if 'tree' is true, the object e->GetObjName() is read from that
   //     directory as a TTree and its number of entries is returned;
   //   - otherwise the number of keys in the directory is returned.
   //
   // Returns -1 if the file cannot be opened, the directory cannot be
   // entered, the key does not exist or the object cannot be read.
   // The file is always released before returning.

   TFile *file = TFile::Open(e->GetFileName());

   // TFile::Open() may return a null pointer as well as a zombie object;
   // both mean that the file is unusable.
   if ( !file || file->IsZombie() ) {
      const char *emsg = file ? strerror(file->GetErrno()) : "<undef>";
      Error("GetEntries","Cannot open file: %s (%s)",
            e->GetFileName(), emsg );
      delete file;   // release the zombie (no-op when file is null)
      return -1;
   }

   // Remember the directory that is current at this point so that it can be
   // made current again once the target directory has been located.
   TDirectory *dirsave = gDirectory;
   if ( ! file->cd(e->GetDirectory()) ) {
      Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
      delete file;
      return -1;
   }
   TDirectory *dir = gDirectory;
   dirsave->cd();

   Long64_t entries = -1;
   if ( tree ) {
      TKey *key = dir->GetKey(e->GetObjName());
      if ( key == 0 ) {
         Error("GetEntries","Cannot find tree \"%s\" in %s",
               e->GetObjName(), e->GetFileName() );
         delete file;
         return -1;
      }
      // Named 't' (not 'tree') so that the Bool_t parameter is not shadowed.
      TTree *t = (TTree *) key->ReadObj();
      if ( t == 0 ) {
         delete file;
         return -1;
      }
      entries = (Long64_t) t->GetEntries();
      delete t;
   } else {
      TList *keys = dir->GetListOfKeys();
      entries = keys->GetSize();
   }

   delete file;
   return entries;
}
//______________________________________________________________________________
Long64_t TVirtualPacketizer::GetEntriesProcessed(TSlave * /*slave*/) const
{
   // Base-class placeholder: the slave argument is ignored, the call is
   // reported through AbstractMethod() and 0 is returned.
   // NOTE(review): presumably concrete packetizers are expected to
   // override this -- confirm against the class declaration.

   const Long64_t nothingProcessed = 0;
   AbstractMethod("GetEntriesProcessed");
   return nothingProcessed;
}
//______________________________________________________________________________
TDSetElement *TVirtualPacketizer::GetNextPacket(TSlave * /*slave*/,
                                                TMessage * /*msg*/)
{
   // Base-class placeholder: both arguments are ignored, the call is
   // reported through AbstractMethod() and a null packet is returned.
   // NOTE(review): presumably concrete packetizers are expected to
   // override this -- confirm against the class declaration.

   TDSetElement *noPacket = 0;
   AbstractMethod("GetNextPacket");
   return noPacket;
}
//______________________________________________________________________________
void TVirtualPacketizer::StopProcess(Bool_t /*unused*/)
{
   // Raise the stop flag (fStop). The boolean argument is not used by this
   // implementation.

   fStop = kTRUE;
}
//______________________________________________________________________________
void TVirtualPacketizer::SplitEventList(TDSet *dset)
{
   // Distribute the event list attached to 'dset' over its elements.
   //
   // Each element covers the entry range [its own TDSet offset, offset of
   // the next element); the last element's range is open-ended
   // (kMaxLong64). For every element a new TEventList is created holding
   // the entries of the main list that fall below the element's upper
   // bound, re-based by subtracting the element's offset. The new list is
   // attached to the element and the element's number of entries is set to
   // the size of that list.
   //
   // The main list is walked once, front to back.
   // NOTE(review): this relies on the main list entries and the element
   // offsets both being in increasing order -- confirm with the callers.

   TEventList *globalList = dset->GetEventList();
   R__ASSERT(globalList);

   TIter nextElem(dset->GetListOfElements());
   TDSetElement *current = dynamic_cast<TDSetElement*> (nextElem());
   if (!current)
      return;

   Long64_t lower  = current->GetTDSetOffset();
   Long64_t cursor = 0;   // position in the main list, shared by all elements

   while (current) {
      // The upper bound of the current element is the offset of its successor.
      TDSetElement *following = dynamic_cast<TDSetElement*> (nextElem());
      Long64_t upper;
      if (following)
         upper = following->GetTDSetOffset();
      else
         upper = kMaxLong64;

#ifdef DEBUG
      // Skip (and report) entries that lie below the current lower bound.
      while (cursor < globalList->GetN() && globalList->GetEntry(cursor) < lower) {
         Error("SplitEventList", "event outside of the range of any of the TDSetElements");
         cursor++;
      }
#endif

      // Collect the entries below the upper bound, relative to 'lower'.
      TEventList *subList = new TEventList();
      for ( ; cursor < globalList->GetN() &&
              globalList->GetEntry((Int_t)cursor) < upper; cursor++) {
         subList->Enter(globalList->GetEntry((Int_t)cursor) - lower);
      }
      current->SetEventList(subList);
      current->SetNum(subList->GetN());

      lower   = upper;
      current = following;
   }
}
//______________________________________________________________________________
TDSetElement* TVirtualPacketizer::CreateNewPacket(TDSetElement* base,
                                                  Long64_t first, Long64_t num)
{
   // Build a new TDSetElement with the file name, object name and directory
   // of 'base', constructed with the given 'first' and 'num' values.
   //
   // If 'base' has a list of friends, one new element per friend is created
   // (same 'first'/'num', the friend's own file/object/directory) and added
   // to the new packet under the name stored as the pair's value.
   //
   // Returns the newly allocated packet.

   TDSetElement *packet = new TDSetElement(base->GetFileName(),
                                           base->GetObjName(),
                                           base->GetDirectory(), first, num);

   TList *friendList = base->GetListOfFriends();
   if (!friendList)
      return packet;

   // Each list item is a TPair: key = friend element, value = its alias.
   TIter nextFriend(friendList);
   for (TPair *entry = (TPair *) nextFriend(); entry;
        entry = (TPair *) nextFriend()) {
      TDSetElement *friendBase = (TDSetElement *) entry->Key();
      TDSetElement *friendPacket = new TDSetElement(friendBase->GetFileName(),
                                                    friendBase->GetObjName(),
                                                    friendBase->GetDirectory(),
                                                    first, num);
      packet->AddFriend(friendPacket,
                        ((TObjString *)(entry->Value()))->GetName());
   }

   return packet;
}
//______________________________________________________________________________
Bool_t TVirtualPacketizer::HandleTimer(TTimer *)
{
   // Send a kPROOF_PROGRESS message over the gProofServ socket.
   //
   // Nothing is sent when fProgress is null. For protocol versions up to 11
   // the message carries only the total and processed entry counts. For
   // newer protocols it also carries the bytes read, the init time, the
   // processing time and two rates (events and MB per unit of time)
   // computed against the entry read back from fCircProg; a rate is -1
   // while it cannot be computed.
   //
   // Always returns kFALSE.

   if (fProgress == 0)
      return kFALSE;

   TMessage progressMsg(kPROOF_PROGRESS);

   if (gProofServ->GetProtocol() <= 11) {
      // Older protocol: entry counters only.
      progressMsg << fTotalEntries << fProcessed;
   } else {
      const Double_t bytesPerMB = TMath::Power(2.,20.);

      // Time elapsed since fStartTime (milliseconds scaled by 1/1000).
      TTime currentTime = gSystem->Now();
      Float_t elapsed = (Float_t) (Long_t(currentTime) - fStartTime) / (Double_t)1000.;

      Double_t nEvents = (Double_t) fProcessed;
      Double_t megaBytes = 0.;
      if (fBytesRead > 0)
         megaBytes = fBytesRead / bytesPerMB;

      Float_t eventRate = -1.;
      Float_t mbRate = -1.;

      if (nEvents <= 0) {
         // Nothing processed yet: keep moving the init time forward.
         fInitTime = elapsed;
      } else {
         if (fCircProg->GetEntries() <= 0) {
            // First measurement: seed the buffer with a zero point and take
            // the init time halfway between the last idle tick and now.
            fCircProg->Fill((Double_t)0., 0., 0.);
            fInitTime = (elapsed + fInitTime) / 2.;
         }

         // fTimeUpdt must be computed from the previous fProcTime.
         fTimeUpdt = elapsed - fProcTime;
         fProcTime = elapsed - fInitTime;
         fCircProg->Fill((Double_t)fProcTime, nEvents, megaBytes);

         if (fCircProg->GetEntries() > 4) {
            // Load entry 0 into the argument buffer and compare against it.
            Double_t *oldest = fCircProg->GetArgs();
            fCircProg->GetEntry(0);
            Double_t window = (Double_t)fProcTime - oldest[0];
            if (window > 0) {
               eventRate = (Float_t) (nEvents - oldest[1]) / window;
               mbRate = (Float_t) (megaBytes - oldest[2]) / window;
            } else {
               eventRate = -1.;
               mbRate = -1.;
            }
            if (gPerfStats != 0) {
               gPerfStats->RateEvent((Double_t)fProcTime, window,
                                     (Long64_t) (nEvents - oldest[1]),
                                     (Long64_t) ((megaBytes - oldest[2])*bytesPerMB));
            }
         }
      }

      progressMsg << fTotalEntries << fProcessed << fBytesRead << fInitTime << fProcTime
                  << eventRate << mbRate;
   }

   gProofServ->GetSocket()->Send(progressMsg);
   return kFALSE;
}
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.