[ltt-dev] [PATCH 1/2] Add the support for streaming analysis in the libltttraceread library

Oussama El Mfadli oussama.el-mfadli at polymtl.ca
Fri Aug 27 13:09:18 EDT 2010


Add the support for streaming analysis in the libltttraceread library
      - Modify how the tracefiles are opened and how some of their
        information is retrieved
      - Add functions that append newly arriving blocks to the index

---
  ltt/ltt-private.h |    3 +
  ltt/trace.h       |   22 +++-
  ltt/tracefile.c   |  366 
+++++++++++++++++++++++++++++++++++++++++++++++-----
  3 files changed, 354 insertions(+), 37 deletions(-)

diff --git a/ltt/ltt-private.h b/ltt/ltt-private.h
index 65d73d1..a0bcad3 100644
--- a/ltt/ltt-private.h
+++ b/ltt/ltt-private.h
@@ -171,6 +171,9 @@ struct LttTracefile {

    /* Current block */
    LttBuffer buffer;                  //current buffer
+
+  /* For streaming usage: in case the tracefile is currently empty */
+  gboolean Streaming_isEmpty;
  };

  /* The characteristics of the system on which the trace was obtained
diff --git a/ltt/trace.h b/ltt/trace.h
index e16c66f..abbb254 100644
--- a/ltt/trace.h
+++ b/ltt/trace.h
@@ -25,6 +25,20 @@
  #include <stdint.h>
  #include <glib.h>

+#include <sys/inotify.h>
+
+struct inotify_watch {             /* one inotify watch on a trace directory or tracefile */
+    int wd;                        /* value returned by inotify_add_watch() for this path */
+    char path_channel[PATH_MAX];   /* copy of the path handed to inotify_add_watch() */
+    GQuark name;                   /* tracefile group name; quark of "" for a directory */
+    guint num;                     /* tracefile number; directories store -1, which wraps (unsigned) */
+};
+
+struct inotify_watch_array {       /* list of the watches registered for a trace */
+    struct inotify_watch *elem;    /* grown with realloc(); starts as NULL in streaming mode */
+    int num;                       /* number of entries in elem */
+};
+
  struct LttTrace {
    GQuark pathname;                          //the pathname of the trace
    //LttSystemDescription * system_description;//system description
@@ -47,6 +61,11 @@ struct LttTrace {
    LttTime   start_time_from_tsc;

    GData     *tracefiles;                    //tracefiles groups
+
+  // Keep a inotify file descriptor array to detect the changes in the 
files
+  struct inotify_watch_array inotify_watch_array;
+  int inotify_fd;
+  int open_tracefile_counter;
  };

  static inline guint ltt_trace_get_num_cpu(LttTrace *t)
@@ -66,7 +85,8 @@ static inline guint ltt_trace_get_num_cpu(LttTrace *t)
     */

  LttTrace *ltt_trace_open(const gchar *pathname);
-
+LttTrace *ltt_trace_open_streaming(const gchar *pathname, gboolean 
isStreaming);
+void ltt_tracefile_add_streaming( LttTrace *t, gchar *root_path, gchar* 
rel_path);
  /* copy reopens a trace
   *
   * return value NULL if error while opening the trace
diff --git a/ltt/tracefile.c b/ltt/tracefile.c
index 54da747..2d03ed3 100644
--- a/ltt/tracefile.c
+++ b/ltt/tracefile.c
@@ -99,6 +99,8 @@ static int ltt_seek_next_event(LttTracefile *tf);

  static int open_tracefiles(LttTrace *trace, gchar *root_path,
      gchar *relative_path);
+static int open_tracefiles_streaming(LttTrace *trace, gchar *root_path,
+    gchar *relative_path, gboolean isStreaming);
  static int ltt_process_metadata_tracefile(LttTracefile *tf);
  static void ltt_tracefile_time_span_get(LttTracefile *tf,
                                          LttTime *start, LttTime *end);
@@ -209,26 +211,22 @@ int get_block_offset_size(LttTracefile *tf, guint 
block_num,
    return 0;
  }

-int ltt_trace_create_block_index(LttTracefile *tf)
-{
-  int page_size = getpagesize();
-  uint64_t offset = 0;
-  unsigned long i = 0;
-  unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());

-  tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
-                                    DEFAULT_N_BLOCKS);
-
-  g_assert(tf->buf_index->len == i);
-
-  while (offset < tf->file_size) {
+int ltt_trace_process_block_index(LttTracefile *tf, uint64_t offset, 
unsigned long firstBlock)
+{
+      int i = firstBlock;
+      int page_size = getpagesize();
+      unsigned int header_map_size = 
PAGE_ALIGN(ltt_subbuffer_header_size());
+
+    g_assert(tf->buf_index->len == i);
+  while (offset < ltt_get_uint32(LTT_GET_BO(tf), &tf->file_size)) {
      ltt_subbuffer_header_t *header;
      uint64_t *off;

      tf->buf_index = g_array_set_size(tf->buf_index, i + 1);
      off = &g_array_index(tf->buf_index, uint64_t, i);
      *off = offset;
-
+
      /* map block header */
      header = mmap(0, header_map_size, PROT_READ,
                    MAP_PRIVATE, tf->fd, (off_t)offset);
@@ -253,22 +251,65 @@ int ltt_trace_create_block_index(LttTracefile *tf)
    return 0;
  }

