Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
BidirMMapPipe.h
Go to the documentation of this file.
1/** @file BidirMMapPipe.h
2 *
3 * header file for BidirMMapPipe, a class which forks off a child process and
4 * serves as communications channel between parent and child
5 *
6 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7 * @date 2013-07-07
8 */
9
10#ifndef BIDIRMMAPPIPE_H
11#define BIDIRMMAPPIPE_H
12
13#include <list>
14#include <vector>
15#include <cstring>
16#include <unistd.h>
17
18#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
19#define END_NAMESPACE_ROOFIT }
20
22
23/// namespace for implementation details of BidirMMapPipe
24namespace BidirMMapPipe_impl {
25 // forward declarations
26 class BidirMMapPipeException;
27 class Page;
28 class PagePool;
29 class Pages;
30
31 /** @brief class representing a chunk of pages
32 *
33 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
34 * @date 2013-07-24
35 *
36 * allocating pages from the OS happens in chunks in order to not exhaust
37 * the maximum allowed number of memory mappings per process; this class
38 * takes care of such a chunk
39 *
40 * a page chunk allows callers to obtain or release pages in groups of
41 * contiguous pages of fixed size
42 */
43 class PageChunk {
44 public:
45 /// type of mmap support found
46 typedef enum {
47 Unknown, ///< don't know yet what'll work
48 Copy, ///< mmap doesn't work, have to copy back and forth
49 FileBacked, ///< mmapping a temp file works
50 DevZero, ///< mmapping /dev/zero works
51 Anonymous ///< anonymous mmap works
53
54 private:
55 static unsigned s_physpgsz; ///< system physical page size
56 static unsigned s_pagesize; ///< logical page size (run-time determined)
57 /// mmap variety that works on this system
59
60 /// convenience typedef
61 typedef BidirMMapPipeException Exception;
62
63 void* m_begin; ///< pointer to start of mmapped area
64 void* m_end; ///< pointer one behind end of mmapped area
65 // FIXME: cannot keep freelist inline - other end may need that
66 // data, and we'd end up overwriting the page header
67 std::list<void*> m_freelist; ///< free pages list
68 PagePool* m_parent; ///< parent page pool
69 unsigned m_nPgPerGrp; ///< number of pages per group
70 unsigned m_nUsedGrp; ///< number of used page groups
71
72 /// determine page size at run time
73 static unsigned getPageSize();
74
75 /// mmap pages, len is length of mmapped area in bytes
76 static void* dommap(unsigned len);
77 /// munmap pages p, len is length of mmapped area in bytes
78 static void domunmap(void* p, unsigned len);
79 /// forbid copying
81 /// forbid assignment
82 PageChunk& operator=(const PageChunk&) { return *this; }
83 public:
84 /// return the logical page size
85 static unsigned pagesize() { return s_pagesize; }
86 /// return the physical page size of the system
87 static unsigned physPgSz() { return s_physpgsz; }
88 /// return mmap variety support found
89 static MMapVariety mmapVariety() { return s_mmapworks; }
90
91 /// constructor
92 PageChunk(PagePool* parent, unsigned length, unsigned nPgPerGroup);
93
94 /// destructor
95 ~PageChunk();
96
97 /// return if p is contained in this PageChunk
98 bool contains(const Pages& p) const;
99
100 /// pop a group of pages off the free list
101 Pages pop();
102
103 /// push a group of pages onto the free list
104 void push(const Pages& p);
105
106 /// return length of chunk
107 unsigned len() const
108 {
109 return reinterpret_cast<unsigned char*>(m_end) -
110 reinterpret_cast<unsigned char*>(m_begin);
111 }
112 /// return number of pages per page group
113 unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
114
115 /// return true if no used page groups in this chunk
116 bool empty() const { return !m_nUsedGrp; }
117
118 /// return true if no free page groups in this chunk
119 bool full() const { return m_freelist.empty(); }
120
121 /// free all pages except for those pointed to by p
122 void zap(Pages& p);
123 };
124
125 /** @brief handle class for a number of Pages
126 *
127 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
128 * @date 2013-07-24
129 *
130 * the associated pages are contiguous in memory
131 */
132 class Pages {
133 private:
134 /// implementation
135 typedef struct {
136 PageChunk *m_parent; ///< pointer to parent pool
137 Page* m_pages; ///< pointer to first page
138 unsigned m_refcnt; ///< reference counter
139 unsigned char m_npages; ///< length in pages
140 } impl;
141 public:
142 /// default constructor
143 Pages() : m_pimpl(0) { }
144
145 /// destructor
146 ~Pages();
147
148 /** @brief copy constructor
149 *
150 * copy Pages handle to new object - old object loses ownership,
151 * and becomes a dangling handle
152 */
153 Pages(const Pages& other);
154
155 /** @brief assignment operator
156 *
157 * assign Pages handle to new object - old object loses ownership,
158 * and becomes a dangling handle
159 */
160 Pages& operator=(const Pages& other);
161
162 /// return page size
163 static unsigned pagesize();
164
165 /// return number of pages accessible
166 unsigned npages() const { return m_pimpl->m_npages; }
167
168 /// return page number pageno
169 Page* page(unsigned pgno) const;
170
171 /// return page number pageno
172 Page* operator[](unsigned pgno) const { return page(pgno); }
173
174 /// perform page to page number mapping
175 unsigned pageno(Page* p) const;
176
177 /// perform page to page number mapping
178 unsigned operator[](Page* p) const { return pageno(p); }
179
180 /// swap with other's contents
181 void swap(Pages& other)
182 {
183 impl* tmp = other.m_pimpl;
184 other.m_pimpl = m_pimpl;
185 m_pimpl = tmp;
186 }
187
188 private:
189 /// page pool is our friend - it's allowed to construct Pages
191
192 /// pointer to implementation
194
195 /// constructor
196 Pages(PageChunk* parent, Page* pages, unsigned npg);
197 };
198}
199
200/** @brief BidirMMapPipe creates a bidirectional channel between the current
201 * process and a child it forks.
202 *
203 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
204 * @date 2013-07-07
205 *
206 * This class creates a bidirectional channel between this process and a child
207 * it creates with fork().
208 *
209 * The channel is comprised of a small shared pool of buffer memory mmapped into
210 * both process spaces, and two pipes to synchronise the exchange of data. The
211 * idea behind using the pipes at all is to have some primitive which we can
212 * block on without having to worry about atomic operations or polling, leaving
213 * these tasks to the OS. In case the anonymous mmap cannot be performed on the
214 * OS the code is running on (for whatever reason), the code falls back to
215 * mmapping /dev/zero, mmapping a temporary file, or (if those all fail), a
216 * dynamically allocated buffer which is then transmitted through the pipe(s),
217 * a slightly slower alternative (because the data is copied more often).
218 *
219 * The channel supports five major operations: read(), write(), flush(),
220 * purge() and close(). Reading and writing may block until the required buffer
221 * space is available. Writes may queue up data to be sent to the other end
222 * until either enough pages are full, or the user calls flush which forces
223 * any unsent buffers to be sent to the other end. flush forces any data that
224 * is to be sent to be sent. purge discards any buffered data waiting to be
225 * read and/or sent. Closing the channel on the child returns zero, closing it
226 * on the parent returns the child's exit status.
227 *
228 * The class also provides operator<< and operator>> for C++-style I/O for
229 * basic data types (bool, char, short, int, long, long long, float, double
230 * and their unsigned counterparts). Data is transmitted binary (i.e. no
231 * formatting to strings like std::cout does). There are also overloads to
232 * support C-style zero terminated strings and std::string. In terms of
233 * performance, the former is to be preferred.
234 *
235 * If the caller needs to multiplex input and output to/from several pipes, the
236 * class provides the poll() method which allows blocking until an event occurs
237 * on any of the polled pipes.
238 *
239 * After the BidirMMapPipe is closed, no further operations may be performed on
240 * that object, save for the destructor which may still be called.
241 *
242 * If the BidirMMapPipe has not properly been closed, the destructor will call
243 * close. However, the exit code of the child is lost in that case.
244 *
245 * Closing the object causes the mmapped memory to be unmapped and the two
246 * pipes to be closed. We also install an atexit handler in the process of
247 * creating BidirMMapPipes. This ensures that when the current process
248 * terminates, a SIGTERM signal is sent to the child processes created for all
249 * unclosed pipes to avoid leaving zombie processes in the OS's process table.
250 *
251 * BidirMMapPipe creation, closing and destruction are thread safe. If the
252 * BidirMMapPipe is used in more than one thread, the other operations have to
253 * be protected with a mutex (or something similar), though.
254 *
255 * End of file (other end closed its pipe, or died) is indicated with the eof()
256 * method, serious I/O errors set flags (bad(), fail()), and also throw
257 * exceptions. For normal read/write operations, they can be suppressed (i.e.
258 * error reporting only using flags) with a constructor argument.
259 *
260 * Technicalities:
261 * - there is a pool of mmapped pages, half the pages are allocated to the
262 * parent process, half to the child
263 * - when one side has accumulated enough data (or a flush forces dirty pages
264 * out to the other end), it sends these pages to the other end by writing a
265 * byte containing the page number into the pipe
266 * - the other end (which has the pages mmapped, too) reads the page number(s)
267 * and puts the corresponding pages on its busy list
268 * - as the other end reads, it frees busy pages, and eventually tries to put
269 * them on its free list; if a page belongs to the other end of the
270 * connection, it is sent back
271 * - lists of pages are sent across the pipe, not individual pages, in order
272 * to minimise the number of read/write operations needed
273 * - when mmap works properly, only one byte containing the page number of
274 * the page list head is sent back and forth; the contents of that page
275 * allow access to the rest of the page list sent, and page headers on the
276 * list tell the receiving end if the page is free or has to be added to the
277 * busy list
278 * - when mmap does not work, we transfer one byte to indicate the head of the
279 * page list sent, and for each page on the list of sent pages, the page
280 * header and the page payload is sent (if the page is free, we only
281 * transmit the page header, and we never transmit more payload than
282 * the page actually contains)
283 * - in the child, all open BidirMMapPipes but the current one are closed. this
284 * is done for two reasons: first, to conserve file descriptors and address
285 * space. second, if more than one process is meant to use such a
286 * BidirMMapPipe, synchronisation issues arise which can lead to bugs that
287 * are hard to find and understand. it's much better to come up with a design
288 * which does not need pipes to be shared among more than two processes.
289 *
290 * Here is a trivial example of a parent and a child talking to each other over
291 * a BidirMMapPipe:
292 * @code
293 * #include <string>
294 * #include <iostream>
295 * #include <cstdlib>
296 *
297 * #include "BidirMMapPipe.h"
298 *
299 * int simplechild(BidirMMapPipe& pipe)
300 * {
301 * // child does an echo loop
302 * while (pipe.good() && !pipe.eof()) {
303 * // read a string
304 * std::string str;
305 * pipe >> str;
306 * if (!pipe) return -1;
307 * if (pipe.eof()) break;
308 * // check if parent wants us to shut down
309 * if (!str.empty()) {
310 * std::cout << "[CHILD] : read: " << str << std::endl;
311 * str = "... early in the morning?";
312 * }
313 * pipe << str << BidirMMapPipe::flush;
314 * if (str.empty()) break;
315 * if (!pipe) return -1;
316 * std::cout << "[CHILD] : wrote: " << str << std::endl;
317 * }
318 * // send shutdown request acknowledged
319 * pipe << "" << BidirMMapPipe::flush;
320 *
321 * pipe.close();
322 * return 0;
323 * }
324 *
325 * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
326 * {
327 * BidirMMapPipe *p = new BidirMMapPipe();
328 * if (p->isChild()) {
329 * int retVal = childexec(*p);
330 * delete p;
331 * std::exit(retVal);
332 * }
333 * return p;
334 * }
335 *
336 * int main()
337 * {
338 * std::cout << "[PARENT]: simple challenge-response test, one child:" <<
339 * std::endl;
340 * BidirMMapPipe* pipe = spawnChild(simplechild);
341 * for (int i = 0; i < 5; ++i) {
342 * std::string str("What shall we do with a drunken sailor...");
343 * *pipe << str << BidirMMapPipe::flush;
344 * if (!*pipe) return -1;
345 * std::cout << "[PARENT]: wrote: " << str << std::endl;
346 * *pipe >> str;
347 * if (!*pipe) return -1;
348 * std::cout << "[PARENT]: read: " << str << std::endl;
349 * }
350 * // ask child to shut down
351 * *pipe << "" << BidirMMapPipe::flush;
352 * // wait for it to see the shutdown request
353 * std::string s;
354 * *pipe >> s;
355 * std::cout << "[PARENT]: exit status of child: " << pipe->close() <<
356 * std::endl;
357 * delete pipe;
358 * return 0;
359 * }
360 * @endcode
361 *
362 * When designing your own protocols to use over the pipe, there are a few
363 * things to bear in mind:
364 * - Do as http does: When building a request, send all the options and
365 * properties of that request with the request itself in a single go (one
366 * flush). Then, the server has everything it needs, and hopefully, it'll
367 * shut up for a while and let the client do something useful in the
368 * meantime... The same goes when the server replies to the request: include
369 * everything there is to know about the result of the request in the reply.
370 * - The expensive operation should be the request that is made, all other
371 * operations should somehow be formulated as options or properties to that
372 * request.
373 * - Include a shutdown handshake in whatever protocol you send over the
374 * pipe. That way, you can shut things down in a controlled way. Otherwise,
375 * and depending on your OS's scheduling quirks, you may catch a SIGPIPE if
376 * one end closes its pipe while the other is still trying to read.
377 */
379#ifndef _WIN32
380 public:
381 /// type used to represent sizes
382 typedef std::size_t size_type;
383 /// convenience typedef for BidirMMapPipeException
384 typedef BidirMMapPipe_impl::BidirMMapPipeException Exception;
385 /// flag bits for partial C++ iostream compatibility
386 enum {
387 eofbit = 1, ///< end of file reached
388 failbit = 2, ///< logical failure (e.g. pipe closed)
389 rderrbit = 4, ///< read error
390 wrerrbit = 8, ///< write error
391 badbit = rderrbit | wrerrbit, ///< general I/O error
392 exceptionsbit = 16 ///< error reporting with exceptions
393 };
394
395 /** @brief constructor (forks!)
396 *
397 * Creates a bidirectional communications channel between this process
398 * and a child the constructor forks. On return from the constructor,
399 * isParent() and isChild() can be used to tell the parent end from the
400 * child end of the pipe. In the child, all other open BidirMMapPipes
401 * are closed.
402 *
403 * @param useExceptions read()/write() error reporting also done using
404 * exceptions
405 * @param useSocketpair use a socketpair instead of a pair of pipes
406 *
407 * Normally, exceptions are thrown for all serious I/O errors (apart
408 * from end of file). Setting useExceptions to false will force the
409 * read() and write() methods to only report serious I/O errors using
410 * flags.
411 *
412 * When useSocketpair is true, use a pair of Unix domain sockets
413 * created using socketpair instead of a pair of pipes. The advantage is
414 * that only one pair of file descriptors is needed instead of two
415 * pairs which are needed for the pipe pair. Performance should be very
416 * similar on most platforms, especially if mmap works, since only
417 * very little data is sent through the pipe(s)/socketpair.
418 */
419 BidirMMapPipe(bool useExceptions = true, bool useSocketpair = false);
420
421 /** @brief destructor
422 *
423 * closes this end of pipe
424 */
426
427 /** @brief return the current setting of the debug flag
428 *
429 * @returns an integer with the debug setting
430 */
431 static int debugflag() { return s_debugflag; }
432
433 /** @brief set the debug flags
434 *
435 * @param flag debug flags (if zero, no messages are printed)
436 */
437 static void setDebugflag(int flag) { s_debugflag = flag; }
438
439 /** @brief read from pipe
440 *
441 * @param addr where to put read data
442 * @param sz size of data to read (in bytes)
443 * @returns size of data read, or 0 in case of end-of-file
444 *
445 * read may block until data from other end is available. It will
446 * return 0 if the other end closed the pipe.
447 */
448 size_type read(void* addr, size_type sz);
449
450 /** @brief write to pipe
451 *
452 * @param addr where to get data to write from
453 * @param sz size of data to write (in bytes)
454 * @returns size of data written, or 0 in case of end-of-file
455 *
456 * write may block until data can be written to other end (depends a
457 * bit on available buffer space). It will return 0 if the other end
458 * closed the pipe. The data is queued to be written on the next
459 * convenient occasion, or it can be forced out with flush().
460 */
461 size_type write(const void* addr, size_type sz);
462
463 /** @brief flush buffers with unwritten data
464 *
465 * This forces unwritten data to be written to the other end. The call
466 * will block until this has been done (or the attempt failed with an
467 * error).
468 */
469 void flush();
470
471 /** @brief purge buffered data waiting to be read and/or written
472 *
473 * Discards all internal buffers.
474 */
475 void purge();
476
477 /** @brief number of bytes that can be read without blocking
478 *
479 * @returns number of bytes that can be read without blocking
480 */
482
483 /** @brief number of bytes that can be written without blocking
484 *
485 * @returns number of bytes that can be written without blocking
486 */
488
489 /** @brief flush buffers, close pipe
490 *
491 * Flush buffers, discard unread data, closes the pipe. If the pipe is
492 * in the parent process, it waits for the child.
493 *
494 * @returns exit code of child process in parent, zero in child
495 */
496 int close();
497
498 /** @brief return PID of the process on the other end of the pipe
499 *
500 * @returns PID of the process running on the remote end
501 */
502 pid_t pidOtherEnd() const
503 { return isChild() ? m_parentPid : m_childPid; }
504
505 /// condition flags for poll
507 None = 0, ///< nothing special on this pipe
508 Readable = 1, ///< pipe has data for reading
509 Writable = 2, ///< pipe can be written to
510 ReadError = 4, ///< pipe error read end
511 WriteError = 8, ///< pipe error Write end
512 Error = ReadError | WriteError, ///< pipe error
513 ReadEndOfFile = 32, ///< read pipe in end-of-file state
514 WriteEndOfFile = 64,///< write pipe in end-of-file state
516 ReadInvalid = 64, ///< read end of pipe invalid
517 WriteInvalid = 128, ///< write end of pipe invalid
518 Invalid = ReadInvalid | WriteInvalid ///< invalid pipe
519 };
520
521 /// for poll() interface
522 class PollEntry {
523 public:
524 BidirMMapPipe* pipe; ///< pipe of interest
525 unsigned events; ///< events of interest (or'ed bitmask)
526 unsigned revents; ///< events that happened (or'ed bitmask)
527 /// poll a pipe for all events
529 pipe(_pipe), events(None), revents(None) { }
530 /// poll a pipe for specified events
531 PollEntry(BidirMMapPipe* _pipe, int _events) :
532 pipe(_pipe), events(_events), revents(None) { }
533 };
534 /// convenience typedef for poll() interface
535 typedef std::vector<PollEntry> PollVector;
536
537 /** @brief poll a set of pipes for events (ready to read from, ready to
538 * write to, error)
539 *
540 * @param pipes set of pipes to check
541 * @param timeout timeout in milliseconds
542 * @returns positive number: number of pipes which have
543 * status changes, 0: timeout, or no pipes with
544 * status changed, -1 on error
545 *
546 * Timeout can be zero (check for specified events, and return), finite
547 * (wait at most timeout milliseconds before returning), or -1
548 * (infinite). The poll method returns when the timeout has elapsed,
549 * or if an event occurs on one of the pipes being polled, whichever
550 * happens earlier.
551 *
552 * Pipes is a vector of one or more PollEntries, which each list a pipe
553 * and events to poll for. If events is left empty (zero), all
554 * conditions are polled for, otherwise only the indicated ones. On
555 * return, the revents fields contain the events that occurred for each
556 * pipe; Error, EndOfFile or Invalid events are always set,
557 * regardless of whether they were in the set of requested events.
558 *
559 * poll may block slightly longer than specified by timeout due to OS
560 * timer granularity and OS scheduling. Due to its implementation, the
561 * poll call can also return early if the remote end of the pipe sends
562 * a free page while polling (which is put on that pipe's freelist),
563 * while that pipe is polled for e.g. reading. The status of the pipe is
564 * indicated correctly in revents, and the caller can simply poll
565 * again. (The reason this is done this way is because it helps to
566 * replenish the pool of free pages and queue busy pages without
567 * blocking.)
568 *
569 * Here's a piece of example code waiting on two pipes; if they become
570 * readable they are read:
571 * @code
572 * #include <unistd.h>
573 * #include <cstdlib>
574 * #include <string>
575 * #include <sstream>
576 * #include <iostream>
577 *
578 * #include "BidirMMapPipe.h"
579 *
580 * // what to execute in the child
581 * int randomchild(BidirMMapPipe& pipe)
582 * {
583 * ::srand48(::getpid());
584 * for (int i = 0; i < 5; ++i) {
585 * // sleep a random time between 0 and .9 seconds
586 * ::usleep(int(1e6 * ::drand48()));
587 * std::ostringstream buf;
588 * buf << "child pid " << ::getpid() << " sends message " << i;
589 * std::cout << "[CHILD] : " << buf.str() << std::endl;
590 * pipe << buf.str() << BidirMMapPipe::flush;
591 * if (!pipe) return -1;
592 * if (pipe.eof()) break;
593 * }
594 * // tell parent we're done
595 * pipe << "" << BidirMMapPipe::flush;
596 * // wait for parent to acknowledge
597 * std::string s;
598 * pipe >> s;
599 * pipe.close();
600 * return 0;
601 * }
602 *
603 * // function to spawn a child
604 * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
605 * {
606 * BidirMMapPipe *p = new BidirMMapPipe();
607 * if (p->isChild()) {
608 * int retVal = childexec(*p);
609 * delete p;
610 * std::exit(retVal);
611 * }
612 * return p;
613 * }
614 *
615 * int main()
616 * {
617 * typedef BidirMMapPipe::PollEntry PollEntry;
618 * // poll data structure
619 * BidirMMapPipe::PollVector pipes;
620 * pipes.reserve(3);
621 * // spawn children
622 * for (int i = 0; i < 3; ++i) {
623 * pipes.push_back(PollEntry(spawnChild(randomchild),
624 * BidirMMapPipe::Readable));
625 * }
626 * // while at least some children alive
627 * while (!pipes.empty()) {
628 * // poll, wait until status change (infinite timeout)
629 * int npipes = BidirMMapPipe::poll(pipes, -1);
630 * // scan for pipes with changed status
631 * for (std::vector<PollEntry>::iterator it = pipes.begin();
632 * npipes && pipes.end() != it; ) {
633 * if (!it->revents) {
634 * // unchanged, next one
635 * ++it;
636 * continue;
637 * }
638 * --npipes; // maybe we can stop early...
639 * // read from pipes which are readable
640 * if (it->revents & BidirMMapPipe::Readable) {
641 * std::string s;
642 * *(it->pipe) >> s;
643 * if (!s.empty()) {
644 * std::cout << "[PARENT]: Read from pipe " <<
645 * it->pipe << ": " << s << std::endl;
646 * ++it;
647 * continue;
648 * } else {
649 * // child is shutting down...
650 * *(it->pipe) << "" << BidirMMapPipe::flush;
651 * goto childcloses;
652 * }
653 * }
654 * // retire pipes with error or end-of-file condition
655 * if (it->revents & (BidirMMapPipe::Error |
656 * BidirMMapPipe::EndOfFile |
657 * BidirMMapPipe::Invalid)) {
658 * std::cout << "[PARENT]: Error on pipe " <<
659 * it->pipe << " revents " << it->revents <<
660 * std::endl;
661 * childcloses:
662 * int retVal = it->pipe->close();
663 * std::cout << "[PARENT]:\tchild exit status: " << retVal << std::endl;
664 * if (retVal) return retVal;
665 * delete it->pipe;
666 * it = pipes.erase(it);
667 * continue;
668 * }
669 * }
670 * }
671 * return 0;
672 * }
673 * @endcode
674 */
675 static int poll(PollVector& pipes, int timeout);
676
677 /** @brief return if this end of the pipe is the parent end
678 *
679 * @returns true if parent end of pipe
680 */
681 bool isParent() const { return m_childPid; }
682
683 /** @brief return if this end of the pipe is the child end
684 *
685 * @returns true if child end of pipe
686 */
687 bool isChild() const { return !m_childPid; }
688
689 /** @brief if BidirMMapPipe uses a socketpair for communications
690 *
691 * @returns true if BidirMMapPipe uses a socketpair for communications
692 */
693 bool usesSocketpair() const { return m_inpipe == m_outpipe; }
694
695 /** @brief if BidirMMapPipe uses a pipe pair for communications
696 *
697 * @returns true if BidirMMapPipe uses a pipe pair for communications
698 */
699 bool usesPipepair() const { return m_inpipe != m_outpipe; }
700
701 /** @brief return flags (end of file, BidirMMapPipe closed, ...)
702 *
703 * @returns flags (end of file, BidirMMapPipe closed, ...)
704 */
705 int rdstate() const { return m_flags; }
706
707 /** @brief true if end-of-file
708 *
709 * @returns true if end-of-file
710 */
711 bool eof() const { return m_flags & eofbit; }
712
713 /** @brief logical failure (e.g. I/O on closed BidirMMapPipe)
714 *
715 * @returns true in case of grave logical error (I/O on closed pipe,...)
716 */
717 bool fail() const { return m_flags & failbit; }
718
719 /** @brief true on I/O error
720 *
721 * @returns true on I/O error
722 */
723 bool bad() const { return m_flags & badbit; }
724
725 /** @brief status of stream is good
726 *
727 * @returns true if pipe is good (no errors, eof, ...)
728 */
729 bool good() const { return !(m_flags & (eofbit | failbit | badbit)); }
730
731 /** @brief true if closed
732 *
733 * @returns true if stream is closed
734 */
735 bool closed() const { return m_flags & failbit; }
736
737 /** @brief return true if not serious error (fail/bad)
738 *
739 * @returns true if stream does not have serious error (fail/bad)
740 *
741 * (if EOF, this is still true)
742 */
743 operator bool() const { return !fail() && !bad(); }
744
745 /** @brief return true if serious error (fail/bad)
746 *
747 * @returns true if stream has a serious error (fail/bad)
748 */
749 bool operator!() const { return fail() || bad(); }
750
751#ifdef STREAMOP
752#undef STREAMOP
753#endif
754#define STREAMOP(TYPE) \
755 BidirMMapPipe& operator<<(const TYPE& val) \
756 { write(&val, sizeof(TYPE)); return *this; } \
757 BidirMMapPipe& operator>>(TYPE& val) \
758 { read(&val, sizeof(TYPE)); return *this; }
759 STREAMOP(bool); ///< C++ style stream operators for bool
760 STREAMOP(char); ///< C++ style stream operators for char
761 STREAMOP(short); ///< C++ style stream operators for short
762 STREAMOP(int); ///< C++ style stream operators for int
763 STREAMOP(long); ///< C++ style stream operators for long
764 STREAMOP(long long); ///< C++ style stream operators for long long
765 STREAMOP(unsigned char); ///< C++ style stream operators for unsigned char
766 STREAMOP(unsigned short); ///< C++ style stream operators for unsigned short
767 STREAMOP(unsigned int); ///< C++ style stream operators for unsigned int
768 STREAMOP(unsigned long); ///< C++ style stream operators for unsigned long
769 STREAMOP(unsigned long long); ///< C++ style stream operators for unsigned long long
770 STREAMOP(float); ///< C++ style stream operators for float
771 STREAMOP(double); ///< C++ style stream operators for double
772#undef STREAMOP
773
774 /** @brief write a C-style string
775 *
776 * @param str C-style string
777 * @returns pipe written to
778 */
779 BidirMMapPipe& operator<<(const char* str);
780
781 /** @brief read a C-style string
782 *
783 * @param str pointer to string (space allocated with malloc!)
784 * @returns pipe read from
785 *
786 * since this is for C-style strings, we use malloc/realloc/free for
787 * strings. passing in a NULL pointer is valid here, and the routine
788 * will use realloc to allocate a chunk of memory of the right size.
789 */
790 BidirMMapPipe& operator>>(char* (&str));
791
792 /** @brief write a std::string object
793 *
794 * @param str string to write
795 * @returns pipe written to
796 */
797 BidirMMapPipe& operator<<(const std::string& str);
798
799 /** @brief read a std::string object
800 *
801 * @param str string to be read
802 * @returns pipe read from
803 */
804 BidirMMapPipe& operator>>(std::string& str);
805
806 /** @brief write raw pointer to T to other side
807 *
808 * NOTE: This will not write the pointee! Only the value of the
809 * pointer is transferred.
810 *
811 * @param tptr pointer to be written
812 * @returns pipe written to
813 */
814 template<class T> BidirMMapPipe& operator<<(const T* tptr)
815 { write(&tptr, sizeof(tptr)); return *this; }
816
817 /** @brief read raw pointer to T from other side
818 *
819 * NOTE: This will not read the pointee! Only the value of the
820 * pointer is transferred.
821 *
822 * @param tptr pointer to be read
823 * @returns pipe read from
824 */
825 template<class T> BidirMMapPipe& operator>>(T* &tptr)
826 { read(&tptr, sizeof(tptr)); return *this; }
827
828 /** @brief I/O manipulator support
829 *
830 * @param manip manipulator
831 * @returns pipe with manipulator applied
832 *
833 * example:
834 * @code
835 * pipe << BidirMMapPipe::flush;
836 * @endcode
837 */
839 { return manip(*this); }
840
841 /** @brief I/O manipulator support
842 *
843 * @param manip manipulator
844 * @returns pipe with manipulator applied
845 *
846 * example:
847 * @code
848 * pipe >> BidirMMapPipe::purge;
849 * @endcode
850 */
852 { return manip(*this); }
853
854 /// for usage a la "pipe << flush;"
855 static BidirMMapPipe& flush(BidirMMapPipe& pipe) { pipe.flush(); return pipe; }
856 /// for usage a la "pipe << purge;"
857 static BidirMMapPipe& purge(BidirMMapPipe& pipe) { pipe.purge(); return pipe; }
858
859 private:
860 /// copy-construction forbidden
862 /// assignment forbidden
863 BidirMMapPipe& operator=(const BidirMMapPipe&) { return *this; }
864
865 /// page is our friend
867 /// convenience typedef for Page
869
870 /// tuning constants
871 enum {
872 // TotPages = 16 will give 32k buffers at 4k page size for both
873 // parent and child; if your average message to send is larger
874 // than this, consider raising the value (max 256)
875 TotPages = 16, ///< pages shared (child + parent)
876
877 PagesPerEnd = TotPages / 2, ///< pages per pipe end
878
879 // if FlushThresh pages are filled, the code forces a flush; 3/4
880 // of the pages available seems to work quite well
881 FlushThresh = (3 * PagesPerEnd) / 4 ///< flush threshold
882 };
883
884 // per-class members
885 static pthread_mutex_t s_openpipesmutex; ///< protects s_openpipes
886 /// list of open BidirMMapPipes
887 static std::list<BidirMMapPipe*> s_openpipes;
888 /// pool of mmapped pages
889 static BidirMMapPipe_impl::PagePool* s_pagepool;
890 /// page pool reference counter
891 static unsigned s_pagepoolrefcnt;
892 /// debug flag
893 static int s_debugflag;
894
895 /// return page pool
896 static BidirMMapPipe_impl::PagePool& pagepool();
897
898 // per-instance members
900 Page* m_busylist; ///< linked list: busy pages (data to be read)
901 Page* m_freelist; ///< linked list: free pages
902 Page* m_dirtylist; ///< linked list: dirty pages (data to be sent)
903 int m_inpipe; ///< pipe end from which data may be read
904 int m_outpipe; ///< pipe end to which data may be written
905 int m_flags; ///< flags (e.g. end of file)
906 pid_t m_childPid; ///< pid of the child (zero if we're child)
907 pid_t m_parentPid; ///< pid of the parent
908
909 /// cleanup routine - at exit, we want our children to get a SIGTERM...
910 static void teardownall(void);
911
912 /// return length of a page list
913 static unsigned lenPageList(const Page* list);
914
915 /** "feed" the busy and free lists with a list of pages
916 *
917 * @param plist linked list of pages
918 *
919 * goes through plist, puts free pages from plist onto the freelist
920 * (or sends them to the remote end if they belong there), and puts
921 * non-empty pages on plist onto the busy list
922 */
923 void feedPageLists(Page* plist);
924
925 /// put on dirty pages list
926 void markPageDirty(Page* p);
927
928 /// transfer bytes through the pipe (reading, writing, may block)
929 static size_type xferraw(int fd, void* addr, size_type len,
930 ssize_t (*xferfn)(int, void*, std::size_t));
931 /// transfer bytes through the pipe (reading, writing, may block)
932 static size_type xferraw(int fd, void* addr, const size_type len,
933 ssize_t (*xferfn)(int, const void*, std::size_t))
934 {
935 return xferraw(fd, addr, len,
936 reinterpret_cast<ssize_t (*)(
937 int, void*, std::size_t)>(xferfn));
938 }
939
940 /** @brief send page(s) to the other end (may block)
941 *
942 * @param plist linked list of pages to send
943 *
944 * the implementation gathers the different write(s) wherever
945 * possible; if mmap works, this results in a single write to transfer
946 * the list of pages sent, if we need to copy things through the pipe,
947 * we have one write to transfer which pages are sent, and then one
948 * write per page.
949 */
950 void sendpages(Page* plist);
951
952 /** @brief receive pages from the other end (may block), queue them
953 *
954 * @returns number of pages received
955 *
956 * this is an application-level scatter read, which gets the list of
957 * pages to read from the pipe. if mmap works, it needs only one read
958 * call (to get the head of the list of pages transferred). if we need
959 * to copy pages through the pipe, we need to add one read for each
960 * empty page, and two reads for each non-empty page.
961 */
962 unsigned recvpages();
963
964 /** @brief receive pages from other end (non-blocking)
965 *
966 * @returns number of pages received
967 *
968 * like recvpages(), but does not block if nothing is available for
969 * reading
970 */
971 unsigned recvpages_nonblock();
972
973 /// get a busy page to read data from (may block)
974 Page* busypage();
975 /// get a dirty page to write data to (may block)
976 Page* dirtypage();
977
978 /// close the pipe (no flush if forced)
979 int doClose(bool force, bool holdlock = false);
980 /// perform the flush
981 void doFlush(bool forcePartialPages = true);
982#endif //_WIN32
983};
984
986
987#undef BEGIN_NAMESPACE_ROOFIT
988#undef END_NAMESPACE_ROOFIT
989
990#endif // BIDIRMMAPPIPE_H
991
992// vim: ft=cpp:sw=4:tw=78:et
#define END_NAMESPACE_ROOFIT
#define BEGIN_NAMESPACE_ROOFIT
for poll() interface
PollEntry(BidirMMapPipe *_pipe)
poll a pipe for all events
unsigned revents
events that happened (or'ed bitmask)
PollEntry(BidirMMapPipe *_pipe, int _events)
poll a pipe for specified events
BidirMMapPipe * pipe
pipe of interest
unsigned events
events of interest (or'ed bitmask)
class representing a chunk of pages
PageChunk(const PageChunk &)
forbid copying
BidirMMapPipeException Exception
convenience typedef
PageChunk & operator=(const PageChunk &)
forbid assignment
unsigned m_nUsedGrp
number of used page groups
unsigned len() const
return length of chunk
unsigned nPagesPerGroup() const
return number of pages per page group
void * m_begin
pointer to start of mmapped area
static unsigned physPgSz()
return the physical page size of the system
static unsigned s_physpgsz
system physical page size
bool full() const
return true if no free page groups in this chunk
void * m_end
pointer one behind end of mmapped area
void push(const Pages &p)
push a group of pages onto the free list
bool contains(const Pages &p) const
return if p is contained in this PageChunk
PagePool * m_parent
parent page pool
std::list< void * > m_freelist
free pages list
static MMapVariety s_mmapworks
mmap variety that works on this system
MMapVariety
type of mmap support found
@ Copy
mmap doesn't work, have to copy back and forth
@ Unknown
don't know yet what'll work
@ DevZero
mmapping /dev/zero works
@ Anonymous
anonymous mmap works
@ FileBacked
mmapping a temp file works
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static unsigned s_pagesize
logical page size (run-time determined)
unsigned m_nPgPerGrp
number of pages per group
static MMapVariety mmapVariety()
return mmap variety support found
Pages pop()
pop a group of pages off the free list
static unsigned pagesize()
return the logical page size
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
bool empty() const
return true if no used page groups in this chunk
void zap(Pages &p)
free all pages except for those pointed to by p
static unsigned getPageSize()
determine page size at run time
handle class for a number of Pages
unsigned operator[](Page *p) const
perform page to page number mapping
Pages()
default constructor
static unsigned pagesize()
return page size
impl * m_pimpl
pointer to implementation
Page * operator[](unsigned pgno) const
return page number pageno
Pages & operator=(const Pages &other)
assignment operator
void swap(Pages &other)
swap with other's contents
Page * page(unsigned pgno) const
return page number pageno
unsigned npages() const
return number of pages accessible
unsigned pageno(Page *p) const
perform page to page number mapping
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks.
STREAMOP(short)
C++ style stream operators for short.
BidirMMapPipe & operator>>(T *&tptr)
read raw pointer to T from other side
void purge()
purge buffered data waiting to be read and/or written
STREAMOP(unsigned long long)
C++ style stream operators for unsigned long long.
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
int close()
flush buffers, close pipe
STREAMOP(float)
C++ style stream operators for float.
static BidirMMapPipe & purge(BidirMMapPipe &pipe)
for usage a la "pipe << purge;"
bool usesSocketpair() const
if BidirMMapPipe uses a socketpair for communications
BidirMMapPipe_impl::Page Page
convenience typedef for Page
STREAMOP(long long)
C++ style stream operators for long long.
STREAMOP(long)
C++ style stream operators for long.
BidirMMapPipe_impl::Pages m_pages
mmapped pages
bool good() const
status of stream is good
static size_type xferraw(int fd, void *addr, const size_type len, ssize_t(*xferfn)(int, const void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
Page * m_dirtylist
linked list: dirty pages (data to be sent)
bool usesPipepair() const
if BidirMMapPipe uses a pipe pair for communications
pid_t m_parentPid
pid of the parent
STREAMOP(bool)
C++ style stream operators for bool.
@ wrerrbit
write error
@ badbit
general I/O error
@ eofbit
end of file reached
@ failbit
logical failure (e.g. pipe closed)
@ exceptionsbit
error reporting with exceptions
@ rderrbit
read error
unsigned recvpages()
receive pages from the other end (may block), queue them
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
STREAMOP(double)
C++ style stream operators for double.
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
BidirMMapPipe & operator<<(const T *tptr)
write raw pointer to T to other side
~BidirMMapPipe()
destructor
int m_inpipe
pipe end from which data may be read
STREAMOP(unsigned long)
C++ style stream operators for unsigned long.
BidirMMapPipe & operator>>(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
bool isChild() const
return if this end of the pipe is the child end
void doFlush(bool forcePartialPages=true)
perform the flush
static BidirMMapPipe & flush(BidirMMapPipe &pipe)
for usage a la "pipe << flush;"
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
STREAMOP(int)
C++ style stream operators for int.
STREAMOP(unsigned short)
C++ style stream operators for unsigned short.
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
bool eof() const
true if end-of-file
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
pid_t pidOtherEnd() const
return PID of the process on the other end of the pipe
@ PagesPerEnd
pages per pipe end
@ TotPages
pages shared (child + parent)
@ FlushThresh
flush threshold
static int debugflag()
return the current setting of the debug flag
bool isParent() const
return if this end of the pipe is the parent end
std::size_t size_type
type used to represent sizes
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
bool bad() const
true on I/O error
PollFlags
condition flags for poll
@ WriteInvalid
write end of pipe invalid
@ Invalid
invalid pipe
@ ReadEndOfFile
read pipe in end-of-file state
@ WriteError
pipe error write end
@ ReadError
pipe error read end
@ Error
pipe error
@ Readable
pipe has data for reading
@ None
nothing special on this pipe
@ ReadInvalid
read end of pipe invalid
@ WriteEndOfFile
write pipe in end-of-file state
@ Writable
pipe can be written to
@ EndOfFile
end of file
int m_flags
flags (e.g. end of file)
Page * m_freelist
linked list: free pages
Page * busypage()
get a busy page to read data from (may block)
int rdstate() const
return flags (end of file, BidirMMapPipe closed, ...)
Page * dirtypage()
get a dirty page to write data to (may block)
BidirMMapPipe & operator=(const BidirMMapPipe &)
assignment forbidden
void sendpages(Page *plist)
send page(s) to the other end (may block)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
STREAMOP(char)
C++ style stream operators for char.
BidirMMapPipe & operator<<(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
STREAMOP(unsigned char)
C++ style stream operators for unsigned char.
size_type read(void *addr, size_type sz)
read from pipe
STREAMOP(unsigned int)
C++ style stream operators for unsigned int.
static int s_debugflag
debug flag
bool closed() const
true if closed
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
size_type write(const void *addr, size_type sz)
write to pipe
bool operator!() const
return true if serious error (fail/bad)
friend class BidirMMapPipe_impl::Page
page is our friend
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void flush()
flush buffers with unwritten data
bool fail() const
logical failure (e.g. pipe closed)
static unsigned s_pagepoolrefcnt
page pool reference counter
static void setDebugflag(int flag)
set the debug flags
Page * m_busylist
linked list: busy pages (data to be read)
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
int m_outpipe
pipe end to which data may be written
void markPageDirty(Page *p)
put on dirty pages list
static unsigned lenPageList(const Page *list)
return length of a page list
pid_t m_childPid
pid of the child (zero if we're child)
namespace for implementation details of BidirMMapPipe
unsigned char m_npages
length in pages
Page * m_pages
pointer to first page
unsigned m_refcnt
reference counter
PageChunk * m_parent
pointer to parent pool