[ltt-dev] [rp] [PATCH RFC] v2 call_rcu() interface for userspace-rcu

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Mon Feb 14 10:38:44 EST 2011


* Paul E. McKenney (paulmck at linux.vnet.ibm.com) wrote:
> On Fri, Feb 11, 2011 at 08:46:44PM -0500, Mathieu Desnoyers wrote:
> > * Paul E. McKenney (paulmck at linux.vnet.ibm.com) wrote:
> > > Adds call_rcu(), with RCU threads to invoke the callbacks.  By default,
> > > there will be one such RCU thread per process, created the first time
> > > that call_rcu() is invoked.  On systems supporting sched_getcpu(), it
> > > is possible to create one RCU thread per CPU by calling
> > > create_all_cpu_call_rcu_data().
> > > 
> > > This version includes feedback from Mathieu Desnoyers and forward-ports
> > > to the new cds naming scheme.  Note that v1 was posted on October 29th.
> > 
> > Hi Paul,
> > 
> > The interfaces have improved a lot, thanks ! Here are a few comments,
> > 
> > > 
> > > Signed-off-by: Paul E. McKenney <paulmck at linux.vnet.ibm.com>
> > > 
> > > diff --git a/Makefile.am b/Makefile.am
> > > index 79a7152..64bb299 100644
> > > --- a/Makefile.am
> > > +++ b/Makefile.am
> > > @@ -29,8 +29,8 @@ COMPAT+=compat_futex.c
> > >  endif
> > >  
> > >  lib_LTLIBRARIES = liburcu.la liburcu-qsbr.la liburcu-mb.la liburcu-signal.la \
> > > -		  liburcu-bp.la liburcu-defer.la libwfqueue.la libwfstack.la \
> > > -		  librculfqueue.la librculfstack.la
> > > +		  liburcu-bp.la liburcu-defer.la liburcu-call.la \
> > > +		  libwfqueue.la libwfstack.la librculfqueue.la librculfstack.la
> > >  
> > >  liburcu_la_SOURCES = urcu.c urcu-pointer.c $(COMPAT)
> > >  
> > > @@ -44,6 +44,7 @@ liburcu_signal_la_CFLAGS = -DRCU_SIGNAL
> > >  
> > >  liburcu_bp_la_SOURCES = urcu-bp.c urcu-pointer.c $(COMPAT)
> > >  
> > > +liburcu_call_la_SOURCES = urcu-call-rcu.c $(COMPAT)
> > >  liburcu_defer_la_SOURCES = urcu-defer.c $(COMPAT)
> > >  
> > >  libwfqueue_la_SOURCES = wfqueue.c $(COMPAT)
> > > diff --git a/configure.ac b/configure.ac
> > > index 02780e7..88771d4 100644
> > > --- a/configure.ac
> > > +++ b/configure.ac
> > > @@ -34,7 +34,7 @@ AC_TYPE_SIZE_T
> > >  # Checks for library functions.
> > >  AC_FUNC_MALLOC
> > >  AC_FUNC_MMAP
> > > -AC_CHECK_FUNCS([bzero gettimeofday munmap strtoul])
> > > +AC_CHECK_FUNCS([bzero gettimeofday munmap sched_getcpu strtoul sysconf])
> > >  # Find arch type
> > >  case $host_cpu in
> > > diff --git a/tests/Makefile.am b/tests/Makefile.am
> > > index a43dd75..3c025a4 100644
> > > --- a/tests/Makefile.am
> > > +++ b/tests/Makefile.am
> > > @@ -1,5 +1,5 @@
> > >  AM_LDFLAGS=-lpthread
> > > -AM_CFLAGS=-I$(top_srcdir) -I$(top_builddir)
> > > +AM_CFLAGS=-I$(top_srcdir) -I$(top_builddir) -g
> > 
> > It makes me think: I noticed this week that -Wall is missing from the top level
> > build options. We should really add it.
> 
> Fair enough.  It did find a nasty uninitialized variable, so worth it in
> this case.  The ignored return values from write() I leave to you.  ;-)
> Ditto the "warning: value computed is not used" in rculfqueue-static.h.

OK, I'll wait for your patches before I change anything though.

