[ltt-dev] [PATCH RFC] v2 call_rcu() interface for userspace-rcu
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Fri Feb 11 20:46:44 EST 2011
* Paul E. McKenney (paulmck at linux.vnet.ibm.com) wrote:
> Adds call_rcu(), with RCU threads to invoke the callbacks. By default,
> there will be one such RCU thread per process, created the first time
> that call_rcu() is invoked. On systems supporting sched_getcpu(), it
> is possible to create one RCU thread per CPU by calling
> create_all_cpu_call_rcu_data().
>
> This version includes feedback from Mathieu Desnoyers and forward-ports
> to the new cds naming scheme. Note that v1 was posted on October 29th.
Hi Paul,
The interfaces have improved a lot, thanks! Here are a few comments:
>
> Signed-off-by: Paul E. McKenney <paulmck at linux.vnet.ibm.com>
>
> diff --git a/Makefile.am b/Makefile.am
> index 79a7152..64bb299 100644
> --- a/Makefile.am
> +++ b/Makefile.am
> @@ -29,8 +29,8 @@ COMPAT+=compat_futex.c
> endif
>
> lib_LTLIBRARIES = liburcu.la liburcu-qsbr.la liburcu-mb.la liburcu-signal.la \
> - liburcu-bp.la liburcu-defer.la libwfqueue.la libwfstack.la \
> - librculfqueue.la librculfstack.la
> + liburcu-bp.la liburcu-defer.la liburcu-call.la \
> + libwfqueue.la libwfstack.la librculfqueue.la librculfstack.la
>
> liburcu_la_SOURCES = urcu.c urcu-pointer.c $(COMPAT)
>
> @@ -44,6 +44,7 @@ liburcu_signal_la_CFLAGS = -DRCU_SIGNAL
>
> liburcu_bp_la_SOURCES = urcu-bp.c urcu-pointer.c $(COMPAT)
>
> +liburcu_call_la_SOURCES = urcu-call-rcu.c $(COMPAT)
> liburcu_defer_la_SOURCES = urcu-defer.c $(COMPAT)
>
> libwfqueue_la_SOURCES = wfqueue.c $(COMPAT)
> diff --git a/configure.ac b/configure.ac
> index 02780e7..88771d4 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -34,7 +34,7 @@ AC_TYPE_SIZE_T
> # Checks for library functions.
> AC_FUNC_MALLOC
> AC_FUNC_MMAP
> -AC_CHECK_FUNCS([bzero gettimeofday munmap strtoul])
> +AC_CHECK_FUNCS([bzero gettimeofday munmap sched_getcpu strtoul sysconf])
> # Find arch type
> case $host_cpu in
> diff --git a/tests/Makefile.am b/tests/Makefile.am
> index a43dd75..3c025a4 100644
> --- a/tests/Makefile.am
> +++ b/tests/Makefile.am
> @@ -1,5 +1,5 @@
> AM_LDFLAGS=-lpthread
> -AM_CFLAGS=-I$(top_srcdir) -I$(top_builddir)
> +AM_CFLAGS=-I$(top_srcdir) -I$(top_builddir) -g
This reminds me: I noticed this week that -Wall is missing from the top-level
build options. We should really add it.
>
> noinst_PROGRAMS = test_urcu test_urcu_dynamic_link test_urcu_timing \
> test_urcu_signal test_urcu_signal_dynamic_link test_urcu_signal_timing \
> @@ -28,20 +28,21 @@ if COMPAT_FUTEX
> COMPAT+=$(top_srcdir)/compat_futex.c
> endif
>
> -URCU=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> -URCU_QSBR=$(top_srcdir)/urcu-qsbr.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> +URCU=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> +URCU_QSBR=$(top_srcdir)/urcu-qsbr.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> # URCU_MB uses urcu.c but -DRCU_MB must be defined
> -URCU_MB=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> +URCU_MB=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> # URCU_SIGNAL uses urcu.c but -DRCU_SIGNAL must be defined
> -URCU_SIGNAL=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> -URCU_BP=$(top_srcdir)/urcu-bp.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> -URCU_DEFER=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-defer.c $(top_srcdir)/urcu-pointer.c $(COMPAT)
> +URCU_SIGNAL=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> +URCU_BP=$(top_srcdir)/urcu-bp.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
> +URCU_DEFER=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-defer.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT)
>
> URCU_LIB=$(top_builddir)/liburcu.la
> URCU_QSBR_LIB=$(top_builddir)/liburcu-qsbr.la
> URCU_MB_LIB=$(top_builddir)/liburcu-mb.la
> URCU_SIGNAL_LIB=$(top_builddir)/liburcu-signal.la
> URCU_BP_LIB=$(top_builddir)/liburcu-bp.la
> +URCU_CALL_LIB=$(top_builddir)/liburcu-call.la
> WFQUEUE_LIB=$(top_builddir)/libwfqueue.la
> WFSTACK_LIB=$(top_builddir)/libwfstack.la
> RCULFQUEUE_LIB=$(top_builddir)/librculfqueue.la
> @@ -95,23 +96,23 @@ test_perthreadlock_SOURCES = test_perthreadlock.c $(URCU_SIGNAL)
>
> rcutorture_urcu_SOURCES = urcutorture.c
> rcutorture_urcu_CFLAGS = -DTORTURE_URCU $(AM_CFLAGS)
> -rcutorture_urcu_LDADD = $(URCU)
> +rcutorture_urcu_LDADD = $(URCU) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
>
> rcutorture_urcu_mb_SOURCES = urcutorture.c
> rcutorture_urcu_mb_CFLAGS = -DTORTURE_URCU_MB $(AM_CFLAGS)
> -rcutorture_urcu_mb_LDADD = $(URCU_MB_LIB)
> +rcutorture_urcu_mb_LDADD = $(URCU_MB_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
>
> rcutorture_qsbr_SOURCES = urcutorture.c
> rcutorture_qsbr_CFLAGS = -DTORTURE_QSBR $(AM_CFLAGS)
> -rcutorture_qsbr_LDADD = $(URCU_QSBR_LIB)
> +rcutorture_qsbr_LDADD = $(URCU_QSBR_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
>
> rcutorture_urcu_signal_SOURCES = urcutorture.c
> rcutorture_urcu_signal_CFLAGS = -DTORTURE_URCU_SIGNAL $(AM_CFLAGS)
> -rcutorture_urcu_signal_LDADD = $(URCU_SIGNAL_LIB)
> +rcutorture_urcu_signal_LDADD = $(URCU_SIGNAL_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
>
> rcutorture_urcu_bp_SOURCES = urcutorture.c
> rcutorture_urcu_bp_CFLAGS = -DTORTURE_URCU_BP $(AM_CFLAGS)
> -rcutorture_urcu_bp_LDADD = $(URCU_BP_LIB)
> +rcutorture_urcu_bp_LDADD = $(URCU_BP_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB)
>
> test_mutex_SOURCES = test_mutex.c $(URCU)
>
> diff --git a/tests/rcutorture.h b/tests/rcutorture.h
> index 4dac2f2..207d9f8 100644
> --- a/tests/rcutorture.h
> +++ b/tests/rcutorture.h
> @@ -65,6 +65,8 @@
> * Test variables.
> */
>
> +#include "../urcu-call-rcu.h"
> +
> DEFINE_PER_THREAD(long long, n_reads_pt);
> DEFINE_PER_THREAD(long long, n_updates_pt);
>
> @@ -296,10 +298,30 @@ void *rcu_read_stress_test(void *arg)
> return (NULL);
> }
>
> +static pthread_mutex_t call_rcu_test_mutex = PTHREAD_MUTEX_INITIALIZER;
> +static pthread_cond_t call_rcu_test_cond = PTHREAD_COND_INITIALIZER;
> +
> +void rcu_update_stress_test_rcu(struct rcu_head *head)
> +{
> + if (pthread_mutex_lock(&call_rcu_test_mutex) != 0) {
> + perror("pthread_mutex_lock");
> + exit(-1);
> + }
> + if (pthread_cond_signal(&call_rcu_test_cond) != 0) {
> + perror("pthread_cond_signal");
> + exit(-1);
> + }
> + if (pthread_mutex_unlock(&call_rcu_test_mutex) != 0) {
> + perror("pthread_mutex_unlock");
> + exit(-1);
> + }
> +}
> +
> void *rcu_update_stress_test(void *arg)
> {
> int i;
> struct rcu_stress *p;
> + struct rcu_head rh;
>
> while (goflag == GOFLAG_INIT)
> poll(NULL, 0, 1);
> @@ -317,7 +339,24 @@ void *rcu_update_stress_test(void *arg)
> for (i = 0; i < RCU_STRESS_PIPE_LEN; i++)
> if (i != rcu_stress_idx)
> rcu_stress_array[i].pipe_count++;
> - synchronize_rcu();
> + if (n_updates & 0x1)
> + synchronize_rcu();
> + else {
> + if (pthread_mutex_lock(&call_rcu_test_mutex) != 0) {
> + perror("pthread_mutex_lock");
> + exit(-1);
> + }
> + call_rcu(&rh, rcu_update_stress_test_rcu);
> + if (pthread_cond_wait(&call_rcu_test_cond,
> + &call_rcu_test_mutex) != 0) {
> + perror("pthread_cond_wait");
> + exit(-1);
> + }
> + if (pthread_mutex_unlock(&call_rcu_test_mutex) != 0) {
> + perror("pthread_mutex_unlock");
> + exit(-1);
> + }
> + }
> n_updates++;
> }
> return NULL;
> diff --git a/urcu-call-rcu.c b/urcu-call-rcu.c
> new file mode 100644
> index 0000000..d59a2d7
> --- /dev/null
> +++ b/urcu-call-rcu.c
> @@ -0,0 +1,405 @@
> +/*
> + * urcu-call-rcu.c
> + *
> + * Userspace RCU library - batch memory reclamation with kernel API
> + *
> + * Copyright (c) 2010 Paul E. McKenney <paulmck at linux.vnet.ibm.com>
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include <stdio.h>
> +#include <pthread.h>
> +#include <signal.h>
> +#include <assert.h>
> +#include <stdlib.h>
> +#include <string.h>
> +#include <errno.h>
> +#include <poll.h>
> +#include <sys/time.h>
> +#include <syscall.h>
> +#include <unistd.h>
> +
> +#include "config.h"
> +#include "urcu/wfqueue.h"
> +#include "urcu-call-rcu.h"
> +#include "urcu-pointer.h"
> +
> +/* Data structure that identifies a call_rcu thread. */
> +
> +struct call_rcu_data {
> + struct cds_wfq_queue cbs;
> + unsigned long flags;
> + pthread_mutex_t mtx;
> + pthread_cond_t cond;
> + unsigned long qlen;
> + pthread_t tid;
> +} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
> +
> +/* Link a thread using call_rcu() to its call_rcu thread. */
> +
> +static __thread struct call_rcu_data *thread_call_rcu_data;
> +
> +/* Guard call_rcu thread creation. */
> +
> +static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER;
> +
> +/* If a given thread does not have its own call_rcu thread, this is default. */
> +
> +static struct call_rcu_data *default_call_rcu_data;
> +
> +extern void synchronize_rcu(void);
> +
> +/*
> + * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
> + * available, then we can have call_rcu threads assigned to individual
> + * CPUs rather than only to specific threads.
> + */
> +
> +#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)
> +
> +/*
> + * Pointer to array of pointers to per-CPU call_rcu_data structures
> + * and # CPUs.
> + */
> +
> +static struct call_rcu_data **per_cpu_call_rcu_data;
> +static long maxcpus;
> +
> +/* Allocate the array if it has not already been allocated. */
> +
> +static void alloc_cpu_call_rcu_data(void)
> +{
> + struct call_rcu_data **p;
> +
> + if (maxcpus != 0)
> + return;
> + maxcpus = sysconf(_SC_NPROCESSORS_CONF);
I noticed an odd sysconf behavior inside a chroot jail where /proc was not
mounted: it returns a single CPU for _SC_NPROCESSORS_CONF, which would be
catastrophic here because the array would be too small to match the index
returned by sched_getcpu. Any thoughts on how we should deal with this?
Maybe doing an extra value sanity check in our own wrapper over sched_getcpu
would be appropriate?
> + if (maxcpus == -1)
> + return;
> + p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
Shouldn't we yell or something if p is null ? :-)
> + if (p != NULL)
> + memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
> +}
> +
> +#else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
> +
> +static const struct call_rcu_data **per_cpu_call_rcu_data = NULL;
> +static const long maxcpus = -1;
> +
> +static void alloc_cpu_call_rcu_data(void)
> +{
> +}
> +
> +static int sched_getcpu(void)
> +{
> + return -1;
> +}
> +
> +#endif /* #else #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
> +
> +/* Acquire the specified pthread mutex. */
> +
> +static void call_rcu_lock(pthread_mutex_t *pmp)
> +{
> + if (pthread_mutex_lock(pmp) != 0) {
> + perror("pthread_mutex_lock");
> + exit(-1);
> + }
> +}
> +
> +/* Release the specified pthread mutex. */
> +
> +static void call_rcu_unlock(pthread_mutex_t *pmp)
> +{
> + if (pthread_mutex_unlock(pmp) != 0) {
> + perror("pthread_mutex_unlock");
> + exit(-1);
> + }
> +}
> +
> +/* This is the code run by each call_rcu thread. */
> +
> +static void *call_rcu_thread(void *arg)
> +{
> + unsigned long cbcount;
> + struct cds_wfq_node *cbs;
> + struct cds_wfq_node **cbs_tail;
> + struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
> + struct rcu_head *rhp;
> +
> + thread_call_rcu_data = crdp;
> + for (;;) {
> + if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> + while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> + poll(NULL, 0, 1);
> + _CMM_STORE_SHARED(crdp->cbs.head, NULL);
> + cbs_tail = (struct cds_wfq_node **)
> + uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> + synchronize_rcu();
> + cbcount = 0;
> + do {
> + while (cbs->next == NULL &&
> + &cbs->next != cbs_tail)
> + poll(NULL, 0, 1);
> + if (cbs == &crdp->cbs.dummy) {
> + cbs = cbs->next;
> + continue;
> + }
> + rhp = (struct rcu_head *)cbs;
> + cbs = cbs->next;
> + rhp->func(rhp);
> + cbcount++;
> + } while (cbs != NULL);
> + uatomic_sub(&crdp->qlen, cbcount);
> + }
> + if (crdp->flags & URCU_CALL_RCU_RT)
> + poll(NULL, 0, 10);
> + else {
> + call_rcu_lock(&crdp->mtx);
> + _CMM_STORE_SHARED(crdp->flags,
> + crdp->flags & ~URCU_CALL_RCU_RUNNING);
> + if (&cbs->next != crdp->cbs.tail &&
> + pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) {
> + perror("pthread_cond_wait");
> + exit(-1);
> + }
> + _CMM_STORE_SHARED(crdp->flags,
> + crdp->flags | URCU_CALL_RCU_RUNNING);
> + poll(NULL, 0, 10);
> + call_rcu_unlock(&crdp->mtx);
> + }
> + }
> + return NULL; /* NOTREACHED */
I'm not sure how this thread is supposed to clean up properly at process exit ?
> +}
> +
> +/*
> + * Create both a call_rcu thread and the corresponding call_rcu_data
> + * structure, linking the structure in as specified.
> + */
> +
> +void call_rcu_data_init(struct call_rcu_data **crdpp, unsigned long flags)
> +{
> + struct call_rcu_data *crdp;
> +
> + crdp = malloc(sizeof(*crdp));
> + if (crdp == NULL) {
> + fprintf(stderr, "Out of memory.\n");
> + exit(-1);
> + }
> + memset(crdp, '\0', sizeof(*crdp));
> + cds_wfq_init(&crdp->cbs);
> + crdp->qlen = 0;
> + if (pthread_mutex_init(&crdp->mtx, NULL) != 0) {
> + perror("pthread_mutex_init");
> + exit(-1);
> + }
> + if (pthread_cond_init(&crdp->cond, NULL) != 0) {
> + perror("pthread_cond_init");
> + exit(-1);
> + }
> + crdp->flags = flags | URCU_CALL_RCU_RUNNING;
> + cmm_smp_mb(); /* Structure initialized before pointer is planted. */
> + *crdpp = crdp;
> + if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) {
> + perror("pthread_create");
> + exit(-1);
> + }
> +}
> +
> +/*
> + * Return a pointer to the call_rcu_data structure for the specified
> + * CPU, returning NULL if there is none. We cannot automnatically
automnatically -> automatically
> + * created it because the platform we are running on might not define
> + * sched_getcpu().
> + */
> +
> +struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
> +{
> + if (per_cpu_call_rcu_data == NULL)
> + return NULL;
> + if (cpu < 0 || maxcpus <= cpu)
> + return NULL;
Ah! I think this is where it would make sense to validate that "cpu" is not out
of bounds. If it is, then we could fall back on the default call_rcu thread, or
yell about lack of /proc.
> + return per_cpu_call_rcu_data[cpu];
> +}
> +
> +/*
> + * Return the tid corresponding to the call_rcu thread whose
> + * call_rcu_data structure is specified.
> + */
> +
> +pthread_t get_call_rcu_thread(struct call_rcu_data *crdp)
> +{
> + return crdp->tid;
> +}
> +
> +/*
> + * Create a call_rcu_data structure (with thread) and return a pointer.
> + */
> +
> +struct call_rcu_data *create_call_rcu_data(unsigned long flags)
> +{
> + struct call_rcu_data *crdp;
> +
> + call_rcu_data_init(&crdp, flags);
> + return crdp;
> +}
> +
> +/*
> + * Set the specified CPU to use the specified call_rcu_data structure.
> + */
> +
> +int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
> +{
> + call_rcu_lock(&call_rcu_mutex);
> + alloc_cpu_call_rcu_data();
> + call_rcu_unlock(&call_rcu_mutex);
> + if (cpu < 0 || maxcpus <= cpu || per_cpu_call_rcu_data == NULL)
> + return 0;
> + per_cpu_call_rcu_data[cpu] = crdp;
Another point where we should validate that cpu is not out of bounds.
> + return 1;
> +}
> +
> +/*
> + * Return a pointer to the default call_rcu_data structure, creating
> + * one if need be. Because we never free call_rcu_data structures,
> + * we don't need to be in an RCU read-side critical section.
> + */
> +
> +struct call_rcu_data *get_default_call_rcu_data(void)
> +{
> + if (default_call_rcu_data != NULL)
> + return rcu_dereference(default_call_rcu_data);
> + call_rcu_lock(&call_rcu_mutex);
> + if (default_call_rcu_data != NULL) {
> + call_rcu_unlock(&call_rcu_mutex);
> + return default_call_rcu_data;
> + }
> + call_rcu_data_init(&default_call_rcu_data, 0);
> + call_rcu_unlock(&call_rcu_mutex);
> + return default_call_rcu_data;
> +}
> +
> +/*
> + * Return the call_rcu_data structure that applies to the currently
> + * running thread. Any call_rcu_data structure assigned specifically
> + * to this thread has first priority, followed by any call_rcu_data
> + * structure assigned to the CPU on which the thread is running,
> + * followed by the default call_rcu_data structure. If there is not
> + * yet a default call_rcu_data structure, one will be created.
> + */
> +struct call_rcu_data *get_call_rcu_data(void)
> +{
> + int curcpu;
> +
> + if (thread_call_rcu_data != NULL)
> + return thread_call_rcu_data;
> + curcpu = sched_getcpu();
> + if (per_cpu_call_rcu_data != NULL &&
> + per_cpu_call_rcu_data[curcpu] != NULL)
Another array bounds validation here ?
> + return per_cpu_call_rcu_data[curcpu];
> + return get_default_call_rcu_data();
> +}
> +
> +/*
> + * Return a pointer to this task's call_rcu_data if there is one.
> + */
> +
> +struct call_rcu_data *get_thread_call_rcu_data(void)
> +{
> + return thread_call_rcu_data;
> +}
> +
> +/*
> + * Set this task's call_rcu_data structure as specified, regardless
> + * of whether or not this task already had one. (This allows switching
> + * to and from real-time call_rcu threads, for example.)
> + */
> +
> +void set_thread_call_rcu_data(struct call_rcu_data *crdp)
> +{
> + thread_call_rcu_data = crdp;
> +}
> +
> +/*
> + * Create a separate call_rcu thread for each CPU. This does not
> + * replace a pre-existing call_rcu thread -- use the set_cpu_call_rcu_data()
> + * function if you want that behavior.
> + */
> +
> +int create_all_cpu_call_rcu_data(unsigned long flags)
> +{
> + int i;
> + struct call_rcu_data *crdp;
> +
> + call_rcu_lock(&call_rcu_mutex);
> + alloc_cpu_call_rcu_data();
> + call_rcu_unlock(&call_rcu_mutex);
> + if (maxcpus == -1 || per_cpu_call_rcu_data == NULL)
> + return 0;
> + for (i = 0; i < maxcpus; i++) {
> + call_rcu_lock(&call_rcu_mutex);
> + if (get_cpu_call_rcu_data(i)) {
> + call_rcu_unlock(&call_rcu_mutex);
> + continue;
> + }
> + crdp = create_call_rcu_data(flags);
Hrm, just so we're clear: on an 8-way system with a kernel supporting up to 32
possible cpus, this would create 32 threads per application. Is this really what
we want?
For the non-RT case, we might want to investigate using sysconf to get the number
of "online" cpus, and fix up dynamically within call_rcu() if we discover that
1) per-cpu rcu_data should be available
2) some data within our per-cpu rcu_data tells us that it has not been allocated
So we could allocate the per-cpu rcu_data array for the number of possible cpus
on the system, but only allocate more expensive resources (e.g. create threads)
for the number of online cpus, and dynamically spawn threads if needed.
We could leave the code as is for the RT case (allocate threads for each
possible cpu). Thoughts?
> + if (crdp == NULL) {
> + call_rcu_unlock(&call_rcu_mutex);
> + return 0;
> + }
> + if (!set_cpu_call_rcu_data(i, crdp)) {
> + call_rcu_unlock(&call_rcu_mutex);
> + return 0; /* should not happen, but... */
> + }
> + call_rcu_unlock(&call_rcu_mutex);
> + }
> + return 1;
> +}
> +
> +/*
> + * Schedule a function to be invoked after a following grace period.
> + * This is the only function that must be called -- the others are
> + * only present to allow applications to tune their use of RCU for
> + * maximum performance.
> + *
> + * Note that unless a call_rcu thread has not already been created,
> + * the first invocation of call_rcu() will create one. So, if you
> + * need the first invocation of call_rcu() to be fast, make sure
> + * to create a call_rcu thread first. One way to accomplish this is
> + * "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data().
> + */
> +
> +void call_rcu(struct rcu_head *head,
> + void (*func)(struct rcu_head *head))
> +{
> + struct call_rcu_data *crdp;
> +
> + cds_wfq_node_init(&head->next);
> + head->func = func;
> + crdp = get_call_rcu_data();
> + cds_wfq_enqueue(&crdp->cbs, &head->next);
> + uatomic_inc(&crdp->qlen);
> + if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) {
> + call_rcu_lock(&crdp->mtx);
> + if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) {
> + if (pthread_cond_signal(&crdp->cond) != 0) {
> + perror("pthread_cond_signal");
> + exit(-1);
> + }
> + }
> + call_rcu_unlock(&crdp->mtx);
> + }
> +}
> diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h
> new file mode 100644
> index 0000000..b390b7e
> --- /dev/null
> +++ b/urcu-call-rcu.h
> @@ -0,0 +1,80 @@
> +#ifndef _URCU_CALL_RCU_H
> +#define _URCU_CALL_RCU_H
> +
> +/*
> + * urcu-defer.h
urcu-call-rcu.h
> + *
> + * Userspace RCU header - deferred execution
> + *
> + * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
> + *
> + * LGPL-compatible code should include this header with :
> + *
> + * #define _LGPL_SOURCE
> + * #include <urcu-defer.h>
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include <stdlib.h>
> +#include <pthread.h>
> +
> +#include "urcu/wfqueue.h"
Should this header be between < > instead?
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/* Note that struct call_rcu_data is opaque to callers. */
> +
> +struct call_rcu_data;
> +
> +/* Flag values. */
> +
> +#define URCU_CALL_RCU_RT 0x1
> +#define URCU_CALL_RCU_RUNNING 0x2
> +
> +/*
> + * The rcu_head data structure is placed in the structure to be freed
> + * via call_rcu().
> + */
> +
> +struct rcu_head {
> + struct cds_wfq_node next;
> + void (*func)(struct rcu_head *head);
> +};
> +
> +/*
> + * Exported functions
> + */
> +void call_rcu_data_init(struct call_rcu_data **crdpp, unsigned long flags);
> +struct call_rcu_data *get_cpu_call_rcu_data(int cpu);
> +pthread_t get_call_rcu_thread(struct call_rcu_data *crdp);
> +struct call_rcu_data *create_call_rcu_data(unsigned long flags);
> +int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp);
> +struct call_rcu_data *get_default_call_rcu_data(void);
> +struct call_rcu_data *get_call_rcu_data(void);
> +struct call_rcu_data *get_thread_call_rcu_data(void);
> +void set_thread_call_rcu_data(struct call_rcu_data *crdp);
> +int create_all_cpu_call_rcu_data(unsigned long flags);
> +void call_rcu(struct rcu_head *head,
> + void (*func)(struct rcu_head *head));
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _URCU_CALL_RCU_H */
> diff --git a/urcu-defer.h b/urcu-defer.h
> index e161616..a64c75c 100644
> --- a/urcu-defer.h
> +++ b/urcu-defer.h
> @@ -53,14 +53,6 @@ extern "C" {
> extern void defer_rcu(void (*fct)(void *p), void *p);
>
> /*
> - * call_rcu will eventually be implemented with an API similar to the Linux
> - * kernel call_rcu(), which will allow its use within RCU read-side C.S.
> - * Generate an error if used for now.
> - */
> -
> -#define call_rcu __error_call_rcu_not_implemented_please_use_defer_rcu
> -
> -/*
> * Thread registration for reclamation.
> */
> extern void rcu_defer_register_thread(void);
> diff --git a/urcu/wfqueue-static.h b/urcu/wfqueue-static.h
> index 30d6e96..a989f28 100644
> --- a/urcu/wfqueue-static.h
> +++ b/urcu/wfqueue-static.h
> @@ -47,11 +47,17 @@ extern "C" {
> #define WFQ_ADAPT_ATTEMPTS 10 /* Retry if being set */
> #define WFQ_WAIT 10 /* Wait 10 ms if being set */
>
> +#ifdef _LGPL_SOURCE
> +static inline
> +#endif /* #ifdef _LGPL_SOURCE */
We should actually declare all of these as "static inline" in every case, and
create a small .c file in the urcu root to provide the wrapper stub library
that apps could link to. This would follow the same layout I've used for all
the other code.
Thanks,
Mathieu
> void _cds_wfq_node_init(struct cds_wfq_node *node)
> {
> node->next = NULL;
> }
>
> +#ifdef _LGPL_SOURCE
> +static inline
> +#endif /* #ifdef _LGPL_SOURCE */
> void _cds_wfq_init(struct cds_wfq_queue *q)
> {
> int ret;
> @@ -64,6 +70,9 @@ void _cds_wfq_init(struct cds_wfq_queue *q)
> assert(!ret);
> }
>
> +#ifdef _LGPL_SOURCE
> +static inline
> +#endif /* #ifdef _LGPL_SOURCE */
> void _cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node)
> {
> struct cds_wfq_node **old_tail;
> @@ -90,6 +99,9 @@ void _cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node)
> * thread to be scheduled. The queue appears empty until tail->next is set by
> * enqueue.
> */
> +#ifdef _LGPL_SOURCE
> +static inline
> +#endif /* #ifdef _LGPL_SOURCE */
> struct cds_wfq_node *
> ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> {
> @@ -128,6 +140,9 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> return node;
> }
>
> +#ifdef _LGPL_SOURCE
> +static inline
> +#endif /* #ifdef _LGPL_SOURCE */
> struct cds_wfq_node *
> _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> {
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list