// @(#)root/proof:$Name: $:$Id: TProofSuperMaster.cxx,v 1.11 2005/12/10 16:51:57 rdm Exp $
// Author: Fons Rademakers 13/02/97
/*************************************************************************
* Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
//////////////////////////////////////////////////////////////////////////
// //
// TProofSuperMaster //
// //
// This class controls a Parallel ROOT Facility, PROOF, cluster. //
// It fires the slave servers, it keeps track of how many slaves are //
// running, it keeps track of the slaves running status, it broadcasts //
// messages to all slaves, it collects results, etc. //
// //
//////////////////////////////////////////////////////////////////////////
#include "TProofSuperMaster.h"
#include "TString.h"
#include "TError.h"
#include "TList.h"
#include "TSortedList.h"
#include "TSlave.h"
#include "TMap.h"
#include "TProofServ.h"
#include "TSocket.h"
#include "TMonitor.h"
#include "TSemaphore.h"
#include "TDSet.h"
#include "TPluginManager.h"
#include "TProofPlayer.h"
#include "TMessage.h"
#include "TUrl.h"
#include "TProofResourcesStatic.h"
#include "TProofNodeInfo.h"
ClassImp(TProofSuperMaster)
//______________________________________________________________________________
TProofSuperMaster::TProofSuperMaster(const char *masterurl, const char *conffile,
const char *confdir, Int_t loglevel)
{
// Start super master PROOF session.
fUrl = TUrl(masterurl);
if (!conffile || strlen(conffile) == 0)
conffile = kPROOF_ConfFile;
else if (!strncasecmp(conffile, "sm:", 3))
conffile+=3;
if (!confdir || strlen(confdir) == 0)
confdir = kPROOF_ConfDir;
Init(masterurl, conffile, confdir, loglevel);
}
//______________________________________________________________________________
Bool_t TProofSuperMaster::StartSlaves(Bool_t parallel, Bool_t)
{
// Start up PROOF submasters.
// If this is a supermaster server, find the config file and start
// submaster servers as specified in the config file.
// There is a difference in startup between a slave and a submaster
// in which the submaster will issue a kPROOF_LOGFILE and
// then a kPROOF_LOGDONE message (which must be collected)
// while slaves do not.
// Parse the config file
TProofResourcesStatic *resources = new TProofResourcesStatic(fConfDir, fConfFile);
fConfFile = resources->GetFileName(); // Update the global file name (with path)
PDB(kGlobal,1) Info("StartSlaves", "using PROOF config file: %s", fConfFile.Data());
// Get the master
TProofNodeInfo *master = resources->GetMaster();
if (master) {
fImage = master->GetImage();
}
if (!master || (fImage.Length() == 0)) {
Error("StartSlaves",
"no appropriate master line found in %s", fConfFile.Data());
return kFALSE;
}
TList *submasterList = resources->GetSubmasters();
UInt_t nSubmasters = submasterList->GetSize();
UInt_t nSubmastersDone = 0;
Int_t ord = 0;
TList validSubmasters;
TList validPairs;
validPairs.SetOwner();
// Init arrays for threads, if neeeded
std::vector<TProofThread *> thrHandlers;
if (parallel) {
thrHandlers.reserve(nSubmasters);
if (thrHandlers.max_size() < nSubmasters) {
PDB(kGlobal,1)
Info("StartSlaves","cannot reserve enough space thread"
" handlers - switch to serial startup");
parallel = kFALSE;
}
}
// Loop over all submasters and start them
TListIter next(submasterList);
TObject *to;
TProofNodeInfo *submaster;
while ((to = next())) {
// Get the next submaster from the list
submaster = (TProofNodeInfo *)to;
const Char_t *conffile = submaster->GetConfig();
const Char_t *image = submaster->GetImage();
const Char_t *msd = submaster->GetMsd();
Int_t sport = submaster->GetPort();
if (sport == -1)
sport = fUrl.GetPort();
TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
if (parallel) {
// Prepare arguments
TProofThreadArg *ta =
new TProofThreadArg(submaster->GetNodeName().Data(), sport,
fullord, image, conffile, msd, fSlaves, this);
if (ta) {
// Change default type
ta->fType = TSlave::kMaster;
// The type of the thread func makes it a detached thread
TThread *th = new TThread(SlaveStartupThread,ta);
if (!th) {
Info("StartSlaves","Can't create startup thread:"
" out of system resources");
SafeDelete(ta);
} else {
// Save in vector
thrHandlers.push_back(new TProofThread(th,ta));
// Run the thread
th->Run();
// Notify opening of connection
nSubmastersDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Opening connections to submasters") << nSubmasters
<< nSubmastersDone << kTRUE;
gProofServ->GetSocket()->Send(m);
}
} else {
Info("StartSlaves","Can't create thread arguments object:"
" out of system resources");
}
} // end if (parallel)
else {
// create submaster server
TSlave *slave =
CreateSubmaster(Form("%s:%d", submaster->GetNodeName().Data(), sport),
fullord, image, msd);
// Add to global list (we will add to the monitor list after
// finalizing the server startup)
Bool_t submasterOk = kTRUE;
fSlaves->Add(slave);
if (slave->IsValid()) {
validPairs.Add(new TPair(slave, new TObjString(conffile)));
} else {
submasterOk = kFALSE;
fBadSlaves->Add(slave);
}
PDB(kGlobal,3)
Info("StartSlaves","submaster on host %s created and"
" added to list", submaster->GetNodeName().Data());
// Notify opening of connection
nSubmastersDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Opening connections to submasters") << nSubmasters
<< nSubmastersDone << submasterOk;
gProofServ->GetSocket()->Send(m);
} // end else (create submaster server)
ord++;
} // end loop over all submasters
// Cleanup
delete resources;
resources = 0;
nSubmastersDone = 0;
if (parallel) {
// Wait completion of startup operations
std::vector<TProofThread *>::iterator i;
for (i = thrHandlers.begin(); i != thrHandlers.end(); ++i) {
TProofThread *pt = *i;
// Wait on this condition
if (pt && pt->fThread->GetState() == TThread::kRunningState) {
PDB(kGlobal,3)
Info("StartSlaves",
"parallel startup: waiting for submaster %s (%s:%d)",
pt->fArgs->fOrd.Data(), pt->fArgs->fUrl->GetHost(),
pt->fArgs->fUrl->GetPort());
pt->fThread->Join();
}
// Notify end of startup operations
nSubmastersDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Setting up submasters") << nSubmasters
<< nSubmastersDone << kTRUE;
gProofServ->GetSocket()->Send(m);
}
TIter next(fSlaves);
TSlave *sl = 0;
while ((sl = (TSlave *)next())) {
if (sl->IsValid()) {
if (fProtocol == 1) {
Error("StartSlaves", "master and submaster protocols"
" not compatible (%d and %d)",
kPROOF_Protocol, fProtocol);
fBadSlaves->Add(sl);
} else {
fAllMonitor->Add(sl->GetSocket());
validSubmasters.Add(sl);
}
} else {
fBadSlaves->Add(sl);
}
} // end while
// We can cleanup now
while (!thrHandlers.empty()) {
i = thrHandlers.end()-1;
if (*i) {
SafeDelete(*i);
thrHandlers.erase(i);
}
}
} // end if (parallel)
else {
// Here we finalize the server startup: in this way the bulk
// of remote operations are almost parallelized
TIter nxsc(&validPairs);
TPair *sc = 0;
while ((sc = (TPair *) nxsc())) {
// Finalize setup of the server
TSlave *sl = (TSlave *) sc->Key();
TObjString *cf = (TObjString *) sc->Value();
sl->SetupServ(TSlave::kMaster, cf->GetName());
// Monitor good slaves
Bool_t submasterOk = kTRUE;
if (sl->IsValid()) {
// check protocol compatability
// protocol 1 is not supported anymore
if (fProtocol == 1) {
Error("StartSlaves", "master and submaster protocols"
" not compatible (%d and %d)",
kPROOF_Protocol, fProtocol);
submasterOk = kFALSE;
fBadSlaves->Add(sl);
} else {
fAllMonitor->Add(sl->GetSocket());
validSubmasters.Add(sl);
}
} else {
submasterOk = kFALSE;
fBadSlaves->Add(sl);
}
// Notify end of startup operations
nSubmastersDone++;
TMessage m(kPROOF_SERVERSTARTED);
m << TString("Setting up submasters") << nSubmasters
<< nSubmastersDone << submasterOk;
gProofServ->GetSocket()->Send(m);
}
}
Collect(kAll); //Get kPROOF_LOGFILE and kPROOF_LOGDONE messages
TIter nextSubmaster(&validSubmasters);
while (TSlave* sl = dynamic_cast<TSlave*>(nextSubmaster())) {
if (sl->GetStatus() == -99) {
Error("StartSlaves", "not allowed to connect to PROOF master server");
fBadSlaves->Add(sl);
continue;
}
if (!sl->IsValid()) {
Error("StartSlaves", "failed to setup connection with PROOF master server");
fBadSlaves->Add(sl);
continue;
}
}
return kTRUE;
}
//______________________________________________________________________________
Int_t TProofSuperMaster::Process(TDSet *set, const char *selector, Option_t *option,
Long64_t nentries, Long64_t first, TEventList *evl)
{
// Process a data set (TDSet) using the specified selector (.C) file.
// Returns -1 in case of error, 0 otherwise.
if (!IsValid()) return -1;
Assert(GetPlayer());
if (GetProgressDialog())
GetProgressDialog()->ExecPlugin(5, this, selector, set->GetListOfElements()->GetSize(),
first, nentries);
return GetPlayer()->Process(set, selector, option, nentries, first, evl);
}
//______________________________________________________________________________
void TProofSuperMaster::ValidateDSet(TDSet *dset)
{
// Validate a TDSet.
if (dset->ElementsValid()) return;
TList msds;
msds.SetOwner();
TList smholder;
smholder.SetOwner();
TList elemholder;
elemholder.SetOwner();
// build nodelist with slaves and elements
TIter nextSubmaster(GetListOfActiveSlaves());
while (TSlave *sl = dynamic_cast<TSlave*>(nextSubmaster())) {
TList *smlist = 0;
TPair *p = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
if (!p) {
smlist = new TList;
smlist->SetName(sl->GetMsd());
smholder.Add(smlist);
TList *elemlist = new TSortedList(kSortDescending);
elemlist->SetName(TString(sl->GetMsd())+"_elem");
elemholder.Add(elemlist);
msds.Add(new TPair(smlist, elemlist));
} else {
smlist = dynamic_cast<TList*>(p->Key());
}
smlist->Add(sl);
}
TIter nextElem(dset->GetListOfElements());
while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextElem())) {
if (elem->GetValid()) continue;
TPair *p = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
if (p) {
dynamic_cast<TList*>(p->Value())->Add(elem);
} else {
Error("ValidateDSet", "no mass storage domain '%s' associated"
" with available submasters",
elem->GetMsd());
return;
}
}
// send to slaves
TList usedsms;
TIter nextSM(&msds);
SetDSet(dset); // set dset to be validated in Collect()
while (TPair *msd = dynamic_cast<TPair*>(nextSM())) {
TList *sms = dynamic_cast<TList*>(msd->Key());
TList *setelements = dynamic_cast<TList*>(msd->Value());
// distribute elements over the slaves
Int_t nsms = sms->GetSize();
Int_t nelements = setelements->GetSize();
for (Int_t i=0; i<nsms; i++) {
TDSet set(dset->GetType(), dset->GetObjName(),
dset->GetDirectory());
for (Int_t j = (i*nelements)/nsms;
j < ((i+1)*nelements)/nsms;
j++) {
TDSetElement *elem =
dynamic_cast<TDSetElement*>(setelements->At(j));
set.Add(elem->GetFileName(), elem->GetObjName(),
elem->GetDirectory(), elem->GetFirst(),
elem->GetNum(), elem->GetMsd());
}
if (set.GetListOfElements()->GetSize()>0) {
TMessage mesg(kPROOF_VALIDATE_DSET);
mesg << &set;
TSlave *sl = dynamic_cast<TSlave*>(sms->At(i));
PDB(kGlobal,1) Info("ValidateDSet",
"Sending TDSet with %d elements to slave %s"
" to be validated",
set.GetListOfElements()->GetSize(),
sl->GetOrdinal());
sl->GetSocket()->Send(mesg);
usedsms.Add(sl);
}
}
}
PDB(kGlobal,1) Info("ValidateDSet","Calling Collect");
Collect(&usedsms);
SetDSet(0);
}
//______________________________________________________________________________
TProofPlayer *TProofSuperMaster::MakePlayer()
{
// Construct a TProofPlayer object.
SetPlayer(new TProofPlayerSuperMaster(this));
return GetPlayer();
}
ROOT page - Class index - Class Hierarchy - Top of the page
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.