[lttng-dev] [PATCH 2/2] urcu: new wfqueue implementation

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Sat Aug 11 09:14:17 EDT 2012


* Lai Jiangshan (laijs at cn.fujitsu.com) wrote:
> Some guys would be surprised by this fact:
> There are already TWO implementations of wfqueue in urcu.
> 
> The first one is in urcu/static/wfqueue.h:
> 1) enqueue: exchange the tail and then update previous->next
> 2) dequeue: wait for the first node's next pointer and then shift; a dummy node
> 	is introduced to keep queue->tail from becoming NULL when shifting.
> 
> The second one shares some code with the first one, and the rest of its code
> is spread across urcu-call-rcu-impl.h:
> 1) enqueue: share with the first one
> 2) no dequeue operation: and no shift, so it doesn't need the dummy node.
> 	Although the dummy node is queued at initialization, it is removed
> 	after the first dequeue_all operation in call_rcu_thread().
> 	call_rcu_data_free() forgets to handle the dummy node if it has not been removed.
> 3)dequeue_all: record the old head and tail, and queue->head become the special
> 	tail node.(atomic record the tail and change the tail).
> 
> The second implementation's code is scattered, which makes it hard to review,
> and it is not tested by tests/test_urcu_wfq.
> 
> So we need a better implementation that avoids the dummy node dance and can
> serve both the generic wfqueue APIs and the dequeue_all API for call_rcu.
> 
> The new implementation:
> 1) enqueue: share with the first one/original implementation.
> 2) dequeue: shift when node count >= 2, cmpxchg when node count = 1.
> 	no dummy node, save memory.
> 3) dequeue_all: simply set queue->head.next to NULL, xchg the tail
> 	and return the old head.next.
> 
> More implementation details are in the code.
> tests/test_urcu_wfq will be updated in the future to test the new APIs.

Hi Lai,

Some other style-related questions below,

