ROOT  6.06/09
Reference Guide
BidirMMapPipe.cxx
Go to the documentation of this file.
1 /** @file BidirMMapPipe.cxx
2  *
3  * implementation of BidirMMapPipe, a class which forks off a child process
4  * and serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 #ifndef _WIN32
10 #include <map>
11 #include <cerrno>
12 #include <limits>
13 #include <string>
14 #include <cstdlib>
15 #include <cstring>
16 #include <iostream>
17 #include <algorithm>
18 #include <exception>
19 
20 #include <poll.h>
21 #include <fcntl.h>
22 #include <signal.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <stdlib.h>
26 #include <pthread.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 #include <sys/wait.h>
30 #include <sys/socket.h>
31 
32 #include "BidirMMapPipe.h"
33 
34 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
35 #define END_NAMESPACE_ROOFIT }
36 
38 
39 /// namespace for implementation details of BidirMMapPipe
40 namespace BidirMMapPipe_impl {
/** @brief exception to throw if low-level OS calls go wrong
 *
 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
 * @date 2013-07-07
 *
 * The message has the form "<msg>: <text for errno value>" and is kept in
 * a fixed-size buffer inside the object, so building the message never
 * allocates. Messages that do not fit are truncated; what() always returns
 * a zero-terminated string.
 */
class BidirMMapPipeException : public std::exception
{
    private:
        enum {
            s_sz = 256 ///< length of buffer
        };
        char m_buf[s_sz]; ///< buffer containing the error message

        /// for the POSIX version of strerror_r (returns an int status)
        static int dostrerror_r(int err, char* buf, std::size_t sz,
                int (*f)(int, char*, std::size_t))
        { return f(err, buf, sz); }
        /// for the GNU version of strerror_r (returns a char* to the text)
        static int dostrerror_r(int, char*, std::size_t,
                char* (*f)(int, char*, std::size_t));
    public:
        /// constructor taking error code, hint on operation (msg)
        BidirMMapPipeException(const char* msg, int err);
        /// return a description of what went wrong
        virtual const char* what() const throw() { return m_buf; }
};

/** @brief build the message "<msg>: <strerror text for err>"
 *
 * @param msg   hint naming the failed operation; may be empty or null, in
 *              which case only the error text is stored
 * @param err   errno-style error code to translate
 */
BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
{
    // start from an all-zero buffer: even if strerror_r fails without
    // writing anything, what() must never expose indeterminate bytes
    std::fill(m_buf, m_buf + s_sz, char(0));
    // the last byte is reserved for the terminating zero
    const std::size_t cap = s_sz - 1;
    std::size_t msgsz = msg ? std::strlen(msg) : 0;
    if (msgsz) {
        msgsz = std::min(msgsz, cap);
        std::copy(msg, msg + msgsz, m_buf);
        if (msgsz < cap) { m_buf[msgsz] = ':'; ++msgsz; }
        if (msgsz < cap) { m_buf[msgsz] = ' '; ++msgsz; }
    }
    if (msgsz < cap) {
        // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
        // have to sort it out with overloads
        dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
    }
    m_buf[s_sz - 1] = 0; // enforce zero-termination
}

/** @brief adapter for the GNU flavour of strerror_r
 *
 * The GNU function may return a pointer to a string of its own instead of
 * filling buf; in that case the text is copied (and truncated) into buf.
 *
 * @param err   error code to translate
 * @param buf   destination buffer
 * @param sz    size of destination buffer in bytes
 * @param f     the GNU-style strerror_r
 * @returns     0 on success, ERANGE if the text had to be truncated or the
 *              buffer has no room at all
 */
