[ltt-dev] [RFC git tree] Userspace RCU (urcu) for Linux (repost)
Paul E. McKenney
paulmck at linux.vnet.ibm.com
Sun Feb 8 19:24:26 EST 2009
On Sun, Feb 08, 2009 at 02:36:06PM -0800, Paul E. McKenney wrote:
> On Sun, Feb 08, 2009 at 04:46:10PM -0500, Mathieu Desnoyers wrote:
[ . . . ]
> > I ran your modified version within my benchmarks :
> >
> > with return value : 14.164 cycles per read
> > without return value : 16.4017 cycles per read
> >
> > So we have a 14% performance decrease due to this. We also pollute the
> > branch prediction buffer and we add a cache access due to the added
> > variables in the TLS. Returning the value has the clear advantage of
> > letting the compiler keep it around in registers or on the stack, which
> > clearly costs less.
> >
> > So I think the speed factor outweighs the visual considerations. Maybe
> > we could switch to something like :
> >
> > unsigned int qparity;
> >
> > urcu_read_lock(&qparity);
> > ...
> > urcu_read_unlock(&qparity);
> >
> > That would be a bit like local_irq_save() in the kernel, except that we
> > could do it in a static inline because we pass the address. I
> > personally dislike the local_irq_save() way of hiding the fact that it
> > writes to the variable in a "clever" macro. I'd really prefer to leave
> > the " & ".
> >
> > What is your opinion ?
>
> My current opinion is that I can avoid the overflow problem and the
> need to recheck, which might get rid of the need for both arguments
> and return values while still maintaining good performance. The trick
> is to use only the topmost bit for the grace-period counter, and all
> the rest of the bits for nesting. That way, no matter what value of
> global counter one picks up, it will be waited for (since there are but
> two values that the global counter takes on).
>
> But just now coding it, so will see if it actually works.
Seems to work, and seems to be pretty fast on my machine, anyway.
This one adapts itself to 32- and 64-bit machines, though almost
all of the code is common. It does do a check, but avoids array
indexing, arguments, and return values.
How does it do on your hardware?
Signed-off-by: Paul E. McKenney <paulmck at linux.vnet.ibm.com>
---
test_urcu.c | 6 +++---
test_urcu_timing.c | 6 +++---
urcu.c | 23 ++++++++++-------------
urcu.h | 42 +++++++++++++++++++++++++++++-------------
4 files changed, 45 insertions(+), 32 deletions(-)
diff --git a/test_urcu.c b/test_urcu.c
index f6be45b..f115a4a 100644
--- a/test_urcu.c
+++ b/test_urcu.c
@@ -72,7 +72,7 @@ void rcu_copy_mutex_unlock(void)
void *thr_reader(void *arg)
{
- int qparity, i, j;
+ int i, j;
struct test_array *local_ptr;
printf("thread %s, thread id : %lx, tid %lu\n",
@@ -83,14 +83,14 @@ void *thr_reader(void *arg)
for (i = 0; i < 100000; i++) {
for (j = 0; j < 100000000; j++) {
- rcu_read_lock(&qparity);
+ rcu_read_lock();
local_ptr = rcu_dereference(test_rcu_pointer);
if (local_ptr) {
assert(local_ptr->a == 8);
assert(local_ptr->b == 12);
assert(local_ptr->c[55] == 2);
}
- rcu_read_unlock(&qparity);
+ rcu_read_unlock();
}
}
diff --git a/test_urcu_timing.c b/test_urcu_timing.c
index 57fda4f..9903705 100644
--- a/test_urcu_timing.c
+++ b/test_urcu_timing.c
@@ -94,7 +94,7 @@ static cycles_t reader_time[NR_READ] __attribute__((aligned(128)));
void *thr_reader(void *arg)
{
- int qparity, i, j;
+ int i, j;
struct test_array *local_ptr;
cycles_t time1, time2;
@@ -107,12 +107,12 @@ void *thr_reader(void *arg)
time1 = get_cycles();
for (i = 0; i < OUTER_READ_LOOP; i++) {
for (j = 0; j < INNER_READ_LOOP; j++) {
- rcu_read_lock(&qparity);
+ rcu_read_lock();
local_ptr = rcu_dereference(test_rcu_pointer);
if (local_ptr) {
assert(local_ptr->a == 8);
}
- rcu_read_unlock(&qparity);
+ rcu_read_unlock();
}
}
time2 = get_cycles();
diff --git a/urcu.c b/urcu.c
index 08fb75d..2914b66 100644
--- a/urcu.c
+++ b/urcu.c
@@ -19,17 +19,17 @@
pthread_mutex_t urcu_mutex = PTHREAD_MUTEX_INITIALIZER;
-/* Global quiescent period parity */
-int urcu_qparity;
+/* Global grace period counter */
+long urcu_gp_ctr;
-int __thread urcu_active_readers[2];
+long __thread urcu_active_readers;
/* Thread IDs of registered readers */
#define INIT_NUM_THREADS 4
struct reader_data {
pthread_t tid;
- int *urcu_active_readers;
+ long *urcu_active_readers;
};
static struct reader_data *reader_data;
@@ -60,11 +60,9 @@ void internal_urcu_unlock(void)
/*
* called with urcu_mutex held.
*/
-static int switch_next_urcu_qparity(void)
+static void switch_next_urcu_qparity(void)
{
- int old_parity = urcu_qparity;
- urcu_qparity = 1 - old_parity;
- return old_parity;
+ urcu_gp_ctr += RCU_GP_CTR_BOTTOM_BIT;
}
static void force_mb_all_threads(void)
@@ -89,7 +87,7 @@ static void force_mb_all_threads(void)
mb(); /* read sig_done before ending the barrier */
}
-void wait_for_quiescent_state(int parity)
+void wait_for_quiescent_state(void)
{
struct reader_data *index;
@@ -101,7 +99,7 @@ void wait_for_quiescent_state(int parity)
/*
* BUSY-LOOP.
*/
- while (index->urcu_active_readers[parity] != 0)
+ while (rcu_old_gp_ongoing(index->urcu_active_readers))
barrier();
}
/*
@@ -115,17 +113,16 @@ void wait_for_quiescent_state(int parity)
static void switch_qparity(void)
{
- int prev_parity;
/* All threads should read qparity before accessing data structure. */
/* Write ptr before changing the qparity */
force_mb_all_threads();
- prev_parity = switch_next_urcu_qparity();
+ switch_next_urcu_qparity();
/*
* Wait for previous parity to be empty of readers.
*/
- wait_for_quiescent_state(prev_parity);
+ wait_for_quiescent_state();
}
void synchronize_rcu(void)
diff --git a/urcu.h b/urcu.h
index b6b5c7b..e83c69f 100644
--- a/urcu.h
+++ b/urcu.h
@@ -66,23 +66,39 @@ static inline void atomic_inc(int *v)
#define SIGURCU SIGUSR1
-/* Global quiescent period parity */
-extern int urcu_qparity;
+#define RCU_GP_CTR_BOTTOM_BIT (sizeof(long) == 4 ? 0x80000000 : 0x100L)
+#define RCU_GP_CTR_NEST_MASK (RCU_GP_CTR_BOTTOM_BIT - 1)
-extern int __thread urcu_active_readers[2];
+/* Global quiescent period counter with low-order bits unused. */
+extern long urcu_gp_ctr;
-static inline int get_urcu_qparity(void)
+extern long __thread urcu_active_readers;
+
+static inline int rcu_old_gp_ongoing(long *value)
{
- return urcu_qparity;
+ long v;
+
+ if (value == NULL)
+ return 0;
+ v = ACCESS_ONCE(*value);
+ if (sizeof(long) == 4) {
+ return (v & RCU_GP_CTR_NEST_MASK) &&
+ ((v ^ ACCESS_ONCE(urcu_gp_ctr)) & ~RCU_GP_CTR_NEST_MASK);
+ } else {
+ return (v & RCU_GP_CTR_NEST_MASK) &&
+ (v - ACCESS_ONCE(urcu_gp_ctr) < 0);
+ }
}
-/*
- * urcu_parity should be declared on the caller's stack.
- */
-static inline void rcu_read_lock(int *urcu_parity)
+static inline void rcu_read_lock(void)
{
- *urcu_parity = get_urcu_qparity();
- urcu_active_readers[*urcu_parity]++;
+ long tmp;
+
+ tmp = urcu_active_readers;
+ if ((tmp & RCU_GP_CTR_NEST_MASK) == 0)
+ urcu_active_readers = urcu_gp_ctr + 1;
+ else
+ urcu_active_readers = tmp + 1;
/*
* Increment active readers count before accessing the pointer.
* See force_mb_all_threads().
@@ -90,14 +106,14 @@ static inline void rcu_read_lock(int *urcu_parity)
barrier();
}
-static inline void rcu_read_unlock(int *urcu_parity)
+static inline void rcu_read_unlock(void)
{
barrier();
/*
* Finish using rcu before decrementing the pointer.
* See force_mb_all_threads().
*/
- urcu_active_readers[*urcu_parity]--;
+ urcu_active_readers--;
}
extern void *urcu_publish_content(void **ptr, void *new);
More information about the lttng-dev
mailing list