Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
BidirMMapPipe.h
Go to the documentation of this file.
1/// \cond ROOFIT_INTERNAL
2
3/** @file BidirMMapPipe.h
4 *
5 * header file for BidirMMapPipe, a class which forks off a child process and
6 * serves as communications channel between parent and child
7 *
8 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
9 * @date 2013-07-07
10 */
11
12#ifndef BIDIRMMAPPIPE_H
13#define BIDIRMMAPPIPE_H
14
15#include <cstring>
16#include <list>
17#include <pthread.h>
18#include <string>
19#include <unistd.h>
20#include <vector>
21
22#define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
23#define END_NAMESPACE_ROOFIT }
24
25BEGIN_NAMESPACE_ROOFIT
26
27/// namespace for implementation details of BidirMMapPipe
28namespace BidirMMapPipe_impl {
29 // forward declarations
30 class BidirMMapPipeException;
31 class Page;
32 class PagePool;
33 class Pages;
34
35 /** @brief class representing a chunk of pages
36 *
37 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
38 * @date 2013-07-24
39 *
40 * allocating pages from the OS happens in chunks in order to not exhaust
41 * the maximum allowed number of memory mappings per process; this class
42 * takes care of such a chunk
43 *
44 * a page chunk allows callers to obtain or release pages in groups of
45 * continuous pages of fixed size
46 */
47 class PageChunk {
48 public:
49 /// type of mmap support found
50 typedef enum {
51 Unknown, ///< don't know yet what'll work
52 Copy, ///< mmap doesn't work, have to copy back and forth
53 FileBacked, ///< mmapping a temp file works
54 DevZero, ///< mmapping /dev/zero works
55 Anonymous ///< anonymous mmap works
56 } MMapVariety;
57
58 private:
59 static unsigned s_physpgsz; ///< system physical page size
60 static unsigned s_pagesize; ///< logical page size (run-time determined)
61 /// mmap variety that works on this system
62 static MMapVariety s_mmapworks;
63
64 /// convenience typedef
65 typedef BidirMMapPipeException Exception;
66
67 void* m_begin; ///< pointer to start of mmapped area
68 void* m_end; ///< pointer one behind end of mmapped area
69 // FIXME: cannot keep freelist inline - other end may need that
70 // data, and we'd end up overwriting the page header
71 std::list<void*> m_freelist; ///< free pages list
72 PagePool* m_parent; ///< parent page pool
73 unsigned m_nPgPerGrp; ///< number of pages per group
74 unsigned m_nUsedGrp; ///< number of used page groups
75
76 /// determine page size at run time
77 static unsigned getPageSize();
78
79 /// mmap pages, len is length of mmapped area in bytes
80 static void* dommap(unsigned len);
81 /// munmap pages p, len is length of mmapped area in bytes
82 static void domunmap(void* p, unsigned len);
83 /// forbid copying
84 PageChunk(const PageChunk&) {}
85 /// forbid assignment
86 PageChunk& operator=(const PageChunk&) { return *this; }
87 public:
88 /// return the logical page size
89 static unsigned pagesize() { return s_pagesize; }
90 /// return the physical page size of the system
91 static unsigned physPgSz() { return s_physpgsz; }
92 /// return mmap variety support found
93 static MMapVariety mmapVariety() { return s_mmapworks; }
94
95 /// constructor
96 PageChunk(PagePool* parent, unsigned length, unsigned nPgPerGroup);
97
98 /// destructor
99 ~PageChunk();
100
101 /// return if p is contained in this PageChunk
102 bool contains(const Pages& p) const;
103
104 /// pop a group of pages off the free list
105 Pages pop();
106
107 /// push a group of pages onto the free list
108 void push(const Pages& p);
109
110 /// return length of chunk
111 unsigned len() const
112 {
113 return reinterpret_cast<unsigned char*>(m_end) -
114 reinterpret_cast<unsigned char*>(m_begin);
115 }
116 /// return number of pages per page group
117 unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
118
119 /// return true if no used page groups in this chunk
120 bool empty() const { return !m_nUsedGrp; }
121
122 /// return true if no free page groups in this chunk
123 bool full() const { return m_freelist.empty(); }
124
125 /// free all pages except for those pointed to by p
126 void zap(Pages& p);
127 };
128
129 /** @brief handle class for a number of Pages
130 *
131 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
132 * @date 2013-07-24
133 *
134 * the associated pages are continuous in memory
135 */
136 class Pages {
137 private:
138 /// implementation
139 typedef struct {
140 PageChunk *m_parent; ///< pointer to parent pool
141 Page* m_pages; ///< pointer to first page
142 unsigned m_refcnt; ///< reference counter
143 unsigned char m_npages; ///< length in pages
144 } impl;
145 public:
146 /// default constructor
147 Pages() = default;
148
149 /// destructor
150 ~Pages();
151
152 /** @brief copy constructor
153 *
154 * copy Pages handle to new object - old object loses ownership,
155 * and becomes a dangling handle
156 */
157 Pages(const Pages& other);
158
159 /** @brief assignment operator
160 *
161 * assign Pages handle to new object - old object loses ownership,
162 * and becomes a dangling handle
163 */
164 Pages& operator=(const Pages& other);
165
166 /// return page size
167 static unsigned pagesize();
168
169 /// return number of pages accessible
170 unsigned npages() const { return m_pimpl->m_npages; }
171
172 /// return page number pageno
173 Page* page(unsigned pgno) const;
174
175 /// return page number pageno
176 Page* operator[](unsigned pgno) const { return page(pgno); }
177
178 /// perform page to page number mapping
179 unsigned pageno(Page* p) const;
180
181 /// perform page to page number mapping
182 unsigned operator[](Page* p) const { return pageno(p); }
183
184 /// swap with other's contents
185 void swap(Pages& other)
186 {
187 impl* tmp = other.m_pimpl;
188 other.m_pimpl = m_pimpl;
189 m_pimpl = tmp;
190 }
191
192 private:
193 /// page pool is our friend - it's allowed to construct Pages
194 friend class BidirMMapPipe_impl::PageChunk;
195
196 /// pointer to implementation
197 impl* m_pimpl = nullptr;
198
199 /// constructor
200 Pages(PageChunk* parent, Page* pages, unsigned npg);
201 };
202}
203
204/** @brief BidirMMapPipe creates a bidirectional channel between the current
205 * process and a child it forks.
206 *
207 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
208 * @date 2013-07-07
209 *
210 * This class creates a bidirectional channel between this process and a child
211 * it creates with fork().
212 *
213 * The channel is comprised of a small shared pool of buffer memory mmapped into
214 * both process spaces, and two pipes to synchronise the exchange of data. The
215 * idea behind using the pipes at all is to have some primitive which we can
216 * block on without having to worry about atomic operations or polling, leaving
217 * these tasks to the OS. In case the anonymous mmap cannot be performed on the
218 * OS the code is running on (for whatever reason), the code falls back to
219 * mmapping /dev/zero, mmapping a temporary file, or (if those all fail), a
220 * dynamically allocated buffer which is then transmitted through the pipe(s),
221 * a slightly slower alternative (because the data is copied more often).
222 *
223 * The channel supports five major operations: read(), write(), flush(),
224 * purge() and close(). Reading and writing may block until the required buffer
225 * space is available. Writes may queue up data to be sent to the other end
226 * until either enough pages are full, or the user calls flush which forces
227 * any unsent buffers to be sent to the other end. flush forces any data that
228 * is to be sent to be sent. purge discards any buffered data waiting to be
229 * read and/or sent. Closing the channel on the child returns zero, closing it
230 * on the parent returns the child's exit status.
231 *
232 * The class also provides operator<< and operator>> for C++-style I/O for
233 * basic data types (bool, char, short, int, long, long long, float, double
234 * and their unsigned counterparts). Data is transmitted binary (i.e. no
235 * formatting to strings like std::cout does). There are also overloads to
236 * support C-style zero terminated strings and std::string. In terms of
237 * performance, the former is to be preferred.
238 *
239 * If the caller needs to multiplex input and output to/from several pipes, the
240 * class provides the poll() method which allows to block until an event occurs
241 * on any of the polled pipes.
242 *
243 * After the BidirMMapPipe is closed, no further operations may be performed on
244 * that object, save for the destructor which may still be called.
245 *
246 * If the BidirMMapPipe has not properly been closed, the destructor will call
247 * close. However, the exit code of the child is lost in that case.
248 *
249 * Closing the object causes the mmapped memory to be unmapped and the two
250 * pipes to be closed. We also install an atexit handler in the process of
251 * creating BidirMMapPipes. This ensures that when the current process
252 * terminates, a SIGTERM signal is sent to the child processes created for all
253 * unclosed pipes to avoid leaving zombie processes in the OS's process table.
254 *
255 * BidirMMapPipe creation, closing and destruction are thread safe. If the
256 * BidirMMapPipe is used in more than one thread, the other operations have to
257 * be protected with a mutex (or something similar), though.
258 *
259 * End of file (other end closed its pipe, or died) is indicated with the eof()
260 * method, serious I/O errors set flags (bad(), fail()), and also throw
261 * exceptions. For normal read/write operations, they can be suppressed (i.e.
262 * error reporting only using flags) with a constructor argument.
263 *
264 * Technicalities:
265 * - there is a pool of mmapped pages, half the pages are allocated to the
266 * parent process, half to the child
267 * - when one side has accumulated enough data (or a flush forces dirty pages
268 * out to the other end), it sends these pages to the other end by writing a
269 * byte containing the page number into the pipe
270 * - the other end (which has the pages mmapped, too) reads the page number(s)
271 * and puts the corresponding pages on its busy list
272 * - as the other end reads, it frees busy pages, and eventually tries to put
273 * them on its free list; if a page belongs to the other end of the
274 * connection, it is sent back
275 * - lists of pages are sent across the pipe, not individual pages, in order
276 * to minimise the number of read/write operations needed
277 * - when mmap works properly, only one byte containing the page number of
278 * the page list head is sent back and forth; the contents of that page
279 * allow to access the rest of the page list sent, and page headers on the
280 * list tell the receiving end if the page is free or has to be added to the
281 * busy list
282 * - when mmap does not work, we transfer one byte to indicate the head of the
283 * page list sent, and for each page on the list of sent pages, the page
284 * header and the page payload is sent (if the page is free, we only
285 * transmit the page header, and we never transmit more payload than
286 * the page actually contains)
287 * - in the child, all open BidirMMapPipes but the current one are closed. this
288 * is done for two reasons: first, to conserve file descriptors and address
289 * space. second, if more than one process is meant to use such a
290 * BidirMMapPipe, synchronisation issues arise which can lead to bugs that
291 * are hard to find and understand. it's much better to come up with a design
292 * which does not need pipes to be shared among more than two processes.
293 *
294 * Here is a trivial example of a parent and a child talking to each other over
295 * a BidirMMapPipe:
296 * @code
297 * #include <string>
298 * #include <iostream>
299 * #include <cstdlib>
300 *
301 * #include "BidirMMapPipe.h"
302 *
303 * int simplechild(BidirMMapPipe& pipe)
304 * {
305 * // child does an echo loop
306 * while (pipe.good() && !pipe.eof()) {
307 * // read a string
308 * std::string str;
309 * pipe >> str;
310 * if (!pipe) return -1;
311 * if (pipe.eof()) break;
312 * // check if parent wants us to shut down
313 * if (!str.empty()) {
314 * std::cout << "[CHILD] : read: " << str << std::endl;
315 * str = "... early in the morning?";
316 * }
317 * pipe << str << BidirMMapPipe::flush;
318 * if (str.empty()) break;
319 * if (!pipe) return -1;
320 * std::cout << "[CHILD] : wrote: " << str << std::endl;
321 * }
322 * // send shutdown request acknowledged
323 * pipe << "" << BidirMMapPipe::flush;
324 *
325 * pipe.close();
326 * return 0;
327 * }
328 *
329 * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
330 * {
331 * BidirMMapPipe *p = new BidirMMapPipe();
332 * if (p->isChild()) {
333 * int retVal = childexec(*p);
334 * delete p;
335 * std::exit(retVal);
336 * }
337 * return p;
338 * }
339 *
340 * int main()
341 * {
342 * std::cout << "[PARENT]: simple challenge-response test, one child:" <<
343 * std::endl;
344 * BidirMMapPipe* pipe = spawnChild(simplechild);
345 * for (int i = 0; i < 5; ++i) {
346 * std::string str("What shall we do with a drunken sailor...");
347 * *pipe << str << BidirMMapPipe::flush;
348 * if (!*pipe) return -1;
349 * std::cout << "[PARENT]: wrote: " << str << std::endl;
350 * *pipe >> str;
351 * if (!*pipe) return -1;
352 * std::cout << "[PARENT]: read: " << str << std::endl;
353 * }
354 * // ask child to shut down
355 * *pipe << "" << BidirMMapPipe::flush;
356 * // wait for it to see the shutdown request
357 * std::string s;
358 * *pipe >> s;
359 * std::cout << "[PARENT]: exit status of child: " << pipe->close() <<
360 * std::endl;
361 * delete pipe;
362 * return 0;
363 * }
364 * @endcode
365 *
366 * When designing your own protocols to use over the pipe, there are a few
367 * things to bear in mind:
368 * - Do as http does: When building a request, send all the options and
369 * properties of that request with the request itself in a single go (one
370 * flush). Then, the server has everything it needs, and hopefully, it'll
371 * shut up for a while and to let the client do something useful in the
372 * meantime... The same goes when the server replies to the request: include
373 * everything there is to know about the result of the request in the reply.
374 * - The expensive operation should be the request that is made, all other
375 * operations should somehow be formulated as options or properties to that
376 * request.
377 * - Include a shutdown handshake in whatever protocol you send over the
378 * pipe. That way, you can shut things down in a controlled way. Otherwise,
379 * and depending on your OS's scheduling quirks, you may catch a SIGPIPE if
380 * one end closes its pipe while the other is still trying to read.
381 */
382class BidirMMapPipe {
383#ifndef _WIN32
384 public:
385 /// type used to represent sizes
386 typedef std::size_t size_type;
387 /// convenience typedef for BidirMMapPipeException
388 typedef BidirMMapPipe_impl::BidirMMapPipeException Exception;
389 /// flag bits for partial C++ iostream compatibility
// status bits kept in the pipe's flag word; badbit is the union of the two
// direction-specific error bits
enum {
    eofbit        = 1 << 0,              ///< end of file reached
    failbit       = 1 << 1,              ///< logical failure (e.g. pipe closed)
    rderrbit      = 1 << 2,              ///< read error
    wrerrbit      = 1 << 3,              ///< write error
    badbit        = rderrbit | wrerrbit, ///< general I/O error
    exceptionsbit = 1 << 4               ///< error reporting with exceptions
};
398
399 /** @brief constructor (forks!)
400 *
401 * Creates a bidirectional communications channel between this process
402 * and a child the constructor forks. On return from the constructor,
403 * isParent() and isChild() can be used to tell the parent end from the
404 * child end of the pipe. In the child, all other open BidirMMapPipes
405 * are closed.
406 *
407 * @param useExceptions read()/write() error reporting also done using
408 * exceptions
409 * @param useSocketpair use a socketpair instead of a pair of pipes
410 *
411 * Normally, exceptions are thrown for all serious I/O errors (apart
412 * from end of file). Setting useExceptions to false will force the
413 * read() and write() methods to only report serious I/O errors using
414 * flags.
415 *
416 * When useSocketpair is true, use a pair of Unix domain sockets
417 * created using socketpair instead of a pair of pipes. The advantage is
418 * that only one pair of file descriptors is needed instead of two
419 * pairs which are needed for the pipe pair. Performance should be very
420 * similar on most platforms, especially if mmap works, since only
421 * very little data is sent through the pipe(s)/socketpair.
422 */
423 BidirMMapPipe(bool useExceptions = true, bool useSocketpair = false);
424
425 /** @brief destructor
426 *
427 * closes this end of pipe
428 */
429 ~BidirMMapPipe();
430
431 /** @brief return the current setting of the debug flag
432 *
433 * @returns an integer with the debug Setting
434 */
435 static int debugflag() { return s_debugflag; }
436
437 /** @brief set the debug flags
438 *
439 * @param flag debug flags (if zero, no messages are printed)
440 */
441 static void setDebugflag(int flag) { s_debugflag = flag; }
442
443 /** @brief read from pipe
444 *
445 * @param addr where to put read data
446 * @param sz size of data to read (in bytes)
447 * @returns size of data read, or 0 in case of end-of-file
448 *
449 * read may block until data from other end is available. It will
450 * return 0 if the other end closed the pipe.
451 */
452 size_type read(void* addr, size_type sz);
453
454 /** @brief write to pipe
455 *
456 * @param addr where to get data to write from
457 * @param sz size of data to write (in bytes)
458 * @returns size of data written, or 0 in case of end-of-file
459 *
460 * write may block until data can be written to other end (depends a
461 * bit on available buffer space). It will return 0 if the other end
462 * closed the pipe. The data is queued to be written on the next
463 * convenient occasion, or it can be forced out with flush().
464 */
465 size_type write(const void* addr, size_type sz);
466
467 /** @brief flush buffers with unwritten data
468 *
469 * This forces unwritten data to be written to the other end. The call
470 * will block until this has been done (or the attempt failed with an
471 * error).
472 */
473 void flush();
474
475 /** @brief purge buffered data waiting to be read and/or written
476 *
477 * Discards all internal buffers.
478 */
479 void purge();
480
481 /** @brief number of bytes that can be read without blocking
482 *
483 * @returns number of bytes that can be read without blocking
484 */
485 size_type bytesReadableNonBlocking();
486
487 /** @brief number of bytes that can be written without blocking
488 *
489 * @returns number of bytes that can be written without blocking
490 */
491 size_type bytesWritableNonBlocking();
492
493 /** @brief flush buffers, close pipe
494 *
495 * Flush buffers, discard unread data, closes the pipe. If the pipe is
496 * in the parent process, it waits for the child.
497 *
498 * @returns exit code of child process in parent, zero in child
499 */
500 int close();
501
502 /** @brief return PID of the process on the other end of the pipe
503 *
504 * @returns PID of the process running on the remote end
505 */
506 pid_t pidOtherEnd() const
507 { return isChild() ? m_parentPid : m_childPid; }
508
509 /// condition flags for poll
// Every single-condition flag occupies its own bit, so that an or'ed
// events/revents mask can be decoded unambiguously. In particular the
// end-of-file bits must not share a bit with the invalid-pipe bits.
enum PollFlags {
    None = 0,                                   ///< nothing special on this pipe
    Readable = 1,                               ///< pipe has data for reading
    Writable = 2,                               ///< pipe can be written to
    ReadError = 4,                              ///< pipe error read end
    WriteError = 8,                             ///< pipe error Write end
    Error = ReadError | WriteError,             ///< pipe error
    ReadEndOfFile = 16,                         ///< read pipe in end-of-file state
    WriteEndOfFile = 32,                        ///< write pipe in end-of-file state
    EndOfFile = ReadEndOfFile | WriteEndOfFile, ///< end of file
    ReadInvalid = 64,                           ///< read end of pipe invalid
    WriteInvalid = 128,                         ///< write end of pipe invalid
    Invalid = ReadInvalid | WriteInvalid        ///< invalid pipe
};
524
525 /// for poll() interface
526 class PollEntry {
527 public:
528 BidirMMapPipe* pipe; ///< pipe of interest
529 unsigned events; ///< events of interest (or'ed bitmask)
530 unsigned revents; ///< events that happened (or'ed bitmask)
531 /// poll a pipe for all events
532 PollEntry(BidirMMapPipe* _pipe) :
533 pipe(_pipe), events(None), revents(None) { }
534 /// poll a pipe for specified events
535 PollEntry(BidirMMapPipe* _pipe, int _events) :
536 pipe(_pipe), events(_events), revents(None) { }
537 };
538 /// convenience typedef for poll() interface
539 typedef std::vector<PollEntry> PollVector;
540
541 /** @brief poll a set of pipes for events (ready to read from, ready to
542 * write to, error)
543 *
544 * @param pipes set of pipes to check
545 * @param timeout timeout in milliseconds
546 * @returns positive number: number of pipes which have
547 * status changes, 0: timeout, or no pipes with
548 * status changed, -1 on error
549 *
550 * Timeout can be zero (check for specified events, and return), finite
551 * (wait at most timeout milliseconds before returning), or -1
552 * (infinite). The poll method returns when the timeout has elapsed,
553 * or if an event occurs on one of the pipes being polled, whichever
554 * happens earlier.
555 *
556 * Pipes is a vector of one or more PollEntries, which each list a pipe
557 * and events to poll for. If events is left empty (zero), all
558 * conditions are polled for, otherwise only the indicated ones. On
559 * return, the revents fields contain the events that occurred for each
560 * pipe; error Error, EndOfFile or Invalid events are always set,
561 * regardless of whether they were in the set of requested events.
562 *
563 * poll may block slightly longer than specified by timeout due to OS
564 * timer granularity and OS scheduling. Due to its implementation, the
565 * poll call can also return early if the remote end of the pipe sends
566 * a free page while polling (which is put on that pipe's freelist),
567 * while that pipe is polled for e.g Reading. The status of the pipe is
568 * indicated correctly in revents, and the caller can simply poll
569 * again. (The reason this is done this way is because it helps to
570 * replenish the pool of free pages and queue busy pages without
571 * blocking.)
572 *
573 * Here's a piece of example code waiting on two pipes; if they become
574 * readable they are read:
575 * @code
576 * #include <unistd.h>
577 * #include <cstdlib>
578 * #include <string>
579 * #include <sstream>
580 * #include <iostream>
581 *
582 * #include "BidirMMapPipe.h"
583 *
584 * // what to execute in the child
585 * int randomchild(BidirMMapPipe& pipe)
586 * {
587 * ::srand48(::getpid());
588 * for (int i = 0; i < 5; ++i) {
589 * // sleep a random time between 0 and .9 seconds
590 * ::usleep(int(1e6 * ::drand48()));
591 * std::ostringstream buf;
592 * buf << "child pid " << ::getpid() << " sends message " << i;
593 * std::cout << "[CHILD] : " << buf.str() << std::endl;
594 * pipe << buf.str() << BidirMMapPipe::flush;
595 * if (!pipe) return -1;
596 * if (pipe.eof()) break;
597 * }
598 * // tell parent we're done
599 * pipe << "" << BidirMMapPipe::flush;
600 * // wait for parent to acknowledge
601 * std::string s;
602 * pipe >> s;
603 * pipe.close();
604 * return 0;
605 * }
606 *
607 * // function to spawn a child
608 * BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
609 * {
610 * BidirMMapPipe *p = new BidirMMapPipe();
611 * if (p->isChild()) {
612 * int retVal = childexec(*p);
613 * delete p;
614 * std::exit(retVal);
615 * }
616 * return p;
617 * }
618 *
619 * int main()
620 * {
621 * typedef BidirMMapPipe::PollEntry PollEntry;
622 * // poll data structure
623 * BidirMMapPipe::PollVector pipes;
624 * pipes.reserve(3);
625 * // spawn children
626 * for (int i = 0; i < 3; ++i) {
627 * pipes.push_back(PollEntry(spawnChild(randomchild),
628 * BidirMMapPipe::Readable));
629 * }
630 * // while at least some children alive
631 * while (!pipes.empty()) {
632 * // poll, wait until status change (infinite timeout)
633 * int npipes = BidirMMapPipe::poll(pipes, -1);
634 * // scan for pipes with changed status
635 * for (std::vector<PollEntry>::iterator it = pipes.begin();
636 * npipes && pipes.end() != it; ) {
637 * if (!it->revents) {
638 * // unchanged, next one
639 * ++it;
640 * continue;
641 * }
642 * --npipes; // maybe we can stop early...
643 * // read from pipes which are readable
644 * if (it->revents & BidirMMapPipe::Readable) {
645 * std::string s;
646 * *(it->pipe) >> s;
647 * if (!s.empty()) {
648 * std::cout << "[PARENT]: Read from pipe " <<
649 * it->pipe << ": " << s << std::endl;
650 * ++it;
651 * continue;
652 * } else {
653 * // child is shutting down...
654 * *(it->pipe) << "" << BidirMMapPipe::flush;
655 * goto childcloses;
656 * }
657 * }
658 * // retire pipes with error or end-of-file condition
659 * if (it->revents & (BidirMMapPipe::Error |
660 * BidirMMapPipe::EndOfFile |
661 * BidirMMapPipe::Invalid)) {
662 * std::cout << "[PARENT]: Error on pipe " <<
663 * it->pipe << " revents " << it->revents <<
664 * std::endl;
665 * childcloses:
666 * int retVal = it->pipe->close();
667 * std::cout << "[PARENT]:\tchild exit status: " << retVal << std::endl;
668 * if (retVal) return retVal;
669 * delete it->pipe;
670 * it = pipes.erase(it);
671 * continue;
672 * }
673 * }
674 * }
675 * return 0;
676 * }
677 * @endcode
678 */
679 static int poll(PollVector& pipes, int timeout);
680
681 /** @brief return if this end of the pipe is the parent end
682 *
683 * @returns true if parent end of pipe
684 */
685 bool isParent() const { return m_childPid; }
686
687 /** @brief return if this end of the pipe is the child end
688 *
689 * @returns true if child end of pipe
690 */
691 bool isChild() const { return !m_childPid; }
692
693 /** @brief if BidirMMapPipe uses a socketpair for communications
694 *
695 * @returns true if BidirMMapPipe uses a socketpair for communications
696 */
697 bool usesSocketpair() const { return m_inpipe == m_outpipe; }
698
699 /** @brief if BidirMMapPipe uses a pipe pair for communications
700 *
701 * @returns true if BidirMMapPipe uses a pipe pair for communications
702 */
703 bool usesPipepair() const { return m_inpipe != m_outpipe; }
704
705 /** @brief return flags (end of file, BidirMMapPipe closed, ...)
706 *
707 * @returns flags (end of file, BidirMMapPipe closed, ...)
708 */
709 int rdstate() const { return m_flags; }
710
711 /** @brief true if end-of-file
712 *
713 * @returns true if end-of-file
714 */
715 bool eof() const { return m_flags & eofbit; }
716
717 /** @brief logical failure (e.g. I/O on closed BidirMMapPipe)
718 *
719 * @returns true in case of grave logical error (I/O on closed pipe,...)
720 */
721 bool fail() const { return m_flags & failbit; }
722
723 /** @brief true on I/O error
724 *
725 * @returns true on I/O error
726 */
727 bool bad() const { return m_flags & badbit; }
728
729 /** @brief status of stream is good
730 *
731 * @returns true if pipe is good (no errors, eof, ...)
732 */
733 bool good() const { return !(m_flags & (eofbit | failbit | badbit)); }
734
735 /** @brief true if closed
736 *
737 * @returns true if stream is closed
738 */
739 bool closed() const { return m_flags & failbit; }
740
741 /** @brief return true if not serious error (fail/bad)
742 *
743 * @returns true if stream is does not have serious error (fail/bad)
744 *
745 * (if EOF, this is still true)
746 */
747 operator bool() const { return !fail() && !bad(); }
748
749 /** @brief return true if serious error (fail/bad)
750 *
751 * @returns true if stream has a serious error (fail/bad)
752 */
753 bool operator!() const { return fail() || bad(); }
754
755#ifdef STREAMOP
756#undef STREAMOP
757#endif
758#define STREAMOP(TYPE) \
759 BidirMMapPipe& operator<<(const TYPE& val) \
760 { write(&val, sizeof(TYPE)); return *this; } \
761 BidirMMapPipe& operator>>(TYPE& val) \
762 { read(&val, sizeof(TYPE)); return *this; }
763 STREAMOP(bool); ///< C++ style stream operators for bool
764 STREAMOP(char); ///< C++ style stream operators for char
765 STREAMOP(short); ///< C++ style stream operators for short
766 STREAMOP(int); ///< C++ style stream operators for int
767 STREAMOP(long); ///< C++ style stream operators for long
768 STREAMOP(long long); ///< C++ style stream operators for long long
769 STREAMOP(unsigned char); ///< C++ style stream operators for unsigned char
770 STREAMOP(unsigned short); ///< C++ style stream operators for unsigned short
771 STREAMOP(unsigned int); ///< C++ style stream operators for unsigned int
772 STREAMOP(unsigned long); ///< C++ style stream operators for unsigned long
773 STREAMOP(unsigned long long); ///< C++ style stream operators for unsigned long long
774 STREAMOP(float); ///< C++ style stream operators for float
775 STREAMOP(double); ///< C++ style stream operators for double
776#undef STREAMOP
777
778 /** @brief write a C-style string
779 *
780 * @param str C-style string
781 * @returns pipe written to
782 */
783 BidirMMapPipe& operator<<(const char* str);
784
785 /** @brief read a C-style string
786 *
787 * @param str pointer to string (space allocated with malloc!)
788 * @returns pipe read from
789 *
790 * since this is for C-style strings, we use malloc/realloc/free for
791 * strings. passing in a nullptr pointer is valid here, and the routine
792 * will use realloc to allocate a chunk of memory of the right size.
793 */
794 BidirMMapPipe& operator>>(char* (&str));
795
796 /** @brief write a std::string object
797 *
798 * @param str string to write
799 * @returns pipe written to
800 */
801 BidirMMapPipe& operator<<(const std::string& str);
802
803 /** @brief read a std::string object
804 *
805 * @param str string to be read
806 * @returns pipe read from
807 */
808 BidirMMapPipe& operator>>(std::string& str);
809
810 /** @brief write raw pointer to T to other side
811 *
812 * NOTE: This will not write the pointee! Only the value of the
813 * pointer is transferred.
814 *
815 * @param tptr pointer to be written
816 * @returns pipe written to
817 */
818 template<class T> BidirMMapPipe& operator<<(const T* tptr)
819 { write(&tptr, sizeof(tptr)); return *this; }
820
821 /** @brief read raw pointer to T from other side
822 *
823 * NOTE: This will not read the pointee! Only the value of the
824 * pointer is transferred.
825 *
826 * @param tptr pointer to be read
827 * @returns pipe read from
828 */
829 template<class T> BidirMMapPipe& operator>>(T* &tptr)
830 { read(&tptr, sizeof(tptr)); return *this; }
831
832 /** @brief I/O manipulator support
833 *
834 * @param manip manipulator
835 * @returns pipe with manipulator applied
836 *
837 * example:
838 * @code
839 * pipe << BidirMMapPipe::flush;
840 * @endcode
841 */
842 BidirMMapPipe& operator<<(BidirMMapPipe& (*manip)(BidirMMapPipe&))
843 { return manip(*this); }
844
845 /** @brief I/O manipulator support
846 *
847 * @param manip manipulator
848 * @returns pipe with manipulator applied
849 *
850 * example:
851 * @code
852 * pipe >> BidirMMapPipe::purge;
853 * @endcode
854 */
855 BidirMMapPipe& operator>>(BidirMMapPipe& (*manip)(BidirMMapPipe&))
856 { return manip(*this); }
857
858 /// for usage a la "pipe << flush;"
859 static BidirMMapPipe& flush(BidirMMapPipe& pipe) { pipe.flush(); return pipe; }
860 /// for usage a la "pipe << purge;"
861 static BidirMMapPipe& purge(BidirMMapPipe& pipe) { pipe.purge(); return pipe; }
862
863 private:
864 /// copy-construction forbidden
865 BidirMMapPipe(const BidirMMapPipe&);
866 /// assignment forbidden
867 BidirMMapPipe& operator=(const BidirMMapPipe&) { return *this; }
868
869 /// page is our friend
870 friend class BidirMMapPipe_impl::Page;
871 /// convenience typedef for Page
872 typedef BidirMMapPipe_impl::Page Page;
873
874 /// tuning constants
enum {
    // With TotPages = 16 and a 4k page size, parent and child each get
    // 32k of buffer; raise the value (max 256) if the average message
    // is larger than that.
    TotPages = 16, ///< pages shared (child + parent)

    // the pool is split evenly between the two ends
    PagesPerEnd = TotPages / 2, ///< pages per pipe end

    // a flush is forced once this many pages are filled; 3/4 of the
    // pages available per end seems to work quite well
    FlushThresh = PagesPerEnd * 3 / 4 ///< flush threshold
};
887
888 // per-class members
889 static pthread_mutex_t s_openpipesmutex; ///< protects s_openpipes
890 /// list of open BidirMMapPipes
891 static std::list<BidirMMapPipe*> s_openpipes;
892 /// pool of mmapped pages
893 static BidirMMapPipe_impl::PagePool* s_pagepool;
894 /// page pool reference counter
895 static unsigned s_pagepoolrefcnt;
896 /// debug flag
897 static int s_debugflag;
898
899 /// return page pool
900 static BidirMMapPipe_impl::PagePool& pagepool();
901
902 // per-instance members
903 BidirMMapPipe_impl::Pages m_pages; ///< mmapped pages
904 Page* m_busylist; ///< linked list: busy pages (data to be read)
905 Page* m_freelist; ///< linked list: free pages
906 Page* m_dirtylist; ///< linked list: dirty pages (data to be sent)
907 int m_inpipe; ///< pipe end from which data may be read
908 int m_outpipe; ///< pipe end to which data may be written
909 int m_flags; ///< flags (e.g. end of file)
910 pid_t m_childPid; ///< pid of the child (zero if we're child)
911 pid_t m_parentPid; ///< pid of the parent
912
913 /// cleanup routine - at exit, we want our children to get a SIGTERM...
914 static void teardownall(void);
915
916 /// return length of a page list
917 static unsigned lenPageList(const Page* list);
918
919 /** "feed" the busy and free lists with a list of pages
920 *
921 * @param plist linked list of pages
922 *
923 * goes through plist, puts free pages from plist onto the freelist
924 * (or sends them to the remote end if they belong there), and puts
925 * non-empty pages on plist onto the busy list
926 */
927 void feedPageLists(Page* plist);
928
929 /// put on dirty pages list
930 void markPageDirty(Page* p);
931
932 /// transfer bytes through the pipe (reading, writing, may block)
933 static size_type xferraw(int fd, void* addr, size_type len,
934 ssize_t (*xferfn)(int, void*, std::size_t));
935 /// transfer bytes through the pipe (reading, writing, may block)
936 static size_type xferraw(int fd, void* addr, const size_type len,
937 ssize_t (*xferfn)(int, const void*, std::size_t))
938 {
939 return xferraw(fd, addr, len,
940 reinterpret_cast<ssize_t (*)(
941 int, void*, std::size_t)>(xferfn));
942 }
943
944 /** @brief send page(s) to the other end (may block)
945 *
946 * @param plist linked list of pages to send
947 *
948 * the implementation gathers the different write(s) wherever
949 * possible: if mmap works, a single write transfers the list of
950 * pages sent; if we need to copy things through the pipe, one write
951 * transfers which pages are sent, and it is followed by one write
952 * per page.
953 */
954 void sendpages(Page* plist);
955
956 /** @brief receive pages from the other end (may block), queue them
957 *
958 * @returns number of pages received
959 *
960 * this is an application-level scatter read, which gets the list of
961 * pages to read from the pipe. if mmap works, it needs only one read
962 * call (to get the head of the list of pages transferred). if we need
963 * to copy pages through the pipe, we need to add one read for each
964 * empty page, and two reads for each non-empty page.
965 */
966 unsigned recvpages();
967
968 /** @brief receive pages from other end (non-blocking)
969 *
970 * @returns number of pages received
971 *
972 * like recvpages(), but does not block if nothing is available for
973 * reading
974 */
975 unsigned recvpages_nonblock();
976
977 /// get a busy page to read data from (may block)
978 Page* busypage();
979 /// get a dirty page to write data to (may block)
980 Page* dirtypage();
981
982 /// close the pipe (no flush if forced)
983 int doClose(bool force, bool holdlock = false);
984 /// perform the flush
985 void doFlush(bool forcePartialPages = true);
986#endif //_WIN32
987};
988
989END_NAMESPACE_ROOFIT
990
991#undef BEGIN_NAMESPACE_ROOFIT
992#undef END_NAMESPACE_ROOFIT
993
994#endif // BIDIRMMAPPIPE_H
995
996// vim: ft=cpp:sw=4:tw=78:et
997
998/// \endcond
ROOT::R::TRInterface & Exception()
Definition Exception.C:4
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:399
TBuffer & operator>>(TBuffer &buf, Tmpl *&obj)
Definition TBuffer.h:383
TCut operator!(const TCut &rhs)
Logical negation.
Definition TCut.cxx:292
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Binding & operator=(OUT(*fun)(void))
void swap(RDirectoryEntry &e1, RDirectoryEntry &e2) noexcept
void Copy(void *source, void *dest)