yqueue 和 ypipe



 //  Individual memory chunk to hold N elements.
// Individual memory chunk to hold N elements.
struct chunk_t
T values [N];
chunk_t *prev;
chunk_t *next;
}; // Back position may point to invalid memory if the queue is empty,
// while begin & end positions are always valid. Begin position is
// accessed exclusively be queue reader (front/pop), while back and
// end positions are accessed exclusively by queue writer (back/push).
chunk_t *begin_chunk;
int begin_pos;
chunk_t *back_chunk;
int back_pos;
chunk_t *end_chunk;
int end_pos; // People are likely to produce and consume at similar rates. In
// this scenario holding onto the most recently freed chunk saves
// us from having to call malloc/free.
atomic_ptr_t<chunk_t> spare_chunk;



        //  Adds an element to the back end of the queue.
inline void push ()
back_chunk = end_chunk;
back_pos = end_pos; if (++end_pos != N)
return; chunk_t *sc = spare_chunk.xchg (NULL);
if (sc) {
end_chunk->next = sc;
sc->prev = end_chunk;
} else {
end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
alloc_assert (end_chunk->next);
end_chunk->next->prev = end_chunk;
end_chunk = end_chunk->next;
end_pos = 0;


    template <typename T> class ypipe_base_t
virtual ~ypipe_base_t () {}
virtual void write (const T &value_, bool incomplete_) = 0;
virtual bool unwrite (T *value_) = 0;
virtual bool flush () = 0;
virtual bool check_read () = 0;
virtual bool read (T *value_) = 0;
virtual bool probe (bool (*fn)(const T &)) = 0;


       //  Allocation-efficient queue to store pipe items.
// Front of the queue points to the first prefetched item, back of
// the pipe points to last un-flushed item. Front is used only by
// reader thread, while back is used only by writer thread.
yqueue_t <T, N> queue; // Points to the first un-flushed item. This variable is used
// exclusively by writer thread.
T *w; // Points to the first un-prefetched item. This variable is used
// exclusively by reader thread.
T *r; // Points to the first item to be flushed in the future.
T *f; // The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
// atomic operations.
atomic_ptr_t <T> c;


        //  Initialises the pipe.
inline ypipe_t ()
// Insert terminator element into the queue.
queue.push (); // Let all the pointers to point to the terminator.
// (unless pipe is dead, in which case c is set to NULL).
r = w = f = &queue.back ();
c.set (&queue.back ());


       //  Write an item to the pipe.  Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are never
// flushed down the stream.
inline void write (const T &value_, bool incomplete_)
// Place the value to the queue, add new terminator element.
queue.back () = value_;
queue.push (); // Move the "flush up to here" poiter.
if (!incomplete_)
f = &queue.back ();
} // Pop an incomplete item from the pipe. Returns true is such
// item exists, false otherwise.
inline bool unwrite (T *value_)
if (f == &queue.back ())
return false;
queue.unpush ();
*value_ = queue.back ();
return true;


        //  Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
inline bool flush ()
// If there are no un-flushed items, do nothing.
if (w == f)
return true; // Try to set 'c' to 'f'.
if (c.cas (w, f) != w) { // Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set (f);
w = f;
return false;
} // Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to 'f'.
w = f;
return true;



        //  Check whether item is available for reading.
inline bool check_read ()
// Was the value prefetched already? If so, return.
if (&queue.front () != r && r)
return true; // There's no prefetched value, so let us prefetch more values.
// Prefetching is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
r = c.cas (&queue.front (), NULL); // If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when items
// are being deallocated.
if (&queue.front () == r || !r)
return false; // There was at least one value prefetched.
return true;
} // Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read (T *value_)
// Try to prefetch a value.
if (!check_read ())
return false; // There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front ();
queue.pop ();
return true;



dbuffer_t 和 ypipe_conflate_t






        //  Underlying pipes for both directions.
upipe_t *inpipe;
upipe_t *outpipe; // Can the pipe be read from / written to? bool in_active;
bool out_active; // High watermark for the outbound pipe.
int hwm; // Low watermark for the inbound pipe.
int lwm; // Number of messages read and written so far.
uint64_t msgs_read;
uint64_t msgs_written; // Last received peer's msgs_read. The actual number in the peer
// can be higher at the moment.
uint64_t peers_msgs_read; // The pipe object on the other side of the pipepair.
pipe_t *peer; // Sink to send events to.
i_pipe_events *sink; // States of the pipe endpoint:
// active: common state before any termination begins,
// delimiter_received: delimiter was read from pipe before
// term command was received,
// waiting_fo_delimiter: term command was already received
// from the peer but there are still pending messages to read,
// term_ack_sent: all pending messages were already read and
// all we are waiting for is ack from the peer,
// term_req_sent1: 'terminate' was explicitly called by the user,
// term_req_sent2: user called 'terminate' and then we've got
// term command from the peer as well.
enum {
} state; // If true, we receive all the pending inbound messages before
// terminating. If false, we terminate immediately when the peer
// asks us to.
bool delay; // Identity of the writer. Used uniquely by the reader side.
blob_t identity; // Pipe's credential.
blob_t credential;
const bool conflate;


消息的发送机制会在接下来的章节中分析。i_pipe_events 是一个抽象类:

    struct i_pipe_events
virtual ~i_pipe_events () {}
virtual void read_activated (zmq::pipe_t *pipe_) = 0;
virtual void write_activated (zmq::pipe_t *pipe_) = 0;
virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;



