[ltt-dev] [PATCH 1/2] Add the support for streaming analysis in the libltttraceread library

y at cyberlogic-desktop y at cyberlogic-desktop
Fri Aug 27 12:31:01 EDT 2010


From: Oussama El Mfadli <oussama.el-mfadli at polymtl.ca>

	Add the support for streaming analysis in the libltttraceread library
 	- Modify how a tracefile is opened
 	  and how some of its information is retrieved
 	- Add functions that append newly arrived blocks to the block index

---
 ltt/ltt-private.h |    3 +
 ltt/trace.h       |   22 +++-
 ltt/tracefile.c   |  366 +++++++++++++++++++++++++++++++++++++++++++++++-----
 3 files changed, 354 insertions(+), 37 deletions(-)

diff --git a/ltt/ltt-private.h b/ltt/ltt-private.h
index 65d73d1..a0bcad3 100644
--- a/ltt/ltt-private.h
+++ b/ltt/ltt-private.h
@@ -171,6 +171,9 @@ struct LttTracefile {
 
   /* Current block */
   LttBuffer buffer;                  //current buffer
+  
+  /* For streaming usage: in case the tracefile is currently empty */
+  gboolean Streaming_isEmpty;
 };
 
 /* The characteristics of the system on which the trace was obtained
diff --git a/ltt/trace.h b/ltt/trace.h
index e16c66f..abbb254 100644
--- a/ltt/trace.h
+++ b/ltt/trace.h
@@ -25,6 +25,20 @@
 #include <stdint.h>
 #include <glib.h>
 
+#include <sys/inotify.h>
+
+struct inotify_watch {	/* one inotify watch registered for a trace directory or tracefile */
+	int wd;	/* value returned by inotify_add_watch() for this path */
+	char path_channel[PATH_MAX];	/* copy of the watched path */
+	GQuark name;	/* tracefile name quark; quark of "" when the watch is on a directory */
+	guint num;	/* tracefile number; assigned -1 when the watch is on a directory */
+};
+
+struct inotify_watch_array {	/* growable list of the watches registered on a trace */
+	struct inotify_watch *elem;	/* array of watches, grown one entry at a time with realloc() */
+	int num;	/* number of entries in elem */
+};
+
 struct LttTrace {
   GQuark pathname;                          //the pathname of the trace
   //LttSystemDescription * system_description;//system description 
@@ -47,6 +61,11 @@ struct LttTrace {
   LttTime   start_time_from_tsc;
 
   GData     *tracefiles;                    //tracefiles groups
+  
+  // Keep an inotify watch array and file descriptor to detect changes in the files
+  struct inotify_watch_array inotify_watch_array;
+  int inotify_fd;
+  int open_tracefile_counter;
 };
 
 static inline guint ltt_trace_get_num_cpu(LttTrace *t)
@@ -66,7 +85,8 @@ static inline guint ltt_trace_get_num_cpu(LttTrace *t)
    */
 
 LttTrace *ltt_trace_open(const gchar *pathname);
-
+LttTrace *ltt_trace_open_streaming(const gchar *pathname, gboolean isStreaming);
+void ltt_tracefile_add_streaming( LttTrace *t, gchar *root_path, gchar* rel_path);
 /* copy reopens a trace 
  *
  * return value NULL if error while opening the trace 
diff --git a/ltt/tracefile.c b/ltt/tracefile.c
index 54da747..2d03ed3 100644
--- a/ltt/tracefile.c
+++ b/ltt/tracefile.c
@@ -99,6 +99,8 @@ static int ltt_seek_next_event(LttTracefile *tf);
 
 static int open_tracefiles(LttTrace *trace, gchar *root_path,
     gchar *relative_path);
+static int open_tracefiles_streaming(LttTrace *trace, gchar *root_path,
+    gchar *relative_path, gboolean isStreaming);
 static int ltt_process_metadata_tracefile(LttTracefile *tf);
 static void ltt_tracefile_time_span_get(LttTracefile *tf,
                                         LttTime *start, LttTime *end);
@@ -209,26 +211,22 @@ int get_block_offset_size(LttTracefile *tf, guint block_num,
   return 0;
 }
 
-int ltt_trace_create_block_index(LttTracefile *tf)
-{
-  int page_size = getpagesize();
-  uint64_t offset = 0;
-  unsigned long i = 0;
-  unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
 
-  tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
-                                    DEFAULT_N_BLOCKS);
-
-  g_assert(tf->buf_index->len == i);
-
-  while (offset < tf->file_size) {
+int ltt_trace_process_block_index(LttTracefile *tf, uint64_t offset, unsigned long firstBlock)
+{
+      int i = firstBlock;
+      int page_size = getpagesize();
+      unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
+    
+    g_assert(tf->buf_index->len == i);
+  while (offset < ltt_get_uint32(LTT_GET_BO(tf), &tf->file_size)) {
     ltt_subbuffer_header_t *header;
     uint64_t *off;
 
     tf->buf_index = g_array_set_size(tf->buf_index, i + 1);
     off = &g_array_index(tf->buf_index, uint64_t, i);
     *off = offset;
-
+    
     /* map block header */
     header = mmap(0, header_map_size, PROT_READ, 
                   MAP_PRIVATE, tf->fd, (off_t)offset);
@@ -253,22 +251,65 @@ int ltt_trace_create_block_index(LttTracefile *tf)
   return 0;
 }
 
