[lttng-dev] [PATCH lttng-tools v2] Fix: consumer relayd cleanup on disconnect

David Goulet dgoulet at efficios.com
Thu Oct 25 18:01:36 EDT 2012


Improve the resilience of the consumer by cleaning up a relayd object
and all associated streams when a write error is detected on a relayd
socket.

Fixes #385

Signed-off-by: David Goulet <dgoulet at efficios.com>
---
 src/common/consumer.c      |  259 ++++++++++++++++++++++++++++++++++++++++----
 src/common/consumer.h      |   12 ++
 src/common/relayd/relayd.c |    3 +
 3 files changed, 254 insertions(+), 20 deletions(-)

diff --git a/src/common/consumer.c b/src/common/consumer.c
index 53c6180..eeb2f59 100644
--- a/src/common/consumer.c
+++ b/src/common/consumer.c
@@ -70,6 +70,21 @@ struct lttng_ht *metadata_ht;
 struct lttng_ht *data_ht;
 
 /*
+ * Notify a thread to go back to its poll loop by writing a NULL stream pointer
+ * on the write side of its pipe (wpipe). The write is retried on EINTR.
+ */
+static void notify_thread_pipe(int wpipe)
+{
+	int ret;
+
+	do {
+		struct lttng_consumer_stream *null_stream = NULL;
+
+		ret = write(wpipe, &null_stream, sizeof(null_stream));
+	} while (ret < 0 && errno == EINTR);
+}
+
+/*
  * Find a stream. The consumer_data.lock must be locked during this
  * call.
  */
@@ -182,6 +197,17 @@ static void consumer_rcu_free_relayd(struct rcu_head *head)
 	struct consumer_relayd_sock_pair *relayd =
 		caa_container_of(node, struct consumer_relayd_sock_pair, node);
 
+	/*
+	 * Close all sockets. This is done in the call RCU since we don't want the
+	 * socket fds to be reassigned thus potentially creating bad state of the
+	 * relayd object.
+	 *
+	 * We do not have to lock the control socket mutex here since at this stage
+	 * no one is referencing this relayd object anymore.
+	 */
+	(void) relayd_close(&relayd->control_sock);
+	(void) relayd_close(&relayd->data_sock);
+
 	free(relayd);
 }
 
@@ -204,21 +230,89 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 	iter.iter.node = &relayd->node.node;
 	ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
 	if (ret != 0) {
-		/* We assume the relayd was already destroyed */
+		/* We assume the relayd is being destroyed or is already destroyed */
 		return;
 	}
 
-	/* Close all sockets */
-	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-	(void) relayd_close(&relayd->control_sock);
-	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-	(void) relayd_close(&relayd->data_sock);
-
 	/* RCU free() call */
 	call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
 }
 
 /*
+ * Update the end point status of all streams having the given network sequence
+ * index (relayd index). Both the metadata and the data hash tables are walked.
+ *
+ * The status is set atomically but without holding the stream mutex, so be
+ * aware of potential races when using it.
+ */
+static void update_endpoint_status_by_netidx(int net_seq_idx,
+		enum consumer_endpoint_status status)
+{
+	struct lttng_ht_iter iter;
+	struct lttng_consumer_stream *stream;
+
+	DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
+
+	rcu_read_lock();
+
+	/* Start with the metadata streams */
+	cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+		if (stream->net_seq_idx == net_seq_idx) {
+			uatomic_set(&stream->endpoint_status, status);
+			DBG("Delete flag set to metadata stream %d", stream->wait_fd);
+		}
+	}
+
+	/* Then handle the data streams */
+	cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+		if (stream->net_seq_idx == net_seq_idx) {
+			uatomic_set(&stream->endpoint_status, status);
+			DBG("Delete flag set to data stream %d", stream->wait_fd);
+		}
+	}
+	rcu_read_unlock();
+}
+
+/*
+ * Clean up a relayd object: destroy it (removal from the relayd hash table and
+ * deferred free through a RCU call) and flag every associated stream with an
+ * inactive end point.
+ *
+ * If a local data context is available, notify the threads that the state of
+ * the streams has changed.
+ */
+static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
+		struct lttng_consumer_local_data *ctx)
+{
+	int netidx;
+
+	assert(relayd);
+
+	/* Save the net sequence index before destroying the object */
+	netidx = relayd->net_seq_idx;
+
+	/*
+	 * Remove the relayd from the relayd hash table. The sockets are closed
+	 * and the object is freed later, in a RCU call.
+	 */
+	destroy_relayd(relayd);
+
+	/* Flag the end point of all matching streams as inactive */
+	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
+
+	/*
+	 * With a local data context, notify the threads that the state of the
+	 * streams has changed. The write() action on the pipe acts as an "implicit"
+	 * memory barrier ordering the updates of the end point status from the
+	 * read of this status which happens AFTER receiving this notify.
+	 */
+	if (ctx) {
+		notify_thread_pipe(ctx->consumer_data_pipe[1]);
+		notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
+	}
+}
+
+/*
  * Flag a relayd socket pair for destruction. Destroy it if the refcount
  * reaches zero.
  *
@@ -251,11 +345,14 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
 
 	assert(stream);
 
+	DBG("Consumer del stream %d", stream->wait_fd);
+
 	if (ht == NULL) {
 		/* Means the stream was allocated but not successfully added */
 		goto free_stream;
 	}
 
