[ltt-dev] [PATCH 08/10] call_rcu: redo futex implementation

Paolo Bonzini pbonzini at redhat.com
Wed Jun 8 04:59:16 EDT 2011


The current futex implementation actually busy-waits on the
list, because the futex is never set to -1.  This patch instead piggybacks
on the flags field, so that the futex can also be used for the STOP flag.

Signed-off-by: Paolo Bonzini <pbonzini at redhat.com>
---
 urcu-call-rcu-impl.h |   43 +++++++++++++++++++++++++------------------
 urcu-call-rcu.h      |    1 +
 2 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
index ca597d0..9824515 100644
--- a/urcu-call-rcu-impl.h
+++ b/urcu-call-rcu-impl.h
@@ -45,8 +45,7 @@
 
 struct call_rcu_data {
 	struct cds_wfq_queue cbs;
-	unsigned long flags;
-	int futex;
+	int flags;
 	pthread_t tid;
 	int cpu_affinity;
 	struct cds_list_head list;
@@ -89,22 +88,18 @@ static long maxcpus;
 
 static void call_rcu_wait(struct call_rcu_data *crdp)
 {
-	/* Read call_rcu list before read futex */
-	cmm_smp_mb();
-	if (uatomic_read(&crdp->futex) == -1)
-		futex_async(&crdp->futex, FUTEX_WAIT, -1,
-		      NULL, NULL, 0);
+	int flags;
+	for (;;) {
+		flags = uatomic_read(&crdp->flags);
+		if (flags & (URCU_CALL_RCU_BUSY | URCU_CALL_RCU_STOP))
+			break;
+		futex_async(&crdp->flags, FUTEX_WAIT, flags, NULL, 0, 0);
+	}
 }
 
 static void call_rcu_wake_up(struct call_rcu_data *crdp)
 {
-	/* Write to call_rcu list before reading/writing futex */
-	cmm_smp_mb();
-	if (unlikely(uatomic_read(&crdp->futex) == -1)) {
-		uatomic_set(&crdp->futex, 0);
-		futex_async(&crdp->futex, FUTEX_WAKE, 1,
-		      NULL, NULL, 0);
-	}
+	futex_async(&crdp->flags, FUTEX_WAKE, 1, NULL, NULL, 0);
 }
 
 /* Allocate the array if it has not already been allocated. */
@@ -209,7 +204,16 @@ static void *call_rcu_thread(void *arg)
 
 	thread_call_rcu_data = crdp;
 	for (;;) {
-		if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
+		for (;;) {
+			if (&crdp->cbs.head
+			    == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
+				uatomic_and(&crdp->flags, ~URCU_CALL_RCU_BUSY);
+				/* Write flags before rechecking the list.  */
+				cmm_smp_mb();
+				if (&crdp->cbs.head
+				    == _CMM_LOAD_SHARED(crdp->cbs.tail))
+					break;
+			}
 			while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
 				poll(NULL, 0, 1);
 			_CMM_STORE_SHARED(crdp->cbs.head, NULL);
@@ -229,13 +233,14 @@ static void *call_rcu_thread(void *arg)
 				rhp->func(rhp);
 			} while (cbs != NULL);
 		}
+		/* Read call_rcu list before reading the flags.  */
+		cmm_smp_mb();
 		if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP)
 			break;
 		if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT)
 			poll(NULL, 0, 10);
 		else {
-			if (&crdp->cbs.head == _CMM_LOAD_SHARED(crdp->cbs.tail))
-				call_rcu_wait(crdp);
+			call_rcu_wait(crdp);
 			poll(NULL, 0, 10);
 		}
 	}
@@ -262,7 +267,6 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
 	}
 	memset(crdp, '\0', sizeof(*crdp));
 	cds_wfq_init(&crdp->cbs);
-	crdp->futex = 0;
 	crdp->flags = flags;
 	cds_list_add(&crdp->list, &call_rcu_data_list);
 	crdp->cpu_affinity = cpu_affinity;
@@ -513,6 +517,9 @@ void call_rcu(struct rcu_head *head,
 	head->func = func;
 	crdp = get_call_rcu_data();
 	cds_wfq_enqueue(&crdp->cbs, &head->next);
+	/* Write list before writing the flags.  */
+	cmm_smp_mb();
+	uatomic_or(&crdp->flags, URCU_CALL_RCU_BUSY);
 	wake_call_rcu_thread(crdp);
 }
 
diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h
index e76a844..bfd56ec 100644
--- a/urcu-call-rcu.h
+++ b/urcu-call-rcu.h
@@ -48,6 +48,7 @@ struct call_rcu_data;
 #define URCU_CALL_RCU_RUNNING	0x2
 #define URCU_CALL_RCU_STOP	0x4
 #define URCU_CALL_RCU_STOPPED	0x8
+#define URCU_CALL_RCU_BUSY	0x10
 
 /*
  * The rcu_head data structure is placed in the structure to be freed
-- 
1.7.4.4






More information about the lttng-dev mailing list