Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RooBatchCompute.cxx
Go to the documentation of this file.
/*
 * Project: RooFit
 * Authors:
 *   Emmanouil Michalainas, CERN, September 2020
 *
 * Copyright (c) 2021, CERN
 *
 * Redistribution and use in source and binary forms,
 * with or without modification, are permitted according to the terms
 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
 */

/**
\file RooBatchCompute.cxx
\class RbcClass
\ingroup roofit_dev_docs_batchcompute

This file contains the code for cpu computations using the RooBatchCompute library.
**/
20
21#include "RooBatchCompute.h"
22#include "RooNaNPacker.h"
23#include "Batches.h"
24
25#include <ROOT/RConfig.hxx>
26
27#ifdef ROOBATCHCOMPUTE_USE_IMT
28#include <ROOT/TExecutor.hxx>
29#endif
30
31#include <Math/Util.h>
32
33#include <algorithm>
34#include <functional>
35#include <map>
36#include <queue>
37#include <sstream>
38#include <stdexcept>
39
40#include <vector>
41
42#ifndef RF_ARCH
43#error "RF_ARCH should always be defined"
44#endif
45
46namespace RooBatchCompute {
47namespace RF_ARCH {
48
49namespace {
50
51void fillBatches(Batches &batches, double *output, size_t nEvents, std::size_t nBatches, ArgSpan extraArgs)
52{
53 batches.extra = extraArgs.data();
54 batches.nEvents = nEvents;
55 batches.nBatches = nBatches;
56 batches.nExtra = extraArgs.size();
57 batches.output = output;
58}
59
60void fillArrays(std::span<Batch> arrays, VarSpan vars, std::size_t nEvents)
61{
62 for (std::size_t i = 0; i < vars.size(); i++) {
63 arrays[i]._array = vars[i].data();
64 arrays[i]._isVector = vars[i].empty() || vars[i].size() >= nEvents;
65 }
66}
67
68inline void advance(Batches &batches, std::size_t nEvents)
69{
70 for (std::size_t i = 0; i < batches.nBatches; i++) {
71 Batch &arg = batches.args[i];
72 arg._array += arg._isVector * nEvents;
73 }
74 batches.output += nEvents;
75}
76
77} // namespace
78
79std::vector<void (*)(Batches &)> getFunctions();
80
/// This class overrides some RooBatchComputeInterface functions, for the
/// purpose of providing a CPU specific implementation of the library.
// NOTE(review): the class head (likely `class RooBatchComputeClass : public
// RooBatchComputeInterface {`, per the generated reference index at the end of
// this dump) and the constructor's declaration line appear to have been lost
// in extraction — verify against the upstream file before editing.
public:
   {
      // Set the dispatch pointer to this instance of the library upon loading
      dispatchCPU = this;
   }

   Architecture architecture() const override { return Architecture::RF_ARCH; };
   // Stringified, lower-cased form of the RF_ARCH compile-time macro.
   std::string architectureName() const override
   {
      // transform to lower case to match the original architecture name passed to the compiler
      std::string out = _R_QUOTEVAL_(RF_ARCH);
      std::transform(out.begin(), out.end(), out.begin(), [](unsigned char c) { return std::tolower(c); });
      return out;
   };

   // Core entry points of the CPU backend (defined below in this file).
   void compute(Config const &, Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs) override;
   double reduceSum(Config const &, InputArr input, size_t n) override;
   ReduceNLLOutput reduceNLL(Config const &, std::span<const double> probas, std::span<const double> weights,
                             std::span<const double> offsetProbas) override;

   std::unique_ptr<AbsBufferManager> createBufferManager() const override;

   // CUDA-related interface functions are not applicable to the CPU backend
   // and unconditionally throw.
   CudaInterface::CudaEvent *newCudaEvent(bool) const override { throw std::bad_function_call(); }
   CudaInterface::CudaStream *newCudaStream() const override { throw std::bad_function_call(); }
   void deleteCudaEvent(CudaInterface::CudaEvent *) const override { throw std::bad_function_call(); }
   void deleteCudaStream(CudaInterface::CudaStream *) const override { throw std::bad_function_call(); }
   // NOTE(review): two method heads appear to be missing before the next two
   // bodies — per the generated index these are `cudaEventRecord` and
   // `cudaStreamWaitForEvent`. Verify upstream.
   {
      throw std::bad_function_call();
   }
   {
      throw std::bad_function_call();
   }
   bool cudaStreamIsActive(CudaInterface::CudaStream *) const override { throw std::bad_function_call(); }

private:
#ifdef ROOBATCHCOMPUTE_USE_IMT
   void computeIMT(Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs);
#endif

   // Table of compute kernels; presumably filled from getFunctions() — the
   // initializer is not visible in this extraction.
   const std::vector<void (*)(Batches &)> _computeFunctions;
};
128
#ifdef ROOBATCHCOMPUTE_USE_IMT
/// Split the event range across the implicit-multithreading pool and run the
/// selected compute function on each sub-range in parallel.
// NOTE(review): several lines appear lost in extraction — the declaration of
// the executor `ex` (a ROOT TExecutor, judging by ex.GetPoolSize()/ex.Map
// below and the TExecutor.hxx include), the local `Batches batches;` in the
// task lambda, and the call(s) that actually invoke `computer` inside and
// after the buffered while-loop. Verify against the upstream file.
void RooBatchComputeClass::computeIMT(Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs)
{
   std::size_t nEvents = output.size();

   if (nEvents == 0)
      return;
   std::size_t nThreads = ex.GetPoolSize();

   // Ceiling division: each thread gets at most nEventsPerThread events.
   std::size_t nEventsPerThread = nEvents / nThreads + (nEvents % nThreads > 0);

   // Reset the number of threads to the number we actually need given nEventsPerThread
   nThreads = nEvents / nEventsPerThread + (nEvents % nEventsPerThread > 0);

   auto task = [&](std::size_t idx) -> int {
      // Fill a std::vector<Batches> with the same object and with ~nEvents/nThreads
      // Then advance every object but the first to split the work between threads
      std::vector<Batch> arrays(vars.size());
      fillBatches(batches, output.data(), nEventsPerThread, vars.size(), extraArgs);
      fillArrays(arrays, vars, nEvents);
      batches.args = arrays.data();
      advance(batches, batches.nEvents * idx);

      // Set the number of events of the last Batches object as the remaining events
      if (idx == nThreads - 1) {
         batches.nEvents = nEvents - idx * batches.nEvents;
      }

      // Process this thread's sub-range in chunks of at most bufferSize events.
      std::size_t events = batches.nEvents;
      batches.nEvents = bufferSize;
      while (events > bufferSize) {
         events -= bufferSize;
      }
      batches.nEvents = events;
      return 0;
   };

   // indices[0] stays 0 (value-initialized); the rest are 1..nThreads-1.
   std::vector<std::size_t> indices(nThreads);
   for (unsigned int i = 1; i < nThreads; i++) {
      indices[i] = i;
   }
   ex.Map(task, indices);
}
#endif
178
/** Compute multiple values using optimized functions.
This method creates a Batches object and passes it to the correct compute function.
In case Implicit Multithreading is enabled, the events to be processed are equally
divided among the tasks to be generated and computed in parallel.
\param computer An enum specifying the compute function to be used.
\param output The array where the computation results are stored.
\param vars A std::span containing pointers to the variables involved in the computation.
\param extraArgs An optional std::span containing extra double values that may participate in the computation. **/
// NOTE(review): several lines appear lost in extraction — the function head
// (`void RooBatchComputeClass::compute(...)`), the local `Batches batches;`,
// the IMT dispatch inside the #ifdef block below (whose closing `}` survives),
// and the call(s) invoking `computer` in the chunk loop. Verify upstream.
{
   // In the original implementation of this library, the evaluation was done
   // multi-threaded if implicit multi-threading was enabled in ROOT with
   // ROOT::EnableImplicitMT().
   //
   // However, this multithreaded mode was not carefully validated and is
   // therefore not production ready. One would first have to study the
   // overhead for different numbers of cores, number of events, and model
   // complexity. Then, we should only consider implicit multithreading here if
   // there is no performance penalty for any scenario, to not surprise the
   // users with unexpected slowdowns!
   //
   // Note that the priority of investigating this is not high, because RooFit
   // R & D efforts currently go in the direction of parallelization at the
   // level of the gradient components, or achieving single-threaded speedup
   // with automatic differentiation. Furthermore, the single-threaded
   // performance of the new CPU evaluation backend with the RooBatchCompute
   // library, is generally much faster than the legacy evaluation backend
   // already, even if the latter uses multi-threading.
#ifdef ROOBATCHCOMPUTE_USE_IMT
   }
#endif

   std::size_t nEvents = output.size();

   // Fill a std::vector<Batches> with the same object and with ~nEvents/nThreads
   // Then advance every object but the first to split the work between threads
   std::vector<Batch> arrays(vars.size());
   fillBatches(batches, output.data(), nEvents, vars.size(), extraArgs);
   fillArrays(arrays, vars, nEvents);
   batches.args = arrays.data();

   // Process the events in chunks of at most bufferSize.
   std::size_t events = batches.nEvents;
   batches.nEvents = bufferSize;
   while (events > bufferSize) {
      events -= bufferSize;
   }
   batches.nEvents = events;
}
234
235namespace {
236
/// Return {log(prob), badness}, where a non-zero "badness" flags a problematic
/// probability; the corresponding error counter in `out` is incremented.
inline std::pair<double, double> getLog(double prob, ReduceNLLOutput &out)
{
   if (prob <= 0.0) {
      // Non-positive probability: report -prob (>= 0) as badness.
      out.nNonPositiveValues++;
      return {std::log(prob), -prob};
   }

   if (std::isinf(prob)) {
      out.nInfiniteValues++;
   }

   if (std::isnan(prob)) {
      out.nNaNValues++;
      // NOTE(review): a line appears lost in extraction here — upstream
      // presumably returns the payload packed inside the NaN (cf. the
      // RooNaNPacker::unpackNaN entry in this file's reference index), since
      // falling through would return std::log(NaN) with zero badness. Verify.
   }

   return {std::log(prob), 0.0};
}
255
256} // namespace
257
262
/// Accumulate the negative log-likelihood over all weighted events.
// NOTE(review): two lines appear lost in extraction — the declaration of the
// Kahan accumulator `nllSum` (a ROOT::Math::KahanSum from Math/Util.h, per
// the Add/Sum/Carry calls below) before the loop, and an assignment to
// out.nllSum inside the final if-block (presumably packing `badness` via
// RooNaNPacker::packFloatIntoNaN, per the reference index). Verify upstream.
ReduceNLLOutput RooBatchComputeClass::reduceNLL(Config const &, std::span<const double> probas,
                                                std::span<const double> weights, std::span<const double> offsetProbas)
{
   ReduceNLLOutput out;

   // Sum of the error indicators reported by getLog().
   double badness = 0.0;

   for (std::size_t i = 0; i < weights.size(); ++i) {

      // Zero-weight events do not contribute.
      if (0. == weights[i])
         continue;

      // A single probability value is broadcast over all events.
      std::pair<double, double> logOut = getLog(probas.size() == 1 ? probas[0] : probas[i], out);
      double term = logOut.first;
      badness += logOut.second;

      if (!offsetProbas.empty()) {
         // Subtract the per-event offset term.
         term -= std::log(offsetProbas[i]);
      }

      term *= -weights[i];

      nllSum.Add(term);
   }

   out.nllSum = nllSum.Sum();
   out.nllSumCarry = nllSum.Carry();

   if (badness != 0.) {
      // Some events with evaluation errors. Return "badness" of errors.
      out.nllSumCarry = 0.0;
   }

   return out;
}
301
302namespace {
303
304class ScalarBufferContainer {
305public:
306 ScalarBufferContainer() {}
307 ScalarBufferContainer(std::size_t size)
308 {
309 if (size != 1)
310 throw std::runtime_error("ScalarBufferContainer can only be of size 1");
311 }
312
313 double const *hostReadPtr() const { return &_val; }
314 double const *deviceReadPtr() const { return &_val; }
315
316 double *hostWritePtr() { return &_val; }
317 double *deviceWritePtr() { return &_val; }
318
319 void assignFromHost(std::span<const double> input) { _val = input[0]; }
320 void assignFromDevice(std::span<const double>) { throw std::bad_function_call(); }
321
322private:
323 double _val;
324};
325
326class CPUBufferContainer {
327public:
328 CPUBufferContainer(std::size_t size) : _vec(size) {}
329
330 double const *hostReadPtr() const { return _vec.data(); }
331 double const *deviceReadPtr() const
332 {
333 throw std::bad_function_call();
334 return nullptr;
335 }
336
337 double *hostWritePtr() { return _vec.data(); }
338 double *deviceWritePtr()
339 {
340 throw std::bad_function_call();
341 return nullptr;
342 }
343
344 void assignFromHost(std::span<const double> input) { _vec.assign(input.begin(), input.end()); }
345 void assignFromDevice(std::span<const double>) { throw std::bad_function_call(); }
346
347private:
348 std::vector<double> _vec;
349};
350
351template <class Container>
352class BufferImpl : public AbsBuffer {
353public:
354 using Queue = std::queue<std::unique_ptr<Container>>;
355
356 BufferImpl(std::size_t size, Queue &queue) : _queue{queue}
357 {
358 if (_queue.empty()) {
359 _vec = std::make_unique<Container>(size);
360 } else {
361 _vec = std::move(_queue.front());
362 _queue.pop();
363 }
364 }
365
366 ~BufferImpl() override { _queue.emplace(std::move(_vec)); }
367
368 double const *hostReadPtr() const override { return _vec->hostReadPtr(); }
369 double const *deviceReadPtr() const override { return _vec->deviceReadPtr(); }
370
371 double *hostWritePtr() override { return _vec->hostWritePtr(); }
372 double *deviceWritePtr() override { return _vec->deviceWritePtr(); }
373
374 void assignFromHost(std::span<const double> input) override { _vec->assignFromHost(input); }
375 void assignFromDevice(std::span<const double> input) override { _vec->assignFromDevice(input); }
376
377 Container &vec() { return *_vec; }
378
379private:
380 std::unique_ptr<Container> _vec;
381 Queue &_queue;
382};
383
386
// Recycling queues for buffer containers, keyed by buffer size.
// NOTE(review): the alias declarations for ScalarBuffer and CPUBuffer
// (BufferImpl instantiations, per the generated reference index) appear to be
// missing just above this struct in this extraction — verify upstream.
struct BufferQueuesMaps {
   std::map<std::size_t, ScalarBuffer::Queue> scalarBufferQueuesMap;
   std::map<std::size_t, CPUBuffer::Queue> cpuBufferQueuesMap;
};
391
392class BufferManager : public AbsBufferManager {
393
394public:
395 BufferManager() : _queuesMaps{std::make_unique<BufferQueuesMaps>()} {}
396
397 std::unique_ptr<AbsBuffer> makeScalarBuffer() override
398 {
399 return std::make_unique<ScalarBuffer>(1, _queuesMaps->scalarBufferQueuesMap[1]);
400 }
401 std::unique_ptr<AbsBuffer> makeCpuBuffer(std::size_t size) override
402 {
403 return std::make_unique<CPUBuffer>(size, _queuesMaps->cpuBufferQueuesMap[size]);
404 }
405 std::unique_ptr<AbsBuffer> makeGpuBuffer(std::size_t) override { throw std::bad_function_call(); }
406 std::unique_ptr<AbsBuffer> makePinnedBuffer(std::size_t, CudaInterface::CudaStream * = nullptr) override
407 {
408 throw std::bad_function_call();
409 }
410
411private:
412 std::unique_ptr<BufferQueuesMaps> _queuesMaps;
413};
414
415} // namespace
416
417std::unique_ptr<AbsBufferManager> RooBatchComputeClass::createBufferManager() const
418{
419 return std::make_unique<BufferManager>();
420}
421
422/// Static object to trigger the constructor which overwrites the dispatch pointer.
424
425} // End namespace RF_ARCH
426} // End namespace RooBatchCompute
#define RF_ARCH
#define _R_QUOTEVAL_(string)
Definition RConfig.hxx:450
#define c(i)
Definition RSha256.hxx:101
std::vector< double > _vec
double _val
std::map< std::size_t, CPUBuffer::Queue > cpuBufferQueuesMap
std::map< std::size_t, ScalarBuffer::Queue > scalarBufferQueuesMap
Queue & _queue
std::unique_ptr< BufferQueuesMaps > _queuesMaps
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
This class implements the interface to execute the same task multiple times, sequentially or in paral...
Definition TExecutor.hxx:37
The Kahan summation is a compensated summation algorithm, which significantly reduces numerical error...
Definition Util.h:141
T Sum() const
Definition Util.h:259
static KahanSum< T, N > Accumulate(Iterator begin, Iterator end, T initialValue=T{})
Iterate over a range and return an instance of a KahanSum.
Definition Util.h:230
T Carry() const
Definition Util.h:269
void Add(T x)
Single-element accumulation. Will not vectorise.
Definition Util.h:184
const double *__restrict _array
Definition Batches.h:32
Minimal configuration struct to steer the evaluation of a single node with the RooBatchCompute librar...
This class overrides some RooBatchComputeInterface functions, for the purpose of providing a CPU spec...
void cudaEventRecord(CudaInterface::CudaEvent *, CudaInterface::CudaStream *) const override
void compute(Config const &, Computer computer, std::span< double > output, VarSpan vars, ArgSpan extraArgs) override
Compute multiple values using optimized functions.
const std::vector< void(*)(Batches &)> _computeFunctions
CudaInterface::CudaEvent * newCudaEvent(bool) const override
void deleteCudaEvent(CudaInterface::CudaEvent *) const override
double reduceSum(Config const &, InputArr input, size_t n) override
void deleteCudaStream(CudaInterface::CudaStream *) const override
std::unique_ptr< AbsBufferManager > createBufferManager() const override
CudaInterface::CudaStream * newCudaStream() const override
bool cudaStreamIsActive(CudaInterface::CudaStream *) const override
ReduceNLLOutput reduceNLL(Config const &, std::span< const double > probas, std::span< const double > weights, std::span< const double > offsetProbas) override
void cudaStreamWaitForEvent(CudaInterface::CudaStream *, CudaInterface::CudaEvent *) const override
The interface which should be implemented to provide optimised computation functions for implementati...
const Int_t n
Definition legend1.C:16
Double_t ex[n]
Definition legend1.C:17
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:600
std::vector< void(*)(Batches &)> getFunctions()
static RooBatchComputeClass computeObj
Static object to trigger the constructor which overwrites the dispatch pointer.
Namespace for dispatching RooFit computations to various backends.
std::span< double > ArgSpan
R__EXTERN RooBatchComputeInterface * dispatchCPU
This dispatch pointer points to an implementation of the compute library, provided one has been loade...
constexpr std::size_t bufferSize
const double *__restrict InputArr
std::span< const std::span< const double > > VarSpan
static double packFloatIntoNaN(float payload)
Pack float into mantissa of a NaN.
static float unpackNaN(double val)
If val is NaN and a this NaN has been tagged as containing a payload, unpack the float from the manti...
static void output()