30 #include <sys/socket.h>
34 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
35 #define END_NAMESPACE_ROOFIT }
46 class BidirMMapPipeException :
public std::exception
55 static int dostrerror_r(
int err,
char* buf,
std::size_t sz,
57 {
return f(err, buf, sz); }
63 BidirMMapPipeException(
const char* msg,
int err);
/// Return the exception's description as a C string.
///
/// Hands back the internal buffer m_buf; the constructor copies the caller's
/// message into it and appends ": " plus the strerror_r text for the error
/// code. Declared with an empty throw() specification, so it must not throw.
65 virtual const char*
what()
const throw() {
return m_buf; }
68 BidirMMapPipeException::BidirMMapPipeException(
const char* msg,
int err)
73 std::copy(msg, msg + msgsz, m_buf);
74 if (msgsz < s_sz) { m_buf[msgsz] =
':'; ++msgsz; }
75 if (msgsz < s_sz) { m_buf[msgsz] =
' '; ++msgsz; }
80 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
85 int BidirMMapPipeException::dostrerror_r(
int err,
char* buf,
89 char *tmp =
f(err, buf, sz);
90 if (tmp && tmp != buf) {
91 std::strncpy(buf, tmp, sz);
93 if (std::strlen(tmp) > sz - 1)
return ERANGE;
113 unsigned short m_size;
114 unsigned short m_pos;
119 {
return *
reinterpret_cast<Page*
>(0); }
122 Page() : m_next(0), m_size(0), m_pos(0)
130 void setNext(
const Page* p);
/// Mutable access to the size field: returns a reference to m_size so the
/// caller can update it in place.
134 unsigned short& size() {
return m_size; }
/// Read-only access to the size field: returns the current value of m_size
/// (widened from unsigned short to unsigned).
136 unsigned size()
const {
return m_size; }
/// Mutable access to the position field: returns a reference to m_pos so the
/// caller can update it in place.
138 unsigned short& pos() {
return m_pos; }
/// Read-only access to the position field: returns the current value of m_pos
/// (widened from unsigned short to unsigned).
140 unsigned pos()
const {
return m_pos; }
142 inline unsigned char* begin()
const
143 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
146 inline unsigned char* end()
const
147 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
150 static unsigned capacity()
/// True when m_size is zero.
153 bool empty()
const {
return !m_size; }
/// Logical negation of empty(): true as soon as m_size is non-zero.
/// Note this is not the same as full().
155 bool filled()
const {
return !empty(); }
/// Returns capacity() minus m_size.
/// NOTE(review): presumably the number of bytes that can still be written to
/// this page - confirm against capacity()'s definition.
157 unsigned free()
const {
return capacity() - m_size; }
/// Returns m_size minus m_pos.
/// NOTE(review): presumably the bytes not yet consumed by the reader; the
/// result is only meaningful while m_pos <= m_size - confirm callers keep
/// that invariant.
159 unsigned remaining()
const {
return m_size - m_pos; }
/// True when free() returns zero, i.e. m_size equals capacity().
161 bool full()
const {
return !
free(); }
164 void Page::setNext(
const Page* p)
169 const char*
p1 =
reinterpret_cast<char*
>(
this);
170 const char*
p2 =
reinterpret_cast<const char*
>(p);
171 std::ptrdiff_t tmp = p2 -
p1;
185 if (!m_next)
return 0;
186 char* ptmp =
reinterpret_cast<char*
>(
const_cast<Page*
>(
this));
188 return reinterpret_cast<Page*
>(ptmp);
206 typedef BidirMMapPipeException
Exception;
216 typedef std::list<Chunk*> ChunkList;
247 unsigned m_szmap[(maxsz - minsz) / szincr];
254 void updateCurSz(
int sz,
int incr);
256 int nextChunkSz()
const;
258 void putOnFreeList(Chunk* chunk);
260 void release(Chunk* chunk);
287 m_pimpl(other.m_pimpl)
292 if (&other ==
this)
return *
this;
306 assert(pgno < m_pimpl->m_npages);
307 unsigned char* pptr =
310 return reinterpret_cast<Page*
>(pptr);
315 const unsigned char* pptr =
316 reinterpret_cast<const unsigned char*
>(p);
317 const unsigned char* bptr =
320 const unsigned nr = (pptr - bptr) /
pagesize();
321 assert(nr < m_pimpl->m_npages);
328 long pgsz = sysconf(_SC_PAGESIZE);
329 if (-1 == pgsz)
throw Exception(
"sysconf", errno);
330 if (pgsz > 512 && pgsz >
long(
sizeof(Page)))
340 unsigned length,
unsigned nPgPerGroup) :
343 reinterpret_cast<unsigned char*>(
m_begin) + length)),
347 unsigned char* p =
reinterpret_cast<unsigned char*
>(
m_begin);
348 unsigned char* pend =
reinterpret_cast<unsigned char*
>(
m_end);
350 m_freelist.push_back(reinterpret_cast<void*>(p));
351 p += nPgPerGroup * PagePool::pagesize();
377 m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
381 if (wasempty)
m_parent->putOnFreeList(
this);
401 static bool msgprinted =
false;
403 #if defined(MAP_ANONYMOUS)
405 #define MYANONFLAG MAP_ANONYMOUS
406 #elif defined(MAP_ANON)
408 #define MYANONFLAG MAP_ANON
413 void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
414 MYANONFLAG | MAP_SHARED, -1, 0);
415 if (MAP_FAILED == retVal) {
421 std::cerr <<
" INFO: In " << __func__ <<
" (" <<
422 __FILE__ <<
", line " << __LINE__ <<
423 "): anonymous mmapping works, excellent!" <<
435 int fd =
::open(
"/dev/zero", O_RDWR);
437 throw Exception(
"open /dev/zero", errno);
438 void* retVal = ::mmap(0, len,
439 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
440 if (MAP_FAILED == retVal) {
448 if (-1 == ::
close(fd))
449 throw Exception(
"close /dev/zero", errno);
451 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
452 ", line " << __LINE__ <<
"): mmapping /dev/zero works, "
453 "very good!" << std::endl;
459 char name[] =
"/tmp/BidirMMapPipe-XXXXXX";
462 if (-1 == (fd = ::mkstemp(name)))
throw Exception(
"mkstemp", errno);
464 if (-1 == ::unlink(name)) {
470 if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
476 if (1 != ::
write(fd, name, 1)) {
482 void* retVal = ::mmap(0, len,
483 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
484 if (MAP_FAILED == retVal) {
492 if (-1 == ::
close(fd)) {
494 ::munmap(retVal, len);
498 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
499 ", line " << __LINE__ <<
"): mmapping temporary files "
500 "works, good!" << std::endl;
511 std::cerr <<
"WARNING: In " << __func__ <<
" (" << __FILE__ <<
512 ", line " << __LINE__ <<
"): anonymous mmapping of "
513 "shared buffers failed, falling back to read/write on "
514 " pipes!" << std::endl;
519 if (!retVal)
throw Exception(
"malloc", errno);
533 if (-1 == ::munmap(addr, len))
555 unsigned char* p0 =
reinterpret_cast<unsigned char*
>(
m_begin);
556 unsigned char*
p1 =
reinterpret_cast<unsigned char*
>(p[0u]);
558 unsigned char*
p3 =
reinterpret_cast<unsigned char*
>(
m_end);
559 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
560 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
571 PagePool::PagePool(
unsigned nPgPerGroup) :
573 {
std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0); }
575 PagePool::~PagePool()
578 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
583 void PagePool::zap(Pages& p)
587 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
588 if ((*it)->contains(p)) {
595 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
599 Pages PagePool::pop()
603 const int sz = nextChunkSz();
604 Chunk *c =
new Chunk(
this,
606 m_chunks.push_front(c);
622 ChunkList::iterator it = std::find(
625 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
628 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
629 if (m_chunks.end() == it)
630 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
637 void PagePool::putOnFreeList(
PageChunk* chunk)
643 void PagePool::updateCurSz(
int sz,
int incr)
645 m_szmap[(sz - minsz) / szincr] += incr;
647 for (
int i = (maxsz - minsz) / szincr; i--; ) {
649 m_cursz += i * szincr;
655 int PagePool::nextChunkSz()
const
659 if (m_chunks.empty()) {
667 if (1 != m_chunks.size()) {
677 if (sz > maxsz) sz = maxsz;
678 if (sz < minsz) sz = minsz;
684 pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
686 BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
687 unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
688 int BidirMMapPipe::s_debugflag = 0;
711 m_pages(pagepool().
pop())
722 m_pages(pagepool().
pop()), m_busylist(0),
m_freelist(0), m_dirtylist(0),
723 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
724 m_parentPid(::getpid())
729 int fds[4] = { -1, -1, -1, -1 };
731 static bool firstcall =
true;
744 for (
unsigned i = 1; i <
TotPages; ++i)
747 if (!useSocketpair) {
749 if (0 != ::pipe(&fds[0]))
throw Exception(
"pipe", errno);
750 if (0 != ::pipe(&fds[2]))
throw Exception(
"pipe", errno);
752 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
768 if (-1 == ::
close(fds[0]) || (-1 == ::
close(fds[3]))) {
773 fds[0] = fds[3] = -1;
778 if (-1 == ::
close(fds[0])) {
788 for (std::list<BidirMMapPipe*>::iterator it =
s_openpipes.begin();
805 throw Exception(
"handshake: xferraw write", EPIPE);
807 throw Exception(
"handshake: xferraw read", EPIPE);
808 if (
'P' != c)
throw Exception(
"handshake", EPIPE);
814 if (-1 == ::
close(fds[1]) || -1 == ::
close(fds[2])) {
819 fds[1] = fds[2] = -1;
824 if (-1 == ::
close(fds[1])) {
841 throw Exception(
"handshake: xferraw write", EPIPE);
843 throw Exception(
"handshake: xferraw read", EPIPE);
844 if (
'C' != c)
throw Exception(
"handshake", EPIPE);
850 if (-1 == ::fcntl(
m_outpipe, F_GETFD, &fdflags))
852 fdflags |= FD_CLOEXEC;
853 if (-1 == ::fcntl(
m_outpipe, F_SETFD, fdflags))
856 if (-1 == ::fcntl(
m_inpipe, F_GETFD, &fdflags))
858 fdflags |= FD_CLOEXEC;
859 if (-1 == ::fcntl(
m_inpipe, F_SETFD, fdflags))
867 for (
int i = 0; i < 4; ++i)
868 if (-1 != fds[i] && 0 != fds[i])
::close(fds[i]);
899 if (!force)
throw Exception(
"close", errno);
921 while ((err = ::
poll(&fds, 1, 1 << 20)) >= 0) {
922 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL))
break;
923 if (fds.revents & POLLIN) {
928 }
while (0 > err && EINTR == errno);
933 if (!force)
throw Exception(
"close", errno);
942 }
catch (
const std::exception& e) {
952 }
while (-1 == tmp && EINTR == errno);
954 if (!force)
throw Exception(
"waitpid", errno);
959 std::list<BidirMMapPipe*>::iterator it = std::find(
975 unsigned char* buf =
reinterpret_cast<unsigned char*
>(addr);
977 ssize_t tmp = xferfn(fd, buf, len);
983 }
else if (0 == tmp) {
986 }
else if (-1 == tmp) {
993 if (xferred)
return xferred;
997 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1000 std::cerr <<
" ERROR: In " << __func__ <<
" (" <<
1001 __FILE__ <<
", line " << __LINE__ <<
1002 "): expect transfer to block!" << std::endl;
1008 throw Exception(
"xferraw: unexpected return value from read/write",
1018 unsigned char pg =
m_pages[plist];
1023 for (
Page* p = plist; p; p = p->next()) {
1024 if (
sizeof(
Page) + p->size() !=
1027 throw Exception(
"sendpages: short write", EPIPE);
1032 throw Exception(
"sendpages: short write", EPIPE);
1034 }
else {
assert(plist); }
1040 unsigned retVal = 0;
1041 Page *plisthead = 0, *plisttail = 0;
1043 plisthead = plisttail =
m_pages[pg];
1048 for (; plisttail; ++retVal) {
1049 Page* p = plisttail;
1052 plisttail = p->next();
1053 if (!p->size())
continue;
1074 fds.events = POLLIN;
1076 unsigned retVal = 0;
1078 int rc =
::poll(&fds, 1, 0);
1080 if (EINTR == errno)
continue;
1083 if (1 == retVal && fds.revents & POLLIN &&
1084 !(fds.revents & (POLLNVAL | POLLERR))) {
1098 for ( ; p; p = p->next()) ++n;
1107 while (blend && blend->next()) blend = blend->next();
1111 Page *sendlisthead = 0, *sendlisttail = 0;
1121 if (blend) blend->setNext(p);
1132 if (!sendlisthead) sendlisthead = p;
1133 if (sendlisttail) sendlisttail->setNext(p);
1153 sendlisttail->setNext(p);
1161 struct pollfd fds[2];
1163 fds[0].events = fds[0].revents = 0;
1166 fds[1].events = fds[1].revents = 0;
1168 fds[0].events |= POLLIN;
1172 retVal =
::poll(fds, nfds, 0);
1173 if (0 > retVal && EINTR == errno)
1178 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1180 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1182 if (ok && fds[0].revents & POLLIN) {
1184 if (!ret) ok =
false;
1193 throw Exception(
"feedPageLists: poll", errno);
1207 while (dl && dl->next()) dl = dl->next();
1208 if (dl) dl->setNext(p);
1229 if (p)
while (p->next()) p = p->next();
1230 if (!p || p->full()) {
1245 Page *flushlisthead = 0, *flushlisttail = 0;
1248 if (!forcePartialPages && !p->full())
break;
1253 if (!flushlisthead) flushlisthead = p;
1254 if (flushlisttail) flushlisttail->setNext(p);
1257 if (flushlisthead)
sendpages(flushlisthead);
1266 while (l && l->next()) l = l->next();
1284 retVal += p->size() - p->pos();
1295 bool couldwrite =
false;
1299 fds.events = POLLOUT;
1303 retVal =
::poll(&fds, 1, 0);
1305 if (EINTR == errno)
continue;
1306 throw Exception(
"bytesWritableNonBlocking: poll", errno);
1308 if (1 == retVal && fds.revents & POLLOUT &&
1309 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1316 unsigned npages = 0;
1322 retVal += p->free();
1327 npages <
FlushThresh || couldwrite); p = p->next()) {
1329 retVal += Page::capacity();
1338 unsigned char *ap =
reinterpret_cast<unsigned char*
>(addr);
1347 unsigned char* pp = p->begin() + p->pos();
1349 std::copy(pp, pp + csz, ap);
1354 assert(p->size() >= p->pos());
1355 if (p->size() == p->pos()) {
1374 const unsigned char *ap =
reinterpret_cast<const unsigned char*
>(addr);
1383 unsigned char* pp = p->begin() + p->size();
1385 std::copy(ap, ap + csz, pp);
1390 assert(p->capacity() >= p->size());
1410 bool canskiptimeout =
false;
1412 std::vector<unsigned>::iterator mit = masks.begin();
1413 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1415 PollEntry& pe = *it;
1435 while (dl && dl->next()) dl = dl->next();
1436 if (dl && dl->pos() < Page::capacity())
1440 if (pe.
revents) canskiptimeout =
true;
1443 std::vector<pollfd> fds;
1444 fds.reserve(2 * pipes.size());
1445 std::map<int, PollEntry*> fds2pipes;
1446 for (PollVector::const_iterator it = pipes.begin();
1447 pipes.end() != it; ++it) {
1448 const PollEntry& pe = *it;
1450 fds2pipes.insert(std::make_pair((tmp.fd = pe.
pipe->
m_inpipe),
1451 const_cast<PollEntry*>(&pe)));
1452 tmp.events = tmp.revents = 0;
1455 tmp.events |= POLLIN;
1459 fds2pipes.insert(std::make_pair(
1461 const_cast<PollEntry*>(&pe)));
1471 retVal =
::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1473 if (EINTR == errno)
continue;
1479 for (std::vector<pollfd>::iterator it = fds.begin();
1480 fds.end() != it; ++it) {
1484 PollEntry& pe = *fds2pipes[fe.fd];
1486 if (fe.revents & POLLNVAL && fe.fd == pe.
pipe->
m_inpipe)
1490 if (fe.revents & POLLERR && fe.fd == pe.
pipe->
m_inpipe)
1494 if (fe.revents & POLLHUP && fe.fd == pe.
pipe->
m_inpipe)
1498 if ((fe.revents & POLLIN) && fe.fd == pe.
pipe->
m_inpipe &&
1499 !(fe.revents & (POLLNVAL | POLLERR))) {
1505 int tmp =
::poll(&fe, 1, 0);
1506 if (tmp > 0)
goto oncemore;
1508 if (EINTR == errno)
continue;
1520 while (dl && dl->next()) dl = dl->next();
1521 if (dl && dl->pos() < Page::capacity())
1528 mit = masks.begin();
1529 for (PollVector::iterator it = pipes.begin();
1530 pipes.end() != it; ++it, ++mit)
1531 if ((it->revents &= *mit)) ++npipes;
1537 size_t sz = std::strlen(str);
1539 if (sz)
write(str, sz);
1548 str =
reinterpret_cast<char*
>(std::realloc(str, sz + 1));
1549 if (!str)
throw Exception(
"realloc", errno);
1550 if (sz)
read(str, sz);
1558 size_t sz = str.size();
1560 write(str.data(), sz);
1571 for (
unsigned char c; sz--; str.push_back(c)) *
this >> c;
1578 #ifdef TEST_BIDIRMMAPPIPE
1584 while (pipe.
good() && !pipe.
eof()) {
1588 if (!pipe)
return -1;
1589 if (pipe.
eof())
break;
1591 std::cout <<
"[CHILD] : read: " << str << std::endl;
1592 str =
"... early in the morning?";
1596 if (str.empty())
break;
1597 if (!pipe)
return -1;
1598 if (pipe.
eof())
break;
1599 std::cout <<
"[CHILD] : wrote: " << str << std::endl;
1609 ::srand48(::getpid());
1617 for (
int i = 0; i < 5; ++i) {
1619 ::usleep(
int(1e6 * ::drand48()));
1620 std::ostringstream buf;
1621 buf <<
"child pid " << ::getpid() <<
" sends message " << i;
1622 std::string str = buf.str();
1623 std::cout <<
"[CHILD] : " << str << std::endl;
1625 if (!pipe)
return -1;
1626 if (pipe.
eof())
break;
1642 while (pipe && !pipe.
eof()) {
1649 if (pipe.
eof())
break;
1652 if (!std::strlen(str))
break;
1663 while (pipe && !pipe.
eof()) {
1665 if (!std::strlen(str))
break;
1677 for (
unsigned i = 0; i <= 24; ++i) {
1678 str =
reinterpret_cast<char*
>(std::realloc(str, (1 << i) + 1));
1679 std::memset(str,
'4', 1 << i);
1681 for (
unsigned j = 0; j < 1 << 7; ++j) {
1683 if (!pipe || pipe.
eof()) {
1704 int retVal = childexec(*p);
1711 #include <sys/time.h>
1717 std::cout <<
"[PARENT]: simple challenge-response test, "
1718 "one child:" << std::endl;
1720 for (
int i = 0; i < 5; ++i) {
1721 std::string str(
"What shall we do with a drunken sailor...");
1723 if (!*pipe)
return -1;
1724 std::cout <<
"[PARENT]: wrote: " << str << std::endl;
1726 if (!*pipe)
return -1;
1727 std::cout <<
"[PARENT]: read: " << str << std::endl;
1734 int retVal = pipe->
close();
1735 std::cout <<
"[PARENT]: exit status of child: " << retVal <<
1737 if (retVal)
return retVal;
1743 std::cout << std::endl <<
"[PARENT]: polling test, " << nch <<
1744 " children:" << std::endl;
1750 for (
unsigned i = 0; i < nch; ++i) {
1751 std::cout <<
"[PARENT]: spawning child " << i << std::endl;
1752 pipes.push_back(PollEntry(spawnChild(randomchild),
1756 std::cout <<
"[PARENT]: waking up children" << std::endl;
1757 for (
unsigned i = 0; i < nch; ++i)
1758 *pipes[i].pipe <<
"" << BidirMMapPipe::flush;
1759 std::cout <<
"[PARENT]: waiting for events on children's pipes" << std::endl;
1761 while (!pipes.empty()) {
1765 for (std::vector<PollEntry>::iterator it = pipes.begin();
1766 npipes && pipes.end() != it; ) {
1778 std::cout <<
"[PARENT]: Read from pipe " << it->pipe <<
1779 ": " << s << std::endl;
1784 *(it->pipe) <<
"" << BidirMMapPipe::flush;
1792 std::cerr <<
"[DEBUG]: Event on pipe " << it->pipe <<
1804 int retVal = it->pipe->close();
1805 std::cout <<
"[PARENT]: child exit status: " <<
1806 retVal <<
", number of children still alive: " <<
1807 (pipes.size() - 1) << std::endl;
1808 if (retVal)
return retVal;
1810 it = pipes.erase(it);
1818 std::cout << std::endl <<
"[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1819 for (
unsigned i = 0; i <= 24; ++i) {
1820 char *s =
new char[1 + (1 << i)];
1821 std::memset(s,
'A', 1 << i);
1823 const unsigned n = 1 << 7;
1824 double avg = 0.,
min = 1e42, max = -1e42;
1826 for (
unsigned j = n; j--; ) {
1828 ::gettimeofday(&
t1, 0);
1830 if (!*pipe || pipe->
eof())
break;
1832 if (!*pipe || pipe->
eof())
break;
1834 ::gettimeofday(&t2, 0);
1835 t2.tv_sec -=
t1.tv_sec;
1836 t2.tv_usec -=
t1.tv_usec;
1837 double dt = 1e-6 *
double(t2.tv_usec) +
double(t2.tv_sec);
1839 if (dt > max) max = dt;
1847 avg *= 1e6;
min *= 1e6; max *= 1e6;
1848 int retVal = pipe->
close();
1850 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1857 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1858 " avg " << std::setw(7) << avg <<
" us min " <<
1859 std::setw(7) <<
min <<
" us max " << std::setw(7) << max <<
1860 "us speed " << std::setw(9) <<
1861 2. * (
double(1 << i) /
double(1 << 20) / (1e-6 * avg)) <<
1862 " MB/s" << std::endl;
1865 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1869 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1870 for (
unsigned i = 0; i <= 24; ++i) {
1871 char *s =
new char[1 + (1 << i)];
1872 std::memset(s,
'A', 1 << i);
1874 const unsigned n = 1 << 7;
1875 double avg = 0.,
min = 1e42, max = -1e42;
1877 for (
unsigned j = n; j--; ) {
1879 ::gettimeofday(&
t1, 0);
1882 if (!*pipe || pipe->
eof())
break;
1884 ::gettimeofday(&t2, 0);
1885 t2.tv_sec -=
t1.tv_sec;
1886 t2.tv_usec -=
t1.tv_usec;
1887 double dt = 1e-6 *
double(t2.tv_usec) +
double(t2.tv_sec);
1889 if (dt > max) max = dt;
1897 avg *= 1e6;
min *= 1e6; max *= 1e6;
1898 int retVal = pipe->
close();
1900 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1904 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1905 " avg " << std::setw(7) << avg <<
" us min " <<
1906 std::setw(7) <<
min <<
" us max " << std::setw(7) << max <<
1907 "us speed " << std::setw(9) <<
1909 " MB/s" << std::endl;
1912 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
1916 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1918 double avg = 0.,
min = 1e42, max = -1e42;
1919 unsigned n = 0, bsz = 0;
1921 while (*pipe && !pipe->
eof()) {
1923 ::gettimeofday(&
t1, 0);
1926 if (!*pipe || pipe->
eof())
break;
1928 ::gettimeofday(&t2, 0);
1929 t2.tv_sec -=
t1.tv_sec;
1930 t2.tv_usec -=
t1.tv_usec;
1931 double dt = 1e-6 *
double(t2.tv_usec) +
double(t2.tv_sec);
1932 if (std::strlen(s)) {
1935 if (dt > max) max = dt;
1937 bsz = std::strlen(s);
1942 avg *= 1e6;
min *= 1e6; max *= 1e6;
1944 std::cout <<
"block size " << std::setw(9) << bsz <<
1945 " avg " << std::setw(7) << avg <<
" us min " <<
1946 std::setw(7) <<
min <<
" us max " << std::setw(7) <<
1947 max <<
"us speed " << std::setw(9) <<
1949 " MB/s" << std::endl;
1956 int retVal = pipe->
close();
1957 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1958 if (retVal)
return retVal;
1964 #endif // TEST_BIDIRMMAPPIPE
std::size_t size_type
type used to represent sizes
PagePool * m_parent
parent page pool
unsigned char m_npages
length in pages
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe * pipe
pipe of interest
bool eof() const
true if end-of-file
static Vc_ALWAYS_INLINE int_v min(const int_v &x, const int_v &y)
error reporting with exceptions
static double p3(double t, double a, double b, double c, double d)
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
write to pipe
read pipe in end-of-file state
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
bool isChild() const
return if this end of the pipe is the child end
handle class for a number of Pages
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
PageChunk(const PageChunk &)
forbid copying
impl * m_pimpl
pointer to implementation
ROOT::R::TRInterface & Exception()
unsigned m_refcnt
reference counter
Page * m_freelist
linked list: free pages
bool bad() const
true on I/O error
write pipe in end-of-file state
unsigned npages() const
return number of pages accessible
don't know yet what'll work
BidirMMapPipeException Exception
convenience typedef
namespace for implementation details of BidirMMapPipe
unsigned events
events of interest (or'ed bitmask)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
void doFlush(bool forcePartialPages=true)
perform the flush
static MMapVariety s_mmapworks
mmap variety that works on this system
unsigned m_nUsedGrp
number of used page groups
pid_t m_childPid
pid of the child (zero if we're child)
void markPageDirty(Page *p)
put on dirty pages list
~BidirMMapPipe()
destructor
unsigned nPagesPerGroup() const
return number of pages per page group
mmapping a temp file works
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
bool good() const
status of stream is good
size_type read(void *addr, size_type sz)
read from pipe
Vc_ALWAYS_INLINE void free(T *p)
Frees memory that was allocated with Vc::malloc.
static MMapVariety mmapVariety()
return mmap variety support found
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
void sendpages(Page *plist)
send page(s) to the other end (may block)
static double p2(double t, double a, double b, double c)
unsigned len() const
return length of chunk
Page * page(unsigned pgno) const
return page number pgno
Page * busypage()
get a busy page to read data from (may block)
if(pyself &&pyself!=Py_None)
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Pages()
default constructor
int close()
flush buffers, close pipe
void swap(Pages &other)
swap with other's contents
Page * dirtypage()
get a dirty page to write data to (may block)
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
static int debugflag()
return the current setting of the debug flag
unsigned m_nPgPerGrp
number of pages per group
static unsigned lenPageList(const Page *list)
return length of a page list
Pages pop()
pop a group of pages off the free list
logical failure (e.g. pipe closed)
Double_t length(const TVector2 &v)
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
int m_outpipe
pipe end to which data may be written
bool contains(const Pages &p) const
return if p is contained in this PageChunk
void * m_begin
pointer to start of mmapped area
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
#define END_NAMESPACE_ROOFIT
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
static double p1(double t, double a, double b)
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
bool isParent() const
return if this end of the pipe is the parent end
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
pages shared (child + parent)
bool empty() const
return true if no used page groups in this chunk
int main(int argc, char *argv[])
bool closed() const
true if closed
MMapVariety
type of mmap support found
void zap(Pages &p)
free all pages except for those pointed to by p
unsigned recvpages()
receive pages from the other end (may block), queue them
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
static Vc_ALWAYS_INLINE int_v max(const int_v &x, const int_v &y)
Binding & operator=(OUT(*fun)(void))
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
typedef void((*Func_t)())
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static unsigned long masks[]
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
system page size (run-time determined)
pipe has data for reading
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
nothing special on this pipe
int m_flags
flags (e.g. end of file)
mmap doesn't work, have to copy back and forth
static unsigned pagesize()
return the page size of the system
unsigned pageno(Page *p) const
perform page to page number mapping
#define BEGIN_NAMESPACE_ROOFIT