Logo ROOT   6.07/09
Reference Guide
BidirMMapPipe.h
Go to the documentation of this file.
1 /** @file BidirMMapPipe.h
2  *
3  * header file for BidirMMapPipe, a class which forks off a child process and
4  * serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 
10 #ifndef BIDIRMMAPPIPE_H
11 #define BIDIRMMAPPIPE_H
12 
13 #include <list>
14 #include <vector>
15 #include <cassert>
16 #include <cstring>
17 #include <unistd.h>
18 
19 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
20 #define END_NAMESPACE_ROOFIT }
21 
23 
24 /// namespace for implementation details of BidirMMapPipe
25 namespace BidirMMapPipe_impl {
26  // forward declarations
27  class BidirMMapPipeException;
28  class Page;
29  class PagePool;
30  class Pages;
31 
32  /** @brief class representing a chunk of pages
33  *
34  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
35  * @date 2013-07-24
36  *
37  * allocating pages from the OS happens in chunks in order to not exhaust
38  * the maximum allowed number of memory mappings per process; this class
39  * takes care of such a chunk
40  *
41  * a page chunk allows callers to obtain or release pages in groups of
42  * continuous pages of fixed size
43  */
44  class PageChunk {
45  public:
46  /// type of mmap support found
47  typedef enum {
48  Unknown, ///< don't know yet what'll work
49  Copy, ///< mmap doesn't work, have to copy back and forth
50  FileBacked, ///< mmapping a temp file works
51  DevZero, ///< mmapping /dev/zero works
52  Anonymous ///< anonymous mmap works
53  } MMapVariety;
54 
55  private:
56  static unsigned s_physpgsz; ///< system physical page size
57  static unsigned s_pagesize; ///< logical page size (run-time determined)
58  /// mmap variety that works on this system
59  static MMapVariety s_mmapworks;
60 
61  /// convenience typedef
62  typedef BidirMMapPipeException Exception;
63 
64  void* m_begin; ///< pointer to start of mmapped area
65  void* m_end; ///< pointer one behind end of mmapped area
66  // FIXME: cannot keep freelist inline - other end may need that
67  // data, and we'd end up overwriting the page header
68  std::list<void*> m_freelist; ///< free pages list
69  PagePool* m_parent; ///< parent page pool
70  unsigned m_nPgPerGrp; ///< number of pages per group
71  unsigned m_nUsedGrp; ///< number of used page groups
72 
73  /// determine page size at run time
74  static unsigned getPageSize();
75 
76  /// mmap pages, len is length of mmapped area in bytes
77  static void* dommap(unsigned len);
78  /// munmap pages p, len is length of mmapped area in bytes
79  static void domunmap(void* p, unsigned len);
80  /// forbid copying
81  PageChunk(const PageChunk&) {}
82  /// forbid assignment
83  PageChunk& operator=(const PageChunk&) { return *this; }
84  public:
85  /// return the logical page size
86  static unsigned pagesize() { return s_pagesize; }
87  /// return the physical page size of the system
88  static unsigned physPgSz() { return s_physpgsz; }
89  /// return mmap variety support found
90  static MMapVariety mmapVariety() { return s_mmapworks; }
91 
92  /// constructor
93  PageChunk(PagePool* parent, unsigned length, unsigned nPgPerGroup);
94 
95  /// destructor
96  ~PageChunk();
97 
98  /// return if p is contained in this PageChunk
99  bool contains(const Pages& p) const;
100 
101  /// pop a group of pages off the free list
102  Pages pop();
103 
104  /// push a group of pages onto the free list
105  void push(const Pages& p);
106 
107  /// return length of chunk
108  unsigned len() const
109  {
110  return reinterpret_cast<unsigned char*>(m_end) -
111  reinterpret_cast<unsigned char*>(m_begin);
112  }
113  /// return number of pages per page group
114  unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
115 
116  /// return true if no used page groups in this chunk
117  bool empty() const { return !m_nUsedGrp; }
118 
119  /// return true if no free page groups in this chunk
120  bool full() const { return m_freelist.empty(); }
121 
122  /// free all pages except for those pointed to by p
123  void zap(Pages& p);
124  };
125 
126  /** @brief handle class for a number of Pages
127  *
128  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
129  * @date 2013-07-24
130  *
131  * the associated pages are continuous in memory
132  */
133  class Pages {
134  private:
135  /// implementation
136  typedef struct {
137  PageChunk *m_parent; ///< pointer to parent pool
138  Page* m_pages; ///< pointer to first page
139  unsigned m_refcnt; ///< reference counter
140  unsigned char m_npages; ///< length in pages
141  } impl;
142  public:
143  /// default constructor
144  Pages() : m_pimpl(0) { }
145 
146  /// destructor
147  ~Pages();
148 
149  /** @brief copy constructor
150  *
151  * copy Pages handle to new object - old object loses ownership,
152  * and becomes a dangling handle
153  */
154  Pages(const Pages& other);
155 
156  /** @brief assignment operator
157  *
158  * assign Pages handle to new object - old object loses ownership,
159  * and becomes a dangling handle
160  */
161  Pages& operator=(const Pages& other);
162 
163  /// return page size
164  static unsigned pagesize();
165 
166  /// return number of pages accessible
167  unsigned npages() const { return m_pimpl->m_npages; }
168 
169  /// return page number pageno
170  Page* page(unsigned pgno) const;
171 
172  /// return page number pageno
173  Page* operator[](unsigned pgno) const { return page(pgno); }
174 
175  /// perform page to page number mapping
176  unsigned pageno(Page* p) const;
177 
178  /// perform page to page number mapping
179  unsigned operator[](Page* p) const { return pageno(p); }
180 
181  /// swap with other's contents
182  void swap(Pages& other)
183  {
184  impl* tmp = other.m_pimpl;
185  other.m_pimpl = m_pimpl;
186  m_pimpl = tmp;
187  }
188 
189  private:
190  /// page pool is our friend - it's allowed to construct Pages
192 
193  /// pointer to implementation
195 
196  /// constructor
197  Pages(PageChunk* parent, Page* pages, unsigned npg);
198  };
199 }
200 
201 /** @brief BidirMMapPipe creates a bidirectional channel between the current
202  * process and a child it forks.
203  *
204  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
205  * @date 2013-07-07
206  *
207  * This class creates a bidirectional channel between this process and a child
208  * it creates with fork().
209  *
210  * The channel is comrised of a small shared pool of buffer memory mmapped into
211  * both process spaces, and two pipes to synchronise the exchange of data. The
212  * idea behind using the pipes at all is to have some primitive which we can
213  * block on without having to worry about atomic operations or polling, leaving
214  * these tasks to the OS. In case the anonymous mmap cannot be performed on the
215  * OS the code is running on (for whatever reason), the code falls back to
216  * mmapping /dev/zero, mmapping a temporary file, or (if those all fail), a
217  * dynamically allocated buffer which is then transmitted through the pipe(s),
218  * a slightly slower alternative (because the data is copied more often).
219  *
220  * The channel supports five major operations: read(), write(), flush(),
221  * purge() and close(). Reading and writing may block until the required buffer
222  * space is available. Writes may queue up data to be sent to the other end
223  * until either enough pages are full, or the user calls flush which forces
224  * any unsent buffers to be sent to the other end. flush forces any data that
225  * is to be sent to be sent. purge discards any buffered data waiting to be
226  * read and/or sent. Closing the channel on the child returns zero, closing it
227  * on the parent returns the child's exit status.
228  *
229  * The class also provides operator<< and operator>> for C++-style I/O for
230  * basic data types (bool, char, short, int, long, long long, float, double
231  * and their unsigned counterparts). Data is transmitted binary (i.e. no
232  * formatting to strings like std::cout does). There are also overloads to
233  * support C-style zero terminated strings and std::string. In terms of
234  * performance, the former is to be preferred.
235  *
236  * If the caller needs to multiplex input and output to/from several pipes, the
237  * class provides the poll() method which allows to block until an event occurs
238  * on any of the polled pipes.
239  *
240  * After the BidirMMapPipe is closed, no further operations may be performed on
241  * that object, save for the destructor which may still be called.
242  *
243  * If the BidirMMapPipe has not properly been closed, the destructor will call
244  * close. However, the exit code of the child is lost in that case.
245  *
246  * Closing the object causes the mmapped memory to be unmapped and the two
247  * pipes to be closed. We also install an atexit handler in the process of
248  * creating BidirMMapPipes. This ensures that when the current process
249  * terminates, a SIGTERM signal is sent to the child processes created for all
250  * unclosed pipes to avoid leaving zombie processes in the OS's process table.
251  *
252  * BidirMMapPipe creation, closing and destruction are thread safe. If the
253  * BidirMMapPipe is used in more than one thread, the other operations have to
254  * be protected with a mutex (or something similar), though.
255  *
256  * End of file (other end closed its pipe, or died) is indicated with the eof()
257  * method, serious I/O errors set a flags (bad(), fail()), and also throw
258  * exceptions. For normal read/write operations, they can be suppressed (i.e.
259  * error reporting only using flags) with a constructor argument.
260  *
261  * Technicalities:
262  * - there is a pool of mmapped pages, half the pages are allocated to the
263  * parent process, half to the child
264  * - when one side has accumulated enough data (or a flush forces dirty pages
265  * out to the other end), it sends these pages to the other end by writing a
266  * byte containing the page number into the pipe
267  * - the other end (which has the pages mmapped, too) reads the page number(s)
268  * and puts the corresponding pages on its busy list
269  * - as the other ends reads, it frees busy pages, and eventually tries to put
270  * them on the its list; if a page belongs to the other end of the
271  * connection, it is sent back
272  * - lists of pages are sent across the pipe, not individual pages, in order
273  * to minimise the number of read/write operations needed
274  * - when mmap works properly, only one bytes containing the page number of
275  * the page list head is sent back and forth; the contents of that page
276  * allow to access the rest of the page list sent, and page headers on the
277  * list tell the receiving end if the page is free or has to be added to the
278  * busy list
279  * - when mmap does not work, we transfer one byte to indicate the head of the
280  * page list sent, and for each page on the list of sent pages, the page
281  * header and the page payload is sent (if the page is free, we only
282  * transmit the page header, and we never transmit more payload than
283  * the page actually contains)
284  * - in the child, all open BidirMMapPipes but the current one are closed. this
285  * is done for two reasons: first, to conserve file descriptors and address
286  * space. second, if more than one process is meant to use such a
287  * BidirMMapPipe, synchronisation issues arise which can lead to bugs that
288  * are hard to find and understand. it's much better to come up with a design
289  * which does not need pipes to be shared among more than two processes.
290  *
291  * Here is a trivial example of a parent and a child talking to each other over
292  * a BidirMMapPipe:
293  * @code
294  * #include <string>
295  * #include <iostream>
296  * #include <cstdlib>
297  *
298  * #include "BidirMMapPipe.h"
299  *
300  * int simplechild(BidirMMapPipe& pipe)
301  * {
302  * // child does an echo loop
303  * while (pipe.good() && !pipe.eof()) {
304  * // read a string
305  * std::string str;
306  * pipe >> str;
307  * if (!pipe) return -1;
308  * if (pipe.eof()) break;
309  * // check if parent wants us to shut down
310  * if (!str.empty()) {
311  * std::cout << "[CHILD] : read: " << str << std::endl;
312  * str = "... early in the morning?";
313  * }
314  * pipe << str << BidirMMapPipe::flush;
315  * if (str.empty()) break;
316  * if (!pipe) return -1;
317  * std::cout << "[CHILD] : wrote: " << str << std::endl;
318  * }
319  * // send shutdown request acknowledged
320  * pipe << "" << BidirMMapPipe::flush;
321  *
322  * pipe.close();
323  * return 0;
324  * }
325  *
326  * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
327  * {
328  * BidirMMapPipe *p = new BidirMMapPipe();
329  * if (p->isChild()) {
330  * int retVal = childexec(*p);
331  * delete p;
332  * std::exit(retVal);
333  * }
334  * return p;
335  * }
336  *
337  * int main()
338  * {
339  * std::cout << "[PARENT]: simple challenge-response test, one child:" <<
340  * std::endl;
341  * BidirMMapPipe* pipe = spawnChild(simplechild);
342  * for (int i = 0; i < 5; ++i) {
343  * std::string str("What shall we do with a drunken sailor...");
344  * *pipe << str << BidirMMapPipe::flush;
345  * if (!*pipe) return -1;
346  * std::cout << "[PARENT]: wrote: " << str << std::endl;
347  * *pipe >> str;
348  * if (!*pipe) return -1;
349  * std::cout << "[PARENT]: read: " << str << std::endl;
350  * }
351  * // ask child to shut down
352  * pipe << "" << BidirMMapPipe::flush;
353  * // wait for it to see the shutdown request
354  * std::string s;
355  * pipe >> s;
356  * std::cout << "[PARENT]: exit status of child: " << pipe->close() <<
357  * std::endl;
358  * delete pipe;
359  * return 0;
360  * }
361  * @endcode
362  *
363  * When designing your own protocols to use over the pipe, there are a few
364  * things to bear in mind:
365  * - Do as http does: When building a request, send all the options and
366  * properties of that request with the request itself in a single go (one
367  * flush). Then, the server has everything it needs, and hopefully, it'll
368  * shut up for a while and to let the client do something useful in the
369  * meantime... The same goes when the server replies to the request: include
370  * everything there is to know about the result of the request in the reply.
371  * - The expensive operation should be the request that is made, all other
372  * operations should somehow be formulated as options or properties to that
373  * request.
374  * - Include a shutdown handshake in whatever protocol you send over the
375  * pipe. That way, you can shut things down in a controlled way. Otherwise,
376  * and depending on your OS's scheduling quirks, you may catch a SIGPIPE if
377  * one end closes its pipe while the other is still trying to read.
378  */
380 #ifndef _WIN32
381  public:
382  /// type used to represent sizes
383  typedef std::size_t size_type;
384  /// convenience typedef for BidirMMapPipeException
385  typedef BidirMMapPipe_impl::BidirMMapPipeException Exception;
386  /// flag bits for partial C++ iostream compatibility
387  enum {
388  eofbit = 1, ///< end of file reached
389  failbit = 2, ///< logical failure (e.g. pipe closed)
390  rderrbit = 4, ///< read error
391  wrerrbit = 8, ///< write error
392  badbit = rderrbit | wrerrbit, ///< general I/O error
393  exceptionsbit = 16 ///< error reporting with exceptions
394  };
395 
396  /** @brief constructor (forks!)
397  *
398  * Creates a bidirectional communications channel between this process
399  * and a child the constructor forks. On return from the constructor,
400  * isParent() and isChild() can be used to tell the parent end from the
401  * child end of the pipe. In the child, all other open BidirMMapPipes
402  * are closed.
403  *
404  * @param useExceptions read()/write() error reporting also done using
405  * exceptions
406  * @param useSocketpair use a socketpair instead of a pair or pipes
407  *
408  * Normally, exceptions are thrown for all serious I/O errors (apart
409  * from end of file). Setting useExceptions to false will force the
410  * read() and write() methods to only report serious I/O errors using
411  * flags.
412  *
413  * When useSocketpair is true, use a pair of Unix domain sockets
414  * created using socketpair instead a pair of pipes. The advantage is
415  * that only one pair of file descriptors is needed instead of two
416  * pairs which are needed for the pipe pair. Performance should very
417  * similar on most platforms, especially if mmap works, since only
418  * very little data is sent through the pipe(s)/socketpair.
419  */
420  BidirMMapPipe(bool useExceptions = true, bool useSocketpair = false);
421 
422  /** @brief destructor
423  *
424  * closes this end of pipe
425  */
426  ~BidirMMapPipe();
427 
428  /** @brief return the current setting of the debug flag
429  *
430  * @returns an integer with the debug Setting
431  */
432  static int debugflag() { return s_debugflag; }
433 
434  /** @brief set the debug flags
435  *
436  * @param flag debug flags (if zero, no messages are printed)
437  */
438  static void setDebugflag(int flag) { s_debugflag = flag; }
439 
440  /** @brief read from pipe
441  *
442  * @param addr where to put read data
443  * @param sz size of data to read (in bytes)
444  * @returns size of data read, or 0 in case of end-of-file
445  *
446  * read may block until data from other end is available. It will
447  * return 0 if the other end closed the pipe.
448  */
449  size_type read(void* addr, size_type sz);
450 
451  /** @brief wirte to pipe
452  *
453  * @param addr where to get data to write from
454  * @param sz size of data to write (in bytes)
455  * @returns size of data written, or 0 in case of end-of-file
456  *
457  * write may block until data can be written to other end (depends a
458  * bit on available buffer space). It will return 0 if the other end
459  * closed the pipe. The data is queued to be written on the next
460  * convenient occasion, or it can be forced out with flush().
461  */
462  size_type write(const void* addr, size_type sz);
463 
464  /** @brief flush buffers with unwritten data
465  *
466  * This forces unwritten data to be written to the other end. The call
467  * will block until this has been done (or the attempt failed with an
468  * error).
469  */
470  void flush();
471 
472  /** @brief purge buffered data waiting to be read and/or written
473  *
474  * Discards all internal buffers.
475  */
476  void purge();
477 
478  /** @brief number of bytes that can be read without blocking
479  *
480  * @returns number of bytes that can be read without blocking
481  */
482  size_type bytesReadableNonBlocking();
483 
484  /** @brief number of bytes that can be written without blocking
485  *
486  * @returns number of bytes that can be written without blocking
487  */
488  size_type bytesWritableNonBlocking();
489 
490  /** @brief flush buffers, close pipe
491  *
492  * Flush buffers, discard unread data, closes the pipe. If the pipe is
493  * in the parent process, it waits for the child.
494  *
495  * @returns exit code of child process in parent, zero in child
496  */
497  int close();
498 
499  /** @brief return PID of the process on the other end of the pipe
500  *
501  * @returns PID of the process running on the remote end
502  */
503  pid_t pidOtherEnd() const
504  { return isChild() ? m_parentPid : m_childPid; }
505 
506  /// condition flags for poll
507  enum PollFlags {
508  None = 0, ///< nothing special on this pipe
509  Readable = 1, ///< pipe has data for reading
510  Writable = 2, ///< pipe can be written to
511  ReadError = 4, ///< pipe error read end
512  WriteError = 8, ///< pipe error Write end
513  Error = ReadError | WriteError, ///< pipe error
514  ReadEndOfFile = 32, ///< read pipe in end-of-file state
515  WriteEndOfFile = 64,///< write pipe in end-of-file state
516  EndOfFile = ReadEndOfFile | WriteEndOfFile, ///< end of file
517  ReadInvalid = 64, ///< read end of pipe invalid
518  WriteInvalid = 128, ///< write end of pipe invalid
519  Invalid = ReadInvalid | WriteInvalid ///< invalid pipe
520  };
521 
522  /// for poll() interface
523  class PollEntry {
524  public:
525  BidirMMapPipe* pipe; ///< pipe of interest
526  unsigned events; ///< events of interest (or'ed bitmask)
527  unsigned revents; ///< events that happened (or'ed bitmask)
528  /// poll a pipe for all events
530  pipe(_pipe), events(None), revents(None) { }
531  /// poll a pipe for specified events
532  PollEntry(BidirMMapPipe* _pipe, int _events) :
533  pipe(_pipe), events(_events), revents(None) { }
534  };
535  /// convenience typedef for poll() interface
536  typedef std::vector<PollEntry> PollVector;
537 
538  /** @brief poll a set of pipes for events (ready to read from, ready to
539  * write to, error)
540  *
541  * @param pipes set of pipes to check
542  * @param timeout timeout in milliseconds
543  * @returns positive number: number of pipes which have
544  * status changes, 0: timeout, or no pipes with
545  * status changed, -1 on error
546  *
547  * Timeout can be zero (check for specified events, and return), finite
548  * (wait at most timeout milliseconds before returning), or -1
549  * (infinite). The poll method returns when the timeout has elapsed,
550  * or if an event occurs on one of the pipes being polled, whichever
551  * happens earlier.
552  *
553  * Pipes is a vector of one or more PollEntries, which each list a pipe
554  * and events to poll for. If events is left empty (zero), all
555  * conditions are polled for, otherwise only the indicated ones. On
556  * return, the revents fields contain the events that occurred for each
557  * pipe; error Error, EndOfFile or Invalid events are always set,
558  * regardless of wether they were in the set of requested events.
559  *
560  * poll may block slightly longer than specified by timeout due to OS
561  * timer granularity and OS scheduling. Due to its implementation, the
562  * poll call can also return early if the remote end of the page sends
563  * a free page while polling (which is put on that pipe's freelist),
564  * while that pipe is polled for e.g Reading. The status of the pipe is
565  * indicated correctly in revents, and the caller can simply poll
566  * again. (The reason this is done this way is because it helps to
567  * replenish the pool of free pages and queue busy pages without
568  * blocking.)
569  *
570  * Here's a piece of example code waiting on two pipes; if they become
571  * readable they are read:
572  * @code
573  * #include <unistd.h>
574  * #include <cstdlib>
575  * #include <string>
576  * #include <sstream>
577  * #include <iostream>
578  *
579  * #include "BidirMMapPipe.h"
580  *
581  * // what to execute in the child
582  * int randomchild(BidirMMapPipe& pipe)
583  * {
584  * ::srand48(::getpid());
585  * for (int i = 0; i < 5; ++i) {
586  * // sleep a random time between 0 and .9 seconds
587  * ::usleep(int(1e6 * ::drand48()));
588  * std::ostringstream buf;
589  * buf << "child pid " << ::getpid() << " sends message " << i;
590  * std::cout << "[CHILD] : " << buf.str() << std::endl;
591  * pipe << buf.str() << BidirMMapPipe::flush;
592  * if (!pipe) return -1;
593  * if (pipe.eof()) break;
594  * }
595  * // tell parent we're done
596  * pipe << "" << BidirMMapPipe::flush;
597  * // wait for parent to acknowledge
598  * std::string s;
599  * pipe >> s;
600  * pipe.close();
601  * return 0;
602  * }
603  *
604  * // function to spawn a child
605  * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
606  * {
607  * BidirMMapPipe *p = new BidirMMapPipe();
608  * if (p->isChild()) {
609  * int retVal = childexec(*p);
610  * delete p;
611  * std::exit(retVal);
612  * }
613  * return p;
614  * }
615  *
616  * int main()
617  * {
618  * typedef BidirMMapPipe::PollEntry PollEntry;
619  * // poll data structure
620  * BidirMMapPipe::PollVector pipes;
621  * pipes.reserve(3);
622  * // spawn children
623  * for (int i = 0; i < 3; ++i) {
624  * pipes.push_back(PollEntry(spawnChild(randomchild),
625  * BidirMMapPipe::Readable));
626  * }
627  * // while at least some children alive
628  * while (!pipes.empty()) {
629  * // poll, wait until status change (infinite timeout)
630  * int npipes = BidirMMapPipe::poll(pipes, -1);
631  * // scan for pipes with changed status
632  * for (std::vector<PollEntry>::iterator it = pipes.begin();
633  * npipes && pipes.end() != it; ) {
634  * if (!it->revents) {
635  * // unchanged, next one
636  * ++it;
637  * continue;
638  * }
639  * --npipes; // maybe we can stop early...
640  * // read from pipes which are readable
641  * if (it->revents & BidirMMapPipe::Readable) {
642  * std::string s;
643  * *(it->pipe) >> s;
644  * if (!s.empty()) {
645  * std::cout << "[PARENT]: Read from pipe " <<
646  * it->pipe << ": " << s << std::endl;
647  * ++it;
648  * continue;
649  * } else {
650  * // child is shutting down...
651  * *(it->pipe) << "" << BidirMMapPipe::flush;
652  * goto childcloses;
653  * }
654  * }
655  * // retire pipes with error or end-of-file condition
656  * if (it->revents & (BidirMMapPipe::Error |
657  * BidirMMapPipe::EndOfFile |
658  * BidirMMapPipe::Invalid)) {
659  * std::cout << "[PARENT]: Error on pipe " <<
660  * it->pipe << " revents " << it->revents <<
661  * std::endl;
662  * childcloses:
663  * std::cout << "[PARENT]:\tchild exit status: " <<
664  * it->pipe->close() << std::endl;
665  * if (retVal) return retVal;
666  * delete it->pipe;
667  * it = pipes.erase(it);
668  * continue;
669  * }
670  * }
671  * }
672  * return 0;
673  * }
674  * @endcode
675  */
676  static int poll(PollVector& pipes, int timeout);
677 
678  /** @brief return if this end of the pipe is the parent end
679  *
680  * @returns true if parent end of pipe
681  */
682  bool isParent() const { return m_childPid; }
683 
684  /** @brief return if this end of the pipe is the child end
685  *
686  * @returns true if child end of pipe
687  */
688  bool isChild() const { return !m_childPid; }
689 
690  /** @brief if BidirMMapPipe uses a socketpair for communications
691  *
692  * @returns true if BidirMMapPipe uses a socketpair for communications
693  */
694  bool usesSocketpair() const { return m_inpipe == m_outpipe; }
695 
696  /** @brief if BidirMMapPipe uses a pipe pair for communications
697  *
698  * @returns true if BidirMMapPipe uses a pipe pair for communications
699  */
700  bool usesPipepair() const { return m_inpipe != m_outpipe; }
701 
702  /** @brief return flags (end of file, BidirMMapPipe closed, ...)
703  *
704  * @returns flags (end of file, BidirMMapPipe closed, ...)
705  */
706  int rdstate() const { return m_flags; }
707 
708  /** @brief true if end-of-file
709  *
710  * @returns true if end-of-file
711  */
712  bool eof() const { return m_flags & eofbit; }
713 
714  /** @brief logical failure (e.g. I/O on closed BidirMMapPipe)
715  *
716  * @returns true in case of grave logical error (I/O on closed pipe,...)
717  */
718  bool fail() const { return m_flags & failbit; }
719 
720  /** @brief true on I/O error
721  *
722  * @returns true on I/O error
723  */
724  bool bad() const { return m_flags & badbit; }
725 
726  /** @brief status of stream is good
727  *
728  * @returns true if pipe is good (no errors, eof, ...)
729  */
730  bool good() const { return !(m_flags & (eofbit | failbit | badbit)); }
731 
732  /** @brief true if closed
733  *
734  * @returns true if stream is closed
735  */
736  bool closed() const { return m_flags & failbit; }
737 
738  /** @brief return true if not serious error (fail/bad)
739  *
740  * @returns true if stream is does not have serious error (fail/bad)
741  *
742  * (if EOF, this is still true)
743  */
744  operator bool() const { return !fail() && !bad(); }
745 
746  /** @brief return true if serious error (fail/bad)
747  *
748  * @returns true if stream has a serious error (fail/bad)
749  */
750  bool operator!() const { return fail() || bad(); }
751 
752 #ifdef STREAMOP
753 #undef STREAMOP
754 #endif
755 #define STREAMOP(TYPE) \
756  BidirMMapPipe& operator<<(const TYPE& val) \
757  { write(&val, sizeof(TYPE)); return *this; } \
758  BidirMMapPipe& operator>>(TYPE& val) \
759  { read(&val, sizeof(TYPE)); return *this; }
760  STREAMOP(bool); ///< C++ style stream operators for bool
761  STREAMOP(char); ///< C++ style stream operators for char
762  STREAMOP(short); ///< C++ style stream operators for short
763  STREAMOP(int); ///< C++ style stream operators for int
764  STREAMOP(long); ///< C++ style stream operators for long
765  STREAMOP(long long); ///< C++ style stream operators for long long
766  STREAMOP(unsigned char); ///< C++ style stream operators for unsigned char
767  STREAMOP(unsigned short); ///< C++ style stream operators for unsigned short
768  STREAMOP(unsigned int); ///< C++ style stream operators for unsigned int
769  STREAMOP(unsigned long); ///< C++ style stream operators for unsigned long
770  STREAMOP(unsigned long long); ///< C++ style stream operators for unsigned long long
771  STREAMOP(float); ///< C++ style stream operators for float
772  STREAMOP(double); ///< C++ style stream operators for double
773 #undef STREAMOP
774 
775  /** @brief write a C-style string
776  *
777  * @param str C-style string
778  * @returns pipe written to
779  */
780  BidirMMapPipe& operator<<(const char* str);
781 
782  /** @brief read a C-style string
783  *
784  * @param str pointer to string (space allocated with malloc!)
785  * @returns pipe read from
786  *
787  * since this is for C-style strings, we use malloc/realloc/free for
788  * strings. passing in a NULL pointer is valid here, and the routine
789  * will use realloc to allocate a chunk of memory of the right size.
790  */
791  BidirMMapPipe& operator>>(char* (&str));
792 
793  /** @brief write a std::string object
794  *
795  * @param str string to write
796  * @returns pipe written to
797  */
798  BidirMMapPipe& operator<<(const std::string& str);
799 
800  /** @brief read a std::string object
801  *
802  * @param str string to be read
803  * @returns pipe read from
804  */
805  BidirMMapPipe& operator>>(std::string& str);
806 
807  /** @brief write raw pointer to T to other side
808  *
809  * NOTE: This will not write the pointee! Only the value of the
810  * pointer is transferred.
811  *
812  * @param tptr pointer to be written
813  * @returns pipe written to
814  */
815  template<class T> BidirMMapPipe& operator<<(const T* tptr)
816  { write(&tptr, sizeof(tptr)); return *this; }
817 
818  /** @brief read raw pointer to T from other side
819  *
820  * NOTE: This will not read the pointee! Only the value of the
821  * pointer is transferred.
822  *
823  * @param tptr pointer to be read
824  * @returns pipe read from
825  */
826  template<class T> BidirMMapPipe& operator>>(T* &tptr)
827  { read(&tptr, sizeof(tptr)); return *this; }
828 
829  /** @brief I/O manipulator support
830  *
831  * @param manip manipulator
832  * @returns pipe with manipulator applied
833  *
834  * example:
835  * @code
836  * pipe << BidirMMapPipe::flush;
837  * @endcode
838  */
840  { return manip(*this); }
841 
842  /** @brief I/O manipulator support
843  *
844  * @param manip manipulator
845  * @returns pipe with manipulator applied
846  *
847  * example:
848  * @code
849  * pipe >> BidirMMapPipe::purge;
850  * @endcode
851  */
853  { return manip(*this); }
854 
855  /// for usage a la "pipe << flush;"
856  static BidirMMapPipe& flush(BidirMMapPipe& pipe) { pipe.flush(); return pipe; }
857  /// for usage a la "pipe << purge;"
858  static BidirMMapPipe& purge(BidirMMapPipe& pipe) { pipe.purge(); return pipe; }
859 
860  private:
861  /// copy-construction forbidden
863  /// assignment forbidden
864  BidirMMapPipe& operator=(const BidirMMapPipe&) { return *this; }
865 
866  /// page is our friend
867  friend class BidirMMapPipe_impl::Page;
868  /// convenience typedef for Page
869  typedef BidirMMapPipe_impl::Page Page;
870 
871  /// tuning constants
872  enum {
873  // TotPages = 16 will give 32k buffers at 4k page size for both
874  // parent and child; if your average message to send is larger
875  // than this, consider raising the value (max 256)
876  TotPages = 16, ///< pages shared (child + parent)
877 
878  PagesPerEnd = TotPages / 2, ///< pages per pipe end
879 
880  // if FlushThresh pages are filled, the code forces a flush; 3/4
881  // of the pages available seems to work quite well
882  FlushThresh = (3 * PagesPerEnd) / 4 ///< flush threshold
883  };
884 
885  // per-class members
886  static pthread_mutex_t s_openpipesmutex; ///< protects s_openpipes
887  /// list of open BidirMMapPipes
888  static std::list<BidirMMapPipe*> s_openpipes;
889  /// pool of mmapped pages
890  static BidirMMapPipe_impl::PagePool* s_pagepool;
891  /// page pool reference counter
892  static unsigned s_pagepoolrefcnt;
893  /// debug flag
894  static int s_debugflag;
895 
896  /// return page pool
897  static BidirMMapPipe_impl::PagePool& pagepool();
898 
899  // per-instance members
900  BidirMMapPipe_impl::Pages m_pages; ///< mmapped pages
901  Page* m_busylist; ///< linked list: busy pages (data to be read)
902  Page* m_freelist; ///< linked list: free pages
903  Page* m_dirtylist; ///< linked list: dirty pages (data to be sent)
904  int m_inpipe; ///< pipe end from which data may be read
905  int m_outpipe; ///< pipe end to which data may be written
906  int m_flags; ///< flags (e.g. end of file)
907  pid_t m_childPid; ///< pid of the child (zero if we're child)
908  pid_t m_parentPid; ///< pid of the parent
909 
910  /// cleanup routine - at exit, we want our children to get a SIGTERM...
911  static void teardownall(void);
912 
913  /// return length of a page list
914  static unsigned lenPageList(const Page* list);
915 
916  /** "feed" the busy and free lists with a list of pages
917  *
918  * @param plist linked list of pages
919  *
920  * goes through plist, puts free pages from plist onto the freelist
921  * (or sends them to the remote end if they belong there), and puts
922  * non-empty pages on plist onto the busy list
923  */
924  void feedPageLists(Page* plist);
925 
926  /// put on dirty pages list
927  void markPageDirty(Page* p);
928 
929  /// transfer bytes through the pipe (reading, writing, may block)
930  static size_type xferraw(int fd, void* addr, size_type len,
931  ssize_t (*xferfn)(int, void*, std::size_t));
932  /// transfer bytes through the pipe (reading, writing, may block)
933  static size_type xferraw(int fd, void* addr, const size_type len,
934  ssize_t (*xferfn)(int, const void*, std::size_t))
935  {
936  return xferraw(fd, addr, len,
937  reinterpret_cast<ssize_t (*)(
938  int, void*, std::size_t)>(xferfn));
939  }
940 
941  /** @brief send page(s) to the other end (may block)
942  *
943  * @param plist linked list of pages to send
944  *
945  * the implementation gathers the different write(s) whereever
946  * possible; if mmap works, this results in a single write to transfer
947  * the list of pages sent, if we need to copy things through the pipe,
948  * we have one write to transfer which pages are sent, and then one
949  * write per page.
950  */
951  void sendpages(Page* plist);
952 
953  /** @brief receive a pages from the other end (may block), queue them
954  *
955  * @returns number of pages received
956  *
957  * this is an application-level scatter read, which gets the list of
958  * pages to read from the pipe. if mmap works, it needs only one read
959  * call (to get the head of the list of pages transferred). if we need
960  * to copy pages through the pipe, we need to add one read for each
961  * empty page, and two reads for each non-empty page.
962  */
963  unsigned recvpages();
964 
965  /** @brief receive pages from other end (non-blocking)
966  *
967  * @returns number of pages received
968  *
969  * like recvpages(), but does not block if nothing is available for
970  * reading
971  */
972  unsigned recvpages_nonblock();
973 
974  /// get a busy page to read data from (may block)
975  Page* busypage();
976  /// get a dirty page to write data to (may block)
977  Page* dirtypage();
978 
979  /// close the pipe (no flush if forced)
980  int doClose(bool force, bool holdlock = false);
981  /// perform the flush
982  void doFlush(bool forcePartialPages = true);
983 #endif //_WIN32
984 };
985 
987 
988 #undef BEGIN_NAMESPACE_ROOFIT
989 #undef END_NAMESPACE_ROOFIT
990 
991 #endif // BIDIRMMAPPIPE_H
992 
993 // vim: ft=cpp:sw=4:tw=78:et
pid_t pidOtherEnd() const
return PID of the process on the other end of the pipe
std::size_t size_type
type used to represent sizes
#define BEGIN_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:19
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:69
unsigned char m_npages
length in pages
double read(const std::string &file_name)
reading
BidirMMapPipe & operator<<(const T *tptr)
write raw pointer to T to other side
BidirMMapPipe * pipe
pipe of interest
PollEntry(BidirMMapPipe *_pipe, int _events)
poll a pipe for specified events
bool eof() const
true if end-of-file
unsigned revents
events that happened (or&#39;ed bitmask)
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
bool isChild() const
return if this end of the pipe is the child end
handle class for a number of Pages
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:81
impl * m_pimpl
pointer to implementation
double T(double x)
Definition: ChebyshevPol.h:34
unsigned m_refcnt
reference counter
Page * m_freelist
linked list: free pages
bool bad() const
true on I/O error
unsigned npages() const
return number of pages accessible
don&#39;t know yet what&#39;ll work
Definition: BidirMMapPipe.h:48
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:62
namespace for implementation details of BidirMMapPipe
unsigned events
events of interest (or&#39;ed bitmask)
PollEntry(BidirMMapPipe *_pipe)
poll a pipe for all events
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:59
#define STREAMOP(TYPE)
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:71
pid_t m_childPid
pid of the child (zero if we&#39;re child)
static size_type xferraw(int fd, void *addr, const size_type len, ssize_t(*xferfn)(int, const void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
TBuffer & operator>>(TBuffer &buf, Tmpl *&obj)
Definition: TBuffer.h:370
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition: TBuffer.h:386
unsigned nPagesPerGroup() const
return number of pages per page group
class representing a chunk of pages
Definition: BidirMMapPipe.h:44
Page * m_busylist
linked list: busy pages (data to be read)
bool good() const
status of stream is good
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:90
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
Page * m_dirtylist
linked list: dirty pages (data to be sent)
PollFlags
condition flags for poll
#define END_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:20
unsigned len() const
return length of chunk
static unsigned physPgSz()
return the physical page size of the system
Definition: BidirMMapPipe.h:88
Pages()
default constructor
Page * operator[](unsigned pgno) const
return page number pageno
static unsigned s_physpgsz
system physical page size
Definition: BidirMMapPipe.h:56
void swap(Pages &other)
swap with other&#39;s contents
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
void Error(const char *location, const char *msgfmt,...)
static int debugflag()
return the current setting of the debug flag
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:70
#define None
Definition: TGWin32.h:59
BidirMMapPipe & operator=(const BidirMMapPipe &)
assignment forbidden
bool fail() const
logical failure (e.g.
static BidirMMapPipe & purge(BidirMMapPipe &pipe)
for usage a la "pipe << purge;"
Pages pop()
pop a group of pages off the free list
static void setDebugflag(int flag)
set the debug flags
int m_outpipe
pipe end to which data may be written
bool contains(const Pages &p) const
return if p is contained in this PageChunk
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:64
static BidirMMapPipe & flush(BidirMMapPipe &pipe)
for usage a la "pipe << flush;"
bool usesSocketpair() const
if BidirMMapPipe uses a socketpair for communications
bool full() const
return true if no free page groups in this chunk
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
pid_t m_parentPid
pid of the parent
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
static int s_debugflag
debug flag
bool isParent() const
return if this end of the pipe is the parent end
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
bool empty() const
return true if no used page groups in this chunk
bool closed() const
true if closed
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:47
PageChunk & operator=(const PageChunk &)
forbid assignment
Definition: BidirMMapPipe.h:83
void zap(Pages &p)
free all pages except for those pointed to by p
unsigned operator[](Page *p) const
perform page to page number mapping
for poll() interface
int rdstate() const
return flags (end of file, BidirMMapPipe closed, ...)
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:68
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:65
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
BidirMMapPipe & operator>>(T *&tptr)
read raw pointer to T from other side
BidirMMapPipe & operator<<(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
logical page size (run-time determined)
Definition: BidirMMapPipe.h:57
bool usesPipepair() const
if BidirMMapPipe uses a pipe pair for communications
int m_flags
flags (e.g. end of file)
BidirMMapPipe & operator>>(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
mmap doesn&#39;t work, have to copy back and forth
Definition: BidirMMapPipe.h:49
static unsigned pagesize()
return the logical page size
Definition: BidirMMapPipe.h:86
bool operator!() const
return true if serious error (fail/bad)