[ltt-dev] [PATCH 1/2] Add the support for streaming analysis in the libltttraceread library
Oussama El Mfadli
oussama.el-mfadli at polymtl.ca
Fri Oct 1 20:56:47 EDT 2010
Hi,
Thanks for your feedback, Mathieu. I made the modifications that you
requested.
This patch does the following:
- Modifies how the tracefile is opened
and how some information is retrieved
- Adds an implementation that appends newly arriving blocks to the
index
---
ltt/ltt-private.h | 3 +
ltt/trace.h | 97 +++++++++++++++++-
ltt/tracefile.c | 296
+++++++++++++++++++++++++++++++++++++++++++++++------
3 files changed, 364 insertions(+), 32 deletions(-)
diff --git a/ltt/ltt-private.h b/ltt/ltt-private.h
index 65d73d1..9f23690 100644
--- a/ltt/ltt-private.h
+++ b/ltt/ltt-private.h
@@ -171,6 +171,9 @@ struct LttTracefile {
/* Current block */
LttBuffer buffer; //current buffer
+
+ /* For streaming usage: in case the tracefile is currently empty */
+ gboolean streaming_is_enabled;
};
/* The characteristics of the system on which the trace was obtained
diff --git a/ltt/trace.h b/ltt/trace.h
index e16c66f..a1503d3 100644
--- a/ltt/trace.h
+++ b/ltt/trace.h
@@ -24,6 +24,35 @@
#include <ltt/ltt-private.h>
#include <stdint.h>
#include <glib.h>
+#include <sys/stat.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include <linux/version.h>
+
+//Options used to open a trace
+#define NO_FLAG 0
+#define TRACE_STREAMING (1 << 0)
+
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14)
+#include <sys/inotify.h>
+
+#define HAS_INOTIFY
+#else
+
+#undef HAS_INOTIFY
+#endif
+struct inotify_watch {
+ int wd;
+ char path_channel[PATH_MAX];
+ GQuark name;
+ guint num;
+};
+
+struct inotify_watch_array {
+ struct inotify_watch *elem;
+ int num;
+};
struct LttTrace {
GQuark pathname; //the pathname of the trace
@@ -47,7 +76,70 @@ struct LttTrace {
LttTime start_time_from_tsc;
GData *tracefiles; //tracefiles groups
+
+ // Keep a inotify file descriptor array to detect the changes in the
files
+ struct inotify_watch_array inotify_watch_array;
+ int inotify_fd;
+ int open_tracefile_counter;
+
+ gboolean streaming_is_enable;
};
+#ifndef HAS_INOTIFY
+static inline int ltt_tracefile_check_size(struct stat* lTDFStat)
+{
+ if(tf->file_size < lTDFStat->st_size)
+ {
+ return 0;
+ }
+ return 1;
+}
+
+static inline int add_directory_to_watch_array(LttTrace *trace)
+{
+ return 0;
+}
+
+static inline int add_tracefile_to_watch_array(LttTrace *trace)
+{
+ return 0;
+}
+#else
+static inline int ltt_tracefile_check_size(struct stat* lTDFStat)
+{
+ return 0;
+}
+
+static inline int add_directory_to_watch_array(LttTrace *trace, const
gchar* path)
+{
+ if(trace->streaming_is_enable == TRUE)
+ {
+ trace->inotify_watch_array.elem =
realloc(trace->inotify_watch_array.elem,
+ ++trace->inotify_watch_array.num * sizeof(struct inotify_watch));
+
+ trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd =
inotify_add_watch(trace->inotify_fd, path, IN_CREATE | IN_OPEN |
IN_CLOSE_WRITE);
+
strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel,
path);
+ trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name
= g_quark_from_string("");
+ trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num =
-1;
+ return 1;
+ }
+ return 0;
+}
+
+static inline int add_tracefile_to_watch_array(LttTrace *trace, const
gchar* path, GQuark name, guint num)
+{
+ if(trace->streaming_is_enable == TRUE)
+ {
+ trace->inotify_watch_array.elem =
realloc(trace->inotify_watch_array.elem,
+ ++trace->inotify_watch_array.num * sizeof(struct inotify_watch));
+
+ trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd
= inotify_add_watch(trace->inotify_fd, path, IN_OPEN | IN_MODIFY |
IN_CLOSE);
+
strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel,
path);
+
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name =
name;
+ trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num
= num;
+ }
+ return 0;
+}
+#endif
static inline guint ltt_trace_get_num_cpu(LttTrace *t)
{
@@ -65,8 +157,9 @@ static inline guint ltt_trace_get_num_cpu(LttTrace *t)
*/
-LttTrace *ltt_trace_open(const gchar *pathname);
-
+LttTrace *ltt_trace_open(const gchar *pathname, unsigned int flags);
+int open_tracefiles(LttTrace *trace, gchar *root_path,
+ gchar *relative_path);
/* copy reopens a trace
*
* return value NULL if error while opening the trace
diff --git a/ltt/tracefile.c b/ltt/tracefile.c
index 0d8a248..607ca7c 100644
--- a/ltt/tracefile.c
+++ b/ltt/tracefile.c
@@ -99,8 +99,6 @@ static guint64 cycles_2_ns(LttTracefile *tf, guint64
cycles);
/* go to the next event */
static int ltt_seek_next_event(LttTracefile *tf);
-static int open_tracefiles(LttTrace *trace, gchar *root_path,
- gchar *relative_path);
static int ltt_process_metadata_tracefile(LttTracefile *tf);
static void ltt_tracefile_time_span_get(LttTracefile *tf,
LttTime *start, LttTime *end);
@@ -211,18 +209,14 @@ int get_block_offset_size(LttTracefile *tf, guint
block_num,
return 0;
}
-int ltt_trace_create_block_index(LttTracefile *tf)
-{
- int page_size = getpagesize();
- uint64_t offset = 0;
- unsigned long i = 0;
- unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
-
- tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
- DEFAULT_N_BLOCKS);
- g_assert(tf->buf_index->len == i);
+static int ltt_trace_update_block_index(LttTracefile *tf, uint64_t offset,
unsigned long firstBlock)
+{
+ int i = firstBlock;
+ int page_size = getpagesize();
+ unsigned int header_map_size =
PAGE_ALIGN(ltt_subbuffer_header_size());
+ g_assert(tf->buf_index->len == i);
while (offset < tf->file_size) {
ltt_subbuffer_header_t *header;
uint64_t *off;
@@ -255,6 +249,48 @@ int ltt_trace_create_block_index(LttTracefile *tf)
return 0;
}
+/* Parse the new information from the file and readjust the number of
blocks.
+ *
+ * Return value : 0 success, -1 error
+ */
+int ltt_trace_continue_block_index(LttTracefile *tf)
+{
+ int ret;
+ uint64_t offset;
+ uint32_t last_block_size;
+ unsigned long i = tf->num_blocks;
+ int page_size = getpagesize();
+ unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
+
+ get_block_offset_size(tf, tf->num_blocks-1, &offset, &last_block_size);
+
+ ltt_subbuffer_header_t *header_tmp = mmap(0, header_map_size, PROT_READ,
+ MAP_PRIVATE, tf->fd, (off_t)offset);
+ if(header_tmp == MAP_FAILED) {
+ perror("Error in allocating memory for buffer of tracefile");
+ return -1;
+ }
+
+ /* read len, offset += len */
+ offset += ltt_get_uint32(LTT_GET_BO(tf), &header_tmp->sb_size);
+
+ ret = ltt_trace_update_block_index(tf, offset, i);
+}
+
+int ltt_trace_create_block_index(LttTracefile *tf)
+{
+ int ret;
+ uint64_t offset = 0;
+ unsigned long i = 0;
+
+ tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
+ DEFAULT_N_BLOCKS);
+ if(!tf->buf_index)
+ return -1;
+ ret = ltt_trace_update_block_index(tf, offset, i);
+ return ret;
+}
+
/*****************************************************************************
*Function name
* ltt_tracefile_open : open a trace file, construct a LttTracefile
@@ -271,6 +307,7 @@ static gint ltt_tracefile_open(LttTrace *t, gchar *
fileName, LttTracefile *tf)
struct stat lTDFStat; /* Trace data file status */
ltt_subbuffer_header_t *header;
int page_size = getpagesize();
+ tf->streaming_is_enabled = FALSE;
//open the file
tf->long_name = g_quark_from_string(fileName);
@@ -291,10 +328,31 @@ static gint ltt_tracefile_open(LttTrace *t, gchar *
fileName, LttTracefile *tf)
// Is the file large enough to contain a trace
if(lTDFStat.st_size <
(off_t)(ltt_subbuffer_header_size())){
- g_print("The input data file %s does not contain a trace\n", fileName);
- goto close_file;
+ if(t->streaming_is_enable == FALSE)
+ {
+ g_print("The input data file %s does not contain a trace\n",
fileName);
+ goto close_file;
+ }
+ else
+ {
+ //We are in a streaming mode, so the file can be empty at the start
of the analysis
+ tf->streaming_is_enabled = TRUE;
+ tf->file_size = lTDFStat.st_size;
+ tf->events_lost = 0;
+ tf->subbuf_corrupt = 0;
+
+ tf->buffer.head = NULL;
+ return -1;
+ }
}
+ tf->buffer.head = NULL;
+
+ //store the size of the file
+ tf->file_size = lTDFStat.st_size;
+ tf->events_lost = 0;
+ tf->subbuf_corrupt = 0;
+
/* Temporarily map the buffer start header to get trace information */
/* Multiple of pages aligned head */
tf->buffer.head = mmap(0,
@@ -312,11 +370,6 @@ static gint ltt_tracefile_open(LttTrace *t, gchar *
fileName, LttTracefile *tf)
g_warning("parse_trace_header error");
goto unmap_file;
}
-
- //store the size of the file
- tf->file_size = lTDFStat.st_size;
- tf->events_lost = 0;
- tf->subbuf_corrupt = 0;
if(munmap(tf->buffer.head,
PAGE_ALIGN(ltt_subbuffer_header_size()))) {
@@ -361,6 +414,103 @@ end:
return -1;
}
+/*****************************************************************************
+ *Function name
+ * ltt_tracefile_update : Update the information concerning a tracefile
+ * Must be called periodically to update trace file and file size
+ information.
+ *Input params
+ * tf : the tracefile
+ *Return value
+ * : 0 for success, -1 otherwise.
+
****************************************************************************/
+int ltt_tracefile_update(LttTracefile *tf, LttTrace *t)
+{
+ struct stat lTDFStat; /* Trace data file status */
+ int page_size = getpagesize();
+ if(fstat(tf->fd, &lTDFStat) < 0){
+ perror("error in getting the tracefile informations.");
+ }
+
+ if(!ltt_tracefile_check_size(&lTDFStat))
+ {
+ //Ajust the size of the file
+ tf->file_size = lTDFStat.st_size;
+
+ if(tf->streaming_is_enabled)
+ {
+ ltt_subbuffer_header_t *header;
+
+ /* Temporarily map the buffer start header to get trace information
*/
+ /* Multiple of pages aligned head */
+ tf->buffer.head = mmap(0,
+ PAGE_ALIGN(ltt_subbuffer_header_size()), PROT_READ,
+ MAP_PRIVATE, tf->fd, 0);
+ if(tf->buffer.head == MAP_FAILED) {
+ perror("Error in allocating memory for buffer of tracefile");
+ goto unmap_file;
+ }
+ g_assert( ( (gulong)tf->buffer.head&(8-1) ) == 0); // make sure it's
aligned.
+
+ header = (ltt_subbuffer_header_t *)tf->buffer.head;
+
+ if(parse_trace_header(header, tf, t))
+ {
+ g_warning("parse_trace_header error");
+ goto unmap_file;
+ }
+
+ if(munmap(tf->buffer.head,
+ PAGE_ALIGN(ltt_subbuffer_header_size())))
+ {
+ g_warning("unmap size : %zu\n",
+ PAGE_ALIGN(ltt_subbuffer_header_size()));
+ perror("munmap error");
+ g_assert(0);
+ }
+ tf->buffer.head = NULL;
+
+ /* Create block index */
+ ltt_trace_create_block_index(tf);
+
+ //read the first block
+ if(map_block(tf,0))
+ {
+ perror("Cannot map block for tracefile");
+ goto unmap_file;
+ }
+ tf->streaming_is_enabled = FALSE;
+ }
+ else
+ {
+ //Retreive the new subbuffers and index them
+ ltt_trace_continue_block_index(tf);
+ }
+
+ //Update the metadata table if applicable
+ if(tf->name == LTT_TRACEFILE_NAME_METADATA)
+ {
+ map_block(tf, tf->buffer.index + 1);
+ ltt_process_metadata_tracefile(tf);
+ }
+ ltt_tracefile_read(tf);
+
+ return 1;
+ }
+
+ return 0;
+// /* Error */
+ unmap_file:
+ if(munmap(tf->buffer.head,
+ PAGE_ALIGN(ltt_subbuffer_header_size()))) {
+ g_warning("unmap size : %zu\n",
+ PAGE_ALIGN(ltt_subbuffer_header_size()));
+ perror("munmap error");
+ g_assert(0);
+ }
+ return -1;
+}
+
/*****************************************************************************
*Function name
@@ -576,7 +726,51 @@ static __attribute__ ((__unused__)) gboolean
ltt_tracefile_group_has_cpu_online(
}
return 0;
}
+/****************************************************************************
+ * update_num_cpu
+ *
+ * Update the number of tracefile in a "name" group
+ * This function is normally called when a tracefile is detected but cannot
be parsed at
+ * this point. The group size will account for this file.
+ *
+ * return 1 if a new group is created
+ * return 0 otherwise
+ *
+
****************************************************************************/
+int update_num_cpu(LttTrace *trace, GQuark name, guint num)
+{
+ struct marker_data *mdata;
+ GArray *group;
+ int i;
+ group = g_datalist_id_get_data(&trace->tracefiles, name);
+ if (group == NULL)
+ {
+ group = g_array_sized_new (FALSE, TRUE, sizeof(LttTracefile), 10);
+ g_datalist_id_set_data_full(&trace->tracefiles, name,
+ group, ltt_tracefile_group_destroy);
+ mdata = allocate_marker_data();
+ if (!mdata)
+ g_error("Error in allocating marker data");
+
+ group = g_array_set_size(group, num+1);
+ for (i = 0; i < group->len; i++)
+ g_array_index (group, LttTracefile, i).mdata = mdata;
+ return 1;
+ }
+ if(num+1 > group->len)
+ {
+ mdata = g_array_index (group, LttTracefile, 0).mdata;
+ int old_num = group->len;
+ group = g_array_set_size(group, num+1);
+ for (i = old_num; i < group->len; i++)
+ g_array_index (group, LttTracefile, i).mdata = mdata;
+ }
+ if( name != LTT_TRACEFILE_NAME_METADATA ) {
+ update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+ }
+ return 0;
+}
/* Open each tracefile under a specific directory. Put them in a
* GData : permits to access them using their tracefile group pathname.
@@ -588,8 +782,7 @@ static __attribute__ ((__unused__)) gboolean
ltt_tracefile_group_has_cpu_online(
*
* A tracefile group is simply an array where all the per cpu tracefiles
sit.
*/
-
-static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar
*relative_path)
+int open_tracefiles(LttTrace *trace, gchar *root_path, gchar
*relative_path)
{
DIR *dir = opendir(root_path);
struct dirent *entry;
@@ -643,6 +836,9 @@ static int open_tracefiles(LttTrace *trace, gchar
*root_path, gchar *relative_pa
if(S_ISDIR(stat_buf.st_mode)) {
g_debug("Entering subdirectory...\n");
+ //Monitor the directory if we are in a streaming mode
+ add_directory_to_watch_array(trace, path);
+
ret = open_tracefiles(trace, path, rel_path);
if(ret < 0) continue;
} else if(S_ISREG(stat_buf.st_mode)) {
@@ -656,12 +852,19 @@ static int open_tracefiles(LttTrace *trace, gchar
*root_path, gchar *relative_pa
creation = 0;
if(get_tracefile_name_number(rel_path, &name, &num, &tid, &pgid,
&creation))
continue; /* invalid name */
-
+
+ //Monitor the directory if we are in a streaming mode
+ add_tracefile_to_watch_array(trace, path, name, num);
g_debug("Opening file.\n");
if(ltt_tracefile_open(trace, path, &tmp_tf)) {
g_info("Error opening tracefile %s", path);
+ if(trace->streaming_is_enable == TRUE)
+ {
+ // Take to account the current file in the Cpu count of the group
+ update_num_cpu(trace, name, num);
+ }
+ continue; /* error opening the tracefile : bad magic number ? */
- continue; /* error opening the tracefile : bad magic number ? */
}
g_debug("Tracefile name is %s and number is %u",
@@ -687,6 +890,9 @@ static int open_tracefiles(LttTrace *trace, gchar
*root_path, gchar *relative_pa
g_error("Error in allocating marker data");
}
+ if(trace->streaming_is_enable == TRUE && name !=
LTT_TRACEFILE_NAME_METADATA)
+ update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+
/* Add the per cpu tracefile to the named group */
unsigned int old_len = group->len;
if(num+1 > old_len)
@@ -811,8 +1017,7 @@ seek_error:
*
* pathname must be the directory of the trace
*/
-
-LttTrace *ltt_trace_open(const gchar *pathname)
+LttTrace *ltt_trace_open(const gchar *pathname, unsigned int flags)
{
gchar abs_path[PATH_MAX];
LttTrace * t;
@@ -832,6 +1037,8 @@ LttTrace *ltt_trace_open(const gchar *pathname)
get_absolute_pathname(pathname, abs_path);
t->pathname = g_quark_from_string(abs_path);
+ t->streaming_is_enable = FALSE;
+
g_datalist_init(&t->tracefiles);
/* Test to see if it looks like a trace */
@@ -851,7 +1058,18 @@ LttTrace *ltt_trace_open(const gchar *pathname)
}
}
closedir(dir);
-
+
+ if(flags & TRACE_STREAMING)
+ {
+ t->streaming_is_enable = TRUE;
+#ifdef HAS_INOTIFY
+ t->inotify_fd = inotify_init();
+ fcntl(t->inotify_fd, F_SETFL, O_NONBLOCK);
+ t->inotify_watch_array.elem = NULL;
+ t->inotify_watch_array.num = 0;
+ t->open_tracefile_counter = 0;
+#endif
+ }
/* Open all the tracefiles */
t->start_freq= 0;
if(open_tracefiles(t, abs_path, "")) {
@@ -867,15 +1085,30 @@ LttTrace *ltt_trace_open(const gchar *pathname)
}
/*
- * Get the trace information for the metadata_0 tracefile.
* Getting a correct trace start_time and start_tsc is insured by the
fact
* that no subbuffers are supposed to be lost in the metadata channel.
* Therefore, the first subbuffer contains the start_tsc timestamp in its
* buffer header.
*/
g_assert(group->len > 0);
- tf = &g_array_index (group, LttTracefile, 0);
- header = (ltt_subbuffer_header_t *)tf->buffer.head;
+ if(flags & TRACE_STREAMING)
+ {
+ //Get the trace information from the metadata tracefile.
+ for(i = 0; i < group->len; i++)
+ {
+ tf = &g_array_index (group, LttTracefile, i);
+ header = (ltt_subbuffer_header_t *)tf->buffer.head;
+ if(header != NULL)
+ break;
+ }
+ }
+ else
+ {
+ //Get the trace information from the metadata_0 tracefile.
+ tf = &g_array_index (group, LttTracefile, 0);
+ header = (ltt_subbuffer_header_t *)tf->buffer.head;
+ }
+
ret = parse_trace_header(header, tf, t);
g_assert(!ret);
@@ -919,7 +1152,10 @@ alloc_error:
*/
LttTrace *ltt_trace_copy(LttTrace *self)
{
- return ltt_trace_open(g_quark_to_string(self->pathname));
+ unsigned int flag = NO_FLAG;
+ if(self->streaming_is_enable)
+ flag |= TRACE_STREAMING;
+ return ltt_trace_open(g_quark_to_string(self->pathname), flag);
}
/*
--
1.7.0.4
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.casi.polymtl.ca/pipermail/lttng-dev/attachments/20101001/9862bd19/attachment-0001.htm>
More information about the lttng-dev
mailing list