[lttng-dev] [PATCH lttng-tools 2/4] Move add data stream to the data thread

David Goulet dgoulet at efficios.com
Mon Oct 15 13:40:52 EDT 2012



Mathieu Desnoyers:
> * David Goulet (dgoulet at efficios.com) wrote:
>>
>>
>> Mathieu Desnoyers:
>>> * David Goulet (dgoulet at efficios.com) wrote:
>>>> As a second step of refactoring, upon receiving a data stream, we send
>>>> it to the data thread that is now in charge of handling it.
>>>>
>>>> Furthermore, in order for this to behave correctly, we have to make the
>>>> ustctl actions on the stream upon before passing it to the right thread
>>>> (the kernel does not need special actions.). This way, once the sessiond
>>>> thread reply back to the session daemon, the stream is sure to be open
>>>> and ready for data to be recorded on the application side so we avoid a
>>>> race between the application thinking the stream is ready and the stream
>>>> thread still scheduled out.
>>>
>>> Normally, as long as we have a reference on the SHM file descriptor, and
>>> we have the wakeup FD, we should be good to fetch the data of buffers
>>> belonging to an application that has already exited, even if it did so
>>> before the ustctl calls are done.
>>>
>>> So I'm wondering why you do the ustctl calls in the sessiond thread ? It
>>> seems to complexify the implementation needlessly: we could still do the
>>> ustctl calls and output file open at the same location, the
>>> data/metadata threads.
>>
>> Hmmm, it was my understanding that does
> 
> does -> those
> 
>> ustctl_* calls were needed
>> before the trace could be recording thus making them quickly. Wrong?
> 
> Can you rephrase your question ? I don't understand.
> 

My understanding was that _those_ ustctl calls need to be done before
the tracer could start recording data. This is why they were moved to
the session daemon thread.

Am I wrong here? When receiving an UST stream< on the consumer side, is
the SHM reference already acquired?

David

