[ltt-dev] [PATCH 1/2] Add the support for streaming analysis in the libltttraceread library

Oussama El Mfadli oussama.el-mfadli at polymtl.ca
Fri Aug 27 20:23:12 EDT 2010


Hi,
Thanks for your quick answer. I made the modifications. The only 
difference in the API is the new parameter in ltt_trace_open(..) that 
permits setting a flag for streaming mode. I also added a fallback for 
when inotify is not available.

Concerning the functions that construct the block index, I modified 
their names. I hope they are clearer:
ltt_trace_create_block_index : used when no block index has been built yet
ltt_trace_continue_block_index  : used to continue the construction of the 
block index
ltt_trace_update_block_index  : an internal function used by both create 
and continue, because a big part of their code is the same. (This 
function is mainly added to prevent duplication of the source code.)

The current plan is that the application polls the fd (available from the 
trace structure) to check whether new information was received, and then 
calls for an update. Since textDump is the only module that uses this 
option, it should not affect the analysis performance. For the upcoming 
implementation in the GUI modules, a callback scheme would probably be 
better suited.

The patch follows:

-----------

Add the support for streaming analysis in the libltttraceread library

     - Modification of the opening of the tracefile
        and the way some information is retrieved
      - Add some implementations to append the newly arriving blocks into 
the index
---
  ltt/ltt-private.h |    3 +
  ltt/trace.h       |   38 ++++++-
  ltt/tracefile.c   |  340 
+++++++++++++++++++++++++++++++++++++++++++++++------
  3 files changed, 345 insertions(+), 36 deletions(-)

diff --git a/ltt/ltt-private.h b/ltt/ltt-private.h
index 65d73d1..665081a 100644
--- a/ltt/ltt-private.h
+++ b/ltt/ltt-private.h
@@ -171,6 +171,9 @@ struct LttTracefile {

    /* Current block */
    LttBuffer buffer;                  //current buffer
+
+  /* For streaming usage: in case the tracefile is currently empty */
+  gboolean streaming_is_empty;
  };

  /* The characteristics of the system on which the trace was obtained
diff --git a/ltt/trace.h b/ltt/trace.h
index e16c66f..aec3455 100644
--- a/ltt/trace.h
+++ b/ltt/trace.h
@@ -25,6 +25,32 @@
  #include <stdint.h>
  #include <glib.h>

+#include <linux/version.h>
+
+//Options used to open a trace
+#define NO_FLAG 0
+#define TRACE_STREAMING (1 << 0)
+
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14)
+#include <sys/inotify.h>
+
+#define HAS_INOTIFY
+#else
+
+#undef HAS_INOTIFY
+#endif
+struct inotify_watch {
+    int wd;
+    char path_channel[PATH_MAX];
+    GQuark name;
+    guint num;
+};
+
+struct inotify_watch_array {
+    struct inotify_watch *elem;
+    int num;
+};
+
  struct LttTrace {
    GQuark pathname;                          //the pathname of the trace
    //LttSystemDescription * system_description;//system description
@@ -47,6 +73,13 @@ struct LttTrace {
    LttTime   start_time_from_tsc;

    GData     *tracefiles;                    //tracefiles groups
+
+  // Keep a inotify file descriptor array to detect the changes in the 
files
+  struct inotify_watch_array inotify_watch_array;
+  int inotify_fd;
+  int open_tracefile_counter;
+
+  gboolean streaming_is_enable;
  };

  static inline guint ltt_trace_get_num_cpu(LttTrace *t)
@@ -65,8 +98,9 @@ static inline guint ltt_trace_get_num_cpu(LttTrace *t)

     */

-LttTrace *ltt_trace_open(const gchar *pathname);
-
+LttTrace *ltt_trace_open(const gchar *pathname, unsigned int flags);
+int open_tracefiles(LttTrace *trace, gchar *root_path,
+    gchar *relative_path);
  /* copy reopens a trace
   *
   * return value NULL if error while opening the trace
diff --git a/ltt/tracefile.c b/ltt/tracefile.c
index 54da747..3fbb016 100644
--- a/ltt/tracefile.c
+++ b/ltt/tracefile.c
@@ -97,8 +97,6 @@ static guint64 cycles_2_ns(LttTracefile *tf, guint64 
cycles);
  /* go to the next event */
  static int ltt_seek_next_event(LttTracefile *tf);

