Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
BidirMMapPipe.h
Go to the documentation of this file.
1/** @file BidirMMapPipe.h
2 *
3 * header file for BidirMMapPipe, a class which forks off a child process and
4 * serves as communications channel between parent and child
5 *
6 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7 * @date 2013-07-07
8 */
9
10#ifndef BIDIRMMAPPIPE_H
11#define BIDIRMMAPPIPE_H
12
13#include <cstring>
14#include <list>
15#include <pthread.h>
16#include <string>
17#include <unistd.h>
18#include <vector>
19
// helper macros that open / close namespace RooFit; both are #undef'd again
// at the end of this header so they do not leak into including files
20#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
21#define END_NAMESPACE_ROOFIT }
22
24
25/// namespace for implementation details of BidirMMapPipe
26namespace BidirMMapPipe_impl {
27 // forward declarations
28 class BidirMMapPipeException;
29 class Page;
30 class PagePool;
31 class Pages;
32
33 /** @brief class representing a chunk of pages
34 *
35 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
36 * @date 2013-07-24
37 *
38 * allocating pages from the OS happens in chunks in order to not exhaust
39 * the maximum allowed number of memory mappings per process; this class
40 * takes care of such a chunk
41 *
42 * a page chunk allows callers to obtain or release pages in groups of
43 * continuous pages of fixed size
44 */
45 class PageChunk {
46 public:
47 /// type of mmap support found
48 typedef enum {
49 Unknown, ///< don't know yet what'll work
50 Copy, ///< mmap doesn't work, have to copy back and forth
51 FileBacked, ///< mmapping a temp file works
52 DevZero, ///< mmapping /dev/zero works
53 Anonymous ///< anonymous mmap works
55
56 private:
57 static unsigned s_physpgsz; ///< system physical page size
58 static unsigned s_pagesize; ///< logical page size (run-time determined)
59 /// mmap variety that works on this system
61
62 /// convenience typedef
64
65 void* m_begin; ///< pointer to start of mmapped area
66 void* m_end; ///< pointer one behind end of mmapped area
67 // FIXME: cannot keep freelist inline - other end may need that
68 // data, and we'd end up overwriting the page header
69 std::list<void*> m_freelist; ///< free pages list
70 PagePool* m_parent; ///< parent page pool
71 unsigned m_nPgPerGrp; ///< number of pages per group
72 unsigned m_nUsedGrp; ///< number of used page groups
73
74 /// determine page size at run time
75 static unsigned getPageSize();
76
77 /// mmap pages, len is length of mmapped area in bytes
78 static void* dommap(unsigned len);
79 /// munmap pages p, len is length of mmapped area in bytes
80 static void domunmap(void* p, unsigned len);
81 /// forbid copying
83 /// forbid assignment
84 PageChunk& operator=(const PageChunk&) { return *this; }
85 public:
86 /// return the logical page size
87 static unsigned pagesize() { return s_pagesize; }
88 /// return the physical page size of the system
89 static unsigned physPgSz() { return s_physpgsz; }
90 /// return mmap variety support found
91 static MMapVariety mmapVariety() { return s_mmapworks; }
92
93 /// constructor
94 PageChunk(PagePool* parent, unsigned length, unsigned nPgPerGroup);
95
96 /// destructor
97 ~PageChunk();
98
99 /// return if p is contained in this PageChunk
100 bool contains(const Pages& p) const;
101
102 /// pop a group of pages off the free list
103 Pages pop();
104
105 /// push a group of pages onto the free list
106 void push(const Pages& p);
107
108 /// return length of chunk
109 unsigned len() const
110 {
111 return reinterpret_cast<unsigned char*>(m_end) -
112 reinterpret_cast<unsigned char*>(m_begin);
113 }
114 /// return number of pages per page group
115 unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
116
117 /// return true if no used page groups in this chunk
118 bool empty() const { return !m_nUsedGrp; }
119
120 /// return true if no free page groups in this chunk
121 bool full() const { return m_freelist.empty(); }
122
123 /// free all pages except for those pointed to by p
124 void zap(Pages& p);
125 };
126
127 /** @brief handle class for a number of Pages
128 *
129 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
130 * @date 2013-07-24
131 *
132 * the associated pages are continuous in memory
133 */
134 class Pages {
135 private:
136 /// implementation
137 typedef struct {
138 PageChunk *m_parent; ///< pointer to parent pool
139 Page* m_pages; ///< pointer to first page
140 unsigned m_refcnt; ///< reference counter
// m_npages is an unsigned char, so one handle spans at most UCHAR_MAX
// (255 with 8-bit chars) pages
141 unsigned char m_npages; ///< length in pages
142 } impl;
// NOTE(review): impl carries a reference counter (m_refcnt); confirm against
// the implementation how that relates to the "old object loses ownership"
// wording of the copy constructor / assignment documentation below
143 public:
144 /// default constructor
// creates an empty handle: m_pimpl is null and no pages are referenced
145 Pages() : m_pimpl(0) { }
146
147 /// destructor
148 ~Pages();
149
150 /** @brief copy constructor
151 *
152 * copy Pages handle to new object - old object loses ownership,
153 * and becomes a dangling handle
154 */
155 Pages(const Pages& other);
156
157 /** @brief assignment operator
158 *
159 * assign Pages handle to new object - old object loses ownership,
160 * and becomes a dangling handle
161 */
162 Pages& operator=(const Pages& other);
163
164 /// return page size
165 static unsigned pagesize();
166
167 /// return number of pages accessible
// NOTE(review): dereferences m_pimpl unconditionally - must not be called on
// an empty (default-constructed) handle, whose m_pimpl is null
168 unsigned npages() const { return m_pimpl->m_npages; }
169
170 /// return page number pageno
171 Page* page(unsigned pgno) const;
172
173 /// return page number pageno
174 Page* operator[](unsigned pgno) const { return page(pgno); }
175
176 /// perform page to page number mapping
177 unsigned pageno(Page* p) const;
178
179 /// perform page to page number mapping
180 unsigned operator[](Page* p) const { return pageno(p); }
181
182 /// swap with other's contents
// exchanges only the implementation pointers; no page data is touched
183 void swap(Pages& other)
184 {
185 impl* tmp = other.m_pimpl;
186 other.m_pimpl = m_pimpl;
187 m_pimpl = tmp;
188 }
189
190 private:
191 /// page pool is our friend - it's allowed to construct Pages
// NOTE(review): neither the friend declaration announced above nor the
// m_pimpl member announced below appears in this listing, although m_pimpl
// is used by the default constructor, npages() and swap(). The private
// constructor takes a PageChunk*, and PageChunk::pop() returns Pages, so the
// friend is presumably PageChunk - verify against the original header.
193
194 /// pointer to implementation
196
197 /// constructor
198 Pages(PageChunk* parent, Page* pages, unsigned npg);
199 };
200}
201
202/** @brief BidirMMapPipe creates a bidirectional channel between the current
203 * process and a child it forks.
204 *
205 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
206 * @date 2013-07-07
207 *
208 * This class creates a bidirectional channel between this process and a child
209 * it creates with fork().
210 *
211 * The channel is comrised of a small shared pool of buffer memory mmapped into
212 * both process spaces, and two pipes to synchronise the exchange of data. The
213 * idea behind using the pipes at all is to have some primitive which we can
214 * block on without having to worry about atomic operations or polling, leaving
215 * these tasks to the OS. In case the anonymous mmap cannot be performed on the
216 * OS the code is running on (for whatever reason), the code falls back to
217 * mmapping /dev/zero, mmapping a temporary file, or (if those all fail), a
218 * dynamically allocated buffer which is then transmitted through the pipe(s),
219 * a slightly slower alternative (because the data is copied more often).
220 *
221 * The channel supports five major operations: read(), write(), flush(),
222 * purge() and close(). Reading and writing may block until the required buffer
223 * space is available. Writes may queue up data to be sent to the other end
224 * until either enough pages are full, or the user calls flush which forces
225 * any unsent buffers to be sent to the other end. flush forces any data that
226 * is to be sent to be sent. purge discards any buffered data waiting to be
227 * read and/or sent. Closing the channel on the child returns zero, closing it
228 * on the parent returns the child's exit status.
229 *
230 * The class also provides operator<< and operator>> for C++-style I/O for
231 * basic data types (bool, char, short, int, long, long long, float, double
232 * and their unsigned counterparts). Data is transmitted binary (i.e. no
233 * formatting to strings like std::cout does). There are also overloads to
234 * support C-style zero terminated strings and std::string. In terms of
235 * performance, the former is to be preferred.
236 *
237 * If the caller needs to multiplex input and output to/from several pipes, the
238 * class provides the poll() method which allows to block until an event occurs
239 * on any of the polled pipes.
240 *
241 * After the BidirMMapPipe is closed, no further operations may be performed on
242 * that object, save for the destructor which may still be called.
243 *
244 * If the BidirMMapPipe has not properly been closed, the destructor will call
245 * close. However, the exit code of the child is lost in that case.
246 *
247 * Closing the object causes the mmapped memory to be unmapped and the two
248 * pipes to be closed. We also install an atexit handler in the process of
249 * creating BidirMMapPipes. This ensures that when the current process
250 * terminates, a SIGTERM signal is sent to the child processes created for all
251 * unclosed pipes to avoid leaving zombie processes in the OS's process table.
252 *
253 * BidirMMapPipe creation, closing and destruction are thread safe. If the
254 * BidirMMapPipe is used in more than one thread, the other operations have to
255 * be protected with a mutex (or something similar), though.
256 *
257 * End of file (other end closed its pipe, or died) is indicated with the eof()
258 * method, serious I/O errors set a flags (bad(), fail()), and also throw
259 * exceptions. For normal read/write operations, they can be suppressed (i.e.
260 * error reporting only using flags) with a constructor argument.
261 *
262 * Technicalities:
263 * - there is a pool of mmapped pages, half the pages are allocated to the
264 * parent process, half to the child
265 * - when one side has accumulated enough data (or a flush forces dirty pages
266 * out to the other end), it sends these pages to the other end by writing a
267 * byte containing the page number into the pipe
268 * - the other end (which has the pages mmapped, too) reads the page number(s)
269 * and puts the corresponding pages on its busy list
270 * - as the other ends reads, it frees busy pages, and eventually tries to put
271 * them on the its list; if a page belongs to the other end of the
272 * connection, it is sent back
273 * - lists of pages are sent across the pipe, not individual pages, in order
274 * to minimise the number of read/write operations needed
275 * - when mmap works properly, only one bytes containing the page number of
276 * the page list head is sent back and forth; the contents of that page
277 * allow to access the rest of the page list sent, and page headers on the
278 * list tell the receiving end if the page is free or has to be added to the
279 * busy list
280 * - when mmap does not work, we transfer one byte to indicate the head of the
281 * page list sent, and for each page on the list of sent pages, the page
282 * header and the page payload is sent (if the page is free, we only
283 * transmit the page header, and we never transmit more payload than
284 * the page actually contains)
285 * - in the child, all open BidirMMapPipes but the current one are closed. this
286 * is done for two reasons: first, to conserve file descriptors and address
287 * space. second, if more than one process is meant to use such a
288 * BidirMMapPipe, synchronisation issues arise which can lead to bugs that
289 * are hard to find and understand. it's much better to come up with a design
290 * which does not need pipes to be shared among more than two processes.
291 *
292 * Here is a trivial example of a parent and a child talking to each other over
293 * a BidirMMapPipe:
294 * @code
295 * #include <string>
296 * #include <iostream>
297 * #include <cstdlib>
298 *
299 * #include "BidirMMapPipe.h"
300 *
301 * int simplechild(BidirMMapPipe& pipe)
302 * {
303 * // child does an echo loop
304 * while (pipe.good() && !pipe.eof()) {
305 * // read a string
306 * std::string str;
307 * pipe >> str;
308 * if (!pipe) return -1;
309 * if (pipe.eof()) break;
310 * // check if parent wants us to shut down
311 * if (!str.empty()) {
312 * std::cout << "[CHILD] : read: " << str << std::endl;
313 * str = "... early in the morning?";
314 * }
315 * pipe << str << BidirMMapPipe::flush;
316 * if (str.empty()) break;
317 * if (!pipe) return -1;
318 * std::cout << "[CHILD] : wrote: " << str << std::endl;
319 * }
320 * // send shutdown request acknowledged
321 * pipe << "" << BidirMMapPipe::flush;
322 *
323 * pipe.close();
324 * return 0;
325 * }
326 *
327 * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
328 * {
329 * BidirMMapPipe *p = new BidirMMapPipe();
330 * if (p->isChild()) {
331 * int retVal = childexec(*p);
332 * delete p;
333 * std::exit(retVal);
334 * }
335 * return p;
336 * }
337 *
338 * int main()
339 * {
340 * std::cout << "[PARENT]: simple challenge-response test, one child:" <<
341 * std::endl;
342 * BidirMMapPipe* pipe = spawnChild(simplechild);
343 * for (int i = 0; i < 5; ++i) {
344 * std::string str("What shall we do with a drunken sailor...");
345 * *pipe << str << BidirMMapPipe::flush;
346 * if (!*pipe) return -1;
347 * std::cout << "[PARENT]: wrote: " << str << std::endl;
348 * *pipe >> str;
349 * if (!*pipe) return -1;
350 * std::cout << "[PARENT]: read: " << str << std::endl;
351 * }
352 * // ask child to shut down
353 * pipe << "" << BidirMMapPipe::flush;
354 * // wait for it to see the shutdown request
355 * std::string s;
356 * pipe >> s;
357 * std::cout << "[PARENT]: exit status of child: " << pipe->close() <<
358 * std::endl;
359 * delete pipe;
360 * return 0;
361 * }
362 * @endcode
363 *
364 * When designing your own protocols to use over the pipe, there are a few
365 * things to bear in mind:
366 * - Do as http does: When building a request, send all the options and
367 * properties of that request with the request itself in a single go (one
368 * flush). Then, the server has everything it needs, and hopefully, it'll
369 * shut up for a while and to let the client do something useful in the
370 * meantime... The same goes when the server replies to the request: include
371 * everything there is to know about the result of the request in the reply.
372 * - The expensive operation should be the request that is made, all other
373 * operations should somehow be formulated as options or properties to that
374 * request.
375 * - Include a shutdown handshake in whatever protocol you send over the
376 * pipe. That way, you can shut things down in a controlled way. Otherwise,
377 * and depending on your OS's scheduling quirks, you may catch a SIGPIPE if
378 * one end closes its pipe while the other is still trying to read.
379 */
381#ifndef _WIN32
382 public:
383 /// type used to represent sizes
384 typedef std::size_t size_type;
385 /// convenience typedef for BidirMMapPipeException
/// stream state bits, modelled on (a subset of) the C++ iostream state flags
enum {
    eofbit = 1 << 0,              ///< end of file reached
    failbit = 1 << 1,             ///< logical failure (e.g. pipe closed)
    rderrbit = 1 << 2,            ///< read error
    wrerrbit = 1 << 3,            ///< write error
    badbit = wrerrbit | rderrbit, ///< general I/O error (read or write side)
    exceptionsbit = 1 << 4        ///< error reporting with exceptions
};
396
397 /** @brief constructor (forks!)
398 *
399 * Creates a bidirectional communications channel between this process
400 * and a child the constructor forks. On return from the constructor,
401 * isParent() and isChild() can be used to tell the parent end from the
402 * child end of the pipe. In the child, all other open BidirMMapPipes
403 * are closed.
404 *
405 * @param useExceptions read()/write() error reporting also done using
406 * exceptions
407 * @param useSocketpair use a socketpair instead of a pair or pipes
408 *
409 * Normally, exceptions are thrown for all serious I/O errors (apart
410 * from end of file). Setting useExceptions to false will force the
411 * read() and write() methods to only report serious I/O errors using
412 * flags.
413 *
414 * When useSocketpair is true, use a pair of Unix domain sockets
415 * created using socketpair instead a pair of pipes. The advantage is
416 * that only one pair of file descriptors is needed instead of two
417 * pairs which are needed for the pipe pair. Performance should very
418 * similar on most platforms, especially if mmap works, since only
419 * very little data is sent through the pipe(s)/socketpair.
420 */
421 BidirMMapPipe(bool useExceptions = true, bool useSocketpair = false);
422
423 /** @brief destructor
424 *
425 * closes this end of pipe
426 */
428
429 /** @brief return the current setting of the debug flag
430 *
431 * @returns an integer with the debug Setting
432 */
433 static int debugflag() { return s_debugflag; }
434
435 /** @brief set the debug flags
436 *
437 * @param flag debug flags (if zero, no messages are printed)
438 */
439 static void setDebugflag(int flag) { s_debugflag = flag; }
440
441 /** @brief read from pipe
442 *
443 * @param addr where to put read data
444 * @param sz size of data to read (in bytes)
445 * @returns size of data read, or 0 in case of end-of-file
446 *
447 * read may block until data from other end is available. It will
448 * return 0 if the other end closed the pipe.
449 */
450 size_type read(void* addr, size_type sz);
451
452 /** @brief wirte to pipe
453 *
454 * @param addr where to get data to write from
455 * @param sz size of data to write (in bytes)
456 * @returns size of data written, or 0 in case of end-of-file
457 *
458 * write may block until data can be written to other end (depends a
459 * bit on available buffer space). It will return 0 if the other end
460 * closed the pipe. The data is queued to be written on the next
461 * convenient occasion, or it can be forced out with flush().
462 */
463 size_type write(const void* addr, size_type sz);
464
465 /** @brief flush buffers with unwritten data
466 *
467 * This forces unwritten data to be written to the other end. The call
468 * will block until this has been done (or the attempt failed with an
469 * error).
470 */
471 void flush();
472
473 /** @brief purge buffered data waiting to be read and/or written
474 *
475 * Discards all internal buffers.
476 */
477 void purge();
478
479 /** @brief number of bytes that can be read without blocking
480 *
481 * @returns number of bytes that can be read without blocking
482 */
484
485 /** @brief number of bytes that can be written without blocking
486 *
487 * @returns number of bytes that can be written without blocking
488 */
490
491 /** @brief flush buffers, close pipe
492 *
493 * Flush buffers, discard unread data, closes the pipe. If the pipe is
494 * in the parent process, it waits for the child.
495 *
496 * @returns exit code of child process in parent, zero in child
497 */
498 int close();
499
500 /** @brief return PID of the process on the other end of the pipe
501 *
502 * @returns PID of the process running on the remote end
503 */
504 pid_t pidOtherEnd() const
505 { return isChild() ? m_parentPid : m_childPid; }
506
/// condition flags for poll
///
/// every elementary condition occupies its own bit, so combinations
/// reported in PollEntry::revents can be decoded unambiguously
enum PollFlags {
    None = 0,                       ///< nothing special on this pipe
    Readable = 1,                   ///< pipe has data for reading
    Writable = 2,                   ///< pipe can be written to
    ReadError = 4,                  ///< pipe error read end
    WriteError = 8,                 ///< pipe error Write end
    Error = ReadError | WriteError, ///< pipe error
    ReadEndOfFile = 32,             ///< read pipe in end-of-file state
    WriteEndOfFile = 64,            ///< write pipe in end-of-file state
    EndOfFile = ReadEndOfFile | WriteEndOfFile, ///< end of file
    // ReadInvalid used to be 64, i.e. the same bit as WriteEndOfFile, which
    // made Invalid overlap EndOfFile; the invalid flags get bits of their own
    ReadInvalid = 128,              ///< read end of pipe invalid
    WriteInvalid = 256,             ///< write end of pipe invalid
    Invalid = ReadInvalid | WriteInvalid ///< invalid pipe
};
522
523 /// for poll() interface
524 class PollEntry {
525 public:
526 BidirMMapPipe* pipe; ///< pipe of interest
527 unsigned events; ///< events of interest (or'ed bitmask)
528 unsigned revents; ///< events that happened (or'ed bitmask)
529 /// poll a pipe for all events
531 pipe(_pipe), events(None), revents(None) { }
532 /// poll a pipe for specified events
533 PollEntry(BidirMMapPipe* _pipe, int _events) :
534 pipe(_pipe), events(_events), revents(None) { }
535 };
536 /// convenience typedef for poll() interface
537 typedef std::vector<PollEntry> PollVector;
538
539 /** @brief poll a set of pipes for events (ready to read from, ready to
540 * write to, error)
541 *
542 * @param pipes set of pipes to check
543 * @param timeout timeout in milliseconds
544 * @returns positive number: number of pipes which have
545 * status changes, 0: timeout, or no pipes with
546 * status changed, -1 on error
547 *
548 * Timeout can be zero (check for specified events, and return), finite
549 * (wait at most timeout milliseconds before returning), or -1
550 * (infinite). The poll method returns when the timeout has elapsed,
551 * or if an event occurs on one of the pipes being polled, whichever
552 * happens earlier.
553 *
554 * Pipes is a vector of one or more PollEntries, which each list a pipe
555 * and events to poll for. If events is left empty (zero), all
556 * conditions are polled for, otherwise only the indicated ones. On
557 * return, the revents fields contain the events that occurred for each
558 * pipe; error Error, EndOfFile or Invalid events are always set,
559 * regardless of wether they were in the set of requested events.
560 *
561 * poll may block slightly longer than specified by timeout due to OS
562 * timer granularity and OS scheduling. Due to its implementation, the
563 * poll call can also return early if the remote end of the page sends
564 * a free page while polling (which is put on that pipe's freelist),
565 * while that pipe is polled for e.g Reading. The status of the pipe is
566 * indicated correctly in revents, and the caller can simply poll
567 * again. (The reason this is done this way is because it helps to
568 * replenish the pool of free pages and queue busy pages without
569 * blocking.)
570 *
571 * Here's a piece of example code waiting on two pipes; if they become
572 * readable they are read:
573 * @code
574 * #include <unistd.h>
575 * #include <cstdlib>
576 * #include <string>
577 * #include <sstream>
578 * #include <iostream>
579 *
580 * #include "BidirMMapPipe.h"
581 *
582 * // what to execute in the child
583 * int randomchild(BidirMMapPipe& pipe)
584 * {
585 * ::srand48(::getpid());
586 * for (int i = 0; i < 5; ++i) {
587 * // sleep a random time between 0 and 1 second
588 * ::usleep(int(1e6 * ::drand48()));
589 * std::ostringstream buf;
590 * buf << "child pid " << ::getpid() << " sends message " << i;
591 * std::cout << "[CHILD] : " << buf.str() << std::endl;
592 * pipe << buf.str() << BidirMMapPipe::flush;
593 * if (!pipe) return -1;
594 * if (pipe.eof()) break;
595 * }
596 * // tell parent we're done
597 * pipe << "" << BidirMMapPipe::flush;
598 * // wait for parent to acknowledge
599 * std::string s;
600 * pipe >> s;
601 * pipe.close();
602 * return 0;
603 * }
604 *
605 * // function to spawn a child
606 * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
607 * {
608 * BidirMMapPipe *p = new BidirMMapPipe();
609 * if (p->isChild()) {
610 * int retVal = childexec(*p);
611 * delete p;
612 * std::exit(retVal);
613 * }
614 * return p;
615 * }
616 *
617 * int main()
618 * {
619 * typedef BidirMMapPipe::PollEntry PollEntry;
620 * // poll data structure
621 * BidirMMapPipe::PollVector pipes;
622 * pipes.reserve(3);
623 * // spawn children
624 * for (int i = 0; i < 3; ++i) {
625 * pipes.push_back(PollEntry(spawnChild(randomchild),
626 * BidirMMapPipe::Readable));
627 * }
628 * // while at least some children alive
629 * while (!pipes.empty()) {
630 * // poll, wait until status change (infinite timeout)
631 * int npipes = BidirMMapPipe::poll(pipes, -1);
632 * // scan for pipes with changed status
633 * for (std::vector<PollEntry>::iterator it = pipes.begin();
634 * npipes && pipes.end() != it; ) {
635 * if (!it->revents) {
636 * // unchanged, next one
637 * ++it;
638 * continue;
639 * }
640 * --npipes; // maybe we can stop early...
641 * // read from pipes which are readable
642 * if (it->revents & BidirMMapPipe::Readable) {
643 * std::string s;
644 * *(it->pipe) >> s;
645 * if (!s.empty()) {
646 * std::cout << "[PARENT]: Read from pipe " <<
647 * it->pipe << ": " << s << std::endl;
648 * ++it;
649 * continue;
650 * } else {
651 * // child is shutting down...
652 * *(it->pipe) << "" << BidirMMapPipe::flush;
653 * goto childcloses;
654 * }
655 * }
656 * // retire pipes with error or end-of-file condition
657 * if (it->revents & (BidirMMapPipe::Error |
658 * BidirMMapPipe::EndOfFile |
659 * BidirMMapPipe::Invalid)) {
660 * std::cout << "[PARENT]: Error on pipe " <<
661 * it->pipe << " revents " << it->revents <<
662 * std::endl;
663 * childcloses:
664 * int retVal = it->pipe->close();
665 * std::cout << "[PARENT]:\tchild exit status: " << retVal << std::endl;
666 * if (retVal) return retVal;
667 * delete it->pipe;
668 * it = pipes.erase(it);
669 * continue;
670 * }
671 * }
672 * }
673 * return 0;
674 * }
675 * @endcode
676 */
677 static int poll(PollVector& pipes, int timeout);
678
679 /** @brief return if this end of the pipe is the parent end
680 *
681 * @returns true if parent end of pipe
682 */
683 bool isParent() const { return m_childPid; }
684
685 /** @brief return if this end of the pipe is the child end
686 *
687 * @returns true if child end of pipe
688 */
689 bool isChild() const { return !m_childPid; }
690
691 /** @brief if BidirMMapPipe uses a socketpair for communications
692 *
693 * @returns true if BidirMMapPipe uses a socketpair for communications
694 */
695 bool usesSocketpair() const { return m_inpipe == m_outpipe; }
696
697 /** @brief if BidirMMapPipe uses a pipe pair for communications
698 *
699 * @returns true if BidirMMapPipe uses a pipe pair for communications
700 */
701 bool usesPipepair() const { return m_inpipe != m_outpipe; }
702
703 /** @brief return flags (end of file, BidirMMapPipe closed, ...)
704 *
705 * @returns flags (end of file, BidirMMapPipe closed, ...)
706 */
707 int rdstate() const { return m_flags; }
708
709 /** @brief true if end-of-file
710 *
711 * @returns true if end-of-file
712 */
713 bool eof() const { return m_flags & eofbit; }
714
715 /** @brief logical failure (e.g. I/O on closed BidirMMapPipe)
716 *
717 * @returns true in case of grave logical error (I/O on closed pipe,...)
718 */
719 bool fail() const { return m_flags & failbit; }
720
721 /** @brief true on I/O error
722 *
723 * @returns true on I/O error
724 */
725 bool bad() const { return m_flags & badbit; }
726
727 /** @brief status of stream is good
728 *
729 * @returns true if pipe is good (no errors, eof, ...)
730 */
731 bool good() const { return !(m_flags & (eofbit | failbit | badbit)); }
732
733 /** @brief true if closed
734 *
735 * @returns true if stream is closed
736 */
737 bool closed() const { return m_flags & failbit; }
738
739 /** @brief return true if not serious error (fail/bad)
740 *
741 * @returns true if stream is does not have serious error (fail/bad)
742 *
743 * (if EOF, this is still true)
744 */
745 operator bool() const { return !fail() && !bad(); }
746
747 /** @brief return true if serious error (fail/bad)
748 *
749 * @returns true if stream has a serious error (fail/bad)
750 */
751 bool operator!() const { return fail() || bad(); }
752
// STREAMOP(TYPE) expands to an operator<< / operator>> pair that moves the
// sizeof(TYPE) raw bytes of a value through write() / read(); no format or
// byte-order conversion is done (the child is forked from the parent, so
// both ends share the same data representation)
// NOTE(review): the byte counts returned by write()/read() are discarded;
// failures only show up in the stream flags (or as exceptions, if enabled).
// There is no overload for signed char or long double.
753#ifdef STREAMOP
754#undef STREAMOP
755#endif
756#define STREAMOP(TYPE) \
757 BidirMMapPipe& operator<<(const TYPE& val) \
758 { write(&val, sizeof(TYPE)); return *this; } \
759 BidirMMapPipe& operator>>(TYPE& val) \
760 { read(&val, sizeof(TYPE)); return *this; }
761 STREAMOP(bool); ///< C++ style stream operators for bool
762 STREAMOP(char); ///< C++ style stream operators for char
763 STREAMOP(short); ///< C++ style stream operators for short
764 STREAMOP(int); ///< C++ style stream operators for int
765 STREAMOP(long); ///< C++ style stream operators for long
766 STREAMOP(long long); ///< C++ style stream operators for long long
767 STREAMOP(unsigned char); ///< C++ style stream operators for unsigned char
768 STREAMOP(unsigned short); ///< C++ style stream operators for unsigned short
769 STREAMOP(unsigned int); ///< C++ style stream operators for unsigned int
770 STREAMOP(unsigned long); ///< C++ style stream operators for unsigned long
771 STREAMOP(unsigned long long); ///< C++ style stream operators for unsigned long long
772 STREAMOP(float); ///< C++ style stream operators for float
773 STREAMOP(double); ///< C++ style stream operators for double
// the generator macro is only needed above; drop it again
774#undef STREAMOP
775
776 /** @brief write a C-style string
777 *
778 * @param str C-style string
779 * @returns pipe written to
780 */
781 BidirMMapPipe& operator<<(const char* str);
782
783 /** @brief read a C-style string
784 *
785 * @param str pointer to string (space allocated with malloc!)
786 * @returns pipe read from
787 *
788 * since this is for C-style strings, we use malloc/realloc/free for
789 * strings. passing in a NULL pointer is valid here, and the routine
790 * will use realloc to allocate a chunk of memory of the right size.
791 */
792 BidirMMapPipe& operator>>(char* (&str));
793
794 /** @brief write a std::string object
795 *
796 * @param str string to write
797 * @returns pipe written to
798 */
799 BidirMMapPipe& operator<<(const std::string& str);
800
801 /** @brief read a std::string object
802 *
803 * @param str string to be read
804 * @returns pipe read from
805 */
806 BidirMMapPipe& operator>>(std::string& str);
807
808 /** @brief write raw pointer to T to other side
809 *
810 * NOTE: This will not write the pointee! Only the value of the
811 * pointer is transferred.
812 *
813 * @param tptr pointer to be written
814 * @returns pipe written to
815 */
816 template<class T> BidirMMapPipe& operator<<(const T* tptr)
817 { write(&tptr, sizeof(tptr)); return *this; }
818
819 /** @brief read raw pointer to T from other side
820 *
821 * NOTE: This will not read the pointee! Only the value of the
822 * pointer is transferred.
823 *
824 * @param tptr pointer to be read
825 * @returns pipe read from
826 */
827 template<class T> BidirMMapPipe& operator>>(T* &tptr)
828 { read(&tptr, sizeof(tptr)); return *this; }
829
830 /** @brief I/O manipulator support
831 *
832 * @param manip manipulator
833 * @returns pipe with manipulator applied
834 *
835 * example:
836 * @code
837 * pipe << BidirMMapPipe::flush;
838 * @endcode
839 */
841 { return manip(*this); }
842
843 /** @brief I/O manipulator support
844 *
845 * @param manip manipulator
846 * @returns pipe with manipulator applied
847 *
848 * example:
849 * @code
850 * pipe >> BidirMMapPipe::purge;
851 * @endcode
852 */
854 { return manip(*this); }
855
856 /// for usage a la "pipe << flush;"
857 static BidirMMapPipe& flush(BidirMMapPipe& pipe) { pipe.flush(); return pipe; }
858 /// for usage a la "pipe << purge;"
859 static BidirMMapPipe& purge(BidirMMapPipe& pipe) { pipe.purge(); return pipe; }
860
861 private:
862 /// copy-construction forbidden
864 /// assignment forbidden
865 BidirMMapPipe& operator=(const BidirMMapPipe&) { return *this; }
866
867 /// page is our friend
869 /// convenience typedef for Page
871
/// tuning constants for the shared page pool
enum {
    /// pages shared (child + parent); 16 gives 32k buffers at 4k page size
    /// for both parent and child - if your average message to send is
    /// larger than this, consider raising the value (max 256)
    TotPages = 16,
    /// pages per pipe end: the pool is split evenly between the two ends
    PagesPerEnd = TotPages / 2,
    /// flush threshold: once this many pages are filled, a flush is forced;
    /// 3/4 of the pages available seems to work quite well
    FlushThresh = PagesPerEnd * 3 / 4
};
885
886 // per-class members
887 static pthread_mutex_t s_openpipesmutex; ///< protects s_openpipes
888 /// list of open BidirMMapPipes
889 static std::list<BidirMMapPipe*> s_openpipes;
890 /// pool of mmapped pages
892 /// page pool reference counter
893 static unsigned s_pagepoolrefcnt;
894 /// debug flag
895 static int s_debugflag;
896
897 /// return page pool
899
900 // per-instance members
902 Page* m_busylist; ///< linked list: busy pages (data to be read)
903 Page* m_freelist; ///< linked list: free pages
904 Page* m_dirtylist; ///< linked list: dirty pages (data to be sent)
905 int m_inpipe; ///< pipe end from which data may be read
906 int m_outpipe; ///< pipe end to which data may be written
907 int m_flags; ///< flags (e.g. end of file)
908 pid_t m_childPid; ///< pid of the child (zero if we're child)
909 pid_t m_parentPid; ///< pid of the parent
910
911 /// cleanup routine - at exit, we want our children to get a SIGTERM...
912 static void teardownall(void);
913
914 /// return length of a page list
915 static unsigned lenPageList(const Page* list);
916
917 /** "feed" the busy and free lists with a list of pages
918 *
919 * @param plist linked list of pages
920 *
921 * goes through plist, puts free pages from plist onto the freelist
922 * (or sends them to the remote end if they belong there), and puts
923 * non-empty pages on plist onto the busy list
924 */
925 void feedPageLists(Page* plist);
926
927 /// put on dirty pages list
928 void markPageDirty(Page* p);
929
930 /// transfer bytes through the pipe (reading, writing, may block)
931 static size_type xferraw(int fd, void* addr, size_type len,
932 ssize_t (*xferfn)(int, void*, std::size_t));
933 /// transfer bytes through the pipe (reading, writing, may block)
// overload for write-style transfer functions (taking const void*): it
// forwards to the read-style xferraw declared above by casting the function
// pointer to the non-const signature.
// NOTE(review): if the other overload calls xferfn through the casted type,
// that is a call through a pointer to a different function type, which the
// C++ standard leaves undefined (it works on ABIs that pass void* and
// const void* identically); a small forwarding wrapper would avoid it.
// Confirm against the implementation.
934 static size_type xferraw(int fd, void* addr, const size_type len,
935 ssize_t (*xferfn)(int, const void*, std::size_t))
936 {
937 return xferraw(fd, addr, len,
938 reinterpret_cast<ssize_t (*)(
939 int, void*, std::size_t)>(xferfn));
940 }
941
942 /** @brief send page(s) to the other end (may block)
943 *
944 * @param plist linked list of pages to send
945 *
946 * the implementation gathers the different write(s) whereever
947 * possible; if mmap works, this results in a single write to transfer
948 * the list of pages sent, if we need to copy things through the pipe,
949 * we have one write to transfer which pages are sent, and then one
950 * write per page.
951 */
952 void sendpages(Page* plist);
953
954 /** @brief receive a pages from the other end (may block), queue them
955 *
956 * @returns number of pages received
957 *
958 * this is an application-level scatter read, which gets the list of
959 * pages to read from the pipe. if mmap works, it needs only one read
960 * call (to get the head of the list of pages transferred). if we need
961 * to copy pages through the pipe, we need to add one read for each
962 * empty page, and two reads for each non-empty page.
963 */
964 unsigned recvpages();
965
966 /** @brief receive pages from other end (non-blocking)
967 *
968 * @returns number of pages received
969 *
970 * like recvpages(), but does not block if nothing is available for
971 * reading
972 */
973 unsigned recvpages_nonblock();
974
975 /// get a busy page to read data from (may block)
976 Page* busypage();
977 /// get a dirty page to write data to (may block)
978 Page* dirtypage();
979
980 /// close the pipe (no flush if forced)
981 int doClose(bool force, bool holdlock = false);
982 /// perform the flush
983 void doFlush(bool forcePartialPages = true);
984#endif //_WIN32
985};
986
988
989#undef BEGIN_NAMESPACE_ROOFIT
990#undef END_NAMESPACE_ROOFIT
991
992#endif // BIDIRMMAPPIPE_H
993
994// vim: ft=cpp:sw=4:tw=78:et
#define END_NAMESPACE_ROOFIT
#define BEGIN_NAMESPACE_ROOFIT
for poll() interface
PollEntry(BidirMMapPipe *_pipe)
poll a pipe for all events
unsigned revents
events that happened (or'ed bitmask)
PollEntry(BidirMMapPipe *_pipe, int _events)
poll a pipe for specified events
BidirMMapPipe * pipe
pipe of interest
unsigned events
events of interest (or'ed bitmask)
exception to throw if low-level OS calls go wrong
class representing a chunk of pages
PageChunk(const PageChunk &)
forbid copying
BidirMMapPipeException Exception
convenience typedef
PageChunk & operator=(const PageChunk &)
forbid assignment
unsigned m_nUsedGrp
number of used page groups
unsigned len() const
return length of chunk
unsigned nPagesPerGroup() const
return number of pages per page group
void * m_begin
pointer to start of mmapped area
static unsigned physPgSz()
return the physical page size of the system
static unsigned s_physpgsz
system physical page size
bool full() const
return true if no free page groups in this chunk
void * m_end
pointer one behind end of mmapped area
void push(const Pages &p)
push a group of pages onto the free list
bool contains(const Pages &p) const
return if p is contained in this PageChunk
PagePool * m_parent
parent page pool
std::list< void * > m_freelist
free pages list
static MMapVariety s_mmapworks
mmap variety that works on this system
MMapVariety
type of mmap support found
@ Copy
mmap doesn't work, have to copy back and forth
@ Unknown
don't know yet what'll work
@ DevZero
mmapping /dev/zero works
@ Anonymous
anonymous mmap works
@ FileBacked
mmapping a temp file works
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static unsigned s_pagesize
logical page size (run-time determined)
unsigned m_nPgPerGrp
number of pages per group
static MMapVariety mmapVariety()
return mmap variety support found
Pages pop()
pop a group of pages off the free list
static unsigned pagesize()
return the logical page size
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
bool empty() const
return true if no used page groups in this chunk
void zap(Pages &p)
free all pages except for those pointed to by p
static unsigned getPageSize()
determine page size at run time
class representing a page pool
class representing the header structure in an mmapped page
handle class for a number of Pages
unsigned operator[](Page *p) const
perform page to page number mapping
Pages()
default constructor
static unsigned pagesize()
return page size
impl * m_pimpl
pointer to implementation
Page * operator[](unsigned pgno) const
return page number pgno
Pages & operator=(const Pages &other)
assignment operator
void swap(Pages &other)
swap with other's contents
Page * page(unsigned pgno) const
return page number pgno
unsigned npages() const
return number of pages accessible
unsigned pageno(Page *p) const
perform page to page number mapping
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks.
STREAMOP(short)
C++ style stream operators for short.
BidirMMapPipe & operator>>(T *&tptr)
read raw pointer to T from other side
void purge()
purge buffered data waiting to be read and/or written
STREAMOP(unsigned long long)
C++ style stream operators for unsigned long long.
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
int close()
flush buffers, close pipe
STREAMOP(float)
C++ style stream operators for float.
static BidirMMapPipe & purge(BidirMMapPipe &pipe)
for usage a la "pipe << purge;"
bool usesSocketpair() const
if BidirMMapPipe uses a socketpair for communications
BidirMMapPipe_impl::Page Page
convenience typedef for Page
STREAMOP(long long)
C++ style stream operators for long long.
STREAMOP(long)
C++ style stream operators for long.
BidirMMapPipe_impl::Pages m_pages
mmapped pages
bool good() const
status of stream is good
static size_type xferraw(int fd, void *addr, const size_type len, ssize_t(*xferfn)(int, const void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
Page * m_dirtylist
linked list: dirty pages (data to be sent)
bool usesPipepair() const
if BidirMMapPipe uses a pipe pair for communications
pid_t m_parentPid
pid of the parent
STREAMOP(bool)
C++ style stream operators for bool.
unsigned recvpages()
receive pages from the other end (may block), queue them
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
STREAMOP(double)
C++ style stream operators for double.
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
BidirMMapPipe & operator<<(const T *tptr)
write raw pointer to T to other side
~BidirMMapPipe()
destructor
int m_inpipe
pipe end from which data may be read
STREAMOP(unsigned long)
C++ style stream operators for unsigned long.
BidirMMapPipe & operator>>(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
bool isChild() const
return if this end of the pipe is the child end
void doFlush(bool forcePartialPages=true)
perform the flush
static BidirMMapPipe & flush(BidirMMapPipe &pipe)
for usage a la "pipe << flush;"
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
@ wrerrbit
write error
@ badbit
general I/O error
@ eofbit
end of file reached
@ failbit
logical failure (e.g. pipe closed)
@ exceptionsbit
error reporting with exceptions
@ rderrbit
read error
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
STREAMOP(int)
C++ style stream operators for int.
STREAMOP(unsigned short)
C++ style stream operators for unsigned short.
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
bool eof() const
true if end-of-file
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
pid_t pidOtherEnd() const
return PID of the process on the other end of the pipe
static int debugflag()
return the current setting of the debug flag
bool isParent() const
return if this end of the pipe is the parent end
std::size_t size_type
type used to represent sizes
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
@ PagesPerEnd
pages per pipe end
@ TotPages
pages shared (child + parent)
@ FlushThresh
flush threshold
bool bad() const
true on I/O error
PollFlags
condition flags for poll
@ WriteInvalid
write end of pipe invalid
@ Invalid
invalid pipe
@ ReadEndOfFile
read pipe in end-of-file state
@ WriteError
pipe error write end
@ ReadError
pipe error read end
@ Error
pipe error
@ Readable
pipe has data for reading
@ None
nothing special on this pipe
@ ReadInvalid
read end of pipe invalid
@ WriteEndOfFile
write pipe in end-of-file state
@ Writable
pipe can be written to
@ EndOfFile
end of file
int m_flags
flags (e.g. end of file)
Page * m_freelist
linked list: free pages
Page * busypage()
get a busy page to read data from (may block)
int rdstate() const
return flags (end of file, BidirMMapPipe closed, ...)
Page * dirtypage()
get a dirty page to write data to (may block)
BidirMMapPipe & operator=(const BidirMMapPipe &)
assignment forbidden
void sendpages(Page *plist)
send page(s) to the other end (may block)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
STREAMOP(char)
C++ style stream operators for char.
BidirMMapPipe & operator<<(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
STREAMOP(unsigned char)
C++ style stream operators for unsigned char.
size_type read(void *addr, size_type sz)
read from pipe
STREAMOP(unsigned int)
C++ style stream operators for unsigned int.
static int s_debugflag
debug flag
bool closed() const
true if closed
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
size_type write(const void *addr, size_type sz)
write to pipe
bool operator!() const
return true if serious error (fail/bad)
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void flush()
flush buffers with unwritten data
bool fail() const
logical failure (e.g. pipe closed)
static unsigned s_pagepoolrefcnt
page pool reference counter
static void setDebugflag(int flag)
set the debug flags
Page * m_busylist
linked list: busy pages (data to be read)
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
int m_outpipe
pipe end to which data may be written
void markPageDirty(Page *p)
put on dirty pages list
static unsigned lenPageList(const Page *list)
return length of a page list
pid_t m_childPid
pid of the child (zero if we're child)
namespace for implementation details of BidirMMapPipe
unsigned char m_npages
length in pages
Page * m_pages
pointer to first page
unsigned m_refcnt
reference counter
PageChunk * m_parent
pointer to parent pool