> 
> 
> Signed-off-by: Lai Jiangshan <laijs at cn.fujitsu.com>
> ---
>  urcu-call-rcu-impl.h  |   50 ++++++++++--------------
>  urcu/static/wfqueue.h |  104 ++++++++++++++++++++++++++++++++++++------------
>  urcu/wfqueue.h        |   25 ++++++++++--
>  wfqueue.c             |   29 ++++++++++++++
>  4 files changed, 149 insertions(+), 59 deletions(-)
> 
> diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
> index 13b24ff..dbfb410 100644
> --- a/urcu-call-rcu-impl.h
> +++ b/urcu-call-rcu-impl.h
> @@ -221,7 +221,7 @@ static void *call_rcu_thread(void *arg)
>  {
>  	unsigned long cbcount;
>  	struct cds_wfq_node *cbs;
> -	struct cds_wfq_node **cbs_tail;
> +	struct cds_wfq_node *cbs_tail;
>  	struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
>  	struct rcu_head *rhp;
>  	int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT);
> @@ -243,24 +243,18 @@ static void *call_rcu_thread(void *arg)
>  		cmm_smp_mb();
>  	}
>  	for (;;) {
> -		if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> -			while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> -				poll(NULL, 0, 1);
> -			_CMM_STORE_SHARED(crdp->cbs.head, NULL);
> -			cbs_tail = (struct cds_wfq_node **)
> -				uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> +		cbs = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &cbs_tail);
> +		if (cbs) {
>  			synchronize_rcu();
>  			cbcount = 0;
>  			do {
> -				while (cbs->next == NULL &&
> -				       &cbs->next != cbs_tail)
> -				       	poll(NULL, 0, 1);
> -				if (cbs == &crdp->cbs.dummy) {
> -					cbs = cbs->next;
> -					continue;
> -				}
>  				rhp = (struct rcu_head *)cbs;
> -				cbs = cbs->next;
> +
> +				if (cbs != cbs_tail)
> +					cbs = __cds_wfq_node_sync_next(cbs);
> +				else
> +					cbs = NULL;
> +
>  				rhp->func(rhp);
>  				cbcount++;
>  			} while (cbs != NULL);
> @@ -270,8 +264,7 @@ static void *call_rcu_thread(void *arg)
>  			break;
>  		rcu_thread_offline();
>  		if (!rt) {
> -			if (&crdp->cbs.head
> -			    == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> +			if (cds_wfq_empty(&crdp->cbs)) {
>  				call_rcu_wait(crdp);
>  				poll(NULL, 0, 10);
>  				uatomic_dec(&crdp->futex);
> @@ -625,32 +618,31 @@ void call_rcu(struct rcu_head *head,
>   */
>  void call_rcu_data_free(struct call_rcu_data *crdp)
>  {
> -	struct cds_wfq_node *cbs;
> -	struct cds_wfq_node **cbs_tail;
> -	struct cds_wfq_node **cbs_endprev;
> +	struct cds_wfq_node *head, *tail;
>  
>  	if (crdp == NULL || crdp == default_call_rcu_data) {
>  		return;
>  	}
> +
>  	if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) {
>  		uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP);
>  		wake_call_rcu_thread(crdp);
>  		while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0)
>  			poll(NULL, 0, 1);
>  	}
> -	if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> -		while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> -			poll(NULL, 0, 1);
> -		_CMM_STORE_SHARED(crdp->cbs.head, NULL);
> -		cbs_tail = (struct cds_wfq_node **)
> -			uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> +
> +	if (!cds_wfq_empty(&crdp->cbs)) {
> +		head = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &tail);
> +		assert(head);
> +
>  		/* Create default call rcu data if need be */
>  		(void) get_default_call_rcu_data();
> -		cbs_endprev = (struct cds_wfq_node **)
> -			uatomic_xchg(&default_call_rcu_data, cbs_tail);
> -		*cbs_endprev = cbs;
> +
> +		__cds_wfq_append_list(&default_call_rcu_data->cbs, head, tail);
> +
>  		uatomic_add(&default_call_rcu_data->qlen,
>  			    uatomic_read(&crdp->qlen));
> +
>  		wake_call_rcu_thread(default_call_rcu_data);
>  	}
>  
> diff --git a/urcu/static/wfqueue.h b/urcu/static/wfqueue.h
> index 636e1af..15ea9fc 100644
> --- a/urcu/static/wfqueue.h
> +++ b/urcu/static/wfqueue.h
> @@ -10,6 +10,7 @@
>   * dynamically with the userspace rcu library.
>   *
>   * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
>   *
>   * This library is free software; you can redistribute it and/or
>   * modify it under the terms of the GNU Lesser General Public
> @@ -29,6 +30,7 @@
>  #include <pthread.h>
>  #include <assert.h>
>  #include <poll.h>
> +#include <stdbool.h>
>  #include <urcu/compiler.h>
>  #include <urcu/uatomic.h>
>  
> @@ -38,8 +40,6 @@ extern "C" {
>  
>  /*
>   * Queue with wait-free enqueue/blocking dequeue.
> - * This implementation adds a dummy head node when the queue is empty to ensure
> - * we can always update the queue locklessly.
>   *
>   * Inspired from half-wait-free/half-blocking queue implementation done by
>   * Paul E. McKenney.
> @@ -57,31 +57,43 @@ static inline void _cds_wfq_init(struct cds_wfq_queue *q)
>  {
>  	int ret;
>  
> -	_cds_wfq_node_init(&q->dummy);
>  	/* Set queue head and tail */
> -	q->head = &q->dummy;
> -	q->tail = &q->dummy.next;
> +	_cds_wfq_node_init(&q->head);
> +	q->tail = &q->head;
>  	ret = pthread_mutex_init(&q->lock, NULL);
>  	assert(!ret);
>  }
>  
> -static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
> -				    struct cds_wfq_node *node)
> +static inline bool _cds_wfq_empty(struct cds_wfq_queue *q)
>  {
> -	struct cds_wfq_node **old_tail;
> +	/*
> +	 * Queue is empty if no node is pointed by q->head.next nor q->tail.
> +	 */
> +	return q->head.next == NULL && CMM_LOAD_SHARED(q->tail) == &q->head;
> +}
>  
> +static inline void ___cds_wfq_append_list(struct cds_wfq_queue *q,
> +		struct cds_wfq_node *head, struct cds_wfq_node *tail)
> +{
>  	/*
>  	 * uatomic_xchg() implicit memory barrier orders earlier stores to data
>  	 * structure containing node and setting node->next to NULL before
>  	 * publication.
>  	 */
> -	old_tail = uatomic_xchg(&q->tail, &node->next);
> +	tail = uatomic_xchg(&q->tail, tail);

I'd prefer to keep "old_tail" here, because it becomes clearer to anyone
reviewing that uatomic_xchg() returns the old tail (and this extra
clarity comes without any overhead).

> +
>  	/*
> -	 * At this point, dequeuers see a NULL old_tail->next, which indicates
> +	 * At this point, dequeuers see a NULL tail->next, which indicates
>  	 * that the queue is being appended to. The following store will append
>  	 * "node" to the queue from a dequeuer perspective.
>  	 */
> -	CMM_STORE_SHARED(*old_tail, node);
> +	CMM_STORE_SHARED(tail->next, head);
> +}
> +
> +static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
> +				    struct cds_wfq_node *node)
> +{
> +	___cds_wfq_append_list(q, node, node);
>  }

