Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMPWorkerExecutor.h
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TMPWorkerExecutor
13#define ROOT_TMPWorkerExecutor
14
15#include "MPCode.h"
16#include "MPSendRecv.h"
17#include "PoolUtils.h"
18#include "TMPWorker.h"
19
20#include <string>
21#include <vector>
22
23//////////////////////////////////////////////////////////////////////////
24///
25/// \class TMPWorkerExecutor
26///
27/// This class works together with TProcessExecutor to allow the execution of
28/// functions in server processes. Depending on the exact task that the
29/// worker is required to execute, a different version of the class
30/// can be called.
31///
32/// ### TMPWorkerExecutor<F, T, R>
33/// The most general case, used by
34/// TProcessExecutor::MapReduce(F func, T& args, R redfunc).
35/// This worker is built with:
36/// * a function of signature F (the one to be executed)
37/// * a collection of arguments of type T on which to apply the function
38/// * a reduce function with signature R to be used to squash many
39/// returned values together.
40///
41/// ### Partial specializations
42/// A few partial specializations are provided for less general cases:
43/// * TMPWorkerExecutor<F, T, void> handles the case of a function that takes
44/// one argument and does not perform reduce operations
45/// (TProcessExecutor::Map(F func, T& args)).
46/// * TMPWorkerExecutor<F, void, R> handles the case of a function that takes
47/// no arguments, to be executed a specified number of times, and whose
48/// returned values are squashed together (reduced)
49/// (TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc))
50/// * TMPWorkerExecutor<F, void, void> handles the case of a function that takes
51/// no arguments and whose returned values are not "reduced"
52/// (TProcessExecutor::Map(F func, unsigned nTimes))
53///
54/// Since all the important data are passed to TMPWorkerExecutor at construction
55/// time, the kind of messages that client and workers have to exchange
56/// are usually very simple.
57///
58//////////////////////////////////////////////////////////////////////////
59
60// Quick guide to TMPWorkerExecutor:
61// For each TProcessExecutor::Map and TProcessExecutor::MapReduce signature
62// there's a corresponding
63// specialization of TMPWorkerExecutor:
64// * Map(func, nTimes) --> TMPWorkerExecutor<F, void, void>
65// * Map(func, args) --> TMPWorkerExecutor<F, T, void>
66// * MapReduce(func, nTimes, redfunc) --> TMPWorkerExecutor<F, void, R>
67// * MapReduce(func, args, redfunc) --> TMPWorkerExecutor<F, T, R>
68// I thought about having four different classes (with different names)
69// instead of four specializations of the same class template, but it really
70// makes no difference in the end since the different classes would be class
71// templates anyway, and I would have to find a meaningful name for each one.
72// About code replication: looking carefully, it can be noticed that there's
73// very little code replication since the different versions of TMPWorkerExecutor
74// all behave slightly differently, in incompatible ways (e.g. they all need
75// different data members, different signatures for the ctors, and so on).
76
// Primary template: applies fFunc to one element of fArgs per request and keeps
// a running reduction of the results in fReducedResult.
// NOTE(review): the embedded listing numbers skip 78, 85, 87, 100 and 111, so the
// statements originally on those lines (class head, rest of the member-initializer
// list, and the bodies at those positions) are not visible in this extract.
77template<class F, class T = void, class R = void>
79public:
80 // TProcessExecutor is in charge of checking the signatures for incompatibilities:
81 // we trust that decltype(redfunc(std::vector<decltype(func(args[0]))>)) == decltype(args[0])
// NOTE(review): fReducedResult below is declared as decltype(fFunc(fArgs.front())),
// so the right-hand side above presumably should read decltype(func(args[0])) -- confirm.
82 // TODO document somewhere that fReducedResult must have a default ctor
// Stores copies of the function, the argument vector and the reduce function.
83 TMPWorkerExecutor(F func, const std::vector<T> &args, R redfunc) :
84 TMPWorker(), fFunc(func), fArgs(args), fRedFunc(redfunc),
86 {}
88
// Dispatches on the message code in msg.first:
//  * kExecFuncWithArg: reads an unsigned index n from msg.second, calls
//    fFunc(fArgs[n]) and folds the result into fReducedResult.
//  * kSendResult: branch body not visible in this extract.
//  * anything else: sends kError with "S<worker number>: unknown code received: <code>".
89 void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
90 {
91 unsigned code = msg.first;
92 TSocket *s = GetSocket();
93 std::string reply = "S" + std::to_string(GetNWorker());
94 if (code == MPCode::kExecFuncWithArg) {
95 unsigned n;
96 msg.second->ReadUInt(n);
97 // execute function on argument n
// NOTE(review): n indexes fArgs with no range check visible here -- presumably the
// client only sends valid indices; verify against the caller.
98 const auto &res = fFunc(fArgs[n]);
99 // tell client we're done
101 // fold this result into the running reduction (the first result is stored as-is)
102 if (fCanReduce) {
// The reduce function is called on the pair {res, fReducedResult}; its return type
// (ORIGINAL) is converted to the stored type (FINAL) through ResultCaster.
103 using FINAL = decltype(fReducedResult);
104 using ORIGINAL = decltype(fRedFunc({res, fReducedResult}));
105 fReducedResult = ROOT::Internal::PoolUtils::ResultCaster<ORIGINAL, FINAL>::CastIfNeeded(fRedFunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
106 } else {
107 fCanReduce = true;
108 fReducedResult = res;
109 }
110 } else if (code == MPCode::kSendResult) {
112 } else {
113 reply += ": unknown code received: " + std::to_string(code);
114 MPSend(s, MPCode::kError, reply.c_str());
115 }
116 }
117
118private:
119 F fFunc; ///< the function to be executed
120 std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc
121 R fRedFunc; ///< the reduce function
122 decltype(fFunc(fArgs.front())) fReducedResult; ///< the result of the execution
123 bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
124};
125
126
// Partial specialization for a function called without arguments whose results
// are combined with a reduce function.
// NOTE(review): the embedded listing numbers skip 134, 145 and 154, so the
// statements originally on those lines are not visible in this extract.
127template<class F, class R>
128class TMPWorkerExecutor<F, void, R> : public TMPWorker {
129public:
// Stores copies of the function and the reduce function; fReducedResult is
// value-initialized and fCanReduce starts out false.
130 TMPWorkerExecutor(F func, R redfunc) :
131 TMPWorker(), fFunc(func), fRedFunc(redfunc),
132 fReducedResult(), fCanReduce(false)
133 {}
135
// Dispatches on the message code in msg.first:
//  * kExecFunc: calls fFunc() and folds the result into fReducedResult.
//  * kSendResult: branch body not visible in this extract.
//  * anything else: sends kError with "S<worker number>: unknown code received: <code>".
136 void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
137 {
138 unsigned code = msg.first;
139 TSocket *s = GetSocket();
140 std::string reply = "S" + std::to_string(GetNWorker());
141 if (code == MPCode::kExecFunc) {
142 // execute function
143 const auto &res = fFunc();
144 // tell client we're done
146 // fold this result into the running reduction (the first result is stored as-is)
147 if (fCanReduce) {
// The reduce function is called on the pair {res, fReducedResult} and its return
// value is assigned straight back to fReducedResult.
148 fReducedResult = fRedFunc({res, fReducedResult}); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
149 } else {
150 fCanReduce = true;
151 fReducedResult = res;
152 }
153 } else if (code == MPCode::kSendResult) {
155 } else {
156 reply += ": unknown code received: " + std::to_string(code);
157 MPSend(s, MPCode::kError, reply.c_str());
158 }
159 }
160
161private:
162 F fFunc; ///< the function to be executed
163 R fRedFunc; ///< the reduce function
164 decltype(fFunc()) fReducedResult; ///< the result of the execution
165 bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
166};
167
// Partial specialization for a function applied to one element of fArgs per
// request, with no reduce step.
// NOTE(review): the embedded listing numbers skip 172 and 180, so the statements
// originally on those lines are not visible in this extract.
168template<class F, class T>
169class TMPWorkerExecutor<F, T, void> : public TMPWorker {
170public:
// NOTE(review): args is a const reference, so std::move(args) produces a const
// rvalue and fArgs is copy-constructed, not moved; the std::move has no effect here.
171 TMPWorkerExecutor(F func, const std::vector<T> &args) : TMPWorker(), fFunc(func), fArgs(std::move(args)) {}
// Dispatches on the message code in msg.first:
//  * kExecFuncWithArg: reads an unsigned index n from msg.second; what is done
//    with n afterwards is not visible in this extract.
//  * anything else: sends kError with "S<worker number>: unknown code received: <code>".
173 void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
174 {
175 unsigned code = msg.first;
176 TSocket *s = GetSocket();
177 if (code == MPCode::kExecFuncWithArg) {
178 unsigned n;
179 msg.second->ReadUInt(n);
181 } else {
182 std::string reply = "S" + std::to_string(GetNWorker()) + ": unknown code received: " + std::to_string(code);
183 MPSend(s, MPCode::kError, reply.c_str());
184 }
185 }
186
187private:
188 F fFunc; ///< the function to be executed
189 std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc
190};
191
192
193// doxygen should ignore this specialization
194/// \cond
195// The most generic class template is meant to handle functions that
196// must be executed by passing one argument to them.
197// This partial specialization is used to handle the case
198// of functions which must be executed without passing any argument.
// Partial specialization for a function called without arguments and with no
// reduce step.
// NOTE(review): the embedded listing numbers skip 203 and 210, so the statements
// originally on those lines are not visible in this extract.
199template<class F>
200class TMPWorkerExecutor<F, void, void> : public TMPWorker {
201public:
// Stores a copy of the function to run.
202 explicit TMPWorkerExecutor(F func) : TMPWorker(), fFunc(func) {}
// Dispatches on the message code in msg.first:
//  * kExecFunc: branch body not visible in this extract.
//  * anything else: sends kError with "S<pid>: unknown code received: <code>".
204 void HandleInput(MPCodeBufPair &msg)
205 {
206 unsigned code = msg.first;
207 TSocket *s = GetSocket();
// NOTE(review): the id is built from GetPid() here, whereas the other three
// TMPWorkerExecutor variants build it from GetNWorker() -- confirm this is intended.
208 std::string myId = "S" + std::to_string(GetPid());
209 if (code == MPCode::kExecFunc) {
211 } else {
212 MPSend(s, MPCode::kError, (myId + ": unknown code received: " + std::to_string(code)).c_str());
213 }
214 }
215
216private:
217 F fFunc; ///< the function to be executed
218};
219/// \endcond
220
221#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
F fFunc
the function to be executed
TMPWorkerExecutor(F func, const std::vector< T > &args)
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
std::vector< T > fArgs
a vector containing the arguments that must be passed to fFunc
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
F fFunc
the function to be executed
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result
This class works together with TProcessExecutor to allow the execution of functions in server process...
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result
F fFunc
the function to be executed
R fRedFunc
the reduce function
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
decltype(fFunc(fArgs.front())) fReducedResult
the result of the execution
TMPWorkerExecutor(F func, const std::vector< T > &args, R redfunc)
std::vector< T > fArgs
a vector containing the arguments that must be passed to fFunc
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition TMPWorker.h:25
unsigned GetNWorker() const
Definition TMPWorker.h:41
pid_t GetPid()
Definition TMPWorker.h:40
TSocket * GetSocket()
Definition TMPWorker.h:39
const Int_t n
Definition legend1.C:16
#define F(x, y, z)
@ kSendResult
Ask for a kFuncResult/kProcResult.
Definition MPCode.h:36
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kError
Error message.
Definition MPCode.h:47
@ kFuncResult
The message contains the result of a function execution.
Definition MPCode.h:33
@ kExecFuncWithArg
Execute function with the argument contained in the message.
Definition MPCode.h:32
@ kExecFunc
Execute function without arguments.
Definition MPCode.h:31