35#include <sys/socket.h>
37#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
38#define END_NAMESPACE_ROOFIT }
59 int (*
f)(
int,
char*, std::size_t))
60 {
return f(err, buf,
sz); }
63 char* (*
f)(
int,
char*, std::size_t));
71 BidirMMapPipeException::BidirMMapPipeException(
const char*
msg,
int err)
73 std::size_t
msgsz = std::strlen(
msg);
88 int BidirMMapPipeException::dostrerror_r(
int err,
char* buf,
89 std::size_t
sz,
char* (*
f)(
int,
char*, std::size_t))
92 char *
tmp =
f(err, buf,
sz);
94 std::strncpy(buf,
tmp,
sz);
116 unsigned short m_size = 0;
117 unsigned short m_pos = 0;
124 assert(std::numeric_limits<unsigned short>::max() >=
125 PageChunk::pagesize());
136 unsigned short&
size() {
return m_size; }
138 unsigned size()
const {
return m_size; }
140 unsigned short& pos() {
return m_pos; }
142 unsigned pos()
const {
return m_pos; }
144 inline unsigned char* begin()
const
145 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
148 inline unsigned char* end()
const
149 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
150 + PageChunk::pagesize(); }
152 static unsigned capacity()
153 {
return PageChunk::pagesize() -
sizeof(
Page); }
155 bool empty()
const {
return !m_size; }
157 bool filled()
const {
return !empty(); }
159 unsigned free()
const {
return capacity() - m_size; }
163 bool full()
const {
return !
free(); }
166 void Page::setNext(
const Page*
p)
171 const char*
p1 =
reinterpret_cast<char*
>(
this);
172 const char*
p2 =
reinterpret_cast<const char*
>(
p);
176 tmp /=
static_cast<std::ptrdiff_t
>(PageChunk::pagesize());
185 Page* Page::next()
const
187 if (!
m_next)
return nullptr;
188 char*
ptmp =
reinterpret_cast<char*
>(
const_cast<Page*
>(
this));
189 ptmp += std::ptrdiff_t(
m_next) * PageChunk::pagesize();
190 return reinterpret_cast<Page*
>(
ptmp);
221 typedef BidirMMapPipe_impl::PageChunk Chunk;
225 friend class BidirMMapPipe_impl::PageChunk;
237 static unsigned pagesize() {
return PageChunk::pagesize(); }
240 {
return PageChunk::mmapVariety(); }
267 void release(Chunk*
chunk);
282 unsigned PageChunk::s_physpgsz = PageChunk::getPageSize();
283 unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
284 PageChunk::MMapVariety PageChunk::s_mmapworks = PageChunk::Unknown;
300 if (&
other ==
this)
return *
this;
310 unsigned Pages::pagesize() {
return PageChunk::pagesize(); }
312 Page* Pages::page(
unsigned pgno)
const
315 unsigned char*
pptr =
316 reinterpret_cast<unsigned char*
>(
m_pimpl->m_pages);
318 return reinterpret_cast<Page*
>(
pptr);
321 unsigned Pages::pageno(
Page*
p)
const
323 const unsigned char*
pptr =
324 reinterpret_cast<const unsigned char*
>(
p);
325 const unsigned char*
bptr =
326 reinterpret_cast<const unsigned char*
>(
m_pimpl->m_pages);
333 unsigned PageChunk::getPageSize()
347 PageChunk::PageChunk(
PagePool* parent,
355 unsigned char*
p =
reinterpret_cast<unsigned char*
>(
m_begin);
356 unsigned char*
pend =
reinterpret_cast<unsigned char*
>(
m_end);
363 PageChunk::~PageChunk()
369 bool PageChunk::contains(
const Pages&
p)
const
370 {
return p.m_pimpl->m_parent ==
this; }
372 Pages PageChunk::pop()
381 void PageChunk::push(
const Pages&
p)
391 if (empty())
return m_parent->release(
this);
395 void* PageChunk::dommap(
unsigned len)
411#if defined(MAP_ANONYMOUS)
413#define MYANONFLAG MAP_ANONYMOUS
414#elif defined(MAP_ANON)
416#define MYANONFLAG MAP_ANON
428 if (BidirMMapPipe::debugflag() && !
msgprinted) {
429 std::cerr <<
" INFO: In " <<
__func__ <<
" (" <<
431 "): anonymous mmapping works, excellent!" <<
443 int fd = ::open(
"/dev/zero",
O_RDWR);
456 if (-1 == ::close(fd))
458 if (BidirMMapPipe::debugflag() && !
msgprinted) {
460 ", line " <<
__LINE__ <<
"): mmapping /dev/zero works, "
461 "very good!" << std::endl;
468 std::string
name =
tmpPath +
"/roofit_BidirMMapPipe-XXXXXX";
485 if (1 != ::write(fd,
name.c_str(), 1)) {
501 if (-1 == ::close(fd)) {
506 if (BidirMMapPipe::debugflag() && !
msgprinted) {
508 ", line " <<
__LINE__ <<
"): mmapping temporary files "
509 "works, good!" << std::endl;
519 if (BidirMMapPipe::debugflag() && !
msgprinted) {
521 ", line " <<
__LINE__ <<
"): anonymous mmapping of "
522 "shared buffers failed, falling back to read/write on "
523 " pipes!" << std::endl;
536 void PageChunk::domunmap(
void*
addr,
unsigned len)
550 void PageChunk::zap(
Pages&
p)
564 unsigned char*
p0 =
reinterpret_cast<unsigned char*
>(
m_begin);
565 unsigned char*
p1 =
reinterpret_cast<unsigned char*
>(
p[0
u]);
567 unsigned char*
p3 =
reinterpret_cast<unsigned char*
>(
m_end);
574 p.m_pimpl->m_parent =
nullptr;
584 if (PageChunk::pagesize() != PageChunk::physPgSz()) {
585 const unsigned mult =
586 PageChunk::physPgSz() / PageChunk::pagesize();
592 if (BidirMMapPipe::debugflag()) {
593 std::cerr <<
" INFO: In " <<
__func__ <<
" (" <<
595 "): physical page size " << PageChunk::physPgSz() <<
596 ", subdividing into logical pages of size " <<
597 PageChunk::pagesize() <<
", adjusting nPgPerGroup " <<
607 PagePool::~PagePool()
615 void PagePool::zap(
Pages&
p)
620 if ((*it)->contains(
p)) {
631 Pages PagePool::pop()
636 Chunk *
c =
new Chunk(
this,
675 void PagePool::updateCurSz(
int sz,
int incr)
687 int PagePool::nextChunkSz()
const
717std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
718BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool =
nullptr;
719unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
720int BidirMMapPipe::s_debugflag = 0;
722BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
729void BidirMMapPipe::teardownall(
void)
736 p->doClose(
true,
true);
746 { BidirMMapPipe_impl::Pages
p;
p.swap(
m_pages); }
761 int fds[4] = { -1, -1, -1, -1 };
771 if (0 !=
atexit(BidirMMapPipe::teardownall))
776 for (
unsigned i = 1; i <
TotPages; ++i)
800 if (-1 == ::close(
fds[0]) || (-1 == ::close(
fds[3]))) {
810 if (-1 == ::close(
fds[0])) {
824 p->doClose(
true,
true);
846 if (-1 == ::close(
fds[1]) || -1 == ::close(
fds[2])) {
856 if (-1 == ::close(
fds[1])) {
897 }
catch (BidirMMapPipe::Exception&) {
899 for (
int i = 0; i < 4; ++i)
900 if (-1 !=
fds[i] && 0 !=
fds[i]) ::close(
fds[i]);
903 BidirMMapPipe_impl::Pages
p;
p.swap(
m_pages);
913int BidirMMapPipe::close()
953 while ((err = ::poll(&
fds, 1, 1 << 20)) >= 0) {
969 { BidirMMapPipe_impl::Pages
p;
p.swap(
m_pages); }
974 }
catch (std::exception&) {
991 std::list<BidirMMapPipe*>::iterator it = std::find(
999BidirMMapPipe::~BidirMMapPipe()
1002BidirMMapPipe::size_type BidirMMapPipe::xferraw(
1003 int fd,
void*
addr, size_type
len,
1007 unsigned char* buf =
reinterpret_cast<unsigned char*
>(
addr);
1015 }
else if (0 ==
tmp) {
1018 }
else if (-1 ==
tmp) {
1029#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1032 std::cerr <<
" ERROR: In " <<
__func__ <<
" (" <<
1034 "): expect transfer to block!" << std::endl;
1040 throw Exception(
"xferraw: unexpected return value from read/write",
1047void BidirMMapPipe::sendpages(
Page*
plist)
1052 if (BidirMMapPipe_impl::PageChunk::Copy ==
1053 BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1056 if (
sizeof(
Page) +
p->size() !=
1069unsigned BidirMMapPipe::recvpages()
1078 if (BidirMMapPipe_impl::PageChunk::Copy ==
1079 BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1086 if (
p->empty())
continue;
1103unsigned BidirMMapPipe::recvpages_nonblock()
1111 int rc = ::poll(&
fds, 1, 0);
1128unsigned BidirMMapPipe::lenPageList(
const Page*
p)
1131 for ( ;
p;
p =
p->next()) ++
n;
1135void BidirMMapPipe::feedPageLists(
Page*
plist)
1150 p->setNext(
nullptr);
1186 p->setNext(
nullptr);
1197 fds[0].events =
fds[0].revents = 0;
1200 fds[1].events =
fds[1].revents = 0;
1218 if (!
ret) ok =
false;
1232void BidirMMapPipe::markPageDirty(
Page*
p)
1238 p->setNext(
nullptr);
1241 while (
dl &&
dl->next())
dl =
dl->next();
1242 if (
dl)
dl->setNext(
p);
1246BidirMMapPipe::Page* BidirMMapPipe::busypage()
1257BidirMMapPipe::Page* BidirMMapPipe::dirtypage()
1263 if (
p)
while (
p->next())
p =
p->next();
1264 if (!
p ||
p->full()) {
1272void BidirMMapPipe::flush()
1286 p->setNext(
nullptr);
1295void BidirMMapPipe::purge()
1301 while (
l &&
l->next())
l =
l->next();
1312BidirMMapPipe::size_type BidirMMapPipe::bytesReadableNonBlocking()
1323BidirMMapPipe::size_type BidirMMapPipe::bytesWritableNonBlocking()
1364 retVal += Page::capacity();
1369BidirMMapPipe::size_type BidirMMapPipe::read(
void*
addr, size_type
sz)
1372 size_type
nread = 0;
1373 unsigned char *
ap =
reinterpret_cast<unsigned char*
>(
addr);
1382 unsigned char* pp =
p->
begin() +
p->pos();
1383 size_type
csz = std::min(size_type(
p->remaining()),
sz);
1384 std::copy(pp, pp +
csz,
ap);
1390 if (
p->size() ==
p->pos()) {
1393 p->setNext(
nullptr);
1405BidirMMapPipe::size_type BidirMMapPipe::write(
const void*
addr, size_type
sz)
1409 const unsigned char *
ap =
reinterpret_cast<const unsigned char*
>(
addr);
1418 unsigned char* pp =
p->
begin() +
p->size();
1419 size_type
csz = std::min(size_type(
p->free()),
sz);
1425 assert(
p->capacity() >=
p->size());
1440int BidirMMapPipe::poll(BidirMMapPipe::PollVector&
pipes,
int timeout)
1459 if (
pe.pipe->closed())
pe.revents |= Invalid;
1461 if (
pe.pipe->bad())
pe.revents |=
Error;
1463 if (
pe.pipe->eof())
pe.revents |= EndOfFile;
1472 if (
pe.pipe->m_freelist) {
1476 while (
dl &&
dl->next())
dl =
dl->next();
1477 if (
dl &&
dl->pos() < Page::capacity())
1484 std::vector<pollfd>
fds;
1493 tmp.events =
tmp.revents = 0;
1497 if (
pe.pipe->m_outpipe !=
tmp.fd) {
1501 unsigned(
tmp.fd =
pe.pipe->m_outpipe),
1520 for (std::vector<pollfd>::iterator it =
fds.
begin();
1539 if ((
fe.revents &
POLLIN) &&
fe.fd ==
pe.pipe->m_inpipe &&
1543 if (0 ==
pe.pipe->recvpages())
continue;
1546 int tmp = ::poll(&
fe, 1, 0);
1557 if (
pe.pipe->m_freelist) {
1561 while (
dl &&
dl->next())
dl =
dl->next();
1562 if (
dl &&
dl->pos() < Page::capacity())
1578 size_t sz = std::strlen(str);
1580 if (
sz) write(str,
sz);
1589 str =
reinterpret_cast<char*
>(std::realloc(str,
sz + 1));
1591 if (
sz) read(str,
sz);
1597BidirMMapPipe& BidirMMapPipe::operator<<(
const std::string& str)
1599 size_t sz = str.size();
1601 write(str.data(),
sz);
1612 for (
unsigned char c;
sz--; str.push_back(
c)) *
this >>
c;
1619#ifdef TEST_BIDIRMMAPPIPE
1625 while (
pipe.good() && !
pipe.eof()) {
1629 if (!
pipe)
return -1;
1630 if (
pipe.eof())
break;
1632 std::cout <<
"[CHILD] : read: " << str << std::endl;
1633 str =
"... early in the morning?";
1635 pipe << str << BidirMMapPipe::flush;
1637 if (str.empty())
break;
1638 if (!
pipe)
return -1;
1639 if (
pipe.eof())
break;
1640 std::cout <<
"[CHILD] : wrote: " << str << std::endl;
1658 for (
int i = 0; i < 5; ++i) {
1661 std::ostringstream buf;
1662 buf <<
"child pid " <<
::getpid() <<
" sends message " << i;
1663 std::string str = buf.str();
1664 std::cout <<
"[CHILD] : " << str << std::endl;
1665 pipe << str << BidirMMapPipe::flush;
1666 if (!
pipe)
return -1;
1667 if (
pipe.eof())
break;
1670 pipe <<
"" << BidirMMapPipe::flush;
1690 if (
pipe.eof())
break;
1691 pipe << str << BidirMMapPipe::flush;
1693 if (!std::strlen(str))
break;
1706 if (!std::strlen(str))
break;
1708 pipe <<
"" << BidirMMapPipe::flush;
1718 for (
unsigned i = 0; i <= 24; ++i) {
1719 str =
reinterpret_cast<char*
>(std::realloc(str, (1 << i) + 1));
1720 std::memset(str,
'4', 1 << i);
1722 for (
unsigned j = 0;
j < 1 << 7; ++
j) {
1731 pipe <<
"" << BidirMMapPipe::flush;
1734 pipe <<
"" << BidirMMapPipe::flush;
1752#include <sys/time.h>
1758 std::cout <<
"[PARENT]: simple challenge-response test, "
1759 "one child:" << std::endl;
1761 for (
int i = 0; i < 5; ++i) {
1762 std::string str(
"What shall we do with a drunken sailor...");
1763 *
pipe << str << BidirMMapPipe::flush;
1764 if (!*
pipe)
return -1;
1765 std::cout <<
"[PARENT]: wrote: " << str << std::endl;
1767 if (!*
pipe)
return -1;
1768 std::cout <<
"[PARENT]: read: " << str << std::endl;
1771 *
pipe <<
"" << BidirMMapPipe::flush;
1776 std::cout <<
"[PARENT]: exit status of child: " <<
retVal <<
1784 std::cout << std::endl <<
"[PARENT]: polling test, " <<
nch <<
1785 " children:" << std::endl;
1786 typedef BidirMMapPipe::PollEntry
PollEntry;
1788 BidirMMapPipe::PollVector
pipes;
1791 for (
unsigned i = 0; i <
nch; ++i) {
1792 std::cout <<
"[PARENT]: spawning child " << i << std::endl;
1794 BidirMMapPipe::Readable));
1797 std::cout <<
"[PARENT]: waking up children" << std::endl;
1798 for (
unsigned i = 0; i <
nch; ++i)
1799 *
pipes[i].
pipe <<
"" << BidirMMapPipe::flush;
1800 std::cout <<
"[PARENT]: waiting for events on children's pipes" << std::endl;
1802 while (!
pipes.empty()) {
1806 for (std::vector<PollEntry>::iterator it =
pipes.
begin();
1815 if (it->revents & BidirMMapPipe::Readable) {
1819 std::cout <<
"[PARENT]: Read from pipe " << it->pipe <<
1820 ": " << s << std::endl;
1825 *(it->pipe) <<
"" << BidirMMapPipe::flush;
1830 if (it->revents & (BidirMMapPipe::Error |
1831 BidirMMapPipe::EndOfFile |
1832 BidirMMapPipe::Invalid)) {
1833 std::cerr <<
"[DEBUG]: Event on pipe " << it->pipe <<
1835 ((it->revents & BidirMMapPipe::Readable) ?
" Readable" :
"") <<
1845 int retVal = it->pipe->close();
1846 std::cout <<
"[PARENT]: child exit status: " <<
1847 retVal <<
", number of children still alive: " <<
1848 (
pipes.size() - 1) << std::endl;
1851 it =
pipes.erase(it);
1859 std::cout << std::endl <<
"[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1860 for (
unsigned i = 0; i <= 24; ++i) {
1861 std::vector<char> s(1 + (1 << i));
1862 std::memset(s,
'A', 1 << i);
1864 const unsigned n = 1 << 7;
1865 double avg = 0., min = 1
e42, max = -1
e42;
1867 for (
unsigned j =
n;
j--; ) {
1870 *
pipe << s << BidirMMapPipe::flush;
1876 t2.tv_sec -=
t1.tv_sec;
1877 t2.tv_usec -=
t1.tv_usec;
1879 if (
dt < min) min =
dt;
1880 if (
dt > max) max =
dt;
1884 *
pipe <<
"" << BidirMMapPipe::flush;
1891 std::cout <<
"[PARENT]: child exited with code " <<
retVal << std::endl;
1898 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1899 " avg " << std::setw(7) <<
avg <<
" us min " <<
1900 std::setw(7) << min <<
" us max " << std::setw(7) << max <<
1901 "us speed " << std::setw(9) <<
1903 " MB/s" << std::endl;
1905 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1909 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1910 for (
unsigned i = 0; i <= 24; ++i) {
1911 std::vector<char> s(1 + (1 << i));
1912 std::memset(s,
'A', 1 << i);
1914 const unsigned n = 1 << 7;
1915 double avg = 0., min = 1
e42, max = -1
e42;
1917 for (
unsigned j =
n;
j--; ) {
1925 t2.tv_sec -=
t1.tv_sec;
1926 t2.tv_usec -=
t1.tv_usec;
1928 if (
dt < min) min =
dt;
1929 if (
dt > max) max =
dt;
1933 *
pipe <<
"" << BidirMMapPipe::flush;
1940 std::cout <<
"[PARENT]: child exited with code " <<
retVal << std::endl;
1944 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1945 " avg " << std::setw(7) <<
avg <<
" us min " <<
1946 std::setw(7) << min <<
" us max " << std::setw(7) << max <<
1947 "us speed " << std::setw(9) <<
1949 " MB/s" << std::endl;
1951 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1955 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1957 double avg = 0., min = 1
e42, max = -1
e42;
1958 unsigned n = 0,
bsz = 0;
1968 t2.tv_sec -=
t1.tv_sec;
1969 t2.tv_usec -=
t1.tv_usec;
1971 if (std::strlen(s)) {
1973 if (
dt < min) min =
dt;
1974 if (
dt > max) max =
dt;
1976 bsz = std::strlen(s);
1983 std::cout <<
"block size " << std::setw(9) <<
bsz <<
1984 " avg " << std::setw(7) <<
avg <<
" us min " <<
1985 std::setw(7) << min <<
" us max " << std::setw(7) <<
1986 max <<
"us speed " << std::setw(9) <<
1988 " MB/s" << std::endl;
1996 std::cout <<
"[PARENT]: child exited with code " <<
retVal << std::endl;
ROOT::R::TRInterface & Exception()
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Binding & operator=(OUT(*fun)(void))
R__EXTERN TSystem * gSystem
const_iterator begin() const
const_iterator end() const
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
void Copy(void *source, void *dest)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
static unsigned long masks[]