Logo ROOT  
Reference Guide
TMPWorker.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include "MPCode.h"
14#include "MPSendRecv.h"
15#include "TError.h"
16#include "TMPWorker.h"
17#include "TSystem.h"
18#include <memory> //unique_ptr
19#include <string>
20
21#include <iostream>
22
23//////////////////////////////////////////////////////////////////////////
24///
25/// \class TMPWorker
26///
27/// This class works in conjuction with TMPClient, reacting to messages
28/// received from it as specified by the Notify and HandleInput methods.
29/// When TMPClient::Fork is called, a TMPWorker instance is passed to it
30/// which will take control of the ROOT session in the children processes.
31///
32/// After forking, every time a message is sent or broadcast to the workers,
33/// TMPWorker::Notify is called and the message is retrieved.
34/// Messages exchanged between TMPClient and TMPWorker should be sent with
35/// the MPSend() standalone function.\n
36/// If the code of the message received is above 1000 (i.e. it is an MPCode)
37/// the qualified TMPWorker::HandleInput method is called, that takes care
38/// of handling the most generic type of messages. Otherwise the unqualified
39/// (possibly overridden) version of HandleInput is called, allowing classes
40/// that inherit from TMPWorker to manage their own protocol.\n
41/// An application's worker class should inherit from TMPWorker and implement
42/// a HandleInput method that overrides TMPWorker's.\n
43///
44//////////////////////////////////////////////////////////////////////////
45
46//////////////////////////////////////////////////////////////////////////
47/// This method is called by children processes right after forking.
48/// Initialization of worker properties that must be delayed until after
49/// forking must be done here.\n
50/// For example, Init saves the pid into fPid, and adds the TMPWorker to
51/// the main eventloop (as a TFileHandler).\n
52/// Make sure this operations are performed also by overriding implementations,
53/// e.g. by calling TMPWorker::Init explicitly.
54void TMPWorker::Init(int fd, unsigned workerN)
55{
56 fS.reset(new TSocket(fd, "MPsock")); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
57 fPid = getpid();
58 fNWorker = workerN;
59 fId = "W" + std::to_string(GetNWorker()) + "|P" + std::to_string(GetPid());
60}
61
62
64{
65 while(true) {
66 MPCodeBufPair msg = MPRecv(fS.get());
67 if (msg.first == MPCode::kRecvError) {
68 Error("TMPWorker::Run", "Lost connection to client\n");
69 gSystem->Exit(0);
70 }
71
72 if (msg.first < 1000)
73 HandleInput(msg); //call overridden method
74 else
75 TMPWorker::HandleInput(msg); //call this class' method
76 }
77}
78
79
80//////////////////////////////////////////////////////////////////////////
81/// Handle a message with an EMPCode.
82/// This method is called upon receiving a message with a code >= 1000 (i.e.
83/// EMPCode). It handles the most generic types of messages.\n
84/// Classes inheriting from TMPWorker should implement their own HandleInput
85/// function, that should be able to handle codes specific to that application.\n
86/// The appropriate version of the HandleInput method (TMPWorker's or the
87/// overriding version) is automatically called depending on the message code.
89{
90 unsigned code = msg.first;
91
92 std::string reply = fId;
93 if (code == MPCode::kMessage) {
94 //general message, ignore it
95 reply += ": ok";
96 MPSend(fS.get(), MPCode::kMessage, reply.c_str());
97 } else if (code == MPCode::kError) {
98 //general error, ignore it
99 reply += ": ko";
100 MPSend(fS.get(), MPCode::kMessage, reply.c_str());
101 } else if (code == MPCode::kShutdownOrder || code == MPCode::kFatalError) {
102 //client is asking the server to shutdown or client is dying
103 MPSend(fS.get(), MPCode::kShutdownNotice, reply.c_str());
104 gSystem->Exit(0);
105 } else {
106 reply += ": unknown code received. code=" + std::to_string(code);
107 MPSend(fS.get(), MPCode::kError, reply.c_str());
108 }
109}
110
111//////////////////////////////////////////////////////////////////////////
112/// Error sender
113
114void TMPWorker::SendError(const std::string& errmsg, unsigned int errcode)
115{
116 std::string reply = fId + ": " + errmsg;
117 MPSend(GetSocket(), errcode, reply.c_str());
118}
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
void Error(const char *location, const char *msgfmt,...)
R__EXTERN TSystem * gSystem
Definition: TSystem.h:556
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
Definition: TMPWorker.cxx:114
unsigned GetNWorker() const
Definition: TMPWorker.h:42
std::string fId
identifier string in the form W<nwrk>|P<proc id>
Definition: TMPWorker.h:45
pid_t GetPid()
Definition: TMPWorker.h:41
TSocket * GetSocket()
Definition: TMPWorker.h:40
unsigned fNWorker
the ordinal number of this worker (0 to nWorkers-1)
Definition: TMPWorker.h:57
virtual void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
Definition: TMPWorker.cxx:88
void Run()
Definition: TMPWorker.cxx:63
pid_t fPid
the PID of the process in which this worker is running
Definition: TMPWorker.h:56
std::unique_ptr< TSocket > fS
This worker's socket. The unique_ptr makes sure resources are released.
Definition: TMPWorker.h:55
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:54
virtual void Exit(int code, Bool_t mode=kTRUE)
Exit the application.
Definition: TSystem.cxx:714
@ kMessage
Generic message.
Definition: MPCode.h:46
@ kRecvError
Error while reading from the socket.
Definition: MPCode.h:51
@ kError
Error message.
Definition: MPCode.h:47
@ kFatalError
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:48
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
@ kShutdownNotice
Used by the workers to notify client of shutdown.
Definition: MPCode.h:50