+	pthread_mutex_lock(&stream->lock);
 	pthread_mutex_lock(&consumer_data.lock);
 
 	switch (consumer_data.type) {
@@ -349,6 +446,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
 end:
 	consumer_data.need_update = 1;
 	pthread_mutex_unlock(&consumer_data.lock);
+	pthread_mutex_unlock(&stream->lock);
 
 	if (free_chan) {
 		consumer_del_channel(free_chan);
@@ -804,7 +902,17 @@ static int consumer_update_poll_array(
 	DBG("Updating poll fd array");
 	rcu_read_lock();
 	cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
-		if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
+		/*
+		 * Only active streams with an active end point can be added to the
+		 * poll set and local stream storage of the thread.
+		 *
+		 * There is a potential race here for endpoint_status to be updated
+		 * just after the check. However, this is OK since the stream(s) will
+		 * be deleted once the thread is notified that the end point state has
+		 * changed, at which point this function will be called again.
+		 */
+		if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
+				stream->endpoint_status) {
 			continue;
 		}
 		DBG("Active FD %d", stream->wait_fd);
@@ -1169,6 +1277,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
 	/* Default is on the disk */
 	int outfd = stream->out_fd;
 	struct consumer_relayd_sock_pair *relayd = NULL;
+	unsigned int relayd_hang_up = 0;
 
 	/* RCU lock for the relayd pointer */
 	rcu_read_lock();
@@ -1228,11 +1337,22 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
 				ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
 				if (ret < 0) {
 					written = ret;
+					/* Socket operation failed. We consider the relayd dead */
+					if (ret == -EPIPE || ret == -EINVAL) {
+						relayd_hang_up = 1;
+						goto write_error;
+					}
 					goto end;
 				}
 			}
+		} else {
+			/* Socket operation failed. We consider the relayd dead */
+			if (ret == -EPIPE || ret == -EINVAL) {
+				relayd_hang_up = 1;
+				goto write_error;
+			}
+			/* Else, use the default set before which is the filesystem. */
 		}
-		/* Else, use the default set before which is the filesystem. */
 	} else {
 		/* No streaming, we have to set the len with the full padding */
 		len += padding;
@@ -1248,6 +1368,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
 			if (written == 0) {
 				written = ret;
 			}
+			/* Socket operation failed. We consider the relayd dead */
+			if (errno == EPIPE || errno == EINVAL) {
+				relayd_hang_up = 1;
+				goto write_error;
+			}
 			goto end;
 		} else if (ret > len) {
 			PERROR("Error in file write (ret %zd > len %lu)", ret, len);
@@ -1269,6 +1394,15 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
 	}
 	lttng_consumer_sync_trace_file(stream, orig_offset);
 
