Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RooBatchCompute.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * Emmanouil Michalainas, CERN, September 2020
5 *
6 * Copyright (c) 2021, CERN
7 *
8 * Redistribution and use in source and binary forms,
9 * with or without modification, are permitted according to the terms
10 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
11 */
12
13/**
14\file RooBatchCompute.cxx
15\class RbcClass
16\ingroup roofit_dev_docs_batchcompute
17
18This file contains the code for cpu computations using the RooBatchCompute library.
19**/
20
21#include "RooBatchCompute.h"
22#include "RooNaNPacker.h"
23#include "Batches.h"
24
25#include <ROOT/RConfig.hxx>
26
27#ifdef ROOBATCHCOMPUTE_USE_IMT
28#include <ROOT/TExecutor.hxx>
29#endif
30
31#include <Math/Util.h>
32
33#include <algorithm>
34#include <functional>
35#include <map>
36#include <queue>
37#include <sstream>
38#include <stdexcept>
39
40#include <vector>
41
42#ifndef RF_ARCH
43#error "RF_ARCH should always be defined"
44#endif
45
46namespace RooBatchCompute {
47namespace RF_ARCH {
48
49namespace {
50
51void fillBatches(Batches &batches, double *output, size_t nEvents, std::size_t nBatches, ArgSpan extraArgs)
52{
53 batches.extra = extraArgs.data();
54 batches.nEvents = nEvents;
55 batches.nBatches = nBatches;
56 batches.nExtra = extraArgs.size();
57 batches.output = output;
58}
59
60void fillArrays(std::span<Batch> arrays, VarSpan vars, std::size_t nEvents)
61{
62 for (std::size_t i = 0; i < vars.size(); i++) {
63 arrays[i]._array = vars[i].data();
64 arrays[i]._isVector = vars[i].empty() || vars[i].size() >= nEvents;
65 }
66}
67
68inline void advance(Batches &batches, std::size_t nEvents)
69{
70 for (std::size_t i = 0; i < batches.nBatches; i++) {
71 Batch &arg = batches.args[i];
72 arg._array += arg._isVector * nEvents;
73 }
74 batches.output += nEvents;
75}
76
77} // namespace
78
79std::vector<void (*)(Batches &)> getFunctions();
80
81/// This class overrides some RooBatchComputeInterface functions, for the
82/// purpose of providing a CPU specific implementation of the library.
84public:
86 {
87 // Set the dispatch pointer to this instance of the library upon loading
88 dispatchCPU = this;
89 }
90
91 Architecture architecture() const override { return Architecture::RF_ARCH; };
92 std::string architectureName() const override
93 {
94 // transform to lower case to match the original architecture name passed to the compiler
95#ifdef _QUOTEVAL_ // to quote the value of the preprocessor macro instead of the name
96#error "It's unexpected that _QUOTEVAL_ is defined at this point!"
97#endif
98#define _QUOTEVAL_(x) _QUOTE_(x)
99 std::string out = _QUOTEVAL_(RF_ARCH);
100#undef _QUOTEVAL_
101 std::transform(out.begin(), out.end(), out.begin(), [](unsigned char c) { return std::tolower(c); });
102 return out;
103 };
104
105 void compute(Config const &, Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs) override;
106 double reduceSum(Config const &, InputArr input, size_t n) override;
107 ReduceNLLOutput reduceNLL(Config const &, std::span<const double> probas, std::span<const double> weights,
108 std::span<const double> offsetProbas) override;
109
110 std::unique_ptr<AbsBufferManager> createBufferManager() const override;
111
112 CudaInterface::CudaEvent *newCudaEvent(bool) const override { throw std::bad_function_call(); }
113 CudaInterface::CudaStream *newCudaStream() const override { throw std::bad_function_call(); }
114 void deleteCudaEvent(CudaInterface::CudaEvent *) const override { throw std::bad_function_call(); }
115 void deleteCudaStream(CudaInterface::CudaStream *) const override { throw std::bad_function_call(); }
117 {
118 throw std::bad_function_call();
119 }
121 {
122 throw std::bad_function_call();
123 }
124 bool cudaStreamIsActive(CudaInterface::CudaStream *) const override { throw std::bad_function_call(); }
125
126private:
127#ifdef ROOBATCHCOMPUTE_USE_IMT
128 void computeIMT(Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs);
129#endif
130
131 const std::vector<void (*)(Batches &)> _computeFunctions;
132};
133
134#ifdef ROOBATCHCOMPUTE_USE_IMT
135void RooBatchComputeClass::computeIMT(Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs)
136{
137 std::size_t nEvents = output.size();
138
139 if (nEvents == 0)
140 return;
142 std::size_t nThreads = ex.GetPoolSize();
143
144 std::size_t nEventsPerThread = nEvents / nThreads + (nEvents % nThreads > 0);
145
146 // Reset the number of threads to the number we actually need given nEventsPerThread
147 nThreads = nEvents / nEventsPerThread + (nEvents % nEventsPerThread > 0);
148
149 auto task = [&](std::size_t idx) -> int {
150 // Fill a std::vector<Batches> with the same object and with ~nEvents/nThreads
151 // Then advance every object but the first to split the work between threads
153 std::vector<Batch> arrays(vars.size());
154 fillBatches(batches, output.data(), nEventsPerThread, vars.size(), extraArgs);
155 fillArrays(arrays, vars, nEvents);
156 batches.args = arrays.data();
157 advance(batches, batches.nEvents * idx);
158
159 // Set the number of events of the last Batches object as the remaining events
160 if (idx == nThreads - 1) {
161 batches.nEvents = nEvents - idx * batches.nEvents;
162 }
163
164 std::size_t events = batches.nEvents;
165 batches.nEvents = bufferSize;
166 while (events > bufferSize) {
169 events -= bufferSize;
170 }
171 batches.nEvents = events;
173 return 0;
174 };
175
176 std::vector<std::size_t> indices(nThreads);
177 for (unsigned int i = 1; i < nThreads; i++) {
178 indices[i] = i;
179 }
180 ex.Map(task, indices);
181}
182#endif
183
184/** Compute multiple values using optimized functions.
185This method creates a Batches object and passes it to the correct compute function.
186In case Implicit Multithreading is enabled, the events to be processed are equally
187divided among the tasks to be generated and computed in parallel.
188\param computer An enum specifying the compute function to be used.
189\param output The array where the computation results are stored.
190\param vars A std::span containing pointers to the variables involved in the computation.
191\param extraArgs An optional std::span containing extra double values that may participate in the computation. **/
194{
195 // In the original implementation of this library, the evaluation was done
196 // multi-threaded if implicit multi-threading was enabled in ROOT with
197 // ROOT::EnableImplicitMT().
198 //
199 // However, this multithreaded mode was not carefully validated and is
200 // therefore not production ready. One would first have to study the
201 // overhead for different numbers of cores, number of events, and model
202 // complexity. Then, we should only consider implicit multithreading here if
203 // there is no performance penalty for any scenario, to not surprise the
204 // users with unexpected slowdowns!
205 //
206 // Note that the priority of investigating this is not high, because RooFit
207 // R & D efforts currently go in the direction of parallelization at the
208 // level of the gradient components, or achieving single-threaded speedup
209 // with automatic differentiation. Furthermore, the single-threaded
210 // performance of the new CPU evaluation backend with the RooBatchCompute
211 // library, is generally much faster than the legacy evaluation backend
212 // already, even if the latter uses multi-threading.
213#ifdef ROOBATCHCOMPUTE_USE_IMT
216 }
217#endif
218
219 std::size_t nEvents = output.size();
220
221 // Fill a std::vector<Batches> with the same object and with ~nEvents/nThreads
222 // Then advance every object but the first to split the work between threads
224 std::vector<Batch> arrays(vars.size());
225 fillBatches(batches, output.data(), nEvents, vars.size(), extraArgs);
226 fillArrays(arrays, vars, nEvents);
227 batches.args = arrays.data();
228
229 std::size_t events = batches.nEvents;
230 batches.nEvents = bufferSize;
231 while (events > bufferSize) {
234 events -= bufferSize;
235 }
236 batches.nEvents = events;
238}
239
240namespace {
241
242inline std::pair<double, double> getLog(double prob, ReduceNLLOutput &out)
243{
244 if (prob <= 0.0) {
245 out.nNonPositiveValues++;
246 return {std::log(prob), -prob};
247 }
248
249 if (std::isinf(prob)) {
250 out.nInfiniteValues++;
251 }
252
253 if (std::isnan(prob)) {
254 out.nNaNValues++;
256 }
257
258 return {std::log(prob), 0.0};
259}
260
261} // namespace
262
267
268ReduceNLLOutput RooBatchComputeClass::reduceNLL(Config const &, std::span<const double> probas,
269 std::span<const double> weights, std::span<const double> offsetProbas)
270{
271 ReduceNLLOutput out;
272
273 double badness = 0.0;
274
276
277 for (std::size_t i = 0; i < weights.size(); ++i) {
278
279 if (0. == weights[i])
280 continue;
281
282 std::pair<double, double> logOut = getLog(probas.size() == 1 ? probas[0] : probas[i], out);
283 double term = logOut.first;
284 badness += logOut.second;
285
286 if (!offsetProbas.empty()) {
287 term -= std::log(offsetProbas[i]);
288 }
289
290 term *= -weights[i];
291
292 nllSum.Add(term);
293 }
294
295 out.nllSum = nllSum.Sum();
296 out.nllSumCarry = nllSum.Carry();
297
298 if (badness != 0.) {
299 // Some events with evaluation errors. Return "badness" of errors.
301 out.nllSumCarry = 0.0;
302 }
303
304 return out;
305}
306
307namespace {
308
309class ScalarBufferContainer {
310public:
311 ScalarBufferContainer() {}
312 ScalarBufferContainer(std::size_t size)
313 {
314 if (size != 1)
315 throw std::runtime_error("ScalarBufferContainer can only be of size 1");
316 }
317
318 double const *hostReadPtr() const { return &_val; }
319 double const *deviceReadPtr() const { return &_val; }
320
321 double *hostWritePtr() { return &_val; }
322 double *deviceWritePtr() { return &_val; }
323
324 void assignFromHost(std::span<const double> input) { _val = input[0]; }
325 void assignFromDevice(std::span<const double>) { throw std::bad_function_call(); }
326
327private:
328 double _val;
329};
330
331class CPUBufferContainer {
332public:
333 CPUBufferContainer(std::size_t size) : _vec(size) {}
334
335 double const *hostReadPtr() const { return _vec.data(); }
336 double const *deviceReadPtr() const
337 {
338 throw std::bad_function_call();
339 return nullptr;
340 }
341
342 double *hostWritePtr() { return _vec.data(); }
343 double *deviceWritePtr()
344 {
345 throw std::bad_function_call();
346 return nullptr;
347 }
348
349 void assignFromHost(std::span<const double> input) { _vec.assign(input.begin(), input.end()); }
350 void assignFromDevice(std::span<const double>) { throw std::bad_function_call(); }
351
352private:
353 std::vector<double> _vec;
354};
355
356template <class Container>
357class BufferImpl : public AbsBuffer {
358public:
359 using Queue = std::queue<std::unique_ptr<Container>>;
360
361 BufferImpl(std::size_t size, Queue &queue) : _queue{queue}
362 {
363 if (_queue.empty()) {
364 _vec = std::make_unique<Container>(size);
365 } else {
366 _vec = std::move(_queue.front());
367 _queue.pop();
368 }
369 }
370
371 ~BufferImpl() override { _queue.emplace(std::move(_vec)); }
372
373 double const *hostReadPtr() const override { return _vec->hostReadPtr(); }
374 double const *deviceReadPtr() const override { return _vec->deviceReadPtr(); }
375
376 double *hostWritePtr() override { return _vec->hostWritePtr(); }
377 double *deviceWritePtr() override { return _vec->deviceWritePtr(); }
378
379 void assignFromHost(std::span<const double> input) override { _vec->assignFromHost(input); }
380 void assignFromDevice(std::span<const double> input) override { _vec->assignFromDevice(input); }
381
382 Container &vec() { return *_vec; }
383
384private:
385 std::unique_ptr<Container> _vec;
386 Queue &_queue;
387};
388
391
392struct BufferQueuesMaps {
393 std::map<std::size_t, ScalarBuffer::Queue> scalarBufferQueuesMap;
394 std::map<std::size_t, CPUBuffer::Queue> cpuBufferQueuesMap;
395};
396
397class BufferManager : public AbsBufferManager {
398
399public:
400 BufferManager() : _queuesMaps{std::make_unique<BufferQueuesMaps>()} {}
401
402 std::unique_ptr<AbsBuffer> makeScalarBuffer() override
403 {
404 return std::make_unique<ScalarBuffer>(1, _queuesMaps->scalarBufferQueuesMap[1]);
405 }
406 std::unique_ptr<AbsBuffer> makeCpuBuffer(std::size_t size) override
407 {
408 return std::make_unique<CPUBuffer>(size, _queuesMaps->cpuBufferQueuesMap[size]);
409 }
410 std::unique_ptr<AbsBuffer> makeGpuBuffer(std::size_t) override { throw std::bad_function_call(); }
411 std::unique_ptr<AbsBuffer> makePinnedBuffer(std::size_t, CudaInterface::CudaStream * = nullptr) override
412 {
413 throw std::bad_function_call();
414 }
415
416private:
417 std::unique_ptr<BufferQueuesMaps> _queuesMaps;
418};
419
420} // namespace
421
422std::unique_ptr<AbsBufferManager> RooBatchComputeClass::createBufferManager() const
423{
424 return std::make_unique<BufferManager>();
425}
426
427/// Static object to trigger the constructor which overwrites the dispatch pointer.
429
430} // End namespace RF_ARCH
431} // End namespace RooBatchCompute
#define RF_ARCH
#define c(i)
Definition RSha256.hxx:101
std::vector< double > _vec
double _val
std::map< std::size_t, CPUBuffer::Queue > cpuBufferQueuesMap
std::map< std::size_t, ScalarBuffer::Queue > scalarBufferQueuesMap
Queue & _queue
std::unique_ptr< BufferQueuesMaps > _queuesMaps
#define _QUOTEVAL_(x)
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
This class implements the interface to execute the same task multiple times, sequentially or in paral...
Definition TExecutor.hxx:37
The Kahan summation is a compensated summation algorithm, which significantly reduces numerical error...
Definition Util.h:136
T Sum() const
Definition Util.h:254
static KahanSum< T, N > Accumulate(Iterator begin, Iterator end, T initialValue=T{})
Iterate over a range and return an instance of a KahanSum.
Definition Util.h:225
T Carry() const
Definition Util.h:264
void Add(T x)
Single-element accumulation. Will not vectorise.
Definition Util.h:179
const double *__restrict _array
Definition Batches.h:32
Minimal configuration struct to steer the evaluation of a single node with the RooBatchCompute librar...
This class overrides some RooBatchComputeInterface functions, for the purpose of providing a CPU spec...
void cudaEventRecord(CudaInterface::CudaEvent *, CudaInterface::CudaStream *) const override
void compute(Config const &, Computer computer, std::span< double > output, VarSpan vars, ArgSpan extraArgs) override
Compute multiple values using optimized functions.
const std::vector< void(*)(Batches &)> _computeFunctions
CudaInterface::CudaEvent * newCudaEvent(bool) const override
void deleteCudaEvent(CudaInterface::CudaEvent *) const override
double reduceSum(Config const &, InputArr input, size_t n) override
void deleteCudaStream(CudaInterface::CudaStream *) const override
std::unique_ptr< AbsBufferManager > createBufferManager() const override
CudaInterface::CudaStream * newCudaStream() const override
bool cudaStreamIsActive(CudaInterface::CudaStream *) const override
ReduceNLLOutput reduceNLL(Config const &, std::span< const double > probas, std::span< const double > weights, std::span< const double > offsetProbas) override
void cudaStreamWaitForEvent(CudaInterface::CudaStream *, CudaInterface::CudaEvent *) const override
The interface which should be implemented to provide optimised computation functions for implementati...
const Int_t n
Definition legend1.C:16
Double_t ex[n]
Definition legend1.C:17
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:600
std::vector< void(*)(Batches &)> getFunctions()
static RooBatchComputeClass computeObj
Static object to trigger the constructor which overwrites the dispatch pointer.
Namespace for dispatching RooFit computations to various backends.
std::span< double > ArgSpan
R__EXTERN RooBatchComputeInterface * dispatchCPU
This dispatch pointer points to an implementation of the compute library, provided one has been loade...
constexpr std::size_t bufferSize
const double *__restrict InputArr
std::span< const std::span< const double > > VarSpan
static double packFloatIntoNaN(float payload)
Pack float into mantissa of a NaN.
static float unpackNaN(double val)
If val is NaN and this NaN has been tagged as containing a payload, unpack the float from the manti...
static void output()