[ltt-dev] [PATCH 1/2] Add the support for streaming analysis in the libltttraceread library
Oussama El Mfadli
oussama.el-mfadli at polymtl.ca
Fri Aug 27 13:09:18 EDT 2010
Add the support for streaming analysis in the libltttraceread library
- Modification of the opening of the tracefile
and the way some information is retrieved
- Add some implementations to append the newly arriving blocks to
the index
---
ltt/ltt-private.h | 3 +
ltt/trace.h | 22 +++-
ltt/tracefile.c | 366
+++++++++++++++++++++++++++++++++++++++++++++++-----
3 files changed, 354 insertions(+), 37 deletions(-)
diff --git a/ltt/ltt-private.h b/ltt/ltt-private.h
index 65d73d1..a0bcad3 100644
--- a/ltt/ltt-private.h
+++ b/ltt/ltt-private.h
@@ -171,6 +171,9 @@ struct LttTracefile {
/* Current block */
LttBuffer buffer; //current buffer
+
+ /* For streaming usage: in case the tracefile is currently empty */
+ gboolean Streaming_isEmpty;
};
/* The characteristics of the system on which the trace was obtained
diff --git a/ltt/trace.h b/ltt/trace.h
index e16c66f..abbb254 100644
--- a/ltt/trace.h
+++ b/ltt/trace.h
@@ -25,6 +25,20 @@
#include <stdint.h>
#include <glib.h>
+#include <sys/inotify.h>
+
+/* One inotify watch registered on a trace directory or on a tracefile. */
+struct inotify_watch {
+ int wd; /* watch descriptor returned by inotify_add_watch() */
+ char path_channel[PATH_MAX]; /* path that was handed to inotify_add_watch() */
+ GQuark name; /* tracefile name quark; quark of "" for a directory watch */
+ guint num; /* tracefile number; assigned -1 for a directory watch */
+};
+
+/* Array of inotify watches, grown with realloc() each time a watch is added. */
+struct inotify_watch_array {
+ struct inotify_watch *elem; /* the watches; NULL while the array is empty */
+ int num; /* number of entries in elem */
+};
+
struct LttTrace {
GQuark pathname; //the pathname of the trace
//LttSystemDescription * system_description;//system description
@@ -47,6 +61,11 @@ struct LttTrace {
LttTime start_time_from_tsc;
GData *tracefiles; //tracefiles groups
+
+ // Keep an inotify watch array to detect the changes in the files
+ struct inotify_watch_array inotify_watch_array;
+ int inotify_fd;
+ int open_tracefile_counter;
};
static inline guint ltt_trace_get_num_cpu(LttTrace *t)
@@ -66,7 +85,8 @@ static inline guint ltt_trace_get_num_cpu(LttTrace *t)
*/
LttTrace *ltt_trace_open(const gchar *pathname);
-
+LttTrace *ltt_trace_open_streaming(const gchar *pathname, gboolean
isStreaming);
+void ltt_tracefile_add_streaming( LttTrace *t, gchar *root_path, gchar*
rel_path);
/* copy reopens a trace
*
* return value NULL if error while opening the trace
diff --git a/ltt/tracefile.c b/ltt/tracefile.c
index 54da747..2d03ed3 100644
--- a/ltt/tracefile.c
+++ b/ltt/tracefile.c
@@ -99,6 +99,8 @@ static int ltt_seek_next_event(LttTracefile *tf);
static int open_tracefiles(LttTrace *trace, gchar *root_path,
gchar *relative_path);
+static int open_tracefiles_streaming(LttTrace *trace, gchar *root_path,
+ gchar *relative_path, gboolean isStreaming);
static int ltt_process_metadata_tracefile(LttTracefile *tf);
static void ltt_tracefile_time_span_get(LttTracefile *tf,
LttTime *start, LttTime *end);
@@ -209,26 +211,22 @@ int get_block_offset_size(LttTracefile *tf, guint
block_num,
return 0;
}
-int ltt_trace_create_block_index(LttTracefile *tf)
-{
- int page_size = getpagesize();
- uint64_t offset = 0;
- unsigned long i = 0;
- unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
- tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
- DEFAULT_N_BLOCKS);
-
- g_assert(tf->buf_index->len == i);
-
- while (offset < tf->file_size) {
+int ltt_trace_process_block_index(LttTracefile *tf, uint64_t offset,
unsigned long firstBlock)
+{
+ int i = firstBlock;
+ int page_size = getpagesize();
+ unsigned int header_map_size =
PAGE_ALIGN(ltt_subbuffer_header_size());
+
+ g_assert(tf->buf_index->len == i);
+ while (offset < ltt_get_uint32(LTT_GET_BO(tf), &tf->file_size)) {
ltt_subbuffer_header_t *header;
uint64_t *off;
tf->buf_index = g_array_set_size(tf->buf_index, i + 1);
off = &g_array_index(tf->buf_index, uint64_t, i);
*off = offset;
-
+
/* map block header */
header = mmap(0, header_map_size, PROT_READ,
MAP_PRIVATE, tf->fd, (off_t)offset);
@@ -253,22 +251,65 @@ int ltt_trace_create_block_index(LttTracefile *tf)
return 0;
}
+/* Parse the new information from the file and readjust the number of blocks.
+ *
+ * The offset of the last indexed block is looked up, its header is mapped
+ * temporarily to read the sub-buffer size, and indexing resumes right after
+ * that block.
+ *
+ * Return value : 0 success, -1 error
+ */
+int ltt_trace_update_block_index(LttTracefile *tf)
+{
+  uint64_t offset;
+  uint32_t last_block_size;
+  unsigned long i = tf->num_blocks;
+  int page_size = getpagesize(); /* presumably referenced by PAGE_ALIGN -- confirm */
+  unsigned int header_map_size = PAGE_ALIGN(ltt_subbuffer_header_size());
+  ltt_subbuffer_header_t *header_tmp;
+
+  /* Without an indexed block there is no "last block" to resume from
+   * (and num_blocks-1 would wrap around). */
+  if(tf->num_blocks == 0)
+    return -1;
+
+  /* NOTE(review): a non-zero result is treated as a failure here; the
+   * success path of get_block_offset_size() returns 0. */
+  if(get_block_offset_size(tf, tf->num_blocks-1, &offset, &last_block_size))
+    return -1;
+
+  header_tmp = mmap(0, header_map_size, PROT_READ,
+      MAP_PRIVATE, tf->fd, (off_t)offset);
+  if(header_tmp == MAP_FAILED) {
+    perror("Error in allocating memory for buffer of tracefile");
+    return -1;
+  }
+
+  /* read len, offset += len */
+  offset += ltt_get_uint32(LTT_GET_BO(tf), &header_tmp->sb_size);
+
+  /* The header was only needed for sb_size: release the temporary mapping. */
+  if(munmap(header_tmp, header_map_size))
+    perror("munmap error");
+
+  return ltt_trace_process_block_index(tf, offset, i);
+}
+
+/* Build the block index of a tracefile from scratch.
+ *
+ * A new index array is allocated and every block of the file is indexed,
+ * starting at file offset 0 with block number 0.
+ *
+ * Return value : the result of ltt_trace_process_block_index()
+ */
+int ltt_trace_create_block_index(LttTracefile *tf)
+{
+  tf->buf_index = g_array_sized_new(FALSE, TRUE, sizeof(uint64_t),
+      DEFAULT_N_BLOCKS);
+
+  return ltt_trace_process_block_index(tf, (uint64_t)0, 0UL);
+}
+
/*****************************************************************************
*Function name
- * ltt_tracefile_open : open a trace file, construct a LttTracefile
+ * ltt_tracefile_open_streaming : open a trace file, construct a
LttTracefile
+ * The trace file can be a streaming
*Input params
* t : the trace containing the tracefile
* fileName : path name of the trace file
* tf : the tracefile structure
+ * isStreaming : flag to open in a streaming mode (TRUE) or not
(FALSE).
*Return value
* : 0 for success, -1 otherwise.
****************************************************************************/
-
-static gint ltt_tracefile_open(LttTrace *t, gchar * fileName,
LttTracefile *tf)
+static gint ltt_tracefile_open_streaming(LttTrace *t, gchar * fileName,
LttTracefile *tf, gboolean isStreaming)
{
struct stat lTDFStat; /* Trace data file status */
ltt_subbuffer_header_t *header;
int page_size = getpagesize();
+
+ tf->Streaming_isEmpty = FALSE;
//open the file
tf->long_name = g_quark_from_string(fileName);
@@ -289,10 +330,31 @@ static gint ltt_tracefile_open(LttTrace *t, gchar
* fileName, LttTracefile *tf)
// Is the file large enough to contain a trace
if(lTDFStat.st_size <
(off_t)(ltt_subbuffer_header_size())){
- g_print("The input data file %s does not contain a trace\n", fileName);
- goto close_file;
+ if(isStreaming == FALSE)
+ {
+ g_print("The input data file %s does not contain a trace\n",
fileName);
+ goto close_file;
+ }
+ else
+ {
+ //We are in a streaming mode, so the file can be empty at the
start of the analysis
+ tf->Streaming_isEmpty = TRUE;
+ tf->file_size = lTDFStat.st_size;
+ tf->events_lost = 0;
+ tf->subbuf_corrupt = 0;
+
+ tf->buffer.head = NULL;
+ return -1;
+ }
}
+ tf->buffer.head = NULL;
+
+ //store the size of the file
+ tf->file_size = lTDFStat.st_size;
+ tf->events_lost = 0;
+ tf->subbuf_corrupt = 0;
+
/* Temporarily map the buffer start header to get trace information */
/* Multiple of pages aligned head */
tf->buffer.head = mmap(0,
@@ -310,11 +372,6 @@ static gint ltt_tracefile_open(LttTrace *t, gchar *
fileName, LttTracefile *tf)
g_warning("parse_trace_header error");
goto unmap_file;
}
-
- //store the size of the file
- tf->file_size = lTDFStat.st_size;
- tf->events_lost = 0;
- tf->subbuf_corrupt = 0;
if(munmap(tf->buffer.head,
PAGE_ALIGN(ltt_subbuffer_header_size()))) {
@@ -353,6 +410,114 @@ end:
return -1;
}
+/*****************************************************************************
+ *Function name
+ * ltt_tracefile_open : open a trace file, construct a LttTracefile.
+ *                      Non-streaming wrapper: forwards its arguments to
+ *                      ltt_tracefile_open_streaming() with isStreaming
+ *                      set to FALSE.
+ *Input params
+ * t : the trace containing the tracefile
+ * fileName : path name of the trace file
+ * tf : the tracefile structure
+ *Return value
+ * : 0 for success, -1 otherwise.
+ ****************************************************************************/
+static gint ltt_tracefile_open(LttTrace *t, gchar * fileName,
LttTracefile *tf)
+{
+ return ltt_tracefile_open_streaming(t, fileName, tf, FALSE);
+}
+
+/*****************************************************************************
+ *Function name
+ * ltt_tracefile_update : Update the information concerning a tracefile.
+ *                        Must be called periodically in order to have the
+ *                        latest information.
+ *
+ *                        If the tracefile was empty when it was opened, the
+ *                        trace header is parsed, the block index is created
+ *                        and the first block is mapped; otherwise the newly
+ *                        arrived blocks are appended to the existing index.
+ *Input params
+ * tf : the tracefile
+ * t : the trace containing the tracefile
+ *Return value
+ * : 0 for success (including "file still too small, nothing to do"),
+ *   -1 otherwise.
+ ****************************************************************************/
+int ltt_tracefile_update(LttTracefile *tf, LttTrace *t)
+{
+  struct stat lTDFStat; /* Trace data file status */
+  int page_size = getpagesize(); /* presumably referenced by PAGE_ALIGN -- confirm */
+
+  if(fstat(tf->fd, &lTDFStat) < 0){
+    perror("error in getting the tracefile informations.");
+    /* lTDFStat is indeterminate after a failure: it must not be used. */
+    return -1;
+  }
+
+  //Adjust the size of the file
+  tf->file_size = lTDFStat.st_size;
+
+  if(tf->Streaming_isEmpty)
+  {
+    ltt_subbuffer_header_t *header;
+
+    /* Same test as in ltt_tracefile_open_streaming(): as long as the file
+     * cannot hold a sub-buffer header there is nothing to parse yet. */
+    if(lTDFStat.st_size < (off_t)(ltt_subbuffer_header_size()))
+      return 0;
+
+    /* Temporarily map the buffer start header to get trace information */
+    /* Multiple of pages aligned head */
+    tf->buffer.head = mmap(0,
+        PAGE_ALIGN(ltt_subbuffer_header_size()), PROT_READ,
+        MAP_PRIVATE, tf->fd, 0);
+    if(tf->buffer.head == MAP_FAILED) {
+      perror("Error in allocating memory for buffer of tracefile");
+      /* Do not leave MAP_FAILED behind: other code tests head against NULL. */
+      tf->buffer.head = NULL;
+      goto close_file;
+    }
+    g_assert( ( (gulong)tf->buffer.head&(8-1) ) == 0); // make sure it's aligned.
+
+    header = (ltt_subbuffer_header_t *)tf->buffer.head;
+
+    if(parse_trace_header(header, tf, t)) {
+      g_warning("parse_trace_header error");
+      goto unmap_file;
+    }
+
+    if(munmap(tf->buffer.head,
+        PAGE_ALIGN(ltt_subbuffer_header_size()))) {
+      g_warning("unmap size : %zu\n",
+          PAGE_ALIGN(ltt_subbuffer_header_size()));
+      perror("munmap error");
+      g_assert(0);
+    }
+    tf->buffer.head = NULL;
+
+    /* Create block index */
+    if(ltt_trace_create_block_index(tf))
+      goto close_file;
+
+    //read the first block
+    if(map_block(tf,0)) {
+      perror("Cannot map block for tracefile");
+      goto close_file;
+    }
+    tf->Streaming_isEmpty = FALSE;
+  }
+  else
+  {
+    //Retrieve the new subbuffers and index them
+    ltt_trace_update_block_index(tf);
+  }
+
+  //Update the metadata table if applicable
+  if(tf->name == LTT_TRACEFILE_NAME_METADATA)
+  {
+    /* NOTE(review): the result of map_block() is not checked, so the
+     * metadata is processed even if the next block could not be mapped. */
+    map_block(tf, tf->buffer.index + 1);
+    ltt_process_metadata_tracefile(tf);
+  }
+  ltt_tracefile_read(tf);
+
+  return 0;
+
+  /* Error */
+ unmap_file:
+  if(munmap(tf->buffer.head,
+      PAGE_ALIGN(ltt_subbuffer_header_size()))) {
+    g_warning("unmap size : %zu\n",
+        PAGE_ALIGN(ltt_subbuffer_header_size()));
+    perror("munmap error");
+    g_assert(0);
+  }
+  tf->buffer.head = NULL;
+ close_file:
+  close(tf->fd);
+  if (tf->buf_index) {
+    g_array_free(tf->buf_index, TRUE);
+    /* Guard against a second free on a later call or at teardown. */
+    tf->buf_index = NULL;
+  }
+  return -1;
+}
+
/*****************************************************************************
*Function name
@@ -567,8 +732,77 @@ static __attribute__ ((__unused__)) gboolean
ltt_tracefile_group_has_cpu_online(
}
return 0;
}
+/****************************************************************************
+ * update_num_cpu
+ *
+ * Update the number of tracefiles in a "name" group.
+ * This function is normally called when a tracefile is detected but cannot
+ * be parsed at this point. The group size will count this file.
+ *
+ * If the group does not exist yet, it is created, resized to num+1 entries
+ * that all share newly allocated marker data, and 1 is returned right away.
+ * Otherwise the group is grown to num+1 entries when it is smaller (the new
+ * entries reuse the marker data of entry 0) and, unless "name" is the
+ * metadata group, the metadata group is updated the same way.
+ *
+ * return 1 if a new group is created
+ * return 0 otherwise
+ ****************************************************************************/
-
+int update_num_cpu(LttTrace *trace, GQuark name, guint num)
+{
+ struct marker_data *mdata;
+ GArray *group;
+ int i;
+
+ group = g_datalist_id_get_data(&trace->tracefiles, name);
+ if (group == NULL)
+ {
+ /* First tracefile seen for this name: create and register the group. */
+ group = g_array_sized_new (FALSE, TRUE, sizeof(LttTracefile), 10);
+ g_datalist_id_set_data_full(&trace->tracefiles, name,
+ group, ltt_tracefile_group_destroy);
+ mdata = allocate_marker_data();
+ if (!mdata)
+ g_error("Error in allocating marker data");
+
+ /* Every entry of the new group points to the same marker data. */
+ group = g_array_set_size(group, num+1);
+ for (i = 0; i < group->len; i++)
+ g_array_index (group, LttTracefile, i).mdata = mdata;
+ return 1;
+ }
+ if ( num+1 > group->len )
+ {
+ /* Grow the group; only the added entries need their mdata set. */
+ mdata = g_array_index (group, LttTracefile, 0).mdata;
+ int old_num = group->len;
+ group = g_array_set_size(group, num+1);
+ for (i = old_num; i < group->len; i++)
+ g_array_index (group, LttTracefile, i).mdata = mdata;
+ }
+ /* Apply the same sizing to the metadata group (the recursion stops there). */
+ if( name != LTT_TRACEFILE_NAME_METADATA ) {
+ update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+ }
+ return 0;
+}
+/*****************************************************************************
+ *Function name
+ * ltt_tracefile_add_streaming: open and add a tracefile to a trace
+ * normally used when a new file is received
+ * in a streaming analysis.
+ * Calls open_tracefiles_streaming() with
+ * isStreaming set to TRUE; its return value
+ * is discarded.
+ *Input params
+ * t : trace in which the tracefile will be added
+ * root_path : path of the trace
+ * rel_path : relative path of the tracefile
+ ****************************************************************************/
+void ltt_tracefile_add_streaming( LttTrace *t, gchar *root_path, gchar*
rel_path)
+{
+ open_tracefiles_streaming(t, root_path, rel_path, TRUE);
+}
/* Open each tracefile under a specific directory. Put them in a
* GData : permits to access them using their tracefile group pathname.
* i.e. access control/modules tracefile group by index :
@@ -579,9 +813,13 @@ static __attribute__ ((__unused__)) gboolean
ltt_tracefile_group_has_cpu_online(
*
* A tracefile group is simply an array where all the per cpu
tracefiles sit.
*/
-
static int open_tracefiles(LttTrace *trace, gchar *root_path, gchar
*relative_path)
{
+ /* Non-streaming entry point: delegate with isStreaming set to FALSE. */
+ return open_tracefiles_streaming(trace, root_path, relative_path, FALSE);
+}
+
+static int open_tracefiles_streaming(LttTrace *trace, gchar *root_path,
gchar *relative_path, gboolean isStreaming)
+{
DIR *dir = opendir(root_path);
struct dirent *entry;
struct stat stat_buf;
@@ -634,7 +872,18 @@ static int open_tracefiles(LttTrace *trace, gchar
*root_path, gchar *relative_pa
if(S_ISDIR(stat_buf.st_mode)) {
g_debug("Entering subdirectory...\n");
- ret = open_tracefiles(trace, path, rel_path);
+ //Monitor the directory if we are in a streaming mode
+ if(isStreaming == TRUE)
+ {
+ trace->inotify_watch_array.elem =
realloc(trace->inotify_watch_array.elem,
+ ++trace->inotify_watch_array.num * sizeof(struct
inotify_watch));
+
+
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd =
inotify_add_watch(trace->inotify_fd, path, IN_CREATE | IN_OPEN |
IN_CLOSE_WRITE);
+
strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel,
path);
+
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name =
g_quark_from_string("");
+
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num = -1;
+ }
+ ret = open_tracefiles_streaming(trace, path, rel_path, isStreaming);
if(ret < 0) continue;
} else if(S_ISREG(stat_buf.st_mode)) {
GQuark name;
@@ -647,12 +896,27 @@ static int open_tracefiles(LttTrace *trace, gchar
*root_path, gchar *relative_pa
creation = 0;
if(get_tracefile_name_number(rel_path, &name, &num, &tid, &pgid,
&creation))
continue; /* invalid name */
-
+ //Monitor the directory if we are in a streaming mode
+ if(isStreaming == TRUE)
+ {
+ trace->inotify_watch_array.elem =
realloc(trace->inotify_watch_array.elem,
+ ++trace->inotify_watch_array.num * sizeof(struct
inotify_watch));
+
+
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].wd =
inotify_add_watch(trace->inotify_fd, path, IN_OPEN | IN_MODIFY | IN_CLOSE);
+
strcpy(trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].path_channel,
path);
+
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].name =
name;
+
trace->inotify_watch_array.elem[trace->inotify_watch_array.num-1].num = num;
+ }
g_debug("Opening file.\n");
- if(ltt_tracefile_open(trace, path, &tmp_tf)) {
+ if(ltt_tracefile_open_streaming(trace, path, &tmp_tf, isStreaming)) {
g_info("Error opening tracefile %s", path);
-
- continue; /* error opening the tracefile : bad magic number ? */
+ if(isStreaming == TRUE)
+ {
+ // Take to account the current file in the Cpu count of the group
+ update_num_cpu(trace, name, num);
+ }
+ continue; /* error opening the tracefile : bad magic number ? */
+
}
g_debug("Tracefile name is %s and number is %u",
@@ -678,6 +942,9 @@ static int open_tracefiles(LttTrace *trace, gchar
*root_path, gchar *relative_pa
g_error("Error in allocating marker data");
}
+ if(isStreaming == TRUE && name != LTT_TRACEFILE_NAME_METADATA)
+ update_num_cpu(trace, LTT_TRACEFILE_NAME_METADATA, num);
+
/* Add the per cpu tracefile to the named group */
unsigned int old_len = group->len;
if(num+1 > old_len)
@@ -802,9 +1069,13 @@ seek_error:
*
* pathname must be the directory of the trace
*/
-
LttTrace *ltt_trace_open(const gchar *pathname)
{
+ /* Non-streaming entry point: delegate with isStreaming set to FALSE. */
+ return ltt_trace_open_streaming(pathname, FALSE);
+}
+
+LttTrace *ltt_trace_open_streaming(const gchar *pathname, gboolean
isStreaming)
+{
gchar abs_path[PATH_MAX];
LttTrace * t;
LttTracefile *tf;
@@ -842,10 +1113,18 @@ LttTrace *ltt_trace_open(const gchar *pathname)
}
}
closedir(dir);
-
+
+ if(isStreaming == TRUE)
+ {
+ t->inotify_fd = inotify_init();
+ fcntl(t->inotify_fd, F_SETFL, O_NONBLOCK);
+ t->inotify_watch_array.elem = NULL;
+ t->inotify_watch_array.num = 0;
+ t->open_tracefile_counter = 0;
+ }
/* Open all the tracefiles */
t->start_freq= 0;
- if(open_tracefiles(t, abs_path, "")) {
+ if(open_tracefiles_streaming(t, abs_path, "", isStreaming)) {
g_warning("Error opening tracefile %s", abs_path);
goto find_error;
}
@@ -858,15 +1137,30 @@ LttTrace *ltt_trace_open(const gchar *pathname)
}
/*
- * Get the trace information for the metadata_0 tracefile.
* Getting a correct trace start_time and start_tsc is insured by
the fact
* that no subbuffers are supposed to be lost in the metadata channel.
* Therefore, the first subbuffer contains the start_tsc timestamp
in its
* buffer header.
*/
g_assert(group->len > 0);
- tf = &g_array_index (group, LttTracefile, 0);
- header = (ltt_subbuffer_header_t *)tf->buffer.head;
+ if(isStreaming == TRUE)
+ {
+ //Get the trace information from the metadata tracefile.
+ for(i = 0; i < group->len; i++)
+ {
+ tf = &g_array_index (group, LttTracefile, i);
+ header = (ltt_subbuffer_header_t *)tf->buffer.head;
+ if(header != NULL)
+ break;
+ }
+ }
+ else
+ {
+ //Get the trace information from the metadata_0 tracefile.
+ tf = &g_array_index (group, LttTracefile, 0);
+ header = (ltt_subbuffer_header_t *)tf->buffer.head;
+ }
+
ret = parse_trace_header(header, tf, t);
g_assert(!ret);
--
1.7.0.4
More information about the lttng-dev
mailing list