// RooRealMPFE is the multi-processor front-end for parallel calculation
// of RooAbsReal objects. Each RooRealMPFE forks a process that calculates
// the value of the proxied RooAbsReal object. The (re)calculation of
// the proxied object is started asynchronously with the calculate() method.
// A subsequent call to getVal() will return the calculated value when available.
// If the calculation is still in progress when getVal() is called, it blocks
// the calling process until the calculation is done. The forked calculation process
// is terminated when the front-end object is deleted.
// Simple use demonstration
//
// <pre>
// RooAbsReal* slowFunc ;
//
// Double_t val = slowFunc->getVal() ; // Evaluate slowFunc in current process
//
// RooRealMPFE mpfe("mpfe","frontend to slowFunc",*slowFunc) ;
// mpfe.calculate() ; // Start calculation of slow-func in remote process
// // .. do other stuff here ..
// val = mpfe.getVal() ; // Wait for remote calculation to finish and retrieve value
// </pre>
//
// END_HTML
#include "Riostream.h"
#include "RooFit.h"
#ifndef _WIN32
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#endif
#include <errno.h>
#include <sstream>
#include "RooRealMPFE.h"
#include "RooArgSet.h"
#include "RooAbsCategory.h"
#include "RooRealVar.h"
#include "RooCategory.h"
#include "RooMPSentinel.h"
#include "TSystem.h"
// Definition of the static sentinel shared by all RooRealMPFE instances.
// Every instance registers itself with the sentinel in its constructors
// and removes itself again in its destructor.
RooMPSentinel RooRealMPFE::_sentinel ;
// ROOT class implementation macro for RooRealMPFE (presumably dictionary /
// RTTI bookkeeping -- see the ROOT documentation of ClassImp).
ClassImp(RooRealMPFE)
;
// Construct a front-end named 'name' with title 'title' for the function 'arg'.
// When 'calcInline' is true the object is put in inline mode, in which
// initialize() does not fork a server process and the function is evaluated
// in the calling process instead.
RooRealMPFE::RooRealMPFE(const char *name, const char *title, RooAbsReal& arg, Bool_t calcInline)
  : RooAbsReal(name,title)
  , _state(Initialize)
  , _arg("arg","arg",this,arg)
  , _vars("vars","vars",this)
  , _calcInProgress(kFALSE)
  , _verboseClient(kFALSE)
  , _verboseServer(kFALSE)
  , _inlineMode(calcInline)
  , _remoteEvalErrorLoggingState(kFALSE)
  , _pid(0)
{
#ifdef _WIN32
  // The fork()/pipe() based server code is compiled out on Windows,
  // so inline calculation is the only available mode there.
  _inlineMode = kTRUE;
#endif

  // Collect the non-constant parameters of the proxied function
  initVars() ;

  // Register this front-end with the shared sentinel
  _sentinel.add(*this) ;
}
// Copy constructor. The verbosity flags, inline mode and remote error-logging
// state are taken over from 'other'; the copy itself starts out in the
// Initialize state, with no calculation in progress and no server process
// of its own (_pid is zero).
RooRealMPFE::RooRealMPFE(const RooRealMPFE& other, const char* name)
  : RooAbsReal(other, name)
  , _state(Initialize)
  , _arg("arg",this,other._arg)
  , _vars("vars",this,other._vars)
  , _calcInProgress(kFALSE)
  , _verboseClient(other._verboseClient)
  , _verboseServer(other._verboseServer)
  , _inlineMode(other._inlineMode)
  , _forceCalc(other._forceCalc)
  , _remoteEvalErrorLoggingState(other._remoteEvalErrorLoggingState)
  , _pid(0)
{
  // Rebuild the variable lists for this copy (this also sets _forceCalc)
  initVars() ;

  // Register this front-end with the shared sentinel
  _sentinel.add(*this) ;
}
// Destructor. Shuts down the server process if this object is acting as
// its client, then removes the object from the shared sentinel.
RooRealMPFE::~RooRealMPFE()
{
  const Bool_t actingAsClient = (_state==Client) ;
  if (actingAsClient) {
    // Sends Terminate to the server and waits for it to exit
    standby() ;
  }

  _sentinel.remove(*this) ;
}
void RooRealMPFE::initVars()
{
_vars.removeAll() ;
_saveVars.removeAll() ;
RooArgSet* vars = _arg.arg().getParameters(RooArgSet()) ;
RooArgSet* ncVars = (RooArgSet*) vars->selectByAttrib("Constant",kFALSE) ;
RooArgList varList(*ncVars) ;
_vars.add(varList) ;
_saveVars.addClone(varList) ;
_forceCalc = kTRUE ;
delete vars ;
delete ncVars ;
}
void RooRealMPFE::initialize()
{
if (_inlineMode) {
_state = Inline ;
return ;
}
#ifndef _WIN32
pipe(_pipeToClient) ;
pipe(_pipeToServer) ;
clearEvalErrorLog() ;
_pid = fork() ;
if (_pid==0) {
_state = Server ;
serverLoop() ;
cout << "RooRealMPFE::initialize(" << GetName()
<< ") server process terminating" << endl ;
exit(0) ;
} else if (_pid>0) {
cout << "RooRealMPFE::initialize(" << GetName()
<< ") successfully forked server process " << _pid << endl ;
_state = Client ;
_calcInProgress = kFALSE ;
} else {
cout << "RooRealMPFE::initialize(" << GetName() << ") ERROR fork() failed" << endl ;
_state = Inline ;
}
#endif // _WIN32
}
void RooRealMPFE::serverLoop()
{
#ifndef _WIN32
Bool_t doLoop(kTRUE) ;
Message msg ;
Int_t idx, index, numErrors ;
Double_t value ;
Bool_t isConst ;
clearEvalErrorLog() ;
while(doLoop) {
ssize_t n = read(_pipeToServer[0],&msg,sizeof(msg)) ;
if (n<0&&_verboseServer) perror("read") ;
switch (msg) {
case SendReal:
read(_pipeToServer[0],&idx,sizeof(Int_t)) ;
read(_pipeToServer[0],&value,sizeof(Double_t)) ;
read(_pipeToServer[0],&isConst,sizeof(Bool_t)) ;
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC fromClient> SendReal [" << idx << "]=" << value << endl ;
((RooRealVar*)_vars.at(idx))->setVal(value) ;
((RooRealVar*)_vars.at(idx))->setConstant(isConst) ;
break ;
case SendCat:
read(_pipeToServer[0],&idx,sizeof(Int_t)) ;
read(_pipeToServer[0],&index,sizeof(Int_t)) ;
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC fromClient> SendCat [" << idx << "]=" << index << endl ;
((RooCategory*)_vars.at(idx))->setIndex(index) ;
break ;
case Calculate:
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC fromClient> Calculate" << endl ;
_value = _arg ;
break ;
case Retrieve:
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC fromClient> Retrieve" << endl ;
msg = ReturnValue ;
write(_pipeToClient[1],&msg,sizeof(Message)) ;
write(_pipeToClient[1],&_value,sizeof(Double_t)) ;
numErrors = numEvalErrors() ;
write(_pipeToClient[1],&numErrors,sizeof(Int_t)) ;
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC toClient> ReturnValue " << _value << " NumError " << numErrors << endl ;
break ;
case ConstOpt:
ConstOpCode code ;
read(_pipeToServer[0],&code,sizeof(ConstOpCode)) ;
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC fromClient> ConstOpt " << code << endl ;
((RooAbsReal&)_arg.arg()).constOptimizeTestStatistic(code) ;
break ;
case Verbose:
Bool_t flag ;
read(_pipeToServer[0],&flag,sizeof(Bool_t)) ;
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC fromClient> Verbose " << (flag?1:0) << endl ;
_verboseServer = flag ;
break ;
case Terminate:
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC fromClient> Terminate" << endl ;
doLoop = kFALSE ;
break ;
case LogEvalError:
{
Bool_t flag2 ;
read(_pipeToServer[0],&flag2,sizeof(Bool_t)) ;
RooAbsReal::enableEvalErrorLogging(flag2) ;
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC fromClient> LogEvalError flag = " << (flag2?"kTRUE":"kFALSE") << endl ;
}
break ;
case RetrieveErrors:
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC fromClient> RetrieveErrors" << endl ;
{
std::map<const RooAbsArg*,pair<string,list<EvalError> > >::const_iterator iter = evalErrorIter() ;
for (int i=0 ; i<numEvalErrorItems() ; i++) {
list<EvalError>::const_iterator iter2 = iter->second.second.begin() ;
for (;iter2!=iter->second.second.end();++iter2) {
msg = SendError ;
write(_pipeToClient[1],&msg,sizeof(Message)) ;
write(_pipeToClient[1],&iter->first,sizeof(RooAbsReal*)) ;
Int_t ntext = strlen(iter2->_msg) ;
write(_pipeToClient[1],&ntext,sizeof(Int_t)) ;
write(_pipeToClient[1],iter2->_msg,ntext+1) ;
Int_t ntext2 = strlen(iter2->_srvval) ;
write(_pipeToClient[1],&ntext2,sizeof(Int_t)) ;
write(_pipeToClient[1],iter2->_srvval,ntext2+1) ;
ostringstream oss2 ;
oss2 << "PID" << gSystem->GetPid() << "/" ;
printStream(oss2,kName|kClassName|kArgs,kInline) ;
Int_t ntext3 = strlen(oss2.str().c_str()) ;
write(_pipeToClient[1],&ntext3,sizeof(Int_t)) ;
write(_pipeToClient[1],oss2.str().c_str(),ntext3+1) ;
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC toClient> SendError Arg " << iter->first << " Msg " << iter2->_msg << endl ;
}
}
RooAbsReal* null(0) ;
write(_pipeToClient[1],&msg,sizeof(Message)) ;
write(_pipeToClient[1],&null,sizeof(RooAbsReal*)) ;
}
clearEvalErrorLog() ;
break ;
default:
if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName()
<< ") IPC fromClient> Unknown message (code = " << msg << ")" << endl ;
break ;
}
}
#endif // _WIN32
}
void RooRealMPFE::calculate() const
{
if (_state==Initialize) {
const_cast<RooRealMPFE*>(this)->initialize() ;
}
if (_state==Inline) {
_value = _arg ;
clearValueDirty() ;
}
#ifndef _WIN32
if (_state==Client) {
Int_t i ;
for (i=0 ; i<_vars.getSize() ; i++) {
RooAbsArg* var = _vars.at(i) ;
RooAbsArg* saveVar = _saveVars.at(i) ;
if (!(*var==*saveVar) || (var->isConstant() != saveVar->isConstant()) || _forceCalc) {
if (_verboseClient) cout << "RooRealMPFE::calculate(" << GetName()
<< ") variable " << _vars.at(i)->GetName() << " changed" << endl ;
((RooRealVar*)saveVar)->setConstant(var->isConstant()) ;
saveVar->copyCache(var) ;
if (dynamic_cast<RooAbsReal*>(var)) {
Message msg = SendReal ;
Double_t val = ((RooAbsReal*)var)->getVal() ;
Bool_t isC = var->isConstant() ;
write(_pipeToServer[1],&msg,sizeof(msg)) ;
write(_pipeToServer[1],&i,sizeof(Int_t)) ;
write(_pipeToServer[1],&val,sizeof(Double_t)) ;
write(_pipeToServer[1],&isC,sizeof(Bool_t)) ;
if (_verboseServer) cout << "RooRealMPFE::calculate(" << GetName()
<< ") IPC toServer> SendReal [" << i << "]=" << val << (isC?" (Constant)":"") << endl ;
} else if (dynamic_cast<RooAbsCategory*>(var)) {
Message msg = SendCat ;
Int_t idx = ((RooAbsCategory*)var)->getIndex() ;
write(_pipeToServer[1],&msg,sizeof(msg)) ;
write(_pipeToServer[1],&i,sizeof(Int_t)) ;
write(_pipeToServer[1],&idx,sizeof(Int_t)) ;
if (_verboseServer) cout << "RooRealMPFE::calculate(" << GetName()
<< ") IPC toServer> SendCat [" << i << "]=" << idx << endl ;
}
}
}
Message msg = Calculate ;
write(_pipeToServer[1],&msg,sizeof(msg)) ;
if (_verboseServer) cout << "RooRealMPFE::calculate(" << GetName()
<< ") IPC toServer> Calculate " << endl ;
clearValueDirty() ;
_calcInProgress = kTRUE ;
_forceCalc = kFALSE ;
} else if (_state!=Inline) {
cout << "RooRealMPFE::calculate(" << GetName()
<< ") ERROR not in Client or Inline mode" << endl ;
}
#endif // _WIN32
}
// Return the value of the proxied function; the normalization-set argument is ignored.
// If the cached value is dirty a new calculation is started and its result
// collected. If a calculation was started earlier and is still pending, only
// its result is collected. Otherwise the cached value is returned as is.
Double_t RooRealMPFE::getVal(const RooArgSet* ) const
{
  // Sample the dirty state once: calculate() clears it
  const Bool_t needsRecalc = isValueDirty() ;

  if (needsRecalc) {
    calculate() ;
  }

  if (needsRecalc || _calcInProgress) {
    _value = evaluate() ;
  }

  return _value ;
}
Double_t RooRealMPFE::evaluate() const
{
Double_t return_value = 0;
if (_state==Inline) {
return_value = _arg ;
} else if (_state==Client) {
#ifndef _WIN32
Message msg ;
Double_t value ;
if (evalErrorLoggingEnabled() != _remoteEvalErrorLoggingState) {
msg = LogEvalError ;
write(_pipeToServer[1],&msg,sizeof(Message)) ;
Bool_t flag = evalErrorLoggingEnabled() ;
write(_pipeToServer[1],&flag,sizeof(Bool_t)) ;
_remoteEvalErrorLoggingState = evalErrorLoggingEnabled() ;
}
msg = Retrieve ;
write(_pipeToServer[1],&msg,sizeof(Message)) ;
if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName()
<< ") IPC toServer> Retrieve " << endl ;
read(_pipeToClient[0],&msg,sizeof(Message)) ;
if (msg!=ReturnValue) {
cout << "RooRealMPFE::evaluate(" << GetName()
<< ") ERROR: unexpected message from server process: " << msg << endl ;
return 0 ;
}
read(_pipeToClient[0],&value,sizeof(Double_t)) ;
if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName()
<< ") IPC fromServer> ReturnValue " << value << endl ;
Int_t numError ;
read(_pipeToClient[0],&numError,sizeof(Int_t)) ;
if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName()
<< ") IPC fromServer> NumErrors " << numError << endl ;
if (numError>0) {
msg=RetrieveErrors ;
write(_pipeToServer[1],&msg,sizeof(Message)) ;
if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName()
<< ") IPC toServer> RetrieveErrors " << endl ;
while(true) {
RooAbsReal* ptr(0) ;
Int_t ntext1,ntext2,ntext3 ;
char msgbuf1[1024] ;
char msgbuf2[1024] ;
char msgbuf3[1024] ;
read(_pipeToClient[0],&msg,sizeof(Message)) ;
read(_pipeToClient[0],&ptr,sizeof(RooAbsReal*)) ;
if (ptr==0) {
break ;
}
read(_pipeToClient[0],&ntext1,sizeof(Int_t)) ;
read(_pipeToClient[0],msgbuf1,ntext1+1) ;
read(_pipeToClient[0],&ntext2,sizeof(Int_t)) ;
read(_pipeToClient[0],msgbuf2,ntext2+1) ;
read(_pipeToClient[0],&ntext3,sizeof(Int_t)) ;
read(_pipeToClient[0],msgbuf3,ntext3+1) ;
if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName()
<< ") IPC fromServer> SendError Arg " << ptr << " Msg " << msgbuf1 << endl ;
logEvalError(ptr,msgbuf3,msgbuf1,msgbuf2) ;
}
}
_calcInProgress = kFALSE ;
return_value = value ;
#endif // _WIN32
}
return return_value;
}
void RooRealMPFE::standby()
{
#ifndef _WIN32
if (_state==Client) {
Message msg = Terminate ;
write(_pipeToServer[1],&msg,sizeof(msg)) ;
if (_verboseServer) cout << "RooRealMPFE::standby(" << GetName()
<< ") IPC toServer> Terminate " << endl ;
close(_pipeToClient[0]) ;
close(_pipeToClient[1]) ;
close(_pipeToServer[0]) ;
close(_pipeToServer[1]) ;
waitpid(_pid,0,0) ;
_state = Initialize ;
}
#endif // _WIN32
}
// Apply constant-optimization opcode 'opcode' to the proxied function.
// In Client state the opcode is forwarded to the server process and the local
// variable lists are rebuilt afterwards; in Inline state the call is passed
// straight to the proxied function in this process.
void RooRealMPFE::constOptimizeTestStatistic(ConstOpCode opcode)
{
#ifndef _WIN32
  if (_state==Client) {
    // Forward the request: message code first, then the opcode itself
    Message request = ConstOpt ;
    write(_pipeToServer[1],&request,sizeof(request)) ;
    write(_pipeToServer[1],&opcode,sizeof(ConstOpCode)) ;
    if (_verboseServer) cout << "RooRealMPFE::constOptimize(" << GetName()
                             << ") IPC toServer> ConstOpt " << opcode << endl ;

    // Rebuild _vars/_saveVars and force a full update on the next calculation
    initVars() ;
  }
#endif // _WIN32

  if (_state==Inline) {
    RooAbsReal& func = (RooAbsReal&)_arg.arg() ;
    func.constOptimizeTestStatistic(opcode) ;
  }
}
// Set the verbosity flags of the client side ('clientFlag') and of the server
// side ('serverFlag'). In Client state the server flag is also transmitted to
// the server process.
void RooRealMPFE::setVerbose(Bool_t clientFlag, Bool_t serverFlag)
{
#ifndef _WIN32
  if (_state==Client) {
    Message request = Verbose ;
    write(_pipeToServer[1],&request,sizeof(request)) ;
    write(_pipeToServer[1],&serverFlag,sizeof(Bool_t)) ;
    // Note: this trace still uses the previous value of _verboseServer
    if (_verboseServer) cout << "RooRealMPFE::setVerbose(" << GetName()
                             << ") IPC toServer> Verbose " << (serverFlag?1:0) << endl ;
  }
#endif // _WIN32

  // Store the new settings locally
  _verboseClient = clientFlag ;
  _verboseServer = serverFlag ;
}