[lttng-dev] [PATCH lttng-tools 02/10] Fix: UST should not generate packet at destroy after stop

Jérémie Galarneau jeremie.galarneau at efficios.com
Thu May 19 04:09:12 UTC 2016


On Wed, May 18, 2016 at 2:04 PM, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> In the following scenario:
> - create, enable events (ust),
> - start
> - ...
> - stop (wait for data_pending to complete)
> - destroy
> - rm the trace directory
>
> We would expect that the "rm" operation would not conflict with the
> consumer daemon trying to output data into the trace files, since the
> "stop" operation ensured that there was no data_pending.
>
> However, the "destroy" operation currently generates an extra packet
> after the data_pending check (the "on_stream_hangup"). This causes the
> consumer daemon to try to perform trace file rotation concurrently with
> the trace directory removal in the scenario above, which triggers
> errors. The main reason why this empty packet is generated by "destroy"
> is to deal with the trace start/stop scenario, which would otherwise
> generate a completely empty stream.
>
> Therefore, introduce the concept of a "quiescent stream". It is
> initialized at false on stream creation (first packet is empty). When
> tracing is started, it is set to false (for cases of start/stop/start).
> When tracing is stopped, if the stream is not quiescent, perform a
> "final" flush (which will generate an empty packet if the current packet
> was empty), and set quiescent to true.  On "destroy" stream and on
> application hangup: if the stream is not quiescent, perform a "final"
> flush, and set the quiescent state to true.
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> ---
>  src/bin/lttng-sessiond/consumer.c        |  32 +++++++
>  src/bin/lttng-sessiond/consumer.h        |   1 +
>  src/bin/lttng-sessiond/ust-app.c         | 142 +++++++++++++++++++++++++++++++
>  src/common/consumer/consumer.h           |  13 +++
>  src/common/sessiond-comm/sessiond-comm.h |   3 +
>  src/common/ust-consumer/ust-consumer.c   |  67 ++++++++++++++-
>  6 files changed, 256 insertions(+), 2 deletions(-)
>
> diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c
> index bd019dd..2da3723 100644
> --- a/src/bin/lttng-sessiond/consumer.c
> +++ b/src/bin/lttng-sessiond/consumer.c
> @@ -1185,6 +1185,38 @@ end:
>  }
>
>  /*
> + * Send a clear quiescent command to consumer using the given channel key.
> + *
> + * Return 0 on success else a negative value.
> + */
> +int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key)
> +{
> +       int ret;
> +       struct lttcomm_consumer_msg msg;
> +
> +       assert(socket);
> +
> +       DBG2("Consumer clear quiescent channel key %" PRIu64, key);
> +
> +       memset(&msg, 0, sizeof(msg));
> +       msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL;
> +       msg.u.clear_quiescent_channel.key = key;
> +
> +       pthread_mutex_lock(socket->lock);
> +       health_code_update();
> +
> +       ret = consumer_send_msg(socket, &msg);
> +       if (ret < 0) {
> +               goto end;
> +       }
> +
> +end:
> +       health_code_update();
> +       pthread_mutex_unlock(socket->lock);
> +       return ret;
> +}
> +
> +/*
>   * Send a close metadata command to consumer using the given channel key.
>   * Called with registry lock held.
>   *
> diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h
> index 75a40f8..08b57eb 100644
> --- a/src/bin/lttng-sessiond/consumer.h
> +++ b/src/bin/lttng-sessiond/consumer.h
> @@ -284,6 +284,7 @@ int consumer_push_metadata(struct consumer_socket *socket,
>                 uint64_t metadata_key, char *metadata_str, size_t len,
>                 size_t target_offset, uint64_t version);
>  int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
> +int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key);
>  int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
>                 struct consumer_output *consumer, uint64_t *discarded);
>  int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
> diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c
> index 1bb183d..bf0569e 100644
> --- a/src/bin/lttng-sessiond/ust-app.c
> +++ b/src/bin/lttng-sessiond/ust-app.c
> @@ -4650,6 +4650,140 @@ int ust_app_flush_session(struct ltt_ust_session *usess)
>         return ret;
>  }
>
> +static
> +int ust_app_clear_quiescent_app_session(struct ust_app *app,
> +               struct ust_app_session *ua_sess)
> +{
> +       int ret, retval = 0;
> +       struct lttng_ht_iter iter;
> +       struct ust_app_channel *ua_chan;
> +       struct consumer_socket *socket;
> +
> +       DBG("Clearing stream quiescent state for ust app pid %d", app->pid);
> +
> +       rcu_read_lock();
> +
> +       if (!app->compatible) {
> +               goto end_not_compatible;
> +       }
> +
> +       pthread_mutex_lock(&ua_sess->lock);
> +
> +       if (ua_sess->deleted) {
> +               goto end_deleted;
> +       }
> +
> +       health_code_update();
> +
> +       socket = consumer_find_socket_by_bitness(app->bits_per_long,
> +                       ua_sess->consumer);

Added error checking (logging) for "!socket" here.

> +
> +       /* Clear quiescent state. */
> +       switch (ua_sess->buffer_type) {
> +       case LTTNG_BUFFER_PER_PID:
> +               cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, ua_chan,
> +                               node.node) {
> +                       health_code_update();
> +                       ret = consumer_clear_quiescent_channel(socket, ua_chan->key);
> +                       if (ret) {
> +                               ERR("Error clearing quiescent state for consumer channel");
> +                               retval = -1;

Combined ret and retval.

> +                               continue;
> +                       }
> +               }
> +               break;
> +       case LTTNG_BUFFER_PER_UID:
> +       default:
> +               assert(0);
> +               break;
> +       }
> +
> +       health_code_update();
> +
> +end_deleted:
> +       pthread_mutex_unlock(&ua_sess->lock);
> +
> +end_not_compatible:
> +       rcu_read_unlock();
> +       health_code_update();
> +       return retval;
> +}
> +
> +/*
> + * Clear quiescent state in each stream for all applications for a
> + * specific UST session.
> + * Called with UST session lock held.
> + */
> +static
> +int ust_app_clear_quiescent_session(struct ltt_ust_session *usess)
> +
> +{
> +       int ret = 0;
> +
> +       DBG("Clearing stream quiescent state for all ust apps");
> +
> +       rcu_read_lock();
> +
> +       switch (usess->buffer_type) {
> +       case LTTNG_BUFFER_PER_UID:
> +       {
> +               struct buffer_reg_uid *reg;
> +               struct lttng_ht_iter iter;
> +
> +               /*
> +                * Clear quiescent for all per UID buffers associated to
> +                * that session.
> +                */
> +               cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
> +                       struct buffer_reg_channel *reg_chan;
> +                       struct consumer_socket *socket;
> +
> +                       /* Get associated consumer socket.*/
> +                       socket = consumer_find_socket_by_bitness(reg->bits_per_long,
> +                                       usess->consumer);
> +                       if (!socket) {
> +                               /* Ignore request if no consumer is found for the session. */
> +                               continue;
> +                       }
> +
> +                       cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
> +                                       reg_chan, node.node) {
> +                               /*
> +                                * The following call will print error values so the return
> +                                * code is of little importance because whatever happens, we
> +                                * have to try them all.
> +                                */
> +                               (void) consumer_clear_quiescent_channel(socket, reg_chan->consumer_key);
> +                       }
> +               }
> +               break;
> +       }
> +       case LTTNG_BUFFER_PER_PID:
> +       {
> +               struct ust_app_session *ua_sess;
> +               struct lttng_ht_iter iter;
> +               struct ust_app *app;
> +
> +               cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
> +                       ua_sess = lookup_session_by_app(usess, app);
> +                       if (ua_sess == NULL) {
> +                               continue;
> +                       }
> +                       (void) ust_app_clear_quiescent_app_session(app, ua_sess);
> +               }
> +               break;
> +       }
> +       default:
> +               ret = -1;
> +               assert(0);
> +               break;
> +       }
> +
> +       rcu_read_unlock();
> +       health_code_update();
> +       return ret;
> +}
> +
>  /*
>   * Destroy a specific UST session in apps.
>   */
> @@ -4708,6 +4842,14 @@ int ust_app_start_trace_all(struct ltt_ust_session *usess)
>
>         rcu_read_lock();
>
> +       /*
> +        * In a start-stop-start use-case, we need to clear the quiescent state
> +        * of each channel set by the prior stop command, thus ensuring that a
> +        * following stop or destroy is sure to grab a timestamp_end near those
> +        * operations, even if the packet is empty.
> +        */
> +       (void) ust_app_clear_quiescent_session(usess);
> +
>         cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
>                 ret = ust_app_start_trace(usess, app);
>                 if (ret < 0) {
> diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h
> index 59764e1..d2f225a 100644
> --- a/src/common/consumer/consumer.h
> +++ b/src/common/consumer/consumer.h
> @@ -60,6 +60,7 @@ enum lttng_consumer_command {
>         LTTNG_CONSUMER_STREAMS_SENT,
>         LTTNG_CONSUMER_DISCARDED_EVENTS,
>         LTTNG_CONSUMER_LOST_PACKETS,
> +       LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
>  };
>
>  /* State of each fd in consumer */
> @@ -250,6 +251,18 @@ struct lttng_consumer_stream {
>         int hangup_flush_done;
>
>         /*
> +        * Whether the stream is in a "complete" state, e.g. no
> +        * partially written sub-buffer.
> +        * Initialized at false on stream creation (first packet is empty).
> +        * "start" tracing: set to false.
> +        * "stop" tracing: if !quiescent -> flush FINAL, set to true.
> +        * "destroy" stream/application hangup: if !quiescent -> flush
> +        * FINAL, set to true.
> +        * Update and read are protected by the stream lock.

I fleshed this out a bit more since I don't want to go through those
gymnastics again ;-)


The rest of the patch looks good, merged!

Thanks!
Jérémie

> +        */
> +       bool quiescent;
> +
> +       /*
>          * metadata_timer_lock protects flags waiting_on_metadata and
>          * missed_metadata_flush.
>          */
> diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h
> index e7fd69d..0983865 100644
> --- a/src/common/sessiond-comm/sessiond-comm.h
> +++ b/src/common/sessiond-comm/sessiond-comm.h
> @@ -501,6 +501,9 @@ struct lttcomm_consumer_msg {
>                         uint64_t key;   /* Channel key. */
>                 } LTTNG_PACKED flush_channel;
>                 struct {
> +                       uint64_t key;   /* Channel key. */
> +               } LTTNG_PACKED clear_quiescent_channel;
> +               struct {
>                         char pathname[PATH_MAX];
>                         /* Indicate if the snapshot goes on the relayd or locally. */
>                         uint32_t use_relayd;
> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> index a113ef1..91ffdce 100644
> --- a/src/common/ust-consumer/ust-consumer.c
> +++ b/src/common/ust-consumer/ust-consumer.c
> @@ -767,7 +767,54 @@ static int flush_channel(uint64_t chan_key)
>
>                 health_code_update();
>
> -               ustctl_flush_buffer(stream->ustream, 1);
> +               pthread_mutex_lock(&stream->lock);
> +               if (!stream->quiescent) {
> +                       stream->quiescent = true;
> +                       ustctl_flush_buffer(stream->ustream, 0);
> +               }
> +               pthread_mutex_unlock(&stream->lock);
> +       }
> +error:
> +       rcu_read_unlock();
> +       return ret;
> +}
> +
> +/*
> + * Clear quiescent state from channel's streams using the given key to
> + * retrieve the channel.
> + *
> + * Return 0 on success else an LTTng error code.
> + */
> +static int clear_quiescent_channel(uint64_t chan_key)
> +{
> +       int ret = 0;
> +       struct lttng_consumer_channel *channel;
> +       struct lttng_consumer_stream *stream;
> +       struct lttng_ht *ht;
> +       struct lttng_ht_iter iter;
> +
> +       DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
> +
> +       rcu_read_lock();
> +       channel = consumer_find_channel(chan_key);
> +       if (!channel) {
> +               ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
> +               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
> +               goto error;
> +       }
> +
> +       ht = consumer_data.stream_per_chan_id_ht;
> +
> +       /* For each stream of the channel id, clear quiescent state. */
> +       cds_lfht_for_each_entry_duplicate(ht->ht,
> +                       ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
> +                       &channel->key, &iter.iter, stream, node_channel_id.node) {
> +
> +               health_code_update();
> +
> +               pthread_mutex_lock(&stream->lock);
> +               stream->quiescent = false;
> +               pthread_mutex_unlock(&stream->lock);
>         }
>  error:
>         rcu_read_unlock();
> @@ -1582,6 +1629,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>
>                 goto end_msg_sessiond;
>         }
> +       case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
> +       {
> +               int ret;
> +
> +               ret = clear_quiescent_channel(msg.u.clear_quiescent_channel.key);
> +               if (ret != 0) {
> +                       ret_code = ret;
> +               }
> +
> +               goto end_msg_sessiond;
> +       }
>         case LTTNG_CONSUMER_PUSH_METADATA:
>         {
>                 int ret;
> @@ -1951,7 +2009,12 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
>         assert(stream);
>         assert(stream->ustream);
>
> -       ustctl_flush_buffer(stream->ustream, 0);
> +       pthread_mutex_lock(&stream->lock);
> +       if (!stream->quiescent) {
> +               stream->quiescent = true;
> +               ustctl_flush_buffer(stream->ustream, 0);
> +       }
> +       pthread_mutex_unlock(&stream->lock);
>         stream->hangup_flush_done = 1;
>  }
>
> --
> 2.1.4
>



-- 
Jérémie Galarneau
EfficiOS Inc.
http://www.efficios.com


More information about the lttng-dev mailing list