[lttng-dev] [BABELTRACE RFC PATCH] basic index creation and importing

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Fri Sep 14 11:42:07 EDT 2012


* Julien Desfossez (jdesfossez at efficios.com) wrote:
> This patch provides a proof-of-concept of writing the index and importing it if
> it exists. The intent is to have the consumer (not babeltrace) write the index
> on disk while tracing and Babeltrace import it if possible.
> 
> I added the stream_id field in the packet_index structure; I think it won't be
> a problem since this field is in the tracer packet_header structure.
> 
> Like we discussed, we'll make sure to write the fields in network byte order.
> 
> This index should also be the basis of the live reading of streamed traces. I
> think with the timestamp_end of the indexes we will have enough information to
> do so: only read the trace data up to min(indexes.timestamp_end).
> 
> In the past, we discussed a separate synchronization file in which we would
> write the max sequence number that can be read for each stream, but I think we
> can simplify that by just reading this index.

No. The simplification you propose does not take into account streams
that do _not_ produce data. This is why the separate sync file is
required.

Thanks,

Mathieu

> 
> Before I start working on the consumer, I'd like your opinion on that.
> 
> Thanks,
> 
> Julien
> 
> Signed-off-by: Julien Desfossez <jdesfossez at efficios.com>
> ---
>  formats/ctf/ctf.c              |   78 +++++++++++++++++++++++++++++++++++++---
>  include/babeltrace/ctf/types.h |    2 ++
>  2 files changed, 75 insertions(+), 5 deletions(-)
> 
> diff --git a/formats/ctf/ctf.c b/formats/ctf/ctf.c
> index f1fdba1..c82dd3c 100644
> --- a/formats/ctf/ctf.c
> +++ b/formats/ctf/ctf.c
> @@ -1218,6 +1218,39 @@ error:
>  	return ret;
>  }
>  
> +static
> +int import_stream_packet_index(struct ctf_trace *td,
> +		struct ctf_file_stream *file_stream)
> +{
> +	struct ctf_stream_declaration *stream;
> +	struct ctf_stream_pos *pos;
> +	struct packet_index index;
> +	int ret, index_read;
> +	int first_packet = 1;
> +
> +	pos = &file_stream->pos;
> +
> +	while ((index_read = read(pos->index_fd, &index, sizeof(struct packet_index)))) {
> +		if (first_packet) {
> +			file_stream->parent.stream_id = index.stream_id;
> +			stream = g_ptr_array_index(td->streams, index.stream_id);
> +			if (!stream) {
> +				fprintf(stderr, "[error] Stream %" PRIu64 " is not declared in metadata.\n",
> +						index.stream_id);
> +				return -EINVAL;
> +			}
> +			file_stream->parent.stream_class = stream;
> +			ret = create_stream_definitions(td, &file_stream->parent);
> +			if (ret)
> +				return ret;
> +			first_packet = 0;
> +		}
> +		/* add index to packet array */
> +		g_array_append_val(file_stream->pos.packet_cycles_index, index);
> +	}
> +
> +	return 0;
> +}
>  
>  static
>  int create_stream_packet_index(struct ctf_trace *td,
> @@ -1323,6 +1356,7 @@ int create_stream_packet_index(struct ctf_trace *td,
>  
>  				field = struct_definition_get_field_from_index(file_stream->parent.trace_packet_header, len_index);
>  				stream_id = get_unsigned_int(field);
> +				packet_index.stream_id = stream_id;
>  			}
>  		}
>  
> @@ -1441,6 +1475,7 @@ int create_stream_packet_index(struct ctf_trace *td,
>  
>  		/* add index to packet array */
>  		g_array_append_val(file_stream->pos.packet_cycles_index, packet_index);
> +		write(pos->index_fd, &packet_index, sizeof(struct packet_index));
>  
>  		pos->mmap_offset += packet_index.packet_size / CHAR_BIT;
>  	}
> @@ -1481,9 +1516,10 @@ int ctf_open_file_stream_read(struct ctf_trace *td, const char *path, int flags,
>  		void (*packet_seek)(struct stream_pos *pos, size_t index,
>  			int whence))
>  {
> -	int ret, fd;
> +	int ret, fd, index_fd;
>  	struct ctf_file_stream *file_stream;
>  	struct stat statbuf;
> +	char *index_name;
>  
>  	fd = openat(td->dirfd, path, flags);
>  	if (fd < 0) {
> @@ -1523,9 +1559,40 @@ int ctf_open_file_stream_read(struct ctf_trace *td, const char *path, int flags,
>  	 * For now, only a single clock per trace is supported.
>  	 */
>  	file_stream->parent.current_clock = td->single_clock;
> -	ret = create_stream_packet_index(td, file_stream);
> -	if (ret)
> -		goto error_index;
> +
> +	index_name = malloc((strlen(td->path) + strlen(path)) * sizeof(char) + 5);
> +	if (!index_name) {
> +		fprintf(stderr, "[error] Cannot allocate index filename\n");
> +		goto error;
> +	}
> +	sprintf(index_name, "%s/idx_%s", td->path, path);
> +
> +	if (access(index_name, F_OK) != 0) {
> +		index_fd = open(index_name, O_WRONLY|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
> +		if (index_fd < 0) {
> +			perror("Index file openat()");
> +			ret = -1;
> +			goto error;
> +		}
> +		file_stream->pos.index_fd = index_fd;
> +		ret = create_stream_packet_index(td, file_stream);
> +		if (ret)
> +			goto error_index;
> +	} else {
> +		index_fd = open(index_name, O_RDONLY);
> +		if (index_fd < 0) {
> +			perror("Index file openat()");
> +			ret = -1;
> +			goto error;
> +		}
> +		file_stream->pos.index_fd = index_fd;
> +		ret = import_stream_packet_index(td, file_stream);
> +		if (ret)
> +			goto error_index;
> +	}
> +	close(file_stream->pos.index_fd);
> +	free(index_name);
> +
>  	/* Add stream file to stream class */
>  	g_ptr_array_add(file_stream->parent.stream_class->streams,
>  			&file_stream->parent);
> @@ -1607,7 +1674,8 @@ int ctf_open_trace_read(struct ctf_trace *td,
>  		/* Ignore hidden files, ., .. and metadata. */
>  		if (!strncmp(diriter->d_name, ".", 1)
>  				|| !strcmp(diriter->d_name, "..")
> -				|| !strcmp(diriter->d_name, "metadata"))
> +				|| !strcmp(diriter->d_name, "metadata")
> +				|| !strncmp(diriter->d_name, "idx_", 4))
>  			continue;
>  		ret = ctf_open_file_stream_read(td, diriter->d_name,
>  					flags, packet_seek);
> diff --git a/include/babeltrace/ctf/types.h b/include/babeltrace/ctf/types.h
> index 6a76c6b..ef79711 100644
> --- a/include/babeltrace/ctf/types.h
> +++ b/include/babeltrace/ctf/types.h
> @@ -43,6 +43,7 @@ struct packet_index {
>  	uint64_t timestamp_begin;
>  	uint64_t timestamp_end;
>  	uint64_t events_discarded;
> +	uint64_t stream_id;
>  	size_t events_discarded_len;	/* length of the field, in bits */
>  };
>  
> @@ -52,6 +53,7 @@ struct packet_index {
>  struct ctf_stream_pos {
>  	struct stream_pos parent;
>  	int fd;			/* backing file fd. -1 if unset. */
> +	int index_fd;		/* index file fd. -1 if unset. */
>  	GArray *packet_cycles_index;	/* contains struct packet_index in cycles */
>  	GArray *packet_real_index;	/* contains struct packet_index in ns */
>  	int prot;		/* mmap protection */
> -- 
> 1.7.10.4
> 

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com



More information about the lttng-dev mailing list