[lttng-dev] [PATCH lttng-tools 02/10] Fix: UST should not generate packet at destroy after stop

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Wed May 18 18:04:11 UTC 2016


In the following scenario:
- create, enable events (ust),
- start
- ...
- stop (await for data_pending to complete)
- destroy
- rm the trace directory

We would expect that the "rm" operation would not conflict with the
consumer daemon trying to output data into the trace files, since the
"stop" operation ensured that there was no data_pending.

However, the "destroy" operation currently generates an extra packet
after the data_pending check (the "on_stream_hangup"). This causes the
consumer daemon to try to perform trace file rotation concurrently with
the trace directory removal in the scenario above, which triggers
errors. The main reason why this empty packet is generated by "destroy"
is to deal with trace start/stop scenario which would otherwise generate
a completely empty stream.

Therefore, introduce the concept of a "quiescent stream". It is
initialized at false on stream creation (first packet is empty). When
tracing is started, it is set to false (for cases of start/stop/start).
When tracing is stopped, if the stream is not quiescent, perform a
"final" flush (which will generate an empty packet if the current packet
was empty), and set quiescent to true.  On "destroy" stream and on
application hangup: if the stream is not quiescent, perform a "final"
flush, and set the quiescent state to true.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
---
 src/bin/lttng-sessiond/consumer.c        |  32 +++++++
 src/bin/lttng-sessiond/consumer.h        |   1 +
 src/bin/lttng-sessiond/ust-app.c         | 142 +++++++++++++++++++++++++++++++
 src/common/consumer/consumer.h           |  13 +++
 src/common/sessiond-comm/sessiond-comm.h |   3 +
 src/common/ust-consumer/ust-consumer.c   |  67 ++++++++++++++-
 6 files changed, 256 insertions(+), 2 deletions(-)

diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c
index bd019dd..2da3723 100644
--- a/src/bin/lttng-sessiond/consumer.c
+++ b/src/bin/lttng-sessiond/consumer.c
@@ -1185,6 +1185,38 @@ end:
 }
 
 /*
+ * Send a clear quiescent command to consumer using the given channel key.
+ *
+ * Return 0 on success else a negative value.
+ */
+int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key)
+{
+	int ret;
+	struct lttcomm_consumer_msg msg;
+
+	assert(socket);
+
+	DBG2("Consumer clear quiescent channel key %" PRIu64, key);
+
+	memset(&msg, 0, sizeof(msg));
+	msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL;
+	msg.u.clear_quiescent_channel.key = key;
+
+	pthread_mutex_lock(socket->lock);
+	health_code_update();
+
+	ret = consumer_send_msg(socket, &msg);
+	if (ret < 0) {
+		goto end;
+	}
+
+end:
+	health_code_update();
+	pthread_mutex_unlock(socket->lock);
+	return ret;
+}
+
+/*
  * Send a close metadata command to consumer using the given channel key.
  * Called with registry lock held.
  *
diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h
index 75a40f8..08b57eb 100644
--- a/src/bin/lttng-sessiond/consumer.h
+++ b/src/bin/lttng-sessiond/consumer.h
@@ -284,6 +284,7 @@ int consumer_push_metadata(struct consumer_socket *socket,
 		uint64_t metadata_key, char *metadata_str, size_t len,
 		size_t target_offset, uint64_t version);
 int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
+int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key);
 int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
 		struct consumer_output *consumer, uint64_t *discarded);
 int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c
index 1bb183d..bf0569e 100644
--- a/src/bin/lttng-sessiond/ust-app.c
+++ b/src/bin/lttng-sessiond/ust-app.c
@@ -4650,6 +4650,140 @@ int ust_app_flush_session(struct ltt_ust_session *usess)
 	return ret;
 }
 
+static
+int ust_app_clear_quiescent_app_session(struct ust_app *app,
+		struct ust_app_session *ua_sess)
+{
+	int ret, retval = 0;
+	struct lttng_ht_iter iter;
+	struct ust_app_channel *ua_chan;
+	struct consumer_socket *socket;
+
+	DBG("Clearing stream quiescent state for ust app pid %d", app->pid);
+
+	rcu_read_lock();
+
+	if (!app->compatible) {
+		goto end_not_compatible;
+	}
+
+	pthread_mutex_lock(&ua_sess->lock);
+
+	if (ua_sess->deleted) {
+		goto end_deleted;
+	}
+
+	health_code_update();
+
+	socket = consumer_find_socket_by_bitness(app->bits_per_long,
+			ua_sess->consumer);
+
+	/* Clear quiescent state. */
+	switch (ua_sess->buffer_type) {
+	case LTTNG_BUFFER_PER_PID:
+		cds_lfht_for_each_entry(ua_sess->channels->ht, &iter.iter, ua_chan,
+				node.node) {
+			health_code_update();
+			ret = consumer_clear_quiescent_channel(socket, ua_chan->key);
+			if (ret) {
+				ERR("Error clearing quiescent state for consumer channel");
+				retval = -1;
+				continue;
+			}
+		}
+		break;
+	case LTTNG_BUFFER_PER_UID:
+	default:
+		assert(0);
+		break;
+	}
+
+	health_code_update();
+
+end_deleted:
+	pthread_mutex_unlock(&ua_sess->lock);
+
+end_not_compatible:
+	rcu_read_unlock();
+	health_code_update();
+	return retval;
+}
+
+/*
+ * Clear quiescent state in each stream for all applications for a
+ * specific UST session.
+ * Called with UST session lock held.
+ */
+static
+int ust_app_clear_quiescent_session(struct ltt_ust_session *usess)
+
+{
+	int ret = 0;
+
+	DBG("Clearing stream quiescent state for all ust apps");
+
+	rcu_read_lock();
+
+	switch (usess->buffer_type) {
+	case LTTNG_BUFFER_PER_UID:
+	{
+		struct buffer_reg_uid *reg;
+		struct lttng_ht_iter iter;
+
+		/*
+		 * Clear quiescent for all per UID buffers associated to
+		 * that session.
+		 */
+		cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+			struct buffer_reg_channel *reg_chan;
+			struct consumer_socket *socket;
+
+			/* Get associated consumer socket.*/
+			socket = consumer_find_socket_by_bitness(reg->bits_per_long,
+					usess->consumer);
+			if (!socket) {
+				/* Ignore request if no consumer is found for the session. */
+				continue;
+			}
+
+			cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
+					reg_chan, node.node) {
+				/*
+				 * The following call will print error values so the return
+				 * code is of little importance because whatever happens, we
+				 * have to try them all.
+				 */
+				(void) consumer_clear_quiescent_channel(socket, reg_chan->consumer_key);
+			}
+		}
+		break;
+	}
+	case LTTNG_BUFFER_PER_PID:
+	{
+		struct ust_app_session *ua_sess;
+		struct lttng_ht_iter iter;
+		struct ust_app *app;
+
+		cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+			ua_sess = lookup_session_by_app(usess, app);
+			if (ua_sess == NULL) {
+				continue;
+			}
+			(void) ust_app_clear_quiescent_app_session(app, ua_sess);
+		}
+		break;
+	}
+	default:
+		ret = -1;
+		assert(0);
+		break;
+	}
+
+	rcu_read_unlock();
+	health_code_update();
+	return ret;
+}
+
 /*
  * Destroy a specific UST session in apps.
  */
@@ -4708,6 +4842,14 @@ int ust_app_start_trace_all(struct ltt_ust_session *usess)
 
 	rcu_read_lock();
 
