[ltt-dev] [rp] [PATCH RFC urcu 2/2] Map symbols to allow multiple RCU flavors to be used in one binary

Paul E. McKenney paulmck at linux.vnet.ibm.com
Tue Apr 5 00:15:49 EDT 2011


On Fri, Apr 01, 2011 at 04:59:09PM -0400, Mathieu Desnoyers wrote:
> * Paul E. McKenney (paulmck at linux.vnet.ibm.com) wrote:
> > Probably need similar mapping for rcu_defer().  Definitely need
> > backwards-compatibility mapping for programs compiled against
> > old versions of the library.
> > 
> > Signed-off-by: Paul E. McKenney <paulmck at linux.vnet.ibm.com>
> > ---
> >  Makefile.am          |    3 +-
> >  tests/Makefile.am    |   33 ++--
> >  tests/rcutorture.h   |    1 -
> >  tests/urcutorture.c  |   13 +-
> >  urcu-bp.c            |    4 +
> >  urcu-bp.h            |    8 +-
> >  urcu-call-rcu-impl.h |  618 +++++++++++++++++++++++++++++++++++++++++++++++++
> >  urcu-call-rcu.c      |  620 --------------------------------------------------
> >  urcu-qsbr.c          |   36 ++--
> >  urcu-qsbr.h          |   18 +-
> >  urcu.c               |    5 +
> >  urcu.h               |   20 ++-
> >  12 files changed, 704 insertions(+), 675 deletions(-)
> >  create mode 100644 urcu-call-rcu-impl.h
> >  delete mode 100644 urcu-call-rcu.c
> > 
> > diff --git a/Makefile.am b/Makefile.am
> > index 7956e7e..ef3bfef 100644
> > --- a/Makefile.am
> > +++ b/Makefile.am
> > @@ -30,7 +30,7 @@ COMPAT+=compat_futex.c
> >  endif
> >  
> >  lib_LTLIBRARIES = liburcu.la liburcu-qsbr.la liburcu-mb.la liburcu-signal.la \
> > -		  liburcu-bp.la liburcu-defer.la liburcu-call.la \
> > +		  liburcu-bp.la liburcu-defer.la \
> >  		  libwfqueue.la libwfstack.la librculfqueue.la librculfstack.la
> >  
> >  liburcu_la_SOURCES = urcu.c urcu-pointer.c $(COMPAT)
> > @@ -45,7 +45,6 @@ liburcu_signal_la_CFLAGS = -DRCU_SIGNAL
> >  
> >  liburcu_bp_la_SOURCES = urcu-bp.c urcu-pointer.c $(COMPAT)
> >  
> > -liburcu_call_la_SOURCES = urcu-call-rcu.c $(COMPAT)
> >  liburcu_defer_la_SOURCES = urcu-defer.c $(COMPAT)
> >  
> >  libwfqueue_la_SOURCES = wfqueue.c $(COMPAT)
> > diff --git a/tests/Makefile.am b/tests/Makefile.am
> > index 3c025a4..8dacb11 100644
> > --- a/tests/Makefile.am
> > +++ b/tests/Makefile.am
> > @@ -28,21 +28,20 @@ if COMPAT_FUTEX
> >  COMPAT+=$(top_srcdir)/compat_futex.c
> >  endif
> >  
> > -URCU=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > -URCU_QSBR=$(top_srcdir)/urcu-qsbr.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > +URCU=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > +URCU_QSBR=$(top_srcdir)/urcu-qsbr.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/wfqueue.c $(COMPAT)
> >  # URCU_MB uses urcu.c but -DRCU_MB must be defined
> > -URCU_MB=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > +URCU_MB=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/wfqueue.c $(COMPAT)
> >  # URCU_SIGNAL uses urcu.c but -DRCU_SIGNAL must be defined
> > -URCU_SIGNAL=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > -URCU_BP=$(top_srcdir)/urcu-bp.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > -URCU_DEFER=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-defer.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > +URCU_SIGNAL=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > +URCU_BP=$(top_srcdir)/urcu-bp.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > +URCU_DEFER=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-defer.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/wfqueue.c $(COMPAT)
> >  
> >  URCU_LIB=$(top_builddir)/liburcu.la
> >  URCU_QSBR_LIB=$(top_builddir)/liburcu-qsbr.la
> >  URCU_MB_LIB=$(top_builddir)/liburcu-mb.la
> >  URCU_SIGNAL_LIB=$(top_builddir)/liburcu-signal.la
> >  URCU_BP_LIB=$(top_builddir)/liburcu-bp.la
> > -URCU_CALL_LIB=$(top_builddir)/liburcu-call.la
> >  WFQUEUE_LIB=$(top_builddir)/libwfqueue.la
> >  WFSTACK_LIB=$(top_builddir)/libwfstack.la
> >  RCULFQUEUE_LIB=$(top_builddir)/librculfqueue.la
> > @@ -95,24 +94,24 @@ test_perthreadlock_SOURCES = test_perthreadlock.c $(URCU_SIGNAL)
> >  
> >  
> >  rcutorture_urcu_SOURCES = urcutorture.c
> > -rcutorture_urcu_CFLAGS = -DTORTURE_URCU $(AM_CFLAGS)
> > -rcutorture_urcu_LDADD = $(URCU) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
> > +rcutorture_urcu_CFLAGS = -DRCU_MEMBARRIER $(AM_CFLAGS)
> > +rcutorture_urcu_LDADD = $(URCU) $(WFQUEUE_LIB)
> >  
> >  rcutorture_urcu_mb_SOURCES = urcutorture.c
> > -rcutorture_urcu_mb_CFLAGS = -DTORTURE_URCU_MB $(AM_CFLAGS)
> > -rcutorture_urcu_mb_LDADD = $(URCU_MB_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
> > +rcutorture_urcu_mb_CFLAGS = -DRCU_MB $(AM_CFLAGS)
> > +rcutorture_urcu_mb_LDADD = $(URCU_MB_LIB) $(WFQUEUE_LIB)
> >  
> >  rcutorture_qsbr_SOURCES = urcutorture.c
> > -rcutorture_qsbr_CFLAGS = -DTORTURE_QSBR $(AM_CFLAGS)
> > -rcutorture_qsbr_LDADD = $(URCU_QSBR_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
> > +rcutorture_qsbr_CFLAGS = -DRCU_QSBR $(AM_CFLAGS)
> > +rcutorture_qsbr_LDADD = $(URCU_QSBR_LIB) $(WFQUEUE_LIB)
> >  
> >  rcutorture_urcu_signal_SOURCES = urcutorture.c
> > -rcutorture_urcu_signal_CFLAGS = -DTORTURE_URCU_SIGNAL $(AM_CFLAGS)
> > -rcutorture_urcu_signal_LDADD = $(URCU_SIGNAL_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
> > +rcutorture_urcu_signal_CFLAGS = -DRCU_SIGNAL $(AM_CFLAGS)
> > +rcutorture_urcu_signal_LDADD = $(URCU_SIGNAL_LIB) $(WFQUEUE_LIB)
> >  
> >  rcutorture_urcu_bp_SOURCES = urcutorture.c
> > -rcutorture_urcu_bp_CFLAGS = -DTORTURE_URCU_BP $(AM_CFLAGS)
> > -rcutorture_urcu_bp_LDADD = $(URCU_BP_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
> > +rcutorture_urcu_bp_CFLAGS = -DRCU_BP $(AM_CFLAGS)
> > +rcutorture_urcu_bp_LDADD = $(URCU_BP_LIB) $(WFQUEUE_LIB)
> >  
> >  test_mutex_SOURCES = test_mutex.c $(URCU)
> >  
> > diff --git a/tests/rcutorture.h b/tests/rcutorture.h
> > index 66fdd7f..aba74b0 100644
> > --- a/tests/rcutorture.h
> > +++ b/tests/rcutorture.h
> > @@ -66,7 +66,6 @@
> >   */
> >  
> >  #include <stdlib.h>
> > -#include "../urcu-call-rcu.h"
> >  
> >  DEFINE_PER_THREAD(long long, n_reads_pt);
> >  DEFINE_PER_THREAD(long long, n_updates_pt);
> > diff --git a/tests/urcutorture.c b/tests/urcutorture.c
> > index 63fa386..a098d87 100644
> > --- a/tests/urcutorture.c
> > +++ b/tests/urcutorture.c
> > @@ -8,22 +8,19 @@
> >  #include "api.h"
> >  #define _LGPL_SOURCE
> >  
> > -#ifdef TORTURE_RCU_MEMBARRIER
> > -#define RCU_MEMBARRIER
> > +#ifdef RCU_MEMBARRIER
> >  #include <urcu.h>
> >  #endif
> > -#ifdef TORTURE_URCU_SIGNAL
> > -#define RCU_SIGNAL
> > +#ifdef RCU_SIGNAL
> >  #include <urcu.h>
> >  #endif
> > -#ifdef TORTURE_URCU_MB
> > -#define RCU_MB
> > +#ifdef RCU_MB
> >  #include <urcu.h>
> >  #endif
> > -#ifdef TORTURE_QSBR
> > +#ifdef RCU_QSBR
> >  #include <urcu-qsbr.h>
> >  #endif
> > -#ifdef TORTURE_URCU_BP
> > +#ifdef RCU_BP
> >  #include <urcu-bp.h>
> >  #endif
> >  
> > diff --git a/urcu-bp.c b/urcu-bp.c
> > index 04bb675..5474f9f 100644
> > --- a/urcu-bp.c
> > +++ b/urcu-bp.c
> > @@ -35,6 +35,8 @@
> >  #include <unistd.h>
> >  #include <sys/mman.h>
> >  
> > +#include "urcu-bp-map.h"
> > +
> >  #include "urcu-bp-static.h"
> >  /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
> >  #include "urcu-bp.h"
> > @@ -375,3 +377,5 @@ void rcu_bp_after_fork_child(void)
> >  	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
> >  	assert(!ret);
> >  }
> > +
> > +#include "urcu-call-rcu-impl.h"
> > diff --git a/urcu-bp.h b/urcu-bp.h
> > index d92fbd1..fdf885c 100644
> > --- a/urcu-bp.h
> > +++ b/urcu-bp.h
> > @@ -46,6 +46,8 @@
> >  extern "C" {
> >  #endif
> >  
> > +#include "urcu-bp-map.h"
> > +
> >  /*
> >   * Important !
> >   *
> > @@ -69,8 +71,8 @@ extern "C" {
> >   *
> >   * Mark the beginning and end of a read-side critical section.
> >   */
> > -#define rcu_read_lock()		_rcu_read_lock()
> > -#define rcu_read_unlock()	_rcu_read_unlock()
> > +#define rcu_read_lock_bp()		_rcu_read_lock()
> 
> It apply to _bp and all other flavors:
> 
> I would prefer a
> 
> #define rcu_read_lock_bp()	_rcu_read_lock_bp()
> 
> so we don't go through the "map" file mapping too maping times,
> otherwise things get confusing.

The reason I avoided that approach is that it puts the mapping information
all in one place, making it easy to update the mapping if needed.  And
the person reading the code has to have seen the mapping file to get
to the rcu_read_lock_bp(), right?  And the definition for _rcu_read_lock
is right below that for rcu_read_lock.

Hmmm...  I should drop the () to allow taking the address of the
rcu_read_lock() "function", shouldn't I?

							Thanx, Paul

> Thanks,
> 
> Mathieu
> 
> > +#define rcu_read_unlock_bp()		_rcu_read_unlock()
> >  
> >  #else /* !_LGPL_SOURCE */
> >  
> > @@ -115,4 +117,6 @@ static inline void rcu_init(void)
> >  }
> >  #endif
> >  
> > +#include "urcu-call-rcu.h"
> > +
> >  #endif /* _URCU_BP_H */
> > diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
> > new file mode 100644
> > index 0000000..68dbbdd
> > --- /dev/null
> > +++ b/urcu-call-rcu-impl.h
> > @@ -0,0 +1,618 @@
> > +/*
> > + * urcu-call-rcu.c
> > + *
> > + * Userspace RCU library - batch memory reclamation with kernel API
> > + *
> > + * Copyright (c) 2010 Paul E. McKenney <paulmck at linux.vnet.ibm.com>
> > + *
> > + * This library is free software; you can redistribute it and/or
> > + * modify it under the terms of the GNU Lesser General Public
> > + * License as published by the Free Software Foundation; either
> > + * version 2.1 of the License, or (at your option) any later version.
> > + *
> > + * This library is distributed in the hope that it will be useful,
> > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> > + * Lesser General Public License for more details.
> > + *
> > + * You should have received a copy of the GNU Lesser General Public
> > + * License along with this library; if not, write to the Free Software
> > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> > + */
> > +
> > +#include <stdio.h>
> > +#include <pthread.h>
> > +#include <signal.h>
> > +#include <assert.h>
> > +#include <stdlib.h>
> > +#include <string.h>
> > +#include <errno.h>
> > +#include <poll.h>
> > +#include <sys/time.h>
> > +#include <syscall.h>
> > +#include <unistd.h>
> > +
> > +#include "config.h"
> > +#include "urcu/wfqueue.h"
> > +#include "urcu-call-rcu.h"
> > +#include "urcu-pointer.h"
> > +#include "urcu/list.h"
> > +
> > +/* Data structure that identifies a call_rcu thread. */
> > +
> > +struct call_rcu_data {
> > +	struct cds_wfq_queue cbs;
> > +	unsigned long flags;
> > +	pthread_mutex_t mtx;
> > +	pthread_cond_t cond;
> > +	unsigned long qlen;
> > +	pthread_t tid;
> > +	struct cds_list_head list;
> > +} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
> > +
> > +/*
> > + * List of all call_rcu_data structures to keep valgrind happy.
> > + * Protected by call_rcu_mutex.
> > + */
> > +
> > +CDS_LIST_HEAD(call_rcu_data_list);
> > +
> > +/* Link a thread using call_rcu() to its call_rcu thread. */
> > +
> > +static __thread struct call_rcu_data *thread_call_rcu_data;
> > +
> > +/* Guard call_rcu thread creation. */
> > +
> > +static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER;
> > +
> > +/* If a given thread does not have its own call_rcu thread, this is default. */
> > +
> > +static struct call_rcu_data *default_call_rcu_data;
> > +
> > +/*
> > + * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
> > + * available, then we can have call_rcu threads assigned to individual
> > + * CPUs rather than only to specific threads.
> > + */
> > +
> > +#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)
> > +
> > +/*
> > + * Pointer to array of pointers to per-CPU call_rcu_data structures
> > + * and # CPUs.
> > + */
> > +
> > +static struct call_rcu_data **per_cpu_call_rcu_data;
> > +static long maxcpus;
> > +
> > +/* Allocate the array if it has not already been allocated. */
> > +
> > +static void alloc_cpu_call_rcu_data(void)
> > +{
> > +	struct call_rcu_data **p;
> > +	static int warned = 0;
> > +
> > +	if (maxcpus != 0)
> > +		return;
> > +	maxcpus = sysconf(_SC_NPROCESSORS_CONF);
> > +	if (maxcpus <= 0) {
> > +		return;
> > +	}
> > +	p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
> > +	if (p != NULL) {
> > +		memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
> > +		per_cpu_call_rcu_data = p;
> > +	} else {
> > +		if (!warned) {
> > +			fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
> > +		}
> > +		warned = 1;
> > +	}
> > +}
> > +
> > +#else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
> > +
> > +static const struct call_rcu_data **per_cpu_call_rcu_data = NULL;
> > +static const long maxcpus = -1;
> > +
> > +static void alloc_cpu_call_rcu_data(void)
> > +{
> > +}
> > +
> > +static int sched_getcpu(void)
> > +{
> > +	return -1;
> > +}
> > +
> > +#endif /* #else #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
> > +
> > +/* Acquire the specified pthread mutex. */
> > +
> > +static void call_rcu_lock(pthread_mutex_t *pmp)
> > +{
> > +	if (pthread_mutex_lock(pmp) != 0) {
> > +		perror("pthread_mutex_lock");
> > +		exit(-1);
> > +	}
> > +}
> > +
> > +/* Release the specified pthread mutex. */
> > +
> > +static void call_rcu_unlock(pthread_mutex_t *pmp)
> > +{
> > +	if (pthread_mutex_unlock(pmp) != 0) {
> > +		perror("pthread_mutex_unlock");
> > +		exit(-1);
> > +	}
> > +}
> > +
> > +/* This is the code run by each call_rcu thread. */
> > +
> > +static void *call_rcu_thread(void *arg)
> > +{
> > +	unsigned long cbcount;
> > +	struct cds_wfq_node *cbs;
> > +	struct cds_wfq_node **cbs_tail;
> > +	struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
> > +	struct rcu_head *rhp;
> > +
> > +	thread_call_rcu_data = crdp;
> > +	for (;;) {
> > +		if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > +			while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> > +				poll(NULL, 0, 1);
> > +			_CMM_STORE_SHARED(crdp->cbs.head, NULL);
> > +			cbs_tail = (struct cds_wfq_node **)
> > +				uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> > +			synchronize_rcu();
> > +			cbcount = 0;
> > +			do {
> > +				while (cbs->next == NULL &&
> > +				       &cbs->next != cbs_tail)
> > +				       	poll(NULL, 0, 1);
> > +				if (cbs == &crdp->cbs.dummy) {
> > +					cbs = cbs->next;
> > +					continue;
> > +				}
> > +				rhp = (struct rcu_head *)cbs;
> > +				cbs = cbs->next;
> > +				rhp->func(rhp);
> > +				cbcount++;
> > +			} while (cbs != NULL);
> > +			uatomic_sub(&crdp->qlen, cbcount);
> > +		}
> > +		if (crdp->flags & URCU_CALL_RCU_STOP)
> > +			break;
> > +		if (crdp->flags & URCU_CALL_RCU_RT)
> > +			poll(NULL, 0, 10);
> > +		else {
> > +			call_rcu_lock(&crdp->mtx);
> > +			_CMM_STORE_SHARED(crdp->flags,
> > +				     crdp->flags & ~URCU_CALL_RCU_RUNNING);
> > +			if (&crdp->cbs.head ==
> > +			    _CMM_LOAD_SHARED(crdp->cbs.tail) &&
> > +			    pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) {
> > +				perror("pthread_cond_wait");
> > +				exit(-1);
> > +			}
> > +			_CMM_STORE_SHARED(crdp->flags,
> > +				     crdp->flags | URCU_CALL_RCU_RUNNING);
> > +			poll(NULL, 0, 10);
> > +			call_rcu_unlock(&crdp->mtx);
> > +		}
> > +	}
> > +	call_rcu_lock(&crdp->mtx);
> > +	crdp->flags |= URCU_CALL_RCU_STOPPED;
> > +	call_rcu_unlock(&crdp->mtx);
> > +	return NULL;
> > +}
> > +
> > +/*
> > + * Create both a call_rcu thread and the corresponding call_rcu_data
> > + * structure, linking the structure in as specified.  Caller must hold
> > + * call_rcu_mutex.
> > + */
> > +
> > +static void call_rcu_data_init(struct call_rcu_data **crdpp,
> > +			       unsigned long flags)
> > +{
> > +	struct call_rcu_data *crdp;
> > +
> > +	crdp = malloc(sizeof(*crdp));
> > +	if (crdp == NULL) {
> > +		fprintf(stderr, "Out of memory.\n");
> > +		exit(-1);
> > +	}
> > +	memset(crdp, '\0', sizeof(*crdp));
> > +	cds_wfq_init(&crdp->cbs);
> > +	crdp->qlen = 0;
> > +	if (pthread_mutex_init(&crdp->mtx, NULL) != 0) {
> > +		perror("pthread_mutex_init");
> > +		exit(-1);
> > +	}
> > +	if (pthread_cond_init(&crdp->cond, NULL) != 0) {
> > +		perror("pthread_cond_init");
> > +		exit(-1);
> > +	}
> > +	crdp->flags = flags | URCU_CALL_RCU_RUNNING;
> > +	cds_list_add(&crdp->list, &call_rcu_data_list);
> > +	cmm_smp_mb();  /* Structure initialized before pointer is planted. */
> > +	*crdpp = crdp;
> > +	if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) {
> > +		perror("pthread_create");
> > +		exit(-1);
> > +	}
> > +}
> > +
> > +/*
> > + * Return a pointer to the call_rcu_data structure for the specified
> > + * CPU, returning NULL if there is none.  We cannot automatically
> > + * created it because the platform we are running on might not define
> > + * sched_getcpu().
> > + */
> > +
> > +struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
> > +{
> > +	static int warned = 0;
> > +
> > +	if (per_cpu_call_rcu_data == NULL)
> > +		return NULL;
> > +	if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) {
> > +		fprintf(stderr, "[error] liburcu: get CPU # out of range\n");
> > +		warned = 1;
> > +	}
> > +	if (cpu < 0 || maxcpus <= cpu)
> > +		return NULL;
> > +	return per_cpu_call_rcu_data[cpu];
> > +}
> > +
> > +/*
> > + * Return the tid corresponding to the call_rcu thread whose
> > + * call_rcu_data structure is specified.
> > + */
> > +
> > +pthread_t get_call_rcu_thread(struct call_rcu_data *crdp)
> > +{
> > +	return crdp->tid;
> > +}
> > +
> > +/*
> > + * Create a call_rcu_data structure (with thread) and return a pointer.
> > + */
> > +
> > +static struct call_rcu_data *__create_call_rcu_data(unsigned long flags)
> > +{
> > +	struct call_rcu_data *crdp;
> > +
> > +	call_rcu_data_init(&crdp, flags);
> > +	return crdp;
> > +}
> > +
> > +struct call_rcu_data *create_call_rcu_data(unsigned long flags)
> > +{
> > +	struct call_rcu_data *crdp;
> > +
> > +	call_rcu_lock(&call_rcu_mutex);
> > +	crdp = __create_call_rcu_data(flags);
> > +	call_rcu_unlock(&call_rcu_mutex);
> > +	return crdp;
> > +}
> > +
> > +/*
> > + * Set the specified CPU to use the specified call_rcu_data structure.
> > + *
> > + * Use NULL to remove a CPU's call_rcu_data structure, but it is
> > + * the caller's responsibility to dispose of the removed structure.
> > + * Use get_cpu_call_rcu_data() to obtain a pointer to the old structure
> > + * (prior to NULLing it out, of course).
> > + */
> > +
> > +int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
> > +{
> > +	int warned = 0;
> > +
> > +	call_rcu_lock(&call_rcu_mutex);
> > +	if (cpu < 0 || maxcpus <= cpu) {
> > +		if (!warned) {
> > +			fprintf(stderr, "[error] liburcu: set CPU # out of range\n");
> > +			warned = 1;
> > +		}
> > +		call_rcu_unlock(&call_rcu_mutex);
> > +		errno = EINVAL;
> > +		return -EINVAL;
> > +	}
> > +	alloc_cpu_call_rcu_data();
> > +	call_rcu_unlock(&call_rcu_mutex);
> > +	if (per_cpu_call_rcu_data == NULL) {
> > +		errno = ENOMEM;
> > +		return -ENOMEM;
> > +	}
> > +	per_cpu_call_rcu_data[cpu] = crdp;
> > +	return 0;
> > +}
> > +
> > +/*
> > + * Return a pointer to the default call_rcu_data structure, creating
> > + * one if need be.  Because we never free call_rcu_data structures,
> > + * we don't need to be in an RCU read-side critical section.
> > + */
> > +
> > +struct call_rcu_data *get_default_call_rcu_data(void)
> > +{
> > +	if (default_call_rcu_data != NULL)
> > +		return rcu_dereference(default_call_rcu_data);
> > +	call_rcu_lock(&call_rcu_mutex);
> > +	if (default_call_rcu_data != NULL) {
> > +		call_rcu_unlock(&call_rcu_mutex);
> > +		return default_call_rcu_data;
> > +	}
> > +	call_rcu_data_init(&default_call_rcu_data, 0);
> > +	call_rcu_unlock(&call_rcu_mutex);
> > +	return default_call_rcu_data;
> > +}
> > +
> > +/*
> > + * Return the call_rcu_data structure that applies to the currently
> > + * running thread.  Any call_rcu_data structure assigned specifically
> > + * to this thread has first priority, followed by any call_rcu_data
> > + * structure assigned to the CPU on which the thread is running,
> > + * followed by the default call_rcu_data structure.  If there is not
> > + * yet a default call_rcu_data structure, one will be created.
> > + */
> > +struct call_rcu_data *get_call_rcu_data(void)
> > +{
> > +	int curcpu;
> > +	static int warned = 0;
> > +
> > +	if (thread_call_rcu_data != NULL)
> > +		return thread_call_rcu_data;
> > +	if (maxcpus <= 0)
> > +		return get_default_call_rcu_data();
> > +	curcpu = sched_getcpu();
> > +	if (!warned && (curcpu < 0 || maxcpus <= curcpu)) {
> > +		fprintf(stderr, "[error] liburcu: gcrd CPU # out of range\n");
> > +		warned = 1;
> > +	}
> > +	if (curcpu >= 0 && maxcpus > curcpu &&
> > +	    per_cpu_call_rcu_data != NULL &&
> > +	    per_cpu_call_rcu_data[curcpu] != NULL)
> > +	    	return per_cpu_call_rcu_data[curcpu];
> > +	return get_default_call_rcu_data();
> > +}
> > +
> > +/*
> > + * Return a pointer to this task's call_rcu_data if there is one.
> > + */
> > +
> > +struct call_rcu_data *get_thread_call_rcu_data(void)
> > +{
> > +	return thread_call_rcu_data;
> > +}
> > +
> > +/*
> > + * Set this task's call_rcu_data structure as specified, regardless
> > + * of whether or not this task already had one.  (This allows switching
> > + * to and from real-time call_rcu threads, for example.)
> > + *
> > + * Use NULL to remove a thread's call_rcu_data structure, but it is
> > + * the caller's responsibility to dispose of the removed structure.
> > + * Use get_thread_call_rcu_data() to obtain a pointer to the old structure
> > + * (prior to NULLing it out, of course).
> > + */
> > +
> > +void set_thread_call_rcu_data(struct call_rcu_data *crdp)
> > +{
> > +	thread_call_rcu_data = crdp;
> > +}
> > +
> > +/*
> > + * Create a separate call_rcu thread for each CPU.  This does not
> > + * replace a pre-existing call_rcu thread -- use the set_cpu_call_rcu_data()
> > + * function if you want that behavior.
> > + */
> > +
> > +int create_all_cpu_call_rcu_data(unsigned long flags)
> > +{
> > +	int i;
> > +	struct call_rcu_data *crdp;
> > +	int ret;
> > +
> > +	call_rcu_lock(&call_rcu_mutex);
> > +	alloc_cpu_call_rcu_data();
> > +	call_rcu_unlock(&call_rcu_mutex);
> > +	if (maxcpus <= 0) {
> > +		errno = EINVAL;
> > +		return -EINVAL;
> > +	}
> > +	if (per_cpu_call_rcu_data == NULL) {
> > +		errno = ENOMEM;
> > +		return -ENOMEM;
> > +	}
> > +	for (i = 0; i < maxcpus; i++) {
> > +		call_rcu_lock(&call_rcu_mutex);
> > +		if (get_cpu_call_rcu_data(i)) {
> > +			call_rcu_unlock(&call_rcu_mutex);
> > +			continue;
> > +		}
> > +		crdp = __create_call_rcu_data(flags);
> > +		if (crdp == NULL) {
> > +			call_rcu_unlock(&call_rcu_mutex);
> > +			errno = ENOMEM;
> > +			return -ENOMEM;
> > +		}
> > +		call_rcu_unlock(&call_rcu_mutex);
> > +		if ((ret = set_cpu_call_rcu_data(i, crdp)) != 0) {
> > +			/* FIXME: Leaks crdp for now. */
> > +			return ret; /* Can happen on race. */
> > +		}
> > +	}
> > +	return 0;
> > +}
> > +
> > +/*
> > + * Wake up the call_rcu thread corresponding to the specified
> > + * call_rcu_data structure.
> > + */
> > +static void wake_call_rcu_thread(struct call_rcu_data *crdp)
> > +{
> > +	if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) {
> > +		call_rcu_lock(&crdp->mtx);
> > +		if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) {
> > +			if (pthread_cond_signal(&crdp->cond) != 0) {
> > +				perror("pthread_cond_signal");
> > +				exit(-1);
> > +			}
> > +		}
> > +		call_rcu_unlock(&crdp->mtx);
> > +	}
> > +}
> > +
> > +/*
> > + * Schedule a function to be invoked after a following grace period.
> > + * This is the only function that must be called -- the others are
> > + * only present to allow applications to tune their use of RCU for
> > + * maximum performance.
> > + *
> > + * Note that unless a call_rcu thread has not already been created,
> > + * the first invocation of call_rcu() will create one.  So, if you
> > + * need the first invocation of call_rcu() to be fast, make sure
> > + * to create a call_rcu thread first.  One way to accomplish this is
> > + * "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data().
> > + */
> > +
> > +void call_rcu(struct rcu_head *head,
> > +	      void (*func)(struct rcu_head *head))
> > +{
> > +	struct call_rcu_data *crdp;
> > +
> > +	cds_wfq_node_init(&head->next);
> > +	head->func = func;
> > +	crdp = get_call_rcu_data();
> > +	cds_wfq_enqueue(&crdp->cbs, &head->next);
> > +	uatomic_inc(&crdp->qlen);
> > +	wake_call_rcu_thread(crdp);
> > +}
> > +
> > +/*
> > + * Free up the specified call_rcu_data structure, terminating the
> > + * associated call_rcu thread.  The caller must have previously
> > + * removed the call_rcu_data structure from per-thread or per-CPU
> > + * usage.  For example, set_cpu_call_rcu_data(cpu, NULL) for per-CPU
> > + * call_rcu_data structures or set_thread_call_rcu_data(NULL) for
> > + * per-thread call_rcu_data structures.
> > + *
> > + * We silently refuse to free up the default call_rcu_data structure
> > + * because that is where we put any leftover callbacks.  Note that
> > + * the possibility of self-spawning callbacks makes it impossible
> > + * to execute all the callbacks in finite time without putting any
> > + * newly spawned callbacks somewhere else.  The "somewhere else" of
> > + * last resort is the default call_rcu_data structure.
> > + *
> > + * We also silently refuse to free NULL pointers.  This simplifies
> > + * the calling code.
> > + */
> > +void call_rcu_data_free(struct call_rcu_data *crdp)
> > +{
> > +	struct cds_wfq_node *cbs;
> > +	struct cds_wfq_node **cbs_tail;
> > +	struct cds_wfq_node **cbs_endprev;
> > +
> > +	if (crdp == NULL || crdp == default_call_rcu_data) {
> > +		return;
> > +	}
> > +	if ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) {
> > +		call_rcu_lock(&crdp->mtx);
> > +		crdp->flags |= URCU_CALL_RCU_STOP;
> > +		call_rcu_unlock(&crdp->mtx);
> > +		wake_call_rcu_thread(crdp);
> > +		while ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0)
> > +			poll(NULL, 0, 1);
> > +	}
> > +	if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > +		while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> > +			poll(NULL, 0, 1);
> > +		_CMM_STORE_SHARED(crdp->cbs.head, NULL);
> > +		cbs_tail = (struct cds_wfq_node **)
> > +			uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> > +		cbs_endprev = (struct cds_wfq_node **)
> > +			uatomic_xchg(&default_call_rcu_data, cbs_tail);
> > +		*cbs_endprev = cbs;
> > +		uatomic_add(&default_call_rcu_data->qlen,
> > +			    uatomic_read(&crdp->qlen));
> > +		cds_list_del(&crdp->list);
> > +		free(crdp);
> > +	}
> > +}
> > +
> > +/*
> > + * Clean up all the per-CPU call_rcu threads.
> > + */
> > +void free_all_cpu_call_rcu_data(void)
> > +{
> > +	int cpu;
> > +	struct call_rcu_data *crdp;
> > +
> > +	if (maxcpus <= 0)
> > +		return;
> > +	for (cpu = 0; cpu < maxcpus; cpu++) {
> > +		crdp = get_cpu_call_rcu_data(cpu);
> > +		if (crdp == NULL)
> > +			continue;
> > +		set_cpu_call_rcu_data(cpu, NULL);
> > +		call_rcu_data_free(crdp);
> > +	}
> > +}
> > +
> > +/*
> > + * Acquire the call_rcu_mutex in order to ensure that the child sees
> > + * all of the call_rcu() data structures in a consistent state.
> > + * Suitable for pthread_atfork() and friends.
> > + */
> > +void call_rcu_before_fork(void)
> > +{
> > +	call_rcu_lock(&call_rcu_mutex);
> > +}
> > +
> > +/*
> > + * Clean up call_rcu data structures in the parent of a successful fork()
> > + * that is not followed by exec() in the child.  Suitable for
> > + * pthread_atfork() and friends.
> > + */
> > +void call_rcu_after_fork_parent(void)
> > +{
> > +	call_rcu_unlock(&call_rcu_mutex);
> > +}
> > +
> > +/*
> > + * Clean up call_rcu data structures in the child of a successful fork()
> > + * that is not followed by exec().  Suitable for pthread_atfork() and
> > + * friends.
> > + */
> > +void call_rcu_after_fork_child(void)
> > +{
> > +	struct call_rcu_data *crdp;
> > +
> > +	/* Re-initialize the mutex. */
> > +	if (pthread_mutex_init(&call_rcu_mutex, NULL) != 0) {
> > +		perror("pthread_mutex_init");
> > +		exit(-1);
> > +	}
> > +
> > +	/*
> > +	 * Allocate a new default call_rcu_data structure in order
> > +	 * to get a working call_rcu thread to go with it.
> > +	 */
> > +	default_call_rcu_data = NULL;
> > +	(void)get_default_call_rcu_data();
> > +
> > +	/* Dispose of all of the rest of the call_rcu_data structures. */
> > +	while (call_rcu_data_list.next != call_rcu_data_list.prev) {
> > +		crdp = cds_list_entry(call_rcu_data_list.prev,
> > +				      struct call_rcu_data, list);
> > +		if (crdp == default_call_rcu_data)
> > +			crdp = cds_list_entry(crdp->list.prev,
> > +					      struct call_rcu_data, list);
> > +		crdp->flags = URCU_CALL_RCU_STOPPED;
> > +		call_rcu_data_free(crdp);
> > +	}
> > +}
> > diff --git a/urcu-call-rcu.c b/urcu-call-rcu.c
> > deleted file mode 100644
> > index 665f20c..0000000
> > --- a/urcu-call-rcu.c
> > +++ /dev/null
> > @@ -1,620 +0,0 @@
> > -/*
> > - * urcu-call-rcu.c
> > - *
> > - * Userspace RCU library - batch memory reclamation with kernel API
> > - *
> > - * Copyright (c) 2010 Paul E. McKenney <paulmck at linux.vnet.ibm.com>
> > - *
> > - * This library is free software; you can redistribute it and/or
> > - * modify it under the terms of the GNU Lesser General Public
> > - * License as published by the Free Software Foundation; either
> > - * version 2.1 of the License, or (at your option) any later version.
> > - *
> > - * This library is distributed in the hope that it will be useful,
> > - * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> > - * Lesser General Public License for more details.
> > - *
> > - * You should have received a copy of the GNU Lesser General Public
> > - * License along with this library; if not, write to the Free Software
> > - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> > - */
> > -
> > -#include <stdio.h>
> > -#include <pthread.h>
> > -#include <signal.h>
> > -#include <assert.h>
> > -#include <stdlib.h>
> > -#include <string.h>
> > -#include <errno.h>
> > -#include <poll.h>
> > -#include <sys/time.h>
> > -#include <syscall.h>
> > -#include <unistd.h>
> > -
> > -#include "config.h"
> > -#include "urcu/wfqueue.h"
> > -#include "urcu-call-rcu.h"
> > -#include "urcu-pointer.h"
> > -#include "urcu/list.h"
> > -
> > -/* Data structure that identifies a call_rcu thread. */
> > -
> > -struct call_rcu_data {
> > -	struct cds_wfq_queue cbs;
> > -	unsigned long flags;
> > -	pthread_mutex_t mtx;
> > -	pthread_cond_t cond;
> > -	unsigned long qlen;
> > -	pthread_t tid;
> > -	struct cds_list_head list;
> > -} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
> > -
> > -/*
> > - * List of all call_rcu_data structures to keep valgrind happy.
> > - * Protected by call_rcu_mutex.
> > - */
> > -
> > -CDS_LIST_HEAD(call_rcu_data_list);
> > -
> > -/* Link a thread using call_rcu() to its call_rcu thread. */
> > -
> > -static __thread struct call_rcu_data *thread_call_rcu_data;
> > -
> > -/* Guard call_rcu thread creation. */
> > -
> > -static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER;
> > -
> > -/* If a given thread does not have its own call_rcu thread, this is default. */
> > -
> > -static struct call_rcu_data *default_call_rcu_data;
> > -
> > -extern void synchronize_rcu(void);
> > -
> > -/*
> > - * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
> > - * available, then we can have call_rcu threads assigned to individual
> > - * CPUs rather than only to specific threads.
> > - */
> > -
> > -#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)
> > -
> > -/*
> > - * Pointer to array of pointers to per-CPU call_rcu_data structures
> > - * and # CPUs.
> > - */
> > -
> > -static struct call_rcu_data **per_cpu_call_rcu_data;
> > -static long maxcpus;
> > -
> > -/* Allocate the array if it has not already been allocated. */
> > -
> > -static void alloc_cpu_call_rcu_data(void)
> > -{
> > -	struct call_rcu_data **p;
> > -	static int warned = 0;
> > -
> > -	if (maxcpus != 0)
> > -		return;
> > -	maxcpus = sysconf(_SC_NPROCESSORS_CONF);
> > -	if (maxcpus <= 0) {
> > -		return;
> > -	}
> > -	p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
> > -	if (p != NULL) {
> > -		memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
> > -		per_cpu_call_rcu_data = p;
> > -	} else {
> > -		if (!warned) {
> > -			fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
> > -		}
> > -		warned = 1;
> > -	}
> > -}
> > -
> > -#else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
> > -
> > -static const struct call_rcu_data **per_cpu_call_rcu_data = NULL;
> > -static const long maxcpus = -1;
> > -
> > -static void alloc_cpu_call_rcu_data(void)
> > -{
> > -}
> > -
> > -static int sched_getcpu(void)
> > -{
> > -	return -1;
> > -}
> > -
> > -#endif /* #else #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
> > -
> > -/* Acquire the specified pthread mutex. */
> > -
> > -static void call_rcu_lock(pthread_mutex_t *pmp)
> > -{
> > -	if (pthread_mutex_lock(pmp) != 0) {
> > -		perror("pthread_mutex_lock");
> > -		exit(-1);
> > -	}
> > -}
> > -
> > -/* Release the specified pthread mutex. */
> > -
> > -static void call_rcu_unlock(pthread_mutex_t *pmp)
> > -{
> > -	if (pthread_mutex_unlock(pmp) != 0) {
> > -		perror("pthread_mutex_unlock");
> > -		exit(-1);
> > -	}
> > -}
> > -
> > -/* This is the code run by each call_rcu thread. */
> > -
> > -static void *call_rcu_thread(void *arg)
> > -{
> > -	unsigned long cbcount;
> > -	struct cds_wfq_node *cbs;
> > -	struct cds_wfq_node **cbs_tail;
> > -	struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
> > -	struct rcu_head *rhp;
> > -
> > -	thread_call_rcu_data = crdp;
> > -	for (;;) {
> > -		if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > -			while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> > -				poll(NULL, 0, 1);
> > -			_CMM_STORE_SHARED(crdp->cbs.head, NULL);
> > -			cbs_tail = (struct cds_wfq_node **)
> > -				uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> > -			synchronize_rcu();
> > -			cbcount = 0;
> > -			do {
> > -				while (cbs->next == NULL &&
> > -				       &cbs->next != cbs_tail)
> > -				       	poll(NULL, 0, 1);
> > -				if (cbs == &crdp->cbs.dummy) {
> > -					cbs = cbs->next;
> > -					continue;
> > -				}
> > -				rhp = (struct rcu_head *)cbs;
> > -				cbs = cbs->next;
> > -				rhp->func(rhp);
> > -				cbcount++;
> > -			} while (cbs != NULL);
> > -			uatomic_sub(&crdp->qlen, cbcount);
> > -		}
> > -		if (crdp->flags & URCU_CALL_RCU_STOP)
> > -			break;
> > -		if (crdp->flags & URCU_CALL_RCU_RT)
> > -			poll(NULL, 0, 10);
> > -		else {
> > -			call_rcu_lock(&crdp->mtx);
> > -			_CMM_STORE_SHARED(crdp->flags,
> > -				     crdp->flags & ~URCU_CALL_RCU_RUNNING);
> > -			if (&crdp->cbs.head ==
> > -			    _CMM_LOAD_SHARED(crdp->cbs.tail) &&
> > -			    pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) {
> > -				perror("pthread_cond_wait");
> > -				exit(-1);
> > -			}
> > -			_CMM_STORE_SHARED(crdp->flags,
> > -				     crdp->flags | URCU_CALL_RCU_RUNNING);
> > -			poll(NULL, 0, 10);
> > -			call_rcu_unlock(&crdp->mtx);
> > -		}
> > -	}
> > -	call_rcu_lock(&crdp->mtx);
> > -	crdp->flags |= URCU_CALL_RCU_STOPPED;
> > -	call_rcu_unlock(&crdp->mtx);
> > -	return NULL;
> > -}
> > -
> > -/*
> > - * Create both a call_rcu thread and the corresponding call_rcu_data
> > - * structure, linking the structure in as specified.  Caller must hold
> > - * call_rcu_mutex.
> > - */
> > -
> > -static void call_rcu_data_init(struct call_rcu_data **crdpp,
> > -			       unsigned long flags)
> > -{
> > -	struct call_rcu_data *crdp;
> > -
> > -	crdp = malloc(sizeof(*crdp));
> > -	if (crdp == NULL) {
> > -		fprintf(stderr, "Out of memory.\n");
> > -		exit(-1);
> > -	}
> > -	memset(crdp, '\0', sizeof(*crdp));
> > -	cds_wfq_init(&crdp->cbs);
> > -	crdp->qlen = 0;
> > -	if (pthread_mutex_init(&crdp->mtx, NULL) != 0) {
> > -		perror("pthread_mutex_init");
> > -		exit(-1);
> > -	}
> > -	if (pthread_cond_init(&crdp->cond, NULL) != 0) {
> > -		perror("pthread_cond_init");
> > -		exit(-1);
> > -	}
> > -	crdp->flags = flags | URCU_CALL_RCU_RUNNING;
> > -	cds_list_add(&crdp->list, &call_rcu_data_list);
> > -	cmm_smp_mb();  /* Structure initialized before pointer is planted. */
> > -	*crdpp = crdp;
> > -	if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) {
> > -		perror("pthread_create");
> > -		exit(-1);
> > -	}
> > -}
> > -
> > -/*
> > - * Return a pointer to the call_rcu_data structure for the specified
> > - * CPU, returning NULL if there is none.  We cannot automatically
> > - * created it because the platform we are running on might not define
> > - * sched_getcpu().
> > - */
> > -
> > -struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
> > -{
> > -	static int warned = 0;
> > -
> > -	if (per_cpu_call_rcu_data == NULL)
> > -		return NULL;
> > -	if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) {
> > -		fprintf(stderr, "[error] liburcu: get CPU # out of range\n");
> > -		warned = 1;
> > -	}
> > -	if (cpu < 0 || maxcpus <= cpu)
> > -		return NULL;
> > -	return per_cpu_call_rcu_data[cpu];
> > -}
> > -
> > -/*
> > - * Return the tid corresponding to the call_rcu thread whose
> > - * call_rcu_data structure is specified.
> > - */
> > -
> > -pthread_t get_call_rcu_thread(struct call_rcu_data *crdp)
> > -{
> > -	return crdp->tid;
> > -}
> > -
> > -/*
> > - * Create a call_rcu_data structure (with thread) and return a pointer.
> > - */
> > -
> > -static struct call_rcu_data *__create_call_rcu_data(unsigned long flags)
> > -{
> > -	struct call_rcu_data *crdp;
> > -
> > -	call_rcu_data_init(&crdp, flags);
> > -	return crdp;
> > -}
> > -
> > -struct call_rcu_data *create_call_rcu_data(unsigned long flags)
> > -{
> > -	struct call_rcu_data *crdp;
> > -
> > -	call_rcu_lock(&call_rcu_mutex);
> > -	crdp = __create_call_rcu_data(flags);
> > -	call_rcu_unlock(&call_rcu_mutex);
> > -	return crdp;
> > -}
> > -
> > -/*
> > - * Set the specified CPU to use the specified call_rcu_data structure.
> > - *
> > - * Use NULL to remove a CPU's call_rcu_data structure, but it is
> > - * the caller's responsibility to dispose of the removed structure.
> > - * Use get_cpu_call_rcu_data() to obtain a pointer to the old structure
> > - * (prior to NULLing it out, of course).
> > - */
> > -
> > -int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
> > -{
> > -	int warned = 0;
> > -
> > -	call_rcu_lock(&call_rcu_mutex);
> > -	if (cpu < 0 || maxcpus <= cpu) {
> > -		if (!warned) {
> > -			fprintf(stderr, "[error] liburcu: set CPU # out of range\n");
> > -			warned = 1;
> > -		}
> > -		call_rcu_unlock(&call_rcu_mutex);
> > -		errno = EINVAL;
> > -		return -EINVAL;
> > -	}
> > -	alloc_cpu_call_rcu_data();
> > -	call_rcu_unlock(&call_rcu_mutex);
> > -	if (per_cpu_call_rcu_data == NULL) {
> > -		errno = ENOMEM;
> > -		return -ENOMEM;
> > -	}
> > -	per_cpu_call_rcu_data[cpu] = crdp;
> > -	return 0;
> > -}
> > -
> > -/*
> > - * Return a pointer to the default call_rcu_data structure, creating
> > - * one if need be.  Because we never free call_rcu_data structures,
> > - * we don't need to be in an RCU read-side critical section.
> > - */
> > -
> > -struct call_rcu_data *get_default_call_rcu_data(void)
> > -{
> > -	if (default_call_rcu_data != NULL)
> > -		return rcu_dereference(default_call_rcu_data);
> > -	call_rcu_lock(&call_rcu_mutex);
> > -	if (default_call_rcu_data != NULL) {
> > -		call_rcu_unlock(&call_rcu_mutex);
> > -		return default_call_rcu_data;
> > -	}
> > -	call_rcu_data_init(&default_call_rcu_data, 0);
> > -	call_rcu_unlock(&call_rcu_mutex);
> > -	return default_call_rcu_data;
> > -}
> > -
> > -/*
> > - * Return the call_rcu_data structure that applies to the currently
> > - * running thread.  Any call_rcu_data structure assigned specifically
> > - * to this thread has first priority, followed by any call_rcu_data
> > - * structure assigned to the CPU on which the thread is running,
> > - * followed by the default call_rcu_data structure.  If there is not
> > - * yet a default call_rcu_data structure, one will be created.
> > - */
> > -struct call_rcu_data *get_call_rcu_data(void)
> > -{
> > -	int curcpu;
> > -	static int warned = 0;
> > -
> > -	if (thread_call_rcu_data != NULL)
> > -		return thread_call_rcu_data;
> > -	if (maxcpus <= 0)
> > -		return get_default_call_rcu_data();
> > -	curcpu = sched_getcpu();
> > -	if (!warned && (curcpu < 0 || maxcpus <= curcpu)) {
> > -		fprintf(stderr, "[error] liburcu: gcrd CPU # out of range\n");
> > -		warned = 1;
> > -	}
> > -	if (curcpu >= 0 && maxcpus > curcpu &&
> > -	    per_cpu_call_rcu_data != NULL &&
> > -	    per_cpu_call_rcu_data[curcpu] != NULL)
> > -	    	return per_cpu_call_rcu_data[curcpu];
> > -	return get_default_call_rcu_data();
> > -}
> > -
> > -/*
> > - * Return a pointer to this task's call_rcu_data if there is one.
> > - */
> > -
> > -struct call_rcu_data *get_thread_call_rcu_data(void)
> > -{
> > -	return thread_call_rcu_data;
> > -}
> > -
> > -/*
> > - * Set this task's call_rcu_data structure as specified, regardless
> > - * of whether or not this task already had one.  (This allows switching
> > - * to and from real-time call_rcu threads, for example.)
> > - *
> > - * Use NULL to remove a thread's call_rcu_data structure, but it is
> > - * the caller's responsibility to dispose of the removed structure.
> > - * Use get_thread_call_rcu_data() to obtain a pointer to the old structure
> > - * (prior to NULLing it out, of course).
> > - */
> > -
> > -void set_thread_call_rcu_data(struct call_rcu_data *crdp)
> > -{
> > -	thread_call_rcu_data = crdp;
> > -}
> > -
> > -/*
> > - * Create a separate call_rcu thread for each CPU.  This does not
> > - * replace a pre-existing call_rcu thread -- use the set_cpu_call_rcu_data()
> > - * function if you want that behavior.
> > - */
> > -
> > -int create_all_cpu_call_rcu_data(unsigned long flags)
> > -{
> > -	int i;
> > -	struct call_rcu_data *crdp;
> > -	int ret;
> > -
> > -	call_rcu_lock(&call_rcu_mutex);
> > -	alloc_cpu_call_rcu_data();
> > -	call_rcu_unlock(&call_rcu_mutex);
> > -	if (maxcpus <= 0) {
> > -		errno = EINVAL;
> > -		return -EINVAL;
> > -	}
> > -	if (per_cpu_call_rcu_data == NULL) {
> > -		errno = ENOMEM;
> > -		return -ENOMEM;
> > -	}
> > -	for (i = 0; i < maxcpus; i++) {
> > -		call_rcu_lock(&call_rcu_mutex);
> > -		if (get_cpu_call_rcu_data(i)) {
> > -			call_rcu_unlock(&call_rcu_mutex);
> > -			continue;
> > -		}
> > -		crdp = __create_call_rcu_data(flags);
> > -		if (crdp == NULL) {
> > -			call_rcu_unlock(&call_rcu_mutex);
> > -			errno = ENOMEM;
> > -			return -ENOMEM;
> > -		}
> > -		call_rcu_unlock(&call_rcu_mutex);
> > -		if ((ret = set_cpu_call_rcu_data(i, crdp)) != 0) {
> > -			/* FIXME: Leaks crdp for now. */
> > -			return ret; /* Can happen on race. */
> > -		}
> > -	}
> > -	return 0;
> > -}
> > -
> > -/*
> > - * Wake up the call_rcu thread corresponding to the specified
> > - * call_rcu_data structure.
> > - */
> > -static void wake_call_rcu_thread(struct call_rcu_data *crdp)
> > -{
> > -	if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) {
> > -		call_rcu_lock(&crdp->mtx);
> > -		if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) {
> > -			if (pthread_cond_signal(&crdp->cond) != 0) {
> > -				perror("pthread_cond_signal");
> > -				exit(-1);
> > -			}
> > -		}
> > -		call_rcu_unlock(&crdp->mtx);
> > -	}
> > -}
> > -
> > -/*
> > - * Schedule a function to be invoked after a following grace period.
> > - * This is the only function that must be called -- the others are
> > - * only present to allow applications to tune their use of RCU for
> > - * maximum performance.
> > - *
> > - * Note that unless a call_rcu thread has not already been created,
> > - * the first invocation of call_rcu() will create one.  So, if you
> > - * need the first invocation of call_rcu() to be fast, make sure
> > - * to create a call_rcu thread first.  One way to accomplish this is
> > - * "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data().
> > - */
> > -
> > -void call_rcu(struct rcu_head *head,
> > -	      void (*func)(struct rcu_head *head))
> > -{
> > -	struct call_rcu_data *crdp;
> > -
> > -	cds_wfq_node_init(&head->next);
> > -	head->func = func;
> > -	crdp = get_call_rcu_data();
> > -	cds_wfq_enqueue(&crdp->cbs, &head->next);
> > -	uatomic_inc(&crdp->qlen);
> > -	wake_call_rcu_thread(crdp);
> > -}
> > -
> > -/*
> > - * Free up the specified call_rcu_data structure, terminating the
> > - * associated call_rcu thread.  The caller must have previously
> > - * removed the call_rcu_data structure from per-thread or per-CPU
> > - * usage.  For example, set_cpu_call_rcu_data(cpu, NULL) for per-CPU
> > - * call_rcu_data structures or set_thread_call_rcu_data(NULL) for
> > - * per-thread call_rcu_data structures.
> > - *
> > - * We silently refuse to free up the default call_rcu_data structure
> > - * because that is where we put any leftover callbacks.  Note that
> > - * the possibility of self-spawning callbacks makes it impossible
> > - * to execute all the callbacks in finite time without putting any
> > - * newly spawned callbacks somewhere else.  The "somewhere else" of
> > - * last resort is the default call_rcu_data structure.
> > - *
> > - * We also silently refuse to free NULL pointers.  This simplifies
> > - * the calling code.
> > - */
> > -void call_rcu_data_free(struct call_rcu_data *crdp)
> > -{
> > -	struct cds_wfq_node *cbs;
> > -	struct cds_wfq_node **cbs_tail;
> > -	struct cds_wfq_node **cbs_endprev;
> > -
> > -	if (crdp == NULL || crdp == default_call_rcu_data) {
> > -		return;
> > -	}
> > -	if ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) {
> > -		call_rcu_lock(&crdp->mtx);
> > -		crdp->flags |= URCU_CALL_RCU_STOP;
> > -		call_rcu_unlock(&crdp->mtx);
> > -		wake_call_rcu_thread(crdp);
> > -		while ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0)
> > -			poll(NULL, 0, 1);
> > -	}
> > -	if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > -		while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> > -			poll(NULL, 0, 1);
> > -		_CMM_STORE_SHARED(crdp->cbs.head, NULL);
> > -		cbs_tail = (struct cds_wfq_node **)
> > -			uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> > -		cbs_endprev = (struct cds_wfq_node **)
> > -			uatomic_xchg(&default_call_rcu_data, cbs_tail);
> > -		*cbs_endprev = cbs;
> > -		uatomic_add(&default_call_rcu_data->qlen,
> > -			    uatomic_read(&crdp->qlen));
> > -		cds_list_del(&crdp->list);
> > -		free(crdp);
> > -	}
> > -}
> > -
> > -/*
> > - * Clean up all the per-CPU call_rcu threads.
> > - */
> > -void free_all_cpu_call_rcu_data(void)
> > -{
> > -	int cpu;
> > -	struct call_rcu_data *crdp;
> > -
> > -	if (maxcpus <= 0)
> > -		return;
> > -	for (cpu = 0; cpu < maxcpus; cpu++) {
> > -		crdp = get_cpu_call_rcu_data(cpu);
> > -		if (crdp == NULL)
> > -			continue;
> > -		set_cpu_call_rcu_data(cpu, NULL);
> > -		call_rcu_data_free(crdp);
> > -	}
> > -}
> > -
> > -/*
> > - * Acquire the call_rcu_mutex in order to ensure that the child sees
> > - * all of the call_rcu() data structures in a consistent state.
> > - * Suitable for pthread_atfork() and friends.
> > - */
> > -void call_rcu_before_fork(void)
> > -{
> > -	call_rcu_lock(&call_rcu_mutex);
> > -}
> > -
> > -/*
> > - * Clean up call_rcu data structures in the parent of a successful fork()
> > - * that is not followed by exec() in the child.  Suitable for
> > - * pthread_atfork() and friends.
> > - */
> > -void call_rcu_after_fork_parent(void)
> > -{
> > -	call_rcu_unlock(&call_rcu_mutex);
> > -}
> > -
> > -/*
> > - * Clean up call_rcu data structures in the child of a successful fork()
> > - * that is not followed by exec().  Suitable for pthread_atfork() and
> > - * friends.
> > - */
> > -void call_rcu_after_fork_child(void)
> > -{
> > -	struct call_rcu_data *crdp;
> > -
> > -	/* Re-initialize the mutex. */
> > -	if (pthread_mutex_init(&call_rcu_mutex, NULL) != 0) {
> > -		perror("pthread_mutex_init");
> > -		exit(-1);
> > -	}
> > -
> > -	/*
> > -	 * Allocate a new default call_rcu_data structure in order
> > -	 * to get a working call_rcu thread to go with it.
> > -	 */
> > -	default_call_rcu_data = NULL;
> > -	(void)get_default_call_rcu_data();
> > -
> > -	/* Dispose of all of the rest of the call_rcu_data structures. */
> > -	while (call_rcu_data_list.next != call_rcu_data_list.prev) {
> > -		crdp = cds_list_entry(call_rcu_data_list.prev,
> > -				      struct call_rcu_data, list);
> > -		if (crdp == default_call_rcu_data)
> > -			crdp = cds_list_entry(crdp->list.prev,
> > -					      struct call_rcu_data, list);
> > -		crdp->flags = URCU_CALL_RCU_STOPPED;
> > -		call_rcu_data_free(crdp);
> > -	}
> > -}
> > diff --git a/urcu-qsbr.c b/urcu-qsbr.c
> > index 69effd5..8dcad33 100644
> > --- a/urcu-qsbr.c
> > +++ b/urcu-qsbr.c
> > @@ -32,6 +32,8 @@
> >  #include <errno.h>
> >  #include <poll.h>
> >  
> > +#include "urcu-qsbr-map.h"
> > +
> >  #define BUILD_QSBR_LIB
> >  #include "urcu-qsbr-static.h"
> >  /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
> > @@ -121,10 +123,11 @@ static void update_counter_and_wait(void)
> >  #endif	/* !(CAA_BITS_PER_LONG < 64) */
> >  
> >  	/*
> > -	 * Must commit rcu_gp_ctr update to memory before waiting for quiescent
> > -	 * state. Failure to do so could result in the writer waiting forever
> > -	 * while new readers are always accessing data (no progress). Enforce
> > -	 * compiler-order of store to rcu_gp_ctr before load rcu_reader ctr.
> > +	 * Must commit rcu_gp_ctr update to memory before waiting for
> > +	 * quiescent state. Failure to do so could result in the writer
> > +	 * waiting forever while new readers are always accessing data
> > +	 * (no progress). Enforce compiler-order of store to rcu_gp_ctr
> > +	 * before load rcu_reader ctr.
> >  	 */
> >  	cmm_barrier();
> >  
> > @@ -194,8 +197,8 @@ void synchronize_rcu(void)
> >  
> >  	/*
> >  	 * Mark the writer thread offline to make sure we don't wait for
> > -	 * our own quiescent state. This allows using synchronize_rcu() in
> > -	 * threads registered as readers.
> > +	 * our own quiescent state. This allows using synchronize_rcu()
> > +	 * in threads registered as readers.
> >  	 */
> >  	if (was_online)
> >  		CMM_STORE_SHARED(rcu_reader.ctr, 0);
> > @@ -212,10 +215,11 @@ void synchronize_rcu(void)
> >  
> >  	/*
> >  	 * Must finish waiting for quiescent state for parity 0 before
> > -	 * committing next rcu_gp_ctr update to memory. Failure to do so could
> > -	 * result in the writer waiting forever while new readers are always
> > -	 * accessing data (no progress).  Enforce compiler-order of load
> > -	 * rcu_reader ctr before store to rcu_gp_ctr.
> > +	 * committing next rcu_gp_ctr update to memory. Failure to
> > +	 * do so could result in the writer waiting forever while new
> > +	 * readers are always accessing data (no progress).  Enforce
> > +	 * compiler-order of load rcu_reader ctr before store to
> > +	 * rcu_gp_ctr.
> >  	 */
> >  	cmm_barrier();
> >  
> > @@ -238,7 +242,8 @@ out:
> >  	 * freed.
> >  	 */
> >  	if (was_online)
> > -		_CMM_STORE_SHARED(rcu_reader.ctr, CMM_LOAD_SHARED(rcu_gp_ctr));
> > +		_CMM_STORE_SHARED(rcu_reader.ctr,
> > +				  CMM_LOAD_SHARED(rcu_gp_ctr));
> >  	cmm_smp_mb();
> >  }
> >  #else /* !(CAA_BITS_PER_LONG < 64) */
> > @@ -250,8 +255,8 @@ void synchronize_rcu(void)
> >  
> >  	/*
> >  	 * Mark the writer thread offline to make sure we don't wait for
> > -	 * our own quiescent state. This allows using synchronize_rcu() in
> > -	 * threads registered as readers.
> > +	 * our own quiescent state. This allows using synchronize_rcu()
> > +	 * in threads registered as readers.
> >  	 */
> >  	cmm_smp_mb();
> >  	if (was_online)
> > @@ -265,7 +270,8 @@ out:
> >  	mutex_unlock(&rcu_gp_lock);
> >  
> >  	if (was_online)
> > -		_CMM_STORE_SHARED(rcu_reader.ctr, CMM_LOAD_SHARED(rcu_gp_ctr));
> > +		_CMM_STORE_SHARED(rcu_reader.ctr,
> > +				  CMM_LOAD_SHARED(rcu_gp_ctr));
> >  	cmm_smp_mb();
> >  }
> >  #endif  /* !(CAA_BITS_PER_LONG < 64) */
> > @@ -326,3 +332,5 @@ void rcu_exit(void)
> >  {
> >  	assert(cds_list_empty(&registry));
> >  }
> > +
> > +#include "urcu-call-rcu-impl.h"
> > diff --git a/urcu-qsbr.h b/urcu-qsbr.h
> > index 116fd77..984d70c 100644
> > --- a/urcu-qsbr.h
> > +++ b/urcu-qsbr.h
> > @@ -40,6 +40,8 @@
> >  extern "C" {
> >  #endif 
> >  
> > +#include "urcu-qsbr-map.h"
> > +
> >  /*
> >   * Important !
> >   *
> > @@ -62,15 +64,15 @@ extern "C" {
> >   * rcu_read_unlock()
> >   *
> >   * Mark the beginning and end of a read-side critical section.
> > - * DON'T FORGET TO USE rcu_register_thread/rcu_unregister_thread() FOR EACH
> > - * THREAD WITH READ-SIDE CRITICAL SECTION.
> > + * DON'T FORGET TO USE rcu_register_thread/rcu_unregister_thread()
> > + * FOR EACH THREAD WITH READ-SIDE CRITICAL SECTION.
> >   */
> > -#define rcu_read_lock()		_rcu_read_lock()
> > -#define rcu_read_unlock()	_rcu_read_unlock()
> > +#define rcu_read_lock_qsbr()		_rcu_read_lock()
> > +#define rcu_read_unlock_qsbr()		_rcu_read_unlock()
> >  
> > -#define rcu_quiescent_state()	_rcu_quiescent_state()
> > -#define rcu_thread_offline()	_rcu_thread_offline()
> > -#define rcu_thread_online()	_rcu_thread_online()
> > +#define rcu_quiescent_state_qsbr()	_rcu_quiescent_state()
> > +#define rcu_thread_offline_qsbr()	_rcu_thread_offline()
> > +#define rcu_thread_online_qsbr()	_rcu_thread_online()
> >  
> >  #else /* !_LGPL_SOURCE */
> >  
> > @@ -122,4 +124,6 @@ extern void rcu_unregister_thread(void);
> >  }
> >  #endif
> >  
> > +#include "urcu-call-rcu.h"
> > +
> >  #endif /* _URCU_QSBR_H */
> > diff --git a/urcu.c b/urcu.c
> > index e529ac0..4ee9e3b 100644
> > --- a/urcu.c
> > +++ b/urcu.c
> > @@ -33,6 +33,8 @@
> >  #include <errno.h>
> >  #include <poll.h>
> >  
> > +#include "urcu-map.h"
> > +
> >  #include "urcu-static.h"
> >  /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
> >  #include "urcu.h"
> > @@ -428,4 +430,7 @@ void rcu_exit(void)
> >  	assert(act.sa_sigaction == sigrcu_handler);
> >  	assert(cds_list_empty(&registry));
> >  }
> > +
> >  #endif /* #ifdef RCU_SIGNAL */
> > +
> > +#include "urcu-call-rcu-impl.h"
> > diff --git a/urcu.h b/urcu.h
> > index c6c54e7..00d9b75 100644
> > --- a/urcu.h
> > +++ b/urcu.h
> > @@ -43,12 +43,14 @@
> >  extern "C" {
> >  #endif 
> >  
> > +#include "urcu-map.h"
> > +
> >  /*
> >   * Important !
> >   *
> >   * Each thread containing read-side critical sections must be registered
> > - * with rcu_register_thread() before calling rcu_read_lock().
> > - * rcu_unregister_thread() should be called before the thread exits.
> > + * with rcu_register_thread_mb() before calling rcu_read_lock_mb().
> > + * rcu_unregister_thread_mb() should be called before the thread exits.
> >   */
> >  
> >  #ifdef _LGPL_SOURCE
> > @@ -68,8 +70,16 @@ extern "C" {
> >   * DON'T FORGET TO USE RCU_REGISTER/UNREGISTER_THREAD() FOR EACH THREAD WITH
> >   * READ-SIDE CRITICAL SECTION.
> >   */
> > -#define rcu_read_lock()		_rcu_read_lock()
> > -#define rcu_read_unlock()	_rcu_read_unlock()
> > +#ifdef RCU_MEMBARRIER
> > +#define rcu_read_lock_memb()		_rcu_read_lock()
> > +#define rcu_read_unlock_memb()		_rcu_read_unlock()
> > +#elif defined(RCU_SIGNAL)
> > +#define rcu_read_lock_sig()		_rcu_read_lock()
> > +#define rcu_read_unlock_sig()		_rcu_read_unlock()
> > +#elif defined(RCU_MB)
> > +#define rcu_read_lock_mb()		_rcu_read_lock()
> > +#define rcu_read_unlock_mb()		_rcu_read_unlock()
> > +#endif
> >  
> >  #else /* !_LGPL_SOURCE */
> >  
> > @@ -100,4 +110,6 @@ extern void rcu_init(void);
> >  }
> >  #endif
> >  
> > +#include "urcu-call-rcu.h"
> > +
> >  #endif /* _URCU_H */
> > -- 
> > 1.7.3.2
> > 
> > 
> > _______________________________________________
> > rp mailing list
> > rp at svcs.cs.pdx.edu
> > http://svcs.cs.pdx.edu/mailman/listinfo/rp
> 
> -- 
> Mathieu Desnoyers
> Operating System Efficiency R&D Consultant
> EfficiOS Inc.
> http://www.efficios.com




More information about the lttng-dev mailing list