+/* Index the blocks appended to the file since the last indexing pass.
+ * The caller is expected to have refreshed tf->file_size beforehand.
+ * Return value : 0 success, -1 error (also when no block is indexed yet)
+ */
+int ltt_trace_update_block_index(LttTracefile *tf)
+{
+  uint64_t offset;
+  uint32_t last_block_size;
+  unsigned long i = tf->num_blocks;
+  int page_size = getpagesize();  /* presumably read by PAGE_ALIGN -- TODO confirm */
+  unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
+
+  /* Locate the last indexed block; i == 0 would underflow the block number. */
+  if(i == 0 || get_block_offset_size(tf, i-1, &offset, &last_block_size))
+    return -1;
+  ltt_subbuffer_header_t *header_tmp = mmap(0, header_map_size, PROT_READ,
+                  MAP_PRIVATE, tf->fd, (off_t)offset);
+  if(header_tmp == MAP_FAILED) {
+    perror("Error in allocating memory for buffer of tracefile");
+    return -1;
+  }
+
+  /* The first new block starts right after the last indexed one. */
+  offset += ltt_get_uint32(LTT_GET_BO(tf), &header_tmp->sb_size);
+  munmap(header_tmp, header_map_size);  /* temporary mapping, release it */
+  return ltt_trace_process_block_index(tf, offset, i);
+}
+
+/* Build the block index of a tracefile from scratch, starting at offset 0. */
+int ltt_trace_create_block_index(LttTracefile *tf)
+{
+  const uint64_t first_offset = 0;
+  const unsigned long first_block = 0;
+
+  /* New index array, created with room reserved for DEFAULT_N_BLOCKS. */
+  tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
+                                    DEFAULT_N_BLOCKS);
+  return ltt_trace_process_block_index(tf, first_offset, first_block);
+}
+
  /*****************************************************************************
   *Function name
- *    ltt_tracefile_open : open a trace file, construct a LttTracefile
+ *    ltt_tracefile_open_streaming : open a trace file, construct a 
LttTracefile
+ *                     The trace file can be a streaming
   *Input params
   *    t                  : the trace containing the tracefile
   *    fileName           : path name of the trace file
   *    tf                 : the tracefile structure
+ *    isStreaming     : flag to open in a streaming mode (TRUE) or not 
(FALSE).
   *Return value
   *                       : 0 for success, -1 otherwise.
   
****************************************************************************/ 

