[lttng-dev] [PATCH 06/16] wfcqueue: implement mutex-free splice
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Thu Nov 22 13:54:55 EST 2012
* Lai Jiangshan (laijs at cn.fujitsu.com) wrote:
> On 11/21/2012 11:18 PM, Mathieu Desnoyers wrote:
> > * Lai Jiangshan (laijs at cn.fujitsu.com) wrote:
> >> On 11/21/2012 03:40 AM, Mathieu Desnoyers wrote:
> >>> A carefully crafted splice operation does not need to use an external
> >>> mutex to synchronize against other splice operations.
> >>>
> >>> The trick is atomically exchange the head next pointer with
> >>> NULL. If the pointer we replaced was NULL, it means the queue was
> >>> possibly empty. If head next was not NULL, by setting head to NULL, we
> >>> ensure that concurrent splice operations are going to see an empty
> >>> queue, even if concurrent enqueue operations move tail further. This
> >>> means that as long as we are within splice, after setting head to NULL,
> >>> but before moving tail back to head, concurrent splice operations will
> >>> always see an empty queue, therefore acting as mutual exclusion.
> >>>
> >>> If exchange returns a NULL head, we confirm that it was indeed empty by
> >>> checking if the tail pointer points to the head node, busy-waiting if
> >>> necessary.
> >>>
> >>> Then the last step is to move the tail pointer to head. At that point,
> >>> enqueuers are going to start enqueuing at head again, and other splice
> >>> operations will be able to proceed.
> >>>
> >>> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> >>> ---
> >>> urcu/static/wfcqueue.h | 68 ++++++++++++++++++++++++++++++++++++------------
> >>> urcu/wfcqueue.h | 40 ++++++++++++++++++----------
> >>> wfcqueue.c | 2 +-
> >>> 3 files changed, 79 insertions(+), 31 deletions(-)
> >>>
> >>> diff --git a/urcu/static/wfcqueue.h b/urcu/static/wfcqueue.h
> >>> index 8774c03..4b2de50 100644
> >>> --- a/urcu/static/wfcqueue.h
> >>> +++ b/urcu/static/wfcqueue.h
> >>> @@ -46,15 +46,30 @@ extern "C" {
> >>> * half-wait-free/half-blocking queue implementation done by Paul E.
> >>> * McKenney.
> >>> *
> >>> - * Mutual exclusion of __cds_wfcq_* API
> >>> - *
> >>> - * Unless otherwise stated, the caller must ensure mutual exclusion of
> >>> - * queue update operations "dequeue" and "splice" (for source queue).
> >>> - * Queue read operations "first" and "next", which are used by
> >>> - * "for_each" iterations, need to be protected against concurrent
> >>> - * "dequeue" and "splice" (for source queue) by the caller.
> >>> - * "enqueue", "splice" (for destination queue), and "empty" are the only
> >>> - * operations that can be used without any mutual exclusion.
> >>> + * Mutual exclusion of cds_wfcq_* / __cds_wfcq_* API
> >>> + *
> >>> + * Synchronization table:
> >>> + *
> >>> + * External synchronization techniques described in the API below is
> >>> + * required between pairs marked with "X". No external synchronization
> >>> + * required between pairs marked with "-".
> >>> + *
> >>> + * Legend:
> >>> + * [1] cds_wfcq_enqueue
> >>> + * [2] __cds_wfcq_splice (destination queue)
> >>> + * [3] __cds_wfcq_dequeue
> >>> + * [4] __cds_wfcq_splice (source queue)
> >>> + * [5] __cds_wfcq_first
> >>> + * [6] __cds_wfcq_next
> >>> + *
> >>> + * [1] [2] [3] [4] [5] [6]
> >>> + * [1] - - - - - -
> >>> + * [2] - - - - - -
> >>> + * [3] - - X X X X
> >>> + * [4] - - X - X X
> >>> + * [5] - - X X - -
> >>> + * [6] - - X X - -
> >>> + *
> >>> * Mutual exclusion can be ensured by holding cds_wfcq_dequeue_lock().
> >>> *
> >>> * For convenience, cds_wfcq_dequeue_blocking() and
> >>> @@ -399,6 +414,16 @@ ___cds_wfcq_dequeue_nonblocking(struct cds_wfcq_head *head,
> >>> return ___cds_wfcq_dequeue(head, tail, 0);
> >>> }
> >>>
> >>> +/*
> >>> + * __cds_wfcq_splice: enqueue all src_q nodes at the end of dest_q.
> >>> + *
> >>> + * Dequeue all nodes from src_q.
> >>> + * dest_q must be already initialized.
> >>> + * Mutual exclusion for src_q should be ensured by the caller as
> >>> + * specified in the "Synchronisation table".
> >>> + * Returns enum cds_wfcq_ret which indicates the state of the src or
> >>> + * dest queue.
> >>> + */
> >>> static inline enum cds_wfcq_ret
> >>> ___cds_wfcq_splice(
> >>> struct cds_wfcq_head *dest_q_head,
> >>> @@ -408,14 +433,26 @@ ___cds_wfcq_splice(
> >>> int blocking)
> >>> {
> >>> struct cds_wfcq_node *head, *tail;
> >>> + int attempt = 0;
> >>
> >> +again:
> >>
> >>>
> >>> if (_cds_wfcq_empty(src_q_head, src_q_tail))
> >>> return CDS_WFCQ_RET_SRC_EMPTY;
> >>>
> >>> - head = ___cds_wfcq_node_sync_next(&src_q_head->node, blocking);
> >>> - if (head == CDS_WFCQ_WOULDBLOCK)
> >>> - return CDS_WFCQ_RET_WOULDBLOCK;
> >>> - _cds_wfcq_node_init(&src_q_head->node);
> >>> + for (;;) {
> >>> + head = uatomic_xchg(&src_q_head->node.next, NULL);
> >>> + if (head)
> >>> + break; /* non-empty */
> >>> + if (CMM_LOAD_SHARED(src_q_tail->p) == &src_q_head->node)
> >>> + return CDS_WFCQ_RET_SRC_EMPTY;
> >>> + if (!blocking)
> >>> + return CDS_WFCQ_RET_WOULDBLOCK;
> >>> + if (++attempt >= WFCQ_ADAPT_ATTEMPTS) {
> >>> + poll(NULL, 0, WFCQ_WAIT); /* Wait for 10ms */
> >>> + attempt = 0;
> >>> + } else {
> >>> + caa_cpu_relax();
> >>> + }
> >>> + }
> >>
> >>
> >> Is it OK:
> >>
> >> - _cds_wfcq_node_init(&src_q_head->node);
> >> + head = uatomic_xchg(&src_q_head->node.next, NULL);
> >> + if (!head)
> >> + goto again;
> >
> > You are right that we can simplify the code a bit by re-using
> > _cds_wfcq_empty() to test validate emptiness of the source queue, rather
> > than open-code it.
> >
> > The only issue here is that the busy-loop (goto again) will not invoke
> > caa_cpu_relax(), nor do adaptative waiting like
> > ___cds_wfcq_node_sync_next() normally does.
>
> Don't need, it will re-enter ___cds_wfcq_node_sync_next() to do it.
The ___cds_wfcq_node_sync_next is actually removed. It is replaced by
the xchg of the head's next pointer.
>
> > Also, it does not check for
> > blocking/non-blocking caller.
>
> we can add code to check it.
>
> but:
>
> for () {
> see src_q_head->node.next is not NULL
> xchg fail
> }
>
> Is this a kind of blocking?
Not sure what your code snippet does, but if there is any way that we
can have to busy-loop while we are in an intermediate transient state,
then we could have to block.
>
>
> > Also, whenever possible, I like to have
> > for () or do/while constructs in place to make it clear that we can loop.
> > So a modification of your proposal would look like:
> >
> > /* Return 1 if nonblocking and needs to block, 0 otherwise */
> > static inline
> > bool ___cds_wfcq_busy_wait(int *attempt, int blocking)
> > {
> > if (!blocking)
> > return 1;
> > if (+attempt >= WFCQ_ADAPT_ATTEMPTS) {
> > poll(NULL, 0, WFCQ_WAIT); /* Wait for 10ms */
> > attempt = 0;
> > } else {
> > caa_cpu_relax();
> > }
> > return 0;
> > }
> >
> > [...]
> >
> > struct cds_wfcq_node *head, *tail;
> > int attempt = 0;
> >
> > for (;;) {
> > if (_cds_wfcq_empty(src_q_head, src_q_tail))
> > return CDS_WFCQ_RET_SRC_EMPTY;
> > head = uatomic_xchg(&src_q_head->node.next, NULL);
> > if (head)
> > break;
> > if (___cds_wfcq_busy_wait(&attempt, blocking))
> > return CDS_WFCQ_RET_WOULDBLOCK;
>
> return _cds_wfcq_empty(src_q_head, src_q_tail) ? CDS_WFCQ_RET_SRC_EMPTY : CDS_WFCQ_RET_WOULDBLOCK
I'm not sure it really buys us anything semantically: we already checked
that the queue was non-empty, then xchg returns that head next pointer
is NULL, so we have seen an null head next, but with tail not pointing
to the head node, so we are in a state that could make us busy-wait. So
rather than checking again if the queue is still in a transient state,
we can return CDS_WFCQ_RET_WOULDBLOCK immediately, no ? The only
advantage of checking it again is to catch a few cases where we would
not have to block, but I'm not convinced this is really useful.
Thoughts ?
Thanks,
Mathieu
>
> > }
> >
> >
> > Thoughts ?
> >
> > Thanks,
> >
> > Mathieu
> >
> >
> >>
> >>>
> >>> /*
> >>> * Memory barrier implied before uatomic_xchg() orders store to
> >>> @@ -435,14 +472,13 @@ ___cds_wfcq_splice(
> >>> return CDS_WFCQ_RET_DEST_EMPTY;
> >>> }
> >>>
> >>> -
> >>> /*
> >>> * __cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
> >>> *
> >>> * Dequeue all nodes from src_q.
> >>> * dest_q must be already initialized.
> >>> - * Dequeue/splice/iteration mutual exclusion for src_q should be ensured
> >>> - * by the caller.
> >>> + * Mutual exclusion for src_q should be ensured by the caller as
> >>> + * specified in the "Synchronisation table".
> >>> * Returns enum cds_wfcq_ret which indicates the state of the src or
> >>> * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK.
> >>> */
> >>> diff --git a/urcu/wfcqueue.h b/urcu/wfcqueue.h
> >>> index ddf6b87..d9ec534 100644
> >>> --- a/urcu/wfcqueue.h
> >>> +++ b/urcu/wfcqueue.h
> >>> @@ -46,7 +46,7 @@ extern "C" {
> >>> #define CDS_WFCQ_WOULDBLOCK ((void *) -1UL)
> >>>
> >>> enum cds_wfcq_ret {
> >>> - CDS_WFCQ_RET_WOULDBLOCK = -1,
> >>> + CDS_WFCQ_RET_WOULDBLOCK = -1,
> >>> CDS_WFCQ_RET_DEST_EMPTY = 0,
> >>> CDS_WFCQ_RET_DEST_NON_EMPTY = 1,
> >>> CDS_WFCQ_RET_SRC_EMPTY = 2,
> >>> @@ -110,13 +110,28 @@ struct cds_wfcq_tail {
> >>> /*
> >>> * Mutual exclusion of cds_wfcq_* / __cds_wfcq_* API
> >>> *
> >>> - * Unless otherwise stated, the caller must ensure mutual exclusion of
> >>> - * queue update operations "dequeue" and "splice" (for source queue).
> >>> - * Queue read operations "first" and "next", which are used by
> >>> - * "for_each" iterations, need to be protected against concurrent
> >>> - * "dequeue" and "splice" (for source queue) by the caller.
> >>> - * "enqueue", "splice" (for destination queue), and "empty" are the only
> >>> - * operations that can be used without any mutual exclusion.
> >>> + * Synchronization table:
> >>> + *
> >>> + * External synchronization techniques described in the API below is
> >>> + * required between pairs marked with "X". No external synchronization
> >>> + * required between pairs marked with "-".
> >>> + *
> >>> + * Legend:
> >>> + * [1] cds_wfcq_enqueue
> >>> + * [2] __cds_wfcq_splice (destination queue)
> >>> + * [3] __cds_wfcq_dequeue
> >>> + * [4] __cds_wfcq_splice (source queue)
> >>> + * [5] __cds_wfcq_first
> >>> + * [6] __cds_wfcq_next
> >>> + *
> >>> + * [1] [2] [3] [4] [5] [6]
> >>> + * [1] - - - - - -
> >>> + * [2] - - - - - -
> >>> + * [3] - - X X X X
> >>> + * [4] - - X - X X
> >>> + * [5] - - X X - -
> >>> + * [6] - - X X - -
> >>> + *
> >>> * Mutual exclusion can be ensured by holding cds_wfcq_dequeue_lock().
> >>> *
> >>> * For convenience, cds_wfcq_dequeue_blocking() and
> >>> @@ -231,13 +246,10 @@ extern struct cds_wfcq_node *__cds_wfcq_dequeue_nonblocking(
> >>> *
> >>> * Dequeue all nodes from src_q.
> >>> * dest_q must be already initialized.
> >>> - * Content written into the node before enqueue is guaranteed to be
> >>> - * consistent, but no other memory ordering is ensured.
> >>> - * Dequeue/splice/iteration mutual exclusion for src_q should be ensured
> >>> - * by the caller.
> >>> - *
> >>> + * Mutual exclusion for src_q should be ensured by the caller as
> >>> + * specified in the "Synchronisation table".
> >>> * Returns enum cds_wfcq_ret which indicates the state of the src or
> >>> - * dest queue. Cannot block.
> >>> + * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK.
> >>> */
> >>> extern enum cds_wfcq_ret __cds_wfcq_splice_blocking(
> >>> struct cds_wfcq_head *dest_q_head,
> >>> diff --git a/wfcqueue.c b/wfcqueue.c
> >>> index 207df95..ab0eb93 100644
> >>> --- a/wfcqueue.c
> >>> +++ b/wfcqueue.c
> >>> @@ -1,7 +1,7 @@
> >>> /*
> >>> * wfcqueue.c
> >>> *
> >>> - * Userspace RCU library - Concurrent queue with Wait-Free Enqueue/Blocking Dequeue
> >>> + * Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue
> >>> *
> >>> * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> >>> * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
> >>
> >
>
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list