[lttng-dev] [PATCH lttng-tools 3/3] Fix: Stream allocation and insertion consistency
David Goulet
dgoulet at efficios.com
Wed Oct 3 12:48:45 EDT 2012
David Goulet:
> Mathieu Desnoyers:
>> * David Goulet (dgoulet at efficios.com) wrote:
>>> The stream allocation in the consumer was doing ustctl actions on the
>>> stream and updating refounts. However, before inserting the stream into
>>
>> refounts -> refcounts.
>>
>>> the hash table and polling on the fd for data, an error could occur
>>> which could stop the stream insertion hence creating multiple fd leaks,
>>> mem leaks and bad refount state.
>>
>> refount -> refcount
>>
>>>
>>> Furthermore, the consumer_del_stream now can destroy a stream even if
>>> that stream is not added to the global hash table. The kernel and UST
>>> consumer uses it on error between allocation and hash table insertion.
>>
>> consumer -> consumers
>> uses -> use
>>
>>>
>>> Signed-off-by: David Goulet <dgoulet at efficios.com>
>>> ---
>>> src/common/consumer.c | 219 +++++++++++++++++---------
>>> src/common/kernel-consumer/kernel-consumer.c | 13 +-
>>> src/common/ust-consumer/ust-consumer.c | 16 +-
>>> src/common/ust-consumer/ust-consumer.h | 4 +-
>>> 4 files changed, 172 insertions(+), 80 deletions(-)
>>>
>>> diff --git a/src/common/consumer.c b/src/common/consumer.c
>>> index 6ee366f..6011622 100644
>>> --- a/src/common/consumer.c
>>> +++ b/src/common/consumer.c
>>> @@ -227,8 +227,8 @@ void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
>>> }
>>>
>>> /*
>>> - * Remove a stream from the global list protected by a mutex. This
>>> - * function is also responsible for freeing its data structures.
>>> + * Remove a stream from the global list protected by a mutex. This function is
>>> + * also responsible for freeing its data structures.
>>> */
>>> void consumer_del_stream(struct lttng_consumer_stream *stream)
>>> {
>>> @@ -236,10 +236,46 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
>>> struct lttng_ht_iter iter;
>>> struct lttng_consumer_channel *free_chan = NULL;
>>> struct consumer_relayd_sock_pair *relayd;
>>> + struct lttng_ht_node_ulong *node;
>>>
>>> assert(stream);
>>>
>>> + DBG3("Consumer deleting stream %d", stream->key);
>>> +
>>> pthread_mutex_lock(&consumer_data.lock);
>>> + rcu_read_lock();
>>> +
>>> + /*
>>> + * A stream with a key value of -1 means that the stream is in the hash
>>> + * table but can not be looked up. This happens when consumer_add_stream is
>>> + * done and we have a duplicate key before insertion.
>>> + * consumer_steal_stream_key() is called to make sure we can insert a
>>> + * stream even though the index is already present. Since the key is the fd
>>> + * value on the session daemon side, duplicates are possible.
>>> + */
>>> + if (stream->key != -1) {
>>> + lttng_ht_lookup(consumer_data.stream_ht,
>>> + (void *)((unsigned long) stream->key), &iter);
>>> + node = lttng_ht_iter_get_node_ulong(&iter);
>>> + if (node == NULL) {
>>> + rcu_read_unlock();
>>> +
>>> + /*
>>> + * Stream doest not exist in hash table. This can happen if we hit
>>> + * an error after allocation but before adding it to the table. We
>>> + * consider that if the node is not in the hash table and has a
>>> + * valid key, no ustctl/ioctl nor mmap action was done hence
>>> + * jumping to the RCU free.
>>> + */
>>> + DBG2("Consumer stream key %d not found during deletion", stream->key);
>>> + goto free_stream;
>>> + } else {
>>> + /* Remove stream from hash table and continue */
>>> + ret = lttng_ht_del(consumer_data.stream_ht, &iter);
>>> + assert(!ret);
>>> + }
>>> + }
>>> + rcu_read_unlock();
>>
>> Why are you changing this code ? You add a lookup to get the node you
>> already receive as parameter. It looks pretty much useless to me.
>>
>> What you probably want there is to pass a flag to consumer_del_stream()
>> telling it whether or not it needs to remove the stream from the hash
>> table, so it can skip the ht_del step accordingly.
>
> Yes we can do that instead.
Actually, we could provide the hash table to the call making
consumer_del_stream goes:
consumer_del_stream(key, ht)
and a NULL ht means that the key cannot be remove from the hash table.
Else, with a flag, I'll go with wrapper macro to make the code clearer
and not just "consumer_del_stream(key, 1)"
Thoughts?
David
>
>>
>> Let's discuss this one and, once we understand the intent, we'll
>> continue on the rest of the patch.
>>
>> Thanks,
>>
>> Mathieu
>>
>>>
>>> switch (consumer_data.type) {
>>> case LTTNG_CONSUMER_KERNEL:
>>> @@ -260,20 +296,10 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
>>> goto end;
>>> }
>>>
>>> - rcu_read_lock();
>>> - iter.iter.node = &stream->node.node;
>>> - ret = lttng_ht_del(consumer_data.stream_ht, &iter);
>>> - assert(!ret);
>>> -
>>> - rcu_read_unlock();
>>> -
>>> - if (consumer_data.stream_count <= 0) {
>>> - goto end;
>>> - }
>>> + /* This should NEVER reach a negative value. */
>>> + assert(consumer_data.stream_count >= 0);
>>> consumer_data.stream_count--;
>>> - if (!stream) {
>>> - goto end;
>>> - }
>>> +
>>> if (stream->out_fd >= 0) {
>>> ret = close(stream->out_fd);
>>> if (ret) {
>>> @@ -321,7 +347,6 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
>>> destroy_relayd(relayd);
>>> }
>>> }
>>> - rcu_read_unlock();
>>>
>>> uatomic_dec(&stream->chan->refcount);
>>> if (!uatomic_read(&stream->chan->refcount)
>>> @@ -329,7 +354,6 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
>>> free_chan = stream->chan;
>>> }
>>>
>>> - call_rcu(&stream->node.head, consumer_free_stream);
>>> end:
>>> consumer_data.need_update = 1;
>>> pthread_mutex_unlock(&consumer_data.lock);
>>> @@ -337,6 +361,10 @@ end:
>>> if (free_chan) {
>>> consumer_del_channel(free_chan);
>>> }
>>> +
>>> +free_stream:
>>> + call_rcu(&stream->node.head, consumer_free_stream);
>>> + rcu_read_unlock();
>>> }
>>>
>>> struct lttng_consumer_stream *consumer_allocate_stream(
>>> @@ -353,7 +381,6 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>>> int *alloc_ret)
>>> {
>>> struct lttng_consumer_stream *stream;
>>> - int ret;
>>>
>>> stream = zmalloc(sizeof(*stream));
>>> if (stream == NULL) {
>>> @@ -372,7 +399,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>>> ERR("Unable to find channel for stream %d", stream_key);
>>> goto error;
>>> }
>>> - stream->chan->refcount++;
>>> +
>>> stream->key = stream_key;
>>> stream->shm_fd = shm_fd;
>>> stream->wait_fd = wait_fd;
>>> @@ -391,35 +418,6 @@ struct lttng_consumer_stream *consumer_allocate_stream(
>>> lttng_ht_node_init_ulong(&stream->node, stream->key);
>>> lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
>>>
>>> - switch (consumer_data.type) {
>>> - case LTTNG_CONSUMER_KERNEL:
>>> - break;
>>> - case LTTNG_CONSUMER32_UST:
>>> - case LTTNG_CONSUMER64_UST:
>>> - stream->cpu = stream->chan->cpucount++;
>>> - ret = lttng_ustconsumer_allocate_stream(stream);
>>> - if (ret) {
>>> - *alloc_ret = -EINVAL;
>>> - goto error;
>>> - }
>>> - break;
>>> - default:
>>> - ERR("Unknown consumer_data type");
>>> - *alloc_ret = -EINVAL;
>>> - goto error;
>>> - }
>>> -
>>> - /*
>>> - * When nb_init_streams reaches 0, we don't need to trigger any action in
>>> - * terms of destroying the associated channel, because the action that
>>> - * causes the count to become 0 also causes a stream to be added. The
>>> - * channel deletion will thus be triggered by the following removal of this
>>> - * stream.
>>> - */
>>> - if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
>>> - uatomic_dec(&stream->chan->nb_init_streams);
>>> - }
>>> -
>>> DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
>>> " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
>>> stream->shm_fd, stream->wait_fd,
>>> @@ -439,38 +437,66 @@ end:
>>> int consumer_add_stream(struct lttng_consumer_stream *stream)
>>> {
>>> int ret = 0;
>>> - struct lttng_ht_node_ulong *node;
>>> - struct lttng_ht_iter iter;
>>> struct consumer_relayd_sock_pair *relayd;
>>>
>>> - pthread_mutex_lock(&consumer_data.lock);
>>> - /* Steal stream identifier, for UST */
>>> - consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
>>> + assert(stream);
>>>
>>> + DBG3("Adding consumer stream %d", stream->key);
>>> +
>>> + pthread_mutex_lock(&consumer_data.lock);
>>> rcu_read_lock();
>>> - lttng_ht_lookup(consumer_data.stream_ht,
>>> - (void *)((unsigned long) stream->key), &iter);
>>> - node = lttng_ht_iter_get_node_ulong(&iter);
>>> - if (node != NULL) {
>>> - rcu_read_unlock();
>>> - /* Stream already exist. Ignore the insertion */
>>> - goto end;
>>> - }
>>>
>>> - lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
>>> + switch (consumer_data.type) {
>>> + case LTTNG_CONSUMER_KERNEL:
>>> + break;
>>> + case LTTNG_CONSUMER32_UST:
>>> + case LTTNG_CONSUMER64_UST:
>>> + stream->cpu = stream->chan->cpucount++;
>>> + ret = lttng_ustconsumer_add_stream(stream);
>>> + if (ret) {
>>> + ret = -EINVAL;
>>> + goto error;
>>> + }
>>> +
>>> + /* Steal stream identifier only for UST */
>>> + consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
>>> + break;
>>> + default:
>>> + ERR("Unknown consumer_data type");
>>> + assert(0);
>>> + ret = -ENOSYS;
>>> + goto error;
>>> + }
>>>
>>> /* Check and cleanup relayd */
>>> relayd = consumer_find_relayd(stream->net_seq_idx);
>>> if (relayd != NULL) {
>>> uatomic_inc(&relayd->refcount);
>>> }
>>> - rcu_read_unlock();
>>>
>>> - /* Update consumer data */
>>> + /* Final operation is to add the stream to the global hash table. */
>>> + lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
>>> +
>>> + /* Update channel refcount once added without error(s). */
>>> + uatomic_inc(&stream->chan->refcount);
>>> +
>>> + /*
>>> + * When nb_init_streams reaches 0, we don't need to trigger any action in
>>> + * terms of destroying the associated channel, because the action that
>>> + * causes the count to become 0 also causes a stream to be added. The
>>> + * channel deletion will thus be triggered by the following removal of this
>>> + * stream.
>>> + */
>>> + if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
>>> + uatomic_dec(&stream->chan->nb_init_streams);
>>> + }
>>> +
>>> + /* Update consumer data once the node is inserted. */
>>> consumer_data.stream_count++;
>>> consumer_data.need_update = 1;
>>>
>>> -end:
>>> +error:
>>> + rcu_read_unlock();
>>> pthread_mutex_unlock(&consumer_data.lock);
>>>
>>> return ret;
>>> @@ -1648,10 +1674,37 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
>>> * Action done with the metadata stream when adding it to the consumer internal
>>> * data structures to handle it.
>>> */
>>> -static void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
>>> +static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
>>> + struct lttng_ht *ht)
>>> {
>>> + int ret = 0;
>>> struct consumer_relayd_sock_pair *relayd;
>>>
>>> + switch (consumer_data.type) {
>>> + case LTTNG_CONSUMER_KERNEL:
>>> + break;
>>> + case LTTNG_CONSUMER32_UST:
>>> + case LTTNG_CONSUMER64_UST:
>>> + ret = lttng_ustconsumer_add_stream(stream);
>>> + if (ret) {
>>> + ret = -EINVAL;
>>> + goto error;
>>> + }
>>> +
>>> + /* Steal stream identifier only for UST */
>>> + consumer_steal_stream_key(stream->key, ht);
>>> + break;
>>> + default:
>>> + ERR("Unknown consumer_data type");
>>> + assert(0);
>>> + return -ENOSYS;
>>> + }
>>> +
>>> + /*
>>> + * From here, refcounts are updated so be _careful_ when returning an error
>>> + * after this point.
>>> + */
>>> +
>>> /* Find relayd and, if one is found, increment refcount. */
>>> rcu_read_lock();
>>> relayd = consumer_find_relayd(stream->net_seq_idx);
>>> @@ -1659,6 +1712,27 @@ static void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
>>> uatomic_inc(&relayd->refcount);
>>> }
>>> rcu_read_unlock();
>>> +
>>> + /* Update channel refcount once added without error(s). */
>>> + uatomic_inc(&stream->chan->refcount);
>>> +
>>> + /*
>>> + * When nb_init_streams reaches 0, we don't need to trigger any action in
>>> + * terms of destroying the associated channel, because the action that
>>> + * causes the count to become 0 also causes a stream to be added. The
>>> + * channel deletion will thus be triggered by the following removal of this
>>> + * stream.
>>> + */
>>> + if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
>>> + uatomic_dec(&stream->chan->nb_init_streams);
>>> + }
>>> +
>>> + rcu_read_lock();
>>> + lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
>>> + rcu_read_unlock();
>>> +
>>> +error:
>>> + return ret;
>>> }
>>>
>>> /*
>>> @@ -1755,17 +1829,16 @@ restart:
>>> DBG("Adding metadata stream %d to poll set",
>>> stream->wait_fd);
>>>
>>> - rcu_read_lock();
>>> - /* The node should be init at this point */
>>> - lttng_ht_add_unique_ulong(metadata_ht,
>>> - &stream->waitfd_node);
>>> - rcu_read_unlock();
>>> + ret = consumer_add_metadata_stream(stream, metadata_ht);
>>> + if (ret) {
>>> + /* Stream was not setup properly. Continuing. */
>>> + free(stream);
>>> + continue;
>>> + }
>>>
>>> /* Add metadata stream to the global poll events list */
>>> lttng_poll_add(&events, stream->wait_fd,
>>> LPOLLIN | LPOLLPRI);
>>> -
>>> - consumer_add_metadata_stream(stream);
>>> }
>>>
>>> /* Metadata pipe handled. Continue handling the others */
>>> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
>>> index 4d61cc5..878c4ab 100644
>>> --- a/src/common/kernel-consumer/kernel-consumer.c
>>> +++ b/src/common/kernel-consumer/kernel-consumer.c
>>> @@ -206,18 +206,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>> &new_stream->relayd_stream_id);
>>> pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
>>> if (ret < 0) {
>>> + consumer_del_stream(new_stream);
>>> goto end_nosignal;
>>> }
>>> } else if (msg.u.stream.net_index != -1) {
>>> ERR("Network sequence index %d unknown. Not adding stream.",
>>> msg.u.stream.net_index);
>>> - free(new_stream);
>>> + consumer_del_stream(new_stream);
>>> goto end_nosignal;
>>> }
>>>
>>> if (ctx->on_recv_stream) {
>>> ret = ctx->on_recv_stream(new_stream);
>>> if (ret < 0) {
>>> + consumer_del_stream(new_stream);
>>> goto end_nosignal;
>>> }
>>> }
>>> @@ -230,9 +232,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>> } while (ret < 0 && errno == EINTR);
>>> if (ret < 0) {
>>> PERROR("write metadata pipe");
>>> + consumer_del_stream(new_stream);
>>> }
>>> } else {
>>> - consumer_add_stream(new_stream);
>>> + ret = consumer_add_stream(new_stream);
>>> + if (ret) {
>>> + ERR("Consumer add stream %d failed. Continuing",
>>> + new_stream->key);
>>> + consumer_del_stream(new_stream);
>>> + goto end_nosignal;
>>> + }
>>> }
>>>
>>> DBG("Kernel consumer_add_stream (%d)", fd);
>>> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
>>> index 76238a0..e10d540 100644
>>> --- a/src/common/ust-consumer/ust-consumer.c
>>> +++ b/src/common/ust-consumer/ust-consumer.c
>>> @@ -234,12 +234,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>> &new_stream->relayd_stream_id);
>>> pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
>>> if (ret < 0) {
>>> + consumer_del_stream(new_stream);
>>> goto end_nosignal;
>>> }
>>> } else if (msg.u.stream.net_index != -1) {
>>> ERR("Network sequence index %d unknown. Not adding stream.",
>>> msg.u.stream.net_index);
>>> - free(new_stream);
>>> + consumer_del_stream(new_stream);
>>> goto end_nosignal;
>>> }
>>>
>>> @@ -247,6 +248,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>> if (ctx->on_recv_stream) {
>>> ret = ctx->on_recv_stream(new_stream);
>>> if (ret < 0) {
>>> + consumer_del_stream(new_stream);
>>> goto end_nosignal;
>>> }
>>> }
>>> @@ -259,9 +261,17 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>> } while (ret < 0 && errno == EINTR);
>>> if (ret < 0) {
>>> PERROR("write metadata pipe");
>>> + consumer_del_stream(new_stream);
>>> + goto end_nosignal;
>>> }
>>> } else {
>>> - consumer_add_stream(new_stream);
>>> + ret = consumer_add_stream(new_stream);
>>> + if (ret) {
>>> + ERR("Consumer add stream %d failed. Continuing",
>>> + new_stream->key);
>>> + consumer_del_stream(new_stream);
>>> + goto end_nosignal;
>>> + }
>>> }
>>>
>>> DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64,
>>> @@ -373,7 +383,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
>>> ustctl_unmap_channel(chan->handle);
>>> }
>>>
>>> -int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
>>> +int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream)
>>> {
>>> struct lttng_ust_object_data obj;
>>> int ret;
>>> diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h
>>> index 3f76f23..6b507ed 100644
>>> --- a/src/common/ust-consumer/ust-consumer.h
>>> +++ b/src/common/ust-consumer/ust-consumer.h
>>> @@ -49,7 +49,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
>>>
>>> extern int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan);
>>> extern void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan);
>>> -extern int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream);
>>> +extern int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream);
>>> extern void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream);
>>>
>>> int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
>>> @@ -117,7 +117,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
>>> }
>>>
>>> static inline
>>> -int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
>>> +int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream)
>>> {
>>> return -ENOSYS;
>>> }
>>> --
>>> 1.7.10.4
>>>
>>>
>>> _______________________________________________
>>> lttng-dev mailing list
>>> lttng-dev at lists.lttng.org
>>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
>>
>
> _______________________________________________
> lttng-dev mailing list
> lttng-dev at lists.lttng.org
> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
More information about the lttng-dev
mailing list