27#ifdef ROOBATCHCOMPUTE_USE_IMT
43#error "RF_ARCH should always be defined"
51void fillBatches(Batches &batches,
double *output,
size_t nEvents, std::size_t nBatches,
ArgSpan extraArgs)
53 batches.extra = extraArgs.data();
54 batches.nEvents = nEvents;
55 batches.nBatches = nBatches;
56 batches.nExtra = extraArgs.size();
57 batches.output = output;
60void fillArrays(std::span<Batch> arrays,
VarSpan vars, std::size_t nEvents)
62 for (std::size_t i = 0; i < vars.size(); i++) {
63 arrays[i]._array = vars[i].data();
64 arrays[i]._isVector = vars[i].empty() || vars[i].size() >= nEvents;
68inline void advance(Batches &batches, std::size_t nEvents)
70 for (std::size_t i = 0; i < batches.nBatches; i++) {
71 Batch &arg = batches.args[i];
72 arg._array += arg._isVector * nEvents;
74 batches.output += nEvents;
96 std::transform(out.begin(), out.end(), out.begin(), [](
unsigned char c) { return std::tolower(c); });
103 std::span<const double> offsetProbas)
override;
113 throw std::bad_function_call();
117 throw std::bad_function_call();
122#ifdef ROOBATCHCOMPUTE_USE_IMT
129#ifdef ROOBATCHCOMPUTE_USE_IMT
130void RooBatchComputeClass::computeIMT(
Computer computer, std::span<double> output,
VarSpan vars,
ArgSpan extraArgs)
132 std::size_t nEvents = output.size();
137 std::size_t nThreads =
ex.GetPoolSize();
139 std::size_t nEventsPerThread = nEvents / nThreads + (nEvents % nThreads > 0);
142 nThreads = nEvents / nEventsPerThread + (nEvents % nEventsPerThread > 0);
144 auto task = [&](std::size_t idx) ->
int {
148 std::vector<Batch> arrays(vars.size());
149 fillBatches(batches, output.data(), nEventsPerThread, vars.size(), extraArgs);
150 fillArrays(arrays, vars, nEvents);
151 batches.
args = arrays.data();
152 advance(batches, batches.
nEvents * idx);
155 if (idx == nThreads - 1) {
159 std::size_t events = batches.
nEvents;
171 std::vector<std::size_t> indices(nThreads);
172 for (
unsigned int i = 1; i < nThreads; i++) {
175 ex.Map(task, indices);
208#ifdef ROOBATCHCOMPUTE_USE_IMT
210 computeIMT(computer, output, vars, extraArgs);
214 std::size_t nEvents = output.size();
219 std::vector<Batch> arrays(vars.size());
220 fillBatches(batches, output.data(), nEvents, vars.size(), extraArgs);
221 fillArrays(arrays, vars, nEvents);
222 batches.
args = arrays.data();
224 std::size_t events = batches.
nEvents;
237inline std::pair<double, double> getLog(
double prob,
ReduceNLLOutput &out)
241 return {std::log(prob), -prob};
244 if (std::isinf(prob)) {
248 if (std::isnan(prob)) {
253 return {std::log(prob), 0.0};
264 std::span<const double> weights, std::span<const double> offsetProbas)
268 double badness = 0.0;
272 for (std::size_t i = 0; i < weights.size(); ++i) {
274 if (0. == weights[i])
277 std::pair<double, double> logOut = getLog(probas.size() == 1 ? probas[0] : probas[i], out);
278 double term = logOut.first;
279 badness += logOut.second;
281 if (!offsetProbas.empty()) {
282 term -= std::log(offsetProbas[i]);
304class ScalarBufferContainer {
306 ScalarBufferContainer() {}
307 ScalarBufferContainer(std::size_t
size)
310 throw std::runtime_error(
"ScalarBufferContainer can only be of size 1");
313 double const *hostReadPtr()
const {
return &_val; }
314 double const *deviceReadPtr()
const {
return &_val; }
316 double *hostWritePtr() {
return &_val; }
317 double *deviceWritePtr() {
return &_val; }
319 void assignFromHost(std::span<const double> input) { _val = input[0]; }
320 void assignFromDevice(std::span<const double>) {
throw std::bad_function_call(); }
326class CPUBufferContainer {
328 CPUBufferContainer(std::size_t
size) : _vec(
size) {}
330 double const *hostReadPtr()
const {
return _vec.data(); }
331 double const *deviceReadPtr()
const
333 throw std::bad_function_call();
337 double *hostWritePtr() {
return _vec.data(); }
338 double *deviceWritePtr()
340 throw std::bad_function_call();
344 void assignFromHost(std::span<const double> input) { _vec.assign(input.begin(), input.end()); }
345 void assignFromDevice(std::span<const double>) {
throw std::bad_function_call(); }
348 std::vector<double> _vec;
351template <
class Container>
352class BufferImpl :
public AbsBuffer {
354 using Queue = std::queue<std::unique_ptr<Container>>;
356 BufferImpl(std::size_t
size, Queue &queue) : _queue{queue}
358 if (_queue.empty()) {
359 _vec = std::make_unique<Container>(
size);
361 _vec = std::move(_queue.front());
366 ~BufferImpl()
override { _queue.emplace(std::move(_vec)); }
368 double const *hostReadPtr()
const override {
return _vec->hostReadPtr(); }
369 double const *deviceReadPtr()
const override {
return _vec->deviceReadPtr(); }
371 double *hostWritePtr()
override {
return _vec->hostWritePtr(); }
372 double *deviceWritePtr()
override {
return _vec->deviceWritePtr(); }
374 void assignFromHost(std::span<const double> input)
override { _vec->assignFromHost(input); }
375 void assignFromDevice(std::span<const double> input)
override { _vec->assignFromDevice(input); }
377 Container &vec() {
return *_vec; }
380 std::unique_ptr<Container> _vec;
384using ScalarBuffer = BufferImpl<ScalarBufferContainer>;
385using CPUBuffer = BufferImpl<CPUBufferContainer>;
387struct BufferQueuesMaps {
388 std::map<std::size_t, ScalarBuffer::Queue> scalarBufferQueuesMap;
389 std::map<std::size_t, CPUBuffer::Queue> cpuBufferQueuesMap;
392class BufferManager :
public AbsBufferManager {
395 BufferManager() : _queuesMaps{std::make_unique<BufferQueuesMaps>()} {}
397 std::unique_ptr<AbsBuffer> makeScalarBuffer()
override
399 return std::make_unique<ScalarBuffer>(1, _queuesMaps->scalarBufferQueuesMap[1]);
401 std::unique_ptr<AbsBuffer> makeCpuBuffer(std::size_t
size)
override
403 return std::make_unique<CPUBuffer>(
size, _queuesMaps->cpuBufferQueuesMap[
size]);
405 std::unique_ptr<AbsBuffer> makeGpuBuffer(std::size_t)
override {
throw std::bad_function_call(); }
406 std::unique_ptr<AbsBuffer> makePinnedBuffer(std::size_t, CudaInterface::CudaStream * =
nullptr)
override
408 throw std::bad_function_call();
412 std::unique_ptr<BufferQueuesMaps> _queuesMaps;
419 return std::make_unique<BufferManager>();
#define _R_QUOTEVAL_(string)
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
This class implements the interface to execute the same task multiple times, sequentially or in paral...
The Kahan summation is a compensated summation algorithm, which significantly reduces numerical error...
static KahanSum< T, N > Accumulate(Iterator begin, Iterator end, T initialValue=T{})
Iterate over a range and return an instance of a KahanSum.
void Add(T x)
Single-element accumulation. Will not vectorise.
Minimal configuration struct to steer the evaluation of a single node with the RooBatchCompute librar...
This class overrides some RooBatchComputeInterface functions, for the purpose of providing a CPU spec...
std::string architectureName() const override
void cudaEventRecord(CudaInterface::CudaEvent *, CudaInterface::CudaStream *) const override
void compute(Config const &, Computer computer, std::span< double > output, VarSpan vars, ArgSpan extraArgs) override
Compute multiple values using optimized functions.
const std::vector< void(*)(Batches &)> _computeFunctions
CudaInterface::CudaEvent * newCudaEvent(bool) const override
void deleteCudaEvent(CudaInterface::CudaEvent *) const override
double reduceSum(Config const &, InputArr input, size_t n) override
void deleteCudaStream(CudaInterface::CudaStream *) const override
std::unique_ptr< AbsBufferManager > createBufferManager() const override
CudaInterface::CudaStream * newCudaStream() const override
bool cudaStreamIsActive(CudaInterface::CudaStream *) const override
Architecture architecture() const override
ReduceNLLOutput reduceNLL(Config const &, std::span< const double > probas, std::span< const double > weights, std::span< const double > offsetProbas) override
void cudaStreamWaitForEvent(CudaInterface::CudaStream *, CudaInterface::CudaEvent *) const override
The interface which should be implemented to provide optimised computation functions for implementati...
void(off) SmallVectorTemplateBase< T
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::vector< void(*)(Batches &)> getFunctions()
Returns a std::vector of pointers to the compute functions in this file.
static RooBatchComputeClass computeObj
Static object to trigger the constructor which overwrites the dispatch pointer.
Namespace for dispatching RooFit computations to various backends.
std::span< double > ArgSpan
R__EXTERN RooBatchComputeInterface * dispatchCPU
This dispatch pointer points to an implementation of the compute library, provided one has been loade...
constexpr std::size_t bufferSize
const double *__restrict InputArr
std::span< const std::span< const double > > VarSpan
std::size_t nInfiniteValues
std::size_t nNonPositiveValues
static double packFloatIntoNaN(float payload)
Pack float into mantissa of a NaN.
static float unpackNaN(double val)
If val is NaN and a this NaN has been tagged as containing a payload, unpack the float from the manti...