+/* Index the blocks appended to the file since the last indexing pass, resuming
+ * right after the last indexed block (ltt_tracefile_update refreshes
+ * tf->file_size before calling this). Return value : 0 success, -1 error
+ */
+int ltt_trace_update_block_index(LttTracefile *tf)
+{
+  uint64_t offset;
+  uint32_t last_block_size;
+  unsigned long first_new = tf->num_blocks;
+  int page_size = getpagesize(); /* presumably consumed by PAGE_ALIGN - confirm before removing */
+  unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
+  ltt_subbuffer_header_t *header_tmp;
+
+  if(get_block_offset_size(tf, tf->num_blocks-1, &offset, &last_block_size))
+    return -1; /* could not locate the last indexed block: offset is not usable */
+  header_tmp = mmap(0, header_map_size, PROT_READ,
+                  MAP_PRIVATE, tf->fd, (off_t)offset);
+  if(header_tmp == MAP_FAILED) {
+    perror("Error in allocating memory for buffer of tracefile");
+    return -1;
+  }
+  /* the first new block starts right after the last indexed one */
+  offset += ltt_get_uint32(LTT_GET_BO(tf), &header_tmp->sb_size);
+  if(munmap(header_tmp, header_map_size)) /* header only needed for sb_size */
+    perror("munmap error");
+  return ltt_trace_process_block_index(tf, offset, first_new);
+}
+
+/* Build the block index from scratch: allocate an empty offset array, then
+ * index the sub-buffers starting at file offset 0 as block 0.
+ * Returns whatever ltt_trace_process_block_index() returns.
+ */
+int ltt_trace_create_block_index(LttTracefile *tf)
+{
+  tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
+                                    DEFAULT_N_BLOCKS);
+
+  return ltt_trace_process_block_index(tf, (uint64_t)0, 0UL);
+}
+
 /*****************************************************************************
  *Function name
- *    ltt_tracefile_open : open a trace file, construct a LttTracefile
+ *    ltt_tracefile_open_streaming : open a trace file, construct a LttTracefile
+ *				     The trace file can be a streaming
  *Input params
  *    t                  : the trace containing the tracefile
  *    fileName           : path name of the trace file
  *    tf                 : the tracefile structure
+ *    isStreaming	 : flag to open in a streaming mode (TRUE) or not (FALSE).
  *Return value
  *                       : 0 for success, -1 otherwise.
  ****************************************************************************/ 
