#ifdef ROOBATCHCOMPUTE_USE_IMT
#include <ROOT/TExecutor.hxx>
#endif
#ifndef RF_ARCH
#error "RF_ARCH should always be defined"
#endif
// Fill the Batches struct with the output buffer and the bookkeeping information for one compute call.
void fillBatches(Batches &batches, double *output, std::size_t nEvents, std::size_t nBatches, ArgSpan extraArgs)
{
   batches.extra = extraArgs.data();
   batches.nEvents = nEvents;
   batches.nBatches = nBatches;
   batches.nExtra = extraArgs.size();
   batches.output = output;
}
// Point each Batch at its input array and record whether it is a full per-event
// vector or a single scalar value that is broadcast to all events.
void fillArrays(std::span<Batch> arrays, VarSpan vars, std::size_t nEvents)
{
   for (std::size_t i = 0; i < vars.size(); i++) {
      arrays[i]._array = vars[i].data();
      arrays[i]._isVector = vars[i].empty() || vars[i].size() >= nEvents;
   }
}
// Advance the output pointer and all per-event input arrays by nEvents.
// Scalar inputs (_isVector == false) stay in place because _isVector * nEvents is 0 for them.
inline void advance(Batches &batches, std::size_t nEvents)
{
   for (std::size_t i = 0; i < batches.nBatches; i++) {
      Batch &arg = batches.args[i];
      arg._array += arg._isVector * nEvents;
   }
   batches.output += nEvents;
}
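// The pointer-advancing trick above is worth a standalone illustration. The sketch below is not part of
// RooBatchCompute; it uses simplified stand-in types (DemoBatch, demoAdvance are made up for illustration)
// to show how multiplying the stride by a 0/1 "is vector" flag advances per-event arrays while leaving
// broadcast scalars untouched.
#include <cstddef>
#include <iostream>
#include <vector>

struct DemoBatch {
   const double *array = nullptr;
   bool isVector = false; // false: one scalar value shared by all events
};

void demoAdvance(std::vector<DemoBatch> &args, std::size_t nEvents)
{
   for (auto &arg : args)
      arg.array += arg.isVector * nEvents; // stride 0 for scalars, nEvents for vectors
}

int main()
{
   std::vector<double> perEvent{1.0, 2.0, 3.0, 4.0};
   double scalar = 42.0;
   std::vector<DemoBatch> args{{perEvent.data(), true}, {&scalar, false}};

   demoAdvance(args, 2);                                            // move on to the next chunk of 2 events
   std::cout << *args[0].array << ' ' << *args[1].array << '\n';    // prints "3 42"
}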
class RooBatchComputeClass : public RooBatchComputeInterface {
public:
   std::string architectureName() const override
   {
      // Stringify RF_ARCH and convert it to lower case to match the name of the compute library.
#ifdef _QUOTEVAL_
#error "It's unexpected that _QUOTEVAL_ is defined at this point!"
#endif
#define _QUOTEVAL_(x) _QUOTE_(x)
      std::string out = _QUOTEVAL_(RF_ARCH);
#undef _QUOTEVAL_
      std::transform(out.begin(), out.end(), out.begin(), [](unsigned char c) { return std::tolower(c); });
      return out;
   }

   ReduceNLLOutput reduceNLL(Config const &, std::span<const double> probas, std::span<const double> weights,
                             std::span<const double> offsetProbas) override;

   // The CUDA-specific parts of the interface are not available in the CPU library.
   void cudaEventRecord(CudaInterface::CudaEvent *, CudaInterface::CudaStream *) const override
   {
      throw std::bad_function_call();
   }
   void cudaStreamWaitForEvent(CudaInterface::CudaStream *, CudaInterface::CudaEvent *) const override
   {
      throw std::bad_function_call();
   }

private:
#ifdef ROOBATCHCOMPUTE_USE_IMT
   void computeIMT(Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs);
#endif

   const std::vector<void (*)(Batches &)> _computeFunctions;
};
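// The _QUOTEVAL_ / _QUOTE_ pair used in architectureName() is the standard two-level stringification idiom:
// the outer macro forces RF_ARCH to be expanded to its value (e.g. AVX2) before the inner macro turns it
// into a string literal. The self-contained sketch below is not part of the library; the DEMO_* names are
// made up for illustration only.
#include <algorithm>
#include <cctype>
#include <iostream>
#include <string>

#define DEMO_QUOTE_(text) #text
#define DEMO_QUOTEVAL_(x) DEMO_QUOTE_(x) // extra level of indirection: x is macro-expanded first
#define DEMO_ARCH AVX2

int main()
{
   std::string direct = DEMO_QUOTE_(DEMO_ARCH);       // "DEMO_ARCH" - stringified before expansion
   std::string expanded = DEMO_QUOTEVAL_(DEMO_ARCH);  // "AVX2"
   std::transform(expanded.begin(), expanded.end(), expanded.begin(),
                  [](unsigned char c) { return std::tolower(c); });
   std::cout << direct << ' ' << expanded << '\n';    // prints "DEMO_ARCH avx2"
}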
#ifdef ROOBATCHCOMPUTE_USE_IMT
void RooBatchComputeClass::computeIMT(Computer computer, std::span<double> output, VarSpan vars, ArgSpan extraArgs)
{
   std::size_t nEvents = output.size();
   if (nEvents == 0)
      return;
   ROOT::Internal::TExecutor ex;
   std::size_t nThreads = ex.GetPoolSize();

   // Each task processes roughly nEvents / nThreads events (rounded up).
   std::size_t nEventsPerThread = nEvents / nThreads + (nEvents % nThreads > 0);

   // Reset the number of threads to how many are actually needed, given nEventsPerThread.
   nThreads = nEvents / nEventsPerThread + (nEvents % nEventsPerThread > 0);

   auto task = [&](std::size_t idx) -> int {
      // Set up a Batches object for this task and advance it to the slice of events it owns.
      Batches batches;
      std::vector<Batch> arrays(vars.size());
      fillBatches(batches, output.data(), nEventsPerThread, vars.size(), extraArgs);
      fillArrays(arrays, vars, nEvents);
      batches.args = arrays.data();
      advance(batches, batches.nEvents * idx);

      // The last task picks up whatever events remain.
      if (idx == nThreads - 1) {
         batches.nEvents = nEvents - idx * batches.nEvents;
      }

      // Process the slice in chunks of at most bufferSize events.
      std::size_t events = batches.nEvents;
      batches.nEvents = bufferSize;
      while (events > bufferSize) {
         _computeFunctions[computer](batches);
         advance(batches, bufferSize);
         events -= bufferSize;
      }
      batches.nEvents = events;
      _computeFunctions[computer](batches);
      return 0;
   };

   std::vector<std::size_t> indices(nThreads);
   for (unsigned int i = 1; i < nThreads; i++) {
      indices[i] = i;
   }
   ex.Map(task, indices);
}
#endif
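// The two ceiling divisions above decide the slice sizes: nEventsPerThread is nEvents / nThreads rounded up,
// nThreads is then shrunk to the number of slices actually needed, and the last slice takes the remainder.
// The standalone sketch below (printPartition is a hypothetical helper, not library code) shows the
// resulting split for a pool of 8 workers.
#include <cstddef>
#include <iostream>

void printPartition(std::size_t nEvents, std::size_t poolSize)
{
   std::size_t nEventsPerThread = nEvents / poolSize + (nEvents % poolSize > 0);
   std::size_t nThreads = nEvents / nEventsPerThread + (nEvents % nEventsPerThread > 0);
   for (std::size_t idx = 0; idx < nThreads; ++idx) {
      std::size_t n = (idx == nThreads - 1) ? nEvents - idx * nEventsPerThread : nEventsPerThread;
      std::cout << "task " << idx << ": " << n << " events\n";
   }
}

int main()
{
   printPartition(1001, 8); // 7 tasks with 126 events each, plus a last task with the remaining 119
}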
/// Compute the requested function for all events, writing the results into `output`.
/// The work is either handed to the multi-threaded path (if implicit multi-threading is
/// enabled) or processed sequentially in chunks of bufferSize events.
void RooBatchComputeClass::compute(Config const &, Computer computer, std::span<double> output, VarSpan vars,
                                   ArgSpan extraArgs)
{
#ifdef ROOBATCHCOMPUTE_USE_IMT
   if (ROOT::IsImplicitMTEnabled()) {
      computeIMT(computer, output, vars, extraArgs);
      return;
   }
#endif

   std::size_t nEvents = output.size();
   if (nEvents == 0)
      return;

   // Set up a single Batches object and process the events in chunks of at most bufferSize.
   Batches batches;
   std::vector<Batch> arrays(vars.size());
   fillBatches(batches, output.data(), nEvents, vars.size(), extraArgs);
   fillArrays(arrays, vars, nEvents);
   batches.args = arrays.data();

   std::size_t events = batches.nEvents;
   batches.nEvents = bufferSize;
   while (events > bufferSize) {
      _computeFunctions[computer](batches);
      advance(batches, bufferSize);
      events -= bufferSize;
   }
   batches.nEvents = events;
   _computeFunctions[computer](batches);
}
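// Both compute paths use the same chunking pattern: run the kernel on fixed-size chunks while more than
// bufferSize events remain, then run it one final time on the remainder. The self-contained sketch below is
// illustration only; kBufferSize and processChunk are made-up stand-ins for the library's bufferSize
// constant and compute kernels.
#include <cstddef>
#include <iostream>

constexpr std::size_t kBufferSize = 64;

void processChunk(std::size_t offset, std::size_t n)
{
   std::cout << "chunk at offset " << offset << " with " << n << " events\n";
}

int main()
{
   std::size_t events = 150; // total number of events
   std::size_t offset = 0;
   while (events > kBufferSize) {
      processChunk(offset, kBufferSize);
      offset += kBufferSize; // corresponds to advance(batches, bufferSize)
      events -= kBufferSize;
   }
   processChunk(offset, events); // final pass handles the remaining 22 events
}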
// Return the logarithm of a probability together with a "badness" value that is non-zero
// for invalid inputs, so that evaluation errors can be accumulated by the caller.
inline std::pair<double, double> getLog(double prob, ReduceNLLOutput &out)
{
   if (std::abs(prob) > 1e6) {
      out.nLargeValues++;
   }
   if (prob <= 0.0) {
      out.nNonPositiveValues++;
      return {std::log(prob), -prob};
   }
   if (std::isnan(prob)) {
      out.nNaNValues++;
      return {prob, RooNaNPacker::unpackNaN(prob)};
   }
   return {std::log(prob), 0.0};
}
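// The first element of the returned pair feeds the likelihood sum, while the second is a penalty that is
// zero for valid probabilities and grows with how far a non-positive probability is from being valid. The
// sketch below is a simplified stand-in (logWithBadness is not a library function) that ignores the
// NaN-payload handling and the error counters.
#include <cmath>
#include <iostream>
#include <utility>

std::pair<double, double> logWithBadness(double prob)
{
   if (prob <= 0.0)
      return {std::log(prob), -prob}; // log is -inf (or NaN); the penalty records the size of the error
   return {std::log(prob), 0.0};
}

int main()
{
   auto good = logWithBadness(0.5);
   auto bad = logWithBadness(-2.0);
   std::cout << "good: log=" << good.first << " badness=" << good.second << '\n'; // badness=0
   std::cout << "bad:  log=" << bad.first << " badness=" << bad.second << '\n';   // log=nan, badness=2
}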
double RooBatchComputeClass::reduceSum(Config const &, InputArr input, size_t n)
{
   return ROOT::Math::KahanSum<double, 4u>::Accumulate(input, input + n).Sum();
}
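// reduceSum relies on ROOT's compensated (Kahan) summation, which keeps a running correction term so that
// low-order bits lost when adding a small number to a large running sum are not silently dropped. The
// sketch below hand-rolls the algorithm to show the idea; it does not use ROOT::Math::KahanSum.
#include <cstdio>

int main()
{
   double sum = 0.0;   // compensated running sum
   double carry = 0.0; // running compensation for lost low-order bits
   double naive = 0.0; // plain accumulation for comparison
   for (int i = 0; i < 10000000; ++i) {
      const double term = 0.1;
      naive += term;
      const double y = term - carry; // apply the correction from the previous step
      const double t = sum + y;      // low-order bits of y can be lost here...
      carry = (t - sum) - y;         // ...and are recovered into carry
      sum = t;
   }
   // The compensated sum stays much closer to the exact result than the naive one.
   std::printf("naive: %.6f\nkahan: %.6f\n", naive, sum);
}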
ReduceNLLOutput RooBatchComputeClass::reduceNLL(Config const &, std::span<const double> probas,
                                                std::span<const double> weights, std::span<const double> offsetProbas)
{
   ReduceNLLOutput out;
   double badness = 0.0;
   ROOT::Math::KahanSum<double> nllSum;

   for (std::size_t i = 0; i < probas.size(); ++i) {

      // A weights span of size 1 means a constant weight for all events.
      const double eventWeight = weights.size() > 1 ? weights[i] : weights[0];

      if (0. == eventWeight)
         continue;

      std::pair<double, double> logOut = getLog(probas[i], out);
      double term = logOut.first;
      badness += logOut.second;

      if (!offsetProbas.empty()) {
         term -= std::log(offsetProbas[i]);
      }

      term *= -eventWeight;

      nllSum.Add(term);
   }

   out.nllSum = nllSum.Sum();
   out.nllSumCarry = nllSum.Carry();

   if (badness != 0.) {
      // Some events had evaluation errors: pack the accumulated "badness" into the NaN payload
      // of the returned sum so the caller can recover from it.
      out.nllSum = RooNaNPacker::packFloatIntoNaN(badness);
      out.nllSumCarry = 0.0;
   }

   return out;
}
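// Putting reduceNLL's inner loop into a minimal, self-contained form: each event contributes
// -w_i * (log p_i - log p_i^offset), and events with zero weight are skipped. The sketch below uses plain
// double accumulation (the real code uses Kahan summation) and made-up toy numbers.
#include <cmath>
#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
   const std::vector<double> probas{0.2, 0.5, 0.1};
   const std::vector<double> weights{1.0, 0.0, 2.0};       // the middle event is effectively disabled
   const std::vector<double> offsetProbas{0.25, 0.5, 0.1}; // optional per-event offsets

   double nll = 0.0;
   for (std::size_t i = 0; i < probas.size(); ++i) {
      const double w = weights.size() > 1 ? weights[i] : weights[0];
      if (w == 0.0)
         continue;
      double term = std::log(probas[i]);
      if (!offsetProbas.empty())
         term -= std::log(offsetProbas[i]);
      nll += -w * term;
   }
   std::cout << "nll = " << nll << '\n'; // only the first and third events contribute
}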
// Buffer container holding a single scalar value, e.g. for the result of a reduction.
class ScalarBufferContainer {
public:
   ScalarBufferContainer() {}
   ScalarBufferContainer(std::size_t size)
   {
      if (size != 1)
         throw std::runtime_error("ScalarBufferContainer can only be of size 1");
   }

   double const *hostReadPtr() const { return &_val; }
   double const *deviceReadPtr() const { return &_val; }

   double *hostWritePtr() { return &_val; }
   double *deviceWritePtr() { return &_val; }

   void assignFromHost(std::span<const double> input) { _val = input[0]; }
   void assignFromDevice(std::span<const double>) { throw std::bad_function_call(); }

private:
   double _val;
};
// Buffer container backed by a std::vector on the host; device access is not available in the CPU library.
class CPUBufferContainer {
public:
   CPUBufferContainer(std::size_t size) : _vec(size) {}

   double const *hostReadPtr() const { return _vec.data(); }
   double const *deviceReadPtr() const
   {
      throw std::bad_function_call();
   }

   double *hostWritePtr() { return _vec.data(); }
   double *deviceWritePtr()
   {
      throw std::bad_function_call();
   }

   void assignFromHost(std::span<const double> input) { _vec.assign(input.begin(), input.end()); }
   void assignFromDevice(std::span<const double>) { throw std::bad_function_call(); }

private:
   std::vector<double> _vec;
};
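// Both containers expose the same host/device accessor set so that the generic BufferImpl wrapper below can
// forward to either one; in this CPU-only library the device entry points simply throw, so a caller that
// accidentally takes the GPU code path fails loudly instead of reading bogus memory. The sketch below
// (HostOnlyBuffer is a hypothetical name) is illustration only.
#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>

class HostOnlyBuffer {
public:
   explicit HostOnlyBuffer(std::size_t size) : _vec(size) {}
   double const *hostReadPtr() const { return _vec.data(); }
   double const *deviceReadPtr() const { throw std::bad_function_call(); }

private:
   std::vector<double> _vec;
};

int main()
{
   HostOnlyBuffer buf{8};
   std::cout << "host pointer: " << buf.hostReadPtr() << '\n';
   try {
      buf.deviceReadPtr(); // not available on a CPU-only backend
   } catch (const std::bad_function_call &) {
      std::cout << "device access rejected as expected\n";
   }
}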
// Generic AbsBuffer implementation that wraps one of the container types above and recycles
// containers via a queue: the constructor reuses a pooled container if one is available, and
// the destructor hands the container back to the queue instead of freeing it.
template <class Container>
class BufferImpl : public AbsBuffer {
public:
   using Queue = std::queue<std::unique_ptr<Container>>;

   BufferImpl(std::size_t size, Queue &queue) : _queue{queue}
   {
      if (_queue.empty()) {
         _vec = std::make_unique<Container>(size);
      } else {
         _vec = std::move(_queue.front());
         _queue.pop();
      }
   }

   ~BufferImpl() override { _queue.emplace(std::move(_vec)); }

   double const *hostReadPtr() const override { return _vec->hostReadPtr(); }
   double const *deviceReadPtr() const override { return _vec->deviceReadPtr(); }

   double *hostWritePtr() override { return _vec->hostWritePtr(); }
   double *deviceWritePtr() override { return _vec->deviceWritePtr(); }

   void assignFromHost(std::span<const double> input) override { _vec->assignFromHost(input); }
   void assignFromDevice(std::span<const double> input) override { _vec->assignFromDevice(input); }

   Container &vec() { return *_vec; }

private:
   std::unique_ptr<Container> _vec;
   Queue &_queue;
};
using ScalarBuffer = BufferImpl<ScalarBufferContainer>;
using CPUBuffer = BufferImpl<CPUBufferContainer>;

// One recycling queue per requested buffer size, separately for scalar and CPU buffers.
struct BufferQueuesMaps {
   std::map<std::size_t, ScalarBuffer::Queue> scalarBufferQueuesMap;
   std::map<std::size_t, CPUBuffer::Queue> cpuBufferQueuesMap;
};
class BufferManager : public AbsBufferManager {
public:
   BufferManager() : _queuesMaps{std::make_unique<BufferQueuesMaps>()} {}

   std::unique_ptr<AbsBuffer> makeScalarBuffer() override
   {
      return std::make_unique<ScalarBuffer>(1, _queuesMaps->scalarBufferQueuesMap[1]);
   }
   std::unique_ptr<AbsBuffer> makeCpuBuffer(std::size_t size) override
   {
      return std::make_unique<CPUBuffer>(size, _queuesMaps->cpuBufferQueuesMap[size]);
   }
   std::unique_ptr<AbsBuffer> makeGpuBuffer(std::size_t) override { throw std::bad_function_call(); }
   std::unique_ptr<AbsBuffer> makePinnedBuffer(std::size_t, CudaInterface::CudaStream * = nullptr) override
   {
      throw std::bad_function_call();
   }

private:
   std::unique_ptr<BufferQueuesMaps> _queuesMaps;
};

std::unique_ptr<AbsBufferManager> RooBatchComputeClass::createBufferManager() const
{
   return std::make_unique<BufferManager>();
}
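// The manager keys the recycling queues by buffer size, so a freed buffer is only ever handed out again for
// a request of the same size. The self-contained sketch below mimics that policy with plain standard-library
// types (PooledVector and gQueues are made up for illustration) and shows that a second request of the same
// size reuses the storage released by the first one.
#include <cstddef>
#include <iostream>
#include <map>
#include <memory>
#include <queue>
#include <vector>

using Storage = std::vector<double>;
using Queue = std::queue<std::unique_ptr<Storage>>;

std::map<std::size_t, Queue> gQueues; // one recycling queue per buffer size

class PooledVector {
public:
   PooledVector(std::size_t size, Queue &queue) : _queue{queue}
   {
      if (_queue.empty()) {
         _vec = std::make_unique<Storage>(size);
         std::cout << "allocated " << size << " doubles\n";
      } else {
         _vec = std::move(_queue.front());
         _queue.pop();
         std::cout << "reused a pooled buffer of " << _vec->size() << " doubles\n";
      }
   }
   ~PooledVector() { _queue.emplace(std::move(_vec)); } // hand the storage back instead of freeing it

private:
   std::unique_ptr<Storage> _vec;
   Queue &_queue;
};

int main()
{
   { PooledVector a{100, gQueues[100]}; } // allocates, then returns the storage to the size-100 queue
   { PooledVector b{100, gQueues[100]}; } // reuses the pooled storage
   { PooledVector c{200, gQueues[200]}; } // different size: allocates a new buffer
}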