int BidirMMapPipeException::dostrerror_r(int err, char* buf,
        std::size_t sz, char* (*f)(int, char*, std::size_t))
{
    // nothing can be stored in an empty buffer (and buf[sz - 1] below
    // would be out of bounds)
    if (!sz) return ERANGE;
    buf[0] = 0;
    char *tmp = f(err, buf, sz);
    if (tmp && tmp != buf) {
        std::strncpy(buf, tmp, sz);
        buf[sz - 1] = 0;
        if (std::strlen(tmp) > sz - 1) return ERANGE;
    }
    return 0;
}
97 
98  /** @brief class representing the header structure in an mmapped page
99  *
100  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
101  * @date 2013-07-07
102  *
103  * contains a field to put pages into a linked list, a field for the size
104  * of the data being transmitted, and a field for the position until which
105  * the data has been read
     *
     * the header sits at the start of a page; the payload area starts
     * sizeof(Page) bytes after it and ends PageChunk::pagesize() bytes after
     * the start of the header (see begin() and end())
106  */
107  class Page
108  {
109  private:
110  // use as small a data type as possible to maximise payload area
111  // of pages
112  short m_next; ///< next page in list (in pagesizes)
113  unsigned short m_size; ///< size of payload (in bytes)
114  unsigned short m_pos; ///< index of next byte in payload area
115  /// copy construction forbidden
116  Page(const Page&) {}
117  /// assignment forbidden
     // NOTE(review): the body dereferences a null pointer; this is only
     // harmless as long as the (private) operator is never called
118  Page& operator=(const Page&)
119  { return *reinterpret_cast<Page*>(0); }
120  public:
121  /// constructor
122  Page() : m_next(0), m_size(0), m_pos(0)
123  {
124  // check that short is big enough - must be done at runtime
125  // because the page size is not known until runtime
     // NOTE(review): listing lines 126-127 are absent from this excerpt;
     // presumably they hold the check announced above - confirm against
     // the full source
128  }
129  /// set pointer to next page
130  void setNext(const Page* p);
131  /// return pointer to next page
132  Page* next() const;
133  /// return reference to size field
134  unsigned short& size() { return m_size; }
135  /// return size (of payload data)
136  unsigned size() const { return m_size; }
137  /// return reference to position field
138  unsigned short& pos() { return m_pos; }
139  /// return position
140  unsigned pos() const { return m_pos; }
141  /// return pointer to first byte in payload data area of page
142  inline unsigned char* begin() const
143  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
144  + sizeof(Page); }
145  /// return pointer just past the last byte of the page
146  inline unsigned char* end() const
147  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
148  + PageChunk::pagesize(); }
149  /// return the capacity of the page (page size minus header size)
150  static unsigned capacity()
151  { return PageChunk::pagesize() - sizeof(Page); }
152  /// true if page empty
153  bool empty() const { return !m_size; }
154  /// true if page holds any payload (partially or completely filled)
155  bool filled() const { return !empty(); }
156  /// free space left (to be written to)
157  unsigned free() const { return capacity() - m_size; }
158  /// bytes remaining to be read
159  unsigned remaining() const { return m_size - m_pos; }
160  /// true if page completely full
161  bool full() const { return !free(); }
162  };
163 
164  void Page::setNext(const Page* p)
165  {
166  if (!p) {
167  m_next = 0;
168  } else {
169  const char* p1 = reinterpret_cast<char*>(this);
170  const char* p2 = reinterpret_cast<const char*>(p);
171  std::ptrdiff_t tmp = p2 - p1;
172  // difference must be divisible by page size
173  assert(!(tmp % PageChunk::pagesize()));
174  tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
175  m_next = tmp;
176  // no truncation when saving in a short
177  assert(m_next == tmp);
178  // final check: next() must return p
179  assert(next() == p);
180  }
181  }
182 
183  Page* Page::next() const
184  {
185  if (!m_next) return 0;
186  char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
187  ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
188  return reinterpret_cast<Page*>(ptmp);
189  }
190 
191  /** @brief class representing a page pool
192  *
193  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
194  * @date 2013-07-24
195  *
196  * pool of mmapped pages (on systems which support it, on all others, the
197  * functionality is emulated with dynamically allocated memory)
198  *
199  * in most operating systems there is a limit to how many mappings any one
200  * process is allowed to request; for this reason, we mmap a relatively
201  * large amount up front, and then carve off little pieces as we need them
202  */
203  class PagePool {
204  private:
205  /// convenience typedef
206  typedef BidirMMapPipeException Exception;
207 
     // NOTE(review): pop() uses these values as a plain multiplier for the
     // number of page groups per chunk (sz * m_nPgPerGrp * pagesize()), not
     // as a power of two - the "1 << ..." wording below looks outdated
208  enum {
209  minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
210  maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
211  szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
212  };
213  /// a chunk of memory in the pool
214  typedef BidirMMapPipe_impl::PageChunk Chunk;
215  /// list of chunks
216  typedef std::list<Chunk*> ChunkList;
217 
218  friend class BidirMMapPipe_impl::PageChunk;
219  public:
220  /// convenience typedef
     // NOTE(review): listing line 221 (the typedef announced above) is
     // absent from this excerpt; presumably it introduces the name Pages
     // used by pop() and zap() - confirm against the full source
222  /// constructor
223  PagePool(unsigned nPagesPerGroup);
224  /// destructor
225  ~PagePool();
226  /// pop a free element out of the pool
227  Pages pop();
228 
229  /// return page size of the system
230  static unsigned pagesize() { return PageChunk::pagesize(); }
231  /// return variety of mmap supported on the system
232  static MMapVariety mmapVariety()
233  { return PageChunk::mmapVariety(); }
234 
235  /// return number of pages per group (ie. as returned by pop())
236  unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
237 
238  /// zap the pool (unmap all but Pages p)
239  void zap(Pages& p);
240 
241  private:
242  /// list of chunks used by the pool
243  ChunkList m_chunks;
244  /// list of chunks used by the pool which are not full
245  ChunkList m_freelist;
246  /// chunk size map (histogram of chunk sizes)
     // NOTE(review): holds (maxsz - minsz) / szincr counters, i.e. one per
     // size class from minsz up to maxsz - szincr; there is no slot for
     // maxsz itself
247  unsigned m_szmap[(maxsz - minsz) / szincr];
248  /// current chunk size
249  int m_cursz;
250  /// page group size
251  unsigned m_nPgPerGrp;
252 
253  /// adjust _cursz to current largest block
254  void updateCurSz(int sz, int incr);
255  /// find size of next chunk to allocate (in a hopefully smart way)
256  int nextChunkSz() const;
257  /// put a chunk (back) on the list of chunks which are not full
258  void putOnFreeList(Chunk* chunk);
259  /// release a chunk
260  void release(Chunk* chunk);
261  };
262 
263  Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
264  m_pimpl(new impl)
265  {
266  assert(npg < 256);
267  m_pimpl->m_parent = parent;
268  m_pimpl->m_pages = pages;
269  m_pimpl->m_refcnt = 1;
270  m_pimpl->m_npages = npg;
271  /// initialise pages
272  for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
273  }
274 
// definitions of PageChunk's static data members: the page size is queried
// once through getPageSize(), and the kind of mmap that works starts out
// as Unknown
275  unsigned PageChunk::s_pagesize = PageChunk::getPageSize();
276  PageChunk::MMapVariety PageChunk::s_mmapworks = PageChunk::Unknown;
277 
// NOTE(review): the signature line for this body (listing line 278) is
// absent from this excerpt; the code releases a Pages handle, so this is
// presumably Pages::~Pages() - confirm against the full source
279  {
     // drop one reference; the last handle returns the pages to the owning
     // chunk (if there is one) and frees the shared state
280  if (m_pimpl && !--(m_pimpl->m_refcnt)) {
281  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
282  delete m_pimpl;
283  }
284  }
285 
286  Pages::Pages(const Pages& other) :
287  m_pimpl(other.m_pimpl)
288  { ++(m_pimpl->m_refcnt); }
289 
290  Pages& Pages::operator=(const Pages& other)
291  {
292  if (&other == this) return *this;
293  if (--(m_pimpl->m_refcnt)) {
294  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
295  delete m_pimpl;
296  }
297  m_pimpl = other.m_pimpl;
298  ++(m_pimpl->m_refcnt);
299  return *this;
300  }
301 
302  unsigned Pages::pagesize() { return PageChunk::pagesize(); }
303 
304  Page* Pages::page(unsigned pgno) const
305  {
306  assert(pgno < m_pimpl->m_npages);
307  unsigned char* pptr =
308  reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
309  pptr += pgno * pagesize();
310  return reinterpret_cast<Page*>(pptr);
311  }
312 
313  unsigned Pages::pageno(Page* p) const
314  {
315  const unsigned char* pptr =
316  reinterpret_cast<const unsigned char*>(p);
317  const unsigned char* bptr =
318  reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
319  assert(0 == ((pptr - bptr) % pagesize()));
320  const unsigned nr = (pptr - bptr) / pagesize();
321  assert(nr < m_pimpl->m_npages);
322  return nr;
323  }
324 
// NOTE(review): the signature line for this body (listing line 325) is
// absent from this excerpt; s_pagesize is initialised from
// PageChunk::getPageSize(), so this is presumably that function - confirm
// against the full source
326  {
327  // find out page size of system
328  long pgsz = sysconf(_SC_PAGESIZE);
329  if (-1 == pgsz) throw Exception("sysconf", errno);
     // accept the reported size only if it is larger than 512 bytes and
     // larger than the page header
330  if (pgsz > 512 && pgsz > long(sizeof(Page)))
331  return pgsz;
332 
333  // in case of failure or implausible value, use a safe default: 4k
334  // page size, and do not try to mmap
335  s_mmapworks = Copy;
336  return 1 << 12;
337  }
338 
339  PageChunk::PageChunk(PagePool* parent,
340  unsigned length, unsigned nPgPerGroup) :
341  m_begin(dommap(length)),
342  m_end(reinterpret_cast<void*>(
343  reinterpret_cast<unsigned char*>(m_begin) + length)),
344  m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
345  {
346  // ok, push groups of pages onto freelist here
347  unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
348  unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
349  while (p < pend) {
350  m_freelist.push_back(reinterpret_cast<void*>(p));
351  p += nPgPerGroup * PagePool::pagesize();
352  }
353  }
354 
// NOTE(review): the signature line for this body (listing line 355) is
// absent from this excerpt; the code unmaps the chunk's memory, so this is
// presumably PageChunk::~PageChunk() - confirm against the full source
356  {
     // a chunk that still belongs to a pool must have no page groups in use
357  if (m_parent) assert(empty());
     // give the memory back (m_begin is zero after zap())
358  if (m_begin) domunmap(m_begin, len());
359  }
360 
361  bool PageChunk::contains(const Pages& p) const
362  { return p.m_pimpl->m_parent == this; }
363 
// NOTE(review): the signature line for this body (listing line 364) is
// absent from this excerpt; PagePool::pop() calls c->pop() on a chunk and
// gets Pages back, so this is presumably Pages PageChunk::pop() - confirm
// against the full source
365  {
     // the caller has to make sure that a free group is available
366  assert(!m_freelist.empty());
     // take the first free group off the list and count it as used
367  void* p = m_freelist.front();
368  m_freelist.pop_front();
369  ++m_nUsedGrp;
     // wrap the group in a handle that knows this chunk as its parent
370  return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
371  }
372 
373  void PageChunk::push(const Pages& p)
374  {
375  assert(contains(p));
376  bool wasempty = m_freelist.empty();
377  m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
378  --m_nUsedGrp;
379  if (m_parent) {
380  // notify parent if we need to be put on the free list again
381  if (wasempty) m_parent->putOnFreeList(this);
382  // notify parent if we're empty
383  if (empty()) return m_parent->release(this);
384  }
385  }
386 
// Obtain len bytes of memory for a chunk, trying the mapping methods in
// the order anonymous mmap, mmap of /dev/zero, mmap of a temporary file,
// and finally plain malloc. While s_mmapworks is Unknown every method is
// tried in turn; once it names a specific method only that branch runs and
// a failure there throws.
//
// len must be a non-zero multiple of s_pagesize.
// Returns the address of the memory; throws Exception when a system call
// fails in a way that cannot be recovered from.
387  void* PageChunk::dommap(unsigned len)
388  {
389  assert(len && 0 == (len % s_pagesize));
390  // ok, the idea here is to try the different methods of mmapping, and
391  // choose the first one that works. we have four flavours:
392  // 1 - anonymous mmap (best)
393  // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
394  // bit more tedious to set up, since you need to open/close a
395  // device file)
396  // 3 - mmap of a temporary file (very tedious to set up - need to
397  // create a temporary file, delete it, make the underlying storage
398  // large enough, then mmap the fd and close it)
399  // 4 - if all those fail, we malloc the buffers, and copy the data
400  // through the OS (then we're no better than normal pipes)
     // the debug message about the method in use is printed only once
401  static bool msgprinted = false;
402  if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
     // pick whichever spelling of the anonymous-mapping flag exists
403 #if defined(MAP_ANONYMOUS)
404 #undef MYANONFLAG
405 #define MYANONFLAG MAP_ANONYMOUS
406 #elif defined(MAP_ANON)
407 #undef MYANONFLAG
408 #define MYANONFLAG MAP_ANON
409 #else
410 #undef MYANONFLAG
411 #endif
412 #ifdef MYANONFLAG
413  void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
414  MYANONFLAG | MAP_SHARED, -1, 0);
415  if (MAP_FAILED == retVal) {
     // only fatal if anonymous mmap is known to be the method to use;
     // otherwise fall through to the next method
416  if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
417  } else {
     // NOTE(review): listing lines 418-419 are absent from this excerpt;
     // presumably they record in s_mmapworks that this method works -
     // confirm against the full source
420  if (BidirMMapPipe::debugflag() && !msgprinted) {
421  std::cerr << " INFO: In " << __func__ << " (" <<
422  __FILE__ << ", line " << __LINE__ <<
423  "): anonymous mmapping works, excellent!" <<
424  std::endl;
425  msgprinted = true;
426  }
427  return retVal;
428  }
429 #endif
430 #undef MYANONFLAG
431  }
432  if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
433  // ok, no anonymous mappings supported directly, so try to map
434  // /dev/zero which has much the same effect on many systems
435  int fd = ::open("/dev/zero", O_RDWR);
     // NOTE(review): failing to open /dev/zero throws even while
     // s_mmapworks is still Unknown, so the later methods are not tried
