[lttng-dev] [BABELTRACE RFC PATCH] basic index creation and importing
Julien Desfossez
jdesfossez at efficios.com
Fri Sep 14 12:07:24 EDT 2012
On 14/09/12 11:42 AM, Mathieu Desnoyers wrote:
> * Julien Desfossez (jdesfossez at efficios.com) wrote:
>> This patch provides a proof-of-concept of writing the index and importing it if
>> it exists. The intent is to have the consumer (not babeltrace) write the index
>> on disk while tracing and Babeltrace import it if possible.
>>
>> I added the stream_id field to the packet_index structure; I think it won't be
>> a problem since this field is in the tracer packet_header structure.
>>
>> As we discussed, we'll make sure to write the fields in network byte order.
>>
>> This index should also be the basis of the live reading of streamed traces. I
>> think with the timestamp_end of the indexes we will have enough information to
>> do so: only read the trace data up to min(indexes.timestamp_end).
>>
>> In the past, we discussed a separate synchronization file in which we would
>> write the max sequence number that can be read for each stream, but I think we
>> can simplify that by just reading this index.
>
> No. The simplification you propose does not take into account streams
> that do _not_ produce data. This is why the separate sync file is
> required.
Ah yes, that's why we had this scheme :-) !
Ok, I'll start working on that.
Thanks,
Julien
>
> Thanks,
>
> Mathieu
>
>>
>> Before I start working on the consumer, I'd like your opinion on that.
>>
>> Thanks,
>>
>> Julien
>>
>> Signed-off-by: Julien Desfossez <jdesfossez at efficios.com>
>> ---
>> formats/ctf/ctf.c | 78 +++++++++++++++++++++++++++++++++++++---
>> include/babeltrace/ctf/types.h | 2 ++
>> 2 files changed, 75 insertions(+), 5 deletions(-)
>>
>> diff --git a/formats/ctf/ctf.c b/formats/ctf/ctf.c
>> index f1fdba1..c82dd3c 100644
>> --- a/formats/ctf/ctf.c
>> +++ b/formats/ctf/ctf.c
>> @@ -1218,6 +1218,39 @@ error:
>> return ret;
>> }
>>
>> +static
>> +int import_stream_packet_index(struct ctf_trace *td,
>> + struct ctf_file_stream *file_stream)
>> +{
>> + struct ctf_stream_declaration *stream;
>> + struct ctf_stream_pos *pos;
>> + struct packet_index index;
>> + int ret, index_read;
>> + int first_packet = 1;
>> +
>> + pos = &file_stream->pos;
>> +
>> + while ((index_read = read(pos->index_fd, &index, sizeof(struct packet_index)))) {
>> + if (first_packet) {
>> + file_stream->parent.stream_id = index.stream_id;
>> + stream = g_ptr_array_index(td->streams, index.stream_id);
>> + if (!stream) {
>> + fprintf(stderr, "[error] Stream %" PRIu64 " is not declared in metadata.\n",
>> + index.stream_id);
>> + return -EINVAL;
>> + }
>> + file_stream->parent.stream_class = stream;
>> + ret = create_stream_definitions(td, &file_stream->parent);
>> + if (ret)
>> + return ret;
>> + first_packet = 0;
>> + }
>> + /* add index to packet array */
>> + g_array_append_val(file_stream->pos.packet_cycles_index, index);
>> + }
>> +
>> + return 0;
>> +}
>>
>> static
>> int create_stream_packet_index(struct ctf_trace *td,
>> @@ -1323,6 +1356,7 @@ int create_stream_packet_index(struct ctf_trace *td,
>>
>> field = struct_definition_get_field_from_index(file_stream->parent.trace_packet_header, len_index);
>> stream_id = get_unsigned_int(field);
>> + packet_index.stream_id = stream_id;
>> }
>> }
>>
>> @@ -1441,6 +1475,7 @@ int create_stream_packet_index(struct ctf_trace *td,
>>
>> /* add index to packet array */
>> g_array_append_val(file_stream->pos.packet_cycles_index, packet_index);
>> + write(pos->index_fd, &packet_index, sizeof(struct packet_index));
>>
>> pos->mmap_offset += packet_index.packet_size / CHAR_BIT;
>> }
>> @@ -1481,9 +1516,10 @@ int ctf_open_file_stream_read(struct ctf_trace *td, const char *path, int flags,
>> void (*packet_seek)(struct stream_pos *pos, size_t index,
>> int whence))
>> {
>> - int ret, fd;
>> + int ret, fd, index_fd;
>> struct ctf_file_stream *file_stream;
>> struct stat statbuf;
>> + char *index_name;
>>
>> fd = openat(td->dirfd, path, flags);
>> if (fd < 0) {
>> @@ -1523,9 +1559,40 @@ int ctf_open_file_stream_read(struct ctf_trace *td, const char *path, int flags,
>> * For now, only a single clock per trace is supported.
>> */
>> file_stream->parent.current_clock = td->single_clock;
>> - ret = create_stream_packet_index(td, file_stream);
>> - if (ret)
>> - goto error_index;
>> +
>> + index_name = malloc((strlen(td->path) + strlen(path)) * sizeof(char) + 5);
>> + if (!index_name) {
>> + fprintf(stderr, "[error] Cannot allocate index filename\n");
>> + goto error;
>> + }
>> + sprintf(index_name, "%s/idx_%s", td->path, path);
>> +
>> + if (access(index_name, F_OK) != 0) {
>> + index_fd = open(index_name, O_WRONLY|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
>> + if (index_fd < 0) {
>> + perror("Index file openat()");
>> + ret = -1;
>> + goto error;
>> + }
>> + file_stream->pos.index_fd = index_fd;
>> + ret = create_stream_packet_index(td, file_stream);
>> + if (ret)
>> + goto error_index;
>> + } else {
>> + index_fd = open(index_name, O_RDONLY);
>> + if (index_fd < 0) {
>> + perror("Index file openat()");
>> + ret = -1;
>> + goto error;
>> + }
>> + file_stream->pos.index_fd = index_fd;
>> + ret = import_stream_packet_index(td, file_stream);
>> + if (ret)
>> + goto error_index;
>> + }
>> + close(file_stream->pos.index_fd);
>> + free(index_name);
>> +
>> /* Add stream file to stream class */
>> g_ptr_array_add(file_stream->parent.stream_class->streams,
>> &file_stream->parent);
>> @@ -1607,7 +1674,8 @@ int ctf_open_trace_read(struct ctf_trace *td,
>> /* Ignore hidden files, ., .. and metadata. */
>> if (!strncmp(diriter->d_name, ".", 1)
>> || !strcmp(diriter->d_name, "..")
>> - || !strcmp(diriter->d_name, "metadata"))
>> + || !strcmp(diriter->d_name, "metadata")
>> + || !strncmp(diriter->d_name, "idx_", 4))
>> continue;
>> ret = ctf_open_file_stream_read(td, diriter->d_name,
>> flags, packet_seek);
>> diff --git a/include/babeltrace/ctf/types.h b/include/babeltrace/ctf/types.h
>> index 6a76c6b..ef79711 100644
>> --- a/include/babeltrace/ctf/types.h
>> +++ b/include/babeltrace/ctf/types.h
>> @@ -43,6 +43,7 @@ struct packet_index {
>> uint64_t timestamp_begin;
>> uint64_t timestamp_end;
>> uint64_t events_discarded;
>> + uint64_t stream_id;
>> size_t events_discarded_len; /* length of the field, in bits */
>> };
>>
>> @@ -52,6 +53,7 @@ struct packet_index {
>> struct ctf_stream_pos {
>> struct stream_pos parent;
>> int fd; /* backing file fd. -1 if unset. */
>> + int index_fd; /* index file fd. -1 if unset. */
>> GArray *packet_cycles_index; /* contains struct packet_index in cycles */
>> GArray *packet_real_index; /* contains struct packet_index in ns */
>> int prot; /* mmap protection */
>> --
>> 1.7.10.4
>>
>
More information about the lttng-dev
mailing list