> 
> > >  noinst_PROGRAMS = test_urcu test_urcu_dynamic_link test_urcu_timing \
> > >  	test_urcu_signal test_urcu_signal_dynamic_link test_urcu_signal_timing \
> > > @@ -28,20 +28,21 @@ if COMPAT_FUTEX
> > >  COMPAT+=$(top_srcdir)/compat_futex.c
> > >  endif
> > >  
> > > -URCU=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> > > -URCU_QSBR=$(top_srcdir)/urcu-qsbr.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> > > +URCU=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > > +URCU_QSBR=$(top_srcdir)/urcu-qsbr.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > >  # URCU_MB uses urcu.c but -DRCU_MB must be defined
> > > -URCU_MB=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> > > +URCU_MB=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > >  # URCU_SIGNAL uses urcu.c but -DRCU_SIGNAL must be defined
> > > -URCU_SIGNAL=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> > > -URCU_BP=$(top_srcdir)/urcu-bp.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> > > -URCU_DEFER=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-defer.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> > > +URCU_SIGNAL=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > > +URCU_BP=$(top_srcdir)/urcu-bp.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > > +URCU_DEFER=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-defer.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> > >  
> > >  URCU_LIB=$(top_builddir)/liburcu.la
> > >  URCU_QSBR_LIB=$(top_builddir)/liburcu-qsbr.la
> > >  URCU_MB_LIB=$(top_builddir)/liburcu-mb.la
> > >  URCU_SIGNAL_LIB=$(top_builddir)/liburcu-signal.la
> > >  URCU_BP_LIB=$(top_builddir)/liburcu-bp.la
> > > +URCU_CALL_LIB=$(top_builddir)/liburcu-call.la
> > >  WFQUEUE_LIB=$(top_builddir)/libwfqueue.la
> > >  WFSTACK_LIB=$(top_builddir)/libwfstack.la
> > >  RCULFQUEUE_LIB=$(top_builddir)/librculfqueue.la
> > > @@ -95,23 +96,23 @@ test_perthreadlock_SOURCES = test_perthreadlock.c $(URCU_SIGNAL)
> > >  
> > >  rcutorture_urcu_SOURCES = urcutorture.c
> > >  rcutorture_urcu_CFLAGS = -DTORTURE_URCU $(AM_CFLAGS)
> > > -rcutorture_urcu_LDADD = $(URCU)
> > > +rcutorture_urcu_LDADD = $(URCU) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
> > >  
> > >  rcutorture_urcu_mb_SOURCES = urcutorture.c
> > >  rcutorture_urcu_mb_CFLAGS = -DTORTURE_URCU_MB $(AM_CFLAGS)
> > > -rcutorture_urcu_mb_LDADD = $(URCU_MB_LIB)
> > > +rcutorture_urcu_mb_LDADD = $(URCU_MB_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
> > >  
> > >  rcutorture_qsbr_SOURCES = urcutorture.c
> > >  rcutorture_qsbr_CFLAGS = -DTORTURE_QSBR $(AM_CFLAGS)
> > > -rcutorture_qsbr_LDADD = $(URCU_QSBR_LIB)
> > > +rcutorture_qsbr_LDADD = $(URCU_QSBR_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
> > >  
> > >  rcutorture_urcu_signal_SOURCES = urcutorture.c
> > >  rcutorture_urcu_signal_CFLAGS = -DTORTURE_URCU_SIGNAL $(AM_CFLAGS)
> > > -rcutorture_urcu_signal_LDADD = $(URCU_SIGNAL_LIB)
> > > +rcutorture_urcu_signal_LDADD = $(URCU_SIGNAL_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
> > >  
> > >  rcutorture_urcu_bp_SOURCES = urcutorture.c
> > >  rcutorture_urcu_bp_CFLAGS = -DTORTURE_URCU_BP $(AM_CFLAGS)
> > > -rcutorture_urcu_bp_LDADD = $(URCU_BP_LIB)
> > > +rcutorture_urcu_bp_LDADD = $(URCU_BP_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
> > >  
> > >  test_mutex_SOURCES = test_mutex.c $(URCU)
> > >  
> > > diff --git a/tests/rcutorture.h b/tests/rcutorture.h
> > > index 4dac2f2..207d9f8 100644
> > > --- a/tests/rcutorture.h
> > > +++ b/tests/rcutorture.h
> > > @@ -65,6 +65,8 @@
> > >   * Test variables.
> > >   */
> > >  
> > > +#include "../urcu-call-rcu.h"
> > > +
> > >  DEFINE_PER_THREAD(long long, n_reads_pt);
> > >  DEFINE_PER_THREAD(long long, n_updates_pt);
> > >  
> > > @@ -296,10 +298,30 @@ void *rcu_read_stress_test(void *arg)
> > >  	return (NULL);
> > >  }
> > >  
> > > +static pthread_mutex_t call_rcu_test_mutex = PTHREAD_MUTEX_INITIALIZER;
> > > +static pthread_cond_t call_rcu_test_cond = PTHREAD_COND_INITIALIZER;
> > > +
> > > +void rcu_update_stress_test_rcu(struct rcu_head *head)
> > > +{
> > > +	if (pthread_mutex_lock(&call_rcu_test_mutex) != 0) {
> > > +		perror("pthread_mutex_lock");
> > > +		exit(-1);
> > > +	}
> > > +	if (pthread_cond_signal(&call_rcu_test_cond) != 0) {
> > > +		perror("pthread_cond_signal");
> > > +		exit(-1);
> > > +	}
> > > +	if (pthread_mutex_unlock(&call_rcu_test_mutex) != 0) {
> > > +		perror("pthread_mutex_unlock");
> > > +		exit(-1);
> > > +	}
> > > +}
> > > +
> > >  void *rcu_update_stress_test(void *arg)
> > >  {
> > >  	int i;
> > >  	struct rcu_stress *p;
> > > +	struct rcu_head rh;
> > >  
> > >  	while (goflag == GOFLAG_INIT)
> > >  		poll(NULL, 0, 1);
> > > @@ -317,7 +339,24 @@ void *rcu_update_stress_test(void *arg)
> > >  		for (i = 0; i < RCU_STRESS_PIPE_LEN; i++)
> > >  			if (i != rcu_stress_idx)
> > >  				rcu_stress_array[i].pipe_count++;
> > > -		synchronize_rcu();
> > > +		if (n_updates & 0x1)
> > > +			synchronize_rcu();
> > > +		else {
> > > +			if (pthread_mutex_lock(&call_rcu_test_mutex) != 0) {
> > > +				perror("pthread_mutex_lock");
> > > +				exit(-1);
> > > +			}
> > > +			call_rcu(&rh, rcu_update_stress_test_rcu);
> > > +			if (pthread_cond_wait(&call_rcu_test_cond,
> > > +					      &call_rcu_test_mutex) != 0) {
> > > +				perror("pthread_cond_wait");
> > > +				exit(-1);
> > > +			}
> > > +			if (pthread_mutex_unlock(&call_rcu_test_mutex) != 0) {
> > > +				perror("pthread_mutex_unlock");
> > > +				exit(-1);
> > > +			}
> > > +		}
> > >  		n_updates++;
> > >  	}
> > >  	return NULL;
> > > diff --git a/urcu-call-rcu.c b/urcu-call-rcu.c
> > > new file mode 100644
> > > index 0000000..d59a2d7
> > > --- /dev/null
> > > +++ b/urcu-call-rcu.c
> > > @@ -0,0 +1,405 @@
> > > +/*
> > > + * urcu-call-rcu.c
> > > + *
> > > + * Userspace RCU library - batch memory reclamation with kernel API
> > > + *
> > > + * Copyright (c) 2010 Paul E. McKenney <paulmck at linux.vnet.ibm.com>
> > > + *
> > > + * This library is free software; you can redistribute it and/or
> > > + * modify it under the terms of the GNU Lesser General Public
> > > + * License as published by the Free Software Foundation; either
> > > + * version 2.1 of the License, or (at your option) any later version.
> > > + *
> > > + * This library is distributed in the hope that it will be useful,
> > > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> > > + * Lesser General Public License for more details.
> > > + *
> > > + * You should have received a copy of the GNU Lesser General Public
> > > + * License along with this library; if not, write to the Free Software
> > > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> > > + */
> > > +
> > > +#include <stdio.h>
> > > +#include <pthread.h>
> > > +#include <signal.h>
> > > +#include <assert.h>
> > > +#include <stdlib.h>
> > > +#include <string.h>
> > > +#include <errno.h>
> > > +#include <poll.h>
> > > +#include <sys/time.h>
> > > +#include <syscall.h>
> > > +#include <unistd.h>
> > > +
> > > +#include "config.h"
> > > +#include "urcu/wfqueue.h"
> > > +#include "urcu-call-rcu.h"
> > > +#include "urcu-pointer.h"
> > > +
> > > +/* Data structure that identifies a call_rcu thread. */
> > > +
> > > +struct call_rcu_data {
> > > +	struct cds_wfq_queue cbs;
> > > +	unsigned long flags;
> > > +	pthread_mutex_t mtx;
> > > +	pthread_cond_t cond;
> > > +	unsigned long qlen;
> > > +	pthread_t tid;
> > > +} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
> > > +
> > > +/* Link a thread using call_rcu() to its call_rcu thread. */
> > > +
> > > +static __thread struct call_rcu_data *thread_call_rcu_data;
> > > +
> > > +/* Guard call_rcu thread creation. */
> > > +
> > > +static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER;
> > > +
> > > +/* If a given thread does not have its own call_rcu thread, this is default. */
> > > +
> > > +static struct call_rcu_data *default_call_rcu_data;
> > > +
> > > +extern void synchronize_rcu(void);
> > > +
> > > +/*
> > > + * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
> > > + * available, then we can have call_rcu threads assigned to individual
> > > + * CPUs rather than only to specific threads.
> > > + */
> > > +
> > > +#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)
> > > +
> > > +/*
> > > + * Pointer to array of pointers to per-CPU call_rcu_data structures
> > > + * and # CPUs.
> > > + */
> > > +
> > > +static struct call_rcu_data **per_cpu_call_rcu_data;
> > > +static long maxcpus;
> > > +
> > > +/* Allocate the array if it has not already been allocated. */
> > > +
> > > +static void alloc_cpu_call_rcu_data(void)
> > > +{
> > > +	struct call_rcu_data **p;
> > > +
> > > +	if (maxcpus != 0)
> > > +		return;
> > > +	maxcpus = sysconf(_SC_NPROCESSORS_CONF);
> > 
> > I noticed an odd sysconf behavior inside a chroot jail where /proc was not
> > mounted: it returns 1 single CPU for _SC_NPROCESSORS_CONF, which would be
> > catastrophic here because the array would be too small to match the index
> > returned by sched_getcpu. Any thought on how we should deal with this ?
> > Maybe doing an extra value sanity check in our own wrapper over sched_getcpu
> > would be appropriate ?
> 
> Good catch, we do need a range check in get_call_rcu_data(), which I
> I have added.  That is the only sched_getcpu() in the call_rcu() code,
> so that should cover it.  If the CPU is out of range, we fall back to
> the default global CPU.

I'm wondering if it is OK for a library to silently accept that we're falling
back on global queue rather than per-cpu if the range is incorrect and when we
fail to allocate memory. See below,

> 
> > > +	if (maxcpus == -1)
> > > +		return;
> > > +	p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
> > 
> > Shouldn't we yell or something if p is null ? :-)
> 
> If p is NULL, it will just force use of the global call_rcu() thread.
> The data for this thread is static, so always there.
> 
> But what form of yelling did you have in mind?

e.g.

if (!p)
  fprintf(stderr, "[error] liburcu: unable to allocate per_cpu data\n");

> 
> > > +	if (p != NULL)
> > > +		memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
> > > +}
> > > +
> > > +#else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
> > > +
> > > +static const struct call_rcu_data **per_cpu_call_rcu_data = NULL;
> > > +static const long maxcpus = -1;
> > > +
> > > +static void alloc_cpu_call_rcu_data(void)
> > > +{
> > > +}
> > > +
> > > +static int sched_getcpu(void)
> > > +{
> > > +	return -1;
> > > +}
> > > +
> > > +#endif /* #else #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
> > > +
> > > +/* Acquire the specified pthread mutex. */
> > > +
> > > +static void call_rcu_lock(pthread_mutex_t *pmp)
> > > +{
> > > +	if (pthread_mutex_lock(pmp) != 0) {
> > > +		perror("pthread_mutex_lock");
> > > +		exit(-1);
> > > +	}
> > > +}
> > > +
> > > +/* Release the specified pthread mutex. */
> > > +
> > > +static void call_rcu_unlock(pthread_mutex_t *pmp)
> > > +{
> > > +	if (pthread_mutex_unlock(pmp) != 0) {
> > > +		perror("pthread_mutex_unlock");
> > > +		exit(-1);
> > > +	}
> > > +}
> > > +
> > > +/* This is the code run by each call_rcu thread. */
> > > +
> > > +static void *call_rcu_thread(void *arg)
> > > +{
> > > +	unsigned long cbcount;
> > > +	struct cds_wfq_node *cbs;
> > > +	struct cds_wfq_node **cbs_tail;
> > > +	struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
> > > +	struct rcu_head *rhp;
> > > +
> > > +	thread_call_rcu_data = crdp;
> > > +	for (;;) {
> > > +		if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > > +			while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> > > +				poll(NULL, 0, 1);
> > > +			_CMM_STORE_SHARED(crdp->cbs.head, NULL);
> > > +			cbs_tail = (struct cds_wfq_node **)
> > > +				uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> > > +			synchronize_rcu();
> > > +			cbcount = 0;
> > > +			do {
> > > +				while (cbs->next == NULL &&
> > > +				       &cbs->next != cbs_tail)
> > > +				       	poll(NULL, 0, 1);
> > > +				if (cbs == &crdp->cbs.dummy) {
> > > +					cbs = cbs->next;
> > > +					continue;
> > > +				}
> > > +				rhp = (struct rcu_head *)cbs;
> > > +				cbs = cbs->next;
> > > +				rhp->func(rhp);
> > > +				cbcount++;
> > > +			} while (cbs != NULL);
> > > +			uatomic_sub(&crdp->qlen, cbcount);
> > > +		}
> > > +		if (crdp->flags & URCU_CALL_RCU_RT)
> > > +			poll(NULL, 0, 10);
> > > +		else {
> > > +			call_rcu_lock(&crdp->mtx);
> > > +			_CMM_STORE_SHARED(crdp->flags,
> > > +				     crdp->flags & ~URCU_CALL_RCU_RUNNING);
> > > +			if (&cbs->next != crdp->cbs.tail &&
> > > +			    pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) {
> > > +				perror("pthread_cond_wait");
> > > +				exit(-1);
> > > +			}
> > > +			_CMM_STORE_SHARED(crdp->flags,
> > > +				     crdp->flags | URCU_CALL_RCU_RUNNING);
> > > +			poll(NULL, 0, 10);
> > > +			call_rcu_unlock(&crdp->mtx);
> > > +		}
> > > +	}
> > > +	return NULL;  /* NOTREACHED */
> > 
> > I'm not sure how this thread is supposed to clean up properly at process exit ?
> 
> Given that all it has is in-memory data structures, when exit() makes
> the address space vanish, that will implicity clean things up for this
> thread.
> 
> Or am I missing your point?

Well, technically, we should free all memory that has been allocated by malloc.
Otherwise, memory leak detection tools like valgrind will not be happy at all.
So relying on process exit to free memory seems a bit inelegant.

> 
> > > +}
> > > +
> > > +/*
> > > + * Create both a call_rcu thread and the corresponding call_rcu_data
> > > + * structure, linking the structure in as specified.
> > > + */
> > > +
> > > +void call_rcu_data_init(struct call_rcu_data **crdpp, unsigned long flags)
> > > +{
> > > +	struct call_rcu_data *crdp;
> > > +
> > > +	crdp = malloc(sizeof(*crdp));
> > > +	if (crdp == NULL) {
> > > +		fprintf(stderr, "Out of memory.\n");
> > > +		exit(-1);
> > > +	}
> > > +	memset(crdp, '\0', sizeof(*crdp));
> > > +	cds_wfq_init(&crdp->cbs);
> > > +	crdp->qlen = 0;
> > > +	if (pthread_mutex_init(&crdp->mtx, NULL) != 0) {
> > > +		perror("pthread_mutex_init");
> > > +		exit(-1);
> > > +	}
> > > +	if (pthread_cond_init(&crdp->cond, NULL) != 0) {
> > > +		perror("pthread_cond_init");
> > > +		exit(-1);
> > > +	}
> > > +	crdp->flags = flags | URCU_CALL_RCU_RUNNING;
> > > +	cmm_smp_mb();  /* Structure initialized before pointer is planted. */
> > > +	*crdpp = crdp;
> > > +	if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) {
> > > +		perror("pthread_create");
> > > +		exit(-1);
> > > +	}
> > > +}
> > > +
> > > +/*
> > > + * Return a pointer to the call_rcu_data structure for the specified
> > > + * CPU, returning NULL if there is none.  We cannot automnatically
> > 
> > automnatically -> automatically
> 
> Good eyes, fixed.
> 
> But wouldn't you really prefer automaniacally?  ;-)

lol :)

> 
> > > + * created it because the platform we are running on might not define
> > > + * sched_getcpu().
> > > + */
> > > +
> > > +struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
> > > +{
> > > +	if (per_cpu_call_rcu_data == NULL)
> > > +		return NULL;
> > > +	if (cpu < 0 || maxcpus <= cpu)
> > > +		return NULL;
> > 
> > Ah! I think this is where is would make sense to validate that "cpu" is not out
> > of bounds. If it is, then we could fallback on the default call_rcu thread, or
> > yell about lack of /proc.
> 
> You lost me here -- we are validating that the "cpu" is not out of bounds
> with respect to the array bounds.  The caller sees the NULL pointer and
> acts accordingly.
> 
> I did need to add a check to get_call_rcu_data() down below, which I did.

Well my point is that letting the library silently take a slower fallback
without complaining as a result of an error situation (-ENOMEM, /proc
unavailable) is, IMHO, heading towards a disaster. We want the library to print
an error message, at least once, when it finds out about such problematic situations.

In this specific case, if "cpu > maxcpus", something _is_ wrong, so we should
print an appropriate error message (and fallback on the slower default callback
queue of course).

> > > +	return per_cpu_call_rcu_data[cpu];
> > > +}
> > > +
> > > +/*
> > > + * Return the tid corresponding to the call_rcu thread whose
> > > + * call_rcu_data structure is specified.
> > > + */
> > > +
> > > +pthread_t get_call_rcu_thread(struct call_rcu_data *crdp)
> > > +{
> > > +	return crdp->tid;
> > > +}
> > > +
> > > +/*
> > > + * Create a call_rcu_data structure (with thread) and return a pointer.
> > > + */
> > > +
> > > +struct call_rcu_data *create_call_rcu_data(unsigned long flags)
> > > +{
> > > +	struct call_rcu_data *crdp;
> > > +
> > > +	call_rcu_data_init(&crdp, flags);
> > > +	return crdp;
> > > +}
> > > +
> > > +/*
> > > + * Set the specified CPU to use the specified call_rcu_data structure.
> > > + */
> > > +
> > > +int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
> > > +{
> > > +	call_rcu_lock(&call_rcu_mutex);
> > > +	alloc_cpu_call_rcu_data();
> > > +	call_rcu_unlock(&call_rcu_mutex);
> > > +	if (cpu < 0 || maxcpus <= cpu || per_cpu_call_rcu_data == NULL)
> > > +		return 0;
> > > +	per_cpu_call_rcu_data[cpu] = crdp;
> > 
> > another point where we chould validate that cpu is not out of bound.
> 
> The validation is already there: "cpu < 0 || maxcpus <= cpu".
> 
> What am I missing here?

Same as the previous one: if (cpu > maxcpus), we know that something is wrong.
I'm OK with returning 0 and using a fallback, but only after printing an error
message in this case.

Also, I'm wondering if we should reverse the return logic here: usually, other
functions in this library return 0 on success, and -ESOMETHING on error.

> 
> > > +	return 1;
> > > +}
> > > +
> > > +/*
> > > + * Return a pointer to the default call_rcu_data structure, creating
> > > + * one if need be.  Because we never free call_rcu_data structures,
> > > + * we don't need to be in an RCU read-side critical section.
> > > + */
> > > +
> > > +struct call_rcu_data *get_default_call_rcu_data(void)
> > > +{
> > > +	if (default_call_rcu_data != NULL)
> > > +		return rcu_dereference(default_call_rcu_data);
> > > +	call_rcu_lock(&call_rcu_mutex);
> > > +	if (default_call_rcu_data != NULL) {
> > > +		call_rcu_unlock(&call_rcu_mutex);
> > > +		return default_call_rcu_data;
> > > +	}
> > > +	call_rcu_data_init(&default_call_rcu_data, 0);
> > > +	call_rcu_unlock(&call_rcu_mutex);
> > > +	return default_call_rcu_data;
> > > +}
> > > +
> > > +/*
> > > + * Return the call_rcu_data structure that applies to the currently
> > > + * running thread.  Any call_rcu_data structure assigned specifically
> > > + * to this thread has first priority, followed by any call_rcu_data
> > > + * structure assigned to the CPU on which the thread is running,
> > > + * followed by the default call_rcu_data structure.  If there is not
> > > + * yet a default call_rcu_data structure, one will be created.
> > > + */
> > > +struct call_rcu_data *get_call_rcu_data(void)
> > > +{
> > > +	int curcpu;
> > > +
> > > +	if (thread_call_rcu_data != NULL)
> > > +		return thread_call_rcu_data;
> > > +	curcpu = sched_getcpu();
> 
> Now -here- we need validation.  The following "if" becomes:
> 
> 	if (curcpu >= 0 && maxcpus > curcpu &&
> 	    per_cpu_call_rcu_data != NULL &&
> 	    per_cpu_call_rcu_data[curcpu] != NULL)
> 	    	return per_cpu_call_rcu_data[curcpu];
> 
> > > +	if (per_cpu_call_rcu_data != NULL &&
> > > +	    per_cpu_call_rcu_data[curcpu] != NULL)
> > 
> > Another array bounds validation here ?
> 
> Yep, see above.

I'd also be tempted to print an error message (once) for the specific case where
cpu > maxcpus.

> 
> > > +	    	return per_cpu_call_rcu_data[curcpu];
> > > +	return get_default_call_rcu_data();
> > > +}
> > > +
> > > +/*
> > > + * Return a pointer to this task's call_rcu_data if there is one.
> > > + */
> > > +
> > > +struct call_rcu_data *get_thread_call_rcu_data(void)
> > > +{
> > > +	return thread_call_rcu_data;
> > > +}
> > > +
> > > +/*
> > > + * Set this task's call_rcu_data structure as specified, regardless
> > > + * of whether or not this task already had one.  (This allows switching
> > > + * to and from real-time call_rcu threads, for example.)
> > > + */
> > > +
> > > +void set_thread_call_rcu_data(struct call_rcu_data *crdp)
> > > +{
> > > +	thread_call_rcu_data = crdp;
> > > +}
> > > +
> > > +/*
> > > + * Create a separate call_rcu thread for each CPU.  This does not
> > > + * replace a pre-existing call_rcu thread -- use the set_cpu_call_rcu_data()
> > > + * function if you want that behavior.
> > > + */
> > > +
> > > +int create_all_cpu_call_rcu_data(unsigned long flags)
> > > +{
> > > +	int i;
> > > +	struct call_rcu_data *crdp;
> > > +
> > > +	call_rcu_lock(&call_rcu_mutex);
> > > +	alloc_cpu_call_rcu_data();
> > > +	call_rcu_unlock(&call_rcu_mutex);
> > > +	if (maxcpus == -1 || per_cpu_call_rcu_data == NULL)
> > > +		return 0;
> > > +	for (i = 0; i < maxcpus; i++) {
> > > +		call_rcu_lock(&call_rcu_mutex);
> > > +		if (get_cpu_call_rcu_data(i)) {
> > > +			call_rcu_unlock(&call_rcu_mutex);
> > > +			continue;
> > > +		}
> > > +		crdp = create_call_rcu_data(flags);
> > 
> > Hrm, just so we're clear: on a 8-way system with a kernel supporting up to 32
> > possible cpus, this would create 32 threads per application. Is it really what
> > we want ?
> 
> Only if the application invokes create_all_cpu_call_rcu_data().
> If an application uses call_rcu() only rarely, then it should avoid
> calling create_all_cpu_call_rcu_data(), instead relying on the single
> default call_rcu() thread.
> 
> > For the non-RT case, we might want to investigate using sysconf get "online"
> > cpus, and fixup dynamically within call_rcu() if we discover that
> > 
> > 1) per-cpu rcu_data should be available
> > 2) some data within our per-cpu rcu_data tells us that it has not been allocated
> > 
> > So we could allocate the per-cpu rcu_data array for the number of possible cpus
> > on the system, but only allocate more expensive ressources (e.g. create threads)
> > for the number of online cpus, and dynamically spawn threads if needed.
> > 
> > We could leave the code as is for the RT case (allocate threads for each
> > possible cpus). Thoughts ?
> 
> I am for keeping it simple to begin with.  On most modern systems,
> I bet that the extra few tens of threads is not going to be a big deal.

Simple to begin with is good enough for me, and as you say, the default is the
global queue.

> 
> > > +		if (crdp == NULL) {
> > > +			call_rcu_unlock(&call_rcu_mutex);
> > > +			return 0;
> > > +		}
> > > +		if (!set_cpu_call_rcu_data(i, crdp)) {
> > > +			call_rcu_unlock(&call_rcu_mutex);
> > > +			return 0; /* should not happen, but... */
> > > +		}
> > > +		call_rcu_unlock(&call_rcu_mutex);
> > > +	}
> > > +	return 1;
> > > +}
> > > +
> > > +/*
> > > + * Schedule a function to be invoked after a following grace period.
> > > + * This is the only function that must be called -- the others are
> > > + * only present to allow applications to tune their use of RCU for
> > > + * maximum performance.
> > > + *
> > > + * Note that unless a call_rcu thread has not already been created,
> > > + * the first invocation of call_rcu() will create one.  So, if you
> > > + * need the first invocation of call_rcu() to be fast, make sure
> > > + * to create a call_rcu thread first.  One way to accomplish this is
> > > + * "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data().
> > > + */
> > > +
> > > +void call_rcu(struct rcu_head *head,
> > > +	      void (*func)(struct rcu_head *head))
> > > +{
> > > +	struct call_rcu_data *crdp;
> > > +
> > > +	cds_wfq_node_init(&head->next);
> > > +	head->func = func;
> > > +	crdp = get_call_rcu_data();
> > > +	cds_wfq_enqueue(&crdp->cbs, &head->next);
> > > +	uatomic_inc(&crdp->qlen);
> > > +	if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) {
> > > +		call_rcu_lock(&crdp->mtx);
> > > +		if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) {
> > > +			if (pthread_cond_signal(&crdp->cond) != 0) {
> > > +				perror("pthread_cond_signal");
> > > +				exit(-1);
> > > +			}
> > > +		}
> > > +		call_rcu_unlock(&crdp->mtx);
> > > +	}
> > > +}
> > > diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h
> > > new file mode 100644
> > > index 0000000..b390b7e
> > > --- /dev/null
> > > +++ b/urcu-call-rcu.h
> > > @@ -0,0 +1,80 @@
> > > +#ifndef _URCU_CALL_RCU_H
> > > +#define _URCU_CALL_RCU_H
> > > +
> > > +/*
> > > + * urcu-defer.h
> > 
> > urcu-call-rcu.h
> 
> Good catch, fixed.
> 
> > > + *
> > > + * Userspace RCU header - deferred execution
> > > + *
> > > + * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> > > + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
> > > + *
> > > + * LGPL-compatible code should include this header with :
> > > + *
> > > + * #define _LGPL_SOURCE
> > > + * #include <urcu-defer.h>
> > > + *
> > > + * This library is free software; you can redistribute it and/or
> > > + * modify it under the terms of the GNU Lesser General Public
> > > + * License as published by the Free Software Foundation; either
> > > + * version 2.1 of the License, or (at your option) any later version.
> > > + *
> > > + * This library is distributed in the hope that it will be useful,
> > > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> > > + * Lesser General Public License for more details.
> > > + *
> > > + * You should have received a copy of the GNU Lesser General Public
> > > + * License along with this library; if not, write to the Free Software
> > > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> > > + */
> > > +
> > > +#include <stdlib.h>
> > > +#include <pthread.h>
> > > +
> > > +#include "urcu/wfqueue.h"
> > 
> > should this header be between < > instead ?
> 
> No idea, I was just copying the other examples using double quotes.
> 
> I fixed this one to use angle brackets, and leave the others to you.

