Logo ROOT   6.10/09
Reference Guide
BidirMMapPipe.h
Go to the documentation of this file.
1 /** @file BidirMMapPipe.h
2  *
3  * header file for BidirMMapPipe, a class which forks off a child process and
4  * serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 
10 #ifndef BIDIRMMAPPIPE_H
11 #define BIDIRMMAPPIPE_H
12 
13 #include <list>
14 #include <vector>
15 #include <cassert>
16 #include <cstring>
17 #include <unistd.h>
18 
19 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
20 #define END_NAMESPACE_ROOFIT }
21 
23 
24 /// namespace for implementation details of BidirMMapPipe
25 namespace BidirMMapPipe_impl {
26  // forward declarations
27  class BidirMMapPipeException;
28  class Page;
29  class PagePool;
30  class Pages;
31 
32  /** @brief class representing a chunk of pages
33  *
34  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
35  * @date 2013-07-24
36  *
37  * allocating pages from the OS happens in chunks in order to not exhaust
38  * the maximum allowed number of memory mappings per process; this class
39  * takes care of such a chunk
40  *
41  * a page chunk allows callers to obtain or release pages in groups of
42  * continuous pages of fixed size
43  */
44  class PageChunk {
45  public:
46  /// type of mmap support found
47  typedef enum {
48  Unknown, ///< don't know yet what'll work
49  Copy, ///< mmap doesn't work, have to copy back and forth
50  FileBacked, ///< mmapping a temp file works
51  DevZero, ///< mmapping /dev/zero works
52  Anonymous ///< anonymous mmap works
53  } MMapVariety;
54 
55  private:
56  static unsigned s_physpgsz; ///< system physical page size
57  static unsigned s_pagesize; ///< logical page size (run-time determined)
58  /// mmap variety that works on this system
59  static MMapVariety s_mmapworks;
60 
61  /// convenience typedef
62  typedef BidirMMapPipeException Exception;
63 
64  void* m_begin; ///< pointer to start of mmapped area
65  void* m_end; ///< pointer one behind end of mmapped area
66  // FIXME: cannot keep freelist inline - other end may need that
67  // data, and we'd end up overwriting the page header
68  std::list<void*> m_freelist; ///< free pages list
69  PagePool* m_parent; ///< parent page pool
70  unsigned m_nPgPerGrp; ///< number of pages per group
71  unsigned m_nUsedGrp; ///< number of used page groups
72 
73  /// determine page size at run time
74  static unsigned getPageSize();
75 
76  /// mmap pages, len is length of mmapped area in bytes
77  static void* dommap(unsigned len);
78  /// munmap pages p, len is length of mmapped area in bytes
79  static void domunmap(void* p, unsigned len);
80  /// forbid copying
81  PageChunk(const PageChunk&) {}
82  /// forbid assignment
83  PageChunk& operator=(const PageChunk&) { return *this; }
84  public:
85  /// return the logical page size
86  static unsigned pagesize() { return s_pagesize; }
87  /// return the physical page size of the system
88  static unsigned physPgSz() { return s_physpgsz; }
89  /// return mmap variety support found
90  static MMapVariety mmapVariety() { return s_mmapworks; }
91 
92  /// constructor
93  PageChunk(PagePool* parent, unsigned length, unsigned nPgPerGroup);
94 
95  /// destructor
96  ~PageChunk();
97 
98  /// return if p is contained in this PageChunk
99  bool contains(const Pages& p) const;
100 
101  /// pop a group of pages off the free list
102  Pages pop();
103 
104  /// push a group of pages onto the free list
105  void push(const Pages& p);
106 
107  /// return length of chunk
108  unsigned len() const
109  {
110  return reinterpret_cast<unsigned char*>(m_end) -
111  reinterpret_cast<unsigned char*>(m_begin);
112  }
113  /// return number of pages per page group
114  unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
115 
116  /// return true if no used page groups in this chunk
117  bool empty() const { return !m_nUsedGrp; }
118 
119  /// return true if no free page groups in this chunk
120  bool full() const { return m_freelist.empty(); }
121 
122  /// free all pages except for those pointed to by p
123  void zap(Pages& p);
124  };
125 
126  /** @brief handle class for a number of Pages
127  *
128  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
129  * @date 2013-07-24
130  *
131  * the associated pages are continuous in memory
132  */
133  class Pages {
134  private:
135  /// implementation
136  typedef struct {
137  PageChunk *m_parent; ///< pointer to parent pool
138  Page* m_pages; ///< pointer to first page
139  unsigned m_refcnt; ///< reference counter
140  unsigned char m_npages; ///< length in pages
141  } impl;
142  public:
143  /// default constructor
144  Pages() : m_pimpl(0) { }
145 
146  /// destructor
147  ~Pages();
148 
149  /** @brief copy constructor
150  *
151  * copy Pages handle to new object - old object loses ownership,
152  * and becomes a dangling handle
153  */
154  Pages(const Pages& other);
155 
156  /** @brief assignment operator
157  *
158  * assign Pages handle to new object - old object loses ownership,
159  * and becomes a dangling handle
160  */
161  Pages& operator=(const Pages& other);
162 
163  /// return page size
164  static unsigned pagesize();
165 
166  /// return number of pages accessible
167  unsigned npages() const { return m_pimpl->m_npages; }
168 
169  /// return page number pageno
170  Page* page(unsigned pgno) const;
171 
172  /// return page number pageno
173  Page* operator[](unsigned pgno) const { return page(pgno); }
174 
175  /// perform page to page number mapping
176  unsigned pageno(Page* p) const;
177 
178  /// perform page to page number mapping
179  unsigned operator[](Page* p) const { return pageno(p); }
180 
181  /// swap with other's contents
182  void swap(Pages& other)
183  {
184  impl* tmp = other.m_pimpl;
185  other.m_pimpl = m_pimpl;
186  m_pimpl = tmp;
187  }
188 
189  private:
190  /// page pool is our friend - it's allowed to construct Pages
192 
193  /// pointer to implementation
195 
196  /// constructor
197  Pages(PageChunk* parent, Page* pages, unsigned npg);
198  };
199 }
200 
201 /** @brief BidirMMapPipe creates a bidirectional channel between the current
202  * process and a child it forks.
203  *
204  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
205  * @date 2013-07-07
206  *
207  * This class creates a bidirectional channel between this process and a child
208  * it creates with fork().
209  *
210  * The channel is comprised of a small shared pool of buffer memory mmapped into
211  * both process spaces, and two pipes to synchronise the exchange of data. The
212  * idea behind using the pipes at all is to have some primitive which we can
213  * block on without having to worry about atomic operations or polling, leaving
214  * these tasks to the OS. In case the anonymous mmap cannot be performed on the
215  * OS the code is running on (for whatever reason), the code falls back to
216  * mmapping /dev/zero, mmapping a temporary file, or (if those all fail), a
217  * dynamically allocated buffer which is then transmitted through the pipe(s),
218  * a slightly slower alternative (because the data is copied more often).
219  *
220  * The channel supports five major operations: read(), write(), flush(),
221  * purge() and close(). Reading and writing may block until the required buffer
222  * space is available. Writes may queue up data to be sent to the other end
223  * until either enough pages are full, or the user calls flush which forces
224  * any unsent buffers to be sent to the other end. flush forces any data that
225  * is to be sent to be sent. purge discards any buffered data waiting to be
226  * read and/or sent. Closing the channel on the child returns zero, closing it
227  * on the parent returns the child's exit status.
228  *
229  * The class also provides operator<< and operator>> for C++-style I/O for
230  * basic data types (bool, char, short, int, long, long long, float, double
231  * and their unsigned counterparts). Data is transmitted binary (i.e. no
232  * formatting to strings like std::cout does). There are also overloads to
233  * support C-style zero terminated strings and std::string. In terms of
234  * performance, the former is to be preferred.
235  *
236  * If the caller needs to multiplex input and output to/from several pipes, the
237  * class provides the poll() method which allows to block until an event occurs
238  * on any of the polled pipes.
239  *
240  * After the BidirMMapPipe is closed, no further operations may be performed on
241  * that object, save for the destructor which may still be called.
242  *
243  * If the BidirMMapPipe has not properly been closed, the destructor will call
244  * close. However, the exit code of the child is lost in that case.
245  *
246  * Closing the object causes the mmapped memory to be unmapped and the two
247  * pipes to be closed. We also install an atexit handler in the process of
248  * creating BidirMMapPipes. This ensures that when the current process
249  * terminates, a SIGTERM signal is sent to the child processes created for all
250  * unclosed pipes to avoid leaving zombie processes in the OS's process table.
251  *
252  * BidirMMapPipe creation, closing and destruction are thread safe. If the
253  * BidirMMapPipe is used in more than one thread, the other operations have to
254  * be protected with a mutex (or something similar), though.
255  *
256  * End of file (other end closed its pipe, or died) is indicated with the eof()
257  * method, serious I/O errors set flags (bad(), fail()), and also throw
258  * exceptions. For normal read/write operations, they can be suppressed (i.e.
259  * error reporting only using flags) with a constructor argument.
260  *
261  * Technicalities:
262  * - there is a pool of mmapped pages, half the pages are allocated to the
263  * parent process, half to the child
264  * - when one side has accumulated enough data (or a flush forces dirty pages
265  * out to the other end), it sends these pages to the other end by writing a
266  * byte containing the page number into the pipe
267  * - the other end (which has the pages mmapped, too) reads the page number(s)
268  * and puts the corresponding pages on its busy list
269  * - as the other end reads, it frees busy pages, and eventually tries to put
270  * them on its free list; if a page belongs to the other end of the
271  * connection, it is sent back
272  * - lists of pages are sent across the pipe, not individual pages, in order
273  * to minimise the number of read/write operations needed
274  * - when mmap works properly, only one byte containing the page number of
275  * the page list head is sent back and forth; the contents of that page
276  * allow to access the rest of the page list sent, and page headers on the
277  * list tell the receiving end if the page is free or has to be added to the
278  * busy list
279  * - when mmap does not work, we transfer one byte to indicate the head of the
280  * page list sent, and for each page on the list of sent pages, the page
281  * header and the page payload is sent (if the page is free, we only
282  * transmit the page header, and we never transmit more payload than
283  * the page actually contains)
284  * - in the child, all open BidirMMapPipes but the current one are closed. this
285  * is done for two reasons: first, to conserve file descriptors and address
286  * space. second, if more than one process is meant to use such a
287  * BidirMMapPipe, synchronisation issues arise which can lead to bugs that
288  * are hard to find and understand. it's much better to come up with a design
289  * which does not need pipes to be shared among more than two processes.
290  *
291  * Here is a trivial example of a parent and a child talking to each other over
292  * a BidirMMapPipe:
293  * @code
294  * #include <string>
295  * #include <iostream>
296  * #include <cstdlib>
297  *
298  * #include "BidirMMapPipe.h"
299  *
300  * int simplechild(BidirMMapPipe& pipe)
301  * {
302  * // child does an echo loop
303  * while (pipe.good() && !pipe.eof()) {
304  * // read a string
305  * std::string str;
306  * pipe >> str;
307  * if (!pipe) return -1;
308  * if (pipe.eof()) break;
309  * // check if parent wants us to shut down
310  * if (!str.empty()) {
311  * std::cout << "[CHILD] : read: " << str << std::endl;
312  * str = "... early in the morning?";
313  * }
314  * pipe << str << BidirMMapPipe::flush;
315  * if (str.empty()) break;
316  * if (!pipe) return -1;
317  * std::cout << "[CHILD] : wrote: " << str << std::endl;
318  * }
319  * // send shutdown request acknowledged
320  * pipe << "" << BidirMMapPipe::flush;
321  *
322  * pipe.close();
323  * return 0;
324  * }
325  *
326  * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
327  * {
328  * BidirMMapPipe *p = new BidirMMapPipe();
329  * if (p->isChild()) {
330  * int retVal = childexec(*p);
331  * delete p;
332  * std::exit(retVal);
333  * }
334  * return p;
335  * }
336  *
337  * int main()
338  * {
339  * std::cout << "[PARENT]: simple challenge-response test, one child:" <<
340  * std::endl;
341  * BidirMMapPipe* pipe = spawnChild(simplechild);
342  * for (int i = 0; i < 5; ++i) {
343  * std::string str("What shall we do with a drunken sailor...");
344  * *pipe << str << BidirMMapPipe::flush;
345  * if (!*pipe) return -1;
346  * std::cout << "[PARENT]: wrote: " << str << std::endl;
347  * *pipe >> str;
348  * if (!*pipe) return -1;
349  * std::cout << "[PARENT]: read: " << str << std::endl;
350  * }
351  * // ask child to shut down
352  * *pipe << "" << BidirMMapPipe::flush;
353  * // wait for it to see the shutdown request
354  * std::string s;
355  * *pipe >> s;
356  * std::cout << "[PARENT]: exit status of child: " << pipe->close() <<
357  * std::endl;
358  * delete pipe;
359  * return 0;
360  * }
361  * @endcode
362  *
363  * When designing your own protocols to use over the pipe, there are a few
364  * things to bear in mind:
365  * - Do as http does: When building a request, send all the options and
366  * properties of that request with the request itself in a single go (one
367  * flush). Then, the server has everything it needs, and hopefully, it'll
368  * shut up for a while and let the client do something useful in the
369  * meantime... The same goes when the server replies to the request: include
370  * everything there is to know about the result of the request in the reply.
371  * - The expensive operation should be the request that is made, all other
372  * operations should somehow be formulated as options or properties to that
373  * request.
374  * - Include a shutdown handshake in whatever protocol you send over the
375  * pipe. That way, you can shut things down in a controlled way. Otherwise,
376  * and depending on your OS's scheduling quirks, you may catch a SIGPIPE if
377  * one end closes its pipe while the other is still trying to read.
378  */
380 #ifndef _WIN32
381  public:
382  /// type used to represent sizes
383  typedef std::size_t size_type;
384  /// convenience typedef for BidirMMapPipeException
385  typedef BidirMMapPipe_impl::BidirMMapPipeException Exception;
/// stream-state bits, modelled on (part of) the C++ iostream state flags
enum {
    eofbit        = 1 << 0,              ///< end of file reached
    failbit       = 1 << 1,              ///< logical failure (e.g. pipe closed)
    rderrbit      = 1 << 2,              ///< read error
    wrerrbit      = 1 << 3,              ///< write error
    badbit        = rderrbit | wrerrbit, ///< general I/O error
    exceptionsbit = 1 << 4               ///< error reporting with exceptions
};
395 
396  /** @brief constructor (forks!)
397  *
398  * Creates a bidirectional communications channel between this process
399  * and a child the constructor forks. On return from the constructor,
400  * isParent() and isChild() can be used to tell the parent end from the
401  * child end of the pipe. In the child, all other open BidirMMapPipes
402  * are closed.
403  *
404  * @param useExceptions read()/write() error reporting also done using
405  * exceptions
406  * @param useSocketpair use a socketpair instead of a pair of pipes
407  *
408  * Normally, exceptions are thrown for all serious I/O errors (apart
409  * from end of file). Setting useExceptions to false will force the
410  * read() and write() methods to only report serious I/O errors using
411  * flags.
412  *
413  * When useSocketpair is true, use a pair of Unix domain sockets
414  * created using socketpair instead of a pair of pipes. The advantage is
415  * that only one pair of file descriptors is needed instead of two
416  * pairs which are needed for the pipe pair. Performance should be very
417  * similar on most platforms, especially if mmap works, since only
418  * very little data is sent through the pipe(s)/socketpair.
419  */
420  BidirMMapPipe(bool useExceptions = true, bool useSocketpair = false);
421 
422  /** @brief destructor
423  *
424  * closes this end of pipe
425  */
426  ~BidirMMapPipe();
427 
428  /** @brief return the current setting of the debug flag
429  *
430  * @returns an integer with the debug setting (the value of s_debugflag)
431  */
432  static int debugflag() { return s_debugflag; }
433 
434  /** @brief set the debug flags
435  *
  * The value is stored in the static member s_debugflag.
  *
436  * @param flag debug flags (if zero, no messages are printed)
437  */
438  static void setDebugflag(int flag) { s_debugflag = flag; }
439 
440  /** @brief read from pipe
441  *
442  * @param addr where to put read data
443  * @param sz size of data to read (in bytes)
444  * @returns size of data read, or 0 in case of end-of-file
445  *
446  * read may block until data from other end is available. It will
447  * return 0 if the other end closed the pipe.
448  */
449  size_type read(void* addr, size_type sz);
450 
451  /** @brief write to pipe
452  *
453  * @param addr where to get data to write from
454  * @param sz size of data to write (in bytes)
455  * @returns size of data written, or 0 in case of end-of-file
456  *
457  * write may block until data can be written to other end (depends a
458  * bit on available buffer space). It will return 0 if the other end
459  * closed the pipe. The data is queued to be written on the next
460  * convenient occasion, or it can be forced out with flush().
461  */
462  size_type write(const void* addr, size_type sz);
463 
464  /** @brief flush buffers with unwritten data
465  *
466  * This forces unwritten data to be written to the other end. The call
467  * will block until this has been done (or the attempt failed with an
468  * error).
469  */
470  void flush();
471 
472  /** @brief purge buffered data waiting to be read and/or written
473  *
474  * Discards all internal buffers.
475  */
476  void purge();
477 
478  /** @brief number of bytes that can be read without blocking
479  *
480  * @returns number of bytes that can be read without blocking
481  */
482  size_type bytesReadableNonBlocking();
483 
484  /** @brief number of bytes that can be written without blocking
485  *
486  * @returns number of bytes that can be written without blocking
487  */
488  size_type bytesWritableNonBlocking();
489 
490  /** @brief flush buffers, close pipe
491  *
492  * Flush buffers, discard unread data, closes the pipe. If the pipe is
493  * in the parent process, it waits for the child.
494  *
495  * @returns exit code of child process in parent, zero in child
496  */
497  int close();
498 
499  /** @brief return PID of the process on the other end of the pipe
500  *
501  * @returns PID of the process running on the remote end
502  */
503  pid_t pidOtherEnd() const
504  { return isChild() ? m_parentPid : m_childPid; }
505 
/** @brief condition flags for poll
 *
 * Every elementary condition occupies its own bit, so any combination
 * can be or'ed into PollEntry::events / PollEntry::revents and tested
 * individually; Error, EndOfFile and Invalid are the read|write unions.
 */