+	/*
+	 * In a start-stop-start use-case, we need to clear the quiescent state
+	 * of each channel set by the prior stop command, thus ensuring that a
+	 * following stop or destroy is sure to grab a timestamp_end near those
+	 * operations, even if the packet is empty.
+	 */
+	(void) ust_app_clear_quiescent_session(usess);
+
 	cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
 		ret = ust_app_start_trace(usess, app);
 		if (ret < 0) {
diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h
index 59764e1..d2f225a 100644
--- a/src/common/consumer/consumer.h
+++ b/src/common/consumer/consumer.h
@@ -60,6 +60,7 @@ enum lttng_consumer_command {
 	LTTNG_CONSUMER_STREAMS_SENT,
 	LTTNG_CONSUMER_DISCARDED_EVENTS,
 	LTTNG_CONSUMER_LOST_PACKETS,
+	LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL,
 };
 
 /* State of each fd in consumer */
@@ -250,6 +251,18 @@ struct lttng_consumer_stream {
 	int hangup_flush_done;
 
 	/*
+	 * Whether the stream is in a "complete" state, e.g. no
+	 * partially written sub-buffer.
+	 * Initialized at false on stream creation (first packet is empty).
+	 * "start" tracing: set to false.
+	 * "stop" tracing: if !quiescent -> flush FINAL, set to true.
+	 * "destroy" stream/application hangup: if !quiescent -> flush
+	 * FINAL, set to true.
+	 * Update and read are protected by the stream lock.
+	 */
+	bool quiescent;
+
+	/*
 	 * metadata_timer_lock protects flags waiting_on_metadata and
 	 * missed_metadata_flush.
 	 */
diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h
index e7fd69d..0983865 100644
--- a/src/common/sessiond-comm/sessiond-comm.h
+++ b/src/common/sessiond-comm/sessiond-comm.h
@@ -501,6 +501,9 @@ struct lttcomm_consumer_msg {
 			uint64_t key;	/* Channel key. */
 		} LTTNG_PACKED flush_channel;
 		struct {
+			uint64_t key;	/* Channel key. */
+		} LTTNG_PACKED clear_quiescent_channel;
+		struct {
 			char pathname[PATH_MAX];
 			/* Indicate if the snapshot goes on the relayd or locally. */
 			uint32_t use_relayd;
diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
index a113ef1..91ffdce 100644
--- a/src/common/ust-consumer/ust-consumer.c
+++ b/src/common/ust-consumer/ust-consumer.c
@@ -767,7 +767,54 @@ static int flush_channel(uint64_t chan_key)
 
 		health_code_update();
 
-		ustctl_flush_buffer(stream->ustream, 1);
+		pthread_mutex_lock(&stream->lock);
+		if (!stream->quiescent) {
+			stream->quiescent = true;
+			ustctl_flush_buffer(stream->ustream, 0);
+		}
+		pthread_mutex_unlock(&stream->lock);
+	}
+error:
+	rcu_read_unlock();
+	return ret;
+}
+
+/*
+ * Clear quiescent state from channel's streams using the given key to
+ * retrieve the channel.
+ *
+ * Return 0 on success else an LTTng error code.
+ */
+static int clear_quiescent_channel(uint64_t chan_key)
+{
+	int ret = 0;
+	struct lttng_consumer_channel *channel;
+	struct lttng_consumer_stream *stream;
+	struct lttng_ht *ht;
+	struct lttng_ht_iter iter;
+
+	DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
+
+	rcu_read_lock();
+	channel = consumer_find_channel(chan_key);
+	if (!channel) {
+		ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
+		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
+		goto error;
+	}
+
+	ht = consumer_data.stream_per_chan_id_ht;
+
+	/* For each stream of the channel id, clear quiescent state. */
+	cds_lfht_for_each_entry_duplicate(ht->ht,
+			ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
+			&channel->key, &iter.iter, stream, node_channel_id.node) {
+
+		health_code_update();
+
+		pthread_mutex_lock(&stream->lock);
+		stream->quiescent = false;
+		pthread_mutex_unlock(&stream->lock);
 	}
 error:
 	rcu_read_unlock();
@@ -1582,6 +1629,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
 		goto end_msg_sessiond;
 	}
+	case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
+	{
+		int ret;
+
+		ret = clear_quiescent_channel(msg.u.clear_quiescent_channel.key);
+		if (ret != 0) {
+			ret_code = ret;
+		}
+
+		goto end_msg_sessiond;
+	}
 	case LTTNG_CONSUMER_PUSH_METADATA:
 	{
 		int ret;
@@ -1951,7 +2009,12 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
 	assert(stream);
 	assert(stream->ustream);
 
-	ustctl_flush_buffer(stream->ustream, 0);
+	pthread_mutex_lock(&stream->lock);
+	if (!stream->quiescent) {
+		stream->quiescent = true;
+		ustctl_flush_buffer(stream->ustream, 0);
+	}
+	pthread_mutex_unlock(&stream->lock);
 	stream->hangup_flush_done = 1;
 }
 
-- 
2.1.4



More information about the lttng-dev mailing list