[lttng-dev] [PATCH 2/2] urcu: new wfqueue implementation

Lai Jiangshan eag0628 at gmail.com
Fri Aug 10 21:46:07 EDT 2012


On Sat, Aug 11, 2012 at 2:28 AM, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> * Lai Jiangshan (laijs at cn.fujitsu.com) wrote:
>> Some guys would be surprised by this fact:
>> There are already TWO implementations of wfqueue in urcu.
>>
>> The first one is in urcu/static/wfqueue.h:
>> 1) enqueue: exchange the tail and then update previous->next
>> 2) dequeue: wait for first node's next pointer and them shift, a dummy node
>>       is introduced to avoid the queue->tail become NULL when shift.
>>
>> The second one shares some code with the first one, and the left code
>> are spreading in urcu-call-rcu-impl.h:
>> 1) enqueue: share with the first one
>> 2) no dequeue operation: and no shift, so it don't need dummy node,
>>       Although the dummy node is queued when initialization, but it is removed
>>       after the first dequeue_all operation in call_rcu_thread().
>>       call_rcu_data_free() forgets to handle the dummy node if it is not removed.
>> 3)dequeue_all: record the old head and tail, and queue->head become the special
>>       tail node.(atomic record the tail and change the tail).
>>
>> The second implementation's code are spreading, bad for review, and it is not
>> tested by tests/test_urcu_wfq.
>>
>> So we need a better implementation avoid the dummy node dancing and can service
>> both generic wfqueue APIs and dequeue_all API for call rcu.
>>
>> The new implementation:
>> 1) enqueue: share with the first one/original implementation.
>> 2) dequeue: shift when node count >= 2, cmpxchg when node count = 1.
>>       no dummy node, save memory.
>> 3) dequeue_all: simply set queue->head.next to NULL, xchg the tail
>>       and return the old head.next.
>
> Hi Lai,
>
> Your approach is very interesting! It is indeed good for testing and
> maintenance if we can keep all the queue code within the API.
>
> I am concerned about the following scenario in your new implementation,
> I would like to know your thoughts on this. It could happen on
> architectures reordering loads (DEC Alpha, AMD64, IA64, PA-RISC, POWER,
> SPARC RMO, x86 TSO, and x86 OOStore):
>
> init state: list is empty
>
> CPU 0                                              CPU 1
>
> ___cds_wfq_append_list() (append newtail)
>   oldtail = uatomic_xchg(&q->tail, newtail); (A)
>   CMM_STORE_SHARED(oldtail->next, head); (B)
>
>                                                     (B) is observable by cpu 1, but not (A) yet
>
>                                                     ___cds_wfq_dequeue_blocking()
>                                                         _cds_wfq_empty(q)
>                                                             return q->head.next == NULL
>                                                                     && CMM_LOAD_SHARED(q->tail) == &q->head;
>                                                                  -> false (q->tail != &q->head)
>                                                         node = ___cds_wfq_node_sync_next(&q->head);
>                                                            -> node is newtail
>                                                         if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
>                                                            -> taken, newtail->next is indeed NULL
>                                                            * (see note below)
>                                                            if (CMM_LOAD_SHARED(q->tail) == node) {
>                                                               -> not taken, since q->tail still appears as &q->head
>                                                            }
>                                                            next = ___cds_wfq_node_sync_next(node);
>                                                              -> endless loop if no other enqueue is performed. (BUG)
>                                                         }
>
>                                                     (A) is observable by cpu 1
>
> * note: I think we should add a cmm_smp_rmb() here to fix this issue. It
>   would force CPU 1 to necessarily see store (A) if store (B) is seen.
>   This would be matching the full memory barrier implied after
>   uatomic_xchg().
>

You are right. Can you add this line of code after merge this patch if
there is no other problem.


Lai



