[ltt-dev] [PATCH 1/2] Add the support for streaming analysis in the libltttraceread library
Mathieu Desnoyers
compudj at krystal.dyndns.org
Fri Aug 27 13:53:36 EDT 2010
* Oussama El Mfadli (oussama.el-mfadli at polymtl.ca) wrote:
> Add the support for streaming analysis in the libltttraceread library
> - Modification of the opening of the tracefile
> and the way some informations are retreived
> - Add some implementations to append the new arriving blocks into
> the index
>
> ---
> ltt/ltt-private.h | 3 +
> ltt/trace.h | 22 +++-
> ltt/tracefile.c | 366
> +++++++++++++++++++++++++++++++++++++++++++++++-----
> 3 files changed, 354 insertions(+), 37 deletions(-)
>
> diff --git a/ltt/ltt-private.h b/ltt/ltt-private.h
> index 65d73d1..a0bcad3 100644
> --- a/ltt/ltt-private.h
> +++ b/ltt/ltt-private.h
> @@ -171,6 +171,9 @@ struct LttTracefile {
>
> /* Current block */
> LttBuffer buffer; //current buffer
> +
> + /* For streaming usage: in case the tracefile is currently empty */
> + gboolean Streaming_isEmpty;
PleaseFollowOurCodingStyle, which would be:
streaming_is_empty for the variable name.
> };
>
> /* The characteristics of the system on which the trace was obtained
> diff --git a/ltt/trace.h b/ltt/trace.h
> index e16c66f..abbb254 100644
> --- a/ltt/trace.h
> +++ b/ltt/trace.h
> @@ -25,6 +25,20 @@
> #include <stdint.h>
> #include <glib.h>
>
> +#include <sys/inotify.h>
Here too, please use ifdefs to detect the kernel header version, and
fall back on a timer-based polling mechanism.
> +
> +struct inotify_watch {
> + int wd;
> + char path_channel[PATH_MAX];
> + GQuark name;
> + guint num;
> +};
> +
> +struct inotify_watch_array {
> + struct inotify_watch *elem;
> + int num;
> +};
> +
> struct LttTrace {
> GQuark pathname; //the pathname of the trace
> //LttSystemDescription * system_description;//system description
> @@ -47,6 +61,11 @@ struct LttTrace {
> LttTime start_time_from_tsc;
>
> GData *tracefiles; //tracefiles groups
> +
> + // Keep a inotify file descriptor array to detect the changes in the
> files
> + struct inotify_watch_array inotify_watch_array;
> + int inotify_fd;
> + int open_tracefile_counter;
> };
>
> static inline guint ltt_trace_get_num_cpu(LttTrace *t)
> @@ -66,7 +85,8 @@ static inline guint ltt_trace_get_num_cpu(LttTrace *t)
> */
>
> LttTrace *ltt_trace_open(const gchar *pathname);
> -
> +LttTrace *ltt_trace_open_streaming(const gchar *pathname, gboolean
> isStreaming);
Why do we need new API members for this ?
Please just modify ltt_trace_open, maybe adding an
unsigned int flag
field with a "#define TRACE_STREAMING (1 << 0)"
flag.
> +void ltt_tracefile_add_streaming( LttTrace *t, gchar *root_path, gchar*
> rel_path);
Again, why this new API member ? What is wrong with ltt_tracefile_open ?
> /* copy reopens a trace
> *
> * return value NULL if error while opening the trace
> diff --git a/ltt/tracefile.c b/ltt/tracefile.c
> index 54da747..2d03ed3 100644
> --- a/ltt/tracefile.c
> +++ b/ltt/tracefile.c
> @@ -99,6 +99,8 @@ static int ltt_seek_next_event(LttTracefile *tf);
>
> static int open_tracefiles(LttTrace *trace, gchar *root_path,
> gchar *relative_path);
> +static int open_tracefiles_streaming(LttTrace *trace, gchar *root_path,
> + gchar *relative_path, gboolean isStreaming);
Same here.
> static int ltt_process_metadata_tracefile(LttTracefile *tf);
> static void ltt_tracefile_time_span_get(LttTracefile *tf,
> LttTime *start, LttTime *end);
> @@ -209,26 +211,22 @@ int get_block_offset_size(LttTracefile *tf, guint
> block_num,
> return 0;
> }
>
> -int ltt_trace_create_block_index(LttTracefile *tf)
> -{
> - int page_size = getpagesize();
> - uint64_t offset = 0;
> - unsigned long i = 0;
> - unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
>
> - tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
> - DEFAULT_N_BLOCKS);
> -
> - g_assert(tf->buf_index->len == i);
> -
> - while (offset < tf->file_size) {
> +int ltt_trace_process_block_index(LttTracefile *tf, uint64_t offset,
what do you mean by "process" ?
Maybe you want to change the name from "create" to "update_range" ?
> unsigned long firstBlock)
> +{
> + int i = firstBlock;
> + int page_size = getpagesize();
> + unsigned int header_map_size =
> PAGE_ALIGN(ltt_subbuffer_header_size());
> +
> + g_assert(tf->buf_index->len == i);
> + while (offset < ltt_get_uint32(LTT_GET_BO(tf), &tf->file_size)) {
Uh ? Why are you using the byte-order swapping on a field that's in the
LttTracefile structure ? This is incorrect.
> ltt_subbuffer_header_t *header;
> uint64_t *off;
>
> tf->buf_index = g_array_set_size(tf->buf_index, i + 1);
> off = &g_array_index(tf->buf_index, uint64_t, i);
> *off = offset;
> -
> +
Hrm, adding whitespace ?
> /* map block header */
> header = mmap(0, header_map_size, PROT_READ,
> MAP_PRIVATE, tf->fd, (off_t)offset);
> @@ -253,22 +251,65 @@ int ltt_trace_create_block_index(LttTracefile *tf)
> return 0;
> }
>
> +/* parse the new information from the file and reajust the number of
> blocks.
> + *
> + * Return value : 0 success, -1 error
> + */
> +int ltt_trace_update_block_index(LttTracefile *tf)
> +{
> + int ret;
> + uint64_t offset;
> + uint32_t last_block_size;
> + unsigned long i = tf->num_blocks;
> + int page_size = getpagesize();
> + unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
> +
> + get_block_offset_size(tf, tf->num_blocks-1, &offset, &last_block_size)
missing ;
> +
> + ltt_subbuffer_header_t *header_tmp = mmap(0, header_map_size, PROT_READ,
> + MAP_PRIVATE, tf->fd, (off_t)offset);
> + if(header_tmp == MAP_FAILED) {
> + perror("Error in allocating memory for buffer of tracefile");
> + return -1;
> + }
> +
> + /* read len, offset += len */
> + offset += ltt_get_uint32(LTT_GET_BO(tf), &header_tmp->sb_size);
> +
> + ret = ltt_trace_process_block_index(tf, offset, i);
> +}
> +
> +int ltt_trace_create_block_index(LttTracefile *tf)
> +{
> + int ret;
> + uint64_t offset = 0;
> + unsigned long i = 0;
> +
> + tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
> + DEFAULT_N_BLOCKS);
> + ret = ltt_trace_process_block_index(tf, offset, i);
> + return ret;
> +}
> +
> /*****************************************************************************
> *Function name
> - * ltt_tracefile_open : open a trace file, construct a LttTracefile
> + * ltt_tracefile_open_streaming : open a trace file, construct a
> LttTracefile
> + * The trace file can be a streaming
> *Input params
> * t : the trace containing the tracefile
> * fileName : path name of the trace file
> * tf : the tracefile structure
> + * isStreaming : flag to open in a streaming mode (TRUE) or not
> (FALSE).
> *Return value
> * : 0 for success, -1 otherwise.
>
> ****************************************************************************/
>
>
> -
> -static gint ltt_tracefile_open(LttTrace *t, gchar * fileName,
> LttTracefile *tf)
> +static gint ltt_tracefile_open_streaming(LttTrace *t, gchar * fileName,
> LttTracefile *tf, gboolean isStreaming)
> {
> struct stat lTDFStat; /* Trace data file status */
> ltt_subbuffer_header_t *header;
> int page_size = getpagesize();
> +
> + tf->Streaming_isEmpty = FALSE;
>
> //open the file
> tf->long_name = g_quark_from_string(fileName);
> @@ -289,10 +330,31 @@ static gint ltt_tracefile_open(LttTrace *t, gchar
> * fileName, LttTracefile *tf)
> // Is the file large enough to contain a trace
> if(lTDFStat.st_size <
> (off_t)(ltt_subbuffer_header_size())){
> - g_print("The input data file %s does not contain a trace\n", fileName);
> - goto close_file;
> + if(isStreaming == FALSE)
> + {
> + g_print("The input data file %s does not contain a trace\n",
> fileName);
> + goto close_file;
> + }
> + else
> + {
> + //We are in a streaming mode, so the file can be empty at the
> start of the analysis
> + tf->Streaming_isEmpty = TRUE;
> + tf->file_size = lTDFStat.st_size;
> + tf->events_lost = 0;
> + tf->subbuf_corrupt = 0;
> +
> + tf->buffer.head = NULL;
> + return -1;
> + }
> }
>
> + tf->buffer.head = NULL;
> +
> + //store the size of the file
> + tf->file_size = lTDFStat.st_size;
> + tf->events_lost = 0;
> + tf->subbuf_corrupt = 0;
> +
> /* Temporarily map the buffer start header to get trace information */
> /* Multiple of pages aligned head */
> tf->buffer.head = mmap(0,
> @@ -310,11 +372,6 @@ static gint ltt_tracefile_open(LttTrace *t, gchar *
> fileName, LttTracefile *tf)
> g_warning("parse_trace_header error");
> goto unmap_file;
> }
> -
> - //store the size of the file
> - tf->file_size = lTDFStat.st_size;
> - tf->events_lost = 0;
> - tf->subbuf_corrupt = 0;
>
> if(munmap(tf->buffer.head,
> PAGE_ALIGN(ltt_subbuffer_header_size()))) {
> @@ -353,6 +410,114 @@ end:
> return -1;
> }
>
> +/*****************************************************************************
> + *Function name
> + * ltt_tracefile_open : open a trace file, construct a LttTracefile
> + *Input params
> + * t : the trace containing the tracefile
> + * fileName : path name of the trace file
> + * tf : the tracefile structure
> + *Return value
> + * : 0 for success, -1 otherwise.
> +
> ****************************************************************************/
>
>
> +static gint ltt_tracefile_open(LttTrace *t, gchar * fileName,
> LttTracefile *tf)
> +{
> + return ltt_tracefile_open_streaming(t, fileName, tf, FALSE);
> +}
> +
> +/*****************************************************************************
> + *Function name
> + * ltt_tracefile_update : Update the informations concerning a tracefile
> + * Must be called periodically in order to have the
> latest informations
Hrm, but in the textDump implementation, you busy-wait calling it all
the time.
Hrm, OK. So your plan seems to be that libltttraceread uses the
inotify/timer polling scheme to check the files. From there it updates
its internal library structures.
The application (e.g. textdump, lttv gui) is responsible for
periodically calling ltt_tracefile_update().
But this requires busy-waiting. libltttraceread should provide a
callback mechanism to let the application be informed of trace data
updates instead. This seems to be missing here.
This would fix problems with the current scheme. For instance, the
application should always be told to update its internal hook lists
based on newly perceived markers before it proceeds to read the rest of
the events. Otherwise, we're just going to miss delivery of some events,
which is bad.
Thanks,
Mathieu
> + *Input params
> + * tf : the tracefile
> + *Return value
> + * : 0 for success, -1 otherwise.
> +
> ****************************************************************************/
>
>
> +int ltt_tracefile_update(LttTracefile *tf, LttTrace *t)
> +{
> + struct stat lTDFStat; /* Trace data file status */
> + if(fstat(tf->fd, &lTDFStat) < 0){
> + perror("error in getting the tracefile informations.");
> + }
> + int page_size = getpagesize();
> +
> + //Ajust the size of the file
> + tf->file_size = lTDFStat.st_size;
> +
> + if(tf->Streaming_isEmpty)
> + {
> + ltt_subbuffer_header_t *header;
> +
> + /* Temporarily map the buffer start header to get trace
> information */
> + /* Multiple of pages aligned head */
> + tf->buffer.head = mmap(0,
> + PAGE_ALIGN(ltt_subbuffer_header_size()), PROT_READ,
> + MAP_PRIVATE, tf->fd, 0);
> + if(tf->buffer.head == MAP_FAILED) {
> + perror("Error in allocating memory for buffer of tracefile");
> + goto close_file;
> + }
> + g_assert( ( (gulong)tf->buffer.head&(8-1) ) == 0); // make sure
> it's aligned.
> +
> + header = (ltt_subbuffer_header_t *)tf->buffer.head;
> +
> + if(parse_trace_header(header, tf, t)) {
> + g_warning("parse_trace_header error");
> + goto unmap_file;
> + }
> +
> + if(munmap(tf->buffer.head,
> + PAGE_ALIGN(ltt_subbuffer_header_size()))) {
> + g_warning("unmap size : %zu\n",
> + PAGE_ALIGN(ltt_subbuffer_header_size()));
> + perror("munmap error");
> + g_assert(0);
> + }
> + tf->buffer.head = NULL;
> +
> + /* Create block index */
> + ltt_trace_create_block_index(tf);
> +
> + //read the first block
> + if(map_block(tf,0)) {
> + perror("Cannot map block for tracefile");
> + goto close_file;
> + }
> + tf->Streaming_isEmpty = FALSE;
> + }
> + else
> + {
> + //Retreive the new subbuffers and index them
> + ltt_trace_update_block_index(tf);
> + }
> +
> + //Update the metadata table if applicable
> + if(tf->name == LTT_TRACEFILE_NAME_METADATA)
> + {
> + map_block(tf, tf->buffer.index + 1);
> + ltt_process_metadata_tracefile(tf);
> + }
> + ltt_tracefile_read(tf);
> +
> + return 0;
> +// /* Error */
> + unmap_file:
> + if(munmap(tf->buffer.head,
> + PAGE_ALIGN(ltt_subbuffer_header_size()))) {
> + g_warning("unmap size : %zu\n",
> + PAGE_ALIGN(ltt_subbuffer_header_size()));
> + perror("munmap error");
> + g_assert(0);
> + }
> + close_file:
> + close(tf->fd);
> +// end:
> + if (tf->buf_index)
> + g_array_free(tf->buf_index, TRUE);
> + return -1;
> +}
> +
>
> /*****************************************************************************
> *Function name
> @@ -567,8 +732,77 @@ static __attribute__ ((__unused__)) gboolean
> ltt_tracefile_group_has_cpu_online(
> }
> return 0;
> }
> +/****************************************************************************
> + * get_absolute_pathname
> + *
> + * Update the number of tracefile of a "name" group
> + * This function is normally called when a tracefile is detected but
> cannot be parse at
> + * this point. The group size will count this file.
> + *
> + * return the unique pathname in the system
> + *
> + * MD : Fixed this function so it uses realpath, dealing well with
> + * forgotten cases (.. were not used correctly before).
> + *
> +
> ****************************************************************************/
>
> -
> +/*
> +* Update the number of tracefile in a "name" group
> +* This function is normally called when a tracefile is detected but
> cannot be parse at
> +* this point. The group size will count this file.
> +*
> +* return 1 if a new group is created
> +* return 0 otherwise
> +*
> +**/
> +int update_num_cpu(LttTrace *trace, GQuark name, guint num)
> +{
> + struct marker_data *mdata;
> + GArray *group;
> + int i;
> +
> + group = g_datalist_id_get_data(&trace->tracefiles, name);
> + if (group == NULL)
> + {
> + group = g_array_sized_new (FALSE, TRUE, sizeof(LttTracefile), 10);
> + g_datalist_id_set_data_full(&trace->tracefiles, name,
> + group, ltt_tracefile_group_destroy);
> + mdata = allocate_marker_data();
> + if (!mdata)
> + g_error("Error in allocating marker data");
> +
> + group = g_array_set_size(group, num+1);
> + for (i = 0; i < group->len; i++)
> + g_array_index (group, LttTracefile, i).mdata = mdata;
> + return 1;
> + }
> + if ( num+1 > group->len )
> + {
> + mdata = g_array_index (group, LttTracefile, 0).mdata;
> + int old_num = group->len;
> + group = g_array_set_size(group, num+1);
> + for (i = old_num; i < group->len; i++)
> + g_array_index (group, LttTracefile, i).mdata = mdata;
> + }
> + if( name != LTT_TRACEFILE_NAME_METADATA ) {
> + update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
> + }
> + return 0;
> +}
> +/*****************************************************************************
> + *Function name
> + * ltt_tracefile_add_streaming: open and add a tracefile to a trace
> + * normally used when a new file is received
> + * in a streaming analysis
> + *Input params
> + * t : trace in which the tracefile will be add
> + * root_path : path of the trace
> + * rel_path : relative path of the tracefile
> +
> ****************************************************************************/
> +void ltt_tracefile_add_streaming( LttTrace *t, gchar *root_path, gchar*
> rel_path)
> +{
> + open_tracefiles_streaming(t, root_path, rel_path, TRUE);
> +}
> /* Open each tracefile under a specific directory. Put them in a
> * GData : permits to access them using their tracefile group pathname.
> * i.e. access control/modules tracefile group by index :
> @@ -579,9 +813,13 @@ static __attribute__ ((__unused__)) gboolean
> ltt_tracefile_group_has_cpu_online(
> *
> * A tracefile group is simply an array where all the per cpu
> tracefiles sit.
> */
> -
> static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar
> *relative_path)
> {
> + return open_tracefiles_streaming(trace, root_path, relative_path, FALSE);
> +}
> +
> +static int open_tracefiles_streaming(LttTrace *trace, gchar *root_path,
> gchar *relative_path, gboolean isStreaming)
> +{
> DIR *dir = opendir(root_path);
> struct dirent *entry;
> struct stat stat_buf;
> @@ -634,7 +872,18 @@ static int open_tracefiles(LttTrace *trace, gchar
> *root_path, gchar *relative_pa
> if(S_ISDIR(stat_buf.st_mode)) {
>
> g_debug("Entering subdirectory...\n");
> - ret = open_tracefiles(trace, path, rel_path);
> + //Monitor the directory if we are in a streaming mode
> + if(isStreaming == TRUE)
> + {
> + trace->inotify_watch_array.elem =
> realloc(trace->inotify_watch_array.elem,
> + ++trace->inotify_watch_array.num * sizeof(struct
> inotify_watch));
> +
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd =
> inotify_add_watch(trace->inotify_fd, path, IN_CREATE | IN_OPEN |
> IN_CLOSE_WRITE);
> +
> strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel,
> path);
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name =
> g_quark_from_string("");
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num =
> -1;
> + }
> + ret = open_tracefiles_streaming(trace, path, rel_path, isStreaming);
> if(ret < 0) continue;
> } else if(S_ISREG(stat_buf.st_mode)) {
> GQuark name;
> @@ -647,12 +896,27 @@ static int open_tracefiles(LttTrace *trace, gchar
> *root_path, gchar *relative_pa
> creation = 0;
> if(get_tracefile_name_number(rel_path, &name, &num, &tid, &pgid,
> &creation))
> continue; /* invalid name */
> -
> + //Monitor the directory if we are in a streaming mode
> + if(isStreaming == TRUE)
> + {
> + trace->inotify_watch_array.elem =
> realloc(trace->inotify_watch_array.elem,
> + ++trace->inotify_watch_array.num * sizeof(struct
> inotify_watch));
> +
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd =
> inotify_add_watch(trace->inotify_fd, path, IN_OPEN | IN_MODIFY |
> IN_CLOSE);
> +
> strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel,
> path);
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name =
> name;
> +
> trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num =
> num;
> + }
> g_debug("Opening file.\n");
> - if(ltt_tracefile_open(trace, path, &tmp_tf)) {
> + if(ltt_tracefile_open_streaming(trace, path, &tmp_tf, isStreaming)) {
> g_info("Error opening tracefile %s", path);
> -
> - continue; /* error opening the tracefile : bad magic number ? */
> + if(isStreaming == TRUE)
> + {
> + // Take to account the current file in the Cpu count of the group
> + update_num_cpu(trace, name, num);
> + }
> + continue; /* error opening the tracefile : bad magic number ? */
> +
> }
>
> g_debug("Tracefile name is %s and number is %u",
> @@ -678,6 +942,9 @@ static int open_tracefiles(LttTrace *trace, gchar
> *root_path, gchar *relative_pa
> g_error("Error in allocating marker data");
> }
>
> + if(isStreaming == TRUE && name != LTT_TRACEFILE_NAME_METADATA)
> + update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
> +
> /* Add the per cpu tracefile to the named group */
> unsigned int old_len = group->len;
> if(num+1 > old_len)
> @@ -802,9 +1069,13 @@ seek_error:
> *
> * pathname must be the directory of the trace
> */
> -
> LttTrace *ltt_trace_open(const gchar *pathname)
> {
> + return ltt_trace_open_streaming(pathname, FALSE);
> +}
> +
> +LttTrace *ltt_trace_open_streaming(const gchar *pathname, gboolean
> isStreaming)
> +{
> gchar abs_path[PATH_MAX];
> LttTrace * t;
> LttTracefile *tf;
> @@ -842,10 +1113,18 @@ LttTrace *ltt_trace_open(const gchar *pathname)
> }
> }
> closedir(dir);
> -
> +
> + if(isStreaming == TRUE)
> + {
> + t->inotify_fd = inotify_init();
> + fcntl(t->inotify_fd, F_SETFL, O_NONBLOCK);
> + t->inotify_watch_array.elem = NULL;
> + t->inotify_watch_array.num = 0;
> + t->open_tracefile_counter = 0;
> + }
> /* Open all the tracefiles */
> t->start_freq= 0;
> - if(open_tracefiles(t, abs_path, "")) {
> + if(open_tracefiles_streaming(t, abs_path, "", isStreaming)) {
> g_warning("Error opening tracefile %s", abs_path);
> goto find_error;
> }
> @@ -858,15 +1137,30 @@ LttTrace *ltt_trace_open(const gchar *pathname)
> }
>
> /*
> - * Get the trace information for the metadata_0 tracefile.
> * Getting a correct trace start_time and start_tsc is insured by the
> fact
> * that no subbuffers are supposed to be lost in the metadata channel.
> * Therefore, the first subbuffer contains the start_tsc timestamp in
> its
> * buffer header.
> */
> g_assert(group->len > 0);
> - tf = &g_array_index (group, LttTracefile, 0);
> - header = (ltt_subbuffer_header_t *)tf->buffer.head;
> + if(isStreaming == TRUE)
> + {
> + //Get the trace information from the metadata tracefile.
> + for(i = 0; i < group->len; i++)
> + {
> + tf = &g_array_index (group, LttTracefile, i);
> + header = (ltt_subbuffer_header_t *)tf->buffer.head;
> + if(header != NULL)
> + break;
> + }
> + }
> + else
> + {
> + //Get the trace information from the metadata_0 tracefile.
> + tf = &g_array_index (group, LttTracefile, 0);
> + header = (ltt_subbuffer_header_t *)tf->buffer.head;
> + }
> +
> ret = parse_trace_header(header, tf, t);
> g_assert(!ret);
>
> --
> 1.7.0.4
>
>
> _______________________________________________
> ltt-dev mailing list
> ltt-dev at lists.casi.polymtl.ca
> http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
>
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list