ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
BidirMMapPipe.h
Go to the documentation of this file.
1 /** @file BidirMMapPipe.h
2  *
3  * header file for BidirMMapPipe, a class which forks off a child process and
4  * serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 
10 #ifndef BIDIRMMAPPIPE_H
11 #define BIDIRMMAPPIPE_H
12 
13 #include <list>
14 #include <vector>
15 #include <cassert>
16 #include <cstring>
17 #include <unistd.h>
18 
19 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
20 #define END_NAMESPACE_ROOFIT }
21 
23 
24 /// namespace for implementation details of BidirMMapPipe
namespace BidirMMapPipe_impl {
    // forward declarations
    class BidirMMapPipeException;
    class Page;
    class PagePool;
    class Pages;

    /** @brief class representing a chunk of pages
     *
     * @author Manuel Schiller <manuel.schiller@nikhef.nl>
     * @date 2013-07-24
     *
     * allocating pages from the OS happens in chunks in order to not exhaust
     * the maximum allowed number of memory mappings per process; this class
     * takes care of such a chunk
     *
     * a page chunk allows callers to obtain or release pages in groups of
     * continuous pages of fixed size
     */
    class PageChunk {
        public:
            /// type of mmap support found
            typedef enum {
                Unknown,        ///< don't know yet what'll work
                Copy,           ///< mmap doesn't work, have to copy back and forth
                FileBacked,     ///< mmapping a temp file works
                DevZero,        ///< mmapping /dev/zero works
                Anonymous       ///< anonymous mmap works
            } MMapVariety;

        private:
            static unsigned s_pagesize; ///< system page size (run-time determined)
            /// mmap variety that works on this system
            // (declaration restored: it is the value returned by mmapVariety())
            static MMapVariety s_mmapworks;

            /// convenience typedef
            typedef BidirMMapPipeException Exception;

            void* m_begin;      ///< pointer to start of mmapped area
            void* m_end;        ///< pointer one behind end of mmapped area
            // FIXME: cannot keep freelist inline - other end may need that
            //        data, and we'd end up overwriting the page header
            std::list<void*> m_freelist; ///< free pages list
            PagePool* m_parent; ///< parent page pool
            unsigned m_nPgPerGrp; ///< number of pages per group
            unsigned m_nUsedGrp; ///< number of used page groups

            /// determine page size at run time
            static unsigned getPageSize();

            /// mmap pages, len is length of mmapped area in bytes
            static void* dommap(unsigned len);
            /// munmap pages p, len is length of mmapped area in bytes
            static void domunmap(void* p, unsigned len);
            /// forbid copying
            PageChunk(const PageChunk&) {}
            /// forbid assignment
            PageChunk& operator=(const PageChunk&) { return *this; }
        public:
            /// return the page size of the system
            static unsigned pagesize() { return s_pagesize; }
            /// return mmap variety support found
            static MMapVariety mmapVariety() { return s_mmapworks; }

            /// constructor
            PageChunk(PagePool* parent, unsigned length, unsigned nPgPerGroup);

            /// destructor
            ~PageChunk();

            /// return if p is contained in this PageChunk
            bool contains(const Pages& p) const;

            /// pop a group of pages off the free list
            Pages pop();

            /// push a group of pages onto the free list
            void push(const Pages& p);

            /// return length of chunk in bytes (distance from m_begin to m_end)
            unsigned len() const
            {
                return reinterpret_cast<unsigned char*>(m_end) -
                    reinterpret_cast<unsigned char*>(m_begin);
            }
            /// return number of pages per page group
            unsigned nPagesPerGroup() const { return m_nPgPerGrp; }

            /// return true if no used page groups in this chunk
            bool empty() const { return !m_nUsedGrp; }

            /// return true if no free page groups in this chunk
            bool full() const { return m_freelist.empty(); }

            /// free all pages except for those pointed to by p
            void zap(Pages& p);
    };

    /** @brief handle class for a number of Pages
     *
     * @author Manuel Schiller <manuel.schiller@nikhef.nl>
     * @date 2013-07-24
     *
     * the associated pages are continuous in memory
     */
    class Pages {
        private:
            /// implementation
            typedef struct {
                PageChunk *m_parent; ///< pointer to parent PageChunk
                Page* m_pages; ///< pointer to first page
                unsigned m_refcnt; ///< reference counter
                unsigned char m_npages; ///< length in pages
            } impl;
        public:
            /// default constructor (creates a handle without implementation)
            Pages() : m_pimpl(0) { }

            /// destructor
            ~Pages();

            /** @brief copy constructor
             *
             * copy Pages handle to new object - old object loses ownership,
             * and becomes a dangling handle
             */
            Pages(const Pages& other);

            /** @brief assignment operator
             *
             * assign Pages handle to new object - old object loses ownership,
             * and becomes a dangling handle
             */
            Pages& operator=(const Pages& other);

            /// return page size
            static unsigned pagesize();

            /// return number of pages accessible
            // dereferences m_pimpl unchecked: do not call on a
            // default-constructed handle
            unsigned npages() const { return m_pimpl->m_npages; }

            /// return page number pgno
            Page* page(unsigned pgno) const;

            /// return page number pgno
            Page* operator[](unsigned pgno) const { return page(pgno); }

            /// perform page to page number mapping
            unsigned pageno(Page* p) const;

            /// perform page to page number mapping
            unsigned operator[](Page* p) const { return pageno(p); }

            /// swap with other's contents
            void swap(Pages& other)
            {
                impl* tmp = other.m_pimpl;
                other.m_pimpl = m_pimpl;
                m_pimpl = tmp;
            }

        private:
            /// the page chunk is our friend - it's allowed to construct Pages
            // NOTE(review): this declaration was missing from the listing;
            // PageChunk is assumed because PageChunk::pop() returns Pages and
            // the private constructor takes a PageChunk* - confirm upstream.
            friend class BidirMMapPipe_impl::PageChunk;

            /// pointer to implementation
            impl* m_pimpl;

            /// constructor
            Pages(PageChunk* parent, Page* pages, unsigned npg);
    };
}
197 
198 /** @brief BidirMMapPipe creates a bidirectional channel between the current
199  * process and a child it forks.
200  *
201  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
202  * @date 2013-07-07
203  *
204  * This class creates a bidirectional channel between this process and a child
205  * it creates with fork().
206  *
207  * The channel is comprised of a small shared pool of buffer memory mmapped into
208  * both process spaces, and two pipes to synchronise the exchange of data. The
209  * idea behind using the pipes at all is to have some primitive which we can
210  * block on without having to worry about atomic operations or polling, leaving
211  * these tasks to the OS. In case the anonymous mmap cannot be performed on the
212  * OS the code is running on (for whatever reason), the code falls back to
213  * mmapping /dev/zero, mmapping a temporary file, or (if those all fail), a
214  * dynamically allocated buffer which is then transmitted through the pipe(s),
215  * a slightly slower alternative (because the data is copied more often).
216  *
217  * The channel supports five major operations: read(), write(), flush(),
218  * purge() and close(). Reading and writing may block until the required buffer
219  * space is available. Writes may queue up data to be sent to the other end
220  * until either enough pages are full, or the user calls flush which forces
221  * any unsent buffers to be sent to the other end. flush forces any data that
222  * is to be sent to be sent. purge discards any buffered data waiting to be
223  * read and/or sent. Closing the channel on the child returns zero, closing it
224  * on the parent returns the child's exit status.
225  *
226  * The class also provides operator<< and operator>> for C++-style I/O for
227  * basic data types (bool, char, short, int, long, long long, float, double
228  * and their unsigned counterparts). Data is transmitted binary (i.e. no
229  * formatting to strings like std::cout does). There are also overloads to
230  * support C-style zero terminated strings and std::string. In terms of
231  * performance, the former is to be preferred.
232  *
233  * If the caller needs to multiplex input and output to/from several pipes, the
234  * class provides the poll() method which allows to block until an event occurs
235  * on any of the polled pipes.
236  *
237  * After the BidirMMapPipe is closed, no further operations may be performed on
238  * that object, save for the destructor which may still be called.
239  *
240  * If the BidirMMapPipe has not properly been closed, the destructor will call
241  * close. However, the exit code of the child is lost in that case.
242  *
243  * Closing the object causes the mmapped memory to be unmapped and the two
244  * pipes to be closed. We also install an atexit handler in the process of
245  * creating BidirMMapPipes. This ensures that when the current process
246  * terminates, a SIGTERM signal is sent to the child processes created for all
247  * unclosed pipes to avoid leaving zombie processes in the OS's process table.
248  *
249  * BidirMMapPipe creation, closing and destruction are thread safe. If the
250  * BidirMMapPipe is used in more than one thread, the other operations have to
251  * be protected with a mutex (or something similar), though.
252  *
253  * End of file (other end closed its pipe, or died) is indicated with the eof()
254  * method, serious I/O errors set flags (bad(), fail()), and also throw
255  * exceptions. For normal read/write operations, they can be suppressed (i.e.
256  * error reporting only using flags) with a constructor argument.
257  *
258  * Technicalities:
259  * - there is a pool of mmapped pages, half the pages are allocated to the
260  * parent process, half to the child
261  * - when one side has accumulated enough data (or a flush forces dirty pages
262  * out to the other end), it sends these pages to the other end by writing a
263  * byte containing the page number into the pipe
264  * - the other end (which has the pages mmapped, too) reads the page number(s)
265  * and puts the corresponding pages on its busy list
266  * - as the other end reads, it frees busy pages, and eventually tries to put
267  * them on its free list; if a page belongs to the other end of the
268  * connection, it is sent back
269  * - lists of pages are sent across the pipe, not individual pages, in order
270  * to minimise the number of read/write operations needed
271  * - when mmap works properly, only one byte containing the page number of
272  * the page list head is sent back and forth; the contents of that page
273  * allow to access the rest of the page list sent, and page headers on the
274  * list tell the receiving end if the page is free or has to be added to the
275  * busy list
276  * - when mmap does not work, we transfer one byte to indicate the head of the
277  * page list sent, and for each page on the list of sent pages, the page
278  * header and the page payload is sent (if the page is free, we only
279  * transmit the page header, and we never transmit more payload than
280  * the page actually contains)
281  * - in the child, all open BidirMMapPipes but the current one are closed. this
282  * is done for two reasons: first, to conserve file descriptors and address
283  * space. second, if more than one process is meant to use such a
284  * BidirMMapPipe, synchronisation issues arise which can lead to bugs that
285  * are hard to find and understand. it's much better to come up with a design
286  * which does not need pipes to be shared among more than two processes.
287  *
288  * Here is a trivial example of a parent and a child talking to each other over
289  * a BidirMMapPipe:
290  * @code
291  * #include <string>
292  * #include <iostream>
293  * #include <cstdlib>
294  *
295  * #include "BidirMMapPipe.h"
296  *
297  * int simplechild(BidirMMapPipe& pipe)
298  * {
299  * // child does an echo loop
300  * while (pipe.good() && !pipe.eof()) {
301  * // read a string
302  * std::string str;
303  * pipe >> str;
304  * if (!pipe) return -1;
305  * if (pipe.eof()) break;
306  * // check if parent wants us to shut down
307  * if (!str.empty()) {
308  * std::cout << "[CHILD] : read: " << str << std::endl;
309  * str = "... early in the morning?";
310  * }
311  * pipe << str << BidirMMapPipe::flush;
312  * if (str.empty()) break;
313  * if (!pipe) return -1;
314  * std::cout << "[CHILD] : wrote: " << str << std::endl;
315  * }
316  * // send shutdown request acknowledged
317  * pipe << "" << BidirMMapPipe::flush;
318  *
319  * pipe.close();
320  * return 0;
321  * }
322  *
323  * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
324  * {
325  * BidirMMapPipe *p = new BidirMMapPipe();
326  * if (p->isChild()) {
327  * int retVal = childexec(*p);
328  * delete p;
329  * std::exit(retVal);
330  * }
331  * return p;
332  * }
333  *
334  * int main()
335  * {
336  * std::cout << "[PARENT]: simple challenge-response test, one child:" <<
337  * std::endl;
338  * BidirMMapPipe* pipe = spawnChild(simplechild);
339  * for (int i = 0; i < 5; ++i) {
340  * std::string str("What shall we do with a drunken sailor...");
341  * *pipe << str << BidirMMapPipe::flush;
342  * if (!*pipe) return -1;
343  * std::cout << "[PARENT]: wrote: " << str << std::endl;
344  * *pipe >> str;
345  * if (!*pipe) return -1;
346  * std::cout << "[PARENT]: read: " << str << std::endl;
347  * }
348  * // ask child to shut down
349  * *pipe << "" << BidirMMapPipe::flush;
350  * // wait for it to see the shutdown request
351  * std::string s;
352  * *pipe >> s;
353  * std::cout << "[PARENT]: exit status of child: " << pipe->close() <<
354  * std::endl;
355  * delete pipe;
356  * return 0;
357  * }
358  * @endcode
359  *
360  * When designing your own protocols to use over the pipe, there are a few
361  * things to bear in mind:
362  * - Do as http does: When building a request, send all the options and
363  * properties of that request with the request itself in a single go (one
364  * flush). Then, the server has everything it needs, and hopefully, it'll
365  * shut up for a while and let the client do something useful in the
366  * meantime... The same goes when the server replies to the request: include
367  * everything there is to know about the result of the request in the reply.
368  * - The expensive operation should be the request that is made, all other
369  * operations should somehow be formulated as options or properties to that
370  * request.
371  * - Include a shutdown handshake in whatever protocol you send over the
372  * pipe. That way, you can shut things down in a controlled way. Otherwise,
373  * and depending on your OS's scheduling quirks, you may catch a SIGPIPE if
374  * one end closes its pipe while the other is still trying to read.
375  */
377 #ifndef _WIN32
378  public:
379  /// type used to represent sizes
381  /// convenience typedef for BidirMMapPipeException
382  typedef BidirMMapPipe_impl::BidirMMapPipeException Exception;
383  /// flag bits for partial C++ iostream compatibility
384  enum {
385  eofbit = 1, ///< end of file reached
386  failbit = 2, ///< logical failure (e.g. pipe closed)
387  rderrbit = 4, ///< read error
388  wrerrbit = 8, ///< write error
389  badbit = rderrbit | wrerrbit, ///< general I/O error
390  exceptionsbit = 16 ///< error reporting with exceptions
391  };
392 
393  /** @brief constructor (forks!)
394  *
395  * Creates a bidirectional communications channel between this process
396  * and a child the constructor forks. On return from the constructor,
397  * isParent() and isChild() can be used to tell the parent end from the
398  * child end of the pipe. In the child, all other open BidirMMapPipes
399  * are closed.
400  *
401  * @param useExceptions read()/write() error reporting also done using
402  * exceptions
403  * @param useSocketpair use a socketpair instead of a pair of pipes
404  *
405  * Normally, exceptions are thrown for all serious I/O errors (apart
406  * from end of file). Setting useExceptions to false will force the
407  * read() and write() methods to only report serious I/O errors using
408  * flags.
409  *
410  * When useSocketpair is true, use a pair of Unix domain sockets
411  * created using socketpair instead of a pair of pipes. The advantage is
412  * that only one pair of file descriptors is needed instead of two
413  * pairs which are needed for the pipe pair. Performance should be very
414  * similar on most platforms, especially if mmap works, since only
415  * very little data is sent through the pipe(s)/socketpair.
416  */
417  BidirMMapPipe(bool useExceptions = true, bool useSocketpair = false);
418 
419  /** @brief destructor
420  *
421  * closes this end of pipe
422  */
423  ~BidirMMapPipe();
424 
425  /** @brief read from pipe
426  *
427  * @param addr where to put read data
428  * @param sz size of data to read (in bytes)
429  * @returns size of data read, or 0 in case of end-of-file
430  *
431  * read may block until data from other end is available. It will
432  * return 0 if the other end closed the pipe.
433  */
434  size_type read(void* addr, size_type sz);
435 
436  /** @brief write to pipe
437  *
438  * @param addr where to get data to write from
439  * @param sz size of data to write (in bytes)
440  * @returns size of data written, or 0 in case of end-of-file
441  *
442  * write may block until data can be written to other end (depends a
443  * bit on available buffer space). It will return 0 if the other end
444  * closed the pipe. The data is queued to be written on the next
445  * convenient occasion, or it can be forced out with flush().
446  */
447  size_type write(const void* addr, size_type sz);
448 
449  /** @brief flush buffers with unwritten data
450  *
451  * This forces unwritten data to be written to the other end. The call
452  * will block until this has been done (or the attempt failed with an
453  * error).
454  */
455  void flush();
456 
457  /** @brief purge buffered data waiting to be read and/or written
458  *
459  * Discards all internal buffers.
460  */
461  void purge();
462 
463  /** @brief number of bytes that can be read without blocking
464  *
465  * @returns number of bytes that can be read without blocking
466  */
468 
469  /** @brief number of bytes that can be written without blocking
470  *
471  * @returns number of bytes that can be written without blocking
472  */
474 
475  /** @brief flush buffers, close pipe
476  *
477  * Flush buffers, discard unread data, closes the pipe. If the pipe is
478  * in the parent process, it waits for the child.
479  *
480  * @returns exit code of child process in parent, zero in child
481  */
482  int close();
483 
484  /** @brief return PID of the process on the other end of the pipe
485  *
486  * @returns PID of the process running on the remote end
487  */
488  pid_t pidOtherEnd() const
489  { return isChild() ? m_parentPid : m_childPid; }
490 
/// condition flags for poll
///
/// Each basic condition occupies its own bit so that conditions can be
/// or'ed together and tested independently. WriteEndOfFile used to share
/// the value 64 with ReadInvalid, which made an invalid read end
/// indistinguishable from end-of-file on the write end; it now uses the
/// previously unused bit 16. All other values are unchanged.
enum PollFlags {
    None = 0,           ///< nothing special on this pipe
    Readable = 1,       ///< pipe has data for reading
    Writable = 2,       ///< pipe can be written to
    ReadError = 4,      ///< pipe error read end
    WriteError = 8,     ///< pipe error Write end
    Error = ReadError | WriteError, ///< pipe error
    WriteEndOfFile = 16,///< write pipe in end-of-file state
    ReadEndOfFile = 32, ///< read pipe in end-of-file state
    EndOfFile = ReadEndOfFile | WriteEndOfFile, ///< end of file
    ReadInvalid = 64,   ///< read end of pipe invalid
    WriteInvalid = 128, ///< write end of pipe invalid
    Invalid = ReadInvalid | WriteInvalid ///< invalid pipe
};
506 
507  /// for poll() interface
508  class PollEntry {
509  public:
510  BidirMMapPipe* pipe; ///< pipe of interest
511  unsigned events; ///< events of interest (or'ed bitmask)
512  unsigned revents; ///< events that happened (or'ed bitmask)
513  /// poll a pipe for all events
515  pipe(_pipe), events(None), revents(None) { }
516  /// poll a pipe for specified events
517  PollEntry(BidirMMapPipe* _pipe, int _events) :
518  pipe(_pipe), events(_events), revents(None) { }
519  };
520  /// convenience typedef for poll() interface
521  typedef std::vector<PollEntry> PollVector;
522 
523  /** @brief poll a set of pipes for events (ready to read from, ready to
524  * write to, error)
525  *
526  * @param pipes set of pipes to check
527  * @param timeout timeout in milliseconds
528  * @returns positive number: number of pipes which have
529  * status changes, 0: timeout, or no pipes with
530  * status changed, -1 on error
531  *
532  * Timeout can be zero (check for specified events, and return), finite
533  * (wait at most timeout milliseconds before returning), or -1
534  * (infinite). The poll method returns when the timeout has elapsed,
535  * or if an event occurs on one of the pipes being polled, whichever
536  * happens earlier.
537  *
538  * Pipes is a vector of one or more PollEntries, which each list a pipe
539  * and events to poll for. If events is left empty (zero), all
540  * conditions are polled for, otherwise only the indicated ones. On
541  * return, the revents fields contain the events that occurred for each
542  * pipe; Error, EndOfFile or Invalid events are always set,
543  * regardless of whether they were in the set of requested events.
544  *
545  * poll may block slightly longer than specified by timeout due to OS
546  * timer granularity and OS scheduling. Due to its implementation, the
547  * poll call can also return early if the remote end of the pipe sends
548  * a free page while polling (which is put on that pipe's freelist),
549  * while that pipe is polled for e.g. Reading. The status of the pipe is
550  * indicated correctly in revents, and the caller can simply poll
551  * again. (The reason this is done this way is because it helps to
552  * replenish the pool of free pages and queue busy pages without
553  * blocking.)
554  *
555  * Here's a piece of example code waiting on two pipes; if they become
556  * readable they are read:
557  * @code
558  * #include <unistd.h>
559  * #include <cstdlib>
560  * #include <string>
561  * #include <sstream>
562  * #include <iostream>
563  *
564  * #include "BidirMMapPipe.h"
565  *
566  * // what to execute in the child
567  * int randomchild(BidirMMapPipe& pipe)
568  * {
569  * ::srand48(::getpid());
570  * for (int i = 0; i < 5; ++i) {
571  * // sleep a random time between 0 and .9 seconds
572  * ::usleep(int(1e6 * ::drand48()));
573  * std::ostringstream buf;
574  * buf << "child pid " << ::getpid() << " sends message " << i;
575  * std::cout << "[CHILD] : " << buf.str() << std::endl;
576  * pipe << buf.str() << BidirMMapPipe::flush;
577  * if (!pipe) return -1;
578  * if (pipe.eof()) break;
579  * }
580  * // tell parent we're done
581  * pipe << "" << BidirMMapPipe::flush;
582  * // wait for parent to acknowledge
583  * std::string s;
584  * pipe >> s;
585  * pipe.close();
586  * return 0;
587  * }
588  *
589  * // function to spawn a child
590  * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
591  * {
592  * BidirMMapPipe *p = new BidirMMapPipe();
593  * if (p->isChild()) {
594  * int retVal = childexec(*p);
595  * delete p;
596  * std::exit(retVal);
597  * }
598  * return p;
599  * }
600  *
601  * int main()
602  * {
603  * typedef BidirMMapPipe::PollEntry PollEntry;
604  * // poll data structure
605  * BidirMMapPipe::PollVector pipes;
606  * pipes.reserve(3);
607  * // spawn children
608  * for (int i = 0; i < 3; ++i) {
609  * pipes.push_back(PollEntry(spawnChild(randomchild),
610  * BidirMMapPipe::Readable));
611  * }
612  * // while at least some children alive
613  * while (!pipes.empty()) {
614  * // poll, wait until status change (infinite timeout)
615  * int npipes = BidirMMapPipe::poll(pipes, -1);
616  * // scan for pipes with changed status
617  * for (std::vector<PollEntry>::iterator it = pipes.begin();
618  * npipes && pipes.end() != it; ) {
619  * if (!it->revents) {
620  * // unchanged, next one
621  * ++it;
622  * continue;
623  * }
624  * --npipes; // maybe we can stop early...
625  * // read from pipes which are readable
626  * if (it->revents & BidirMMapPipe::Readable) {
627  * std::string s;
628  * *(it->pipe) >> s;
629  * if (!s.empty()) {
630  * std::cout << "[PARENT]: Read from pipe " <<
631  * it->pipe << ": " << s << std::endl;
632  * ++it;
633  * continue;
634  * } else {
635  * // child is shutting down...
636  * *(it->pipe) << "" << BidirMMapPipe::flush;
637  * goto childcloses;
638  * }
639  * }
640  * // retire pipes with error or end-of-file condition
641  * if (it->revents & (BidirMMapPipe::Error |
642  * BidirMMapPipe::EndOfFile |
643  * BidirMMapPipe::Invalid)) {
644  * std::cout << "[PARENT]: Error on pipe " <<
645  * it->pipe << " revents " << it->revents <<
646  * std::endl;
647  * childcloses:
648  * const int retVal = it->pipe->close();
649  * std::cout << "[PARENT]:\tchild exit status: " << retVal << std::endl;
650  * if (retVal) return retVal;
651  * delete it->pipe;
652  * it = pipes.erase(it);
653  * continue;
654  * }
655  * }
656  * }
657  * return 0;
658  * }
659  * @endcode
660  */
661  static int poll(PollVector& pipes, int timeout);
662 
663  /** @brief return if this end of the pipe is the parent end
664  *
665  * @returns true if parent end of pipe
666  */
667  bool isParent() const { return m_childPid; }
668 
669  /** @brief return if this end of the pipe is the child end
670  *
671  * @returns true if child end of pipe
672  */
673  bool isChild() const { return !m_childPid; }
674 
675  /** @brief if BidirMMapPipe uses a socketpair for communications
676  *
677  * @returns true if BidirMMapPipe uses a socketpair for communications
678  */
679  bool usesSocketpair() const { return m_inpipe == m_outpipe; }
680 
681  /** @brief if BidirMMapPipe uses a pipe pair for communications
682  *
683  * @returns true if BidirMMapPipe uses a pipe pair for communications
684  */
685  bool usesPipepair() const { return m_inpipe != m_outpipe; }
686 
687  /** @brief return flags (end of file, BidirMMapPipe closed, ...)
688  *
689  * @returns flags (end of file, BidirMMapPipe closed, ...)
690  */
691  int rdstate() const { return m_flags; }
692 
693  /** @brief true if end-of-file
694  *
695  * @returns true if end-of-file
696  */
697  bool eof() const { return m_flags & eofbit; }
698 
699  /** @brief logical failure (e.g. I/O on closed BidirMMapPipe)
700  *
701  * @returns true in case of grave logical error (I/O on closed pipe,...)
702  */
703  bool fail() const { return m_flags & failbit; }
704 
705  /** @brief true on I/O error
706  *
707  * @returns true on I/O error
708  */
709  bool bad() const { return m_flags & badbit; }
710 
711  /** @brief status of stream is good
712  *
713  * @returns true if pipe is good (no errors, eof, ...)
714  */
715  bool good() const { return !(m_flags & (eofbit | failbit | badbit)); }
716 
717  /** @brief true if closed
718  *
719  * @returns true if stream is closed
720  */
721  bool closed() const { return m_flags & failbit; }
722 
723  /** @brief return true if not serious error (fail/bad)
724  *
725  * @returns true if stream is does not have serious error (fail/bad)
726  *
727  * (if EOF, this is still true)
728  */
729  operator bool() const { return !fail() && !bad(); }
730 
731  /** @brief return true if serious error (fail/bad)
732  *
733  * @returns true if stream has a serious error (fail/bad)
734  */
735  bool operator!() const { return fail() || bad(); }
736 
737 #ifdef STREAMOP
738 #undef STREAMOP
739 #endif
740 #define STREAMOP(TYPE) \
741  BidirMMapPipe& operator<<(const TYPE& val) \
742  { write(&val, sizeof(TYPE)); return *this; } \
743  BidirMMapPipe& operator>>(TYPE& val) \
744  { read(&val, sizeof(TYPE)); return *this; }
745  STREAMOP(bool); ///< C++ style stream operators for bool
746  STREAMOP(char); ///< C++ style stream operators for char
747  STREAMOP(short); ///< C++ style stream operators for short
748  STREAMOP(int); ///< C++ style stream operators for int
749  STREAMOP(long); ///< C++ style stream operators for long
750  STREAMOP(long long); ///< C++ style stream operators for long long
751  STREAMOP(unsigned char); ///< C++ style stream operators for unsigned char
752  STREAMOP(unsigned short); ///< C++ style stream operators for unsigned short
753  STREAMOP(unsigned int); ///< C++ style stream operators for unsigned int
754  STREAMOP(unsigned long); ///< C++ style stream operators for unsigned long
755  STREAMOP(unsigned long long); ///< C++ style stream operators for unsigned long long
756  STREAMOP(float); ///< C++ style stream operators for float
757  STREAMOP(double); ///< C++ style stream operators for double
758 #undef STREAMOP
759 
760  /** @brief write a C-style string
761  *
762  * @param str C-style string
763  * @returns pipe written to
764  */
765  BidirMMapPipe& operator<<(const char* str);
766 
767  /** @brief read a C-style string
768  *
769  * @param str pointer to string (space allocated with malloc!)
770  * @returns pipe read from
771  *
772  * since this is for C-style strings, we use malloc/realloc/free for
773  * strings. passing in a NULL pointer is valid here, and the routine
774  * will use realloc to allocate a chunk of memory of the right size.
775  */
776  BidirMMapPipe& operator>>(char* (&str));
777 
778  /** @brief write a std::string object
779  *
780  * @param str string to write
781  * @returns pipe written to
782  */
783  BidirMMapPipe& operator<<(const std::string& str);
784 
785  /** @brief read a std::string object
786  *
787  * @param str string to be read
788  * @returns pipe read from
789  */
790  BidirMMapPipe& operator>>(std::string& str);
791 
792  /** @brief write raw pointer to T to other side
793  *
794  * NOTE: This will not write the pointee! Only the value of the
795  * pointer is transferred.
796  *
797  * @param tptr pointer to be written
798  * @returns pipe written to
799  */
800  template<class T> BidirMMapPipe& operator<<(const T* tptr)
801  { write(&tptr, sizeof(tptr)); return *this; }
802 
803  /** @brief read raw pointer to T from other side
804  *
805  * NOTE: This will not read the pointee! Only the value of the
806  * pointer is transferred.
807  *
808  * @param tptr pointer to be read
809  * @returns pipe read from
810  */
811  template<class T> BidirMMapPipe& operator>>(T* &tptr)
812  { read(&tptr, sizeof(tptr)); return *this; }
813 
814  /** @brief I/O manipulator support
815  *
816  * @param f manipulator
817  * @returns pipe with manipulator applied
818  *
819  * example:
820  * @code
821  * pipe << BidirMMapPipe::flush;
822  * @endcode
823  */
825  { return manip(*this); }
826 
827  /** @brief I/O manipulator support
828  *
829  * @param f manipulator
830  * @returns pipe with manipulator applied
831  *
832  * example:
833  * @code
834  * pipe >> BidirMMapPipe::purge;
835  * @endcode
836  */
838  { return manip(*this); }
839 
840  /// for usage a la "pipe << flush;"
841  static BidirMMapPipe& flush(BidirMMapPipe& pipe) { pipe.flush(); return pipe; }
842  /// for usage a la "pipe << purge;"
843  static BidirMMapPipe& purge(BidirMMapPipe& pipe) { pipe.purge(); return pipe; }
844 
845  private:
846  /// copy-construction forbidden
848  /// assignment forbidden
849  BidirMMapPipe& operator=(const BidirMMapPipe&) { return *this; }
850 
851  /// page is our friend
853  /// convenience typedef for Page
855 
856  /// tuning constants (all values are counts of pages)
857  enum {
858  // TotPages = 16 gives each end (parent and child) 8 pages, i.e. a
859  // 32k buffer at 4k page size; if your average message to send is
860  // larger than this, consider raising the value (max 256)
861  TotPages = 16, ///< pages shared (child + parent)
862 
863  PagesPerEnd = TotPages / 2, ///< pages per pipe end (8 with TotPages = 16)
864 
865  // if FlushThresh pages are filled, the code forces a flush; 3/4
866  // of the pages available seems to work quite well
867  FlushThresh = (3 * PagesPerEnd) / 4 ///< flush threshold (6 with PagesPerEnd = 8)
868  };
869 
870  // per-class members (static: shared by all BidirMMapPipe instances)
871  static pthread_mutex_t s_openpipesmutex; ///< protects s_openpipes (NOTE(review): pthread_mutex_t needs <pthread.h> - confirm it is included)
872  /// list of open BidirMMapPipes
873  static std::list<BidirMMapPipe*> s_openpipes;
874  /// pool of mmapped pages
875  static BidirMMapPipe_impl::PagePool* s_pagepool;
876  /// page pool reference counter
877  static unsigned s_pagepoolrefcnt;
878 
879  /// return the page pool (by reference)
880  static BidirMMapPipe_impl::PagePool& pagepool();
881 
882  // per-instance members: pages and page lists, pipe ends, state flags, process ids
883  BidirMMapPipe_impl::Pages m_pages; ///< mmapped pages
884  Page* m_busylist; ///< linked list: busy pages (data to be read)
885  Page* m_freelist; ///< linked list: free pages
886  Page* m_dirtylist; ///< linked list: dirty pages (data to be sent)
887  int m_inpipe; ///< pipe end from which data may be read (presumably a file descriptor - TODO confirm)
888  int m_outpipe; ///< pipe end to which data may be written (presumably a file descriptor - TODO confirm)
889  int m_flags; ///< flags (e.g. end of file)
890  pid_t m_childPid; ///< pid of the child (zero if we're child)
891  pid_t m_parentPid; ///< pid of the parent
892 
893  /// cleanup routine - at exit, we want our children to get a SIGTERM...
894  static void teardownall(void);
895 
896  /// return length of the page list starting at list
897  static unsigned lenPageList(const Page* list);
898 
899  /** "feed" the busy and free lists with a list of pages
900  *
901  * @param plist linked list of pages
902  *
903  * goes through plist, puts free pages from plist onto the freelist
904  * (or sends them to the remote end if they belong there), and puts
905  * non-empty pages from plist onto the busy list
906  */
907  void feedPageLists(Page* plist);
908 
909  /// put page p on the dirty pages list
910  void markPageDirty(Page* p);
911 
912  /// transfer bytes through the pipe (reading, writing, may block)
913  static size_type xferraw(int fd, void* addr, size_type len,
914  ssize_t (*xferfn)(int, void*, std::size_t));
915  /// same transfer for functions taking a const buffer; forwards to the overload above
916  static size_type xferraw(int fd, void* addr, const size_type len,
917  ssize_t (*xferfn)(int, const void*, std::size_t))
918  {
919  typedef ssize_t (*xfer_fn_t)(int, void*, std::size_t);
920  const xfer_fn_t plainfn = reinterpret_cast<xfer_fn_t>(xferfn);
921  return xferraw(fd, addr, len, plainfn);
922  }
923 
924  /** @brief send page(s) to the other end (may block)
925  *
926  * @param plist linked list of pages to send
927  *
928  * the implementation gathers the different write(s) wherever
929  * possible; if mmap works, this results in a single write to transfer
930  * the list of pages sent; if we need to copy things through the pipe,
931  * we have one write to transfer which pages are sent, and then one
932  * write per page.
933  */
934  void sendpages(Page* plist);
935 
936  /** @brief receive pages from the other end (may block), queue them
937  *
938  * @returns number of pages received
939  *
940  * this is an application-level scatter read, which gets the list of
941  * pages to read from the pipe. If mmap works, it needs only one read
942  * call (to get the head of the list of pages transferred). If we need
943  * to copy pages through the pipe, we need to add one read for each
944  * empty page, and two reads for each non-empty page.
945  */
946  unsigned recvpages();
947 
948  /** @brief receive pages from the other end (non-blocking)
949  *
950  * @returns number of pages received
951  *
952  * like recvpages(), but does not block when nothing is available
953  * for reading
954  */
955  unsigned recvpages_nonblock();
956 
957  /// get a busy page to read data from (may block)
958  Page* busypage();
959  /// get a dirty page to write data to (may block)
960  Page* dirtypage();
961 
962  /// close the pipe (no flush if force is true); NOTE(review): meaning of holdlock is not visible here - confirm against the implementation
963  int doClose(bool force, bool holdlock = false);
964  /// perform the flush; forcePartialPages defaults to true (presumably also flushes partially filled pages - TODO confirm)
965  void doFlush(bool forcePartialPages = true);
966 #endif //_WIN32
967 };
968 
970 
971 #undef BEGIN_NAMESPACE_ROOFIT
972 #undef END_NAMESPACE_ROOFIT
973 
974 #endif // BIDIRMMAPPIPE_H
975 
976 // vim: ft=cpp:sw=4:tw=78
pid_t pidOtherEnd() const
return PID of the process on the other end of the pipe
std::size_t size_type
type used to represent sizes
#define BEGIN_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:19
PagePool * m_parent
parent page pool
Definition: BidirMMapPipe.h:68
unsigned char m_npages
length in pages
static size_type xferraw(int fd, void *addr, size_type len, ssize_t(*xferfn)(int, void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
BidirMMapPipe & operator<<(const T *tptr)
write raw pointer to T to other side
BidirMMapPipe * pipe
pipe of interest
pipe error Write end
PollEntry(BidirMMapPipe *_pipe, int _events)
poll a pipe for specified events
bool eof() const
true if end-of-file
unsigned revents
events that happened (or'ed bitmask)
size_type write(const void *addr, size_type sz)
write to pipe
read pipe in end-of-file state
bool isChild() const
return if this end of the pipe is the child end
handle class for a number of Pages
PageChunk(const PageChunk &)
forbid copying
Definition: BidirMMapPipe.h:80
impl * m_pimpl
pointer to implementation
unsigned m_refcnt
reference counter
Page * m_freelist
linked list: free pages
bool bad() const
true on I/O error
write pipe in end-of-file state
unsigned npages() const
return number of pages accessible
don't know yet what'll work
Definition: BidirMMapPipe.h:48
BidirMMapPipeException Exception
convenience typedef
Definition: BidirMMapPipe.h:61
unsigned events
events of interest (or'ed bitmask)
BidirMMapPipe & operator<<(const char *str)
write a C-style string
void doFlush(bool forcePartialPages=true)
perform the flush
read end of pipe invalid
PollEntry(BidirMMapPipe *_pipe)
poll a pipe for all events
static MMapVariety s_mmapworks
mmap variety that works on this system
Definition: BidirMMapPipe.h:58
unsigned m_nUsedGrp
number of used page groups
Definition: BidirMMapPipe.h:70
size_t
Definition: TBuffer.cxx:28
pid_t m_childPid
pid of the child (zero if we're child)
static size_type xferraw(int fd, void *addr, const size_type len, ssize_t(*xferfn)(int, const void *, std::size_t))
transfer bytes through the pipe (reading, writing, may block)
pipe error read end
void markPageDirty(Page *p)
put on dirty pages list
~BidirMMapPipe()
destructor
unsigned nPagesPerGroup() const
return number of pages per page group
TTree * T
Pages & operator=(const Pages &other)
assignment operator
class representing a chunk of pages
Definition: BidirMMapPipe.h:44
Page * m_busylist
linked list: busy pages (data to be read)
write end of pipe invalid
bool good() const
status of stream is good
size_type read(void *addr, size_type sz)
read from pipe
pages shared (child + parent)
static MMapVariety mmapVariety()
return mmap variety support found
Definition: BidirMMapPipe.h:87
void flush()
flush buffers with unwritten data
BidirMMapPipe_impl::Page Page
convenience typedef for Page
static unsigned pagesize()
return page size
Page * m_dirtylist
linked list: dirty pages (data to be sent)
PollFlags
condition flags for poll
#define END_NAMESPACE_ROOFIT
Definition: BidirMMapPipe.h:20
void sendpages(Page *plist)
send page(s) to the other end (may block)
unsigned len() const
return length of chunk
Page * page(unsigned pgno) const
return page number pgno
pipe can be written to
Page * busypage()
get a busy page to read data from (may block)
size_type bytesReadableNonBlocking()
number of bytes that can be read without blocking
Pages()
default constructor
int close()
flush buffers, close pipe
Page * operator[](unsigned pgno) const
return page number pgno
void swap(Pages &other)
swap with other's contents
Page * dirtypage()
get a dirty page to write data to (may block)
void purge()
purge buffered data waiting to be read and/or written
std::vector< PollEntry > PollVector
convenience typedef for poll() interface
unsigned m_nPgPerGrp
number of pages per group
Definition: BidirMMapPipe.h:69
static unsigned lenPageList(const Page *list)
return length of a page list
BidirMMapPipe & operator=(const BidirMMapPipe &)
assignment forbidden
bool fail() const
logical failure (e.g. pipe closed)
static BidirMMapPipe & purge(BidirMMapPipe &pipe)
for usage a la "pipe << purge;"
Pages pop()
pop a group of pages off the free list
general I/O error
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
static BidirMMapPipe_impl::PagePool & pagepool()
return page pool
int m_outpipe
pipe end to which data may be written
bool contains(const Pages &p) const
return if p is contained in this PageChunk
logical failure (e.g. pipe closed)
void * m_begin
pointer to start of mmapped area
Definition: BidirMMapPipe.h:63
static BidirMMapPipe & flush(BidirMMapPipe &pipe)
for usage a la "pipe << flush;"
bool usesSocketpair() const
if BidirMMapPipe uses a socketpair for communications
unsigned recvpages_nonblock()
receive pages from other end (non-blocking)
int doClose(bool force, bool holdlock=false)
close the pipe (no flush if forced)
error reporting with exceptions
bool full() const
return true if no free page groups in this chunk
BidirMMapPipe creates a bidirectional channel between the current process and a child it forks...
pid_t m_parentPid
pid of the parent
static void domunmap(void *p, unsigned len)
munmap pages p, len is length of mmapped area in bytes
static pthread_mutex_t s_openpipesmutex
protects s_openpipes
size_type bytesWritableNonBlocking()
number of bytes that can be written without blocking
bool isParent() const
return if this end of the pipe is the parent end
static int poll(PollVector &pipes, int timeout)
poll a set of pipes for events (ready to read from, ready to write to, error)
friend class BidirMMapPipe_impl::Page
page is our friend
static BidirMMapPipe_impl::PagePool * s_pagepool
pool of mmapped pages
bool empty() const
return true if no used page groups in this chunk
bool closed() const
true if closed
MMapVariety
type of mmap support found
Definition: BidirMMapPipe.h:47
PageChunk & operator=(const PageChunk &)
forbid assignment
Definition: BidirMMapPipe.h:82
void zap(Pages &p)
free all pages except for those pointed to by p
unsigned operator[](Page *p) const
perform page to page number mapping
for poll() interface
unsigned recvpages()
receive pages from the other end (may block), queue them
int rdstate() const
return flags (end of file, BidirMMapPipe closed, ...)
static void * dommap(unsigned len)
mmap pages, len is length of mmapped area in bytes
static unsigned getPageSize()
determine page size at run time
end of file reached
STREAMOP(bool)
C++ style stream operators for bool.
static void teardownall(void)
cleanup routine - at exit, we want our children to get a SIGTERM...
std::list< void * > m_freelist
free pages list
Definition: BidirMMapPipe.h:67
BidirMMapPipe & operator>>(char *(&str))
read a C-style string
void * m_end
pointer one behind end of mmapped area
Definition: BidirMMapPipe.h:64
PageChunk * m_parent
pointer to parent pool
BidirMMapPipe_impl::BidirMMapPipeException Exception
convenience typedef for BidirMMapPipeException
void push(const Pages &p)
push a group of pages onto the free list
int m_inpipe
pipe end from which data may be read
BidirMMapPipe & operator>>(T *&tptr)
read raw pointer to T from other side
void feedPageLists(Page *plist)
"feed" the busy and free lists with a list of pages
BidirMMapPipe & operator<<(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
Page * m_pages
pointer to first page
BidirMMapPipe_impl::Pages m_pages
mmapped pages
static std::list< BidirMMapPipe * > s_openpipes
list of open BidirMMapPipes
BidirMMapPipe(bool useExceptions=true, bool useSocketpair=false)
constructor (forks!)
static unsigned s_pagepoolrefcnt
page pool reference counter
static unsigned s_pagesize
system page size (run-time determined)
Definition: BidirMMapPipe.h:56
pipe has data for reading
nothing special on this pipe
bool usesPipepair() const
if BidirMMapPipe uses a pipe pair for communications
int m_flags
flags (e.g. end of file)
BidirMMapPipe & operator>>(BidirMMapPipe &(*manip)(BidirMMapPipe &))
I/O manipulator support.
mmap doesn't work, have to copy back and forth
Definition: BidirMMapPipe.h:49
static unsigned pagesize()
return the page size of the system
Definition: BidirMMapPipe.h:85
pages per pipe end
unsigned pageno(Page *p) const
perform page to page number mapping
bool operator!() const
return true if serious error (fail/bad)