30#include <sys/socket.h>
34#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
35#define END_NAMESPACE_ROOFIT }
46 class BidirMMapPipeException :
public std::exception
55 static int dostrerror_r(
int err,
char* buf, std::size_t sz,
56 int (*
f)(
int,
char*, std::size_t))
57 {
return f(err, buf, sz); }
59 static int dostrerror_r(
int,
char*, std::size_t,
60 char* (*
f)(
int,
char*, std::size_t));
63 BidirMMapPipeException(
const char* msg,
int err);
65 virtual const char* what()
const noexcept {
return m_buf; }
68 BidirMMapPipeException::BidirMMapPipeException(
const char* msg,
int err)
70 std::size_t msgsz = std::strlen(msg);
72 msgsz = std::min(msgsz, std::size_t(s_sz));
73 std::copy(msg, msg + msgsz, m_buf);
74 if (msgsz < s_sz) { m_buf[msgsz] =
':'; ++msgsz; }
75 if (msgsz < s_sz) { m_buf[msgsz] =
' '; ++msgsz; }
80 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
85 int BidirMMapPipeException::dostrerror_r(
int err,
char* buf,
86 std::size_t sz,
char* (*
f)(
int,
char*, std::size_t))
89 char *tmp =
f(err, buf, sz);
90 if (tmp && tmp != buf) {
91 std::strncpy(buf, tmp, sz);
93 if (std::strlen(tmp) > sz - 1)
return ERANGE;
113 unsigned short m_size;
114 unsigned short m_pos;
121 Page() : m_next(0), m_size(0), m_pos(0)
125 assert(std::numeric_limits<unsigned short>::max() >=
129 void setNext(
const Page* p);
133 unsigned short& size() {
return m_size; }
135 unsigned size()
const {
return m_size; }
137 unsigned short& pos() {
return m_pos; }
139 unsigned pos()
const {
return m_pos; }
141 inline unsigned char* begin()
const
142 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
145 inline unsigned char* end()
const
146 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
149 static unsigned capacity()
152 bool empty()
const {
return !m_size; }
154 bool filled()
const {
return !empty(); }
156 unsigned free()
const {
return capacity() - m_size; }
158 unsigned remaining()
const {
return m_size - m_pos; }
160 bool full()
const {
return !
free(); }
163 void Page::setNext(
const Page* p)
168 const char* p1 =
reinterpret_cast<char*
>(
this);
169 const char* p2 =
reinterpret_cast<const char*
>(p);
170 std::ptrdiff_t tmp = p2 - p1;
176 assert(m_next == tmp);
182 Page* Page::next()
const
184 if (!m_next)
return 0;
185 char* ptmp =
reinterpret_cast<char*
>(
const_cast<Page*
>(
this));
187 return reinterpret_cast<Page*
>(ptmp);
210 typedef BidirMMapPipeException
Exception;
220 typedef std::list<Chunk*> ChunkList;
251 unsigned m_szmap[(maxsz - minsz) / szincr];
258 void updateCurSz(
int sz,
int incr);
260 int nextChunkSz()
const;
262 void putOnFreeList(Chunk* chunk);
264 void release(Chunk* chunk);
292 m_pimpl(other.m_pimpl)
297 if (&other ==
this)
return *
this;
311 assert(pgno < m_pimpl->m_npages);
312 unsigned char* pptr =
315 return reinterpret_cast<Page*
>(pptr);
320 const unsigned char* pptr =
321 reinterpret_cast<const unsigned char*
>(p);
322 const unsigned char* bptr =
324 assert(0 == ((pptr - bptr) %
pagesize()));
325 const unsigned nr = (pptr - bptr) /
pagesize();
326 assert(nr < m_pimpl->m_npages);
333 long pgsz = sysconf(_SC_PAGESIZE);
334 if (-1 == pgsz)
throw Exception(
"sysconf", errno);
335 if (pgsz > 512 && pgsz >
long(
sizeof(Page)))
345 unsigned length,
unsigned nPgPerGroup) :
348 reinterpret_cast<unsigned char*>(
m_begin) + length)),
352 unsigned char* p =
reinterpret_cast<unsigned char*
>(
m_begin);
353 unsigned char* pend =
reinterpret_cast<unsigned char*
>(
m_end);
355 m_freelist.push_back(
reinterpret_cast<void*
>(p));
356 p += nPgPerGroup * PagePool::pagesize();
382 m_freelist.push_front(
reinterpret_cast<void*
>(p[0u]));
386 if (wasempty)
m_parent->putOnFreeList(
this);
406 static bool msgprinted =
false;
408#if defined(MAP_ANONYMOUS)
410#define MYANONFLAG MAP_ANONYMOUS
411#elif defined(MAP_ANON)
413#define MYANONFLAG MAP_ANON
418 void* retVal = ::mmap(0,
len, PROT_READ | PROT_WRITE,
419 MYANONFLAG | MAP_SHARED, -1, 0);
420 if (MAP_FAILED == retVal) {
426 std::cerr <<
" INFO: In " << __func__ <<
" (" <<
427 __FILE__ <<
", line " << __LINE__ <<
428 "): anonymous mmapping works, excellent!" <<
440 int fd =
::open(
"/dev/zero", O_RDWR);
442 throw Exception(
"open /dev/zero", errno);
443 void* retVal = ::mmap(0,
len,
444 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
445 if (MAP_FAILED == retVal) {
454 throw Exception(
"close /dev/zero", errno);
456 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
457 ", line " << __LINE__ <<
"): mmapping /dev/zero works, "
458 "very good!" << std::endl;
464 char name[] =
"/tmp/BidirMMapPipe-XXXXXX";
467 if (-1 == (fd = ::mkstemp(
name)))
throw Exception(
"mkstemp", errno);
469 if (-1 == ::unlink(
name)) {
475 if (-1 == ::lseek(fd,
len - 1, SEEK_SET)) {
481 if (1 != ::write(fd,
name, 1)) {
487 void* retVal = ::mmap(0,
len,
488 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
489 if (MAP_FAILED == retVal) {
499 ::munmap(retVal,
len);
503 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
504 ", line " << __LINE__ <<
"): mmapping temporary files "
505 "works, good!" << std::endl;
516 std::cerr <<
"WARNING: In " << __func__ <<
" (" << __FILE__ <<
517 ", line " << __LINE__ <<
"): anonymous mmapping of "
518 "shared buffers failed, falling back to read/write on "
519 " pipes!" << std::endl;
524 if (!retVal)
throw Exception(
"malloc", errno);
538 if (-1 == ::munmap(addr,
len))
560 unsigned char* p0 =
reinterpret_cast<unsigned char*
>(
m_begin);
561 unsigned char* p1 =
reinterpret_cast<unsigned char*
>(p[0u]);
563 unsigned char* p3 =
reinterpret_cast<unsigned char*
>(
m_end);
564 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
565 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
576 PagePool::PagePool(
unsigned nPgPerGroup) :
582 const unsigned mult =
586 const unsigned actual = mult *
587 (desired / mult + bool(desired % mult));
590 std::cerr <<
" INFO: In " << __func__ <<
" (" <<
591 __FILE__ <<
", line " << __LINE__ <<
593 ", subdividing into logical pages of size " <<
601 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
604 PagePool::~PagePool()
607 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
612 void PagePool::zap(Pages& p)
616 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
617 if ((*it)->contains(p)) {
624 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
628 Pages PagePool::pop()
632 const int sz = nextChunkSz();
633 Chunk *
c =
new Chunk(
this,
635 m_chunks.push_front(
c);
649 assert(chunk->empty());
651 ChunkList::iterator it = std::find(
654 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
657 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
658 if (m_chunks.end() == it)
659 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
666 void PagePool::putOnFreeList(
PageChunk* chunk)
668 assert(!chunk->full());
672 void PagePool::updateCurSz(
int sz,
int incr)
674 m_szmap[(sz - minsz) / szincr] += incr;
676 for (
int i = (maxsz - minsz) / szincr; i--; ) {
678 m_cursz += i * szincr;
684 int PagePool::nextChunkSz()
const
688 if (m_chunks.empty()) {
696 if (1 != m_chunks.size()) {
706 if (sz > maxsz) sz = maxsz;
707 if (sz < minsz) sz = minsz;
740 m_pages(pagepool().
pop())
751 m_pages(pagepool().
pop()), m_busylist(0),
m_freelist(0), m_dirtylist(0),
752 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
753 m_parentPid(::getpid())
758 int fds[4] = { -1, -1, -1, -1 };
760 static bool firstcall =
true;
773 for (
unsigned i = 1; i <
TotPages; ++i)
776 if (!useSocketpair) {
778 if (0 != ::pipe(&fds[0]))
throw Exception(
"pipe", errno);
779 if (0 != ::pipe(&fds[2]))
throw Exception(
"pipe", errno);
781 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
802 fds[0] = fds[3] = -1;
817 for (std::list<BidirMMapPipe*>::iterator it =
s_openpipes.begin();
834 throw Exception(
"handshake: xferraw write", EPIPE);
836 throw Exception(
"handshake: xferraw read", EPIPE);
837 if (
'P' !=
c)
throw Exception(
"handshake", EPIPE);
848 fds[1] = fds[2] = -1;
870 throw Exception(
"handshake: xferraw write", EPIPE);
872 throw Exception(
"handshake: xferraw read", EPIPE);
873 if (
'C' !=
c)
throw Exception(
"handshake", EPIPE);
879 if (-1 == ::fcntl(
m_outpipe, F_GETFD, &fdflags))
881 fdflags |= FD_CLOEXEC;
882 if (-1 == ::fcntl(
m_outpipe, F_SETFD, fdflags))
885 if (-1 == ::fcntl(
m_inpipe, F_GETFD, &fdflags))
887 fdflags |= FD_CLOEXEC;
888 if (-1 == ::fcntl(
m_inpipe, F_SETFD, fdflags))
896 for (
int i = 0; i < 4; ++i)
897 if (-1 != fds[i] && 0 != fds[i])
::close(fds[i]);
928 if (!force)
throw Exception(
"close", errno);
950 while ((err =
::poll(&fds, 1, 1 << 20)) >= 0) {
951 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL))
break;
952 if (fds.revents & POLLIN) {
957 }
while (0 > err && EINTR == errno);
962 if (!force)
throw Exception(
"close", errno);
971 }
catch (std::exception&) {
981 }
while (-1 == tmp && EINTR == errno);
983 if (!force)
throw Exception(
"waitpid", errno);
988 std::list<BidirMMapPipe*>::iterator it = std::find(
1001 ssize_t (*xferfn)(
int,
void*, std::size_t))
1004 unsigned char* buf =
reinterpret_cast<unsigned char*
>(addr);
1006 ssize_t tmp = xferfn(fd, buf, len);
1012 }
else if (0 == tmp) {
1015 }
else if (-1 == tmp) {
1022 if (xferred)
return xferred;
1026#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1029 std::cerr <<
" ERROR: In " << __func__ <<
" (" <<
1030 __FILE__ <<
", line " << __LINE__ <<
1031 "): expect transfer to block!" << std::endl;
1037 throw Exception(
"xferraw: unexpected return value from read/write",
1047 unsigned char pg =
m_pages[plist];
1052 for (
Page* p = plist; p; p = p->next()) {
1053 if (
sizeof(
Page) + p->size() !=
1056 throw Exception(
"sendpages: short write", EPIPE);
1061 throw Exception(
"sendpages: short write", EPIPE);
1063 }
else { assert(plist); }
1069 unsigned retVal = 0;
1070 Page *plisthead = 0, *plisttail = 0;
1072 plisthead = plisttail =
m_pages[pg];
1077 for (; plisttail; ++retVal) {
1078 Page* p = plisttail;
1081 plisttail = p->next();
1082 if (!p->size())
continue;
1103 fds.events = POLLIN;
1105 unsigned retVal = 0;
1107 int rc =
::poll(&fds, 1, 0);
1109 if (EINTR == errno)
continue;
1112 if (1 == retVal && fds.revents & POLLIN &&
1113 !(fds.revents & (POLLNVAL | POLLERR))) {
1127 for ( ; p; p = p->next()) ++
n;
1136 while (blend && blend->next()) blend = blend->next();
1140 Page *sendlisthead = 0, *sendlisttail = 0;
1150 if (blend) blend->setNext(p);
1161 if (!sendlisthead) sendlisthead = p;
1162 if (sendlisttail) sendlisttail->setNext(p);
1182 sendlisttail->setNext(p);
1190 struct pollfd fds[2];
1192 fds[0].events = fds[0].revents = 0;
1195 fds[1].events = fds[1].revents = 0;
1197 fds[0].events |= POLLIN;
1201 retVal =
::poll(fds, nfds, 0);
1202 if (0 > retVal && EINTR == errno)
1207 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1209 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1211 if (ok && fds[0].revents & POLLIN) {
1213 if (!ret) ok =
false;
1222 throw Exception(
"feedPageLists: poll", errno);
1236 while (dl && dl->next()) dl = dl->next();
1237 if (dl) dl->setNext(p);
1258 if (p)
while (p->next()) p = p->next();
1259 if (!p || p->full()) {
1274 Page *flushlisthead = 0, *flushlisttail = 0;
1277 if (!forcePartialPages && !p->full())
break;
1282 if (!flushlisthead) flushlisthead = p;
1283 if (flushlisttail) flushlisttail->setNext(p);
1286 if (flushlisthead)
sendpages(flushlisthead);
1295 while (
l &&
l->next())
l =
l->next();
1313 retVal += p->size() - p->pos();
1324 bool couldwrite =
false;
1328 fds.events = POLLOUT;
1332 retVal =
::poll(&fds, 1, 0);
1334 if (EINTR == errno)
continue;
1335 throw Exception(
"bytesWritableNonBlocking: poll", errno);
1337 if (1 == retVal && fds.revents & POLLOUT &&
1338 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1345 unsigned npages = 0;
1351 retVal += p->free();
1356 npages <
FlushThresh || couldwrite); p = p->next()) {
1358 retVal += Page::capacity();
1367 unsigned char *ap =
reinterpret_cast<unsigned char*
>(addr);
1376 unsigned char* pp = p->begin() + p->pos();
1378 std::copy(pp, pp + csz, ap);
1383 assert(p->size() >= p->pos());
1384 if (p->size() == p->pos()) {
1403 const unsigned char *ap =
reinterpret_cast<const unsigned char*
>(addr);
1412 unsigned char* pp = p->begin() + p->size();
1414 std::copy(ap, ap + csz, pp);
1419 assert(p->capacity() >= p->size());
1439 bool canskiptimeout =
false;
1441 std::vector<unsigned>::iterator mit =
masks.begin();
1442 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1464 while (dl && dl->next()) dl = dl->next();
1465 if (dl && dl->pos() < Page::capacity())
1469 if (pe.
revents) canskiptimeout =
true;
1472 std::vector<pollfd> fds;
1473 fds.reserve(2 * pipes.size());
1474 std::map<int, PollEntry*> fds2pipes;
1475 for (PollVector::const_iterator it = pipes.begin();
1476 pipes.end() != it; ++it) {
1479 fds2pipes.insert(std::make_pair((tmp.fd = pe.
pipe->
m_inpipe),
1481 tmp.
events = tmp.revents = 0;
1484 tmp.events |= POLLIN;
1488 fds2pipes.insert(std::make_pair(
1500 retVal =
::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1502 if (EINTR == errno)
continue;
1508 for (std::vector<pollfd>::iterator it = fds.begin();
1509 fds.end() != it; ++it) {
1515 if (fe.revents & POLLNVAL && fe.fd == pe.
pipe->
m_inpipe)
1519 if (fe.revents & POLLERR && fe.fd == pe.
pipe->
m_inpipe)
1523 if (fe.revents & POLLHUP && fe.fd == pe.
pipe->
m_inpipe)
1527 if ((fe.revents & POLLIN) && fe.fd == pe.
pipe->
m_inpipe &&
1528 !(fe.revents & (POLLNVAL | POLLERR))) {
1534 int tmp =
::poll(&fe, 1, 0);
1535 if (tmp > 0)
goto oncemore;
1537 if (EINTR == errno)
continue;
1549 while (dl && dl->next()) dl = dl->next();
1550 if (dl && dl->pos() < Page::capacity())
1557 mit =
masks.begin();
1558 for (PollVector::iterator it = pipes.begin();
1559 pipes.end() != it; ++it, ++mit)
1560 if ((it->revents &= *mit)) ++npipes;
1566 size_t sz = std::strlen(str);
1568 if (sz)
write(str, sz);
1577 str =
reinterpret_cast<char*
>(
std::realloc(str, sz + 1));
1578 if (!str)
throw Exception(
"realloc", errno);
1579 if (sz)
read(str, sz);
1587 size_t sz = str.size();
1589 write(str.data(), sz);
1600 for (
unsigned char c; sz--; str.push_back(
c)) *
this >>
c;
1607#ifdef TEST_BIDIRMMAPPIPE
1613 while (pipe.
good() && !pipe.
eof()) {
1617 if (!pipe)
return -1;
1618 if (pipe.
eof())
break;
1620 std::cout <<
"[CHILD] : read: " << str << std::endl;
1621 str =
"... early in the morning?";
1625 if (str.empty())
break;
1626 if (!pipe)
return -1;
1627 if (pipe.
eof())
break;
1628 std::cout <<
"[CHILD] : wrote: " << str << std::endl;
1638 ::srand48(::getpid());
1646 for (
int i = 0; i < 5; ++i) {
1648 ::usleep(
int(1e6 * ::drand48()));
1649 std::ostringstream buf;
1650 buf <<
"child pid " << ::getpid() <<
" sends message " << i;
1651 std::string str = buf.str();
1652 std::cout <<
"[CHILD] : " << str << std::endl;
1654 if (!pipe)
return -1;
1655 if (pipe.
eof())
break;
1671 while (pipe && !pipe.
eof()) {
1678 if (pipe.
eof())
break;
1681 if (!std::strlen(str))
break;
1692 while (pipe && !pipe.
eof()) {
1694 if (!std::strlen(str))
break;
1706 for (
unsigned i = 0; i <= 24; ++i) {
1707 str =
reinterpret_cast<char*
>(
std::realloc(str, (1 << i) + 1));
1708 std::memset(str,
'4', 1 << i);
1710 for (
unsigned j = 0; j < 1 << 7; ++j) {
1712 if (!pipe || pipe.
eof()) {
1733 int retVal = childexec(*p);
1740#include <sys/time.h>
1746 std::cout <<
"[PARENT]: simple challenge-response test, "
1747 "one child:" << std::endl;
1749 for (
int i = 0; i < 5; ++i) {
1750 std::string str(
"What shall we do with a drunken sailor...");
1752 if (!*pipe)
return -1;
1753 std::cout <<
"[PARENT]: wrote: " << str << std::endl;
1755 if (!*pipe)
return -1;
1756 std::cout <<
"[PARENT]: read: " << str << std::endl;
1763 int retVal = pipe->
close();
1764 std::cout <<
"[PARENT]: exit status of child: " << retVal <<
1766 if (retVal)
return retVal;
1772 std::cout << std::endl <<
"[PARENT]: polling test, " << nch <<
1773 " children:" << std::endl;
1779 for (
unsigned i = 0; i < nch; ++i) {
1780 std::cout <<
"[PARENT]: spawning child " << i << std::endl;
1781 pipes.push_back(PollEntry(spawnChild(randomchild),
1785 std::cout <<
"[PARENT]: waking up children" << std::endl;
1786 for (
unsigned i = 0; i < nch; ++i)
1788 std::cout <<
"[PARENT]: waiting for events on children's pipes" << std::endl;
1790 while (!pipes.empty()) {
1794 for (std::vector<PollEntry>::iterator it = pipes.begin();
1795 npipes && pipes.end() != it; ) {
1807 std::cout <<
"[PARENT]: Read from pipe " << it->pipe <<
1808 ": " <<
s << std::endl;
1821 std::cerr <<
"[DEBUG]: Event on pipe " << it->pipe <<
1833 int retVal = it->pipe->close();
1834 std::cout <<
"[PARENT]: child exit status: " <<
1835 retVal <<
", number of children still alive: " <<
1836 (pipes.size() - 1) << std::endl;
1837 if (retVal)
return retVal;
1839 it = pipes.erase(it);
1847 std::cout << std::endl <<
"[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1848 for (
unsigned i = 0; i <= 24; ++i) {
1849 char *
s =
new char[1 + (1 << i)];
1850 std::memset(
s,
'A', 1 << i);
1852 const unsigned n = 1 << 7;
1853 double avg = 0., min = 1e42, max = -1e42;
1855 for (
unsigned j =
n; j--; ) {
1857 ::gettimeofday(&
t1, 0);
1859 if (!*pipe || pipe->
eof())
break;
1861 if (!*pipe || pipe->
eof())
break;
1863 ::gettimeofday(&t2, 0);
1864 t2.tv_sec -=
t1.tv_sec;
1865 t2.tv_usec -=
t1.tv_usec;
1866 double dt = 1
e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1867 if (dt < min) min = dt;
1868 if (dt > max) max = dt;
1876 avg *= 1e6; min *= 1e6; max *= 1e6;
1877 int retVal = pipe->
close();
1879 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1887 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1888 " avg " << std::setw(7) << avg <<
" us min " <<
1889 std::setw(7) << min <<
" us max " << std::setw(7) << max <<
1890 "us speed " << std::setw(9) <<
1891 2. * (double(1 << i) / double(1 << 20) / (1
e-6 * avg)) <<
1892 " MB/s" << std::endl;
1895 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1899 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1900 for (
unsigned i = 0; i <= 24; ++i) {
1901 char *
s =
new char[1 + (1 << i)];
1902 std::memset(
s,
'A', 1 << i);
1904 const unsigned n = 1 << 7;
1905 double avg = 0., min = 1e42, max = -1e42;
1907 for (
unsigned j =
n; j--; ) {
1909 ::gettimeofday(&
t1, 0);
1912 if (!*pipe || pipe->
eof())
break;
1914 ::gettimeofday(&t2, 0);
1915 t2.tv_sec -=
t1.tv_sec;
1916 t2.tv_usec -=
t1.tv_usec;
1917 double dt = 1
e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1918 if (dt < min) min = dt;
1919 if (dt > max) max = dt;
1927 avg *= 1e6; min *= 1e6; max *= 1e6;
1928 int retVal = pipe->
close();
1930 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1934 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1935 " avg " << std::setw(7) << avg <<
" us min " <<
1936 std::setw(7) << min <<
" us max " << std::setw(7) << max <<
1937 "us speed " << std::setw(9) <<
1938 (double(1 << i) / double(1 << 20) / (1
e-6 * avg)) <<
1939 " MB/s" << std::endl;
1942 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1946 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1948 double avg = 0., min = 1e42, max = -1e42;
1949 unsigned n = 0, bsz = 0;
1951 while (*pipe && !pipe->
eof()) {
1953 ::gettimeofday(&
t1, 0);
1956 if (!*pipe || pipe->
eof())
break;
1958 ::gettimeofday(&t2, 0);
1959 t2.tv_sec -=
t1.tv_sec;
1960 t2.tv_usec -=
t1.tv_usec;
1961 double dt = 1
e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1962 if (std::strlen(
s)) {
1964 if (dt < min) min = dt;
1965 if (dt > max) max = dt;
1967 bsz = std::strlen(
s);
1972 avg *= 1e6; min *= 1e6; max *= 1e6;
1974 std::cout <<
"block size " << std::setw(9) << bsz <<
1975 " avg " << std::setw(7) << avg <<
" us min " <<
1976 std::setw(7) << min <<
" us max " << std::setw(7) <<
1977 max <<
"us speed " << std::setw(9) <<
1978 (double(bsz) / double(1 << 20) / (1
e-6 * avg)) <<
1979 " MB/s" << std::endl;
1986 int retVal = pipe->
close();
1987 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1988 if (retVal)
return retVal;
#define END_NAMESPACE_ROOFIT
#define BEGIN_NAMESPACE_ROOFIT
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
ROOT::R::TRInterface & Exception()
Binding & operator=(OUT(*fun)(void))
typedef void((*Func_t)())
unsigned revents
events that happened (or'ed bitmask)
BidirMMapPipe * pipe
pipe of interest
unsigned events
events of interest (or'ed bitmask)
class representing a chunk of pages
PageChunk(const PageChunk &)
forbid copying
BidirMMapPipeException Exception
convenience typedef
unsigned m_nUsedGrp
number of used page groups
unsigned len() const
return length of chunk
unsigned nPagesPerGroup() const
return number of pages per page group
void * m_begin
pointer to start of mmapped area
static unsigned physPgSz()
return the physical page size of the system
static unsigned s_physpgsz
system physical page size
void * m_end
pointer one behind end of mmapped area
void push(const Pages &p)
push a group of pages onto the free list
bool contains(const Pages &p) const
return if p is contained in this PageChunk
PagePool * m_parent
parent page pool
std::list< void * > m_freelist
free pages list
static MMapVariety s_mmapworks
mmap variety that works on this system
MMapVariety
type of mmap support found
@ Copy
mmap doesn't work, have to copy back and forth
@ Unknown
don't know yet what'll work
@ DevZero
mmapping /dev/zero works
@ Anonymous
anonymous mmap works
@ FileBacked
mmapping a temp file works
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static unsigned s_pagesize
logical page size (run-time determined)
unsigned m_nPgPerGrp
number of pages per group
static MMapVariety mmapVariety()
return mmap variety support found
Pages pop()
pop a group of pages off the free list
static unsigned pagesize()
return the logical page size
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
bool empty() const
return true if no used page groups in this chunk
void zap(Pages &p)
free all pages except for those pointed to by p
static unsigned getPageSize()
determine page size at run time
handle class for a number of Pages
Pages()
default constructor
static unsigned pagesize()
return page size
impl * m_pimpl
pointer to implementation
Pages & operator=(const Pages &other)
assignment operator
void swap(Pages &other)
swap with other's contents
Page * page(unsigned pgno) const
return page number pageno
unsigned npages() const
return number of pages accessible
unsigned pageno(Page *p) const
perform page to page number mapping
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks.
void purge()
purge buffered data waiting to be read and/or written
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
int close()
flush buffers, close pipe
@ PagesPerEnd
pages per pipe end
@ TotPages
pages shared (child + parent)
@ FlushThresh
flush threshold
BidirMMapPipe_impl::Page Page
convenience typedef for Page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
bool good() const
status of stream is good
Page * m_dirtylist
linked list: dirty pages (data to be sent)
unsigned recvpages()
receive a pages from the other end (may block), queue them
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
~BidirMMapPipe()
destructor
int m_inpipe
pipe end from which data may be read
bool isChild() const
return if this end of the pipe is the child end
void doFlush(bool forcePartialPages=true)
perform the flush
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
bool eof() const
true if end-of-file
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
static int debugflag()
return the current setting of the debug flag
bool isParent() const
return if this end of the pipe is the parent end
std::size_t size_type
type used to represent sizes
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
bool bad() const
true on I/O error
@ WriteInvalid
write end of pipe invalid
@ ReadEndOfFile
read pipe in end-of-file state
@ WriteError
pipe error Write end
@ ReadError
pipe error read end
@ Readable
pipe has data for reading
@ None
nothing special on this pipe
@ ReadInvalid
read end of pipe invalid
@ WriteEndOfFile
write pipe in end-of-file state
@ Writable
pipe can be written to
int m_flags
flags (e.g. end of file)
Page * m_freelist
linked list: free pages
Page * busypage()
get a busy page to read data from (may block)
Page * dirtypage()
get a dirty page to write data to (may block)
void sendpages(Page *plist)
send page(s) to the other end (may block)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
size_type read(void *addr, size_type sz)
read from pipe
static int s_debugflag
debug flag
bool closed() const
true if closed
@ eofbit
end of file reached
@ failbit
logical failure (e.g. pipe closed)
@ exceptionsbit
error reporting with exceptions
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
size_type write(const void *addr, size_type sz)
wirte to pipe
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void flush()
flush buffers with unwritten data
static unsigned s_pagepoolrefcnt
page pool reference counter
Page * m_busylist
linked list: busy pages (data to be read)
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
int m_outpipe
pipe end to which data may be written
void markPageDirty(Page *p)
put on dirty pages list
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned lenPageList(const Page *list)
return length of a page list
pid_t m_childPid
pid of the child (zero if we're child)
int main(int argc, char **argv)
namespace for implementation details of BidirMMapPipe
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
static constexpr double s
unsigned char m_npages
length in pages
Page * m_pages
pointer to first page
unsigned m_refcnt
reference counter
PageChunk * m_parent
pointer to parent pool
static unsigned long masks[]