[ltt-dev] [PATCH 08/10] call_rcu: redo futex implementation
Paolo Bonzini
pbonzini at redhat.com
Wed Jun 8 04:59:16 EDT 2011
The current futex implementation actually busy-waits on the list,
because the futex is never set to -1. This patch instead piggybacks
on the flags field, so that the futex can also be used for the STOP flag.
Signed-off-by: Paolo Bonzini <pbonzini at redhat.com>
---
urcu-call-rcu-impl.h | 43 +++++++++++++++++++++++++------------------
urcu-call-rcu.h | 1 +
2 files changed, 26 insertions(+), 18 deletions(-)
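
As background, here is a rough stand-alone sketch of the wait/wake
protocol the patch moves to. It is only an illustration, not code from
the series: the raw futex(2) syscall and GCC __atomic builtins stand in
for liburcu's futex_async(), uatomic_* and cmm_smp_mb() wrappers, the
wait-free queue itself is elided, and the helper names are made up.

/* Illustrative sketch only -- not part of the patch.  The helper names
 * (enqueue_side, worker_prepare_to_sleep, wait_side) are made up. */
#define _GNU_SOURCE
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

#define URCU_CALL_RCU_STOP	0x4
#define URCU_CALL_RCU_BUSY	0x10

static int flags;		/* stand-in for crdp->flags */

static long futex(int *uaddr, int op, int val)
{
	return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
}

/* Enqueuer (call_rcu): publish the callback, then raise BUSY, then wake
 * any sleeper.  The fence orders the list write before the flag write,
 * as cmm_smp_mb() does in the patch. */
static void enqueue_side(void)
{
	/* ... cds_wfq_enqueue() would go here ... */
	__atomic_thread_fence(__ATOMIC_SEQ_CST);
	__atomic_fetch_or(&flags, URCU_CALL_RCU_BUSY, __ATOMIC_SEQ_CST);
	futex(&flags, FUTEX_WAKE, 1);
}

/* Worker, before deciding to sleep (call_rcu_thread): drop BUSY, then
 * re-check the queue.  Returns nonzero only if it is safe to sleep. */
static int worker_prepare_to_sleep(int (*queue_empty)(void))
{
	__atomic_fetch_and(&flags, ~URCU_CALL_RCU_BUSY, __ATOMIC_SEQ_CST);
	__atomic_thread_fence(__ATOMIC_SEQ_CST);
	return queue_empty();
}

/* Worker, sleeping (call_rcu_wait): FUTEX_WAIT returns immediately if
 * the word no longer holds the sampled value, so a wake-up racing with
 * the sleep cannot be lost. */
static void wait_side(void)
{
	int f;

	for (;;) {
		f = __atomic_load_n(&flags, __ATOMIC_SEQ_CST);
		if (f & (URCU_CALL_RCU_BUSY | URCU_CALL_RCU_STOP))
			break;
		futex(&flags, FUTEX_WAIT, f);
	}
}

The property relied on is that FUTEX_WAIT only sleeps while the flags
word still holds the value the worker sampled, so setting BUSY (or STOP)
before FUTEX_WAKE means the worker either sees the flag and skips the
sleep, or is woken from it.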
diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
index ca597d0..9824515 100644
--- a/urcu-call-rcu-impl.h
+++ b/urcu-call-rcu-impl.h
@@ -45,8 +45,7 @@
 
 struct call_rcu_data {
 	struct cds_wfq_queue cbs;
-	unsigned long flags;
-	int futex;
+	int flags;
 	pthread_t tid;
 	int cpu_affinity;
 	struct cds_list_head list;
@@ -89,22 +88,18 @@ static long maxcpus;
 
 static void call_rcu_wait(struct call_rcu_data *crdp)
 {
-	/* Read call_rcu list before read futex */
-	cmm_smp_mb();
-	if (uatomic_read(&crdp->futex) == -1)
-		futex_async(&crdp->futex, FUTEX_WAIT, -1,
-			    NULL, NULL, 0);
+	int flags;
+	for (;;) {
+		flags = uatomic_read(&crdp->flags);
+		if (flags & (URCU_CALL_RCU_BUSY | URCU_CALL_RCU_STOP))
+			break;
+		futex_async(&crdp->flags, FUTEX_WAIT, flags, NULL, 0, 0);
+	}
 }
 
 static void call_rcu_wake_up(struct call_rcu_data *crdp)
 {
-	/* Write to call_rcu list before reading/writing futex */
-	cmm_smp_mb();
-	if (unlikely(uatomic_read(&crdp->futex) == -1)) {
-		uatomic_set(&crdp->futex, 0);
-		futex_async(&crdp->futex, FUTEX_WAKE, 1,
-			    NULL, NULL, 0);
-	}
+	futex_async(&crdp->flags, FUTEX_WAKE, 1, NULL, NULL, 0);
 }
 
 /* Allocate the array if it has not already been allocated. */
@@ -209,7 +204,16 @@ static void *call_rcu_thread(void *arg)
 
 	thread_call_rcu_data = crdp;
 	for (;;) {
-		if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
+		for (;;) {
+			if (&crdp->cbs.head
+			    == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
+				uatomic_and(&crdp->flags, ~URCU_CALL_RCU_BUSY);
+				/* Write flags before rechecking the list. */
+				cmm_smp_mb();
+				if (&crdp->cbs.head
+				    == _CMM_LOAD_SHARED(crdp->cbs.tail))
+					break;
+			}
 			while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
 				poll(NULL, 0, 1);
 			_CMM_STORE_SHARED(crdp->cbs.head, NULL);
@@ -229,13 +233,14 @@ static void *call_rcu_thread(void *arg)
 				rhp->func(rhp);
 			} while (cbs != NULL);
 		}
+		/* Read call_rcu list before reading the flags. */
+		cmm_smp_mb();
 		if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP)
 			break;
 		if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT)
 			poll(NULL, 0, 10);
 		else {
-			if (&crdp->cbs.head == _CMM_LOAD_SHARED(crdp->cbs.tail))
-				call_rcu_wait(crdp);
+			call_rcu_wait(crdp);
 			poll(NULL, 0, 10);
 		}
 	}
@@ -262,7 +267,6 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
 	}
 	memset(crdp, '\0', sizeof(*crdp));
 	cds_wfq_init(&crdp->cbs);
-	crdp->futex = 0;
 	crdp->flags = flags;
 	cds_list_add(&crdp->list, &call_rcu_data_list);
 	crdp->cpu_affinity = cpu_affinity;
@@ -513,6 +517,9 @@ void call_rcu(struct rcu_head *head,
 	head->func = func;
 	crdp = get_call_rcu_data();
 	cds_wfq_enqueue(&crdp->cbs, &head->next);
+	/* Write list before writing the flags. */
+	cmm_smp_mb();
+	uatomic_or(&crdp->flags, URCU_CALL_RCU_BUSY);
 	wake_call_rcu_thread(crdp);
 }
 
diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h
index e76a844..bfd56ec 100644
--- a/urcu-call-rcu.h
+++ b/urcu-call-rcu.h
@@ -48,6 +48,7 @@ struct call_rcu_data;
 #define URCU_CALL_RCU_RUNNING 0x2
 #define URCU_CALL_RCU_STOP 0x4
 #define URCU_CALL_RCU_STOPPED 0x8
+#define URCU_CALL_RCU_BUSY 0x10
 
 /*
  * The rcu_head data structure is placed in the structure to be freed
--
1.7.4.4