[lttng-dev] [PATCH lttng-tools 3/3] Fix: Stream allocation and insertion consistency
David Goulet
dgoulet at efficios.com
Wed Oct 3 12:42:10 EDT 2012
Mathieu Desnoyers wrote:
> * David Goulet (dgoulet at efficios.com) wrote:
>> The stream allocation in the consumer was doing ustctl actions on the
>> stream and updating refounts. However, before inserting the stream into
>
> refounts -> refcounts.
>
>> the hash table and polling on the fd for data, an error could occur
>> which could stop the stream insertion hence creating multiple fd leaks,
>> mem leaks and bad refount state.
>
> refount -> refcount
>
>>
>> Furthermore, the consumer_del_stream now can destroy a stream even if
>> that stream is not added to the global hash table. The kernel and UST
>> consumer uses it on error between allocation and hash table insertion.
>
> consumer -> consumers
> uses -> use
>
>>
>> Signed-off-by: David Goulet <dgoulet at efficios.com>
>> ---
>> src/common/consumer.c | 219 +++++++++++++++++---------
>> src/common/kernel-consumer/kernel-consumer.c | 13 +-
>> src/common/ust-consumer/ust-consumer.c | 16 +-
>> src/common/ust-consumer/ust-consumer.h | 4 +-
>> 4 files changed, 172 insertions(+), 80 deletions(-)
>>
>> diff --git a/src/common/consumer.c b/src/common/consumer.c
>> index 6ee366f..6011622 100644
>> --- a/src/common/consumer.c
>> +++ b/src/common/consumer.c
>> @@ -227,8 +227,8 @@ void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
>> }
>>
>> /*
>> - * Remove a stream from the global list protected by a mutex. This
>> - * function is also responsible for freeing its data structures.
>> + * Remove a stream from the global list protected by a mutex. This function is
>> + * also responsible for freeing its data structures.
>> */
>> void consumer_del_stream(struct lttng_consumer_stream *stream)
>> {
>> @@ -236,10 +236,46 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
>> struct lttng_ht_iter iter;
>> struct lttng_consumer_channel *free_chan = NULL;
>> struct consumer_relayd_sock_pair *relayd;
>> + struct lttng_ht_node_ulong *node;
>>
>> assert(stream);
>>
>> + DBG3("Consumer deleting stream %d", stream->key);
>> +
>> pthread_mutex_lock(&consumer_data.lock);
>> + rcu_read_lock();
>> +
>> + /*
>> + * A stream with a key value of -1 means that the stream is in the hash
>> + * table but cannot be looked up. This happens when consumer_add_stream is
>> + * called with a key that is already present in the table.
>> + * consumer_steal_stream_key() is called to make sure we can insert a
>> + * stream even though the index is already present. Since the key is the fd
>> + * value on the session daemon side, duplicates are possible.
>> + */
>> + if (stream->key != -1) {
>> + lttng_ht_lookup(consumer_data.stream_ht,
>> + (void *)((unsigned long) stream->key), &iter);
>> + node = lttng_ht_iter_get_node_ulong(&iter);
>> + if (node == NULL) {
>> + rcu_read_unlock();
>> +
>> + /*
>> + * Stream does not exist in the hash table. This can happen if we hit
>> + * an error after allocation but before adding it to the table. We
>> + * consider that if the node is not in the hash table and has a
>> + * valid key, no ustctl/ioctl or mmap action was done, hence we
>> + * jump directly to the RCU free.
>> + */
>> + DBG2("Consumer stream key %d not found during deletion", stream->key);
>> + goto free_stream;
>> + } else {
>> + /* Remove stream from hash table and continue */
>> + ret = lttng_ht_del(consumer_data.stream_ht, &iter);
>> + assert(!ret);
>> + }
>> + }
>> + rcu_read_unlock();
>
> Why are you changing this code? You add a lookup to get the node you
> already receive as a parameter. It looks pretty much useless to me.
>
> What you probably want there is to pass a flag to consumer_del_stream()
> telling it whether or not it needs to remove the stream from the hash
> table, so it can skip the ht_del step accordingly.
Yes, we can do that instead.
>
> Let's discuss this one and, once we understand the intent, we'll
> continue on the rest of the patch.
>
> Thanks,
>
> Mathieu
>
>>
>> switch (consumer_data.type) {
>> case LTTNG_CONSUMER_KERNEL:
>> @@ -260,20 +296,10 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
>> goto end;
>> }
>>
>> - rcu_read_lock();
>> - iter.iter.node = &stream->node.node;
>> - ret = lttng_ht_del(consumer_data.stream_ht, &iter);
>> - assert(!ret);
>> -
>> - rcu_read_unlock();
>> -
>> - if (consumer_data.stream_count <= 0) {
>> - goto end;
>> - }
>> + /* This should NEVER reach a negative value. */
>> + assert(consumer_data.stream_count >= 0);
>> consumer_data.stream_count--;
>> - if (!stream) {
>> - goto end;
>> - }
>> +
>> if (stream->out_fd >= 0) {
>> ret = close(stream->out_fd);
>> if (ret) {
>> @@ -321,7 +347,6 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
>> destroy_relayd(relayd);
>> }
>> }
>> - rcu_read_unlock();
>>
>> uatomic_dec(&stream->chan->refcount);
>> if (!uatomic_read(&stream->chan->refcount)
>> @@ -329,7 +354,6 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
>> free_chan = stream->chan;
>> }
>>
>> - call_rcu(&stream->node.head, consumer_free_stream);
>> end:
>> consumer_data.need_update = 1;
>> pthread_mutex_unlock(&consumer_data.lock);
>> @@ -337,6 +361,10 @@ end:
>> if (free_chan) {
>> consumer_del_channel(free_chan);
>> }
>> +
>> +free_stream:
>> + call_rcu(&stream->node.head, consumer_free_stream);
>> + rcu_read_unlock();
>> }
>>
>> struct lttng_consumer_stream *consumer_allocate_stream(
>> @@ -353,7 +381,6 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>> int *alloc_ret)
>> {
>> struct lttng_consumer_stream *stream;
>> - int ret;
>>
>> stream = zmalloc(sizeof(*stream));
>> if (stream == NULL) {
>> @@ -372,7 +399,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>> ERR("Unable to find channel for stream %d", stream_key);
>> goto error;
>> }
>> - stream->chan->refcount++;
>> +
>> stream->key = stream_key;
>> stream->shm_fd = shm_fd;
>> stream->wait_fd = wait_fd;
>> @@ -391,35 +418,6 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>> lttng_ht_node_init_ulong(&stream->node, stream->key);
>> lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
>>
>> - switch (consumer_data.type) {
>> - case LTTNG_CONSUMER_KERNEL:
>> - break;
>> - case LTTNG_CONSUMER32_UST:
>> - case LTTNG_CONSUMER64_UST:
>> - stream->cpu = stream->chan->cpucount++;
>> - ret = lttng_ustconsumer_allocate_stream(stream);
>> - if (ret) {
>> - *alloc_ret = -EINVAL;
>> - goto error;
>> - }
>> - break;
>> - default:
>> - ERR("Unknown consumer_data type");
>> - *alloc_ret = -EINVAL;
>> - goto error;
>> - }
>> -
>> - /*
>> - * When nb_init_streams reaches 0, we don't need to trigger any action in
>> - * terms of destroying the associated channel, because the action that
>> - * causes the count to become 0 also causes a stream to be added. The
>> - * channel deletion will thus be triggered by the following removal of this
>> - * stream.
>> - */
>> - if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
>> - uatomic_dec(&stream->chan->nb_init_streams);
>> - }
>> -
>> DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
>> " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
>> stream->shm_fd, stream->wait_fd,
>> @@ -439,38 +437,66 @@ end:
>> int consumer_add_stream(struct lttng_consumer_stream *stream)
>> {
>> int ret = 0;
>> - struct lttng_ht_node_ulong *node;
>> - struct lttng_ht_iter iter;
>> struct consumer_relayd_sock_pair *relayd;
>>
>> - pthread_mutex_lock(&consumer_data.lock);
>> - /* Steal stream identifier, for UST */
>> - consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
>> + assert(stream);
>>
>> + DBG3("Adding consumer stream %d", stream->key);
>> +
>> + pthread_mutex_lock(&consumer_data.lock);
>> rcu_read_lock();
>> - lttng_ht_lookup(consumer_data.stream_ht,
>> - (void *)((unsigned long) stream->key), &iter);
>> - node = lttng_ht_iter_get_node_ulong(&iter);
>> - if (node != NULL) {
>> - rcu_read_unlock();
>> - /* Stream already exist. Ignore the insertion */
>> - goto end;
>> - }
>>
>> - lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
>> + switch (consumer_data.type) {
>> + case LTTNG_CONSUMER_KERNEL:
>> + break;
>> + case LTTNG_CONSUMER32_UST:
>> + case LTTNG_CONSUMER64_UST:
>> + stream->cpu = stream->chan->cpucount++;
>> + ret = lttng_ustconsumer_add_stream(stream);
>> + if (ret) {
>> + ret = -EINVAL;
>> + goto error;
>> + }
>> +
>> + /* Steal stream identifier only for UST */
>> + consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
>> + break;
>> + default:
>> + ERR("Unknown consumer_data type");
>> + assert(0);
>> + ret = -ENOSYS;
>> + goto error;
>> + }
>>
>> /* Check and cleanup relayd */
>> relayd = consumer_find_relayd(stream->net_seq_idx);
>> if (relayd != NULL) {
>> uatomic_inc(&relayd->refcount);
>> }
>> - rcu_read_unlock();
>>
>> - /* Update consumer data */
>> + /* Final operation is to add the stream to the global hash table. */
>> + lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
>> +
>> + /* Update channel refcount once added without error(s). */
>> + uatomic_inc(&stream->chan->refcount);
>> +
>> + /*
>> + * When nb_init_streams reaches 0, we don't need to trigger any action in
>> + * terms of destroying the associated channel, because the action that
>> + * causes the count to become 0 also causes a stream to be added. The
>> + * channel deletion will thus be triggered by the following removal of this
>> + * stream.
>> + */
>> + if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
>> + uatomic_dec(&stream->chan->nb_init_streams);
>> + }
>> +
>> + /* Update consumer data once the node is inserted. */
>> consumer_data.stream_count++;
>> consumer_data.need_update = 1;
>>
>> -end:
>> +error:
>> + rcu_read_unlock();
>> pthread_mutex_unlock(&consumer_data.lock);
>>
>> return ret;
>> @@ -1648,10 +1674,37 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
>> * Action done with the metadata stream when adding it to the consumer internal
>> * data structures to handle it.
>> */
>> -static void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
>> +static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>> + struct lttng_ht *ht)
>> {
>> + int ret = 0;
>> struct consumer_relayd_sock_pair *relayd;
>>
>> + switch (consumer_data.type) {
>> + case LTTNG_CONSUMER_KERNEL:
>> + break;
>> + case LTTNG_CONSUMER32_UST:
>> + case LTTNG_CONSUMER64_UST:
>> + ret = lttng_ustconsumer_add_stream(stream);
>> + if (ret) {
>> + ret = -EINVAL;
>> + goto error;
>> + }
>> +
>> + /* Steal stream identifier only for UST */
>> + consumer_steal_stream_key(stream->key, ht);
>> + break;
>> + default:
>> + ERR("Unknown consumer_data type");
>> + assert(0);
>> + return -ENOSYS;
>> + }
>> +
>> + /*
>> + * From here, refcounts are updated so be _careful_ when returning an error
>> + * after this point.
>> + */
>> +
>> /* Find relayd and, if one is found, increment refcount. */
>> rcu_read_lock();
>> relayd = consumer_find_relayd(stream->net_seq_idx);
>> @@ -1659,6 +1712,27 @@ static void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
>> uatomic_inc(&relayd->refcount);
>> }
>> rcu_read_unlock();
>> +
>> + /* Update channel refcount once added without error(s). */
>> + uatomic_inc(&stream->chan->refcount);
>> +
>> + /*
>> + * When nb_init_streams reaches 0, we don't need to trigger any action in
>> + * terms of destroying the associated channel, because the action that
>> + * causes the count to become 0 also causes a stream to be added. The
>> + * channel deletion will thus be triggered by the following removal of this
>> + * stream.
>> + */
>> + if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
>> + uatomic_dec(&stream->chan->nb_init_streams);
>> + }
>> +
>> + rcu_read_lock();
>> + lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
>> + rcu_read_unlock();
>> +
>> +error:
>> + return ret;
>> }
>>
>> /*
>> @@ -1755,17 +1829,16 @@ restart:
>> DBG("Adding metadata stream %d to poll set",
>> stream->wait_fd);
>>
>> - rcu_read_lock();
>> - /* The node should be init at this point */
>> - lttng_ht_add_unique_ulong(metadata_ht,
>> - &stream->waitfd_node);
>> - rcu_read_unlock();
>> + ret = consumer_add_metadata_stream(stream, metadata_ht);
>> + if (ret) {
>> + /* Stream was not set up properly. Continuing. */
>> + free(stream);
>> + continue;
>> + }
>>
>> /* Add metadata stream to the global poll events list */
>> lttng_poll_add(&events, stream->wait_fd,
>> LPOLLIN | LPOLLPRI);
>> -
>> - consumer_add_metadata_stream(stream);
>> }
>>
>> /* Metadata pipe handled. Continue handling the others */
>> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
>> index 4d61cc5..878c4ab 100644
>> --- a/src/common/kernel-consumer/kernel-consumer.c
>> +++ b/src/common/kernel-consumer/kernel-consumer.c
>> @@ -206,18 +206,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> &new_stream->relayd_stream_id);
>> pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
>> if (ret < 0) {
>> + consumer_del_stream(new_stream);
>> goto end_nosignal;
>> }
>> } else if (msg.u.stream.net_index != -1) {
>> ERR("Network sequence index %d unknown. Not adding stream.",
>> msg.u.stream.net_index);
>> - free(new_stream);
>> + consumer_del_stream(new_stream);
>> goto end_nosignal;
>> }
>>
>> if (ctx->on_recv_stream) {
>> ret = ctx->on_recv_stream(new_stream);
>> if (ret < 0) {
>> + consumer_del_stream(new_stream);
>> goto end_nosignal;
>> }
>> }
>> @@ -230,9 +232,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> } while (ret < 0 && errno == EINTR);
>> if (ret < 0) {
>> PERROR("write metadata pipe");
>> + consumer_del_stream(new_stream);
>> }
>> } else {
>> - consumer_add_stream(new_stream);
>> + ret = consumer_add_stream(new_stream);
>> + if (ret) {
>> + ERR("Consumer add stream %d failed. Continuing",
>> + new_stream->key);
>> + consumer_del_stream(new_stream);
>> + goto end_nosignal;
>> + }
>> }
>>
>> DBG("Kernel consumer_add_stream (%d)", fd);
>> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
>> index 76238a0..e10d540 100644
>> --- a/src/common/ust-consumer/ust-consumer.c
>> +++ b/src/common/ust-consumer/ust-consumer.c
>> @@ -234,12 +234,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> &new_stream->relayd_stream_id);
>> pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
>> if (ret < 0) {
>> + consumer_del_stream(new_stream);
>> goto end_nosignal;
>> }
>> } else if (msg.u.stream.net_index != -1) {
>> ERR("Network sequence index %d unknown. Not adding stream.",
>> msg.u.stream.net_index);
>> - free(new_stream);
>> + consumer_del_stream(new_stream);
>> goto end_nosignal;
>> }
>>
>> @@ -247,6 +248,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> if (ctx->on_recv_stream) {
>> ret = ctx->on_recv_stream(new_stream);
>> if (ret < 0) {
>> + consumer_del_stream(new_stream);
>> goto end_nosignal;
>> }
>> }
>> @@ -259,9 +261,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>> } while (ret < 0 && errno == EINTR);
>> if (ret < 0) {
>> PERROR("write metadata pipe");
>> + consumer_del_stream(new_stream);
>> + goto end_nosignal;
>> }
>> } else {
>> - consumer_add_stream(new_stream);
>> + ret = consumer_add_stream(new_stream);
>> + if (ret) {
>> + ERR("Consumer add stream %d failed. Continuing",
>> + new_stream->key);
>> + consumer_del_stream(new_stream);
>> + goto end_nosignal;
>> + }
>> }
>>
>> DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64,
>> @@ -373,7 +383,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
>> ustctl_unmap_channel(chan->handle);
>> }
>>
>> -int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
>> +int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream)
>> {
>> struct lttng_ust_object_data obj;
>> int ret;
>> diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h
>> index 3f76f23..6b507ed 100644
>> --- a/src/common/ust-consumer/ust-consumer.h
>> +++ b/src/common/ust-consumer/ust-consumer.h
>> @@ -49,7 +49,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>
>> extern int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan);
>> extern void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan);
>> -extern int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream);
>> +extern int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream);
>> extern void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream);
>>
>> int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
>> @@ -117,7 +117,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
>> }
>>
>> static inline
>> -int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
>> +int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream)
>> {
>> return -ENOSYS;
>> }
>> --
>> 1.7.10.4
>>
>>
>> _______________________________________________
>> lttng-dev mailing list
>> lttng-dev at lists.lttng.org
>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
>
More information about the lttng-dev mailing list