[ltt-dev] [PATCH 08/10] call_rcu: redo futex implementation
Paolo Bonzini
pbonzini at redhat.com
Thu Jun 9 02:54:01 EDT 2011
On 06/09/2011 12:42 AM, Mathieu Desnoyers wrote:
> I'm tempted to just leave the mutex in place for now until we see a
> clearly measurable performance gain by going with the "or/and" flag
> approach compared to the simpler (in terms of memory ordering) mutex
> implementation.
I don't think the memory ordering guarantees are much different:
URCU_CALL_RCU_STOP is always written before a futex system call, and
URCU_CALL_RCU_STOPPED is always written as the last thing before the
thread exits. In both cases there is no need for a memory barrier. So,
the version with atomic accesses seems simpler to me, and I generally
like patches that remove more lines than they add. Also, the mutex
in code that strives to be lockless (in the RT case) seemed out of
place...
One thing is that the RT/non-RT flag is not modifiable. So perhaps it is
better to move it to a separate variable and limit the uatomic_read noise.
See the attached patch.
> As far as the or/and implementations you provide in separate patches are
> concerned, I'm tempted to pull them for completeness sake (they might
> end up being useful). Before I do that though, I'm just curious about
> the memory ordering guarantees they should provide: no-ordering seems
> like a good thing for or/and: it would be similar to add/sub.
Yes, I agree.
Paolo
------------------ 8< --------------------------
>From 9eb4f2821f212705a72ea38568b2a6157665b203 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini at redhat.com>
Date: Wed, 8 Jun 2011 09:33:51 +0200
Subject: [PATCH] call_rcu: drop mutex
The mutex is being used only to protect OR accesses to the flags.
Just use atomic operations for that.
Signed-off-by: Paolo Bonzini <pbonzini at redhat.com>
---
urcu-call-rcu-impl.h | 28 ++++++++++------------------
1 files changed, 10 insertions(+), 18 deletions(-)
diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
index 9beb58c..82fec80 100644
--- a/urcu-call-rcu-impl.h
+++ b/urcu-call-rcu-impl.h
@@ -46,7 +46,6 @@
struct call_rcu_data {
struct cds_wfq_queue cbs;
unsigned long flags;
- pthread_mutex_t mtx;
int futex;
unsigned long qlen; /* maintained for debugging. */
pthread_t tid;
@@ -204,6 +203,7 @@ static void *call_rcu_thread(void *arg)
struct cds_wfq_node **cbs_tail;
struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
struct rcu_head *rhp;
+ int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT);
if (set_thread_cpu_affinity(crdp) != 0) {
perror("pthread_setaffinity_np");
@@ -212,7 +212,7 @@ static void *call_rcu_thread(void *arg)
thread_call_rcu_data = crdp;
for (;;) {
- if (!(crdp->flags & URCU_CALL_RCU_RT)) {
+ if (!rt) {
uatomic_dec(&crdp->futex);
/* Decrement futex before reading call_rcu list */
cmm_smp_mb();
@@ -240,8 +240,8 @@ static void *call_rcu_thread(void *arg)
} while (cbs != NULL);
uatomic_sub(&crdp->qlen, cbcount);
}
- if (crdp->flags & URCU_CALL_RCU_STOP) {
- if (!(crdp->flags & URCU_CALL_RCU_RT)) {
+ if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP) {
+ if (!rt) {
/*
* Read call_rcu list before write futex.
*/
@@ -250,15 +250,13 @@ static void *call_rcu_thread(void *arg)
}
break;
}
- if (!(crdp->flags & URCU_CALL_RCU_RT)) {
+ if (!rt) {
if (&crdp->cbs.head == _CMM_LOAD_SHARED(crdp->cbs.tail))
call_rcu_wait(crdp);
}
poll(NULL, 0, 10);
}
- call_rcu_lock(&crdp->mtx);
- crdp->flags |= URCU_CALL_RCU_STOPPED;
- call_rcu_unlock(&crdp->mtx);
+ uatomic_or(&crdp->flags, URCU_CALL_RCU_STOPPED);
return NULL;
}
@@ -282,10 +280,6 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
memset(crdp, '\0', sizeof(*crdp));
cds_wfq_init(&crdp->cbs);
crdp->qlen = 0;
- if (pthread_mutex_init(&crdp->mtx, NULL) != 0) {
- perror("pthread_mutex_init");
- exit(-1);
- }
crdp->futex = 0;
crdp->flags = flags;
cds_list_add(&crdp->list, &call_rcu_data_list);
@@ -568,12 +562,10 @@ void call_rcu_data_free(struct call_rcu_data *crdp)
if (crdp == NULL || crdp == default_call_rcu_data) {
return;
}
- if ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) {
- call_rcu_lock(&crdp->mtx);
- crdp->flags |= URCU_CALL_RCU_STOP;
- call_rcu_unlock(&crdp->mtx);
+ if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) {
+ uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP);
wake_call_rcu_thread(crdp);
- while ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0)
+ while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0)
poll(NULL, 0, 1);
}
if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
@@ -657,7 +649,7 @@ void call_rcu_after_fork_child(void)
if (crdp == default_call_rcu_data)
crdp = cds_list_entry(crdp->list.prev,
struct call_rcu_data, list);
- crdp->flags = URCU_CALL_RCU_STOPPED;
+ uatomic_set(&crdp->flags, URCU_CALL_RCU_STOPPED);
call_rcu_data_free(crdp);
}
}
--
1.7.4.4
More information about the lttng-dev
mailing list