-static int open_tracefiles(LttTrace *trace, gchar *root_path,
-    gchar *relative_path);
  static int ltt_process_metadata_tracefile(LttTracefile *tf);
  static void ltt_tracefile_time_span_get(LttTracefile *tf,
                                          LttTime *start, LttTime *end);
@@ -209,18 +207,14 @@ int get_block_offset_size(LttTracefile *tf, guint 
block_num,
    return 0;
  }

-int ltt_trace_create_block_index(LttTracefile *tf)
-{
-  int page_size = getpagesize();
-  uint64_t offset = 0;
-  unsigned long i = 0;
-  unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
-
-  tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
-                                    DEFAULT_N_BLOCKS);
-
-  g_assert(tf->buf_index->len == i);

+static int ltt_trace_update_block_index(LttTracefile *tf, uint64_t 
offset, unsigned long firstBlock)
+{
+      int i = firstBlock;
+      int page_size = getpagesize();
+      unsigned int header_map_size = 
PAGE_ALIGN(ltt_subbuffer_header_size());
+
+    g_assert(tf->buf_index->len == i);
    while (offset < tf->file_size) {
      ltt_subbuffer_header_t *header;
      uint64_t *off;
@@ -228,7 +222,7 @@ int ltt_trace_create_block_index(LttTracefile *tf)
      tf->buf_index = g_array_set_size(tf->buf_index, i + 1);
      off = &g_array_index(tf->buf_index, uint64_t, i);
      *off = offset;
-
+
      /* map block header */
      header = mmap(0, header_map_size, PROT_READ,
                    MAP_PRIVATE, tf->fd, (off_t)offset);
@@ -253,6 +247,46 @@ int ltt_trace_create_block_index(LttTracefile *tf)
    return 0;
  }

+/* parse the new information from the file and reajust the number of 
blocks.
+ *
+ * Return value : 0 success, -1 error
+ */
+int ltt_trace_continue_block_index(LttTracefile *tf)
+{
+  int ret;
+  uint64_t offset;
+  uint32_t last_block_size;
+  unsigned long i = tf->num_blocks;
+  int page_size = getpagesize();
+  unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
+
+  get_block_offset_size(tf, tf->num_blocks-1, &offset, &last_block_size);
+
+  ltt_subbuffer_header_t *header_tmp = mmap(0, header_map_size, PROT_READ,
+                  MAP_PRIVATE, tf->fd, (off_t)offset);
+  if(header_tmp == MAP_FAILED) {
+    perror("Error in allocating memory for buffer of tracefile");
+    return -1;
+  }
+
+  /* read len, offset += len */
+  offset += ltt_get_uint32(LTT_GET_BO(tf), &header_tmp->sb_size);
+
+  ret = ltt_trace_process_block_index(tf, offset, i);
+}
+
+int ltt_trace_create_block_index(LttTracefile *tf)
+{
+  int ret;
+  uint64_t offset = 0;
+  unsigned long i = 0;
+
+  tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
+                                    DEFAULT_N_BLOCKS);
+  ret = ltt_trace_process_block_index(tf, offset, i);
+  return ret;
+}
+
  /*****************************************************************************
   *Function name
   *    ltt_tracefile_open : open a trace file, construct a LttTracefile
@@ -263,12 +297,13 @@ int ltt_trace_create_block_index(LttTracefile *tf)
   *Return value
   *                       : 0 for success, -1 otherwise.
   
****************************************************************************/ 

