[lttng-dev] [PATCH lttng-tools master] Fix: cleanup inactive FDs in the consumer polling thread

Jérémie Galarneau jeremie.galarneau at efficios.com
Sat Feb 3 23:29:58 UTC 2018


On 1 February 2018 at 14:24, Jonathan Rajotte
<jonathan.rajotte-julien at efficios.com> wrote:
> From: Julien Desfossez <jdesfossez at efficios.com>
>
> The data polling thread on the consumer relies on nb_fd and
> consumer_quit to determine if it can exit, but the polling thread is
> also responsible for closing inactive streams, and there is a case
> where it could exit before doing so.
>
> On consumer teardown (consumer_quit == 1 and all streams hanging up), if
> a relay daemon becomes unreachable, we flag the streams that talk to it
> as inactive (CONSUMER_ENDPOINT_INACTIVE) and wake up the data polling
> thread to close them. If that thread is already busy handling the hangup
> of the remaining streams, it ends up rebuilding the poll array without
> the inactive streams, and nb_fd can end up being 0, which makes the
> polling thread exit.
>
> So we now track the number of inactive streams in update_poll_array()
> and prevent the data polling thread from exiting while inactive streams
> remain. That way the write on the data_poll_pipe is received by this
> thread and it can close the inactive streams properly.

The code looks good.

I propose the following message which reflects my understanding of the
problem. Let me know if it is correct.

Fix: cleanup inactive FDs in the consumer polling thread

Users have reported hitting an assert() during consumerd shutdown when
the data stream hash table is not empty.

Relevant stack trace:
[...] in lttng_ht_destroy (ht=0x6) at hashtable.c:162
[...] in lttng_consumer_cleanup () at consumer.c:1207
[...] in main ([...]) at lttng-consumerd.c:625

This is reproducible when a consumerd is shutting down at the same
time as one of its relay daemon peers.

On failure to reach a relay daemon, all of that relay daemon's
associated streams are marked as having an inactive endpoint (see
cleanup_relayd(), consumer.c:467). The data polling thread is notified
of the change through an empty message on the "data" pipe.
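
Roughly, cleanup_relayd() does two things here; the call names below
are paraphrased from consumer.c, not quoted verbatim:

        /* Flag every stream bound to this relay daemon as unreachable. */
        update_endpoint_status_by_netidx(relayd->net_seq_idx,
                        CONSUMER_ENDPOINT_INACTIVE);

        /* Wake the data polling thread with an empty (NULL stream) message. */
        notify_thread_lttng_pipe(ctx->consumer_data_pipe);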

Before blocking on the next poll(), the data polling thread checks if
it needs to update its poll set using the "need_update" flag. This
flag is set anytime a stream is added or deleted.
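
For context, the top of the polling loop looks roughly like this once
the patch is applied (simplified; allocation and error handling
elided):

        pthread_mutex_lock(&consumer_data.lock);
        if (consumer_data.need_update) {
                /* Rebuild pollfd/local_stream from the current data_ht. */
                ret = update_poll_array(ctx, &pollfd, local_stream,
                                data_ht, &nb_inactive_fd);
                if (ret < 0) {
                        /* ... report the error and stop the thread ... */
                }
                nb_fd = ret;
                consumer_data.need_update = 0;
        }
        pthread_mutex_unlock(&consumer_data.lock);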

While building a new poll set, streams that are now marked as inactive
or as having an inactive endpoint are not included in the new poll
set. Those inactive streams are in a transitional state, awaiting
a clean-up.
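
In update_poll_array() terms, those streams take the early-continue
path (context shown in the hunk below):

        if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
                        stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
                /* Not added to the poll set, yet still owned by this thread. */
                continue;
        }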

After updating the poll set, the data polling thread checks if it
should quit (via the consumer_quit flag). Assuming this flag is set,
the thread cannot simply exit; it must clean up any remaining data
streams.

The thread currently performs this check at consumer.c:2532. This
check is erroneous as it assumes that the number of FDs in the poll set is
indicative of the number of FDs the thread has ownership of.

If all streams are inactive, the poll set will contain no FDs to
monitor and the thread will assume that it can exit. This leaves
streams in "data_ht", causing the assert() to fail in the main thread
during clean-up.

This patch adds an inactive FD count which must also reach zero before
the data polling thread can exit.
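
Concretely, the exit check in consumer_thread_data_poll() becomes:

        /* Before: could exit while inactive streams still needed closing. */
        if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
                err = 0;        /* All is OK */
                goto end;
        }

        /* After: also wait until every inactive FD has been cleaned up. */
        if (nb_fd == 0 && nb_inactive_fd == 0
                        && CMM_LOAD_SHARED(consumer_quit) == 1) {
                err = 0;        /* All is OK */
                goto end;
        }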

The clean-up of the inactive streams occurs as the data polling thread
wakes up on its "data" pipe. Upon waking up, the thread validates the
endpoint status of every data stream and closes those that have been
marked as inactive (see consumer_del_stream(), consumer.c:525).

This occurs as often as necessary to allow the thread to clean up all
of its inactive streams and exit cleanly.
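
For illustration, that clean-up pass is conceptually equivalent to the
following helper; the function name and shape are hypothetical, the
real logic lives in the data polling thread and consumer_del_stream():

        /* Hypothetical helper: close every data stream whose endpoint died. */
        static void close_inactive_data_streams(struct lttng_ht *ht)
        {
                struct lttng_ht_iter iter;
                struct lttng_consumer_stream *stream;

                rcu_read_lock();
                cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
                        if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
                                /* Unlinks the stream from "ht" and closes its FDs. */
                                consumer_del_stream(stream, ht);
                        }
                }
                rcu_read_unlock();
        }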



Thanks,
Jérémie

>
> Signed-off-by: Julien Desfossez <jdesfossez at efficios.com>
> Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien at efficios.com>
> ---
>  src/common/consumer/consumer.c | 14 +++++++++++---
>  1 file changed, 11 insertions(+), 3 deletions(-)
>
> diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c
> index 96ad851..004a265 100644
> --- a/src/common/consumer/consumer.c
> +++ b/src/common/consumer/consumer.c
> @@ -1074,7 +1074,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
>   */
>  static int update_poll_array(struct lttng_consumer_local_data *ctx,
>                 struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
> -               struct lttng_ht *ht)
> +               struct lttng_ht *ht, int *nb_inactive_fd)
>  {
>         int i = 0;
>         struct lttng_ht_iter iter;
> @@ -1086,6 +1086,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
>         assert(local_stream);
>
>         DBG("Updating poll fd array");
> +       *nb_inactive_fd = 0;
>         rcu_read_lock();
>         cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
>                 /*
> @@ -1096,9 +1097,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
>                  * just after the check. However, this is OK since the stream(s) will
>                  * be deleted once the thread is notified that the end point state has
>                  * changed where this function will be called back again.
> +                *
> +                * We track the number of inactive FDs because they still need to be
> +                * closed by the polling thread after a wakeup on the data_pipe or
> +                * metadata_pipe.
>                  */
>                 if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
>                                 stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
> +                       (*nb_inactive_fd)++;
>                         continue;
>                 }
>                 /*
> @@ -2452,6 +2458,8 @@ void *consumer_thread_data_poll(void *data)
>         struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
>         /* local view of consumer_data.fds_count */
>         int nb_fd = 0;
> +       /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
> +       int nb_inactive_fd = 0;
>         struct lttng_consumer_local_data *ctx = data;
>         ssize_t len;
>
> @@ -2508,7 +2516,7 @@ void *consumer_thread_data_poll(void *data)
>                                 goto end;
>                         }
>                         ret = update_poll_array(ctx, &pollfd, local_stream,
> -                                       data_ht);
> +                                       data_ht, &nb_inactive_fd);
>                         if (ret < 0) {
>                                 ERR("Error in allocating pollfd or local_outfds");
>                                 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
> @@ -2521,7 +2529,7 @@ void *consumer_thread_data_poll(void *data)
>                 pthread_mutex_unlock(&consumer_data.lock);
>
>                 /* No FDs and consumer_quit, consumer_cleanup the thread */
> -               if (nb_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
> +               if (nb_fd == 0 && nb_inactive_fd == 0 && CMM_LOAD_SHARED(consumer_quit) == 1) {
>                         err = 0;        /* All is OK */
>                         goto end;
>                 }
> --
> 2.7.4
>



-- 
Jérémie Galarneau
EfficiOS Inc.
http://www.efficios.com