Hopefully I'm not missing something I should myself remember. :-/ This might
become important when we choose to look up headers in the urcu tree before
the same header installed in the system.

> 
> > > +
> > > +#ifdef __cplusplus
> > > +extern "C" {
> > > +#endif
> > > +
> > > +/* Note that struct call_rcu_data is opaque to callers. */
> > > +
> > > +struct call_rcu_data;
> > > +
> > > +/* Flag values. */
> > > +
> > > +#define URCU_CALL_RCU_RT	0x1
> > > +#define URCU_CALL_RCU_RUNNING	0x2
> > > +
> > > +/*
> > > + * The rcu_head data structure is placed in the structure to be freed
> > > + * via call_rcu().
> > > + */
> > > +
> > > +struct rcu_head {
> > > +	struct cds_wfq_node next;
> > > +	void (*func)(struct rcu_head *head);
> > > +};
> > > +
> > > +/*
> > > + * Exported functions
> > > + */
> > > +void call_rcu_data_init(struct call_rcu_data **crdpp, unsigned long flags);
> > > +struct call_rcu_data *get_cpu_call_rcu_data(int cpu);
> > > +pthread_t get_call_rcu_thread(struct call_rcu_data *crdp);
> > > +struct call_rcu_data *create_call_rcu_data(unsigned long flags);
> > > +int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp);
> > > +struct call_rcu_data *get_default_call_rcu_data(void);
> > > +struct call_rcu_data *get_call_rcu_data(void);
> > > +struct call_rcu_data *get_thread_call_rcu_data(void);
> > > +void set_thread_call_rcu_data(struct call_rcu_data *crdp);
> > > +int create_all_cpu_call_rcu_data(unsigned long flags);
> > > +void call_rcu(struct rcu_head *head,
> > > +	      void (*func)(struct rcu_head *head));
> > > +
> > > +#ifdef __cplusplus 
> > > +}
> > > +#endif
> > > +
> > > +#endif /* _URCU_CALL_RCU_H */
> > > diff --git a/urcu-defer.h b/urcu-defer.h
> > > index e161616..a64c75c 100644
> > > --- a/urcu-defer.h
> > > +++ b/urcu-defer.h
> > > @@ -53,14 +53,6 @@ extern "C" {
> > >  extern void defer_rcu(void (*fct)(void *p), void *p);
> > >  
> > >  /*
> > > - * call_rcu will eventually be implemented with an API similar to the Linux
> > > - * kernel call_rcu(), which will allow its use within RCU read-side C.S.
> > > - * Generate an error if used for now.
> > > - */
> > > -
> > > -#define call_rcu	__error_call_rcu_not_implemented_please_use_defer_rcu
> > > -
> > > -/*
> > >   * Thread registration for reclamation.
> > >   */
> > >  extern void rcu_defer_register_thread(void);
> > > diff --git a/urcu/wfqueue-static.h b/urcu/wfqueue-static.h
> > > index 30d6e96..a989f28 100644
> > > --- a/urcu/wfqueue-static.h
> > > +++ b/urcu/wfqueue-static.h
> > > @@ -47,11 +47,17 @@ extern "C" {
> > >  #define WFQ_ADAPT_ATTEMPTS		10	/* Retry if being set */
> > >  #define WFQ_WAIT			10	/* Wait 10 ms if being set */
> > >  
> > > +#ifdef _LGPL_SOURCE
> > > +static inline
> > > +#endif /* #ifdef _LGPL_SOURCE */
> > 
> > We should actually declare all these as "static inline" in every case, and
> > create a little .c file in the urcu root to contain the wrapper stub library
> > apps could link to. This would follow the same layout I've used for all the
> > other code.
> 
> Let me make sure I understand what you are looking for here...
> 
> You want a urcu-call-rcu-static.h, which will contain the current
> contents of urcu-call-rcu.c, but with all exported functions having
> their names prefixed by "_".  Then the new urcu-call-rcu.c has one
> wrapper function per exported function, correct?

Yes, and urcu-call.h defines wrapper macros for the _LGPL_SOURCE case. urcu.h,
urcu-static.h and urcu.c are very good examples of this.

Thanks Paul !

Mathieu

> 
> 							Thanx, Paul
> 
> > Thanks,
> > 
> > Mathieu
> > 
> > >  void _cds_wfq_node_init(struct cds_wfq_node *node)
> > >  {
> > >  	node->next = NULL;
> > >  }
> > >  
> > > +#ifdef _LGPL_SOURCE
> > > +static inline
> > > +#endif /* #ifdef _LGPL_SOURCE */
> > >  void _cds_wfq_init(struct cds_wfq_queue *q)
> > >  {
> > >  	int ret;
> > > @@ -64,6 +70,9 @@ void _cds_wfq_init(struct cds_wfq_queue *q)
> > >  	assert(!ret);
> > >  }
> > >  
> > > +#ifdef _LGPL_SOURCE
> > > +static inline
> > > +#endif /* #ifdef _LGPL_SOURCE */
> > >  void _cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node)
> > >  {
> > >  	struct cds_wfq_node **old_tail;
> > > @@ -90,6 +99,9 @@ void _cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node)
> > >   * thread to be scheduled. The queue appears empty until tail->next is set by
> > >   * enqueue.
> > >   */
> > > +#ifdef _LGPL_SOURCE
> > > +static inline
> > > +#endif /* #ifdef _LGPL_SOURCE */
> > >  struct cds_wfq_node *
> > >  ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> > >  {
> > > @@ -128,6 +140,9 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> > >  	return node;
> > >  }
> > >  
> > > +#ifdef _LGPL_SOURCE
> > > +static inline
> > > +#endif /* #ifdef _LGPL_SOURCE */
> > >  struct cds_wfq_node *
> > >  _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> > >  {
> > 
> > -- 
> > Mathieu Desnoyers
> > Operating System Efficiency R&D Consultant
> > EfficiOS Inc.
> > http://www.efficios.com
> > 
> > _______________________________________________
> > rp mailing list
> > rp at svcs.cs.pdx.edu
> > http://svcs.cs.pdx.edu/mailman/listinfo/rp

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com




More information about the lttng-dev mailing list