-
-static gint ltt_tracefile_open(LttTrace *t, gchar * fileName, LttTracefile *tf)
+static gint ltt_tracefile_open_streaming(LttTrace *t, gchar * fileName, LttTracefile *tf, gboolean isStreaming)
 {
   struct stat    lTDFStat;    /* Trace data file status */
   ltt_subbuffer_header_t *header;
   int page_size = getpagesize();
+  
+  tf->Streaming_isEmpty = FALSE;
 
   //open the file
   tf->long_name = g_quark_from_string(fileName);
@@ -289,10 +330,31 @@ static gint ltt_tracefile_open(LttTrace *t, gchar * fileName, LttTracefile *tf)
   // Is the file large enough to contain a trace 
   if(lTDFStat.st_size <
       (off_t)(ltt_subbuffer_header_size())){
-    g_print("The input data file %s does not contain a trace\n", fileName);
-    goto close_file;
+    if(isStreaming == FALSE)
+    {
+      g_print("The input data file %s does not contain a trace\n", fileName);
+      goto close_file;
+    }
+    else
+    {
+      //We are in a streaming mode, so the file can be empty at the start of the analysis
+      tf->Streaming_isEmpty = TRUE;
+      tf->file_size = lTDFStat.st_size;
+      tf->events_lost = 0;
+      tf->subbuf_corrupt = 0;
+  
+      tf->buffer.head = NULL;
+      return -1;
+    }
   }
   
+  tf->buffer.head = NULL;
+  
+  //store the size of the file
+  tf->file_size = lTDFStat.st_size;
+  tf->events_lost = 0;
+  tf->subbuf_corrupt = 0;
+  
   /* Temporarily map the buffer start header to get trace information */
   /* Multiple of pages aligned head */
   tf->buffer.head = mmap(0,
@@ -310,11 +372,6 @@ static gint ltt_tracefile_open(LttTrace *t, gchar * fileName, LttTracefile *tf)
     g_warning("parse_trace_header error");
     goto unmap_file;
   }
-    
-  //store the size of the file
-  tf->file_size = lTDFStat.st_size;
-  tf->events_lost = 0;
-  tf->subbuf_corrupt = 0;
 
   if(munmap(tf->buffer.head,
         PAGE_ALIGN(ltt_subbuffer_header_size()))) {
@@ -353,6 +410,114 @@ end:
   return -1;
 }
 
+/*****************************************************************************
+ *Function name
+ *    ltt_tracefile_open : open a trace file in non-streaming mode,
+ *                         construct a LttTracefile
+ *Input params
+ *    t                  : the trace containing the tracefile
+ *    fileName           : path name of the trace file
+ *    tf                 : the tracefile structure
+ *Return value           : 0 for success, -1 otherwise.
+ ****************************************************************************/ 
+static gint ltt_tracefile_open(LttTrace *t, gchar * fileName, LttTracefile *tf)
+{
+    return ltt_tracefile_open_streaming(t, fileName, tf, FALSE); /* FALSE: a file smaller than a sub-buffer header is rejected */
+}
+
+/*****************************************************************************
+ *Function name
+ *    ltt_tracefile_update : Update the information concerning a tracefile
+ *			     Must be called periodically in order to have the latest information
+ *Input params
+ *    tf                 : the tracefile
+ *    t                  : the trace containing the tracefile
+ *Return value
+ *                       : 0 for success, -1 otherwise.
+ ****************************************************************************/ 
+int ltt_tracefile_update(LttTracefile *tf, LttTrace *t)
+{
+    struct stat    lTDFStat;    /* Trace data file status */
+    if(fstat(tf->fd, &lTDFStat) < 0){
+      perror("error in getting the tracefile informations.");
+      return -1; /* lTDFStat is undefined here: do not derive a file size from it */
+    }
+    int page_size = getpagesize();
+    
+    //Adjust the size of the file
+    tf->file_size = lTDFStat.st_size;
+    
+    if(tf->Streaming_isEmpty)
+    {
+      ltt_subbuffer_header_t *header;
+      
+      /* Temporarily map the buffer start header (page-aligned size) to get trace information */
+      tf->buffer.head = mmap(0,
+          PAGE_ALIGN(ltt_subbuffer_header_size()), PROT_READ, 
+          MAP_PRIVATE, tf->fd, 0);
+      if(tf->buffer.head == MAP_FAILED) {
+        perror("Error in allocating memory for buffer of tracefile");
+        tf->buffer.head = NULL; /* a non-NULL head is taken for a mapped header elsewhere */
+        goto close_file;
+      }
+      g_assert( ( (gulong)tf->buffer.head&(8-1) ) == 0); // make sure it's aligned.
+      header = (ltt_subbuffer_header_t *)tf->buffer.head;
+      
+      if(parse_trace_header(header, tf, t)) {
+        g_warning("parse_trace_header error");
+        goto unmap_file;
+      }
+      
+      if(munmap(tf->buffer.head,
+        PAGE_ALIGN(ltt_subbuffer_header_size()))) {
+        g_warning("unmap size : %zu\n",
+            PAGE_ALIGN(ltt_subbuffer_header_size()));
+        perror("munmap error");
+        g_assert(0);
+      }
+      tf->buffer.head = NULL;
+      
+      /* Create block index */
+      ltt_trace_create_block_index(tf);
+    
+      //read the first block
+      if(map_block(tf,0)) {
+        perror("Cannot map block for tracefile");
+        goto close_file;
+      }
+      tf->Streaming_isEmpty = FALSE;
+    }
+    else
+    {
+      //Retrieve the new subbuffers and index them
+      ltt_trace_update_block_index(tf);
+    }
+    
+    //Update the metadata table if applicable
+    if(tf->name == LTT_TRACEFILE_NAME_METADATA)
+    {
+      map_block(tf, tf->buffer.index + 1);
+      ltt_process_metadata_tracefile(tf);
+    }  
+    ltt_tracefile_read(tf);
+    
+    return 0;
+    /* Error */
+    unmap_file:
+      if(munmap(tf->buffer.head, PAGE_ALIGN(ltt_subbuffer_header_size()))) {
+        g_warning("unmap size : %zu\n", PAGE_ALIGN(ltt_subbuffer_header_size()));
+        perror("munmap error");
+        g_assert(0);
+      }
+      tf->buffer.head = NULL; /* do not leave a pointer to the unmapped header */
+    close_file:
+      close(tf->fd);
+      if (tf->buf_index)
+        g_array_free(tf->buf_index, TRUE);
+      tf->buf_index = NULL; /* avoid a second free of the index later on */
+      return -1;
+}
+
 
 /*****************************************************************************
  *Function name
@@ -567,8 +732,77 @@ static __attribute__ ((__unused__)) gboolean ltt_tracefile_group_has_cpu_online(
   }
   return 0;
 }
+/****************************************************************************
+ * update_num_cpu
+ *
+ * Make sure the tracefile group "name" has room for tracefile number "num".
+ * This function is normally called when a tracefile is detected but cannot
+ * be parsed at this point, so that the group size still counts this file.
+ *
+ * trace : trace owning the tracefile groups
+ * name  : name (quark) of the group to update
+ * num   : number of the tracefile within the group
+ *
+ * A missing group is created with num+1 zeroed entries sharing one freshly
+ * allocated marker data; an existing, shorter group is grown to num+1 entries
+ * which inherit the marker data of entry 0. For an existing non-metadata
+ * group, the metadata group is then updated the same way.
+ *
+ * return 1 if a new group is created
+ * return 0 otherwise
+ ****************************************************************************/
+int update_num_cpu(LttTrace *trace, GQuark name, guint num)
+{
+  GArray *tf_group = g_datalist_id_get_data(&trace->tracefiles, name);
+  struct marker_data *shared_mdata;
+  guint wanted_len = num+1;
+  guint idx;
+
+  if (tf_group == NULL)
+  {
+    /* First file seen for this name: create and register the group */
+    tf_group = g_array_sized_new (FALSE, TRUE, sizeof(LttTracefile), 10);
+    g_datalist_id_set_data_full(&trace->tracefiles, name,
+			tf_group, ltt_tracefile_group_destroy);
+    shared_mdata = allocate_marker_data();
+    if (!shared_mdata)
+      g_error("Error in allocating marker data");
+
+    tf_group = g_array_set_size(tf_group, wanted_len);
+    for (idx = 0; idx < tf_group->len; idx++)
+      g_array_index (tf_group, LttTracefile, idx).mdata = shared_mdata;
+    return 1;
+  }
+
+  if ( wanted_len > tf_group->len )
+  {
+    /* Grow the group; the new entries reuse the group's marker data */
+    shared_mdata = g_array_index (tf_group, LttTracefile, 0).mdata;
+    idx = tf_group->len;
+    tf_group = g_array_set_size(tf_group, wanted_len);
+    for (; idx < tf_group->len; idx++)
+      g_array_index (tf_group, LttTracefile, idx).mdata = shared_mdata;
+  }
+
+  /* The metadata group itself has nothing further to propagate */
+  if( name == LTT_TRACEFILE_NAME_METADATA )
+    return 0;
+  update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+  return 0;
+}
+/*****************************************************************************
+ *Function name
+ *    ltt_tracefile_add_streaming: open and add tracefiles to a trace in
+ *				   streaming mode; normally used when a new file
+ *				   is received in a streaming analysis
+ *Input params
+ *    t                  : trace in which the tracefile will be added
+ *    root_path		 : directory to scan (handed to opendir() by the callee)
+ *    rel_path		 : relative path matching root_path inside the trace (TODO confirm)
+ ****************************************************************************/
+void ltt_tracefile_add_streaming( LttTrace *t, gchar *root_path, gchar* rel_path)
+{	
+  	open_tracefiles_streaming(t, root_path, rel_path, TRUE); /* result is dropped: failures are not reported to the caller */
+}
 /* Open each tracefile under a specific directory. Put them in a
  * GData : permits to access them using their tracefile group pathname.
  * i.e. access control/modules tracefile group by index :
@@ -579,9 +813,13 @@ static __attribute__ ((__unused__)) gboolean ltt_tracefile_group_has_cpu_online(
  *
  * A tracefile group is simply an array where all the per cpu tracefiles sit.
  */
-
 static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar *relative_path)
 {
+  return open_tracefiles_streaming(trace, root_path, relative_path, FALSE);
+}
+
+static int open_tracefiles_streaming(LttTrace *trace, gchar *root_path, gchar *relative_path, gboolean isStreaming)
+{
   DIR *dir = opendir(root_path);
   struct dirent *entry;
   struct stat stat_buf;
@@ -634,7 +872,18 @@ static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar *relative_pa
     if(S_ISDIR(stat_buf.st_mode)) {
 
       g_debug("Entering subdirectory...\n");
-      ret = open_tracefiles(trace, path, rel_path);
+      //Monitor the directory if we are in a streaming mode
+      if(isStreaming == TRUE)
+      {
+       trace->inotify_watch_array.elem = realloc(trace->inotify_watch_array.elem,
+	       ++trace->inotify_watch_array.num * sizeof(struct inotify_watch));
+
+       trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd = inotify_add_watch(trace->inotify_fd, path, IN_CREATE | IN_OPEN | IN_CLOSE_WRITE);
+       strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel, path);
+       trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name = g_quark_from_string("");
+       trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num = -1;
+      }
+      ret = open_tracefiles_streaming(trace, path, rel_path, isStreaming);
       if(ret < 0) continue;
     } else if(S_ISREG(stat_buf.st_mode)) {
       GQuark name;
@@ -647,12 +896,27 @@ static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar *relative_pa
       creation = 0;
       if(get_tracefile_name_number(rel_path, &name, &num, &tid, &pgid, &creation))
         continue; /* invalid name */
-      
+     //Monitor the directory if we are in a streaming mode
+     if(isStreaming == TRUE)
+     {
+       trace->inotify_watch_array.elem = realloc(trace->inotify_watch_array.elem,
+	       ++trace->inotify_watch_array.num * sizeof(struct inotify_watch));
+
+       trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd = inotify_add_watch(trace->inotify_fd, path, IN_OPEN | IN_MODIFY | IN_CLOSE);
+       strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel, path);
+       trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name = name;
+       trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num = num;
+      }
       g_debug("Opening file.\n");
