[lttng-dev] [LTTNG-TOOLS PATCH 5/9] Extract the lost packets and discarded events counters
Julien Desfossez
jdesfossez at efficios.com
Mon Jul 13 11:28:05 EDT 2015
"lttng list" now shows, for each channel, the number of discarded events
(discard mode) or lost packets (overwrite mode).
Signed-off-by: Julien Desfossez <jdesfossez at efficios.com>
---
include/lttng/channel.h | 5 +-
src/bin/lttng-sessiond/buffer-registry.c | 39 +++++++
src/bin/lttng-sessiond/buffer-registry.h | 5 +
src/bin/lttng-sessiond/cmd.c | 94 ++++++++++++++-
src/bin/lttng-sessiond/consumer.c | 108 +++++++++++++++++
src/bin/lttng-sessiond/consumer.h | 4 +
src/bin/lttng-sessiond/trace-ust.h | 2 +
src/bin/lttng-sessiond/ust-app.c | 122 ++++++++++++++++++-
src/bin/lttng-sessiond/ust-app.h | 25 ++++
src/bin/lttng/commands/list.c | 2 +
src/common/config/config-session-abi.h | 2 +
src/common/config/config.c | 2 +
src/common/config/session.xsd | 2 +
src/common/consumer.c | 1 +
src/common/consumer.h | 13 +++
src/common/kernel-consumer/kernel-consumer.c | 114 ++++++++++++++++++
src/common/kernel-ctl/kernel-ctl.c | 6 +
src/common/kernel-ctl/kernel-ctl.h | 1 +
src/common/kernel-ctl/kernel-ioctl.h | 2 +
src/common/mi-lttng.c | 16 +++
src/common/mi_lttng.xsd | 2 +
src/common/sessiond-comm/sessiond-comm.h | 8 ++
src/common/ust-consumer/ust-consumer.c | 167 +++++++++++++++++++++++++++
src/common/ust-consumer/ust-consumer.h | 7 ++
24 files changed, 743 insertions(+), 6 deletions(-)
diff --git a/include/lttng/channel.h b/include/lttng/channel.h
index 6ab0c7b..be65b7b 100644
--- a/include/lttng/channel.h
+++ b/include/lttng/channel.h
@@ -30,7 +30,7 @@ extern "C" {
*
* The structures should be initialized to zero before use.
*/
-#define LTTNG_CHANNEL_ATTR_PADDING1 LTTNG_SYMBOL_NAME_LEN + 12
+#define LTTNG_CHANNEL_ATTR_PADDING1 LTTNG_SYMBOL_NAME_LEN - 4 /* 252 */
struct lttng_channel_attr {
int overwrite; /* 1: overwrite, 0: discard */
uint64_t subbuf_size; /* bytes */
@@ -43,6 +43,9 @@ struct lttng_channel_attr {
uint64_t tracefile_count; /* number of tracefiles */
/* LTTng 2.3 padding limit */
unsigned int live_timer_interval; /* usec */
+ /* LTTng 2.7 padding limit */
+ uint64_t discarded_events; /* events */
+ uint64_t lost_packets; /* packets */
char padding[LTTNG_CHANNEL_ATTR_PADDING1];
};
diff --git a/src/bin/lttng-sessiond/buffer-registry.c b/src/bin/lttng-sessiond/buffer-registry.c
index b4667a4..372153e 100644
--- a/src/bin/lttng-sessiond/buffer-registry.c
+++ b/src/bin/lttng-sessiond/buffer-registry.c
@@ -329,6 +329,45 @@ end:
}
/*
+ * Find the consumer channel key from a UST session per-uid channel key.
+ *
+ * Return the matching key or -1 if not found.
+ */
+int buffer_reg_uid_consumer_channel_key(
+		struct cds_list_head *buffer_reg_uid_list,
+		uint64_t usess_id, uint64_t chan_key,
+		uint64_t *consumer_chan_key)
+{
+	/* Stays at -1 unless a channel with a matching key is found. */
+	int status = -1;
+	struct lttng_ht_iter chan_iter;
+	struct buffer_reg_uid *reg_uid = NULL;
+	struct buffer_reg_channel *chan_reg;
+
+	/* Note: usess_id is accepted but not used by the lookup. */
+
+	rcu_read_lock();
+	/*
+	 * The uid and bitness that key the per-uid registry are not known
+	 * here, so walk every registry of the list and every channel in it.
+	 */
+	cds_list_for_each_entry(reg_uid, buffer_reg_uid_list, lnode) {
+		struct buffer_reg_session *sess_reg = reg_uid->registry;
+
+		cds_lfht_for_each_entry(sess_reg->channels->ht,
+				&chan_iter.iter, chan_reg, node.node) {
+			if (chan_reg->key != chan_key) {
+				continue;
+			}
+			*consumer_chan_key = chan_reg->consumer_key;
+			status = 0;
+			goto unlock;
+		}
+	}
+
+unlock:
+	rcu_read_unlock();
+	return status;
+}
+
+/*
* Allocate and initialize a buffer registry channel with the given key. Set
* regp with the object pointer.
*
diff --git a/src/bin/lttng-sessiond/buffer-registry.h b/src/bin/lttng-sessiond/buffer-registry.h
index 7a817ec..db1ce0c 100644
--- a/src/bin/lttng-sessiond/buffer-registry.h
+++ b/src/bin/lttng-sessiond/buffer-registry.h
@@ -150,4 +150,9 @@ void buffer_reg_stream_destroy(struct buffer_reg_stream *regp,
/* Global registry. */
void buffer_reg_destroy_registries(void);
+int buffer_reg_uid_consumer_channel_key(
+ struct cds_list_head *buffer_reg_uid_list,
+ uint64_t usess_id, uint64_t chan_key,
+ uint64_t *consumer_chan_key);
+
#endif /* LTTNG_BUFFER_REGISTRY_H */
diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c
index a6ce344..b972547 100644
--- a/src/bin/lttng-sessiond/cmd.c
+++ b/src/bin/lttng-sessiond/cmd.c
@@ -129,12 +129,93 @@ error:
}
/*
+ * Get run-time attributes if the session has been started (discarded events,
+ * lost packets).
+ */
+static int get_kernel_runtime_stats(struct ltt_session *session,
+		struct ltt_kernel_channel *kchan)
+{
+	int ret = 0;
+
+	/* The counters are only queried once the session has been started. */
+	if (session->has_been_started) {
+		ret = consumer_get_discarded_events(session->id, kchan->fd,
+				session->kernel_session->consumer,
+				&kchan->channel->attr.discarded_events);
+		if (ret >= 0) {
+			/* Lost packets are fetched only if the first query worked. */
+			ret = consumer_get_lost_packets(session->id, kchan->fd,
+					session->kernel_session->consumer,
+					&kchan->channel->attr.lost_packets);
+		}
+	}
+
+	return ret;
+}
+
+/*
+ * Fill the run-time counters (discarded events, lost packets) of a UST
+ * channel. Nothing is queried when the session has no UST session or has
+ * never been started; 0 is returned in that case.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static int get_ust_runtime_stats(struct ltt_session *session,
+		struct ltt_ust_channel *uchan,
+		struct lttng_channel *channel)
+{
+	int ret = 0;
+	struct ltt_ust_session *usess;
+
+	if (!session) {
+		return -1;
+	}
+
+	usess = session->ust_session;
+	if (!usess || !session->has_been_started) {
+		return 0;
+	}
+
+	switch (usess->buffer_type) {
+	case LTTNG_BUFFER_PER_UID:
+		ret = ust_app_uid_channel_runtime_stats(usess->id,
+				&usess->buffer_reg_uid_list,
+				usess->consumer, uchan->id,
+				channel->attr.overwrite,
+				&channel->attr.discarded_events,
+				&channel->attr.lost_packets);
+		break;
+	case LTTNG_BUFFER_PER_PID:
+		ret = ust_app_pid_channel_runtime_stats(usess,
+				uchan, usess->consumer,
+				channel->attr.overwrite,
+				&channel->attr.discarded_events,
+				&channel->attr.lost_packets);
+		/* Add what was saved when per-PID app channels were deleted. */
+		channel->attr.discarded_events +=
+				uchan->per_pid_closed_app_discarded;
+		channel->attr.lost_packets += uchan->per_pid_closed_app_lost;
+		break;
+	default:
+		ERR("Unsupported buffer type");
+		ret = -1;
+		break;
+	}
+
+	return ret;
+}
+
+/*
* Fill lttng_channel array of all channels.
*/
static void list_lttng_channels(int domain, struct ltt_session *session,
struct lttng_channel *channels)
{
- int i = 0;
+ int i = 0, ret;
struct ltt_kernel_channel *kchan;
DBG("Listing channels for session %s", session->name);
@@ -145,6 +226,10 @@ static void list_lttng_channels(int domain, struct ltt_session *session,
if (session->kernel_session != NULL) {
cds_list_for_each_entry(kchan,
&session->kernel_session->channel_list.head, list) {
+ ret = get_kernel_runtime_stats(session, kchan);
+ if (ret < 0) {
+ goto end;
+ }
/* Copy lttng_channel struct to array */
memcpy(&channels[i], kchan->channel, sizeof(struct lttng_channel));
channels[i].enabled = kchan->enabled;
@@ -177,6 +262,10 @@ static void list_lttng_channels(int domain, struct ltt_session *session,
channels[i].attr.output = LTTNG_EVENT_MMAP;
break;
}
+ ret = get_ust_runtime_stats(session, uchan, &channels[i]);
+ if (ret < 0) {
+ break;
+ }
i++;
}
rcu_read_unlock();
@@ -185,6 +274,9 @@ static void list_lttng_channels(int domain, struct ltt_session *session,
default:
break;
}
+
+end:
+ return;
}
/*
diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c
index 87d5f34..a73749f 100644
--- a/src/bin/lttng-sessiond/consumer.c
+++ b/src/bin/lttng-sessiond/consumer.c
@@ -1357,3 +1357,111 @@ error:
health_code_update();
return ret;
}
+
+/*
+ * Ask the consumer(s) the number of discarded events for a channel.
+ *
+ * The request is sent to every socket of the consumer output and the
+ * replies are summed, so the reply of one consumer does not overwrite the
+ * reply of another.
+ *
+ * On success, *discarded holds the total and 0 is returned. On error, -1 is
+ * returned and *discarded is left untouched.
+ */
+int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
+		struct consumer_output *consumer, uint64_t *discarded)
+{
+	int ret;
+	uint64_t total = 0;
+	struct consumer_socket *socket;
+	struct lttng_ht_iter iter;
+	struct lttcomm_consumer_msg msg;
+
+	assert(consumer);
+
+	DBG3("Consumer discarded events id %" PRIu64, session_id);
+
+	memset(&msg, 0, sizeof(msg));
+	msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS;
+	msg.u.discarded_events.session_id = session_id;
+	msg.u.discarded_events.channel_key = channel_key;
+
+	/* Send command for each consumer */
+	rcu_read_lock();
+	cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+			node.node) {
+		uint64_t consumer_discarded = 0;
+
+		pthread_mutex_lock(socket->lock);
+		ret = consumer_socket_send(socket, &msg, sizeof(msg));
+		if (ret < 0) {
+			pthread_mutex_unlock(socket->lock);
+			goto error_unlock;
+		}
+
+		/*
+		 * No need for a recv reply status because the answer to the
+		 * command is the reply status message.
+		 */
+		ret = consumer_socket_recv(socket, &consumer_discarded,
+				sizeof(consumer_discarded));
+		if (ret < 0) {
+			ERR("get discarded events");
+			pthread_mutex_unlock(socket->lock);
+			goto error_unlock;
+		}
+		pthread_mutex_unlock(socket->lock);
+
+		total += consumer_discarded;
+	}
+	rcu_read_unlock();
+
+	*discarded = total;
+	DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64,
+			*discarded, session_id);
+	return 0;
+
+error_unlock:
+	rcu_read_unlock();
+	return -1;
+}
+
+/*
+ * Ask the consumer(s) the number of lost packets for a channel.
+ *
+ * The request is sent to every socket of the consumer output and the
+ * replies are summed, so the reply of one consumer does not overwrite the
+ * reply of another.
+ *
+ * On success, *lost holds the total and 0 is returned. On error, -1 is
+ * returned and *lost is left untouched.
+ */
+int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
+		struct consumer_output *consumer, uint64_t *lost)
+{
+	int ret;
+	uint64_t total = 0;
+	struct consumer_socket *socket;
+	struct lttng_ht_iter iter;
+	struct lttcomm_consumer_msg msg;
+
+	assert(consumer);
+
+	DBG3("Consumer lost packets id %" PRIu64, session_id);
+
+	memset(&msg, 0, sizeof(msg));
+	msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS;
+	msg.u.lost_packets.session_id = session_id;
+	msg.u.lost_packets.channel_key = channel_key;
+
+	/* Send command for each consumer */
+	rcu_read_lock();
+	cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+			node.node) {
+		uint64_t consumer_lost = 0;
+
+		pthread_mutex_lock(socket->lock);
+		ret = consumer_socket_send(socket, &msg, sizeof(msg));
+		if (ret < 0) {
+			pthread_mutex_unlock(socket->lock);
+			goto error_unlock;
+		}
+
+		/*
+		 * No need for a recv reply status because the answer to the
+		 * command is the reply status message.
+		 */
+		ret = consumer_socket_recv(socket, &consumer_lost,
+				sizeof(consumer_lost));
+		if (ret < 0) {
+			ERR("get lost packets");
+			pthread_mutex_unlock(socket->lock);
+			goto error_unlock;
+		}
+		pthread_mutex_unlock(socket->lock);
+
+		total += consumer_lost;
+	}
+	rcu_read_unlock();
+
+	*lost = total;
+	DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64,
+			*lost, session_id);
+	return 0;
+
+error_unlock:
+	rcu_read_unlock();
+	return -1;
+}
diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h
index e4845f5..98270ef 100644
--- a/src/bin/lttng-sessiond/consumer.h
+++ b/src/bin/lttng-sessiond/consumer.h
@@ -277,6 +277,10 @@ int consumer_push_metadata(struct consumer_socket *socket,
uint64_t metadata_key, char *metadata_str, size_t len,
size_t target_offset);
int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
+int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
+ struct consumer_output *consumer, uint64_t *discarded);
+int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
+ struct consumer_output *consumer, uint64_t *lost);
/* Snapshot command. */
int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
diff --git a/src/bin/lttng-sessiond/trace-ust.h b/src/bin/lttng-sessiond/trace-ust.h
index c4dbe06..3b31751 100644
--- a/src/bin/lttng-sessiond/trace-ust.h
+++ b/src/bin/lttng-sessiond/trace-ust.h
@@ -66,6 +66,8 @@ struct ltt_ust_channel {
struct lttng_ht_node_str node;
uint64_t tracefile_size;
uint64_t tracefile_count;
+ uint64_t per_pid_closed_app_discarded;
+ uint64_t per_pid_closed_app_lost;
};
/* UST domain global (LTTNG_DOMAIN_UST) */
diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c
index 564de69..ca17e6f 100644
--- a/src/bin/lttng-sessiond/ust-app.c
+++ b/src/bin/lttng-sessiond/ust-app.c
@@ -359,6 +359,31 @@ void delete_ust_app_channel_rcu(struct rcu_head *head)
}
/*
+ * Extract the lost packet or discarded events counter when the channel is
+ * being deleted and store the value in the parent channel so we can
+ * access it from lttng list and at stop/destroy.
+ */
+static
+void per_pid_lost_discarded(struct ust_app_channel *ua_chan)
+{
+	int ret;
+	uint64_t discarded = 0, lost = 0;
+
+	if (ua_chan->attr.type != LTTNG_UST_CHAN_PER_CPU) {
+		return;
+	}
+
+	/*
+	 * Channels allocated without back pointers (the metadata channel is
+	 * allocated with NULL for both) have no parent to store the
+	 * counters in, and no session to query.
+	 */
+	if (!ua_chan->usess || !ua_chan->uchan) {
+		return;
+	}
+
+	/* Only the counter relevant to the channel mode is queried. */
+	if (ua_chan->attr.overwrite) {
+		ret = consumer_get_lost_packets(ua_chan->usess->id,
+				ua_chan->key, ua_chan->usess->consumer, &lost);
+	} else {
+		ret = consumer_get_discarded_events(ua_chan->usess->id,
+				ua_chan->key, ua_chan->usess->consumer,
+				&discarded);
+	}
+	if (ret < 0) {
+		/* Best effort: do not fold an unreliable value in the parent. */
+		ERR("Failed to get the run-time counters of channel key %"
+				PRIu64, ua_chan->key);
+		return;
+	}
+
+	ua_chan->uchan->per_pid_closed_app_discarded += discarded;
+	ua_chan->uchan->per_pid_closed_app_lost += lost;
+}
+
+/*
* Delete ust app channel safely. RCU read lock must be held before calling
* this function.
*/
@@ -405,6 +430,7 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
if (registry) {
ust_registry_channel_del_free(registry, ua_chan->key);
}
+ per_pid_lost_discarded(ua_chan);
}
if (ua_chan->obj != NULL) {
@@ -847,7 +873,9 @@ error_free:
static
struct ust_app_channel *alloc_ust_app_channel(char *name,
struct ust_app_session *ua_sess,
- struct lttng_ust_channel_attr *attr)
+ struct lttng_ust_channel_attr *attr,
+ struct ltt_ust_session *usess,
+ struct ltt_ust_channel *uchan)
{
struct ust_app_channel *ua_chan;
@@ -885,6 +913,12 @@ struct ust_app_channel *alloc_ust_app_channel(char *name,
}
/* By default, the channel is a per cpu channel. */
ua_chan->attr.type = LTTNG_UST_CHAN_PER_CPU;
+ /*
+ * Back pointers to the ust session and channel.
+ * XXX
+ */
+ ua_chan->usess = usess;
+ ua_chan->uchan = uchan;
DBG3("UST app channel %s allocated", ua_chan->name);
@@ -1690,7 +1724,8 @@ static void shadow_copy_session(struct ust_app_session *ua_sess,
DBG2("Channel %s not found on shadow session copy, creating it",
uchan->name);
- ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr);
+ ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr,
+ usess, uchan);
if (ua_chan == NULL) {
/* malloc failed FIXME: Might want to do handle ENOMEM .. */
continue;
@@ -2781,7 +2816,8 @@ static int create_ust_app_channel(struct ust_app_session *ua_sess,
goto end;
}
- ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr);
+ ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr,
+ usess, uchan);
if (ua_chan == NULL) {
/* Only malloc can fail here */
ret = -ENOMEM;
@@ -2898,7 +2934,8 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess,
}
/* Allocate UST metadata */
- metadata = alloc_ust_app_channel(DEFAULT_METADATA_NAME, ua_sess, NULL);
+ metadata = alloc_ust_app_channel(DEFAULT_METADATA_NAME, ua_sess, NULL,
+ NULL, NULL);
if (!metadata) {
/* malloc() failed */
ret = -ENOMEM;
@@ -5362,3 +5399,80 @@ uint64_t ust_app_get_size_one_more_packet_per_stream(struct ltt_ust_session *use
return tot_size;
}
+
+/*
+ * Per-UID buffers: translate the UST channel id into its consumer channel
+ * key, then ask the consumer for the counter matching the channel mode
+ * (lost packets in overwrite mode, discarded events otherwise). Only that
+ * one output parameter is written.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+int ust_app_uid_channel_runtime_stats(uint64_t ust_session_id,
+		struct cds_list_head *buffer_reg_uid_list,
+		struct consumer_output *consumer, uint64_t uchan_id,
+		int overwrite, uint64_t *discarded, uint64_t *lost)
+{
+	uint64_t consumer_key;
+	int status;
+
+	status = buffer_reg_uid_consumer_channel_key(buffer_reg_uid_list,
+			ust_session_id, uchan_id, &consumer_key);
+	if (status < 0) {
+		return status;
+	}
+
+	return overwrite ?
+		consumer_get_lost_packets(ust_session_id, consumer_key,
+				consumer, lost) :
+		consumer_get_discarded_events(ust_session_id, consumer_key,
+				consumer, discarded);
+}
+
+/*
+ * Per-PID buffers: every registered application that has this session owns
+ * its own instance of the channel, so the counter is queried for each of
+ * them and the results are summed.
+ *
+ * In overwrite mode only *lost is written, otherwise only *discarded. If no
+ * application matches, or on error, neither output is modified.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+int ust_app_pid_channel_runtime_stats(struct ltt_ust_session *usess,
+		struct ltt_ust_channel *uchan,
+		struct consumer_output *consumer, int overwrite,
+		uint64_t *discarded, uint64_t *lost)
+{
+	int ret = 0;
+	int found = 0;
+	uint64_t total = 0;
+	struct lttng_ht_iter iter;
+	struct ust_app *app;
+
+	rcu_read_lock();
+	cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+		uint64_t count = 0;
+		struct lttng_ht_iter uiter;
+		struct lttng_ht_node_str *ua_chan_node;
+		struct ust_app_session *ua_sess;
+		struct ust_app_channel *ua_chan;
+
+		ua_sess = lookup_session_by_app(usess, app);
+		if (ua_sess == NULL) {
+			continue;
+		}
+
+		/* Get channel */
+		lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &uiter);
+		ua_chan_node = lttng_ht_iter_get_node_str(&uiter);
+		/* If the session is found for the app, the channel must be there */
+		assert(ua_chan_node);
+
+		ua_chan = caa_container_of(ua_chan_node, struct ust_app_channel,
+				node);
+
+		if (overwrite) {
+			ret = consumer_get_lost_packets(usess->id, ua_chan->key,
+					consumer, &count);
+		} else {
+			ret = consumer_get_discarded_events(usess->id,
+					ua_chan->key, consumer, &count);
+		}
+		if (ret < 0) {
+			goto end;
+		}
+
+		total += count;
+		found = 1;
+	}
+
+	if (found) {
+		if (overwrite) {
+			*lost = total;
+		} else {
+			*discarded = total;
+		}
+	}
+
+end:
+	rcu_read_unlock();
+	return ret;
+}
diff --git a/src/bin/lttng-sessiond/ust-app.h b/src/bin/lttng-sessiond/ust-app.h
index 38eaf6d..34e05e1 100644
--- a/src/bin/lttng-sessiond/ust-app.h
+++ b/src/bin/lttng-sessiond/ust-app.h
@@ -341,6 +341,14 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
uint64_t ust_app_get_size_one_more_packet_per_stream(
struct ltt_ust_session *usess, uint64_t cur_nr_packets);
struct ust_app *ust_app_find_by_sock(int sock);
+int ust_app_uid_channel_runtime_stats(uint64_t ust_session_id,
+ struct cds_list_head *buffer_reg_uid_list,
+ struct consumer_output *consumer, uint64_t uchan_id,
+ int overwrite, uint64_t *discarded, uint64_t *lost);
+int ust_app_pid_channel_runtime_stats(struct ltt_ust_session *usess,
+ struct ltt_ust_channel *uchan,
+ struct consumer_output *consumer,
+ int overwrite, uint64_t *discarded, uint64_t *lost);
static inline
int ust_app_supported(void)
@@ -555,6 +563,23 @@ uint64_t ust_app_get_size_one_more_packet_per_stream(
struct ltt_ust_session *usess, uint64_t cur_nr_packets) {
return 0;
}
+/*
+ * Stubs used when built without UST support. Their signatures must match
+ * the prototypes of the real implementations so that callers compile in
+ * both configurations. They report success and leave the counters as is.
+ */
+static inline
+int ust_app_uid_channel_runtime_stats(uint64_t ust_session_id,
+		struct cds_list_head *buffer_reg_uid_list,
+		struct consumer_output *consumer, uint64_t uchan_id,
+		int overwrite, uint64_t *discarded, uint64_t *lost)
+{
+	return 0;
+}
+
+static inline
+int ust_app_pid_channel_runtime_stats(struct ltt_ust_session *usess,
+		struct ltt_ust_channel *uchan,
+		struct consumer_output *consumer,
+		int overwrite, uint64_t *discarded, uint64_t *lost)
+{
+	return 0;
+}
#endif /* HAVE_LIBLTTNG_UST_CTL */
diff --git a/src/bin/lttng/commands/list.c b/src/bin/lttng/commands/list.c
index 1eac934..e8b376e 100644
--- a/src/bin/lttng/commands/list.c
+++ b/src/bin/lttng/commands/list.c
@@ -1082,6 +1082,8 @@ static void print_channel(struct lttng_channel *channel)
MSG("%sread timer interval: %u", indent6, channel->attr.read_timer_interval);
MSG("%strace file count: %" PRIu64, indent6, channel->attr.tracefile_count);
MSG("%strace file size (bytes): %" PRIu64, indent6, channel->attr.tracefile_size);
+ MSG("%sdiscarded events: %" PRIu64, indent6, channel->attr.discarded_events);
+ MSG("%slost packets: %" PRIu64, indent6, channel->attr.lost_packets);
switch (channel->attr.output) {
case LTTNG_EVENT_SPLICE:
MSG("%soutput: splice()", indent6);
diff --git a/src/common/config/config-session-abi.h b/src/common/config/config-session-abi.h
index c578af7..c2411c1 100644
--- a/src/common/config/config-session-abi.h
+++ b/src/common/config/config-session-abi.h
@@ -46,6 +46,8 @@ extern const char * const config_element_output_type;
extern const char * const config_element_tracefile_size;
extern const char * const config_element_tracefile_count;
extern const char * const config_element_live_timer_interval;
+extern const char * const config_element_discarded_events;
+extern const char * const config_element_lost_packets;
extern const char * const config_element_type;
extern const char * const config_element_buffer_type;
extern const char * const config_element_session;
diff --git a/src/common/config/config.c b/src/common/config/config.c
index a4b59ff..2ed61bc 100644
--- a/src/common/config/config.c
+++ b/src/common/config/config.c
@@ -96,6 +96,8 @@ const char * const config_element_output_type = "output_type";
const char * const config_element_tracefile_size = "tracefile_size";
const char * const config_element_tracefile_count = "tracefile_count";
const char * const config_element_live_timer_interval = "live_timer_interval";
+const char * const config_element_discarded_events = "discarded_events";
+const char * const config_element_lost_packets = "lost_packets";
const char * const config_element_type = "type";
const char * const config_element_buffer_type = "buffer_type";
const char * const config_element_session = "session";
diff --git a/src/common/config/session.xsd b/src/common/config/session.xsd
index 0a7458d..5f2e5ce 100644
--- a/src/common/config/session.xsd
+++ b/src/common/config/session.xsd
@@ -181,6 +181,8 @@ elementFormDefault="qualified" version="2.5">
<xs:element name="tracefile_size" type="uint64_type" default="0" minOccurs="0"/> <!-- bytes -->
<xs:element name="tracefile_count" type="uint64_type" default="0" minOccurs="0"/>
<xs:element name="live_timer_interval" type="uint32_type" default="0" minOccurs="0"/> <!-- usec -->
+ <xs:element name="discarded_events" type="uint64_type" default="0" minOccurs="0"/>
+ <xs:element name="lost_packets" type="uint64_type" default="0" minOccurs="0"/>
<xs:element name="events" type="event_list_type" minOccurs="0"/>
<xs:element name="contexts" type="event_context_list_type" minOccurs="0"/>
</xs:all>
diff --git a/src/common/consumer.c b/src/common/consumer.c
index effa5f8..5207ad1 100644
--- a/src/common/consumer.c
+++ b/src/common/consumer.c
@@ -562,6 +562,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
stream->monitor = monitor;
stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
stream->index_fd = -1;
+ stream->last_sequence_number = -1ULL;
pthread_mutex_init(&stream->lock, NULL);
/* If channel is the metadata, flag this stream as metadata. */
diff --git a/src/common/consumer.h b/src/common/consumer.h
index 509e24e..c47e860 100644
--- a/src/common/consumer.h
+++ b/src/common/consumer.h
@@ -58,6 +58,8 @@ enum lttng_consumer_command {
LTTNG_CONSUMER_SNAPSHOT_CHANNEL,
LTTNG_CONSUMER_SNAPSHOT_METADATA,
LTTNG_CONSUMER_STREAMS_SENT,
+ LTTNG_CONSUMER_DISCARDED_EVENTS,
+ LTTNG_CONSUMER_LOST_PACKETS,
};
/* State of each fd in consumer */
@@ -211,6 +213,10 @@ struct lttng_consumer_channel {
int nr_stream_fds;
char root_shm_path[PATH_MAX];
char shm_path[PATH_MAX];
+ /* Total number of discarded events for that channel. */
+ uint64_t discarded_events;
+ /* Total number of missed packets due to overwriting (overwrite). */
+ uint64_t lost_packets;
};
/*
@@ -337,6 +343,13 @@ struct lttng_consumer_stream {
*/
uint64_t ust_metadata_pushed;
/*
+ * Copy of the last discarded event value to detect the overflow of
+ * the counter.
+ */
+ unsigned long last_discarded_events;
+ /* Copy of the sequence number of the last packet extracted. */
+ uint64_t last_sequence_number;
+ /*
* FD of the index file for this stream.
*/
int index_fd;
diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
index e30d21b..52ab2ef 100644
--- a/src/common/kernel-consumer/kernel-consumer.c
+++ b/src/common/kernel-consumer/kernel-consumer.c
@@ -941,6 +941,66 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
goto end_nosignal;
}
+	case LTTNG_CONSUMER_DISCARDED_EVENTS:
+	{
+		/* Signed, so that a failed send can actually be detected. */
+		ssize_t ret;
+		/* Value sent back; stays 0 when the channel is unknown. */
+		uint64_t count = 0;
+		struct lttng_consumer_channel *channel;
+		uint64_t id = msg.u.discarded_events.session_id;
+		uint64_t key = msg.u.discarded_events.channel_key;
+
+		channel = consumer_find_channel(key);
+		if (!channel) {
+			ERR("Kernel consumer discarded events channel %"
+					PRIu64 " not found", key);
+			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+		} else {
+			count = channel->discarded_events;
+		}
+
+		DBG("Kernel consumer discarded events command for session id %"
+				PRIu64 ", channel key %" PRIu64, id, key);
+
+		health_code_update();
+
+		/*
+		 * Send back the value to the session daemon. A reply is sent
+		 * even when the channel was not found, since the counter
+		 * itself is the only answer to this command.
+		 */
+		ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
+		if (ret < 0) {
+			PERROR("send discarded events");
+			goto error_fatal;
+		}
+
+		break;
+	}
+	case LTTNG_CONSUMER_LOST_PACKETS:
+	{
+		/* Signed, so that a failed send can actually be detected. */
+		ssize_t ret;
+		/* Value sent back; stays 0 when the channel is unknown. */
+		uint64_t count = 0;
+		struct lttng_consumer_channel *channel;
+		uint64_t id = msg.u.lost_packets.session_id;
+		uint64_t key = msg.u.lost_packets.channel_key;
+
+		channel = consumer_find_channel(key);
+		if (!channel) {
+			ERR("Kernel consumer lost packets channel %"
+					PRIu64 " not found", key);
+			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+		} else {
+			count = channel->lost_packets;
+		}
+
+		DBG("Kernel consumer lost packets command for session id %"
+				PRIu64 ", channel key %" PRIu64, id, key);
+
+		health_code_update();
+
+		/*
+		 * Send back the value to the session daemon. A reply is sent
+		 * even when the channel was not found, since the counter
+		 * itself is the only answer to this command.
+		 */
+		ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
+		if (ret < 0) {
+			PERROR("send lost packets");
+			goto error_fatal;
+		}
+
+		break;
+	}
default:
goto end_nosignal;
}
@@ -1052,6 +1112,56 @@ end:
return ret;
}
+/*
+ * Update the channel-wide lost packets and discarded events totals from
+ * the sequence number and discarded events counter of the sub-buffer that
+ * is being read on this stream.
+ *
+ * Return 0 on success, a negative value if a counter cannot be read.
+ */
+static
+int get_discarded_lost(struct lttng_consumer_stream *stream, unsigned long len)
+{
+	int ret;
+	uint64_t seq, discarded;
+
+	ret = kernctl_get_sequence_number(stream->wait_fd, &seq);
+	if (ret < 0) {
+		PERROR("kernctl_get_sequence_number");
+		goto end;
+	}
+	/*
+	 * Start the sequence when we extract the first packet in case we
+	 * don't start at 0 (connecting later a consumer for example).
+	 */
+	if (stream->last_sequence_number == -1ULL) {
+		stream->last_sequence_number = seq;
+	} else if (seq < stream->last_sequence_number) {
+		/*
+		 * NOTE(review): the wrap point is derived from the
+		 * sub-buffer length; confirm this matches the tracer.
+		 */
+		int order = utils_get_count_order_ulong(len);
+		uint64_t overflow = 1ULL << order;
+
+		stream->chan->lost_packets += overflow -
+				stream->last_sequence_number + seq;
+	} else if (seq > stream->last_sequence_number) {
+		/* Every sequence number skipped is a packet that was lost. */
+		stream->chan->lost_packets += seq -
+				stream->last_sequence_number - 1;
+	} else {
+		/* seq == last_sequence_number */
+		assert(0);
+	}
+	stream->last_sequence_number = seq;
+
+	ret = kernctl_get_events_discarded(stream->wait_fd, &discarded);
+	if (ret < 0) {
+		PERROR("kernctl_get_events_discarded");
+		goto end;
+	}
+	/*
+	 * The previous value is kept as an unsigned long. Subtracting in
+	 * unsigned long gives the increase modulo 2^CAA_BITS_PER_LONG,
+	 * which is correct whether or not the counter wrapped once since
+	 * the last read. (The former "2 ^ CAA_BITS_PER_LONG" was a bitwise
+	 * XOR, not a power of two.)
+	 */
+	stream->chan->discarded_events += (unsigned long) discarded -
+			stream->last_discarded_events;
+	stream->last_discarded_events = discarded;
+
+	ret = 0;
+end:
+	return ret;
+}
+
/*
* Consume data on a file descriptor and write it on a trace file.
*/
@@ -1116,6 +1226,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
}
goto end;
}
+ ret = get_discarded_lost(stream, len);
+ if (ret < 0) {
+ goto end;
+ }
} else {
write_index = 0;
}
diff --git a/src/common/kernel-ctl/kernel-ctl.c b/src/common/kernel-ctl/kernel-ctl.c
index 18cd955..b3be8be 100644
--- a/src/common/kernel-ctl/kernel-ctl.c
+++ b/src/common/kernel-ctl/kernel-ctl.c
@@ -533,3 +533,9 @@ int kernctl_get_current_timestamp(int fd, uint64_t *ts)
{
return ioctl(fd, LTTNG_RING_BUFFER_GET_CURRENT_TIMESTAMP, ts);
}
+
+/*
+ * Issue the LTTNG_RING_BUFFER_GET_SEQ_NUM ioctl on fd to get the packet
+ * sequence number, stored in *seq. The ioctl() return value is returned
+ * unchanged.
+ */
+int kernctl_get_sequence_number(int fd, uint64_t *seq)
+{
+ return ioctl(fd, LTTNG_RING_BUFFER_GET_SEQ_NUM, seq);
+}
diff --git a/src/common/kernel-ctl/kernel-ctl.h b/src/common/kernel-ctl/kernel-ctl.h
index b71b285..ab8154c 100644
--- a/src/common/kernel-ctl/kernel-ctl.h
+++ b/src/common/kernel-ctl/kernel-ctl.h
@@ -99,5 +99,6 @@ int kernctl_get_content_size(int fd, uint64_t *content_size);
int kernctl_get_packet_size(int fd, uint64_t *packet_size);
int kernctl_get_stream_id(int fd, uint64_t *stream_id);
int kernctl_get_current_timestamp(int fd, uint64_t *ts);
+int kernctl_get_sequence_number(int fd, uint64_t *seq);
#endif /* _LTTNG_KERNEL_CTL_H */
diff --git a/src/common/kernel-ctl/kernel-ioctl.h b/src/common/kernel-ctl/kernel-ioctl.h
index e469b5f..d988a83 100644
--- a/src/common/kernel-ctl/kernel-ioctl.h
+++ b/src/common/kernel-ctl/kernel-ioctl.h
@@ -64,6 +64,8 @@
#define LTTNG_RING_BUFFER_GET_STREAM_ID _IOR(0xF6, 0x25, uint64_t)
/* returns the current timestamp */
#define LTTNG_RING_BUFFER_GET_CURRENT_TIMESTAMP _IOR(0xF6, 0x26, uint64_t)
+/* returns the packet sequence number */
+#define LTTNG_RING_BUFFER_GET_SEQ_NUM _IOR(0xF6, 0x27, uint64_t)
/* Old ABI (without support for 32/64 bits compat) */
/* LTTng file descriptor ioctl */
diff --git a/src/common/mi-lttng.c b/src/common/mi-lttng.c
index 44ff56f..4291431 100644
--- a/src/common/mi-lttng.c
+++ b/src/common/mi-lttng.c
@@ -903,6 +903,22 @@ int mi_lttng_channel_attr(struct mi_writer *writer,
goto end;
}
+ /* Discarded events */
+ ret = mi_lttng_writer_write_element_unsigned_int(writer,
+ config_element_discarded_events,
+ attr->discarded_events);
+ if (ret) {
+ goto end;
+ }
+
+ /* Lost packets */
+ ret = mi_lttng_writer_write_element_unsigned_int(writer,
+ config_element_lost_packets,
+ attr->lost_packets);
+ if (ret) {
+ goto end;
+ }
+
/* Closing attributes */
ret = mi_lttng_writer_close_element(writer);
if (ret) {
diff --git a/src/common/mi_lttng.xsd b/src/common/mi_lttng.xsd
index 3f0894e..60430b1 100644
--- a/src/common/mi_lttng.xsd
+++ b/src/common/mi_lttng.xsd
@@ -313,6 +313,8 @@ THE SOFTWARE.
<xs:element name="tracefile_size" type="uint64_type" default="0" minOccurs="0" /> <!-- bytes -->
<xs:element name="tracefile_count" type="uint64_type" default="0" minOccurs="0" />
<xs:element name="live_timer_interval" type="uint32_type" default="0" minOccurs="0" /> <!-- usec -->
+ <xs:element name="discarded_events" type="uint64_type" default="0" minOccurs="0" />
+ <xs:element name="lost_packets" type="uint64_type" default="0" minOccurs="0" />
</xs:all>
</xs:complexType>
diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h
index 1e51ae1..3d08ab6 100644
--- a/src/common/sessiond-comm/sessiond-comm.h
+++ b/src/common/sessiond-comm/sessiond-comm.h
@@ -470,6 +470,14 @@ struct lttcomm_consumer_msg {
uint64_t channel_key;
uint64_t net_seq_idx;
} LTTNG_PACKED sent_streams;
+ struct {
+ uint64_t session_id;
+ uint64_t channel_key;
+ } LTTNG_PACKED discarded_events;
+ struct {
+ uint64_t session_id;
+ uint64_t channel_key;
+ } LTTNG_PACKED lost_packets;
} u;
} LTTNG_PACKED;
diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
index fbc3bbb..d39f4e6 100644
--- a/src/common/ust-consumer/ust-consumer.c
+++ b/src/common/ust-consumer/ust-consumer.c
@@ -1649,6 +1649,105 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
health_code_update();
break;
}
+	case LTTNG_CONSUMER_DISCARDED_EVENTS:
+	{
+		/* Signed, so that a failed send can actually be detected. */
+		ssize_t ret;
+		/*
+		 * Value sent back. Defaults to 0 if no stream matches (no
+		 * events are dropped if the channel is not yet in use).
+		 */
+		uint64_t count = 0;
+		struct lttng_ht_iter iter;
+		struct lttng_ht *ht;
+		struct lttng_consumer_stream *stream;
+		uint64_t id = msg.u.discarded_events.session_id;
+		uint64_t key = msg.u.discarded_events.channel_key;
+
+		DBG("UST consumer discarded events command for session id %"
+				PRIu64, id);
+		rcu_read_lock();
+		pthread_mutex_lock(&consumer_data.lock);
+
+		ht = consumer_data.stream_list_ht;
+
+		/*
+		 * We only need a reference to the channel, but they are not
+		 * directly indexed, so we just use the first matching stream
+		 * of the session to extract the information we need.
+		 */
+		cds_lfht_for_each_entry_duplicate(ht->ht,
+				ht->hash_fct(&id, lttng_ht_seed),
+				ht->match_fct, &id,
+				&iter.iter, stream, node_session_id.node) {
+			if (stream->chan->key == key) {
+				count = stream->chan->discarded_events;
+				break;
+			}
+		}
+		pthread_mutex_unlock(&consumer_data.lock);
+		rcu_read_unlock();
+
+		DBG("UST consumer discarded events command for session id %"
+				PRIu64 ", channel key %" PRIu64, id, key);
+
+		health_code_update();
+
+		/* Send back returned value to session daemon */
+		ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
+		if (ret < 0) {
+			PERROR("send discarded events");
+			goto error_fatal;
+		}
+
+		break;
+	}
+	case LTTNG_CONSUMER_LOST_PACKETS:
+	{
+		/* Signed, so that a failed send can actually be detected. */
+		ssize_t ret;
+		/*
+		 * Value sent back. Defaults to 0 if no stream matches (no
+		 * packets lost if the channel is not yet in use).
+		 */
+		uint64_t count = 0;
+		struct lttng_ht_iter iter;
+		struct lttng_ht *ht;
+		struct lttng_consumer_stream *stream;
+		uint64_t id = msg.u.lost_packets.session_id;
+		uint64_t key = msg.u.lost_packets.channel_key;
+
+		DBG("UST consumer lost packets command for session id %"
+				PRIu64, id);
+		rcu_read_lock();
+		pthread_mutex_lock(&consumer_data.lock);
+
+		ht = consumer_data.stream_list_ht;
+
+		/*
+		 * We only need a reference to the channel, but they are not
+		 * directly indexed, so we just use the first matching stream
+		 * of the session to extract the information we need.
+		 */
+		cds_lfht_for_each_entry_duplicate(ht->ht,
+				ht->hash_fct(&id, lttng_ht_seed),
+				ht->match_fct, &id,
+				&iter.iter, stream, node_session_id.node) {
+			if (stream->chan->key == key) {
+				count = stream->chan->lost_packets;
+				break;
+			}
+		}
+		pthread_mutex_unlock(&consumer_data.lock);
+		rcu_read_unlock();
+
+		DBG("UST consumer lost packets command for session id %"
+				PRIu64 ", channel key %" PRIu64, id, key);
+
+		health_code_update();
+
+		/* Send back returned value to session daemon */
+		ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
+		if (ret < 0) {
+			PERROR("send lost packets");
+			goto error_fatal;
+		}
+
+		break;
+	}
default:
break;
}
@@ -1791,6 +1890,16 @@ int lttng_ustconsumer_get_current_timestamp(
return ustctl_get_current_timestamp(stream->ustream, ts);
}
+/*
+ * Retrieve the sequence number of a stream into *seq.
+ *
+ * Forwards to ustctl_get_sequence_number() on the stream's UST handle and
+ * hands its return value back to the caller unchanged. The stream, its UST
+ * handle and the output pointer must all be non-NULL.
+ */
+int lttng_ustconsumer_get_sequence_number(
+		struct lttng_consumer_stream *stream, uint64_t *seq)
+{
+	int status;
+
+	assert(stream);
+	assert(stream->ustream);
+	assert(seq);
+
+	status = ustctl_get_sequence_number(stream->ustream, seq);
+	return status;
+}
+
/*
* Called when the stream signal the consumer that it has hang up.
*/
@@ -2077,6 +2186,58 @@ end:
return ret;
}
+/*
+ * Refresh the channel-wide lost packets and discarded events counters from
+ * the current state of a stream.
+ *
+ * The stream's sequence number is compared with the last one recorded on the
+ * stream: every skipped value is accounted as a lost packet on the channel.
+ * The stream's discarded events counter is compared with its last recorded
+ * value and the difference is added to the channel's total.
+ *
+ * Returns 0 on success, a negative value if a ustctl query fails or if the
+ * same sequence number is seen twice in a row.
+ */
+static
+int get_discarded_lost(struct lttng_consumer_stream *stream, unsigned long len)
+{
+	uint64_t seq, discarded;
+	int ret;
+
+	ret = ustctl_get_sequence_number(stream->ustream, &seq);
+	if (ret < 0) {
+		PERROR("ustctl_get_sequence_number");
+		goto end;
+	}
+	/*
+	 * Start the sequence when we extract the first packet in case we
+	 * don't start at 0 (connecting a consumer later, for example).
+	 */
+	if (stream->last_sequence_number == -1ULL) {
+		stream->last_sequence_number = seq;
+	} else if (seq < stream->last_sequence_number) {
+		/*
+		 * The sequence number went backwards: treat it as a single
+		 * wrap-around. The skipped values are last + 1 .. overflow - 1
+		 * and 0 .. seq - 1, hence the "- 1" (two consecutive packets
+		 * across the wrap must count as zero lost packets).
+		 *
+		 * NOTE(review): the wrap point is derived from len exactly as
+		 * before; confirm that this is the intended counter width.
+		 */
+		int order = utils_get_count_order_ulong(len);
+		uint64_t overflow = 1ULL << order;
+
+		stream->chan->lost_packets += overflow -
+				stream->last_sequence_number + seq - 1;
+	} else if (seq > stream->last_sequence_number) {
+		stream->chan->lost_packets += seq -
+				stream->last_sequence_number - 1;
+	} else {
+		/*
+		 * seq == last_sequence_number: report the inconsistency to
+		 * the caller instead of aborting the consumer (or silently
+		 * continuing when assertions are compiled out).
+		 */
+		ERR("Sequence number inconsistent: prev = %" PRIu64
+				", current = %" PRIu64,
+				stream->last_sequence_number, seq);
+		ret = -1;
+		goto end;
+	}
+	stream->last_sequence_number = seq;
+
+	ret = ustctl_get_events_discarded(stream->ustream, &discarded);
+	if (ret < 0) {
+		PERROR("ustctl_get_events_discarded");
+		goto end;
+	}
+	if (discarded < stream->last_discarded_events) {
+		/*
+		 * The counter went backwards: assume it wrapped once at
+		 * 2^CAA_BITS_PER_LONG. That power of two is written as
+		 * 2ULL << (bits - 1) so the shift count never reaches the
+		 * width of the type; with 64-bit longs it reduces to 0, which
+		 * is the correct value modulo 2^64 for this unsigned sum.
+		 * ('^' is XOR in C, not exponentiation.)
+		 */
+		stream->chan->discarded_events +=
+				(2ULL << ((CAA_BITS_PER_LONG) - 1)) -
+				stream->last_discarded_events + discarded;
+	} else {
+		stream->chan->discarded_events += discarded -
+				stream->last_discarded_events;
+	}
+	stream->last_discarded_events = discarded;
+	ret = 0;
+
+end:
+	return ret;
+}
+
/*
* Read subbuffer from the given stream.
*
@@ -2160,6 +2321,12 @@ retry:
if (ret < 0) {
goto end;
}
+
+ ret = get_discarded_lost(stream, len);
+ if (ret < 0) {
+ PERROR("kernctl_get_events_discarded");
+ goto end;
+ }
} else {
write_index = 0;
}
diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h
index 4357380..bde355c 100644
--- a/src/common/ust-consumer/ust-consumer.h
+++ b/src/common/ust-consumer/ust-consumer.h
@@ -67,6 +67,8 @@ void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
int producer);
int lttng_ustconsumer_get_current_timestamp(
struct lttng_consumer_stream *stream, uint64_t *ts);
+int lttng_ustconsumer_get_sequence_number(
+ struct lttng_consumer_stream *stream, uint64_t *seq);
#else /* HAVE_LIBLTTNG_UST_CTL */
@@ -206,6 +208,11 @@ int lttng_ustconsumer_get_current_timestamp(
{
return -ENOSYS;
}
+/*
+ * Stub used when HAVE_LIBLTTNG_UST_CTL is not defined: always fails with
+ * -ENOSYS. It must be static inline because this definition lives in a
+ * header; an external definition here would be emitted by every translation
+ * unit including it and clash at link time.
+ */
+static inline
+int lttng_ustconsumer_get_sequence_number(
+		struct lttng_consumer_stream *stream, uint64_t *seq)
+{
+	return -ENOSYS;
+}
static inline
int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
uint64_t *stream_id)
--
1.9.1
More information about the lttng-dev
mailing list