[lttng-dev] [PATCH 1/3 lttng-tools] Change consumer_data_pipe to be a lttng_pipe

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Wed May 15 15:16:41 EDT 2013


* David Goulet (dgoulet at efficios.com) wrote:
> Also, an important change here is that this pipe is no longer in non
> block mode. Before sending stream's pointer over this pipe, only one
> byte was written thus making it unlikely to fail in a read/write race
> condition between threads. Now, 4 bytes are written so keeping this pipe
> non block with threads is a bit of a "looking for trouble situation".
> 
> The lttng pipe wrappers make sure that the read and write side are
> synchronized between threads using a mutex for each side. Furthermore,
> the read and write handle partial I/O and EINTR meaning that once the
> call returns we are sure that either everything was read/written or an
> error occured thus making it not possible for the read side to block
> indefinitely after a poll event.
> 
> Signed-off-by: David Goulet <dgoulet at efficios.com>
> ---
>  src/common/consumer.c                        |   53 ++++++++++++--------------
>  src/common/consumer.h                        |    3 +-
>  src/common/kernel-consumer/kernel-consumer.c |    3 +-
>  src/common/ust-consumer/ust-consumer.c       |    2 +-
>  4 files changed, 29 insertions(+), 32 deletions(-)
> 
> diff --git a/src/common/consumer.c b/src/common/consumer.c
> index 01266a7..bd618dd 100644
> --- a/src/common/consumer.c
> +++ b/src/common/consumer.c
> @@ -91,6 +91,20 @@ static void notify_thread_pipe(int wpipe)
>  	} while (ret < 0 && errno == EINTR);
>  }
>  
> +/*
> + * Notify a thread lttng pipe to poll back again. This usually means that some
> + * global state has changed so we just send back the thread in a poll wait
> + * call.
> + */
> +static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
> +{
> +	struct lttng_consumer_stream *null_stream = NULL;
> +
> +	assert(pipe);
> +
> +	(void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
> +}
> +
>  static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
>  		struct lttng_consumer_channel *chan,
>  		uint64_t key,
> @@ -406,7 +420,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
>  	 * read of this status which happens AFTER receiving this notify.
>  	 */
>  	if (ctx) {
> -		notify_thread_pipe(ctx->consumer_data_pipe[1]);
> +		notify_thread_lttng_pipe(ctx->consumer_data_pipe);
>  		notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
>  	}
>  }
> @@ -971,7 +985,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
>  	 * Insert the consumer_data_pipe at the end of the array and don't
>  	 * increment i so nb_fd is the number of real FD.
>  	 */
> -	(*pollfd)[i].fd = ctx->consumer_data_pipe[0];
> +	(*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
>  	(*pollfd)[i].events = POLLIN | POLLPRI;
>  	return i;
>  }
> @@ -1167,26 +1181,11 @@ struct lttng_consumer_local_data *lttng_consumer_create(
>  	ctx->on_recv_stream = recv_stream;
>  	ctx->on_update_stream = update_stream;
>  
> -	ret = pipe(ctx->consumer_data_pipe);
> -	if (ret < 0) {
> -		PERROR("Error creating poll pipe");
> +	ctx->consumer_data_pipe = lttng_pipe_open(0);
> +	if (!ctx->consumer_data_pipe) {
>  		goto error_poll_pipe;
>  	}
>  
> -	/* set read end of the pipe to non-blocking */
> -	ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
> -	if (ret < 0) {
> -		PERROR("fcntl O_NONBLOCK");
> -		goto error_poll_fcntl;
> -	}
> -
> -	/* set write end of the pipe to non-blocking */
> -	ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
> -	if (ret < 0) {
> -		PERROR("fcntl O_NONBLOCK");
> -		goto error_poll_fcntl;
> -	}
> -
>  	ret = pipe(ctx->consumer_should_quit);
>  	if (ret < 0) {
>  		PERROR("Error creating recv pipe");
> @@ -1225,9 +1224,8 @@ error_channel_pipe:
>  	utils_close_pipe(ctx->consumer_thread_pipe);
>  error_thread_pipe:
>  	utils_close_pipe(ctx->consumer_should_quit);
> -error_poll_fcntl:
>  error_quit_pipe:
> -	utils_close_pipe(ctx->consumer_data_pipe);
> +	lttng_pipe_destroy(ctx->consumer_data_pipe);
>  error_poll_pipe:
>  	free(ctx);
>  error:
> @@ -1253,7 +1251,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
>  	}
>  	utils_close_pipe(ctx->consumer_thread_pipe);
>  	utils_close_pipe(ctx->consumer_channel_pipe);
> -	utils_close_pipe(ctx->consumer_data_pipe);
> +	lttng_pipe_destroy(ctx->consumer_data_pipe);
>  	utils_close_pipe(ctx->consumer_should_quit);
>  	utils_close_pipe(ctx->consumer_splice_metadata_pipe);
>  
> @@ -2402,13 +2400,10 @@ void *consumer_thread_data_poll(void *data)
>  			ssize_t pipe_readlen;
>  
>  			DBG("consumer_data_pipe wake up");
> -			/* Consume 1 byte of pipe data */
> -			do {
> -				pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
> -						sizeof(new_stream));
> -			} while (pipe_readlen == -1 && errno == EINTR);
> +			pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
> +					&new_stream, sizeof(new_stream));
>  			if (pipe_readlen < 0) {

can lttng_pipe_read return a value smaller than sizeof(new_stream) on
some error conditions ? (but higher than 0)

Thanks,

Mathieu

> -				PERROR("read consumer data pipe");
> +				ERR("Consumer data pipe ret %ld", pipe_readlen);
>  				/* Continue so we can at least handle the current stream(s). */
>  				continue;
>  			}
> @@ -2968,7 +2963,7 @@ end:
>  	 * Notify the data poll thread to poll back again and test the
>  	 * consumer_quit state that we just set so to quit gracefully.
>  	 */
> -	notify_thread_pipe(ctx->consumer_data_pipe[1]);
> +	notify_thread_lttng_pipe(ctx->consumer_data_pipe);
>  
>  	notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
>  
> diff --git a/src/common/consumer.h b/src/common/consumer.h
> index 43989e4..91039e8 100644
> --- a/src/common/consumer.h
> +++ b/src/common/consumer.h
> @@ -31,6 +31,7 @@
>  #include <common/compat/fcntl.h>
>  #include <common/compat/uuid.h>
>  #include <common/sessiond-comm/sessiond-comm.h>
> +#include <common/pipe.h>
>  
>  /* Commands for consumer */
>  enum lttng_consumer_command {
> @@ -346,7 +347,7 @@ struct lttng_consumer_local_data {
>  	int consumer_channel_pipe[2];
>  	int consumer_splice_metadata_pipe[2];
>  	/* Data stream poll thread pipe. To transfer data stream to the thread */
> -	int consumer_data_pipe[2];
> +	struct lttng_pipe *consumer_data_pipe;
>  	/* to let the signal handler wake up the fd receiver thread */
>  	int consumer_should_quit[2];
>  	/* Metadata poll thread pipe. Transfer metadata stream to it */
> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
> index 2cf9ac1..d8aec49 100644
> --- a/src/common/kernel-consumer/kernel-consumer.c
> +++ b/src/common/kernel-consumer/kernel-consumer.c
> @@ -34,6 +34,7 @@
>  #include <common/sessiond-comm/sessiond-comm.h>
>  #include <common/sessiond-comm/relayd.h>
>  #include <common/compat/fcntl.h>
> +#include <common/pipe.h>
>  #include <common/relayd/relayd.h>
>  #include <common/utils.h>
>  
> @@ -289,7 +290,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>  		if (new_stream->metadata_flag) {
>  			stream_pipe = ctx->consumer_metadata_pipe[1];
>  		} else {
> -			stream_pipe = ctx->consumer_data_pipe[1];
> +			stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
>  		}
>  
>  		do {
> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> index 031a7cb..ddf80da 100644
> --- a/src/common/ust-consumer/ust-consumer.c
> +++ b/src/common/ust-consumer/ust-consumer.c
> @@ -191,7 +191,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
>  	if (stream->metadata_flag) {
>  		stream_pipe = ctx->consumer_metadata_pipe[1];
>  	} else {
> -		stream_pipe = ctx->consumer_data_pipe[1];
> +		stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
>  	}
>  
>  	do {
> -- 
> 1.7.10.4
> 
> 
> _______________________________________________
> lttng-dev mailing list
> lttng-dev at lists.lttng.org
> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

-- 
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com



More information about the lttng-dev mailing list