[lttng-dev] [RFC PATCH] wfqueue: expand API, simplify implementation, small performance boost
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Mon Aug 13 16:12:19 EDT 2012
Hi Lai,
* Lai Jiangshan (eag0628 at gmail.com) wrote:
> On Sun, Aug 12, 2012 at 10:50 PM, Mathieu Desnoyers
> <mathieu.desnoyers at efficios.com> wrote:
> > This work is derived from the patch from Lai Jiangshan submitted as
> > "urcu: new wfqueue implementation"
> > (http://lists.lttng.org/pipermail/lttng-dev/2012-August/018379.html)
> >
> > Its changelog:
> >
> >> Some guys would be surprised by this fact:
> >> There are already TWO implementations of wfqueue in urcu.
> >>
> >> The first one is in urcu/static/wfqueue.h:
> >> 1) enqueue: exchange the tail and then update previous->next
> >> 2) dequeue: wait for the first node's next pointer and then shift; a dummy
> >> node is introduced to avoid queue->tail becoming NULL during the shift.
> >>
> >> The second one shares some code with the first one, and the remaining code
> >> is spread across urcu-call-rcu-impl.h:
> >> 1) enqueue: share with the first one
> >> 2) no dequeue operation and no shift, so it does not need the dummy node.
> >> The dummy node is queued at initialization, but it is removed
> >> after the first dequeue_all operation in call_rcu_thread().
> >> call_rcu_data_free() forgets to handle the dummy node if it has not been removed.
> >> 3) dequeue_all: record the old head and tail, and queue->head becomes the
> >> special tail node (atomically record the tail and change it).
> >>
> >> The second implementation's code is spread out, which makes it hard to review,
> >> and it is not tested by tests/test_urcu_wfq.
> >>
> >> So we need a better implementation that avoids the dummy node dancing and can
> >> serve both the generic wfqueue APIs and the dequeue_all API for call_rcu.
> >>
> >> The new implementation:
> >> 1) enqueue: share with the first one/original implementation.
> >> 2) dequeue: shift when the node count >= 2, cmpxchg when the node count = 1;
> >> no dummy node, which saves memory.
> >> 3) dequeue_all: simply set queue->head.next to NULL, xchg the tail
> >> and return the old head.next.
> >>
> >> More implementation details are in the code.
> >> tests/test_urcu_wfq will be updated in the future to test the new APIs.
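In other words, the dequeue_all idea in item 3 boils down to roughly the
following (a sketch only; it elides the synchronization with an in-flight
enqueue, which the real splice code in the patch below handles):

	struct cds_wfq_node *old_head, *old_tail;

	old_head = q->head.next;	/* the whole list starts here */
	q->head.next = NULL;		/* queue now looks empty to us */
	old_tail = uatomic_xchg(&q->tail, &q->head);
	/* [old_head .. old_tail] is now a private list owned by the caller. */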
> >
> > The patch proposed by Lai brings a very interesting simplification to
> > the single-node handling (which is kept here), and moves all queue
> > handling code away from the call_rcu implementation, back into the wfqueue
> > code. This has the benefit of allowing testing enhancements.
> >
> > I modified it so the API does not expose implementation details to the
> > user (e.g. ___cds_wfq_node_sync_next). I added a "splice" operation and
> > a for loop iterator which should allow wfqueue users to use the list
> > very efficiently both from LGPL/GPL code and from non-LGPL-compatible
> > code.
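To illustrate the intended usage pattern (a sketch only: "handle()" is a
placeholder, and it assumes an existing struct cds_wfq_queue q together with
the splice and iterator entry points introduced by the patch below):

	struct cds_wfq_queue tmp;
	struct cds_wfq_node *node;

	cds_wfq_init(&tmp);
	/* Atomically grab everything currently queued on q... */
	cds_wfq_splice_blocking(&tmp, &q);
	/* ...then walk the private copy; nodes must not be freed in this loop. */
	__cds_wfq_for_each_blocking(&tmp, node)
		handle(node);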
> >
> > Benchmarks performed on Intel(R) Core(TM) i7-3520M CPU @ 2.90GHz
> > (dual-core, with hyperthreading)
> >
> > Benchmark invoked:
> > test_urcu_wfq 2 2 10
> >
> > I only did 2 runs, but a small improvement in dequeue speed seems to be
> > clear:
> >
> > Before patch:
> >
> > testdur 10 nr_enqueuers 2 wdelay 0 nr_dequeuers 2 rdur 0 nr_enqueues 136251248 nr_dequeues 54694027 successful enqueues 136251248 successful dequeues 54693904 end_dequeues 81557344 nr_ops 190945275
> > testdur 10 nr_enqueuers 2 wdelay 0 nr_dequeuers 2 rdur 0 nr_enqueues 137258881 nr_dequeues 54463340 successful enqueues 137258881 successful dequeues 54463238 end_dequeues 82795643 nr_ops 191722221
> >
> > After patch:
> >
> > testdur 10 nr_enqueuers 2 wdelay 0 nr_dequeuers 2 rdur 0 nr_enqueues 138589301 nr_dequeues 56911253 successful enqueues 138589301 successful dequeues 56910916 end_dequeues 81678385 nr_ops 195500554
> > testdur 10 nr_enqueuers 2 wdelay 0 nr_dequeuers 2 rdur 0 nr_enqueues 139007622 nr_dequeues 57281502 successful enqueues 139007622 successful dequeues 57281348 end_dequeues 81726274 nr_ops 196289124
> >
> > Summary: Number of enqueues is slightly lower,
>
> ?!
> I see that nr_enqueues and successful enqueues both increased after
> the patch.
Oh, you're right. I wrote that summary in a hurry. Sorry about that,
will fix.
>
> > probably due to higher
> > dequeue rate. Number of dequeues increased. Respective rate change is
> > within 1% (slowdown) for enqueue, 2% (performance improvement) for
> > dequeue. Overall number of operations (dequeue+enqueue) increased with
> > the patch.
> >
> > We can verify that:
> > successful enqueues - successful dequeues = end_dequeues
> >
> > This holds for all runs (ensuring correctness: no lost nodes).
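(For example, in the first run before the patch:
136251248 - 54693904 = 81557344, which matches the reported end_dequeues.)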
> >
> > CC: Lai Jiangshan <laijs at cn.fujitsu.com>
> > CC: Paul McKenney <paulmck at linux.vnet.ibm.com>
> > Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> > ---
> > diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
> > index 13b24ff..5363fe0 100644
> > --- a/urcu-call-rcu-impl.h
> > +++ b/urcu-call-rcu-impl.h
> > @@ -21,6 +21,7 @@
> > */
> >
> > #define _GNU_SOURCE
> > +#define _LGPL_SOURCE
> > #include <stdio.h>
> > #include <pthread.h>
> > #include <signal.h>
> > @@ -220,10 +221,7 @@ static void call_rcu_wake_up(struct call_rcu_data *crdp)
> > static void *call_rcu_thread(void *arg)
> > {
> > unsigned long cbcount;
> > - struct cds_wfq_node *cbs;
> > - struct cds_wfq_node **cbs_tail;
> > - struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
> > - struct rcu_head *rhp;
> > + struct call_rcu_data *crdp = (struct call_rcu_data *) arg;
> > int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT);
> > int ret;
> >
> > @@ -243,35 +241,29 @@ static void *call_rcu_thread(void *arg)
> > cmm_smp_mb();
> > }
> > for (;;) {
> > - if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > - while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> > - poll(NULL, 0, 1);
> > - _CMM_STORE_SHARED(crdp->cbs.head, NULL);
> > - cbs_tail = (struct cds_wfq_node **)
> > - uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> > + struct cds_wfq_queue cbs_tmp;
> > + struct cds_wfq_node *cbs;
> > +
> > + cds_wfq_init(&cbs_tmp);
> > + __cds_wfq_splice_blocking(&cbs_tmp, &crdp->cbs);
> > + if (!cds_wfq_empty(&cbs_tmp)) {
> > synchronize_rcu();
> > cbcount = 0;
> > - do {
> > - while (cbs->next == NULL &&
> > - &cbs->next != cbs_tail)
> > - poll(NULL, 0, 1);
> > - if (cbs == &crdp->cbs.dummy) {
> > - cbs = cbs->next;
> > - continue;
> > - }
> > - rhp = (struct rcu_head *)cbs;
> > - cbs = cbs->next;
> > + __cds_wfq_for_each_blocking(&cbs_tmp, cbs) {
> > + struct rcu_head *rhp;
> > +
> > + rhp = caa_container_of(cbs,
> > + struct rcu_head, next);
> > rhp->func(rhp);
>
>
> cbs is freed here, but it will be used in __cds_wfq_next_blocking().
> Introduce __cds_wfq_for_each_blocking_safe()?
Good point! Will do.
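Presumably something along these lines (a hypothetical sketch following the
usual *_safe iterator idiom; it assumes __cds_wfq_first_blocking() and
__cds_wfq_next_blocking() wrappers matching the static functions in the patch):

/*
 * Fetch the next node before running the loop body, so the body may
 * free or reuse "node".
 */
#define __cds_wfq_for_each_blocking_safe(q, node, n)			\
	for (node = __cds_wfq_first_blocking(q),			\
		n = (node ? __cds_wfq_next_blocking(q, node) : NULL);	\
	     node != NULL;						\
	     node = n,							\
		n = (node ? __cds_wfq_next_blocking(q, node) : NULL))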
>
> > cbcount++;
> > - } while (cbs != NULL);
> > + }
> > uatomic_sub(&crdp->qlen, cbcount);
> > }
> > if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP)
> > break;
> > rcu_thread_offline();
> > if (!rt) {
> > - if (&crdp->cbs.head
> > - == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > + if (cds_wfq_empty(&crdp->cbs)) {
> > call_rcu_wait(crdp);
> > poll(NULL, 0, 10);
> > uatomic_dec(&crdp->futex);
> > @@ -625,32 +617,32 @@ void call_rcu(struct rcu_head *head,
> > */
> > void call_rcu_data_free(struct call_rcu_data *crdp)
> > {
> > - struct cds_wfq_node *cbs;
> > - struct cds_wfq_node **cbs_tail;
> > - struct cds_wfq_node **cbs_endprev;
> > -
> > if (crdp == NULL || crdp == default_call_rcu_data) {
> > return;
> > }
> > +
> > if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) {
> > uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP);
> > wake_call_rcu_thread(crdp);
> > while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0)
> > poll(NULL, 0, 1);
> > }
> > - if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > - while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> > - poll(NULL, 0, 1);
> > - _CMM_STORE_SHARED(crdp->cbs.head, NULL);
> > - cbs_tail = (struct cds_wfq_node **)
> > - uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> > +
> > + if (!cds_wfq_empty(&crdp->cbs)) {
> > + struct cds_wfq_queue cbs_tmp;
> > +
> > + cds_wfq_init(&cbs_tmp);
> > + __cds_wfq_splice_blocking(&cbs_tmp, &crdp->cbs);
> > +
> > /* Create default call rcu data if need be */
> > (void) get_default_call_rcu_data();
> > - cbs_endprev = (struct cds_wfq_node **)
> > - uatomic_xchg(&default_call_rcu_data, cbs_tail);
> > - *cbs_endprev = cbs;
> > +
> > + __cds_wfq_splice_blocking(&default_call_rcu_data->cbs,
> > + &cbs_tmp);
> > +
>
> This is too much code; cbs_tmp is not required here.
>
>
> /* Create default call rcu data if need be */
> (void) get_default_call_rcu_data();
> + __cds_wfq_splice_blocking(&default_call_rcu_data->cbs,&crdp->cbs);
You're right. I initially thought that the order with the call to
get_default_call_rcu_data() was important, but it rather looks like we
call get_default_call_rcu_data() just to ensure that we have somewhere
to send the already enqueued callbacks. Indeed, we can merge those as
you propose. Will do!
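So that block of call_rcu_data_free() would collapse to roughly this (a sketch
of the intended follow-up, not the final code):

	if (!cds_wfq_empty(&crdp->cbs)) {
		/* Create default call rcu data if need be */
		(void) get_default_call_rcu_data();
		/* Requeue the pending callbacks onto the default queue. */
		__cds_wfq_splice_blocking(&default_call_rcu_data->cbs,
					  &crdp->cbs);
		uatomic_add(&default_call_rcu_data->qlen,
			    uatomic_read(&crdp->qlen));
		wake_call_rcu_thread(default_call_rcu_data);
	}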
>
>
>
> > uatomic_add(&default_call_rcu_data->qlen,
> > uatomic_read(&crdp->qlen));
> > +
> > wake_call_rcu_thread(default_call_rcu_data);
> > }
> >
> > diff --git a/urcu/static/wfqueue.h b/urcu/static/wfqueue.h
> > index 636e1af..08d8d52 100644
> > --- a/urcu/static/wfqueue.h
> > +++ b/urcu/static/wfqueue.h
> > @@ -9,7 +9,8 @@
> > * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See wfqueue.h for linking
> > * dynamically with the userspace rcu library.
> > *
> > - * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> > + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> > + * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
> > *
> > * This library is free software; you can redistribute it and/or
> > * modify it under the terms of the GNU Lesser General Public
> > @@ -29,6 +30,7 @@
> > #include <pthread.h>
> > #include <assert.h>
> > #include <poll.h>
> > +#include <stdbool.h>
> > #include <urcu/compiler.h>
> > #include <urcu/uatomic.h>
> >
> > @@ -38,11 +40,16 @@ extern "C" {
> >
> > /*
> > * Queue with wait-free enqueue/blocking dequeue.
> > - * This implementation adds a dummy head node when the queue is empty to ensure
> > - * we can always update the queue locklessly.
> > *
> > * Inspired from half-wait-free/half-blocking queue implementation done by
> > * Paul E. McKenney.
> > + *
> > + * Caller must ensure mutual exclusion of queue update operations
> > + * "dequeue" and "splice" source queue. Queue read operations "first"
> > + * and "next" need to be protected against concurrent "dequeue" and
> > + * "splice" (for source queue) by the caller. "enqueue", "splice"
> > + * (destination queue), and "empty" are the only operations that can be
> > + * used without any mutual exclusion.
> > */
> >
> > #define WFQ_ADAPT_ATTEMPTS 10 /* Retry if being set */
> > @@ -57,31 +64,51 @@ static inline void _cds_wfq_init(struct cds_wfq_queue *q)
> > {
> > int ret;
> >
> > - _cds_wfq_node_init(&q->dummy);
> > /* Set queue head and tail */
> > - q->head = &q->dummy;
> > - q->tail = &q->dummy.next;
> > + _cds_wfq_node_init(&q->head);
> > + q->tail = &q->head;
> > ret = pthread_mutex_init(&q->lock, NULL);
> > assert(!ret);
> > }
> >
> > -static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
> > - struct cds_wfq_node *node)
> > +static inline bool _cds_wfq_empty(struct cds_wfq_queue *q)
> > +{
> > + /*
> > + * Queue is empty if no node is pointed by q->head.next nor q->tail.
> > + */
> > + return CMM_LOAD_SHARED(q->head.next) == NULL
> > + && CMM_LOAD_SHARED(q->tail) == &q->head;
> > +}
> > +
> > +static inline void ___cds_wfq_append(struct cds_wfq_queue *q,
> > + struct cds_wfq_node *new_head,
> > + struct cds_wfq_node *new_tail)
> > {
> > - struct cds_wfq_node **old_tail;
> > + struct cds_wfq_node *old_tail;
> >
> > /*
> > - * uatomic_xchg() implicit memory barrier orders earlier stores to data
> > - * structure containing node and setting node->next to NULL before
> > - * publication.
> > + * Implicit memory barrier before uatomic_xchg() orders earlier
> > + * stores to data structure containing node and setting
> > + * node->next to NULL before publication.
> > */
> > - old_tail = uatomic_xchg(&q->tail, &node->next);
> > + old_tail = uatomic_xchg(&q->tail, new_tail);
> > +
> > /*
> > - * At this point, dequeuers see a NULL old_tail->next, which indicates
> > - * that the queue is being appended to. The following store will append
> > - * "node" to the queue from a dequeuer perspective.
> > + * Implicit memory barrier after uatomic_xchg() orders store to
> > + * q->tail before store to old_tail->next.
> > + *
> > + * At this point, dequeuers see a NULL q->tail->next, which
> > + * indicates that the queue is being appended to. The following
> > + * store will append "node" to the queue from a dequeuer
> > + * perspective.
> > */
> > - CMM_STORE_SHARED(*old_tail, node);
> > + CMM_STORE_SHARED(old_tail->next, new_head);
> > +}
> > +
> > +static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
> > + struct cds_wfq_node *new_tail)
> > +{
> > + ___cds_wfq_append(q, new_tail, new_tail);
> > }
> >
> > /*
> > @@ -100,14 +127,45 @@ ___cds_wfq_node_sync_next(struct cds_wfq_node *node)
> > if (++attempt >= WFQ_ADAPT_ATTEMPTS) {
> > poll(NULL, 0, WFQ_WAIT); /* Wait for 10ms */
> > attempt = 0;
> > - } else
> > + } else {
> > caa_cpu_relax();
> > + }
> > }
> >
> > return next;
> > }
> >
> > /*
> > + * ___cds_wfq_first_blocking: get first node of a queue, without dequeuing.
> > + *
> > + * Mutual exclusion with "dequeue" and "splice" operations must be ensured
> > + * by the caller.
> > + */
> > +static inline struct cds_wfq_node *
> > +___cds_wfq_first_blocking(struct cds_wfq_queue *q)
> > +{
> > + if (_cds_wfq_empty(q))
> > + return NULL;
> > + return ___cds_wfq_node_sync_next(&q->head);
> > +}
> > +
> > +/*
> > + * ___cds_wfq_next_blocking: get next node of a queue, without dequeuing.
> > + *
> > + * Mutual exclusion with "dequeue" and "splice" operations must be ensured
> > + * by the caller.
> > + */
> > +static inline struct cds_wfq_node *
> > +___cds_wfq_next_blocking(struct cds_wfq_queue *q, struct cds_wfq_node *node)
> > +{
> > + if (CMM_LOAD_SHARED(q->tail) == node)
> > + return NULL;
> > + return ___cds_wfq_node_sync_next(node);
> > +}
>
>
> The same BUG as you told me about.
> If q has only one node, just enqueued by another thread,
> and q->head.next is already seen, ___cds_wfq_first_blocking() returns a node,
> but the update of q->tail is not yet seen (it is still &q->head), so
> ___cds_wfq_node_sync_next(node) will loop forever if there is no
> other enqueue.
Good catch ! :-)
>
>
>
> static inline struct cds_wfq_node *
> ___cds_wfq_first_blocking(struct cds_wfq_queue *q)
> {
> + struct cds_wfq_node *ret;
> if (_cds_wfq_empty(q))
> return NULL;
> ret = ___cds_wfq_node_sync_next(&q->head);
> + cmm_smp_rmb();
> + return ret;
> }
However, I think we should add the rmb at the beginning of
___cds_wfq_next_blocking(), so it applies at each "next" call.
Otherwise, I think we could end up waiting forever on a next pointer that
stays NULL in the second of two consecutive
___cds_wfq_next_blocking() calls. I therefore propose:
static inline struct cds_wfq_node *
___cds_wfq_next_blocking(struct cds_wfq_queue *q, struct cds_wfq_node *node)
{
/* Load previous node->next before q->tail */
cmm_smp_rmb();
if (CMM_LOAD_SHARED(q->tail) == node)
return NULL;
return ___cds_wfq_node_sync_next(node);
}
>
>
> > +
> > +/*
> > + * ___cds_wfq_dequeue_blocking: dequeue a node from the queue.
> > + *
> > * It is valid to reuse and free a dequeued node immediately.
> > *
> > * No need to go on a waitqueue here, as there is no possible state in which the
> > @@ -120,42 +178,123 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> > {
> > struct cds_wfq_node *node, *next;
> >
> > - /*
> > - * Queue is empty if it only contains the dummy node.
> > - */
> > - if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next)
> > + if (_cds_wfq_empty(q))
> > return NULL;
> > - node = q->head;
> >
> > - next = ___cds_wfq_node_sync_next(node);
> > + node = ___cds_wfq_node_sync_next(&q->head);
> > +
> > + if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
> > + /* Load node->next before q->tail */
> > + cmm_smp_rmb();
> > + if (CMM_LOAD_SHARED(q->tail) == node) {
>
> I don't know why I added this "if" since it is likely true.
> Could you remove the above 3 lines?
> (I remember there is a mb() before uatomic_cmpxchg(), which means
> that barrier already comes before the comparison done by uatomic_cmpxchg().)
Indeed. I replaced these by a large comment.
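So the single-node path would look roughly like this (a sketch; the exact
comment wording is still to be written):

	if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
		/*
		 * @node is either the only queued node, or an enqueue is
		 * in progress. uatomic_cmpxchg() implies a full memory
		 * barrier before its comparison, ordering the node->next
		 * load above before the q->tail access, so no explicit
		 * rmb nor tail test is needed here.
		 */
		_cds_wfq_node_init(&q->head);
		if (uatomic_cmpxchg(&q->tail, node, &q->head) == node)
			return node;
		/* An enqueue raced with us: wait for node->next to be set. */
		next = ___cds_wfq_node_sync_next(node);
	}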
>
> > + /*
> > + * @node is the only node in the queue.
> > + * Try to move the tail to &q->head
> > + */
> > + _cds_wfq_node_init(&q->head);
> > + if (uatomic_cmpxchg(&q->tail, node, &q->head) == node)
> > + return node;
> > + }
> > + next = ___cds_wfq_node_sync_next(node);
> > + }
> >
> > /*
> > * Move queue head forward.
> > */
> > - q->head = next;
> > + q->head.next = next;
> > +
> > + return node;
> > +}
> > +
> > +/*
> > + * ___cds_wfq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
> > + *
> > + * Dequeue all nodes from src_q.
> > + * dest_q must be already initialized.
> > + * caller ensures mutual exclusion of dequeue and splice operations on
> > + * src_q.
> > + */
> > +static inline void
> > +___cds_wfq_splice_blocking(struct cds_wfq_queue *dest_q,
> > + struct cds_wfq_queue *src_q)
> > +{
> > + struct cds_wfq_node *head, *tail;
> > +
> > + if (_cds_wfq_empty(src_q))
> > + return;
> > +
> > + head = ___cds_wfq_node_sync_next(&src_q->head);
> > + _cds_wfq_node_init(&src_q->head);
> > +
> > + /*
> > + * Memory barrier implied before uatomic_xchg() orders store to
> > + * src_q->head before store to src_q->tail. This is required by
> > + * concurrent enqueue on src_q, which exchanges the tail before
> > + * updating the previous tail's next pointer.
> > + */
> > + tail = uatomic_xchg(&src_q->tail, &src_q->head);
> > +
> > /*
> > - * Requeue dummy node if we just dequeued it.
> > + * Append the spliced content of src_q into dest_q. Does not
> > + * require mutual exclusion on dest_q (wait-free).
> > */
> > - if (node == &q->dummy) {
> > - _cds_wfq_node_init(node);
> > - _cds_wfq_enqueue(q, node);
> > - return ___cds_wfq_dequeue_blocking(q);
> > - }
> > - return node;
> > + ___cds_wfq_append(dest_q, head, tail);
> > +}
> > +
> > +/* Locking performed within cds_wfq calls. */
> > +static inline struct cds_wfq_node *
> > +_cds_wfq_first_blocking(struct cds_wfq_queue *q)
> > +{
> > + struct cds_wfq_node *retval;
> > + int ret;
> > +
> > + ret = pthread_mutex_lock(&q->lock);
> > + assert(!ret);
> > + retval = ___cds_wfq_first_blocking(q);
> > + ret = pthread_mutex_unlock(&q->lock);
> > + assert(!ret);
> > + return retval;
> > +}
> > +
> > +static inline struct cds_wfq_node *
> > +_cds_wfq_next_blocking(struct cds_wfq_queue *q, struct cds_wfq_node *node)
> > +{
> > + struct cds_wfq_node *retval;
> > + int ret;
> > +
> > + ret = pthread_mutex_lock(&q->lock);
> > + assert(!ret);
> > + retval = ___cds_wfq_next_blocking(q, node);
> > + ret = pthread_mutex_unlock(&q->lock);
> > + assert(!ret);
> > + return retval;
> > }
>
> I reject these _cds_wfq_first_blocking(), _cds_wfq_next_blocking()
> and cds_wfq_for_each_blocking(), because the claimed "Locking"
> makes no sense:
> 1. It protects nothing in _cds_wfq_next_blocking().
Good point!
> 2. There is no "Locking" in the loop body; @node is not dequeued,
> so it becomes invalid if something else dequeues it,
> and _cds_wfq_next_blocking() then results in a BUG.
Indeed.
So do you recommend we just leave locking to the callers, then? What
locking rules would you recommend we document in the API?
The only API members that would still have implicit locking are thus:
- cds_wfq_dequeue_blocking()
- cds_wfq_splice_blocking()
The only reason I leave them there is that we already expose a
"cds_wfq_dequeue_blocking" to users which provides locking, and I don't
want to break the API.
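For instance, the documentation could simply require a caller-side pattern
like this for traversal (hypothetical: "my_queue_lock" is whatever lock the
caller already uses to exclude dequeue/splice on q, and "inspect()" is a
placeholder):

	struct cds_wfq_node *node;

	pthread_mutex_lock(&my_queue_lock);
	__cds_wfq_for_each_blocking(&q, node) {
		/* Read-only access; nodes stay queued and must not be freed. */
		inspect(node);
	}
	pthread_mutex_unlock(&my_queue_lock);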
>
> >
> > static inline struct cds_wfq_node *
> > _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> > {
> > - struct cds_wfq_node *retnode;
> > + struct cds_wfq_node *retval;
> > int ret;
> >
> > ret = pthread_mutex_lock(&q->lock);
> > assert(!ret);
> > - retnode = ___cds_wfq_dequeue_blocking(q);
> > + retval = ___cds_wfq_dequeue_blocking(q);
> > ret = pthread_mutex_unlock(&q->lock);
> > assert(!ret);
> > - return retnode;
> > + return retval;
> > +}
> > +
> > +static inline void
> > +_cds_wfq_splice_blocking(struct cds_wfq_queue *dest_q,
> > + struct cds_wfq_queue *src_q)
> > +{
> > + int ret;
> > +
> > + ret = pthread_mutex_lock(&src_q->lock);
> > + assert(!ret);
> > + ___cds_wfq_splice_blocking(dest_q, src_q);
> > + ret = pthread_mutex_unlock(&src_q->lock);
> > + assert(!ret);
> > }
> >
> > #ifdef __cplusplus
> > diff --git a/urcu/wfqueue.h b/urcu/wfqueue.h
> > index 03a73f1..d33d47a 100644
> > --- a/urcu/wfqueue.h
> > +++ b/urcu/wfqueue.h
> > @@ -6,7 +6,8 @@
> > *
> > * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
> > *
> > - * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> > + * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> > + * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
> > *
> > * This library is free software; you can redistribute it and/or
> > * modify it under the terms of the GNU Lesser General Public
> > @@ -25,6 +26,7 @@
> >
> > #include <pthread.h>
> > #include <assert.h>
> > +#include <stdbool.h>
> > #include <urcu/compiler.h>
> >
> > #ifdef __cplusplus
> > @@ -33,8 +35,6 @@ extern "C" {
> >
> > /*
> > * Queue with wait-free enqueue/blocking dequeue.
> > - * This implementation adds a dummy head node when the queue is empty to ensure
> > - * we can always update the queue locklessly.
> > *
> > * Inspired from half-wait-free/half-blocking queue implementation done by
> > * Paul E. McKenney.
> > @@ -45,8 +45,8 @@ struct cds_wfq_node {
> > };
> >
> > struct cds_wfq_queue {
> > - struct cds_wfq_node *head, **tail;
> > - struct cds_wfq_node dummy; /* Dummy node */
> > + struct cds_wfq_node head, *tail;
> > + struct cds_wfq_node padding; /* unused */
> > pthread_mutex_t lock;
> > };
>
> Why keep the padding?
This is mainly to keep compatibility with applications already using
wfqueue.h. We could indeed make the size of the cds_wfq_queue smaller,
but we cannot enlarge it later without breaking the API. Therefore, I think
it is safer to keep some unused padding than to shrink the size, in case we
ever need to put an extra flag or pointer in the structure.
What I can do, though, is to move the padding to the end of the
structure, and give it a void * type. Is that OK?
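Concretely, something like this (a sketch, keeping the existing member names):

struct cds_wfq_queue {
	struct cds_wfq_node head;
	struct cds_wfq_node *tail;
	pthread_mutex_t lock;
	void *padding;		/* unused; reserved so the struct can grow later */
};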
Thanks,
Mathieu
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com