Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
LikelihoodJob.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 *
6 * Copyright (c) 2021, CERN
7 *
8 * Redistribution and use in source and binary forms,
9 * with or without modification, are permitted according to the terms
10 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
11 */
12
13#include "LikelihoodJob.h"
14
15#include "LikelihoodSerial.h"
27#include "RooRealVar.h"
28#include "RooNaNPacker.h"
29
30#include "TMath.h" // IsNaN
31
32namespace RooFit {
33namespace TestStatistics {
34
35LikelihoodJob::LikelihoodJob(std::shared_ptr<RooAbsL> likelihood,
36 std::shared_ptr<WrapperCalculationCleanFlags> calculation_is_clean, SharedOffset offset)
37 : LikelihoodWrapper(std::move(likelihood), std::move(calculation_is_clean), std::move(offset)),
38 n_event_tasks_(MultiProcess::Config::LikelihoodJob::defaultNEventTasks),
39 n_component_tasks_(MultiProcess::Config::LikelihoodJob::defaultNComponentTasks),
40 likelihood_serial_(likelihood_, calculation_is_clean_, shared_offset_)
41{
42 init_vars();
44}
45
46// This is a separate function (instead of just in ctor) for historical reasons.
47// Its predecessor RooRealMPFE::initVars() was used from multiple ctors, but also
48// from RooRealMPFE::constOptimizeTestStatistic at the end, which makes sense,
49// because it might change the set of variables. We may at some point want to do
50// this here as well.
52{
53 // Empty current lists
56
57 // Retrieve non-constant parameters
58 std::unique_ptr<RooArgSet> vars{likelihood_->getParameters()};
59 // TODO: make sure this is the right list of parameters, compare to original
60 // implementation in RooRealMPFE.cxx
61
62 RooArgList varList(*vars);
63
64 // Save in lists
65 vars_.add(varList);
66 save_vars_.addClone(varList);
67}
68
70{
71 if (get_manager()->process_manager().is_worker()) {
72 bool more;
73
75 assert(more);
76
77 switch (mode) {
80 assert(more);
81 auto message = get_manager()->messenger().receive_from_master_on_worker<zmq::message_t>(&more);
82 auto message_begin = message.data<update_state_t>();
83 auto message_end = message_begin + message.size() / sizeof(update_state_t);
84 std::vector<update_state_t> to_update(message_begin, message_end);
85 for (auto const &item : to_update) {
86 RooRealVar *rvar = static_cast<RooRealVar *>(vars_.at(item.var_index));
87 rvar->setVal(static_cast<double>(item.value));
88 if (rvar->isConstant() != item.is_constant) {
89 rvar->setConstant(static_cast<bool>(item.is_constant));
90 }
91 }
92
93 if (more) {
94 // offsets also incoming
95 auto offsets_message = get_manager()->messenger().receive_from_master_on_worker<zmq::message_t>(&more);
96 assert(!more);
97 auto offsets_message_begin = offsets_message.data<ROOT::Math::KahanSum<double>>();
98 std::size_t N_offsets = offsets_message.size() / sizeof(ROOT::Math::KahanSum<double>);
99 shared_offset_.offsets().resize(N_offsets);
100 auto offsets_message_end = offsets_message_begin + N_offsets;
101 std::copy(offsets_message_begin, offsets_message_end, shared_offset_.offsets().begin());
102 }
103
104 break;
105 }
107 LikelihoodWrapper::enableOffsetting(get_manager()->messenger().receive_from_master_on_worker<bool>(&more));
108 assert(!more);
109 break;
110 }
111 }
112 }
113}
114
116{
117 std::size_t val = n_event_tasks_;
119 val = 1;
120 }
121 if (val > likelihood_->getNEvents()) {
122 val = likelihood_->getNEvents();
123 }
124 return val;
125}
126
127/// \warning In automatic mode, this function can start MultiProcess (forks, starts workers, etc)!
129{
130 std::size_t val = n_component_tasks_;
132 val = get_manager()
134 .N_workers(); // get_manager() is the call that can start MultiProcess, mentioned above
135 }
136 if (val > likelihood_->getNComponents()) {
137 val = likelihood_->getNComponents();
138 }
139 return val;
140}
141
143{
144 if (get_manager()->process_manager().is_master()) {
145 bool valChanged = false;
146 bool constChanged = false;
147 std::vector<update_state_t> to_update;
148 for (std::size_t ix = 0u; ix < static_cast<std::size_t>(vars_.size()); ++ix) {
149 valChanged = !vars_[ix].isIdentical(save_vars_[ix], true);
150 constChanged = (vars_[ix].isConstant() != save_vars_[ix].isConstant());
151
152 if (valChanged || constChanged) {
153 if (constChanged) {
154 (static_cast<RooRealVar *>(&save_vars_[ix]))->setConstant(vars_[ix].isConstant());
155 }
156 // TODO: Check with Wouter why he uses copyCache in MPFE; makes it very difficult to extend, because
157 // copyCache is protected (so must be friend). Moved setting value to if-block below.
158 // _saveVars[ix].copyCache(&_vars[ix]);
159
160 // send message to queue (which will relay to workers)
161 RooAbsReal *rar_val = dynamic_cast<RooAbsReal *>(&vars_[ix]);
162 if (rar_val) {
163 double val = rar_val->getVal();
164 dynamic_cast<RooRealVar *>(&save_vars_[ix])->setVal(val);
165 bool isC = vars_[ix].isConstant();
166 to_update.push_back(update_state_t{ix, val, isC});
167 }
168 }
169 }
170 bool update_offsets = isOffsetting() && shared_offset_.offsets() != offsets_previous_;
171 if (!to_update.empty() || update_offsets) {
172 ++state_id_;
173 zmq::message_t message(to_update.begin(), to_update.end());
174 // always send Job id first! This is used in worker_loop to route the
175 // update_state call to the correct Job.
176 if (update_offsets) {
177 zmq::message_t offsets_message(shared_offset_.offsets().begin(), shared_offset_.offsets().end());
179 std::move(message), std::move(offsets_message));
181 } else {
183 std::move(message));
184 }
185 }
186 }
187}
188
190{
192}
193
195{
196 if (get_manager()->process_manager().is_master()) {
197 // evaluate the serial likelihood to set the offsets
198 if (do_offset_ && shared_offset_.offsets().empty()) {
200 // note: we don't need to get the offsets from the serial likelihood, because they are already coupled through
201 // the shared_ptr
202 }
203
204 // update parameters that changed since last calculation (or creation if first time)
206
207 // master fills queue with tasks
208 auto N_tasks = getNEventTasks() * getNComponentTasks();
209 for (std::size_t ix = 0; ix < N_tasks; ++ix) {
210 get_manager()->queue()->add({id_, state_id_, ix});
211 }
212 n_tasks_at_workers_ = N_tasks;
213
214 // wait for task results back from workers to master
216
217 RooNaNPacker packedNaN;
218
219 // Note: initializing result_ to results_[0] instead of zero-initializing it makes
220 // a difference due to Kahan sum precision. This way, a single-worker run gives
221 // the same result as a run with serial likelihood. Adding the terms to a zero
222 // initial sum can cancel the carry in some cases, causing divergent values.
223 result_ = results_[0];
224 packedNaN.accumulate(results_[0].Sum());
225 for (auto item_it = results_.cbegin() + 1; item_it != results_.cend(); ++item_it) {
226 result_ += *item_it;
227 packedNaN.accumulate(item_it->Sum());
228 }
229 results_.clear();
230
231 if (packedNaN.getPayload() != 0) {
233 }
234
235 if (TMath::IsNaN(result_.Sum())) {
236 RooAbsReal::logEvalError(nullptr, GetName().c_str(), "function value is NAN");
237 }
238 }
239}
240
241// --- RESULT LOGISTICS ---
242
244{
245 int numErrors = RooAbsReal::numEvalErrors();
246
247 if (numErrors) {
248 // Clear error list on local side
250 }
251
252 task_result_t task_result{id_, result_.Result(), result_.Carry(), numErrors > 0};
253 zmq::message_t message(sizeof(task_result_t));
254 memcpy(message.data(), &task_result, sizeof(task_result_t));
255 get_manager()->messenger().send_from_worker_to_master(std::move(message));
256}
257
258bool LikelihoodJob::receive_task_result_on_master(const zmq::message_t &message)
259{
260 auto task_result = message.data<task_result_t>();
261 results_.emplace_back(task_result->value, task_result->carry);
262 if (task_result->has_errors) {
263 RooAbsReal::logEvalError(nullptr, "LikelihoodJob", "evaluation errors at the worker processes", "no servervalue");
264 }
266 bool job_completed = (n_tasks_at_workers_ == 0);
267 return job_completed;
268}
269
270// --- END OF RESULT LOGISTICS ---
271
272void LikelihoodJob::evaluate_task(std::size_t task)
273{
274 assert(get_manager()->process_manager().is_worker());
275
276 double section_first = 0;
277 double section_last = 1;
278 if (getNEventTasks() > 1) {
279 std::size_t event_task = task % getNEventTasks();
280 std::size_t N_events = likelihood_->numDataEntries();
281 if (event_task > 0) {
282 std::size_t first = N_events * event_task / getNEventTasks();
283 section_first = static_cast<double>(first) / N_events;
284 }
285 if (event_task < getNEventTasks() - 1) {
286 std::size_t last = N_events * (event_task + 1) / getNEventTasks();
287 section_last = static_cast<double>(last) / N_events;
288 }
289 }
290
291 switch (likelihood_type_) {
294 result_ = likelihood_->evaluatePartition({section_first, section_last}, 0, 0);
295 if (do_offset_ && section_last == 1) {
296 // we only subtract at the end of event sections, otherwise the offset is subtracted for each event split
298 }
299 break;
300 }
302 result_ = likelihood_->evaluatePartition({0, 1}, 0, 0);
305 }
306 break;
307 }
308 case LikelihoodType::sum: {
309 std::size_t components_first = 0;
310 std::size_t components_last = likelihood_->getNComponents();
311 if (getNComponentTasks() > 1) {
312 std::size_t component_task = task / getNEventTasks();
313 components_first = likelihood_->getNComponents() * component_task / getNComponentTasks();
314 if (component_task == getNComponentTasks() - 1) {
315 components_last = likelihood_->getNComponents();
316 } else {
317 components_last = likelihood_->getNComponents() * (component_task + 1) / getNComponentTasks();
318 }
319 }
320
322 RooNaNPacker packedNaN;
323 for (std::size_t comp_ix = components_first; comp_ix < components_last; ++comp_ix) {
324 auto component_result = likelihood_->evaluatePartition({section_first, section_last}, comp_ix, comp_ix + 1);
325 packedNaN.accumulate(component_result.Sum());
326 if (do_offset_ && section_last == 1 &&
328 // we only subtract at the end of event sections, otherwise the offset is subtracted for each event split
329 result_ += (component_result - shared_offset_.offsets()[comp_ix]);
330 } else {
331 result_ += component_result;
332 }
333 }
334 if (packedNaN.getPayload() != 0) {
336 }
337
338 break;
339 }
340 }
341}
342
344{
348 printf("WARNING: when calling MinuitFcnGrad::setOffsetting after the run has already been started the "
349 "MinuitFcnGrad::likelihood_in_gradient object (a LikelihoodSerial) on the workers can no longer be "
350 "updated! This function (LikelihoodJob::enableOffsetting) can in principle be used outside of "
351 "MinuitFcnGrad, but be aware of this limitation. To do a minimization with a different offsetting "
352 "setting, please delete all RooFit::MultiProcess based objects so that the forked processes are killed "
353 "and then set up a new RooMinimizer.\n");
355 }
356}
357
358#define PROCESS_VAL(p) \
359 case (p): s = #p; break;
360
361std::ostream &operator<<(std::ostream &out, const LikelihoodJob::update_state_mode value)
362{
363 std::string s;
364 switch (value) {
367 default: s = std::to_string(static_cast<int>(value));
368 }
369 return out << s;
370}
371
372#undef PROCESS_VAL
373
374} // namespace TestStatistics
375} // namespace RooFit
#define PROCESS_VAL(p)
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char mode
The Kahan summation is a compensated summation algorithm, which significantly reduces numerical error...
Definition Util.h:122
T Sum() const
Definition Util.h:240
T Result() const
Definition Util.h:245
T Carry() const
Definition Util.h:250
bool isConstant() const
Check if the "Constant" attribute is set.
Definition RooAbsArg.h:335
virtual void removeAll()
Remove all arguments from our set, deleting them if we own them.
virtual bool add(const RooAbsArg &var, bool silent=false)
Add the specified argument to list.
Storage_t::size_type size() const
virtual RooAbsArg * addClone(const RooAbsArg &var, bool silent=false)
Add a clone of the specified argument to list.
void setConstant(bool value=true)
Abstract base class for objects that represent a real value and implements functionality common to al...
Definition RooAbsReal.h:59
double getVal(const RooArgSet *normalisationSet=nullptr) const
Evaluate object.
Definition RooAbsReal.h:103
static Int_t numEvalErrors()
Return the number of logged evaluation errors since the last clearing.
void logEvalError(const char *message, const char *serverValueString=nullptr) const
Log evaluation error message.
static void clearEvalErrorLog()
Clear the stack of evaluation error messages.
RooArgList is a container object that can hold multiple RooAbsArg objects.
Definition RooArgList.h:22
RooAbsArg * at(Int_t idx) const
Return object at given index, or nullptr if index is out of range.
Definition RooArgList.h:110
ProcessManager & process_manager() const
std::size_t id_
Definition Job.h:45
std::size_t state_id_
Definition Job.h:46
JobManager * get_manager()
Get JobManager instance; create and activate if necessary.
Definition Job.cxx:112
void gather_worker_results()
Wait for all tasks to be retrieved for the current Job.
Definition Job.cxx:126
value_t receive_from_master_on_worker(bool *more=nullptr)
Definition Messenger.h:176
void send_from_worker_to_master(T &&item)
specialization that sends the final message
Definition Messenger.h:192
void publish_from_master_to_workers(T &&item)
specialization that sends the final message
Definition Messenger.h:150
virtual void add(JobTask job_task)=0
Enqueue a task.
void enableOffsetting(bool flag) override
bool receive_task_result_on_master(const zmq::message_t &message) override
void evaluate_task(std::size_t task) override
std::vector< ROOT::Math::KahanSum< double > > results_
void send_back_task_result_from_worker(std::size_t task) override
void update_state() override
Virtual function to update any necessary state on workers.
void evaluate() override
Triggers (possibly asynchronous) evaluation of the likelihood.
SharedOffset::OffsetVec offsets_previous_
ROOT::Math::KahanSum< double > result_
LikelihoodJob(std::shared_ptr< RooAbsL > _likelihood, std::shared_ptr< WrapperCalculationCleanFlags > calculation_is_clean, SharedOffset offset)
void evaluate() override
Triggers (possibly asynchronous) evaluation of the likelihood.
Virtual base class for implementation of likelihood calculation strategies.
Variable that can be changed from the outside.
Definition RooRealVar.h:37
void setVal(double value) override
Set value of variable to 'value'.
OffsetVec & offsets()
std::size_t State
Definition types.h:23
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition JSONIO.h:26
Bool_t IsNaN(Double_t x)
Definition TMath.h:892
static constexpr std::size_t automaticNEventTasks
Definition Config.h:34
static constexpr std::size_t automaticNComponentTasks
Definition Config.h:35
Little struct that can pack a float into the unused bits of the mantissa of a NaN double.
float getPayload() const
Retrieve packed float.
double getNaNWithPayload() const
Retrieve a NaN with the current float payload packed into the mantissa.
void accumulate(double val)
Accumulate a packed float from another NaN into this.