-
  static gint ltt_tracefile_open(LttTrace *t, gchar * fileName, 
LttTracefile *tf)
  {
    struct stat    lTDFStat;    /* Trace data file status */
    ltt_subbuffer_header_t *header;
    int page_size = getpagesize();
+
+  tf->streaming_is_empty = FALSE;

    //open the file
    tf->long_name = g_quark_from_string(fileName);
@@ -289,10 +324,31 @@ static gint ltt_tracefile_open(LttTrace *t, gchar 
* fileName, LttTracefile *tf)
    // Is the file large enough to contain a trace
    if(lTDFStat.st_size <
        (off_t)(ltt_subbuffer_header_size())){
-    g_print("The input data file %s does not contain a trace\n", fileName);
-    goto close_file;
+    if(t->streaming_is_enable == FALSE)
+    {
+      g_print("The input data file %s does not contain a trace\n", 
fileName);
+      goto close_file;
+    }
+    else
+    {
+      //We are in a streaming mode, so the file can be empty at the 
start of the analysis
+      tf->streaming_is_empty = TRUE;
+      tf->file_size = lTDFStat.st_size;
+      tf->events_lost = 0;
+      tf->subbuf_corrupt = 0;
+
+      tf->buffer.head = NULL;
+      return -1;
+    }
    }

+  tf->buffer.head = NULL;
+
+  //store the size of the file
+  tf->file_size = lTDFStat.st_size;
+  tf->events_lost = 0;
+  tf->subbuf_corrupt = 0;
+
    /* Temporarily map the buffer start header to get trace information */
    /* Multiple of pages aligned head */
    tf->buffer.head = mmap(0,
@@ -310,11 +366,6 @@ static gint ltt_tracefile_open(LttTrace *t, gchar * 
fileName, LttTracefile *tf)
      g_warning("parse_trace_header error");
      goto unmap_file;
    }
-
-  //store the size of the file
-  tf->file_size = lTDFStat.st_size;
-  tf->events_lost = 0;
-  tf->subbuf_corrupt = 0;

    if(munmap(tf->buffer.head,
          PAGE_ALIGN(ltt_subbuffer_header_size()))) {
@@ -353,6 +404,109 @@ end:
    return -1;
  }

+/*****************************************************************************
+ *Function name
+ *    ltt_tracefile_update : Update the informations concerning a tracefile
+ *                 Must be called periodically in order to have the 
latest informations
+ *Input params
+ *    tf                  : the tracefile
+ *Return value
+ *                       : 0 for success, -1 otherwise.
+ 
****************************************************************************/ 

+int ltt_tracefile_update(LttTracefile *tf, LttTrace *t)
+{
+    struct stat    lTDFStat;    /* Trace data file status */
+    if(fstat(tf->fd, &lTDFStat) < 0){
+      perror("error in getting the tracefile informations.");
+    }
+    int page_size = getpagesize();
+
+#ifndef HAS_INOTIFY
+   if(tf->file_size < lTDFStat.st_size)
+   {
+#endif
+    //Ajust the size of the file
+    tf->file_size = lTDFStat.st_size;
+
+
+    if(tf->streaming_is_empty)
+    {
+      ltt_subbuffer_header_t *header;
+
+      /* Temporarily map the buffer start header to get trace 
information */
+      /* Multiple of pages aligned head */
+      tf->buffer.head = mmap(0,
+      PAGE_ALIGN(ltt_subbuffer_header_size()), PROT_READ,
+      MAP_PRIVATE, tf->fd, 0);
+      if(tf->buffer.head == MAP_FAILED) {
+    perror("Error in allocating memory for buffer of tracefile");
+    goto close_file;
+      }
+      g_assert( ( (gulong)tf->buffer.head&(8-1) ) == 0); // make sure 
it's aligned.
+
+      header = (ltt_subbuffer_header_t *)tf->buffer.head;
+
+      if(parse_trace_header(header, tf, t)) {
+    g_warning("parse_trace_header error");
+    goto unmap_file;
+      }
+
+      if(munmap(tf->buffer.head,
+    PAGE_ALIGN(ltt_subbuffer_header_size()))) {
+    g_warning("unmap size : %zu\n",
+        PAGE_ALIGN(ltt_subbuffer_header_size()));
+    perror("munmap error");
+    g_assert(0);
+    }
+      tf->buffer.head = NULL;
+
+      /* Create block index */
+      ltt_trace_create_block_index(tf);
+
+      //read the first block
+      if(map_block(tf,0)) {
+    perror("Cannot map block for tracefile");
+    goto close_file;
+      }
+      tf->streaming_is_empty = FALSE;
+    }
+    else
+    {
+      //Retreive the new subbuffers and index them
+      ltt_trace_update_block_index(tf);
+    }
+
+    //Update the metadata table if applicable
+    if(tf->name == LTT_TRACEFILE_NAME_METADATA)
+    {
+      map_block(tf, tf->buffer.index + 1);
+      ltt_process_metadata_tracefile(tf);
+    }
+    ltt_tracefile_read(tf);
+
+#ifndef HAS_INOTIFY
+    return 1;
+   }
+#endif
+
+    return 0;
+//       /* Error */
+    unmap_file:
+      if(munmap(tf->buffer.head,
+        PAGE_ALIGN(ltt_subbuffer_header_size()))) {
+    g_warning("unmap size : %zu\n",
+        PAGE_ALIGN(ltt_subbuffer_header_size()));
+    perror("munmap error");
+    g_assert(0);
+      }
+    close_file:
+      close(tf->fd);
+//    end:
+      if (tf->buf_index)
+    g_array_free(tf->buf_index, TRUE);
+      return -1;
+}
+

  /*****************************************************************************
   *Function name
@@ -567,7 +721,63 @@ static __attribute__ ((__unused__)) gboolean 
ltt_tracefile_group_has_cpu_online(
    }
    return 0;
  }
+/****************************************************************************
+ * get_absolute_pathname
+ *
+ * Update the number of tracefile of a "name" group
+ * This function is normally called when a tracefile is detected but 
cannot be parse at
+ * this point. The group size will count this file.
+ *
+ * return the unique pathname in the system
+ *
+ * MD : Fixed this function so it uses realpath, dealing well with
+ * forgotten cases (.. were not used correctly before).
+ *
+ 
****************************************************************************/

+/*
+* Update the number of tracefile in a "name" group
+* This function is normally called when a tracefile is detected but 
cannot be parse at
+* this point. The group size will count this file.
+*
+* return 1 if a new group is created
+* return 0 otherwise
+*
+**/
+int update_num_cpu(LttTrace *trace, GQuark name, guint num)
+{
+  struct marker_data *mdata;
+  GArray *group;
+  int i;
+
+  group = g_datalist_id_get_data(&trace->tracefiles, name);
+  if (group == NULL)
+  {
+    group = g_array_sized_new (FALSE, TRUE, sizeof(LttTracefile), 10);
+    g_datalist_id_set_data_full(&trace->tracefiles, name,
+            group, ltt_tracefile_group_destroy);
+    mdata = allocate_marker_data();
+    if (!mdata)
+      g_error("Error in allocating marker data");
+
+    group = g_array_set_size(group, num+1);
+    for (i = 0; i < group->len; i++)
+      g_array_index (group, LttTracefile, i).mdata = mdata;
+    return 1;
+  }
+  if ( num+1 > group->len )
+  {
+    mdata = g_array_index (group, LttTracefile, 0).mdata;
+    int old_num = group->len;
+    group = g_array_set_size(group, num+1);
+    for (i = old_num; i < group->len; i++)
+      g_array_index (group, LttTracefile, i).mdata = mdata;
+  }
+  if( name != LTT_TRACEFILE_NAME_METADATA ) {
+    update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+  }
+  return 0;
+}

  /* Open each tracefile under a specific directory. Put them in a
   * GData : permits to access them using their tracefile group pathname.
@@ -579,8 +789,7 @@ static __attribute__ ((__unused__)) gboolean 
ltt_tracefile_group_has_cpu_online(
   *
   * A tracefile group is simply an array where all the per cpu 
tracefiles sit.
   */
-
-static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar 
*relative_path)
+int open_tracefiles(LttTrace *trace, gchar *root_path, gchar 
*relative_path)
  {
    DIR *dir = opendir(root_path);
    struct dirent *entry;
@@ -634,6 +843,19 @@ static int open_tracefiles(LttTrace *trace, gchar 
*root_path, gchar *relative_pa
      if(S_ISDIR(stat_buf.st_mode)) {

        g_debug("Entering subdirectory...\n");
+      //Monitor the directory if we are in a streaming mode
+#ifdef HAS_INOTIFY
+      if(trace->streaming_is_enable == TRUE)
+      {
+       trace->inotify_watch_array.elem = 
realloc(trace->inotify_watch_array.elem,
+           ++trace->inotify_watch_array.num * sizeof(struct 
inotify_watch));
+
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd = 
inotify_add_watch(trace->inotify_fd, path, IN_CREATE | IN_OPEN | 
IN_CLOSE_WRITE);
+       
strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel, 
path);
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name = 
g_quark_from_string("");
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num = -1;
+      }
+#endif
        ret = open_tracefiles(trace, path, rel_path);
        if(ret < 0) continue;
      } else if(S_ISREG(stat_buf.st_mode)) {
@@ -647,12 +869,29 @@ static int open_tracefiles(LttTrace *trace, gchar 
*root_path, gchar *relative_pa
        creation = 0;
        if(get_tracefile_name_number(rel_path, &name, &num, &tid, &pgid, 
&creation))
          continue; /* invalid name */
-
+#ifdef HAS_INOTIFY
+     //Monitor the directory if we are in a streaming mode
+     if(trace->streaming_is_enable == TRUE)
+     {
+       trace->inotify_watch_array.elem = 
realloc(trace->inotify_watch_array.elem,
+           ++trace->inotify_watch_array.num * sizeof(struct 
inotify_watch));
+
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd = 
inotify_add_watch(trace->inotify_fd, path, IN_OPEN | IN_MODIFY | IN_CLOSE);
+       
strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel, 
path);
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name = 
name;
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num = num;
+      }
+#endif
        g_debug("Opening file.\n");
        if(ltt_tracefile_open(trace, path, &tmp_tf)) {
          g_info("Error opening tracefile %s", path);
-
-        continue; /* error opening the tracefile : bad magic number ? */
+    if(trace->streaming_is_enable == TRUE)
+    {
+      // Take to account the current file in the Cpu count of the group
+      update_num_cpu(trace, name, num);
+    }
+    continue; /* error opening the tracefile : bad magic number ? */
+
        }

        g_debug("Tracefile name is %s and number is %u",
@@ -678,6 +917,9 @@ static int open_tracefiles(LttTrace *trace, gchar 
*root_path, gchar *relative_pa
            g_error("Error in allocating marker data");
        }

+      if(trace->streaming_is_enable == TRUE && name != 
LTT_TRACEFILE_NAME_METADATA)
+    update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+
        /* Add the per cpu tracefile to the named group */
        unsigned int old_len = group->len;
        if(num+1 > old_len)
@@ -802,8 +1044,7 @@ seek_error:
   *
   * pathname must be the directory of the trace
   */
-
-LttTrace *ltt_trace_open(const gchar *pathname)
+LttTrace *ltt_trace_open(const gchar *pathname, unsigned int flags)
  {
    gchar abs_path[PATH_MAX];
    LttTrace  * t;
@@ -823,6 +1064,8 @@ LttTrace *ltt_trace_open(const gchar *pathname)
    get_absolute_pathname(pathname, abs_path);
    t->pathname = g_quark_from_string(abs_path);

+  t->streaming_is_enable = FALSE;
+
    g_datalist_init(&t->tracefiles);

    /* Test to see if it looks like a trace */
@@ -842,7 +1085,18 @@ LttTrace *ltt_trace_open(const gchar *pathname)
      }
    }
    closedir(dir);
-
+
+  if(flags & TRACE_STREAMING)
+  {
+    t->streaming_is_enable = TRUE;
+#ifdef HAS_INOTIFY
+    t->inotify_fd = inotify_init();
+    fcntl(t->inotify_fd, F_SETFL, O_NONBLOCK);
+    t->inotify_watch_array.elem = NULL;
+    t->inotify_watch_array.num = 0;
+    t->open_tracefile_counter = 0;
+#endif
+  }
    /* Open all the tracefiles */
    t->start_freq= 0;
    if(open_tracefiles(t, abs_path, "")) {
@@ -858,15 +1112,30 @@ LttTrace *ltt_trace_open(const gchar *pathname)
    }

    /*
-   * Get the trace information for the metadata_0 tracefile.
     * Getting a correct trace start_time and start_tsc is insured by 
the fact
     * that no subbuffers are supposed to be lost in the metadata channel.
     * Therefore, the first subbuffer contains the start_tsc timestamp 
in its
     * buffer header.
     */
    g_assert(group->len > 0);
-  tf = &g_array_index (group, LttTracefile, 0);
-  header = (ltt_subbuffer_header_t *)tf->buffer.head;
+  if(flags & TRACE_STREAMING)
+  {
+    //Get the trace information from the metadata tracefile.
+    for(i = 0; i < group->len; i++)
+    {
+      tf = &g_array_index (group, LttTracefile, i);
+      header = (ltt_subbuffer_header_t *)tf->buffer.head;
+      if(header != NULL)
+    break;
+    }
+  }
+  else
+  {
+    //Get the trace information from the metadata_0 tracefile.
+    tf = &g_array_index (group, LttTracefile, 0);
+    header = (ltt_subbuffer_header_t *)tf->buffer.head;
+  }
+
    ret = parse_trace_header(header, tf, t);
    g_assert(!ret);

@@ -910,7 +1179,10 @@ alloc_error:
   */
  LttTrace *ltt_trace_copy(LttTrace *self)
  {
-  return ltt_trace_open(g_quark_to_string(self->pathname));
+  unsigned int flag = NO_FLAG;
+  if(self->streaming_is_enable)
+    flag |= TRACE_STREAMING;
+  return ltt_trace_open(g_quark_to_string(self->pathname), flag);
  }

  /*
-- 
1.7.0.4

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.casi.polymtl.ca/pipermail/lttng-dev/attachments/20100827/7b056cd0/attachment-0003.htm>


More information about the lttng-dev mailing list