BidirMMapPipe.cxx
1/** @file BidirMMapPipe.cxx
2 *
3 * implementation of BidirMMapPipe, a class which forks off a child process
4 * and serves as communications channel between parent and child
5 *
6 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7 * @date 2013-07-07
8 */
9#ifndef _WIN32
10#include <map>
11#include <cerrno>
12#include <limits>
13#include <string>
14#include <cstdlib>
15#include <cstring>
16#include <cassert>
17#include <iostream>
18#include <algorithm>
19#include <exception>
20
21#include <poll.h>
22#include <fcntl.h>
23#include <signal.h>
24#include <unistd.h>
25#include <pthread.h>
26#include <sys/mman.h>
27#include <sys/stat.h>
28#include <sys/wait.h>
29#include <sys/socket.h>
30
31#include "BidirMMapPipe.h"
32
33#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
34#define END_NAMESPACE_ROOFIT }
35
36BEGIN_NAMESPACE_ROOFIT
37
38/// namespace for implementation details of BidirMMapPipe
39namespace BidirMMapPipe_impl {
40 /** @brief exception to throw if low-level OS calls go wrong
41 *
42 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
43 * @date 2013-07-07
44 */
45 class BidirMMapPipeException : public std::exception
46 {
47 private:
48 enum {
49 s_sz = 256 ///< length of buffer
50 };
51 char m_buf[s_sz]; ///< buffer containing the error message
52
53 /// for the POSIX version of strerror_r
54 static int dostrerror_r(int err, char* buf, std::size_t sz,
55 int (*f)(int, char*, std::size_t))
56 { return f(err, buf, sz); }
57 /// for the GNU version of strerror_r
58 static int dostrerror_r(int, char*, std::size_t,
59 char* (*f)(int, char*, std::size_t));
60 public:
61 /// constructor taking error code, hint on operation (msg)
62 BidirMMapPipeException(const char* msg, int err);
63 /// return a description of what went wrong
64 virtual const char* what() const noexcept { return m_buf; }
65 };
66
67 BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
68 {
69 std::size_t msgsz = std::strlen(msg);
70 if (msgsz) {
71 msgsz = std::min(msgsz, std::size_t(s_sz));
72 std::copy(msg, msg + msgsz, m_buf);
73 if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
74 if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
75 }
76 if (msgsz < s_sz) {
77 // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
78 // have to sort it out with overloads
79 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
80 }
81 m_buf[s_sz - 1] = 0; // enforce zero-termination
82 }
83
84 int BidirMMapPipeException::dostrerror_r(int err, char* buf,
85 std::size_t sz, char* (*f)(int, char*, std::size_t))
86 {
87 buf[0] = 0;
88 char *tmp = f(err, buf, sz);
89 if (tmp && tmp != buf) {
90 std::strncpy(buf, tmp, sz);
91 buf[sz - 1] = 0;
92 if (std::strlen(tmp) > sz - 1) return ERANGE;
93 }
94 return 0;
95 }
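// How the two dostrerror_r overloads above are selected (a sketch, not a
// statement about any particular libc): the constructor passes ::strerror_r
// itself as the last argument, so overload resolution picks whichever
// overload's function-pointer parameter matches the platform's prototype
// (the int-returning POSIX flavour vs. the char*-returning GNU flavour),
// with no #ifdef needed. Typical call sites in this file look like
//
//     int fd = ::open("/dev/zero", O_RDWR);
//     if (-1 == fd) throw BidirMMapPipeException("open /dev/zero", errno);
//
// and what() then yields a message of the form "open /dev/zero: <system
// error text>", where the error text comes from strerror_r.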
96
97 /** @brief class representing the header structure in an mmapped page
98 *
99 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
100 * @date 2013-07-07
101 *
102 * contains a field to put pages into a linked list, a field for the size
103 * of the data being transmitted, and a field for the position until which
104 * the data has been read
105 */
106 class Page
107 {
108 private:
109 // use as small a data type as possible to maximise payload area
110 // of pages
111 short m_next; ///< next page in list (in pagesizes)
112 unsigned short m_size; ///< size of payload (in bytes)
113 unsigned short m_pos; ///< index of next byte in payload area
114 /// copy construction forbidden
115 Page(const Page&) = delete;
116 /// assignment forbidden
117 Page& operator=(const Page&) = delete;
118 public:
119 /// constructor
120 Page() : m_next(0), m_size(0), m_pos(0)
121 {
122 // check that short is big enough - must be done at runtime
123 // because the page size is not known until runtime
124 assert(std::numeric_limits<unsigned short>::max() >=
125 PageChunk::pagesize());
126 }
127 /// set pointer to next page
128 void setNext(const Page* p);
129 /// return pointer to next page
130 Page* next() const;
131 /// return reference to size field
132 unsigned short& size() { return m_size; }
133 /// return size (of payload data)
134 unsigned size() const { return m_size; }
135 /// return reference to position field
136 unsigned short& pos() { return m_pos; }
137 /// return position
138 unsigned pos() const { return m_pos; }
139 /// return pointer to first byte in payload data area of page
140 inline unsigned char* begin() const
141 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
142 + sizeof(Page); }
143 /// return pointer to one byte past the end of the page
144 inline unsigned char* end() const
145 { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
146 + PageChunk::pagesize(); }
147 /// return the capacity of the page
148 static unsigned capacity()
149 { return PageChunk::pagesize() - sizeof(Page); }
150 /// true if page empty
151 bool empty() const { return !m_size; }
152 /// true if page partially filled
153 bool filled() const { return !empty(); }
154 /// free space left (to be written to)
155 unsigned free() const { return capacity() - m_size; }
156 /// bytes remaining to be read
157 unsigned remaining() const { return m_size - m_pos; }
158 /// true if page completely full
159 bool full() const { return !free(); }
160 };
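// Worked example of the page layout (illustrative; the concrete numbers are
// assumptions): with a logical page size of 4096 bytes and sizeof(Page) == 6
// (three 16-bit fields, typically no padding), a page looks like
//
//     [ m_next | m_size | m_pos | payload ............................ ]
//       2 B      2 B      2 B    4090 B == Page::capacity()
//
// begin() points at offset 6, end() at offset 4096, and free() ==
// capacity() - size() bytes of payload can still be appended.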
161
162 void Page::setNext(const Page* p)
163 {
164 if (!p) {
165 m_next = 0;
166 } else {
167 const char* p1 = reinterpret_cast<char*>(this);
168 const char* p2 = reinterpret_cast<const char*>(p);
169 std::ptrdiff_t tmp = p2 - p1;
170 // difference must be divisible by page size
171 assert(!(tmp % PageChunk::pagesize()));
172 tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
173 m_next = tmp;
174 // no truncation when saving in a short
175 assert(m_next == tmp);
176 // final check: next() must return p
177 assert(next() == p);
178 }
179 }
180
181 Page* Page::next() const
182 {
183 if (!m_next) return 0;
184 char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
185 ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
186 return reinterpret_cast<Page*>(ptmp);
187 }
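// Illustration of the relative encoding used by setNext()/next() (a sketch,
// again assuming 4096-byte logical pages): if p sits three pages after this
// page in the same mapping, p2 - p1 == 3 * 4096, so m_next stores just 3;
// next() multiplies by PageChunk::pagesize() to recover the pointer. A page
// located before this one yields a negative value, which is why m_next is a
// signed short while the size and position fields are not.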
188
189 /** @brief class representing a page pool
190 *
191 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
192 * @date 2013-07-24
193 *
194 * pool of mmapped pages (on systems which support it, on all others, the
195 * functionality is emulated with dynamically allocated memory)
196 *
197 * in most operating systems there is a limit to how many mappings any one
198 * process is allowed to request; for this reason, we mmap a relatively
199 * large amount up front, and then carve off little pieces as we need them
200 *
201 * Moreover, some systems have too large a physical page size in their MMU
202 * for the code to handle (we want offsets and lengths to fit into 16
203 * bits), so we carve such big physical pages into smaller logical Pages
204 * if needed. The largest logical page size is currently 16 KiB.
205 */
206 class PagePool {
207 private:
208 /// convenience typedef
209 typedef BidirMMapPipeException Exception;
210
211 enum {
212 minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
213 maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
214 szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
215 };
216 /// a chunk of memory in the pool
217 typedef BidirMMapPipe_impl::PageChunk Chunk;
218 /// list of chunks
219 typedef std::list<Chunk*> ChunkList;
220
222 public:
223 /// convenience typedef
224 typedef PageChunk::MMapVariety MMapVariety;
225 /// constructor
226 PagePool(unsigned nPagesPerGroup);
227 /// destructor
228 ~PagePool();
229 /// pop a free element out of the pool
230 Pages pop();
231
232 /// return (logical) page size of the system
233 static unsigned pagesize() { return PageChunk::pagesize(); }
234 /// return variety of mmap supported on the system
235 static MMapVariety mmapVariety()
236 { return PageChunk::mmapVariety(); }
237
238 /// return number of pages per group (ie. as returned by pop())
239 unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
240
241 /// zap the pool (unmap all but Pages p)
242 void zap(Pages& p);
243
244 private:
245 /// list of chunks used by the pool
246 ChunkList m_chunks;
247 /// list of chunks used by the pool which are not full
248 ChunkList m_freelist;
249 /// chunk size map (histogram of chunk sizes)
250 unsigned m_szmap[(maxsz - minsz) / szincr];
251 /// current chunk size
252 int m_cursz;
253 /// page group size
254 unsigned m_nPgPerGrp;
255
256 /// adjust _cursz to current largest block
257 void updateCurSz(int sz, int incr);
258 /// find size of next chunk to allocate (in a hopefully smart way)
259 int nextChunkSz() const;
260 /// release a chunk
261 void putOnFreeList(Chunk* chunk);
262 /// release a chunk
263 void release(Chunk* chunk);
264 };
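// Minimal usage sketch for the pool (illustrative only; PagePool is an
// implementation detail driven by BidirMMapPipe itself):
//
//     PagePool pool(16);        // hand out groups of 16 logical pages
//     Pages grp = pool.pop();   // reference-counted group of mmapped pages
//     Page* pg = grp[0u];       // first page of the group
//     // ... fill pg->begin(), bump pg->size() ...
//
// when the last Pages handle to a group goes out of scope, the group is
// pushed back onto its PageChunk, and a chunk that becomes completely empty
// is released by the pool again.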
265
266 Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
267 m_pimpl(new impl)
268 {
269 assert(npg < 256);
270 m_pimpl->m_parent = parent;
271 m_pimpl->m_pages = pages;
272 m_pimpl->m_refcnt = 1;
273 m_pimpl->m_npages = npg;
274 /// initialise pages
275 for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
276 }
277
278 unsigned PageChunk::s_physpgsz = PageChunk::getPageSize();
279 unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
280 PageChunk::MMapVariety PageChunk::s_mmapworks = PageChunk::Unknown;
281
282 Pages::~Pages()
283 {
284 if (m_pimpl && !--(m_pimpl->m_refcnt)) {
285 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
286 delete m_pimpl;
287 }
288 }
289
290 Pages::Pages(const Pages& other) :
291 m_pimpl(other.m_pimpl)
292 { ++(m_pimpl->m_refcnt); }
293
294 Pages& Pages::operator=(const Pages& other)
295 {
296 if (&other == this) return *this;
297 if (!--(m_pimpl->m_refcnt)) {
298 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
299 delete m_pimpl;
300 }
301 m_pimpl = other.m_pimpl;
302 ++(m_pimpl->m_refcnt);
303 return *this;
304 }
305
306 unsigned Pages::pagesize() { return PageChunk::pagesize(); }
307
308 Page* Pages::page(unsigned pgno) const
309 {
310 assert(pgno < m_pimpl->m_npages);
311 unsigned char* pptr =
312 reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
313 pptr += pgno * pagesize();
314 return reinterpret_cast<Page*>(pptr);
315 }
316
317 unsigned Pages::pageno(Page* p) const
318 {
319 const unsigned char* pptr =
320 reinterpret_cast<const unsigned char*>(p);
321 const unsigned char* bptr =
322 reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
323 assert(0 == ((pptr - bptr) % pagesize()));
324 const unsigned nr = (pptr - bptr) / pagesize();
325 assert(nr < m_pimpl->m_npages);
326 return nr;
327 }
328
329 unsigned PageChunk::getPageSize()
330 {
331 // find out page size of system
332 long pgsz = sysconf(_SC_PAGESIZE);
333 if (-1 == pgsz) throw Exception("sysconf", errno);
334 if (pgsz > 512 && pgsz > long(sizeof(Page)))
335 return pgsz;
336
337 // in case of failure or implausible value, use a safe default: 4k
338 // page size, and do not try to mmap
339 s_mmapworks = Copy;
340 return 1 << 12;
341 }
342
343 PageChunk::PageChunk(PagePool* parent,
344 unsigned length, unsigned nPgPerGroup) :
345 m_begin(dommap(length)),
346 m_end(reinterpret_cast<void*>(
347 reinterpret_cast<unsigned char*>(m_begin) + length)),
348 m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
349 {
350 // ok, push groups of pages onto freelist here
351 unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
352 unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
353 while (p < pend) {
354 m_freelist.push_back(reinterpret_cast<void*>(p));
355 p += nPgPerGroup * PagePool::pagesize();
356 }
357 }
358
360 {
361 if (m_parent) assert(empty());
362 if (m_begin) domunmap(m_begin, len());
363 }
364
365 bool PageChunk::contains(const Pages& p) const
366 { return p.m_pimpl->m_parent == this; }
367
369 {
370 assert(!m_freelist.empty());
371 void* p = m_freelist.front();
372 m_freelist.pop_front();
373 ++m_nUsedGrp;
374 return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
375 }
376
377 void PageChunk::push(const Pages& p)
378 {
379 assert(contains(p));
380 bool wasempty = m_freelist.empty();
381 m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
382 --m_nUsedGrp;
383 if (m_parent) {
384 // notify parent if we need to be put on the free list again
385 if (wasempty) m_parent->putOnFreeList(this);
386 // notify parent if we're empty
387 if (empty()) return m_parent->release(this);
388 }
389 }
390
391 void* PageChunk::dommap(unsigned len)
392 {
393 assert(len && 0 == (len % s_physpgsz));
394 // ok, the idea here is to try the different methods of mmapping, and
395 // choose the first one that works. we have four flavours:
396 // 1 - anonymous mmap (best)
397 // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
398 // bit more tedious to set up, since you need to open/close a
399 // device file)
400 // 3 - mmap of a temporary file (very tedious to set up - need to
401 // create a temporary file, delete it, make the underlying storage
402 // large enough, then mmap the fd and close it)
403 // 4 - if all those fail, we malloc the buffers, and copy the data
404 // through the OS (then we're no better than normal pipes)
405 static bool msgprinted = false;
406 if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
407#if defined(MAP_ANONYMOUS)
408#undef MYANONFLAG
409#define MYANONFLAG MAP_ANONYMOUS
410#elif defined(MAP_ANON)
411#undef MYANONFLAG
412#define MYANONFLAG MAP_ANON
413#else
414#undef MYANONFLAG
415#endif
416#ifdef MYANONFLAG
417 void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
418 MYANONFLAG | MAP_SHARED, -1, 0);
419 if (MAP_FAILED == retVal) {
420 if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
421 } else {
422 assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
423 s_mmapworks = Anonymous;
424 if (BidirMMapPipe::debugflag() && !msgprinted) {
425 std::cerr << " INFO: In " << __func__ << " (" <<
426 __FILE__ << ", line " << __LINE__ <<
427 "): anonymous mmapping works, excellent!" <<
428 std::endl;
429 msgprinted = true;
430 }
431 return retVal;
432 }
433#endif
434#undef MYANONFLAG
435 }
436 if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
437 // ok, no anonymous mappings supported directly, so try to map
438 // /dev/zero which has much the same effect on many systems
439 int fd = ::open("/dev/zero", O_RDWR);
440 if (-1 == fd)
441 throw Exception("open /dev/zero", errno);
442 void* retVal = ::mmap(0, len,
443 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
444 if (MAP_FAILED == retVal) {
445 int errsv = errno;
446 ::close(fd);
447 if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
448 } else {
449 assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
450 s_mmapworks = DevZero;
451 }
452 if (-1 == ::close(fd))
453 throw Exception("close /dev/zero", errno);
454 if (BidirMMapPipe::debugflag() && !msgprinted) {
455 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
456 ", line " << __LINE__ << "): mmapping /dev/zero works, "
457 "very good!" << std::endl;
458 msgprinted = true;
459 }
460 return retVal;
461 }
462 if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
463 char name[] = "/tmp/BidirMMapPipe-XXXXXX";
464 int fd;
465 // open temp file
466 if (-1 == (fd = ::mkstemp(name))) throw Exception("mkstemp", errno);
467 // remove it, but keep fd open
468 if (-1 == ::unlink(name)) {
469 int errsv = errno;
470 ::close(fd);
471 throw Exception("unlink", errsv);
472 }
473 // make it the right size: lseek
474 if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
475 int errsv = errno;
476 ::close(fd);
477 throw Exception("lseek", errsv);
478 }
479 // make it the right size: write a byte
480 if (1 != ::write(fd, name, 1)) {
481 int errsv = errno;
482 ::close(fd);
483 throw Exception("write", errsv);
484 }
485 // do mmap
486 void* retVal = ::mmap(0, len,
487 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
488 if (MAP_FAILED == retVal) {
489 int errsv = errno;
490 ::close(fd);
491 if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
492 } else {
493 assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
494 s_mmapworks = FileBacked;
495 }
496 if (-1 == ::close(fd)) {
497 int errsv = errno;
498 ::munmap(retVal, len);
499 throw Exception("close", errsv);
500 }
501 if (BidirMMapPipe::debugflag() && !msgprinted) {
502 std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
503 ", line " << __LINE__ << "): mmapping temporary files "
504 "works, good!" << std::endl;
505 msgprinted = true;
506 }
507 return retVal;
508 }
509 if (Copy == s_mmapworks || Unknown == s_mmapworks) {
510 // fallback solution: mmap does not work on this OS (or does not
511 // work for what we want to use it), so use a normal buffer of
512 // memory instead, and collect data in that buffer - this needs an
513 // additional write/read to/from the pipe(s), but there you go...
514 if (BidirMMapPipe::debugflag() && !msgprinted) {
515 std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
516 ", line " << __LINE__ << "): anonymous mmapping of "
517 "shared buffers failed, falling back to read/write on "
518 " pipes!" << std::endl;
519 msgprinted = true;
520 }
521 s_mmapworks = Copy;
522 void* retVal = std::malloc(len);
523 if (!retVal) throw Exception("malloc", errno);
524 return retVal;
525 }
526 // should never get here
527 assert(false);
528 return 0;
529 }
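// Minimal standalone sketch of flavour 1 above (anonymous mmap shared across
// fork), with error handling stripped and a hard-coded length of 4096 bytes;
// purely illustrative:
//
//     #include <sys/mman.h>
//     #include <unistd.h>
//     int* shared = static_cast<int*>(::mmap(0, 4096,
//             PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_SHARED, -1, 0));
//     if (::fork()) shared[0] = 42;   // parent writes...
//     else { /* ...child sees shared[0] == 42 once the parent has written */ }
//
// the pages handed out by PageChunk are obtained in exactly this way (or via
// one of the fallbacks), which is why bulk payload data never has to be
// copied through the pipe file descriptors themselves.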
530
531 void PageChunk::domunmap(void* addr, unsigned len)
532 {
533 assert(len && 0 == (len % s_physpgsz));
534 if (addr) {
535 assert(Unknown != s_mmapworks);
536 if (Copy != s_mmapworks) {
537 if (-1 == ::munmap(addr, len))
538 throw Exception("munmap", errno);
539 } else {
540 std::free(addr);
541 }
542 }
543 }
544
545 void PageChunk::zap(Pages& p)
546 {
547 // try to mprotect the other bits of the pool with no access...
548 // we'd really like a version of mremap here that can unmap all the
549 // other pages in the chunk, but that does not exist, so we protect
550 // the other pages in this chunk such that they may neither be read,
551 // written nor executed, only the pages we're interested in for
552 // communications stay readable and writable
553 //
554 // if an OS does not support changing the protection of a part of an
555 // mmapped area, the mprotect calls below should just fail and not
556 // change any protection, so we're a little less safe against
557 // corruption, but everything should still work
558 if (Copy != s_mmapworks) {
559 unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
560 unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
561 unsigned char* p2 = p1 + p.npages() * s_physpgsz;
562 unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
563 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
564 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
565 }
566 m_parent = 0;
567 m_freelist.clear();
568 m_nUsedGrp = 1;
569 p.m_pimpl->m_parent = 0;
570 m_begin = m_end = 0;
571 // commit suicide
572 delete this;
573 }
574
575 PagePool::PagePool(unsigned nPgPerGroup) :
576 m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
577 {
578 // if logical and physical page size differ, we may have to adjust
579 // m_nPgPerGrp to make things fit
580 if (PageChunk::pagesize() != PageChunk::physPgSz()) {
581 const unsigned mult =
582 PageChunk::physPgSz();
583 const unsigned desired = nPgPerGroup * PageChunk::pagesize();
584 // round up to the next physical page boundary
585 const unsigned actual = mult *
586 (desired / mult + bool(desired % mult));
587 const unsigned newPgPerGrp = actual / PageChunk::pagesize();
588 if (BidirMMapPipe::debugflag()) {
589 std::cerr << " INFO: In " << __func__ << " (" <<
590 __FILE__ << ", line " << __LINE__ <<
591 "): physical page size " << PageChunk::physPgSz() <<
592 ", subdividing into logical pages of size " <<
593 PageChunk::pagesize() << ", adjusting nPgPerGroup " <<
594 m_nPgPerGrp << " -> " << newPgPerGrp <<
595 std::endl;
596 }
597 assert(newPgPerGrp >= m_nPgPerGrp);
598 m_nPgPerGrp = newPgPerGrp;
599 }
600 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
601 }
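// Worked example of the adjustment above (numbers are assumptions for
// illustration): on a system with 64 KiB physical pages the logical page size
// is capped at 16 KiB; asking for groups of 5 logical pages gives
// desired = 5 * 16384 = 81920 bytes, which rounds up to the next physical
// page boundary at actual = 131072 bytes, so each group really contains
// newPgPerGrp = 131072 / 16384 = 8 logical pages.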
602
603 PagePool::~PagePool()
604 {
605 m_freelist.clear();
606 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
607 delete *it;
608 m_chunks.clear();
609 }
610
611 void PagePool::zap(Pages& p)
612 {
613 // unmap all pages but those pointed to by p
614 m_freelist.clear();
615 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
616 if ((*it)->contains(p)) {
617 (*it)->zap(p);
618 } else {
619 delete *it;
620 }
621 }
622 m_chunks.clear();
623 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
624 m_cursz = minsz;
625 }
626
627 Pages PagePool::pop()
628 {
629 if (m_freelist.empty()) {
630 // allocate and register new chunk and put it on the freelist
631 const int sz = nextChunkSz();
632 Chunk *c = new Chunk(this,
633 sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
634 m_chunks.push_front(c);
635 m_freelist.push_back(c);
636 updateCurSz(sz, +1);
637 }
638 // get free element from first chunk on _freelist
639 Chunk* c = m_freelist.front();
640 Pages p(c->pop());
641 // full chunks are removed from _freelist
642 if (c->full()) m_freelist.pop_front();
643 return p;
644 }
645
646 void PagePool::release(PageChunk* chunk)
647 {
648 assert(chunk->empty());
649 // find chunk on freelist and remove
650 ChunkList::iterator it = std::find(
651 m_freelist.begin(), m_freelist.end(), chunk);
652 if (m_freelist.end() == it)
653 throw Exception("PagePool::release(PageChunk*)", EINVAL);
654 m_freelist.erase(it);
655 // find chunk in m_chunks and remove
656 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
657 if (m_chunks.end() == it)
658 throw Exception("PagePool::release(PageChunk*)", EINVAL);
659 m_chunks.erase(it);
660 const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
661 delete chunk;
662 updateCurSz(sz, -1);
663 }
664
665 void PagePool::putOnFreeList(PageChunk* chunk)
666 {
667 assert(!chunk->full());
668 m_freelist.push_back(chunk);
669 }
670
671 void PagePool::updateCurSz(int sz, int incr)
672 {
673 m_szmap[(sz - minsz) / szincr] += incr;
674 m_cursz = minsz;
675 for (int i = (maxsz - minsz) / szincr; i--; ) {
676 if (m_szmap[i]) {
677 m_cursz += i * szincr;
678 break;
679 }
680 }
681 }
682
683 int PagePool::nextChunkSz() const
684 {
685 // no chunks with space available, figure out chunk size
686 int sz = m_cursz;
687 if (m_chunks.empty()) {
688 // if we start allocating chunks, we start from minsz
689 sz = minsz;
690 } else {
691 if (minsz >= sz) {
692 // minimal sized chunks are always grown
693 sz = minsz + szincr;
694 } else {
695 if (1 != m_chunks.size()) {
696 // if we have more than one completely filled chunk, grow
697 sz += szincr;
698 } else {
699 // just one chunk left, try shrinking chunk size
700 sz -= szincr;
701 }
702 }
703 }
704 // clamp size to allowed range
705 if (sz > maxsz) sz = maxsz;
706 if (sz < minsz) sz = minsz;
707 return sz;
708 }
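// Worked trace of the heuristic above (illustrative, with szincr = 1): the
// first pop() allocates a chunk of size class minsz; when that chunk is
// exhausted the next one is allocated one class larger, and further chunks
// keep growing by one class while more than one chunk is in use, clamped at
// maxsz. Once all but the current chunk have been released again, the next
// allocation drops back one class, so the pool adapts to the number of pipes
// actually open at a time.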
709}
710
711// static BidirMMapPipe members
712pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
713std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
714BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
715unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
716int BidirMMapPipe::s_debugflag = 0;
717
718BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
719{
720 if (!s_pagepool)
721 s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
722 return *s_pagepool;
723}
724
725void BidirMMapPipe::teardownall(void)
726{
727 pthread_mutex_lock(&s_openpipesmutex);
728 while (!s_openpipes.empty()) {
729 BidirMMapPipe *p = s_openpipes.front();
730 pthread_mutex_unlock(&s_openpipesmutex);
731 if (p->m_childPid) kill(p->m_childPid, SIGTERM);
732 p->doClose(true, true);
733 pthread_mutex_lock(&s_openpipesmutex);
734 }
735 pthread_mutex_unlock(&s_openpipesmutex);
736}
737
739 m_pages(pagepool().pop())
740{
741 // free pages again
743 if (!s_pagepoolrefcnt) {
744 delete s_pagepool;
745 s_pagepool = 0;
746 }
747}
748
749BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
750 m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
751 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
752 m_parentPid(::getpid())
753
754{
756 assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
757 int fds[4] = { -1, -1, -1, -1 };
758 int myerrno;
759 static bool firstcall = true;
760 if (useExceptions) m_flags |= exceptionsbit;
761
762 try {
763 if (firstcall) {
764 firstcall = false;
765 // register a cleanup handler to make sure all BidirMMapPipes are torn
766 // down, and child processes are sent a SIGTERM
767 if (0 != atexit(BidirMMapPipe::teardownall))
768 throw Exception("atexit", errno);
769 }
770
771 // build free lists
772 for (unsigned i = 1; i < TotPages; ++i)
773 m_pages[i - 1]->setNext(m_pages[i]);
774 m_pages[PagesPerEnd - 1]->setNext(0);
775 if (!useSocketpair) {
776 // create pipes
777 if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
778 if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
779 } else {
780 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
781 throw Exception("socketpair", errno);
782 }
783 // fork the child
784 pthread_mutex_lock(&s_openpipesmutex);
785 char c;
786 switch ((m_childPid = ::fork())) {
787 case -1: // error in fork()
788 myerrno = errno;
789 pthread_mutex_unlock(&s_openpipesmutex);
790 m_childPid = 0;
791 throw Exception("fork", myerrno);
792 case 0: // child
793 // put the ends in the right place
794 if (-1 != fds[2]) {
795 // pair of pipes
796 if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
797 myerrno = errno;
798 pthread_mutex_unlock(&s_openpipesmutex);
799 throw Exception("close", myerrno);
800 }
801 fds[0] = fds[3] = -1;
802 m_outpipe = fds[1];
803 m_inpipe = fds[2];
804 } else {
805 // socket pair
806 if (-1 == ::close(fds[0])) {
807 myerrno = errno;
808 pthread_mutex_unlock(&s_openpipesmutex);
809 throw Exception("close", myerrno);
810 }
811 fds[0] = -1;
812 m_inpipe = m_outpipe = fds[1];
813 }
814 // close other pipes our parent may have open - we have no business
815 // reading from/writing to those...
816 for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
817 s_openpipes.end() != it; ) {
818 BidirMMapPipe* p = *it;
819 it = s_openpipes.erase(it);
820 p->doClose(true, true);
821 }
822 pagepool().zap(m_pages);
824 delete s_pagepool;
825 s_pagepool = 0;
826 s_openpipes.push_front(this);
827 pthread_mutex_unlock(&s_openpipesmutex);
828 // ok, put our pages on freelist
829 m_freelist = m_pages[PagesPerEnd];
830 // handshake with other end (to make sure it's alive)...
831 c = 'C'; // ...hild
832 if (1 != xferraw(m_outpipe, &c, 1, ::write))
833 throw Exception("handshake: xferraw write", EPIPE);
834 if (1 != xferraw(m_inpipe, &c, 1, ::read))
835 throw Exception("handshake: xferraw read", EPIPE);
836 if ('P' != c) throw Exception("handshake", EPIPE);
837 break;
838 default: // parent
839 // put the ends in the right place
840 if (-1 != fds[2]) {
841 // pair of pipes
842 if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
843 myerrno = errno;
844 pthread_mutex_unlock(&s_openpipesmutex);
845 throw Exception("close", myerrno);
846 }
847 fds[1] = fds[2] = -1;
848 m_outpipe = fds[3];
849 m_inpipe = fds[0];
850 } else {
851 // socketpair
852 if (-1 == ::close(fds[1])) {
853 myerrno = errno;
854 pthread_mutex_unlock(&s_openpipesmutex);
855 throw Exception("close", myerrno);
856 }
857 fds[1] = -1;
858 m_inpipe = m_outpipe = fds[0];
859 }
860 // put on list of open pipes (so we can kill child processes
861 // if things go wrong)
862 s_openpipes.push_front(this);
863 pthread_mutex_unlock(&s_openpipesmutex);
864 // ok, put our pages on freelist
865 m_freelist = m_pages[0u];
866 // handshake with other end (to make sure it's alive)...
867 c = 'P'; // ...arent
868 if (1 != xferraw(m_outpipe, &c, 1, ::write))
869 throw Exception("handshake: xferraw write", EPIPE);
870 if (1 != xferraw(m_inpipe, &c, 1, ::read))
871 throw Exception("handshake: xferraw read", EPIPE);
872 if ('C' != c) throw Exception("handshake", EPIPE);
873 break;
874 }
875 // mark file descriptors for close on exec (we do not want to leak the
876 // connection to anything we happen to exec)
877 int fdflags = 0;
878 if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
879 throw Exception("fcntl", errno);
880 fdflags |= FD_CLOEXEC;
881 if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
882 throw Exception("fcntl", errno);
883 if (m_inpipe != m_outpipe) {
884 if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
885 throw Exception("fcntl", errno);
886 fdflags |= FD_CLOEXEC;
887 if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
888 throw Exception("fcntl", errno);
889 }
890 // ok, finally, clear the failbit
891 m_flags &= ~failbit;
892 // all done
893 } catch (BidirMMapPipe::Exception&) {
894 if (0 != m_childPid) kill(m_childPid, SIGTERM);
895 for (int i = 0; i < 4; ++i)
896 if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
897 {
898 // free resources associated with mmapped pages
900 }
901 if (!--s_pagepoolrefcnt) {
902 delete s_pagepool;
903 s_pagepool = 0;
904 }
905 throw;
906 }
907}
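// Minimal usage sketch (illustrative; the test harness at the end of this
// file contains complete, runnable examples): the constructor forks, so the
// same code runs in both processes and isChild()/isParent() tells them apart.
//
//     BidirMMapPipe pipe;
//     if (pipe.isChild()) {
//         std::string s;
//         pipe >> s;                              // receive request
//         pipe << ("pong: " + s) << BidirMMapPipe::flush;
//         pipe.close();
//         std::exit(0);                           // child never returns here
//     } else {
//         pipe << std::string("ping") << BidirMMapPipe::flush;
//         std::string reply;
//         pipe >> reply;
//         int rc = pipe.close();                  // reaps child, exit status
//     }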
908
909int BidirMMapPipe::close()
910{
911 assert(!(m_flags & failbit));
912 return doClose(false);
913}
914
915int BidirMMapPipe::doClose(bool force, bool holdlock)
916{
917 if (m_flags & failbit) return 0;
918 // flush data to be written
919 if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
920 // shut down the write direction (no more writes from our side)
921 if (m_inpipe == m_outpipe) {
922 if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
923 throw Exception("shutdown", errno);
924 m_outpipe = -1;
925 } else {
926 if (-1 != m_outpipe && -1 == ::close(m_outpipe))
927 if (!force) throw Exception("close", errno);
928 m_outpipe = -1;
929 }
930 // now the read direction: before closing it,
931 // drain anything the other end might still want to send
932 if (!force && -1 != m_inpipe) {
933 // **************** THIS IS EXTREMELY UGLY: ****************
934 // POLLHUP is not set reliably on pipe/socket shutdown on all
935 // platforms, unfortunately, so we poll for readability here until
936 // the other end closes, too
937 //
938 // the read loop below ensures that the other end sees the POLLIN that
939 // is set on shutdown instead, and goes ahead to close its end
940 //
941 // if we don't do this, and close straight away, the other end
942 // will catch a SIGPIPE or similar, and we don't want that
943 int err;
944 struct pollfd fds;
945 fds.fd = m_inpipe;
946 fds.events = POLLIN;
947 fds.revents = 0;
948 do {
949 while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
950 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
951 if (fds.revents & POLLIN) {
952 char c;
953 if (1 > ::read(m_inpipe, &c, 1)) break;
954 }
955 }
956 } while (0 > err && EINTR == errno);
957 // ignore all other poll errors
958 }
959 // close read end
960 if (-1 != m_inpipe && -1 == ::close(m_inpipe))
961 if (!force) throw Exception("close", errno);
962 m_inpipe = -1;
963 // unmap memory
964 try {
966 if (!--s_pagepoolrefcnt) {
967 delete s_pagepool;
968 s_pagepool = 0;
969 }
970 } catch (std::exception&) {
971 if (!force) throw;
972 }
974 // wait for child process
975 int retVal = 0;
976 if (isParent()) {
977 int tmp;
978 do {
979 tmp = waitpid(m_childPid, &retVal, 0);
980 } while (-1 == tmp && EINTR == errno);
981 if (-1 == tmp)
982 if (!force) throw Exception("waitpid", errno);
983 m_childPid = 0;
984 }
985 // remove from list of open pipes
986 if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
987 std::list<BidirMMapPipe*>::iterator it = std::find(
988 s_openpipes.begin(), s_openpipes.end(), this);
989 if (s_openpipes.end() != it) s_openpipes.erase(it);
990 if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
991 m_flags |= failbit;
992 return retVal;
993}
994
995BidirMMapPipe::~BidirMMapPipe()
996{ doClose(false); }
997
998BidirMMapPipe::size_type BidirMMapPipe::xferraw(
999 int fd, void* addr, size_type len,
1000 ssize_t (*xferfn)(int, void*, std::size_t))
1001{
1002 size_type xferred = 0;
1003 unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
1004 while (len) {
1005 ssize_t tmp = xferfn(fd, buf, len);
1006 if (tmp > 0) {
1007 xferred += tmp;
1008 len -= tmp;
1009 buf += tmp;
1010 continue;
1011 } else if (0 == tmp) {
1012 // check for end-of-file on pipe
1013 break;
1014 } else if (-1 == tmp) {
1015 // ok, some error occurred, so figure out if we want to retry or throw
1016 switch (errno) {
1017 default:
1018 // if anything was transferred, return number of bytes
1019 // transferred so far, we can start throwing on the next
1020 // transfer...
1021 if (xferred) return xferred;
1022 // else throw
1023 throw Exception("xferraw", errno);
1024 case EAGAIN: // fallthrough intended
1025#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1026 case EWOULDBLOCK: // fallthrough intended
1027#endif
1028 std::cerr << " ERROR: In " << __func__ << " (" <<
1029 __FILE__ << ", line " << __LINE__ <<
1030 "): expect transfer to block!" << std::endl;
1031 case EINTR:
1032 break;
1033 }
1034 continue;
1035 } else {
1036 throw Exception("xferraw: unexpected return value from read/write",
1037 errno);
1038 }
1039 }
1040 return xferred;
1041}
1042
1043void BidirMMapPipe::sendpages(Page* plist)
1044{
1045 if (plist) {
1046 unsigned char pg = m_pages[plist];
1047 if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1048 if (BidirMMapPipe_impl::PageChunk::Copy ==
1049 BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1050 // ok, have to copy pages through pipe
1051 for (Page* p = plist; p; p = p->next()) {
1052 if (sizeof(Page) + p->size() !=
1053 xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1054 ::write)) {
1055 throw Exception("sendpages: short write", EPIPE);
1056 }
1057 }
1058 }
1059 } else {
1060 throw Exception("sendpages: short write", EPIPE);
1061 }
1062 } else { assert(plist); }
1063}
1064
1065unsigned BidirMMapPipe::recvpages()
1066{
1067 unsigned char pg;
1068 unsigned retVal = 0;
1069 Page *plisthead = 0, *plisttail = 0;
1070 if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1071 plisthead = plisttail = m_pages[pg];
1072 // ok, have number of pages
1073 if (BidirMMapPipe_impl::PageChunk::Copy ==
1074 BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1075 // ok, need to copy pages through pipe
1076 for (; plisttail; ++retVal) {
1077 Page* p = plisttail;
1078 if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1079 ::read)) {
1080 plisttail = p->next();
1081 if (!p->size()) continue;
1082 // break in case of read error
1083 if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1084 ::read)) break;
1085 }
1086 }
1087 } else {
1088 retVal = lenPageList(plisthead);
1089 }
1090 }
1091 // put list of pages we just received into correct lists (busy/free)
1092 if (plisthead) feedPageLists(plisthead);
1093 // ok, retVal contains the number of pages read, so put them on the
1094 // correct lists
1095 return retVal;
1096}
1097
1099{
1100 struct pollfd fds;
1101 fds.fd = m_inpipe;
1102 fds.events = POLLIN;
1103 fds.revents = 0;
1104 unsigned retVal = 0;
1105 do {
1106 int rc = ::poll(&fds, 1, 0);
1107 if (0 > rc) {
1108 if (EINTR == errno) continue;
1109 break;
1110 }
1111 if (1 == rc && fds.revents & POLLIN &&
1112 !(fds.revents & (POLLNVAL | POLLERR))) {
1113 // ok, we can read without blocking, so the other end has
1114 // something for us
1115 return recvpages();
1116 } else {
1117 break;
1118 }
1119 } while (true);
1120 return retVal;
1121}
1122
1123unsigned BidirMMapPipe::lenPageList(const Page* p)
1124{
1125 unsigned n = 0;
1126 for ( ; p; p = p->next()) ++n;
1127 return n;
1128}
1129
1130void BidirMMapPipe::feedPageLists(Page* plist)
1131{
1132 assert(plist);
1133 // get end of busy list
1134 Page *blend = m_busylist;
1135 while (blend && blend->next()) blend = blend->next();
1136 // ok, might have to send free pages to other end, and (if we do have to
1137 // send something to the other end) while we're at it, send any dirty
1138 // pages which are completely full, too
1139 Page *sendlisthead = 0, *sendlisttail = 0;
1140 // loop over plist
1141 while (plist) {
1142 Page* p = plist;
1143 plist = p->next();
1144 p->setNext(0);
1145 if (p->size()) {
1146 // busy page...
1147 p->pos() = 0;
1148 // put at end of busy list
1149 if (blend) blend->setNext(p);
1150 else m_busylist = p;
1151 blend = p;
1152 } else {
1153 // free page...
1154 // Very simple algorithm: once we're done with a page, we send it back
1155 // where it came from. If it's from our end, we put it on the free list, if
1156 // it's from the other end, we send it back.
1157 if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1158 (isChild() && m_pages[p] < PagesPerEnd)) {
1159 // page "belongs" to other end
1160 if (!sendlisthead) sendlisthead = p;
1161 if (sendlisttail) sendlisttail->setNext(p);
1162 sendlisttail = p;
1163 } else {
1164 // add page to freelist
1165 p->setNext(m_freelist);
1166 m_freelist = p;
1167 }
1168 }
1169 }
1170 // check if we have to send stuff to the other end
1171 if (sendlisthead) {
1172 // go through our list of dirty pages, and see what we can
1173 // send along
1174 Page* dp;
1175 while ((dp = m_dirtylist) && dp->full()) {
1176 Page* p = dp;
1177 // move head of dirty list
1178 m_dirtylist = p->next();
1179 // queue for sending
1180 p->setNext(0);
1181 sendlisttail->setNext(p);
1182 sendlisttail = p;
1183 }
1184 // poll if the other end is still alive - this needs that we first
1185 // close the write pipe of the other end when the remote end of the
1186 // connection is shutting down in doClose; we'll see that because we
1187 // get a POLLHUP on our inpipe
1188 const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1189 struct pollfd fds[2];
1190 fds[0].fd = m_outpipe;
1191 fds[0].events = fds[0].revents = 0;
1192 if (m_outpipe != m_inpipe) {
1193 fds[1].fd = m_inpipe;
1194 fds[1].events = fds[1].revents = 0;
1195 } else {
1196 fds[0].events |= POLLIN;
1197 }
1198 int retVal = 0;
1199 do {
1200 retVal = ::poll(fds, nfds, 0);
1201 if (0 > retVal && EINTR == errno)
1202 continue;
1203 break;
1204 } while (true);
1205 if (0 <= retVal) {
1206 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1207 if (m_outpipe != m_inpipe) {
1208 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1209 } else {
1210 if (ok && fds[0].revents & POLLIN) {
1211 unsigned ret = recvpages();
1212 if (!ret) ok = false;
1213 }
1214 }
1215
1216 if (ok) sendpages(sendlisthead);
1217 // (if the pipe is dead already, we don't care that we leak the
1218 // contents of the pages on the send list here, so that is why
1219 // there's no else clause here)
1220 } else {
1221 throw Exception("feedPageLists: poll", errno);
1222 }
1223 }
1224}
1225
1226void BidirMMapPipe::markPageDirty(Page* p)
1227{
1228 assert(p);
1229 assert(p == m_freelist);
1230 // remove from freelist
1231 m_freelist = p->next();
1232 p->setNext(0);
1233 // append to dirty list
1234 Page* dl = m_dirtylist;
1235 while (dl && dl->next()) dl = dl->next();
1236 if (dl) dl->setNext(p);
1237 else m_dirtylist = p;
1238}
1239
1240BidirMMapPipe::Page* BidirMMapPipe::busypage()
1241{
1242 // queue any pages available for reading we can without blocking
1244 Page* p;
1245 // if there are no busy pages, try to get them from the other end,
1246 // block if we have to...
1247 while (!(p = m_busylist)) if (!recvpages()) return 0;
1248 return p;
1249}
1250
1251BidirMMapPipe::Page* BidirMMapPipe::dirtypage()
1252{
1253 // queue any pages available for reading we can without blocking
1255 Page* p = m_dirtylist;
1256 // go to end of dirty list
1257 if (p) while (p->next()) p = p->next();
1258 if (!p || p->full()) {
1259 // need to append free page, so get one
1260 while (!(p = m_freelist)) if (!recvpages()) return 0;
1261 markPageDirty(p);
1262 }
1263 return p;
1264}
1265
1266void BidirMMapPipe::flush()
1267{ return doFlush(true); }
1268
1269void BidirMMapPipe::doFlush(bool forcePartialPages)
1270{
1271 assert(!(m_flags & failbit));
1272 // build a list of pages to flush
1273 Page *flushlisthead = 0, *flushlisttail = 0;
1274 while (m_dirtylist) {
1275 Page* p = m_dirtylist;
1276 if (!forcePartialPages && !p->full()) break;
1277 // remove dirty page from dirty list
1278 m_dirtylist = p->next();
1279 p->setNext(0);
1280 // and send it to other end
1281 if (!flushlisthead) flushlisthead = p;
1282 if (flushlisttail) flushlisttail->setNext(p);
1283 flushlisttail = p;
1284 }
1285 if (flushlisthead) sendpages(flushlisthead);
1286}
1287
1289{
1290 assert(!(m_flags & failbit));
1291 // join busy and dirty lists
1292 {
1293 Page *l = m_busylist;
1294 while (l && l->next()) l = l->next();
1295 if (l) l->setNext(m_dirtylist);
1296 else m_busylist = m_dirtylist;
1297 }
1298 // empty busy and dirty pages
1299 for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1300 // put them on the free list
1301 if (m_busylist) feedPageLists(m_busylist);
1302 m_busylist = m_dirtylist = 0;
1303}
1304
1305BidirMMapPipe::size_type BidirMMapPipe::bytesReadableNonBlocking()
1306{
1307 // queue all pages waiting for consumption in the pipe before we give an
1308 // answer
1310 size_type retVal = 0;
1311 for (Page* p = m_busylist; p; p = p->next())
1312 retVal += p->size() - p->pos();
1313 return retVal;
1314}
1315
1316BidirMMapPipe::size_type BidirMMapPipe::bytesWritableNonBlocking()
1317{
1318 // queue all pages waiting for consumption in the pipe before we give an
1319 // answer
1321 // check if we could write to the pipe without blocking (we need to know
1322 // because we might need to check if flushing of dirty pages would block)
1323 bool couldwrite = false;
1324 {
1325 struct pollfd fds;
1326 fds.fd = m_outpipe;
1327 fds.events = POLLOUT;
1328 fds.revents = 0;
1329 int retVal = 0;
1330 do {
1331 retVal = ::poll(&fds, 1, 0);
1332 if (0 > retVal) {
1333 if (EINTR == errno) continue;
1334 throw Exception("bytesWritableNonBlocking: poll", errno);
1335 }
1336 if (1 == retVal && fds.revents & POLLOUT &&
1337 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1338 couldwrite = true;
1339 break;
1340 } while (true);
1341 }
1342 // ok, start counting bytes
1343 size_type retVal = 0;
1344 unsigned npages = 0;
1345 // go through the dirty list
1346 for (Page* p = m_dirtylist; p; p = p->next()) {
1347 ++npages;
1348 // if page only partially filled
1349 if (!p->full())
1350 retVal += p->free();
1351 if (npages >= FlushThresh && !couldwrite) break;
1352 }
1353 // go through the free list
1354 for (Page* p = m_freelist; p && (!m_dirtylist ||
1355 npages < FlushThresh || couldwrite); p = p->next()) {
1356 ++npages;
1357 retVal += Page::capacity();
1358 }
1359 return retVal;
1360}
1361
1362BidirMMapPipe::size_type BidirMMapPipe::read(void* addr, size_type sz)
1363{
1364 assert(!(m_flags & failbit));
1365 size_type nread = 0;
1366 unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1367 try {
1368 while (sz) {
1369 // find next page to read from
1370 Page* p = busypage();
1371 if (!p) {
1372 m_flags |= eofbit;
1373 return nread;
1374 }
1375 unsigned char* pp = p->begin() + p->pos();
1376 size_type csz = std::min(size_type(p->remaining()), sz);
1377 std::copy(pp, pp + csz, ap);
1378 nread += csz;
1379 ap += csz;
1380 sz -= csz;
1381 p->pos() += csz;
1382 assert(p->size() >= p->pos());
1383 if (p->size() == p->pos()) {
1384 // if no unread data remains, page is free
1385 m_busylist = p->next();
1386 p->setNext(0);
1387 p->size() = 0;
1388 feedPageLists(p);
1389 }
1390 }
1391 } catch (Exception&) {
1392 m_flags |= rderrbit;
1393 if (m_flags & exceptionsbit) throw;
1394 }
1395 return nread;
1396}
1397
1398BidirMMapPipe::size_type BidirMMapPipe::write(const void* addr, size_type sz)
1399{
1400 assert(!(m_flags & failbit));
1401 size_type written = 0;
1402 const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1403 try {
1404 while (sz) {
1405 // find next page to write to
1406 Page* p = dirtypage();
1407 if (!p) {
1408 m_flags |= eofbit;
1409 return written;
1410 }
1411 unsigned char* pp = p->begin() + p->size();
1412 size_type csz = std::min(size_type(p->free()), sz);
1413 std::copy(ap, ap + csz, pp);
1414 written += csz;
1415 ap += csz;
1416 p->size() += csz;
1417 sz -= csz;
1418 assert(p->capacity() >= p->size());
1419 if (p->full()) {
1420 // if page is full, see if we're above the flush threshold of
1421 // 3/4 of our pages
1422 if (lenPageList(m_dirtylist) >= FlushThresh)
1423 doFlush(false);
1424 }
1425 }
1426 } catch (Exception&) {
1427 m_flags |= wrerrbit;
1428 if (m_flags & exceptionsbit) throw;
1429 }
1430 return written;
1431}
1432
1433int BidirMMapPipe::poll(PollVector& pipes, int timeout)
1434{
1435 // go through pipes, and change flags where we already know without really
1436 // polling - stuff where we don't need poll to wait for its timeout in the
1437 // OS...
1438 bool canskiptimeout = false;
1439 std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1440 std::vector<unsigned>::iterator mit = masks.begin();
1441 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1442 ++it, ++mit) {
1443 PollEntry& pe = *it;
1444 pe.revents = None;
1445 // null pipe pointer or closed pipe is invalid
1446 if (!pe.pipe || pe.pipe->closed()) pe.revents |= Invalid;
1447 // check for error
1448 if (pe.pipe->bad()) pe.revents |= Error;
1449 // check for end of file
1450 if (pe.pipe->eof()) pe.revents |= EndOfFile;
1451 // check if readable
1452 if (pe.events & Readable) {
1453 *mit |= Readable;
1454 if (pe.pipe->m_busylist) pe.revents |= Readable;
1455 }
1456 // check if writable
1457 if (pe.events & Writable) {
1458 *mit |= Writable;
1459 if (pe.pipe->m_freelist) {
1460 pe.revents |= Writable;
1461 } else {
1462 Page *dl = pe.pipe->m_dirtylist;
1463 while (dl && dl->next()) dl = dl->next();
1464 if (dl && dl->pos() < Page::capacity())
1465 pe.revents |= Writable;
1466 }
1467 }
1468 if (pe.revents) canskiptimeout = true;
1469 }
1470 // set up the data structures required for the poll syscall
1471 std::vector<pollfd> fds;
1472 fds.reserve(2 * pipes.size());
1473 std::map<int, PollEntry*> fds2pipes;
1474 for (PollVector::const_iterator it = pipes.begin();
1475 pipes.end() != it; ++it) {
1476 const PollEntry& pe = *it;
1477 struct pollfd tmp;
1478 fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1479 const_cast<PollEntry*>(&pe)));
1480 tmp.events = tmp.revents = 0;
1481 // we always poll for readability; this allows us to queue pages
1482 // early
1483 tmp.events |= POLLIN;
1484 if (pe.pipe->m_outpipe != tmp.fd) {
1485 // ok, it's a pair of pipes
1486 fds.push_back(tmp);
1487 fds2pipes.insert(std::make_pair(
1488 unsigned(tmp.fd = pe.pipe->m_outpipe),
1489 const_cast<PollEntry*>(&pe)));
1490 tmp.events = 0;
1491
1492 }
1493 if (pe.events & Writable) tmp.events |= POLLOUT;
1494 fds.push_back(tmp);
1495 }
1496 // poll
1497 int retVal = 0;
1498 do {
1499 retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1500 if (0 > retVal) {
1501 if (EINTR == errno) continue;
1502 throw Exception("poll", errno);
1503 }
1504 break;
1505 } while (true);
1506 // fds may have changed state, so update...
1507 for (std::vector<pollfd>::iterator it = fds.begin();
1508 fds.end() != it; ++it) {
1509 pollfd& fe = *it;
1510 //if (!fe.revents) continue;
1511 --retVal;
1512 PollEntry& pe = *fds2pipes[fe.fd];
1513oncemore:
1514 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1515 pe.revents |= ReadInvalid;
1516 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1517 pe.revents |= WriteInvalid;
1518 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1519 pe.revents |= ReadError;
1520 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1521 pe.revents |= WriteError;
1522 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1523 pe.revents |= ReadEndOfFile;
1524 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1525 pe.revents |= WriteEndOfFile;
1526 if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1527 !(fe.revents & (POLLNVAL | POLLERR))) {
1528 // ok, there is at least one page for us to receive from the
1529 // other end
1530 if (0 == pe.pipe->recvpages()) continue;
1531 // more pages there?
1532 do {
1533 int tmp = ::poll(&fe, 1, 0);
1534 if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1535 if (0 > tmp) {
1536 if (EINTR == errno) continue;
1537 throw Exception("poll", errno);
1538 }
1539 break;
1540 } while (true);
1541 }
1542 if (pe.pipe->m_busylist) pe.revents |= Readable;
1543 if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1544 if (pe.pipe->m_freelist) {
1545 pe.revents |= Writable;
1546 } else {
1547 Page *dl = pe.pipe->m_dirtylist;
1548 while (dl && dl->next()) dl = dl->next();
1549 if (dl && dl->pos() < Page::capacity())
1550 pe.revents |= Writable;
1551 }
1552 }
1553 }
1554 // apply correct masks, and count pipes with pending events
1555 int npipes = 0;
1556 mit = masks.begin();
1557 for (PollVector::iterator it = pipes.begin();
1558 pipes.end() != it; ++it, ++mit)
1559 if ((it->revents &= *mit)) ++npipes;
1560 return npipes;
1561}
1562
1563BidirMMapPipe& BidirMMapPipe::operator<<(const char* str)
1564{
1565 size_t sz = std::strlen(str);
1566 *this << sz;
1567 if (sz) write(str, sz);
1568 return *this;
1569}
1570
1571BidirMMapPipe& BidirMMapPipe::operator>>(char* (&str))
1572{
1573 size_t sz = 0;
1574 *this >> sz;
1575 if (good() && !eof()) {
1576 str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1577 if (!str) throw Exception("realloc", errno);
1578 if (sz) read(str, sz);
1579 str[sz] = 0;
1580 }
1581 return *this;
1582}
1583
1584BidirMMapPipe& BidirMMapPipe::operator<<(const std::string& str)
1585{
1586 size_t sz = str.size();
1587 *this << sz;
1588 write(str.data(), sz);
1589 return *this;
1590}
1591
1592BidirMMapPipe& BidirMMapPipe::operator>>(std::string& str)
1593{
1594 str.clear();
1595 size_t sz = 0;
1596 *this >> sz;
1597 if (good() && !eof()) {
1598 str.reserve(sz);
1599 for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1600 }
1601 return *this;
1602}
1603
1604END_NAMESPACE_ROOFIT
1605
1606#ifdef TEST_BIDIRMMAPPIPE
1607using namespace RooFit;
1608
1609int simplechild(BidirMMapPipe& pipe)
1610{
1611 // child does an echo loop
1612 while (pipe.good() && !pipe.eof()) {
1613 // read a string
1614 std::string str;
1615 pipe >> str;
1616 if (!pipe) return -1;
1617 if (pipe.eof()) break;
1618 if (!str.empty()) {
1619 std::cout << "[CHILD] : read: " << str << std::endl;
1620 str = "... early in the morning?";
1621 }
1622 pipe << str << BidirMMapPipe::flush;
1623 // did our parent tell us to shut down?
1624 if (str.empty()) break;
1625 if (!pipe) return -1;
1626 if (pipe.eof()) break;
1627 std::cout << "[CHILD] : wrote: " << str << std::endl;
1628 }
1629 pipe.close();
1630 return 0;
1631}
1632
1633#include <sstream>
1634int randomchild(BidirMMapPipe& pipe)
1635{
1636 // child sends out something at random intervals
1637 ::srand48(::getpid());
1638 {
1639 // wait for parent's go ahead signal
1640 std::string s;
1641 pipe >> s;
1642 }
1643 // no shutdown sequence needed on this side - we're producing the data,
1644 // and the parent can just read until we're done (when it'll get EOF)
1645 for (int i = 0; i < 5; ++i) {
1646 // sleep a random time between 0 and .9 seconds
1647 ::usleep(int(1e6 * ::drand48()));
1648 std::ostringstream buf;
1649 buf << "child pid " << ::getpid() << " sends message " << i;
1650 std::string str = buf.str();
1651 std::cout << "[CHILD] : " << str << std::endl;
1652 pipe << str << BidirMMapPipe::flush;
1653 if (!pipe) return -1;
1654 if (pipe.eof()) break;
1655 }
1656 // tell parent we're shutting down
1657 pipe << "" << BidirMMapPipe::flush;
1658 // wait for parent to acknowledge
1659 std::string s;
1660 pipe >> s;
1661 pipe.close();
1662 return 0;
1663}
1664
1665int benchchildrtt(BidirMMapPipe& pipe)
1666{
1667 // child does the equivalent of listening for pings and sending the
1668 // packet back
1669 char* str = 0;
1670 while (pipe && !pipe.eof()) {
1671 pipe >> str;
1672 if (!pipe) {
1673 std::free(str);
1674 pipe.close();
1675 return -1;
1676 }
1677 if (pipe.eof()) break;
1678 pipe << str << BidirMMapPipe::flush;
1679 // if we have just completed the shutdown handshake, we break here
1680 if (!std::strlen(str)) break;
1681 }
1682 std::free(str);
1683 pipe.close();
1684 return 0;
1685}
1686
1687int benchchildsink(BidirMMapPipe& pipe)
1688{
1689 // child behaves like a sink
1690 char* str = 0;
1691 while (pipe && !pipe.eof()) {
1692 pipe >> str;
1693 if (!std::strlen(str)) break;
1694 }
1695 pipe << "" << BidirMMapPipe::flush;
1696 std::free(str);
1697 pipe.close();
1698 return 0;
1699}
1700
1701int benchchildsource(BidirMMapPipe& pipe)
1702{
1703 // child behaves like a source
1704 char* str = 0;
1705 for (unsigned i = 0; i <= 24; ++i) {
1706 str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1707 std::memset(str, '4', 1 << i);
1708 str[1 << i] = 0;
1709 for (unsigned j = 0; j < 1 << 7; ++j) {
1710 pipe << str;
1711 if (!pipe || pipe.eof()) {
1712 std::free(str);
1713 pipe.close();
1714 return -1;
1715 }
1716 }
1717 // tell parent we're done with this block size
1718 pipe << "" << BidirMMapPipe::flush;
1719 }
1720 // tell parent to shut down
1721 pipe << "" << BidirMMapPipe::flush;
1722 std::free(str);
1723 pipe.close();
1724 return 0;
1725}
1726
1727BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1728{
1729 // create a pipe with the given child at the remote end
1730 BidirMMapPipe *p = new BidirMMapPipe();
1731 if (p->isChild()) {
1732 int retVal = childexec(*p);
1733 delete p;
1734 std::exit(retVal);
1735 }
1736 return p;
1737}
1738
1739#include <sys/time.h>
1740#include <iomanip>
1741int main()
1742{
1743 // simple echo loop test
1744 {
1745 std::cout << "[PARENT]: simple challenge-response test, "
1746 "one child:" << std::endl;
1747 BidirMMapPipe* pipe = spawnChild(simplechild);
1748 for (int i = 0; i < 5; ++i) {
1749 std::string str("What shall we do with a drunken sailor...");
1750 *pipe << str << BidirMMapPipe::flush;
1751 if (!*pipe) return -1;
1752 std::cout << "[PARENT]: wrote: " << str << std::endl;
1753 *pipe >> str;
1754 if (!*pipe) return -1;
1755 std::cout << "[PARENT]: read: " << str << std::endl;
1756 }
1757 // send shutdown string
1758 *pipe << "" << BidirMMapPipe::flush;
1759 // wait for shutdown handshake
1760 std::string s;
1761 *pipe >> s;
1762 int retVal = pipe->close();
1763 std::cout << "[PARENT]: exit status of child: " << retVal <<
1764 std::endl;
1765 if (retVal) return retVal;
1766 delete pipe;
1767 }
1768 // simple poll test - children send 5 results in random intervals
1769 {
1770 unsigned nch = 20;
1771 std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1772 " children:" << std::endl;
1773 typedef BidirMMapPipe::PollEntry PollEntry;
1774 // poll data structure
1775 BidirMMapPipe::PollVector pipes;
1776 pipes.reserve(nch);
1777 // spawn children
1778 for (unsigned i = 0; i < nch; ++i) {
1779 std::cout << "[PARENT]: spawning child " << i << std::endl;
1780 pipes.push_back(PollEntry(spawnChild(randomchild),
1781 BidirMMapPipe::Readable));
1782 }
1783 // wake children up
1784 std::cout << "[PARENT]: waking up children" << std::endl;
1785 for (unsigned i = 0; i < nch; ++i)
1786 *pipes[i].pipe << "" << BidirMMapPipe::flush;
1787 std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1788 // while at least some children alive
1789 while (!pipes.empty()) {
1790 // poll, wait until status change (infinite timeout)
1791 int npipes = BidirMMapPipe::poll(pipes, -1);
1792 // scan for pipes with changed status
1793 for (std::vector<PollEntry>::iterator it = pipes.begin();
1794 npipes && pipes.end() != it; ) {
1795 if (!it->revents) {
1796 // unchanged, next one
1797 ++it;
1798 continue;
1799 }
1800 --npipes; // maybe we can stop early...
1801 // read from pipes which are readable
1802 if (it->revents & BidirMMapPipe::Readable) {
1803 std::string s;
1804 *(it->pipe) >> s;
1805 if (!s.empty()) {
1806 std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1807 ": " << s << std::endl;
1808 ++it;
1809 continue;
1810 } else {
1811 // child is shutting down...
1812 *(it->pipe) << "" << BidirMMapPipe::flush;
1813 goto childcloses;
1814 }
1815 }
1816 // retire pipes with error or end-of-file condition
1817 if (it->revents & (BidirMMapPipe::Error |
1818 BidirMMapPipe::EndOfFile |
1819 BidirMMapPipe::Invalid)) {
1820 std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1821 " revents" <<
1822 ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1823 ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1824 ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1825 ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1826 ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1827 ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1828 ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1829 ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1830 std::endl;
1831childcloses:
1832 int retVal = it->pipe->close();
1833 std::cout << "[PARENT]: child exit status: " <<
1834 retVal << ", number of children still alive: " <<
1835 (pipes.size() - 1) << std::endl;
1836 if (retVal) return retVal;
1837 delete it->pipe;
1838 it = pipes.erase(it);
1839 continue;
1840 }
1841 }
1842 }
1843 }
1844 // little benchmark - round trip time
1845 {
1846 std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1847 for (unsigned i = 0; i <= 24; ++i) {
1848 char *s = new char[1 + (1 << i)];
1849 std::memset(s, 'A', 1 << i);
1850 s[1 << i] = 0;
1851 const unsigned n = 1 << 7;
1852 double avg = 0., min = 1e42, max = -1e42;
1853 BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1854 for (unsigned j = n; j--; ) {
1855 struct timeval t1;
1856 ::gettimeofday(&t1, 0);
1857 *pipe << s << BidirMMapPipe::flush;
1858 if (!*pipe || pipe->eof()) break;
1859 *pipe >> s;
1860 if (!*pipe || pipe->eof()) break;
1861 struct timeval t2;
1862 ::gettimeofday(&t2, 0);
1863 t2.tv_sec -= t1.tv_sec;
1864 t2.tv_usec -= t1.tv_usec;
1865 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1866 if (dt < min) min = dt;
1867 if (dt > max) max = dt;
1868 avg += dt;
1869 }
1870 // send a shutdown string
1871 *pipe << "" << BidirMMapPipe::flush;
1872 // get child's shutdown ok
1873 *pipe >> s;
1874 avg /= double(n);
1875 avg *= 1e6; min *= 1e6; max *= 1e6;
1876 int retVal = pipe->close();
1877 if (retVal) {
1878 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1879 delete[] s;
1880 return retVal;
1881 }
1882 delete pipe;
1883 // there is a factor 2 in the formula for the transfer rate below,
1884 // because we transfer data of twice the size of the block - once
1885 // to the child, and once for the return trip
1886 std::cout << "block size " << std::setw(9) << (1 << i) <<
1887 " avg " << std::setw(7) << avg << " us min " <<
1888 std::setw(7) << min << " us max " << std::setw(7) << max <<
1889 "us speed " << std::setw(9) <<
1890 2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1891 " MB/s" << std::endl;
1892 delete[] s;
1893 }
1894 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1895 }
1896 // little benchmark - child as sink
1897 {
1898 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1899 for (unsigned i = 0; i <= 24; ++i) {
1900 char *s = new char[1 + (1 << i)];
1901 std::memset(s, 'A', 1 << i);
1902 s[1 << i] = 0;
1903 const unsigned n = 1 << 7;
1904 double avg = 0., min = 1e42, max = -1e42;
1905 BidirMMapPipe *pipe = spawnChild(benchchildsink);
1906 for (unsigned j = n; j--; ) {
1907 struct timeval t1;
1908 ::gettimeofday(&t1, 0);
1909 // streaming mode - we do not flush here
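// (presumably data is shipped to the child only once whole pages are
// filled or the flush threshold is reached, so individual writes mostly
// measure buffering cost rather than a full transfer)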
1910 *pipe << s;
1911 if (!*pipe || pipe->eof()) break;
1912 struct timeval t2;
1913 ::gettimeofday(&t2, 0);
1914 t2.tv_sec -= t1.tv_sec;
1915 t2.tv_usec -= t1.tv_usec;
1916 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1917 if (dt < min) min = dt;
1918 if (dt > max) max = dt;
1919 avg += dt;
1920 }
1921 // send a shutdown string
1922 *pipe << "" << BidirMMapPipe::flush;
1923 // get child's shutdown ok
1924 *pipe >> s;
1925 avg /= double(n);
1926 avg *= 1e6; min *= 1e6; max *= 1e6;
1927 int retVal = pipe->close();
1928 if (retVal) {
1929 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1930 return retVal;
1931 }
1932 delete pipe;
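// no factor 2 here: data flows one way only (parent to child)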
1933 std::cout << "block size " << std::setw(9) << (1 << i) <<
1934 " avg " << std::setw(7) << avg << " us min " <<
1935 std::setw(7) << min << " us max " << std::setw(7) << max <<
1936 "us speed " << std::setw(9) <<
1937 (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1938 " MB/s" << std::endl;
1939 delete[] s;
1940 }
1941 std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1942 }
1943 // little benchmark - child as source
1944 {
1945 std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
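// (protocol as implemented below: the child streams non-empty blocks;
// an empty string ends the series for one block size, and an empty
// string with no preceding data ends the benchmark)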
1946 char *s = 0;
1947 double avg = 0., min = 1e42, max = -1e42;
1948 unsigned n = 0, bsz = 0;
1949 BidirMMapPipe *pipe = spawnChild(benchchildsource);
1950 while (*pipe && !pipe->eof()) {
1951 struct timeval t1;
1952 ::gettimeofday(&t1, 0);
1953 // streaming mode - we do not flush here
1954 *pipe >> s;
1955 if (!*pipe || pipe->eof()) break;
1956 struct timeval t2;
1957 ::gettimeofday(&t2, 0);
1958 t2.tv_sec -= t1.tv_sec;
1959 t2.tv_usec -= t1.tv_usec;
1960 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1961 if (std::strlen(s)) {
1962 ++n;
1963 if (dt < min) min = dt;
1964 if (dt > max) max = dt;
1965 avg += dt;
1966 bsz = std::strlen(s);
1967 } else {
1968 if (!n) break;
1969 // next block size
1970 avg /= double(n);
1971 avg *= 1e6; min *= 1e6; max *= 1e6;
1972
1973 std::cout << "block size " << std::setw(9) << bsz <<
1974 " avg " << std::setw(7) << avg << " us min " <<
1975 std::setw(7) << min << " us max " << std::setw(7) <<
1976 max << " us speed " << std::setw(9) <<
1977 (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1978 " MB/s" << std::endl;
1979 n = 0;
1980 avg = 0.;
1981 min = 1e42;
1982 max = -1e42;
1983 }
1984 }
1985 int retVal = pipe->close();
1986 std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1987 if (retVal) return retVal;
1988 delete pipe;
1989 std::free(s);
1990 }
1991 return 0;
1992}
1993#endif // TEST_BIDIRMMAPPIPE
1994#endif // _WIN32
1995
1996// vim: ft=cpp:sw=4:tw=78:et