[ltt-dev] [PATCH 1/2] Add the support for streaming analysis in the libltttraceread library

Oussama El Mfadli oussama.el-mfadli at polymtl.ca
Fri Oct 1 20:56:47 EDT 2010


Hi,
thanks for you feed-back Mathieu. I made the modifications that you
requested.

The patch is following:

- Modification of the opening of the tracefile
    and the way some informations are retreived
  - Add some implementations to append the new arriving blocks into the
index
---
 ltt/ltt-private.h |    3 +
 ltt/trace.h       |   97 +++++++++++++++++-
 ltt/tracefile.c   |  296
+++++++++++++++++++++++++++++++++++++++++++++++------
 3 files changed, 364 insertions(+), 32 deletions(-)

diff --git a/ltt/ltt-private.h b/ltt/ltt-private.h
index 65d73d1..9f23690 100644
--- a/ltt/ltt-private.h
+++ b/ltt/ltt-private.h
@@ -171,6 +171,9 @@ struct LttTracefile {

   /* Current block */
   LttBuffer buffer;                  //current buffer
+
+  /* For streaming usage: in case the tracefile is currently empty */
+  gboolean streaming_is_enabled;
 };

 /* The characteristics of the system on which the trace was obtained
diff --git a/ltt/trace.h b/ltt/trace.h
index e16c66f..a1503d3 100644
--- a/ltt/trace.h
+++ b/ltt/trace.h
@@ -24,6 +24,35 @@
 #include <ltt/ltt-private.h>
 #include <stdint.h>
 #include <glib.h>
+#include <sys/stat.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include <linux/version.h>
+
+//Options used to open a trace
+#define NO_FLAG 0
+#define TRACE_STREAMING (1 << 0)
+
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14)
+#include <sys/inotify.h>
+
+#define HAS_INOTIFY
+#else
+
+#undef HAS_INOTIFY
+#endif
+struct inotify_watch {
+ int wd;
+ char path_channel[PATH_MAX];
+ GQuark name;
+ guint num;
+};
+
+struct inotify_watch_array {
+ struct inotify_watch *elem;
+ int num;
+};

 struct LttTrace {
   GQuark pathname;                          //the pathname of the trace
@@ -47,7 +76,70 @@ struct LttTrace {
   LttTime   start_time_from_tsc;

   GData     *tracefiles;                    //tracefiles groups
+
+  // Keep a inotify file descriptor array to detect the changes in the
files
+  struct inotify_watch_array inotify_watch_array;
+  int inotify_fd;
+  int open_tracefile_counter;
+
+  gboolean streaming_is_enable;
 };
+#ifndef HAS_INOTIFY
+static inline int ltt_tracefile_check_size(struct stat* lTDFStat)
+{
+   if(tf->file_size < lTDFStat->st_size)
+   {
+     return 0;
+   }
+   return 1;
+}
+
+static inline int add_directory_to_watch_array(LttTrace *trace)
+{
+   return 0;
+}
+
+static inline int add_tracefile_to_watch_array(LttTrace *trace)
+{
+  return 0;
+}
+#else
+static inline int ltt_tracefile_check_size(struct stat* lTDFStat)
+{
+   return 0;
+}
+
+static inline int add_directory_to_watch_array(LttTrace *trace, const
gchar* path)
+{
+  if(trace->streaming_is_enable == TRUE)
+  {
+    trace->inotify_watch_array.elem =
realloc(trace->inotify_watch_array.elem,
+     ++trace->inotify_watch_array.num * sizeof(struct inotify_watch));
+
+    trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd =
inotify_add_watch(trace->inotify_fd, path, IN_CREATE | IN_OPEN |
IN_CLOSE_WRITE);
+
 strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel,
path);
+    trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name
= g_quark_from_string("");
+    trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num =
-1;
+    return 1;
+  }
+  return 0;
+}
+
+static inline int add_tracefile_to_watch_array(LttTrace *trace, const
gchar* path, GQuark name, guint num)
+{
+   if(trace->streaming_is_enable == TRUE)
+   {
+      trace->inotify_watch_array.elem =
realloc(trace->inotify_watch_array.elem,
+       ++trace->inotify_watch_array.num * sizeof(struct inotify_watch));
+
+      trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd
= inotify_add_watch(trace->inotify_fd, path, IN_OPEN | IN_MODIFY |
IN_CLOSE);
+
 strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel,
path);
+
 trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name =
name;
+      trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num
= num;
+    }
+  return 0;
+}
+#endif

 static inline guint ltt_trace_get_num_cpu(LttTrace *t)
 {
@@ -65,8 +157,9 @@ static inline guint ltt_trace_get_num_cpu(LttTrace *t)

    */

