[lttng-dev] [PATCH lttng-tools 2/4] Move add data stream to the data thread
David Goulet
dgoulet at efficios.com
Mon Oct 15 11:40:31 EDT 2012
Mathieu Desnoyers:
> * David Goulet (dgoulet at efficios.com) wrote:
>> As a second step of refactoring, upon receiving a data stream, we send
>> it to the data thread that is now in charge of handling it.
>>
>> Furthermore, in order for this to behave correctly, we have to perform the
>> ustctl actions on the stream before passing it to the right thread
>> (the kernel does not need special actions). This way, once the sessiond
>> thread replies back to the session daemon, the stream is sure to be open
>> and ready for data to be recorded on the application side, so we avoid a
>> race between the application thinking the stream is ready and the stream
>> thread still being scheduled out.
>
> Normally, as long as we have a reference on the SHM file descriptor, and
> we have the wakeup FD, we should be good to fetch the data of buffers
> belonging to an application that has already exited, even if it did so
> before the ustctl calls are done.
>
> So I'm wondering why you do the ustctl calls in the sessiond thread? It
> seems to complicate the implementation needlessly: we could still do the
> ustctl calls and output file open at the same location, the
> data/metadata threads.
Hmmm, it was my understanding that those ustctl_* calls were needed
before the trace could start recording, so I wanted them done as early as possible. Wrong?
David
>
> Thanks,
>
> Mathieu
>
>>
>> This commit should speed up the add stream process for the session
>> daemon. There are still some actions to move out of the session daemon
>> poll thread to gain speed significantly, especially for network
>> streaming.
>>
>> Signed-off-by: David Goulet <dgoulet at efficios.com>
>> ---
>> src/common/consumer.c | 123 +++++++++++---------------
>> src/common/consumer.h | 1 +
>> src/common/kernel-consumer/kernel-consumer.c | 24 ++---
>> src/common/ust-consumer/ust-consumer.c | 40 ++++-----
>> 4 files changed, 78 insertions(+), 110 deletions(-)
>>
>> diff --git a/src/common/consumer.c b/src/common/consumer.c
>> index 055de1b..1d2b1f7 100644
>> --- a/src/common/consumer.c
>> +++ b/src/common/consumer.c
>> @@ -89,7 +89,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key,
>> return stream;
>> }
>>
>> -static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht)
>> {
>> struct lttng_consumer_stream *stream;
>>
>> @@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>> lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
>> lttng_ht_node_init_ulong(&stream->node, stream->key);
>>
>> + /*
>> + * The cpu number is needed before using any ustctl_* actions. Ignored for
>> + * the kernel so the value does not matter.
>> + */
>> + pthread_mutex_lock(&consumer_data.lock);
>> + stream->cpu = stream->chan->cpucount++;
>> + pthread_mutex_unlock(&consumer_data.lock);
>> +
>> DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
>> " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
>> stream->shm_fd, stream->wait_fd,
>> @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
>> pthread_mutex_lock(&consumer_data.lock);
>> rcu_read_lock();
>>
>> - switch (consumer_data.type) {
>> - case LTTNG_CONSUMER_KERNEL:
>> - break;
>> - case LTTNG_CONSUMER32_UST:
>> - case LTTNG_CONSUMER64_UST:
>> - stream->cpu = stream->chan->cpucount++;
>> - ret = lttng_ustconsumer_add_stream(stream);
>> - if (ret) {
>> - ret = -EINVAL;
>> - goto error;
>> - }
>> -
>> - /* Steal stream identifier only for UST */
>> - consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
>> - break;
>> - default:
>> - ERR("Unknown consumer_data type");
>> - assert(0);
>> - ret = -ENOSYS;
>> - goto error;
>> - }
>> -
>> lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
>>
>> /* Check and cleanup relayd */
>> @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
>> consumer_data.stream_count++;
>> consumer_data.need_update = 1;
>>
>> -error:
>> rcu_read_unlock();
>> pthread_mutex_unlock(&consumer_data.lock);
>>
>> @@ -1582,17 +1567,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>>
>> DBG3("Consumer delete metadata stream %d", stream->wait_fd);
>>
>> - if (ht == NULL) {
>> - /* Means the stream was allocated but not successfully added */
>> - goto free_stream;
>> - }
>> -
>> - rcu_read_lock();
>> - iter.iter.node = &stream->waitfd_node.node;
>> - ret = lttng_ht_del(ht, &iter);
>> - assert(!ret);
>> - rcu_read_unlock();
>> -
>> pthread_mutex_lock(&consumer_data.lock);
>> switch (consumer_data.type) {
>> case LTTNG_CONSUMER_KERNEL:
>> @@ -1613,6 +1587,18 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>> goto end;
>> }
>>
>> + if (ht == NULL) {
>> + pthread_mutex_unlock(&consumer_data.lock);
>> + /* Means the stream was allocated but not successfully added */
>> + goto free_stream;
>> + }
>> +
>> + rcu_read_lock();
>> + iter.iter.node = &stream->waitfd_node.node;
>> + ret = lttng_ht_del(ht, &iter);
>> + assert(!ret);
>> + rcu_read_unlock();
>> +
>> if (stream->out_fd >= 0) {
>> ret = close(stream->out_fd);
>> if (ret) {
>> @@ -1699,27 +1685,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>>
>> pthread_mutex_lock(&consumer_data.lock);
>>
>> - switch (consumer_data.type) {
>> - case LTTNG_CONSUMER_KERNEL:
>> - break;
>> - case LTTNG_CONSUMER32_UST:
>> - case LTTNG_CONSUMER64_UST:
>> - ret = lttng_ustconsumer_add_stream(stream);
>> - if (ret) {
>> - ret = -EINVAL;
>> - goto error;
>> - }
>> -
>> - /* Steal stream identifier only for UST */
>> - consumer_steal_stream_key(stream->wait_fd, ht);
>> - break;
>> - default:
>> - ERR("Unknown consumer_data type");
>> - assert(0);
>> - ret = -ENOSYS;
>> - goto error;
>> - }
>> -
>> /*
>> * From here, refcounts are updated so be _careful_ when returning an error
>> * after this point.
>> @@ -1749,7 +1714,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>> lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
>> rcu_read_unlock();
>>
>> -error:
>> pthread_mutex_unlock(&consumer_data.lock);
>> return ret;
>> }
>> @@ -1946,7 +1910,7 @@ void *consumer_thread_data_poll(void *data)
>> int num_rdy, num_hup, high_prio, ret, i;
>> struct pollfd *pollfd = NULL;
>> /* local view of the streams */
>> - struct lttng_consumer_stream **local_stream = NULL;
>> + struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
>> /* local view of consumer_data.fds_count */
>> int nb_fd = 0;
>> struct lttng_consumer_local_data *ctx = data;
>> @@ -2034,13 +1998,35 @@ void *consumer_thread_data_poll(void *data)
>> */
>> if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
>> size_t pipe_readlen;
>> - char tmp;
>>
>> DBG("consumer_poll_pipe wake up");
>> /* Consume 1 byte of pipe data */
>> do {
>> - pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
>> + pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
>> + sizeof(new_stream));
>> } while (pipe_readlen == -1 && errno == EINTR);
>> +
>> + /*
>> + * If the stream is NULL, just ignore it. It's also possible that
>> + * the sessiond poll thread changed the consumer_quit state and is
>> + * waking us up to test it.
>> + */
>> + if (new_stream == NULL) {
>> + continue;
>> + }
>> +
>> + ret = consumer_add_stream(new_stream);
>> + if (ret) {
>> + ERR("Consumer add stream %d failed. Continuing",
>> + new_stream->key);
>> + /*
>> + * At this point, if the add_stream fails, it is not in the
>> + * hash table thus passing the NULL value here.
>> + */
>> + consumer_del_stream(new_stream, NULL);
>> + }
>> +
>> + /* Continue to update the local streams and handle prio ones */
>> continue;
>> }
>>
>> @@ -2260,19 +2246,16 @@ end:
>> consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
>>
>> /*
>> - * Wake-up the other end by writing a null byte in the pipe
>> - * (non-blocking). Important note: Because writing into the
>> - * pipe is non-blocking (and therefore we allow dropping wakeup
>> - * data, as long as there is wakeup data present in the pipe
>> - * buffer to wake up the other end), the other end should
>> - * perform the following sequence for waiting:
>> - * 1) empty the pipe (reads).
>> - * 2) perform update operation.
>> - * 3) wait on the pipe (poll).
>> + * Notify the data poll thread to poll back again and test the
>> + * consumer_quit state to quit gracefully.
>> */
>> do {
>> - ret = write(ctx->consumer_poll_pipe[1], "", 1);
>> + struct lttng_consumer_stream *null_stream = NULL;
>> +
>> + ret = write(ctx->consumer_poll_pipe[1], &null_stream,
>> + sizeof(null_stream));
>> } while (ret < 0 && errno == EINTR);
>> +
>> rcu_unregister_thread();
>> return NULL;
>> }
>> diff --git a/src/common/consumer.h b/src/common/consumer.h
>> index 4b225e4..8e5891a 100644
>> --- a/src/common/consumer.h
>> +++ b/src/common/consumer.h
>> @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
>> struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
>> int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
>> size_t data_size);
>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht);
>>
>> extern struct lttng_consumer_local_data *lttng_consumer_create(
>> enum lttng_consumer_type type,
>> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
>> index 13cbe21..444f5e0 100644
>> --- a/src/common/kernel-consumer/kernel-consumer.c
>> +++ b/src/common/kernel-consumer/kernel-consumer.c
>> @@ -235,10 +235,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> consumer_del_stream(new_stream, NULL);
>> }
>> } else {
>> - ret = consumer_add_stream(new_stream);
>> - if (ret) {
>> - ERR("Consumer add stream %d failed. Continuing",
>> - new_stream->key);
>> + do {
>> + ret = write(ctx->consumer_poll_pipe[1], &new_stream,
>> + sizeof(new_stream));
>> + } while (ret < 0 && errno == EINTR);
>> + if (ret < 0) {
>> + PERROR("write data pipe");
>> consumer_del_stream(new_stream, NULL);
>> goto end_nosignal;
>> }
>> @@ -284,20 +286,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> goto end_nosignal;
>> }
>>
>> - /*
>> - * Wake-up the other end by writing a null byte in the pipe (non-blocking).
>> - * Important note: Because writing into the pipe is non-blocking (and
>> - * therefore we allow dropping wakeup data, as long as there is wakeup data
>> - * present in the pipe buffer to wake up the other end), the other end
>> - * should perform the following sequence for waiting:
>> - *
>> - * 1) empty the pipe (reads).
>> - * 2) perform update operation.
>> - * 3) wait on the pipe (poll).
>> - */
>> - do {
>> - ret = write(ctx->consumer_poll_pipe[1], "", 1);
>> - } while (ret < 0 && errno == EINTR);
>> end_nosignal:
>> rcu_read_unlock();
>>
>> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
>> index 1170687..4ca4b84 100644
>> --- a/src/common/ust-consumer/ust-consumer.c
>> +++ b/src/common/ust-consumer/ust-consumer.c
>> @@ -224,6 +224,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> goto end_nosignal;
>> }
>>
>> + /*
>> + * This needs to be done as soon as we can so we don't block the
>> + * application too long.
>> + */
>> + ret = lttng_ustconsumer_add_stream(new_stream);
>> + if (ret) {
>> + consumer_del_stream(new_stream, NULL);
>> + goto end_nosignal;
>> + }
>> + /* Steal stream identifier to avoid having streams with the same key */
>> + consumer_steal_stream_key(new_stream->key, consumer_data.stream_ht);
>> +
>> /* The stream is not metadata. Get relayd reference if exists. */
>> relayd = consumer_find_relayd(msg.u.stream.net_index);
>> if (relayd != NULL) {
>> @@ -265,14 +277,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> goto end_nosignal;
>> }
>> } else {
>> - ret = consumer_add_stream(new_stream);
>> - if (ret) {
>> - ERR("Consumer add stream %d failed. Continuing",
>> - new_stream->key);
>> - /*
>> - * At this point, if the add_stream fails, it is not in the
>> - * hash table thus passing the NULL value here.
>> - */
>> + do {
>> + ret = write(ctx->consumer_poll_pipe[1], &new_stream,
>> + sizeof(new_stream));
>> + } while (ret < 0 && errno == EINTR);
>> + if (ret < 0) {
>> + PERROR("write data pipe");
>> consumer_del_stream(new_stream, NULL);
>> goto end_nosignal;
>> }
>> @@ -334,20 +344,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> break;
>> }
>>
>> - /*
>> - * Wake-up the other end by writing a null byte in the pipe (non-blocking).
>> - * Important note: Because writing into the pipe is non-blocking (and
>> - * therefore we allow dropping wakeup data, as long as there is wakeup data
>> - * present in the pipe buffer to wake up the other end), the other end
>> - * should perform the following sequence for waiting:
>> - *
>> - * 1) empty the pipe (reads).
>> - * 2) perform update operation.
>> - * 3) wait on the pipe (poll).
>> - */
>> - do {
>> - ret = write(ctx->consumer_poll_pipe[1], "", 1);
>> - } while (ret < 0 && errno == EINTR);
>> end_nosignal:
>> rcu_read_unlock();
>>
>> --
>> 1.7.10.4
>>
>>
>> _______________________________________________
>> lttng-dev mailing list
>> lttng-dev at lists.lttng.org
>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
>
More information about the lttng-dev
mailing list