-      if(ltt_tracefile_open(trace, path, &tmp_tf)) {
+      if(ltt_tracefile_open_streaming(trace, path, &tmp_tf, isStreaming)) {
         g_info("Error opening tracefile %s", path);
-
-        continue; /* error opening the tracefile : bad magic number ? */
+	if(isStreaming == TRUE)
+	{
+	  // Take the current file into account in the CPU count of the group
+	  update_num_cpu(trace, name, num);
+	}
+	continue; /* error opening the tracefile : bad magic number ? */
+	    
       }
 
       g_debug("Tracefile name is %s and number is %u", 
@@ -678,6 +942,9 @@ static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar *relative_pa
           g_error("Error in allocating marker data");
       }
 
+      if(isStreaming == TRUE && name != LTT_TRACEFILE_NAME_METADATA)
+	update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+      
       /* Add the per cpu tracefile to the named group */
       unsigned int old_len = group->len;
       if(num+1 > old_len)
@@ -802,9 +1069,13 @@ seek_error:
  *
  * pathname must be the directory of the trace
  */
-
 LttTrace *ltt_trace_open(const gchar *pathname)
 {
+  return ltt_trace_open_streaming(pathname, FALSE);
+}
+
+LttTrace *ltt_trace_open_streaming(const gchar *pathname, gboolean isStreaming)
+{
   gchar abs_path[PATH_MAX];
   LttTrace  * t;
   LttTracefile *tf;
@@ -842,10 +1113,18 @@ LttTrace *ltt_trace_open(const gchar *pathname)
     }
   }
   closedir(dir);