-
-static gint ltt_tracefile_open(LttTrace *t, gchar * fileName, 
LttTracefile *tf)
+static gint ltt_tracefile_open_streaming(LttTrace *t, gchar * fileName, 
LttTracefile *tf, gboolean isStreaming)
  {
    struct stat    lTDFStat;    /* Trace data file status */
    ltt_subbuffer_header_t *header;
    int page_size = getpagesize();
+
+  tf->Streaming_isEmpty = FALSE;

    //open the file
    tf->long_name = g_quark_from_string(fileName);
@@ -289,10 +330,31 @@ static gint ltt_tracefile_open(LttTrace *t, gchar 
* fileName, LttTracefile *tf)
    // Is the file large enough to contain a trace
    if(lTDFStat.st_size <
        (off_t)(ltt_subbuffer_header_size())){
-    g_print("The input data file %s does not contain a trace\n", fileName);
-    goto close_file;
+    if(isStreaming == FALSE)
+    {
+      g_print("The input data file %s does not contain a trace\n", 
fileName);
+      goto close_file;
+    }
+    else
+    {
+      //We are in a streaming mode, so the file can be empty at the 
start of the analysis
+      tf->Streaming_isEmpty = TRUE;
+      tf->file_size = lTDFStat.st_size;
+      tf->events_lost = 0;
+      tf->subbuf_corrupt = 0;
+
+      tf->buffer.head = NULL;
+      return -1;
+    }
    }

+  tf->buffer.head = NULL;
+
+  //store the size of the file
+  tf->file_size = lTDFStat.st_size;
+  tf->events_lost = 0;
+  tf->subbuf_corrupt = 0;
+
    /* Temporarily map the buffer start header to get trace information */
    /* Multiple of pages aligned head */
    tf->buffer.head = mmap(0,
@@ -310,11 +372,6 @@ static gint ltt_tracefile_open(LttTrace *t, gchar * 
fileName, LttTracefile *tf)
      g_warning("parse_trace_header error");
      goto unmap_file;
    }
-
-  //store the size of the file
-  tf->file_size = lTDFStat.st_size;
-  tf->events_lost = 0;
-  tf->subbuf_corrupt = 0;

    if(munmap(tf->buffer.head,
          PAGE_ALIGN(ltt_subbuffer_header_size()))) {
@@ -353,6 +410,114 @@ end:
    return -1;
  }

+/*****************************************************************************
+ *Function name
+ *    ltt_tracefile_open : open a trace file in non-streaming mode (forwards
+ *                         to ltt_tracefile_open_streaming, isStreaming FALSE)
+ *Input params
+ *    t, fileName, tf    : trace, path name of the trace file and tracefile
+ *                         structure; all passed through unchanged
+ *Return value
+ *                       : result of ltt_tracefile_open_streaming.
+ ****************************************************************************/

+static gint ltt_tracefile_open(LttTrace *t, gchar * fileName, 
LttTracefile *tf)
+{
+    return ltt_tracefile_open_streaming(t, fileName, tf, FALSE);
+}
+
+/*****************************************************************************
+ *Function name
+ *    ltt_tracefile_update : Refresh a tracefile opened in streaming mode:
+ *                 re-read its size and index the blocks that arrived since
+ *                 the last call. A file that was empty when opened gets its
+ *                 header parsed and its first block mapped once it has data.
+ *                 Must be called periodically to see the latest information.
+ *Input params
+ *    tf                 : the tracefile
+ *    t                  : the trace containing the tracefile
+ *Return value
+ *                       : 0 for success (also when the file is still too
+ *                         small to hold a header), -1 otherwise. When the
+ *                         first open of the data fails, the file descriptor
+ *                         is closed and the block index is freed.
+ ****************************************************************************/
+int ltt_tracefile_update(LttTracefile *tf, LttTrace *t)
+{
+    struct stat    lTDFStat;    /* Trace data file status */
+    int page_size = getpagesize();  /* presumably read by PAGE_ALIGN -- TODO confirm */
+
+    /* Without a valid stat result the size stored below would be garbage. */
+    if(fstat(tf->fd, &lTDFStat) < 0){
+      perror("error in getting the tracefile informations.");
+      return -1;
+    }
+
+    //Adjust the size of the file
+    tf->file_size = lTDFStat.st_size;
+
+    if(tf->Streaming_isEmpty)
+    {
+      ltt_subbuffer_header_t *header;
+
+      /* Not even one sub-buffer header yet: reading the mapping below would
+       * fault, so report "nothing new" and let the caller try again later. */
+      if(lTDFStat.st_size < (off_t)(ltt_subbuffer_header_size()))
+        return 0;
+
+      /* Temporarily map the buffer start header to get trace information */
+      /* Multiple of pages aligned head */
+      tf->buffer.head = mmap(0,
+          PAGE_ALIGN(ltt_subbuffer_header_size()), PROT_READ,
+          MAP_PRIVATE, tf->fd, 0);
+      if(tf->buffer.head == MAP_FAILED) {
+        perror("Error in allocating memory for buffer of tracefile");
+        tf->buffer.head = NULL;   /* do not leave MAP_FAILED behind */
+        goto close_file;
+      }
+      g_assert( ( (gulong)tf->buffer.head&(8-1) ) == 0); // make sure it's aligned.
+
+      header = (ltt_subbuffer_header_t *)tf->buffer.head;
+
+      if(parse_trace_header(header, tf, t)) {
+        g_warning("parse_trace_header error");
+        goto unmap_file;
+      }
+
+      if(munmap(tf->buffer.head,
+          PAGE_ALIGN(ltt_subbuffer_header_size()))) {
+        g_warning("unmap size : %zu\n",
+            PAGE_ALIGN(ltt_subbuffer_header_size()));
+        perror("munmap error");
+        g_assert(0);
+      }
+      tf->buffer.head = NULL;
+
+      /* Create block index */
+      if(ltt_trace_create_block_index(tf))
+        goto close_file;
+
+      //read the first block
+      if(map_block(tf,0)) {
+        perror("Cannot map block for tracefile");
+        goto close_file;
+      }
+      tf->Streaming_isEmpty = FALSE;
+    }
+    else
+    {
+      //Retrieve the new subbuffers and index them
+      /* NOTE(review): the return value is ignored here. */
+      ltt_trace_update_block_index(tf);
+    }
+
+    //Update the metadata table if applicable
+    if(tf->name == LTT_TRACEFILE_NAME_METADATA)
+    {
+      /* NOTE(review): both return values are ignored; confirm what happens
+       * when no block follows the current one. */
+      map_block(tf, tf->buffer.index + 1);
+      ltt_process_metadata_tracefile(tf);
+    }
+    ltt_tracefile_read(tf);
+
+    return 0;
+
+    /* Error */
+    unmap_file:
+      if(munmap(tf->buffer.head,
+          PAGE_ALIGN(ltt_subbuffer_header_size()))) {
+        g_warning("unmap size : %zu\n",
+            PAGE_ALIGN(ltt_subbuffer_header_size()));
+        perror("munmap error");
+        g_assert(0);
+      }
+      tf->buffer.head = NULL;
+    close_file:
+      close(tf->fd);
+      tf->fd = -1;                /* guard against a second close */
+      if (tf->buf_index) {
+        g_array_free(tf->buf_index, TRUE);
+        tf->buf_index = NULL;     /* guard against a second free */
+      }
+      return -1;
+}
+

  /*****************************************************************************
   *Function name
@@ -567,8 +732,77 @@ static __attribute__ ((__unused__)) gboolean 
ltt_tracefile_group_has_cpu_online(
    }
    return 0;
  }
+/****************************************************************************
+ * update_num_cpu
+ *
+ * Update the number of tracefiles of a "name" group.
+ * This function is normally called when a tracefile is detected but cannot
+ * be parsed at this point. The group size will count this file.
+ *
+ * Returns 1 if the group had to be created, 0 otherwise.
+ *
+ * Groups other than the metadata group may also grow the metadata group
+ * so that it covers the same tracefile number.
+ *
+ ****************************************************************************/

-
+/*
+ * Make sure the tracefile group "name" has room for tracefile number "num",
+ * creating the group (with its marker data) when it does not exist yet.
+ * This function is normally called when a tracefile is detected but cannot
+ * be parsed at this point, so that the group size still counts this file.
+ * For every group other than metadata, the metadata group is then grown the
+ * same way, whether "name" was just created or already existed.
+ *
+ * return 1 if a new group is created for "name"
+ * return 0 otherwise
+ */
+int update_num_cpu(LttTrace *trace, GQuark name, guint num)
+{
+  struct marker_data *mdata;
+  GArray *group;
+  guint i;            /* unsigned, like group->len */
+  int created = 0;
+
+  group = g_datalist_id_get_data(&trace->tracefiles, name);
+  if (group == NULL)
+  {
+    group = g_array_sized_new (FALSE, TRUE, sizeof(LttTracefile), 10);
+    g_datalist_id_set_data_full(&trace->tracefiles, name,
+            group, ltt_tracefile_group_destroy);
+    mdata = allocate_marker_data();
+    if (!mdata)
+      g_error("Error in allocating marker data");
+
+    group = g_array_set_size(group, num+1);
+    for (i = 0; i < group->len; i++)
+      g_array_index (group, LttTracefile, i).mdata = mdata;
+    created = 1;
+  }
+  else if ( num+1 > group->len )
+  {
+    /* New slots share the marker data already used by slot 0. */
+    guint old_num = group->len;
+    mdata = g_array_index (group, LttTracefile, 0).mdata;
+    group = g_array_set_size(group, num+1);
+    for (i = old_num; i < group->len; i++)
+      g_array_index (group, LttTracefile, i).mdata = mdata;
+  }
+  /* Previously skipped when the group was new: keep metadata in step too. */
+  if( name != LTT_TRACEFILE_NAME_METADATA ) {
+    update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+  }
+  return created;
+}
+/*****************************************************************************
+ *Function name
+ *    ltt_tracefile_add_streaming: open and add a tracefile to a trace;
+ *                   normally used when a new file is received in a streaming
+ *                   analysis. The status of the open is not reported back.
+ *Input params
+ *    t                  : trace to which the tracefile will be added
+ *    root_path          : path of the trace
+ *    rel_path           : relative path of the tracefile
+ ****************************************************************************/
+void ltt_tracefile_add_streaming( LttTrace *t, gchar *root_path, gchar* 
rel_path)
+{
+      open_tracefiles_streaming(t, root_path, rel_path, TRUE);
+}
  /* Open each tracefile under a specific directory. Put them in a
   * GData : permits to access them using their tracefile group pathname.
   * i.e. access control/modules tracefile group by index :
@@ -579,9 +813,13 @@ static __attribute__ ((__unused__)) gboolean 
ltt_tracefile_group_has_cpu_online(
   *
   * A tracefile group is simply an array where all the per cpu 
tracefiles sit.
   */
-
  static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar 
*relative_path)
  {
+  return open_tracefiles_streaming(trace, root_path, relative_path, FALSE);
+}
+
+static int open_tracefiles_streaming(LttTrace *trace, gchar *root_path, 
gchar *relative_path, gboolean isStreaming)
+{
    DIR *dir = opendir(root_path);
    struct dirent *entry;
    struct stat stat_buf;
@@ -634,7 +872,18 @@ static int open_tracefiles(LttTrace *trace, gchar 
*root_path, gchar *relative_pa
      if(S_ISDIR(stat_buf.st_mode)) {

        g_debug("Entering subdirectory...\n");
-      ret = open_tracefiles(trace, path, rel_path);
+      //Monitor the directory if we are in a streaming mode
+      if(isStreaming == TRUE)
+      {
+       trace->inotify_watch_array.elem = 
realloc(trace->inotify_watch_array.elem,
+           ++trace->inotify_watch_array.num * sizeof(struct 
inotify_watch));
+
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd = 
inotify_add_watch(trace->inotify_fd, path, IN_CREATE | IN_OPEN | 
IN_CLOSE_WRITE);
+       
strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel, 
path);
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name = 
g_quark_from_string("");
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num = -1;
+      }
+      ret = open_tracefiles_streaming(trace, path, rel_path, isStreaming);
        if(ret < 0) continue;
      } else if(S_ISREG(stat_buf.st_mode)) {
        GQuark name;
@@ -647,12 +896,27 @@ static int open_tracefiles(LttTrace *trace, gchar 
*root_path, gchar *relative_pa
        creation = 0;
        if(get_tracefile_name_number(rel_path, &name, &num, &tid, &pgid, 
&creation))
          continue; /* invalid name */
-
+     //Monitor the directory if we are in a streaming mode
+     if(isStreaming == TRUE)
+     {
+       trace->inotify_watch_array.elem = 
realloc(trace->inotify_watch_array.elem,
+           ++trace->inotify_watch_array.num * sizeof(struct 
inotify_watch));
+
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd = 
inotify_add_watch(trace->inotify_fd, path, IN_OPEN | IN_MODIFY | IN_CLOSE);
+       
strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel, 
path);
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name = 
name;
+       
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num = num;
+      }
        g_debug("Opening file.\n");
-      if(ltt_tracefile_open(trace, path, &tmp_tf)) {
+      if(ltt_tracefile_open_streaming(trace, path, &tmp_tf, isStreaming)) {
          g_info("Error opening tracefile %s", path);
-
-        continue; /* error opening the tracefile : bad magic number ? */
+    if(isStreaming == TRUE)
+    {
+      // Take to account the current file in the Cpu count of the group
+      update_num_cpu(trace, name, num);
+    }
+    continue; /* error opening the tracefile : bad magic number ? */
+
        }

        g_debug("Tracefile name is %s and number is %u",
@@ -678,6 +942,9 @@ static int open_tracefiles(LttTrace *trace, gchar 
*root_path, gchar *relative_pa
            g_error("Error in allocating marker data");
        }

+      if(isStreaming == TRUE && name != LTT_TRACEFILE_NAME_METADATA)
+    update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+
        /* Add the per cpu tracefile to the named group */
        unsigned int old_len = group->len;
        if(num+1 > old_len)
@@ -802,9 +1069,13 @@ seek_error:
   *
   * pathname must be the directory of the trace
   */
-
  LttTrace *ltt_trace_open(const gchar *pathname)
  {
+  return ltt_trace_open_streaming(pathname, FALSE);
+}
+
+LttTrace *ltt_trace_open_streaming(const gchar *pathname, gboolean 
isStreaming)
+{
    gchar abs_path[PATH_MAX];
    LttTrace  * t;
    LttTracefile *tf;
@@ -842,10 +1113,18 @@ LttTrace *ltt_trace_open(const gchar *pathname)
      }
    }
    closedir(dir);
-
+
+  if(isStreaming == TRUE)
+  {
+    t->inotify_fd = inotify_init();
+    fcntl(t->inotify_fd, F_SETFL, O_NONBLOCK);
+    t->inotify_watch_array.elem = NULL;
+    t->inotify_watch_array.num = 0;
+    t->open_tracefile_counter = 0;
+  }
    /* Open all the tracefiles */
    t->start_freq= 0;
-  if(open_tracefiles(t, abs_path, "")) {
+  if(open_tracefiles_streaming(t, abs_path, "", isStreaming)) {
      g_warning("Error opening tracefile %s", abs_path);
      goto find_error;
    }
@@ -858,15 +1137,30 @@ LttTrace *ltt_trace_open(const gchar *pathname)
    }

    /*
-   * Get the trace information for the metadata_0 tracefile.
     * Getting a correct trace start_time and start_tsc is insured by 
the fact
     * that no subbuffers are supposed to be lost in the metadata channel.
     * Therefore, the first subbuffer contains the start_tsc timestamp 
in its
     * buffer header.
     */
    g_assert(group->len > 0);
-  tf = &g_array_index (group, LttTracefile, 0);
-  header = (ltt_subbuffer_header_t *)tf->buffer.head;
+  if(isStreaming == TRUE)
+  {
+    //Get the trace information from the metadata tracefile.
+    for(i = 0; i < group->len; i++)
+    {
+      tf = &g_array_index (group, LttTracefile, i);
+      header = (ltt_subbuffer_header_t *)tf->buffer.head;
+      if(header != NULL)
+    break;
+    }
+  }
+  else
+  {
+    //Get the trace information from the metadata_0 tracefile.
+    tf = &g_array_index (group, LttTracefile, 0);
+    header = (ltt_subbuffer_header_t *)tf->buffer.head;
+  }
+
    ret = parse_trace_header(header, tf, t);
    g_assert(!ret);

-- 
1.7.0.4





More information about the lttng-dev mailing list