[ltt-dev] [PATCH RFC] call_rcu() interface for userspace-rcu

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Fri Oct 29 15:58:34 EDT 2010


* Paul E. McKenney (paulmck at linux.vnet.ibm.com) wrote:

> #include <stdio.h>
> #include <pthread.h>
> #include <signal.h>
> #include <assert.h>
> #include <stdlib.h>
> #include <string.h>
> #include <errno.h>
> #include <poll.h>
> #include <sys/time.h>
> #include <syscall.h>
> #include <unistd.h>
> 
> #include "config.h"

Please add #define _LGPL_SOURCE to use the call-less version of wfqueue.
This is LGPL-compatible code after all, no ? :-)

> #include "urcu/wfqueue.h"
> #include "urcu-call-rcu.h"
> #include "urcu-pointer.h"


[...]

And the rest, after a good meal...

> +/* Data structure that identifies a call_rcu thread. */
> +
> +struct call_rcu_data {
> +	struct wfq_queue cbs;
> +	unsigned long qlen;
> +	pthread_t tid;

I'd recommend moving "tid" down to the last entry in the structure, because it
seems to be the only element not used on the call_rcu() fast path. We might get
lucky and fit all accesses in a single cache line.

Arguably, moving qlen too, as the next to last field, might be good, as it is
not "technically" required. (although I agree that we might want to leave it
there for debugging purposes)

We should also make sure struct call_rcu_data is cache-line aligned. The reason
is twofold: minimize the number of cache lines kept hot, and also making sure
that the per-cpu structures allocated don't end up doing false sharing.

> +	pthread_mutex_t mtx;
> +	pthread_cond_t cond;
> +	unsigned long flags;

Some reorganising might be good for the fields above. Here is the order I
recommend:

struct call_rcu_data {
  struct wfq_queue cbs;
  unsigned long flags;
  pthread_mutex_t mtx;
  pthread_cont_t cond;
  unsigned long qlen;
  pthread_t tid;
} __attribute__((aligned(CACHE_LINE_SIZE)));

This would match the order in which call_rcu accesses them.

By the way, for update-heavy workloads, I foresee that both
rcu_{assign,xchg}_pointer and call_rcu() will be the performance bottleneck.
This is partly why I am reluctant to keep the "qlen" field in non-debug configs.

> +};
> +
> +/* Link a thread using call_rcu() to its call_rcu thread. */
> +
> +static __thread struct call_rcu_data *my_call_rcu_data;

I would replace the identifier "my_" with "thread_" here. For some reason, I
tend to reserve the "my_" namespace for my old CS 101 projects. ;-)

> +
> +/* Guard call_rcu thread creation. */
> +
> +static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER;
> +
> +/* If a given thread does not have its own call_rcu thread, this is default. */
> +
> +static struct call_rcu_data *default_call_rcu_data;
> +
> +extern void synchronize_rcu(void);
> +
> +/*
> + * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
> + * available, then we can have call_rcu threads assigned to individual
> + * CPUs rather than only to specific threads.
> + */
> +
> +#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)
> +
> +/*
> + * Pointer to array of pointers to per-CPU call_rcu_data structures
> + * and # CPUs.
> + */
> +
> +static struct call_rcu_data **per_cpu_call_rcu_data;
> +static long maxcpus;
> +
> +/* Allocate the array if it has not already been allocated. */
> +
> +static void alloc_cpu_call_rcu_data(void)
> +{
> +	struct call_rcu_data **p;
> +
> +	if (maxcpus != 0)
> +		return;
> +	maxcpus = sysconf(_SC_NPROCESSORS_CONF);
> +	if (maxcpus == -1)
> +		return;
> +	p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));

Why are we allocating an array of pointers rather than an array of struct
per_cpu_call_rcu_data ?

> +	if (p != NULL)
> +		memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));

We probably need to test for errors and return an error here if allocation
fails. The callers should be updated accordingly.

