Logo ROOT   6.16/01
Reference Guide
TMPClient.h
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TMPClient
13#define ROOT_TMPClient
14
15#include "MPSendRecv.h"
16#include "TMonitor.h"
17#include "TMPWorker.h"
18#include <memory> //unique_ptr
19#include <iostream>
20#include <unistd.h> //pid_t
21#include <vector>
22
23class TMPClient {
24public:
25 explicit TMPClient(unsigned nWorkers = 0);
26 ~TMPClient();
27 //it doesn't make sense to copy a TMPClient
28 TMPClient(const TMPClient &) = delete;
29 TMPClient &operator=(const TMPClient &) = delete;
30
31 bool Fork(TMPWorker &server); // we expect application to pass a reference to an inheriting class and take advantage of polymorphism
32 unsigned Broadcast(unsigned code, unsigned nMessages = 0);
33 template<class T> unsigned Broadcast(unsigned code, const std::vector<T> &objs);
34 template<class T> unsigned Broadcast(unsigned code, std::initializer_list<T> &objs);
35 template<class T> unsigned Broadcast(unsigned code, T obj, unsigned nMessages = 0);
36 TMonitor &GetMonitor() { return fMon; }
37 bool GetIsParent() const { return fIsParent; }
38 /// Set the number of workers that will be spawned by the next call to Fork()
39 void SetNWorkers(unsigned n) { fNWorkers = n; }
40 unsigned GetNWorkers() const { return fNWorkers; }
41 void DeActivate(TSocket *s);
42 void Remove(TSocket *s);
43 void ReapWorkers();
44 void HandleMPCode(MPCodeBufPair &msg, TSocket *sender);
45
46private:
47 bool fIsParent; ///< This is true if this is the parent/client process, false if this is a child/worker process
48 std::vector<pid_t> fWorkerPids; ///< A vector containing the PIDs of children processes/workers
49 TMonitor fMon; ///< This object manages the sockets and detect socket events via TMonitor::Select
50 unsigned fNWorkers; ///< The number of workers that should be spawned upon forking
51};
52
53
54//////////////////////////////////////////////////////////////////////////
55/// Send a message with a different object to each server.
56/// Sockets can either be in an "active" or "non-active" state. This method
57/// activates all the sockets through which the client is connected to the
58/// workers, and deactivates them when a message is sent to the corresponding
59/// worker. This way the sockets pertaining to workers who have been left
60/// idle will be the only ones in the active list
61/// (TSocket::GetMonitor()->GetListOfActives()) after execution.
62/// \param code the code of the message to send (e.g. EMPCode)
63/// \param args
64/// \parblock
65/// a vector containing the different messages to be sent. If the size of
66/// the vector is smaller than the number of workers, a message will be
67/// sent only to the first args.size() workers. If the size of the args vector
68/// is bigger than the number of workers, only the first fNWorkers arguments
69/// will be sent.
70/// \endparblock
71/// \return the number of messages successfully sent
72template<class T>
73unsigned TMPClient::Broadcast(unsigned code, const std::vector<T> &args)
74{
76
77 std::unique_ptr<TList> lp(fMon.GetListOfActives());
78 unsigned count = 0;
79 unsigned nArgs = args.size();
80 for (auto s : *lp) {
81 if (count == nArgs)
82 break;
83 if (MPSend((TSocket *)s, code, args[count])) {
85 ++count;
86 } else {
87 Error("TMPClient::Broadcast", "[E] Could not send message to server\n");
88 }
89 }
90
91 return count;
92}
93
94
95//////////////////////////////////////////////////////////////////////////
96/// Send a message with a different object to each server.
97/// See TMPClient::Broadcast(unsigned code, const std::vector<T> &args)
98/// for more informations.
99template<class T>
100unsigned TMPClient::Broadcast(unsigned code, std::initializer_list<T> &args)
101{
102 std::vector<T> vargs(std::move(args));
103 return Broadcast(code, vargs);
104}
105
106
107//////////////////////////////////////////////////////////////////////////
108/// Send a message containing code and obj to each worker, up to a
109/// maximum number of nMessages workers. See
110/// Broadcast(unsigned code, unsigned nMessages) for more informations.
111/// \param code the code of the message to send (e.g. EMPCode)
112/// \param obj the object to send
113/// \param nMessages
114/// \parblock
115/// the maximum number of messages to send.
116/// If nMessages == 0, send a message to every worker.
117/// \endparblock
118/// \return the number of messages successfully sent
119template<class T>
120unsigned TMPClient::Broadcast(unsigned code, T obj, unsigned nMessages)
121{
122 if (nMessages == 0)
123 nMessages = fNWorkers;
124 unsigned count = 0;
126
127 //send message to all sockets
128 std::unique_ptr<TList> lp(fMon.GetListOfActives());
129 for (auto s : *lp) {
130 if (count == nMessages)
131 break;
132 if (MPSend((TSocket *)s, code, obj)) {
134 ++count;
135 } else {
136 Error("TMPClient::Broadcast", "[E] Could not send message to server\n");
137 }
138 }
139
140 return count;
141}
142
143#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
void Error(const char *location, const char *msgfmt,...)
TMPClient
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
unsigned GetNWorkers() const
Definition: TMPClient.h:40
unsigned fNWorkers
The number of workers that should be spawned upon forking.
Definition: TMPClient.h:50
TMPClient(unsigned nWorkers=0)
Class constructor.
Definition: TMPClient.cxx:51
bool GetIsParent() const
Definition: TMPClient.h:37
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:248
TMPClient & operator=(const TMPClient &)=delete
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
Definition: TMPClient.h:48
~TMPClient()
Class destructor.
Definition: TMPClient.cxx:70
TMonitor fMon
This object manages the sockets and detects socket events via TMonitor::Select.
Definition: TMPClient.h:49
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:329
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:308
TMPClient(const TMPClient &)=delete
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
TMonitor & GetMonitor()
Definition: TMPClient.h:36
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:295
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:128
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process.
Definition: TMPClient.h:47
void DeActivate(TSocket *s)
DeActivate a certain socket.
Definition: TMPClient.cxx:281
TMPWorker
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:26
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
const Int_t n
Definition: legend1.C:16
double T(double x)
Definition: ChebyshevPol.h:34
static constexpr double s