-LttTrace *ltt_trace_open(const gchar *pathname);
-
+LttTrace *ltt_trace_open(const gchar *pathname, unsigned int flags);
+int open_tracefiles(LttTrace *trace, gchar *root_path,
+    gchar *relative_path);
 /* copy reopens a trace
  *
  * return value NULL if error while opening the trace
diff --git a/ltt/tracefile.c b/ltt/tracefile.c
index 0d8a248..607ca7c 100644
--- a/ltt/tracefile.c
+++ b/ltt/tracefile.c
@@ -99,8 +99,6 @@ static guint64 cycles_2_ns(LttTracefile *tf, guint64
cycles);
 /* go to the next event */
 static int ltt_seek_next_event(LttTracefile *tf);

-static int open_tracefiles(LttTrace *trace, gchar *root_path,
-    gchar *relative_path);
 static int ltt_process_metadata_tracefile(LttTracefile *tf);
 static void ltt_tracefile_time_span_get(LttTracefile *tf,
                                         LttTime *start, LttTime *end);
@@ -211,18 +209,14 @@ int get_block_offset_size(LttTracefile *tf, guint
block_num,
   return 0;
 }

-int ltt_trace_create_block_index(LttTracefile *tf)
-{
-  int page_size = getpagesize();
-  uint64_t offset = 0;
-  unsigned long i = 0;
-  unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
-
-  tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
-                                    DEFAULT_N_BLOCKS);

-  g_assert(tf->buf_index->len == i);
+static int ltt_trace_update_block_index(LttTracefile *tf, uint64_t offset,
unsigned long firstBlock)
+{
+      int i = firstBlock;
+      int page_size = getpagesize();
+      unsigned int header_map_size =
PAGE_ALIGN(ltt_subbuffer_header_size());

+    g_assert(tf->buf_index->len == i);
   while (offset < tf->file_size) {
     ltt_subbuffer_header_t *header;
     uint64_t *off;
@@ -255,6 +249,48 @@ int ltt_trace_create_block_index(LttTracefile *tf)
   return 0;
 }

+/* parse the new information from the file and reajust the number of
blocks.
+ *
+ * Return value : 0 success, -1 error
+ */
+int ltt_trace_continue_block_index(LttTracefile *tf)
+{
+  int ret;
+  uint64_t offset;
+  uint32_t last_block_size;
+  unsigned long i = tf->num_blocks;
+  int page_size = getpagesize();
+  unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
+
+  get_block_offset_size(tf, tf->num_blocks-1, &offset, &last_block_size);
+
+  ltt_subbuffer_header_t *header_tmp = mmap(0, header_map_size, PROT_READ,
+                  MAP_PRIVATE, tf->fd, (off_t)offset);
+  if(header_tmp == MAP_FAILED) {
+    perror("Error in allocating memory for buffer of tracefile");
+    return -1;
+  }
+
+  /* read len, offset += len */
+  offset += ltt_get_uint32(LTT_GET_BO(tf), &header_tmp->sb_size);
+
+  ret = ltt_trace_update_block_index(tf, offset, i);
+}
+
+int ltt_trace_create_block_index(LttTracefile *tf)
+{
+  int ret;
+  uint64_t offset = 0;
+  unsigned long i = 0;
+
+  tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
+                                    DEFAULT_N_BLOCKS);
+  if(!tf->buf_index)
+      return -1;
+  ret = ltt_trace_update_block_index(tf, offset, i);
+  return ret;
+}
+
 /*****************************************************************************
  *Function name
  *    ltt_tracefile_open : open a trace file, construct a LttTracefile
@@ -271,6 +307,7 @@ static gint ltt_tracefile_open(LttTrace *t, gchar *
fileName, LttTracefile *tf)
   struct stat    lTDFStat;    /* Trace data file status */
   ltt_subbuffer_header_t *header;
   int page_size = getpagesize();