436  if (-1 == fd)
437  throw Exception("open /dev/zero", errno);
438  void* retVal = ::mmap(0, len,
439  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
440  if (MAP_FAILED == retVal) {
441  int errsv = errno;
442  ::close(fd);
443  if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
444  } else {
     // NOTE(review): listing lines 445-446 are absent from this excerpt
     // (presumably the s_mmapworks bookkeeping) - confirm
447  }
     // NOTE(review): when mmap failed and s_mmapworks is Unknown, control
     // still reaches this point with fd already closed above: the second
     // close() fails and throws, and otherwise MAP_FAILED would be returned
     // instead of falling through to the next method - looks like a bug
448  if (-1 == ::close(fd))
449  throw Exception("close /dev/zero", errno);
450  if (BidirMMapPipe::debugflag() && !msgprinted) {
451  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
452  ", line " << __LINE__ << "): mmapping /dev/zero works, "
453  "very good!" << std::endl;
454  msgprinted = true;
455  }
456  return retVal;
457  }
458  if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
459  char name[] = "/tmp/BidirMMapPipe-XXXXXX";
460  int fd;
461  // open temp file
462  if (-1 == (fd = ::mkstemp(name))) throw Exception("mkstemp", errno);
463  // remove it, but keep fd open
464  if (-1 == ::unlink(name)) {
465  int errsv = errno;
466  ::close(fd);
467  throw Exception("unlink", errsv);
468  }
469  // make it the right size: lseek
470  if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
471  int errsv = errno;
472  ::close(fd);
473  throw Exception("lseek", errsv);
474  }
475  // make it the right size: write a byte
476  if (1 != ::write(fd, name, 1)) {
477  int errsv = errno;
478  ::close(fd);
479  throw Exception("write", errsv);
480  }
481  // do mmap
482  void* retVal = ::mmap(0, len,
483  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
484  if (MAP_FAILED == retVal) {
485  int errsv = errno;
486  ::close(fd);
487  if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
488  } else {
     // NOTE(review): listing lines 489-490 are absent from this excerpt
     // (presumably the s_mmapworks bookkeeping) - confirm
491  }
     // NOTE(review): same pattern as in the /dev/zero branch - after a
     // failed mmap with s_mmapworks Unknown, fd is closed a second time
     // here and munmap is called on MAP_FAILED
492  if (-1 == ::close(fd)) {
493  int errsv = errno;
494  ::munmap(retVal, len);
495  throw Exception("close", errsv);
496  }
497  if (BidirMMapPipe::debugflag() && !msgprinted) {
498  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
499  ", line " << __LINE__ << "): mmapping temporary files "
500  "works, good!" << std::endl;
501  msgprinted = true;
502  }
503  return retVal;
504  }
505  if (Copy == s_mmapworks || Unknown == s_mmapworks) {
506  // fallback solution: mmap does not work on this OS (or does not
507  // work for what we want to use it), so use a normal buffer of
508  // memory instead, and collect data in that buffer - this needs an
509  // additional write/read to/from the pipe(s), but there you go...
510  if (BidirMMapPipe::debugflag() && !msgprinted) {
511  std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
512  ", line " << __LINE__ << "): anonymous mmapping of "
513  "shared buffers failed, falling back to read/write on "
514  " pipes!" << std::endl;
515  msgprinted = true;
516  }
     // remember the decision, so that domunmap() uses free() for this memory
517  s_mmapworks = Copy;
518  void* retVal = std::malloc(len);
519  if (!retVal) throw Exception("malloc", errno);
520  return retVal;
521  }
522  // should never get here
523  assert(false);
524  return 0;
525  }
526 
// Give memory obtained from dommap() back to the system: munmap() for
// mapped memory, free() when the Copy fallback (malloc) is in use.
// A null addr is ignored; len must be a non-zero multiple of s_pagesize.
// Throws Exception if munmap() fails.
527  void PageChunk::domunmap(void* addr, unsigned len)
528  {
529  assert(len && 0 == (len % s_pagesize));
530  if (addr) {
     // NOTE(review): listing line 531 is absent from this excerpt -
     // confirm against the full source
532  if (Copy != s_mmapworks) {
533  if (-1 == ::munmap(addr, len))
534  throw Exception("munmap", errno);
535  } else {
536  std::free(addr);
537  }
538  }
539  }
540 
// NOTE(review): the signature line for this body (listing line 541) is
// absent from this excerpt; PagePool::zap() calls (*it)->zap(p) on a chunk,
// so this is presumably void PageChunk::zap(Pages& p) - confirm against the
// full source
//
// Cuts the pages p loose from this chunk: the rest of the chunk is made
// inaccessible where possible, p forgets its parent, and the chunk object
// deletes itself without unmapping the memory.
542  {
543  // try to mprotect the other bits of the pool with no access...
544  // we'd really like a version of mremap here that can unmap all the
545  // other pages in the chunk, but that does not exist, so we protect
546  // the other pages in this chunk such that they may neither be read,
547  // written nor executed, only the pages we're interested in for
548  // communications stay readable and writable
549  //
550  // if an OS does not support changing the protection of a part of an
551  // mmapped area, the mprotect calls below should just fail and not
552  // change any protection, so we're a little less safe against
553  // corruption, but everything should still work
554  if (Copy != s_mmapworks) {
     // p0..p1 is the part of the chunk before the pages to keep, p2..p3
     // the part after them
555  unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
556  unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
557  unsigned char* p2 = p1 + p.npages() * pagesize();
558  unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
     // return values are ignored on purpose (see comment above)
559  if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
560  if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
561  }
     // detach from the pool and from p; with m_parent and m_begin zeroed
     // the destructor neither asserts emptiness nor unmaps anything
562  m_parent = 0;
563  m_freelist.clear();
564  m_nUsedGrp = 1;
565  p.m_pimpl->m_parent = 0;
566  m_begin = m_end = 0;
567  // commit suicide
568  delete this;
569  }
570 
571  PagePool::PagePool(unsigned nPgPerGroup) :
572  m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
573  { std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0); }
574 
575  PagePool::~PagePool()
576  {
577  m_freelist.clear();
578  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
579  delete *it;
580  m_chunks.clear();
581  }
582 
583  void PagePool::zap(Pages& p)
584  {
585  // unmap all pages but those pointed to by p
586  m_freelist.clear();
587  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
588  if ((*it)->contains(p)) {
589  (*it)->zap(p);
590  } else {
591  delete *it;
592  }
593  }
594  m_chunks.clear();
595  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
596  m_cursz = minsz;
597  }
598 
599  Pages PagePool::pop()
600  {
601  if (m_freelist.empty()) {
602  // allocate and register new chunk and put it on the freelist
603  const int sz = nextChunkSz();
604  Chunk *c = new Chunk(this,
605  sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
606  m_chunks.push_front(c);
607  m_freelist.push_back(c);
608  updateCurSz(sz, +1);
609  }
610  // get free element from first chunk on _freelist
611  Chunk* c = m_freelist.front();
612  Pages p(c->pop());
613  // full chunks are removed from _freelist
614  if (c->full()) m_freelist.pop_front();
615  return p;
616  }
617 
618  void PagePool::release(PageChunk* chunk)
619  {
620  assert(chunk->empty());
621  // find chunk on freelist and remove
622  ChunkList::iterator it = std::find(
623  m_freelist.begin(), m_freelist.end(), chunk);
624  if (m_freelist.end() == it)
625  throw Exception("PagePool::release(PageChunk*)", EINVAL);
626  m_freelist.erase(it);
627  // find chunk in m_chunks and remove
628  it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
629  if (m_chunks.end() == it)
630  throw Exception("PagePool::release(PageChunk*)", EINVAL);
631  m_chunks.erase(it);
632  const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
633  delete chunk;
634  updateCurSz(sz, -1);
635  }
636 
637  void PagePool::putOnFreeList(PageChunk* chunk)
638  {
639  assert(!chunk->full());
640  m_freelist.push_back(chunk);
641  }
642 
643  void PagePool::updateCurSz(int sz, int incr)
644  {
645  m_szmap[(sz - minsz) / szincr] += incr;
646  m_cursz = minsz;
647  for (int i = (maxsz - minsz) / szincr; i--; ) {
648  if (m_szmap[i]) {
649  m_cursz += i * szincr;
650  break;
651  }
652  }
653  }
654 
655  int PagePool::nextChunkSz() const
656  {
657  // no chunks with space available, figure out chunk size
658  int sz = m_cursz;
659  if (m_chunks.empty()) {
660  // if we start allocating chunks, we start from minsz
661  sz = minsz;
662  } else {
663  if (minsz >= sz) {
664  // minimal sized chunks are always grown
665  sz = minsz + szincr;
666  } else {
667  if (1 != m_chunks.size()) {
668  // if we have more than one completely filled chunk, grow
669  sz += szincr;
670  } else {
671  // just one chunk left, try shrinking chunk size
672  sz -= szincr;
673  }
674  }
675  }
676  // clamp size to allowed range
677  if (sz > maxsz) sz = maxsz;
678  if (sz < minsz) sz = minsz;
679  return sz;
680  }
681 }
682 
683 // static BidirMMapPipe members
// mutex protecting the list of open pipes
684 pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
// list of all pipes which are currently open
685 std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
// pool of pages shared by all pipes, created on demand by pagepool()
686 BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
// number of users of the page pool; the pool is deleted when it drops to 0
687 unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
// debug flag, starts out as 0
688 int BidirMMapPipe::s_debugflag = 0;
689 
690 BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
691 {
692  if (!s_pagepool)
693  s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
694  return *s_pagepool;
695 }
696 
// NOTE(review): the signature line for this body (listing line 697) is
// absent from this excerpt; the constructor registers
// BidirMMapPipe::teardownall with atexit(), and this body closes all open
// pipes, so it is presumably that function - confirm against the full
// source
698 {
699  pthread_mutex_lock(&s_openpipesmutex);
     // keep going until the list of open pipes is empty; doClose() removes
     // the pipe it closes from the list
700  while (!s_openpipes.empty()) {
701  BidirMMapPipe *p = s_openpipes.front();
702  pthread_mutex_unlock(&s_openpipesmutex);
     // in a parent, ask the child process to terminate
703  if (p->m_childPid) kill(p->m_childPid, SIGTERM);
     // NOTE(review): holdlock == true makes doClose() skip locking, but the
     // mutex was released just above, so the list is modified without the
     // lock held - confirm that this is intended
704  p->doClose(true, true);
705  pthread_mutex_lock(&s_openpipesmutex);
706  }
707  pthread_mutex_unlock(&s_openpipesmutex);
708 }
709 
// NOTE(review): the first line of this constructor (listing line 710) and
// listing line 714 are absent from this excerpt, so its signature cannot
// be read off here - confirm against the full source
//
// the visible part takes a group of pages from the pool and then deletes
// the pool if nobody is counted as using it
711  m_pages(pagepool().pop())
712 {
713  // free pages again
715  if (!s_pagepoolrefcnt) {
716  delete s_pagepool;
717  s_pagepool = 0;
718  }
719 }
720 
// Create a pipe: set up the communication channel (a pair of pipes, or a
// socket pair if useSocketpair is set), fork a child process, and exchange
// a one-byte handshake between parent and child.
//
// useExceptions sets exceptionsbit in m_flags.
// failbit stays set in m_flags until construction has succeeded.
// On failure the child (if any) is sent SIGTERM, the file descriptors are
// closed, and the exception is rethrown.
721 BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
722  m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
723  m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
724  m_parentPid(::getpid())
725 
726 {
     // NOTE(review): listing line 727 is absent from this excerpt;
     // s_pagepoolrefcnt is decremented in the catch block below, so the
     // matching increment is presumably here - confirm
728  assert(TotPages && 0 == (TotPages & 1) && TotPages <= 256);
     // fds[0..1]: first pipe or the socket pair, fds[2..3]: second pipe
729  int fds[4] = { -1, -1, -1, -1 };
730  int myerrno;
731  static bool firstcall = true;
732  if (useExceptions) m_flags |= exceptionsbit;
733 
734  try {
735  if (firstcall) {
736  firstcall = false;
737  // register a cleanup handler to make sure all BidirMMapPipes are torn
738  // down, and child processes are sent a SIGTERM
739  if (0 != atexit(BidirMMapPipe::teardownall))
740  throw Exception("atexit", errno);
741  }
742 
743  // build free lists
     // chain all pages, then cut the chain after the first PagesPerEnd pages
744  for (unsigned i = 1; i < TotPages; ++i)
745  m_pages[i - 1]->setNext(m_pages[i]);
746  m_pages[PagesPerEnd - 1]->setNext(0);
747  if (!useSocketpair) {
748  // create pipes
749  if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
750  if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
751  } else {
752  if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
753  throw Exception("socketpair", errno);
754  }
755  // fork the child
     // the mutex is held across fork(); both parent and child unlock it
756  pthread_mutex_lock(&s_openpipesmutex);
757  char c;
758  switch ((m_childPid = ::fork())) {
759  case -1: // error in fork()
760  myerrno = errno;
761  pthread_mutex_unlock(&s_openpipesmutex);
762  m_childPid = 0;
763  throw Exception("fork", myerrno);
764  case 0: // child
765  // put the ends in the right place
     // fds[2] is only set when two pipes were created
766  if (-1 != fds[2]) {
767  // pair of pipes
768  if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
769  myerrno = errno;
770  pthread_mutex_unlock(&s_openpipesmutex);
771  throw Exception("close", myerrno);
772  }
773  fds[0] = fds[3] = -1;
774  m_outpipe = fds[1];
775  m_inpipe = fds[2];
776  } else {
777  // socket pair
778  if (-1 == ::close(fds[0])) {
779  myerrno = errno;
780  pthread_mutex_unlock(&s_openpipesmutex);
781  throw Exception("close", myerrno);
782  }
783  fds[0] = -1;
784  m_inpipe = m_outpipe = fds[1];
785  }
786  // close other pipes our parent may have open - we have no business
787  // reading from/writing to those...
788  for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
789  s_openpipes.end() != it; ) {
790  BidirMMapPipe* p = *it;
791  it = s_openpipes.erase(it);
792  p->doClose(true, true);
793  }
     // drop every page except our own, then start over without a pool
794  pagepool().zap(m_pages);
795  s_pagepoolrefcnt = 0;
796  delete s_pagepool;
797  s_pagepool = 0;
798  s_openpipes.push_front(this);
799  pthread_mutex_unlock(&s_openpipesmutex);
800  // ok, put our pages on freelist
     // NOTE(review): listing line 801 is absent from this excerpt;
     // presumably it sets m_freelist for the child, as the parent branch
     // does below - confirm
802  // handshake with other end (to make sure it's alive)...
803  c = 'C'; // ...hild
804  if (1 != xferraw(m_outpipe, &c, 1, ::write))
805  throw Exception("handshake: xferraw write", EPIPE);
806  if (1 != xferraw(m_inpipe, &c, 1, ::read))
807  throw Exception("handshake: xferraw read", EPIPE);
808  if ('P' != c) throw Exception("handshake", EPIPE);
809  break;
810  default: // parent
811  // put the ends in the right place
812  if (-1 != fds[2]) {
813  // pair of pipes
814  if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
815  myerrno = errno;
816  pthread_mutex_unlock(&s_openpipesmutex);
817  throw Exception("close", myerrno);
818  }
819  fds[1] = fds[2] = -1;
820  m_outpipe = fds[3];
821  m_inpipe = fds[0];
822  } else {
823  // socketpair
824  if (-1 == ::close(fds[1])) {
825  myerrno = errno;
826  pthread_mutex_unlock(&s_openpipesmutex);
827  throw Exception("close", myerrno);
828  }
829  fds[1] = -1;
830  m_inpipe = m_outpipe = fds[0];
831  }
832  // put on list of open pipes (so we can kill child processes
833  // if things go wrong)
834  s_openpipes.push_front(this);
835  pthread_mutex_unlock(&s_openpipesmutex);
836  // ok, put our pages on freelist
837  m_freelist = m_pages[0u];
838  // handshake with other end (to make sure it's alive)...
839  c = 'P'; // ...arent
840  if (1 != xferraw(m_outpipe, &c, 1, ::write))
841  throw Exception("handshake: xferraw write", EPIPE);
842  if (1 != xferraw(m_inpipe, &c, 1, ::read))
843  throw Exception("handshake: xferraw read", EPIPE);
844  if ('C' != c) throw Exception("handshake", EPIPE);
845  break;
846  }
847  // mark file descriptors for close on exec (we do not want to leak the
848  // connection to anything we happen to exec)
     // NOTE(review): F_GETFD reports the flags through fcntl's return value
     // and does not use a third argument, so fdflags is still 0 after these
     // calls; the code ends up setting exactly FD_CLOEXEC - confirm against
     // the fcntl documentation