-  
+ 
+  if(isStreaming == TRUE)
+  {
+    t->inotify_fd = inotify_init();
+    fcntl(t->inotify_fd, F_SETFL, O_NONBLOCK);
+    t->inotify_watch_array.elem = NULL;
+    t->inotify_watch_array.num = 0;
+    t->open_tracefile_counter = 0;
+  }
   /* Open all the tracefiles */
   t->start_freq= 0;
-  if(open_tracefiles(t, abs_path, "")) {
+  if(open_tracefiles_streaming(t, abs_path, "", isStreaming)) {
     g_warning("Error opening tracefile %s", abs_path);
     goto find_error;
   }
@@ -858,15 +1137,30 @@ LttTrace *ltt_trace_open(const gchar *pathname)
   }
 
   /*
-   * Get the trace information for the metadata_0 tracefile.
    * Getting a correct trace start_time and start_tsc is insured by the fact
    * that no subbuffers are supposed to be lost in the metadata channel.
    * Therefore, the first subbuffer contains the start_tsc timestamp in its
    * buffer header.
    */
   g_assert(group->len > 0);
-  tf = &g_array_index (group, LttTracefile, 0);
-  header = (ltt_subbuffer_header_t *)tf->buffer.head;
+  if(isStreaming == TRUE)
+  {
+    //Get the trace information from the metadata tracefile.
+    for(i = 0; i < group->len; i++)
+    {
+      tf = &g_array_index (group, LttTracefile, i);
+      header = (ltt_subbuffer_header_t *)tf->buffer.head;
+      if(header != NULL)
+	break;
+    }
+  }
+  else
+  {
+    //Get the trace information from the metadata_0 tracefile.
+    tf = &g_array_index (group, LttTracefile, 0);
+    header = (ltt_subbuffer_header_t *)tf->buffer.head;
+  }
+    
   ret = parse_trace_header(header, tf, t);
   g_assert(!ret);
 
-- 
1.7.0.4





More information about the lttng-dev mailing list