+write_error:
+	/*
+	 * This is the special case where the relayd has closed its socket. Let's
+	 * clean up the relayd object and all associated streams.
+	 */
+	if (relayd && relayd_hang_up) {
+		cleanup_relayd(relayd, ctx);
+	}
+
 end:
 	/* Unlock only if ctrl socket used */
 	if (relayd && stream->metadata_flag) {
@@ -1298,6 +1432,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
 	int outfd = stream->out_fd;
 	struct consumer_relayd_sock_pair *relayd = NULL;
 	int *splice_pipe;
+	unsigned int relayd_hang_up = 0;
 
 	switch (consumer_data.type) {
 	case LTTNG_CONSUMER_KERNEL:
@@ -1350,6 +1485,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
 					padding);
 			if (ret < 0) {
 				written = ret;
+				/* Socket operation failed. We consider the relayd dead */
+				if (ret == -EBADF) {
+					WARN("Remote relayd disconnected. Stopping");
+					relayd_hang_up = 1;
+					goto write_error;
+				}
 				goto end;
 			}
 
@@ -1361,7 +1502,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
 			/* Use the returned socket. */
 			outfd = ret;
 		} else {
-			ERR("Remote relayd disconnected. Stopping");
+			/* Socket operation failed. We consider the relayd dead */
+			if (ret == -EBADF) {
+				WARN("Remote relayd disconnected. Stopping");
+				relayd_hang_up = 1;
+				goto write_error;
+			}
 			goto end;
 		}
 	} else {
@@ -1410,6 +1556,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
 			if (written == 0) {
 				written = ret_splice;
 			}
+			/* Socket operation failed. We consider the relayd dead */
+			if (errno == EBADF) {
+				WARN("Remote relayd disconnected. Stopping");
+				relayd_hang_up = 1;
+				goto write_error;
+			}
 			ret = errno;
 			goto splice_error;
 		} else if (ret_splice > len) {
@@ -1437,12 +1589,20 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
 
 	goto end;
 
+write_error:
+	/*
+	 * This is the special case where the relayd has closed its socket. Let's
+	 * clean up the relayd object and all associated streams.
+	 */
+	if (relayd && relayd_hang_up) {
+		cleanup_relayd(relayd, ctx);
+		/* Skip splice error so the consumer does not fail */
+		goto end;
+	}
+
 splice_error:
 	/* send the appropriate error description to sessiond */
 	switch (ret) {
-	case EBADF:
-		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF);
-		break;
 	case EINVAL:
 		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
 		break;
@@ -1604,6 +1764,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
 		goto free_stream;
 	}
 
+	pthread_mutex_lock(&stream->lock);
+
 	pthread_mutex_lock(&consumer_data.lock);
 	switch (consumer_data.type) {
 	case LTTNG_CONSUMER_KERNEL:
@@ -1695,6 +1857,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
 
 end:
 	pthread_mutex_unlock(&consumer_data.lock);
+	pthread_mutex_unlock(&stream->lock);
 
 	if (free_chan) {
 		consumer_del_channel(free_chan);
@@ -1766,6 +1929,59 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 }
 
 /*
+ * Delete every data stream that is flagged for deletion (endpoint_status).
+ */
+static void validate_endpoint_status_data_stream(void)
+{
+	struct lttng_ht_iter iter;
+	struct lttng_consumer_stream *stream;
+
+	DBG("Consumer delete flagged data stream");
+
+	rcu_read_lock();
+	cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+		/* Skip streams whose end point status is not set (still active) */
+		if (!stream->endpoint_status) {
+			continue;
+		}
+		/* Delete it right now */
+		consumer_del_stream(stream, data_ht);
+	}
+	rcu_read_unlock();
+}
+
+/*
+ * Delete every metadata stream that is flagged for deletion (endpoint_status).
+ */
+static void validate_endpoint_status_metadata_stream(
+		struct lttng_poll_event *pollset)
+{
+	struct lttng_ht_iter iter;
+	struct lttng_consumer_stream *stream;
+
+	DBG("Consumer delete flagged metadata stream");
+
+	assert(pollset);
+
+	rcu_read_lock();
+	cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+		/* Skip streams whose end point status is not set (still active) */
+		if (!stream->endpoint_status) {
+			continue;
+		}
+		/*
+		 * Remove it from the pollset so the metadata thread can continue
+		 * without blocking on a deleted stream.
+		 */
+		lttng_poll_del(pollset, stream->wait_fd);
+
+		/* Delete it right now */
+		consumer_del_metadata_stream(stream, metadata_ht);
+	}
+	rcu_read_unlock();
+}
+
+/*
  * Thread polls on metadata file descriptor and write them on disk or on the
  * network.
  */
