[lttng-dev] [PATCH lttng-tools] Fix: relayd vs consumerd compatibility

Jérémie Galarneau jeremie.galarneau at efficios.com
Fri Dec 16 09:50:03 UTC 2016


Merged in master, stable-2.9 and stable-2.8 with minor changes. Read on.

Thanks!
Jérémie

On 15 December 2016 at 05:04, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> relay and consumerd 2.7 and 2.8 are expected to negotiate compatibility
> with the lowest common minor version.
>
> If a consumer daemon 2.8 interacts with a relayd 2.7, it needs to send
> the index fields for ctf index 1.0. Same if a relayd 2.8 interacts with
> a consumer daemon 2.7: relayd should expect ctf index 1.0 fields, and
> generate a ctf index 1.0 index file layout.
>
> If both relayd and consumerd versions are 2.8+, then we can send the ctf
> index 1.1 fields over the protocol, and store them in the index files.
>
> Whenever the relayd live viewer server opens and reads an index file,
> it needs to use the file's header to figure out the index "element"
> size.
>
> [ Should be applied to master, stable-2.9, stable-2.8. ]
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> ---
>  src/bin/lttng-relayd/index.c                 |  30 +++---
>  src/bin/lttng-relayd/index.h                 |  11 ++-
>  src/bin/lttng-relayd/live.c                  |  34 +++----
>  src/bin/lttng-relayd/main.c                  |  45 +++++----
>  src/bin/lttng-relayd/stream.c                |   6 +-
>  src/bin/lttng-relayd/stream.h                |   4 +-
>  src/bin/lttng-relayd/viewer-stream.c         |  47 ++++-----
>  src/bin/lttng-relayd/viewer-stream.h         |   4 +-
>  src/common/consumer/consumer-stream.c        |  20 ++--
>  src/common/consumer/consumer.c               |  37 +++-----
>  src/common/consumer/consumer.h               |   4 +-
>  src/common/index/ctf-index.h                 |  39 ++++++++
>  src/common/index/index.c                     | 136 ++++++++++++++++++++++-----
>  src/common/index/index.h                     |  32 ++++++-
>  src/common/kernel-consumer/kernel-consumer.c |  11 ++-
>  src/common/macros.h                          |   2 +
>  src/common/relayd/relayd.c                   |   5 +-
>  src/common/sessiond-comm/relayd.h            |  20 ++++
>  src/common/ust-consumer/ust-consumer.c       |  11 ++-
>  19 files changed, 318 insertions(+), 180 deletions(-)
>
> diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c
> index 80a4bb9..b15bbcd 100644
> --- a/src/bin/lttng-relayd/index.c
> +++ b/src/bin/lttng-relayd/index.c
> @@ -166,18 +166,19 @@ end:
>         return index;
>  }
>
> -int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
> +int relay_index_set_file(struct relay_index *index,
> +               struct lttng_index_file *index_file,
>                 uint64_t data_offset)
>  {
>         int ret = 0;
>
>         pthread_mutex_lock(&index->lock);
> -       if (index->index_fd) {
> +       if (index->index_file) {
>                 ret = -1;
>                 goto end;
>         }
> -       stream_fd_get(index_fd);
> -       index->index_fd = index_fd;
> +       lttng_index_file_get(index_file);
> +       index->index_file = index_file;
>         index->index_data.offset = data_offset;
>  end:
>         pthread_mutex_unlock(&index->lock);
> @@ -228,9 +229,9 @@ static void index_release(struct urcu_ref *ref)
>         int ret;
>         struct lttng_ht_iter iter;
>
> -       if (index->index_fd) {
> -               stream_fd_put(index->index_fd);
> -               index->index_fd = NULL;
> +       if (index->index_file) {
> +               lttng_index_file_put(index->index_file);
> +               index->index_file = NULL;
>         }
>         if (index->in_hash_table) {
>                 /* Delete index from hash table. */
> @@ -290,21 +291,16 @@ int relay_index_try_flush(struct relay_index *index)
>                 goto skip;
>         }
>         /* Check if we are ready to flush. */
> -       if (!index->has_index_data || !index->index_fd) {
> +       if (!index->has_index_data || !index->index_file) {
>                 goto skip;
>         }
> -       fd = index->index_fd->fd;
> +       fd = index->index_file->fd;
>         DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
>                         " on fd %d", index->stream->stream_handle,
>                         index->index_n.key, fd);
>         flushed = true;
>         index->flushed = true;
> -       ret = index_write(fd, &index->index_data, sizeof(index->index_data));
> -       if (ret == sizeof(index->index_data)) {
> -               ret = 0;
> -       } else {
> -               ret = -1;
> -       }
> +       ret = lttng_index_file_write(index->index_file, &index->index_data);
>  skip:
>         pthread_mutex_unlock(&index->lock);
>
> @@ -341,11 +337,11 @@ void relay_index_close_partial_fd(struct relay_stream *stream)
>         rcu_read_lock();
>         cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
>                         index, index_n.node) {
> -               if (!index->index_fd) {
> +               if (!index->index_file) {
>                         continue;
>                 }
>                 /*
> -                * Partial index has its index_fd: we have only
> +                * Partial index has its index_file: we have only
>                  * received its info from the data socket.
>                  * Put self-ref from index.
>                  */
> diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h
> index 15c4ac8..80fe86a 100644
> --- a/src/bin/lttng-relayd/index.h
> +++ b/src/bin/lttng-relayd/index.h
> @@ -40,10 +40,10 @@ struct relay_index {
>
>         pthread_mutex_t lock;
>         /*
> -        * FD on which to write the index data. May differ from
> -        * stream->index_fd due to tracefile rotation.
> +        * index file on which to write the index data. May differ from
> +        * stream->index_file due to tracefile rotation.
>          */
> -       struct stream_fd *index_fd;
> +       struct lttng_index_file *index_file;
>
>         /* Index packet data. This is the data that is written on disk. */
>         struct ctf_packet_index index_data;
> @@ -64,8 +64,9 @@ struct relay_index {
>  struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
>                 uint64_t net_seq_num);
>  void relay_index_put(struct relay_index *index);
> -int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
> -                uint64_t data_offset);
> +int relay_index_set_file(struct relay_index *index,
> +               struct lttng_index_file *index_file,
> +               uint64_t data_offset);
>  int relay_index_set_data(struct relay_index *index,
>                  const struct ctf_packet_index *data);
>  int relay_index_try_flush(struct relay_index *index);
> diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
> index 2c3a105..1d0492c 100644
> --- a/src/bin/lttng-relayd/live.c
> +++ b/src/bin/lttng-relayd/live.c
> @@ -1122,7 +1122,7 @@ error:
>  /*
>   * Open the index file if needed for the given vstream.
>   *
> - * If an index file is successfully opened, the vstream index_fd set with
> + * If an index file is successfully opened, the vstream index_file set with

I also took this opportunity to slightly reword this comment so that it reads as a proper sentence.

>   * it.
>   *
>   * Return 0 on success, a negative value on error (-ENOENT if not ready yet).
> @@ -1133,8 +1133,9 @@ static int try_open_index(struct relay_viewer_stream *vstream,
>                 struct relay_stream *rstream)
>  {
>         int ret = 0;
> +       struct lttng_index_file *index_file;
>
> -       if (vstream->index_fd) {
> +       if (vstream->index_file) {
>                 goto end;
>         }
>
> @@ -1145,20 +1146,15 @@ static int try_open_index(struct relay_viewer_stream *vstream,
>                 ret = -ENOENT;
>                 goto end;
>         }
> -       ret = index_open(vstream->path_name, vstream->channel_name,
> +       index_file = lttng_index_file_open(vstream->path_name,
> +                       vstream->channel_name,
>                         vstream->stream->tracefile_count,
>                         vstream->current_tracefile_id);
> -       if (ret >= 0) {
> -               vstream->index_fd = stream_fd_create(ret);
> -               if (!vstream->index_fd) {
> -                       if (close(ret)) {
> -                               PERROR("close");
> -                       }
> -                       ret = -1;
> -               } else {
> -                       ret = 0;
> -               }
> -               goto end;
> +       if (index_file) {
> +               vstream->index_file = index_file;
> +               ret = 0;
> +       } else {
> +               ret = -1;

Since you simplified the code quite a bit here, it makes sense to
eliminate the local "index_file" variable altogether.

Changed to:

vstream->index_file = lttng_index_file_open....
if (!vstream->index_file) {
    ret = -1;
}

end:
return ret;

>         }
>
>  end:
> @@ -1277,7 +1273,6 @@ static
>  int viewer_get_next_index(struct relay_connection *conn)
>  {
>         int ret;
> -       ssize_t read_ret;
>         struct lttng_viewer_get_next_index request_index;
>         struct lttng_viewer_index viewer_index;
>         struct ctf_packet_index packet_index;
> @@ -1400,11 +1395,10 @@ int viewer_get_next_index(struct relay_connection *conn)
>                 viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
>         }
>
> -       read_ret = lttng_read(vstream->index_fd->fd, &packet_index,
> -                       sizeof(packet_index));
> -       if (read_ret < sizeof(packet_index)) {
> -               ERR("Relay reading index file %d returned %zd",
> -                       vstream->index_fd->fd, read_ret);
> +       ret = lttng_index_file_read(vstream->index_file, &packet_index);
> +       if (ret) {
> +               ERR("Relay error reading index file %d",
> +                       vstream->index_file->fd);
>                 viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
>                 goto send_reply;
>         } else {
> diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
> index 9c0e2b1..43603a9 100644
> --- a/src/bin/lttng-relayd/main.c
> +++ b/src/bin/lttng-relayd/main.c
> @@ -1946,6 +1946,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
>         struct lttcomm_relayd_generic_reply reply;
>         struct relay_stream *stream;
>         uint64_t net_seq_num;
> +       size_t msg_len;
>
>         assert(conn);
>
> @@ -1957,9 +1958,12 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
>                 goto end_no_session;
>         }
>
> +       msg_len = lttcomm_relayd_index_len(lttng_to_index_major(conn->major,
> +                                               conn->minor),

Changed the indentation here.

> +               lttng_to_index_minor(conn->major, conn->minor));
>         ret = conn->sock->ops->recvmsg(conn->sock, &index_info,
> -                       sizeof(index_info), 0);
> -       if (ret < sizeof(index_info)) {
> +                       msg_len, 0);
> +       if (ret < msg_len) {
>                 if (ret == 0) {
>                         /* Orderly shutdown. Not necessary to print an error. */
>                         DBG("Socket %d did an orderly shutdown", conn->sock->fd);
> @@ -2183,38 +2187,31 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
>                 goto end;
>         }
>
> -       if (rotate_index || !stream->index_fd) {
> -               int fd;
> +       if (rotate_index || !stream->index_file) {
> +               uint32_t major, minor;
>
> -               /* Put ref on previous index_fd. */
> -               if (stream->index_fd) {
> -                       stream_fd_put(stream->index_fd);
> -                       stream->index_fd = NULL;
> +               /* Put ref on previous index_file. */
> +               if (stream->index_file) {
> +                       lttng_index_file_put(stream->index_file);
> +                       stream->index_file = NULL;
>                 }
> -
> -               fd = index_create_file(stream->path_name, stream->channel_name,
> +               major = stream->trace->session->major;
> +               minor = stream->trace->session->minor;
> +               stream->index_file = lttng_index_file_create(stream->path_name,
> +                               stream->channel_name,
>                                 -1, -1, stream->tracefile_size,
> -                               tracefile_array_get_file_index_head(stream->tfa));
> -               if (fd < 0) {
> +                               tracefile_array_get_file_index_head(stream->tfa),
> +                               lttng_to_index_major(major, minor),
> +                               lttng_to_index_minor(major, minor));
> +               if (!stream->index_file) {
>                         ret = -1;
>                         /* Put self-ref for this index due to error. */
>                         relay_index_put(index);
>                         goto end;
>                 }
> -               stream->index_fd = stream_fd_create(fd);
> -               if (!stream->index_fd) {
> -                       ret = -1;
> -                       if (close(fd)) {
> -                               PERROR("Error closing FD %d", fd);
> -                       }
> -                       /* Put self-ref for this index due to error. */
> -                       relay_index_put(index);
> -                       /* Will put the local ref. */
> -                       goto end;
> -               }
>         }
>
> -       if (relay_index_set_fd(index, stream->index_fd, data_offset)) {
> +       if (relay_index_set_file(index, stream->index_file, data_offset)) {
>                 ret = -1;
>                 /* Put self-ref for this index due to error. */
>                 relay_index_put(index);
> diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
> index 335a1cf..c59bb94 100644
> --- a/src/bin/lttng-relayd/stream.c
> +++ b/src/bin/lttng-relayd/stream.c
> @@ -306,9 +306,9 @@ static void stream_release(struct urcu_ref *ref)
>                 stream_fd_put(stream->stream_fd);
>                 stream->stream_fd = NULL;
>         }
> -       if (stream->index_fd) {
> -               stream_fd_put(stream->index_fd);
> -               stream->index_fd = NULL;
> +       if (stream->index_file) {
> +               lttng_index_file_put(stream->index_file);
> +               stream->index_file = NULL;
>         }
>         if (stream->trace) {
>                 ctf_trace_put(stream->trace);
> diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h
> index 5030e5d..d471c7b 100644
> --- a/src/bin/lttng-relayd/stream.h
> +++ b/src/bin/lttng-relayd/stream.h
> @@ -58,8 +58,8 @@ struct relay_stream {
>
>         /* FD on which to write the stream data. */
>         struct stream_fd *stream_fd;
> -       /* FD on which to write the index data. */
> -       struct stream_fd *index_fd;
> +       /* index file on which to write the index data. */
> +       struct lttng_index_file *index_file;
>
>         char *path_name;
>         char *channel_name;
> diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c
> index 7c59cd0..8a3b09a 100644
> --- a/src/bin/lttng-relayd/viewer-stream.c
> +++ b/src/bin/lttng-relayd/viewer-stream.c
> @@ -116,29 +116,21 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
>          * the opening of the index, otherwise open it right now.
>          */
>         if (stream->index_received_seqcount == 0) {
> -               vstream->index_fd = NULL;
> +               vstream->index_file = NULL;
>         } else {
> -               int read_fd;
> -
> -               read_fd = index_open(vstream->path_name, vstream->channel_name,
> +               vstream->index_file = lttng_index_file_open(vstream->path_name,
> +                               vstream->channel_name,
>                                 stream->tracefile_count,
>                                 vstream->current_tracefile_id);
> -               if (read_fd < 0) {
> -                       goto error_unlock;
> -               }
> -               vstream->index_fd = stream_fd_create(read_fd);
> -               if (!vstream->index_fd) {
> -                       if (close(read_fd)) {
> -                               PERROR("close");
> -                       }
> +               if (!vstream->index_file) {
>                         goto error_unlock;
>                 }
>         }
>
> -       if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) {
> +       if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) {
>                 off_t lseek_ret;
>
> -               lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END);
> +               lseek_ret = lseek(vstream->index_file->fd, 0, SEEK_END);
>                 if (lseek_ret < 0) {
>                         goto error_unlock;
>                 }
> @@ -192,9 +184,9 @@ static void viewer_stream_release(struct urcu_ref *ref)
>                 stream_fd_put(vstream->stream_fd);
>                 vstream->stream_fd = NULL;
>         }
> -       if (vstream->index_fd) {
> -               stream_fd_put(vstream->index_fd);
> -               vstream->index_fd = NULL;
> +       if (vstream->index_file) {
> +               lttng_index_file_put(vstream->index_file);
> +               vstream->index_file = NULL;
>         }
>         if (vstream->stream) {
>                 stream_put(vstream->stream);
> @@ -305,29 +297,24 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
>                 vstream->index_sent_seqcount = seq_tail;
>         }
>
> -       if (vstream->index_fd) {
> -               stream_fd_put(vstream->index_fd);
> -               vstream->index_fd = NULL;
> +       if (vstream->index_file) {
> +               lttng_index_file_put(vstream->index_file);
> +               vstream->index_file = NULL;
>         }
>         if (vstream->stream_fd) {
>                 stream_fd_put(vstream->stream_fd);
>                 vstream->stream_fd = NULL;
>         }
>
> -       ret = index_open(vstream->path_name, vstream->channel_name,
> +       vstream->index_file = lttng_index_file_open(vstream->path_name,
> +                       vstream->channel_name,
>                         stream->tracefile_count,
>                         vstream->current_tracefile_id);
> -       if (ret < 0) {
> +       if (!vstream->index_file) {
> +               ret = -1;
>                 goto end;
> -       }
> -       vstream->index_fd = stream_fd_create(ret);
> -       if (vstream->index_fd) {
> -               ret = 0;
>         } else {
> -               if (close(ret)) {
> -                       PERROR("close");
> -               }
> -               ret = -1;
> +               ret = 0;
>         }
>  end:
>         return ret;
> diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h
> index 5dc135d..2514b17 100644
> --- a/src/bin/lttng-relayd/viewer-stream.h
> +++ b/src/bin/lttng-relayd/viewer-stream.h
> @@ -52,8 +52,8 @@ struct relay_viewer_stream {
>
>         /* FD from which to read the stream data. */
>         struct stream_fd *stream_fd;
> -       /* FD from which to read the index data. */
> -       struct stream_fd *index_fd;
> +       /* index file from which to read the index data. */
> +       struct lttng_index_file *index_file;
>
>         char *path_name;
>         char *channel_name;
> diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c
> index a62cef2..2167f73 100644
> --- a/src/common/consumer/consumer-stream.c
> +++ b/src/common/consumer/consumer-stream.c
> @@ -163,12 +163,8 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
>                 stream->out_fd = -1;
>         }
>
> -       if (stream->index_fd >= 0) {
> -               ret = close(stream->index_fd);
> -               if (ret) {
> -                       PERROR("close stream index_fd");
> -               }
> -               stream->index_fd = -1;
> +       if (stream->index_file) {
> +               lttng_index_file_put(stream->index_file);

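Added the following line after the put, presumably so that the stream
does not keep a stale pointer to the released index file:

stream->index_file = NULL;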

>         }
>
>         /* Check and cleanup relayd if needed. */
> @@ -359,27 +355,23 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
>   * Return 0 on success or else a negative value.
>   */
>  int consumer_stream_write_index(struct lttng_consumer_stream *stream,
> -               struct ctf_packet_index *index)
> +               struct ctf_packet_index *element)
>  {
>         int ret;
>         struct consumer_relayd_sock_pair *relayd;
>
>         assert(stream);
> -       assert(index);
> +       assert(element);
>
>         rcu_read_lock();
>         relayd = consumer_find_relayd(stream->net_seq_idx);
>         if (relayd) {
>                 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
> -               ret = relayd_send_index(&relayd->control_sock, index,
> +               ret = relayd_send_index(&relayd->control_sock, element,
>                                 stream->relayd_stream_id, stream->next_net_seq_num - 1);
>                 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
>         } else {
> -               ssize_t size_ret;
> -
> -               size_ret = index_write(stream->index_fd, index,
> -                               sizeof(struct ctf_packet_index));
> -               if (size_ret < sizeof(struct ctf_packet_index)) {
> +               if (lttng_index_file_write(stream->index_file, element)) {
>                         ret = -1;
>                 } else {
>                         ret = 0;
> diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c
> index a184afd..929be65 100644
> --- a/src/common/consumer/consumer.c
> +++ b/src/common/consumer/consumer.c
> @@ -570,7 +570,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
>         stream->session_id = session_id;
>         stream->monitor = monitor;
>         stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
> -       stream->index_fd = -1;
> +       stream->index_file = NULL;
>         stream->last_sequence_number = -1ULL;
>         pthread_mutex_init(&stream->lock, NULL);
>         pthread_mutex_init(&stream->metadata_timer_lock, NULL);
> @@ -1624,21 +1624,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
>                         }
>                         outfd = stream->out_fd;
>
> -                       if (stream->index_fd >= 0) {
> -                               ret = close(stream->index_fd);
> -                               if (ret < 0) {
> -                                       PERROR("Closing index");
> -                                       goto end;
> -                               }
> -                               stream->index_fd = -1;
> -                               ret = index_create_file(stream->chan->pathname,
> +                       if (stream->index_file) {
> +                               lttng_index_file_put(stream->index_file);
> +                               stream->index_file = lttng_index_file_create(stream->chan->pathname,
>                                                 stream->name, stream->uid, stream->gid,
>                                                 stream->chan->tracefile_size,
> -                                               stream->tracefile_count_current);
> -                               if (ret < 0) {
> +                                               stream->tracefile_count_current,
> +                                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
> +                               if (!stream->index_file) {
>                                         goto end;
>                                 }
> -                               stream->index_fd = ret;
>                         }
>
>                         /* Reset current size because we just perform a rotation. */
> @@ -1831,22 +1826,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
>                         }
>                         outfd = stream->out_fd;
>
> -                       if (stream->index_fd >= 0) {
> -                               ret = close(stream->index_fd);
> -                               if (ret < 0) {
> -                                       PERROR("Closing index");
> -                                       goto end;
> -                               }
> -                               stream->index_fd = -1;
> -                               ret = index_create_file(stream->chan->pathname,
> +                       if (stream->index_file) {
> +                               lttng_index_file_put(stream->index_file);
> +                               stream->index_file = lttng_index_file_create(stream->chan->pathname,
>                                                 stream->name, stream->uid, stream->gid,
>                                                 stream->chan->tracefile_size,
> -                                               stream->tracefile_count_current);
> -                               if (ret < 0) {
> -                                       written = ret;
> +                                               stream->tracefile_count_current,
> +                                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
> +                               if (!stream->index_file) {
>                                         goto end;
>                                 }
> -                               stream->index_fd = ret;
>                         }
>
>                         /* Reset current size because we just perform a rotation. */
> diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h
> index 256ea19..acefacb 100644
> --- a/src/common/consumer/consumer.h
> +++ b/src/common/consumer/consumer.h
> @@ -390,9 +390,9 @@ struct lttng_consumer_stream {
>         /* Copy of the sequence number of the last packet extracted. */
>         uint64_t last_sequence_number;
>         /*
> -        * FD of the index file for this stream.
> +        * Index file object of the index file for this stream.
>          */
> -       int index_fd;
> +       struct lttng_index_file *index_file;
>
>         /*
>          * Local pipe to extract data when using splice.
> diff --git a/src/common/index/ctf-index.h b/src/common/index/ctf-index.h
> index 8755f12..b73340f 100644
> --- a/src/common/index/ctf-index.h
> +++ b/src/common/index/ctf-index.h
> @@ -60,4 +60,43 @@ struct ctf_packet_index {
>         uint64_t packet_seq_num;        /* packet sequence number */
>  } __attribute__((__packed__));
>
> +static inline size_t ctf_packet_index_len(uint32_t major, uint32_t minor)
> +{
> +       if (major == 1) {
> +               switch (minor) {
> +               case 0:
> +                       return offsetof(struct ctf_packet_index, stream_id)
> +                               + member_sizeof(struct ctf_packet_index,
> +                                               stream_id);
> +               case 1:
> +                       return offsetof(struct ctf_packet_index, packet_seq_num)
> +                               + member_sizeof(struct ctf_packet_index,
> +                                               packet_seq_num);
> +               default:
> +                       abort();
> +               }
> +       }
> +       abort();
> +}
> +
> +static inline uint32_t lttng_to_index_major(uint32_t lttng_major,
> +               uint32_t lttng_minor)
> +{
> +       if (lttng_major == 2)
> +               return 1;

Missing curly braces.

> +       abort();
> +}
> +
> +static inline uint32_t lttng_to_index_minor(uint32_t lttng_major,
> +               uint32_t lttng_minor)
> +{
> +       if (lttng_major == 2) {
> +               if (lttng_minor < 8)
> +                       return 0;
> +               else
> +                       return 1;

Missing curly braces.

> +       }
> +       abort();
> +}
> +
>  #endif /* LTTNG_INDEX_H */
> diff --git a/src/common/index/index.c b/src/common/index/index.c
> index 066618e..ba69cdc 100644
> --- a/src/common/index/index.c
> +++ b/src/common/index/index.c
> @@ -1,6 +1,7 @@
>  /*
>   * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
>   *                      David Goulet <dgoulet at efficios.com>
> + *               2016 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
>   *
>   * This program is free software; you can redistribute it and/or modify it
>   * under the terms of the GNU General Public License, version 2 only, as
> @@ -32,15 +33,24 @@
>  /*
>   * Create the index file associated with a trace file.
>   *
> - * Return fd on success, a negative value on error.
> + * Return allocated struct lttng_index_file, NULL on error.
>   */
> -int index_create_file(char *path_name, char *stream_name, int uid, int gid,
> -               uint64_t size, uint64_t count)
> +struct lttng_index_file *lttng_index_file_create(char *path_name,
> +               char *stream_name, int uid, int gid,
> +               uint64_t size, uint64_t count, uint32_t major, uint32_t minor)
>  {
> +       struct lttng_index_file *index_file;
>         int ret, fd = -1;
>         ssize_t size_ret;
>         struct ctf_packet_index_file_hdr hdr;
>         char fullpath[PATH_MAX];
> +       uint32_t element_len = ctf_packet_index_len(major, minor);
> +
> +       index_file = zmalloc(sizeof(*index_file));
> +       if (!index_file) {
> +               PERROR("out of memory");

Changed message to "allocating lttng_index_file" to provide some
context on error.

> +               goto error;
> +       }
>
>         ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR,
>                         path_name);
> @@ -79,9 +89,9 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
>         fd = ret;
>
>         hdr.magic = htobe32(CTF_INDEX_MAGIC);
> -       hdr.index_major = htobe32(CTF_INDEX_MAJOR);
> -       hdr.index_minor = htobe32(CTF_INDEX_MINOR);
> -       hdr.packet_index_len = htobe32(sizeof(struct ctf_packet_index));
> +       hdr.index_major = htobe32(major);
> +       hdr.index_minor = htobe32(minor);
> +       hdr.packet_index_len = htobe32(element_len);
>
>         size_ret = lttng_write(fd, &hdr, sizeof(hdr));
>         if (size_ret < sizeof(hdr)) {
> @@ -89,8 +99,13 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
>                 ret = -1;
>                 goto error;
>         }
> +       index_file->fd = fd;
> +       index_file->major = major;
> +       index_file->minor = minor;
> +       index_file->element_len = element_len;
> +       urcu_ref_init(&index_file->ref);
>
> -       return fd;
> +       return index_file;
>
>  error:
>         if (fd >= 0) {
> @@ -101,51 +116,94 @@ error:
>                         PERROR("close index fd");
>                 }
>         }
> -       return ret;
> +       free(index_file);
> +       return NULL;
>  }
>
>  /*
> - * Write index values to the given fd of size len.
> + * Write index values to the given index file.
>   *
> - * Return "len" on success or else < len on error. errno contains error
> - * details.
> + * Return 0 on success, -1 on error.
>   */
> -ssize_t index_write(int fd, struct ctf_packet_index *index, size_t len)
> +int lttng_index_file_write(const struct lttng_index_file *index_file,
> +               const struct ctf_packet_index *element)
>  {
>         ssize_t ret;
> +       int fd = index_file->fd;
> +       size_t len = index_file->element_len;
>
> -       assert(index);
> +       assert(element);
>
>         if (fd < 0) {
> -               ret = -EINVAL;
>                 goto error;
>         }
>
> -       ret = lttng_write(fd, index, len);
> +       ret = lttng_write(fd, element, len);
>         if (ret < len) {
>                 PERROR("writing index file");
> +               goto error;
> +       }
> +       return 0;
> +
> +error:
> +       return -1;
> +}
> +
> +/*
> + * Read index values from the given index file.
> + *
> + * Return 0 on success, -1 on error.
> + */
> +int lttng_index_file_read(const struct lttng_index_file *index_file,
> +               struct ctf_packet_index *element)
> +{
> +       ssize_t ret;
> +       int fd = index_file->fd;
> +       size_t len = index_file->element_len;
> +
> +       assert(element);
> +
> +       if (fd < 0) {
> +               goto error;
> +       }
> +
> +       ret = lttng_read(fd, element, len);
> +       if (ret < len) {
> +               PERROR("read index file");
> +               goto error;
>         }
> +       return 0;
>
>  error:
> -       return ret;
> +       return -1;
>  }
>
>  /*
>   * Open index file using a given path, channel name and tracefile count.
>   *
> - * Return read only FD on success or else a negative value.
> + * Return allocated struct lttng_index_file, NULL on error.
>   */
> -int index_open(const char *path_name, const char *channel_name,
> -               uint64_t tracefile_count, uint64_t tracefile_count_current)
> +struct lttng_index_file *lttng_index_file_open(const char *path_name,
> +               const char *channel_name, uint64_t tracefile_count,
> +               uint64_t tracefile_count_current)
>  {
> +       struct lttng_index_file *index_file;
>         int ret, read_fd;
>         ssize_t read_len;
>         char fullpath[PATH_MAX];
>         struct ctf_packet_index_file_hdr hdr;
> +       uint32_t major, minor, element_len;
>
>         assert(path_name);
>         assert(channel_name);
>
> +       index_file = zmalloc(sizeof(*index_file));
> +       if (!index_file) {
> +               PERROR("out of memory");

Changed the PERROR message to "allocating lttng_index_file" to provide some
context on error.

> +               goto error;
> +       }
> +
> +

Removed extra empty line here.

>         if (tracefile_count > 0) {
>                 ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s_%"
>                                 PRIu64 DEFAULT_INDEX_FILE_SUFFIX, path_name,
> @@ -180,13 +238,22 @@ int index_open(const char *path_name, const char *channel_name,
>                 ERR("Invalid header magic");
>                 goto error_close;
>         }
> -       if (be32toh(hdr.index_major) != CTF_INDEX_MAJOR ||
> -                       be32toh(hdr.index_minor) != CTF_INDEX_MINOR) {
> +       major = be32toh(hdr.index_major);
> +       minor = be32toh(hdr.index_minor);
> +       element_len = be32toh(hdr.packet_index_len);
> +
> +       if (major != CTF_INDEX_MAJOR) {
>                 ERR("Invalid header version");
>                 goto error_close;
>         }
>
> -       return read_fd;
> +       index_file->fd = read_fd;
> +       index_file->major = major;
> +       index_file->minor = minor;
> +       index_file->element_len = element_len;
> +       urcu_ref_init(&index_file->ref);
> +
> +       return index_file;
>
>  error_close:
>         if (read_fd >= 0) {
> @@ -200,5 +267,28 @@ error_close:
>         ret = -1;
>
>  error:
> -       return ret;
> +       free(index_file);
> +       return NULL;
> +}
> +
> +

Removed extra empty line.

> +void lttng_index_file_get(struct lttng_index_file *index_file)
> +{
> +       urcu_ref_get(&index_file->ref);
> +}
> +
> +static void lttng_index_file_release(struct urcu_ref *ref)
> +{
> +       struct lttng_index_file *index_file = caa_container_of(ref,
> +                       struct lttng_index_file, ref);
> +
> +       if (close(index_file->fd)) {
> +               PERROR("close index fd");
> +       }
> +       free(index_file);
> +}
> +
> +void lttng_index_file_put(struct lttng_index_file *index_file)
> +{
> +       urcu_ref_put(&index_file->ref, lttng_index_file_release);
>  }
> diff --git a/src/common/index/index.h b/src/common/index/index.h
> index a7e6aee..7020936 100644
> --- a/src/common/index/index.h
> +++ b/src/common/index/index.h
> @@ -1,6 +1,7 @@
>  /*
>   * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
>   *                      David Goulet <dgoulet at efficios.com>
> + *               2016 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
>   *
>   * This program is free software; you can redistribute it and/or modify it
>   * under the terms of the GNU General Public License, version 2 only, as
> @@ -20,13 +21,34 @@
>  #define _INDEX_H
>
>  #include <inttypes.h>
> +#include <urcu/ref.h>
>
>  #include "ctf-index.h"
>
> -int index_create_file(char *path_name, char *stream_name, int uid, int gid,
> -               uint64_t size, uint64_t count);
> -ssize_t index_write(int fd, struct ctf_packet_index *index, size_t len);
> -int index_open(const char *path_name, const char *channel_name,
> -               uint64_t tracefile_count, uint64_t tracefile_count_current);
> +struct lttng_index_file {
> +       int fd;
> +       uint32_t major;
> +       uint32_t minor;
> +       uint32_t element_len;
> +       struct urcu_ref ref;
> +};
> +
> +/*
> + * create and open have refcount of 1. Use put to decrement the
> + * refcount. Destroys when reaching 0. Use "get" to increment refcount.
> + */
> +struct lttng_index_file *lttng_index_file_create(char *path_name,
> +               char *stream_name, int uid, int gid, uint64_t size,
> +               uint64_t count, uint32_t major, uint32_t minor);
> +struct lttng_index_file *lttng_index_file_open(const char *path_name,
> +               const char *channel_name, uint64_t tracefile_count,
> +               uint64_t tracefile_count_current);
> +int lttng_index_file_write(const struct lttng_index_file *index_file,
> +               const struct ctf_packet_index *element);
> +int lttng_index_file_read(const struct lttng_index_file *index_file,
> +               struct ctf_packet_index *element);
> +
> +void lttng_index_file_get(struct lttng_index_file *index_file);
> +void lttng_index_file_put(struct lttng_index_file *index_file);
>
>  #endif /* _INDEX_H */
> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
> index a8abcd7..d03ff70 100644
> --- a/src/common/kernel-consumer/kernel-consumer.c
> +++ b/src/common/kernel-consumer/kernel-consumer.c
> @@ -1475,14 +1475,17 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
>                 stream->tracefile_size_current = 0;
>
>                 if (!stream->metadata_flag) {
> -                       ret = index_create_file(stream->chan->pathname,
> +                       struct lttng_index_file *index_file;
> +
> +                       index_file = lttng_index_file_create(stream->chan->pathname,
>                                         stream->name, stream->uid, stream->gid,
>                                         stream->chan->tracefile_size,
> -                                       stream->tracefile_count_current);
> -                       if (ret < 0) {
> +                                       stream->tracefile_count_current,
> +                                       CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
> +                       if (!index_file) {
>                                 goto error;
>                         }
> -                       stream->index_fd = ret;
> +                       stream->index_file = index_file;
>                 }
>         }
>
> diff --git a/src/common/macros.h b/src/common/macros.h
> index 0e11d3c..8ae6535 100644
> --- a/src/common/macros.h
> +++ b/src/common/macros.h
> @@ -78,6 +78,8 @@ void *zmalloc(size_t len)
>  #define LTTNG_HIDDEN __attribute__((visibility("hidden")))
>  #endif
>
> +#define member_sizeof(type, field)     sizeof(((type *) 0)->field)
> +
>  /*
>   * lttng_strncpy returns 0 on success, or nonzero on failure.
>   * It checks that the @src string fits into @dst_len before performing
> diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c
> index 7f0ea74..ec0111e 100644
> --- a/src/common/relayd/relayd.c
> +++ b/src/common/relayd/relayd.c
> @@ -856,7 +856,10 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
>         }
>
>         /* Send command */
> -       ret = send_command(rsock, RELAYD_SEND_INDEX, &msg, sizeof(msg), 0);
> +       ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
> +               lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
> +                                                       rsock->minor),
> +                       lttng_to_index_minor(rsock->major, rsock->minor)), 0);
>         if (ret < 0) {
>                 goto error;
>         }
> diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h
> index 1fc48c4..eb3e5f1 100644
> --- a/src/common/sessiond-comm/relayd.h
> +++ b/src/common/sessiond-comm/relayd.h
> @@ -159,10 +159,30 @@ struct lttcomm_relayd_index {
>         uint64_t timestamp_end;
>         uint64_t events_discarded;
>         uint64_t stream_id;
> +       /* 2.8+ */
>         uint64_t stream_instance_id;
>         uint64_t packet_seq_num;
>  } LTTNG_PACKED;
>
> +static inline size_t lttcomm_relayd_index_len(uint32_t major, uint32_t minor)
> +{
> +       if (major == 1) {
> +               switch (minor) {
> +               case 0:
> +                       return offsetof(struct lttcomm_relayd_index, stream_id)
> +                               + member_sizeof(struct lttcomm_relayd_index,
> +                                               stream_id);
> +               case 1:
> +                       return offsetof(struct lttcomm_relayd_index, packet_seq_num)
> +                               + member_sizeof(struct lttcomm_relayd_index,
> +                                               packet_seq_num);
> +               default:
> +                       abort();
> +               }
> +       }
> +       abort();
> +}
> +
>  /*
>   * Create session in 2.4 adds additionnal parameters for live reading.
>   */
> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> index 76eee2a..82d56eb 100644
> --- a/src/common/ust-consumer/ust-consumer.c
> +++ b/src/common/ust-consumer/ust-consumer.c
> @@ -2620,14 +2620,17 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
>                 stream->tracefile_size_current = 0;
>
>                 if (!stream->metadata_flag) {
> -                       ret = index_create_file(stream->chan->pathname,
> +                       struct lttng_index_file *index_file;
> +
> +                       index_file = lttng_index_file_create(stream->chan->pathname,
>                                         stream->name, stream->uid, stream->gid,
>                                         stream->chan->tracefile_size,
> -                                       stream->tracefile_count_current);
> -                       if (ret < 0) {
> +                                       stream->tracefile_count_current,
> +                                       CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
> +                       if (!index_file) {
>                                 goto error;
>                         }
> -                       stream->index_fd = ret;
> +                       stream->index_file = index_file;
>                 }
>         }
>         ret = 0;
> --
> 2.1.4
>



-- 
Jérémie Galarneau
EfficiOS Inc.
http://www.efficios.com


More information about the lttng-dev mailing list