> Thanks,
> 
> Mathieu
> 
>>
>> David
>>
>>>
>>> Thanks,
>>>
>>> Mathieu
>>>
>>>>
>>>> This commit should speed up the add stream process for the session
>>>> daemon. There is still some actions to move out of the session daemon
>>>> poll thread to gain speed significantly, especially for network
>>>> streaming.
>>>>
>>>> Signed-off-by: David Goulet <dgoulet at efficios.com>
>>>> ---
>>>>  src/common/consumer.c                        |  123 +++++++++++---------------
>>>>  src/common/consumer.h                        |    1 +
>>>>  src/common/kernel-consumer/kernel-consumer.c |   24 ++---
>>>>  src/common/ust-consumer/ust-consumer.c       |   40 ++++-----
>>>>  4 files changed, 78 insertions(+), 110 deletions(-)
>>>>
>>>> diff --git a/src/common/consumer.c b/src/common/consumer.c
>>>> index 055de1b..1d2b1f7 100644
>>>> --- a/src/common/consumer.c
>>>> +++ b/src/common/consumer.c
>>>> @@ -89,7 +89,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key,
>>>>  	return stream;
>>>>  }
>>>>  
>>>> -static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
>>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht)
>>>>  {
>>>>  	struct lttng_consumer_stream *stream;
>>>>  
>>>> @@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>>>>  	lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
>>>>  	lttng_ht_node_init_ulong(&stream->node, stream->key);
>>>>  
>>>> +	/*
>>>> +	 * The cpu number is needed before using any ustctl_* actions. Ignored for
>>>> +	 * the kernel so the value does not matter.
>>>> +	 */
>>>> +	pthread_mutex_lock(&consumer_data.lock);
>>>> +	stream->cpu = stream->chan->cpucount++;
>>>> +	pthread_mutex_unlock(&consumer_data.lock);
>>>> +
>>>>  	DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
>>>>  			" out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
>>>>  			stream->shm_fd, stream->wait_fd,
>>>> @@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
>>>>  	pthread_mutex_lock(&consumer_data.lock);
>>>>  	rcu_read_lock();
>>>>  
>>>> -	switch (consumer_data.type) {
>>>> -	case LTTNG_CONSUMER_KERNEL:
>>>> -		break;
>>>> -	case LTTNG_CONSUMER32_UST:
>>>> -	case LTTNG_CONSUMER64_UST:
>>>> -		stream->cpu = stream->chan->cpucount++;
>>>> -		ret = lttng_ustconsumer_add_stream(stream);
>>>> -		if (ret) {
>>>> -			ret = -EINVAL;
>>>> -			goto error;
>>>> -		}
>>>> -
>>>> -		/* Steal stream identifier only for UST */
>>>> -		consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
>>>> -		break;
>>>> -	default:
>>>> -		ERR("Unknown consumer_data type");
>>>> -		assert(0);
>>>> -		ret = -ENOSYS;
>>>> -		goto error;
>>>> -	}
>>>> -
>>>>  	lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
>>>>  
>>>>  	/* Check and cleanup relayd */
>>>> @@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
>>>>  	consumer_data.stream_count++;
>>>>  	consumer_data.need_update = 1;
>>>>  
>>>> -error:
>>>>  	rcu_read_unlock();
>>>>  	pthread_mutex_unlock(&consumer_data.lock);
>>>>  
>>>> @@ -1582,17 +1567,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>>>>  
>>>>  	DBG3("Consumer delete metadata stream %d", stream->wait_fd);
>>>>  
>>>> -	if (ht == NULL) {
>>>> -		/* Means the stream was allocated but not successfully added */
>>>> -		goto free_stream;
>>>> -	}
>>>> -
>>>> -	rcu_read_lock();
>>>> -	iter.iter.node = &stream->waitfd_node.node;
>>>> -	ret = lttng_ht_del(ht, &iter);
>>>> -	assert(!ret);
>>>> -	rcu_read_unlock();
>>>> -
>>>>  	pthread_mutex_lock(&consumer_data.lock);
>>>>  	switch (consumer_data.type) {
>>>>  	case LTTNG_CONSUMER_KERNEL:
>>>> @@ -1613,6 +1587,18 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
>>>>  		goto end;
>>>>  	}
>>>>  
>>>> +	if (ht == NULL) {
>>>> +		pthread_mutex_unlock(&consumer_data.lock);
>>>> +		/* Means the stream was allocated but not successfully added */
>>>> +		goto free_stream;
>>>> +	}
>>>> +
>>>> +	rcu_read_lock();
>>>> +	iter.iter.node = &stream->waitfd_node.node;
>>>> +	ret = lttng_ht_del(ht, &iter);
>>>> +	assert(!ret);
>>>> +	rcu_read_unlock();
>>>> +
>>>>  	if (stream->out_fd >= 0) {
>>>>  		ret = close(stream->out_fd);
>>>>  		if (ret) {
>>>> @@ -1699,27 +1685,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>>>>  
>>>>  	pthread_mutex_lock(&consumer_data.lock);
>>>>  
>>>> -	switch (consumer_data.type) {
>>>> -	case LTTNG_CONSUMER_KERNEL:
>>>> -		break;
>>>> -	case LTTNG_CONSUMER32_UST:
>>>> -	case LTTNG_CONSUMER64_UST:
>>>> -		ret = lttng_ustconsumer_add_stream(stream);
>>>> -		if (ret) {
>>>> -			ret = -EINVAL;
>>>> -			goto error;
>>>> -		}
>>>> -
>>>> -		/* Steal stream identifier only for UST */
>>>> -		consumer_steal_stream_key(stream->wait_fd, ht);
>>>> -		break;
>>>> -	default:
>>>> -		ERR("Unknown consumer_data type");
>>>> -		assert(0);
>>>> -		ret = -ENOSYS;
>>>> -		goto error;
>>>> -	}
>>>> -
>>>>  	/*
>>>>  	 * From here, refcounts are updated so be _careful_ when returning an error
>>>>  	 * after this point.
>>>> @@ -1749,7 +1714,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>>>>  	lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
>>>>  	rcu_read_unlock();
>>>>  
>>>> -error:
>>>>  	pthread_mutex_unlock(&consumer_data.lock);
>>>>  	return ret;
>>>>  }
>>>> @@ -1946,7 +1910,7 @@ void *consumer_thread_data_poll(void *data)
>>>>  	int num_rdy, num_hup, high_prio, ret, i;
>>>>  	struct pollfd *pollfd = NULL;
>>>>  	/* local view of the streams */
>>>> -	struct lttng_consumer_stream **local_stream = NULL;
>>>> +	struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
>>>>  	/* local view of consumer_data.fds_count */
>>>>  	int nb_fd = 0;
>>>>  	struct lttng_consumer_local_data *ctx = data;
>>>> @@ -2034,13 +1998,35 @@ void *consumer_thread_data_poll(void *data)
>>>>  		 */
>>>>  		if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
>>>>  			size_t pipe_readlen;
>>>> -			char tmp;
>>>>  
>>>>  			DBG("consumer_poll_pipe wake up");
>>>>  			/* Consume 1 byte of pipe data */
>>>>  			do {
>>>> -				pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
>>>> +				pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
>>>> +						sizeof(new_stream));
>>>>  			} while (pipe_readlen == -1 && errno == EINTR);
>>>> +
>>>> +			/*
>>>> +			 * If the stream is NULL, just ignore it. It's also possible that
>>>> +			 * the sessiond poll thread changed the consumer_quit state and is
>>>> +			 * waking us up to test it.
>>>> +			 */
>>>> +			if (new_stream == NULL) {
>>>> +				continue;
>>>> +			}
>>>> +
>>>> +			ret = consumer_add_stream(new_stream);
>>>> +			if (ret) {
>>>> +				ERR("Consumer add stream %d failed. Continuing",
>>>> +						new_stream->key);
>>>> +				/*
>>>> +				 * At this point, if the add_stream fails, it is not in the
>>>> +				 * hash table thus passing the NULL value here.
>>>> +				 */
>>>> +				consumer_del_stream(new_stream, NULL);
>>>> +			}
>>>> +
>>>> +			/* Continue to update the local streams and handle prio ones */
>>>>  			continue;
>>>>  		}
>>>>  
>>>> @@ -2260,19 +2246,16 @@ end:
>>>>  	consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
>>>>  
>>>>  	/*
>>>> -	 * Wake-up the other end by writing a null byte in the pipe
>>>> -	 * (non-blocking). Important note: Because writing into the
>>>> -	 * pipe is non-blocking (and therefore we allow dropping wakeup
>>>> -	 * data, as long as there is wakeup data present in the pipe
>>>> -	 * buffer to wake up the other end), the other end should
>>>> -	 * perform the following sequence for waiting:
>>>> -	 * 1) empty the pipe (reads).
>>>> -	 * 2) perform update operation.
>>>> -	 * 3) wait on the pipe (poll).
>>>> +	 * Notify the data poll thread to poll back again and test the
>>>> +	 * consumer_quit state to quit gracefully.
>>>>  	 */
>>>>  	do {
>>>> -		ret = write(ctx->consumer_poll_pipe[1], "", 1);
>>>> +		struct lttng_consumer_stream *null_stream = NULL;
>>>> +
>>>> +		ret = write(ctx->consumer_poll_pipe[1], &null_stream,
>>>> +				sizeof(null_stream));
>>>>  	} while (ret < 0 && errno == EINTR);
>>>> +
>>>>  	rcu_unregister_thread();
>>>>  	return NULL;
>>>>  }
>>>> diff --git a/src/common/consumer.h b/src/common/consumer.h
>>>> index 4b225e4..8e5891a 100644
>>>> --- a/src/common/consumer.h
>>>> +++ b/src/common/consumer.h
>>>> @@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
>>>>  struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
>>>>  int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
>>>>  		size_t data_size);
>>>> +void consumer_steal_stream_key(int key, struct lttng_ht *ht);
>>>>  
>>>>  extern struct lttng_consumer_local_data *lttng_consumer_create(
>>>>  		enum lttng_consumer_type type,
>>>> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
>>>> index 13cbe21..444f5e0 100644
>>>> --- a/src/common/kernel-consumer/kernel-consumer.c
>>>> +++ b/src/common/kernel-consumer/kernel-consumer.c
>>>> @@ -235,10 +235,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>  				consumer_del_stream(new_stream, NULL);
>>>>  			}
>>>>  		} else {
>>>> -			ret = consumer_add_stream(new_stream);
>>>> -			if (ret) {
>>>> -				ERR("Consumer add stream %d failed. Continuing",
>>>> -						new_stream->key);
>>>> +			do {
>>>> +				ret = write(ctx->consumer_poll_pipe[1], &new_stream,
>>>> +						sizeof(new_stream));
>>>> +			} while (ret < 0 && errno == EINTR);
>>>> +			if (ret < 0) {
>>>> +				PERROR("write data pipe");
>>>>  				consumer_del_stream(new_stream, NULL);
>>>>  				goto end_nosignal;
>>>>  			}
>>>> @@ -284,20 +286,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>  		goto end_nosignal;
>>>>  	}
>>>>  
>>>> -	/*
>>>> -	 * Wake-up the other end by writing a null byte in the pipe (non-blocking).
>>>> -	 * Important note: Because writing into the pipe is non-blocking (and
>>>> -	 * therefore we allow dropping wakeup data, as long as there is wakeup data
>>>> -	 * present in the pipe buffer to wake up the other end), the other end
>>>> -	 * should perform the following sequence for waiting:
>>>> -	 *
>>>> -	 * 1) empty the pipe (reads).
>>>> -	 * 2) perform update operation.
>>>> -	 * 3) wait on the pipe (poll).
>>>> -	 */
>>>> -	do {
>>>> -		ret = write(ctx->consumer_poll_pipe[1], "", 1);
>>>> -	} while (ret < 0 && errno == EINTR);
>>>>  end_nosignal:
>>>>  	rcu_read_unlock();
>>>>  
>>>> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
>>>> index 1170687..4ca4b84 100644
>>>> --- a/src/common/ust-consumer/ust-consumer.c
>>>> +++ b/src/common/ust-consumer/ust-consumer.c
>>>> @@ -224,6 +224,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>  			goto end_nosignal;
>>>>  		}
>>>>  
>>>> +		/*
>>>> +		 * This needs to be done as soon as we can so we don't block the
>>>> +		 * application too long.
>>>> +		 */
>>>> +		ret = lttng_ustconsumer_add_stream(new_stream);
>>>> +		if (ret) {
>>>> +			consumer_del_stream(new_stream, NULL);
>>>> +			goto end_nosignal;
>>>> +		}
>>>> +		/* Steal stream identifier to avoid having streams with the same key */
>>>> +		consumer_steal_stream_key(new_stream->key, consumer_data.stream_ht);
>>>> +
>>>>  		/* The stream is not metadata. Get relayd reference if exists. */
>>>>  		relayd = consumer_find_relayd(msg.u.stream.net_index);
>>>>  		if (relayd != NULL) {
>>>> @@ -265,14 +277,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>  				goto end_nosignal;
>>>>  			}
>>>>  		} else {
>>>> -			ret = consumer_add_stream(new_stream);
>>>> -			if (ret) {
>>>> -				ERR("Consumer add stream %d failed. Continuing",
>>>> -						new_stream->key);
>>>> -				/*
>>>> -				 * At this point, if the add_stream fails, it is not in the
>>>> -				 * hash table thus passing the NULL value here.
>>>> -				 */
>>>> +			do {
>>>> +				ret = write(ctx->consumer_poll_pipe[1], &new_stream,
>>>> +						sizeof(new_stream));
>>>> +			} while (ret < 0 && errno == EINTR);
>>>> +			if (ret < 0) {
>>>> +				PERROR("write data pipe");
>>>>  				consumer_del_stream(new_stream, NULL);
>>>>  				goto end_nosignal;
>>>>  			}
>>>> @@ -334,20 +344,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>>  		break;
>>>>  	}
>>>>  
>>>> -	/*
>>>> -	 * Wake-up the other end by writing a null byte in the pipe (non-blocking).
>>>> -	 * Important note: Because writing into the pipe is non-blocking (and
>>>> -	 * therefore we allow dropping wakeup data, as long as there is wakeup data
>>>> -	 * present in the pipe buffer to wake up the other end), the other end
>>>> -	 * should perform the following sequence for waiting:
>>>> -	 *
>>>> -	 * 1) empty the pipe (reads).
>>>> -	 * 2) perform update operation.
>>>> -	 * 3) wait on the pipe (poll).
>>>> -	 */
>>>> -	do {
>>>> -		ret = write(ctx->consumer_poll_pipe[1], "", 1);
>>>> -	} while (ret < 0 && errno == EINTR);
>>>>  end_nosignal:
>>>>  	rcu_read_unlock();
>>>>  
>>>> -- 
>>>> 1.7.10.4
>>>>
>>>>
>>>> _______________________________________________
>>>> lttng-dev mailing list
>>>> lttng-dev at lists.lttng.org
>>>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
>>>
> 



More information about the lttng-dev mailing list