+  tf->streaming_is_enabled = FALSE;

   //open the file
   tf->long_name = g_quark_from_string(fileName);
@@ -291,10 +328,31 @@ static gint ltt_tracefile_open(LttTrace *t, gchar *
fileName, LttTracefile *tf)
   // Is the file large enough to contain a trace
   if(lTDFStat.st_size <
       (off_t)(ltt_subbuffer_header_size())){
-    g_print("The input data file %s does not contain a trace\n", fileName);
-    goto close_file;
+    if(t->streaming_is_enable == FALSE)
+    {
+      g_print("The input data file %s does not contain a trace\n",
fileName);
+      goto close_file;
+    }
+    else
+    {
+      //We are in a streaming mode, so the file can be empty at the start
of the analysis
+      tf->streaming_is_enabled = TRUE;
+      tf->file_size = lTDFStat.st_size;
+      tf->events_lost = 0;
+      tf->subbuf_corrupt = 0;
+
+      tf->buffer.head = NULL;
+      return -1;
+    }
   }

+  tf->buffer.head = NULL;
+
+  //store the size of the file
+  tf->file_size = lTDFStat.st_size;
+  tf->events_lost = 0;
+  tf->subbuf_corrupt = 0;
+
   /* Temporarily map the buffer start header to get trace information */
   /* Multiple of pages aligned head */
   tf->buffer.head = mmap(0,
@@ -312,11 +370,6 @@ static gint ltt_tracefile_open(LttTrace *t, gchar *
fileName, LttTracefile *tf)
     g_warning("parse_trace_header error");
     goto unmap_file;
   }
-
-  //store the size of the file
-  tf->file_size = lTDFStat.st_size;
-  tf->events_lost = 0;
-  tf->subbuf_corrupt = 0;

   if(munmap(tf->buffer.head,
         PAGE_ALIGN(ltt_subbuffer_header_size()))) {
@@ -361,6 +414,103 @@ end:
   return -1;
 }