849  int fdflags = 0;
850  if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
851  throw Exception("fcntl", errno);
852  fdflags |= FD_CLOEXEC;
853  if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
854  throw Exception("fcntl", errno);
855  if (m_inpipe != m_outpipe) {
856  if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
857  throw Exception("fcntl", errno);
858  fdflags |= FD_CLOEXEC;
859  if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
860  throw Exception("fcntl", errno);
861  }
862  // ok, finally, clear the failbit
863  m_flags &= ~failbit;
864  // all done
865  } catch (const BidirMMapPipe::Exception& e) {
     // clean up: stop the child, close whatever descriptors are still open
866  if (0 != m_childPid) kill(m_childPid, SIGTERM);
867  for (int i = 0; i < 4; ++i)
868  if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
869  {
870  // free resources associated with mmapped pages
     // NOTE(review): listing line 871 is absent from this excerpt - confirm
     // against the full source
872  }
873  if (!--s_pagepoolrefcnt) {
874  delete s_pagepool;
875  s_pagepool = 0;
876  }
877  throw e;
878  }
879 }
880 
// NOTE(review): the signature line for this body (listing line 881) is
// absent from this excerpt; the body closes the pipe in an orderly way and
// returns the result of doClose(), so it is presumably
// int BidirMMapPipe::close() - confirm against the full source
882 {
     // closing a pipe which is in the failed state is a programming error
883  assert(!(m_flags & failbit));
884  return doClose(false);
885 }
886 
// Shut the pipe down: flush and close the write end, wait for the other
// end to close, close the read end, release the page pool if this was its
// last user, wait for the child (in the parent), and take the pipe off the
// list of open pipes.
//
// force:    skip flushing and the wait for the other end, and suppress the
//           exceptions for failing close()/waitpid() calls
// holdlock: do not lock s_openpipesmutex when updating the list of open
//           pipes
// Returns the status reported by waitpid() in the parent, 0 otherwise; a
// pipe with failbit set is left alone and 0 is returned.
887 int BidirMMapPipe::doClose(bool force, bool holdlock)
888 {
889  if (m_flags & failbit) return 0;
890  // flush data to be written
891  if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
892  // shut down the write direction (no more writes from our side)
     // a single descriptor for both directions means a socket pair: only
     // shut down writing, the descriptor itself is closed further below
893  if (m_inpipe == m_outpipe) {
894  if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
895  throw Exception("shutdown", errno);
896  m_outpipe = -1;
897  } else {
898  if (-1 != m_outpipe && -1 == ::close(m_outpipe))
899  if (!force) throw Exception("close", errno);
900  m_outpipe = -1;
901  }
902  // shut down the write direction (no more writes from our side)
903  // drain anything the other end might still want to send
904  if (!force && -1 != m_inpipe) {
905  // **************** THIS IS EXTREMELY UGLY: ****************
906  // POLLHUP is not set reliably on pipe/socket shutdown on all
907  // platforms, unfortunately, so we poll for readability here until
908  // the other end closes, too
909  //
910  // the read loop below ensures that the other end sees the POLLIN that
911  // is set on shutdown instead, and goes ahead to close its end
912  //
913  // if we don't do this, and close straight away, the other end
914  // will catch a SIGPIPE or similar, and we don't want that
915  int err;
916  struct pollfd fds;
917  fds.fd = m_inpipe;
918  fds.events = POLLIN;
919  fds.revents = 0;
     // the outer loop restarts the polling if poll() was interrupted
920  do {
921  while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
922  if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
923  if (fds.revents & POLLIN) {
924  char c;
     // read and discard one byte; end-of-file or an error ends the wait
925  if (1 > ::read(m_inpipe, &c, 1)) break;
926  }
927  }
928  } while (0 > err && EINTR == errno);
929  // ignore all other poll errors
930  }
931  // close read end
932  if (-1 != m_inpipe && -1 == ::close(m_inpipe))
933  if (!force) throw Exception("close", errno);
934  m_inpipe = -1;
935  // unmap memory
936  try {
     // NOTE(review): listing line 937 is absent from this excerpt -
     // confirm against the full source
938  if (!--s_pagepoolrefcnt) {
939  delete s_pagepool;
940  s_pagepool = 0;
941  }
942  } catch (const std::exception& e) {
     // NOTE(review): "throw e;" throws a copy with static type
     // std::exception, so the derived type and its message are lost; a
     // plain "throw;" would rethrow the original object
943  if (!force) throw e;
944  }
     // NOTE(review): listing line 945 is absent from this excerpt
946  // wait for child process
947  int retVal = 0;
948  if (isParent()) {
949  int tmp;
     // retry if waitpid() gets interrupted by a signal
950  do {
951  tmp = waitpid(m_childPid, &retVal, 0);
952  } while (-1 == tmp && EINTR == errno);
953  if (-1 == tmp)
954  if (!force) throw Exception("waitpid", errno);
955  m_childPid = 0;
956  }
957  // remove from list of open pipes
958  if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
959  std::list<BidirMMapPipe*>::iterator it = std::find(
960  s_openpipes.begin(), s_openpipes.end(), this);
961  if (s_openpipes.end() != it) s_openpipes.erase(it);
962  if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
     // mark the pipe as unusable, so that another doClose() does nothing
963  m_flags |= failbit;
964  return retVal;
965 }
966 
// NOTE(review): the signature line for this body (listing line 967) is
// absent from this excerpt; presumably this is the destructor, which
// closes the pipe in an orderly (non-forced) way - confirm against the
// full source
968 { doClose(false); }
969 
// NOTE(review): the first line of this definition (listing line 970) is
// absent from this excerpt; callers invoke it as
// xferraw(fd, buffer, length, ::read or ::write) - confirm the exact
// signature against the full source
//
// Transfer len bytes between fd and the buffer at addr by calling xferfn
// repeatedly until everything is transferred, end-of-file is seen, or an
// error occurs. Returns the number of bytes transferred; throws Exception
// if an error occurs before anything was transferred.
971  int fd, void* addr, size_type len,
972  ssize_t (*xferfn)(int, void*, std::size_t))
973 {
974  size_type xferred = 0;
975  unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
976  while (len) {
977  ssize_t tmp = xferfn(fd, buf, len);
978  if (tmp > 0) {
     // partial transfer: advance and go on with the rest
979  xferred += tmp;
980  len -= tmp;
981  buf += tmp;
982  continue;
983  } else if (0 == tmp) {
984  // check for end-of-file on pipe
985  break;
986  } else if (-1 == tmp) {
987  // ok some error occurred, so figure out if we want to retry or throw
988  switch (errno) {
989  default:
990  // if anything was transferred, return number of bytes
991  // transferred so far, we can start throwing on the next
992  // transfer...
993  if (xferred) return xferred;
994  // else throw
995  throw Exception("xferraw", errno);
996  case EAGAIN: // fallthrough intended
997 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
998  case EWOULDBLOCK: // fallthrough intended
999 #endif
     // the descriptor is in non-blocking mode, which is not expected here:
     // complain, then retry just as for EINTR
1000  std::cerr << " ERROR: In " << __func__ << " (" <<
1001  __FILE__ << ", line " << __LINE__ <<
1002  "): expect transfer to block!" << std::endl;
1003  case EINTR:
1004  break;
1005  }
     // EAGAIN/EWOULDBLOCK/EINTR: try the transfer again
1006  continue;
1007  } else {
1008  throw Exception("xferraw: unexpected return value from read/write",
1009  errno);
1010  }
1011  }
1012  return xferred;
1013 }
1014 
1016 {
     // Hands the page list starting at plist over to the other end of the
     // pipe: one byte holding m_pages[plist] is written to m_outpipe first.
     // Throws Exception (with EPIPE) if that byte, or any page written in the
     // loop below, cannot be written completely.
1017  if (plist) {
     // m_pages[plist] must fit into a single byte here - presumably it is the
     // index of the list's first page; confirm against the page pool code
1018  unsigned char pg = m_pages[plist];
1019  if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
     // NOTE(review): this listing skips original lines 1020-1021 here; the
     // brace closed at line 1030 presumably belongs to a condition on those
     // lines that decides whether the copy loop below runs - confirm against
     // the full source.
1022  // ok, have to copy pages through pipe
1023  for (Page* p = plist; p; p = p->next()) {
     // each page goes out as sizeof(Page) + p->size() bytes starting at p
1024  if (sizeof(Page) + p->size() !=
1025  xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1026  ::write)) {
1027  throw Exception("sendpages: short write", EPIPE);
1028  }
1029  }
1030  }
1031  } else {
1032  throw Exception("sendpages: short write", EPIPE);
1033  }
     // this branch is only taken for a null plist, so the assert below fires
     // whenever it is reached in a build with assertions enabled
