[lttng-dev] [PATCH lttng-tools] Fix: consumer should await for initial streams
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Fri Sep 28 15:39:42 EDT 2012
lttng-sessiond need to let the consumer know how many streams are sent
initially, so that for very short traces (short-lived apps, short kernel
trace), the consumerd don't run into the scenario where it deletes the
channel when there are still pending streams to receive for this
channel.
Fixes #355
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
---
diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c
index b69df16..d33f85f 100644
--- a/src/bin/lttng-sessiond/consumer.c
+++ b/src/bin/lttng-sessiond/consumer.c
@@ -486,7 +486,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
int channel_key,
uint64_t max_sb_size,
uint64_t mmap_len,
- const char *name)
+ const char *name,
+ unsigned int nb_init_streams)
{
assert(msg);
@@ -500,6 +501,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
msg->u.channel.channel_key = channel_key;
msg->u.channel.max_sb_size = max_sb_size;
msg->u.channel.mmap_len = mmap_len;
+ msg->u.channel.nb_init_streams = nb_init_streams;
}
/*
diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h
index 5e8ad9b..ec4ef3f 100644
--- a/src/bin/lttng-sessiond/consumer.h
+++ b/src/bin/lttng-sessiond/consumer.h
@@ -195,6 +195,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
int channel_key,
uint64_t max_sb_size,
uint64_t mmap_len,
- const char *name);
+ const char *name,
+ unsigned int nb_init_streams);
#endif /* _CONSUMER_H */
diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c
index 8251213..33cbbed 100644
--- a/src/bin/lttng-sessiond/kernel-consumer.c
+++ b/src/bin/lttng-sessiond/kernel-consumer.c
@@ -48,7 +48,8 @@ int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
channel->fd,
channel->channel->attr.subbuf_size,
0, /* Kernel */
- channel->channel->name);
+ channel->channel->name,
+ channel->stream_count);
ret = consumer_send_channel(sock, &lkm);
if (ret < 0) {
@@ -116,7 +117,8 @@ int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
session->metadata->fd,
session->metadata->conf->attr.subbuf_size,
0, /* for kernel */
- "metadata");
+ "metadata",
+ 1);
ret = consumer_send_channel(sock, &lkm);
if (ret < 0) {
diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c
index 3202cd4..fc8728d 100644
--- a/src/bin/lttng-sessiond/ust-app.c
+++ b/src/bin/lttng-sessiond/ust-app.c
@@ -2239,7 +2239,8 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app)
/* Order is important */
cds_list_add_tail(&ustream->list, &ua_chan->streams.head);
ret = snprintf(ustream->name, sizeof(ustream->name), "%s_%u",
- ua_chan->name, ua_chan->streams.count++);
+ ua_chan->name, ua_chan->streams.count);
+ ua_chan->streams.count++;
if (ret < 0) {
PERROR("asprintf UST create stream");
/*
diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c
index aabe494..44913cb 100644
--- a/src/bin/lttng-sessiond/ust-consumer.c
+++ b/src/bin/lttng-sessiond/ust-consumer.c
@@ -52,7 +52,8 @@ static int send_channel(int sock, struct ust_app_channel *uchan)
uchan->obj->shm_fd,
uchan->attr.subbuf_size,
uchan->obj->memory_map_size,
- uchan->name);
+ uchan->name,
+ uchan->streams.count);
ret = consumer_send_channel(sock, &msg);
if (ret < 0) {
@@ -208,7 +209,8 @@ static int send_metadata(int sock, struct ust_app_session *usess,
usess->metadata->obj->shm_fd,
usess->metadata->attr.subbuf_size,
usess->metadata->obj->memory_map_size,
- "metadata");
+ "metadata",
+ 1);
ret = consumer_send_channel(sock, &msg);
if (ret < 0) {
diff --git a/src/common/consumer.c b/src/common/consumer.c
index a2980e7..be358aa 100644
--- a/src/common/consumer.c
+++ b/src/common/consumer.c
@@ -319,7 +319,9 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
}
rcu_read_unlock();
- if (!--stream->chan->refcount) {
+ uatomic_dec(&stream->chan->refcount);
+ if (!uatomic_read(&stream->chan->refcount)
+ && !uatomic_read(&stream->chan->nb_init_streams)) {
free_chan = stream->chan;
}
@@ -394,6 +396,16 @@ struct lttng_consumer_stream *consumer_allocate_stream(
assert(0);
goto end;
}
+ /*
+ * When nb_init_streams reaches 0, we don't need to trigger any
+ * action in terms of destroying the associated channel, because
+ * the action that causes the count to become 0 also causes a
+ * stream to be added. The channel deletion will thus be
+ * triggered by the following removal of this stream.
+ */
+ if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
+ uatomic_dec(&stream->chan->nb_init_streams);
+ }
DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
stream->path_name, stream->key,
stream->shm_fd,
@@ -671,7 +683,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
int shm_fd, int wait_fd,
uint64_t mmap_len,
- uint64_t max_sb_size)
+ uint64_t max_sb_size,
+ unsigned int nb_init_streams)
{
struct lttng_consumer_channel *channel;
int ret;
@@ -687,6 +700,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(
channel->mmap_len = mmap_len;
channel->max_sb_size = max_sb_size;
channel->refcount = 0;
+ channel->nb_init_streams = nb_init_streams;
lttng_ht_node_init_ulong(&channel->node, channel->key);
switch (consumer_data.type) {
@@ -1602,7 +1616,8 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
/* Atomically decrement channel refcount since other threads can use it. */
uatomic_dec(&stream->chan->refcount);
- if (!uatomic_read(&stream->chan->refcount)) {
+ if (!uatomic_read(&stream->chan->refcount)
+ && !uatomic_read(&stream->chan->nb_init_streams)) {
free_chan = stream->chan;
}
diff --git a/src/common/consumer.h b/src/common/consumer.h
index dba7765..6dbece9 100644
--- a/src/common/consumer.h
+++ b/src/common/consumer.h
@@ -77,6 +77,13 @@ struct lttng_consumer_channel {
int key;
uint64_t max_sb_size; /* the subbuffer size for this channel */
int refcount; /* Number of streams referencing this channel */
+ /*
+ * nb_init_streams is the number of streams to receive
+ * initially. Used to guarantee that we do not destroy a
+ * channel before receiving all its associated streams.
+ */
+ unsigned int nb_init_streams;
+
/* For UST */
int shm_fd;
int wait_fd;
@@ -342,7 +349,8 @@ extern struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
int shm_fd, int wait_fd,
uint64_t mmap_len,
- uint64_t max_sb_size);
+ uint64_t max_sb_size,
+ unsigned int nb_init_streams);
int consumer_add_channel(struct lttng_consumer_channel *channel);
/* lttng-relayd consumer command */
diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
index f910f03..5a219fc 100644
--- a/src/common/kernel-consumer/kernel-consumer.c
+++ b/src/common/kernel-consumer/kernel-consumer.c
@@ -118,7 +118,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
-1, -1,
msg.u.channel.mmap_len,
- msg.u.channel.max_sb_size);
+ msg.u.channel.max_sb_size,
+ msg.u.channel.nb_init_streams);
if (new_channel == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h
index 62205f4..6d796ef 100644
--- a/src/common/sessiond-comm/sessiond-comm.h
+++ b/src/common/sessiond-comm/sessiond-comm.h
@@ -255,6 +255,8 @@ struct lttcomm_consumer_msg {
uint64_t max_sb_size; /* the subbuffer size for this channel */
/* shm_fd and wait_fd are sent as ancillary data */
uint64_t mmap_len;
+ /* nb_init_streams is the number of streams open initially. */
+ unsigned int nb_init_streams;
char name[LTTNG_SYMBOL_NAME_LEN];
} channel;
struct {
diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
index 8ab2b81..ad4b014 100644
--- a/src/common/ust-consumer/ust-consumer.c
+++ b/src/common/ust-consumer/ust-consumer.c
@@ -150,7 +150,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
fds[0], -1,
msg.u.channel.mmap_len,
- msg.u.channel.max_sb_size);
+ msg.u.channel.max_sb_size,
+ msg.u.channel.nb_init_streams);
if (new_channel == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list