[lttng-dev] [PATCH 1/3 lttng-tools] Change consumer_data_pipe to be a lttng_pipe
David Goulet
dgoulet at efficios.com
Thu May 16 09:54:52 EDT 2013
Mathieu Desnoyers:
> * David Goulet (dgoulet at efficios.com) wrote:
>> Also, an important change here is that this pipe is no longer in
>> non-blocking mode. Before stream pointers were sent over this pipe, only
>> one byte was written, making it unlikely to fail in a read/write race
>> condition between threads. Now, 4 bytes are written, so keeping this pipe
>> non-blocking with threads is a bit of a "looking for trouble" situation.
>>
>> The lttng pipe wrappers make sure that the read and write sides are
>> synchronized between threads using a mutex for each side. Furthermore,
>> the read and write calls handle partial I/O and EINTR, meaning that once
>> the call returns we are sure that either everything was read/written or an
>> error occurred, making it impossible for the read side to block
>> indefinitely after a poll event.
>>
>> Signed-off-by: David Goulet <dgoulet at efficios.com>
>> ---
>> src/common/consumer.c | 53 ++++++++++++--------------
>> src/common/consumer.h | 3 +-
>> src/common/kernel-consumer/kernel-consumer.c | 3 +-
>> src/common/ust-consumer/ust-consumer.c | 2 +-
>> 4 files changed, 29 insertions(+), 32 deletions(-)
>>
>> diff --git a/src/common/consumer.c b/src/common/consumer.c
>> index 01266a7..bd618dd 100644
>> --- a/src/common/consumer.c
>> +++ b/src/common/consumer.c
>> @@ -91,6 +91,20 @@ static void notify_thread_pipe(int wpipe)
>> } while (ret < 0 && errno == EINTR);
>> }
>>
>> +/*
>> + * Notify a thread lttng pipe to poll back again. This usually means that some
>> + * global state has changed so we just send back the thread in a poll wait
>> + * call.
>> + */
>> +static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
>> +{
>> + struct lttng_consumer_stream *null_stream = NULL;
>> +
>> + assert(pipe);
>> +
>> + (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
>> +}
>> +
>> static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
>> struct lttng_consumer_channel *chan,
>> uint64_t key,
>> @@ -406,7 +420,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
>> * read of this status which happens AFTER receiving this notify.
>> */
>> if (ctx) {
>> - notify_thread_pipe(ctx->consumer_data_pipe[1]);
>> + notify_thread_lttng_pipe(ctx->consumer_data_pipe);
>> notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
>> }
>> }
>> @@ -971,7 +985,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
>> * Insert the consumer_data_pipe at the end of the array and don't
>> * increment i so nb_fd is the number of real FD.
>> */
>> - (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
>> + (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe);
>> (*pollfd)[i].events = POLLIN | POLLPRI;
>> return i;
>> }
>> @@ -1167,26 +1181,11 @@ struct lttng_consumer_local_data *lttng_consumer_create(
>> ctx->on_recv_stream = recv_stream;
>> ctx->on_update_stream = update_stream;
>>
>> - ret = pipe(ctx->consumer_data_pipe);
>> - if (ret < 0) {
>> - PERROR("Error creating poll pipe");
>> + ctx->consumer_data_pipe = lttng_pipe_open(0);
>> + if (!ctx->consumer_data_pipe) {
>> goto error_poll_pipe;
>> }
>>
>> - /* set read end of the pipe to non-blocking */
>> - ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
>> - if (ret < 0) {
>> - PERROR("fcntl O_NONBLOCK");
>> - goto error_poll_fcntl;
>> - }
>> -
>> - /* set write end of the pipe to non-blocking */
>> - ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
>> - if (ret < 0) {
>> - PERROR("fcntl O_NONBLOCK");
>> - goto error_poll_fcntl;
>> - }
>> -
>> ret = pipe(ctx->consumer_should_quit);
>> if (ret < 0) {
>> PERROR("Error creating recv pipe");
>> @@ -1225,9 +1224,8 @@ error_channel_pipe:
>> utils_close_pipe(ctx->consumer_thread_pipe);
>> error_thread_pipe:
>> utils_close_pipe(ctx->consumer_should_quit);
>> -error_poll_fcntl:
>> error_quit_pipe:
>> - utils_close_pipe(ctx->consumer_data_pipe);
>> + lttng_pipe_destroy(ctx->consumer_data_pipe);
>> error_poll_pipe:
>> free(ctx);
>> error:
>> @@ -1253,7 +1251,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
>> }
>> utils_close_pipe(ctx->consumer_thread_pipe);
>> utils_close_pipe(ctx->consumer_channel_pipe);
>> - utils_close_pipe(ctx->consumer_data_pipe);
>> + lttng_pipe_destroy(ctx->consumer_data_pipe);
>> utils_close_pipe(ctx->consumer_should_quit);
>> utils_close_pipe(ctx->consumer_splice_metadata_pipe);
>>
>> @@ -2402,13 +2400,10 @@ void *consumer_thread_data_poll(void *data)
>> ssize_t pipe_readlen;
>>
>> DBG("consumer_data_pipe wake up");
>> - /* Consume 1 byte of pipe data */
>> - do {
>> - pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
>> - sizeof(new_stream));
>> - } while (pipe_readlen == -1 && errno == EINTR);
>> + pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
>> + &new_stream, sizeof(new_stream));
>> if (pipe_readlen < 0) {
>
> can lttng_pipe_read return a value smaller than sizeof(new_stream) on
> some error conditions ? (but higher than 0)
As long as the pipe is not in non-blocking mode, no. It returns either the
full number of bytes or a negative value. The wrapper takes care of partial
reads and EINTR.
David
>
> Thanks,
>
> Mathieu
>
>> - PERROR("read consumer data pipe");
>> + ERR("Consumer data pipe ret %ld", pipe_readlen);
>> /* Continue so we can at least handle the current stream(s). */
>> continue;
>> }
>> @@ -2968,7 +2963,7 @@ end:
>> * Notify the data poll thread to poll back again and test the
>> * consumer_quit state that we just set so to quit gracefully.
>> */
>> - notify_thread_pipe(ctx->consumer_data_pipe[1]);
>> + notify_thread_lttng_pipe(ctx->consumer_data_pipe);
>>
>> notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
>>
>> diff --git a/src/common/consumer.h b/src/common/consumer.h
>> index 43989e4..91039e8 100644
>> --- a/src/common/consumer.h
>> +++ b/src/common/consumer.h
>> @@ -31,6 +31,7 @@
>> #include <common/compat/fcntl.h>
>> #include <common/compat/uuid.h>
>> #include <common/sessiond-comm/sessiond-comm.h>
>> +#include <common/pipe.h>
>>
>> /* Commands for consumer */
>> enum lttng_consumer_command {
>> @@ -346,7 +347,7 @@ struct lttng_consumer_local_data {
>> int consumer_channel_pipe[2];
>> int consumer_splice_metadata_pipe[2];
>> /* Data stream poll thread pipe. To transfer data stream to the thread */
>> - int consumer_data_pipe[2];
>> + struct lttng_pipe *consumer_data_pipe;
>> /* to let the signal handler wake up the fd receiver thread */
>> int consumer_should_quit[2];
>> /* Metadata poll thread pipe. Transfer metadata stream to it */
>> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
>> index 2cf9ac1..d8aec49 100644
>> --- a/src/common/kernel-consumer/kernel-consumer.c
>> +++ b/src/common/kernel-consumer/kernel-consumer.c
>> @@ -34,6 +34,7 @@
>> #include <common/sessiond-comm/sessiond-comm.h>
>> #include <common/sessiond-comm/relayd.h>
>> #include <common/compat/fcntl.h>
>> +#include <common/pipe.h>
>> #include <common/relayd/relayd.h>
>> #include <common/utils.h>
>>
>> @@ -289,7 +290,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> if (new_stream->metadata_flag) {
>> stream_pipe = ctx->consumer_metadata_pipe[1];
>> } else {
>> - stream_pipe = ctx->consumer_data_pipe[1];
>> + stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
>> }
>>
>> do {
>> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
>> index 031a7cb..ddf80da 100644
>> --- a/src/common/ust-consumer/ust-consumer.c
>> +++ b/src/common/ust-consumer/ust-consumer.c
>> @@ -191,7 +191,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
>> if (stream->metadata_flag) {
>> stream_pipe = ctx->consumer_metadata_pipe[1];
>> } else {
>> - stream_pipe = ctx->consumer_data_pipe[1];
>> + stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
>> }
>>
>> do {
>> --
>> 1.7.10.4
>>
>>
>> _______________________________________________
>> lttng-dev mailing list
>> lttng-dev at lists.lttng.org
>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
>
More information about the lttng-dev
mailing list