[lttng-dev] [RFC PATCH liburcu v2] Use workqueue in rculfhash
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Wed May 31 15:51:01 UTC 2017
Changes since v1:
- Block all signals in the rculfhash worker thread,
- Integrate with the atfork APIs of all urcu flavors to provide fully
backward-compatible behavior,
- Only run the worker thread when hash tables with the "auto-resize"
flag exist.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
---
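Not part of the patch, for reviewers: a minimal sketch of the user-visible
effect, assuming the default memb flavor and linking against liburcu-cds.
Creating a table with CDS_LFHT_AUTO_RESIZE now starts the internal rculfhash
worker, and cds_lfht_destroy() no longer requires the caller to be a
registered RCU read-side thread:

#include <urcu.h>		/* flavor header, included before rculfhash.h */
#include <urcu/rculfhash.h>
#include <stdlib.h>

int main(void)
{
	struct cds_lfht *ht;

	/* The first auto-resize table starts the shared worker thread. */
	ht = cds_lfht_new(64, 64, 0,
			CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
	if (!ht)
		return EXIT_FAILURE;

	/* No RCU registration nor read-side constraints needed here. */
	if (cds_lfht_destroy(ht, NULL))
		return EXIT_FAILURE;
	return EXIT_SUCCESS;
}
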
include/urcu/map/urcu-bp.h | 5 +
include/urcu/map/urcu-qsbr.h | 5 +
include/urcu/map/urcu.h | 15 +++
include/urcu/rculfhash.h | 15 ++-
src/rculfhash-internal.h | 2 +-
src/rculfhash.c | 228 +++++++++++++++++++++++++++++++------------
src/urcu-call-rcu-impl.h | 37 +++++++
src/urcu-flavor.h | 15 +++
8 files changed, 254 insertions(+), 68 deletions(-)
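Not part of the patch, for reviewers: the atfork integration keeps the
existing call_rcu fork handlers working unchanged for processes that use
auto-resize tables. A hypothetical sketch, assuming the memb flavor (whose
urcu.h pulls in the call_rcu fork handler declarations):

#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <urcu.h>
#include <urcu/rculfhash.h>

int main(void)
{
	struct cds_lfht *ht;
	pid_t pid;

	/*
	 * Same handlers applications already register for call_rcu;
	 * with this patch they also pause (before fork), resume
	 * (parent) and re-create (child) the rculfhash worker.
	 */
	if (pthread_atfork(call_rcu_before_fork,
			call_rcu_after_fork_parent,
			call_rcu_after_fork_child))
		return EXIT_FAILURE;

	ht = cds_lfht_new(64, 64, 0, CDS_LFHT_AUTO_RESIZE, NULL);
	if (!ht)
		return EXIT_FAILURE;

	pid = fork();
	if (pid == 0) {
		/* Child: the atfork handler re-created the worker. */
		_exit(EXIT_SUCCESS);
	}
	return pid > 0 ? EXIT_SUCCESS : EXIT_FAILURE;
}
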
diff --git a/include/urcu/map/urcu-bp.h b/include/urcu/map/urcu-bp.h
index 67ba5c3..1476924 100644
--- a/include/urcu/map/urcu-bp.h
+++ b/include/urcu/map/urcu-bp.h
@@ -77,4 +77,9 @@
#define rcu_yield_active rcu_yield_active_bp
#define rcu_rand_yield rcu_rand_yield_bp
+#define urcu_register_rculfhash_atfork \
+ urcu_register_rculfhash_atfork_bp
+#define urcu_unregister_rculfhash_atfork \
+ urcu_unregister_rculfhash_atfork_bp
+
#endif /* _URCU_BP_MAP_H */
diff --git a/include/urcu/map/urcu-qsbr.h b/include/urcu/map/urcu-qsbr.h
index 9e90e3c..bf38c82 100644
--- a/include/urcu/map/urcu-qsbr.h
+++ b/include/urcu/map/urcu-qsbr.h
@@ -76,4 +76,9 @@
#define rcu_flavor rcu_flavor_qsbr
+#define urcu_register_rculfhash_atfork \
+ urcu_register_rculfhash_atfork_qsbr
+#define urcu_unregister_rculfhash_atfork \
+ urcu_unregister_rculfhash_atfork_qsbr
+
#endif /* _URCU_QSBR_MAP_H */
diff --git a/include/urcu/map/urcu.h b/include/urcu/map/urcu.h
index 449513e..b12fa74 100644
--- a/include/urcu/map/urcu.h
+++ b/include/urcu/map/urcu.h
@@ -80,6 +80,11 @@
#define rcu_flavor rcu_flavor_memb
+#define urcu_register_rculfhash_atfork \
+ urcu_register_rculfhash_atfork_memb
+#define urcu_unregister_rculfhash_atfork \
+ urcu_unregister_rculfhash_atfork_memb
+
#elif defined(RCU_SIGNAL)
#define rcu_read_lock rcu_read_lock_sig
@@ -122,6 +127,11 @@
#define rcu_flavor rcu_flavor_sig
+#define urcu_register_rculfhash_atfork \
+ urcu_register_rculfhash_atfork_sig
+#define urcu_unregister_rculfhash_atfork \
+ urcu_unregister_rculfhash_atfork_sig
+
#elif defined(RCU_MB)
#define rcu_read_lock rcu_read_lock_mb
@@ -164,6 +174,11 @@
#define rcu_flavor rcu_flavor_mb
+#define urcu_register_rculfhash_atfork \
+ urcu_register_rculfhash_atfork_mb
+#define urcu_unregister_rculfhash_atfork \
+ urcu_unregister_rculfhash_atfork_mb
+
#else
#error "Undefined selection"
diff --git a/include/urcu/rculfhash.h b/include/urcu/rculfhash.h
index 9934422..0789aa5 100644
--- a/include/urcu/rculfhash.h
+++ b/include/urcu/rculfhash.h
@@ -176,10 +176,17 @@ struct cds_lfht *cds_lfht_new(unsigned long init_size,
* need to be informed of the value passed to cds_lfht_new().
*
* Return 0 on success, negative error value on error.
- * Threads calling this API need to be registered RCU read-side threads.
- * cds_lfht_destroy should *not* be called from a RCU read-side critical
- * section. It should *not* be called from a call_rcu thread context
- * neither.
+ *
+ * Prior to liburcu 0.10:
+ * - Threads calling this API need to be registered RCU read-side
+ * threads.
+ * - cds_lfht_destroy should *not* be called from an RCU read-side
+ * critical section. It should *not* be called from a call_rcu
+ * thread context either.
+ *
+ * Starting from liburcu 0.10, rculfhash implements its own worker
+ * thread to handle resize operations, which removes RCU requirements on
+ * cds_lfht_destroy.
*/
extern
int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr);
diff --git a/src/rculfhash-internal.h b/src/rculfhash-internal.h
index d7cec95..0f8df97 100644
--- a/src/rculfhash-internal.h
+++ b/src/rculfhash-internal.h
@@ -82,7 +82,7 @@ struct cds_lfht {
*/
pthread_mutex_t resize_mutex; /* resize mutex: add/del mutex */
pthread_attr_t *resize_attr; /* Resize threads attributes */
- unsigned int in_progress_resize, in_progress_destroy;
+ unsigned int in_progress_destroy;
unsigned long resize_target;
int resize_initiated;
diff --git a/src/rculfhash.c b/src/rculfhash.c
index d7a1f23..0bd1384 100644
--- a/src/rculfhash.c
+++ b/src/rculfhash.c
@@ -64,7 +64,7 @@
* - Split-counters are used to keep track of the number of
* nodes within the hash table for automatic resize triggering.
* - Resize operation initiated by long chain detection is executed by a
- * call_rcu thread, which keeps lock-freedom of add and remove.
+ * worker thread, which keeps lock-freedom of add and remove.
* - Resize operations are protected by a mutex.
* - The removal operation is split in two parts: first, a "removed"
* flag is set in the next pointer within the node to remove. Then,
@@ -276,6 +276,9 @@
#include <rculfhash-internal.h>
#include <stdio.h>
#include <pthread.h>
+#include <signal.h>
+#include "workqueue.h"
+#include "urcu-die.h"
/*
* Split-counters lazily update the global counter each 1024
@@ -335,11 +338,11 @@ struct ht_items_count {
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
/*
- * rcu_resize_work: Contains arguments passed to RCU worker thread
+ * resize_work: Contains arguments passed to worker thread
* responsible for performing lazy resize.
*/
-struct rcu_resize_work {
- struct rcu_head head;
+struct resize_work {
+ struct urcu_work work;
struct cds_lfht *ht;
};
@@ -356,6 +359,27 @@ struct partition_resize_work {
unsigned long start, unsigned long len);
};
+static struct urcu_workqueue *cds_lfht_workqueue;
+static unsigned long cds_lfht_workqueue_user_count;
+
+/*
+ * Mutex ensuring mutual exclusion between workqueue initialization and
+ * fork handlers. cds_lfht_fork_mutex nests inside call_rcu_mutex.
+ */
+static pthread_mutex_t cds_lfht_fork_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static struct urcu_atfork cds_lfht_atfork;
+
+/*
+ * atfork handler nesting counter. The handler may be registered with
+ * several urcu flavors, and can thus be invoked more than once from
+ * the pthread_atfork list of callbacks.
+ */
+static int cds_lfht_workqueue_atfork_nesting;
+
+static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor);
+static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor);
+
/*
* Algorithm to reverse bits in a word by lookup table, extended to
* 64-bit words.
@@ -1224,14 +1248,12 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
if (start == 0 && nr_threads > 0)
return;
fallback:
- ht->flavor->thread_online();
fct(ht, i, start, len);
- ht->flavor->thread_offline();
}
/*
* Holding RCU read lock to protect _cds_lfht_add against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
+ * reclaim that could be performed by other worker threads (ABA
* problem).
*
* When we reach a certain length, we can split this population phase over
@@ -1308,7 +1330,7 @@ void init_table(struct cds_lfht *ht,
/*
* Holding RCU read lock to protect _cds_lfht_remove against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
+ * reclaim that could be performed by other worker threads (ABA
* problem).
* For a single level, we logically remove and garbage collect each node.
*
@@ -1320,8 +1342,9 @@ void init_table(struct cds_lfht *ht,
*
* Concurrent removal and add operations are helping us perform garbage
* collection of logically removed nodes. We guarantee that all logically
- * removed nodes have been garbage-collected (unlinked) before call_rcu is
- * invoked to free a hole level of bucket nodes (after a grace period).
+ * removed nodes have been garbage-collected (unlinked) before work
+ * enqueue is invoked to free a hole level of bucket nodes (after a
+ * grace period).
*
* Logical removal and garbage collection can therefore be done in batch
* or on a node-per-node basis, as long as the guarantee above holds.
@@ -1513,6 +1536,9 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size,
if (!max_nr_buckets || (max_nr_buckets & (max_nr_buckets - 1)))
return NULL;
+ if (flags & CDS_LFHT_AUTO_RESIZE)
+ cds_lfht_init_worker(flavor);
+
min_nr_alloc_buckets = max(min_nr_alloc_buckets, MIN_TABLE_SIZE);
init_size = max(init_size, MIN_TABLE_SIZE);
max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets);
@@ -1772,25 +1798,14 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
*/
int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
{
- int ret, was_online;
-
- /* Wait for in-flight resize operations to complete */
- _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
- cmm_smp_mb(); /* Store destroy before load resize */
- was_online = ht->flavor->read_ongoing();
- if (was_online)
- ht->flavor->thread_offline();
- /* Calling with RCU read-side held is an error. */
- if (ht->flavor->read_ongoing()) {
- ret = -EINVAL;
- if (was_online)
- ht->flavor->thread_online();
- goto end;
+ int ret;
+
+ if (ht->flags & CDS_LFHT_AUTO_RESIZE) {
+ /* Cancel ongoing resize operations. */
+ _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
+ /* Wait for in-flight resize operations to complete */
+ urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
}
- while (uatomic_read(&ht->in_progress_resize))
- poll(NULL, 0, 100); /* wait for 100ms */
- if (was_online)
- ht->flavor->thread_online();
ret = cds_lfht_delete_bucket(ht);
if (ret)
return ret;
@@ -1800,8 +1815,9 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
ret = pthread_mutex_destroy(&ht->resize_mutex);
if (ret)
ret = -EBUSY;
+ if (ht->flags & CDS_LFHT_AUTO_RESIZE)
+ cds_lfht_fini_worker(ht->flavor);
poison_free(ht);
-end:
return ret;
}
@@ -1897,7 +1913,6 @@ void _do_cds_lfht_resize(struct cds_lfht *ht)
* Resize table, re-do if the target size has changed under us.
*/
do {
- assert(uatomic_read(&ht->in_progress_resize));
if (CMM_LOAD_SHARED(ht->in_progress_destroy))
break;
ht->resize_initiated = 1;
@@ -1930,71 +1945,47 @@ void resize_target_update_count(struct cds_lfht *ht,
void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
{
- int was_online;
-
- was_online = ht->flavor->read_ongoing();
- if (was_online)
- ht->flavor->thread_offline();
- /* Calling with RCU read-side held is an error. */
- if (ht->flavor->read_ongoing()) {
- static int print_once;
-
- if (!CMM_LOAD_SHARED(print_once))
- fprintf(stderr, "[error] rculfhash: cds_lfht_resize "
- "called with RCU read-side lock held.\n");
- CMM_STORE_SHARED(print_once, 1);
- assert(0);
- goto end;
- }
resize_target_update_count(ht, new_size);
CMM_STORE_SHARED(ht->resize_initiated, 1);
pthread_mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
pthread_mutex_unlock(&ht->resize_mutex);
-end:
- if (was_online)
- ht->flavor->thread_online();
}
static
-void do_resize_cb(struct rcu_head *head)
+void do_resize_cb(struct urcu_work *work)
{
- struct rcu_resize_work *work =
- caa_container_of(head, struct rcu_resize_work, head);
- struct cds_lfht *ht = work->ht;
+ struct resize_work *resize_work =
+ caa_container_of(work, struct resize_work, work);
+ struct cds_lfht *ht = resize_work->ht;
- ht->flavor->thread_offline();
+ ht->flavor->register_thread();
pthread_mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
pthread_mutex_unlock(&ht->resize_mutex);
- ht->flavor->thread_online();
+ ht->flavor->unregister_thread();
poison_free(work);
- cmm_smp_mb(); /* finish resize before decrement */
- uatomic_dec(&ht->in_progress_resize);
}
static
void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht)
{
- struct rcu_resize_work *work;
+ struct resize_work *work;
/* Store resize_target before read resize_initiated */
cmm_smp_mb();
if (!CMM_LOAD_SHARED(ht->resize_initiated)) {
- uatomic_inc(&ht->in_progress_resize);
- cmm_smp_mb(); /* increment resize count before load destroy */
if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
- uatomic_dec(&ht->in_progress_resize);
return;
}
work = malloc(sizeof(*work));
if (work == NULL) {
dbg_printf("error allocating resize work, bailing out\n");
- uatomic_dec(&ht->in_progress_resize);
return;
}
work->ht = ht;
- ht->flavor->update_call_rcu(&work->head, do_resize_cb);
+ urcu_workqueue_queue_work(cds_lfht_workqueue,
+ &work->work, do_resize_cb);
CMM_STORE_SHARED(ht->resize_initiated, 1);
}
}
@@ -2045,3 +2036,114 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
}
__cds_lfht_resize_lazy_launch(ht);
}
+
+static void mutex_lock(pthread_mutex_t *mutex)
+{
+ int ret;
+
+#ifndef DISTRUST_SIGNALS_EXTREME
+ ret = pthread_mutex_lock(mutex);
+ if (ret)
+ urcu_die(ret);
+#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
+ while ((ret = pthread_mutex_trylock(mutex)) != 0) {
+ if (ret != EBUSY && ret != EINTR)
+ urcu_die(ret);
+ if (CMM_LOAD_SHARED(URCU_TLS(rcu_reader).need_mb)) {
+ cmm_smp_mb();
+ _CMM_STORE_SHARED(URCU_TLS(rcu_reader).need_mb, 0);
+ cmm_smp_mb();
+ }
+ (void) poll(NULL, 0, 10);
+ }
+#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
+}
+
+static void mutex_unlock(pthread_mutex_t *mutex)
+{
+ int ret;
+
+ ret = pthread_mutex_unlock(mutex);
+ if (ret)
+ urcu_die(ret);
+}
+
+static void cds_lfht_before_fork(void *priv)
+{
+ if (cds_lfht_workqueue_atfork_nesting++)
+ return;
+ mutex_lock(&cds_lfht_fork_mutex);
+ if (!cds_lfht_workqueue)
+ return;
+ urcu_workqueue_pause_worker(cds_lfht_workqueue);
+}
+
+static void cds_lfht_after_fork_parent(void *priv)
+{
+ if (--cds_lfht_workqueue_atfork_nesting)
+ return;
+ if (!cds_lfht_workqueue)
+ goto end;
+ urcu_workqueue_resume_worker(cds_lfht_workqueue);
+end:
+ mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static void cds_lfht_after_fork_child(void *priv)
+{
+ if (--cds_lfht_workqueue_atfork_nesting)
+ return;
+ if (!cds_lfht_workqueue)
+ goto end;
+ urcu_workqueue_create_worker(cds_lfht_workqueue);
+end:
+ mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static struct urcu_atfork cds_lfht_atfork = {
+ .before_fork = cds_lfht_before_fork,
+ .after_fork_parent = cds_lfht_after_fork_parent,
+ .after_fork_child = cds_lfht_after_fork_child,
+};
+
+/* Block all signals to ensure we don't disturb the application. */
+static void cds_lfht_worker_init(struct urcu_workqueue *workqueue,
+ void *priv)
+{
+ int ret;
+ sigset_t mask;
+
+ /* Block all signals in this thread so it never runs application signal handlers. */
+ ret = sigfillset(&mask);
+ if (ret)
+ urcu_die(errno);
+ ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+ if (ret)
+ urcu_die(ret);
+}
+
+static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor)
+{
+ flavor->register_rculfhash_atfork(&cds_lfht_atfork);
+
+ mutex_lock(&cds_lfht_fork_mutex);
+ if (cds_lfht_workqueue_user_count++)
+ goto end;
+ cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
+ NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
+end:
+ mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor)
+{
+ mutex_lock(&cds_lfht_fork_mutex);
+ if (--cds_lfht_workqueue_user_count)
+ goto end;
+ urcu_workqueue_destroy(cds_lfht_workqueue);
+ cds_lfht_workqueue = NULL;
+end:
+ mutex_unlock(&cds_lfht_fork_mutex);
+
+ flavor->unregister_rculfhash_atfork(&cds_lfht_atfork);
+}
diff --git a/src/urcu-call-rcu-impl.h b/src/urcu-call-rcu-impl.h
index bfa53f8..4562ba4 100644
--- a/src/urcu-call-rcu-impl.h
+++ b/src/urcu-call-rcu-impl.h
@@ -99,6 +99,9 @@ static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct call_rcu_data *default_call_rcu_data;
+static struct urcu_atfork *registered_rculfhash_atfork;
+static unsigned long registered_rculfhash_atfork_refcount;
+
/*
* If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
* available, then we can have call_rcu threads assigned to individual
@@ -907,9 +910,14 @@ online:
void call_rcu_before_fork(void)
{
struct call_rcu_data *crdp;
+ struct urcu_atfork *atfork;
call_rcu_lock(&call_rcu_mutex);
+ atfork = registered_rculfhash_atfork;
+ if (atfork)
+ atfork->before_fork(atfork->priv);
+
cds_list_for_each_entry(crdp, &call_rcu_data_list, list) {
uatomic_or(&crdp->flags, URCU_CALL_RCU_PAUSE);
cmm_smp_mb__after_uatomic_or();
@@ -929,6 +937,7 @@ void call_rcu_before_fork(void)
void call_rcu_after_fork_parent(void)
{
struct call_rcu_data *crdp;
+ struct urcu_atfork *atfork;
cds_list_for_each_entry(crdp, &call_rcu_data_list, list)
uatomic_and(&crdp->flags, ~URCU_CALL_RCU_PAUSE);
@@ -936,6 +945,9 @@ void call_rcu_after_fork_parent(void)
while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_PAUSED) != 0)
(void) poll(NULL, 0, 1);
}
+ atfork = registered_rculfhash_atfork;
+ if (atfork)
+ atfork->after_fork_parent(atfork->priv);
call_rcu_unlock(&call_rcu_mutex);
}
@@ -947,10 +959,15 @@ void call_rcu_after_fork_parent(void)
void call_rcu_after_fork_child(void)
{
struct call_rcu_data *crdp, *next;
+ struct urcu_atfork *atfork;
/* Release the mutex. */
call_rcu_unlock(&call_rcu_mutex);
+ atfork = registered_rculfhash_atfork;
+ if (atfork)
+ atfork->after_fork_child(atfork->priv);
+
/* Do nothing when call_rcu() has not been used */
if (cds_list_empty(&call_rcu_data_list))
return;
@@ -980,3 +997,23 @@ void call_rcu_after_fork_child(void)
call_rcu_data_free(crdp);
}
}
+
+void urcu_register_rculfhash_atfork(struct urcu_atfork *atfork)
+{
+ call_rcu_lock(&call_rcu_mutex);
+ if (registered_rculfhash_atfork_refcount++)
+ goto end;
+ registered_rculfhash_atfork = atfork;
+end:
+ call_rcu_unlock(&call_rcu_mutex);
+}
+
+void urcu_unregister_rculfhash_atfork(struct urcu_atfork *atfork)
+{
+ call_rcu_lock(&call_rcu_mutex);
+ if (--registered_rculfhash_atfork_refcount)
+ goto end;
+ registered_rculfhash_atfork = NULL;
+end:
+ call_rcu_unlock(&call_rcu_mutex);
+}
diff --git a/src/urcu-flavor.h b/src/urcu-flavor.h
index 5e7f292..9cfbd6a 100644
--- a/src/urcu-flavor.h
+++ b/src/urcu-flavor.h
@@ -27,6 +27,16 @@
extern "C" {
#endif
+struct urcu_atfork {
+ void (*before_fork)(void *priv);
+ void (*after_fork_parent)(void *priv);
+ void (*after_fork_child)(void *priv);
+ void *priv;
+};
+
+void urcu_register_rculfhash_atfork(struct urcu_atfork *atfork);
+void urcu_unregister_rculfhash_atfork(struct urcu_atfork *atfork);
+
struct rcu_flavor_struct {
void (*read_lock)(void);
void (*read_unlock)(void);
@@ -43,6 +53,9 @@ struct rcu_flavor_struct {
void (*unregister_thread)(void);
void (*barrier)(void);
+
+ void (*register_rculfhash_atfork)(struct urcu_atfork *atfork);
+ void (*unregister_rculfhash_atfork)(struct urcu_atfork *atfork);
};
#define DEFINE_RCU_FLAVOR(x) \
@@ -59,6 +72,8 @@ const struct rcu_flavor_struct x = { \
.register_thread = rcu_register_thread, \
.unregister_thread = rcu_unregister_thread,\
.barrier = rcu_barrier, \
+ .register_rculfhash_atfork = urcu_register_rculfhash_atfork, \
+ .unregister_rculfhash_atfork = urcu_unregister_rculfhash_atfork,\
}
extern const struct rcu_flavor_struct rcu_flavor;
--
2.1.4