[lttng-dev] [PATCH lttng-tools] Fix: unpublish stream on close

Jérémie Galarneau jeremie.galarneau at efficios.com
Sat Sep 5 12:19:17 EDT 2015


Squashed into "Fix: Relay daemon ownership and reference counting"
with some comment changes. Read on.

Thanks!
Jérémie

On Fri, Sep 4, 2015 at 3:44 PM, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> Fixes race where data connection can still add indexes after close,
> preventing graceful teardown of the stream.
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> ---
>  src/bin/lttng-relayd/main.c   | 11 +++++++++--
>  src/bin/lttng-relayd/stream.c | 29 ++++++++++++++++-------------
>  src/bin/lttng-relayd/stream.h |  1 +
>  3 files changed, 26 insertions(+), 15 deletions(-)
>
> diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
> index 7b385b4..40ce50a 100644
> --- a/src/bin/lttng-relayd/main.c
> +++ b/src/bin/lttng-relayd/main.c
> @@ -1267,8 +1267,16 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
>                 goto end;
>         }
>         pthread_mutex_lock(&stream->lock);
> -       stream->closed = true;
>         stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
> +       pthread_mutex_unlock(&stream->lock);
> +
> +       /*
> +        * Set last_net_seq_num before the close flag. Required by data
> +        * pending check.
> +        */

Moved this comment over the update.

> +
> +       stream_close(stream);
> +
>         if (stream->is_metadata) {
>                 struct relay_viewer_stream *vstream;
>
> @@ -1287,7 +1295,6 @@ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
>                         viewer_stream_put(vstream);
>                 }
>         }
> -       pthread_mutex_unlock(&stream->lock);
>         stream_put(stream);
>
>  end:
> diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
> index 870b75a..fca358f 100644
> --- a/src/bin/lttng-relayd/stream.c
> +++ b/src/bin/lttng-relayd/stream.c
> @@ -168,6 +168,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
>          * side of the relayd does not have the concept of session.
>          */
>         lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
> +       stream->in_stream_ht = true;
>
>         DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
>                         stream->stream_handle);
> @@ -231,14 +232,21 @@ unlock:
>   */

Updated this function's header since there is no longer a single
caller to indicate that the stream must be protected either by the
stream->lock, or by virtue of being called by stream_destroy (refcount
== 0).

>  static void stream_unpublish(struct relay_stream *stream)
>  {
> -       if (!stream->published) {
> -               return;
> +       if (stream->in_stream_ht) {
> +               struct lttng_ht_iter iter;
> +               int ret;
> +
> +               iter.iter.node = &stream->node.node;
> +               ret = lttng_ht_del(relay_streams_ht, &iter);
> +               assert(!ret);
> +               stream->in_stream_ht = false;
> +       }
> +       if (stream->published) {
> +               pthread_mutex_lock(&stream->trace->stream_list_lock);
> +               cds_list_del_rcu(&stream->stream_node);
> +               pthread_mutex_unlock(&stream->trace->stream_list_lock);
> +               stream->published = false;
>         }
> -       pthread_mutex_lock(&stream->trace->stream_list_lock);
> -       cds_list_del_rcu(&stream->stream_node);
> -       pthread_mutex_unlock(&stream->trace->stream_list_lock);
> -
> -       stream->published = false;
>  }
>
>  static void stream_destroy(struct relay_stream *stream)
> @@ -267,8 +275,6 @@ static void stream_release(struct urcu_ref *ref)
>         struct relay_stream *stream =
>                 caa_container_of(ref, struct relay_stream, ref);
>         struct relay_session *session;
> -       int ret;
> -       struct lttng_ht_iter iter;
>
>         session = stream->trace->session;
>
> @@ -282,10 +288,6 @@ static void stream_release(struct urcu_ref *ref)
>         }
>         pthread_mutex_unlock(&session->recv_list_lock);
>
> -       iter.iter.node = &stream->node.node;
> -       ret = lttng_ht_del(relay_streams_ht, &iter);
> -       assert(!ret);
> -
>         stream_unpublish(stream);
>
>         if (stream->stream_fd) {
> @@ -333,6 +335,7 @@ void stream_close(struct relay_stream *stream)
>  {
>         DBG("closing stream %" PRIu64, stream->stream_handle);
>         pthread_mutex_lock(&stream->lock);
> +       stream_unpublish(stream);
>         stream->closed = true;
>         relay_index_close_all(stream);
>         pthread_mutex_unlock(&stream->lock);
> diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h
> index ca6be81..542e05c 100644
> --- a/src/bin/lttng-relayd/stream.h
> +++ b/src/bin/lttng-relayd/stream.h
> @@ -132,6 +132,7 @@ struct relay_stream {
>          * Node of stream within global stream hash table.
>          */
>         struct lttng_ht_node_u64 node;
> +       bool in_stream_ht;              /* is stream in stream hash table. */
>         struct rcu_head rcu_node;       /* For call_rcu teardown. */
>  };
>
> --
> 2.1.4
>



-- 
Jérémie Galarneau
EfficiOS Inc.
http://www.efficios.com



More information about the lttng-dev mailing list