Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
LikelihoodJob.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 *
6 * Copyright (c) 2021, CERN
7 *
8 * Redistribution and use in source and binary forms,
9 * with or without modification, are permitted according to the terms
10 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
11 */
12
13#include "LikelihoodJob.h"
14
15#include "LikelihoodSerial.h"
27#include "RooRealVar.h"
28#include "RooNaNPacker.h"
29
30#include "TMath.h" // IsNaN
31
32namespace RooFit {
33namespace TestStatistics {
34
35LikelihoodJob::LikelihoodJob(std::shared_ptr<RooAbsL> likelihood,
36 std::shared_ptr<WrapperCalculationCleanFlags> calculation_is_clean, SharedOffset offset)
37 : LikelihoodWrapper(std::move(likelihood), std::move(calculation_is_clean), std::move(offset)),
38 n_event_tasks_(MultiProcess::Config::LikelihoodJob::defaultNEventTasks),
39 n_component_tasks_(MultiProcess::Config::LikelihoodJob::defaultNComponentTasks),
41{
42 init_vars();
44}
45
46// This is a separate function (instead of just in ctor) for historical reasons.
47// Its predecessor RooRealMPFE::initVars() was used from multiple ctors, but also
48// from RooRealMPFE::constOptimizeTestStatistic at the end, which makes sense,
49// because it might change the set of variables. We may at some point want to do
50// this here as well.
52{
53 // Empty current lists
54 vars_.removeAll();
55 save_vars_.removeAll();
56
57 // Retrieve non-constant parameters
58 std::unique_ptr<RooArgSet> vars{likelihood_->getParameters()};
59 // TODO: make sure this is the right list of parameters, compare to original
60 // implementation in RooRealMPFE.cxx
61
62 RooArgList varList(*vars);
63
64 // Save in lists
65 vars_.add(varList);
66 save_vars_.addClone(varList);
67}
68
70{
71 if (get_manager()->process_manager().is_worker()) {
72 bool more;
73
75 assert(more);
76
77 switch (mode) {
80 assert(more);
81 auto message = get_manager()->messenger().receive_from_master_on_worker<zmq::message_t>(&more);
82 auto message_begin = message.data<update_state_t>();
83 auto message_end = message_begin + message.size() / sizeof(update_state_t);
84 std::vector<update_state_t> to_update(message_begin, message_end);
85 for (auto const &item : to_update) {
86 RooRealVar *rvar = static_cast<RooRealVar *>(vars_.at(item.var_index));
87 rvar->setVal(static_cast<double>(item.value));
88 if (rvar->isConstant() != item.is_constant) {
89 rvar->setConstant(static_cast<bool>(item.is_constant));
90 }
91 }
92
93 if (more) {
94 // offsets also incoming
95 auto offsets_message = get_manager()->messenger().receive_from_master_on_worker<zmq::message_t>(&more);
96 assert(!more);
97 auto offsets_message_begin = offsets_message.data<ROOT::Math::KahanSum<double>>();
98 std::size_t N_offsets = offsets_message.size() / sizeof(ROOT::Math::KahanSum<double>);
99 shared_offset_.offsets().resize(N_offsets);
100 auto offsets_message_end = offsets_message_begin + N_offsets;
101 std::copy(offsets_message_begin, offsets_message_end, shared_offset_.offsets().begin());
102 }
103
104 break;
105 }
107 LikelihoodWrapper::enableOffsetting(get_manager()->messenger().receive_from_master_on_worker<bool>(&more));
108 assert(!more);
109 break;
110 }
111 }
112 }
113}
114
116{
117 std::size_t val = n_event_tasks_;
119 val = 1;
120 }
121 if (val > likelihood_->getNEvents()) {
122 val = likelihood_->getNEvents();
123 }
124 return val;
125}
126
127/// \warning In automatic mode, this function can start MultiProcess (forks, starts workers, etc)!
129{
130 std::size_t val = n_component_tasks_;
132 val = get_manager()
134 .N_workers(); // get_manager() is the call that can start MultiProcess, mentioned above
135 }
136 if (val > likelihood_->getNComponents()) {
137 val = likelihood_->getNComponents();
138 }
139 return val;
140}
141
143{
144 if (get_manager()->process_manager().is_master()) {
145 bool valChanged = false;
146 bool constChanged = false;
147 std::vector<update_state_t> to_update;
148 for (std::size_t ix = 0u; ix < static_cast<std::size_t>(vars_.size()); ++ix) {
149 valChanged = !vars_[ix].isIdentical(save_vars_[ix], true);
150 constChanged = (vars_[ix].isConstant() != save_vars_[ix].isConstant());
151
152 if (valChanged || constChanged) {
153 if (constChanged) {
154 (static_cast<RooRealVar *>(&save_vars_[ix]))->setConstant(vars_[ix].isConstant());
155 }
156 // TODO: Check with Wouter why he uses copyCache in MPFE; makes it very difficult to extend, because
157 // copyCache is protected (so must be friend). Moved setting value to if-block below.
158 // _saveVars[ix].copyCache(&_vars[ix]);
159
160 // send message to queue (which will relay to workers)
161 RooAbsReal *rar_val = dynamic_cast<RooAbsReal *>(&vars_[ix]);
162 if (rar_val) {
163 double val = rar_val->getVal();
164 dynamic_cast<RooRealVar *>(&save_vars_[ix])->setVal(val);
165 bool isC = vars_[ix].isConstant();
166 to_update.push_back(update_state_t{ix, val, isC});
167 }
168 }
169 }
170 bool update_offsets = isOffsetting() && shared_offset_.offsets() != offsets_previous_;
171 if (!to_update.empty() || update_offsets) {
172 ++state_id_;
173 zmq::message_t message(to_update.begin(), to_update.end());
174 // always send Job id first! This is used in worker_loop to route the
175 // update_state call to the correct Job.
176 if (update_offsets) {
177 zmq::message_t offsets_message(shared_offset_.offsets().begin(), shared_offset_.offsets().end());
179 std::move(message), std::move(offsets_message));
181 } else {
183 std::move(message));
184 }
185 }
186 }
187}
188
193
195{
196 if (get_manager()->process_manager().is_master()) {
197 // evaluate the serial likelihood to set the offsets
198 if (do_offset_ && shared_offset_.offsets().empty()) {
199 likelihood_serial_.evaluate();
200 // note: we don't need to get the offsets from the serial likelihood, because they are already coupled through
201 // the shared_ptr
202 }
203
204 // update parameters that changed since last calculation (or creation if first time)
206
207 // master fills queue with tasks
208 auto N_tasks = getNEventTasks() * getNComponentTasks();
209 for (std::size_t ix = 0; ix < N_tasks; ++ix) {
210 get_manager()->queue()->add({id_, state_id_, ix});
211 }
212 n_tasks_at_workers_ = N_tasks;
213
214 // wait for task results back from workers to master
216
217 RooNaNPacker packedNaN;
218
219 // Note: initializing result_ to results_[0] instead of zero-initializing it makes
220 // a difference due to Kahan sum precision. This way, a single-worker run gives
221 // the same result as a run with serial likelihood. Adding the terms to a zero
222 // initial sum can cancel the carry in some cases, causing divergent values.
223 result_ = results_[0];
224 packedNaN.accumulate(results_[0].Sum());
225 for (auto item_it = results_.cbegin() + 1; item_it != results_.cend(); ++item_it) {
226 result_ += *item_it;
227 packedNaN.accumulate(item_it->Sum());
228 }
229 results_.clear();
230
231 if (packedNaN.getPayload() != 0) {
233 }
234
235 if (TMath::IsNaN(result_.Sum())) {
236 RooAbsReal::logEvalError(nullptr, GetName().c_str(), "function value is NAN");
237 }
238 }
239}
240
241// --- RESULT LOGISTICS ---
242
244{
245 int numErrors = RooAbsReal::numEvalErrors();
246
247 if (numErrors) {
248 // Clear error list on local side
250 }
251
252 task_result_t task_result{id_, result_.Result(), result_.Carry(), numErrors > 0};
253 zmq::message_t message(sizeof(task_result_t));
254 memcpy(message.data(), &task_result, sizeof(task_result_t));
255 get_manager()->messenger().send_from_worker_to_master(std::move(message));
256}
257
258bool LikelihoodJob::receive_task_result_on_master(const zmq::message_t &message)
259{
260 auto task_result = message.data<task_result_t>();
261 results_.emplace_back(task_result->value, task_result->carry);
262 if (task_result->has_errors) {
263 RooAbsReal::logEvalError(nullptr, "LikelihoodJob", "evaluation errors at the worker processes", "no servervalue");
264 }
266 bool job_completed = (n_tasks_at_workers_ == 0);
267 return job_completed;
268}
269
270// --- END OF RESULT LOGISTICS ---
271
272void LikelihoodJob::evaluate_task(std::size_t task)
273{
274 assert(get_manager()->process_manager().is_worker());
275
276 double section_first = 0;
277 double section_last = 1;
278 if (getNEventTasks() > 1) {
279 std::size_t event_task = task % getNEventTasks();
280 std::size_t N_events = likelihood_->numDataEntries();
281 if (event_task > 0) {
282 std::size_t first = N_events * event_task / getNEventTasks();
283 section_first = static_cast<double>(first) / N_events;
284 }
285 if (event_task < getNEventTasks() - 1) {
286 std::size_t last = N_events * (event_task + 1) / getNEventTasks();
287 section_last = static_cast<double>(last) / N_events;
288 }
289 }
290
291 switch (likelihood_type_) {
294 result_ = likelihood_->evaluatePartition({section_first, section_last}, 0, 0);
295 if (do_offset_ && section_last == 1) {
296 // we only subtract at the end of event sections, otherwise the offset is subtracted for each event split
297 result_ -= shared_offset_.offsets()[0];
298 }
299 break;
300 }
302 result_ = likelihood_->evaluatePartition({0, 1}, 0, 0);
304 result_ -= shared_offset_.offsets()[0];
305 }
306 break;
307 }
308 case LikelihoodType::sum: {
309 std::size_t components_first = 0;
310 std::size_t components_last = likelihood_->getNComponents();
311 if (getNComponentTasks() > 1) {
312 std::size_t component_task = task / getNEventTasks();
313 components_first = likelihood_->getNComponents() * component_task / getNComponentTasks();
314 if (component_task == getNComponentTasks() - 1) {
315 components_last = likelihood_->getNComponents();
316 } else {
317 components_last = likelihood_->getNComponents() * (component_task + 1) / getNComponentTasks();
318 }
319 }
320
322 RooNaNPacker packedNaN;
323 for (std::size_t comp_ix = components_first; comp_ix < components_last; ++comp_ix) {
324 auto component_result = likelihood_->evaluatePartition({section_first, section_last}, comp_ix, comp_ix + 1);
325 packedNaN.accumulate(component_result.Sum());
326 if (do_offset_ && section_last == 1 &&
327 shared_offset_.offsets()[comp_ix] != ROOT::Math::KahanSum<double>(0, 0)) {
328 // we only subtract at the end of event sections, otherwise the offset is subtracted for each event split
329 result_ += (component_result - shared_offset_.offsets()[comp_ix]);
330 } else {
331 result_ += component_result;
332 }
333 }
334 if (packedNaN.getPayload() != 0) {
336 }
337
338 break;
339 }
340 }
341}
342
344{
345 likelihood_serial_.enableOffsetting(flag);
348 printf("WARNING: when calling MinuitFcnGrad::setOffsetting after the run has already been started the "
349 "MinuitFcnGrad::likelihood_in_gradient object (a LikelihoodSerial) on the workers can no longer be "
350 "updated! This function (LikelihoodJob::enableOffsetting) can in principle be used outside of "
351 "MinuitFcnGrad, but be aware of this limitation. To do a minimization with a different offsetting "
352 "setting, please delete all RooFit::MultiProcess based objects so that the forked processes are killed "
353 "and then set up a new RooMinimizer.\n");
355 }
356}
357
358#define PROCESS_VAL(p) \
359 case (p): s = #p; break;
360
361std::ostream &operator<<(std::ostream &out, const LikelihoodJob::update_state_mode value)
362{
363 std::string s;
364 switch (value) {
367 default: s = std::to_string(static_cast<int>(value));
368 }
369 return out << s;
370}
371
372#undef PROCESS_VAL
373
374} // namespace TestStatistics
375} // namespace RooFit
#define PROCESS_VAL(p)
The Kahan summation is a compensated summation algorithm, which significantly reduces numerical error...
Definition Util.h:141
bool isConstant() const
Check if the "Constant" attribute is set.
Definition RooAbsArg.h:283
void setConstant(bool value=true)
Abstract base class for objects that represent a real value and implements functionality common to al...
Definition RooAbsReal.h:63
double getVal(const RooArgSet *normalisationSet=nullptr) const
Evaluate object.
Definition RooAbsReal.h:107
static Int_t numEvalErrors()
Return the number of logged evaluation errors since the last clearing.
void logEvalError(const char *message, const char *serverValueString=nullptr) const
Log evaluation error message.
static void clearEvalErrorLog()
Clear the stack of evaluation error messages.
RooArgList is a container object that can hold multiple RooAbsArg objects.
Definition RooArgList.h:22
ProcessManager & process_manager() const
std::size_t id_
Definition Job.h:45
std::size_t state_id_
Definition Job.h:46
JobManager * get_manager()
Get JobManager instance; create and activate if necessary.
Definition Job.cxx:112
void gather_worker_results()
Wait for all tasks to be retrieved for the current Job.
Definition Job.cxx:126
value_t receive_from_master_on_worker(bool *more=nullptr)
Definition Messenger.h:176
void send_from_worker_to_master(T &&item)
specialization that sends the final message
Definition Messenger.h:192
void publish_from_master_to_workers(T &&item)
specialization that sends the final message
Definition Messenger.h:150
virtual void add(JobTask job_task)=0
Enqueue a task.
void enableOffsetting(bool flag) override
bool receive_task_result_on_master(const zmq::message_t &message) override
void evaluate_task(std::size_t task) override
std::vector< ROOT::Math::KahanSum< double > > results_
void send_back_task_result_from_worker(std::size_t task) override
void update_state() override
Virtual function to update any necessary state on workers.
void evaluate() override
Triggers (possibly asynchronous) evaluation of the likelihood.
SharedOffset::OffsetVec offsets_previous_
ROOT::Math::KahanSum< double > result_
LikelihoodJob(std::shared_ptr< RooAbsL > _likelihood, std::shared_ptr< WrapperCalculationCleanFlags > calculation_is_clean, SharedOffset offset)
LikelihoodWrapper(std::shared_ptr< RooAbsL > likelihood, std::shared_ptr< WrapperCalculationCleanFlags > calculation_is_clean, SharedOffset offset)
std::shared_ptr< WrapperCalculationCleanFlags > calculation_is_clean_
Variable that can be changed from the outside.
Definition RooRealVar.h:37
void setVal(double value) override
Set value of variable to 'value'.
std::size_t State
Definition types.h:23
Namespace for new RooFit test statistic calculation.
Definition RooAbsData.h:50
std::ostream & operator<<(std::ostream &out, const LikelihoodJob::update_state_mode value)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition CodegenImpl.h:72
Bool_t IsNaN(Double_t x)
Definition TMath.h:903
static constexpr std::size_t automaticNEventTasks
Definition Config.h:34
static constexpr std::size_t automaticNComponentTasks
Definition Config.h:35
Little struct that can pack a float into the unused bits of the mantissa of a NaN double.
float getPayload() const
Retrieve packed float.
double getNaNWithPayload() const
Retrieve a NaN with the current float payload packed into the mantissa.
void accumulate(double val)
Accumulate a packed float from another NaN into this.