+/*****************************************************************************
+ *Function name
+ *    ltt_tracefile_update : Update the informations concerning a tracefile
+ *      Must be called periodically to update trace file and file size
+      information.
+ *Input params
+ *    tf                  : the tracefile
+ *Return value
+ *                       : 0 for success, -1 otherwise.
+
****************************************************************************/
+int ltt_tracefile_update(LttTracefile *tf, LttTrace *t)
+{
+    struct stat    lTDFStat;    /* Trace data file status */
+    int page_size = getpagesize();
+    if(fstat(tf->fd, &lTDFStat) < 0){
+      perror("error in getting the tracefile informations.");
+    }
+
+   if(!ltt_tracefile_check_size(&lTDFStat))
+   {
+    //Ajust the size of the file
+    tf->file_size = lTDFStat.st_size;
+
+    if(tf->streaming_is_enabled)
+    {
+      ltt_subbuffer_header_t *header;
+
+      /* Temporarily map the buffer start header to get trace information
*/
+      /* Multiple of pages aligned head */
+      tf->buffer.head = mmap(0,
+   PAGE_ALIGN(ltt_subbuffer_header_size()), PROT_READ,
+   MAP_PRIVATE, tf->fd, 0);
+      if(tf->buffer.head == MAP_FAILED) {
+ perror("Error in allocating memory for buffer of tracefile");
+ goto unmap_file;
+      }
+      g_assert( ( (gulong)tf->buffer.head&(8-1) ) == 0); // make sure it's
aligned.
+
+      header = (ltt_subbuffer_header_t *)tf->buffer.head;
+
+      if(parse_trace_header(header, tf, t))
+      {
+ g_warning("parse_trace_header error");
+ goto unmap_file;
+      }
+
+      if(munmap(tf->buffer.head,
+ PAGE_ALIGN(ltt_subbuffer_header_size())))
+      {
+ g_warning("unmap size : %zu\n",
+     PAGE_ALIGN(ltt_subbuffer_header_size()));
+ perror("munmap error");
+ g_assert(0);
+      }
+      tf->buffer.head = NULL;
+
+      /* Create block index */
+      ltt_trace_create_block_index(tf);
+
+      //read the first block
+      if(map_block(tf,0))
+      {
+ perror("Cannot map block for tracefile");
+ goto unmap_file;
+      }
+      tf->streaming_is_enabled = FALSE;
+    }
+    else
+    {
+      //Retreive the new subbuffers and index them
+      ltt_trace_continue_block_index(tf);
+    }
+
+    //Update the metadata table if applicable
+    if(tf->name == LTT_TRACEFILE_NAME_METADATA)
+    {
+      map_block(tf, tf->buffer.index + 1);
+      ltt_process_metadata_tracefile(tf);
+    }
+    ltt_tracefile_read(tf);
+
+    return 1;
+   }
+
+    return 0;
+//       /* Error */
+    unmap_file:
+      if(munmap(tf->buffer.head,
+     PAGE_ALIGN(ltt_subbuffer_header_size()))) {
+ g_warning("unmap size : %zu\n",
+     PAGE_ALIGN(ltt_subbuffer_header_size()));
+ perror("munmap error");
+ g_assert(0);
+      }
+      return -1;
+}
+

 /*****************************************************************************
  *Function name
@@ -576,7 +726,51 @@ static __attribute__ ((__unused__)) gboolean
ltt_tracefile_group_has_cpu_online(
   }
   return 0;
 }
+/****************************************************************************
+ * update_num_cpu
+ *
+ * Update the number of tracefile in a "name" group
+ * This function is normally called when a tracefile is detected but cannot
be parsed at
+ * this point. The group size will account for this file.
+ *
+ * return 1 if a new group is created
+ * return 0 otherwise
+ *
+
****************************************************************************/
+int update_num_cpu(LttTrace *trace, GQuark name, guint num)
+{
+  struct marker_data *mdata;
+  GArray *group;
+  int i;

+  group = g_datalist_id_get_data(&trace->tracefiles, name);
+  if (group == NULL)
+  {
+    group = g_array_sized_new (FALSE, TRUE, sizeof(LttTracefile), 10);
+    g_datalist_id_set_data_full(&trace->tracefiles, name,
+ group, ltt_tracefile_group_destroy);
+    mdata = allocate_marker_data();
+    if (!mdata)
+      g_error("Error in allocating marker data");
+
+    group = g_array_set_size(group, num+1);
+    for (i = 0; i < group->len; i++)
+      g_array_index (group, LttTracefile, i).mdata = mdata;
+    return 1;
+  }
+  if(num+1 > group->len)
+  {
+    mdata = g_array_index (group, LttTracefile, 0).mdata;
+    int old_num = group->len;
+    group = g_array_set_size(group, num+1);
+    for (i = old_num; i < group->len; i++)
+      g_array_index (group, LttTracefile, i).mdata = mdata;
+  }
+  if( name != LTT_TRACEFILE_NAME_METADATA ) {
+    update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+  }
+  return 0;
+}

 /* Open each tracefile under a specific directory. Put them in a
  * GData : permits to access them using their tracefile group pathname.
@@ -588,8 +782,7 @@ static __attribute__ ((__unused__)) gboolean
ltt_tracefile_group_has_cpu_online(
  *
  * A tracefile group is simply an array where all the per cpu tracefiles
sit.
  */