> +}
> +
> +#else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
> +
> +static const struct call_rcu_data **per_cpu_call_rcu_data = NULL;
> +static const long maxcpus = -1;
> +
> +static void alloc_cpu_call_rcu_data(void)
> +{
> +}
> +
> +static int sched_getcpu(void)
> +{
> +	return -1;
> +}
> +
> +#endif /* #else #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
> +
> +/* Acquire the specified pthread mutex. */
> +
> +static void call_rcu_lock(pthread_mutex_t *pmp)
> +{
> +	if (pthread_mutex_lock(pmp) != 0) {
> +		perror("pthread_mutex_lock");
> +		exit(-1);
> +	}
> +}
> +
> +/* Release the specified pthread mutex. */
> +
> +static void call_rcu_unlock(pthread_mutex_t *pmp)
> +{
> +	if (pthread_mutex_unlock(pmp) != 0) {
> +		perror("pthread_mutex_unlock");
> +		exit(-1);
> +	}
> +}
> +
> +/* This is the code run by each call_rcu thread. */
> +
> +static void *call_rcu_thread(void *arg)
> +{
> +	unsigned long cbcount;
> +	struct wfq_node *cbs;
> +	struct wfq_node **cbs_tail;
> +	struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
> +	struct rcu_head *rhp;
> +
> +	my_call_rcu_data = crdp;
> +	for (;;) {
> +		if (&crdp->cbs.head != crdp->cbs.tail) {

We should use LOAD_SHARED() and STORE_SHARED() for cbs.head and cbs.tail
accesses here and below.

> +			while ((cbs = crdp->cbs.head) == NULL)

LOAD_SHARED.

> +				poll(NULL, 0, 1);
> +			crdp->cbs.head = NULL;

STORE_SHARED

> +			crdp->cbs.head = NULL;

Cut n paste duplicate.

> +			cbs_tail = (struct wfq_node **)
> +				uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);

I'm not sure why you use an xchg() here. ___wfq_dequeue_blocking() does it with
a simple read followed by an update. This code should probably be closer to the
wfqueue implementation, except if you have a good reason for it of course.

> +			synchronize_rcu();
> +			cbcount = 0;
> +			do {
> +				while (cbs->next == NULL &&
> +				       &cbs->next != cbs_tail)
> +				       	poll(NULL, 0, 1);

For this poll, we could do something like what urcu/wfqueue-static.h is doing:
an adaptative busy-loop (with cpu_relax()), and sleeping only after a couple of
iterations.

> +				if (cbs == &crdp->cbs.dummy) {
> +					cbs = cbs->next;
> +					continue;
> +				}
> +				rhp = (struct rcu_head *)cbs;
> +				cbs = cbs->next;
> +				rhp->func(rhp);
> +				cbcount++;
> +			} while (cbs != NULL);
> +			uatomic_sub(&crdp->qlen, cbcount);
> +		}
> +		if (crdp->flags & URCU_CALL_RCU_RT)
> +			poll(NULL, 0, 10);
> +		else {
> +			call_rcu_lock(&crdp->mtx);
> +			crdp->flags &= ~URCU_CALL_RCU_RUNNING;

STORE_SHARED is needed with flags too.

> +			if (&cbs->next != cbs_tail &&
> +			    pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) {
> +				perror("pthread_cond_wait");
> +				exit(-1);
> +			} else
> +				poll(NULL, 0, 10);
> +			crdp->flags |= URCU_CALL_RCU_RUNNING;

Same.

> +			call_rcu_unlock(&crdp->mtx);
> +		}
> +	}
> +	return NULL;  /* NOTREACHED */
> +}
> +
> +/*
> + * Create both a call_rcu thread and the corresponding call_rcu_data
> + * structure, linking the structure in as specified.
> + */
> +
> +void call_rcu_data_init(struct call_rcu_data **crdpp, unsigned long flags)
> +{
> +	struct call_rcu_data *crdp;
> +
> +	crdp = malloc(sizeof(*crdp));
> +	if (crdp == NULL) {
> +		fprintf(stderr, "Out of memory.\n");
> +		exit(-1);
> +	}
> +	memset(crdp, '\0', sizeof(*crdp));
> +	wfq_init(&crdp->cbs);
> +	crdp->qlen = 0;
> +	if (pthread_mutex_init(&crdp->mtx, NULL) != 0) {
> +		perror("pthread_mutex_init");
> +		exit(-1);
> +	}
> +	if (pthread_cond_init(&crdp->cond, NULL) != 0) {
> +		perror("pthread_cond_init");
> +		exit(-1);
> +	}
> +	crdp->flags = flags | URCU_CALL_RCU_RUNNING;
> +	mb();  /* Structure initialized before pointer is planted. */
> +	*crdpp = crdp;
> +	if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) {
> +		perror("pthread_create");
> +		exit(-1);
> +	}
> +}
> +
> +/*
> + * Return a pointer to the call_rcu_data structure for the specified
> + * CPU, returning NULL if there is none.  We cannot automnatically
> + * created it because the platform we are running on might not define
> + * sched_getcpu().
> + */
> +
> +struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
> +{
> +	if (per_cpu_call_rcu_data == NULL)
> +		return NULL;
> +	if (cpu < 0 || maxcpus <= cpu)
> +		return NULL;
> +	return per_cpu_call_rcu_data[cpu];
> +}
> +
> +/*
> + * Return the tid corresponding to the call_rcu thread whose
> + * call_rcu_data structure is specified.
> + */
> +
> +pthread_t get_call_rcu_thread(struct call_rcu_data *crdp)
> +{
> +	return crdp->tid;
> +}
> +
> +/*
> + * Create a call_rcu_data structure (with thread) and return a pointer.
> + */
> +
> +struct call_rcu_data *create_call_rcu_data(unsigned long flags)
> +{
> +	struct call_rcu_data *crdp;
> +
> +	call_rcu_data_init(&crdp, flags);
> +	return crdp;
> +}
> +
> +/*
> + * Set the specified CPU to use the specified call_rcu_data structure.
> + */
> +
> +int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
> +{
> +	call_rcu_lock(&call_rcu_mutex);
> +	alloc_cpu_call_rcu_data();
> +	call_rcu_unlock(&call_rcu_mutex);
> +	if (cpu < 0 || maxcpus <= cpu)
> +		return 0;
> +	per_cpu_call_rcu_data[cpu] = crdp;
> +	return 1;
> +}
> +
> +/*
> + * Return a pointer to the default call_rcu_data structure, creating
> + * one if need be.
> + */
> +
> +struct call_rcu_data *get_default_call_rcu_data(void)
> +{
> +	if (default_call_rcu_data != NULL)
> +		return rcu_dereference(default_call_rcu_data);

Yep, this makes sense given that default_call_rcu_data is never freed. It should
probably be documented (we don't necessarily hold an rcu read lock here).

> +	call_rcu_lock(&call_rcu_mutex);
> +	if (default_call_rcu_data != NULL) {
> +		call_rcu_unlock(&call_rcu_mutex);
> +		return default_call_rcu_data;
> +	}
> +	call_rcu_data_init(&default_call_rcu_data, 0);
> +	call_rcu_unlock(&call_rcu_mutex);
> +	return default_call_rcu_data;
> +}
> +
> +/*
> + * Return the call_rcu_data structure that applies to the currently
> + * running thread.  Any call_rcu_data structure assigned specifically
> + * to this thread has first priority, followed by any call_rcu_data
> + * structure assigned to the CPU on which the thread is running,
> + * followed by the default call_rcu_data structure.  If there is not
> + * yet a default call_rcu_data structure, one will be created.
> + */

We should document (somewhere) that if call_rcu() is called without previously
creating the handler threads, we are taking mutexes and creating the threads on
the caller stack.

> +struct call_rcu_data *get_call_rcu_data(void)
> +{
> +	int curcpu;
> +
> +	if (my_call_rcu_data != NULL)
> +		return my_call_rcu_data;
> +	curcpu = sched_getcpu();
> +	if (per_cpu_call_rcu_data != NULL &&
> +	    per_cpu_call_rcu_data[curcpu] != NULL)
> +	    	return per_cpu_call_rcu_data[curcpu];
> +	return get_default_call_rcu_data();
> +}
> +
> +/*
> + * Return a pointer to this task's call_rcu_data if there is one.
> + */
> +
> +struct call_rcu_data *get_my_call_rcu_data(void)
> +{
> +	return my_call_rcu_data;
> +}
> +
> +/*
> + * Set this task's call_rcu_data structure as specified, regardless
> + * of whether or not this task already had one.  (This allows switching
> + * to and from real-time call_rcu threads, for example.)
> + */
> +
> +void set_my_call_rcu_data(struct call_rcu_data *crdp)
> +{
> +	my_call_rcu_data = crdp;
> +}
> +
> +/*
> + * Create a separate call_rcu thread for each CPU.  This does not
> + * replace a pre-existing call_rcu thread -- use the set_cpu_call_rcu_data()
> + * function if you want that behavior.
> + */
> +
> +int create_all_cpu_call_rcu_data(unsigned long flags)
> +{
> +	int i;
> +	struct call_rcu_data *crdp;
> +
> +	call_rcu_lock(&call_rcu_mutex);
> +	alloc_cpu_call_rcu_data();
> +	call_rcu_unlock(&call_rcu_mutex);
> +	if (maxcpus == -1)
> +		return 0;
> +	for (i = 0; i < maxcpus; i++) {
> +		call_rcu_lock(&call_rcu_mutex);
> +		if (get_cpu_call_rcu_data(i)) {
> +			call_rcu_unlock(&call_rcu_mutex);
> +			continue;
> +		}
> +		crdp = create_call_rcu_data(flags);
> +		if (crdp == NULL) {
> +			call_rcu_unlock(&call_rcu_mutex);
> +			return 0;
> +		}
> +		if (!set_cpu_call_rcu_data(i, crdp)) {
> +			call_rcu_unlock(&call_rcu_mutex);
> +			return 0; /* should not happen, but... */
> +		}
> +		call_rcu_unlock(&call_rcu_mutex);
> +	}
> +	return 1;
> +}
> +
> +/*
> + * Schedule a function to be invoked after a following grace period.
> + * This is the only function that must be called -- the others are
> + * only present to allow applications to tune their use of RCU for
> + * maximum performance.
> + */
> +
> +void call_rcu(struct rcu_head *head,
> +	      void (*func)(struct rcu_head *head))
> +{
> +	struct call_rcu_data *crdp;
> +
> +	wfq_node_init(&head->next);
> +	head->func = func;
> +	crdp = get_call_rcu_data();
> +	wfq_enqueue(&crdp->cbs, &head->next);
> +	uatomic_inc(&crdp->qlen);
> +	if (!(crdp->flags & URCU_CALL_RCU_RT)) {
> +		call_rcu_lock(&crdp->mtx);
> +		if (!(crdp->flags & URCU_CALL_RCU_RUNNING)) {

LOAD_SHARED here.

I would recommend to remove the "URCU_CALL_RCU_RT" awareness outside of the
call_rcu infrastructure and make this general to the whole urcu library.
Currently, compat_futex_noasync provided by urcu/urcu-futex.h does use futexes
directly, and falls back on pthread_mutex if the futex syscall is unavailable.

The kind of tweak provided by "RT" mode should also apply to, e.g. urcu read
unlock wakeup path.

One advantage of going with the futex scheme I implemented is that the wakeup
path becomes a simple test (without mutex) on

  crdp->flags & URCU_CALL_RCU_RUNNING

and the futex_noasync() call will re-check that the value has not changed. So
basically, the fast path that does not need to send the wakeup does not get the
mutex performance hit: it's a simple read.

You should do something close to urcu-static.h wake_up_gp() and wait_gp().

> +			if (pthread_cond_signal(&crdp->cond) != 0) {
> +				perror("pthread_cond_signal");
> +				exit(-1);
> +			}
> +		}
> +		call_rcu_unlock(&crdp->mtx);
> +	}
> +}
> diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h
> new file mode 100644
> index 0000000..c6886e2
> --- /dev/null
> +++ b/urcu-call-rcu.h
> @@ -0,0 +1,80 @@
> +#ifndef _URCU_CALL_RCU_H
> +#define _URCU_CALL_RCU_H
> +
> +/*
> + * urcu-defer.h
> + *
> + * Userspace RCU header - deferred execution
> + *
> + * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
> + *
> + * LGPL-compatible code should include this header with :
> + *
> + * #define _LGPL_SOURCE
> + * #include <urcu-defer.h>
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include <stdlib.h>
> +#include <pthread.h>
> +
> +#include "urcu/wfqueue.h"
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/* Note that struct call_rcu_data is opaque to callers. */
> +
> +struct call_rcu_data;
> +
> +/* Flag values. */
> +
> +#define URCU_CALL_RCU_RT	0x1
> +#define URCU_CALL_RCU_RUNNING	0x2
> +
> +/*
> + * The rcu_head data structure is placed in the structure to be freed
> + * via call_rcu().
> + */
> +
> +struct rcu_head {
> +	struct wfq_node next;
> +	void (*func)(struct rcu_head *head);
> +};
> +
> +/*
> + * Exported functions
> + */
> +void call_rcu_data_init(struct call_rcu_data **crdpp, unsigned long flags);
> +struct call_rcu_data *get_cpu_call_rcu_data(int cpu);
> +pthread_t get_call_rcu_thread(struct call_rcu_data *crdp);
> +struct call_rcu_data *create_call_rcu_data(unsigned long flags);
> +int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp);
> +struct call_rcu_data *get_default_call_rcu_data(void);
> +struct call_rcu_data *get_call_rcu_data(void);
> +struct call_rcu_data *get_my_call_rcu_data(void);
> +void set_my_call_rcu_data(struct call_rcu_data *crdp);
> +int create_all_cpu_call_rcu_data(unsigned long flags);
> +void call_rcu(struct rcu_head *head,
> +	      void (*func)(struct rcu_head *head));
> +
> +#ifdef __cplusplus 
> +}
> +#endif
> +
> +#endif /* _URCU_CALL_RCU_H */
> diff --git a/urcu-defer.h b/urcu-defer.h
> index e161616..a64c75c 100644
> --- a/urcu-defer.h
> +++ b/urcu-defer.h
> @@ -53,14 +53,6 @@ extern "C" {
>  extern void defer_rcu(void (*fct)(void *p), void *p);
>  
>  /*
> - * call_rcu will eventually be implemented with an API similar to the Linux
> - * kernel call_rcu(), which will allow its use within RCU read-side C.S.
> - * Generate an error if used for now.
> - */
> -
> -#define call_rcu	__error_call_rcu_not_implemented_please_use_defer_rcu
> -
> -/*
>   * Thread registration for reclamation.
>   */
>  extern void rcu_defer_register_thread(void);
> diff --git a/urcu/wfqueue-static.h b/urcu/wfqueue-static.h
> index 0f7e68f..eba8cc2 100644
> --- a/urcu/wfqueue-static.h
> +++ b/urcu/wfqueue-static.h
> @@ -47,11 +47,17 @@ extern "C" {
>  #define WFQ_ADAPT_ATTEMPTS		10	/* Retry if being set */
>  #define WFQ_WAIT			10	/* Wait 10 ms if being set */
>  
> +#ifdef _LGPL_SOURCE
> +static inline
> +#endif /* #ifdef _LGPL_SOURCE */

The fixups below should no be needed. Just define _LGPL_SOURCE before including
wfqueue.h and everything should work. If not, I need to fix it at the wfqueue
level.

Thanks,

Mathieu

>  void _wfq_node_init(struct wfq_node *node)
>  {
>  	node->next = NULL;
>  }
>  
> +#ifdef _LGPL_SOURCE
> +static inline
> +#endif /* #ifdef _LGPL_SOURCE */
>  void _wfq_init(struct wfq_queue *q)
>  {
>  	int ret;
> @@ -64,6 +70,9 @@ void _wfq_init(struct wfq_queue *q)
>  	assert(!ret);
>  }
>  
> +#ifdef _LGPL_SOURCE
> +static inline
> +#endif /* #ifdef _LGPL_SOURCE */
>  void _wfq_enqueue(struct wfq_queue *q, struct wfq_node *node)
>  {
>  	struct wfq_node **old_tail;
> @@ -90,6 +99,9 @@ void _wfq_enqueue(struct wfq_queue *q, struct wfq_node *node)
>   * thread to be scheduled. The queue appears empty until tail->next is set by
>   * enqueue.
>   */
> +#ifdef _LGPL_SOURCE
> +static inline
> +#endif /* #ifdef _LGPL_SOURCE */
>  struct wfq_node *
>  ___wfq_dequeue_blocking(struct wfq_queue *q)
>  {
> @@ -128,6 +140,9 @@ ___wfq_dequeue_blocking(struct wfq_queue *q)
>  	return node;
>  }
>  
> +#ifdef _LGPL_SOURCE
> +static inline
> +#endif /* #ifdef _LGPL_SOURCE */
>  struct wfq_node *
>  _wfq_dequeue_blocking(struct wfq_queue *q)
>  {

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com




More information about the lttng-dev mailing list