[ltt-dev] [URCU pull] lfqueue-dev branch into master
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Wed Aug 17 06:13:08 EDT 2011
Hi,
I'm about to pull the lfqueue-dev branch into the master branch. The
goal is to simplify the currently overly complex urcu/rculfqueue.h RCU
lock-free queue API. I've got another implementation, ported from an
original implementation by Lai Jiangshan, waiting in the lfqueue-dev2
branch, but I have not had time to get comfortable with it yet: it is
rather more complex. Lai's implementation does have the advantage of
being entirely independent from the memory allocator, even for the
dequeue-the-last operation.
So meanwhile, I really want to clean up the API and remove the
dependency of the current queue on the reference counter. The diff
follows. It will be in the master branch for now, so please feel free to
test and provide feedback before the next release.
Thanks!
Mathieu
diff --git a/rculfqueue.c b/rculfqueue.c
index 0daee5d..09ba9cf 100644
--- a/rculfqueue.c
+++ b/rculfqueue.c
@@ -39,9 +39,15 @@ void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
}
void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
- void (*release)(struct urcu_ref *ref))
+ void queue_call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *head)))
{
- _cds_lfq_init_rcu(q, release);
+ _cds_lfq_init_rcu(q, queue_call_rcu);
+}
+
+int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q)
+{
+ return _cds_lfq_destroy_rcu(q);
}
void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node)
diff --git a/tests/test_urcu_lfq.c b/tests/test_urcu_lfq.c
index cb50586..b61a7d4 100644
--- a/tests/test_urcu_lfq.c
+++ b/tests/test_urcu_lfq.c
@@ -66,6 +66,7 @@ static inline pid_t gettid(void)
#endif
#include <urcu.h>
#include <urcu/cds.h>
+#include <urcu-defer.h>
static volatile int test_go, test_stop;
@@ -204,20 +205,6 @@ fail:
}
-static void rcu_free_node(struct rcu_head *head)
-{
- struct cds_lfq_node_rcu *node =
- caa_container_of(head, struct cds_lfq_node_rcu, rcu_head);
- free(node);
-}
-
-static void ref_release_node(struct urcu_ref *ref)
-{
- struct cds_lfq_node_rcu *node =
- caa_container_of(ref, struct cds_lfq_node_rcu, ref);
- call_rcu(&node->rcu_head, rcu_free_node);
-}
-
void *thr_dequeuer(void *_count)
{
unsigned long long *count = _count;
@@ -228,6 +215,11 @@ void *thr_dequeuer(void *_count)
set_affinity();
+ ret = rcu_defer_register_thread();
+ if (ret) {
+ printf("Error in rcu_defer_register_thread\n");
+ exit(-1);
+ }
rcu_register_thread();
while (!test_go)
@@ -243,7 +235,7 @@ void *thr_dequeuer(void *_count)
rcu_read_unlock();
if (node) {
- urcu_ref_put(&node->ref, ref_release_node);
+ defer_rcu(free, node);
nr_successful_dequeues++;
}
@@ -255,6 +247,7 @@ void *thr_dequeuer(void *_count)
}
rcu_unregister_thread();
+ rcu_defer_unregister_thread();
printf_verbose("dequeuer thread_end, thread id : %lx, tid %lu, "
"dequeues %llu, successful_dequeues %llu\n",
pthread_self(), (unsigned long)gettid(), nr_dequeues,
@@ -264,12 +257,6 @@ void *thr_dequeuer(void *_count)
return ((void*)2);
}
-static void release_node(struct urcu_ref *ref)
-{
- struct cds_lfq_node_rcu *node = caa_container_of(ref, struct cds_lfq_node_rcu, ref);
- free(node);
-}
-
void test_end(struct cds_lfq_queue_rcu *q, unsigned long long *nr_dequeues)
{
struct cds_lfq_node_rcu *node;
@@ -279,7 +266,7 @@ void test_end(struct cds_lfq_queue_rcu *q, unsigned long long *nr_dequeues)
node = cds_lfq_dequeue_rcu(q);
rcu_read_unlock();
if (node) {
- urcu_ref_put(&node->ref, release_node);
+ free(node); /* no more concurrent access */
(*nr_dequeues)++;
}
} while (node);
@@ -376,7 +363,7 @@ int main(int argc, char **argv)
tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers);
count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers);
- cds_lfq_init_rcu(&q, ref_release_node);
+ cds_lfq_init_rcu(&q, call_rcu);
next_aff = 0;
@@ -421,6 +408,8 @@ int main(int argc, char **argv)
}
test_end(&q, &end_dequeues);
+ err = cds_lfq_destroy_rcu(&q);
+ assert(!err);
printf_verbose("total number of enqueues : %llu, dequeues %llu\n",
tot_enqueues, tot_dequeues);
diff --git a/urcu/rculfqueue.h b/urcu/rculfqueue.h
index fbef6f9..598fa50 100644
--- a/urcu/rculfqueue.h
+++ b/urcu/rculfqueue.h
@@ -23,36 +23,24 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
-#include <urcu/ref.h>
#include <assert.h>
+#include <urcu-call-rcu.h>
#ifdef __cplusplus
extern "C" {
#endif
-/*
- * Lock-free RCU queue using reference counting. Enqueue and dequeue operations
- * hold a RCU read lock to deal with cmpxchg ABA problem. This implementation
- * keeps a dummy head node to ensure we can always update the queue locklessly.
- * Given that this is a queue, the dummy head node must always advance as we
- * dequeue entries. Therefore, we keep a reference count on each entry we are
- * dequeueing, so they can be kept as dummy head node until the next dequeue, at
- * which point their reference count will be decremented.
- */
-
struct cds_lfq_queue_rcu;
struct cds_lfq_node_rcu {
struct cds_lfq_node_rcu *next;
- struct urcu_ref ref;
- struct cds_lfq_queue_rcu *queue;
- struct rcu_head rcu_head;
+ int dummy;
};
struct cds_lfq_queue_rcu {
struct cds_lfq_node_rcu *head, *tail;
- struct cds_lfq_node_rcu init; /* Dummy initialization node */
- void (*release)(struct urcu_ref *ref);
+ void (*queue_call_rcu)(struct rcu_head *head,
+ void (*func)(struct rcu_head *head));
};
#ifdef _LGPL_SOURCE
@@ -61,6 +49,7 @@ struct cds_lfq_queue_rcu {
#define cds_lfq_node_init_rcu _cds_lfq_node_init_rcu
#define cds_lfq_init_rcu _cds_lfq_init_rcu
+#define cds_lfq_destroy_rcu _cds_lfq_destroy_rcu
#define cds_lfq_enqueue_rcu _cds_lfq_enqueue_rcu
#define cds_lfq_dequeue_rcu _cds_lfq_dequeue_rcu
@@ -68,7 +57,14 @@ struct cds_lfq_queue_rcu {
extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node);
extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
- void (*release)(struct urcu_ref *ref));
+ void queue_call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *head)));
+/*
+ * The queue should be emptied before calling destroy.
+ *
+ * Return 0 on success, -EPERM if queue is not empty.
+ */
+extern int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q);
/*
* Should be called under rcu read lock critical section.
@@ -79,12 +75,9 @@ extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
/*
* Should be called under rcu read lock critical section.
*
- * The entry returned by dequeue must be taken care of by doing a
- * sequence of urcu_ref_put which release handler should do a call_rcu.
- *
- * In other words, the entry lfq node returned by dequeue must not be
- * modified/re-used/freed until the reference count reaches zero and a grace
- * period has elapsed (after the refcount reached 0).
+ * The caller must wait for a grace period to pass before freeing the returned
+ * node or modifying the cds_lfq_node_rcu structure.
+ * Returns NULL if queue is empty.
*/
extern
struct cds_lfq_node_rcu *cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q);
diff --git a/urcu/static/rculfqueue.h b/urcu/static/rculfqueue.h
index b627e45..fea6110 100644
--- a/urcu/static/rculfqueue.h
+++ b/urcu/static/rculfqueue.h
@@ -26,52 +26,122 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
-#include <urcu/ref.h>
+#include <urcu-call-rcu.h>
#include <urcu/uatomic.h>
#include <assert.h>
-/* A urcu implementation header should be already included. */
+#include <errno.h>
#ifdef __cplusplus
extern "C" {
#endif
+struct cds_lfq_node_rcu_dummy {
+ struct cds_lfq_node_rcu parent;
+ struct rcu_head head;
+ struct cds_lfq_queue_rcu *q;
+};
+
/*
- * Lock-free RCU queue using reference counting. Enqueue and dequeue operations
- * hold a RCU read lock to deal with cmpxchg ABA problem. This implementation
- * keeps a dummy head node to ensure we can always update the queue locklessly.
- * Given that this is a queue, the dummy head node must always advance as we
- * dequeue entries. Therefore, we keep a reference count on each entry we are
- * dequeueing, so they can be kept as dummy head node until the next dequeue, at
- * which point their reference count will be decremented.
+ * Lock-free RCU queue. Enqueue and dequeue operations hold a RCU read
+ * lock to deal with cmpxchg ABA problem. This queue is *not* circular:
+ * head points to the oldest node, tail points to the newest node.
+ * A dummy node is kept to ensure enqueue and dequeue can always proceed
+ * concurrently. Keeping a separate head and tail helps with large
+ * queues: enqueue and dequeue can proceed concurrently without
+ * wrestling for exclusive access to the same variables.
+ *
+ * Dequeue retry if it detects that it would be dequeueing the last node
+ * (it means a dummy node dequeue-requeue is in progress). This ensures
+ * that there is always at least one node in the queue.
+ *
+ * In the dequeue operation, we internally reallocate the dummy node
+ * upon dequeue/requeue and use call_rcu to free the old one after a
+ * grace period.
*/
-#define URCU_LFQ_PERMANENT_REF 128
+static inline
+struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q,
+ struct cds_lfq_node_rcu *next)
+{
+ struct cds_lfq_node_rcu_dummy *dummy;
+
+ dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy));
+ assert(dummy);
+ dummy->parent.next = next;
+ dummy->parent.dummy = 1;
+ dummy->q = q;
+ return &dummy->parent;
+}
+
+static inline
+void free_dummy_cb(struct rcu_head *head)
+{
+ struct cds_lfq_node_rcu_dummy *dummy =
+ caa_container_of(head, struct cds_lfq_node_rcu_dummy, head);
+ free(dummy);
+}
+
+static inline
+void rcu_free_dummy(struct cds_lfq_node_rcu *node)
+{
+ struct cds_lfq_node_rcu_dummy *dummy;
+ assert(node->dummy);
+ dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
+ dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb);
+}
+
+static inline
+void free_dummy(struct cds_lfq_node_rcu *node)
+{
+ struct cds_lfq_node_rcu_dummy *dummy;
+
+ assert(node->dummy);
+ dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
+ free(dummy);
+}
+
+static inline
void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
{
node->next = NULL;
- urcu_ref_init(&node->ref);
+ node->dummy = 0;
}
+static inline
void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
- void (*release)(struct urcu_ref *ref))
+ void queue_call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *head)))
+{
+ q->tail = make_dummy(q, NULL);
+ q->head = q->tail;
+ q->queue_call_rcu = queue_call_rcu;
+}
+
+/*
+ * The queue should be emptied before calling destroy.
+ *
+ * Return 0 on success, -EPERM if queue is not empty.
+ */
+static inline
+int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q)
{
- _cds_lfq_node_init_rcu(&q->init);
- /* Make sure the initial node is never freed. */
- urcu_ref_set(&q->init.ref, URCU_LFQ_PERMANENT_REF);
- q->head = q->tail = &q->init;
- q->release = release;
+ struct cds_lfq_node_rcu *head;
+
+ head = rcu_dereference(q->head);
+ if (!(head->dummy && head->next == NULL))
+ return -EPERM; /* not empty */
+ free_dummy(head);
+ return 0;
}
/*
* Should be called under rcu read lock critical section.
*/
+static inline
void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
struct cds_lfq_node_rcu *node)
{
- urcu_ref_get(&node->ref);
- node->queue = q;
-
/*
* uatomic_cmpxchg() implicit memory barrier orders earlier stores to
* node before publication.
@@ -81,23 +151,19 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
struct cds_lfq_node_rcu *tail, *next;
tail = rcu_dereference(q->tail);
- /*
- * Typically expect tail->next to be NULL.
- */
next = uatomic_cmpxchg(&tail->next, NULL, node);
if (next == NULL) {
/*
* Tail was at the end of queue, we successfully
- * appended to it.
- * Now move tail (another enqueue might beat
- * us to it, that's fine).
+ * appended to it. Now move tail (another
+ * enqueue might beat us to it, that's fine).
*/
(void) uatomic_cmpxchg(&q->tail, tail, node);
return;
} else {
/*
- * Failure to append to current tail. Help moving tail
- * further and retry.
+ * Failure to append to current tail.
+ * Help moving tail further and retry.
*/
(void) uatomic_cmpxchg(&q->tail, tail, next);
continue;
@@ -105,16 +171,24 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
}
}
+static inline
+void enqueue_dummy(struct cds_lfq_queue_rcu *q)
+{
+ struct cds_lfq_node_rcu *node;
+
+ /* We need to reallocate to protect from ABA. */
+ node = make_dummy(q, NULL);
+ _cds_lfq_enqueue_rcu(q, node);
+}
+
/*
* Should be called under rcu read lock critical section.
*
- * The entry returned by dequeue must be taken care of by doing a
- * sequence of urcu_ref_put which release handler should do a call_rcu.
- *
- * In other words, the entry lfq node returned by dequeue must not be
- * modified/re-used/freed until the reference count reaches zero and a grace
- * period has elapsed.
+ * The caller must wait for a grace period to pass before freeing the returned
+ * node or modifying the cds_lfq_node_rcu structure.
+ * Returns NULL if queue is empty.
*/
+static inline
struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
{
for (;;) {
@@ -122,18 +196,27 @@ struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
head = rcu_dereference(q->head);
next = rcu_dereference(head->next);
- if (next) {
- if (uatomic_cmpxchg(&q->head, head, next) == head) {
- urcu_ref_put(&head->ref, q->release);
- return next;
- } else {
- /* Concurrently pushed, retry */
- continue;
- }
- } else {
- /* Empty */
- return NULL;
+ if (head->dummy && next == NULL)
+ return NULL; /* empty */
+ /*
+ * We never, ever allow dequeue to get to a state where
+ * the queue is empty (we need at least one node in the
+ * queue). This is ensured by checking if the head next
+ * is NULL, which means we need to enqueue a dummy node
+ * before we can hope dequeuing anything.
+ */
+ if (!next) {
+ enqueue_dummy(q);
+ next = rcu_dereference(head->next);
+ }
+ if (uatomic_cmpxchg(&q->head, head, next) != head)
+ continue; /* Concurrently pushed. */
+ if (head->dummy) {
+ /* Free dummy after grace period. */
+ rcu_free_dummy(head);
+ continue; /* try again */
}
+ return head;
}
}
diff --git a/urcu/static/rculfstack.h b/urcu/static/rculfstack.h
index 3f48b7e..ba26231 100644
--- a/urcu/static/rculfstack.h
+++ b/urcu/static/rculfstack.h
@@ -27,7 +27,6 @@
*/
#include <urcu/uatomic.h>
-/* A urcu implementation header should be already included. */
#ifdef __cplusplus
extern "C" {
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list