[lttng-dev] [PATCH lttng-tools v2] Fix: consumer relayd cleanup on disconnect
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Fri Oct 26 09:57:13 EDT 2012
* David Goulet (dgoulet at efficios.com) wrote:
> Improve the resilience of the consumer by cleaning up a relayd object
> and all associated streams when a write error is detected on a relayd
> socket.
>
> Fixes #385
>
> Signed-off-by: David Goulet <dgoulet at efficios.com>
> ---
> src/common/consumer.c | 259 ++++++++++++++++++++++++++++++++++++++++----
> src/common/consumer.h | 12 ++
> src/common/relayd/relayd.c | 3 +
> 3 files changed, 254 insertions(+), 20 deletions(-)
>
> diff --git a/src/common/consumer.c b/src/common/consumer.c
> index 53c6180..eeb2f59 100644
> --- a/src/common/consumer.c
> +++ b/src/common/consumer.c
> @@ -70,6 +70,21 @@ struct lttng_ht *metadata_ht;
> struct lttng_ht *data_ht;
>
> /*
> + * Notify a thread pipe to poll back again. This usually means that some global
> + * state has changed, so we simply send the thread back into its poll wait call.
> + */
> +static void notify_thread_pipe(int wpipe)
> +{
> + int ret;
> +
> + do {
> + struct lttng_consumer_stream *null_stream = NULL;
> +
> + ret = write(wpipe, &null_stream, sizeof(null_stream));
> + } while (ret < 0 && errno == EINTR);
> +}
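For readers following along: the pipe carries stream pointers, and a NULL pointer
is used purely as a wakeup token that the reader thread interprets as "re-check
global state". A stand-alone sketch of both ends of that convention (fake_stream,
notify and drain_one are illustrative names, not the consumer's actual types or
functions):

    #include <errno.h>
    #include <stdio.h>
    #include <unistd.h>

    struct fake_stream;    /* stand-in for struct lttng_consumer_stream */

    /* Writer side: a NULL pointer written on the pipe means "state changed,
     * go back to poll and re-evaluate". */
    static void notify(int wpipe)
    {
        ssize_t ret;
        struct fake_stream *null_stream = NULL;

        do {
            ret = write(wpipe, &null_stream, sizeof(null_stream));
        } while (ret < 0 && errno == EINTR);
    }

    /* Reader side: a non-NULL pointer is a new stream to handle, a NULL
     * pointer is only a wakeup. */
    static void drain_one(int rpipe)
    {
        ssize_t ret;
        struct fake_stream *stream;

        do {
            ret = read(rpipe, &stream, sizeof(stream));
        } while (ret < 0 && errno == EINTR);

        if (ret == sizeof(stream) && stream == NULL) {
            printf("wakeup: re-check global state\n");
        }
    }

    int main(void)
    {
        int pipefd[2];

        if (pipe(pipefd) < 0) {
            return 1;
        }
        notify(pipefd[1]);
        drain_one(pipefd[0]);
        return 0;
    }
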
> +
> +/*
> * Find a stream. The consumer_data.lock must be locked during this
> * call.
> */
> @@ -182,6 +197,17 @@ static void consumer_rcu_free_relayd(struct rcu_head *head)
> struct consumer_relayd_sock_pair *relayd =
> caa_container_of(node, struct consumer_relayd_sock_pair, node);
>
> + /*
> + * Close all sockets. This is done in the RCU callback since we don't want the
> + * socket fds to be reassigned and thus potentially end up with a bad state in
> + * the relayd object.
> + *
> + * We do not have to lock the control socket mutex here since at this stage
> + * no one is referencing this relayd object anymore.
> + */
> + (void) relayd_close(&relayd->control_sock);
> + (void) relayd_close(&relayd->data_sock);
> +
> free(relayd);
> }
>
> @@ -204,21 +230,89 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
> iter.iter.node = &relayd->node.node;
> ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
> if (ret != 0) {
> - /* We assume the relayd was already destroyed */
> + /* We assume the relayd is being destroyed or already destroyed. */
> return;
> }
>
> - /* Close all sockets */
> - pthread_mutex_lock(&relayd->ctrl_sock_mutex);
> - (void) relayd_close(&relayd->control_sock);
> - pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
> - (void) relayd_close(&relayd->data_sock);
> -
> /* RCU free() call */
> call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
> }
>
> /*
> + * Update the end point status of all streams having the given network sequence
> + * index (relayd index).
> + *
> + * It's set atomically without the stream mutex locked, so be aware of the
> + * potential race when using it.
Please describe that we handle this race with a retry that is triggered by
the pipe wakeup.
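Something along these lines would be enough (suggested wording only):

    /*
     * The status is set atomically without holding the stream mutex, so a
     * thread may race with this update. This is handled by the wakeup sent
     * on the thread pipes right after: once woken up, each thread
     * re-validates the endpoint status of its streams and deletes the ones
     * flagged inactive.
     */
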
Other than that,
Acked-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + */
> +static void update_endpoint_status_by_netidx(int net_seq_idx,
> + enum consumer_endpoint_status status)
> +{
> + struct lttng_ht_iter iter;
> + struct lttng_consumer_stream *stream;
> +
> + DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
> +
> + rcu_read_lock();
> +
> + /* Let's begin with metadata */
> + cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
> + if (stream->net_seq_idx == net_seq_idx) {
> + uatomic_set(&stream->endpoint_status, status);
> + DBG("Delete flag set to metadata stream %d", stream->wait_fd);
> + }
> + }
> +
> + /* Follow up by the data streams */
> + cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
> + if (stream->net_seq_idx == net_seq_idx) {
> + uatomic_set(&stream->endpoint_status, status);
> + DBG("Delete flag set to data stream %d", stream->wait_fd);
> + }
> + }
> + rcu_read_unlock();
> +}
> +
> +/*
> + * Clean up a relayd object by flagging every associated stream for deletion,
> + * then destroying the object, meaning removing it from the relayd hash table,
> + * closing the sockets and freeing the memory in an RCU call.
> + *
> + * If a local data context is available, notify the threads that the streams'
> + * state has changed.
> + */
> +static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
> + struct lttng_consumer_local_data *ctx)
> +{
> + int netidx;
> +
> + assert(relayd);
> +
> + /* Save the net sequence index before destroying the object */
> + netidx = relayd->net_seq_idx;
> +
> + /*
> + * Delete the relayd from the relayd hash table, close the sockets and free
> + * the object in an RCU call.
> + */
> + destroy_relayd(relayd);
> +
> + /* Set the endpoint status of all streams to inactive */
> + update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
> +
> + /*
> + * With a local data context, notify the threads that the streams' state
> + * has changed. The write() action on the pipe acts as an "implicit"
> + * memory barrier ordering the updates of the endpoint status before the
> + * read of this status, which happens AFTER receiving this notification.
> + */
> + if (ctx) {
> + notify_thread_pipe(ctx->consumer_data_pipe[1]);
> + notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
> + }
> +}
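To make the barrier argument in the comment above concrete, here is a
stand-alone sketch of the ordering it relies on, using C11 atomics as a
stand-in for uatomic_set()/uatomic_read() rather than the liburcu API
(publish_inactive and observe are illustrative names):

    #include <stdatomic.h>
    #include <stdio.h>
    #include <unistd.h>

    /* Stand-in for stream->endpoint_status: 0 = active, 1 = inactive. */
    static atomic_int endpoint_status;

    /* Writer: publish the new status, then wake the reader through the
     * pipe. The write() syscall is not reordered before the store. */
    static void publish_inactive(int wpipe)
    {
        char token = 0;

        atomic_store(&endpoint_status, 1);
        (void) write(wpipe, &token, sizeof(token));
    }

    /* Reader: the read() on the pipe completes after the corresponding
     * write(), so the status loaded afterwards observes the update. */
    static int observe(int rpipe)
    {
        char token;

        (void) read(rpipe, &token, sizeof(token));
        return atomic_load(&endpoint_status);
    }

    int main(void)
    {
        int pipefd[2];

        if (pipe(pipefd) < 0) {
            return 1;
        }
        publish_inactive(pipefd[1]);
        printf("status after wakeup: %d\n", observe(pipefd[0]));
        return 0;
    }
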
> +
> +/*
> * Flag a relayd socket pair for destruction. Destroy it if the refcount
> * reaches zero.
> *
> @@ -251,11 +345,14 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
>
> assert(stream);
>
> + DBG("Consumer del stream %d", stream->wait_fd);
> +
> if (ht == NULL) {
> /* Means the stream was allocated but not successfully added */
> goto free_stream;
> }
>
> + pthread_mutex_lock(&stream->lock);
> pthread_mutex_lock(&consumer_data.lock);
>
> switch (consumer_data.type) {
> @@ -349,6 +446,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
> end:
> consumer_data.need_update = 1;
> pthread_mutex_unlock(&consumer_data.lock);
> + pthread_mutex_unlock(&stream->lock);
>
> if (free_chan) {
> consumer_del_channel(free_chan);
> @@ -804,7 +902,17 @@ static int consumer_update_poll_array(
> DBG("Updating poll fd array");
> rcu_read_lock();
> cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
> - if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
> + /*
> + * Only active streams with an active end point can be added to the
> + * poll set and local stream storage of the thread.
> + *
> + * There is a potential race here for endpoint_status to be updated
> + * just after the check. However, this is OK since the stream(s) will
> + * be deleted once the thread is notified that the end point state has
> + * changed, at which point this function will be called again.
> + */
> + if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
> + stream->endpoint_status) {
> continue;
> }
> DBG("Active FD %d", stream->wait_fd);
> @@ -1169,6 +1277,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
> /* Default is on the disk */
> int outfd = stream->out_fd;
> struct consumer_relayd_sock_pair *relayd = NULL;
> + unsigned int relayd_hang_up = 0;
>
> /* RCU lock for the relayd pointer */
> rcu_read_lock();
> @@ -1228,11 +1337,22 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
> ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
> if (ret < 0) {
> written = ret;
> + /* Socket operation failed. We consider the relayd dead */
> + if (ret == -EPIPE || ret == -EINVAL) {
> + relayd_hang_up = 1;
> + goto write_error;
> + }
> goto end;
> }
> }
> + } else {
> + /* Socket operation failed. We consider the relayd dead */
> + if (ret == -EPIPE || ret == -EINVAL) {
> + relayd_hang_up = 1;
> + goto write_error;
> + }
> + /* Else, use the default set before which is the filesystem. */
> }
> - /* Else, use the default set before which is the filesystem. */
> } else {
> /* No streaming, we have to set the len with the full padding */
> len += padding;
> @@ -1248,6 +1368,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
> if (written == 0) {
> written = ret;
> }
> + /* Socket operation failed. We consider the relayd dead */
> + if (errno == EPIPE || errno == EINVAL) {
> + relayd_hang_up = 1;
> + goto write_error;
> + }
> goto end;
> } else if (ret > len) {
> PERROR("Error in file write (ret %zd > len %lu)", ret, len);
> @@ -1269,6 +1394,15 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
> }
> lttng_consumer_sync_trace_file(stream, orig_offset);
>
> +write_error:
> + /*
> + * This is a special case where the relayd has closed its socket. Let's
> + * clean up the relayd object and all associated streams.
> + */
> + if (relayd && relayd_hang_up) {
> + cleanup_relayd(relayd, ctx);
> + }
> +
> end:
> /* Unlock only if ctrl socket used */
> if (relayd && stream->metadata_flag) {
> @@ -1298,6 +1432,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
> int outfd = stream->out_fd;
> struct consumer_relayd_sock_pair *relayd = NULL;
> int *splice_pipe;
> + unsigned int relayd_hang_up = 0;
>
> switch (consumer_data.type) {
> case LTTNG_CONSUMER_KERNEL:
> @@ -1350,6 +1485,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
> padding);
> if (ret < 0) {
> written = ret;
> + /* Socket operation failed. We consider the relayd dead */
> + if (ret == -EBADF) {
> + WARN("Remote relayd disconnected. Stopping");
> + relayd_hang_up = 1;
> + goto write_error;
> + }
> goto end;
> }
>
> @@ -1361,7 +1502,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
> /* Use the returned socket. */
> outfd = ret;
> } else {
> - ERR("Remote relayd disconnected. Stopping");
> + /* Socket operation failed. We consider the relayd dead */
> + if (ret == -EBADF) {
> + WARN("Remote relayd disconnected. Stopping");
> + relayd_hang_up = 1;
> + goto write_error;
> + }
> goto end;
> }
> } else {
> @@ -1410,6 +1556,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
> if (written == 0) {
> written = ret_splice;
> }
> + /* Socket operation failed. We consider the relayd dead */
> + if (errno == EBADF) {
> + WARN("Remote relayd disconnected. Stopping");
> + relayd_hang_up = 1;
> + goto write_error;
> + }
> ret = errno;
> goto splice_error;
> } else if (ret_splice > len) {
> @@ -1437,12 +1589,20 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>
> goto end;
>
> +write_error:
> + /*
> + * This is a special case where the relayd has closed its socket. Let's
> + * clean up the relayd object and all associated streams.
> + */
> + if (relayd && relayd_hang_up) {
> + cleanup_relayd(relayd, ctx);
> + /* Skip splice error so the consumer does not fail */
> + goto end;
> + }
> +
> splice_error:
> /* send the appropriate error description to sessiond */
> switch (ret) {
> - case EBADF:
> - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF);
> - break;
> case EINVAL:
> lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
> break;
> @@ -1604,6 +1764,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
> goto free_stream;
> }
>
> + pthread_mutex_lock(&stream->lock);
> +
> pthread_mutex_lock(&consumer_data.lock);
> switch (consumer_data.type) {
> case LTTNG_CONSUMER_KERNEL:
> @@ -1695,6 +1857,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>
> end:
> pthread_mutex_unlock(&consumer_data.lock);
> + pthread_mutex_unlock(&stream->lock);
>
> if (free_chan) {
> consumer_del_channel(free_chan);
> @@ -1766,6 +1929,59 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
> }
>
> /*
> + * Delete data streams that are flagged for deletion (endpoint_status).
> + */
> +static void validate_endpoint_status_data_stream(void)
> +{
> + struct lttng_ht_iter iter;
> + struct lttng_consumer_stream *stream;
> +
> + DBG("Consumer delete flagged data stream");
> +
> + rcu_read_lock();
> + cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
> + /* Validate delete flag of the stream */
> + if (!stream->endpoint_status) {
> + continue;
> + }
> + /* Delete it right now */
> + consumer_del_stream(stream, data_ht);
> + }
> + rcu_read_unlock();
> +}
> +
> +/*
> + * Delete metadata streams that are flagged for deletion (endpoint_status).
> + */
> +static void validate_endpoint_status_metadata_stream(
> + struct lttng_poll_event *pollset)
> +{
> + struct lttng_ht_iter iter;
> + struct lttng_consumer_stream *stream;
> +
> + DBG("Consumer delete flagged metadata stream");
> +
> + assert(pollset);
> +
> + rcu_read_lock();
> + cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
> + /* Validate delete flag of the stream */
> + if (!stream->endpoint_status) {
> + continue;
> + }
> + /*
> + * Remove from pollset so the metadata thread can continue without
> + * blocking on a deleted stream.
> + */
> + lttng_poll_del(pollset, stream->wait_fd);
> +
> + /* Delete it right now */
> + consumer_del_metadata_stream(stream, metadata_ht);
> + }
> + rcu_read_unlock();
> +}
> +
> +/*
> * Thread polls on metadata file descriptor and write them on disk or on the
> * network.
> */
> @@ -1856,6 +2072,13 @@ restart:
> continue;
> }
>
> + /* A NULL stream means that the state has changed. */
> + if (stream == NULL) {
> + /* Check for deleted streams. */
> + validate_endpoint_status_metadata_stream(&events);
> + continue;
> + }
> +
> DBG("Adding metadata stream %d to poll set",
> stream->wait_fd);
>
> @@ -2063,6 +2286,7 @@ void *consumer_thread_data_poll(void *data)
> * waking us up to test it.
> */
> if (new_stream == NULL) {
> + validate_endpoint_status_data_stream();
> continue;
> }
>
> @@ -2301,14 +2525,9 @@ end:
>
> /*
> * Notify the data poll thread to poll back again and test the
> - * consumer_quit state to quit gracefully.
> + * consumer_quit state that we just set so that it quits gracefully.
> */
> - do {
> - struct lttng_consumer_stream *null_stream = NULL;
> -
> - ret = write(ctx->consumer_data_pipe[1], &null_stream,
> - sizeof(null_stream));
> - } while (ret < 0 && errno == EINTR);
> + notify_thread_pipe(ctx->consumer_data_pipe[1]);
>
> rcu_unregister_thread();
> return NULL;
> diff --git a/src/common/consumer.h b/src/common/consumer.h
> index 53b6151..0334c49 100644
> --- a/src/common/consumer.h
> +++ b/src/common/consumer.h
> @@ -74,6 +74,11 @@ enum lttng_consumer_type {
> LTTNG_CONSUMER32_UST,
> };
>
> +enum consumer_endpoint_status {
> + CONSUMER_ENDPOINT_ACTIVE,
> + CONSUMER_ENDPOINT_INACTIVE,
> +};
> +
> struct lttng_consumer_channel {
> struct lttng_ht_node_ulong node;
> int key;
> @@ -150,6 +155,13 @@ struct lttng_consumer_stream {
> pthread_mutex_t lock;
> /* Tracing session id */
> uint64_t session_id;
> + /*
> + * Indicates whether the stream endpoint is still active or not (network
> + * streaming or local file system). The thread "owning" the stream handles
> + * this status and can be notified of a state change through the
> + * appropriate consumer data pipe.
> + */
> + enum consumer_endpoint_status endpoint_status;
> };
>
> /*
> diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c
> index 785d3dc..db47608 100644
> --- a/src/common/relayd/relayd.c
> +++ b/src/common/relayd/relayd.c
> @@ -67,6 +67,7 @@ static int send_command(struct lttcomm_sock *sock,
>
> ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
> if (ret < 0) {
> + ret = -errno;
> goto error;
> }
>
> @@ -90,6 +91,7 @@ static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size)
>
> ret = sock->ops->recvmsg(sock, data, size, 0);
> if (ret < 0) {
> + ret = -errno;
> goto error;
> }
>
> @@ -283,6 +285,7 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock,
> /* Only send data header. */
> ret = sock->ops->sendmsg(sock, hdr, size, 0);
> if (ret < 0) {
> + ret = -errno;
> goto error;
> }
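For what it's worth, the point of these -errno conversions is that callers
like lttng_consumer_on_read_subbuffer_mmap() can now match on the precise
error code (-EPIPE, -EINVAL, -EBADF) instead of a bare negative return. A
stand-alone illustration of that caller-side pattern (fake_send is obviously
not the real relayd API):

    #include <errno.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/types.h>

    /* Illustrative stand-in for a relayd send helper reporting -errno. */
    static ssize_t fake_send(int fail_errno)
    {
        if (fail_errno) {
            errno = fail_errno;
            return -errno;    /* propagate the precise error to the caller */
        }
        return 0;
    }

    int main(void)
    {
        ssize_t ret = fake_send(EPIPE);    /* simulate a peer that went away */

        if (ret < 0) {
            /* The caller can tell a dead relayd apart from other errors. */
            if (ret == -EPIPE || ret == -EINVAL || ret == -EBADF) {
                fprintf(stderr, "relayd considered dead: %s\n",
                        strerror((int) -ret));
                /* ... flag relayd_hang_up and run the cleanup path ... */
            } else {
                fprintf(stderr, "send error: %s\n", strerror((int) -ret));
            }
        }
        return 0;
    }
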
>
> --
> 1.7.10.4
>
>
> _______________________________________________
> lttng-dev mailing list
> lttng-dev at lists.lttng.org
> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com