29#include <sys/socket.h>
33#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
34#define END_NAMESPACE_ROOFIT }
45 class BidirMMapPipeException :
public std::exception
54 static int dostrerror_r(
int err,
char* buf, std::size_t sz,
55 int (*
f)(
int,
char*, std::size_t))
56 {
return f(err, buf, sz); }
58 static int dostrerror_r(
int,
char*, std::size_t,
59 char* (*
f)(
int,
char*, std::size_t));
62 BidirMMapPipeException(
const char* msg,
int err);
64 virtual const char*
what()
const noexcept {
return m_buf; }
67 BidirMMapPipeException::BidirMMapPipeException(
const char* msg,
int err)
69 std::size_t msgsz = std::strlen(msg);
71 msgsz = std::min(msgsz, std::size_t(s_sz));
72 std::copy(msg, msg + msgsz, m_buf);
73 if (msgsz < s_sz) { m_buf[msgsz] =
':'; ++msgsz; }
74 if (msgsz < s_sz) { m_buf[msgsz] =
' '; ++msgsz; }
79 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
84 int BidirMMapPipeException::dostrerror_r(
int err,
char* buf,
85 std::size_t sz,
char* (*
f)(
int,
char*, std::size_t))
88 char *tmp =
f(err, buf, sz);
89 if (tmp && tmp != buf) {
90 std::strncpy(buf, tmp, sz);
92 if (std::strlen(tmp) > sz - 1)
return ERANGE;
112 unsigned short m_size;
113 unsigned short m_pos;
120 Page() : m_next(0), m_size(0), m_pos(0)
124 assert(std::numeric_limits<unsigned short>::max() >=
128 void setNext(
const Page* p);
132 unsigned short& size() {
return m_size; }
134 unsigned size()
const {
return m_size; }
136 unsigned short& pos() {
return m_pos; }
138 unsigned pos()
const {
return m_pos; }
140 inline unsigned char* begin()
const
141 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
144 inline unsigned char* end()
const
145 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
148 static unsigned capacity()
151 bool empty()
const {
return !m_size; }
153 bool filled()
const {
return !empty(); }
155 unsigned free()
const {
return capacity() - m_size; }
157 unsigned remaining()
const {
return m_size - m_pos; }
159 bool full()
const {
return !
free(); }
162 void Page::setNext(
const Page* p)
167 const char* p1 =
reinterpret_cast<char*
>(
this);
168 const char* p2 =
reinterpret_cast<const char*
>(p);
169 std::ptrdiff_t tmp = p2 - p1;
175 assert(m_next == tmp);
181 Page* Page::next()
const
183 if (!m_next)
return 0;
184 char* ptmp =
reinterpret_cast<char*
>(
const_cast<Page*
>(
this));
186 return reinterpret_cast<Page*
>(ptmp);
209 typedef BidirMMapPipeException
Exception;
219 typedef std::list<Chunk*> ChunkList;
250 unsigned m_szmap[(maxsz - minsz) / szincr];
257 void updateCurSz(
int sz,
int incr);
259 int nextChunkSz()
const;
261 void putOnFreeList(Chunk* chunk);
263 void release(Chunk* chunk);
291 m_pimpl(other.m_pimpl)
296 if (&other ==
this)
return *
this;
310 assert(pgno < m_pimpl->m_npages);
311 unsigned char* pptr =
314 return reinterpret_cast<Page*
>(pptr);
319 const unsigned char* pptr =
320 reinterpret_cast<const unsigned char*
>(p);
321 const unsigned char* bptr =
323 assert(0 == ((pptr - bptr) %
pagesize()));
324 const unsigned nr = (pptr - bptr) /
pagesize();
325 assert(nr < m_pimpl->m_npages);
332 long pgsz = sysconf(_SC_PAGESIZE);
333 if (-1 == pgsz)
throw Exception(
"sysconf", errno);
334 if (pgsz > 512 && pgsz >
long(
sizeof(Page)))
344 unsigned length,
unsigned nPgPerGroup) :
347 reinterpret_cast<unsigned char*>(
m_begin) + length)),
351 unsigned char* p =
reinterpret_cast<unsigned char*
>(
m_begin);
352 unsigned char* pend =
reinterpret_cast<unsigned char*
>(
m_end);
354 m_freelist.push_back(
reinterpret_cast<void*
>(p));
355 p += nPgPerGroup * PagePool::pagesize();
381 m_freelist.push_front(
reinterpret_cast<void*
>(p[0u]));
385 if (wasempty)
m_parent->putOnFreeList(
this);
405 static bool msgprinted =
false;
407#if defined(MAP_ANONYMOUS)
409#define MYANONFLAG MAP_ANONYMOUS
410#elif defined(MAP_ANON)
412#define MYANONFLAG MAP_ANON
417 void* retVal = ::mmap(0,
len, PROT_READ | PROT_WRITE,
418 MYANONFLAG | MAP_SHARED, -1, 0);
419 if (MAP_FAILED == retVal) {
425 std::cerr <<
" INFO: In " << __func__ <<
" (" <<
426 __FILE__ <<
", line " << __LINE__ <<
427 "): anonymous mmapping works, excellent!" <<
439 int fd =
::open(
"/dev/zero", O_RDWR);
441 throw Exception(
"open /dev/zero", errno);
442 void* retVal = ::mmap(0,
len,
443 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
444 if (MAP_FAILED == retVal) {
453 throw Exception(
"close /dev/zero", errno);
455 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
456 ", line " << __LINE__ <<
"): mmapping /dev/zero works, "
457 "very good!" << std::endl;
463 char name[] =
"/tmp/BidirMMapPipe-XXXXXX";
466 if (-1 == (fd = ::mkstemp(
name)))
throw Exception(
"mkstemp", errno);
468 if (-1 == ::unlink(
name)) {
474 if (-1 == ::lseek(fd,
len - 1, SEEK_SET)) {
480 if (1 != ::write(fd,
name, 1)) {
486 void* retVal = ::mmap(0,
len,
487 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
488 if (MAP_FAILED == retVal) {
498 ::munmap(retVal,
len);
502 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
503 ", line " << __LINE__ <<
"): mmapping temporary files "
504 "works, good!" << std::endl;
515 std::cerr <<
"WARNING: In " << __func__ <<
" (" << __FILE__ <<
516 ", line " << __LINE__ <<
"): anonymous mmapping of "
517 "shared buffers failed, falling back to read/write on "
518 " pipes!" << std::endl;
523 if (!retVal)
throw Exception(
"malloc", errno);
537 if (-1 == ::munmap(addr,
len))
559 unsigned char* p0 =
reinterpret_cast<unsigned char*
>(
m_begin);
560 unsigned char* p1 =
reinterpret_cast<unsigned char*
>(p[0u]);
562 unsigned char* p3 =
reinterpret_cast<unsigned char*
>(
m_end);
563 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
564 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
575 PagePool::PagePool(
unsigned nPgPerGroup) :
581 const unsigned mult =
585 const unsigned actual = mult *
586 (desired / mult +
bool(desired % mult));
589 std::cerr <<
" INFO: In " << __func__ <<
" (" <<
590 __FILE__ <<
", line " << __LINE__ <<
592 ", subdividing into logical pages of size " <<
600 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
603 PagePool::~PagePool()
606 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
611 void PagePool::zap(Pages& p)
615 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
616 if ((*it)->contains(p)) {
623 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
627 Pages PagePool::pop()
631 const int sz = nextChunkSz();
632 Chunk *
c =
new Chunk(
this,
634 m_chunks.push_front(
c);
648 assert(chunk->empty());
650 ChunkList::iterator it = std::find(
653 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
656 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
657 if (m_chunks.end() == it)
658 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
665 void PagePool::putOnFreeList(
PageChunk* chunk)
667 assert(!chunk->full());
671 void PagePool::updateCurSz(
int sz,
int incr)
673 m_szmap[(sz - minsz) / szincr] += incr;
675 for (
int i = (maxsz - minsz) / szincr; i--; ) {
677 m_cursz += i * szincr;
683 int PagePool::nextChunkSz()
const
687 if (m_chunks.empty()) {
695 if (1 != m_chunks.size()) {
705 if (sz > maxsz) sz = maxsz;
706 if (sz < minsz) sz = minsz;
739 m_pages(pagepool().
pop())
750 m_pages(pagepool().
pop()), m_busylist(0),
m_freelist(0), m_dirtylist(0),
751 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
752 m_parentPid(::getpid())
757 int fds[4] = { -1, -1, -1, -1 };
759 static bool firstcall =
true;
772 for (
unsigned i = 1; i <
TotPages; ++i)
775 if (!useSocketpair) {
777 if (0 != ::pipe(&fds[0]))
throw Exception(
"pipe", errno);
778 if (0 != ::pipe(&fds[2]))
throw Exception(
"pipe", errno);
780 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
801 fds[0] = fds[3] = -1;
816 for (std::list<BidirMMapPipe*>::iterator it =
s_openpipes.begin();
833 throw Exception(
"handshake: xferraw write", EPIPE);
835 throw Exception(
"handshake: xferraw read", EPIPE);
836 if (
'P' !=
c)
throw Exception(
"handshake", EPIPE);
847 fds[1] = fds[2] = -1;
869 throw Exception(
"handshake: xferraw write", EPIPE);
871 throw Exception(
"handshake: xferraw read", EPIPE);
872 if (
'C' !=
c)
throw Exception(
"handshake", EPIPE);
878 if (-1 == ::fcntl(
m_outpipe, F_GETFD, &fdflags))
880 fdflags |= FD_CLOEXEC;
881 if (-1 == ::fcntl(
m_outpipe, F_SETFD, fdflags))
884 if (-1 == ::fcntl(
m_inpipe, F_GETFD, &fdflags))
886 fdflags |= FD_CLOEXEC;
887 if (-1 == ::fcntl(
m_inpipe, F_SETFD, fdflags))
895 for (
int i = 0; i < 4; ++i)
896 if (-1 != fds[i] && 0 != fds[i])
::close(fds[i]);
927 if (!force)
throw Exception(
"close", errno);
949 while ((err =
::poll(&fds, 1, 1 << 20)) >= 0) {
950 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL))
break;
951 if (fds.revents & POLLIN) {
956 }
while (0 > err && EINTR == errno);
961 if (!force)
throw Exception(
"close", errno);
970 }
catch (std::exception&) {
980 }
while (-1 == tmp && EINTR == errno);
982 if (!force)
throw Exception(
"waitpid", errno);
987 std::list<BidirMMapPipe*>::iterator it = std::find(
1000 ssize_t (*xferfn)(
int,
void*, std::size_t))
1003 unsigned char* buf =
reinterpret_cast<unsigned char*
>(addr);
1005 ssize_t tmp = xferfn(fd, buf, len);
1011 }
else if (0 == tmp) {
1014 }
else if (-1 == tmp) {
1021 if (xferred)
return xferred;
1025#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1028 std::cerr <<
" ERROR: In " << __func__ <<
" (" <<
1029 __FILE__ <<
", line " << __LINE__ <<
1030 "): expect transfer to block!" << std::endl;
1036 throw Exception(
"xferraw: unexpected return value from read/write",
1046 unsigned char pg =
m_pages[plist];
1051 for (
Page* p = plist; p; p = p->next()) {
1052 if (
sizeof(
Page) + p->size() !=
1055 throw Exception(
"sendpages: short write", EPIPE);
1060 throw Exception(
"sendpages: short write", EPIPE);
1062 }
else { assert(plist); }
1068 unsigned retVal = 0;
1069 Page *plisthead = 0, *plisttail = 0;
1071 plisthead = plisttail =
m_pages[pg];
1076 for (; plisttail; ++retVal) {
1077 Page* p = plisttail;
1080 plisttail = p->next();
1081 if (!p->size())
continue;
1102 fds.events = POLLIN;
1104 unsigned retVal = 0;
1106 int rc =
::poll(&fds, 1, 0);
1108 if (EINTR == errno)
continue;
1111 if (1 == retVal && fds.revents & POLLIN &&
1112 !(fds.revents & (POLLNVAL | POLLERR))) {
1126 for ( ; p; p = p->next()) ++
n;
1135 while (blend && blend->next()) blend = blend->next();
1139 Page *sendlisthead = 0, *sendlisttail = 0;
1149 if (blend) blend->setNext(p);
1160 if (!sendlisthead) sendlisthead = p;
1161 if (sendlisttail) sendlisttail->setNext(p);
1181 sendlisttail->setNext(p);
1189 struct pollfd fds[2];
1191 fds[0].events = fds[0].revents = 0;
1194 fds[1].events = fds[1].revents = 0;
1196 fds[0].events |= POLLIN;
1200 retVal =
::poll(fds, nfds, 0);
1201 if (0 > retVal && EINTR == errno)
1206 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1208 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1210 if (ok && fds[0].revents & POLLIN) {
1212 if (!ret) ok =
false;
1221 throw Exception(
"feedPageLists: poll", errno);
1235 while (dl && dl->next()) dl = dl->next();
1236 if (dl) dl->setNext(p);
1257 if (p)
while (p->next()) p = p->next();
1258 if (!p || p->full()) {
1273 Page *flushlisthead = 0, *flushlisttail = 0;
1276 if (!forcePartialPages && !p->full())
break;
1281 if (!flushlisthead) flushlisthead = p;
1282 if (flushlisttail) flushlisttail->setNext(p);
1285 if (flushlisthead)
sendpages(flushlisthead);
1294 while (
l &&
l->next())
l =
l->next();
1312 retVal += p->size() - p->pos();
1323 bool couldwrite =
false;
1327 fds.events = POLLOUT;
1331 retVal =
::poll(&fds, 1, 0);
1333 if (EINTR == errno)
continue;
1334 throw Exception(
"bytesWritableNonBlocking: poll", errno);
1336 if (1 == retVal && fds.revents & POLLOUT &&
1337 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1344 unsigned npages = 0;
1350 retVal += p->free();
1355 npages <
FlushThresh || couldwrite); p = p->next()) {
1357 retVal += Page::capacity();
1366 unsigned char *ap =
reinterpret_cast<unsigned char*
>(addr);
1375 unsigned char* pp = p->begin() + p->pos();
1377 std::copy(pp, pp + csz, ap);
1382 assert(p->size() >= p->pos());
1383 if (p->size() == p->pos()) {
1402 const unsigned char *ap =
reinterpret_cast<const unsigned char*
>(addr);
1411 unsigned char* pp = p->begin() + p->size();
1413 std::copy(ap, ap + csz, pp);
1418 assert(p->capacity() >= p->size());
1438 bool canskiptimeout =
false;
1440 std::vector<unsigned>::iterator mit =
masks.begin();
1441 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1463 while (dl && dl->next()) dl = dl->next();
1464 if (dl && dl->pos() < Page::capacity())
1468 if (pe.
revents) canskiptimeout =
true;
1471 std::vector<pollfd> fds;
1472 fds.reserve(2 * pipes.size());
1473 std::map<int, PollEntry*> fds2pipes;
1474 for (PollVector::const_iterator it = pipes.begin();
1475 pipes.end() != it; ++it) {
1478 fds2pipes.insert(std::make_pair((tmp.fd = pe.
pipe->
m_inpipe),
1480 tmp.
events = tmp.revents = 0;
1483 tmp.events |= POLLIN;
1487 fds2pipes.insert(std::make_pair(
1499 retVal =
::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1501 if (EINTR == errno)
continue;
1507 for (std::vector<pollfd>::iterator it = fds.begin();
1508 fds.end() != it; ++it) {
1514 if (fe.revents & POLLNVAL && fe.fd == pe.
pipe->
m_inpipe)
1518 if (fe.revents & POLLERR && fe.fd == pe.
pipe->
m_inpipe)
1522 if (fe.revents & POLLHUP && fe.fd == pe.
pipe->
m_inpipe)
1526 if ((fe.revents & POLLIN) && fe.fd == pe.
pipe->
m_inpipe &&
1527 !(fe.revents & (POLLNVAL | POLLERR))) {
1533 int tmp =
::poll(&fe, 1, 0);
1534 if (tmp > 0)
goto oncemore;
1536 if (EINTR == errno)
continue;
1548 while (dl && dl->next()) dl = dl->next();
1549 if (dl && dl->pos() < Page::capacity())
1556 mit =
masks.begin();
1557 for (PollVector::iterator it = pipes.begin();
1558 pipes.end() != it; ++it, ++mit)
1559 if ((it->revents &= *mit)) ++npipes;
1565 size_t sz = std::strlen(str);
1567 if (sz)
write(str, sz);
1576 str =
reinterpret_cast<char*
>(
std::realloc(str, sz + 1));
1577 if (!str)
throw Exception(
"realloc", errno);
1578 if (sz)
read(str, sz);
1586 size_t sz = str.size();
1588 write(str.data(), sz);
1599 for (
unsigned char c; sz--; str.push_back(
c)) *
this >>
c;
1606#ifdef TEST_BIDIRMMAPPIPE
1612 while (pipe.
good() && !pipe.
eof()) {
1616 if (!pipe)
return -1;
1617 if (pipe.
eof())
break;
1619 std::cout <<
"[CHILD] : read: " << str << std::endl;
1620 str =
"... early in the morning?";
1624 if (str.empty())
break;
1625 if (!pipe)
return -1;
1626 if (pipe.
eof())
break;
1627 std::cout <<
"[CHILD] : wrote: " << str << std::endl;
1637 ::srand48(::getpid());
1645 for (
int i = 0; i < 5; ++i) {
1647 ::usleep(
int(1e6 * ::drand48()));
1648 std::ostringstream buf;
1649 buf <<
"child pid " << ::getpid() <<
" sends message " << i;
1650 std::string str = buf.str();
1651 std::cout <<
"[CHILD] : " << str << std::endl;
1653 if (!pipe)
return -1;
1654 if (pipe.
eof())
break;
1670 while (pipe && !pipe.
eof()) {
1677 if (pipe.
eof())
break;
1680 if (!std::strlen(str))
break;
1691 while (pipe && !pipe.
eof()) {
1693 if (!std::strlen(str))
break;
1705 for (
unsigned i = 0; i <= 24; ++i) {
1706 str =
reinterpret_cast<char*
>(
std::realloc(str, (1 << i) + 1));
1707 std::memset(str,
'4', 1 << i);
1709 for (
unsigned j = 0; j < 1 << 7; ++j) {
1711 if (!pipe || pipe.
eof()) {
1732 int retVal = childexec(*p);
1739#include <sys/time.h>
1745 std::cout <<
"[PARENT]: simple challenge-response test, "
1746 "one child:" << std::endl;
1748 for (
int i = 0; i < 5; ++i) {
1749 std::string str(
"What shall we do with a drunken sailor...");
1751 if (!*pipe)
return -1;
1752 std::cout <<
"[PARENT]: wrote: " << str << std::endl;
1754 if (!*pipe)
return -1;
1755 std::cout <<
"[PARENT]: read: " << str << std::endl;
1762 int retVal = pipe->
close();
1763 std::cout <<
"[PARENT]: exit status of child: " << retVal <<
1765 if (retVal)
return retVal;
1771 std::cout << std::endl <<
"[PARENT]: polling test, " << nch <<
1772 " children:" << std::endl;
1778 for (
unsigned i = 0; i < nch; ++i) {
1779 std::cout <<
"[PARENT]: spawning child " << i << std::endl;
1780 pipes.push_back(PollEntry(spawnChild(randomchild),
1784 std::cout <<
"[PARENT]: waking up children" << std::endl;
1785 for (
unsigned i = 0; i < nch; ++i)
1787 std::cout <<
"[PARENT]: waiting for events on children's pipes" << std::endl;
1789 while (!pipes.empty()) {
1793 for (std::vector<PollEntry>::iterator it = pipes.begin();
1794 npipes && pipes.end() != it; ) {
1806 std::cout <<
"[PARENT]: Read from pipe " << it->pipe <<
1807 ": " <<
s << std::endl;
1820 std::cerr <<
"[DEBUG]: Event on pipe " << it->pipe <<
1832 int retVal = it->pipe->close();
1833 std::cout <<
"[PARENT]: child exit status: " <<
1834 retVal <<
", number of children still alive: " <<
1835 (pipes.size() - 1) << std::endl;
1836 if (retVal)
return retVal;
1838 it = pipes.erase(it);
1846 std::cout << std::endl <<
"[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1847 for (
unsigned i = 0; i <= 24; ++i) {
1848 char *
s =
new char[1 + (1 << i)];
1849 std::memset(
s,
'A', 1 << i);
1851 const unsigned n = 1 << 7;
1852 double avg = 0., min = 1e42, max = -1e42;
1854 for (
unsigned j =
n; j--; ) {
1856 ::gettimeofday(&
t1, 0);
1858 if (!*pipe || pipe->
eof())
break;
1860 if (!*pipe || pipe->
eof())
break;
1862 ::gettimeofday(&t2, 0);
1863 t2.tv_sec -=
t1.tv_sec;
1864 t2.tv_usec -=
t1.tv_usec;
1866 if (dt < min) min = dt;
1867 if (dt > max) max = dt;
1875 avg *= 1e6; min *= 1e6; max *= 1e6;
1876 int retVal = pipe->
close();
1878 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1886 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1887 " avg " << std::setw(7) << avg <<
" us min " <<
1888 std::setw(7) << min <<
" us max " << std::setw(7) << max <<
1889 "us speed " << std::setw(9) <<
1891 " MB/s" << std::endl;
1894 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1898 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1899 for (
unsigned i = 0; i <= 24; ++i) {
1900 char *
s =
new char[1 + (1 << i)];
1901 std::memset(
s,
'A', 1 << i);
1903 const unsigned n = 1 << 7;
1904 double avg = 0., min = 1e42, max = -1e42;
1906 for (
unsigned j =
n; j--; ) {
1908 ::gettimeofday(&
t1, 0);
1911 if (!*pipe || pipe->
eof())
break;
1913 ::gettimeofday(&t2, 0);
1914 t2.tv_sec -=
t1.tv_sec;
1915 t2.tv_usec -=
t1.tv_usec;
1917 if (dt < min) min = dt;
1918 if (dt > max) max = dt;
1926 avg *= 1e6; min *= 1e6; max *= 1e6;
1927 int retVal = pipe->
close();
1929 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1933 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1934 " avg " << std::setw(7) << avg <<
" us min " <<
1935 std::setw(7) << min <<
" us max " << std::setw(7) << max <<
1936 "us speed " << std::setw(9) <<
1938 " MB/s" << std::endl;
1941 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1945 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1947 double avg = 0., min = 1e42, max = -1e42;
1948 unsigned n = 0, bsz = 0;
1950 while (*pipe && !pipe->
eof()) {
1952 ::gettimeofday(&
t1, 0);
1955 if (!*pipe || pipe->
eof())
break;
1957 ::gettimeofday(&t2, 0);
1958 t2.tv_sec -=
t1.tv_sec;
1959 t2.tv_usec -=
t1.tv_usec;
1961 if (std::strlen(
s)) {
1963 if (dt < min) min = dt;
1964 if (dt > max) max = dt;
1966 bsz = std::strlen(
s);
1971 avg *= 1e6; min *= 1e6; max *= 1e6;
1973 std::cout <<
"block size " << std::setw(9) << bsz <<
1974 " avg " << std::setw(7) << avg <<
" us min " <<
1975 std::setw(7) << min <<
" us max " << std::setw(7) <<
1976 max <<
"us speed " << std::setw(9) <<
1978 " MB/s" << std::endl;
1985 int retVal = pipe->
close();
1986 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1987 if (retVal)
return retVal;
#define END_NAMESPACE_ROOFIT
#define BEGIN_NAMESPACE_ROOFIT
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
ROOT::R::TRInterface & Exception()
Binding & operator=(OUT(*fun)(void))
typedef void((*Func_t)())
unsigned revents
events that happened (or'ed bitmask)
BidirMMapPipe * pipe
pipe of interest
unsigned events
events of interest (or'ed bitmask)
class representing a chunk of pages
PageChunk(const PageChunk &)
forbid copying
BidirMMapPipeException Exception
convenience typedef
unsigned m_nUsedGrp
number of used page groups
unsigned len() const
return length of chunk
unsigned nPagesPerGroup() const
return number of pages per page group
void * m_begin
pointer to start of mmapped area
static unsigned physPgSz()
return the physical page size of the system
static unsigned s_physpgsz
system physical page size
void * m_end
pointer one behind end of mmapped area
void push(const Pages &p)
push a group of pages onto the free list
bool contains(const Pages &p) const
return if p is contained in this PageChunk
PagePool * m_parent
parent page pool
std::list< void * > m_freelist
free pages list
static MMapVariety s_mmapworks
mmap variety that works on this system
MMapVariety
type of mmap support found
@ Copy
mmap doesn't work, have to copy back and forth
@ Unknown
don't know yet what'll work
@ DevZero
mmapping /dev/zero works
@ Anonymous
anonymous mmap works
@ FileBacked
mmapping a temp file works
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static unsigned s_pagesize
logical page size (run-time determined)
unsigned m_nPgPerGrp
number of pages per group
static MMapVariety mmapVariety()
return mmap variety support found
Pages pop()
pop a group of pages off the free list
static unsigned pagesize()
return the logical page size
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
bool empty() const
return true if no used page groups in this chunk
void zap(Pages &p)
free all pages except for those pointed to by p
static unsigned getPageSize()
determine page size at run time
handle class for a number of Pages
Pages()
default constructor
static unsigned pagesize()
return page size
impl * m_pimpl
pointer to implementation
Pages & operator=(const Pages &other)
assignment operator
void swap(Pages &other)
swap with other's contents
Page * page(unsigned pgno) const
return page number pgno
unsigned npages() const
return number of pages accessible
unsigned pageno(Page *p) const
perform page to page number mapping
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks.
void purge()
purge buffered data waiting to be read and/or written
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
int close()
flush buffers, close pipe
BidirMMapPipe_impl::Page Page
convenience typedef for Page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
@ PagesPerEnd
pages per pipe end
@ TotPages
pages shared (child + parent)
@ FlushThresh
flush threshold
bool good() const
status of stream is good
Page * m_dirtylist
linked list: dirty pages (data to be sent)
unsigned recvpages()
receive pages from the other end (may block), queue them
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
~BidirMMapPipe()
destructor
int m_inpipe
pipe end from which data may be read
bool isChild() const
return if this end of the pipe is the child end
void doFlush(bool forcePartialPages=true)
perform the flush
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
bool eof() const
true if end-of-file
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
@ eofbit
end of file reached
@ failbit
logical failure (e.g. pipe closed)
@ exceptionsbit
error reporting with exceptions
static int debugflag()
return the current setting of the debug flag
bool isParent() const
return if this end of the pipe is the parent end
std::size_t size_type
type used to represent sizes
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
bool bad() const
true on I/O error
@ WriteInvalid
write end of pipe invalid
@ ReadEndOfFile
read pipe in end-of-file state
@ WriteError
pipe error write end
@ ReadError
pipe error read end
@ Readable
pipe has data for reading
@ None
nothing special on this pipe
@ ReadInvalid
read end of pipe invalid
@ WriteEndOfFile
write pipe in end-of-file state
@ Writable
pipe can be written to
int m_flags
flags (e.g. end of file)
Page * m_freelist
linked list: free pages
Page * busypage()
get a busy page to read data from (may block)
Page * dirtypage()
get a dirty page to write data to (may block)
void sendpages(Page *plist)
send page(s) to the other end (may block)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
size_type read(void *addr, size_type sz)
read from pipe
static int s_debugflag
debug flag
bool closed() const
true if closed
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
size_type write(const void *addr, size_type sz)
write to pipe
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void flush()
flush buffers with unwritten data
static unsigned s_pagepoolrefcnt
page pool reference counter
Page * m_busylist
linked list: busy pages (data to be read)
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
int m_outpipe
pipe end to which data may be written
void markPageDirty(Page *p)
put on dirty pages list
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned lenPageList(const Page *list)
return length of a page list
pid_t m_childPid
pid of the child (zero if we're child)
int main(int argc, char **argv)
namespace for implementation details of BidirMMapPipe
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
static constexpr double s
unsigned char m_npages
length in pages
Page * m_pages
pointer to first page
unsigned m_refcnt
reference counter
PageChunk * m_parent
pointer to parent pool
static unsigned long masks[]