[lttng-dev] [PATCH lttng-tools master] Fix: cleanup inactive FDs in the consumer polling thread

Jonathan Rajotte jonathan.rajotte-julien at efficios.com
Thu Feb 1 19:24:10 UTC 2018


From: Julien Desfossez <jdesfossez at efficios.com>

The data polling thread on the consumer relies on nb_fd and
consumer_quit to determine whether it can exit, but the polling thread
is also responsible for closing inactive streams, and there is a case
where it could exit before it has done so.

On consumer teardown (consumer_quit == 1 and all streams hanging up), if
a relay becomes unreachable, we flag the streams that talk to this relay
as inactive (CONSUMER_ENDPOINT_INACTIVE) and wake up the data polling
thread to close them. If that thread is already busy handling the hangup
of the remaining streams, it ends up rebuilding the poll array without
the inactive streams in it. nb_fd can then drop to 0, which makes the
polling thread exit before it has closed the inactive streams.
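The race above can be modelled in a few lines. This is a minimal sketch, not the consumer code: `struct sketch_stream`, `enum endpoint_status`, `sketch_count_pollable()` and `sketch_should_exit_old()` are hypothetical stand-ins, and the skip rule only looks at the endpoint status (the real update_poll_array() also skips streams whose state is not LTTNG_CONSUMER_ACTIVE_STREAM). It shows that once every remaining stream is inactive, the pre-patch exit check fires while those streams are still open.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the consumer's stream fields. */
enum endpoint_status { ENDPOINT_ACTIVE, ENDPOINT_INACTIVE };

struct sketch_stream {
	enum endpoint_status endpoint_status;
	int open; /* 1 until the polling thread closes the stream */
};

/*
 * Pre-patch behaviour of update_poll_array(): streams whose endpoint
 * is inactive are skipped, so they do not contribute to nb_fd.
 */
static int sketch_count_pollable(const struct sketch_stream *streams,
		size_t n)
{
	int nb_fd = 0;

	assert(streams != NULL || n == 0);
	for (size_t i = 0; i < n; i++) {
		if (streams[i].endpoint_status == ENDPOINT_INACTIVE) {
			continue;
		}
		nb_fd++;
	}
	return nb_fd;
}

/* Pre-patch exit check of the data polling thread. */
static int sketch_should_exit_old(int nb_fd, int consumer_quit)
{
	return nb_fd == 0 && consumer_quit == 1;
}
```

With a single stream flagged inactive and consumer_quit set, sketch_count_pollable() returns 0 and sketch_should_exit_old() returns 1 even though the stream's `open` flag is still 1, which is the leak described above.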

So we now track the number of inactive streams in update_poll_array()
and prevent the data polling thread from exiting while inactive streams
remain. That way, the write on the data_poll_pipe is received by this
thread and it can close the inactive streams properly.
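The fix can be sketched with the same hypothetical model (again, `struct sketch_stream`, `sketch_update_counts()` and `sketch_should_exit_new()` are illustrative names, not lttng-tools functions, and only the endpoint status is checked). Inactive streams are still left out of the poll array, but they are counted through an out-parameter, mirroring the new `int *nb_inactive_fd` argument, and the exit check requires that count to be 0.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the consumer's stream fields. */
enum endpoint_status { ENDPOINT_ACTIVE, ENDPOINT_INACTIVE };

struct sketch_stream {
	enum endpoint_status endpoint_status;
};

/*
 * Post-patch behaviour: inactive streams are not pollable, but each one
 * is counted in *nb_inactive_fd (reset on every rebuild, as in the patch).
 */
static int sketch_update_counts(const struct sketch_stream *streams,
		size_t n, int *nb_inactive_fd)
{
	int nb_fd = 0;

	assert(nb_inactive_fd != NULL);
	assert(streams != NULL || n == 0);
	*nb_inactive_fd = 0;
	for (size_t i = 0; i < n; i++) {
		if (streams[i].endpoint_status == ENDPOINT_INACTIVE) {
			(*nb_inactive_fd)++;
			continue;
		}
		nb_fd++;
	}
	return nb_fd;
}

/*
 * Post-patch exit check: keep the thread alive while inactive streams
 * remain, so it still reads the wakeup and can close them.
 */
static int sketch_should_exit_new(int nb_fd, int nb_inactive_fd,
		int consumer_quit)
{
	return nb_fd == 0 && nb_inactive_fd == 0 && consumer_quit == 1;
}
```

Counting the inactive streams rather than putting their FDs back into the poll array keeps them out of poll() (their endpoint is gone) while still giving the thread a reason to stay alive until the streams are closed and removed; once they are gone, both counters are 0 and the thread exits as before.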

Signed-off-by: Julien Desfossez <jdesfossez at efficios.com>
Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien at efficios.com>
---
 src/common/consumer/consumer.c | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c
index 96ad851..004a265 100644
--- a/src/common/consumer/consumer.c
+++ b/src/common/consumer/consumer.c
@@ -1074,7 +1074,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
  */
 static int update_poll_array(struct lttng_consumer_local_data *ctx,
 		struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
-		struct lttng_ht *ht)
+		struct lttng_ht *ht, int *nb_inactive_fd)
 {
 	int i = 0;
 	struct lttng_ht_iter iter;
@@ -1086,6 +1086,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
 	assert(local_stream);
 
 	DBG("Updating poll fd array");
+	*nb_inactive_fd = 0;
 	rcu_read_lock();
 	cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
 		/*
@@ -1096,9 +1097,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
 		 * just after the check. However, this is OK since the stream(s) will
 		 * be deleted once the thread is notified that the end point state has
 		 * changed where this function will be called back again.
+		 *
+		 * We track the number of inactive FDs because they still need to be
+		 * closed by the polling thread after a wakeup on the data_pipe or
+		 * metadata_pipe.
 		 */
 		if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
 				stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+			(*nb_inactive_fd)++;
 			continue;
 		}
 		/*
@@ -2452,6 +2458,8 @@ void *consumer_thread_data_poll(void *data)
 	struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
 	/* local view of consumer_data.fds_count */
 	int nb_fd = 0;
+	/* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
+	int nb_inactive_fd = 0;
 	struct lttng_consumer_local_data *ctx = data;
 	ssize_t len;
 
@@ -2508,7 +2516,7 @@ void *consumer_thread_data_poll(void *data)
 				goto end;
 			}
 			ret = update_poll_array(ctx, &pollfd, local_stream,
-					data_ht);
+					data_ht, &nb_inactive_fd);
 			if (ret < 0) {
 				ERR("Error in allocating pollfd or local_outfds");
 				lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
@@ -2521,7 +2529,7 @@ void *consumer_thread_data_poll(void *data)
 		pthread_mutex_unlock(&consumer_data.lock);
 
 		/* No FDs and consumer_quit, consumer_cleanup the thread */
-		if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
+		if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
 			err = 0;	/* All is OK */
 			goto end;
 		}
-- 
2.7.4
