[lttng-dev] [PATCH lttng-tools 02/10] Fix: UST should not generate packet at destroy after stop
Jérémie Galarneau
jeremie.galarneau at efficios.com
Thu May 19 04:09:12 UTC 2016
On Wed, May 18, 2016 at 2:04 PM, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> In the following scenario:
> - create, enable events (ust),
> - start
> - ...
> - stop (wait for data_pending to complete)
> - destroy
> - rm the trace directory
>
> We would expect that the "rm" operation would not conflict with the
> consumer daemon trying to output data into the trace files, since the
> "stop" operation ensured that there was no data_pending.
>
> However, the "destroy" operation currently generates an extra packet
> after the data_pending check (the "on_stream_hangup"). This causes the
> consumer daemon to try to perform trace file rotation concurrently with
> the trace directory removal in the scenario above, which triggers
> errors. The main reason why this empty packet is generated by "destroy"
> is to deal with the trace start/stop scenario, which would otherwise
> generate a completely empty stream.
>
> Therefore, introduce the concept of a "quiescent stream". The quiescent
> flag is initialized to false on stream creation (first packet is empty).
> When tracing is started, it is set to false (for cases of start/stop/start).
> When tracing is stopped, if the stream is not quiescent, perform a
> "final" flush (which will generate an empty packet if the current packet
> was empty), and set quiescent to true. On "destroy" stream and on
> application hangup: if the stream is not quiescent, perform a "final"
> flush, and set the quiescent state to true.
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> ---
> src/bin/lttng-sessiond/consumer.c | 32 +++++++
> src/bin/lttng-sessiond/consumer.h | 1 +
> src/bin/lttng-sessiond/ust-app.c | 142 +++++++++++++++++++++++++++++++
> src/common/consumer/consumer.h | 13 +++
> src/common/sessiond-comm/sessiond-comm.h | 3 +
> src/common/ust-consumer/ust-consumer.c | 67 ++++++++++++++-
> 6 files changed, 256 insertions(+), 2 deletions(-)
>
> diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c
> index bd019dd..2da3723 100644
> --- a/src/bin/lttng-sessiond/consumer.c
> +++ b/src/bin/lttng-sessiond/consumer.c
> @@ -1185,6 +1185,38 @@ end:
> }
>
> /*
> + * Send a clear quiescent command to consumer using the given channel key.
> + *
> + * Return 0 on success else a negative value.
> + */
> +int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key)
> +{
> + int ret;
> + struct lttcomm_consumer_msg msg;
> +
> + assert(socket);
> +
> + DBG2("Consumer clear quiescent channel key %" PRIu64, key);
> +
> + memset(&msg, 0, sizeof(msg));
> + msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL;
> + msg.u.clear_quiescent_channel.key = key;
> +
> + pthread_mutex_lock(socket->lock);
> + health_code_update();
> +
> + ret = consumer_send_msg(socket, &msg);
> + if (ret < 0) {
> + goto end;
> + }
> +
> +end:
> + health_code_update();
> + pthread_mutex_unlock(socket->lock);
> + return ret;
> +}
> +
> +/*
> * Send a close metadata command to consumer using the given channel key.
> * Called with registry lock held.
> *
> diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h
> index 75a40f8..08b57eb 100644
> --- a/src/bin/lttng-sessiond/consumer.h
> +++ b/src/bin/lttng-sessiond/consumer.h
> @@ -284,6 +284,7 @@ int consumer_push_metadata(struct consumer_socket *socket,
> uint64_t metadata_key, char *metadata_str, size_t len,
> size_t target_offset, uint64_t version);
> int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
> +int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key);
> int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
> struct consumer_output *consumer, uint64_t *discarded);
> int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
> diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c
> index 1bb183d..bf0569e 100644
> --- a/src/bin/lttng-sessiond/ust-app.c
> +++ b/src/bin/lttng-sessiond/ust-app.c
> @@ -4650,6 +4650,140 @@ int ust_app_flush_session(struct ltt_ust_session *usess)
> return ret;
> }
>
> +static
> +int ust_app_clear_quiescent_app_session(struct ust_app *app,
> + struct ust_app_session *ua_sess)
> +{
> + int ret, retval = 0;
> + struct lttng_ht_iter iter;
> + struct ust_app_channel *ua_chan;
> + struct consumer_socket *socket;
> +
> + DBG("Clearing stream quiescent state for ust app pid %d", app->pid);
> +
> + rcu_read_lock();
> +
> + if (!app->compatible) {
> + goto end_not_compatible;
> + }
> +
> + pthread_mutex_lock(&ua_sess->lock);
> +
> + if (ua_sess->deleted) {
> + goto end_deleted;
> + }
> +
> + health_code_update();
> +
> + socket = consumer_find_socket_by_bitness(app->bits_per_long,
> + ua_sess->consumer);
Added error checking (logging) for "!socket" here.
> +
> + /* Clear quiescent state. */
> + switch (ua_sess->buffer_type) {
> + case LTTNG_BUFFER_PER_PID:
> + cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, ua_chan,
> + node.node) {
> + health_code_update();
> + ret = consumer_clear_quiescent_channel(socket, ua_chan->key);
> + if (ret) {
> + ERR("Error clearing quiescent state for consumer channel");
> + retval = -1;
Combined ret and retval.
> + continue;
> + }
> + }
> + break;
> + case LTTNG_BUFFER_PER_UID:
> + default:
> + assert(0);
> + break;
> + }
> +
> + health_code_update();
> +
> +end_deleted:
> + pthread_mutex_unlock(&ua_sess->lock);
> +
> +end_not_compatible:
> + rcu_read_unlock();
> + health_code_update();
> + return retval;
> +}
> +
> +/*
> + * Clear quiescent state in each stream for all applications for a
> + * specific UST session.
> + * Called with UST session lock held.
> + */
> +static
> +int ust_app_clear_quiescent_session(struct ltt_ust_session *usess)
> +
> +{
> + int ret = 0;
> +
> + DBG("Clearing stream quiescent state for all ust apps");
> +
> + rcu_read_lock();
> +
> + switch (usess->buffer_type) {
> + case LTTNG_BUFFER_PER_UID:
> + {
> + struct buffer_reg_uid *reg;
> + struct lttng_ht_iter iter;
> +
> + /*
> + * Clear quiescent for all per UID buffers associated to
> + * that session.
> + */
> + cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
> + struct buffer_reg_channel *reg_chan;
> + struct consumer_socket *socket;
> +
> + /* Get associated consumer socket.*/
> + socket = consumer_find_socket_by_bitness(reg->bits_per_long,
> + usess->consumer);
> + if (!socket) {
> + /* Ignore request if no consumer is found for the session. */
> + continue;
> + }
> +
> + cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
> + reg_chan, node.node) {
> + /*
> + * The following call will print error values so the return
> + * code is of little importance because whatever happens, we
> + * have to try them all.
> + */
> + (void) consumer_clear_quiescent_channel(socket, reg_chan->consumer_key);
> + }
> + }
> + break;
> + }
> + case LTTNG_BUFFER_PER_PID:
> + {
> + struct ust_app_session *ua_sess;
> + struct lttng_ht_iter iter;
> + struct ust_app *app;
> +
> + cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
> + ua_sess = lookup_session_by_app(usess, app);
> + if (ua_sess == NULL) {
> + continue;
> + }
> + (void) ust_app_clear_quiescent_app_session(app, ua_sess);
> + }
> + break;
> + }
> + default:
> + ret = -1;
> + assert(0);
> + break;
> + }
> +
> + rcu_read_unlock();
> + health_code_update();
> + return ret;
> +}
> +
> /*
> * Destroy a specific UST session in apps.
> */
> @@ -4708,6 +4842,14 @@ int ust_app_start_trace_all(struct ltt_ust_session *usess)
>
> rcu_read_lock();
>
> + /*
> + * In a start-stop-start use-case, we need to clear the quiescent state
> + * of each channel set by the prior stop command, thus ensuring that a
> + * following stop or destroy is sure to grab a timestamp_end near those
> + * operations, even if the packet is empty.
> + */
> + (void) ust_app_clear_quiescent_session(usess);
> +
> cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
> ret = ust_app_start_trace(usess, app);
> if (ret < 0) {
> diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h
> index 59764e1..d2f225a 100644
> --- a/src/common/consumer/consumer.h
> +++ b/src/common/consumer/consumer.h
> @@ -60,6 +60,7 @@ enum lttng_consumer_command {
> LTTNG_CONSUMER_STREAMS_SENT,
> LTTNG_CONSUMER_DISCARDED_EVENTS,
> LTTNG_CONSUMER_LOST_PACKETS,
> + LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
> };
>
> /* State of each fd in consumer */
> @@ -250,6 +251,18 @@ struct lttng_consumer_stream {
> int hangup_flush_done;
>
> /*
> + * Whether the stream is in a "complete" state, e.g. no
> + * partially written sub-buffer.
> + * Initialized at false on stream creation (first packet is empty).
> + * "start" tracing: set to false.
> + * "stop" tracing: if !quiescent -> flush FINAL, set to true.
> + * "destroy" stream/application hangup: if !quiescent -> flush
> + * FINAL, set to true.
> + * Update and read are protected by the stream lock.
I fleshed this comment out a bit more since I don't want to go through
those gymnastics again ;-)
The rest of the patch looks good, merged!
Thanks!
Jérémie
> + */
> + bool quiescent;
> +
> + /*
> * metadata_timer_lock protects flags waiting_on_metadata and
> * missed_metadata_flush.
> */
> diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h
> index e7fd69d..0983865 100644
> --- a/src/common/sessiond-comm/sessiond-comm.h
> +++ b/src/common/sessiond-comm/sessiond-comm.h
> @@ -501,6 +501,9 @@ struct lttcomm_consumer_msg {
> uint64_t key; /* Channel key. */
> } LTTNG_PACKED flush_channel;
> struct {
> + uint64_t key; /* Channel key. */
> + } LTTNG_PACKED clear_quiescent_channel;
> + struct {
> char pathname[PATH_MAX];
> /* Indicate if the snapshot goes on the relayd or locally. */
> uint32_t use_relayd;
> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> index a113ef1..91ffdce 100644
> --- a/src/common/ust-consumer/ust-consumer.c
> +++ b/src/common/ust-consumer/ust-consumer.c
> @@ -767,7 +767,54 @@ static int flush_channel(uint64_t chan_key)
>
> health_code_update();
>
> - ustctl_flush_buffer(stream->ustream, 1);
> + pthread_mutex_lock(&stream->lock);
> + if (!stream->quiescent) {
> + stream->quiescent = true;
> + ustctl_flush_buffer(stream->ustream, 0);
> + }
> + pthread_mutex_unlock(&stream->lock);
> + }
> +error:
> + rcu_read_unlock();
> + return ret;
> +}
> +
> +/*
> + * Clear quiescent state from channel's streams using the given key to
> + * retrieve the channel.
> + *
> + * Return 0 on success else an LTTng error code.
> + */
> +static int clear_quiescent_channel(uint64_t chan_key)
> +{
> + int ret = 0;
> + struct lttng_consumer_channel *channel;
> + struct lttng_consumer_stream *stream;
> + struct lttng_ht *ht;
> + struct lttng_ht_iter iter;
> +
> + DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
> +
> + rcu_read_lock();
> + channel = consumer_find_channel(chan_key);
> + if (!channel) {
> + ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
> + ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
> + goto error;
> + }
> +
> + ht = consumer_data.stream_per_chan_id_ht;
> +
> + /* For each stream of the channel id, clear quiescent state. */
> + cds_lfht_for_each_entry_duplicate(ht->ht,
> + ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
> + &channel->key, &iter.iter, stream, node_channel_id.node) {
> +
> + health_code_update();
> +
> + pthread_mutex_lock(&stream->lock);
> + stream->quiescent = false;
> + pthread_mutex_unlock(&stream->lock);
> }
> error:
> rcu_read_unlock();
> @@ -1582,6 +1629,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>
> goto end_msg_sessiond;
> }
> + case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
> + {
> + int ret;
> +
> + ret = clear_quiescent_channel(msg.u.clear_quiescent_channel.key);
> + if (ret != 0) {
> + ret_code = ret;
> + }
> +
> + goto end_msg_sessiond;
> + }
> case LTTNG_CONSUMER_PUSH_METADATA:
> {
> int ret;
> @@ -1951,7 +2009,12 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
> assert(stream);
> assert(stream->ustream);
>
> - ustctl_flush_buffer(stream->ustream, 0);
> + pthread_mutex_lock(&stream->lock);
> + if (!stream->quiescent) {
> + stream->quiescent = true;
> + ustctl_flush_buffer(stream->ustream, 0);
> + }
> + pthread_mutex_unlock(&stream->lock);
> stream->hangup_flush_done = 1;
> }
>
> --
> 2.1.4
>
--
Jérémie Galarneau
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev mailing list