enum PollFlags {
    None = 0,           ///< nothing special on this pipe
    Readable = 1,       ///< pipe has data for reading
    Writable = 2,       ///< pipe can be written to
    ReadError = 4,      ///< pipe error read end
    WriteError = 8,     ///< pipe error Write end
    Error = ReadError | WriteError, ///< pipe error
    ReadEndOfFile = 32, ///< read pipe in end-of-file state
    WriteEndOfFile = 64,///< write pipe in end-of-file state
    EndOfFile = ReadEndOfFile | WriteEndOfFile, ///< end of file
    // ReadInvalid used to be 64 as well, which made it indistinguishable
    // from WriteEndOfFile in a bitmask; it now uses the otherwise unused
    // bit 16 so that all other enumerators keep their numeric values
    ReadInvalid = 16,   ///< read end of pipe invalid
    WriteInvalid = 128, ///< write end of pipe invalid
    Invalid = ReadInvalid | WriteInvalid ///< invalid pipe
};
521 
522  /// for poll() interface
523  class PollEntry {
524  public:
525  BidirMMapPipe* pipe; ///< pipe of interest
526  unsigned events; ///< events of interest (or'ed bitmask)
527  unsigned revents; ///< events that happened (or'ed bitmask)
528  /// poll a pipe for all events
530  pipe(_pipe), events(None), revents(None) { }
531  /// poll a pipe for specified events
532  PollEntry(BidirMMapPipe* _pipe, int _events) :
533  pipe(_pipe), events(_events), revents(None) { }
534  };
535  /// convenience typedef for poll() interface
536  typedef std::vector<PollEntry> PollVector;
537 
538  /** @brief poll a set of pipes for events (ready to read from, ready to
539  * write to, error)
540  *
541  * @param pipes set of pipes to check
542  * @param timeout timeout in milliseconds
543  * @returns positive number: number of pipes which have
544  * status changes, 0: timeout, or no pipes with
545  * status changed, -1 on error
546  *
547  * Timeout can be zero (check for specified events, and return), finite
548  * (wait at most timeout milliseconds before returning), or -1
549  * (infinite). The poll method returns when the timeout has elapsed,
550  * or if an event occurs on one of the pipes being polled, whichever
551  * happens earlier.
552  *
553  * Pipes is a vector of one or more PollEntries, which each list a pipe
554  * and events to poll for. If events is left empty (zero), all
555  * conditions are polled for, otherwise only the indicated ones. On
556  * return, the revents fields contain the events that occurred for each
557  * pipe; Error, EndOfFile or Invalid events are always set,
558  * regardless of whether they were in the set of requested events.
559  *
560  * poll may block slightly longer than specified by timeout due to OS
561  * timer granularity and OS scheduling. Due to its implementation, the
562  * poll call can also return early if the remote end of the pipe sends
563  * a free page while polling (which is put on that pipe's freelist),
564  * while that pipe is polled for e.g Reading. The status of the pipe is
565  * indicated correctly in revents, and the caller can simply poll
566  * again. (The reason this is done this way is because it helps to
567  * replenish the pool of free pages and queue busy pages without
568  * blocking.)
569  *
570  * Here's a piece of example code waiting on two pipes; if they become
571  * readable they are read:
572  * @code
573  * #include <unistd.h>
574  * #include <cstdlib>
575  * #include <string>
576  * #include <sstream>
577  * #include <iostream>
578  *
579  * #include "BidirMMapPipe.h"
580  *
581  * // what to execute in the child
582  * int randomchild(BidirMMapPipe& pipe)
583  * {
584  * ::srand48(::getpid());
585  * for (int i = 0; i < 5; ++i) {
586  * // sleep a random time between 0 and .9 seconds
587  * ::usleep(int(1e6 * ::drand48()));
588  * std::ostringstream buf;
589  * buf << "child pid " << ::getpid() << " sends message " << i;
590  * std::cout << "[CHILD] : " << buf.str() << std::endl;
591  * pipe << buf.str() << BidirMMapPipe::flush;
592  * if (!pipe) return -1;
593  * if (pipe.eof()) break;
594  * }
595  * // tell parent we're done
596  * pipe << "" << BidirMMapPipe::flush;
597  * // wait for parent to acknowledge
598  * std::string s;
599  * pipe >> s;
600  * pipe.close();
601  * return 0;
602  * }
603  *
604  * // function to spawn a child
605  * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
606  * {
607  * BidirMMapPipe *p = new BidirMMapPipe();
608  * if (p->isChild()) {
609  * int retVal = childexec(*p);
610  * delete p;
611  * std::exit(retVal);
612  * }
613  * return p;
614  * }
615  *
616  * int main()
617  * {
618  * typedef BidirMMapPipe::PollEntry PollEntry;
619  * // poll data structure
620  * BidirMMapPipe::PollVector pipes;
621  * pipes.reserve(3);
622  * // spawn children
623  * for (int i = 0; i < 3; ++i) {
624  * pipes.push_back(PollEntry(spawnChild(randomchild),
625  * BidirMMapPipe::Readable));
626  * }
627  * // while at least some children alive
628  * while (!pipes.empty()) {
629  * // poll, wait until status change (infinite timeout)
630  * int npipes = BidirMMapPipe::poll(pipes, -1);
631  * // scan for pipes with changed status
632  * for (std::vector<PollEntry>::iterator it = pipes.begin();
633  * npipes && pipes.end() != it; ) {
634  * if (!it->revents) {
635  * // unchanged, next one
636  * ++it;
637  * continue;
638  * }
639  * --npipes; // maybe we can stop early...
640  * // read from pipes which are readable
641  * if (it->revents & BidirMMapPipe::Readable) {
642  * std::string s;
643  * *(it->pipe) >> s;
644  * if (!s.empty()) {
645  * std::cout << "[PARENT]: Read from pipe " <<
646  * it->pipe << ": " << s << std::endl;
647  * ++it;
648  * continue;
649  * } else {
650  * // child is shutting down...
651  * *(it->pipe) << "" << BidirMMapPipe::flush;
652  * goto childcloses;
653  * }
654  * }
655  * // retire pipes with error or end-of-file condition
656  * if (it->revents & (BidirMMapPipe::Error |
657  * BidirMMapPipe::EndOfFile |
658  * BidirMMapPipe::Invalid)) {
659  * std::cout << "[PARENT]: Error on pipe " <<
660  * it->pipe << " revents " << it->revents <<
661  * std::endl;
662  * childcloses:
663  * const int retVal = it->pipe->close();
664  * std::cout << "[PARENT]:\tchild exit status: " << retVal << std::endl;
665  * if (retVal) return retVal;
666  * delete it->pipe;
667  * it = pipes.erase(it);
668  * continue;
669  * }
670  * }
671  * }
672  * return 0;
673  * }
674  * @endcode
675  */
676  static int poll(PollVector& pipes, int timeout);
677 
678  /** @brief return if this end of the pipe is the parent end
679  *
680  * @returns true if parent end of pipe
681  */
682  bool isParent() const { return m_childPid; }
683 
684  /** @brief return if this end of the pipe is the child end
685  *
686  * @returns true if child end of pipe
687  */
688  bool isChild() const { return !m_childPid; }
689 
690  /** @brief if BidirMMapPipe uses a socketpair for communications
691  *
  * Reported when the read descriptor (m_inpipe) and the write descriptor
  * (m_outpipe) are the same.
  *
692  * @returns true if BidirMMapPipe uses a socketpair for communications
693  */
694  bool usesSocketpair() const { return m_inpipe == m_outpipe; }
695 
696  /** @brief if BidirMMapPipe uses a pipe pair for communications
697  *
  * Reported when the read descriptor (m_inpipe) and the write descriptor
  * (m_outpipe) differ; exact opposite of usesSocketpair().
  *
698  * @returns true if BidirMMapPipe uses a pipe pair for communications
699  */
700  bool usesPipepair() const { return m_inpipe != m_outpipe; }
701 
702  /** @brief return flags (end of file, BidirMMapPipe closed, ...)
703  *
  * Returns the m_flags bitmask unmodified; eof(), fail(), bad() and good()
  * test individual bits of the same value.
  *
704  * @returns flags (end of file, BidirMMapPipe closed, ...)
705  */
706  int rdstate() const { return m_flags; }
707 
708  /** @brief true if end-of-file
709  *
710  * @returns true if end-of-file
711  */
712  bool eof() const { return m_flags & eofbit; }
713 
714  /** @brief logical failure (e.g. I/O on closed BidirMMapPipe)
715  *
716  * @returns true in case of grave logical error (I/O on closed pipe,...)
717  */
718  bool fail() const { return m_flags & failbit; }
719 
720  /** @brief true on I/O error
721  *
722  * @returns true on I/O error
723  */
724  bool bad() const { return m_flags & badbit; }
725 
726  /** @brief status of stream is good
727  *
728  * @returns true if pipe is good (no errors, eof, ...)
729  */
730  bool good() const { return !(m_flags & (eofbit | failbit | badbit)); }
731 
732  /** @brief true if closed
733  *
734  * @returns true if stream is closed
735  */
736  bool closed() const { return m_flags & failbit; }
737 
738  /** @brief return true if not serious error (fail/bad)
739  *
740  * @returns true if stream is does not have serious error (fail/bad)
741  *
742  * (if EOF, this is still true)
743  */
744  operator bool() const { return !fail() && !bad(); }
745 
746  /** @brief return true if serious error (fail/bad)
747  *
748  * @returns true if stream has a serious error (fail/bad)
749  */
750  bool operator!() const { return fail() || bad(); }
751 
752 #ifdef STREAMOP
753 #undef STREAMOP
754 #endif
755 #define STREAMOP(TYPE) \
756  BidirMMapPipe& operator<<(const TYPE& val) \
757  { write(&val, sizeof(TYPE)); return *this; } \
758  BidirMMapPipe& operator>>(TYPE& val) \
759  { read(&val, sizeof(TYPE)); return *this; }
760  STREAMOP(bool); ///< C++ style stream operators for bool
761  STREAMOP(char); ///< C++ style stream operators for char
762  STREAMOP(short); ///< C++ style stream operators for short
763  STREAMOP(int); ///< C++ style stream operators for int
764  STREAMOP(long); ///< C++ style stream operators for long
765  STREAMOP(long long); ///< C++ style stream operators for long long
766  STREAMOP(unsigned char); ///< C++ style stream operators for unsigned char
767  STREAMOP(unsigned short); ///< C++ style stream operators for unsigned short
768  STREAMOP(unsigned int); ///< C++ style stream operators for unsigned int
769  STREAMOP(unsigned long); ///< C++ style stream operators for unsigned long
770  STREAMOP(unsigned long long); ///< C++ style stream operators for unsigned long long
771  STREAMOP(float); ///< C++ style stream operators for float
772  STREAMOP(double); ///< C++ style stream operators for double
773 #undef STREAMOP
774 
775  /** @brief write a C-style string
776  *
777  * @param str C-style string
778  * @returns pipe written to
779  */
780  BidirMMapPipe& operator<<(const char* str);
781 
782  /** @brief read a C-style string
783  *
784  * @param str pointer to string (space allocated with malloc!)
785  * @returns pipe read from
786  *
787  * since this is for C-style strings, we use malloc/realloc/free for
788  * strings. passing in a NULL pointer is valid here, and the routine
789  * will use realloc to allocate a chunk of memory of the right size.
790  */
791  BidirMMapPipe& operator>>(char* (&str));
792 
793  /** @brief write a std::string object
794  *
795  * @param str string to write
796  * @returns pipe written to
797  */
798  BidirMMapPipe& operator<<(const std::string& str);
799 
800  /** @brief read a std::string object
801  *
802  * @param str string to be read
803  * @returns pipe read from
804  */
805  BidirMMapPipe& operator>>(std::string& str);
806 
807  /** @brief write raw pointer to T to other side
808  *
809  * NOTE: This will not write the pointee! Only the value of the
810  * pointer is transferred.
811  *
812  * @param tptr pointer to be written
813  * @returns pipe written to
814  */
815  template<class T> BidirMMapPipe& operator<<(const T* tptr)
816  { write(&tptr, sizeof(tptr)); return *this; }
817 
818  /** @brief read raw pointer to T from other side
819  *
820  * NOTE: This will not read the pointee! Only the value of the
821  * pointer is transferred.
822  *
823  * @param tptr pointer to be read
824  * @returns pipe read from
825  */
826  template<class T> BidirMMapPipe& operator>>(T* &tptr)
827  { read(&tptr, sizeof(tptr)); return *this; }
828 
829  /** @brief I/O manipulator support
830  *
831  * @param manip manipulator
832  * @returns pipe with manipulator applied
833  *
834  * example:
835  * @code
836  * pipe << BidirMMapPipe::flush;
837  * @endcode
838  */
840  { return manip(*this); }
841 
842  /** @brief I/O manipulator support
843  *
844  * @param manip manipulator
845  * @returns pipe with manipulator applied
846  *
847  * example:
848  * @code
849  * pipe >> BidirMMapPipe::purge;
850  * @endcode
851  */
853  { return manip(*this); }
854 
855  /// for usage a la "pipe << flush;"
856  static BidirMMapPipe& flush(BidirMMapPipe& pipe) { pipe.flush(); return pipe; }
857  /// for usage a la "pipe << purge;"
858  static BidirMMapPipe& purge(BidirMMapPipe& pipe) { pipe.purge(); return pipe; }
859 
860  private:
861  /// copy-construction forbidden
863  /// assignment forbidden
864  BidirMMapPipe& operator=(const BidirMMapPipe&) { return *this; }
865 
866  /// page is our friend
867  friend class BidirMMapPipe_impl::Page;
868  /// convenience typedef for Page
869  typedef BidirMMapPipe_impl::Page Page;
870 
871  /// tuning constants (compile-time; PagesPerEnd and FlushThresh are derived from TotPages)
872  enum {
873  // TotPages = 16 will give 32k buffers at 4k page size for both
874  // parent and child; if your average message to send is larger
875  // than this, consider raising the value (max 256)
876  TotPages = 16, ///< pages shared (child + parent)
877 
878  PagesPerEnd = TotPages / 2, ///< pages per pipe end (8 for TotPages = 16)
879 
880  // if FlushThresh pages are filled, the code forces a flush; 3/4
881  // of the pages available seems to work quite well
882  FlushThresh = (3 * PagesPerEnd) / 4 ///< flush threshold (6 for PagesPerEnd = 8; integer division)
883  };
884 
885  // per-class members
886  static pthread_mutex_t s_openpipesmutex; ///< protects s_openpipes
887  /// list of open BidirMMapPipes
888  static std::list<BidirMMapPipe*> s_openpipes;
889  /// pool of mmapped pages
890  static BidirMMapPipe_impl::PagePool* s_pagepool;
891  /// page pool reference counter
892  static unsigned s_pagepoolrefcnt;
893  /// debug flag
894  static int s_debugflag;
895 
896  /// return page pool
897  static BidirMMapPipe_impl::PagePool& pagepool();
898 
899  // per-instance members
900  BidirMMapPipe_impl::Pages m_pages; ///< mmapped pages
901  Page* m_busylist; ///< linked list: busy pages (data to be read)
902  Page* m_freelist; ///< linked list: free pages
903  Page* m_dirtylist; ///< linked list: dirty pages (data to be sent)
904  int m_inpipe; ///< pipe end from which data may be read
905  int m_outpipe; ///< pipe end to which data may be written
906  int m_flags; ///< flags (e.g. end of file)
907  pid_t m_childPid; ///< pid of the child (zero if we're child)
908  pid_t m_parentPid; ///< pid of the parent
909 
910  /// cleanup routine - at exit, we want our children to get a SIGTERM...
911  static void teardownall(void);
912 
913  /// return length of a page list
914  static unsigned lenPageList(const Page* list);
915 
916  /** "feed" the busy and free lists with a list of pages
917  *
918  * @param plist linked list of pages
919  *
920  * goes through plist, puts free pages from plist onto the freelist
921  * (or sends them to the remote end if they belong there), and puts
922  * non-empty pages on plist onto the busy list
923  */
924  void feedPageLists(Page* plist);
925 
926  /// put on dirty pages list
927  void markPageDirty(Page* p);
928 
929  /// transfer bytes through the pipe (reading, writing, may block)
930  static size_type xferraw(int fd, void* addr, size_type len,
931  ssize_t (*xferfn)(int, void*, std::size_t));
932  /// transfer bytes through the pipe (reading, writing, may block); overload for transfer functions taking a const buffer (write()-style signature)
933  static size_type xferraw(int fd, void* addr, const size_type len,
934  ssize_t (*xferfn)(int, const void*, std::size_t))
935  {
936  return xferraw(fd, addr, len, // forwards to the overload taking a non-const-buffer xferfn
937  reinterpret_cast<ssize_t (*)(
938  int, void*, std::size_t)>(xferfn));
939  } // NOTE(review): if the callee invokes xferfn through the cast type, that is formally undefined behaviour; confirm in the implementation file
940 
941  /** @brief send page(s) to the other end (may block)
942  *
943  * @param plist linked list of pages to send
944  *
945  * the implementation gathers the different write(s) wherever
946  * possible; if mmap works, this results in a single write to transfer
947  * the list of pages sent; if we need to copy things through the pipe,
948  * we have one write to transfer which pages are sent, and then one
949  * write per page.
950  */
951  void sendpages(Page* plist);
952 
953  /** @brief receive pages from the other end (may block), queue them
954  *
955  * @returns number of pages received
956  *
957  * This is an application-level scatter read, which gets the list of
958  * pages to read from the pipe. If mmap works, it needs only one read
959  * call (to get the head of the list of pages transferred). If we need
960  * to copy pages through the pipe, we need to add one read for each
961  * empty page, and two reads for each non-empty page.
962  */
963  unsigned recvpages();
964 
965  /** @brief receive pages from other end (non-blocking)
966  *
967  * @returns number of pages received
968  *
969  * like recvpages(), but does not block if nothing is available for
970  * reading
971  */
972  unsigned recvpages_nonblock();
973 
974  /// get a busy page to read data from (may block)
975  Page* busypage();
976  /// get a dirty page to write data to (may block)
977  Page* dirtypage();
978 
979  /// close the pipe (no flush if forced)
980  int doClose(bool force, bool holdlock = false);
981  /// perform the flush
982  void doFlush(bool forcePartialPages = true);
983 #endif //_WIN32
984 };
985 
987 
988 #undef BEGIN_NAMESPACE_ROOFIT
989 #undef END_NAMESPACE_ROOFIT
990 
991 #endif // BIDIRMMAPPIPE_H
992 
993 // vim: ft=cpp:sw=4:tw=78:et
std::size_t size_type
type used to represent sizes
bool usesSocketpair() const
if BidirMMapPipe uses a socketpair for communications
#define BEGIN_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:19
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:69
unsigned char m_npages
length in pages
double read(const std::string &file_name)
reading
BidirMMapPipe & operator<<(const T *tptr)
write raw pointer to T to other side
BidirMMapPipe * pipe
pipe of interest
PollEntry(BidirMMapPipe *_pipe, int _events)
poll a pipe for specified events
unsigned revents
events that happened (or'ed bitmask)
bool empty() const
return true if no used page groups in this chunk
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
handle class for a number of Pages
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:81
impl * m_pimpl
pointer to implementation
double T(double x)
Definition: ChebyshevPol.h:34
unsigned m_refcnt
reference counter
bool bad() const
true on I/O error
Page * m_freelist
linked list: free pages
don't know yet what'll work
Definition: BidirMMapPipe.h:48
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:62
namespace for implementation details of BidirMMapPipe
unsigned events
events of interest (or'ed bitmask)
bool isChild() const
return if this end of the pipe is the child end
int rdstate() const
return flags (end of file, BidirMMapPipe closed, ...)
PollEntry(BidirMMapPipe *_pipe)
poll a pipe for all events
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:59
#define STREAMOP(TYPE)
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:71
bool fail() const
logical failure (e.g.
pid_t m_childPid
pid of the child (zero if we're child)
static size_type xferraw(int fd, void *addr, const size_type len, ssize_t(*xferfn)(int, const void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
bool usesPipepair() const
if BidirMMapPipe uses a pipe pair for communications
TBuffer & operator>>(TBuffer &buf, Tmpl *&obj)
Definition: TBuffer.h:374
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition: TBuffer.h:390
class representing a chunk of pages
Definition: BidirMMapPipe.h:44
Page * m_busylist
linked list: busy pages (data to be read)
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:90
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
Page * m_dirtylist
linked list: dirty pages (data to be sent)
PollFlags
condition flags for poll
#define END_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:20
static unsigned physPgSz()
return the physical page size of the system
Definition: BidirMMapPipe.h:88
unsigned nPagesPerGroup() const
return number of pages per page group
Pages()
default constructor
static unsigned s_physpgsz
system physical page size
Definition: BidirMMapPipe.h:56
void swap(Pages &other)
swap with other's contents
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void Error(const char *location, const char *msgfmt,...)
static int debugflag()
return the current setting of the debug flag
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:70
bool eof() const
true if end-of-file
#define None
Definition: TGWin32.h:55
BidirMMapPipe & operator=(const BidirMMapPipe &)
assignment forbidden
static BidirMMapPipe & purge(BidirMMapPipe &pipe)
for usage a la "pipe << purge;"
Pages pop()
pop a group of pages off the free list
static void setDebugflag(int flag)
set the debug flags
int m_outpipe
pipe end to which data may be written
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:64
static BidirMMapPipe & flush(BidirMMapPipe &pipe)
for usage a la "pipe << flush;"
bool isParent() const
return if this end of the pipe is the parent end
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
pid_t m_parentPid
pid of the parent
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
static int s_debugflag
debug flag
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
Page * operator[](unsigned pgno) const
return page number pageno
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:47
PageChunk & operator=(const PageChunk &)
forbid assignment
Definition: BidirMMapPipe.h:83
void zap(Pages &p)
free all pages except for those pointed to by p
for poll() interface
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
bool contains(const Pages &p) const
return if p is contained in this PageChunk
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:68
unsigned len() const
return length of chunk
bool operator!() const
return true if serious error (fail/bad)
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:65
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
unsigned npages() const
return number of pages accessible
unsigned operator[](Page *p) const
perform page to page number mapping
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
BidirMMapPipe & operator>>(T *&tptr)
read raw pointer to T from other side
BidirMMapPipe & operator<<(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
logical page size (run-time determined)
Definition: BidirMMapPipe.h:57
bool full() const
return true if no free page groups in this chunk
bool good() const
status of stream is good
pid_t pidOtherEnd() const
return PID of the process on the other end of the pipe
bool closed() const
true if closed
int m_flags
flags (e.g. end of file)
BidirMMapPipe & operator>>(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:49
static unsigned pagesize()
return the logical page size
Definition: BidirMMapPipe.h:86