@@ -1856,6 +2072,13 @@ restart:
 						continue;
 					}
 
+					/* A NULL stream means that the state has changed. */
+					if (stream == NULL) {
+						/* Check for deleted streams. */
+						validate_endpoint_status_metadata_stream(&events);
+						continue;
+					}
+
 					DBG("Adding metadata stream %d to poll set",
 							stream->wait_fd);
 
@@ -2063,6 +2286,7 @@ void *consumer_thread_data_poll(void *data)
 			 * waking us up to test it.
 			 */
 			if (new_stream == NULL) {
+				validate_endpoint_status_data_stream();
 				continue;
 			}
 
@@ -2301,14 +2525,9 @@ end:
 
 	/*
 	 * Notify the data poll thread to poll back again and test the
-	 * consumer_quit state to quit gracefully.
+	 * consumer_quit state that we just set so it can quit gracefully.
 	 */
-	do {
-		struct lttng_consumer_stream *null_stream = NULL;
-
-		ret = write(ctx->consumer_data_pipe[1], &null_stream,
-				sizeof(null_stream));
-	} while (ret < 0 && errno == EINTR);
+	notify_thread_pipe(ctx->consumer_data_pipe[1]);
 
 	rcu_unregister_thread();
 	return NULL;
diff --git a/src/common/consumer.h b/src/common/consumer.h
index 53b6151..0334c49 100644
--- a/src/common/consumer.h
+++ b/src/common/consumer.h
@@ -74,6 +74,11 @@ enum lttng_consumer_type {
 	LTTNG_CONSUMER32_UST,
 };
 
+enum consumer_endpoint_status {
+	CONSUMER_ENDPOINT_ACTIVE,
+	CONSUMER_ENDPOINT_INACTIVE,
+};
+
 struct lttng_consumer_channel {
 	struct lttng_ht_node_ulong node;
 	int key;
@@ -150,6 +155,13 @@ struct lttng_consumer_stream {
 	pthread_mutex_t lock;
 	/* Tracing session id */
 	uint64_t session_id;
+	/*
+	 * Indicates if the stream end point is still active or not (network
+	 * streaming or local file system). The thread "owning" the stream is
+	 * handling this status and can be notified of a state change through the
+	 * appropriate consumer pipe (data or metadata).
+	 */
+	enum consumer_endpoint_status endpoint_status;
 };
 
 /*
diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c
index 785d3dc..db47608 100644
--- a/src/common/relayd/relayd.c
+++ b/src/common/relayd/relayd.c
@@ -67,6 +67,7 @@ static int send_command(struct lttcomm_sock *sock,
 
 	ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
 	if (ret < 0) {
+		ret = -errno;
 		goto error;
 	}
 
@@ -90,6 +91,7 @@ static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size)
 
 	ret = sock->ops->recvmsg(sock, data, size, 0);
 	if (ret < 0) {
+		ret = -errno;
 		goto error;
 	}
 
@@ -283,6 +285,7 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock,
 	/* Only send data header. */
 	ret = sock->ops->sendmsg(sock, hdr, size, 0);
 	if (ret < 0) {
+		ret = -errno;
 		goto error;
 	}
 
-- 
1.7.10.4




More information about the lttng-dev mailing list