Logo ROOT  
Reference Guide
TMPWorker.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include "MPCode.h"
14#include "MPSendRecv.h"
15#include "TEnv.h"
16#include "TError.h"
17#include "TMPWorker.h"
18#include "TSystem.h"
19#include <memory> //unique_ptr
20#include <string>
21
22#include <iostream>
23
24//////////////////////////////////////////////////////////////////////////
25///
26/// \class TMPWorker
27///
28/// This class works in conjuction with TMPClient, reacting to messages
29/// received from it as specified by the Notify and HandleInput methods.
30/// When TMPClient::Fork is called, a TMPWorker instance is passed to it
31/// which will take control of the ROOT session in the children processes.
32///
33/// After forking, every time a message is sent or broadcast to the workers,
34/// TMPWorker::Notify is called and the message is retrieved.
35/// Messages exchanged between TMPClient and TMPWorker should be sent with
36/// the MPSend() standalone function.\n
37/// If the code of the message received is above 1000 (i.e. it is an MPCode)
38/// the qualified TMPWorker::HandleInput method is called, that takes care
39/// of handling the most generic type of messages. Otherwise the unqualified
40/// (possibly overridden) version of HandleInput is called, allowing classes
41/// that inherit from TMPWorker to manage their own protocol.\n
42/// An application's worker class should inherit from TMPWorker and implement
43/// a HandleInput method that overrides TMPWorker's.\n
44///
45//////////////////////////////////////////////////////////////////////////
46
47//////////////////////////////////////////////////////////////////////////
48/// This method is called by children processes right after forking.
49/// Initialization of worker properties that must be delayed until after
50/// forking must be done here.\n
51/// For example, Init saves the pid into fPid, and adds the TMPWorker to
52/// the main eventloop (as a TFileHandler).\n
53/// Make sure this operations are performed also by overriding implementations,
54/// e.g. by calling TMPWorker::Init explicitly.
55void TMPWorker::Init(int fd, unsigned workerN)
56{
57 fS.reset(new TSocket(fd, "MPsock")); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
58 fPid = getpid();
59 fNWorker = workerN;
60 fId = "W" + std::to_string(GetNWorker()) + "|P" + std::to_string(GetPid());
61}
62
63
65{
66 while(true) {
67 MPCodeBufPair msg = MPRecv(fS.get());
68 if (msg.first == MPCode::kRecvError) {
69 Error("TMPWorker::Run", "Lost connection to client\n");
70 gSystem->Exit(0);
71 }
72
73 if (msg.first < 1000)
74 HandleInput(msg); //call overridden method
75 else
76 TMPWorker::HandleInput(msg); //call this class' method
77 }
78}
79
80
81//////////////////////////////////////////////////////////////////////////
82/// Handle a message with an EMPCode.
83/// This method is called upon receiving a message with a code >= 1000 (i.e.
84/// EMPCode). It handles the most generic types of messages.\n
85/// Classes inheriting from TMPWorker should implement their own HandleInput
86/// function, that should be able to handle codes specific to that application.\n
87/// The appropriate version of the HandleInput method (TMPWorker's or the
88/// overriding version) is automatically called depending on the message code.
90{
91 unsigned code = msg.first;
92
93 std::string reply = fId;
94 if (code == MPCode::kMessage) {
95 //general message, ignore it
96 reply += ": ok";
97 MPSend(fS.get(), MPCode::kMessage, reply.c_str());
98 } else if (code == MPCode::kError) {
99 //general error, ignore it
100 reply += ": ko";
101 MPSend(fS.get(), MPCode::kMessage, reply.c_str());
102 } else if (code == MPCode::kShutdownOrder || code == MPCode::kFatalError) {
103 //client is asking the server to shutdown or client is dying
104 MPSend(fS.get(), MPCode::kShutdownNotice, reply.c_str());
105 gSystem->Exit(0);
106 } else {
107 reply += ": unknown code received. code=" + std::to_string(code);
108 MPSend(fS.get(), MPCode::kError, reply.c_str());
109 }
110}
111
112//////////////////////////////////////////////////////////////////////////
113/// Error sender
114
115void TMPWorker::SendError(const std::string& errmsg, unsigned int errcode)
116{
117 std::string reply = fId + ": " + errmsg;
118 MPSend(GetSocket(), errcode, reply.c_str());
119}
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
void Error(const char *location, const char *msgfmt,...)
R__EXTERN TSystem * gSystem
Definition: TSystem.h:560
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
Definition: TMPWorker.cxx:115
unsigned GetNWorker() const
Definition: TMPWorker.h:42
std::string fId
identifier string in the form W<nwrk>|P<proc id>
Definition: TMPWorker.h:45
pid_t GetPid()
Definition: TMPWorker.h:41
TSocket * GetSocket()
Definition: TMPWorker.h:40
unsigned fNWorker
the ordinal number of this worker (0 to nWorkers-1)
Definition: TMPWorker.h:57
virtual void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
Definition: TMPWorker.cxx:89
void Run()
Definition: TMPWorker.cxx:64
pid_t fPid
the PID of the process in which this worker is running
Definition: TMPWorker.h:56
std::unique_ptr< TSocket > fS
This worker's socket. The unique_ptr makes sure resources are released.
Definition: TMPWorker.h:55
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:55
virtual void Exit(int code, Bool_t mode=kTRUE)
Exit the application.
Definition: TSystem.cxx:726
@ kMessage
Generic message.
Definition: MPCode.h:46
@ kRecvError
Error while reading from the socket.
Definition: MPCode.h:51
@ kError
Error message.
Definition: MPCode.h:47
@ kFatalError
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:48
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
@ kShutdownNotice
Used by the workers to notify client of shutdown.
Definition: MPCode.h:50