#include "TPacketizerMulti.h"
#include "TClass.h"
#include "TDSet.h"
#include "TError.h"
#include "TFileInfo.h"
#include "TList.h"
#include "TMap.h"
#include "TMethodCall.h"
#include "TProof.h"
#include "TProofDebug.h"
ClassImp(TPacketizerMulti)
////////////////////////////////////////////////////////////////////////////////
/// Constructor: creates one sub-packetizer per dataset and gets ready to serve
/// packets from them one after the other.
///
/// \param dset   dataset to process; if its TDSet::kMultiDSet bit is set, every
///               element of its list of elements is handled as a separate TDSet
/// \param wrks   list of workers, forwarded to CreatePacketizer
/// \param first  first entry, forwarded unchanged to CreatePacketizer
/// \param num    number of entries, forwarded unchanged to CreatePacketizer
/// \param input  input list, forwarded to CreatePacketizer
/// \param st     progress status, forwarded to CreatePacketizer
///
/// On any failure an error is logged and the object is left with
/// fValid == kFALSE; fValid is set to kTRUE only at the very end.

TPacketizerMulti::TPacketizerMulti(TDSet *dset, TList *wrks,
                                   Long64_t first, Long64_t num,
                                   TList *input, TProofProgressStatus *st)
                 : TVirtualPacketizer(input, st)
{
   PDB(kPacketizer,1) Info("TPacketizerMulti",
                           "enter (first %lld, num %lld)", first, num);

   // Put every member owned by this class in a defined state before any early
   // return: the destructor tests and deletes all of these pointers.
   fValid = kFALSE;
   fPacketizers = 0;
   fPacketizersIter = 0;
   fCurrent = 0;
   fAssignedPack = 0;

   // Check inputs
   if (!dset || !wrks || !input || !st) {
      Error("TPacketizerMulti", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
                                dset, wrks, input, st);
      return;
   }

   // Container for the sub-packetizers
   fPacketizers = new TList;

   // While the sub-packetizers are created the input list carries
   // "PROOF_StartProgressTimer" = "no"; the flag is removed again below.
   // NOTE(review): presumably this keeps sub-packetizers from starting their own
   // progress timers - confirm against TVirtualPacketizer.
   TNamed *progTimerFlag = new TNamed("PROOF_StartProgressTimer", "no");
   input->Add(progTimerFlag);

   fTotalEntries = 0;
   TVirtualPacketizer *packetizer = 0;
   if (!(dset->TestBit(TDSet::kMultiDSet))) {
      // Single dataset: a failure here is fatal
      if ((packetizer = CreatePacketizer(dset, wrks, first, num, input, st))) {
         fPacketizers->Add(packetizer);
         fTotalEntries = packetizer->GetTotalEntries();
      } else {
         Error("TPacketizerMulti", "problems initializing packetizer for single dataset");
         input->Remove(progTimerFlag);
         delete progTimerFlag;
         return;
      }
   } else {
      // Multi dataset: datasets whose packetizer cannot be created are skipped
      TIter nxds(dset->GetListOfElements());
      TDSet *ds = 0;
      while ((ds = (TDSet *)nxds())) {
         if ((packetizer = CreatePacketizer(ds, wrks, first, num, input, st))) {
            fPacketizers->Add(packetizer);
            fTotalEntries += packetizer->GetTotalEntries();
         } else {
            Error("TPacketizerMulti", "problems initializing packetizer for dataset '%s'", ds->GetName());
         }
      }
   }

   // The temporary flag is no longer needed
   input->Remove(progTimerFlag);
   delete progTimerFlag;

   // At least one sub-packetizer is required
   if (fPacketizers->GetSize() <= 0) {
      Error("TPacketizerMulti", "no valid packetizer could be initialized - aborting");
      SafeDelete(fPacketizers);
      return;
   } else {
      Info("TPacketizerMulti", "%d packetizer(s) have been successfully initialized (%lld events in total)",
                               fPacketizers->GetSize(), fTotalEntries);
      // Every sub-packetizer is told the grand total of entries
      TIter nxp(fPacketizers);
      while ((packetizer = (TVirtualPacketizer *) nxp()))
         packetizer->SetTotalEntries(fTotalEntries);
   }

   // Iterator used by GetNextPacket to move from one sub-packetizer to the next
   fPacketizersIter = new TIter(fPacketizers);

   // Start from the first sub-packetizer
   if (!(fCurrent = (TVirtualPacketizer *) fPacketizersIter->Next())) {
      Error("TPacketizerMulti", "could not point to the first valid packetizer");
      // Release the iterator before the list it refers to
      SafeDelete(fPacketizersIter);
      fPacketizers->SetOwner(kTRUE);
      SafeDelete(fPacketizers);
      return;
   }

   // Map worker -> sub-packetizer last assigned to it
   fAssignedPack = new TMap;

   // Everything went fine
   fValid = kTRUE;

   PDB(kPacketizer,1) Info("TPacketizerMulti", "done");
}
////////////////////////////////////////////////////////////////////////////////
/// Destructor: releases the iterator, the sub-packetizers and the
/// worker -> packetizer map.