1034  } else { assert(plist); }
1035 }
1036 
1038 {
     // Receives a page list from the other end: reads one byte from m_inpipe
     // (this call may block), looks up the page it refers to in m_pages and
     // passes the resulting list to feedPageLists(). Returns the number of
     // pages counted for that list; returns 0 when the initial byte could not
     // be read.
1039  unsigned char pg;
1040  unsigned retVal = 0;
1041  Page *plisthead = 0, *plisttail = 0;
1042  if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1043  plisthead = plisttail = m_pages[pg];
1044  // ok, have number of pages
     // NOTE(review): this listing skips original lines 1045-1046 here; the
     // "} else {" at line 1059 presumably pairs with a condition on those
     // lines that selects between copying pages through the pipe and merely
     // counting them - confirm against the full source.
1047  // ok, need to copy pages through pipe
1048  for (; plisttail; ++retVal) {
1049  Page* p = plisttail;
     // read the page header first; it provides next() and size()
1050  if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1051  ::read)) {
1052  plisttail = p->next();
1053  if (!p->size()) continue;
1054  // break in case of read error
1055  if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1056  ::read)) break;
1057  }
     // NOTE(review): when the header read above comes back short, plisttail
     // is not advanced, so the loop retries the same page and still
     // increments retVal - confirm this is intended.
