[ltt-dev] [PATCH 3/3] lock-free queue with RCU-based garbage collection

Mathieu Desnoyers compudj at krystal.dyndns.org
Thu Mar 11 17:09:21 EST 2010


* Paolo Bonzini (pbonzini at redhat.com) wrote:
> A lock-free queue example using RCU to avoid the need for double-word
> compare-and-swap and, at the same time, to implement efficient garbage
> collection.  I tested it lightly, and I don't think it's fully ready
> for inclusion.  It does need the previous two patches to avoid deadlocks,
> so I did test it somewhat. :-)

It would be nice to try separating memory allocation from this lockless
queue implementation. I think they belong to different problem spaces,
and that we should just let users provide whatever allocator they
use. Possibly, if you need internal copies within your data structure,
the caller could pass pointers to its malloc/free implementation.

Then providing a lock-free malloc/free could come as a separate lib.

By the way, if you could add to the source-code comments a few links to
the articles/books that inspired the lockless algorithms, that would be
helpful.

Thanks!

Mathieu

> 
> See individual files for detailed comments on how it works.
> 
> Cc: Paul E. McKenney <paulmck at linux.vnet.ibm.com>
> ---
>  tests/Makefile.am     |   36 +++-
>  tests/test_qsbr_lfq.c |  587 +++++++++++++++++++++++++++++++++++++++++++++++++
>  tests/test_urcu_lfq.c |  585 ++++++++++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 1207 insertions(+), 1 deletions(-)
>  create mode 100644 tests/test_qsbr_lfq.c
>  create mode 100644 tests/test_urcu_lfq.c
> 
> diff --git a/tests/Makefile.am b/tests/Makefile.am
> index 17b89db..2ebaae5 100644
> --- a/tests/Makefile.am
> +++ b/tests/Makefile.am
> @@ -12,7 +12,12 @@ noinst_PROGRAMS = test_urcu test_urcu_dynamic_link test_urcu_timing \
>          test_urcu_mb_gc test_qsbr_gc test_qsbr_lgc test_urcu_signal_lgc \
>          test_urcu_mb_lgc test_qsbr_dynamic_link test_urcu_defer \
>          test_uatomic test_urcu_assign test_urcu_assign_dynamic_link \
> -        test_urcu_bp test_urcu_bp_dynamic_link
> +        test_urcu_bp test_urcu_bp_dynamic_link \
> +        test_urcu_lfq test_urcu_signal_lfq test_urcu_dynamic_link_lfq \
> +        test_urcu_signal_dynamic_link_lfq test_urcu_mb_lfq \
> +        test_urcu_yield_lfq test_urcu_signal_yield_lfq test_urcu_defer_lfq \
> +	test_qsbr_lfq test_qsbr_dynamic_link_lfq
> +
>  noinst_HEADERS = rcutorture.h
>  
>  if COMPAT_ARCH
> @@ -149,4 +154,33 @@ test_urcu_bp_SOURCES = test_urcu_bp.c $(URCU_BP)
>  test_urcu_bp_dynamic_link_SOURCES = test_urcu_bp.c $(URCU_BP)
>  test_urcu_bp_dynamic_link_CFLAGS = -DDYNAMIC_LINK_TEST $(AM_CFLAGS)
>  
> +test_urcu_lfq_SOURCES = test_urcu_lfq.c $(URCU)
> +
> +test_urcu_defer_lfq_SOURCES = test_urcu_defer_lfq.c $(URCU_DEFER)
> +
> +test_urcu_signal_lfq_SOURCES = test_urcu_lfq.c $(URCU_SIGNAL)
> +test_urcu_signal_lfq_CFLAGS = -DRCU_SIGNAL $(AM_CFLAGS)
> +
> +test_urcu_dynamic_link_lfq_SOURCES = test_urcu_lfq.c $(URCU)
> +test_urcu_dynamic_link_lfq_CFLAGS = -DDYNAMIC_LINK_TEST $(AM_CFLAGS)
> +
> +test_urcu_signal_dynamic_link_lfq_SOURCES = test_urcu_lfq.c $(URCU)
> +test_urcu_signal_dynamic_link_lfq_CFLAGS = -DRCU_SIGNAL -DDYNAMIC_LINK_TEST $(AM_CFLAGS)
> +
> +test_urcu_mb_lfq_SOURCES = test_urcu_lfq.c $(URCU_MB)
> +test_urcu_mb_lfq_CFLAGS = -DRCU_MB $(AM_CFLAGS)
> +
> +test_urcu_yield_lfq_SOURCES = test_urcu_lfq.c $(URCU)
> +test_urcu_yield_lfq_CFLAGS = -DDEBUG_YIELD $(AM_CFLAGS)
> +
> +test_urcu_signal_yield_lfq_SOURCES = test_urcu_lfq.c $(URCU_SIGNAL)
> +test_urcu_signal_yield_lfq_CFLAGS = -DRCU_SIGNAL -DDEBUG_YIELD $(AM_CFLAGS)
> +
> +test_qsbr_lfq_SOURCES = test_qsbr_lfq.c $(URCU_QSBR)
> +
> +test_qsbr_dynamic_link_lfq_SOURCES = test_qsbr_lfq.c $(URCU_QSBR)
> +test_qsbr_dynamic_link_lfq_CFLAGS = -DDYNAMIC_LINK_TEST $(AM_CFLAGS)
> +
> +
> +
>  urcutorture.c: api.h
> diff --git a/tests/test_qsbr_lfq.c b/tests/test_qsbr_lfq.c
> new file mode 100644
> index 0000000..2e3f130
> --- /dev/null
> +++ b/tests/test_qsbr_lfq.c
> @@ -0,0 +1,587 @@
> +/*
> + * test_qsbr_lfq.c
> + *
> + * Userspace RCU library - example RCU-based lock-free queue
> + *
> + * Copyright February 2010 - Paolo Bonzini <pbonzini at redhat.com>
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License along
> + * with this program; if not, write to the Free Software Foundation, Inc.,
> + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> + */
> +
> +#define _GNU_SOURCE
> +#include "../config.h"
> +#include <stdio.h>
> +#include <pthread.h>
> +#include <stdlib.h>
> +#include <stdint.h>
> +#include <stdbool.h>
> +#include <string.h>
> +#include <sys/types.h>
> +#include <sys/wait.h>
> +#include <unistd.h>
> +#include <stdio.h>
> +#include <assert.h>
> +#include <sys/syscall.h>
> +#include <sched.h>
> +#include <errno.h>
> +
> +#include <urcu/arch.h>
> +
> +/* hardcoded number of CPUs */
> +#define NR_CPUS 16384
> +
> +#if defined(_syscall0)
> +_syscall0(pid_t, gettid)
> +#elif defined(__NR_gettid)
> +static inline pid_t gettid(void)
> +{
> +	return syscall(__NR_gettid);
> +}
> +#else
> +#warning "use pid as tid"
> +static inline pid_t gettid(void)
> +{
> +	return getpid();
> +}
> +#endif
> +
> +#ifndef DYNAMIC_LINK_TEST
> +#define _LGPL_SOURCE
> +#endif
> +#include "urcu-qsbr.h"
> +
> +
> +static volatile int test_go, test_stop;	/* set by main to start / stop the worker threads */
> +
> +static unsigned long rduration;	/* delay after each dequeue, in loops (-c option) */
> +
> +static unsigned long duration;	/* test duration, in seconds */
> +
> +/* delay after each enqueue, in loops (-d option) */
> +static unsigned long wdelay;
> +
> +static inline void loop_sleep(unsigned long l)
> +{
> +	while(l-- != 0)
> +		cpu_relax();
> +}
> +
> +static int verbose_mode;
> +
> +#define printf_verbose(fmt, args...)		\
> +	do {					\
> +		if (verbose_mode)		\
> +			printf(fmt, args);	\
> +	} while (0)
> +
> +static unsigned int cpu_affinities[NR_CPUS];
> +static unsigned int next_aff = 0;
> +static int use_affinity = 0;
> +
> +pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER;
> +
> +#ifndef HAVE_CPU_SET_T
> +typedef unsigned long cpu_set_t;
> +# define CPU_ZERO(cpuset) do { *(cpuset) = 0; } while(0)
> +# define CPU_SET(cpu, cpuset) do { *(cpuset) |= (1UL << (cpu)); } while(0)
> +#endif
> +
> +static void set_affinity(void)
> +{
> +	cpu_set_t mask;
> +	int cpu;
> +	int ret;
> +
> +	if (!use_affinity)
> +		return;
> +
> +#if HAVE_SCHED_SETAFFINITY
> +	ret = pthread_mutex_lock(&affinity_mutex);
> +	if (ret) {
> +		perror("Error in pthread mutex lock");
> +		exit(-1);
> +	}
> +	cpu = cpu_affinities[next_aff++];
> +	ret = pthread_mutex_unlock(&affinity_mutex);
> +	if (ret) {
> +		perror("Error in pthread mutex unlock");
> +		exit(-1);
> +	}
> +
> +	CPU_ZERO(&mask);
> +	CPU_SET(cpu, &mask);
> +#if SCHED_SETAFFINITY_ARGS == 2
> +	sched_setaffinity(0, &mask);
> +#else
> +	sched_setaffinity(0, sizeof(mask), &mask);
> +#endif
> +#endif /* HAVE_SCHED_SETAFFINITY */
> +}
> +
> +/*
> + * returns 0 if test should end.
> + */
> +static int test_duration_dequeue(void)
> +{
> +	return !test_stop;
> +}
> +
> +static int test_duration_enqueue(void)
> +{
> +	return !test_stop;
> +}
> +
> +static unsigned long long __thread nr_dequeues;
> +static unsigned long long __thread nr_enqueues;
> +
> +static unsigned long nr_successful_dequeues;
> +static unsigned int nr_enqueuers;
> +static unsigned int nr_dequeuers;
> +
> +
> +/* Value stored in node_page.in right before a page is freed, so that
> +   use of a dangling page pointer is easier to notice.  */
> +#define ARRAY_POISON	0xDEADBEEF
> +/* Size of a node page.  free_node() recovers the page from a node
> +   address by masking with PAGE_MASK, so pages must be PAGE_SIZE-aligned.  */
> +#define PAGE_SIZE	4096
> +#define PAGE_MASK	(PAGE_SIZE - 1)
> +/* Number of nodes that fit in a page after its 16-byte header.  The
> +   expansion is fully parenthesized so that it stays a single operand
> +   wherever the macro is used.  */
> +#define NODES_PER_PAGE	((PAGE_SIZE - 16) / sizeof (struct node))
> +
> +/* Lock-free queue, using the RCU to avoid the ABA problem and (more
> +   interestingly) to efficiently handle freeing memory.
> +
> +   We have to protect both the enqueuer and dequeuer's compare-and-
> +   exchange operation from running across a free and a subsequent
> +   reallocation of the same memory.  So, we protect the free with
> +   synchronize_rcu; this is enough because all the allocations take
> +   place before the compare-and-exchange ops.
> +
> +   Besides adding rcu_read_{,un}lock, the enqueue/dequeue are a standard
> +   implementation of a lock-free-queue.  The first node in the queue is
> +   always dummy: dequeuing returns the data from HEAD->NEXT, advances
> +   HEAD to HEAD->NEXT (which will now serve as dummy node), and frees the
> +   old HEAD.  Since RCU avoids the ABA problem, it doesn't use double-word
> +   compare-and-exchange operations.  Node allocation and deallocation are
> +   a "black box" and synchronize_rcu is hidden within node deallocation.
> +
> +   So, the tricky part is finding a good allocation strategy for nodes.
> +   The allocator for nodes is shared by multiple threads, and since
> +   malloc/free are not lock-free a layer above them is obviously
> +   necessary: otherwise the whole exercise is useless.  In addition,
> +   to avoid penalizing dequeues, the allocator should avoid frequent
> +   synchronization (because synchronize_rcu is expensive).
> +
> +   The scheme that is used here uses a page as the allocation
> +   unit for nodes.  A page is freed when no more nodes are in use.
> +   Nodes from a page are never reused.
> +
> +   The nodes are allocated from Q->CURRENT.  Since whoever finds a full
> +   page has to busy wait, we use a trick to limit the duration of busy
> +   waiting.  A free page Q->FREE is always kept ready, so that any thread
> +   that allocates the last node in a page, or finds a full page can try
> +   to update Q->CURRENT.  Whoever loses the race has to busy wait, OTOH
> +   whoever wins the race has to allocate the new Q->FREE.  In other words,  
> +   if the following sequence happens
> +
> +     Thread 1                    Thread 2                 other threads
> +     -----------------------------------------------------------------------
> +     Get last node from page
> +	                         q->current = q->free;
> +				                          fill up q->current
> +     q->current = q->free fails
> +
> +   then thread 1 no longer has the duty of allocating the new q->free;
> +   thread 2 will do that.  If a thread finds a full current page and
> +   Q->CURRENT == Q->FREE, this means that another thread is going to
> +   allocate Q->FREE soon, and it busy waits.  After the allocation
> +   finishes, everything proceeds normally: some thread will take care
> +   of setting Q->CURRENT and allocating a new Q->FREE.
> + 
> +   One common scheme for allocation is to use a free list (implemented
> +   as a lock-free stack), but this free list is potentially unbounded.
> +   Instead, with the above scheme the number of live pages at any time
> +   is equal to the number of enqueuing threads.  */
> +
> +struct node {
> +	void *data;
> +	void *next;
> +};
> +
> +struct node_page {
> +	int in;
> +	int out;
> +	char padding[16 - sizeof(int) * 2];
> +	struct node nodes[NODES_PER_PAGE];
> +};
> +
> +
> +struct queue {
> +	struct node_page *current, *free;
> +	struct node *head, *tail;
> +};
> +
> +/* Allocate one page-aligned node page and reset its IN and OUT counters.
> +   The callers never check for NULL, so an allocation failure is
> +   reported and terminates the process instead of being dereferenced.  */
> +static struct node_page *new_node_page(void)
> +{
> +	struct node_page *p = valloc (PAGE_SIZE);
> +	if (!p) {
> +		perror("Error in valloc");
> +		exit(-1);
> +	}
> +	/* NOTE(review): presumably announced here because the allocation
> +	   may take a while -- confirm.  */
> +	rcu_quiescent_state();
> +	p->in = p->out = 0;
> +	return p;
> +}
> +
> +static void free_node_page(struct node_page *p)
> +{
> +	/* Help making sure that accessing a dangling pointer is
> +	   adequately punished.  */
> +	p->in = ARRAY_POISON;
> +	free (p);
> +}
> +
> +static struct node *new_node(struct queue *q)
> +{
> +	struct node *n;
> +	struct node_page *p;
> +	int i;
> +
> +	do {
> +		p = q->current;
> +		i = p->in;
> +		if (i >= NODES_PER_PAGE - 1 &&
> +		    q->free != p &&
> +		    uatomic_cmpxchg(&q->current, p, q->free) == p)
> +			q->free = new_node_page();
> +
> +	} while (i == NODES_PER_PAGE || uatomic_cmpxchg(&p->in, i, i+1) != i);
> +
> +	assert (i >= 0 && i < NODES_PER_PAGE);
> +	n = &p->nodes[i];
> +	n->next = NULL;
> +	return n;
> +}
> +
> +void free_node(struct node *n)
> +{
> +	struct node_page *p = (struct node_page *) ((intptr_t) n & ~PAGE_MASK);
> +
> +	if (uatomic_add_return(&p->out, 1) == NODES_PER_PAGE) {
> +		rcu_quiescent_state();
> +		synchronize_rcu();
> +		free_node_page(p);
> +	}
> +}
> +
> +void init_queue(struct queue *q)
> +{
> +	q->current = new_node_page();
> +	q->free = new_node_page();
> +	q->head = q->tail = new_node(q);
> +}
> +
> +void enqueue(struct queue *q, void *value)
> +{
> +	struct node *n = new_node(q);
> +	n->data = value;
> +	rcu_read_lock();
> +	for (;;) {
> +		struct node *tail = rcu_dereference(q->tail);
> +		struct node *next = rcu_dereference(tail->next);
> +		if (tail != q->tail) {
> +			/* A change occurred while reading the values.  */
> +			continue;
> +		}
> +
> +		if (next) {
> +			/* Help moving tail further.  */
> +			uatomic_cmpxchg(&q->tail, tail, next);
> +			continue;
> +		}
> +
> +		if (uatomic_cmpxchg(&tail->next, NULL, n) == NULL) {
> +			/* Move tail (another operation might beat us to it,
> +			   that's fine).  */
> +			uatomic_cmpxchg(&q->tail, tail, n);
> +			rcu_read_unlock();
> +			return;
> +		}
> +	}
> +}
> +
> +/* Remove the oldest element of Q and return its data pointer, or NULL
> +   if the queue is empty.  If NOT_EMPTY is non-NULL, *NOT_EMPTY is set
> +   to true when an element was removed and to false otherwise, so that
> +   callers can tell an empty queue from a stored NULL pointer.  The
> +   read-side critical section is left on every return path.  */
> +void *dequeue(struct queue *q, bool *not_empty)
> +{
> +	bool dummy;
> +	if (!not_empty)
> +		not_empty = &dummy;
> +
> +	rcu_read_lock();
> +	*not_empty = false;
> +	for (;;) {
> +		void *data;
> +		struct node *head = rcu_dereference(q->head);
> +		struct node *tail = rcu_dereference(q->tail);
> +		struct node *next = rcu_dereference(head->next);
> +
> +		if (head != q->head) {
> +			/* A change occurred while reading the values.  */
> +			continue;
> +		}
> +
> +		if (head == tail) {
> +			/* If all three are consistent, the queue is empty.
> +			   Unlock before returning, exactly like the
> +			   successful path below does.  */
> +			if (!next) {
> +				rcu_read_unlock();
> +				return NULL;
> +			}
> +
> +			/* Help moving tail further.  */
> +			uatomic_cmpxchg(&q->tail, tail, next);
> +			continue;
> +		}
> +
> +		/* Read the data before publishing the new head: after the
> +		   cmpxchg, NEXT is the dummy node that a later dequeue
> +		   will hand to free_node.  */
> +		data = next->data;
> +		if (uatomic_cmpxchg(&q->head, head, next) == head) {
> +			/* Next remains as a dummy node, head is freed.  */
> +			rcu_read_unlock();
> +			*not_empty = true;
> +			free_node (head);
> +			return data;
> +		}
> +	}
> +}
> +
> +
> +static struct queue q;
> +
> +/* Enqueuer thread body.  Waits for test_go, then enqueues NULL into the
> +   global queue until test_stop is set, sleeping WDELAY loops after each
> +   enqueue when WDELAY is nonzero.  The number of enqueues performed is
> +   stored through _COUNT, which must point to an unsigned long long.
> +   Always returns (void *) 1.  */
> +void *thr_enqueuer(void *_count)
> +{
> +	unsigned long long *count = _count;
> +
> +	printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
> +			"enqueuer", pthread_self(), (unsigned long)gettid());
> +
> +	set_affinity();
> +
> +	rcu_register_thread();
> +
> +	/* Spin until main releases all the threads at once.  */
> +	while (!test_go)
> +	{
> +	}
> +	smp_mb();
> +
> +	for (;;) {
> +		enqueue (&q, NULL);
> +
> +		if (unlikely(wdelay))
> +			loop_sleep(wdelay);
> +		nr_enqueues++;
> +		if (unlikely(!test_duration_enqueue()))
> +			break;
> +	}
> +
> +	rcu_unregister_thread();
> +
> +	*count = nr_enqueues;
> +	/* nr_enqueues is an unsigned long long, hence %llu.  */
> +	printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %llu\n",
> +		       "enqueuer", pthread_self(), (unsigned long)gettid(), nr_enqueues);
> +	return ((void*)1);
> +
> +}
> +
> +/* Dequeuer thread body.  Waits for test_go, then dequeues from the
> +   global queue until test_stop is set, sleeping RDURATION loops after
> +   each attempt when RDURATION is nonzero.  Every attempt is counted in
> +   the per-thread nr_dequeues; attempts that removed an element are also
> +   counted in the shared nr_successful_dequeues.  The number of attempts
> +   is stored through _COUNT, which must point to an unsigned long long.
> +   Always returns (void *) 2.  */
> +void *thr_dequeuer(void *_count)
> +{
> +	unsigned long long *count = _count;
> +
> +	printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
> +			"dequeuer", pthread_self(), (unsigned long)gettid());
> +
> +	set_affinity();
> +
> +	rcu_register_thread();
> +
> +	/* Spin until main releases all the threads at once.  */
> +	while (!test_go)
> +	{
> +	}
> +	smp_mb();
> +
> +	for (;;) {
> +		bool not_empty;
> +		dequeue (&q, &not_empty);
> +		if (not_empty)
> +			uatomic_inc (&nr_successful_dequeues);
> +
> +		nr_dequeues++;
> +		if (unlikely(!test_duration_dequeue()))
> +			break;
> +		if (unlikely(rduration))
> +			loop_sleep(rduration);
> +	}
> +
> +	rcu_unregister_thread();
> +
> +	/* nr_dequeues is an unsigned long long, hence %llu.  */
> +	printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %llu\n",
> +			"dequeuer", pthread_self(), (unsigned long)gettid(), nr_dequeues);
> +	*count = nr_dequeues;
> +	return ((void*)2);
> +}
> +
> +/* Drain Q, then release the pages still referenced by the allocator
> +   (Q->FREE and Q->CURRENT).  main calls this after joining all the
> +   worker threads.  */
> +void test_end(struct queue *q)
> +{
> +	bool not_empty;
> +
> +	/* Keep dequeuing for as long as an element was actually removed;
> +	   the loop ends on the first dequeue that finds the queue empty.  */
> +	do
> +		dequeue (q, &not_empty);
> +	while (not_empty);
> +	if (q->current != q->free)
> +		free_node_page(q->free);
> +	free_node_page(q->current);
> +}
> +
> +/* Print the command-line synopsis on standard output.  ARGC is unused;
> +   ARGV[0] is printed as the program name.  The -d delay is consumed by
> +   loop_sleep, so it is documented in loops rather than microseconds.  */
> +void show_usage(int argc, char **argv)
> +{
> +	printf("Usage : %s nr_dequeuers nr_enqueuers duration (s)", argv[0]);
> +	printf(" [-d delay] (enqueuer period (in loops))");
> +	printf(" [-c duration] (dequeuer C.S. duration (in loops))");
> +	printf(" [-v] (verbose output)");
> +	printf(" [-a cpu#] [-a cpu#]... (affinity)");
> +	printf("\n");
> +}
> +
> +/* Parse the command line, run NR_ENQUEUERS enqueuer threads and
> +   NR_DEQUEUERS dequeuer threads against the global queue for DURATION
> +   seconds, then print a one-line summary.  Returns 0 on success and -1
> +   on a usage error; exits with status 1 if memory cannot be allocated
> +   or a thread cannot be created or joined.  */
> +int main(int argc, char **argv)
> +{
> +	int err;
> +	pthread_t *tid_enqueuer, *tid_dequeuer;
> +	void *tret;
> +	unsigned long long *count_enqueuer, *count_dequeuer;
> +	unsigned long long tot_enqueues = 0, tot_dequeues = 0;
> +	int i, a;
> +
> +	if (argc < 4) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	err = sscanf(argv[1], "%u", &nr_dequeuers);
> +	if (err != 1) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	err = sscanf(argv[2], "%u", &nr_enqueuers);
> +	if (err != 1) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +	
> +	err = sscanf(argv[3], "%lu", &duration);
> +	if (err != 1) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	for (i = 4; i < argc; i++) {
> +		if (argv[i][0] != '-')
> +			continue;
> +		switch (argv[i][1]) {
> +		case 'a':
> +			if (argc < i + 2) {
> +				show_usage(argc, argv);
> +				return -1;
> +			}
> +			a = atoi(argv[++i]);
> +			/* Do not write past the end of cpu_affinities.  */
> +			if (next_aff >= NR_CPUS) {
> +				fprintf(stderr, "Too many -a options (max %d)\n",
> +					NR_CPUS);
> +				return -1;
> +			}
> +			cpu_affinities[next_aff++] = a;
> +			use_affinity = 1;
> +			printf_verbose("Adding CPU %d affinity\n", a);
> +			break;
> +		case 'c':
> +			if (argc < i + 2) {
> +				show_usage(argc, argv);
> +				return -1;
> +			}
> +			rduration = atol(argv[++i]);
> +			break;
> +		case 'd':
> +			if (argc < i + 2) {
> +				show_usage(argc, argv);
> +				return -1;
> +			}
> +			wdelay = atol(argv[++i]);
> +			break;
> +		case 'v':
> +			verbose_mode = 1;
> +			break;
> +		}
> +	}
> +
> +	printf_verbose("running test for %lu seconds, %u enqueuers, %u dequeuers.\n",
> +		duration, nr_enqueuers, nr_dequeuers);
> +	/* wdelay is the enqueuer (writer) delay set by -d, rduration the
> +	   dequeuer (reader) duration set by -c.  */
> +	printf_verbose("Writer delay : %lu loops.\n", wdelay);
> +	printf_verbose("Reader duration : %lu loops.\n", rduration);
> +	printf_verbose("thread %-6s, thread id : %lx, tid %lu\n",
> +			"main", pthread_self(), (unsigned long)gettid());
> +
> +	tid_enqueuer = malloc(sizeof(*tid_enqueuer) * nr_enqueuers);
> +	tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
> +	count_enqueuer = malloc(sizeof(*count_enqueuer) * nr_enqueuers);
> +	count_dequeuer = malloc(sizeof(*count_dequeuer) * nr_dequeuers);
> +	/* A zero-sized request may legitimately return NULL, so only
> +	   treat NULL as a failure when elements were requested.  */
> +	if ((nr_enqueuers && (!tid_enqueuer || !count_enqueuer)) ||
> +	    (nr_dequeuers && (!tid_dequeuer || !count_dequeuer))) {
> +		perror("Error in malloc");
> +		exit(1);
> +	}
> +	init_queue (&q);
> +
> +	/* The worker threads consume cpu_affinities from the start.  */
> +	next_aff = 0;
> +
> +	for (i = 0; i < nr_enqueuers; i++) {
> +		err = pthread_create(&tid_enqueuer[i], NULL, thr_enqueuer,
> +				     &count_enqueuer[i]);
> +		if (err != 0)
> +			exit(1);
> +	}
> +	for (i = 0; i < nr_dequeuers; i++) {
> +		err = pthread_create(&tid_dequeuer[i], NULL, thr_dequeuer,
> +				     &count_dequeuer[i]);
> +		if (err != 0)
> +			exit(1);
> +	}
> +
> +	smp_mb();
> +
> +	test_go = 1;
> +
> +	for (i = 0; i < duration; i++) {
> +		sleep(1);
> +		if (verbose_mode)
> +			write (1, ".", 1);
> +	}
> +
> +	test_stop = 1;
> +
> +	for (i = 0; i < nr_enqueuers; i++) {
> +		err = pthread_join(tid_enqueuer[i], &tret);
> +		if (err != 0)
> +			exit(1);
> +		tot_enqueues += count_enqueuer[i];
> +	}
> +	for (i = 0; i < nr_dequeuers; i++) {
> +		err = pthread_join(tid_dequeuer[i], &tret);
> +		if (err != 0)
> +			exit(1);
> +		tot_dequeues += count_dequeuer[i];
> +	}
> +	
> +	printf_verbose("total number of enqueues : %llu, dequeues %llu\n", tot_enqueues,
> +	       tot_dequeues);
> +	printf("SUMMARY %-25s testdur %4lu nr_enqueuers %3u wdelay %6lu "
> +		"nr_dequeuers %3u "
> +		"rdur %6lu nr_enqueues %12llu nr_dequeues %12llu successful %12lu nr_ops %12llu\n",
> +		argv[0], duration, nr_enqueuers, wdelay,
> +		nr_dequeuers, rduration, tot_enqueues, tot_dequeues,
> +		nr_successful_dequeues, tot_enqueues + tot_dequeues);
> +
> +	test_end(&q);
> +	free(tid_enqueuer);
> +	free(tid_dequeuer);
> +	free(count_enqueuer);
> +	free(count_dequeuer);
> +	return 0;
> +}
> diff --git a/tests/test_urcu_lfq.c b/tests/test_urcu_lfq.c
> new file mode 100644
> index 0000000..90587b7
> --- /dev/null
> +++ b/tests/test_urcu_lfq.c
> @@ -0,0 +1,585 @@
> +/*
> + * test_urcu_lfq.c
> + *
> + * Userspace RCU library - example RCU-based lock-free queue
> + *
> + * Copyright February 2010 - Paolo Bonzini <pbonzini at redhat.com>
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License along
> + * with this program; if not, write to the Free Software Foundation, Inc.,
> + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> + */
> +
> +#define _GNU_SOURCE
> +#include "../config.h"
> +#include <stdio.h>
> +#include <pthread.h>
> +#include <stdlib.h>
> +#include <stdint.h>
> +#include <stdbool.h>
> +#include <string.h>
> +#include <sys/types.h>
> +#include <sys/wait.h>
> +#include <unistd.h>
> +#include <stdio.h>
> +#include <assert.h>
> +#include <sys/syscall.h>
> +#include <sched.h>
> +#include <errno.h>
> +
> +#include <urcu/arch.h>
> +
> +/* hardcoded number of CPUs */
> +#define NR_CPUS 16384
> +
> +#if defined(_syscall0)
> +_syscall0(pid_t, gettid)
> +#elif defined(__NR_gettid)
> +static inline pid_t gettid(void)
> +{
> +	return syscall(__NR_gettid);
> +}
> +#else
> +#warning "use pid as tid"
> +static inline pid_t gettid(void)
> +{
> +	return getpid();
> +}
> +#endif
> +
> +#ifndef DYNAMIC_LINK_TEST
> +#define _LGPL_SOURCE
> +#endif
> +#include <urcu.h>
> +
> +
> +static volatile int test_go, test_stop;	/* polled by the worker threads to start / stop */
> +
> +static unsigned long rduration;	/* delay after each dequeue, in loops */
> +
> +static unsigned long duration;	/* test duration, in seconds */
> +
> +/* delay after each enqueue, in loops */
> +static unsigned long wdelay;
> +
> +static inline void loop_sleep(unsigned long l)
> +{
> +	while(l-- != 0)
> +		cpu_relax();
> +}
> +
> +static int verbose_mode;
> +
> +#define printf_verbose(fmt, args...)		\
> +	do {					\
> +		if (verbose_mode)		\
> +			printf(fmt, args);	\
> +	} while (0)
> +
> +static unsigned int cpu_affinities[NR_CPUS];
> +static unsigned int next_aff = 0;
> +static int use_affinity = 0;
> +
> +pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER;
> +
> +#ifndef HAVE_CPU_SET_T
> +typedef unsigned long cpu_set_t;
> +# define CPU_ZERO(cpuset) do { *(cpuset) = 0; } while(0)
> +# define CPU_SET(cpu, cpuset) do { *(cpuset) |= (1UL << (cpu)); } while(0)
> +#endif
> +
> +static void set_affinity(void)
> +{
> +	cpu_set_t mask;
> +	int cpu;
> +	int ret;
> +
> +	if (!use_affinity)
> +		return;
> +
> +#if HAVE_SCHED_SETAFFINITY
> +	ret = pthread_mutex_lock(&affinity_mutex);
> +	if (ret) {
> +		perror("Error in pthread mutex lock");
> +		exit(-1);
> +	}
> +	cpu = cpu_affinities[next_aff++];
> +	ret = pthread_mutex_unlock(&affinity_mutex);
> +	if (ret) {
> +		perror("Error in pthread mutex unlock");
> +		exit(-1);
> +	}
> +
> +	CPU_ZERO(&mask);
> +	CPU_SET(cpu, &mask);
> +#if SCHED_SETAFFINITY_ARGS == 2
> +	sched_setaffinity(0, &mask);
> +#else
> +	sched_setaffinity(0, sizeof(mask), &mask);
> +#endif
> +#endif /* HAVE_SCHED_SETAFFINITY */
> +}
> +
> +/*
> + * returns 0 if test should end.
> + */
> +static int test_duration_dequeue(void)
> +{
> +	return !test_stop;
> +}
> +
> +static int test_duration_enqueue(void)
> +{
> +	return !test_stop;
> +}
> +
> +static unsigned long long __thread nr_dequeues;
> +static unsigned long long __thread nr_enqueues;
> +
> +static unsigned long nr_successful_dequeues;
> +static unsigned int nr_enqueuers;
> +static unsigned int nr_dequeuers;
> +
> +
> +/* Value stored in node_page.in right before a page is freed, so that
> +   use of a dangling page pointer is easier to notice.  */
> +#define ARRAY_POISON	0xDEADBEEF
> +/* Size of a node page.  free_node() recovers the page from a node
> +   address by masking with PAGE_MASK, so pages must be PAGE_SIZE-aligned.  */
> +#define PAGE_SIZE	4096
> +#define PAGE_MASK	(PAGE_SIZE - 1)
> +/* Number of nodes that fit in a page after its 16-byte header.  The
> +   expansion is fully parenthesized so that it stays a single operand
> +   wherever the macro is used.  */
> +#define NODES_PER_PAGE	((PAGE_SIZE - 16) / sizeof (struct node))
> +
> +/* Lock-free queue, using the RCU to avoid the ABA problem and (more
> +   interestingly) to efficiently handle freeing memory.
> +
> +   We have to protect both the enqueuer and dequeuer's compare-and-
> +   exchange operation from running across a free and a subsequent
> +   reallocation of the same memory.  So, we protect the free with
> +   synchronize_rcu; this is enough because all the allocations take
> +   place before the compare-and-exchange ops.
> +
> +   Besides adding rcu_read_{,un}lock, the enqueue/dequeue are a standard
> +   implementation of a lock-free-queue.  The first node in the queue is
> +   always dummy: dequeuing returns the data from HEAD->NEXT, advances
> +   HEAD to HEAD->NEXT (which will now serve as dummy node), and frees the
> +   old HEAD.  Since RCU avoids the ABA problem, it doesn't use double-word
> +   compare-and-exchange operations.  Node allocation and deallocation are
> +   a "black box" and synchronize_rcu is hidden within node deallocation.
> +
> +   So, the tricky part is finding a good allocation strategy for nodes.
> +   The allocator for nodes is shared by multiple threads, and since
> +   malloc/free are not lock-free a layer above them is obviously
> +   necessary: otherwise the whole exercise is useless.  In addition,
> +   to avoid penalizing dequeues, the allocator should avoid frequent
> +   synchronization (because synchronize_rcu is expensive).
> +
> +   The scheme that is used here uses a page as the allocation
> +   unit for nodes.  A page is freed when no more nodes are in use.
> +   Nodes from a page are never reused.
> +
> +   The nodes are allocated from Q->CURRENT.  Since whoever finds a full
> +   page has to busy wait, we use a trick to limit the duration of busy
> +   waiting.  A free page Q->FREE is always kept ready, so that any thread
> +   that allocates the last node in a page, or finds a full page can try
> +   to update Q->CURRENT.  Whoever loses the race has to busy wait, OTOH
> +   whoever wins the race has to allocate the new Q->FREE.  In other words,  
> +   if the following sequence happens
> +
> +     Thread 1                    Thread 2                 other threads
> +     -----------------------------------------------------------------------
> +     Get last node from page
> +	                         q->current = q->free;
> +				                          fill up q->current
> +     q->current = q->free fails
> +
> +   then thread 1 no longer has the duty of allocating the new q->free;
> +   thread 2 will do that.  If a thread finds a full current page and
> +   Q->CURRENT == Q->FREE, this means that another thread is going to
> +   allocate Q->FREE soon, and it busy waits.  After the allocation
> +   finishes, everything proceeds normally: some thread will take care
> +   of setting Q->CURRENT and allocating a new Q->FREE.
> + 
> +   One common scheme for allocation is to use a free list (implemented
> +   as a lock-free stack), but this free list is potentially unbounded.
> +   Instead, with the above scheme the number of live pages at any time
> +   is equal to the number of enqueuing threads.  */
> +
> +struct node {
> +	void *data;
> +	void *next;
> +};
> +
> +struct node_page {
> +	int in;
> +	int out;
> +	char padding[16 - sizeof(int) * 2];
> +	struct node nodes[NODES_PER_PAGE];
> +};
> +
> +
> +struct queue {
> +	struct node_page *current, *free;
> +	struct node *head, *tail;
> +};
> +
> +/* Allocate one page-aligned node page and reset its IN and OUT counters.
> +   The callers never check for NULL, so an allocation failure is
> +   reported and terminates the process instead of being dereferenced.  */
> +static struct node_page *new_node_page(void)
> +{
> +	struct node_page *p = valloc (PAGE_SIZE);
> +	if (!p) {
> +		perror("Error in valloc");
> +		exit(-1);
> +	}
> +	p->in = p->out = 0;
> +	return p;
> +}
> +
> +static void free_node_page(struct node_page *p)
> +{
> +	/* Help making sure that accessing a dangling pointer is
> +	   adequately punished.  */
> +	p->in = ARRAY_POISON;
> +	free (p);
> +}
> +
> +static struct node *new_node(struct queue *q)
> +{
> +	struct node *n;
> +	struct node_page *p;
> +	int i;
> +
> +	do {
> +		p = q->current;
> +		i = p->in;
> +		if (i >= NODES_PER_PAGE - 1 &&
> +		    q->free != p &&
> +		    uatomic_cmpxchg(&q->current, p, q->free) == p)
> +			q->free = new_node_page();
> +
> +	} while (i == NODES_PER_PAGE || uatomic_cmpxchg(&p->in, i, i+1) != i);
> +
> +	assert (i >= 0 && i < NODES_PER_PAGE);
> +	n = &p->nodes[i];
> +	n->next = NULL;
> +	return n;
> +}
> +
> +void free_node(struct node *n)
> +{
> +	struct node_page *p = (struct node_page *) ((intptr_t) n & ~PAGE_MASK);
> +
> +	if (uatomic_add_return(&p->out, 1) == NODES_PER_PAGE) {
> +		synchronize_rcu();
> +		free_node_page(p);
> +	}
> +}
> +
> +void init_queue(struct queue *q)
> +{
> +	q->current = new_node_page();
> +	q->free = new_node_page();
> +	q->head = q->tail = new_node(q);
> +}
> +
> +void enqueue(struct queue *q, void *value)
> +{
> +	struct node *n = new_node(q);
> +	n->data = value;
> +	rcu_read_lock();
> +	for (;;) {
> +		struct node *tail = rcu_dereference(q->tail);
> +		struct node *next = rcu_dereference(tail->next);
> +		if (tail != q->tail) {
> +			/* A change occurred while reading the values.  */
> +			continue;
> +		}
> +
> +		if (next) {
> +			/* Help moving tail further.  */
> +			uatomic_cmpxchg(&q->tail, tail, next);
> +			continue;
> +		}
> +
> +		if (uatomic_cmpxchg(&tail->next, NULL, n) == NULL) {
> +			/* Move tail (another operation might beat us to it,
> +			   that's fine).  */
> +			uatomic_cmpxchg(&q->tail, tail, n);
> +			rcu_read_unlock();
> +			return;
> +		}
> +	}
> +}
> +
> +/* Remove the oldest element of Q and return its data pointer, or NULL
> +   if the queue is empty.  If NOT_EMPTY is non-NULL, *NOT_EMPTY is set
> +   to true when an element was removed and to false otherwise, so that
> +   callers can tell an empty queue from a stored NULL pointer.  The
> +   read-side critical section is left on every return path.  */
> +void *dequeue(struct queue *q, bool *not_empty)
> +{
> +	bool dummy;
> +	if (!not_empty)
> +		not_empty = &dummy;
> +
> +	rcu_read_lock();
> +	*not_empty = false;
> +	for (;;) {
> +		void *data;
> +		struct node *head = rcu_dereference(q->head);
> +		struct node *tail = rcu_dereference(q->tail);
> +		struct node *next = rcu_dereference(head->next);
> +
> +		if (head != q->head) {
> +			/* A change occurred while reading the values.  */
> +			continue;
> +		}
> +
> +		if (head == tail) {
> +			/* If all three are consistent, the queue is empty.
> +			   Unlock before returning; otherwise this thread
> +			   would stay inside a read-side critical section
> +			   after every dequeue that finds the queue empty.  */
> +			if (!next) {
> +				rcu_read_unlock();
> +				return NULL;
> +			}
> +
> +			/* Help moving tail further.  */
> +			uatomic_cmpxchg(&q->tail, tail, next);
> +			continue;
> +		}
> +
> +		/* Read the data before publishing the new head: after the
> +		   cmpxchg, NEXT is the dummy node that a later dequeue
> +		   will hand to free_node.  */
> +		data = next->data;
> +		if (uatomic_cmpxchg(&q->head, head, next) == head) {
> +			/* Next remains as a dummy node, head is freed.  */
> +			rcu_read_unlock();
> +			*not_empty = true;
> +			free_node (head);
> +			return data;
> +		}
> +	}
> +}
> +
> +
> +static struct queue q;
> +
> +/* Enqueuer thread body.  Waits for test_go, then enqueues NULL into the
> +   global queue until test_stop is set, sleeping WDELAY loops after each
> +   enqueue when WDELAY is nonzero.  The number of enqueues performed is
> +   stored through _COUNT, which must point to an unsigned long long.
> +   Always returns (void *) 1.  */
> +void *thr_enqueuer(void *_count)
> +{
> +	unsigned long long *count = _count;
> +
> +	printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
> +			"enqueuer", pthread_self(), (unsigned long)gettid());
> +
> +	set_affinity();
> +
> +	rcu_register_thread();
> +
> +	/* Spin until the start flag is raised.  */
> +	while (!test_go)
> +	{
> +	}
> +	smp_mb();
> +
> +	for (;;) {
> +		enqueue (&q, NULL);
> +
> +		if (unlikely(wdelay))
> +			loop_sleep(wdelay);
> +		nr_enqueues++;
> +		if (unlikely(!test_duration_enqueue()))
> +			break;
> +	}
> +
> +	rcu_unregister_thread();
> +
> +	*count = nr_enqueues;
> +	/* nr_enqueues is an unsigned long long, hence %llu.  */
> +	printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %llu\n",
> +		       "enqueuer", pthread_self(), (unsigned long)gettid(), nr_enqueues);
> +	return ((void*)1);
> +
> +}
> +
> +/* Dequeuer thread body.  Waits for test_go, then dequeues from the
> +   global queue until test_stop is set, sleeping RDURATION loops after
> +   each attempt when RDURATION is nonzero.  Every attempt is counted in
> +   the per-thread nr_dequeues; attempts that removed an element are also
> +   counted in the shared nr_successful_dequeues.  The number of attempts
> +   is stored through _COUNT, which must point to an unsigned long long.
> +   Always returns (void *) 2.  */
> +void *thr_dequeuer(void *_count)
> +{
> +	unsigned long long *count = _count;
> +
> +	printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
> +			"dequeuer", pthread_self(), (unsigned long)gettid());
> +
> +	set_affinity();
> +
> +	rcu_register_thread();
> +
> +	/* Spin until the start flag is raised.  */
> +	while (!test_go)
> +	{
> +	}
> +	smp_mb();
> +
> +	for (;;) {
> +		bool not_empty;
> +		dequeue (&q, &not_empty);
> +		if (not_empty)
> +			uatomic_inc (&nr_successful_dequeues);
> +
> +		nr_dequeues++;
> +		if (unlikely(!test_duration_dequeue()))
> +			break;
> +		if (unlikely(rduration))
> +			loop_sleep(rduration);
> +	}
> +
> +	rcu_unregister_thread();
> +
> +	/* nr_dequeues is an unsigned long long, hence %llu.  */
> +	printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %llu\n",
> +			"dequeuer", pthread_self(), (unsigned long)gettid(), nr_dequeues);
> +	*count = nr_dequeues;
> +	return ((void*)2);
> +}
> +
> +/*
> + * Tear down the queue at the end of the test: drain every element that is
> + * still queued, then release the node pages referenced by q->free and
> + * q->current (only once if both point to the same page).
> + */
> +void test_end(struct queue *q)
> +{
> +	bool not_empty;
> +
> +	/*
> +	 * Keep dequeueing for as long as the previous call removed an
> +	 * element; dequeue() reports an empty queue by clearing not_empty.
> +	 */
> +	do
> +		dequeue (q, &not_empty);
> +	while (not_empty);
> +	if (q->current != q->free)
> +		free_node_page(q->free);
> +	free_node_page(q->current);
> +}
> +
> +/*
> + * Print the one-line command-line synopsis on standard output.
> + * argv[0] is used as the program name; argc is not used.
> + */
> +void show_usage(int argc, char **argv)
> +{
> +	printf("Usage : %s nr_dequeuers nr_enqueuers duration (s)"
> +	       " [-d delay] (enqueuer period (in loops))"
> +	       " [-c duration] (dequeuer period (in loops))"
> +	       " [-v] (verbose output)"
> +	       " [-a cpu#] [-a cpu#]... (affinity)"
> +	       "\n", argv[0]);
> +}
> +
> +/*
> + * Test driver.
> + *
> + * Usage: prog nr_dequeuers nr_enqueuers duration [-d delay] [-c duration]
> + *             [-v] [-a cpu#]...
> + *
> + * Parses the command line, starts the requested number of enqueuer and
> + * dequeuer threads on the shared queue, lets them run for `duration`
> + * seconds, joins them, prints a SUMMARY line with the per-test totals and
> + * finally tears the queue down.  Returns 0 on success and -1 on a usage
> + * error; exits with status 1 if an allocation, thread creation or thread
> + * join fails.
> + */
> +int main(int argc, char **argv)
> +{
> +	int err;
> +	pthread_t *tid_enqueuer, *tid_dequeuer;
> +	void *tret;
> +	unsigned long long *count_enqueuer, *count_dequeuer;
> +	unsigned long long tot_enqueues = 0, tot_dequeues = 0;
> +	int i, a;
> +
> +	if (argc < 4) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	err = sscanf(argv[1], "%u", &nr_dequeuers);
> +	if (err != 1) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	err = sscanf(argv[2], "%u", &nr_enqueuers);
> +	if (err != 1) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	err = sscanf(argv[3], "%lu", &duration);
> +	if (err != 1) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	/* Optional switches; arguments not starting with '-' are ignored.  */
> +	for (i = 4; i < argc; i++) {
> +		if (argv[i][0] != '-')
> +			continue;
> +		switch (argv[i][1]) {
> +		case 'a':
> +			if (argc < i + 2) {
> +				show_usage(argc, argv);
> +				return -1;
> +			}
> +			a = atoi(argv[++i]);
> +			cpu_affinities[next_aff++] = a;
> +			use_affinity = 1;
> +			printf_verbose("Adding CPU %d affinity\n", a);
> +			break;
> +		case 'c':
> +			if (argc < i + 2) {
> +				show_usage(argc, argv);
> +				return -1;
> +			}
> +			rduration = atol(argv[++i]);
> +			break;
> +		case 'd':
> +			if (argc < i + 2) {
> +				show_usage(argc, argv);
> +				return -1;
> +			}
> +			wdelay = atol(argv[++i]);
> +			break;
> +		case 'v':
> +			verbose_mode = 1;
> +			break;
> +		}
> +	}
> +
> +	printf_verbose("running test for %lu seconds, %u enqueuers, %u dequeuers.\n",
> +		duration, nr_enqueuers, nr_dequeuers);
> +	/* -d sets wdelay (enqueuer side), -c sets rduration (dequeuer side).  */
> +	printf_verbose("Writer delay : %lu loops.\n", wdelay);
> +	printf_verbose("Reader duration : %lu loops.\n", rduration);
> +	printf_verbose("thread %-6s, thread id : %lx, tid %lu\n",
> +			"main", pthread_self(), (unsigned long)gettid());
> +
> +	tid_enqueuer = malloc(sizeof(*tid_enqueuer) * nr_enqueuers);
> +	tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
> +	count_enqueuer = malloc(sizeof(*count_enqueuer) * nr_enqueuers);
> +	count_dequeuer = malloc(sizeof(*count_dequeuer) * nr_dequeuers);
> +	/*
> +	 * A zero-sized request may legitimately yield NULL, so only treat
> +	 * NULL as a failure for a thread class that is actually used.
> +	 */
> +	if ((nr_enqueuers && (!tid_enqueuer || !count_enqueuer)) ||
> +	    (nr_dequeuers && (!tid_dequeuer || !count_dequeuer))) {
> +		perror("malloc");
> +		exit(1);
> +	}
> +	init_queue (&q);
> +
> +	next_aff = 0;
> +
> +	for (i = 0; i < nr_enqueuers; i++) {
> +		err = pthread_create(&tid_enqueuer[i], NULL, thr_enqueuer,
> +				     &count_enqueuer[i]);
> +		if (err != 0)
> +			exit(1);
> +	}
> +	for (i = 0; i < nr_dequeuers; i++) {
> +		err = pthread_create(&tid_dequeuer[i], NULL, thr_dequeuer,
> +				     &count_dequeuer[i]);
> +		if (err != 0)
> +			exit(1);
> +	}
> +
> +	smp_mb();
> +
> +	/* Release the worker threads, which spin on test_go.  */
> +	test_go = 1;
> +
> +	for (i = 0; i < duration; i++) {
> +		sleep(1);
> +		if (verbose_mode)
> +			write (1, ".", 1);
> +	}
> +
> +	test_stop = 1;
> +
> +	for (i = 0; i < nr_enqueuers; i++) {
> +		err = pthread_join(tid_enqueuer[i], &tret);
> +		if (err != 0)
> +			exit(1);
> +		tot_enqueues += count_enqueuer[i];
> +	}
> +	for (i = 0; i < nr_dequeuers; i++) {
> +		err = pthread_join(tid_dequeuer[i], &tret);
> +		if (err != 0)
> +			exit(1);
> +		tot_dequeues += count_dequeuer[i];
> +	}
> +
> +	printf_verbose("total number of enqueues : %llu, dequeues %llu\n", tot_enqueues,
> +	       tot_dequeues);
> +	printf("SUMMARY %-25s testdur %4lu nr_enqueuers %3u wdelay %6lu "
> +		"nr_dequeuers %3u "
> +		"rdur %6lu nr_enqueues %12llu nr_dequeues %12llu successful %12lu nr_ops %12llu\n",
> +		argv[0], duration, nr_enqueuers, wdelay,
> +		nr_dequeuers, rduration, tot_enqueues, tot_dequeues,
> +		nr_successful_dequeues, tot_enqueues + tot_dequeues);
> +
> +	test_end(&q);
> +	free(tid_enqueuer);
> +	free(tid_dequeuer);
> +	free(count_enqueuer);
> +	free(count_dequeuer);
> +	return 0;
> +}
> -- 
> 1.6.6
> 
> +{
> +	q->current = new_node_page();
> +	q->free = new_node_page();
> +	q->head = q->tail = new_node(q);
> +}
> +
> +/*
> + * Append a new node carrying `value` at the tail of queue `q`.
> + *
> + * The node is obtained from new_node(q) and linked in with compare-and-swap
> + * operations inside an RCU read-side critical section; the loop retries
> + * until the link succeeds.  Nothing is returned.
> + */
> +void enqueue(struct queue *q, void *value)
> +{
> +	/* NOTE(review): assumes new_node() returns a node whose ->next is
> +	   NULL -- confirm against its definition.  */
> +	struct node *n = new_node(q);
> +	n->data = value;
> +	rcu_read_lock();
> +	for (;;) {
> +		struct node *tail = rcu_dereference(q->tail);
> +		struct node *next = rcu_dereference(tail->next);
> +		if (tail != q->tail) {
> +			/* A change occurred while reading the values.  */
> +			continue;
> +		}
> +
> +		if (next) {
> +			/* Help moving tail further.  */
> +			uatomic_cmpxchg(&q->tail, tail, next);
> +			continue;
> +		}
> +
> +		/* tail->next was NULL when read: try to link n after tail.
> +		   If the cmpxchg loses the race, loop and retry.  */
> +		if (uatomic_cmpxchg(&tail->next, NULL, n) == NULL) {
> +			/* Move tail (another operation might beat us to it,
> +			   that's fine).  */
> +			uatomic_cmpxchg(&q->tail, tail, n);
> +			rcu_read_unlock();
> +			return;
> +		}
> +	}
> +}
> +
> +/*
> + * Remove the oldest element of queue `q` and return its data pointer.
> + *
> + * not_empty may be NULL.  When it is not, *not_empty is set to true if an
> + * element was removed and to false if the queue was found empty, which
> + * lets callers tell an empty queue apart from a stored NULL payload.
> + * Returns NULL when the queue is empty.
> + *
> + * The whole operation runs inside an RCU read-side critical section,
> + * which is left on every return path.
> + */
> +void *dequeue(struct queue *q, bool *not_empty)
> +{
> +	bool dummy;
> +	if (!not_empty)
> +		not_empty = &dummy;
> +
> +	rcu_read_lock();
> +	*not_empty = false;
> +	for (;;) {
> +		void *data;
> +		struct node *head = rcu_dereference(q->head);
> +		struct node *tail = rcu_dereference(q->tail);
> +		struct node *next = rcu_dereference(head->next);
> +
> +		if (head != q->head) {
> +			/* A change occurred while reading the values.  */
> +			continue;
> +		}
> +
> +		if (head == tail) {
> +			/* If all three are consistent, the queue is empty.  */
> +			if (!next) {
> +				/* Balance the rcu_read_lock() above.  */
> +				rcu_read_unlock();
> +				return NULL;
> +			}
> +
> +			/* Help moving tail further.  */
> +			uatomic_cmpxchg(&q->tail, tail, next);
> +			continue;
> +		}
> +
> +		/* Read the payload before trying to take ownership of head;
> +		   it is only used if the cmpxchg below succeeds.  */
> +		data = next->data;
> +		if (uatomic_cmpxchg(&q->head, head, next) == head) {
> +			/* Next remains as a dummy node, head is freed.  */
> +			rcu_read_unlock();
> +			*not_empty = true;
> +			free_node (head);
> +			return data;
> +		}
> +	}
> +}
> +
> +
> +/* Queue instance shared by every enqueuer and dequeuer thread of the test.  */
> +static struct queue q;
> +
> +/*
> + * Enqueuer thread body.  Once main() sets test_go, keeps enqueueing NULL
> + * payloads onto the shared queue until test_duration_enqueue() returns
> + * zero, then stores the number of enqueues performed through _count.
> + */
> +void *thr_enqueuer(void *_count)
> +{
> +	unsigned long long *result = _count;
> +
> +	printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
> +			"enqueuer", pthread_self(), (unsigned long)gettid());
> +
> +	set_affinity();
> +	rcu_register_thread();
> +
> +	/* Spin until the main thread gives the start signal.  */
> +	while (!test_go)
> +		;
> +	smp_mb();
> +
> +	do {
> +		enqueue (&q, NULL);
> +		if (unlikely(wdelay))
> +			loop_sleep(wdelay);
> +		nr_enqueues++;
> +	} while (test_duration_enqueue());
> +
> +	rcu_unregister_thread();
> +
> +	*result = nr_enqueues;
> +	printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %d\n",
> +		       "enqueuer", pthread_self(), (unsigned long)gettid(), nr_enqueues);
> +	return ((void*)1);
> +}
> +
> +/*
> + * Dequeuer thread body.  Once main() sets test_go, repeatedly tries to
> + * dequeue from the shared queue, counting every attempt in nr_dequeues and
> + * every attempt that actually removed an element in
> + * nr_successful_dequeues, until test_duration_dequeue() returns zero.
> + * The number of attempts is stored through _count.
> + */
> +void *thr_dequeuer(void *_count)
> +{
> +	unsigned long long *result = _count;
> +
> +	printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
> +			"dequeuer", pthread_self(), (unsigned long)gettid());
> +
> +	set_affinity();
> +	rcu_register_thread();
> +
> +	/* Spin until the main thread gives the start signal.  */
> +	while (!test_go)
> +		;
> +	smp_mb();
> +
> +	for (;;) {
> +		bool got_item;
> +
> +		dequeue (&q, &got_item);
> +		if (got_item)
> +			uatomic_inc (&nr_successful_dequeues);
> +		nr_dequeues++;
> +
> +		/* The optional pause only happens between two attempts.  */
> +		if (unlikely(!test_duration_dequeue()))
> +			break;
> +		if (unlikely(rduration))
> +			loop_sleep(rduration);
> +	}
> +
> +	rcu_unregister_thread();
> +
> +	printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %d\n",
> +			"dequeuer", pthread_self(), (unsigned long)gettid(), nr_dequeues);
> +	*result = nr_dequeues;
> +	return ((void*)2);
> +}
> +
> +/*
> + * Tear down the queue at the end of the test: drain every element that is
> + * still queued, then release the node pages referenced by q->free and
> + * q->current (only once if both point to the same page).
> + */
> +void test_end(struct queue *q)
> +{
> +	bool not_empty;
> +
> +	/*
> +	 * Keep dequeueing for as long as the previous call removed an
> +	 * element; dequeue() reports an empty queue by clearing not_empty.
> +	 */
> +	do
> +		dequeue (q, &not_empty);
> +	while (not_empty);
> +	if (q->current != q->free)
> +		free_node_page(q->free);
> +	free_node_page(q->current);
> +}
> +
> +/*
> + * Print the one-line command-line synopsis on standard output.
> + * argv[0] is used as the program name; argc is not used.
> + */
> +void show_usage(int argc, char **argv)
> +{
> +	printf("Usage : %s nr_dequeuers nr_enqueuers duration (s)"
> +	       " [-d delay] (enqueuer period (in loops))"
> +	       " [-c duration] (dequeuer period (in loops))"
> +	       " [-v] (verbose output)"
> +	       " [-a cpu#] [-a cpu#]... (affinity)"
> +	       "\n", argv[0]);
> +}
> +
> +/*
> + * Test driver.
> + *
> + * Usage: prog nr_dequeuers nr_enqueuers duration [-d delay] [-c duration]
> + *             [-v] [-a cpu#]...
> + *
> + * Parses the command line, starts the requested number of enqueuer and
> + * dequeuer threads on the shared queue, lets them run for `duration`
> + * seconds, joins them, prints a SUMMARY line with the per-test totals and
> + * finally tears the queue down.  Returns 0 on success and -1 on a usage
> + * error; exits with status 1 if an allocation, thread creation or thread
> + * join fails.
> + */
> +int main(int argc, char **argv)
> +{
> +	int err;
> +	pthread_t *tid_enqueuer, *tid_dequeuer;
> +	void *tret;
> +	unsigned long long *count_enqueuer, *count_dequeuer;
> +	unsigned long long tot_enqueues = 0, tot_dequeues = 0;
> +	int i, a;
> +
> +	if (argc < 4) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	err = sscanf(argv[1], "%u", &nr_dequeuers);
> +	if (err != 1) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	err = sscanf(argv[2], "%u", &nr_enqueuers);
> +	if (err != 1) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	err = sscanf(argv[3], "%lu", &duration);
> +	if (err != 1) {
> +		show_usage(argc, argv);
> +		return -1;
> +	}
> +
> +	/* Optional switches; arguments not starting with '-' are ignored.  */
> +	for (i = 4; i < argc; i++) {
> +		if (argv[i][0] != '-')
> +			continue;
> +		switch (argv[i][1]) {
> +		case 'a':
> +			if (argc < i + 2) {
> +				show_usage(argc, argv);
> +				return -1;
> +			}
> +			a = atoi(argv[++i]);
> +			cpu_affinities[next_aff++] = a;
> +			use_affinity = 1;
> +			printf_verbose("Adding CPU %d affinity\n", a);
> +			break;
> +		case 'c':
> +			if (argc < i + 2) {
> +				show_usage(argc, argv);
> +				return -1;
> +			}
> +			rduration = atol(argv[++i]);
> +			break;
> +		case 'd':
> +			if (argc < i + 2) {
> +				show_usage(argc, argv);
> +				return -1;
> +			}
> +			wdelay = atol(argv[++i]);
> +			break;
> +		case 'v':
> +			verbose_mode = 1;
> +			break;
> +		}
> +	}
> +
> +	printf_verbose("running test for %lu seconds, %u enqueuers, %u dequeuers.\n",
> +		duration, nr_enqueuers, nr_dequeuers);
> +	/* -d sets wdelay (enqueuer side), -c sets rduration (dequeuer side).  */
> +	printf_verbose("Writer delay : %lu loops.\n", wdelay);
> +	printf_verbose("Reader duration : %lu loops.\n", rduration);
> +	printf_verbose("thread %-6s, thread id : %lx, tid %lu\n",
> +			"main", pthread_self(), (unsigned long)gettid());
> +
> +	tid_enqueuer = malloc(sizeof(*tid_enqueuer) * nr_enqueuers);
> +	tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
> +	count_enqueuer = malloc(sizeof(*count_enqueuer) * nr_enqueuers);
> +	count_dequeuer = malloc(sizeof(*count_dequeuer) * nr_dequeuers);
> +	/*
> +	 * A zero-sized request may legitimately yield NULL, so only treat
> +	 * NULL as a failure for a thread class that is actually used.
> +	 */
> +	if ((nr_enqueuers && (!tid_enqueuer || !count_enqueuer)) ||
> +	    (nr_dequeuers && (!tid_dequeuer || !count_dequeuer))) {
> +		perror("malloc");
> +		exit(1);
> +	}
> +	init_queue (&q);
> +
> +	next_aff = 0;
> +
> +	for (i = 0; i < nr_enqueuers; i++) {
> +		err = pthread_create(&tid_enqueuer[i], NULL, thr_enqueuer,
> +				     &count_enqueuer[i]);
> +		if (err != 0)
> +			exit(1);
> +	}
> +	for (i = 0; i < nr_dequeuers; i++) {
> +		err = pthread_create(&tid_dequeuer[i], NULL, thr_dequeuer,
> +				     &count_dequeuer[i]);
> +		if (err != 0)
> +			exit(1);
> +	}
> +
> +	smp_mb();
> +
> +	/* Release the worker threads, which spin on test_go.  */
> +	test_go = 1;
> +
> +	for (i = 0; i < duration; i++) {
> +		sleep(1);
> +		if (verbose_mode)
> +			write (1, ".", 1);
> +	}
> +
> +	test_stop = 1;
> +
> +	for (i = 0; i < nr_enqueuers; i++) {
> +		err = pthread_join(tid_enqueuer[i], &tret);
> +		if (err != 0)
> +			exit(1);
> +		tot_enqueues += count_enqueuer[i];
> +	}
> +	for (i = 0; i < nr_dequeuers; i++) {
> +		err = pthread_join(tid_dequeuer[i], &tret);
> +		if (err != 0)
> +			exit(1);
> +		tot_dequeues += count_dequeuer[i];
> +	}
> +
> +	printf_verbose("total number of enqueues : %llu, dequeues %llu\n", tot_enqueues,
> +	       tot_dequeues);
> +	printf("SUMMARY %-25s testdur %4lu nr_enqueuers %3u wdelay %6lu "
> +		"nr_dequeuers %3u "
> +		"rdur %6lu nr_enqueues %12llu nr_dequeues %12llu successful %12lu nr_ops %12llu\n",
> +		argv[0], duration, nr_enqueuers, wdelay,
> +		nr_dequeuers, rduration, tot_enqueues, tot_dequeues,
> +		nr_successful_dequeues, tot_enqueues + tot_dequeues);
> +
> +	test_end(&q);
> +	free(tid_enqueuer);
> +	free(tid_dequeuer);
> +	free(count_enqueuer);
> +	free(count_dequeuer);
> +	return 0;
> +}
> -- 
> 1.6.6
> 
> 
> 
> _______________________________________________
> ltt-dev mailing list
> ltt-dev at lists.casi.polymtl.ca
> http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
> 

-- 
Mathieu Desnoyers
Operating System Efficiency Consultant
EfficiOS Inc.
http://www.efficios.com




More information about the lttng-dev mailing list