[lttng-dev] [PATCH lttng-tools v2] Fix: consumer relayd cleanup on disconnect

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Fri Oct 26 09:57:13 EDT 2012


* David Goulet (dgoulet at efficios.com) wrote:
> Improve the resilience of the consumer by cleaning up a relayd object
> and all associated streams when a write error is detected on a relayd
> socket.
> 
> Fixes #385
> 
> Signed-off-by: David Goulet <dgoulet at efficios.com>
> ---
>  src/common/consumer.c      |  259 ++++++++++++++++++++++++++++++++++++++++----
>  src/common/consumer.h      |   12 ++
>  src/common/relayd/relayd.c |    3 +
>  3 files changed, 254 insertions(+), 20 deletions(-)
> 
> diff --git a/src/common/consumer.c b/src/common/consumer.c
> index 53c6180..eeb2f59 100644
> --- a/src/common/consumer.c
> +++ b/src/common/consumer.c
> @@ -70,6 +70,21 @@ struct lttng_ht *metadata_ht;
>  struct lttng_ht *data_ht;
>  
>  /*
> + * Notify a thread pipe to poll back again. This usually means that some global
> + * state has changed and the thread should go back into its poll wait call.
> + */
> +static void notify_thread_pipe(int wpipe)
> +{
> +	int ret;
> +
> +	do {
> +		struct lttng_consumer_stream *null_stream = NULL;
> +
> +		ret = write(wpipe, &null_stream, sizeof(null_stream));
> +	} while (ret < 0 && errno == EINTR);
> +}
> +
> +/*
>   * Find a stream. The consumer_data.lock must be locked during this
>   * call.
>   */
> @@ -182,6 +197,17 @@ static void consumer_rcu_free_relayd(struct rcu_head *head)
>  	struct consumer_relayd_sock_pair *relayd =
>  		caa_container_of(node, struct consumer_relayd_sock_pair, node);
>  
> +	/*
> +	 * Close all sockets. This is done in the call_rcu callback since we don't
> +	 * want the socket fds to be reassigned while readers may still access the
> +	 * relayd object, which could put it in a bad state.
> +	 *
> +	 * We do not have to lock the control socket mutex here since at this stage
> +	 * there is no one referencing this relayd object anymore.
> +	 */
> +	(void) relayd_close(&relayd->control_sock);
> +	(void) relayd_close(&relayd->data_sock);
> +
>  	free(relayd);
>  }
>  
> @@ -204,21 +230,89 @@ static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
>  	iter.iter.node = &relayd->node.node;
>  	ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
>  	if (ret != 0) {
> -		/* We assume the relayd was already destroyed */
> +		/* We assume the relayd is being or has already been destroyed */
>  		return;
>  	}
>  
> -	/* Close all sockets */
> -	pthread_mutex_lock(&relayd->ctrl_sock_mutex);
> -	(void) relayd_close(&relayd->control_sock);
> -	pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
> -	(void) relayd_close(&relayd->data_sock);
> -
>  	/* RCU free() call */
>  	call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
>  }
>  
>  /*
> + * Update the endpoint status of all streams having the given network sequence
> + * index (relayd index).
> + *
> + * It's atomically set without having the stream mutex locked, so be aware of
> + * the potential race when using it.

Please describe how this race is handled: a retry will happen, triggered by
the pipe wakeup.
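
Something along these lines would do, I think (just a suggestion, adapt the
wording as you see fit):

 * It's atomically set without having the stream mutex locked, so be aware of
 * the potential race when using it. The race is handled by a retry triggered
 * by the pipe wakeup: the NULL stream written by notify_thread_pipe() wakes
 * up the data and metadata poll threads, which then go through
 * validate_endpoint_status_data_stream() /
 * validate_endpoint_status_metadata_stream() and delete the flagged streams.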

Other than that,

Acked-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>


> + */
> +static void update_endpoint_status_by_netidx(int net_seq_idx,
> +		enum consumer_endpoint_status status)
> +{
> +	struct lttng_ht_iter iter;
> +	struct lttng_consumer_stream *stream;
> +
> +	DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
> +
> +	rcu_read_lock();
> +
> +	/* Let's begin with metadata */
> +	cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
> +		if (stream->net_seq_idx == net_seq_idx) {
> +			uatomic_set(&stream->endpoint_status, status);
> +			DBG("Delete flag set to metadata stream %d", stream->wait_fd);
> +		}
> +	}
> +
> +	/* Follow up by the data streams */
> +	cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
> +		if (stream->net_seq_idx == net_seq_idx) {
> +			uatomic_set(&stream->endpoint_status, status);
> +			DBG("Delete flag set to data stream %d", stream->wait_fd);
> +		}
> +	}
> +	rcu_read_unlock();
> +}
> +
> +/*
> + * Clean up a relayd object by flagging every associated stream for deletion,
> + * removing the object from the relayd hash table, closing the sockets and
> + * freeing the memory in an RCU call.
> + *
> + * If a local data context is available, notify the threads that the streams'
> + * state has changed.
> + */
> +static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
> +		struct lttng_consumer_local_data *ctx)
> +{
> +	int netidx;
> +
> +	assert(relayd);
> +
> +	/* Save the net sequence index before destroying the object */
> +	netidx = relayd->net_seq_idx;
> +
> +	/*
> +	 * Delete the relayd from the relayd hash table, close the sockets and free
> +	 * the object in an RCU call.
> +	 */
> +	destroy_relayd(relayd);
> +
> +	/* Set the inactive endpoint status on all streams */
> +	update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
> +
> +	/*
> +	 * With a local data context, notify the threads that the streams' state
> +	 * has changed. The write() on the pipe acts as an "implicit" memory
> +	 * barrier ordering the update of the endpoint status before the read of
> +	 * that status, which happens AFTER receiving this notification.
> +	 */
> +	if (ctx) {
> +		notify_thread_pipe(ctx->consumer_data_pipe[1]);
> +		notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
> +	}
> +}
> +
> +/*
>   * Flag a relayd socket pair for destruction. Destroy it if the refcount
>   * reaches zero.
>   *
> @@ -251,11 +345,14 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
>  
>  	assert(stream);
>  
> +	DBG("Consumer del stream %d", stream->wait_fd);
> +
>  	if (ht == NULL) {
>  		/* Means the stream was allocated but not successfully added */
>  		goto free_stream;
>  	}
>  
> +	pthread_mutex_lock(&stream->lock);
>  	pthread_mutex_lock(&consumer_data.lock);
>  
>  	switch (consumer_data.type) {
> @@ -349,6 +446,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
>  end:
>  	consumer_data.need_update = 1;
>  	pthread_mutex_unlock(&consumer_data.lock);
> +	pthread_mutex_unlock(&stream->lock);
>  
>  	if (free_chan) {
>  		consumer_del_channel(free_chan);
> @@ -804,7 +902,17 @@ static int consumer_update_poll_array(
>  	DBG("Updating poll fd array");
>  	rcu_read_lock();
>  	cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
> -		if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
> +		/*
> +		 * Only active streams with an active endpoint can be added to the
> +		 * poll set and the local stream storage of the thread.
> +		 *
> +		 * There is a potential race here: endpoint_status can be updated
> +		 * just after the check. However, this is OK since the stream(s) will
> +		 * be deleted once the thread is notified that the endpoint state has
> +		 * changed, at which point this function will be called again.
> +		 */
> +		if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
> +				stream->endpoint_status) {
>  			continue;
>  		}
>  		DBG("Active FD %d", stream->wait_fd);
> @@ -1169,6 +1277,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
>  	/* Default is on the disk */
>  	int outfd = stream->out_fd;
>  	struct consumer_relayd_sock_pair *relayd = NULL;
> +	unsigned int relayd_hang_up = 0;
>  
>  	/* RCU lock for the relayd pointer */
>  	rcu_read_lock();
> @@ -1228,11 +1337,22 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
>  				ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
>  				if (ret < 0) {
>  					written = ret;
> +					/* Socket operation failed. We consider the relayd dead */
> +					if (ret == -EPIPE || ret == -EINVAL) {
> +						relayd_hang_up = 1;
> +						goto write_error;
> +					}
>  					goto end;
>  				}
>  			}
> +		} else {
> +			/* Socket operation failed. We consider the relayd dead */
> +			if (ret == -EPIPE || ret == -EINVAL) {
> +				relayd_hang_up = 1;
> +				goto write_error;
> +			}
> +			/* Else, use the default set earlier, which is the filesystem. */
>  		}
> -		/* Else, use the default set before which is the filesystem. */
>  	} else {
>  		/* No streaming, we have to set the len with the full padding */
>  		len += padding;
> @@ -1248,6 +1368,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
>  			if (written == 0) {
>  				written = ret;
>  			}
> +			/* Socket operation failed. We consider the relayd dead */
> +			if (errno == EPIPE || errno == EINVAL) {
> +				relayd_hang_up = 1;
> +				goto write_error;
> +			}
>  			goto end;
>  		} else if (ret > len) {
>  			PERROR("Error in file write (ret %zd > len %lu)", ret, len);
> @@ -1269,6 +1394,15 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
>  	}
>  	lttng_consumer_sync_trace_file(stream, orig_offset);
>  
> +write_error:
> +	/*
> +	 * This is a special case where the relayd has closed its socket. Let's
> +	 * clean up the relayd object and all associated streams.
> +	 */
> +	if (relayd && relayd_hang_up) {
> +		cleanup_relayd(relayd, ctx);
> +	}
> +
>  end:
>  	/* Unlock only if ctrl socket used */
>  	if (relayd && stream->metadata_flag) {
> @@ -1298,6 +1432,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>  	int outfd = stream->out_fd;
>  	struct consumer_relayd_sock_pair *relayd = NULL;
>  	int *splice_pipe;
> +	unsigned int relayd_hang_up = 0;
>  
>  	switch (consumer_data.type) {
>  	case LTTNG_CONSUMER_KERNEL:
> @@ -1350,6 +1485,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>  					padding);
>  			if (ret < 0) {
>  				written = ret;
> +				/* Socket operation failed. We consider the relayd dead */
> +				if (ret == -EBADF) {
> +					WARN("Remote relayd disconnected. Stopping");
> +					relayd_hang_up = 1;
> +					goto write_error;
> +				}
>  				goto end;
>  			}
>  
> @@ -1361,7 +1502,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>  			/* Use the returned socket. */
>  			outfd = ret;
>  		} else {
> -			ERR("Remote relayd disconnected. Stopping");
> +			/* Socket operation failed. We consider the relayd dead */
> +			if (ret == -EBADF) {
> +				WARN("Remote relayd disconnected. Stopping");
> +				relayd_hang_up = 1;
> +				goto write_error;
> +			}
>  			goto end;
>  		}
>  	} else {
> @@ -1410,6 +1556,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>  			if (written == 0) {
>  				written = ret_splice;
>  			}
> +			/* Socket operation failed. We consider the relayd dead */
> +			if (errno == EBADF) {
> +				WARN("Remote relayd disconnected. Stopping");
> +				relayd_hang_up = 1;
> +				goto write_error;
> +			}
>  			ret = errno;
>  			goto splice_error;
>  		} else if (ret_splice > len) {
> @@ -1437,12 +1589,20 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>  
>  	goto end;
>  
> +write_error:
> +	/*
> +	 * This is a special case where the relayd has closed its socket. Let's
> +	 * clean up the relayd object and all associated streams.
> +	 */
> +	if (relayd && relayd_hang_up) {
> +		cleanup_relayd(relayd, ctx);
> +		/* Skip splice error so the consumer does not fail */
> +		goto end;
> +	}
> +
>  splice_error:
>  	/* send the appropriate error description to sessiond */
>  	switch (ret) {
> -	case EBADF:
> -		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF);
> -		break;
>  	case EINVAL:
>  		lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
>  		break;
> @@ -1604,6 +1764,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>  		goto free_stream;
>  	}
>  
> +	pthread_mutex_lock(&stream->lock);
> +
>  	pthread_mutex_lock(&consumer_data.lock);
>  	switch (consumer_data.type) {
>  	case LTTNG_CONSUMER_KERNEL:
> @@ -1695,6 +1857,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>  
>  end:
>  	pthread_mutex_unlock(&consumer_data.lock);
> +	pthread_mutex_unlock(&stream->lock);
>  
>  	if (free_chan) {
>  		consumer_del_channel(free_chan);
> @@ -1766,6 +1929,59 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>  }
>  
>  /*
> + * Delete data streams that are flagged for deletion (endpoint_status).
> + */
> +static void validate_endpoint_status_data_stream(void)
> +{
> +	struct lttng_ht_iter iter;
> +	struct lttng_consumer_stream *stream;
> +
> +	DBG("Consumer delete flagged data stream");
> +
> +	rcu_read_lock();
> +	cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
> +		/* Validate delete flag of the stream */
> +		if (!stream->endpoint_status) {
> +			continue;
> +		}
> +		/* Delete it right now */
> +		consumer_del_stream(stream, data_ht);
> +	}
> +	rcu_read_unlock();
> +}
> +
> +/*
> + * Delete metadata streams that are flagged for deletion (endpoint_status).
> + */
> +static void validate_endpoint_status_metadata_stream(
> +		struct lttng_poll_event *pollset)
> +{
> +	struct lttng_ht_iter iter;
> +	struct lttng_consumer_stream *stream;
> +
> +	DBG("Consumer delete flagged metadata stream");
> +
> +	assert(pollset);
> +
> +	rcu_read_lock();
> +	cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
> +		/* Validate delete flag of the stream */
> +		if (!stream->endpoint_status) {
> +			continue;
> +		}
> +		/*
> +		 * Remove from pollset so the metadata thread can continue without
> +		 * blocking on a deleted stream.
> +		 */
> +		lttng_poll_del(pollset, stream->wait_fd);
> +
> +		/* Delete it right now */
> +		consumer_del_metadata_stream(stream, metadata_ht);
> +	}
> +	rcu_read_unlock();
> +}
> +
> +/*
>   * Thread polls on metadata file descriptor and write them on disk or on the
>   * network.
>   */
> @@ -1856,6 +2072,13 @@ restart:
>  						continue;
>  					}
>  
> +					/* A NULL stream means that the state has changed. */
> +					if (stream == NULL) {
> +						/* Check for deleted streams. */
> +						validate_endpoint_status_metadata_stream(&events);
> +						continue;
> +					}
> +
>  					DBG("Adding metadata stream %d to poll set",
>  							stream->wait_fd);
>  
> @@ -2063,6 +2286,7 @@ void *consumer_thread_data_poll(void *data)
>  			 * waking us up to test it.
>  			 */
>  			if (new_stream == NULL) {
> +				validate_endpoint_status_data_stream();
>  				continue;
>  			}
>  
> @@ -2301,14 +2525,9 @@ end:
>  
>  	/*
>  	 * Notify the data poll thread to poll back again and test the
> -	 * consumer_quit state to quit gracefully.
> +	 * consumer_quit state that we just set, so as to quit gracefully.
>  	 */
> -	do {
> -		struct lttng_consumer_stream *null_stream = NULL;
> -
> -		ret = write(ctx->consumer_data_pipe[1], &null_stream,
> -				sizeof(null_stream));
> -	} while (ret < 0 && errno == EINTR);
> +	notify_thread_pipe(ctx->consumer_data_pipe[1]);
>  
>  	rcu_unregister_thread();
>  	return NULL;
> diff --git a/src/common/consumer.h b/src/common/consumer.h
> index 53b6151..0334c49 100644
> --- a/src/common/consumer.h
> +++ b/src/common/consumer.h
> @@ -74,6 +74,11 @@ enum lttng_consumer_type {
>  	LTTNG_CONSUMER32_UST,
>  };
>  
> +enum consumer_endpoint_status {
> +	CONSUMER_ENDPOINT_ACTIVE,
> +	CONSUMER_ENDPOINT_INACTIVE,
> +};
> +
>  struct lttng_consumer_channel {
>  	struct lttng_ht_node_ulong node;
>  	int key;
> @@ -150,6 +155,13 @@ struct lttng_consumer_stream {
>  	pthread_mutex_t lock;
>  	/* Tracing session id */
>  	uint64_t session_id;
> +	/*
> +	 * Indicates whether the stream endpoint is still active (network streaming
> +	 * or local file system). The thread "owning" the stream handles this
> +	 * status and can be notified of a state change through the appropriate
> +	 * consumer pipe (data or metadata).
> +	 */
> +	enum consumer_endpoint_status endpoint_status;
>  };
>  
>  /*
> diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c
> index 785d3dc..db47608 100644
> --- a/src/common/relayd/relayd.c
> +++ b/src/common/relayd/relayd.c
> @@ -67,6 +67,7 @@ static int send_command(struct lttcomm_sock *sock,
>  
>  	ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
>  	if (ret < 0) {
> +		ret = -errno;
>  		goto error;
>  	}
>  
> @@ -90,6 +91,7 @@ static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size)
>  
>  	ret = sock->ops->recvmsg(sock, data, size, 0);
>  	if (ret < 0) {
> +		ret = -errno;
>  		goto error;
>  	}
>  
> @@ -283,6 +285,7 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock,
>  	/* Only send data header. */
>  	ret = sock->ops->sendmsg(sock, hdr, size, 0);
>  	if (ret < 0) {
> +		ret = -errno;
>  		goto error;
>  	}
>  
> -- 
> 1.7.10.4
> 
> 
> _______________________________________________
> lttng-dev mailing list
> lttng-dev at lists.lttng.org
> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com