1058  }
1059  } else {
     // no copying needed: just count the pages in the list
1060  retVal = lenPageList(plisthead);
1061  }
1062  }
1063  // put list of pages we just received into correct lists (busy/free)
1064  if (plisthead) feedPageLists(plisthead);
1065  // ok, retVal contains the number of pages read, so put them on the
1066  // correct lists
1067  return retVal;
1068 }
1069 
1071 {
1072  struct pollfd fds;
1073  fds.fd = m_inpipe;
1074  fds.events = POLLIN;
1075  fds.revents = 0;
1076  unsigned retVal = 0;
1077  do {
1078  int rc = ::poll(&fds, 1, 0);
1079  if (0 > rc) {
1080  if (EINTR == errno) continue;
1081  break;
1082  }
1083  if (1 == retVal && fds.revents & POLLIN &&
1084  !(fds.revents & (POLLNVAL | POLLERR))) {
1085  // ok, we can read without blocking, so the other end has
1086  // something for us
1087  return recvpages();
1088  } else {
1089  break;
1090  }
1091  } while (true);
1092  return retVal;
1093 }
1094 
1096 {
1097  unsigned n = 0;
1098  for ( ; p; p = p->next()) ++n;
1099  return n;
1100 }
1101 
1103 {
     // Sorts the pages of plist (must not be null) into this end's lists:
     //  - pages with size() != 0 are appended to the busy list, with their
     //    read position reset to 0;
     //  - empty pages go onto the free list, unless the m_pages index says
     //    they belong to the other end, in which case they are collected on a
     //    send list.
     // If the send list is non-empty, completely full pages from the head of
     // the dirty list are added to it and, provided a zero-timeout poll shows
     // no error/hang-up, the list is sent with sendpages().
     // Throws Exception if that poll fails; sendpages()/recvpages() may throw
     // as well.
1104  assert(plist);
1105  // get end of busy list
1106  Page *blend = m_busylist;
1107  while (blend && blend->next()) blend = blend->next();
1108  // ok, might have to send free pages to other end, and (if we do have to
1109  // send something to the other end) while we're at it, send any dirty
1110  // pages which are completely full, too
1111  Page *sendlisthead = 0, *sendlisttail = 0;
1112  // loop over plist
1113  while (plist) {
     // detach the head page from plist before putting it on another list
1114  Page* p = plist;
1115  plist = p->next();
1116  p->setNext(0);
1117  if (p->size()) {
1118  // busy page...
1119  p->pos() = 0;
1120  // put at end of busy list
1121  if (blend) blend->setNext(p);
1122  else m_busylist = p;
1123  blend = p;
1124  } else {
1125  // free page...
1126  // Very simple algorithm: once we're done with a page, we send it back
1127  // where it came from. If it's from our end, we put it on the free list, if
1128  // it's from the other end, we send it back.
     // ownership test as coded: indices below PagesPerEnd count as the
     // parent's pages, indices from PagesPerEnd upwards as the child's
1129  if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1130  (isChild() && m_pages[p] < PagesPerEnd)) {
1131  // page "belongs" to other end
1132  if (!sendlisthead) sendlisthead = p;
1133  if (sendlisttail) sendlisttail->setNext(p);
1134  sendlisttail = p;
1135  } else {
1136  // add page to freelist
1137  p->setNext(m_freelist);
1138  m_freelist = p;
1139  }
1140  }
1141  }
1142  // check if we have to send stuff to the other end
1143  if (sendlisthead) {
1144  // go through our list of dirty pages, and see what we can
1145  // send along
1146  Page* dp;
     // only full pages at the head of the dirty list are taken; the loop
     // stops at the first page that is not full
1147  while ((dp = m_dirtylist) && dp->full()) {
1148  Page* p = dp;
1149  // move head of dirty list
1150  m_dirtylist = p->next();
1151  // queue for sending
1152  p->setNext(0);
     // sendlisttail is non-null here because sendlisthead is non-null
1153  sendlisttail->setNext(p);
1154  sendlisttail = p;
1155  }
1156  // poll if the other end is still alive - this needs that we first
1157  // close the write pipe of the other end when the remote end of the
1158  // connection is shutting down in doClose; we'll see that because we
1159  // get a POLLHUP on our inpipe
     // one pollfd entry if reading and writing share a descriptor, else two
1160  const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1161  struct pollfd fds[2];
1162  fds[0].fd = m_outpipe;
     // no events are requested for separate descriptors; only the
     // POLLERR/POLLNVAL/POLLHUP bits of revents are inspected below
1163  fds[0].events = fds[0].revents = 0;
1164  if (m_outpipe != m_inpipe) {
1165  fds[1].fd = m_inpipe;
1166  fds[1].events = fds[1].revents = 0;
1167  } else {
     // shared descriptor: additionally ask whether input is pending
1168  fds[0].events |= POLLIN;
1169  }
1170  int retVal = 0;
1171  do {
     // zero timeout: just sample the current state, retrying on EINTR
1172  retVal = ::poll(fds, nfds, 0);
1173  if (0 > retVal && EINTR == errno)
1174  continue;
1175  break;
1176  } while (true);
1177  if (0 <= retVal) {
1178  bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1179  if (m_outpipe != m_inpipe) {
1180  ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1181  } else {
1182  if (ok && fds[0].revents & POLLIN) {
     // pending input on the shared descriptor is received before sending;
     // recvpages() itself calls feedPageLists() for what it receives, and a
     // result of 0 is treated as "other end gone"
1183  unsigned ret = recvpages();
1184  if (!ret) ok = false;
1185  }
1186  }
1187 
1188  if (ok) sendpages(sendlisthead);
1189  // (if the pipe is dead already, we don't care that we leak the
1190  // contents of the pages on the send list here, so that is why
1191  // there's no else clause here)
1192  } else {
1193  throw Exception("feedPageLists: poll", errno);
1194  }
1195  }
1196 }
1197 
1199 {
1200  assert(p);
1201  assert(p == m_freelist);
1202  // remove from freelist
1203  m_freelist = p->next();
1204  p->setNext(0);
1205  // append to dirty list
1206  Page* dl = m_dirtylist;
1207  while (dl && dl->next()) dl = dl->next();
1208  if (dl) dl->setNext(p);
1209  else m_dirtylist = p;
1210 }
1211 
1213 {
     // Returns the head of the busy list, i.e. the next page holding data to
     // be read. While the busy list is empty, recvpages() is called (which
     // may block); 0 is returned as soon as recvpages() delivers nothing.
1214  // queue any pages available for reading we can without blocking
     // NOTE(review): this listing skips original line 1215 here; the comment
     // above presumably describes the statement on it - confirm against the
     // full source.
1216  Page* p;
1217  // if there are no busy pages, try to get them from the other end,
1218  // block if we have to...
1219  while (!(p = m_busylist)) if (!recvpages()) return 0;
1220  return p;
1221 }
1222 
1224 {
     // Returns the page that new data should be written to: the last page of
     // the dirty list if it still has room, otherwise a page taken from the
     // free list and appended to the dirty list. While no free page exists,
     // recvpages() is called (which may block); 0 is returned as soon as
     // recvpages() delivers nothing.
1225  // queue any pages available for reading we can without blocking
     // NOTE(review): this listing skips original line 1226 here; the comment
     // above presumably describes the statement on it - confirm against the
     // full source.
1227  Page* p = m_dirtylist;
1228  // go to end of dirty list
1229  if (p) while (p->next()) p = p->next();
1230  if (!p || p->full()) {
1231  // need to append free page, so get one
1232  while (!(p = m_freelist)) if (!recvpages()) return 0;
     // p is the free-list head here, as markPageDirty() requires
1233  markPageDirty(p);
1234  }
1235  return p;
1236 }
1237 
1239 { return doFlush(true); }
1240 
1241 void BidirMMapPipe::doFlush(bool forcePartialPages)
1242 {
1243  assert(!(m_flags & failbit));
1244  // build a list of pages to flush
1245  Page *flushlisthead = 0, *flushlisttail = 0;
1246  while (m_dirtylist) {
1247  Page* p = m_dirtylist;
1248  if (!forcePartialPages && !p->full()) break;
1249  // remove dirty page from dirty list
1250  m_dirtylist = p->next();
1251  p->setNext(0);
1252  // and send it to other end
1253  if (!flushlisthead) flushlisthead = p;
1254  if (flushlisttail) flushlisttail->setNext(p);
1255  flushlisttail = p;
1256  }
1257  if (flushlisthead) sendpages(flushlisthead);
1258 }
1259 
1261 {
     // Discards all buffered data: the dirty list is appended to the busy
     // list, every page on the combined list gets size 0, and both list heads
     // are reset. Must not be called on a pipe whose failbit is set
     // (asserted).
1262  assert(!(m_flags & failbit));
1263  // join busy and dirty lists
1264  {
1265  Page *l = m_busylist;
1266  while (l && l->next()) l = l->next();
1267  if (l) l->setNext(m_dirtylist);
1268  else m_busylist = m_dirtylist;
1269  }
1270  // empty busy and dirty pages
1271  for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1272  // put them on the free list
     // NOTE(review): this listing skips original line 1273 here; the comment
     // above presumably describes the statement on it, which must run before
     // the list heads are cleared below - confirm against the full source.
1274  m_busylist = m_dirtylist = 0;
1275 }
1276 
1278 {
     // Returns the number of unread bytes on the busy list, i.e. the sum of
     // size() - pos() over all busy pages.
1279  // queue all pages waiting for consumption in the pipe before we give an
1280  // answer
     // NOTE(review): this listing skips original line 1281 here; the comment
     // above presumably describes the statement on it - confirm against the
     // full source.
1282  size_type retVal = 0;
1283  for (Page* p = m_busylist; p; p = p->next())
1284  retVal += p->size() - p->pos();
1285  return retVal;
1286 }
1287 
1289 {
     // Estimates how many bytes can be written without blocking: the free
     // room in dirty pages plus the capacity of free pages. Once FlushThresh
     // pages have been counted, further pages only count if m_outpipe is
     // currently writable. Throws Exception if poll() fails with an error
     // other than EINTR.
1290  // queue all pages waiting for consumption in the pipe before we give an
1291  // answer
     // NOTE(review): this listing skips original line 1292 here; the comment
     // above presumably describes the statement on it - confirm against the
     // full source.
1293  // check if we could write to the pipe without blocking (we need to know
1294  // because we might need to check if flushing of dirty pages would block)
1295  bool couldwrite = false;
1296  {
1297  struct pollfd fds;
1298  fds.fd = m_outpipe;
1299  fds.events = POLLOUT;
1300  fds.revents = 0;
1301  int retVal = 0;
1302  do {
     // zero timeout: only sample the current state of m_outpipe
1303  retVal = ::poll(&fds, 1, 0);
1304  if (0 > retVal) {
1305  if (EINTR == errno) continue;
1306  throw Exception("bytesWritableNonBlocking: poll", errno);
1307  }
     // writable only if POLLOUT is set and no error/hang-up is reported
1308  if (1 == retVal && fds.revents & POLLOUT &&
1309  !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1310  couldwrite = true;
1311  break;
1312  } while (true);
1313  }
1314  // ok, start counting bytes
1315  size_type retVal = 0;
1316  unsigned npages = 0;
1317  // go through the dirty list
1318  for (Page* p = m_dirtylist; p; p = p->next()) {
1319  ++npages;
1320  // if page only partially filled
1321  if (!p->full())
1322  retVal += p->free();
     // stop counting at the flush threshold if the pipe is not writable
1323  if (npages >= FlushThresh && !couldwrite) break;
1324  }
1325  // go through the free list
     // free pages count while there are no dirty pages at all, the threshold
     // has not been reached, or the pipe is writable
1326  for (Page* p = m_freelist; p && (!m_dirtylist ||
1327  npages < FlushThresh || couldwrite); p = p->next()) {
1328  ++npages;
1329  retVal += Page::capacity();
1330  }
1331  return retVal;
1332 }
1333 
1335 {
1336  assert(!(m_flags & failbit));
1337  size_type nread = 0;
1338  unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1339  try {
1340  while (sz) {
1341  // find next page to read from
1342  Page* p = busypage();
1343  if (!p) {
1344  m_flags |= eofbit;
1345  return nread;
1346  }
1347  unsigned char* pp = p->begin() + p->pos();
1348  size_type csz = std::min(size_type(p->remaining()), sz);
1349  std::copy(pp, pp + csz, ap);
1350  nread += csz;
1351  ap += csz;
1352  sz -= csz;
1353  p->pos() += csz;
1354  assert(p->size() >= p->pos());
1355  if (p->size() == p->pos()) {
1356  // if no unread data remains, page is free
1357  m_busylist = p->next();
1358  p->setNext(0);
1359  p->size() = 0;
1360  feedPageLists(p);
1361  }
1362  }
1363  } catch (const Exception& e) {
1364  m_flags |= rderrbit;
1365  if (m_flags & exceptionsbit) throw e;
1366  }
1367  return nread;
1368 }
1369 
1371 {
     // Writes sz bytes from the buffer at addr into dirty pages, obtaining
     // pages through dirtypage() (which may block). Returns the number of
     // bytes stored. A result smaller than sz means that either no page could
     // be obtained (eofbit is set) or an Exception occurred (wrerrbit is
     // set); the Exception is passed on only if exceptionsbit is set.
     // Must not be called on a pipe whose failbit is set (asserted).
1372  assert(!(m_flags & failbit));
1373  size_type written = 0;
1374  const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1375  try {
1376  while (sz) {
1377  // find next page to write to
1378  Page* p = dirtypage();
1379  if (!p) {
1380  m_flags |= eofbit;
1381  return written;
1382  }
     // append behind the data already in the page, limited by its free room
1383  unsigned char* pp = p->begin() + p->size();
1384  size_type csz = std::min(size_type(p->free()), sz);
1385  std::copy(ap, ap + csz, pp);
1386  written += csz;
1387  ap += csz;
1388  p->size() += csz;
1389  sz -= csz;
1390  assert(p->capacity() >= p->size());
1391  if (p->full()) {
1392  // if page is full, see if we're above the flush threshold of
1393  // 3/4 of our pages
     // NOTE(review): this listing skips original line 1394 here; the comment
     // above suggests it holds the threshold condition guarding the flush
     // below - confirm against the full source.
     // doFlush(false) sends full pages only
1395  doFlush(false);
1396  }
1397  }
1398  } catch (const Exception& e) {
1399  m_flags |= wrerrbit;
1400  if (m_flags & exceptionsbit) throw e;
1401  }
1402  return written;
1403 }
1404 
1406 {
     // Polls a set of pipes. For every entry of pipes, revents is rebuilt
     // from (a) state already known locally (closed/bad/eof, buffered pages)
     // and (b) the result of one ::poll() call over all pipe descriptors,
     // receiving pages that are ready in the process. Readable/Writable are
     // reported only if requested in events; all other bits are always
     // reported. Returns the number of entries whose revents is non-zero
     // after masking. Throws Exception if ::poll() fails with an error other
     // than EINTR.
     // The timeout argument is passed to ::poll() unchanged, unless an event
     // is already known, in which case 0 is used.
1407  // go through pipes, and change flags where we already know without really
1408  // polling - stuff where we don't need poll to wait for its timeout in the
1409  // OS...
1410  bool canskiptimeout = false;
     // per-entry mask: everything except Readable/Writable passes by default;
     // those two bits are added below when the caller asked for them
1411  std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1412  std::vector<unsigned>::iterator mit = masks.begin();
1413  for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1414  ++it, ++mit) {
1415  PollEntry& pe = *it;
1416  pe.revents = None;
1417  // null pipe pointer or closed pipe is invalid
1418  if (!pe.pipe || pe.pipe->closed()) pe.revents |= Invalid;
     // NOTE(review): pe.pipe is dereferenced on the following lines (and
     // again when the pollfd list is built) even if the check above found it
     // to be null.
1419  // check for error
1420  if (pe.pipe->bad()) pe.revents |= Error;
1421  // check for end of file
1422  if (pe.pipe->eof()) pe.revents |= EndOfFile;
1423  // check if readable
1424  if (pe.events & Readable) {
1425  *mit |= Readable;
1426  if (pe.pipe->m_busylist) pe.revents |= Readable;
1427  }
1428  // check if writable
1429  if (pe.events & Writable) {
1430  *mit |= Writable;
1431  if (pe.pipe->m_freelist) {
1432  pe.revents |= Writable;
1433  } else {
     // no free page: look at the last page of the dirty list instead
1434  Page *dl = pe.pipe->m_dirtylist;
1435  while (dl && dl->next()) dl = dl->next();
     // NOTE(review): this tests pos(), whereas dirtypage() and
     // bytesWritableNonBlocking() judge the room left in a dirty page via
     // full()/free() - confirm that pos() is intended here.
1436  if (dl && dl->pos() < Page::capacity())
1437  pe.revents |= Writable;
1438  }
1439  }
1440  if (pe.revents) canskiptimeout = true;
1441  }
1442  // set up the data structures required for the poll syscall
1443  std::vector<pollfd> fds;
1444  fds.reserve(2 * pipes.size());
     // maps each polled descriptor back to the entry it belongs to
1445  std::map<int, PollEntry*> fds2pipes;
1446  for (PollVector::const_iterator it = pipes.begin();
1447  pipes.end() != it; ++it) {
1448  const PollEntry& pe = *it;
1449  struct pollfd tmp;
1450  fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1451  const_cast<PollEntry*>(&pe)));
1452  tmp.events = tmp.revents = 0;
1453  // we always poll for readability; this allows us to queue pages
1454  // early
1455  tmp.events |= POLLIN;
1456  if (pe.pipe->m_outpipe != tmp.fd) {
1457  // ok, it's a pair of pipes
     // the read descriptor gets its own pollfd; tmp is then reused for the
     // write descriptor with an empty event set
1458  fds.push_back(tmp);
1459  fds2pipes.insert(std::make_pair(
1460  unsigned(tmp.fd = pe.pipe->m_outpipe),
1461  const_cast<PollEntry*>(&pe)));
1462  tmp.events = 0;
1463 
1464  }
1465  if (pe.events & Writable) tmp.events |= POLLOUT;
1466  fds.push_back(tmp);
1467  }
1468  // poll
1469  int retVal = 0;
1470  do {
     // NOTE(review): &fds[0] is evaluated even when pipes (and hence fds) is
     // empty - confirm callers never pass an empty vector.
1471  retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1472  if (0 > retVal) {
1473  if (EINTR == errno) continue;
1474  throw Exception("poll", errno);
1475  }
1476  break;
1477  } while (true);
1478  // fds may have changed state, so update...
1479  for (std::vector<pollfd>::iterator it = fds.begin();
1480  fds.end() != it; ++it) {
1481  pollfd& fe = *it;
1482  //if (!fe.revents) continue;
     // retVal is only decremented here; it is not read again afterwards
1483  --retVal;
1484  PollEntry& pe = *fds2pipes[fe.fd];
     // re-entered via goto below after more input was found on fe
1485 oncemore:
     // translate the OS-level bits into the read-side/write-side flags,
     // depending on which of the pipe's descriptors fe refers to
1486  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1487  pe.revents |= ReadInvalid;
1488  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1489  pe.revents |= WriteInvalid;
1490  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1491  pe.revents |= ReadError;
1492  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1493  pe.revents |= WriteError;
1494  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1495  pe.revents |= ReadEndOfFile;
1496  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1497  pe.revents |= WriteEndOfFile;
1498  if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1499  !(fe.revents & (POLLNVAL | POLLERR))) {
1500  // ok, there is at least one page for us to receive from the
1501  // other end
     // if nothing could be received, the Readable/Writable updates further
     // down are skipped for this descriptor
1502  if (0 == pe.pipe->recvpages()) continue;
1503  // more pages there?
1504  do {
1505  int tmp = ::poll(&fe, 1, 0);
1506  if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1507  if (0 > tmp) {
1508  if (EINTR == errno) continue;
1509  throw Exception("poll", errno);
1510  }
1511  break;
1512  } while (true);
1513  }
1514  if (pe.pipe->m_busylist) pe.revents |= Readable;
1515  if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
     // same writability test as in the first pass above