-
-static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar
*relative_path)
+int open_tracefiles(LttTrace *trace, gchar *root_path, gchar
*relative_path)
 {
   DIR *dir = opendir(root_path);
   struct dirent *entry;
@@ -643,6 +836,9 @@ static int open_tracefiles(LttTrace *trace, gchar
*root_path, gchar *relative_pa
     if(S_ISDIR(stat_buf.st_mode)) {

       g_debug("Entering subdirectory...\n");
+      //Monitor the directory if we are in a streaming mode
+      add_directory_to_watch_array(trace, path);
+
       ret = open_tracefiles(trace, path, rel_path);
       if(ret < 0) continue;
     } else if(S_ISREG(stat_buf.st_mode)) {
@@ -656,12 +852,19 @@ static int open_tracefiles(LttTrace *trace, gchar
*root_path, gchar *relative_pa
       creation = 0;
       if(get_tracefile_name_number(rel_path, &name, &num, &tid, &pgid,
&creation))
         continue; /* invalid name */
-
+
+      //Monitor the directory if we are in a streaming mode
+      add_tracefile_to_watch_array(trace, path, name, num);
       g_debug("Opening file.\n");
       if(ltt_tracefile_open(trace, path, &tmp_tf)) {
         g_info("Error opening tracefile %s", path);
+ if(trace->streaming_is_enable == TRUE)
+ {
+   // Take to account the current file in the Cpu count of the group
+   update_num_cpu(trace, name, num);
+ }
+ continue; /* error opening the tracefile : bad magic number ? */

-        continue; /* error opening the tracefile : bad magic number ? */
       }

       g_debug("Tracefile name is %s and number is %u",
@@ -687,6 +890,9 @@ static int open_tracefiles(LttTrace *trace, gchar
*root_path, gchar *relative_pa
           g_error("Error in allocating marker data");
       }

+      if(trace->streaming_is_enable == TRUE && name !=
LTT_TRACEFILE_NAME_METADATA)
+ update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+
       /* Add the per cpu tracefile to the named group */
       unsigned int old_len = group->len;
       if(num+1 > old_len)
@@ -811,8 +1017,7 @@ seek_error:
  *
  * pathname must be the directory of the trace
  */
-
-LttTrace *ltt_trace_open(const gchar *pathname)
+LttTrace *ltt_trace_open(const gchar *pathname, unsigned int flags)
 {
   gchar abs_path[PATH_MAX];
   LttTrace  * t;
@@ -832,6 +1037,8 @@ LttTrace *ltt_trace_open(const gchar *pathname)
   get_absolute_pathname(pathname, abs_path);
   t->pathname = g_quark_from_string(abs_path);

+  t->streaming_is_enable = FALSE;
+
   g_datalist_init(&t->tracefiles);

   /* Test to see if it looks like a trace */
@@ -851,7 +1058,18 @@ LttTrace *ltt_trace_open(const gchar *pathname)
     }
   }
   closedir(dir);
-
+
+  if(flags & TRACE_STREAMING)
+  {
+    t->streaming_is_enable = TRUE;
+#ifdef HAS_INOTIFY
+    t->inotify_fd = inotify_init();
+    fcntl(t->inotify_fd, F_SETFL, O_NONBLOCK);
+    t->inotify_watch_array.elem = NULL;
+    t->inotify_watch_array.num = 0;
+    t->open_tracefile_counter = 0;
+#endif
+  }
   /* Open all the tracefiles */
   t->start_freq= 0;
   if(open_tracefiles(t, abs_path, "")) {
@@ -867,15 +1085,30 @@ LttTrace *ltt_trace_open(const gchar *pathname)
   }

   /*
-   * Get the trace information for the metadata_0 tracefile.
    * Getting a correct trace start_time and start_tsc is insured by the
fact
    * that no subbuffers are supposed to be lost in the metadata channel.
    * Therefore, the first subbuffer contains the start_tsc timestamp in its
    * buffer header.
    */
   g_assert(group->len > 0);
-  tf = &g_array_index (group, LttTracefile, 0);
-  header = (ltt_subbuffer_header_t *)tf->buffer.head;
+  if(flags & TRACE_STREAMING)
+  {
+    //Get the trace information from the metadata tracefile.
+    for(i = 0; i < group->len; i++)
+    {
+      tf = &g_array_index (group, LttTracefile, i);
+      header = (ltt_subbuffer_header_t *)tf->buffer.head;
+      if(header != NULL)
+ break;
+    }
+  }
+  else
+  {
+    //Get the trace information from the metadata_0 tracefile.
+    tf = &g_array_index (group, LttTracefile, 0);
+    header = (ltt_subbuffer_header_t *)tf->buffer.head;
+  }
+
   ret = parse_trace_header(header, tf, t);
   g_assert(!ret);

@@ -919,7 +1152,10 @@ alloc_error:
  */
 LttTrace *ltt_trace_copy(LttTrace *self)
 {
-  return ltt_trace_open(g_quark_to_string(self->pathname));
+  unsigned int flag = NO_FLAG;
+  if(self->streaming_is_enable)
+    flag |= TRACE_STREAMING;
+  return ltt_trace_open(g_quark_to_string(self->pathname), flag);
 }

 /*
-- 
1.7.0.4
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.casi.polymtl.ca/pipermail/lttng-dev/attachments/20101001/9862bd19/attachment-0001.htm>


More information about the lttng-dev mailing list