[lttng-dev] [PATCH lttng-tools] Fix: relayd vs consumerd compatibility
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Thu Dec 15 10:04:57 UTC 2016
relayd and consumerd 2.7 and 2.8 are expected to negotiate compatibility
with the lowest common minor version.
If a consumer daemon 2.8 interacts with a relayd 2.7, it needs to send
the index fields for ctf index 1.0. The same applies if a relayd 2.8
interacts with a consumer daemon 2.7: relayd should expect ctf index 1.0
fields, and generate a ctf index 1.0 index file layout.
If both relayd and consumerd versions are 2.8+, then we can send the ctf
index 1.1 fields over the protocol, and store them in the index files.
Whenever the relayd live viewer server opens and reads an index file,
it needs to use the file's header to figure out the index "element"
size.
[ Should be applied to master, stable-2.9, stable-2.8. ]
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
---
src/bin/lttng-relayd/index.c | 30 +++---
src/bin/lttng-relayd/index.h | 11 ++-
src/bin/lttng-relayd/live.c | 34 +++----
src/bin/lttng-relayd/main.c | 45 +++++----
src/bin/lttng-relayd/stream.c | 6 +-
src/bin/lttng-relayd/stream.h | 4 +-
src/bin/lttng-relayd/viewer-stream.c | 47 ++++-----
src/bin/lttng-relayd/viewer-stream.h | 4 +-
src/common/consumer/consumer-stream.c | 20 ++--
src/common/consumer/consumer.c | 37 +++-----
src/common/consumer/consumer.h | 4 +-
src/common/index/ctf-index.h | 39 ++++++++
src/common/index/index.c | 136 ++++++++++++++++++++++-----
src/common/index/index.h | 32 ++++++-
src/common/kernel-consumer/kernel-consumer.c | 11 ++-
src/common/macros.h | 2 +
src/common/relayd/relayd.c | 5 +-
src/common/sessiond-comm/relayd.h | 20 ++++
src/common/ust-consumer/ust-consumer.c | 11 ++-
19 files changed, 318 insertions(+), 180 deletions(-)
diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c
index 80a4bb9..b15bbcd 100644
--- a/src/bin/lttng-relayd/index.c
+++ b/src/bin/lttng-relayd/index.c
@@ -166,18 +166,19 @@ end:
return index;
}
-int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+int relay_index_set_file(struct relay_index *index,
+ struct lttng_index_file *index_file,
uint64_t data_offset)
{
int ret = 0;
pthread_mutex_lock(&index->lock);
- if (index->index_fd) {
+ if (index->index_file) {
ret = -1;
goto end;
}
- stream_fd_get(index_fd);
- index->index_fd = index_fd;
+ lttng_index_file_get(index_file);
+ index->index_file = index_file;
index->index_data.offset = data_offset;
end:
pthread_mutex_unlock(&index->lock);
@@ -228,9 +229,9 @@ static void index_release(struct urcu_ref *ref)
int ret;
struct lttng_ht_iter iter;
- if (index->index_fd) {
- stream_fd_put(index->index_fd);
- index->index_fd = NULL;
+ if (index->index_file) {
+ lttng_index_file_put(index->index_file);
+ index->index_file = NULL;
}
if (index->in_hash_table) {
/* Delete index from hash table. */
@@ -290,21 +291,16 @@ int relay_index_try_flush(struct relay_index *index)
goto skip;
}
/* Check if we are ready to flush. */
- if (!index->has_index_data || !index->index_fd) {
+ if (!index->has_index_data || !index->index_file) {
goto skip;
}
- fd = index->index_fd->fd;
+ fd = index->index_file->fd;
DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
" on fd %d", index->stream->stream_handle,
index->index_n.key, fd);
flushed = true;
index->flushed = true;
- ret = index_write(fd, &index->index_data, sizeof(index->index_data));
- if (ret == sizeof(index->index_data)) {
- ret = 0;
- } else {
- ret = -1;
- }
+ ret = lttng_index_file_write(index->index_file, &index->index_data);
skip:
pthread_mutex_unlock(&index->lock);
@@ -341,11 +337,11 @@ void relay_index_close_partial_fd(struct relay_stream *stream)
rcu_read_lock();
cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
index, index_n.node) {
- if (!index->index_fd) {
+ if (!index->index_file) {
continue;
}
/*
- * Partial index has its index_fd: we have only
+ * Partial index has its index_file: we have only
* received its info from the data socket.
* Put self-ref from index.
*/
diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h
index 15c4ac8..80fe86a 100644
--- a/src/bin/lttng-relayd/index.h
+++ b/src/bin/lttng-relayd/index.h
@@ -40,10 +40,10 @@ struct relay_index {
pthread_mutex_t lock;
/*
- * FD on which to write the index data. May differ from
- * stream->index_fd due to tracefile rotation.
+ * index file on which to write the index data. May differ from
+ * stream->index_file due to tracefile rotation.
*/
- struct stream_fd *index_fd;
+ struct lttng_index_file *index_file;
/* Index packet data. This is the data that is written on disk. */
struct ctf_packet_index index_data;
@@ -64,8 +64,9 @@ struct relay_index {
struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
uint64_t net_seq_num);
void relay_index_put(struct relay_index *index);
-int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
- uint64_t data_offset);
+int relay_index_set_file(struct relay_index *index,
+ struct lttng_index_file *index_file,
+ uint64_t data_offset);
int relay_index_set_data(struct relay_index *index,
const struct ctf_packet_index *data);
int relay_index_try_flush(struct relay_index *index);
diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
index 2c3a105..1d0492c 100644
--- a/src/bin/lttng-relayd/live.c
+++ b/src/bin/lttng-relayd/live.c
@@ -1122,7 +1122,7 @@ error:
/*
* Open the index file if needed for the given vstream.
*
- * If an index file is successfully opened, the vstream index_fd set with
+ * If an index file is successfully opened, the vstream index_file set with
* it.
*
* Return 0 on success, a negative value on error (-ENOENT if not ready yet).
@@ -1133,8 +1133,9 @@ static int try_open_index(struct relay_viewer_stream *vstream,
struct relay_stream *rstream)
{
int ret = 0;
+ struct lttng_index_file *index_file;
- if (vstream->index_fd) {
+ if (vstream->index_file) {
goto end;
}
@@ -1145,20 +1146,15 @@ static int try_open_index(struct relay_viewer_stream *vstream,
ret = -ENOENT;
goto end;
}
- ret = index_open(vstream->path_name, vstream->channel_name,
+ index_file = lttng_index_file_open(vstream->path_name,
+ vstream->channel_name,
vstream->stream->tracefile_count,
vstream->current_tracefile_id);
- if (ret >= 0) {
- vstream->index_fd = stream_fd_create(ret);
- if (!vstream->index_fd) {
- if (close(ret)) {
- PERROR("close");
- }
- ret = -1;
- } else {
- ret = 0;
- }
- goto end;
+ if (index_file) {
+ vstream->index_file = index_file;
+ ret = 0;
+ } else {
+ ret = -1;
}
end:
@@ -1277,7 +1273,6 @@ static
int viewer_get_next_index(struct relay_connection *conn)
{
int ret;
- ssize_t read_ret;
struct lttng_viewer_get_next_index request_index;
struct lttng_viewer_index viewer_index;
struct ctf_packet_index packet_index;
@@ -1400,11 +1395,10 @@ int viewer_get_next_index(struct relay_connection *conn)
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
}
- read_ret = lttng_read(vstream->index_fd->fd, &packet_index,
- sizeof(packet_index));
- if (read_ret < sizeof(packet_index)) {
- ERR("Relay reading index file %d returned %zd",
- vstream->index_fd->fd, read_ret);
+ ret = lttng_index_file_read(vstream->index_file, &packet_index);
+ if (ret) {
+ ERR("Relay error reading index file %d",
+ vstream->index_file->fd);
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
} else {
diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
index 9c0e2b1..43603a9 100644
--- a/src/bin/lttng-relayd/main.c
+++ b/src/bin/lttng-relayd/main.c
@@ -1946,6 +1946,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
uint64_t net_seq_num;
+ size_t msg_len;
assert(conn);
@@ -1957,9 +1958,12 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
goto end_no_session;
}
+ msg_len = lttcomm_relayd_index_len(lttng_to_index_major(conn->major,
+ conn->minor),
+ lttng_to_index_minor(conn->major, conn->minor));
ret = conn->sock->ops->recvmsg(conn->sock, &index_info,
- sizeof(index_info), 0);
- if (ret < sizeof(index_info)) {
+ msg_len, 0);
+ if (ret < msg_len) {
if (ret == 0) {
/* Orderly shutdown. Not necessary to print an error. */
DBG("Socket %d did an orderly shutdown", conn->sock->fd);
@@ -2183,38 +2187,31 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
goto end;
}
- if (rotate_index || !stream->index_fd) {
- int fd;
+ if (rotate_index || !stream->index_file) {
+ uint32_t major, minor;
- /* Put ref on previous index_fd. */
- if (stream->index_fd) {
- stream_fd_put(stream->index_fd);
- stream->index_fd = NULL;
+ /* Put ref on previous index_file. */
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
}
-
- fd = index_create_file(stream->path_name, stream->channel_name,
+ major = stream->trace->session->major;
+ minor = stream->trace->session->minor;
+ stream->index_file = lttng_index_file_create(stream->path_name,
+ stream->channel_name,
-1, -1, stream->tracefile_size,
- tracefile_array_get_file_index_head(stream->tfa));
- if (fd < 0) {
+ tracefile_array_get_file_index_head(stream->tfa),
+ lttng_to_index_major(major, minor),
+ lttng_to_index_minor(major, minor));
+ if (!stream->index_file) {
ret = -1;
/* Put self-ref for this index due to error. */
relay_index_put(index);
goto end;
}
- stream->index_fd = stream_fd_create(fd);
- if (!stream->index_fd) {
- ret = -1;
- if (close(fd)) {
- PERROR("Error closing FD %d", fd);
- }
- /* Put self-ref for this index due to error. */
- relay_index_put(index);
- /* Will put the local ref. */
- goto end;
- }
}
- if (relay_index_set_fd(index, stream->index_fd, data_offset)) {
+ if (relay_index_set_file(index, stream->index_file, data_offset)) {
ret = -1;
/* Put self-ref for this index due to error. */
relay_index_put(index);
diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
index 335a1cf..c59bb94 100644
--- a/src/bin/lttng-relayd/stream.c
+++ b/src/bin/lttng-relayd/stream.c
@@ -306,9 +306,9 @@ static void stream_release(struct urcu_ref *ref)
stream_fd_put(stream->stream_fd);
stream->stream_fd = NULL;
}
- if (stream->index_fd) {
- stream_fd_put(stream->index_fd);
- stream->index_fd = NULL;
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
}
if (stream->trace) {
ctf_trace_put(stream->trace);
diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h
index 5030e5d..d471c7b 100644
--- a/src/bin/lttng-relayd/stream.h
+++ b/src/bin/lttng-relayd/stream.h
@@ -58,8 +58,8 @@ struct relay_stream {
/* FD on which to write the stream data. */
struct stream_fd *stream_fd;
- /* FD on which to write the index data. */
- struct stream_fd *index_fd;
+ /* index file on which to write the index data. */
+ struct lttng_index_file *index_file;
char *path_name;
char *channel_name;
diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c
index 7c59cd0..8a3b09a 100644
--- a/src/bin/lttng-relayd/viewer-stream.c
+++ b/src/bin/lttng-relayd/viewer-stream.c
@@ -116,29 +116,21 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
* the opening of the index, otherwise open it right now.
*/
if (stream->index_received_seqcount == 0) {
- vstream->index_fd = NULL;
+ vstream->index_file = NULL;
} else {
- int read_fd;
-
- read_fd = index_open(vstream->path_name, vstream->channel_name,
+ vstream->index_file = lttng_index_file_open(vstream->path_name,
+ vstream->channel_name,
stream->tracefile_count,
vstream->current_tracefile_id);
- if (read_fd < 0) {
- goto error_unlock;
- }
- vstream->index_fd = stream_fd_create(read_fd);
- if (!vstream->index_fd) {
- if (close(read_fd)) {
- PERROR("close");
- }
+ if (!vstream->index_file) {
goto error_unlock;
}
}
- if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) {
+ if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) {
off_t lseek_ret;
- lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END);
+ lseek_ret = lseek(vstream->index_file->fd, 0, SEEK_END);
if (lseek_ret < 0) {
goto error_unlock;
}
@@ -192,9 +184,9 @@ static void viewer_stream_release(struct urcu_ref *ref)
stream_fd_put(vstream->stream_fd);
vstream->stream_fd = NULL;
}
- if (vstream->index_fd) {
- stream_fd_put(vstream->index_fd);
- vstream->index_fd = NULL;
+ if (vstream->index_file) {
+ lttng_index_file_put(vstream->index_file);
+ vstream->index_file = NULL;
}
if (vstream->stream) {
stream_put(vstream->stream);
@@ -305,29 +297,24 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
vstream->index_sent_seqcount = seq_tail;
}
- if (vstream->index_fd) {
- stream_fd_put(vstream->index_fd);
- vstream->index_fd = NULL;
+ if (vstream->index_file) {
+ lttng_index_file_put(vstream->index_file);
+ vstream->index_file = NULL;
}
if (vstream->stream_fd) {
stream_fd_put(vstream->stream_fd);
vstream->stream_fd = NULL;
}
- ret = index_open(vstream->path_name, vstream->channel_name,
+ vstream->index_file = lttng_index_file_open(vstream->path_name,
+ vstream->channel_name,
stream->tracefile_count,
vstream->current_tracefile_id);
- if (ret < 0) {
+ if (!vstream->index_file) {
+ ret = -1;
goto end;
- }
- vstream->index_fd = stream_fd_create(ret);
- if (vstream->index_fd) {
- ret = 0;
} else {
- if (close(ret)) {
- PERROR("close");
- }
- ret = -1;
+ ret = 0;
}
end:
return ret;
diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h
index 5dc135d..2514b17 100644
--- a/src/bin/lttng-relayd/viewer-stream.h
+++ b/src/bin/lttng-relayd/viewer-stream.h
@@ -52,8 +52,8 @@ struct relay_viewer_stream {
/* FD from which to read the stream data. */
struct stream_fd *stream_fd;
- /* FD from which to read the index data. */
- struct stream_fd *index_fd;
+ /* index file from which to read the index data. */
+ struct lttng_index_file *index_file;
char *path_name;
char *channel_name;
diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c
index a62cef2..2167f73 100644
--- a/src/common/consumer/consumer-stream.c
+++ b/src/common/consumer/consumer-stream.c
@@ -163,12 +163,8 @@ void consumer_stream_close(struct lttng_consumer_stream *stream)
stream->out_fd = -1;
}
- if (stream->index_fd >= 0) {
- ret = close(stream->index_fd);
- if (ret) {
- PERROR("close stream index_fd");
- }
- stream->index_fd = -1;
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
}
/* Check and cleanup relayd if needed. */
@@ -359,27 +355,23 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream,
* Return 0 on success or else a negative value.
*/
int consumer_stream_write_index(struct lttng_consumer_stream *stream,
- struct ctf_packet_index *index)
+ struct ctf_packet_index *element)
{
int ret;
struct consumer_relayd_sock_pair *relayd;
assert(stream);
- assert(index);
+ assert(element);
rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_send_index(&relayd->control_sock, index,
+ ret = relayd_send_index(&relayd->control_sock, element,
stream->relayd_stream_id, stream->next_net_seq_num - 1);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
- ssize_t size_ret;
-
- size_ret = index_write(stream->index_fd, index,
- sizeof(struct ctf_packet_index));
- if (size_ret < sizeof(struct ctf_packet_index)) {
+ if (lttng_index_file_write(stream->index_file, element)) {
ret = -1;
} else {
ret = 0;
diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c
index a184afd..929be65 100644
--- a/src/common/consumer/consumer.c
+++ b/src/common/consumer/consumer.c
@@ -570,7 +570,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
stream->session_id = session_id;
stream->monitor = monitor;
stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
- stream->index_fd = -1;
+ stream->index_file = NULL;
stream->last_sequence_number = -1ULL;
pthread_mutex_init(&stream->lock, NULL);
pthread_mutex_init(&stream->metadata_timer_lock, NULL);
@@ -1624,21 +1624,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
}
outfd = stream->out_fd;
- if (stream->index_fd >= 0) {
- ret = close(stream->index_fd);
- if (ret < 0) {
- PERROR("Closing index");
- goto end;
- }
- stream->index_fd = -1;
- ret = index_create_file(stream->chan->pathname,
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!stream->index_file) {
goto end;
}
- stream->index_fd = ret;
}
/* Reset current size because we just perform a rotation. */
@@ -1831,22 +1826,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
}
outfd = stream->out_fd;
- if (stream->index_fd >= 0) {
- ret = close(stream->index_fd);
- if (ret < 0) {
- PERROR("Closing index");
- goto end;
- }
- stream->index_fd = -1;
- ret = index_create_file(stream->chan->pathname,
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- written = ret;
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!stream->index_file) {
goto end;
}
- stream->index_fd = ret;
}
/* Reset current size because we just perform a rotation. */
diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h
index 256ea19..acefacb 100644
--- a/src/common/consumer/consumer.h
+++ b/src/common/consumer/consumer.h
@@ -390,9 +390,9 @@ struct lttng_consumer_stream {
/* Copy of the sequence number of the last packet extracted. */
uint64_t last_sequence_number;
/*
- * FD of the index file for this stream.
+ * Index file object of the index file for this stream.
*/
- int index_fd;
+ struct lttng_index_file *index_file;
/*
* Local pipe to extract data when using splice.
diff --git a/src/common/index/ctf-index.h b/src/common/index/ctf-index.h
index 8755f12..b73340f 100644
--- a/src/common/index/ctf-index.h
+++ b/src/common/index/ctf-index.h
@@ -60,4 +60,43 @@ struct ctf_packet_index {
uint64_t packet_seq_num; /* packet sequence number */
} __attribute__((__packed__));
+static inline size_t ctf_packet_index_len(uint32_t major, uint32_t minor)
+{
+ if (major == 1) {
+ switch (minor) {
+ case 0:
+ return offsetof(struct ctf_packet_index, stream_id)
+ + member_sizeof(struct ctf_packet_index,
+ stream_id);
+ case 1:
+ return offsetof(struct ctf_packet_index, packet_seq_num)
+ + member_sizeof(struct ctf_packet_index,
+ packet_seq_num);
+ default:
+ abort();
+ }
+ }
+ abort();
+}
+
+static inline uint32_t lttng_to_index_major(uint32_t lttng_major,
+ uint32_t lttng_minor)
+{
+ if (lttng_major == 2)
+ return 1;
+ abort();
+}
+
+static inline uint32_t lttng_to_index_minor(uint32_t lttng_major,
+ uint32_t lttng_minor)
+{
+ if (lttng_major == 2) {
+ if (lttng_minor < 8)
+ return 0;
+ else
+ return 1;
+ }
+ abort();
+}
+
#endif /* LTTNG_INDEX_H */
diff --git a/src/common/index/index.c b/src/common/index/index.c
index 066618e..ba69cdc 100644
--- a/src/common/index/index.c
+++ b/src/common/index/index.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2016 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -32,15 +33,24 @@
/*
* Create the index file associated with a trace file.
*
- * Return fd on success, a negative value on error.
+ * Return allocated struct lttng_index_file, NULL on error.
*/
-int index_create_file(char *path_name, char *stream_name, int uid, int gid,
- uint64_t size, uint64_t count)
+struct lttng_index_file *lttng_index_file_create(char *path_name,
+ char *stream_name, int uid, int gid,
+ uint64_t size, uint64_t count, uint32_t major, uint32_t minor)
{
+ struct lttng_index_file *index_file;
int ret, fd = -1;
ssize_t size_ret;
struct ctf_packet_index_file_hdr hdr;
char fullpath[PATH_MAX];
+ uint32_t element_len = ctf_packet_index_len(major, minor);
+
+ index_file = zmalloc(sizeof(*index_file));
+ if (!index_file) {
+ PERROR("out of memory");
+ goto error;
+ }
ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR,
path_name);
@@ -79,9 +89,9 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
fd = ret;
hdr.magic = htobe32(CTF_INDEX_MAGIC);
- hdr.index_major = htobe32(CTF_INDEX_MAJOR);
- hdr.index_minor = htobe32(CTF_INDEX_MINOR);
- hdr.packet_index_len = htobe32(sizeof(struct ctf_packet_index));
+ hdr.index_major = htobe32(major);
+ hdr.index_minor = htobe32(minor);
+ hdr.packet_index_len = htobe32(element_len);
size_ret = lttng_write(fd, &hdr, sizeof(hdr));
if (size_ret < sizeof(hdr)) {
@@ -89,8 +99,13 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
ret = -1;
goto error;
}
+ index_file->fd = fd;
+ index_file->major = major;
+ index_file->minor = minor;
+ index_file->element_len = element_len;
+ urcu_ref_init(&index_file->ref);
- return fd;
+ return index_file;
error:
if (fd >= 0) {
@@ -101,51 +116,94 @@ error:
PERROR("close index fd");
}
}
- return ret;
+ free(index_file);
+ return NULL;
}
/*
- * Write index values to the given fd of size len.
+ * Write index values to the given index file.
*
- * Return "len" on success or else < len on error. errno contains error
- * details.
+ * Return 0 on success, -1 on error.
*/
-ssize_t index_write(int fd, struct ctf_packet_index *index, size_t len)
+int lttng_index_file_write(const struct lttng_index_file *index_file,
+ const struct ctf_packet_index *element)
{
ssize_t ret;
+ int fd = index_file->fd;
+ size_t len = index_file->element_len;
- assert(index);
+ assert(element);
if (fd < 0) {
- ret = -EINVAL;
goto error;
}
- ret = lttng_write(fd, index, len);
+ ret = lttng_write(fd, element, len);
if (ret < len) {
PERROR("writing index file");
+ goto error;
+ }
+ return 0;
+
+error:
+ return -1;
+}
+
+/*
+ * Read index values from the given index file.
+ *
+ * Return 0 on success, -1 on error.
+ */
+int lttng_index_file_read(const struct lttng_index_file *index_file,
+ struct ctf_packet_index *element)
+{
+ ssize_t ret;
+ int fd = index_file->fd;
+ size_t len = index_file->element_len;
+
+ assert(element);
+
+ if (fd < 0) {
+ goto error;
+ }
+
+ ret = lttng_read(fd, element, len);
+ if (ret < len) {
+ PERROR("read index file");
+ goto error;
}
+ return 0;
error:
- return ret;
+ return -1;
}
/*
* Open index file using a given path, channel name and tracefile count.
*
- * Return read only FD on success or else a negative value.
+ * Return allocated struct lttng_index_file, NULL on error.
*/
-int index_open(const char *path_name, const char *channel_name,
- uint64_t tracefile_count, uint64_t tracefile_count_current)
+struct lttng_index_file *lttng_index_file_open(const char *path_name,
+ const char *channel_name, uint64_t tracefile_count,
+ uint64_t tracefile_count_current)
{
+ struct lttng_index_file *index_file;
int ret, read_fd;
ssize_t read_len;
char fullpath[PATH_MAX];
struct ctf_packet_index_file_hdr hdr;
+ uint32_t major, minor, element_len;
assert(path_name);
assert(channel_name);
+ index_file = zmalloc(sizeof(*index_file));
+ if (!index_file) {
+ PERROR("out of memory");
+ goto error;
+ }
+
+
if (tracefile_count > 0) {
ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s_%"
PRIu64 DEFAULT_INDEX_FILE_SUFFIX, path_name,
@@ -180,13 +238,22 @@ int index_open(const char *path_name, const char *channel_name,
ERR("Invalid header magic");
goto error_close;
}
- if (be32toh(hdr.index_major) != CTF_INDEX_MAJOR ||
- be32toh(hdr.index_minor) != CTF_INDEX_MINOR) {
+ major = be32toh(hdr.index_major);
+ minor = be32toh(hdr.index_minor);
+ element_len = be32toh(hdr.packet_index_len);
+
+ if (major != CTF_INDEX_MAJOR) {
ERR("Invalid header version");
goto error_close;
}
- return read_fd;
+ index_file->fd = read_fd;
+ index_file->major = major;
+ index_file->minor = minor;
+ index_file->element_len = element_len;
+ urcu_ref_init(&index_file->ref);
+
+ return index_file;
error_close:
if (read_fd >= 0) {
@@ -200,5 +267,28 @@ error_close:
ret = -1;
error:
- return ret;
+ free(index_file);
+ return NULL;
+}
+
+
+void lttng_index_file_get(struct lttng_index_file *index_file)
+{
+ urcu_ref_get(&index_file->ref);
+}
+
+static void lttng_index_file_release(struct urcu_ref *ref)
+{
+ struct lttng_index_file *index_file = caa_container_of(ref,
+ struct lttng_index_file, ref);
+
+ if (close(index_file->fd)) {
+ PERROR("close index fd");
+ }
+ free(index_file);
+}
+
+void lttng_index_file_put(struct lttng_index_file *index_file)
+{
+ urcu_ref_put(&index_file->ref, lttng_index_file_release);
}
diff --git a/src/common/index/index.h b/src/common/index/index.h
index a7e6aee..7020936 100644
--- a/src/common/index/index.h
+++ b/src/common/index/index.h
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2016 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -20,13 +21,34 @@
#define _INDEX_H
#include <inttypes.h>
+#include <urcu/ref.h>
#include "ctf-index.h"
-int index_create_file(char *path_name, char *stream_name, int uid, int gid,
- uint64_t size, uint64_t count);
-ssize_t index_write(int fd, struct ctf_packet_index *index, size_t len);
-int index_open(const char *path_name, const char *channel_name,
- uint64_t tracefile_count, uint64_t tracefile_count_current);
+struct lttng_index_file {
+ int fd;
+ uint32_t major;
+ uint32_t minor;
+ uint32_t element_len;
+ struct urcu_ref ref;
+};
+
+/*
+ * create and open have refcount of 1. Use put to decrement the
+ * refcount. Destroys when reaching 0. Use "get" to increment refcount.
+ */
+struct lttng_index_file *lttng_index_file_create(char *path_name,
+ char *stream_name, int uid, int gid, uint64_t size,
+ uint64_t count, uint32_t major, uint32_t minor);
+struct lttng_index_file *lttng_index_file_open(const char *path_name,
+ const char *channel_name, uint64_t tracefile_count,
+ uint64_t tracefile_count_current);
+int lttng_index_file_write(const struct lttng_index_file *index_file,
+ const struct ctf_packet_index *element);
+int lttng_index_file_read(const struct lttng_index_file *index_file,
+ struct ctf_packet_index *element);
+
+void lttng_index_file_get(struct lttng_index_file *index_file);
+void lttng_index_file_put(struct lttng_index_file *index_file);
#endif /* _INDEX_H */
diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
index a8abcd7..d03ff70 100644
--- a/src/common/kernel-consumer/kernel-consumer.c
+++ b/src/common/kernel-consumer/kernel-consumer.c
@@ -1475,14 +1475,17 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
stream->tracefile_size_current = 0;
if (!stream->metadata_flag) {
- ret = index_create_file(stream->chan->pathname,
+ struct lttng_index_file *index_file;
+
+ index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!index_file) {
goto error;
}
- stream->index_fd = ret;
+ stream->index_file = index_file;
}
}
diff --git a/src/common/macros.h b/src/common/macros.h
index 0e11d3c..8ae6535 100644
--- a/src/common/macros.h
+++ b/src/common/macros.h
@@ -78,6 +78,8 @@ void *zmalloc(size_t len)
#define LTTNG_HIDDEN __attribute__((visibility("hidden")))
#endif
+#define member_sizeof(type, field) sizeof(((type *) 0)->field)
+
/*
* lttng_strncpy returns 0 on success, or nonzero on failure.
* It checks that the @src string fits into @dst_len before performing
diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c
index 7f0ea74..ec0111e 100644
--- a/src/common/relayd/relayd.c
+++ b/src/common/relayd/relayd.c
@@ -856,7 +856,10 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
}
/* Send command */
- ret = send_command(rsock, RELAYD_SEND_INDEX, &msg, sizeof(msg), 0);
+ ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
+ lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
+ rsock->minor),
+ lttng_to_index_minor(rsock->major, rsock->minor)), 0);
if (ret < 0) {
goto error;
}
diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h
index 1fc48c4..eb3e5f1 100644
--- a/src/common/sessiond-comm/relayd.h
+++ b/src/common/sessiond-comm/relayd.h
@@ -159,10 +159,30 @@ struct lttcomm_relayd_index {
uint64_t timestamp_end;
uint64_t events_discarded;
uint64_t stream_id;
+ /* 2.8+ */
uint64_t stream_instance_id;
uint64_t packet_seq_num;
} LTTNG_PACKED;
+static inline size_t lttcomm_relayd_index_len(uint32_t major, uint32_t minor)
+{
+ if (major == 1) {
+ switch (minor) {
+ case 0:
+ return offsetof(struct lttcomm_relayd_index, stream_id)
+ + member_sizeof(struct lttcomm_relayd_index,
+ stream_id);
+ case 1:
+ return offsetof(struct lttcomm_relayd_index, packet_seq_num)
+ + member_sizeof(struct lttcomm_relayd_index,
+ packet_seq_num);
+ default:
+ abort();
+ }
+ }
+ abort();
+}
+
/*
* Create session in 2.4 adds additionnal parameters for live reading.
*/
diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
index 76eee2a..82d56eb 100644
--- a/src/common/ust-consumer/ust-consumer.c
+++ b/src/common/ust-consumer/ust-consumer.c
@@ -2620,14 +2620,17 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
stream->tracefile_size_current = 0;
if (!stream->metadata_flag) {
- ret = index_create_file(stream->chan->pathname,
+ struct lttng_index_file *index_file;
+
+ index_file = lttng_index_file_create(stream->chan->pathname,
stream->name, stream->uid, stream->gid,
stream->chan->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
+ stream->tracefile_count_current,
+ CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+ if (!index_file) {
goto error;
}
- stream->index_fd = ret;
+ stream->index_file = index_file;
}
}
ret = 0;
--
2.1.4
More information about the lttng-dev
mailing list