Logo ROOT   6.18/05
Reference Guide
BidirMMapPipe.h
Go to the documentation of this file.
/** @file BidirMMapPipe.h
 *
 * header file for BidirMMapPipe, a class which forks off a child process and
 * serves as a communications channel between parent and child
 *
 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
 * @date 2013-07-07
 */
9
10#ifndef BIDIRMMAPPIPE_H
11#define BIDIRMMAPPIPE_H
12
#include <list>
#include <string>
#include <vector>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <pthread.h>
#include <unistd.h>
18
/// opens namespace RooFit (this header #undef's the macro again at its end)
#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
/// closes namespace RooFit (this header #undef's the macro again at its end)
#define END_NAMESPACE_ROOFIT }
21
23
namespace RooFit {

/// namespace for implementation details of BidirMMapPipe
namespace BidirMMapPipe_impl {
    // forward declarations
    class BidirMMapPipeException;
    class Page;
    class PagePool;
    class Pages;

    /** @brief class representing a chunk of pages
     *
     * @author Manuel Schiller <manuel.schiller@nikhef.nl>
     * @date 2013-07-24
     *
     * allocating pages from the OS happens in chunks in order to not exhaust
     * the maximum allowed number of memory mappings per process; this class
     * takes care of such a chunk
     *
     * a page chunk allows callers to obtain or release pages in groups of
     * continuous pages of fixed size
     */
    class PageChunk {
        public:
            /// type of mmap support found
            typedef enum {
                Unknown,    ///< don't know yet what'll work
                Copy,       ///< mmap doesn't work, have to copy back and forth
                FileBacked, ///< mmapping a temp file works
                DevZero,    ///< mmapping /dev/zero works
                Anonymous   ///< anonymous mmap works
            } MMapVariety;

        private:
            static unsigned s_physpgsz; ///< system physical page size
            static unsigned s_pagesize; ///< logical page size (run-time determined)
            /// mmap variety that works on this system
            static MMapVariety s_mmapworks;

            /// convenience typedef
            typedef BidirMMapPipeException Exception;

            void* m_begin; ///< pointer to start of mmapped area
            void* m_end; ///< pointer one behind end of mmapped area
            // FIXME: cannot keep freelist inline - other end may need that
            // data, and we'd end up overwriting the page header
            std::list<void*> m_freelist; ///< free pages list
            PagePool* m_parent; ///< parent page pool
            unsigned m_nPgPerGrp; ///< number of pages per group
            unsigned m_nUsedGrp; ///< number of used page groups

            /// determine page size at run time
            static unsigned getPageSize();

            /// mmap pages, len is length of mmapped area in bytes
            static void* dommap(unsigned len);
            /// munmap pages p, len is length of mmapped area in bytes
            static void domunmap(void* p, unsigned len);
            /// forbid copying
            ///
            /// Declared but intentionally never defined: an accidental copy
            /// (even from a member or friend) fails to link instead of
            /// silently producing a half-initialised chunk.
            PageChunk(const PageChunk&);
            /// forbid assignment (declared, intentionally never defined)
            PageChunk& operator=(const PageChunk&);
        public:
            /// return the logical page size
            static unsigned pagesize() { return s_pagesize; }
            /// return the physical page size of the system
            static unsigned physPgSz() { return s_physpgsz; }
            /// return mmap variety support found
            static MMapVariety mmapVariety() { return s_mmapworks; }

            /// constructor
            PageChunk(PagePool* parent, unsigned length, unsigned nPgPerGroup);

            /// destructor
            ~PageChunk();

            /// return if p is contained in this PageChunk
            bool contains(const Pages& p) const;

            /// pop a group of pages off the free list
            Pages pop();

            /// push a group of pages onto the free list
            void push(const Pages& p);

            /// return length of chunk (in bytes)
            unsigned len() const
            {
                return static_cast<unsigned>(
                        reinterpret_cast<unsigned char*>(m_end) -
                        reinterpret_cast<unsigned char*>(m_begin));
            }
            /// return number of pages per page group
            unsigned nPagesPerGroup() const { return m_nPgPerGrp; }

            /// return true if no used page groups in this chunk
            bool empty() const { return !m_nUsedGrp; }

            /// return true if no free page groups in this chunk
            bool full() const { return m_freelist.empty(); }

            /// free all pages except for those pointed to by p
            void zap(Pages& p);
    };

    /** @brief handle class for a number of Pages
     *
     * @author Manuel Schiller <manuel.schiller@nikhef.nl>
     * @date 2013-07-24
     *
     * the associated pages are continuous in memory
     */
    class Pages {
        private:
            /// implementation
            typedef struct {
                PageChunk *m_parent; ///< pointer to parent page chunk
                Page* m_pages; ///< pointer to first page
                unsigned m_refcnt; ///< reference counter
                // NOTE(review): the private constructor takes an unsigned
                // page count, but it is stored in an unsigned char here;
                // counts above 255 would be truncated - confirm callers
                // stay below that
                unsigned char m_npages; ///< length in pages
            } impl;
        public:
            /// default constructor (creates an empty handle)
            Pages() : m_pimpl(0) { }

            /// destructor
            ~Pages();

            /** @brief copy constructor
             *
             * copy Pages handle to new object - old object loses ownership,
             * and becomes a dangling handle
             *
             * NOTE(review): impl carries a reference counter, so handles may
             * actually share the implementation - confirm against the
             * out-of-line definition
             */
            Pages(const Pages& other);

