Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
Messenger.h
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13#ifndef ROOT_ROOFIT_MultiProcess_Messenger
14#define ROOT_ROOFIT_MultiProcess_Messenger
15
17
18#ifdef NDEBUG
19#undef NDEBUG
20#define turn_NDEBUG_back_on
21#endif
22
23namespace RooFit {
24namespace MultiProcess {
25
26// -- WORKER - QUEUE COMMUNICATION --
27
28template <typename T, typename... Ts>
29void Messenger::send_from_worker_to_queue(T item, Ts... items)
30{
31#ifndef NDEBUG
32 std::stringstream ss;
33 ss << "PID " << getpid() << " sends W2Q " << item;
34 debug_print(ss.str());
35#endif
36
38 // if (sizeof...(items) > 0) { // this will only work with if constexpr, c++17
40}
41
42template <typename value_t>
43value_t Messenger::receive_from_worker_on_queue(std::size_t this_worker_id)
44{
45 qw_pull_poller_[this_worker_id].ppoll(-1, &ppoll_sigmask);
46 auto value = zmqSvc().receive<value_t>(*qw_pull_[this_worker_id], zmq::recv_flags::dontwait);
47
48#ifndef NDEBUG
49 std::stringstream ss;
50 ss << "PID " << getpid() << " receives W(" << this_worker_id << ")2Q " << value;
51 debug_print(ss.str());
52#endif
53
54 return value;
55}
56
57template <typename T, typename... Ts>
58void Messenger::send_from_queue_to_worker(std::size_t this_worker_id, T item, Ts... items)
59{
60#ifndef NDEBUG
61 std::stringstream ss;
62 ss << "PID " << getpid() << " sends Q2W(" << this_worker_id << ") " << item;
63 debug_print(ss.str());
64#endif
65
66 zmqSvc().send(*qw_push_[this_worker_id], item, send_flag_);
67 // if (sizeof...(items) > 0) { // this will only work with if constexpr, c++17
68 send_from_queue_to_worker(this_worker_id, items...);
69}
70
71template <typename value_t>
73{
74 qw_pull_poller_[0].ppoll(-1, &ppoll_sigmask);
75 auto value = zmqSvc().receive<value_t>(*this_worker_qw_pull_, zmq::recv_flags::dontwait);
76
77#ifndef NDEBUG
78 std::stringstream ss;
79 ss << "PID " << getpid() << " receives Q2W " << value;
80 debug_print(ss.str());
81#endif
82
83 return value;
84}
85
86// -- QUEUE - MASTER COMMUNICATION --
87
88template <typename T, typename... Ts>
89void Messenger::send_from_queue_to_master(T item, Ts... items)
90{
91#ifndef NDEBUG
92 std::stringstream ss;
93 ss << "PID " << getpid() << " sends Q2M " << item;
94 debug_print(ss.str());
95#endif
96
97 zmqSvc().send(*mq_push_, item, send_flag_);
98 // if (sizeof...(items) > 0) { // this will only work with if constexpr, c++17
100}
101
102template <typename value_t>
104{
106 auto value = zmqSvc().receive<value_t>(*mq_pull_, zmq::recv_flags::dontwait);
107
108#ifndef NDEBUG
109 std::stringstream ss;
110 ss << "PID " << getpid() << " receives Q2M " << value;
111 debug_print(ss.str());
112#endif
113
114 return value;
115}
116
117template <typename T, typename... Ts>
119{
120#ifndef NDEBUG
121 std::stringstream ss;
122 ss << "PID " << getpid() << " sends M2Q " << item;
123 debug_print(ss.str());
124#endif
125
126 zmqSvc().send(*mq_push_, item, send_flag_);
127 // if (sizeof...(items) > 0) { // this will only work with if constexpr, c++17
129}
130
131template <typename value_t>
133{
135 auto value = zmqSvc().receive<value_t>(*mq_pull_, zmq::recv_flags::dontwait);
136
137#ifndef NDEBUG
138 std::stringstream ss;
139 ss << "PID " << getpid() << " receives M2Q " << value;
140 debug_print(ss.str());
141#endif
142
143 return value;
144}
145
146// -- MASTER - WORKER COMMUNICATION --
147
148/// specialization that sends the final message
149template <typename T>
151{
152#ifndef NDEBUG
153 std::stringstream ss;
154 ss << "PID " << getpid() << " sends M2W " << item;
155 debug_print(ss.str());
156#endif
157
158 zmqSvc().send(*mw_pub_, std::forward<T>(item), send_flag_);
159}
160
161/// specialization that queues first parts of multipart messages
162template <typename T, typename T2, typename... Ts>
163void Messenger::publish_from_master_to_workers(T&& item, T2&& item2, Ts&&... items)
164{
165#ifndef NDEBUG
166 std::stringstream ss;
167 ss << "PID " << getpid() << " sends M2W " << item;
168 debug_print(ss.str());
169#endif
170
171 zmqSvc().send(*mw_pub_, std::forward<T>(item), send_flag_ | zmq::send_flags::sndmore);
172 publish_from_master_to_workers(std::forward<T2>(item2), std::forward<Ts>(items)...);
173}
174
175template <typename value_t>
177{
179 auto value = zmqSvc().receive<value_t>(*mw_sub_, zmq::recv_flags::dontwait, more);
180
181#ifndef NDEBUG
182 std::stringstream ss;
183 ss << "PID " << getpid() << " receives M2W " << value;
184 debug_print(ss.str());
185#endif
186
187 return value;
188}
189
190template <typename T, typename... Ts>
192{
193#ifndef NDEBUG
194 std::stringstream ss;
195 ss << "PID " << getpid() << " sends M2W " << item;
196 debug_print(ss.str());
197#endif
198
199 zmqSvc().send(*wm_push_, item, send_flag_);
200 // if (sizeof...(items) > 0) { // this will only work with if constexpr, c++17
202}
203
204template <typename value_t>
206{
208 auto value = zmqSvc().receive<value_t>(*wm_pull_, zmq::recv_flags::dontwait);
209
210#ifndef NDEBUG
211 std::stringstream ss;
212 ss << "PID " << getpid() << " receives M2W " << value;
213 debug_print(ss.str());
214#endif
215
216 return value;
217}
218
219} // namespace MultiProcess
220} // namespace RooFit
221
222#ifdef turn_NDEBUG_back_on
223#define NDEBUG
224#undef turn_NDEBUG_back_on
225#endif
226
227#endif // ROOT_ROOFIT_MultiProcess_Messenger
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
Definition ZeroMQSvc.cpp:34
value_t receive_from_master_on_worker(bool *more=nullptr)
Definition Messenger.h:176
std::vector< ZmqLingeringSocketPtr<> > qw_push_
ZmqLingeringSocketPtr wm_push_
std::vector< ZeroMQPoller > qw_pull_poller_
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
ZmqLingeringSocketPtr mw_pub_
ZmqLingeringSocketPtr mq_pull_
ZmqLingeringSocketPtr wm_pull_
ZmqLingeringSocketPtr this_worker_qw_push_
void debug_print(std::string s)
Function called from send and receive template functions in debug builds used to monitor the messages...
value_t receive_from_worker_on_queue(std::size_t this_worker_id)
Definition Messenger.h:43
void publish_from_master_to_workers(T &&item)
specialization that sends the final message
Definition Messenger.h:150
ZmqLingeringSocketPtr mw_sub_
void send_from_queue_to_worker(std::size_t this_worker_id)
ZmqLingeringSocketPtr this_worker_qw_pull_
ZmqLingeringSocketPtr mq_push_
std::vector< std::pair< size_t, zmq::event_flags > > ppoll(int timeo, const sigset_t *sigmask)
Poll the sockets with ppoll.
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
Definition ZeroMQSvc.h:205
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
receive message with ZMQ, general version
Definition ZeroMQSvc.h:159
#define T2
Definition md5.inl:146
double T(double x)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition Common.h:18