[ltt-dev] [PATCH 1/2] Add the support for streaming analysis in the libltttraceread library
Mathieu Desnoyers
compudj at krystal.dyndns.org
Sat Sep 11 12:22:28 EDT 2010
* Oussama El Mfadli (oussama.el-mfadli at polymtl.ca) wrote:
> Hi,
> thanks for you quick answer. I made the modification. The only
> difference in the API is the new parameter in ltt_trace_open(..) that
> permit to set a flag in streaming mode. I also added the fallback if
> inotify is not available.
>
> Concerning the function to construct the block index. I modified the
> name. I hope they are more clear :
> ltt_trace_create_block_index : use when no block index is build
> ltt_trace_continue_block_index : use to continue the construct of the
> block index
> ltt_trace_update_block_index : an internal function used by both create
> and continue, because a big part of there code is the same.(This
> function is mainly added to prevent duplication of the source code).
>
> The current plan is that the application poll the fd(available from the
> trace structure) to check if new informations was received. Then call
> for update. Since the textDump is the only module that used this option,
> it should not affect the analysis performance. For the up-coming
> implentation in the GUI modules, the callback scheme would probably be
> more adapted.
>
> The patch is following:
>
> -----------
>
> Add the support for streaming analysis in the libltttraceread library
>
> - Modification of the opening of the tracefile
> and the way some informations are retreived
> - Add some implementations to append the new arriving blocks into
> the index
> ---
> ltt/ltt-private.h | 3 +
> ltt/trace.h | 38 ++++++-
> ltt/tracefile.c | 340
> +++++++++++++++++++++++++++++++++++++++++++++++------
> 3 files changed, 345 insertions(+), 36 deletions(-)
>
> diff --git a/ltt/ltt-private.h b/ltt/ltt-private.h
> index 65d73d1..665081a 100644
> --- a/ltt/ltt-private.h
> +++ b/ltt/ltt-private.h
> @@ -171,6 +171,9 @@ struct LttTracefile {
>
> /* Current block */
> LttBuffer buffer; //current buffer
> +
> + /* For streaming usage: in case the tracefile is currently empty */
> + gboolean streaming_is_empty;
> };
>
> /* The characteristics of the system on which the trace was obtained
> diff --git a/ltt/trace.h b/ltt/trace.h
> index e16c66f..aec3455 100644
> --- a/ltt/trace.h
> +++ b/ltt/trace.h
> @@ -25,6 +25,32 @@
> #include <stdint.h>
> #include <glib.h>
>
> +#include <linux/version.h>
> +
> +//Options used to open a trace
> +#define NO_FLAG 0
> +#define TRACE_STREAMING (1 << 0)
> +
> +#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14)
> +#include <sys/inotify.h>
> +
> +#define HAS_INOTIFY
> +#else
> +
> +#undef HAS_INOTIFY
> +#endif
> +struct inotify_watch {
> + int wd;
> + char path_channel[PATH_MAX];
> + GQuark name;
> + guint num;
> +};
> +
> +struct inotify_watch_array {
> + struct inotify_watch *elem;
> + int num;
> +};
> +
> struct LttTrace {
> GQuark pathname; //the pathname of the trace
> //LttSystemDescription * system_description;//system description
> @@ -47,6 +73,13 @@ struct LttTrace {
> LttTime start_time_from_tsc;
>
> GData *tracefiles; //tracefiles groups
> +
> + // Keep a inotify file descriptor array to detect the changes in the
> files
> + struct inotify_watch_array inotify_watch_array;
> + int inotify_fd;
> + int open_tracefile_counter;
> +
> + gboolean streaming_is_enable;
Proper English would be: streaming_is_enabled
> };
>
> static inline guint ltt_trace_get_num_cpu(LttTrace *t)
> @@ -65,8 +98,9 @@ static inline guint ltt_trace_get_num_cpu(LttTrace *t)
>
> */
>
> -LttTrace *ltt_trace_open(const gchar *pathname);
> -
> +LttTrace *ltt_trace_open(const gchar *pathname, unsigned int flags);
> +int open_tracefiles(LttTrace *trace, gchar *root_path,
> + gchar *relative_path);
> /* copy reopens a trace
> *
> * return value NULL if error while opening the trace
> diff --git a/ltt/tracefile.c b/ltt/tracefile.c
> index 54da747..3fbb016 100644
> --- a/ltt/tracefile.c
> +++ b/ltt/tracefile.c
> @@ -97,8 +97,6 @@ static guint64 cycles_2_ns(LttTracefile *tf, guint64
> cycles);
> /* go to the next event */
> static int ltt_seek_next_event(LttTracefile *tf);
>
> -static int open_tracefiles(LttTrace *trace, gchar *root_path,
> - gchar *relative_path);
> static int ltt_process_metadata_tracefile(LttTracefile *tf);
> static void ltt_tracefile_time_span_get(LttTracefile *tf,
> LttTime *start, LttTime *end);
> @@ -209,18 +207,14 @@ int get_block_offset_size(LttTracefile *tf, guint
> block_num,
> return 0;
> }
>
> -int ltt_trace_create_block_index(LttTracefile *tf)
> -{
> - int page_size = getpagesize();
> - uint64_t offset = 0;
> - unsigned long i = 0;
> - unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
> -
> - tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
> - DEFAULT_N_BLOCKS);
> -
> - g_assert(tf->buf_index->len == i);
>
> +static int ltt_trace_update_block_index(LttTracefile *tf, uint64_t
> offset, unsigned long firstBlock)
> +{
> + int i = firstBlock;
> + int page_size = getpagesize();
> + unsigned int header_map_size =
> PAGE_ALIGN(ltt_subbuffer_header_size());
> +
> + g_assert(tf->buf_index->len == i);
> while (offset < tf->file_size) {
> ltt_subbuffer_header_t *header;
> uint64_t *off;
> @@ -228,7 +222,7 @@ int ltt_trace_create_block_index(LttTracefile *tf)
> tf->buf_index = g_array_set_size(tf->buf_index, i + 1);
> off = &g_array_index(tf->buf_index, uint64_t, i);
> *off = offset;
> -
> +
Did you add whitespace here?
> /* map block header */
> header = mmap(0, header_map_size, PROT_READ,
> MAP_PRIVATE, tf->fd, (off_t)offset);
> @@ -253,6 +247,46 @@ int ltt_trace_create_block_index(LttTracefile *tf)
> return 0;
> }
>
> +/* parse the new information from the file and reajust the number of
> blocks.
> + *
> + * Return value : 0 success, -1 error
> + */
> +int ltt_trace_continue_block_index(LttTracefile *tf)
> +{
> + int ret;
> + uint64_t offset;
> + uint32_t last_block_size;
> + unsigned long i = tf->num_blocks;
> + int page_size = getpagesize();
> + unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
> +
> + get_block_offset_size(tf, tf->num_blocks-1, &offset, &last_block_size);
> +
> + ltt_subbuffer_header_t *header_tmp = mmap(0, header_map_size, PROT_READ,
> + MAP_PRIVATE, tf->fd, (off_t)offset);
> + if(header_tmp == MAP_FAILED) {
> + perror("Error in allocating memory for buffer of tracefile");
> + return -1;
> + }
> +
> + /* read len, offset += len */
> + offset += ltt_get_uint32(LTT_GET_BO(tf), &header_tmp->sb_size);
> +
> + ret = ltt_trace_process_block_index(tf, offset, i);
Hmm, did you try compiling this? The call should be to
ltt_trace_update_block_index ("update" rather than "process").
> +}
> +
> +int ltt_trace_create_block_index(LttTracefile *tf)
> +{
> + int ret;
> + uint64_t offset = 0;
> + unsigned long i = 0;
> +
> + tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
> + DEFAULT_N_BLOCKS);
Please check for memory allocation errors.
> + ret = ltt_trace_process_block_index(tf, offset, i);
Same here (process -> update).
> + return ret;
> +}
> +
> /*****************************************************************************
> *Function name
> * ltt_tracefile_open : open a trace file, construct a LttTracefile
> @@ -263,12 +297,13 @@ int ltt_trace_create_block_index(LttTracefile *tf)
> *Return value
> * : 0 for success, -1 otherwise.
>
> ****************************************************************************/
>
>
> -
Useless empty line removal. Please separate coding style fixes from
actual features.
> static gint ltt_tracefile_open(LttTrace *t, gchar * fileName,
> LttTracefile *tf)
> {
> struct stat lTDFStat; /* Trace data file status */
> ltt_subbuffer_header_t *header;
> int page_size = getpagesize();
> +
Unneeded empty line.
> + tf->streaming_is_empty = FALSE;
>
> //open the file
> tf->long_name = g_quark_from_string(fileName);
> @@ -289,10 +324,31 @@ static gint ltt_tracefile_open(LttTrace *t, gchar
> * fileName, LttTracefile *tf)
> // Is the file large enough to contain a trace
> if(lTDFStat.st_size <
> (off_t)(ltt_subbuffer_header_size())){
> - g_print("The input data file %s does not contain a trace\n", fileName);
> - goto close_file;
> + if(t->streaming_is_enable == FALSE)
> + {
> + g_print("The input data file %s does not contain a trace\n",
> fileName);
> + goto close_file;
> + }
> + else
> + {
> + //We are in a streaming mode, so the file can be empty at the
> start of the analysis
> + tf->streaming_is_empty = TRUE;
> + tf->file_size = lTDFStat.st_size;
> + tf->events_lost = 0;
> + tf->subbuf_corrupt = 0;
> +
> + tf->buffer.head = NULL;
> + return -1;
> + }
> }
>
> + tf->buffer.head = NULL;
> +
> + //store the size of the file
> + tf->file_size = lTDFStat.st_size;
> + tf->events_lost = 0;
> + tf->subbuf_corrupt = 0;
> +
> /* Temporarily map the buffer start header to get trace information */
> /* Multiple of pages aligned head */
> tf->buffer.head = mmap(0,
> @@ -310,11 +366,6 @@ static gint ltt_tracefile_open(LttTrace *t, gchar *
> fileName, LttTracefile *tf)
> g_warning("parse_trace_header error");
> goto unmap_file;
> }
> -
> - //store the size of the file
> - tf->file_size = lTDFStat.st_size;
> - tf->events_lost = 0;
> - tf->subbuf_corrupt = 0;
>
> if(munmap(tf->buffer.head,
> PAGE_ALIGN(ltt_subbuffer_header_size()))) {
> @@ -353,6 +404,109 @@ end:
> return -1;
> }
>
> +/*****************************************************************************
> + *Function name
> + * ltt_tracefile_update : Update the informations concerning a tracefile
Rather than "information", I would say something more specific, e.g.
"Must be called periodically to update the trace file and file size
information."
> + * Must be called periodically in order to have the
> latest informations
> + *Input params
> + * tf : the tracefile
> + *Return value
> + * : 0 for success, -1 otherwise.
> +
> ****************************************************************************/
>
>
> +int ltt_tracefile_update(LttTracefile *tf, LttTrace *t)
> +{
> + struct stat lTDFStat; /* Trace data file status */
> + if(fstat(tf->fd, &lTDFStat) < 0){
Variable declarations must go before code. Please enable your compiler
warnings and address these issues.
> + perror("error in getting the tracefile informations.");
> + }
> + int page_size = getpagesize();
> +
> +#ifndef HAS_INOTIFY
> + if(tf->file_size < lTDFStat.st_size)
> + {
> +#endif
This could be done by creating a helper:
#ifndef HAS_INOTIFY
static inline int ltt_tracefile_check_size(...)
{
return tf->file_size < ...;
}
#else
static inline int ltt_tracefile_check_size(...)
{
return 0;
}
#endif
and calling it from the function.
> + //Ajust the size of the file
> + tf->file_size = lTDFStat.st_size;
> +
extra empty line.
> +
> + if(tf->streaming_is_empty)
> + {
> + ltt_subbuffer_header_t *header;
> +
> + /* Temporarily map the buffer start header to get trace
> information */
> + /* Multiple of pages aligned head */
> + tf->buffer.head = mmap(0,
> + PAGE_ALIGN(ltt_subbuffer_header_size()), PROT_READ,
> + MAP_PRIVATE, tf->fd, 0);
> + if(tf->buffer.head == MAP_FAILED) {
> + perror("Error in allocating memory for buffer of tracefile");
> + goto close_file;
> + }
> + g_assert( ( (gulong)tf->buffer.head&(8-1) ) == 0); // make sure
> it's aligned.
> +
> + header = (ltt_subbuffer_header_t *)tf->buffer.head;
> +
> + if(parse_trace_header(header, tf, t)) {
> + g_warning("parse_trace_header error");
> + goto unmap_file;
> + }
> +
> + if(munmap(tf->buffer.head,
> + PAGE_ALIGN(ltt_subbuffer_header_size()))) {
> + g_warning("unmap size : %zu\n",
> + PAGE_ALIGN(ltt_subbuffer_header_size()));
> + perror("munmap error");
> + g_assert(0);
> + }
> + tf->buffer.head = NULL;
> +
> + /* Create block index */
> + ltt_trace_create_block_index(tf);
> +
> + //read the first block
> + if(map_block(tf,0)) {
Please use a consistent bracket style...
if () {
} else {
}
rather than a mix of
if () {
}
else
{
}
> + perror("Cannot map block for tracefile");
> + goto close_file;
> + }
> + tf->streaming_is_empty = FALSE;
> + }
> + else
> + {
> + //Retreive the new subbuffers and index them
> + ltt_trace_update_block_index(tf);
> + }
> +
> + //Update the metadata table if applicable
> + if(tf->name == LTT_TRACEFILE_NAME_METADATA)
> + {
> + map_block(tf, tf->buffer.index + 1);
> + ltt_process_metadata_tracefile(tf);
> + }
> + ltt_tracefile_read(tf);
> +
> +#ifndef HAS_INOTIFY
> + return 1;
> + }
> +#endif
> +
> + return 0;
> +// /* Error */
> + unmap_file:
> + if(munmap(tf->buffer.head,
> + PAGE_ALIGN(ltt_subbuffer_header_size()))) {
> + g_warning("unmap size : %zu\n",
> + PAGE_ALIGN(ltt_subbuffer_header_size()));
> + perror("munmap error");
> + g_assert(0);
> + }
> + close_file:
> + close(tf->fd);
> +// end:
> + if (tf->buf_index)
> + g_array_free(tf->buf_index, TRUE);
Why are you freeing an array that has not been allocated within this
function? This is asymmetric and unexpected. Can you explain?
Same question for closing the file in this function.
> + return -1;
> +}
> +
>
> /*****************************************************************************
> *Function name
> @@ -567,7 +721,63 @@ static __attribute__ ((__unused__)) gboolean
> ltt_tracefile_group_has_cpu_online(
> }
> return 0;
> }
> +/****************************************************************************
> + * get_absolute_pathname
> + *
> + * Update the number of tracefile of a "name" group
> + * This function is normally called when a tracefile is detected but
> cannot be parse at
cannot be parsed
> + * this point. The group size will count this file.
will account for ...
> + *
> + * return the unique pathname in the system
> + *
> + * MD : Fixed this function so it uses realpath, dealing well with
> + * forgotten cases (.. were not used correctly before).
> + *
> +
> ****************************************************************************/
>
> +/*
> +* Update the number of tracefile in a "name" group
> +* This function is normally called when a tracefile is detected but
> cannot be parse at
cannot be parsed
> +* this point. The group size will count this file.
account for...
> +*
> +* return 1 if a new group is created
> +* return 0 otherwise
> +*
> +**/
> +int update_num_cpu(LttTrace *trace, GQuark name, guint num)
> +{
> + struct marker_data *mdata;
> + GArray *group;
> + int i;
> +
> + group = g_datalist_id_get_data(&trace->tracefiles, name);
> + if (group == NULL)
> + {
> + group = g_array_sized_new (FALSE, TRUE, sizeof(LttTracefile), 10);
> + g_datalist_id_set_data_full(&trace->tracefiles, name,
> + group, ltt_tracefile_group_destroy);
> + mdata = allocate_marker_data();
> + if (!mdata)
> + g_error("Error in allocating marker data");
> +
> + group = g_array_set_size(group, num+1);
> + for (i = 0; i < group->len; i++)
> + g_array_index (group, LttTracefile, i).mdata = mdata;
> + return 1;
> + }
> + if ( num+1 > group->len )
ah, a new coding style here ? if ( ... ) rather than if (...).
> + {
> + mdata = g_array_index (group, LttTracefile, 0).mdata;
> + int old_num = group->len;
> + group = g_array_set_size(group, num+1);
> + for (i = old_num; i < group->len; i++)
> + g_array_index (group, LttTracefile, i).mdata = mdata;
> + }
> + if( name != LTT_TRACEFILE_NAME_METADATA ) {
> + update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
> + }
> + return 0;
> +}
>
> /* Open each tracefile under a specific directory. Put them in a
> * GData : permits to access them using their tracefile group pathname.
> @@ -579,8 +789,7 @@ static __attribute__ ((__unused__)) gboolean
> ltt_tracefile_group_has_cpu_online(
> *
> * A tracefile group is simply an array where all the per cpu
> tracefiles sit.
> */
> -
> -static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar
> *relative_path)
> +int open_tracefiles(LttTrace *trace, gchar *root_path, gchar
> *relative_path)
> {
> DIR *dir = opendir(root_path);
> struct dirent *entry;
> @@ -634,6 +843,19 @@ static int open_tracefiles(LttTrace *trace, gchar
> *root_path, gchar *relative_pa
> if(S_ISDIR(stat_buf.st_mode)) {
>
> g_debug("Entering subdirectory...\n");
> + //Monitor the directory if we are in a streaming mode
> +#ifdef HAS_INOTIFY
See the other comment about #ifdefs within functions. Please create static
inline functions instead.
> + if(trace->streaming_is_enable == TRUE)
> + {
> + trace->inotify_watch_array.elem =
> realloc(trace->inotify_watch_array.elem,
> + ++trace->inotify_watch_array.num * sizeof(struct
> inotify_watch));
> +
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd =
> inotify_add_watch(trace->inotify_fd, path, IN_CREATE | IN_OPEN |
> IN_CLOSE_WRITE);
> +
> strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel,
> path);
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name =
> g_quark_from_string("");
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num =
> -1;
> + }
> +#endif
> ret = open_tracefiles(trace, path, rel_path);
> if(ret < 0) continue;
> } else if(S_ISREG(stat_buf.st_mode)) {
> @@ -647,12 +869,29 @@ static int open_tracefiles(LttTrace *trace, gchar
> *root_path, gchar *relative_pa
> creation = 0;
> if(get_tracefile_name_number(rel_path, &name, &num, &tid, &pgid,
> &creation))
> continue; /* invalid name */
> -
> +#ifdef HAS_INOTIFY
same here
Thanks,
Mathieu
> + //Monitor the directory if we are in a streaming mode
> + if(trace->streaming_is_enable == TRUE)
> + {
> + trace->inotify_watch_array.elem =
> realloc(trace->inotify_watch_array.elem,
> + ++trace->inotify_watch_array.num * sizeof(struct
> inotify_watch));
> +
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd =
> inotify_add_watch(trace->inotify_fd, path, IN_OPEN | IN_MODIFY |
> IN_CLOSE);
> +
> strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel,
> path);
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name =
> name;
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num =
> num;
> + }
> +#endif
> g_debug("Opening file.\n");
> if(ltt_tracefile_open(trace, path, &tmp_tf)) {
> g_info("Error opening tracefile %s", path);
> -
> - continue; /* error opening the tracefile : bad magic number ? */
> + if(trace->streaming_is_enable == TRUE)
> + {
> + // Take to account the current file in the Cpu count of the group
> + update_num_cpu(trace, name, num);
> + }
> + continue; /* error opening the tracefile : bad magic number ? */
> +
> }
>
> g_debug("Tracefile name is %s and number is %u",
> @@ -678,6 +917,9 @@ static int open_tracefiles(LttTrace *trace, gchar
> *root_path, gchar *relative_pa
> g_error("Error in allocating marker data");
> }
>
> + if(trace->streaming_is_enable == TRUE && name !=
> LTT_TRACEFILE_NAME_METADATA)
> + update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
> +
> /* Add the per cpu tracefile to the named group */
> unsigned int old_len = group->len;
> if(num+1 > old_len)
> @@ -802,8 +1044,7 @@ seek_error:
> *
> * pathname must be the directory of the trace
> */
> -
> -LttTrace *ltt_trace_open(const gchar *pathname)
> +LttTrace *ltt_trace_open(const gchar *pathname, unsigned int flags)
> {
> gchar abs_path[PATH_MAX];
> LttTrace * t;
> @@ -823,6 +1064,8 @@ LttTrace *ltt_trace_open(const gchar *pathname)
> get_absolute_pathname(pathname, abs_path);
> t->pathname = g_quark_from_string(abs_path);
>
> + t->streaming_is_enable = FALSE;
> +
> g_datalist_init(&t->tracefiles);
>
> /* Test to see if it looks like a trace */
> @@ -842,7 +1085,18 @@ LttTrace *ltt_trace_open(const gchar *pathname)
> }
> }
> closedir(dir);
> -
> +
> + if(flags & TRACE_STREAMING)
> + {
> + t->streaming_is_enable = TRUE;
> +#ifdef HAS_INOTIFY
> + t->inotify_fd = inotify_init();
> + fcntl(t->inotify_fd, F_SETFL, O_NONBLOCK);
> + t->inotify_watch_array.elem = NULL;
> + t->inotify_watch_array.num = 0;
> + t->open_tracefile_counter = 0;
> +#endif
> + }
> /* Open all the tracefiles */
> t->start_freq= 0;
> if(open_tracefiles(t, abs_path, "")) {
> @@ -858,15 +1112,30 @@ LttTrace *ltt_trace_open(const gchar *pathname)
> }
>
> /*
> - * Get the trace information for the metadata_0 tracefile.
> * Getting a correct trace start_time and start_tsc is insured by the
> fact
> * that no subbuffers are supposed to be lost in the metadata channel.
> * Therefore, the first subbuffer contains the start_tsc timestamp in
> its
> * buffer header.
> */
> g_assert(group->len > 0);
> - tf = &g_array_index (group, LttTracefile, 0);
> - header = (ltt_subbuffer_header_t *)tf->buffer.head;
> + if(flags & TRACE_STREAMING)
> + {
> + //Get the trace information from the metadata tracefile.
> + for(i = 0; i < group->len; i++)
> + {
> + tf = &g_array_index (group, LttTracefile, i);
> + header = (ltt_subbuffer_header_t *)tf->buffer.head;
> + if(header != NULL)
> + break;
> + }
> + }
> + else
> + {
> + //Get the trace information from the metadata_0 tracefile.
> + tf = &g_array_index (group, LttTracefile, 0);
> + header = (ltt_subbuffer_header_t *)tf->buffer.head;
> + }
> +
> ret = parse_trace_header(header, tf, t);
> g_assert(!ret);
>
> @@ -910,7 +1179,10 @@ alloc_error:
> */
> LttTrace *ltt_trace_copy(LttTrace *self)
> {
> - return ltt_trace_open(g_quark_to_string(self->pathname));
> + unsigned int flag = NO_FLAG;
> + if(self->streaming_is_enable)
> + flag |= TRACE_STREAMING;
> + return ltt_trace_open(g_quark_to_string(self->pathname), flag);
> }
>
> /*
> --
> 1.7.0.4
>
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list