[lttng-dev] [PATCH lttng-tools] Fix: relayd vs consumerd compatibility
Jérémie Galarneau
jeremie.galarneau at efficios.com
Fri Dec 16 09:50:03 UTC 2016
Merged in master, stable-2.9 and stable-2.8 with minor changes. Read on.
Thanks!
Jérémie
On 15 December 2016 at 05:04, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> relay and consumerd 2.7 and 2.8 are expected to negociate compatibility
> with the lowest common minor version.
>
> If a consumer daemon 2.8 interacts with a relayd 2.7, it needs to send
> the index fields for ctf index 1.0. Same if a relayd 2.8 interacts with
> a consumer daemon 2.7: relayd should expect ctf index 1.0 fields, and
> generate a ctf index 1.0 index file layout.
>
> If both relayd and consumerd versions are 2.8+, then we can send the ctf
> index 1.1 fields over the protocol, and store them in the index files.
>
> Whenever the relayd live viewer server opens and reads an index file,
> it needs to use the file's header to figure out the index "element"
> size.
>
> [ Should be applied to master, stable-2.9, stable-2.8. ]
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> ---
> src/bin/lttng-relayd/index.c | 30 +++---
> src/bin/lttng-relayd/index.h | 11 ++-
> src/bin/lttng-relayd/live.c | 34 +++----
> src/bin/lttng-relayd/main.c | 45 +++++----
> src/bin/lttng-relayd/stream.c | 6 +-
> src/bin/lttng-relayd/stream.h | 4 +-
> src/bin/lttng-relayd/viewer-stream.c | 47 ++++-----
> src/bin/lttng-relayd/viewer-stream.h | 4 +-
> src/common/consumer/consumer-stream.c | 20 ++--
> src/common/consumer/consumer.c | 37 +++-----
> src/common/consumer/consumer.h | 4 +-
> src/common/index/ctf-index.h | 39 ++++++++
> src/common/index/index.c | 136 ++++++++++++++++++++++-----
> src/common/index/index.h | 32 ++++++-
> src/common/kernel-consumer/kernel-consumer.c | 11 ++-
> src/common/macros.h | 2 +
> src/common/relayd/relayd.c | 5 +-
> src/common/sessiond-comm/relayd.h | 20 ++++
> src/common/ust-consumer/ust-consumer.c | 11 ++-
> 19 files changed, 318 insertions(+), 180 deletions(-)
>
> diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c
> index 80a4bb9..b15bbcd 100644
> --- a/src/bin/lttng-relayd/index.c
> +++ b/src/bin/lttng-relayd/index.c
> @@ -166,18 +166,19 @@ end:
> return index;
> }
>
> -int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
> +int relay_index_set_file(struct relay_index *index,
> + struct lttng_index_file *index_file,
> uint64_t data_offset)
> {
> int ret = 0;
>
> pthread_mutex_lock(&index->lock);
> - if (index->index_fd) {
> + if (index->index_file) {
> ret = -1;
> goto end;
> }
> - stream_fd_get(index_fd);
> - index->index_fd = index_fd;
> + lttng_index_file_get(index_file);
> + index->index_file = index_file;
> index->index_data.offset = data_offset;
> end:
> pthread_mutex_unlock(&index->lock);
> @@ -228,9 +229,9 @@ static void index_release(struct urcu_ref *ref)
> int ret;
> struct lttng_ht_iter iter;
>
> - if (index->index_fd) {
> - stream_fd_put(index->index_fd);
> - index->index_fd = NULL;
> + if (index->index_file) {
> + lttng_index_file_put(index->index_file);
> + index->index_file = NULL;
> }
> if (index->in_hash_table) {
> /* Delete index from hash table. */
> @@ -290,21 +291,16 @@ int relay_index_try_flush(struct relay_index *index)
> goto skip;
> }
> /* Check if we are ready to flush. */
> - if (!index->has_index_data || !index->index_fd) {
> + if (!index->has_index_data || !index->index_file) {
> goto skip;
> }
> - fd = index->index_fd->fd;
> + fd = index->index_file->fd;
> DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
> " on fd %d", index->stream->stream_handle,
> index->index_n.key, fd);
> flushed = true;
> index->flushed = true;
> - ret = index_write(fd, &index->index_data, sizeof(index->index_data));
> - if (ret == sizeof(index->index_data)) {
> - ret = 0;
> - } else {
> - ret = -1;
> - }
> + ret = lttng_index_file_write(index->index_file, &index->index_data);
> skip:
> pthread_mutex_unlock(&index->lock);
>
> @@ -341,11 +337,11 @@ void relay_index_close_partial_fd(struct relay_stream *stream)
> rcu_read_lock();
> cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
> index, index_n.node) {
> - if (!index->index_fd) {
> + if (!index->index_file) {
> continue;
> }
> /*
> - * Partial index has its index_fd: we have only
> + * Partial index has its index_file: we have only
> * received its info from the data socket.
> * Put self-ref from index.
> */
> diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h
> index 15c4ac8..80fe86a 100644
> --- a/src/bin/lttng-relayd/index.h
> +++ b/src/bin/lttng-relayd/index.h
> @@ -40,10 +40,10 @@ struct relay_index {
>
> pthread_mutex_t lock;
> /*
> - * FD on which to write the index data. May differ from
> - * stream->index_fd due to tracefile rotation.
> + * index file on which to write the index data. May differ from
> + * stream->index_file due to tracefile rotation.
> */
> - struct stream_fd *index_fd;
> + struct lttng_index_file *index_file;
>
> /* Index packet data. This is the data that is written on disk. */
> struct ctf_packet_index index_data;
> @@ -64,8 +64,9 @@ struct relay_index {
> struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
> uint64_t net_seq_num);
> void relay_index_put(struct relay_index *index);
> -int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
> - uint64_t data_offset);
> +int relay_index_set_file(struct relay_index *index,
> + struct lttng_index_file *index_file,
> + uint64_t data_offset);
> int relay_index_set_data(struct relay_index *index,
> const struct ctf_packet_index *data);
> int relay_index_try_flush(struct relay_index *index);
> diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
> index 2c3a105..1d0492c 100644
> --- a/src/bin/lttng-relayd/live.c
> +++ b/src/bin/lttng-relayd/live.c
> @@ -1122,7 +1122,7 @@ error:
> /*
> * Open the index file if needed for the given vstream.
> *
> - * If an index file is successfully opened, the vstream index_fd set with
> + * If an index file is successfully opened, the vstream index_file set with
Slightly modified to make it a parse-able sentence by the same occasion.
> * it.
> *
> * Return 0 on success, a negative value on error (-ENOENT if not ready yet).
> @@ -1133,8 +1133,9 @@ static int try_open_index(struct relay_viewer_stream *vstream,
> struct relay_stream *rstream)
> {
> int ret = 0;
> + struct lttng_index_file *index_file;
>
> - if (vstream->index_fd) {
> + if (vstream->index_file) {
> goto end;
> }
>
> @@ -1145,20 +1146,15 @@ static int try_open_index(struct relay_viewer_stream *vstream,
> ret = -ENOENT;
> goto end;
> }
> - ret = index_open(vstream->path_name, vstream->channel_name,
> + index_file = lttng_index_file_open(vstream->path_name,
> + vstream->channel_name,
> vstream->stream->tracefile_count,
> vstream->current_tracefile_id);
> - if (ret >= 0) {
> - vstream->index_fd = stream_fd_create(ret);
> - if (!vstream->index_fd) {
> - if (close(ret)) {
> - PERROR("close");
> - }
> - ret = -1;
> - } else {
> - ret = 0;
> - }
> - goto end;
> + if (index_file) {
> + vstream->index_file = index_file;
> + ret = 0;
> + } else {
> + ret = -1;
Since you simplified the code quite a bit here, it makes sense to
eliminate "index_file" altogether.
Changed to:
vstream->index_file = lttng_index_file_open....
if (!vstream->index_file) {
ret = -1;
}
end:
return ret;
> }
>
> end:
> @@ -1277,7 +1273,6 @@ static
> int viewer_get_next_index(struct relay_connection *conn)
> {
> int ret;
> - ssize_t read_ret;
> struct lttng_viewer_get_next_index request_index;
> struct lttng_viewer_index viewer_index;
> struct ctf_packet_index packet_index;
> @@ -1400,11 +1395,10 @@ int viewer_get_next_index(struct relay_connection *conn)
> viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
> }
>
> - read_ret = lttng_read(vstream->index_fd->fd, &packet_index,
> - sizeof(packet_index));
> - if (read_ret < sizeof(packet_index)) {
> - ERR("Relay reading index file %d returned %zd",
> - vstream->index_fd->fd, read_ret);
> + ret = lttng_index_file_read(vstream->index_file, &packet_index);
> + if (ret) {
> + ERR("Relay error reading index file %d",
> + vstream->index_file->fd);
> viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
> goto send_reply;
> } else {
> diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
> index 9c0e2b1..43603a9 100644
> --- a/src/bin/lttng-relayd/main.c
> +++ b/src/bin/lttng-relayd/main.c
> @@ -1946,6 +1946,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
> struct lttcomm_relayd_generic_reply reply;
> struct relay_stream *stream;
> uint64_t net_seq_num;
> + size_t msg_len;
>
> assert(conn);
>
> @@ -1957,9 +1958,12 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
> goto end_no_session;
> }
>
> + msg_len = lttcomm_relayd_index_len(lttng_to_index_major(conn->major,
> + conn->minor),
Changed the indentation here.
> + lttng_to_index_minor(conn->major, conn->minor));
> ret = conn->sock->ops->recvmsg(conn->sock, &index_info,
> - sizeof(index_info), 0);
> - if (ret < sizeof(index_info)) {
> + msg_len, 0);
> + if (ret < msg_len) {
> if (ret == 0) {
> /* Orderly shutdown. Not necessary to print an error. */
> DBG("Socket %d did an orderly shutdown", conn->sock->fd);
> @@ -2183,38 +2187,31 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
> goto end;
> }
>
> - if (rotate_index || !stream->index_fd) {
> - int fd;
> + if (rotate_index || !stream->index_file) {
> + uint32_t major, minor;
>
> - /* Put ref on previous index_fd. */
> - if (stream->index_fd) {
> - stream_fd_put(stream->index_fd);
> - stream->index_fd = NULL;
> + /* Put ref on previous index_file. */
> + if (stream->index_file) {
> + lttng_index_file_put(stream->index_file);
> + stream->index_file = NULL;
> }
> -
> - fd = index_create_file(stream->path_name, stream->channel_name,
> + major = stream->trace->session->major;
> + minor = stream->trace->session->minor;
> + stream->index_file = lttng_index_file_create(stream->path_name,
> + stream->channel_name,
> -1, -1, stream->tracefile_size,
> - tracefile_array_get_file_index_head(stream->tfa));
> - if (fd < 0) {
> + tracefile_array_get_file_index_head(stream->tfa),
> + lttng_to_index_major(major, minor),
> + lttng_to_index_minor(major, minor));
> + if (!stream->index_file) {
> ret = -1;
> /* Put self-ref for this index due to error. */
> relay_index_put(index);
> goto end;
> }
> - stream->index_fd = stream_fd_create(fd);
> - if (!stream->index_fd) {
> - ret = -1;
> - if (close(fd)) {
> - PERROR("Error closing FD %d", fd);
> - }
> - /* Put self-ref for this index due to error. */
> - relay_index_put(index);
> - /* Will put the local ref. */
> - goto end;
> - }
> }
>
> - if (relay_index_set_fd(index, stream->index_fd, data_offset)) {
> + if (relay_index_set_file(index, stream->index_file, data_offset)) {
> ret = -1;
> /* Put self-ref for this index due to error. */
> relay_index_put(index);
> diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
> index 335a1cf..c59bb94 100644
> --- a/src/bin/lttng-relayd/stream.c
> +++ b/src/bin/lttng-relayd/stream.c
> @@ -306,9 +306,9 @@ static void stream_release(struct urcu_ref *ref)
> stream_fd_put(stream->stream_fd);
> stream->stream_fd = NULL;
> }
> - if (stream->index_fd) {
> - stream_fd_put(stream->index_fd);
> - stream->index_fd = NULL;
> + if (stream->index_file) {
> + lttng_index_file_put(stream->index_file);
> + stream->index_file = NULL;
> }
> if (stream->trace) {
> ctf_trace_put(stream->trace);
> diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h
> index 5030e5d..d471c7b 100644
> --- a/src/bin/lttng-relayd/stream.h
> +++ b/src/bin/lttng-relayd/stream.h
> @@ -58,8 +58,8 @@ struct relay_stream {
>
> /* FD on which to write the stream data. */
> struct stream_fd *stream_fd;
> - /* FD on which to write the index data. */
> - struct stream_fd *index_fd;
> + /* index file on which to write the index data. */
> + struct lttng_index_file *index_file;
>
> char *path_name;
> char *channel_name;
> diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c
> index 7c59cd0..8a3b09a 100644
> --- a/src/bin/lttng-relayd/viewer-stream.c
> +++ b/src/bin/lttng-relayd/viewer-stream.c
> @@ -116,29 +116,21 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
> * the opening of the index, otherwise open it right now.
> */
> if (stream->index_received_seqcount == 0) {
> - vstream->index_fd = NULL;
> + vstream->index_file = NULL;
> } else {
> - int read_fd;
> -
> - read_fd = index_open(vstream->path_name, vstream->channel_name,
> + vstream->index_file = lttng_index_file_open(vstream->path_name,
> + vstream->channel_name,
> stream->tracefile_count,
> vstream->current_tracefile_id);
> - if (read_fd < 0) {
> - goto error_unlock;
> - }
> - vstream->index_fd = stream_fd_create(read_fd);
> - if (!vstream->index_fd) {
> - if (close(read_fd)) {
> - PERROR("close");
> - }
> + if (!vstream->index_file) {
> goto error_unlock;
> }
> }
>
> - if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) {
> + if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) {
> off_t lseek_ret;
>
> - lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END);
> + lseek_ret = lseek(vstream->index_file->fd, 0, SEEK_END);
> if (lseek_ret < 0) {
> goto error_unlock;
> }
> @@ -192,9 +184,9 @@ static void viewer_stream_release(struct urcu_ref *ref)
> stream_fd_put(vstream->stream_fd);
> vstream->stream_fd = NULL;
> }
> - if (vstream->index_fd) {
> - stream_fd_put(vstream->index_fd);
> - vstream->index_fd = NULL;
> + if (vstream->index_file) {
> + lttng_index_file_put(vstream->index_file);
> + vstream->index_file = NULL;
> }
> if (vstream->stream) {
> stream_put(vstream->stream);
> @@ -305,29 +297,24 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
> vstream->index_sent_seqcount = seq_tail;
> }
>
> - if (vstream->index_fd) {
> - stream_fd_put(vstream->index_fd);
> - vstream->index_fd = NULL;
> + if (vstream->index_file) {
> + lttng_index_file_put(vstream->index_file);
> + vstream->index_file = NULL;
> }
> if (vstream->stream_fd) {
> stream_fd_put(vstream->stream_fd);
> vstream->stream_fd = NULL;
> }
>
> - ret = index_open(vstream->path_name, vstream->channel_name,
> + vstream->index_file = lttng_index_file_open(vstream->path_name,
> + vstream->channel_name,
> stream->tracefile_count,
> vstream->current_tracefile_id);
> - if (ret < 0) {
> + if (!vstream->index_file) {
> + ret = -1;
> goto end;
> - }
> - vstream->index_fd = stream_fd_create(ret);
> - if (vstream->index_fd) {
> - ret = 0;
> } else {
> - if (close(ret)) {
> - PERROR("close");
> - }
> - ret = -1;
> + ret = 0;
> }
> end:
> return ret;
> diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h
> index 5dc135d..2514b17 100644
> --- a/src/bin/lttng-relayd/viewer-stream.h
> +++ b/src/bin/lttng-relayd/viewer-stream.h
> @@ -52,8 +52,8 @@ struct relay_viewer_stream {
>
> /* FD from which to read the stream data. */
> struct stream_fd *stream_fd;
> - /* FD from which to read the index data. */
> - struct stream_fd *index_fd;
> + /* index file from which to read the index data. */
> + struct lttng_index_file *index_file;
>
> char *path_name;
> char *channel_name;
> diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c
> index a62cef2..2167f73 100644
> --- a/src/common/consumer/consumer-stream.c
> +++ b/src/common/consumer/consumer-stream.c
> @@ -163,12 +163,8 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
> stream->out_fd = -1;
> }
>
> - if (stream->index_fd >= 0) {
> - ret = close(stream->index_fd);
> - if (ret) {
> - PERROR("close stream index_fd");
> - }
> - stream->index_fd = -1;
> + if (stream->index_file) {
> + lttng_index_file_put(stream->index_file);
stream->index_file = NULL;
> }
>
> /* Check and cleanup relayd if needed. */
> @@ -359,27 +355,23 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
> * Return 0 on success or else a negative value.
> */
> int consumer_stream_write_index(struct lttng_consumer_stream *stream,
> - struct ctf_packet_index *index)
> + struct ctf_packet_index *element)
> {
> int ret;
> struct consumer_relayd_sock_pair *relayd;
>
> assert(stream);
> - assert(index);
> + assert(element);
>
> rcu_read_lock();
> relayd = consumer_find_relayd(stream->net_seq_idx);
> if (relayd) {
> pthread_mutex_lock(&relayd->ctrl_sock_mutex);
> - ret = relayd_send_index(&relayd->control_sock, index,
> + ret = relayd_send_index(&relayd->control_sock, element,
> stream->relayd_stream_id, stream->next_net_seq_num - 1);
> pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
> } else {
> - ssize_t size_ret;
> -
> - size_ret = index_write(stream->index_fd, index,
> - sizeof(struct ctf_packet_index));
> - if (size_ret < sizeof(struct ctf_packet_index)) {
> + if (lttng_index_file_write(stream->index_file, element)) {
> ret = -1;
> } else {
> ret = 0;
> diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c
> index a184afd..929be65 100644
> --- a/src/common/consumer/consumer.c
> +++ b/src/common/consumer/consumer.c
> @@ -570,7 +570,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
> stream->session_id = session_id;
> stream->monitor = monitor;
> stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
> - stream->index_fd = -1;
> + stream->index_file = NULL;
> stream->last_sequence_number = -1ULL;
> pthread_mutex_init(&stream->lock, NULL);
> pthread_mutex_init(&stream->metadata_timer_lock, NULL);
> @@ -1624,21 +1624,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
> }
> outfd = stream->out_fd;
>
> - if (stream->index_fd >= 0) {
> - ret = close(stream->index_fd);
> - if (ret < 0) {
> - PERROR("Closing index");
> - goto end;
> - }
> - stream->index_fd = -1;
> - ret = index_create_file(stream->chan->pathname,
> + if (stream->index_file) {
> + lttng_index_file_put(stream->index_file);
> + stream->index_file = lttng_index_file_create(stream->chan->pathname,
> stream->name, stream->uid, stream->gid,
> stream->chan->tracefile_size,
> - stream->tracefile_count_current);
> - if (ret < 0) {
> + stream->tracefile_count_current,
> + CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
> + if (!stream->index_file) {
> goto end;
> }
> - stream->index_fd = ret;
> }
>
> /* Reset current size because we just perform a rotation. */
> @@ -1831,22 +1826,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
> }
> outfd = stream->out_fd;
>
> - if (stream->index_fd >= 0) {
> - ret = close(stream->index_fd);
> - if (ret < 0) {
> - PERROR("Closing index");
> - goto end;
> - }
> - stream->index_fd = -1;
> - ret = index_create_file(stream->chan->pathname,
> + if (stream->index_file) {
> + lttng_index_file_put(stream->index_file);
> + stream->index_file = lttng_index_file_create(stream->chan->pathname,
> stream->name, stream->uid, stream->gid,
> stream->chan->tracefile_size,
> - stream->tracefile_count_current);
> - if (ret < 0) {
> - written = ret;
> + stream->tracefile_count_current,
> + CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
> + if (!stream->index_file) {
> goto end;
> }
> - stream->index_fd = ret;
> }
>
> /* Reset current size because we just perform a rotation. */
> diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h
> index 256ea19..acefacb 100644
> --- a/src/common/consumer/consumer.h
> +++ b/src/common/consumer/consumer.h
> @@ -390,9 +390,9 @@ struct lttng_consumer_stream {
> /* Copy of the sequence number of the last packet extracted. */
> uint64_t last_sequence_number;
> /*
> - * FD of the index file for this stream.
> + * Index file object of the index file for this stream.
> */
> - int index_fd;
> + struct lttng_index_file *index_file;
>
> /*
> * Local pipe to extract data when using splice.
> diff --git a/src/common/index/ctf-index.h b/src/common/index/ctf-index.h
> index 8755f12..b73340f 100644
> --- a/src/common/index/ctf-index.h
> +++ b/src/common/index/ctf-index.h
> @@ -60,4 +60,43 @@ struct ctf_packet_index {
> uint64_t packet_seq_num; /* packet sequence number */
> } __attribute__((__packed__));
>
> +static inline size_t ctf_packet_index_len(uint32_t major, uint32_t minor)
> +{
> + if (major == 1) {
> + switch (minor) {
> + case 0:
> + return offsetof(struct ctf_packet_index, stream_id)
> + + member_sizeof(struct ctf_packet_index,
> + stream_id);
> + case 1:
> + return offsetof(struct ctf_packet_index, packet_seq_num)
> + + member_sizeof(struct ctf_packet_index,
> + packet_seq_num);
> + default:
> + abort();
> + }
> + }
> + abort();
> +}
> +
> +static inline uint32_t lttng_to_index_major(uint32_t lttng_major,
> + uint32_t lttng_minor)
> +{
> + if (lttng_major == 2)
> + return 1;
Missing curly braces.
> + abort();
> +}
> +
> +static inline uint32_t lttng_to_index_minor(uint32_t lttng_major,
> + uint32_t lttng_minor)
> +{
> + if (lttng_major == 2) {
> + if (lttng_minor < 8)
> + return 0;
> + else
> + return 1;
Missing curly braces.
> + }
> + abort();
> +}
> +
> #endif /* LTTNG_INDEX_H */
> diff --git a/src/common/index/index.c b/src/common/index/index.c
> index 066618e..ba69cdc 100644
> --- a/src/common/index/index.c
> +++ b/src/common/index/index.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2016 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -32,15 +33,24 @@
> /*
> * Create the index file associated with a trace file.
> *
> - * Return fd on success, a negative value on error.
> + * Return allocated struct lttng_index_file, NULL on error.
> */
> -int index_create_file(char *path_name, char *stream_name, int uid, int gid,
> - uint64_t size, uint64_t count)
> +struct lttng_index_file *lttng_index_file_create(char *path_name,
> + char *stream_name, int uid, int gid,
> + uint64_t size, uint64_t count, uint32_t major, uint32_t minor)
> {
> + struct lttng_index_file *index_file;
> int ret, fd = -1;
> ssize_t size_ret;
> struct ctf_packet_index_file_hdr hdr;
> char fullpath[PATH_MAX];
> + uint32_t element_len = ctf_packet_index_len(major, minor);
> +
> + index_file = zmalloc(sizeof(*index_file));
> + if (!index_file) {
> + PERROR("out of memory");
Changed message to "allocating lttng_index_file" to provide some
context on error.
> + goto error;
> + }
>
> ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR,
> path_name);
> @@ -79,9 +89,9 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
> fd = ret;
>
> hdr.magic = htobe32(CTF_INDEX_MAGIC);
> - hdr.index_major = htobe32(CTF_INDEX_MAJOR);
> - hdr.index_minor = htobe32(CTF_INDEX_MINOR);
> - hdr.packet_index_len = htobe32(sizeof(struct ctf_packet_index));
> + hdr.index_major = htobe32(major);
> + hdr.index_minor = htobe32(minor);
> + hdr.packet_index_len = htobe32(element_len);
>
> size_ret = lttng_write(fd, &hdr, sizeof(hdr));
> if (size_ret < sizeof(hdr)) {
> @@ -89,8 +99,13 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
> ret = -1;
> goto error;
> }
> + index_file->fd = fd;
> + index_file->major = major;
> + index_file->minor = minor;
> + index_file->element_len = element_len;
> + urcu_ref_init(&index_file->ref);
>
> - return fd;
> + return index_file;
>
> error:
> if (fd >= 0) {
> @@ -101,51 +116,94 @@ error:
> PERROR("close index fd");
> }
> }
> - return ret;
> + free(index_file);
> + return NULL;
> }
>
> /*
> - * Write index values to the given fd of size len.
> + * Write index values to the given index file.
> *
> - * Return "len" on success or else < len on error. errno contains error
> - * details.
> + * Return 0 on success, -1 on error.
> */
> -ssize_t index_write(int fd, struct ctf_packet_index *index, size_t len)
> +int lttng_index_file_write(const struct lttng_index_file *index_file,
> + const struct ctf_packet_index *element)
> {
> ssize_t ret;
> + int fd = index_file->fd;
> + size_t len = index_file->element_len;
>
> - assert(index);
> + assert(element);
>
> if (fd < 0) {
> - ret = -EINVAL;
> goto error;
> }
>
> - ret = lttng_write(fd, index, len);
> + ret = lttng_write(fd, element, len);
> if (ret < len) {
> PERROR("writing index file");
> + goto error;
> + }
> + return 0;
> +
> +error:
> + return -1;
> +}
> +
> +/*
> + * Read index values from the given index file.
> + *
> + * Return 0 on success, -1 on error.
> + */
> +int lttng_index_file_read(const struct lttng_index_file *index_file,
> + struct ctf_packet_index *element)
> +{
> + ssize_t ret;
> + int fd = index_file->fd;
> + size_t len = index_file->element_len;
> +
> + assert(element);
> +
> + if (fd < 0) {
> + goto error;
> + }
> +
> + ret = lttng_read(fd, element, len);
> + if (ret < len) {
> + PERROR("read index file");
> + goto error;
> }
> + return 0;
>
> error:
> - return ret;
> + return -1;
> }
>
> /*
> * Open index file using a given path, channel name and tracefile count.
> *
> - * Return read only FD on success or else a negative value.
> + * Return allocated struct lttng_index_file, NULL on error.
> */
> -int index_open(const char *path_name, const char *channel_name,
> - uint64_t tracefile_count, uint64_t tracefile_count_current)
> +struct lttng_index_file *lttng_index_file_open(const char *path_name,
> + const char *channel_name, uint64_t tracefile_count,
> + uint64_t tracefile_count_current)
> {
> + struct lttng_index_file *index_file;
> int ret, read_fd;
> ssize_t read_len;
> char fullpath[PATH_MAX];
> struct ctf_packet_index_file_hdr hdr;
> + uint32_t major, minor, element_len;
>
> assert(path_name);
> assert(channel_name);
>
> + index_file = zmalloc(sizeof(*index_file));
> + if (!index_file) {
> + PERROR("out of memory");
Changed message to "allocating lttng_index_file" to provide some
context on error.
> + goto error;
> + }
> +
> +
Removed extra empty line here.
> if (tracefile_count > 0) {
> ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s_%"
> PRIu64 DEFAULT_INDEX_FILE_SUFFIX, path_name,
> @@ -180,13 +238,22 @@ int index_open(const char *path_name, const char *channel_name,
> ERR("Invalid header magic");
> goto error_close;
> }
> - if (be32toh(hdr.index_major) != CTF_INDEX_MAJOR ||
> - be32toh(hdr.index_minor) != CTF_INDEX_MINOR) {
> + major = be32toh(hdr.index_major);
> + minor = be32toh(hdr.index_minor);
> + element_len = be32toh(hdr.packet_index_len);
> +
> + if (major != CTF_INDEX_MAJOR) {
> ERR("Invalid header version");
> goto error_close;
> }
>
> - return read_fd;
> + index_file->fd = read_fd;
> + index_file->major = major;
> + index_file->minor = minor;
> + index_file->element_len = element_len;
> + urcu_ref_init(&index_file->ref);
> +
> + return index_file;
>
> error_close:
> if (read_fd >= 0) {
> @@ -200,5 +267,28 @@ error_close:
> ret = -1;
>
> error:
> - return ret;
> + free(index_file);
> + return NULL;
> +}
> +
> +
Removed extra empty line.
> +void lttng_index_file_get(struct lttng_index_file *index_file)
> +{
> + urcu_ref_get(&index_file->ref);
> +}
> +
> +static void lttng_index_file_release(struct urcu_ref *ref)
> +{
> + struct lttng_index_file *index_file = caa_container_of(ref,
> + struct lttng_index_file, ref);
> +
> + if (close(index_file->fd)) {
> + PERROR("close index fd");
> + }
> + free(index_file);
> +}
> +
> +void lttng_index_file_put(struct lttng_index_file *index_file)
> +{
> + urcu_ref_put(&index_file->ref, lttng_index_file_release);
> }
> diff --git a/src/common/index/index.h b/src/common/index/index.h
> index a7e6aee..7020936 100644
> --- a/src/common/index/index.h
> +++ b/src/common/index/index.h
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2016 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -20,13 +21,34 @@
> #define _INDEX_H
>
> #include <inttypes.h>
> +#include <urcu/ref.h>
>
> #include "ctf-index.h"
>
> -int index_create_file(char *path_name, char *stream_name, int uid, int gid,
> - uint64_t size, uint64_t count);
> -ssize_t index_write(int fd, struct ctf_packet_index *index, size_t len);
> -int index_open(const char *path_name, const char *channel_name,
> - uint64_t tracefile_count, uint64_t tracefile_count_current);
> +struct lttng_index_file {
> + int fd;
> + uint32_t major;
> + uint32_t minor;
> + uint32_t element_len;
> + struct urcu_ref ref;
> +};
> +
> +/*
> + * create and open have refcount of 1. Use put to decrement the
> + * refcount. Destroys when reaching 0. Use "get" to increment refcount.
> + */
> +struct lttng_index_file *lttng_index_file_create(char *path_name,
> + char *stream_name, int uid, int gid, uint64_t size,
> + uint64_t count, uint32_t major, uint32_t minor);
> +struct lttng_index_file *lttng_index_file_open(const char *path_name,
> + const char *channel_name, uint64_t tracefile_count,
> + uint64_t tracefile_count_current);
> +int lttng_index_file_write(const struct lttng_index_file *index_file,
> + const struct ctf_packet_index *element);
> +int lttng_index_file_read(const struct lttng_index_file *index_file,
> + struct ctf_packet_index *element);
> +
> +void lttng_index_file_get(struct lttng_index_file *index_file);
> +void lttng_index_file_put(struct lttng_index_file *index_file);
>
> #endif /* _INDEX_H */
> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
> index a8abcd7..d03ff70 100644
> --- a/src/common/kernel-consumer/kernel-consumer.c
> +++ b/src/common/kernel-consumer/kernel-consumer.c
> @@ -1475,14 +1475,17 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
> stream->tracefile_size_current = 0;
>
> if (!stream->metadata_flag) {
> - ret = index_create_file(stream->chan->pathname,
> + struct lttng_index_file *index_file;
> +
> + index_file = lttng_index_file_create(stream->chan->pathname,
> stream->name, stream->uid, stream->gid,
> stream->chan->tracefile_size,
> - stream->tracefile_count_current);
> - if (ret < 0) {
> + stream->tracefile_count_current,
> + CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
> + if (!index_file) {
> goto error;
> }
> - stream->index_fd = ret;
> + stream->index_file = index_file;
> }
> }
>
> diff --git a/src/common/macros.h b/src/common/macros.h
> index 0e11d3c..8ae6535 100644
> --- a/src/common/macros.h
> +++ b/src/common/macros.h
> @@ -78,6 +78,8 @@ void *zmalloc(size_t len)
> #define LTTNG_HIDDEN __attribute__((visibility("hidden")))
> #endif
>
> +#define member_sizeof(type, field) sizeof(((type *) 0)->field)
> +
> /*
> * lttng_strncpy returns 0 on success, or nonzero on failure.
> * It checks that the @src string fits into @dst_len before performing
> diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c
> index 7f0ea74..ec0111e 100644
> --- a/src/common/relayd/relayd.c
> +++ b/src/common/relayd/relayd.c
> @@ -856,7 +856,10 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
> }
>
> /* Send command */
> - ret = send_command(rsock, RELAYD_SEND_INDEX, &msg, sizeof(msg), 0);
> + ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
> + lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
> + rsock->minor),
> + lttng_to_index_minor(rsock->major, rsock->minor)), 0);
> if (ret < 0) {
> goto error;
> }
> diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h
> index 1fc48c4..eb3e5f1 100644
> --- a/src/common/sessiond-comm/relayd.h
> +++ b/src/common/sessiond-comm/relayd.h
> @@ -159,10 +159,30 @@ struct lttcomm_relayd_index {
> uint64_t timestamp_end;
> uint64_t events_discarded;
> uint64_t stream_id;
> + /* 2.8+ */
> uint64_t stream_instance_id;
> uint64_t packet_seq_num;
> } LTTNG_PACKED;
>
> +static inline size_t lttcomm_relayd_index_len(uint32_t major, uint32_t minor)
> +{
> + if (major == 1) {
> + switch (minor) {
> + case 0:
> + return offsetof(struct lttcomm_relayd_index, stream_id)
> + + member_sizeof(struct lttcomm_relayd_index,
> + stream_id);
> + case 1:
> + return offsetof(struct lttcomm_relayd_index, packet_seq_num)
> + + member_sizeof(struct lttcomm_relayd_index,
> + packet_seq_num);
> + default:
> + abort();
> + }
> + }
> + abort();
> +}
> +
> /*
> * Create session in 2.4 adds additionnal parameters for live reading.
> */
> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> index 76eee2a..82d56eb 100644
> --- a/src/common/ust-consumer/ust-consumer.c
> +++ b/src/common/ust-consumer/ust-consumer.c
> @@ -2620,14 +2620,17 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
> stream->tracefile_size_current = 0;
>
> if (!stream->metadata_flag) {
> - ret = index_create_file(stream->chan->pathname,
> + struct lttng_index_file *index_file;
> +
> + index_file = lttng_index_file_create(stream->chan->pathname,
> stream->name, stream->uid, stream->gid,
> stream->chan->tracefile_size,
> - stream->tracefile_count_current);
> - if (ret < 0) {
> + stream->tracefile_count_current,
> + CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
> + if (!index_file) {
> goto error;
> }
> - stream->index_fd = ret;
> + stream->index_file = index_file;
> }
> }
> ret = 0;
> --
> 2.1.4
>
--
Jérémie Galarneau
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list