[lttng-dev] [RFC PATCH] wfqueue: expand API, simplify implementation, small performance boost
Lai Jiangshan
eag0628 at gmail.com
Tue Aug 14 09:49:42 EDT 2012
On Tue, Aug 14, 2012 at 4:12 AM, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> Hi Lai,
>
> * Lai Jiangshan (eag0628 at gmail.com) wrote:
>> On Sun, Aug 12, 2012 at 10:50 PM, Mathieu Desnoyers
>> <mathieu.desnoyers at efficios.com> wrote:
>> > This work is derived from the patch from Lai Jiangshan submitted as
>> > "urcu: new wfqueue implementation"
>> > (http://lists.lttng.org/pipermail/lttng-dev/2012-August/018379.html)
>> >
>> > Its changelog:
>> >
>> >> Some may be surprised by this fact:
>> >> there are already TWO implementations of wfqueue in urcu.
>> >>
>> >> The first one is in urcu/static/wfqueue.h:
>> >> 1) enqueue: exchange the tail and then update previous->next
>> >> 2) dequeue: wait for the first node's next pointer and then shift; a dummy
>> >> node is introduced to avoid queue->tail becoming NULL during the shift.
>> >>
>> >> The second one shares some code with the first one; the rest of its code
>> >> is spread across urcu-call-rcu-impl.h:
>> >> 1) enqueue: shared with the first one
>> >> 2) no dequeue operation, and no shift, so it does not need the dummy node.
>> >> Although the dummy node is enqueued at initialization, it is removed
>> >> after the first dequeue_all operation in call_rcu_thread().
>> >> call_rcu_data_free() forgets to handle the dummy node if it has not been removed.
>> >> 3) dequeue_all: record the old head and tail, and queue->head becomes the
>> >> special tail node (atomically record the tail and change the tail).
>> >>
>> >> The second implementation's code is scattered, which is bad for review, and
>> >> it is not tested by tests/test_urcu_wfq.
>> >>
>> >> So we need a better implementation that avoids the dummy node dancing and
>> >> can serve both the generic wfqueue APIs and the dequeue_all API for call_rcu.
>> >>
>> >> The new implementation:
>> >> 1) enqueue: shared with the first (original) implementation.
>> >> 2) dequeue: shift when node count >= 2, cmpxchg when node count = 1.
>> >> No dummy node, saving memory.
>> >> 3) dequeue_all: simply set queue->head.next to NULL, xchg the tail
>> >> and return the old head.next (sketch below).
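>> >>
>> >> Roughly, the dequeue_all idea reduces to (a sketch, not the final
>> >> code; the real version must also wait for head.next to be published):
>> >>
>> >>	head = q->head.next;                      /* grab the current chain */
>> >>	q->head.next = NULL;                      /* queue restarts empty */
>> >>	tail = uatomic_xchg(&q->tail, &q->head);  /* cut off concurrent enqueuers */
>> >>	/* nodes from head to tail now belong to the caller */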
>> >>
>> >> More implementation details are in the code.
>> >> tests/test_urcu_wfq will be updated in the future to test the new APIs.
>> >
>> > The patch proposed by Lai brings a very interesting simplification to
>> > the single-node handling (which is kept here), and moves all queue
>> > handling code away from the call_rcu implementation, back into the
>> > wfqueue code. This has the benefit of allowing testing enhancements.
>> >
>> > I modified it so the API does not expose implementation details to the
>> > user (e.g. ___cds_wfq_node_sync_next). I added a "splice" operation and
>> > a for loop iterator which should allow wfqueue users to use the list
>> > very efficiently both from LGPL/GPL code and from non-LGPL-compatible
>> > code.
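>> >
>> > For instance, a non-dequeuing traversal could look like this (usage
>> > sketch; handle() is a placeholder, and the caller must provide mutual
>> > exclusion against dequeue/splice on the queue):
>> >
>> >	struct cds_wfq_node *node;
>> >
>> >	__cds_wfq_for_each_blocking(&queue, node)
>> >		handle(node);	/* node stays queued; do not free it here */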
>> >
>> > Benchmarks performed on Intel(R) Core(TM) i7-3520M CPU @ 2.90GHz
>> > (dual-core, with hyperthreading)
>> >
>> > Benchmark invoked:
>> > test_urcu_wfq 2 2 10
>> >
>> > I only did 2 runs, but a small improvement in dequeue speed seems
>> > clear:
>> >
>> > Before patch:
>> >
>> > testdur 10 nr_enqueuers 2 wdelay 0 nr_dequeuers 2 rdur 0 nr_enqueues 136251248 nr_dequeues 54694027 successful enqueues 136251248 successful dequeues 54693904 end_dequeues 81557344 nr_ops 190945275
>> > testdur 10 nr_enqueuers 2 wdelay 0 nr_dequeuers 2 rdur 0 nr_enqueues 137258881 nr_dequeues 54463340 successful enqueues 137258881 successful dequeues 54463238 end_dequeues 82795643 nr_ops 191722221
>> >
>> > After patch:
>> >
>> > testdur 10 nr_enqueuers 2 wdelay 0 nr_dequeuers 2 rdur 0 nr_enqueues 138589301 nr_dequeues 56911253 successful enqueues 138589301 successful dequeues 56910916 end_dequeues 81678385 nr_ops 195500554
>> > testdur 10 nr_enqueuers 2 wdelay 0 nr_dequeuers 2 rdur 0 nr_enqueues 139007622 nr_dequeues 57281502 successful enqueues 139007622 successful dequeues 57281348 end_dequeues 81726274 nr_ops 196289124
>> >
>> > Summary: Number of enqueues is slightly lower,
>>
>> ?!
>> I see that nr_enqueues and successful enqueues both increased
>> after the patch.
>
> Oh, you're right. I wrote that summary in a hurry. Sorry about that,
> will fix.
>
>>
>> > probably due to a higher
>> > dequeue rate. The number of dequeues increased. The respective rate
>> > change is within 1% (slowdown) for enqueue, 2% (improvement) for
>> > dequeue. The overall number of operations (dequeue+enqueue) increased
>> > with the patch.
>> >
>> > For all runs, we can verify that:
>> > successful enqueues - successful dequeues = end_dequeues
>> >
>> > This ensures correctness: no node is lost.
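>> >
>> > For instance, for the first run before the patch:
>> > 136251248 - 54693904 = 81557344, which matches end_dequeues.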
>> >
>> > CC: Lai Jiangshan <laijs at cn.fujitsu.com>
>> > CC: Paul McKenney <paulmck at linux.vnet.ibm.com>
>> > Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
>> > ---
>> > diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
>> > index 13b24ff..5363fe0 100644
>> > --- a/urcu-call-rcu-impl.h
>> > +++ b/urcu-call-rcu-impl.h
>> > @@ -21,6 +21,7 @@
>> > */
>> >
>> > #define _GNU_SOURCE
>> > +#define _LGPL_SOURCE
>> > #include <stdio.h>
>> > #include <pthread.h>
>> > #include <signal.h>
>> > @@ -220,10 +221,7 @@ static void call_rcu_wake_up(struct call_rcu_data *crdp)
>> > static void *call_rcu_thread(void *arg)
>> > {
>> > unsigned long cbcount;
>> > - struct cds_wfq_node *cbs;
>> > - struct cds_wfq_node **cbs_tail;
>> > - struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
>> > - struct rcu_head *rhp;
>> > + struct call_rcu_data *crdp = (struct call_rcu_data *) arg;
>> > int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT);
>> > int ret;
>> >
>> > @@ -243,35 +241,29 @@ static void *call_rcu_thread(void *arg)
>> > cmm_smp_mb();
>> > }
>> > for (;;) {
>> > - if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
>> > - while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
>> > - poll(NULL, 0, 1);
>> > - _CMM_STORE_SHARED(crdp->cbs.head, NULL);
>> > - cbs_tail = (struct cds_wfq_node **)
>> > - uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
>> > + struct cds_wfq_queue cbs_tmp;
>> > + struct cds_wfq_node *cbs;
>> > +
>> > + cds_wfq_init(&cbs_tmp);
>> > + __cds_wfq_splice_blocking(&cbs_tmp, &crdp->cbs);
>> > + if (!cds_wfq_empty(&cbs_tmp)) {
>> > synchronize_rcu();
>> > cbcount = 0;
>> > - do {
>> > - while (cbs->next == NULL &&
>> > - &cbs->next != cbs_tail)
>> > - poll(NULL, 0, 1);
>> > - if (cbs == &crdp->cbs.dummy) {
>> > - cbs = cbs->next;
>> > - continue;
>> > - }
>> > - rhp = (struct rcu_head *)cbs;
>> > - cbs = cbs->next;
>> > + __cds_wfq_for_each_blocking(&cbs_tmp, cbs) {
>> > + struct rcu_head *rhp;
>> > +
>> > + rhp = caa_container_of(cbs,
>> > + struct rcu_head, next);
>> > rhp->func(rhp);
>>
>>
>> cbs is freed here, but it will then be used by __cds_wfq_next_blocking().
>> Introduce __cds_wfq_for_each_blocking_safe()?
>
> Good point! Will do.
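>
> Something like the following, perhaps (untested sketch; the final
> naming and form may differ). The next pointer is fetched before the
> loop body runs, so the body is free to reuse or free the current node:
>
> #define __cds_wfq_for_each_blocking_safe(q, node, n)		\
>	for (node = ___cds_wfq_first_blocking(q),		\
>		n = (node ? ___cds_wfq_next_blocking(q, node) : NULL); \
>	     node != NULL;					\
>	     node = n,						\
>		n = (node ? ___cds_wfq_next_blocking(q, node) : NULL))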
>
>>
>> > cbcount++;
>> > - } while (cbs != NULL);
>> > + }
>> > uatomic_sub(&crdp->qlen, cbcount);
>> > }
>> > if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP)
>> > break;
>> > rcu_thread_offline();
>> > if (!rt) {
>> > - if (&crdp->cbs.head
>> > - == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
>> > + if (cds_wfq_empty(&crdp->cbs)) {
>> > call_rcu_wait(crdp);
>> > poll(NULL, 0, 10);
>> > uatomic_dec(&crdp->futex);
>> > @@ -625,32 +617,32 @@ void call_rcu(struct rcu_head *head,
>> > */
>> > void call_rcu_data_free(struct call_rcu_data *crdp)
>> > {
>> > - struct cds_wfq_node *cbs;
>> > - struct cds_wfq_node **cbs_tail;
>> > - struct cds_wfq_node **cbs_endprev;
>> > -
>> > if (crdp == NULL || crdp == default_call_rcu_data) {
>> > return;
>> > }
>> > +
>> > if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) {
>> > uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP);
>> > wake_call_rcu_thread(crdp);
>> > while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0)
>> > poll(NULL, 0, 1);
>> > }
>> > - if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
>> > - while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
>> > - poll(NULL, 0, 1);
>> > - _CMM_STORE_SHARED(crdp->cbs.head, NULL);
>> > - cbs_tail = (struct cds_wfq_node **)
>> > - uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
>> > +
>> > + if (!cds_wfq_empty(&crdp->cbs)) {
>> > + struct cds_wfq_queue cbs_tmp;
>> > +
>> > + cds_wfq_init(&cbs_tmp);
>> > + __cds_wfq_splice_blocking(&cbs_tmp, &crdp->cbs);
>> > +
>> > /* Create default call rcu data if need be */
>> > (void) get_default_call_rcu_data();
>> > - cbs_endprev = (struct cds_wfq_node **)
>> > - uatomic_xchg(&default_call_rcu_data, cbs_tail);
>> > - *cbs_endprev = cbs;
>> > +
>> > + __cds_wfq_splice_blocking(&default_call_rcu_data->cbs,
>> > + &cbs_tmp);
>> > +
>>
>> Too much code for my taste; cbs_tmp is not required here.
>>
>>
>> /* Create default call rcu data if need be */
>> (void) get_default_call_rcu_data();
>> + __cds_wfq_splice_blocking(&default_call_rcu_data->cbs,&crdp->cbs);
>
> You're right. I initially thought that the order with the call to
> get_default_call_rcu_data() was important, but it rather looks like we
> call get_default_call_rcu_data() just to ensure that we have somewhere
> to send the already enqueued callbacks. Indeed, we can merge those as
> you propose. Will do!
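>
> The tail of call_rcu_data_free() would then reduce to (sketch,
> combining your suggestion with the lines that follow in the patch):
>
>	/* Create default call rcu data if need be */
>	(void) get_default_call_rcu_data();
>	__cds_wfq_splice_blocking(&default_call_rcu_data->cbs, &crdp->cbs);
>	uatomic_add(&default_call_rcu_data->qlen, uatomic_read(&crdp->qlen));
>	wake_call_rcu_thread(default_call_rcu_data);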
>
>>
>>
>>
>> > uatomic_add(&default_call_rcu_data->qlen,
>> > uatomic_read(&crdp->qlen));
>> > +
>> > wake_call_rcu_thread(default_call_rcu_data);
>> > }
>> >
>> > diff --git a/urcu/static/wfqueue.h b/urcu/static/wfqueue.h
>> > index 636e1af..08d8d52 100644
>> > --- a/urcu/static/wfqueue.h
>> > +++ b/urcu/static/wfqueue.h
>> > @@ -9,7 +9,8 @@
>> > * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See wfqueue.h for linking
>> > * dynamically with the userspace rcu library.
>> > *
>> > - * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
>> > + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
>> > + * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
>> > *
>> > * This library is free software; you can redistribute it and/or
>> > * modify it under the terms of the GNU Lesser General Public
>> > @@ -29,6 +30,7 @@
>> > #include <pthread.h>
>> > #include <assert.h>
>> > #include <poll.h>
>> > +#include <stdbool.h>
>> > #include <urcu/compiler.h>
>> > #include <urcu/uatomic.h>
>> >
>> > @@ -38,11 +40,16 @@ extern "C" {
>> >
>> > /*
>> > * Queue with wait-free enqueue/blocking dequeue.
>> > - * This implementation adds a dummy head node when the queue is empty to ensure
>> > - * we can always update the queue locklessly.
>> > *
>> > * Inspired from half-wait-free/half-blocking queue implementation done by
>> > * Paul E. McKenney.
>> > + *
>> > + * Caller must ensure mutual exclusion of queue update operations
>> > + * "dequeue" and "splice" source queue. Queue read operations "first"
>> > + * and "next" need to be protected against concurrent "dequeue" and
>> > + * "splice" (for source queue) by the caller. "enqueue", "splice"
>> > + * (destination queue), and "empty" are the only operations that can be
>> > + * used without any mutual exclusion.
>> > */
>> >
>> > #define WFQ_ADAPT_ATTEMPTS 10 /* Retry if being set */
>> > @@ -57,31 +64,51 @@ static inline void _cds_wfq_init(struct cds_wfq_queue *q)
>> > {
>> > int ret;
>> >
>> > - _cds_wfq_node_init(&q->dummy);
>> > /* Set queue head and tail */
>> > - q->head = &q->dummy;
>> > - q->tail = &q->dummy.next;
>> > + _cds_wfq_node_init(&q->head);
>> > + q->tail = &q->head;
>> > ret = pthread_mutex_init(&q->lock, NULL);
>> > assert(!ret);
>> > }
>> >
>> > -static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
>> > - struct cds_wfq_node *node)
>> > +static inline bool _cds_wfq_empty(struct cds_wfq_queue *q)
>> > +{
>> > + /*
>> > + * Queue is empty if no node is pointed by q->head.next nor q->tail.
>> > + */
>> > + return CMM_LOAD_SHARED(q->head.next) == NULL
>> > + && CMM_LOAD_SHARED(q->tail) == &q->head;
>> > +}
>> > +
>> > +static inline void ___cds_wfq_append(struct cds_wfq_queue *q,
>> > + struct cds_wfq_node *new_head,
>> > + struct cds_wfq_node *new_tail)
>> > {
>> > - struct cds_wfq_node **old_tail;
>> > + struct cds_wfq_node *old_tail;
>> >
>> > /*
>> > - * uatomic_xchg() implicit memory barrier orders earlier stores to data
>> > - * structure containing node and setting node->next to NULL before
>> > - * publication.
>> > + * Implicit memory barrier before uatomic_xchg() orders earlier
>> > + * stores to data structure containing node and setting
>> > + * node->next to NULL before publication.
>> > */
>> > - old_tail = uatomic_xchg(&q->tail, &node->next);
>> > + old_tail = uatomic_xchg(&q->tail, new_tail);
>> > +
>> > /*
>> > - * At this point, dequeuers see a NULL old_tail->next, which indicates
>> > - * that the queue is being appended to. The following store will append
>> > - * "node" to the queue from a dequeuer perspective.
>> > + * Implicit memory barrier after uatomic_xchg() orders store to
>> > + * q->tail before store to old_tail->next.
>> > + *
>> > + * At this point, dequeuers see a NULL q->tail->next, which
>> > + * indicates that the queue is being appended to. The following
>> > + * store will append "node" to the queue from a dequeuer
>> > + * perspective.
>> > */
>> > - CMM_STORE_SHARED(*old_tail, node);
>> > + CMM_STORE_SHARED(old_tail->next, new_head);
>> > +}
>> > +
>> > +static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
>> > + struct cds_wfq_node *new_tail)
>> > +{
>> > + ___cds_wfq_append(q, new_tail, new_tail);
>> > }
>> >
>> > /*
>> > @@ -100,14 +127,45 @@ ___cds_wfq_node_sync_next(struct cds_wfq_node *node)
>> > if (++attempt >= WFQ_ADAPT_ATTEMPTS) {
>> > poll(NULL, 0, WFQ_WAIT); /* Wait for 10ms */
>> > attempt = 0;
>> > - } else
>> > + } else {
>> > caa_cpu_relax();
>> > + }
>> > }
>> >
>> > return next;
>> > }
>> >
>> > /*
>> > + * ___cds_wfq_first_blocking: get first node of a queue, without dequeuing.
>> > + *
>> > + * Mutual exclusion with "dequeue" and "splice" operations must be ensured
>> > + * by the caller.
>> > + */
>> > +static inline struct cds_wfq_node *
>> > +___cds_wfq_first_blocking(struct cds_wfq_queue *q)
>> > +{
>> > + if (_cds_wfq_empty(q))
>> > + return NULL;
>> > + return ___cds_wfq_node_sync_next(&q->head);
>> > +}
>> > +
>> > +/*
>> > + * ___cds_wfq_next_blocking: get next node of a queue, without dequeuing.
>> > + *
>> > + * Mutual exclusion with "dequeue" and "splice" operations must be ensured
>> > + * by the caller.
>> > + */
>> > +static inline struct cds_wfq_node *
>> > +___cds_wfq_next_blocking(struct cds_wfq_queue *q, struct cds_wfq_node *node)
>> > +{
>> > + if (CMM_LOAD_SHARED(q->tail) == node)
>> > + return NULL;
>> > + return ___cds_wfq_node_sync_next(node);
>> > +}
>>
>>
>> The same BUG as the one you told me about.
>> If q has only one node, just enqueued by another thread,
>> and q->head.next is seen, ___cds_wfq_first_blocking() returns a node.
>> But if the update of q->tail is not yet seen (it is still &q->head),
>> ___cds_wfq_node_sync_next(node) will loop forever if there is no
>> other enqueue.
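>>
>> The interleaving, for reference (E = enqueuer, D = reader, single node):
>>
>> E: uatomic_xchg(&q->tail, node)           /* tail now points to node */
>> E: CMM_STORE_SHARED(q->head.next, node)
>> D: reads q->head.next == node             /* queue seen as non-empty */
>> D: reads q->tail, still sees &q->head     /* stale read without rmb */
>> D: ___cds_wfq_node_sync_next(node)        /* spins on node->next forever */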
>
> Good catch ! :-)
>
>>
>>
>>
>> static inline struct cds_wfq_node *
>> ___cds_wfq_first_blocking(struct cds_wfq_queue *q)
>> {
>> + struct cds_wfq_node *ret;
>> if (_cds_wfq_empty(q))
>> return NULL;
>> - return ___cds_wfq_node_sync_next(&q->head);
>> + ret = ___cds_wfq_node_sync_next(&q->head);
>> + cmm_smp_rmb();
>> + return ret;
>> }
>
> However, I think we should add the rmb at the beginning of
> ___cds_wfq_next_blocking(), so it applies at each "next" call.
> Otherwise, I think we could end up in a situation where we wait for a
> NULL next forever in the second of two consecutive
> ___cds_wfq_next_blocking() calls. I therefore propose:
How can this happen (waiting for a NULL next forever in the second call)?
A rmb is enough to get out of the state (single node && q->tail == &q->head).
"CMM_LOAD_SHARED(q->tail) == node" will be true in the loop except in this state.
>
> static inline struct cds_wfq_node *
> ___cds_wfq_next_blocking(struct cds_wfq_queue *q, struct cds_wfq_node *node)
> {
> /* Load previous node->next before q->tail */
> cmm_smp_rmb();
> if (CMM_LOAD_SHARED(q->tail) == node)
> return NULL;
> return ___cds_wfq_node_sync_next(node);
> }
>
>>
>>
>> > +
>> > +/*
>> > + * ___cds_wfq_dequeue_blocking: dequeue a node from the queue.
>> > + *
>> > * It is valid to reuse and free a dequeued node immediately.
>> > *
>> > * No need to go on a waitqueue here, as there is no possible state in which the
>> > @@ -120,42 +178,123 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
>> > {
>> > struct cds_wfq_node *node, *next;
>> >
>> > - /*
>> > - * Queue is empty if it only contains the dummy node.
>> > - */
>> > - if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next)
>> > + if (_cds_wfq_empty(q))
>> > return NULL;
>> > - node = q->head;
>> >
>> > - next = ___cds_wfq_node_sync_next(node);
>> > + node = ___cds_wfq_node_sync_next(&q->head);
>> > +
>> > + if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
>> > + /* Load node->next before q->tail */
>> > + cmm_smp_rmb();
>> > + if (CMM_LOAD_SHARED(q->tail) == node) {
>>
>> I don't know why I added this "if", since it is likely true.
>> Could you remove the above 3 lines?
>> (I remember there is a mb() before uatomic_cmpxchg(), which means
>> this mb() happens before the test done by uatomic_cmpxchg().)
>
> Indeed. I replaced these by a large comment.
>
>>
>> > + /*
>> > + * @node is the only node in the queue.
>> > + * Try to move the tail to &q->head
>> > + */
>> > + _cds_wfq_node_init(&q->head);
>> > + if (uatomic_cmpxchg(&q->tail, node, &q->head) == node)
>> > + return node;
>> > + }
>> > + next = ___cds_wfq_node_sync_next(node);
>> > + }
>> >
>> > /*
>> > * Move queue head forward.
>> > */
>> > - q->head = next;
>> > + q->head.next = next;
>> > +
>> > + return node;
>> > +}
>> > +
>> > +/*
>> > + * ___cds_wfq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
>> > + *
>> > + * Dequeue all nodes from src_q.
>> > + * dest_q must be already initialized.
>> > + * caller ensures mutual exclusion of dequeue and splice operations on
>> > + * src_q.
>> > + */
>> > +static inline void
>> > +___cds_wfq_splice_blocking(struct cds_wfq_queue *dest_q,
>> > + struct cds_wfq_queue *src_q)
>> > +{
>> > + struct cds_wfq_node *head, *tail;
>> > +
>> > + if (_cds_wfq_empty(src_q))
>> > + return;
>> > +
>> > + head = ___cds_wfq_node_sync_next(&src_q->head);
>> > + _cds_wfq_node_init(&src_q->head);
>> > +
>> > + /*
>> > + * Memory barrier implied before uatomic_xchg() orders store to
>> > + * src_q->head before store to src_q->tail. This is required by
>> > + * concurrent enqueue on src_q, which exchanges the tail before
>> > + * updating the previous tail's next pointer.
>> > + */
>> > + tail = uatomic_xchg(&src_q->tail, &src_q->head);
>> > +
>> > /*
>> > - * Requeue dummy node if we just dequeued it.
>> > + * Append the spliced content of src_q into dest_q. Does not
>> > + * require mutual exclusion on dest_q (wait-free).
>> > */
>> > - if (node == &q->dummy) {
>> > - _cds_wfq_node_init(node);
>> > - _cds_wfq_enqueue(q, node);
>> > - return ___cds_wfq_dequeue_blocking(q);
>> > - }
>> > - return node;
>> > + ___cds_wfq_append(dest_q, head, tail);
>> > +}
>> > +
>> > +/* Locking performed within cds_wfq calls. */
>> > +static inline struct cds_wfq_node *
>> > +_cds_wfq_first_blocking(struct cds_wfq_queue *q)
>> > +{
>> > + struct cds_wfq_node *retval;
>> > + int ret;
>> > +
>> > + ret = pthread_mutex_lock(&q->lock);
>> > + assert(!ret);
>> > + retval = ___cds_wfq_first_blocking(q);
>> > + ret = pthread_mutex_unlock(&q->lock);
>> > + assert(!ret);
>> > + return retval;
>> > +}
>> > +
>> > +static inline struct cds_wfq_node *
>> > +_cds_wfq_next_blocking(struct cds_wfq_queue *q, struct cds_wfq_node *node)
>> > +{
>> > + struct cds_wfq_node *retval;
>> > + int ret;
>> > +
>> > + ret = pthread_mutex_lock(&q->lock);
>> > + assert(!ret);
>> > + retval = ___cds_wfq_next_blocking(q, node);
>> > + ret = pthread_mutex_unlock(&q->lock);
>> > + assert(!ret);
>> > + return retval;
>> > }
>>
>> I reject these _cds_wfq_first_blocking(), _cds_wfq_next_blocking()
>> and cds_wfq_for_each_blocking(), because the claimed "Locking"
>> makes no sense:
>> 1. It protects nothing in _cds_wfq_next_blocking().
>
> Good point!
>
>> 2. There is no "Locking" across the loop body: @node is not dequeued,
>> so it becomes invalid if some other thread dequeues it,
>> and _cds_wfq_next_blocking() then results in a BUG.
>
> Indeed.
>
> So do you recommend we just leave locking to the callers then ? What
> locking rules would you recommend we document in the API ?
>
> The only API members that would still have implicit locking are thus:
>
> - cds_wfq_dequeue_blocking()
> - cds_wfq_splice_blocking()
>
> The only reason I leave them there is that we already expose a
> "cds_wfq_dequeue_blocking" to users which provides locking, and I don't
> want to break the API.
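>
> With caller-provided locking, a traversal would then look like this
> (sketch; my_lock stands for whatever lock the application already uses
> to serialize dequeue/splice on this queue):
>
>	pthread_mutex_lock(&my_lock);
>	__cds_wfq_for_each_blocking(&q, node)
>		inspect(node);	/* node stays queued; do not free */
>	pthread_mutex_unlock(&my_lock);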
>
>>
>> >
>> > static inline struct cds_wfq_node *
>> > _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
>> > {
>> > - struct cds_wfq_node *retnode;
>> > + struct cds_wfq_node *retval;
>> > int ret;
>> >
>> > ret = pthread_mutex_lock(&q->lock);
>> > assert(!ret);
>> > - retnode = ___cds_wfq_dequeue_blocking(q);
>> > + retval = ___cds_wfq_dequeue_blocking(q);
>> > ret = pthread_mutex_unlock(&q->lock);
>> > assert(!ret);
>> > - return retnode;
>> > + return retval;
>> > +}
>> > +
>> > +static inline void
>> > +_cds_wfq_splice_blocking(struct cds_wfq_queue *dest_q,
>> > + struct cds_wfq_queue *src_q)
>> > +{
>> > + int ret;
>> > +
>> > + ret = pthread_mutex_lock(&src_q->lock);
>> > + assert(!ret);
>> > + ___cds_wfq_splice_blocking(dest_q, src_q);
>> > + ret = pthread_mutex_unlock(&src_q->lock);
>> > + assert(!ret);
>> > }
>> >
>> > #ifdef __cplusplus
>> > diff --git a/urcu/wfqueue.h b/urcu/wfqueue.h
>> > index 03a73f1..d33d47a 100644
>> > --- a/urcu/wfqueue.h
>> > +++ b/urcu/wfqueue.h
>> > @@ -6,7 +6,8 @@
>> > *
>> > * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
>> > *
>> > - * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
>> > + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
>> > + * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
>> > *
>> > * This library is free software; you can redistribute it and/or
>> > * modify it under the terms of the GNU Lesser General Public
>> > @@ -25,6 +26,7 @@
>> >
>> > #include <pthread.h>
>> > #include <assert.h>
>> > +#include <stdbool.h>
>> > #include <urcu/compiler.h>
>> >
>> > #ifdef __cplusplus
>> > @@ -33,8 +35,6 @@ extern "C" {
>> >
>> > /*
>> > * Queue with wait-free enqueue/blocking dequeue.
>> > - * This implementation adds a dummy head node when the queue is empty to ensure
>> > - * we can always update the queue locklessly.
>> > *
>> > * Inspired from half-wait-free/half-blocking queue implementation done by
>> > * Paul E. McKenney.
>> > @@ -45,8 +45,8 @@ struct cds_wfq_node {
>> > };
>> >
>> > struct cds_wfq_queue {
>> > - struct cds_wfq_node *head, **tail;
>> > - struct cds_wfq_node dummy; /* Dummy node */
>> > + struct cds_wfq_node head, *tail;
>> > + struct cds_wfq_node padding; /* unused */
>> > pthread_mutex_t lock;
>> > };
>>
>> Why keep the padding?
>
> This is mainly to keep compatibility with applications already using
> wfqueue.h. We could indeed make cds_wfq_queue smaller, but we cannot
> enlarge it without breaking the ABI. Therefore, I think it is safer to
> keep some unused padding rather than shrink the size, in case we ever
> need to put an extra flag or pointer in the structure.
>
> What I can do, though, is to move the padding to the end of the
> structure, and give it a void * type. Is that OK ?
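>
> That is, something like (sketch):
>
> struct cds_wfq_queue {
>	struct cds_wfq_node head, *tail;
>	pthread_mutex_t lock;
>	void *padding;	/* unused, reserved for future use */
> };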
>
> Thanks,
>
> Mathieu
>
> --
> Mathieu Desnoyers
> Operating System Efficiency R&D Consultant
> EfficiOS Inc.
> http://www.efficios.com