TPacketizerMulti::~TPacketizerMulti()
{
   // Release the iterator first: it refers to fPacketizers
   SafeDelete(fPacketizersIter);
   fCurrent = 0;

   // The list is made owner so that deleting it also deletes the sub-packetizers
   if (fPacketizers) {
      fPacketizers->SetOwner(kTRUE);
      SafeDelete(fPacketizers);
   }

   // The map is explicitly made non-owner before deletion, so neither its keys
   // nor its values are deleted through it
   if (fAssignedPack) {
      fAssignedPack->SetOwner(kFALSE);
      SafeDelete(fAssignedPack);
   }
}
////////////////////////////////////////////////////////////////////////////////
/// Get the next packet for worker 'wrk' from the chain of sub-packetizers.
///
/// The sub-packetizer last assigned to the worker is asked first when it is not
/// the current one; then the current one; when the current one has nothing left
/// the iterator is advanced and the next sub-packetizer is asked.
///
/// \param wrk  worker asking for work; used as key in fAssignedPack
/// \param r    message forwarded unchanged to the sub-packetizers
/// \return the packet handed out by a sub-packetizer, or 0 when this object is
///         not valid or no sub-packetizer returned a packet
TDSetElement *TPacketizerMulti::GetNextPacket(TSlave *wrk, TMessage *r)
{
TDSetElement *elem = 0;
// Nothing can be served if construction failed
if (!fValid) return elem;
// Sub-packetizer recorded for this worker by a previous call (0 if none)
TVirtualPacketizer *lastPacketizer = dynamic_cast<TVirtualPacketizer *>(fAssignedPack->GetValue(wrk));
if (lastPacketizer && lastPacketizer != fCurrent) {
// The worker was last served by a sub-packetizer which is no longer the
// current one: ask that one first and return its packet if it has one
PDB(kPacketizer,2)
Info("GetNextPacket", "%s: asking old packetizer %p ... ", wrk->GetOrdinal(), lastPacketizer);
if ((elem = lastPacketizer->GetNextPacket(wrk, r))) return elem;
if (fCurrent) {
// The old sub-packetizer had nothing for this worker: add the worker's
// progress status kept there to the one kept by the current sub-packetizer
TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(lastPacketizer->GetSlaveStats()->GetValue(wrk));
TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
if (oldstat && curstat)
*(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
}
}
// No current sub-packetizer: all of them have been exhausted
if (!fCurrent) {
// NOTE(review): HandleTimer is defined elsewhere; presumably this emits a
// last progress update - confirm
HandleTimer(0);
return elem;
}
PDB(kPacketizer,2)
Info("GetNextPacket", "%s: asking current packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
if (!(elem = fCurrent->GetNextPacket(wrk, r))) {
// The per-worker stats must be fetched before fCurrent is advanced; they are
// transferred only if this worker was last assigned to the sub-packetizer
// that has just returned no packet
TMap *oldStats = (lastPacketizer && lastPacketizer == fCurrent) ? lastPacketizer->GetSlaveStats() : 0;
// Move on to the next sub-packetizer (0 when the list is finished)
fCurrent = (TVirtualPacketizer *) fPacketizersIter->Next();
if (fCurrent) {
if (oldStats) {
// Add the worker's progress status from the previous sub-packetizer
// to the one kept by the new current sub-packetizer
TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(oldStats->GetValue(wrk));
TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
if (oldstat && curstat)
*(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
}
PDB(kPacketizer,2)
Info("GetNextPacket", "%s: asking new packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
elem = fCurrent->GetNextPacket(wrk, r);
}
}
if (fCurrent) {
// Record the current sub-packetizer for this worker: update the existing
// entry if there is one, otherwise add a new one
TPair *pair = dynamic_cast<TPair *>(fAssignedPack->FindObject(wrk));
if (pair) {
pair->SetValue(fCurrent);
} else {
fAssignedPack->Add(wrk, fCurrent);
}
PDB(kPacketizer,2)
Info("GetNextPacket", "assigned packetizer %p to %s (check: %p)",
fCurrent, wrk->GetOrdinal(), fAssignedPack->GetValue(wrk));
}
// Once the processed entries reach the total: report an overshoot, call
// HandleTimer(0) and delete fProgress
if (fProgressStatus->GetEntries() >= fTotalEntries) {
if (fProgressStatus->GetEntries() > fTotalEntries)
Error("GetNextPacket", "Processed too many entries!");
HandleTimer(0);
SafeDelete(fProgress);
}
return elem;
}
////////////////////////////////////////////////////////////////////////////////
/// Create a packetizer for dataset 'dset'.
///
/// The packetizer class is taken from the "PROOF_Packetizer" parameter of the
/// input list, falling back to "TPacketizerAdaptive"; it is instantiated through
/// its (TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*)
/// constructor.
///
/// \param dset   dataset to be processed; looked up and possibly stripped of its
///               invalid elements by this call
/// \param wrks   list of workers, passed to the packetizer constructor
/// \param first  first entry, passed to the packetizer constructor
/// \param num    number of entries, passed to the packetizer constructor
/// \param input  input list; searched for "MissingFiles" and "PROOF_Packetizer",
///               and passed to the packetizer constructor
/// \param st     progress status, passed to the packetizer constructor
/// \return the new packetizer, or 0 on any failure (an error is logged)

TVirtualPacketizer *TPacketizerMulti::CreatePacketizer(TDSet *dset, TList *wrks,
                                                       Long64_t first, Long64_t num,
                                                       TList *input, TProofProgressStatus *st)
{
   TVirtualPacketizer *packetizer = 0;

   // Check inputs
   if (!dset || !wrks || !input || !st) {
      Error("CreatePacketizer", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
                                dset, wrks, input, st);
      return packetizer;
   }

   // An empty dataset cannot be processed
   if (dset->TestBit(TDSet::kEmpty)) {
      Error("CreatePacketizer", "dataset is empty: protocol error?");
      return packetizer;
   }

   // Reuse the list of missing files if the input list already has one;
   // otherwise create it. The new list must carry the name it is searched by,
   // or later lookups of "MissingFiles" (including the one done here for the
   // next dataset of a multi-dataset) would never find it.
   TList *listOfMissingFiles = (TList *) input->FindObject("MissingFiles");
   if (!listOfMissingFiles) {
      listOfMissingFiles = new TList;
      listOfMissingFiles->SetName("MissingFiles");
      input->Add(listOfMissingFiles);
   }

   // Look up the dataset; the list of missing files is passed by address
   dset->Lookup(kTRUE, &listOfMissingFiles);

   if (!(dset->GetListOfElements()) ||
       !(dset->GetListOfElements()->GetSize())) {
      Error("CreatePacketizer", "no files from the data set were found - skipping");
      return packetizer;
   }

   // Name of the packetizer class: user choice or default
   TString packetizername;
   if (TProof::GetParameter(input, "PROOF_Packetizer", packetizername) != 0) {
      packetizername = "TPacketizerAdaptive";
   } else {
      Info("CreatePacketizer", "using alternate packetizer: %s", packetizername.Data());
   }

   // Get the class descriptor
   TClass *cl = TClass::GetClass(packetizername);
   if (cl == 0) {
      Error("CreatePacketizer", "class '%s' not found", packetizername.Data());
      return packetizer;
   }

   // Locate the constructor with the expected signature
   TMethodCall callEnv;
   callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
   if (!callEnv.IsValid()) {
      Error("CreatePacketizer", "cannot find correct constructor for '%s'", cl->GetName());
      return packetizer;
   }
   callEnv.ResetParam();
   callEnv.SetParam((Long_t) dset);
   callEnv.SetParam((Long_t) wrks);
   callEnv.SetParam((Long64_t) first);
   callEnv.SetParam((Long64_t) num);
   callEnv.SetParam((Long_t) input);
   callEnv.SetParam((Long_t) st);

   // Mark the dataset as validity-checked and clear kSomeInvalid before the
   // packetizer is constructed; kSomeInvalid is tested again afterwards
   dset->SetBit(TDSet::kValidityChecked);
   dset->ResetBit(TDSet::kSomeInvalid);

   // Run the constructor; the returned value is the address of the new object
   Long_t ret = 0;
   callEnv.Execute(ret);
   if ((packetizer = (TVirtualPacketizer *)ret) == 0) {
      Error("CreatePacketizer", "cannot construct '%s'", cl->GetName());
      return packetizer;
   }

   // An invalid packetizer is destroyed (the pointer becomes 0), but the invalid
   // elements are still collected below before returning
   if (!packetizer->IsValid()) {
      Error("CreatePacketizer",
            "instantiated packetizer object '%s' is invalid", cl->GetName());
      SafeDelete(packetizer);
   }

   // Move the invalid elements, if any, to the list of missing files
   if (dset->TestBit(TDSet::kSomeInvalid)) {
      TIter nxe(dset->GetListOfElements());
      TDSetElement *elem = 0;
      while ((elem = (TDSetElement *)nxe())) {
         if (!elem->GetValid()) {
            listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
            dset->Remove(elem, kFALSE);
         }
      }
      // The invalid elements have been removed
      dset->ResetBit(TDSet::kSomeInvalid);
   }

   return packetizer;
}