ROOT  6.06/09
Reference Guide
TMPClient.h
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TMPClient
13 #define ROOT_TMPClient
14 
15 #include "TMonitor.h"
16 #include "TMPWorker.h"
17 #include "MPSendRecv.h"
18 #include <vector>
19 #include <unistd.h> //pid_t
20 #include <memory> //unique_ptr
21 #include <iostream>
22 
23 class TMPClient {
24 public:
25  explicit TMPClient(unsigned nWorkers = 0);
26  ~TMPClient();
27  //it doesn't make sense to copy a TMPClient
28  TMPClient(const TMPClient &) = delete;
29  TMPClient &operator=(const TMPClient &) = delete;
30 
31  bool Fork(TMPWorker &server); // we expect application to pass a reference to an inheriting class and take advantage of polymorphism
32  unsigned Broadcast(unsigned code, unsigned nMessages = 0);
33  template<class T> unsigned Broadcast(unsigned code, const std::vector<T> &objs);
34  template<class T> unsigned Broadcast(unsigned code, std::initializer_list<T> &objs);
35  template<class T> unsigned Broadcast(unsigned code, T obj, unsigned nMessages = 0);
36  TMonitor &GetMonitor() { return fMon; }
37  bool GetIsParent() const { return fIsParent; }
38  /// Set the number of workers that will be spawned by the next call to Fork()
39  void SetNWorkers(unsigned n) { fNWorkers = n; }
40  unsigned GetNWorkers() const { return fNWorkers; }
41  void DeActivate(TSocket *s);
42  void Remove(TSocket *s);
43  void ReapWorkers();
44  void HandleMPCode(MPCodeBufPair &msg, TSocket *sender);
45 
46 private:
47  bool fIsParent; ///< This is true if this is the parent/client process, false if this is a child/worker process
48  std::vector<pid_t> fWorkerPids; ///< A vector containing the PIDs of children processes/workers
49  TMonitor fMon; ///< This object manages the sockets and detect socket events via TMonitor::Select
50  unsigned fNWorkers; ///< The number of workers that should be spawned upon forking
51 };
52 
53 
54 //////////////////////////////////////////////////////////////////////////
55 /// Send a message with a different object to each server.
56 /// Sockets can either be in an "active" or "non-active" state. This method
57 /// activates all the sockets through which the client is connected to the
58 /// workers, and deactivates them when a message is sent to the corresponding
59 /// worker. This way the sockets pertaining to workers who have been left
60 /// idle will be the only ones in the active list
61 /// (TSocket::GetMonitor()->GetListOfActives()) after execution.
62 /// \param code the code of the message to send (e.g. EMPCode)
63 /// \param args
64 /// \parblock
65 /// a vector containing the different messages to be sent. If the size of
66 /// the vector is smaller than the number of workers, a message will be
67 /// sent only to the first args.size() workers. If the size of the args vector
68 /// is bigger than the number of workers, only the first fNWorkers arguments
69 /// will be sent.
70 /// \endparblock
71 /// \return the number of messages successfully sent
72 template<class T>
73 unsigned TMPClient::Broadcast(unsigned code, const std::vector<T> &args)
74 {
75  fMon.ActivateAll();
76 
77  std::unique_ptr<TList> lp(fMon.GetListOfActives());
78  unsigned count = 0;
79  unsigned nArgs = args.size();
80  for (auto s : *lp) {
81  if (count == nArgs)
82  break;
83  if (MPSend((TSocket *)s, code, args[count])) {
84  fMon.DeActivate((TSocket *)s);
85  ++count;
86  } else {
87  std::cerr << "[E] Could not send message to server\n";
88  }
89  }
90 
91  return count;
92 }
93 
94 
95 //////////////////////////////////////////////////////////////////////////
96 /// Send a message with a different object to each server.
97 /// See TMPClient::Broadcast(unsigned code, const std::vector<T> &args)
98 /// for more informations.
99 template<class T>
100 unsigned TMPClient::Broadcast(unsigned code, std::initializer_list<T> &args)
101 {
102  std::vector<T> vargs(std::move(args));
103  return Broadcast(code, vargs);
104 }
105 
106 
107 //////////////////////////////////////////////////////////////////////////
108 /// Send a message containing code and obj to each worker, up to a
109 /// maximum number of nMessages workers. See
110 /// Broadcast(unsigned code, unsigned nMessages) for more informations.
111 /// \param code the code of the message to send (e.g. EMPCode)
112 /// \param obj the object to send
113 /// \param nMessages
114 /// \parblock
115 /// the maximum number of messages to send.
116 /// If nMessages == 0, send a message to every worker.
117 /// \endparblock
118 /// \return the number of messages successfully sent
119 template<class T>
120 unsigned TMPClient::Broadcast(unsigned code, T obj, unsigned nMessages)
121 {
122  if (nMessages == 0)
123  nMessages = fNWorkers;
124  unsigned count = 0;
125  fMon.ActivateAll();
126 
127  //send message to all sockets
128  std::unique_ptr<TList> lp(fMon.GetListOfActives());
129  for (auto s : *lp) {
130  if (count == nMessages)
131  break;
132  if (MPSend((TSocket *)s, code, obj)) {
133  fMon.DeActivate((TSocket *)s);
134  ++count;
135  } else {
136  std::cerr << "[E] Could not send message to server\n";
137  }
138  }
139 
140  return count;
141 }
142 
143 #endif
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
Definition: TMPClient.h:48
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:89
TMonitor fMon
This object manages the sockets and detects socket events via TMonitor::Select.
Definition: TMPClient.h:49
double T(double x)
Definition: ChebyshevPol.h:34
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:20
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:21
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
~TMPClient()
Class destructor.
Definition: TMPClient.cxx:59
unsigned fNWorkers
The number of workers that should be spawned upon forking.
Definition: TMPClient.h:50
unsigned GetNWorkers() const
Definition: TMPClient.h:40
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:239
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
void DeActivate(TSocket *s)
DeActivate a certain socket.
Definition: TMPClient.cxx:225
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:20
TMPClient(unsigned nWorkers=0)
Class constructor.
Definition: TMPClient.cxx:40
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process...
Definition: TMPClient.h:47
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:273
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:192
TObject * obj
const Int_t n
Definition: legend1.C:16
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:252
TMonitor & GetMonitor()
Definition: TMPClient.h:36
bool GetIsParent() const
Definition: TMPClient.h:37
TMPClient & operator=(const TMPClient &)=delete