[lttng-dev] [PATCH 2/2] urcu: new wfqueue implementation
Lai Jiangshan
laijs at cn.fujitsu.com
Thu Aug 9 04:46:26 EDT 2012
Some people may be surprised by this fact:
There are already TWO implementations of wfqueue in urcu.
The first one is in urcu/static/wfqueue.h:
1) enqueue: exchange the tail and then update previous->next
2) dequeue: wait for the first node's next pointer and then shift the head;
a dummy node is introduced to avoid queue->tail becoming NULL during the
shift (sketched below).
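For readers who have not looked at that code, here is a condensed sketch of
the first implementation (illustration only, with shortened names; the real
code lives in urcu/static/wfqueue.h):

#include <poll.h>
#include <urcu/system.h>        /* CMM_LOAD_SHARED(), CMM_STORE_SHARED() */
#include <urcu/uatomic.h>       /* uatomic_xchg() */

struct old_wfq_node {
        struct old_wfq_node *next;
};

struct old_wfq_queue {
        struct old_wfq_node *head;      /* first node, possibly the dummy */
        struct old_wfq_node **tail;     /* points to the last node's ->next */
        struct old_wfq_node dummy;      /* keeps the queue from going empty */
};

static void old_wfq_init(struct old_wfq_queue *q)
{
        q->dummy.next = NULL;
        q->head = &q->dummy;
        q->tail = &q->dummy.next;
}

/* enqueue: wait-free -- one xchg, then one store */
static void old_wfq_enqueue(struct old_wfq_queue *q, struct old_wfq_node *node)
{
        struct old_wfq_node **old_tail;

        node->next = NULL;
        old_tail = uatomic_xchg(&q->tail, &node->next);
        /* dequeuers spin on *old_tail until this store becomes visible */
        CMM_STORE_SHARED(*old_tail, node);
}

/* dequeue: blocking, single dequeuer -- spin for the link, then shift head */
static struct old_wfq_node *old_wfq_dequeue(struct old_wfq_queue *q)
{
        struct old_wfq_node *node, *next;

        /* a queue holding only the dummy is empty */
        if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next)
                return NULL;
        node = q->head;
        while ((next = CMM_LOAD_SHARED(node->next)) == NULL)
                poll(NULL, 0, 10);      /* wait for the in-flight enqueue */
        q->head = next;
        if (node == &q->dummy) {        /* never hand out the dummy node: */
                node->next = NULL;      /* re-queue it and dequeue again */
                old_wfq_enqueue(q, node);
                return old_wfq_dequeue(q);
        }
        return node;
}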
The second one shares some code with the first one; the rest of its code
is spread across urcu-call-rcu-impl.h:
1) enqueue: shared with the first one
2) no dequeue operation and no shift, so it does not need the dummy node.
Although the dummy node is enqueued at initialization, it is removed after
the first dequeue_all operation in call_rcu_thread().
call_rcu_data_free() forgets to handle the dummy node if it has not been
removed.
3) dequeue_all: record the old head and tail, and queue->head becomes the
new tail node (the tail is recorded and changed atomically; see the sketch
below).
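That dequeue_all splice, as call_rcu_thread() and call_rcu_data_free() do it
today, boils down to roughly the following (condensed from the hunks removed
below; splice_all() is a made-up name, and the sketch only behaves as
intended once the initial dummy node has been removed, which is exactly the
quirk described above):

#include <poll.h>
#include <urcu/system.h>        /* _CMM_LOAD_SHARED(), _CMM_STORE_SHARED() */
#include <urcu/uatomic.h>       /* uatomic_xchg() */
#include <urcu/wfqueue.h>       /* struct cds_wfq_queue, struct cds_wfq_node */

/*
 * Detach every queued node at once.  Returns the old head and stores a
 * pointer to the old tail's ->next slot in *tailp; the walker stops when
 * &node->next == *tailp.  Single dequeuer assumed.
 */
static struct cds_wfq_node *splice_all(struct cds_wfq_queue *q,
                struct cds_wfq_node ***tailp)
{
        struct cds_wfq_node *head;

        if (&q->head == _CMM_LOAD_SHARED(q->tail))
                return NULL;            /* tail points back at head: empty */
        while ((head = _CMM_LOAD_SHARED(q->head)) == NULL)
                poll(NULL, 0, 1);       /* first enqueue still in flight */
        _CMM_STORE_SHARED(q->head, NULL);
        /* atomically grab the old tail and make &q->head the tail again */
        *tailp = (struct cds_wfq_node **)
                        uatomic_xchg(&q->tail, &q->head);
        return head;                    /* private list: head .. **tailp */
}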
The second implementation's code is scattered, which makes it hard to
review, and it is not covered by tests/test_urcu_wfq.
So we need a better implementation that avoids the dummy node dancing and
can serve both the generic wfqueue APIs and the dequeue_all API for
call_rcu.
The new implementation:
1) enqueue: shared with the original implementation.
2) dequeue: shift when the node count >= 2, cmpxchg when the node count == 1.
No dummy node, which saves memory.
3) dequeue_all: simply set queue->head.next to NULL, xchg the tail, and
return the old head.next.
More implementation details are in the code.
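With the new implementation, a caller consumes a dequeue_all batch roughly
like this (a hypothetical drain loop, patterned after the call_rcu_thread()
change below; drain_queue() is a made-up name):

#include <urcu/wfqueue.h>       /* new wfqueue API */

/*
 * Drain everything currently queued.  The __-prefixed API requires the
 * caller to be the only dequeuer (or to provide its own mutual exclusion).
 */
static void drain_queue(struct cds_wfq_queue *q)
{
        struct cds_wfq_node *node, *tail;

        node = __cds_wfq_dequeue_all_blocking(q, &tail);  /* NULL if empty */
        while (node) {
                struct cds_wfq_node *next;

                if (node != tail)
                        /* block until the enqueuer has linked node->next */
                        next = __cds_wfq_node_sync_next(node);
                else
                        next = NULL;    /* old tail: end of the batch */

                /* ... process the dequeued node here ... */

                node = next;
        }
}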
tests/test_urcu_wfq will be updated in the future to test the new APIs.
Signed-off-by: Lai Jiangshan <laijs at cn.fujitsu.com>
---
urcu-call-rcu-impl.h | 50 ++++++++++--------------
urcu/static/wfqueue.h | 104 ++++++++++++++++++++++++++++++++++++------------
urcu/wfqueue.h | 25 ++++++++++--
wfqueue.c | 29 ++++++++++++++
4 files changed, 149 insertions(+), 59 deletions(-)
diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
index 13b24ff..dbfb410 100644
--- a/urcu-call-rcu-impl.h
+++ b/urcu-call-rcu-impl.h
@@ -221,7 +221,7 @@ static void *call_rcu_thread(void *arg)
{
unsigned long cbcount;
struct cds_wfq_node *cbs;
- struct cds_wfq_node **cbs_tail;
+ struct cds_wfq_node *cbs_tail;
struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
struct rcu_head *rhp;
int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT);
@@ -243,24 +243,18 @@ static void *call_rcu_thread(void *arg)
cmm_smp_mb();
}
for (;;) {
- if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
- while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
- poll(NULL, 0, 1);
- _CMM_STORE_SHARED(crdp->cbs.head, NULL);
- cbs_tail = (struct cds_wfq_node **)
- uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
+ cbs = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &cbs_tail);
+ if (cbs) {
synchronize_rcu();
cbcount = 0;
do {
- while (cbs->next == NULL &&
- &cbs->next != cbs_tail)
- poll(NULL, 0, 1);
- if (cbs == &crdp->cbs.dummy) {
- cbs = cbs->next;
- continue;
- }
rhp = (struct rcu_head *)cbs;
- cbs = cbs->next;
+
+ if (cbs != cbs_tail)
+ cbs = __cds_wfq_node_sync_next(cbs);
+ else
+ cbs = NULL;
+
rhp->func(rhp);
cbcount++;
} while (cbs != NULL);
@@ -270,8 +264,7 @@ static void *call_rcu_thread(void *arg)
break;
rcu_thread_offline();
if (!rt) {
- if (&crdp->cbs.head
- == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
+ if (cds_wfq_empty(&crdp->cbs)) {
call_rcu_wait(crdp);
poll(NULL, 0, 10);
uatomic_dec(&crdp->futex);
@@ -625,32 +618,31 @@ void call_rcu(struct rcu_head *head,
*/
void call_rcu_data_free(struct call_rcu_data *crdp)
{
- struct cds_wfq_node *cbs;
- struct cds_wfq_node **cbs_tail;
- struct cds_wfq_node **cbs_endprev;
+ struct cds_wfq_node *head, *tail;
if (crdp == NULL || crdp == default_call_rcu_data) {
return;
}
+
if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) {
uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP);
wake_call_rcu_thread(crdp);
while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0)
poll(NULL, 0, 1);
}
- if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
- while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
- poll(NULL, 0, 1);
- _CMM_STORE_SHARED(crdp->cbs.head, NULL);
- cbs_tail = (struct cds_wfq_node **)
- uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
+
+ if (!cds_wfq_empty(&crdp->cbs)) {
+ head = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &tail);
+ assert(head);
+
/* Create default call rcu data if need be */
(void) get_default_call_rcu_data();
- cbs_endprev = (struct cds_wfq_node **)
- uatomic_xchg(&default_call_rcu_data, cbs_tail);
- *cbs_endprev = cbs;
+
+ __cds_wfq_append_list(&default_call_rcu_data->cbs, head, tail);
+
uatomic_add(&default_call_rcu_data->qlen,
uatomic_read(&crdp->qlen));
+
wake_call_rcu_thread(default_call_rcu_data);
}
diff --git a/urcu/static/wfqueue.h b/urcu/static/wfqueue.h
index 636e1af..15ea9fc 100644
--- a/urcu/static/wfqueue.h
+++ b/urcu/static/wfqueue.h
@@ -10,6 +10,7 @@
* dynamically with the userspace rcu library.
*
* Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@@ -29,6 +30,7 @@
#include <pthread.h>
#include <assert.h>
#include <poll.h>
+#include <stdbool.h>
#include <urcu/compiler.h>
#include <urcu/uatomic.h>
@@ -38,8 +40,6 @@ extern "C" {
/*
* Queue with wait-free enqueue/blocking dequeue.
- * This implementation adds a dummy head node when the queue is empty to ensure
- * we can always update the queue locklessly.
*
* Inspired from half-wait-free/half-blocking queue implementation done by
* Paul E. McKenney.
@@ -57,31 +57,43 @@ static inline void _cds_wfq_init(struct cds_wfq_queue *q)
{
int ret;
- _cds_wfq_node_init(&q->dummy);
/* Set queue head and tail */
- q->head = &q->dummy;
- q->tail = &q->dummy.next;
+ _cds_wfq_node_init(&q->head);
+ q->tail = &q->head;
ret = pthread_mutex_init(&q->lock, NULL);
assert(!ret);
}
-static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
- struct cds_wfq_node *node)
+static inline bool _cds_wfq_empty(struct cds_wfq_queue *q)
{
- struct cds_wfq_node **old_tail;
+ /*
+ * Queue is empty if neither q->head.next nor q->tail points to a node.
+ */
+ return q->head.next == NULL && CMM_LOAD_SHARED(q->tail) == &q->head;
+}
+static inline void ___cds_wfq_append_list(struct cds_wfq_queue *q,
+ struct cds_wfq_node *head, struct cds_wfq_node *tail)
+{
/*
* uatomic_xchg() implicit memory barrier orders earlier stores to data
* structure containing node and setting node->next to NULL before
* publication.
*/
- old_tail = uatomic_xchg(&q->tail, &node->next);
+ tail = uatomic_xchg(&q->tail, tail);
+
/*
- * At this point, dequeuers see a NULL old_tail->next, which indicates
+ * At this point, dequeuers see a NULL tail->next, which indicates
* that the queue is being appended to. The following store will append
* "node" to the queue from a dequeuer perspective.
*/
- CMM_STORE_SHARED(*old_tail, node);
+ CMM_STORE_SHARED(tail->next, head);
+}
+
+static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
+ struct cds_wfq_node *node)
+{
+ ___cds_wfq_append_list(q, node, node);
}
/*
@@ -120,27 +132,46 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
{
struct cds_wfq_node *node, *next;
- /*
- * Queue is empty if it only contains the dummy node.
- */
- if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next)
+ if (_cds_wfq_empty(q))
return NULL;
- node = q->head;
- next = ___cds_wfq_node_sync_next(node);
+ node = ___cds_wfq_node_sync_next(&q->head);
+
+ if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
+ if (CMM_LOAD_SHARED(q->tail) == node) {
+ /*
+ * @node is the only node in the queue.
+ * Try to move the tail to &q->head
+ */
+ _cds_wfq_node_init(&q->head);
+ if (uatomic_cmpxchg(&q->tail, node, &q->head) == node)
+ return node;
+ }
+ next = ___cds_wfq_node_sync_next(node);
+ }
/*
* Move queue head forward.
*/
- q->head = next;
- /*
- * Requeue dummy node if we just dequeued it.
- */
- if (node == &q->dummy) {
- _cds_wfq_node_init(node);
- _cds_wfq_enqueue(q, node);
- return ___cds_wfq_dequeue_blocking(q);
- }
+ q->head.next = next;
+
+ return node;
+}
+
+/* Dequeue all nodes; the nodes' next pointers are not synchronized. */
+static inline struct cds_wfq_node *
+___cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q,
+ struct cds_wfq_node **tail)
+{
+ struct cds_wfq_node *node;
+
+ if (_cds_wfq_empty(q))
+ return NULL;
+
+ node = ___cds_wfq_node_sync_next(&q->head);
+ _cds_wfq_node_init(&q->head);
+ *tail = uatomic_xchg(&q->tail, &q->head);
+
return node;
}
@@ -158,6 +189,27 @@ _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
return retnode;
}
+static inline struct cds_wfq_node *
+_cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q,
+ struct cds_wfq_node **tail)
+{
+ struct cds_wfq_node *node, *next;
+ int ret;
+
+ ret = pthread_mutex_lock(&q->lock);
+ assert(!ret);
+ node = ___cds_wfq_dequeue_all_blocking(q, tail);
+ ret = pthread_mutex_unlock(&q->lock);
+ assert(!ret);
+
+ /* synchronize all nodes' next pointer */
+ next = node;
+ while (next != *tail)
+ next = ___cds_wfq_node_sync_next(next);
+
+ return node;
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/urcu/wfqueue.h b/urcu/wfqueue.h
index 03a73f1..985f540 100644
--- a/urcu/wfqueue.h
+++ b/urcu/wfqueue.h
@@ -7,6 +7,7 @@
* Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
*
* Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@@ -25,6 +26,7 @@
#include <pthread.h>
#include <assert.h>
+#include <stdbool.h>
#include <urcu/compiler.h>
#ifdef __cplusplus
@@ -33,8 +35,6 @@ extern "C" {
/*
* Queue with wait-free enqueue/blocking dequeue.
- * This implementation adds a dummy head node when the queue is empty to ensure
- * we can always update the queue locklessly.
*
* Inspired from half-wait-free/half-blocking queue implementation done by
* Paul E. McKenney.
@@ -45,8 +45,7 @@ struct cds_wfq_node {
};
struct cds_wfq_queue {
- struct cds_wfq_node *head, **tail;
- struct cds_wfq_node dummy; /* Dummy node */
+ struct cds_wfq_node head, *tail;
pthread_mutex_t lock;
};
@@ -56,18 +55,36 @@ struct cds_wfq_queue {
#define cds_wfq_node_init _cds_wfq_node_init
#define cds_wfq_init _cds_wfq_init
+#define cds_wfq_empty _cds_wfq_empty
+#define __cds_wfq_append_list ___cds_wfq_append_list
#define cds_wfq_enqueue _cds_wfq_enqueue
#define __cds_wfq_dequeue_blocking ___cds_wfq_dequeue_blocking
#define cds_wfq_dequeue_blocking _cds_wfq_dequeue_blocking
+#define __cds_wfq_node_sync_next ___cds_wfq_node_sync_next
+#define __cds_wfq_dequeue_all_blocking ___cds_wfq_dequeue_all_blocking
+#define cds_wfq_dequeue_all_blocking _cds_wfq_dequeue_all_blocking
#else /* !_LGPL_SOURCE */
extern void cds_wfq_node_init(struct cds_wfq_node *node);
extern void cds_wfq_init(struct cds_wfq_queue *q);
+extern bool cds_wfq_empty(struct cds_wfq_queue *q);
+/* __cds_wfq_append_list: caller ensures mutual exclusion between dequeues */
+extern void __cds_wfq_append_list(struct cds_wfq_queue *q,
+ struct cds_wfq_node *head, struct cds_wfq_node *tail);
extern void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node);
/* __cds_wfq_dequeue_blocking: caller ensures mutual exclusion between dequeues */
extern struct cds_wfq_node *__cds_wfq_dequeue_blocking(struct cds_wfq_queue *q);
extern struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue *q);
+extern struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node *node);
+/*
+ * __cds_wfq_dequeue_all_blocking: caller ensures mutual exclusion between
+ * dequeues, and needs to synchronize each node's next pointer before using it.
+ */
+extern struct cds_wfq_node *__cds_wfq_dequeue_all_blocking(
+ struct cds_wfq_queue *q, struct cds_wfq_node **tail);
+extern struct cds_wfq_node *cds_wfq_dequeue_all_blocking(
+ struct cds_wfq_queue *q, struct cds_wfq_node **tail);
#endif /* !_LGPL_SOURCE */
diff --git a/wfqueue.c b/wfqueue.c
index 3337171..28a7b58 100644
--- a/wfqueue.c
+++ b/wfqueue.c
@@ -4,6 +4,7 @@
* Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
*
* Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ * Copyright 2011-2012 - Lai Jiangshan <laijs at cn.fujitsu.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@@ -38,6 +39,17 @@ void cds_wfq_init(struct cds_wfq_queue *q)
_cds_wfq_init(q);
}
+bool cds_wfq_empty(struct cds_wfq_queue *q)
+{
+ return _cds_wfq_empty(q);
+}
+
+void __cds_wfq_append_list(struct cds_wfq_queue *q,
+ struct cds_wfq_node *head, struct cds_wfq_node *tail)
+{
+ return ___cds_wfq_append_list(q, head, tail);
+}
+
void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node)
{
_cds_wfq_enqueue(q, node);
@@ -52,3 +64,20 @@ struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
{
return _cds_wfq_dequeue_blocking(q);
}
+
+struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node *node)
+{
+ return ___cds_wfq_node_sync_next(node);
+}
+
+struct cds_wfq_node *__cds_wfq_dequeue_all_blocking(
+ struct cds_wfq_queue *q, struct cds_wfq_node **tail)
+{
+ return ___cds_wfq_dequeue_all_blocking(q, tail);
+}
+
+struct cds_wfq_node *cds_wfq_dequeue_all_blocking(
+ struct cds_wfq_queue *q, struct cds_wfq_node **tail)
+{
+ return _cds_wfq_dequeue_all_blocking(q, tail);
+}
--
1.7.7