            /** @brief assignment operator
             *
             * assign Pages handle to new object - old object loses ownership,
             * and becomes a dangling handle
             */
            Pages& operator=(const Pages& other);

            /// return page size
            static unsigned pagesize();

            /// return number of pages accessible (zero for an empty handle)
            unsigned npages() const { return m_pimpl ? m_pimpl->m_npages : 0; }

            /// return page number pageno
            Page* page(unsigned pgno) const;

            /// return page number pageno
            Page* operator[](unsigned pgno) const { return page(pgno); }

            /// perform page to page number mapping
            unsigned pageno(Page* p) const;

            /// perform page to page number mapping
            unsigned operator[](Page* p) const { return pageno(p); }

            /// swap with other's contents
            void swap(Pages& other)
            {
                impl* tmp = other.m_pimpl;
                other.m_pimpl = m_pimpl;
                m_pimpl = tmp;
            }

        private:
            /// page chunks hand out Pages (PageChunk::pop() returns them), so
            /// they must be allowed to use the private constructor
            friend class BidirMMapPipe_impl::PageChunk;
            /// NOTE(review): the original comment named the page pool as the
            /// friend; befriended as well in case it constructs Pages directly
            friend class BidirMMapPipe_impl::PagePool;

            /// pointer to implementation
            impl* m_pimpl;

