[lttng-dev] [BABELTRACE RFC PATCH] basic index creation and importing

Julien Desfossez jdesfossez at efficios.com
Fri Sep 14 11:20:09 EDT 2012


This patch provides a proof of concept for writing the packet index to disk and
importing it when it already exists. The intent is to have the consumer (not
Babeltrace) write the index to disk while tracing, and to have Babeltrace
import it when it is available.

I added a stream_id field to the packet_index structure. I think it won't be
a problem, since this field is already in the tracer's packet_header structure.

Like we discussed, we'll make sure to write the fields in network byte order.

This index should also be the basis for live reading of streamed traces. I
think the timestamp_end of the indexes gives us enough information to do so:
only read the trace data up to min(indexes.timestamp_end).

In the past, we discussed a separate synchronization file in which we would
write the max sequence number that can be read for each stream, but I think we
can simplify that by just reading this index.

Before I start working on the consumer, I'd like your opinion on that.

Thanks,

Julien

Signed-off-by: Julien Desfossez <jdesfossez at efficios.com>
---
 formats/ctf/ctf.c              |   78 +++++++++++++++++++++++++++++++++++++---
 include/babeltrace/ctf/types.h |    2 ++
 2 files changed, 75 insertions(+), 5 deletions(-)

diff --git a/formats/ctf/ctf.c b/formats/ctf/ctf.c
index f1fdba1..c82dd3c 100644
--- a/formats/ctf/ctf.c
+++ b/formats/ctf/ctf.c
@@ -1218,6 +1218,39 @@ error:
 	return ret;
 }
 
+/* Import the packet index stored in pos->index_fd. Returns 0 on success, or a
+ * negative errno value (bad read, truncated/empty index, undeclared stream). */
+static
+int import_stream_packet_index(struct ctf_trace *td,
+		struct ctf_file_stream *file_stream)
+{
+	struct ctf_stream_pos *pos = &file_stream->pos;
+	struct ctf_stream_declaration *stream = NULL;	/* set by first record */
+	struct packet_index index;
+	ssize_t len;
+	int ret;
+
+	while ((len = read(pos->index_fd, &index, sizeof(index))) != 0) {
+		if (len != (ssize_t) sizeof(index))	/* error or short read */
+			return len < 0 ? -errno : -EINVAL;
+		if (!stream) {
+			if (index.stream_id < td->streams->len)	/* untrusted id */
+				stream = g_ptr_array_index(td->streams, index.stream_id);
+			if (!stream) {
+				fprintf(stderr, "[error] Stream %" PRIu64 " is not declared in metadata.\n",
+						index.stream_id);
+				return -EINVAL;
+			}
+			file_stream->parent.stream_id = index.stream_id;
+			file_stream->parent.stream_class = stream;
+			ret = create_stream_definitions(td, &file_stream->parent);
+			if (ret)
+				return ret;
+		}
+		g_array_append_val(pos->packet_cycles_index, index);
+	}
+	return stream ? 0 : -EINVAL;	/* empty index: no stream class resolved */
+}
 
 static
 int create_stream_packet_index(struct ctf_trace *td,
@@ -1323,6 +1356,7 @@ int create_stream_packet_index(struct ctf_trace *td,
 
 				field = struct_definition_get_field_from_index(file_stream->parent.trace_packet_header, len_index);
 				stream_id = get_unsigned_int(field);
+				packet_index.stream_id = stream_id;
 			}
 		}
 
@@ -1441,6 +1475,7 @@ int create_stream_packet_index(struct ctf_trace *td,
 
 		/* add index to packet array */
 		g_array_append_val(file_stream->pos.packet_cycles_index, packet_index);
+		write(pos->index_fd, &packet_index, sizeof(struct packet_index));
 
 		pos->mmap_offset += packet_index.packet_size / CHAR_BIT;
 	}
@@ -1481,9 +1516,10 @@ int ctf_open_file_stream_read(struct ctf_trace *td, const char *path, int flags,
 		void (*packet_seek)(struct stream_pos *pos, size_t index,
 			int whence))
 {
-	int ret, fd;
+	int ret, fd, index_fd;
 	struct ctf_file_stream *file_stream;
 	struct stat statbuf;
+	char *index_name;
 
 	fd = openat(td->dirfd, path, flags);
 	if (fd < 0) {
@@ -1523,9 +1559,40 @@ int ctf_open_file_stream_read(struct ctf_trace *td, const char *path, int flags,
 	 * For now, only a single clock per trace is supported.
 	 */
 	file_stream->parent.current_clock = td->single_clock;
-	ret = create_stream_packet_index(td, file_stream);
-	if (ret)
-		goto error_index;
+
+	index_name = malloc((strlen(td->path) + strlen(path)) * sizeof(char) + 5);
+	if (!index_name) {
+		fprintf(stderr, "[error] Cannot allocate index filename\n");
+		goto error;
+	}
+	sprintf(index_name, "%s/idx_%s", td->path, path);
+
+	if (access(index_name, F_OK) != 0) {
+		index_fd = open(index_name, O_WRONLY|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
+		if (index_fd < 0) {
+			perror("Index file openat()");
+			ret = -1;
+			goto error;
+		}
+		file_stream->pos.index_fd = index_fd;
+		ret = create_stream_packet_index(td, file_stream);
+		if (ret)
+			goto error_index;
+	} else {
+		index_fd = open(index_name, O_RDONLY);
+		if (index_fd < 0) {
+			perror("Index file openat()");
+			ret = -1;
+			goto error;
+		}
+		file_stream->pos.index_fd = index_fd;
+		ret = import_stream_packet_index(td, file_stream);
+		if (ret)
+			goto error_index;
+	}
+	close(file_stream->pos.index_fd);
+	free(index_name);
+
 	/* Add stream file to stream class */
 	g_ptr_array_add(file_stream->parent.stream_class->streams,
 			&file_stream->parent);
@@ -1607,7 +1674,8 @@ int ctf_open_trace_read(struct ctf_trace *td,
 		/* Ignore hidden files, ., .. and metadata. */
 		if (!strncmp(diriter->d_name, ".", 1)
 				|| !strcmp(diriter->d_name, "..")
-				|| !strcmp(diriter->d_name, "metadata"))
+				|| !strcmp(diriter->d_name, "metadata")
+				|| !strncmp(diriter->d_name, "idx_", 4))
 			continue;
 		ret = ctf_open_file_stream_read(td, diriter->d_name,
 					flags, packet_seek);
diff --git a/include/babeltrace/ctf/types.h b/include/babeltrace/ctf/types.h
index 6a76c6b..ef79711 100644
--- a/include/babeltrace/ctf/types.h
+++ b/include/babeltrace/ctf/types.h
@@ -43,6 +43,7 @@ struct packet_index {
 	uint64_t timestamp_begin;
 	uint64_t timestamp_end;
 	uint64_t events_discarded;
+	uint64_t stream_id;
 	size_t events_discarded_len;	/* length of the field, in bits */
 };
 
@@ -52,6 +53,7 @@ struct packet_index {
 struct ctf_stream_pos {
 	struct stream_pos parent;
 	int fd;			/* backing file fd. -1 if unset. */
+	int index_fd;		/* index file fd. -1 if unset. */
 	GArray *packet_cycles_index;	/* contains struct packet_index in cycles */
 	GArray *packet_real_index;	/* contains struct packet_index in ns */
 	int prot;		/* mmap protection */
-- 
1.7.10.4




More information about the lttng-dev mailing list