Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
Messenger_decl.h
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13#ifndef ROOT_ROOFIT_MultiProcess_Messenger_decl
14#define ROOT_ROOFIT_MultiProcess_Messenger_decl
15
19
20#include <iosfwd>
21#include <vector>
22#include <csignal> // sigprocmask, sigset_t, etc
23#include <string>
24
25namespace RooFit {
26namespace MultiProcess {
27
29
30// test messages
// Values of this enum are the ping arguments of Messenger::test_send
// (ping_value) and Messenger::test_receive (expected_ping_value).
// The underlying type is int; ping and pong are negative, initial_value is 0.
31enum class X2X : int { ping = -1, pong = -2, initial_value = 0 };
32
// Manages ZeroMQ sockets and wraps send and receive calls.
//
// The public interface is grouped by communication direction (worker-queue,
// queue-master, master-worker); each group pairs templated send/publish
// functions with templated receive functions that return the requested type.
//
// NOTE(review): the embedded listing line numbers skip in several places
// (e.g. 47->52, 67->69, 76->79, 82->84, 133->137->141), so some declarations
// and data members are not visible in this view; template headers that are
// followed by nothing below belong to such missing declarations.
33class Messenger {
34public:
 // Constructed from a ProcessManager; explicit, so no implicit conversion
 // from ProcessManager is possible.
35 explicit Messenger(const ProcessManager &process_manager);
36 ~Messenger();
37
 // Test whether push-pull sockets are working.
38 void test_connections(const ProcessManager &process_manager);
39
 // Selects the pipe used by test_send(). Enumerator names match the
 // direction abbreviations used elsewhere in this header
 // (M = master, Q = queue, W = worker -- presumably; confirm in the .cxx).
40 enum class test_snd_pipes {
41 M2Q,
42 Q2M,
43 Q2W,
44 W2Q,
45 };
46
 // Selects the pipe used by test_receive().
 // NOTE(review): enumerators (listing lines 48-51) are not visible here.
47 enum class test_rcv_pipes {
52 };
53
 // Helper function that creates a poller for Queue::loop().
 // Returns the poller together with a std::size_t (meaning of the second
 // element is not visible in this header -- TODO confirm).
54 std::pair<ZeroMQPoller, std::size_t> create_queue_poller();
 // Helper function that creates a poller for worker_loop().
55 std::pair<ZeroMQPoller, std::size_t> create_worker_poller();
56
57 // -- WORKER - QUEUE COMMUNICATION --
58
 // Variadic send: takes one or more items by value.
60 template <typename T, typename... Ts>
61 void send_from_worker_to_queue(T item, Ts... items);
 // Receives a value of type value_t on the queue side for the given worker id.
62 template <typename value_t>
63 value_t receive_from_worker_on_queue(std::size_t this_worker_id);
 // Non-template overload taking only the worker id; presumably the
 // recursion terminator for the variadic overload below -- TODO confirm.
64 void send_from_queue_to_worker(std::size_t this_worker_id);
65 template <typename T, typename... Ts>
66 void send_from_queue_to_worker(std::size_t this_worker_id, T item, Ts... items);
67 template <typename value_t>
69
70 // -- QUEUE - MASTER COMMUNICATION --
71
73
74 template <typename T, typename... Ts>
75 void send_from_queue_to_master(T item, Ts... items);
76 template <typename value_t>
79
80 template <typename T, typename... Ts>
81 void send_from_master_to_queue(T item, Ts... items);
82 template <typename value_t>
84
85 // -- MASTER - WORKER COMMUNICATION --
86
 // Single-item overload: specialization that sends the final message.
87 template <typename T>
88 void publish_from_master_to_workers(T &&item);
 // Overload for two or more items, taken as forwarding references.
89 template <typename T, typename T2, typename... Ts>
90 void publish_from_master_to_workers(T &&item, T2 &&item2, Ts &&...items);
 // `more` is an optional out-pointer (defaults to nullptr); presumably
 // reports whether further message parts follow -- TODO confirm.
91 template <typename value_t>
92 value_t receive_from_master_on_worker(bool *more = nullptr);
93
 // Single-item overload: specialization that sends the final message.
94 template <typename T>
95 void send_from_worker_to_master(T &&item);
 // Overload for two or more items, taken as forwarding references.
96 template <typename T, typename T2, typename... Ts>
97 void send_from_worker_to_master(T &&item, T2 &&item2, Ts &&...items);
 // `more` is an optional out-pointer (defaults to nullptr); see the note on
 // receive_from_master_on_worker.
98 template <typename value_t>
99 value_t receive_from_worker_on_master(bool *more = nullptr);
100
 // Test helpers: take an X2X value, the pipe to use and a worker id.
101 void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id);
102 void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id);
103
105
 // Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or
 // bitwise combination.
106 void set_send_flag(zmq::send_flags flag);
107
108private:
 // Function called from send and receive template functions in debug builds,
 // used to monitor the messages. Takes the string by value.
109 void debug_print(std::string s);
110
 // Stores `addr` at the end of bound_ipc_addresses_ and binds `socket` to
 // that stored string. `socket` is dereferenced with ->, so T must be a
 // pointer-like type.
 // NOTE(review): `addr` is an rvalue-reference parameter but is passed to
 // emplace_back as an lvalue (no std::move), so the string is copied rather
 // than moved.
111 template <class T>
112 void bindAddr(T &socket, std::string &&addr)
113 {
114 bound_ipc_addresses_.emplace_back(addr);
115 socket->bind(bound_ipc_addresses_.back());
116 }
117
118 // push
119 std::vector<ZmqLingeringSocketPtr<>> qw_push_;
122 // pollers for all push sockets
123 std::vector<ZeroMQPoller> qw_push_poller_;
125 // pull
126 std::vector<ZmqLingeringSocketPtr<>> qw_pull_;
129 // pollers for all pull sockets
130 std::vector<ZeroMQPoller> qw_pull_poller_;
132
133 // publish/subscribe sockets for parameter updating from master to workers
137 // push/pull sockets for result retrieving from workers on master
141
142 // destruction flags to distinguish between different process-type setups:
146
 // Flag used by the send functions; defaults to zmq::send_flags::none and is
 // changed through set_send_flag().
147 zmq::send_flags send_flag_ = zmq::send_flags::none;
148
 // Addresses recorded by bindAddr(), in the order they were bound.
149 std::vector<std::string> bound_ipc_addresses_;
150};
151
152// Messages from master to queue
// Underlying type is int; the single visible enumerator has the explicit
// value 10, which does not collide with the values used by W2Q, Q2W or X2X.
153enum class M2Q : int {
154 enqueue = 10,
155};
156
157// Messages from worker to queue
// Underlying type is int; dequeue has the explicit value 30, distinct from
// the values used by M2Q, Q2W and X2X.
158enum class W2Q : int { dequeue = 30 };
159
160// Messages from queue to worker
// Underlying type is int. The two enumerators are named as the rejected and
// accepted counterparts of W2Q::dequeue (presumably the queue's replies to a
// dequeue request -- confirm against the implementation).
161enum class Q2W : int {
162 dequeue_rejected = 40,
163 dequeue_accepted = 41,
164};
165
166// stream output operators for debugging
// One overload per message enum (M2Q, Q2W, W2Q, X2X). Each takes the enum by
// value and returns a std::ostream reference. They are only declared here;
// std::ostream itself is available through the <iosfwd> include above.
167std::ostream &operator<<(std::ostream &out, const M2Q value);
168std::ostream &operator<<(std::ostream &out, const Q2W value);
169std::ostream &operator<<(std::ostream &out, const W2Q value);
170std::ostream &operator<<(std::ostream &out, const X2X value);
171
172} // namespace MultiProcess
173} // namespace RooFit
174
175#endif // ROOT_ROOFIT_MultiProcess_Messenger_decl
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
Definition ZeroMQSvc.h:74
Manages ZeroMQ sockets and wraps send and receive calls.
void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
value_t receive_from_master_on_worker(bool *more=nullptr)
Definition Messenger.h:176
std::vector< ZmqLingeringSocketPtr<> > qw_push_
std::vector< std::string > bound_ipc_addresses_
void send_from_worker_to_master(T &&item)
specialization that sends the final message
Definition Messenger.h:192
void set_send_flag(zmq::send_flags flag)
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
value_t receive_from_worker_on_master(bool *more=nullptr)
Definition Messenger.h:219
ZmqLingeringSocketPtr wm_push_
std::vector< ZeroMQPoller > qw_pull_poller_
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
ZmqLingeringSocketPtr mw_pub_
void bindAddr(T &socket, std::string &&addr)
ZmqLingeringSocketPtr mq_pull_
ZmqLingeringSocketPtr wm_pull_
std::vector< ZeroMQPoller > qw_push_poller_
ZmqLingeringSocketPtr this_worker_qw_push_
void debug_print(std::string s)
Function called from the send and receive template functions in debug builds; used to monitor the messages...
value_t receive_from_worker_on_queue(std::size_t this_worker_id)
Definition Messenger.h:43
void publish_from_master_to_workers(T &&item)
specialization that sends the final message
Definition Messenger.h:150
ZmqLingeringSocketPtr mw_sub_
void send_from_queue_to_worker(std::size_t this_worker_id)
ZmqLingeringSocketPtr this_worker_qw_pull_
ZmqLingeringSocketPtr mq_push_
void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
Fork processes for queue and workers.
Wrapper class for polling ZeroMQ sockets.
#define T2
Definition md5.inl:147
void set_socket_immediate(ZmqLingeringSocketPtr<> &socket)
Definition Messenger.cxx:24
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition JSONIO.h:26