            /// constructor
            Pages(PageChunk* parent, Page* pages, unsigned npg);
    };
}

} // namespace RooFit
200
201/** @brief BidirMMapPipe creates a bidirectional channel between the current
202 * process and a child it forks.
203 *
204 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
205 * @date 2013-07-07
206 *
207 * This class creates a bidirectional channel between this process and a child
208 * it creates with fork().
209 *
210 * The channel is comrised of a small shared pool of buffer memory mmapped into
211 * both process spaces, and two pipes to synchronise the exchange of data. The
212 * idea behind using the pipes at all is to have some primitive which we can
213 * block on without having to worry about atomic operations or polling, leaving
214 * these tasks to the OS. In case the anonymous mmap cannot be performed on the
215 * OS the code is running on (for whatever reason), the code falls back to
216 * mmapping /dev/zero, mmapping a temporary file, or (if those all fail), a
217 * dynamically allocated buffer which is then transmitted through the pipe(s),
218 * a slightly slower alternative (because the data is copied more often).
219 *
220 * The channel supports five major operations: read(), write(), flush(),
221 * purge() and close(). Reading and writing may block until the required buffer
222 * space is available. Writes may queue up data to be sent to the other end
223 * until either enough pages are full, or the user calls flush which forces
224 * any unsent buffers to be sent to the other end. flush forces any data that
225 * is to be sent to be sent. purge discards any buffered data waiting to be
226 * read and/or sent. Closing the channel on the child returns zero, closing it
227 * on the parent returns the child's exit status.
228 *
229 * The class also provides operator<< and operator>> for C++-style I/O for
230 * basic data types (bool, char, short, int, long, long long, float, double
231 * and their unsigned counterparts). Data is transmitted binary (i.e. no
232 * formatting to strings like std::cout does). There are also overloads to
233 * support C-style zero terminated strings and std::string. In terms of
234 * performance, the former is to be preferred.
235 *
236 * If the caller needs to multiplex input and output to/from several pipes, the
237 * class provides the poll() method which allows to block until an event occurs
238 * on any of the polled pipes.
239 *
240 * After the BidirMMapPipe is closed, no further operations may be performed on
241 * that object, save for the destructor which may still be called.
242 *
243 * If the BidirMMapPipe has not properly been closed, the destructor will call
244 * close. However, the exit code of the child is lost in that case.
245 *
246 * Closing the object causes the mmapped memory to be unmapped and the two
247 * pipes to be closed. We also install an atexit handler in the process of
248 * creating BidirMMapPipes. This ensures that when the current process
249 * terminates, a SIGTERM signal is sent to the child processes created for all
250 * unclosed pipes to avoid leaving zombie processes in the OS's process table.
251 *
252 * BidirMMapPipe creation, closing and destruction are thread safe. If the
253 * BidirMMapPipe is used in more than one thread, the other operations have to
254 * be protected with a mutex (or something similar), though.
255 *
256 * End of file (other end closed its pipe, or died) is indicated with the eof()
257 * method, serious I/O errors set a flags (bad(), fail()), and also throw
258 * exceptions. For normal read/write operations, they can be suppressed (i.e.
259 * error reporting only using flags) with a constructor argument.
260 *
261 * Technicalities:
262 * - there is a pool of mmapped pages, half the pages are allocated to the
263 * parent process, half to the child
264 * - when one side has accumulated enough data (or a flush forces dirty pages
265 * out to the other end), it sends these pages to the other end by writing a
266 * byte containing the page number into the pipe
267 * - the other end (which has the pages mmapped, too) reads the page number(s)
268 * and puts the corresponding pages on its busy list
269 * - as the other ends reads, it frees busy pages, and eventually tries to put
270 * them on the its list; if a page belongs to the other end of the
271 * connection, it is sent back
272 * - lists of pages are sent across the pipe, not individual pages, in order
273 * to minimise the number of read/write operations needed
274 * - when mmap works properly, only one bytes containing the page number of
275 * the page list head is sent back and forth; the contents of that page
276 * allow to access the rest of the page list sent, and page headers on the
277 * list tell the receiving end if the page is free or has to be added to the
278 * busy list
279 * - when mmap does not work, we transfer one byte to indicate the head of the
280 * page list sent, and for each page on the list of sent pages, the page
281 * header and the page payload is sent (if the page is free, we only
282 * transmit the page header, and we never transmit more payload than
283 * the page actually contains)
284 * - in the child, all open BidirMMapPipes but the current one are closed. this
285 * is done for two reasons: first, to conserve file descriptors and address
286 * space. second, if more than one process is meant to use such a
287 * BidirMMapPipe, synchronisation issues arise which can lead to bugs that
288 * are hard to find and understand. it's much better to come up with a design
289 * which does not need pipes to be shared among more than two processes.
290 *
291 * Here is a trivial example of a parent and a child talking to each other over
292 * a BidirMMapPipe:
293 * @code
294 * #include <string>
295 * #include <iostream>
296 * #include <cstdlib>
297 *
298 * #include "BidirMMapPipe.h"
299 *
300 * int simplechild(BidirMMapPipe& pipe)
301 * {
302 * // child does an echo loop
303 * while (pipe.good() && !pipe.eof()) {
304 * // read a string
305 * std::string str;
306 * pipe >> str;
307 * if (!pipe) return -1;
308 * if (pipe.eof()) break;
309 * // check if parent wants us to shut down
310 * if (!str.empty()) {
311 * std::cout << "[CHILD] : read: " << str << std::endl;
312 * str = "... early in the morning?";
313 * }
314 * pipe << str << BidirMMapPipe::flush;
315 * if (str.empty()) break;
316 * if (!pipe) return -1;
317 * std::cout << "[CHILD] : wrote: " << str << std::endl;
318 * }
319 * // send shutdown request acknowledged
320 * pipe << "" << BidirMMapPipe::flush;
321 *
322 * pipe.close();
323 * return 0;
324 * }
325 *
326 * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
327 * {
328 * BidirMMapPipe *p = new BidirMMapPipe();
329 * if (p->isChild()) {
330 * int retVal = childexec(*p);
331 * delete p;
332 * std::exit(retVal);
333 * }
334 * return p;
335 * }
336 *
337 * int main()
338 * {
339 * std::cout << "[PARENT]: simple challenge-response test, one child:" <<
340 * std::endl;
341 * BidirMMapPipe* pipe = spawnChild(simplechild);
342 * for (int i = 0; i < 5; ++i) {
343 * std::string str("What shall we do with a drunken sailor...");
344 * *pipe << str << BidirMMapPipe::flush;
345 * if (!*pipe) return -1;
346 * std::cout << "[PARENT]: wrote: " << str << std::endl;
347 * *pipe >> str;
348 * if (!*pipe) return -1;
349 * std::cout << "[PARENT]: read: " << str << std::endl;
350 * }
351 * // ask child to shut down
352 * pipe << "" << BidirMMapPipe::flush;
353 * // wait for it to see the shutdown request
354 * std::string s;
355 * pipe >> s;
356 * std::cout << "[PARENT]: exit status of child: " << pipe->close() <<
357 * std::endl;
358 * delete pipe;
359 * return 0;
360 * }
361 * @endcode
362 *
363 * When designing your own protocols to use over the pipe, there are a few
364 * things to bear in mind:
365 * - Do as http does: When building a request, send all the options and
366 * properties of that request with the request itself in a single go (one
367 * flush). Then, the server has everything it needs, and hopefully, it'll
368 * shut up for a while and to let the client do something useful in the
369 * meantime... The same goes when the server replies to the request: include
370 * everything there is to know about the result of the request in the reply.
371 * - The expensive operation should be the request that is made, all other
372 * operations should somehow be formulated as options or properties to that
373 * request.
374 * - Include a shutdown handshake in whatever protocol you send over the
375 * pipe. That way, you can shut things down in a controlled way. Otherwise,
376 * and depending on your OS's scheduling quirks, you may catch a SIGPIPE if
377 * one end closes its pipe while the other is still trying to read.
378 */
380#ifndef _WIN32
381 public:
382 /// type used to represent sizes
383 typedef std::size_t size_type;
384 /// convenience typedef for BidirMMapPipeException
385 typedef BidirMMapPipe_impl::BidirMMapPipeException Exception;
386 /// flag bits for partial C++ iostream compatibility
387 enum {
388 eofbit = 1, ///< end of file reached
389 failbit = 2, ///< logical failure (e.g. pipe closed)
390 rderrbit = 4, ///< read error
391 wrerrbit = 8, ///< write error
392 badbit = rderrbit | wrerrbit, ///< general I/O error
393 exceptionsbit = 16 ///< error reporting with exceptions
394 };
395
396 /** @brief constructor (forks!)
397 *
398 * Creates a bidirectional communications channel between this process
399 * and a child the constructor forks. On return from the constructor,
400 * isParent() and isChild() can be used to tell the parent end from the
401 * child end of the pipe. In the child, all other open BidirMMapPipes
402 * are closed.
403 *
404 * @param useExceptions read()/write() error reporting also done using
405 * exceptions
406 * @param useSocketpair use a socketpair instead of a pair or pipes
407 *
408 * Normally, exceptions are thrown for all serious I/O errors (apart
409 * from end of file). Setting useExceptions to false will force the
410 * read() and write() methods to only report serious I/O errors using
411 * flags.
412 *
413 * When useSocketpair is true, use a pair of Unix domain sockets
414 * created using socketpair instead a pair of pipes. The advantage is
415 * that only one pair of file descriptors is needed instead of two
416 * pairs which are needed for the pipe pair. Performance should very
417 * similar on most platforms, especially if mmap works, since only
418 * very little data is sent through the pipe(s)/socketpair.
419 */
420 BidirMMapPipe(bool useExceptions = true, bool useSocketpair = false);
421
422 /** @brief destructor
423 *
424 * closes this end of pipe
425 */
427
428 /** @brief return the current setting of the debug flag
429 *
430 * @returns an integer with the debug Setting
431 */
432 static int debugflag() { return s_debugflag; }
433
434 /** @brief set the debug flags
435 *
436 * @param flag debug flags (if zero, no messages are printed)
437 */
438 static void setDebugflag(int flag) { s_debugflag = flag; }
439
440 /** @brief read from pipe
441 *
442 * @param addr where to put read data
443 * @param sz size of data to read (in bytes)
444 * @returns size of data read, or 0 in case of end-of-file
445 *
446 * read may block until data from other end is available. It will
447 * return 0 if the other end closed the pipe.
448 */
449 size_type read(void* addr, size_type sz);
450
451 /** @brief wirte to pipe
452 *
453 * @param addr where to get data to write from
454 * @param sz size of data to write (in bytes)
455 * @returns size of data written, or 0 in case of end-of-file
456 *
457 * write may block until data can be written to other end (depends a
458 * bit on available buffer space). It will return 0 if the other end
459 * closed the pipe. The data is queued to be written on the next
460 * convenient occasion, or it can be forced out with flush().
461 */
462 size_type write(const void* addr, size_type sz);
463
464 /** @brief flush buffers with unwritten data
465 *
466 * This forces unwritten data to be written to the other end. The call
467 * will block until this has been done (or the attempt failed with an
468 * error).
469 */
470 void flush();
471
472 /** @brief purge buffered data waiting to be read and/or written
473 *
474 * Discards all internal buffers.
475 */
476 void purge();
477
478 /** @brief number of bytes that can be read without blocking
479 *
480 * @returns number of bytes that can be read without blocking
481 */
483
484 /** @brief number of bytes that can be written without blocking
485 *
486 * @returns number of bytes that can be written without blocking
487 */
489
490 /** @brief flush buffers, close pipe
491 *
492 * Flush buffers, discard unread data, closes the pipe. If the pipe is
493 * in the parent process, it waits for the child.
494 *
495 * @returns exit code of child process in parent, zero in child
496 */
497 int close();
498
499 /** @brief return PID of the process on the other end of the pipe
500 *
501 * @returns PID of the process running on the remote end
502 */
503 pid_t pidOtherEnd() const
504 { return isChild() ? m_parentPid : m_childPid; }
505
506 /// condition flags for poll
508 None = 0, ///< nothing special on this pipe
509 Readable = 1, ///< pipe has data for reading
510 Writable = 2, ///< pipe can be written to
511 ReadError = 4, ///< pipe error read end
512 WriteError = 8, ///< pipe error Write end
513 Error = ReadError | WriteError, ///< pipe error
514 ReadEndOfFile = 32, ///< read pipe in end-of-file state
515 WriteEndOfFile = 64,///< write pipe in end-of-file state
517 ReadInvalid = 64, ///< read end of pipe invalid
518 WriteInvalid = 128, ///< write end of pipe invalid
519 Invalid = ReadInvalid | WriteInvalid ///< invalid pipe
520 };
521
522 /// for poll() interface
523 class PollEntry {
524 public:
525 BidirMMapPipe* pipe; ///< pipe of interest
526 unsigned events; ///< events of interest (or'ed bitmask)
527 unsigned revents; ///< events that happened (or'ed bitmask)
528 /// poll a pipe for all events
530 pipe(_pipe), events(None), revents(None) { }
531 /// poll a pipe for specified events
532 PollEntry(BidirMMapPipe* _pipe, int _events) :
533 pipe(_pipe), events(_events), revents(None) { }
534 };
535 /// convenience typedef for poll() interface
536 typedef std::vector<PollEntry> PollVector;
537
538 /** @brief poll a set of pipes for events (ready to read from, ready to
539 * write to, error)
540 *
541 * @param pipes set of pipes to check
542 * @param timeout timeout in milliseconds
543 * @returns positive number: number of pipes which have
544 * status changes, 0: timeout, or no pipes with
545 * status changed, -1 on error
546 *
547 * Timeout can be zero (check for specified events, and return), finite
548 * (wait at most timeout milliseconds before returning), or -1
549 * (infinite). The poll method returns when the timeout has elapsed,
550 * or if an event occurs on one of the pipes being polled, whichever
551 * happens earlier.
552 *
553 * Pipes is a vector of one or more PollEntries, which each list a pipe
554 * and events to poll for. If events is left empty (zero), all
555 * conditions are polled for, otherwise only the indicated ones. On
556 * return, the revents fields contain the events that occurred for each
557 * pipe; error Error, EndOfFile or Invalid events are always set,
558 * regardless of wether they were in the set of requested events.
559 *
560 * poll may block slightly longer than specified by timeout due to OS
561 * timer granularity and OS scheduling. Due to its implementation, the
562 * poll call can also return early if the remote end of the page sends
563 * a free page while polling (which is put on that pipe's freelist),
564 * while that pipe is polled for e.g Reading. The status of the pipe is
565 * indicated correctly in revents, and the caller can simply poll
566 * again. (The reason this is done this way is because it helps to
567 * replenish the pool of free pages and queue busy pages without
568 * blocking.)
569 *
570 * Here's a piece of example code waiting on two pipes; if they become
571 * readable they are read:
572 * @code
573 * #include <unistd.h>
574 * #include <cstdlib>
575 * #include <string>
576 * #include <sstream>
577 * #include <iostream>
578 *
579 * #include "BidirMMapPipe.h"
580 *
581 * // what to execute in the child
582 * int randomchild(BidirMMapPipe& pipe)
583 * {
584 * ::srand48(::getpid());
585 * for (int i = 0; i < 5; ++i) {
586 * // sleep a random time between 0 and .9 seconds
587 * ::usleep(int(1e6 * ::drand48()));
588 * std::ostringstream buf;
589 * buf << "child pid " << ::getpid() << " sends message " << i;
590 * std::cout << "[CHILD] : " << buf.str() << std::endl;
591 * pipe << buf.str() << BidirMMapPipe::flush;
592 * if (!pipe) return -1;
593 * if (pipe.eof()) break;
594 * }
595 * // tell parent we're done
596 * pipe << "" << BidirMMapPipe::flush;
597 * // wait for parent to acknowledge
598 * std::string s;
599 * pipe >> s;
600 * pipe.close();
601 * return 0;
602 * }
603 *
604 * // function to spawn a child
605 * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
606 * {
607 * BidirMMapPipe *p = new BidirMMapPipe();
608 * if (p->isChild()) {
609 * int retVal = childexec(*p);
610 * delete p;
611 * std::exit(retVal);
612 * }
613 * return p;
614 * }
615 *
616 * int main()
617 * {
618 * typedef BidirMMapPipe::PollEntry PollEntry;
619 * // poll data structure
620 * BidirMMapPipe::PollVector pipes;
621 * pipes.reserve(3);
622 * // spawn children
623 * for (int i = 0; i < 3; ++i) {
624 * pipes.push_back(PollEntry(spawnChild(randomchild),
625 * BidirMMapPipe::Readable));
626 * }
627 * // while at least some children alive
628 * while (!pipes.empty()) {
629 * // poll, wait until status change (infinite timeout)
630 * int npipes = BidirMMapPipe::poll(pipes, -1);
631 * // scan for pipes with changed status
632 * for (std::vector<PollEntry>::iterator it = pipes.begin();
633 * npipes && pipes.end() != it; ) {
634 * if (!it->revents) {
635 * // unchanged, next one
636 * ++it;
637 * continue;
638 * }
639 * --npipes; // maybe we can stop early...
640 * // read from pipes which are readable
641 * if (it->revents & BidirMMapPipe::Readable) {
642 * std::string s;
643 * *(it->pipe) >> s;
644 * if (!s.empty()) {
645 * std::cout << "[PARENT]: Read from pipe " <<
646 * it->pipe << ": " << s << std::endl;
647 * ++it;
648 * continue;
649 * } else {
650 * // child is shutting down...
651 * *(it->pipe) << "" << BidirMMapPipe::flush;
652 * goto childcloses;
653 * }
654 * }
655 * // retire pipes with error or end-of-file condition
656 * if (it->revents & (BidirMMapPipe::Error |
657 * BidirMMapPipe::EndOfFile |
658 * BidirMMapPipe::Invalid)) {
659 * std::cout << "[PARENT]: Error on pipe " <<
660 * it->pipe << " revents " << it->revents <<
661 * std::endl;
662 * childcloses:
663 * std::cout << "[PARENT]:\tchild exit status: " <<
664 * it->pipe->close() << std::endl;
665 * if (retVal) return retVal;
666 * delete it->pipe;
667 * it = pipes.erase(it);
668 * continue;
669 * }
670 * }
671 * }
672 * return 0;
673 * }
674 * @endcode
675 */
676 static int poll(PollVector& pipes, int timeout);
677
678 /** @brief return if this end of the pipe is the parent end
679 *
680 * @returns true if parent end of pipe
681 */
682 bool isParent() const { return m_childPid; }
683
684 /** @brief return if this end of the pipe is the child end
685 *
686 * @returns true if child end of pipe
687 */
688 bool isChild() const { return !m_childPid; }
689
690 /** @brief if BidirMMapPipe uses a socketpair for communications
691 *
692 * @returns true if BidirMMapPipe uses a socketpair for communications
693 */
694 bool usesSocketpair() const { return m_inpipe == m_outpipe; }
695
696 /** @brief if BidirMMapPipe uses a pipe pair for communications
697 *
698 * @returns true if BidirMMapPipe uses a pipe pair for communications
699 */
700 bool usesPipepair() const { return m_inpipe != m_outpipe; }
701
702 /** @brief return flags (end of file, BidirMMapPipe closed, ...)
703 *
704 * @returns flags (end of file, BidirMMapPipe closed, ...)
705 */
706 int rdstate() const { return m_flags; }
707
708 /** @brief true if end-of-file
709 *
710 * @returns true if end-of-file
711 */
712 bool eof() const { return m_flags & eofbit; }
713
714 /** @brief logical failure (e.g. I/O on closed BidirMMapPipe)
715 *
716 * @returns true in case of grave logical error (I/O on closed pipe,...)
717 */
718 bool fail() const { return m_flags & failbit; }
719
720 /** @brief true on I/O error
721 *
722 * @returns true on I/O error
723 */
724 bool bad() const { return m_flags & badbit; }
725
726 /** @brief status of stream is good
727 *
728 * @returns true if pipe is good (no errors, eof, ...)
729 */
730 bool good() const { return !(m_flags & (eofbit | failbit | badbit)); }
731
732 /** @brief true if closed
733 *
734 * @returns true if stream is closed
735 */
736 bool closed() const { return m_flags & failbit; }
737
738 /** @brief return true if not serious error (fail/bad)
739 *
740 * @returns true if stream is does not have serious error (fail/bad)
741 *
742 * (if EOF, this is still true)
743 */
744 operator bool() const { return !fail() && !bad(); }
745
746 /** @brief return true if serious error (fail/bad)
747 *
748 * @returns true if stream has a serious error (fail/bad)
749 */
750 bool operator!() const { return fail() || bad(); }
751
752#ifdef STREAMOP
753#undef STREAMOP
754#endif
755#define STREAMOP(TYPE) \
756 BidirMMapPipe& operator<<(const TYPE& val) \
757 { write(&val, sizeof(TYPE)); return *this; } \
758 BidirMMapPipe& operator>>(TYPE& val) \
759 { read(&val, sizeof(TYPE)); return *this; }
760 STREAMOP(bool); ///< C++ style stream operators for bool
761 STREAMOP(char); ///< C++ style stream operators for char
762 STREAMOP(short); ///< C++ style stream operators for short
763 STREAMOP(int); ///< C++ style stream operators for int
764 STREAMOP(long); ///< C++ style stream operators for long
765 STREAMOP(long long); ///< C++ style stream operators for long long
766 STREAMOP(unsigned char); ///< C++ style stream operators for unsigned char
767 STREAMOP(unsigned short); ///< C++ style stream operators for unsigned short
768 STREAMOP(unsigned int); ///< C++ style stream operators for unsigned int
769 STREAMOP(unsigned long); ///< C++ style stream operators for unsigned long
770 STREAMOP(unsigned long long); ///< C++ style stream operators for unsigned long long
771 STREAMOP(float); ///< C++ style stream operators for float
772 STREAMOP(double); ///< C++ style stream operators for double
773#undef STREAMOP
774
775 /** @brief write a C-style string
776 *
777 * @param str C-style string
778 * @returns pipe written to
779 */
780 BidirMMapPipe& operator<<(const char* str);
781
782 /** @brief read a C-style string
783 *
784 * @param str pointer to string (space allocated with malloc!)
785 * @returns pipe read from
786 *
787 * since this is for C-style strings, we use malloc/realloc/free for
788 * strings. passing in a NULL pointer is valid here, and the routine
789 * will use realloc to allocate a chunk of memory of the right size.
790 */
791 BidirMMapPipe& operator>>(char* (&str));
792
793 /** @brief write a std::string object
794 *
795 * @param str string to write
796 * @returns pipe written to
797 */
798 BidirMMapPipe& operator<<(const std::string& str);
799
800 /** @brief read a std::string object
801 *
802 * @param str string to be read
803 * @returns pipe read from
804 */
805 BidirMMapPipe& operator>>(std::string& str);
806
807 /** @brief write raw pointer to T to other side
808 *
809 * NOTE: This will not write the pointee! Only the value of the
810 * pointer is transferred.
811 *
812 * @param tptr pointer to be written
813 * @returns pipe written to
814 */
815 template<class T> BidirMMapPipe& operator<<(const T* tptr)
816 { write(&tptr, sizeof(tptr)); return *this; }
817
818 /** @brief read raw pointer to T from other side
819 *
820 * NOTE: This will not read the pointee! Only the value of the
821 * pointer is transferred.
822 *
823 * @param tptr pointer to be read
824 * @returns pipe read from
825 */
826 template<class T> BidirMMapPipe& operator>>(T* &tptr)
827 { read(&tptr, sizeof(tptr)); return *this; }
828
829 /** @brief I/O manipulator support
830 *
831 * @param manip manipulator
832 * @returns pipe with manipulator applied
833 *
834 * example:
835 * @code
836 * pipe << BidirMMapPipe::flush;
837 * @endcode
838 */
840 { return manip(*this); }
841
842 /** @brief I/O manipulator support
843 *
844 * @param manip manipulator
845 * @returns pipe with manipulator applied
846 *
847 * example:
848 * @code
849 * pipe >> BidirMMapPipe::purge;
850 * @endcode
851 */
853 { return manip(*this); }
854
855 /// for usage a la "pipe << flush;"
856 static BidirMMapPipe& flush(BidirMMapPipe& pipe) { pipe.flush(); return pipe; }
857 /// for usage a la "pipe << purge;"
858 static BidirMMapPipe& purge(BidirMMapPipe& pipe) { pipe.purge(); return pipe; }
859
860 private:
861 /// copy-construction forbidden
863 /// assignment forbidden
864 BidirMMapPipe& operator=(const BidirMMapPipe&) { return *this; }
865
866 /// page is our friend
868 /// convenience typedef for Page
870
871 /// tuning constants
872 enum {
873 // TotPages = 16 will give 32k buffers at 4k page size for both
874 // parent and child; if your average message to send is larger
875 // than this, consider raising the value (max 256)
876 TotPages = 16, ///< pages shared (child + parent)
877
878 PagesPerEnd = TotPages / 2, ///< pages per pipe end
879
880 // if FlushThresh pages are filled, the code forces a flush; 3/4
881 // of the pages available seems to work quite well
882 FlushThresh = (3 * PagesPerEnd) / 4 ///< flush threshold
883 };
884
885 // per-class members
886 static pthread_mutex_t s_openpipesmutex; ///< protects s_openpipes
887 /// list of open BidirMMapPipes
888 static std::list<BidirMMapPipe*> s_openpipes;
889 /// pool of mmapped pages
890 static BidirMMapPipe_impl::PagePool* s_pagepool;
891 /// page pool reference counter
892 static unsigned s_pagepoolrefcnt;
893 /// debug flag
894 static int s_debugflag;
895
896 /// return page pool
897 static BidirMMapPipe_impl::PagePool& pagepool();
898
899 // per-instance members
901 Page* m_busylist; ///< linked list: busy pages (data to be read)
902 Page* m_freelist; ///< linked list: free pages
903 Page* m_dirtylist; ///< linked list: dirty pages (data to be sent)
904 int m_inpipe; ///< pipe end from which data may be read
905 int m_outpipe; ///< pipe end to which data may be written
906 int m_flags; ///< flags (e.g. end of file)
907 pid_t m_childPid; ///< pid of the child (zero if we're child)
908 pid_t m_parentPid; ///< pid of the parent
909
910 /// cleanup routine - at exit, we want our children to get a SIGTERM...
911 static void teardownall(void);
912
913 /// return length of a page list
914 static unsigned lenPageList(const Page* list);
915
916 /** "feed" the busy and free lists with a list of pages
917 *
918 * @param plist linked list of pages
919 *
920 * goes through plist, puts free pages from plist onto the freelist
921 * (or sends them to the remote end if they belong there), and puts
922 * non-empty pages on plist onto the busy list
923 */
924 void feedPageLists(Page* plist);
925
926 /// put on dirty pages list
927 void markPageDirty(Page* p);
928
929 /// transfer bytes through the pipe (reading, writing, may block)
930 static size_type xferraw(int fd, void* addr, size_type len,
931 ssize_t (*xferfn)(int, void*, std::size_t));
932 /// transfer bytes through the pipe (reading, writing, may block)
933 static size_type xferraw(int fd, void* addr, const size_type len,
934 ssize_t (*xferfn)(int, const void*, std::size_t))
935 {
936 return xferraw(fd, addr, len,
937 reinterpret_cast<ssize_t (*)(
938 int, void*, std::size_t)>(xferfn));
939 }
940
941 /** @brief send page(s) to the other end (may block)
942 *
943 * @param plist linked list of pages to send
944 *
945 * the implementation gathers the different write(s) whereever
946 * possible; if mmap works, this results in a single write to transfer
947 * the list of pages sent, if we need to copy things through the pipe,
948 * we have one write to transfer which pages are sent, and then one
949 * write per page.
950 */
951 void sendpages(Page* plist);
952
953 /** @brief receive a pages from the other end (may block), queue them
954 *
955 * @returns number of pages received
956 *
957 * this is an application-level scatter read, which gets the list of
958 * pages to read from the pipe. if mmap works, it needs only one read
959 * call (to get the head of the list of pages transferred). if we need
960 * to copy pages through the pipe, we need to add one read for each
961 * empty page, and two reads for each non-empty page.
962 */
963 unsigned recvpages();
964
965 /** @brief receive pages from other end (non-blocking)
966 *
967 * @returns number of pages received
968 *
969 * like recvpages(), but does not block if nothing is available for
970 * reading
971 */
972 unsigned recvpages_nonblock();
973
974 /// get a busy page to read data from (may block)
975 Page* busypage();
976 /// get a dirty page to write data to (may block)
977 Page* dirtypage();
978
979 /// close the pipe (no flush if forced)
980 int doClose(bool force, bool holdlock = false);
981 /// perform the flush
982 void doFlush(bool forcePartialPages = true);
983#endif //_WIN32
984};
985
987
988#undef BEGIN_NAMESPACE_ROOFIT
989#undef END_NAMESPACE_ROOFIT
990
991#endif // BIDIRMMAPPIPE_H
992
993// vim: ft=cpp:sw=4:tw=78:et
#define END_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:20
#define BEGIN_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:19
for poll() interface
PollEntry(BidirMMapPipe *_pipe)
poll a pipe for all events
unsigned revents
events that happened (or'ed bitmask)
PollEntry(BidirMMapPipe *_pipe, int _events)
poll a pipe for specified events
BidirMMapPipe * pipe
pipe of interest
unsigned events
events of interest (or'ed bitmask)
class representing a chunk of pages
Definition: BidirMMapPipe.h:44
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:81
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:62
PageChunk & operator=(const PageChunk &)
forbid assignment
Definition: BidirMMapPipe.h:83
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:71
unsigned len() const
return length of chunk
unsigned nPagesPerGroup() const
return number of pages per page group
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:64
static unsigned physPgSz()
return the physical page size of the system
Definition: BidirMMapPipe.h:88
static unsigned s_physpgsz
system physical page size
Definition: BidirMMapPipe.h:56
bool full() const
return true if no free page groups in this chunk
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:65
void push(const Pages &p)
push a group of pages onto the free list
bool contains(const Pages &p) const
return if p is contained in this PageChunk
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:69
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:68
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:59
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:47
@ Copy
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:49
@ Unknown
don't know yet what'll work
Definition: BidirMMapPipe.h:48
@ DevZero
mmapping /dev/zero works
Definition: BidirMMapPipe.h:51
@ Anonymous
anonymous mmap works
Definition: BidirMMapPipe.h:52
@ FileBacked
mmapping a temp file works
Definition: BidirMMapPipe.h:50
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static unsigned s_pagesize
logical page size (run-time determined)
Definition: BidirMMapPipe.h:57
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:70
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:90
Pages pop()
pop a group of pages off the free list
static unsigned pagesize()
return the logical page size
Definition: BidirMMapPipe.h:86
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
bool empty() const
return true if no used page groups in this chunk
void zap(Pages &p)
free all pages except for those pointed to by p
static unsigned getPageSize()
determine page size at run time
handle class for a number of Pages
unsigned operator[](Page *p) const
perform page to page number mapping
Pages()
default constructor
static unsigned pagesize()
return page size
impl * m_pimpl
pointer to implementation
Page * operator[](unsigned pgno) const
return page number pgno
Pages & operator=(const Pages &other)
assignment operator
void swap(Pages &other)
swap with other's contents
Page * page(unsigned pgno) const
return page number pgno
unsigned npages() const
return number of pages accessible
unsigned pageno(Page *p) const
perform page to page number mapping
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks.
STREAMOP(short)
C++ style stream operators for short.
BidirMMapPipe & operator>>(T *&tptr)
read raw pointer to T from other side
void purge()
purge buffered data waiting to be read and/or written
STREAMOP(unsigned long long)
C++ style stream operators for unsigned long long.
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
int close()
flush buffers, close pipe
STREAMOP(float)
C++ style stream operators for float.
static BidirMMapPipe & purge(BidirMMapPipe &pipe)
for usage a la "pipe << purge;"
bool usesSocketpair() const
if BidirMMapPipe uses a socketpair for communications
BidirMMapPipe_impl::Page Page
convenience typedef for Page
STREAMOP(long long)
C++ style stream operators for long long.
STREAMOP(long)
C++ style stream operators for long.
BidirMMapPipe_impl::Pages m_pages
mmapped pages
@ wrerrbit
write error
@ badbit
general I/O error
@ eofbit
end of file reached
@ failbit
logical failure (e.g. pipe closed)
@ exceptionsbit
error reporting with exceptions
@ rderrbit
read error
bool good() const
status of stream is good
static size_type xferraw(int fd, void *addr, const size_type len, ssize_t(*xferfn)(int, const void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
Page * m_dirtylist
linked list: dirty pages (data to be sent)
bool usesPipepair() const
if BidirMMapPipe uses a pipe pair for communications
pid_t m_parentPid
pid of the parent
STREAMOP(bool)
C++ style stream operators for bool.
@ PagesPerEnd
pages per pipe end
@ TotPages
pages shared (child + parent)
@ FlushThresh
flush threshold
unsigned recvpages()
receive pages from the other end (may block) and queue them
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
STREAMOP(double)
C++ style stream operators for double.
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
BidirMMapPipe & operator<<(const T *tptr)
write raw pointer to T to other side
~BidirMMapPipe()
destructor
int m_inpipe
pipe end from which data may be read
STREAMOP(unsigned long)
C++ style stream operators for unsigned long.
BidirMMapPipe & operator>>(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
bool isChild() const
return if this end of the pipe is the child end
void doFlush(bool forcePartialPages=true)
perform the flush
static BidirMMapPipe & flush(BidirMMapPipe &pipe)
for usage a la "pipe << flush;"
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
STREAMOP(int)
C++ style stream operators for int.
STREAMOP(unsigned short)
C++ style stream operators for unsigned short.
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
bool eof() const
true if end-of-file
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
pid_t pidOtherEnd() const
return PID of the process on the other end of the pipe
static int debugflag()
return the current setting of the debug flag
bool isParent() const
return if this end of the pipe is the parent end
std::size_t size_type
type used to represent sizes
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
bool bad() const
true on I/O error
PollFlags
condition flags for poll
@ WriteInvalid
write end of pipe invalid
@ Invalid
invalid pipe
@ ReadEndOfFile
read pipe in end-of-file state
@ WriteError
pipe error write end
@ ReadError
pipe error read end
@ Error
pipe error
@ Readable
pipe has data for reading
@ None
nothing special on this pipe
@ ReadInvalid
read end of pipe invalid
@ WriteEndOfFile
write pipe in end-of-file state
@ Writable
pipe can be written to
@ EndOfFile
end of file
int m_flags
flags (e.g. end of file)
Page * m_freelist
linked list: free pages
Page * busypage()
get a busy page to read data from (may block)
int rdstate() const
return flags (end of file, BidirMMapPipe closed, ...)
Page * dirtypage()
get a dirty page to write data to (may block)
BidirMMapPipe & operator=(const BidirMMapPipe &)
assignment forbidden
void sendpages(Page *plist)
send page(s) to the other end (may block)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
STREAMOP(char)
C++ style stream operators for char.
BidirMMapPipe & operator<<(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
STREAMOP(unsigned char)
C++ style stream operators for unsigned char.
size_type read(void *addr, size_type sz)
read from pipe
STREAMOP(unsigned int)
C++ style stream operators for unsigned int.
static int s_debugflag
debug flag
bool closed() const
true if closed
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
size_type write(const void *addr, size_type sz)
write to pipe
bool operator!() const
return true if serious error (fail/bad)
friend class BidirMMapPipe_impl::Page
page is our friend
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void flush()
flush buffers with unwritten data
bool fail() const
logical failure (e.g.
static unsigned s_pagepoolrefcnt
page pool reference counter
static void setDebugflag(int flag)
set the debug flags
Page * m_busylist
linked list: busy pages (data to be read)
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
int m_outpipe
pipe end to which data may be written
void markPageDirty(Page *p)
put on dirty pages list
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned lenPageList(const Page *list)
return length of a page list
pid_t m_childPid
pid of the child (zero if we're child)
namespace for implementation details of BidirMMapPipe
double T(double x)
Definition: ChebyshevPol.h:34
unsigned char m_npages
length in pages
Page * m_pages
pointer to first page
unsigned m_refcnt
reference counter
PageChunk * m_parent
pointer to parent pool