[lttng-dev] URCU background threads vs signalfd

Eric Wong normalperson at yhbt.net
Fri Sep 23 13:55:03 EDT 2022


Mathieu Desnoyers <mathieu.desnoyers at efficios.com> wrote:
> On 2022-09-22 05:15, Eric Wong via lttng-dev wrote:
> > Hello, I'm using urcu-bp + rculfhash + call_rcu to implement
> > malloc instrumentation (via LD_PRELOAD) on an existing
> > single-threaded Perl codebase which uses Linux signalfd.
> > 
> > signalfd depends on signals being blocked in all threads
> > of the process, otherwise threads with unblocked signals
> > can receive them and starve the signalfd.
> > 
> > While some threads in URCU do block signals (e.g. workqueue
> > worker for rculfhash), the call_rcu thread and rculfhash
> > partition_resize_helper threads do not...
> > 
> > Should all threads URCU creates block signals (aside from SIGRCU)?
> 
> Yes, I think you are right. The SIGRCU signal is only needed for the
> urcu-signal flavor though.
> 
> Would you like to submit a patch ?

Sure.

Is there a way to detect at runtime when urcu-signal is in use
so SIGRCU (SIGUSR1) doesn't get unblocked when using other flavors?

I actually use SIGUSR1 in my signalfd-using codebase.

I also want to remove cds_lfht_worker_init entirely since it's racy.
Signal blocking needs to be done in the parent before pthread_create
to avoid a window where the child has unblocked signals.

Thanks.  Anyways, this is my work-in-progress:

diff --git a/src/rculfhash.c b/src/rculfhash.c
index 7c0b9fb..5f455af 100644
--- a/src/rculfhash.c
+++ b/src/rculfhash.c
@@ -1251,6 +1251,7 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
 	struct partition_resize_work *work;
 	int ret;
 	unsigned long thread, nr_threads;
+	sigset_t newmask, oldmask;
 
 	urcu_posix_assert(nr_cpus_mask != -1);
 	if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD)
@@ -1273,6 +1274,12 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
 		dbg_printf("error allocating for resize, single-threading\n");
 		goto fallback;
 	}
+
+	ret = sigfillset(&newmask);
+	urcu_posix_assert(!ret);
+	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+	urcu_posix_assert(!ret);
+
 	for (thread = 0; thread < nr_threads; thread++) {
 		work[thread].ht = ht;
 		work[thread].i = i;
@@ -1294,6 +1301,10 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
 		}
 		urcu_posix_assert(!ret);
 	}
+
+	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+	urcu_posix_assert(!ret);
+
 	for (thread = 0; thread < nr_threads; thread++) {
 		ret = pthread_join(work[thread].thread_id, NULL);
 		urcu_posix_assert(!ret);
diff --git a/src/urcu-call-rcu-impl.h b/src/urcu-call-rcu-impl.h
index e9366b4..9f85d55 100644
--- a/src/urcu-call-rcu-impl.h
+++ b/src/urcu-call-rcu-impl.h
@@ -434,6 +434,7 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
 {
 	struct call_rcu_data *crdp;
 	int ret;
+	sigset_t newmask, oldmask;
 
 	crdp = malloc(sizeof(*crdp));
 	if (crdp == NULL)
@@ -448,9 +449,18 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
 	crdp->gp_count = 0;
 	cmm_smp_mb();  /* Structure initialized before pointer is planted. */
 	*crdpp = crdp;
+
+	ret = sigfillset(&newmask);
+	urcu_posix_assert(!ret);
+	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+	urcu_posix_assert(!ret);
+
 	ret = pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp);
 	if (ret)
 		urcu_die(ret);
+
+	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+	urcu_posix_assert(!ret);
 }
 
 /*
diff --git a/src/urcu-defer-impl.h b/src/urcu-defer-impl.h
index b5d7926..1c96287 100644
--- a/src/urcu-defer-impl.h
+++ b/src/urcu-defer-impl.h
@@ -409,9 +409,18 @@ void defer_rcu(void (*fct)(void *p), void *p)
 static void start_defer_thread(void)
 {
 	int ret;
+	sigset_t newmask, oldmask;
+
+	ret = sigfillset(&newmask);
+	urcu_posix_assert(!ret);
+	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+	urcu_posix_assert(!ret);
 
 	ret = pthread_create(&tid_defer, NULL, thr_defer, NULL);
 	urcu_posix_assert(!ret);
+
+	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+	urcu_posix_assert(!ret);
 }
 
 static void stop_defer_thread(void)
diff --git a/src/workqueue.c b/src/workqueue.c
index b6361ad..1039d72 100644
--- a/src/workqueue.c
+++ b/src/workqueue.c
@@ -284,6 +284,7 @@ struct urcu_workqueue *urcu_workqueue_create(unsigned long flags,
 {
 	struct urcu_workqueue *workqueue;
 	int ret;
+	sigset_t newmask, oldmask;
 
 	workqueue = malloc(sizeof(*workqueue));
 	if (workqueue == NULL)
@@ -304,10 +305,20 @@ struct urcu_workqueue *urcu_workqueue_create(unsigned long flags,
 	workqueue->cpu_affinity = cpu_affinity;
 	workqueue->loop_count = 0;
 	cmm_smp_mb();  /* Structure initialized before pointer is planted. */
+
+	ret = sigfillset(&newmask);
+	urcu_posix_assert(!ret);
+	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+	urcu_posix_assert(!ret);
+
 	ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue);
 	if (ret) {
 		urcu_die(ret);
 	}
+
+	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+	urcu_posix_assert(!ret);
+
 	return workqueue;
 }
 
@@ -464,13 +475,23 @@ void urcu_workqueue_resume_worker(struct urcu_workqueue *workqueue)
 void urcu_workqueue_create_worker(struct urcu_workqueue *workqueue)
 {
 	int ret;
+	sigset_t newmask, oldmask;
 
 	/* Clear workqueue state from parent. */
 	workqueue->flags &= ~URCU_WORKQUEUE_PAUSED;
 	workqueue->flags &= ~URCU_WORKQUEUE_PAUSE;
 	workqueue->tid = 0;
+
+	ret = sigfillset(&newmask);
+	urcu_posix_assert(!ret);
+	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+	urcu_posix_assert(!ret);
+
 	ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue);
 	if (ret) {
 		urcu_die(ret);
 	}
+
+	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+	urcu_posix_assert(!ret);
 }


More information about the lttng-dev mailing list