> Thanks,
>
> Mathieu
>
>>
>> More implementation details are in the code.
>> tests/test_urcu_wfq will be update in future for testing new APIs.
>>
>>
>> Signed-off-by: Lai Jiangshan <laijs at cn.fujitsu.com>
>> ---
>>  urcu-call-rcu-impl.h  |   50 ++++++++++--------------
>>  urcu/static/wfqueue.h |  104 ++++++++++++++++++++++++++++++++++++------------
>>  urcu/wfqueue.h        |   25 ++++++++++--
>>  wfqueue.c             |   29 ++++++++++++++
>>  4 files changed, 149 insertions(+), 59 deletions(-)
>>
>> diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
>> index 13b24ff..dbfb410 100644
>> --- a/urcu-call-rcu-impl.h
>> +++ b/urcu-call-rcu-impl.h
>> @@ -221,7 +221,7 @@ static void *call_rcu_thread(void *arg)
>>  {
>>       unsigned long cbcount;
>>       struct cds_wfq_node *cbs;
>> -     struct cds_wfq_node **cbs_tail;
>> +     struct cds_wfq_node *cbs_tail;
>>       struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
>>       struct rcu_head *rhp;
>>       int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT);
>> @@ -243,24 +243,18 @@ static void *call_rcu_thread(void *arg)
>>               cmm_smp_mb();
>>       }
>>       for (;;) {
>> -             if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
>> -                     while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
>> -                             poll(NULL, 0, 1);
>> -                     _CMM_STORE_SHARED(crdp->cbs.head, NULL);
>> -                     cbs_tail = (struct cds_wfq_node **)
>> -                             uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
>> +             cbs = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &cbs_tail);
>> +             if (cbs) {
>>                       synchronize_rcu();
>>                       cbcount = 0;
>>                       do {
>> -                             while (cbs->next == NULL &&
>> -                                    &cbs->next != cbs_tail)
>> -                                     poll(NULL, 0, 1);
>> -                             if (cbs == &crdp->cbs.dummy) {
>> -                                     cbs = cbs->next;
>> -                                     continue;
>> -                             }
>>                               rhp = (struct rcu_head *)cbs;
>> -                             cbs = cbs->next;
>> +
>> +                             if (cbs != cbs_tail)
>> +                                     cbs = __cds_wfq_node_sync_next(cbs);
>> +                             else
>> +                                     cbs = NULL;
>> +
>>                               rhp->func(rhp);
>>                               cbcount++;
>>                       } while (cbs != NULL);
>> @@ -270,8 +264,7 @@ static void *call_rcu_thread(void *arg)
>>                       break;
>>               rcu_thread_offline();
>>               if (!rt) {
>> -                     if (&crdp->cbs.head
>> -                         == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
>> +                     if (cds_wfq_empty(&crdp->cbs)) {
>>                               call_rcu_wait(crdp);
>>                               poll(NULL, 0, 10);
>>                               uatomic_dec(&crdp->futex);
>> @@ -625,32 +618,31 @@ void call_rcu(struct rcu_head *head,
>>   */
>>  void call_rcu_data_free(struct call_rcu_data *crdp)
>>  {
>> -     struct cds_wfq_node *cbs;
>> -     struct cds_wfq_node **cbs_tail;
>> -     struct cds_wfq_node **cbs_endprev;
>> +     struct cds_wfq_node *head, *tail;
>>
>>       if (crdp == NULL || crdp == default_call_rcu_data) {
>>               return;
>>       }
>> +
>>       if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) {
>>               uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP);
>>               wake_call_rcu_thread(crdp);
>>               while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0)
>>                       poll(NULL, 0, 1);
>>       }
>> -     if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
>> -             while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
>> -                     poll(NULL, 0, 1);
>> -             _CMM_STORE_SHARED(crdp->cbs.head, NULL);
>> -             cbs_tail = (struct cds_wfq_node **)
>> -                     uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
>> +
>> +     if (!cds_wfq_empty(&crdp->cbs)) {
>> +             head = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &tail);
>> +             assert(head);
>> +
>>               /* Create default call rcu data if need be */
>>               (void) get_default_call_rcu_data();
>> -             cbs_endprev = (struct cds_wfq_node **)
>> -                     uatomic_xchg(&default_call_rcu_data, cbs_tail);
>> -             *cbs_endprev = cbs;
>> +
>> +             __cds_wfq_append_list(&default_call_rcu_data->cbs, head, tail);
>> +
>>               uatomic_add(&default_call_rcu_data->qlen,
>>                           uatomic_read(&crdp->qlen));
>> +
>>               wake_call_rcu_thread(default_call_rcu_data);
>>       }
>>
>> diff --git a/urcu/static/wfqueue.h b/urcu/static/wfqueue.h
>> index 636e1af..15ea9fc 100644
>> --- a/urcu/static/wfqueue.h
>> +++ b/urcu/static/wfqueue.h
>> @@ -10,6 +10,7 @@
>>   * dynamically with the userspace rcu library.
>>   *
>>   * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
>> + * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
>>   *
>>   * This library is free software; you can redistribute it and/or
>>   * modify it under the terms of the GNU Lesser General Public
>> @@ -29,6 +30,7 @@
>>  #include <pthread.h>
>>  #include <assert.h>
>>  #include <poll.h>
>> +#include <stdbool.h>
>>  #include <urcu/compiler.h>
>>  #include <urcu/uatomic.h>
>>
>> @@ -38,8 +40,6 @@ extern "C" {
>>
>>  /*
>>   * Queue with wait-free enqueue/blocking dequeue.
>> - * This implementation adds a dummy head node when the queue is empty to ensure
>> - * we can always update the queue locklessly.
>>   *
>>   * Inspired from half-wait-free/half-blocking queue implementation done by
>>   * Paul E. McKenney.
>> @@ -57,31 +57,43 @@ static inline void _cds_wfq_init(struct cds_wfq_queue *q)
>>  {
>>       int ret;
>>
>> -     _cds_wfq_node_init(&q->dummy);
>>       /* Set queue head and tail */
>> -     q->head = &q->dummy;
>> -     q->tail = &q->dummy.next;
>> +     _cds_wfq_node_init(&q->head);
>> +     q->tail = &q->head;
>>       ret = pthread_mutex_init(&q->lock, NULL);
>>       assert(!ret);
>>  }
>>
>> -static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
>> -                                 struct cds_wfq_node *node)
>> +static inline bool _cds_wfq_empty(struct cds_wfq_queue *q)
>>  {
>> -     struct cds_wfq_node **old_tail;
>> +     /*
>> +      * Queue is empty if no node is pointed by q->head.next nor q->tail.
>> +      */
>> +     return q->head.next == NULL && CMM_LOAD_SHARED(q->tail) == &q->head;
>> +}
>>
>> +static inline void ___cds_wfq_append_list(struct cds_wfq_queue *q,
>> +             struct cds_wfq_node *head, struct cds_wfq_node *tail)
>> +{
>>       /*
>>        * uatomic_xchg() implicit memory barrier orders earlier stores to data
>>        * structure containing node and setting node->next to NULL before
>>        * publication.
>>        */
>> -     old_tail = uatomic_xchg(&q->tail, &node->next);
>> +     tail = uatomic_xchg(&q->tail, tail);
>> +
>>       /*
>> -      * At this point, dequeuers see a NULL old_tail->next, which indicates
>> +      * At this point, dequeuers see a NULL tail->next, which indicates
>>        * that the queue is being appended to. The following store will append
>>        * "node" to the queue from a dequeuer perspective.
>>        */
>> -     CMM_STORE_SHARED(*old_tail, node);
>> +     CMM_STORE_SHARED(tail->next, head);
>> +}
>> +
>> +static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
>> +                                 struct cds_wfq_node *node)
>> +{
>> +     ___cds_wfq_append_list(q, node, node);
>>  }
>>
>>  /*
>> @@ -120,27 +132,46 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
>>  {
>>       struct cds_wfq_node *node, *next;
>>
>> -     /*
>> -      * Queue is empty if it only contains the dummy node.
>> -      */
>> -     if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next)
>> +     if (_cds_wfq_empty(q))
>>               return NULL;
>> -     node = q->head;
>>
>> -     next = ___cds_wfq_node_sync_next(node);
>> +     node = ___cds_wfq_node_sync_next(&q->head);
>> +
>> +     if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
>> +             if (CMM_LOAD_SHARED(q->tail) == node) {
>> +                     /*
>> +                      * @node is the only node in the queue.
>> +                      * Try to move the tail to &q->head
>> +                      */
>> +                     _cds_wfq_node_init(&q->head);
>> +                     if (uatomic_cmpxchg(&q->tail, node, &q->head) == node)
>> +                             return node;
>> +             }
>> +             next = ___cds_wfq_node_sync_next(node);
>> +     }
>>
>>       /*
>>        * Move queue head forward.
>>        */
>> -     q->head = next;
>> -     /*
>> -      * Requeue dummy node if we just dequeued it.
>> -      */
>> -     if (node == &q->dummy) {
>> -             _cds_wfq_node_init(node);
>> -             _cds_wfq_enqueue(q, node);
>> -             return ___cds_wfq_dequeue_blocking(q);
>> -     }
>> +     q->head.next = next;
>> +
>> +     return node;
>> +}
>> +
>> +/* dequeue all nodes, the nodes are not synchronized for the next pointer */
>> +static inline struct cds_wfq_node *
>> +___cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q,
>> +             struct cds_wfq_node **tail)
>> +{
>> +     struct cds_wfq_node *node;
>> +
>> +     if (_cds_wfq_empty(q))
>> +             return NULL;
>> +
>> +     node = ___cds_wfq_node_sync_next(&q->head);
>> +     _cds_wfq_node_init(&q->head);
>> +     *tail = uatomic_xchg(&q->tail, &q->head);
>> +
>>       return node;
>>  }
>>
>> @@ -158,6 +189,27 @@ _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
>>       return retnode;
>>  }
>>
>> +static inline struct cds_wfq_node *
>> +_cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q,
>> +             struct cds_wfq_node **tail)
>> +{
>> +     struct cds_wfq_node *node, *next;
>> +     int ret;
>> +
>> +     ret = pthread_mutex_lock(&q->lock);
>> +     assert(!ret);
>> +     node = ___cds_wfq_dequeue_all_blocking(q, tail);
>> +     ret = pthread_mutex_unlock(&q->lock);
>> +     assert(!ret);
>> +
>> +     /* synchronize all nodes' next pointer */
>> +     next = node;
>> +     while (next != *tail)
>> +             next = ___cds_wfq_node_sync_next(next);
>> +
>> +     return node;
>> +}
>> +
>>  #ifdef __cplusplus
>>  }
>>  #endif
>> diff --git a/urcu/wfqueue.h b/urcu/wfqueue.h
>> index 03a73f1..985f540 100644
>> --- a/urcu/wfqueue.h
>> +++ b/urcu/wfqueue.h
>> @@ -7,6 +7,7 @@
>>   * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
>>   *
>>   * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
>> + * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
>>   *
>>   * This library is free software; you can redistribute it and/or
>>   * modify it under the terms of the GNU Lesser General Public
>> @@ -25,6 +26,7 @@
>>
>>  #include <pthread.h>
>>  #include <assert.h>
>> +#include <stdbool.h>
>>  #include <urcu/compiler.h>
>>
>>  #ifdef __cplusplus
>> @@ -33,8 +35,6 @@ extern "C" {
>>
>>  /*
>>   * Queue with wait-free enqueue/blocking dequeue.
>> - * This implementation adds a dummy head node when the queue is empty to ensure
>> - * we can always update the queue locklessly.
>>   *
>>   * Inspired from half-wait-free/half-blocking queue implementation done by
>>   * Paul E. McKenney.
>> @@ -45,8 +45,7 @@ struct cds_wfq_node {
>>  };
>>
>>  struct cds_wfq_queue {
>> -     struct cds_wfq_node *head, **tail;
>> -     struct cds_wfq_node dummy;      /* Dummy node */
>> +     struct cds_wfq_node head, *tail;
>>       pthread_mutex_t lock;
>>  };
>>
>> @@ -56,18 +55,36 @@ struct cds_wfq_queue {
>>
>>  #define cds_wfq_node_init            _cds_wfq_node_init
>>  #define cds_wfq_init         _cds_wfq_init
>> +#define cds_wfq_empty                _cds_wfq_empty
>> +#define __cds_wfq_append_list        ___cds_wfq_append_list
>>  #define cds_wfq_enqueue              _cds_wfq_enqueue
>>  #define __cds_wfq_dequeue_blocking   ___cds_wfq_dequeue_blocking
>>  #define cds_wfq_dequeue_blocking     _cds_wfq_dequeue_blocking
>> +#define __cds_wfq_node_sync_next     ___cds_wfq_node_sync_next
>> +#define __cds_wfq_dequeue_all_blocking       ___cds_wfq_dequeue_all_blocking
>> +#define cds_wfq_dequeue_all_blocking _cds_wfq_dequeue_all_blocking
>>
>>  #else /* !_LGPL_SOURCE */
>>
>>  extern void cds_wfq_node_init(struct cds_wfq_node *node);
>>  extern void cds_wfq_init(struct cds_wfq_queue *q);
>> +extern bool cds_wfq_empty(struct cds_wfq_queue *q);
>> +/* __cds_wfq_append_list: caller ensures mutual exclusion between dequeues */
>> +extern void __cds_wfq_append_list(struct cds_wfq_queue *q,
>> +             struct cds_wfq_node *head, struct cds_wfq_node *tail);
>>  extern void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node);
>>  /* __cds_wfq_dequeue_blocking: caller ensures mutual exclusion between dequeues */
>>  extern struct cds_wfq_node *__cds_wfq_dequeue_blocking(struct cds_wfq_queue *q);
>>  extern struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue *q);
>> +extern struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node *node);
>> +/*
>> + * __cds_wfq_dequeue_all_blocking: caller ensures mutual exclusion between
>> + * dequeues, and need synchronize next pointer berfore use it.
>> + */
>> +extern struct cds_wfq_node *__cds_wfq_dequeue_all_blocking(
>> +             struct cds_wfq_queue *q, struct cds_wfq_node **tail);
>> +extern struct cds_wfq_node *cds_wfq_dequeue_all_blocking(
>> +             struct cds_wfq_queue *q, struct cds_wfq_node **tail);
>>
>>  #endif /* !_LGPL_SOURCE */
>>
>> diff --git a/wfqueue.c b/wfqueue.c
>> index 3337171..28a7b58 100644
>> --- a/wfqueue.c
>> +++ b/wfqueue.c
>> @@ -4,6 +4,7 @@
>>   * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
>>   *
>>   * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
>> + * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
>>   *
>>   * This library is free software; you can redistribute it and/or
>>   * modify it under the terms of the GNU Lesser General Public
>> @@ -38,6 +39,17 @@ void cds_wfq_init(struct cds_wfq_queue *q)
>>       _cds_wfq_init(q);
>>  }
>>
>> +bool cds_wfq_empty(struct cds_wfq_queue *q)
>> +{
>> +     return _cds_wfq_empty(q);
>> +}
>> +
>> +void __cds_wfq_append_list(struct cds_wfq_queue *q,
>> +             struct cds_wfq_node *head, struct cds_wfq_node *tail)
>> +{
>> +     return ___cds_wfq_append_list(q, head, tail);
>> +}
>> +
>>  void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node)
>>  {
>>       _cds_wfq_enqueue(q, node);
>> @@ -52,3 +64,20 @@ struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
>>  {
>>       return _cds_wfq_dequeue_blocking(q);
>>  }
>> +
>> +struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node *node)
>> +{
>> +     return ___cds_wfq_node_sync_next(node);
>> +}
>> +
>> +struct cds_wfq_node *__cds_wfq_dequeue_all_blocking(
>> +             struct cds_wfq_queue *q, struct cds_wfq_node **tail)
>> +{
>> +     return ___cds_wfq_dequeue_all_blocking(q, tail);
>> +}
>> +
>> +struct cds_wfq_node *cds_wfq_dequeue_all_blocking(
>> +             struct cds_wfq_queue *q, struct cds_wfq_node **tail)
>> +{
>> +     return _cds_wfq_dequeue_all_blocking(q, tail);
>> +}
>> --
>> 1.7.7
>>
>>
>> _______________________________________________
>> lttng-dev mailing list
>> lttng-dev at lists.lttng.org
>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
>
> --
> Mathieu Desnoyers
> Operating System Efficiency R&D Consultant
> EfficiOS Inc.
> http://www.efficios.com



More information about the lttng-dev mailing list