[lttng-dev] [PATCH 2/3 lttng-tools v2] Change consumer_metadata_pipe to be a lttng_pipe

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Mon May 20 10:31:44 EDT 2013


* David Goulet (dgoulet at efficios.com) wrote:
> The read() call in the metadata thread is also changed to use the lttng
> pipe read wrapper.


Acked-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>

> 
> Signed-off-by: David Goulet <dgoulet at efficios.com>
> ---
>  src/common/consumer.c                        |   58 +++++++++-----------------
>  src/common/consumer.h                        |    2 +-
>  src/common/kernel-consumer/kernel-consumer.c |    2 +-
>  src/common/ust-consumer/ust-consumer.c       |    2 +-
>  4 files changed, 22 insertions(+), 42 deletions(-)
> 
> diff --git a/src/common/consumer.c b/src/common/consumer.c
> index ca51ce0..c63e6e6 100644
> --- a/src/common/consumer.c
> +++ b/src/common/consumer.c
> @@ -77,21 +77,6 @@ static struct lttng_ht *metadata_ht;
>  static struct lttng_ht *data_ht;
>  
>  /*
> - * Notify a thread pipe to poll back again. This usually means that some global
> - * state has changed so we just send back the thread in a poll wait call.
> - */
> -static void notify_thread_pipe(int wpipe)
> -{
> -	int ret;
> -
> -	do {
> -		struct lttng_consumer_stream *null_stream = NULL;
> -
> -		ret = write(wpipe, &null_stream, sizeof(null_stream));
> -	} while (ret < 0 && errno == EINTR);
> -}
> -
> -/*
>   * Notify a thread lttng pipe to poll back again. This usually means that some
>   * global state has changed so we just send back the thread in a poll wait
>   * call.
> @@ -423,7 +408,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
>  	 */
>  	if (ctx) {
>  		notify_thread_lttng_pipe(ctx->consumer_data_pipe);
> -		notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
> +		notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
>  	}
>  }
>  
> @@ -1206,8 +1191,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
>  		goto error_channel_pipe;
>  	}
>  
> -	ret = utils_create_pipe(ctx->consumer_metadata_pipe);
> -	if (ret < 0) {
> +	ctx->consumer_metadata_pipe = lttng_pipe_open(0);
> +	if (!ctx->consumer_metadata_pipe) {
>  		goto error_metadata_pipe;
>  	}
>  
> @@ -1219,7 +1204,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
>  	return ctx;
>  
>  error_splice_pipe:
> -	utils_close_pipe(ctx->consumer_metadata_pipe);
> +	lttng_pipe_destroy(ctx->consumer_metadata_pipe);
>  error_metadata_pipe:
>  	utils_close_pipe(ctx->consumer_channel_pipe);
>  error_channel_pipe:
> @@ -1254,6 +1239,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
>  	utils_close_pipe(ctx->consumer_thread_pipe);
>  	utils_close_pipe(ctx->consumer_channel_pipe);
>  	lttng_pipe_destroy(ctx->consumer_data_pipe);
> +	lttng_pipe_destroy(ctx->consumer_metadata_pipe);
>  	utils_close_pipe(ctx->consumer_should_quit);
>  	utils_close_pipe(ctx->consumer_splice_metadata_pipe);
>  
> @@ -2134,7 +2120,8 @@ void *consumer_thread_metadata_poll(void *data)
>  		goto end_poll;
>  	}
>  
> -	ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
> +	ret = lttng_poll_add(&events,
> +			lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
>  	if (ret < 0) {
>  		goto end;
>  	}
> @@ -2172,30 +2159,26 @@ restart:
>  				continue;
>  			}
>  
> -			if (pollfd == ctx->consumer_metadata_pipe[0]) {
> +			if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
>  				if (revents & (LPOLLERR | LPOLLHUP )) {
>  					DBG("Metadata thread pipe hung up");
>  					/*
>  					 * Remove the pipe from the poll set and continue the loop
>  					 * since their might be data to consume.
>  					 */
> -					lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
> -					ret = close(ctx->consumer_metadata_pipe[0]);
> -					if (ret < 0) {
> -						PERROR("close metadata pipe");
> -					}
> +					lttng_poll_del(&events,
> +							lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
> +					lttng_pipe_read_close(ctx->consumer_metadata_pipe);
>  					continue;
>  				} else if (revents & LPOLLIN) {
> -					do {
> -						/* Get the stream pointer received */
> -						ret = read(pollfd, &stream, sizeof(stream));
> -					} while (ret < 0 && errno == EINTR);
> -					if (ret < 0 ||
> -							ret < sizeof(struct lttng_consumer_stream *)) {
> -						PERROR("read metadata stream");
> +					ssize_t pipe_len;
> +
> +					pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
> +							&stream, sizeof(stream));
> +					if (pipe_len < 0) {
> +						ERR("read metadata stream, ret: %ld", pipe_len);
>  						/*
> -						 * Let's continue here and hope we can still work
> -						 * without stopping the consumer. XXX: Should we?
> +						 * Continue here to handle the rest of the streams.
>  						 */
>  						continue;
>  					}
> @@ -2543,10 +2526,7 @@ end:
>  	 * only tracked fd in the poll set. The thread will take care of closing
>  	 * the read side.
>  	 */
> -	ret = close(ctx->consumer_metadata_pipe[1]);
> -	if (ret < 0) {
> -		PERROR("close data pipe");
> -	}
> +	(void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
>  
>  	destroy_data_stream_ht(data_ht);
>  
> diff --git a/src/common/consumer.h b/src/common/consumer.h
> index 91039e8..3726fd1 100644
> --- a/src/common/consumer.h
> +++ b/src/common/consumer.h
> @@ -351,7 +351,7 @@ struct lttng_consumer_local_data {
>  	/* to let the signal handler wake up the fd receiver thread */
>  	int consumer_should_quit[2];
>  	/* Metadata poll thread pipe. Transfer metadata stream to it */
> -	int consumer_metadata_pipe[2];
> +	struct lttng_pipe *consumer_metadata_pipe;
>  };
>  
>  /*
> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
> index d8aec49..f23fc9c 100644
> --- a/src/common/kernel-consumer/kernel-consumer.c
> +++ b/src/common/kernel-consumer/kernel-consumer.c
> @@ -288,7 +288,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>  
>  		/* Get the right pipe where the stream will be sent. */
>  		if (new_stream->metadata_flag) {
> -			stream_pipe = ctx->consumer_metadata_pipe[1];
> +			stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe);
>  		} else {
>  			stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
>  		}
> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> index ddf80da..a81e9d4 100644
> --- a/src/common/ust-consumer/ust-consumer.c
> +++ b/src/common/ust-consumer/ust-consumer.c
> @@ -189,7 +189,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
>  
>  	/* Get the right pipe where the stream will be sent. */
>  	if (stream->metadata_flag) {
> -		stream_pipe = ctx->consumer_metadata_pipe[1];
> +		stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe);
>  	} else {
>  		stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
>  	}
> -- 
> 1.7.10.4
> 
> 
> _______________________________________________
> lttng-dev mailing list
> lttng-dev at lists.lttng.org
> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

-- 
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com



More information about the lttng-dev mailing list