1516  if (pe.pipe->m_freelist) {
1517  pe.revents |= Writable;
1518  } else {
1519  Page *dl = pe.pipe->m_dirtylist;
1520  while (dl && dl->next()) dl = dl->next();
1521  if (dl && dl->pos() < Page::capacity())
1522  pe.revents |= Writable;
1523  }
1524  }
1525  }
1526  // apply correct masks, and count pipes with pending events
1527  int npipes = 0;
1528  mit = masks.begin();
1529  for (PollVector::iterator it = pipes.begin();
1530  pipes.end() != it; ++it, ++mit)
1531  if ((it->revents &= *mit)) ++npipes;
1532  return npipes;
1533 }
1534 
1536 {
1537  size_t sz = std::strlen(str);
1538  *this << sz;
1539  if (sz) write(str, sz);
1540  return *this;
1541 }
1542 
1544 {
1545  size_t sz = 0;
1546  *this >> sz;
1547  if (good() && !eof()) {
1548  str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1549  if (!str) throw Exception("realloc", errno);
1550  if (sz) read(str, sz);
1551  str[sz] = 0;
1552  }
1553  return *this;
1554 }
1555 
1557 {
1558  size_t sz = str.size();
1559  *this << sz;
1560  write(str.data(), sz);
1561  return *this;
1562 }
1563 
1565 {
1566  str.clear();
1567  size_t sz = 0;
1568  *this >> sz;
1569  if (good() && !eof()) {
1570  str.reserve(sz);
1571  for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1572  }
1573  return *this;
1574 }
1575 
1577 
1578 #ifdef TEST_BIDIRMMAPPIPE
1579 using namespace RooFit;
1580 
1581 int simplechild(BidirMMapPipe& pipe)
1582 {
1583  // child does an echo loop
1584  while (pipe.good() && !pipe.eof()) {
1585  // read a string
1586  std::string str;
1587  pipe >> str;
1588  if (!pipe) return -1;
1589  if (pipe.eof()) break;
1590  if (!str.empty()) {
1591  std::cout << "[CHILD] : read: " << str << std::endl;
1592  str = "... early in the morning?";
1593  }
1594  pipe << str << BidirMMapPipe::flush;
1595  // did our parent tell us to shut down?
1596  if (str.empty()) break;
1597  if (!pipe) return -1;
1598  if (pipe.eof()) break;
1599  std::cout << "[CHILD] : wrote: " << str << std::endl;
1600  }
1601  pipe.close();
1602  return 0;
1603 }
1604 
1605 #include <sstream>
1606 int randomchild(BidirMMapPipe& pipe)
1607 {
1608  // child sends out something at random intervals
1609  ::srand48(::getpid());
1610  {
1611  // wait for parent's go ahead signal
1612  std::string s;
1613  pipe >> s;
1614  }
1615  // no shutdown sequence needed on this side - we're producing the data,
1616  // and the parent can just read until we're done (when it'll get EOF)
1617  for (int i = 0; i < 5; ++i) {
1618  // sleep a random time between 0 and .9 seconds
1619  ::usleep(int(1e6 * ::drand48()));
1620  std::ostringstream buf;
1621  buf << "child pid " << ::getpid() << " sends message " << i;
1622  std::string str = buf.str();
1623  std::cout << "[CHILD] : " << str << std::endl;
1624  pipe << str << BidirMMapPipe::flush;
1625  if (!pipe) return -1;
1626  if (pipe.eof()) break;
1627  }
1628  // tell parent we're shutting down
1629  pipe << "" << BidirMMapPipe::flush;
1630  // wait for parent to acknowledge
1631  std::string s;
1632  pipe >> s;
1633  pipe.close();
1634  return 0;
1635 }
1636 
1637 int benchchildrtt(BidirMMapPipe& pipe)
1638 {
1639  // child does the equivalent of listening for pings and sending the
1640  // packet back
1641  char* str = 0;
1642  while (pipe && !pipe.eof()) {
1643  pipe >> str;
1644  if (!pipe) {
1645  std::free(str);
1646  pipe.close();
1647  return -1;
1648  }
1649  if (pipe.eof()) break;
1650  pipe << str << BidirMMapPipe::flush;
1651  // if we have just completed the shutdown handshake, we break here
1652  if (!std::strlen(str)) break;
1653  }
1654  std::free(str);
1655  pipe.close();
1656  return 0;
1657 }
1658 
1659 int benchchildsink(BidirMMapPipe& pipe)
1660 {
1661  // child behaves like a sink
1662  char* str = 0;
1663  while (pipe && !pipe.eof()) {
1664  pipe >> str;
1665  if (!std::strlen(str)) break;
1666  }
1667  pipe << "" << BidirMMapPipe::flush;
1668  std::free(str);
1669  pipe.close();
1670  return 0;
1671 }
1672 
1673 int benchchildsource(BidirMMapPipe& pipe)
1674 {
1675  // child behaves like a source
1676  char* str = 0;
1677  for (unsigned i = 0; i <= 24; ++i) {
1678  str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1679  std::memset(str, '4', 1 << i);
1680  str[1 << i] = 0;
1681  for (unsigned j = 0; j < 1 << 7; ++j) {
1682  pipe << str;
1683  if (!pipe || pipe.eof()) {
1684  std::free(str);
1685  pipe.close();
1686  return -1;
1687  }
1688  }
1689  // tell parent we're done with this block size
1690  pipe << "" << BidirMMapPipe::flush;
1691  }
1692  // tell parent to shut down
1693  pipe << "" << BidirMMapPipe::flush;
1694  std::free(str);
1695  pipe.close();
1696  return 0;
1697 }
1698 
1699 BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1700 {
1701  // create a pipe with the given child at the remote end
1702  BidirMMapPipe *p = new BidirMMapPipe();
1703  if (p->isChild()) {
1704  int retVal = childexec(*p);
1705  delete p;
1706  std::exit(retVal);
1707  }
1708  return p;
1709 }
1710 
1711 #include <sys/time.h>
1712 #include <iomanip>
// Test driver (built only when TEST_BIDIRMMAPPIPE is defined). Runs, in this
// order: an echo test with one child, a polling test with 20 children, and
// three benchmarks (round trip, child as sink, child as source). Returns 0 if
// everything ran through, otherwise -1 or the non-zero value returned by
// close() for a child.
1713 int main()
1714 {
1715  // simple echo loop test
1716  {
1717  std::cout << "[PARENT]: simple challenge-response test, "
1718  "one child:" << std::endl;
1719  BidirMMapPipe* pipe = spawnChild(simplechild);
     // five request/reply rounds with the echo child
1720  for (int i = 0; i < 5; ++i) {
1721  std::string str("What shall we do with a drunken sailor...");
1722  *pipe << str << BidirMMapPipe::flush;
     // NOTE(review): the early returns in this function leave the pipe
     // object undeleted.
1723  if (!*pipe) return -1;
1724  std::cout << "[PARENT]: wrote: " << str << std::endl;
1725  *pipe >> str;
1726  if (!*pipe) return -1;
1727  std::cout << "[PARENT]: read: " << str << std::endl;
1728  }
1729  // send shutdown string
1730  *pipe << "" << BidirMMapPipe::flush;
1731  // wait for shutdown handshake
1732  std::string s;
1733  *pipe >> s;
1734  int retVal = pipe->close();
1735  std::cout << "[PARENT]: exit status of child: " << retVal <<
1736  std::endl;
1737  if (retVal) return retVal;
1738  delete pipe;
1739  }
1740  // simple poll test - children send 5 results in random intervals
1741  {
1742  unsigned nch = 20;
1743  std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1744  " children:" << std::endl;
1745  typedef BidirMMapPipe::PollEntry PollEntry;
1746  // poll data structure
     // NOTE(review): this listing skips original line 1747 here; it
     // presumably declares the container "pipes" used below - confirm against
     // the full source.
1748  pipes.reserve(nch);
1749  // spawn children
1750  for (unsigned i = 0; i < nch; ++i) {
1751  std::cout << "[PARENT]: spawning child " << i << std::endl;
     // NOTE(review): this listing skips original line 1753, which holds the
     // remaining PollEntry constructor argument(s) and closes the call.
1752  pipes.push_back(PollEntry(spawnChild(randomchild),
1754  }
1755  // wake children up
1756  std::cout << "[PARENT]: waking up children" << std::endl;
1757  for (unsigned i = 0; i < nch; ++i)
1758  *pipes[i].pipe << "" << BidirMMapPipe::flush;
1759  std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1760  // while at least some children alive
1761  while (!pipes.empty()) {
1762  // poll, wait until status change (infinite timeout)
1763  int npipes = BidirMMapPipe::poll(pipes, -1);
1764  // scan for pipes with changed status
     // the iterator is advanced inside the body: ++it for entries that stay,
     // erase() for retired ones
1765  for (std::vector<PollEntry>::iterator it = pipes.begin();
1766  npipes && pipes.end() != it; ) {
1767  if (!it->revents) {
1768  // unchanged, next one
1769  ++it;
1770  continue;
1771  }
1772  --npipes; // maybe we can stop early...
1773  // read from pipes which are readable
1774  if (it->revents & BidirMMapPipe::Readable) {
1775  std::string s;
1776  *(it->pipe) >> s;
1777  if (!s.empty()) {
1778  std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1779  ": " << s << std::endl;
1780  ++it;
1781  continue;
1782  } else {
1783  // child is shutting down...
     // acknowledge with an empty string, then retire the pipe below
1784  *(it->pipe) << "" << BidirMMapPipe::flush;
1785  goto childcloses;
1786  }
1787  }
1788  // retire pipes with error or end-of-file condition
     // NOTE(review): this listing skips original lines 1790-1791, which hold
     // the rest of this condition and the opening brace of its block.
1789  if (it->revents & (BidirMMapPipe::Error |
1792  std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1793  " revents" <<
1794  ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1795  ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1796  ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1797  ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1798  ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1799  ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1800  ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1801  ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1802  std::endl;
     // also the jump target for a regular child shutdown (see goto above)
1803 childcloses:
1804  int retVal = it->pipe->close();
1805  std::cout << "[PARENT]: child exit status: " <<
1806  retVal << ", number of children still alive: " <<
1807  (pipes.size() - 1) << std::endl;
1808  if (retVal) return retVal;
1809  delete it->pipe;
     // erase() returns the iterator to the element following the erased one
1810  it = pipes.erase(it);
1811  continue;
1812  }
1813  }
1814  }
1815  }
1816  // little benchmark - round trip time
1817  {
1818  std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
     // block sizes 2^0 .. 2^24 bytes, one fresh child per block size
1819  for (unsigned i = 0; i <= 24; ++i) {
     // NOTE(review): s is allocated with new[] here, handed to
     // operator>>(char*&) below (which resizes its argument with
     // std::realloc) and finally released with delete[] - these allocation
     // families must not be mixed.
1820  char *s = new char[1 + (1 << i)];
1821  std::memset(s, 'A', 1 << i);
1822  s[1 << i] = 0;
1823  const unsigned n = 1 << 7;
1824  double avg = 0., min = 1e42, max = -1e42;
1825  BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1826  for (unsigned j = n; j--; ) {
1827  struct timeval t1;
1828  ::gettimeofday(&t1, 0);
1829  *pipe << s << BidirMMapPipe::flush;
1830  if (!*pipe || pipe->eof()) break;
1831  *pipe >> s;
1832  if (!*pipe || pipe->eof()) break;
1833  struct timeval t2;
1834  ::gettimeofday(&t2, 0);
     // elapsed time t2 - t1, computed field by field and combined in seconds
1835  t2.tv_sec -= t1.tv_sec;
1836  t2.tv_usec -= t1.tv_usec;
1837  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1838  if (dt < min) min = dt;
1839  if (dt > max) max = dt;
1840  avg += dt;
1841  }
1842  // send a shutdown string
1843  *pipe << "" << BidirMMapPipe::flush;
1844  // get child's shutdown ok
1845  *pipe >> s;
     // the average always divides by n, even if the loop above ended early;
     // all three figures are then converted from seconds to microseconds
1846  avg /= double(n);
1847  avg *= 1e6; min *= 1e6; max *= 1e6;
1848  int retVal = pipe->close();
1849  if (retVal) {
1850  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1851  return retVal;
1852  }
1853  delete pipe;
1854  // there is a factor 2 in the formula for the transfer rate below,
1855  // because we transfer data of twice the size of the block - once
1856  // to the child, and once for the return trip
1857  std::cout << "block size " << std::setw(9) << (1 << i) <<
1858  " avg " << std::setw(7) << avg << " us min " <<
1859  std::setw(7) << min << " us max " << std::setw(7) << max <<
1860  "us speed " << std::setw(9) <<
1861  2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1862  " MB/s" << std::endl;
1863  delete[] s;
1864  }
1865  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1866  }
1867  // little benchmark - child as sink
1868  {
1869  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1870  for (unsigned i = 0; i <= 24; ++i) {
     // NOTE(review): same new[] / realloc / delete[] mix as in the
     // round-trip benchmark (s is passed to operator>>(char*&) below).
1871  char *s = new char[1 + (1 << i)];
1872  std::memset(s, 'A', 1 << i);
1873  s[1 << i] = 0;
1874  const unsigned n = 1 << 7;
1875  double avg = 0., min = 1e42, max = -1e42;
1876  BidirMMapPipe *pipe = spawnChild(benchchildsink);
1877  for (unsigned j = n; j--; ) {
1878  struct timeval t1;
1879  ::gettimeofday(&t1, 0);
1880  // streaming mode - we do not flush here
1881  *pipe << s;
1882  if (!*pipe || pipe->eof()) break;
1883  struct timeval t2;
1884  ::gettimeofday(&t2, 0);
1885  t2.tv_sec -= t1.tv_sec;
1886  t2.tv_usec -= t1.tv_usec;
1887  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1888  if (dt < min) min = dt;
1889  if (dt > max) max = dt;
1890  avg += dt;
1891  }
1892  // send a shutdown string
1893  *pipe << "" << BidirMMapPipe::flush;
1894  // get child's shutdown ok
1895  *pipe >> s;
1896  avg /= double(n);
1897  avg *= 1e6; min *= 1e6; max *= 1e6;
1898  int retVal = pipe->close();
1899  if (retVal) {
1900  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1901  return retVal;
1902  }
1903  delete pipe;
1904  std::cout << "block size " << std::setw(9) << (1 << i) <<
1905  " avg " << std::setw(7) << avg << " us min " <<
1906  std::setw(7) << min << " us max " << std::setw(7) << max <<
1907  "us speed " << std::setw(9) <<
1908  (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1909  " MB/s" << std::endl;
1910  delete[] s;
1911  }
1912  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1913  }
1914  // little benchmark - child as source
1915  {
1916  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
     // here s starts out null and is only ever (re)allocated by
     // operator>>(char*&), so releasing it with std::free below matches
1917  char *s = 0;
1918  double avg = 0., min = 1e42, max = -1e42;
1919  unsigned n = 0, bsz = 0;
1920  BidirMMapPipe *pipe = spawnChild(benchchildsource);
1921  while (*pipe && !pipe->eof()) {
1922  struct timeval t1;
1923  ::gettimeofday(&t1, 0);
1924  // streaming mode - we do not flush here
1925  *pipe >> s;
1926  if (!*pipe || pipe->eof()) break;
1927  struct timeval t2;
1928  ::gettimeofday(&t2, 0);
1929  t2.tv_sec -= t1.tv_sec;
1930  t2.tv_usec -= t1.tv_usec;
1931  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
     // a non-empty string is a data block; an empty one ends a block size
1932  if (std::strlen(s)) {
1933  ++n;
1934  if (dt < min) min = dt;
1935  if (dt > max) max = dt;
1936  avg += dt;
1937  bsz = std::strlen(s);
1938  } else {
     // an empty string with no data blocks before it is the child's final
     // shutdown marker
1939  if (!n) break;
1940  // next block size
1941  avg /= double(n);
1942  avg *= 1e6; min *= 1e6; max *= 1e6;
1943 
1944  std::cout << "block size " << std::setw(9) << bsz <<
1945  " avg " << std::setw(7) << avg << " us min " <<
1946  std::setw(7) << min << " us max " << std::setw(7) <<
1947  max << "us speed " << std::setw(9) <<
1948  (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1949  " MB/s" << std::endl;
     // reset the statistics for the next block size
1950  n = 0;
1951  avg = 0.;
1952  min = 1e42;
1953  max = -1e42;
1954  }
1955  }
1956  int retVal = pipe->close();
1957  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1958  if (retVal) return retVal;
1959  delete pipe;
1960  std::free(s);
1961  }
1962  return 0;
1963 }
1964 #endif // TEST_BIDIRMMAPPIPE
1965 #endif // _WIN32
1966 
1967 // vim: ft=cpp:sw=4:tw=78
std::size_t size_type
type used to represent sizes
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:68
unsigned char m_npages
length in pages
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe * pipe
pipe of interest
pipe error Write end
bool eof() const
true if end-of-file
static Vc_ALWAYS_INLINE int_v min(const int_v &x, const int_v &y)
Definition: vector.h:433
error reporting with exceptions
static double p3(double t, double a, double b, double c, double d)
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
write to pipe
read pipe in end-of-file state
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
pages per pipe end
bool isChild() const
return if this end of the pipe is the child end
handle class for a number of Pages
header file for BidirMMapPipe, a class which forks off a child process and serves as communications c...
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:80
impl * m_pimpl
pointer to implementation
ROOT::R::TRInterface & Exception()
Definition: Exception.C:4
unsigned m_refcnt
reference counter
#define assert(cond)
Definition: unittest.h:542
Page * m_freelist
linked list: free pages
bool bad() const
true on I/O error
write pipe in end-of-file state
unsigned npages() const
return number of pages accessible
don't know yet what'll work
Definition: BidirMMapPipe.h:48
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:61
namespace for implementation details of BidirMMapPipe
unsigned events
events of interest (or'ed bitmask)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
void doFlush(bool forcePartialPages=true)
perform the flush
read end of pipe invalid
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:58
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:70
size_t
Definition: TBuffer.cxx:28
pid_t m_childPid
pid of the child (zero if we're child)
TLatex * t1
Definition: textangle.C:20
pipe error read end
void markPageDirty(Page *p)
put on dirty pages list
~BidirMMapPipe()
destructor
unsigned nPagesPerGroup() const
return number of pages per page group
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Definition: BidirMMapPipe.h:44
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
bool good() const
status of stream is good
size_type read(void *addr, size_type sz)
read from pipe
Vc_ALWAYS_INLINE void free(T *p)
Frees memory that was allocated with Vc::malloc.
Definition: memory.h:94
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:87
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
void sendpages(Page *plist)
send page(s) to the other end (may block)
static double p2(double t, double a, double b, double c)
unsigned len() const
return length of chunk
Page * page(unsigned pgno) const
return page number pgno
pipe can be written to
Page * busypage()
get a busy page to read data from (may block)
if(pyself &&pyself!=Py_None)
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Pages()
default constructor
int close()
flush buffers, close pipe
void swap(Pages &other)
swap with other's contents
Page * dirtypage()
get a dirty page to write data to (may block)
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
static int debugflag()
return the current setting of the debug flag
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:69
static const char * what
Definition: stlLoader.cc:6
static unsigned lenPageList(const Page *list)
return length of a page list
end of file reached
Pages pop()
pop a group of pages off the free list
logical failure (e.g. pipe closed)
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
int m_outpipe
pipe end to which data may be written
bool contains(const Pages &p) const
return whether p is contained in this PageChunk
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:63
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
#define END_NAMESPACE_ROOFIT
TLine * l
Definition: textangle.C:4
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
static double p1(double t, double a, double b)
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
bool isParent() const
return whether this end of the pipe is the parent end
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
#define blend
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
pages shared (child + parent)
bool empty() const
return true if no used page groups in this chunk
int main(int argc, char *argv[])
Definition: python64.c:14
bool closed() const
true if closed
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:47
void zap(Pages &p)
free all pages except for those pointed to by p
double f(double x)
for poll() interface
unsigned recvpages()
receive pages from the other end (may block), queue them
void fill()
Definition: utils.cpp:314
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:67
static Vc_ALWAYS_INLINE int_v max(const int_v &x, const int_v &y)
Definition: vector.h:440
#define name(a, b)
Definition: linkTestLib0.cpp:5
Binding & operator=(OUT(*fun)(void))
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:64
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
typedef void((*Func_t)())
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static unsigned long masks[]
Definition: gifencode.c:211
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
system page size (run-time determined)
Definition: BidirMMapPipe.h:56
pipe has data for reading
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
Definition: memory.h:67
nothing special on this pipe
const Int_t n
Definition: legend1.C:16
int m_flags
flags (e.g. end of file)
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:49
static unsigned pagesize()
return the page size of the system
Definition: BidirMMapPipe.h:85
unsigned pageno(Page *p) const
perform page to page number mapping
#define BEGIN_NAMESPACE_ROOFIT