[lttng-dev] [PATCH lttng-tools 3/4] Make stream hash tables global to the consumer
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Sat Oct 13 11:56:15 EDT 2012
* David Goulet (dgoulet at efficios.com) wrote:
> The data stream hash table is now global to the consumer and used in the
> data thread. The consumer_data stream_ht is no longer used to track the
> data streams but instead will be used (and possibly renamed) by the
> session daemon poll thread to keep track of streams on a per session id
> basis for the upcoming feature that check traced data availability.
>
> For now, in order to avoid mind bugging problems to access the streams,
> both hash table are now defined globally (metadata and data). However,
> stream update are still done in a single thread. Don't count on this to
> be guaranteed in the next commits.
>
> Signed-off-by: David Goulet <dgoulet at efficios.com>
> ---
> src/common/consumer.c | 91 +++++++++++++++++++++++++-------
> src/common/consumer.h | 9 ++--
> src/common/ust-consumer/ust-consumer.c | 2 -
> 3 files changed, 75 insertions(+), 27 deletions(-)
>
> diff --git a/src/common/consumer.c b/src/common/consumer.c
> index 1d2b1f7..1fb9960 100644
> --- a/src/common/consumer.c
> +++ b/src/common/consumer.c
> @@ -59,6 +59,17 @@ int consumer_poll_timeout = -1;
> volatile int consumer_quit = 0;
>
> /*
> + * The following two hash tables are visible by all threads which are separated
> + * in different source files.
> + *
> + * Global hash table containing respectively metadata and data streams. The
> + * stream element in this ht should only be updated by the metadata poll thread
> + * for the metadata and the data poll thread for the data.
> + */
> +struct lttng_ht *metadata_ht = NULL;
> +struct lttng_ht *data_ht = NULL;
> +
> +/*
> * Find a stream. The consumer_data.lock must be locked during this
> * call.
> */
> @@ -433,19 +444,24 @@ end:
> /*
> * Add a stream to the global list protected by a mutex.
> */
> -int consumer_add_stream(struct lttng_consumer_stream *stream)
> +static int consumer_add_stream(struct lttng_consumer_stream *stream,
> + struct lttng_ht *ht)
> {
> int ret = 0;
> struct consumer_relayd_sock_pair *relayd;
>
> assert(stream);
> + assert(ht);
>
> DBG3("Adding consumer stream %d", stream->key);
>
> pthread_mutex_lock(&consumer_data.lock);
> rcu_read_lock();
>
> - lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
> + /* Steal stream identifier to avoid having streams with the same key */
> + consumer_steal_stream_key(stream->key, ht);
I don't understand why suddenly this change is needed. Considering what
this patch should be doing (just moving a ht from per-thread to global),
it should not have any behavior impact.
Thanks,
Mathieu
> +
> + lttng_ht_add_unique_ulong(ht, &stream->node);
>
> /* Check and cleanup relayd */
> relayd = consumer_find_relayd(stream->net_seq_idx);
> @@ -783,9 +799,9 @@ end:
> *
> * Returns the number of fds in the structures.
> */
> -int consumer_update_poll_array(
> +static int consumer_update_poll_array(
> struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
> - struct lttng_consumer_stream **local_stream)
> + struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
> {
> int i = 0;
> struct lttng_ht_iter iter;
> @@ -793,8 +809,7 @@ int consumer_update_poll_array(
>
> DBG("Updating poll fd array");
> rcu_read_lock();
> - cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
> - node.node) {
> + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
> if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
> continue;
> }
> @@ -1523,6 +1538,33 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> /*
> * Iterate over all streams of the hashtable and free them properly.
> *
> + * WARNING: *MUST* be used with data stream only.
> + */
> +static void destroy_data_stream_ht(struct lttng_ht *ht)
> +{
> + int ret;
> + struct lttng_ht_iter iter;
> + struct lttng_consumer_stream *stream;
> +
> + if (ht == NULL) {
> + return;
> + }
> +
> + rcu_read_lock();
> + cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
> + ret = lttng_ht_del(ht, &iter);
> + assert(!ret);
> +
> + call_rcu(&stream->node.head, consumer_free_stream);
> + }
> + rcu_read_unlock();
> +
> + lttng_ht_destroy(ht);
> +}
> +
> +/*
> + * Iterate over all streams of the hashtable and free them properly.
> + *
> * XXX: Should not be only for metadata stream or else use an other name.
> */
> static void destroy_stream_ht(struct lttng_ht *ht)
> @@ -1711,6 +1753,9 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
> uatomic_dec(&stream->chan->nb_init_streams);
> }
>
> + /* Steal stream identifier to avoid having streams with the same key */
> + consumer_steal_stream_key(stream->key, ht);
> +
> lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
> rcu_read_unlock();
>
> @@ -1729,7 +1774,6 @@ void *consumer_thread_metadata_poll(void *data)
> struct lttng_consumer_stream *stream = NULL;
> struct lttng_ht_iter iter;
> struct lttng_ht_node_ulong *node;
> - struct lttng_ht *metadata_ht = NULL;
> struct lttng_poll_event events;
> struct lttng_consumer_local_data *ctx = data;
> ssize_t len;
> @@ -1738,11 +1782,6 @@ void *consumer_thread_metadata_poll(void *data)
>
> DBG("Thread metadata poll started");
>
> - metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
> - if (metadata_ht == NULL) {
> - goto end;
> - }
> -
> /* Size is set to 1 for the consumer_metadata pipe */
> ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
> if (ret < 0) {
> @@ -1918,6 +1957,11 @@ void *consumer_thread_data_poll(void *data)
>
> rcu_register_thread();
>
> + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
> + if (data_ht == NULL) {
> + goto end;
> + }
> +
> local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
>
> while (1) {
> @@ -1955,7 +1999,8 @@ void *consumer_thread_data_poll(void *data)
> pthread_mutex_unlock(&consumer_data.lock);
> goto end;
> }
> - ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
> + ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
> + data_ht);
> if (ret < 0) {
> ERR("Error in allocating pollfd or local_outfds");
> lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
> @@ -2015,7 +2060,7 @@ void *consumer_thread_data_poll(void *data)
> continue;
> }
>
> - ret = consumer_add_stream(new_stream);
> + ret = consumer_add_stream(new_stream, data_ht);
> if (ret) {
> ERR("Consumer add stream %d failed. Continuing",
> new_stream->key);
> @@ -2088,22 +2133,19 @@ void *consumer_thread_data_poll(void *data)
> if ((pollfd[i].revents & POLLHUP)) {
> DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
> if (!local_stream[i]->data_read) {
> - consumer_del_stream(local_stream[i],
> - consumer_data.stream_ht);
> + consumer_del_stream(local_stream[i], data_ht);
> num_hup++;
> }
> } else if (pollfd[i].revents & POLLERR) {
> ERR("Error returned in polling fd %d.", pollfd[i].fd);
> if (!local_stream[i]->data_read) {
> - consumer_del_stream(local_stream[i],
> - consumer_data.stream_ht);
> + consumer_del_stream(local_stream[i], data_ht);
> num_hup++;
> }
> } else if (pollfd[i].revents & POLLNVAL) {
> ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
> if (!local_stream[i]->data_read) {
> - consumer_del_stream(local_stream[i],
> - consumer_data.stream_ht);
> + consumer_del_stream(local_stream[i], data_ht);
> num_hup++;
> }
> }
> @@ -2131,6 +2173,10 @@ end:
> */
> close(ctx->consumer_metadata_pipe[1]);
>
> + if (data_ht) {
> + destroy_data_stream_ht(data_ht);
> + }
> +
> rcu_unregister_thread();
> return NULL;
> }
> @@ -2299,6 +2345,11 @@ void lttng_consumer_init(void)
> consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
> consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
> consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
> +
> + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
> + assert(metadata_ht);
> + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
> + assert(data_ht);
> }
>
> /*
> diff --git a/src/common/consumer.h b/src/common/consumer.h
> index 8e5891a..6bce96d 100644
> --- a/src/common/consumer.h
> +++ b/src/common/consumer.h
> @@ -275,6 +275,10 @@ struct lttng_consumer_global_data {
> struct lttng_ht *relayd_ht;
> };
>
> +/* Defined in consumer.c and coupled with explanations */
> +extern struct lttng_ht *metadata_ht;
> +extern struct lttng_ht *data_ht;
> +
> /*
> * Init consumer data structures.
> */
> @@ -324,10 +328,6 @@ extern void lttng_consumer_sync_trace_file(
> */
> extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
>
> -extern int consumer_update_poll_array(
> - struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
> - struct lttng_consumer_stream **local_consumer_streams);
> -
> extern struct lttng_consumer_stream *consumer_allocate_stream(
> int channel_key, int stream_key,
> int shm_fd, int wait_fd,
> @@ -340,7 +340,6 @@ extern struct lttng_consumer_stream *consumer_allocate_stream(
> int net_index,
> int metadata_flag,
> int *alloc_ret);
> -extern int consumer_add_stream(struct lttng_consumer_stream *stream);
> extern void consumer_del_stream(struct lttng_consumer_stream *stream,
> struct lttng_ht *ht);
> extern void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> index 4ca4b84..3b41e55 100644
> --- a/src/common/ust-consumer/ust-consumer.c
> +++ b/src/common/ust-consumer/ust-consumer.c
> @@ -233,8 +233,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
> consumer_del_stream(new_stream, NULL);
> goto end_nosignal;
> }
> - /* Steal stream identifier to avoid having streams with the same key */
> - consumer_steal_stream_key(new_stream->key, consumer_data.stream_ht);
>
> /* The stream is not metadata. Get relayd reference if exists. */
> relayd = consumer_find_relayd(msg.u.stream.net_index);
> --
> 1.7.10.4
>
>
> _______________________________________________
> lttng-dev mailing list
> lttng-dev at lists.lttng.org
> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list