Why not keep ___cds_wfq_append_list() merged into _cds_wfq_enqueue() ?

This would keep the number of symbols exported minimal.

So if I get it right, the single "_" prefix marks the "normally used"
functions (exposed through the LGPL symbol API).

The "__"-prefixed functions are somewhat more internal, but can also be
used externally.

Finally, the "___" prefix seems to be quite similar to the
double-underscores.

We might need more consistency; I'm not sure the triple-underscores are
needed. Also, I'm not sure we should export the double-underscore functions
outside of LGPL use (in other words, maybe we should not expose them to
!LGPL_SOURCE code). So we would emit the static inlines, but no symbols
for those. This covers ___cds_wfq_node_sync_next() and
___cds_wfq_dequeue_all_blocking() (which requires the caller to use
sync_next). Currently, all code that needs fine-grained integration
is within the userspace RCU tree, which defines LGPL_SOURCE.

>  
>  /*
> @@ -120,27 +132,46 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
>  {
>  	struct cds_wfq_node *node, *next;
>  
> -	/*
> -	 * Queue is empty if it only contains the dummy node.
> -	 */
> -	if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next)
> +	if (_cds_wfq_empty(q))
>  		return NULL;
> -	node = q->head;
>  
> -	next = ___cds_wfq_node_sync_next(node);
> +	node = ___cds_wfq_node_sync_next(&q->head);
> +
> +	if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
> +		if (CMM_LOAD_SHARED(q->tail) == node) {
> +			/*
> +			 * @node is the only node in the queue.
> +			 * Try to move the tail to &q->head
> +			 */
> +			_cds_wfq_node_init(&q->head);
> +			if (uatomic_cmpxchg(&q->tail, node, &q->head) == node)
> +				return node;
> +		}
> +		next = ___cds_wfq_node_sync_next(node);
> +	}
>  
>  	/*
>  	 * Move queue head forward.
>  	 */
> -	q->head = next;
> -	/*
> -	 * Requeue dummy node if we just dequeued it.
> -	 */
> -	if (node == &q->dummy) {
> -		_cds_wfq_node_init(node);
> -		_cds_wfq_enqueue(q, node);
> -		return ___cds_wfq_dequeue_blocking(q);
> -	}
> +	q->head.next = next;
> +
> +	return node;
> +}
> +
> +/* dequeue all nodes, the nodes are not synchronized for the next pointer */
> +static inline struct cds_wfq_node *
> +___cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q,
> +		struct cds_wfq_node **tail)
> +{
> +	struct cds_wfq_node *node;
> +
> +	if (_cds_wfq_empty(q))
> +		return NULL;
> +
> +	node = ___cds_wfq_node_sync_next(&q->head);
> +	_cds_wfq_node_init(&q->head);
> +	*tail = uatomic_xchg(&q->tail, &q->head);
> +
>  	return node;
>  }
>  
> @@ -158,6 +189,27 @@ _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
>  	return retnode;
>  }
>  
> +static inline struct cds_wfq_node *
> +_cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q,
> +		struct cds_wfq_node **tail)
> +{
> +	struct cds_wfq_node *node, *next;
> +	int ret;
> +
> +	ret = pthread_mutex_lock(&q->lock);
> +	assert(!ret);
> +	node = ___cds_wfq_dequeue_all_blocking(q, tail);
> +	ret = pthread_mutex_unlock(&q->lock);
> +	assert(!ret);

So we take the queue lock on dequeue_all, but not on dequeue. It might
be good to have a consistent behavior: either we lock dequeue and
dequeue_all, or leave the lock entirely to the caller (and document it).

Thoughts ?

Thanks!

Mathieu

> +
> +	/* synchronize all nodes' next pointer */
> +	next = node;
> +	while (next != *tail)
> +		next = ___cds_wfq_node_sync_next(next);
> +
> +	return node;
> +}
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/urcu/wfqueue.h b/urcu/wfqueue.h
> index 03a73f1..985f540 100644
> --- a/urcu/wfqueue.h
> +++ b/urcu/wfqueue.h
> @@ -7,6 +7,7 @@
>   * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
>   *
>   * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
>   *
>   * This library is free software; you can redistribute it and/or
>   * modify it under the terms of the GNU Lesser General Public
> @@ -25,6 +26,7 @@
>  
>  #include <pthread.h>
>  #include <assert.h>
> +#include <stdbool.h>
>  #include <urcu/compiler.h>
>  
>  #ifdef __cplusplus
> @@ -33,8 +35,6 @@ extern "C" {
>  
>  /*
>   * Queue with wait-free enqueue/blocking dequeue.
> - * This implementation adds a dummy head node when the queue is empty to ensure
> - * we can always update the queue locklessly.
>   *
>   * Inspired from half-wait-free/half-blocking queue implementation done by
>   * Paul E. McKenney.
> @@ -45,8 +45,7 @@ struct cds_wfq_node {
>  };
>  
>  struct cds_wfq_queue {
> -	struct cds_wfq_node *head, **tail;
> -	struct cds_wfq_node dummy;	/* Dummy node */
> +	struct cds_wfq_node head, *tail;
>  	pthread_mutex_t lock;
>  };
>  
> @@ -56,18 +55,36 @@ struct cds_wfq_queue {
>  
>  #define cds_wfq_node_init		_cds_wfq_node_init
>  #define cds_wfq_init		_cds_wfq_init
> +#define cds_wfq_empty		_cds_wfq_empty
> +#define __cds_wfq_append_list	___cds_wfq_append_list
>  #define cds_wfq_enqueue		_cds_wfq_enqueue
>  #define __cds_wfq_dequeue_blocking	___cds_wfq_dequeue_blocking
>  #define cds_wfq_dequeue_blocking	_cds_wfq_dequeue_blocking
> +#define __cds_wfq_node_sync_next	___cds_wfq_node_sync_next
> +#define __cds_wfq_dequeue_all_blocking	___cds_wfq_dequeue_all_blocking
> +#define cds_wfq_dequeue_all_blocking	_cds_wfq_dequeue_all_blocking
>  
>  #else /* !_LGPL_SOURCE */
>  
>  extern void cds_wfq_node_init(struct cds_wfq_node *node);
>  extern void cds_wfq_init(struct cds_wfq_queue *q);
> +extern bool cds_wfq_empty(struct cds_wfq_queue *q);
> +/* __cds_wfq_append_list: caller ensures mutual exclusion between dequeues */
> +extern void __cds_wfq_append_list(struct cds_wfq_queue *q,
> +		struct cds_wfq_node *head, struct cds_wfq_node *tail);
>  extern void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node);
>  /* __cds_wfq_dequeue_blocking: caller ensures mutual exclusion between dequeues */
>  extern struct cds_wfq_node *__cds_wfq_dequeue_blocking(struct cds_wfq_queue *q);
>  extern struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue *q);
> +extern struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node *node);
> +/*
> + * __cds_wfq_dequeue_all_blocking: caller ensures mutual exclusion between
> + * dequeues, and need synchronize next pointer berfore use it.
> + */
> +extern struct cds_wfq_node *__cds_wfq_dequeue_all_blocking(
> +		struct cds_wfq_queue *q, struct cds_wfq_node **tail);
> +extern struct cds_wfq_node *cds_wfq_dequeue_all_blocking(
> +		struct cds_wfq_queue *q, struct cds_wfq_node **tail);
>  
>  #endif /* !_LGPL_SOURCE */
>  
> diff --git a/wfqueue.c b/wfqueue.c
> index 3337171..28a7b58 100644
> --- a/wfqueue.c
> +++ b/wfqueue.c
> @@ -4,6 +4,7 @@
>   * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
>   *
>   * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
>   *
>   * This library is free software; you can redistribute it and/or
>   * modify it under the terms of the GNU Lesser General Public
> @@ -38,6 +39,17 @@ void cds_wfq_init(struct cds_wfq_queue *q)
>  	_cds_wfq_init(q);
>  }
>  
> +bool cds_wfq_empty(struct cds_wfq_queue *q)
> +{
> +	return _cds_wfq_empty(q);
> +}
> +
> +void __cds_wfq_append_list(struct cds_wfq_queue *q,
> +		struct cds_wfq_node *head, struct cds_wfq_node *tail)
> +{
> +	return ___cds_wfq_append_list(q, head, tail);
> +}
> +
>  void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node)
>  {
>  	_cds_wfq_enqueue(q, node);
> @@ -52,3 +64,20 @@ struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
>  {
>  	return _cds_wfq_dequeue_blocking(q);
>  }
> +
> +struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node *node)
> +{
> +	return ___cds_wfq_node_sync_next(node);
> +}
> +
> +struct cds_wfq_node *__cds_wfq_dequeue_all_blocking(
> +		struct cds_wfq_queue *q, struct cds_wfq_node **tail)
> +{
> +	return ___cds_wfq_dequeue_all_blocking(q, tail);
> +}
> +
> +struct cds_wfq_node *cds_wfq_dequeue_all_blocking(
> +		struct cds_wfq_queue *q, struct cds_wfq_node **tail)
> +{
> +	return _cds_wfq_dequeue_all_blocking(q, tail);
> +}
> -- 
> 1.7.7
> 
> 
> _______________________________________________
> lttng-dev mailing list
> lttng-dev at lists.lttng.org
> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com



More information about the lttng-dev mailing list