[ltt-dev] [LTTV PATCH 2/3] Add live trace read support to lttv base library and to command line modules

Yannick Brosseau yannick.brosseau at gmail.com
Mon Aug 29 17:23:38 EDT 2011


Change process_traceset_middle to handle live traces
Add lttv_process_trace_update to manage trace update
Add parameter to batchanalysis to indicate a live trace and its update period
- When the live option of the batchAnalysis module is activated, the trace processing loops. In each iteration, it
  waits for the specified period, updates the trace, and then reads the added events.

Signed-off-by: Yannick Brosseau <yannick.brosseau at gmail.com>
---
 lttv/lttv/option.c                |   23 ++++++++-
 lttv/lttv/tracecontext.c          |  104 +++++++++++++++++++++++++++++++++++--
 lttv/lttv/tracecontext.h          |    2 +
 lttv/modules/text/batchAnalysis.c |   46 ++++++++++++++--
 4 files changed, 164 insertions(+), 11 deletions(-)

diff --git a/lttv/lttv/option.c b/lttv/lttv/option.c
index 148b4c8..11eb56a 100644
--- a/lttv/lttv/option.c
+++ b/lttv/lttv/option.c
@@ -61,6 +61,20 @@ static void free_option(LttvOption *option)
 	g_free(option);
 }
 
+static gboolean compare_short_option(gpointer key,
+				     gpointer value,
+				     gpointer user_data)
+{
+	LttvOption *option = value;
+	const char short_option = *(const char *)user_data;
+
+
+	if(option->char_name == short_option) {
+		return TRUE;
+	} else {
+		return FALSE;
+	}
+}
 
 void lttv_option_add(const char *long_name, const char char_name,
 		const char *description, const char *arg_description,
@@ -71,9 +85,16 @@ void lttv_option_add(const char *long_name, const char char_name,
 
 	g_log(G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Add option %s", long_name);
 	if(g_hash_table_lookup(options, long_name) != NULL) {
-		g_warning("duplicate option");
+		g_warning("duplicate long-option: %s", long_name);
 		return;
 	}
+	if(char_name && g_hash_table_find(options, compare_short_option, (gpointer)&char_name) != NULL) {
+		g_warning("duplicate short-option: %c for option %s", 
+			  char_name,
+			  long_name);
+		return;		
+	}
+
 
 	option = g_new(LttvOption, 1);
 	option->long_name = g_strdup(long_name);
diff --git a/lttv/lttv/tracecontext.c b/lttv/lttv/tracecontext.c
index 136b2ee..1c71ac2 100644
--- a/lttv/lttv/tracecontext.c
+++ b/lttv/lttv/tracecontext.c
@@ -27,6 +27,7 @@
 #include <ltt/trace.h>
 #include <lttv/filter.h>
 #include <errno.h>
+#include <ltt/time.h>
 
 gint compare_tracefile(gconstpointer a, gconstpointer b)
 {
@@ -689,6 +690,8 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 
 	unsigned count = 0;
 
+	gboolean is_live = FALSE; /* set this flag if we detect a live trace */
+
 	guint read_ret;
 
 	//enum read_state last_read_state = LAST_NONE;
@@ -706,7 +709,7 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 		/* End of traceset : tfc is NULL */
 		if(unlikely(tfc == NULL))
 		{
-			return count;
+			break;
 		}
 
 		/* Have we reached :
@@ -716,14 +719,17 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 		 * then the read is finished. We leave the queue in the same state and
 		 * break the loop.
 		 */
-
+		if(tfc->tf->trace->is_live && ltt_time_compare(tfc->timestamp, tfc->tf->trace->live_safe_timestamp) >= 0) {
+		  
+		  break;
+		}
 		if(unlikely(last_ret == TRUE
 				|| ((count >= nb_events) && (nb_events != G_MAXULONG))
 				|| (end_position!=NULL&&lttv_traceset_context_ctx_pos_compare(self,
 						end_position) == 0)
 				|| ltt_time_compare(end, tfc->timestamp) <= 0))
 		{
-			return count;
+			break;
 		}
 
 		/* Get the tracefile with an event for the smallest time found. If two
@@ -765,7 +771,8 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 	 if(unlikely(last_ret == 2)) {
 			/* This is a case where we want to stay at this position and stop read. */
 			g_tree_insert(pqueue, tfc, tfc);
-			return count - 1;
+			count--;
+			break;
 		}
 #endif //0
 		read_ret = ltt_tracefile_read(tfc->tf);
@@ -774,8 +781,15 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 		if(likely(!read_ret)) {
 			//g_debug("An event is ready");
 			tfc->timestamp = ltt_event_time(e);
+
+
 			g_assert(ltt_time_compare(tfc->timestamp, ltt_time_infinite) != 0);
 			g_tree_insert(pqueue, tfc, tfc);
+			if(tfc->tf->trace->is_live && ltt_time_compare(tfc->timestamp, tfc->tf->trace->live_safe_timestamp) >= 0)
+			{
+				is_live |= TRUE;
+				break;
+			}
 #ifdef DEBUG
 			test_time.tv_sec = 0;
 			test_time.tv_nsec = 0;
@@ -794,6 +808,12 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 				g_error("Error happened in lttv_process_traceset_middle");
 		}
 	}
+
+	if (unlikely((count == 0) && is_live)) {
+		return -1;
+	} else {
+		return count;
+	}
 }
 
 
@@ -856,6 +876,82 @@ void lttv_process_trace_seek_time(LttvTraceContext *self, LttTime start)
 #endif //DEBUG
 }
 
+/****************************************************************************
+ * lttv_process_trace_update
+ *
+ * Process the changes that occurred in the trace. Uses regular file polling
+ * to monitor the tracefiles.
+ *
+ * Returns the number of tracefiles updated.
+ ***************************************************************************/
+guint lttv_process_trace_update(LttvTraceContext *self)
+{
+	guint i; 
+	guint nb_tracefile = 0;
+
+	LttTracefile *tf = 0;
+	LttvTracefileContext **tfc;
+
+	/* Skip non live traces */
+	if(self->t->is_live) {
+
+		nb_tracefile = ltt_trace_update(self->t);
+
+		/* Recreate the pqueue following an update*/
+		GTree *pqueue = self->ts_context->pqueue;
+		
+		for(i = 0 ; i < self->tracefiles->len ; i++) {
+			tfc = &g_array_index(self->tracefiles, LttvTracefileContext*, i);
+			tf = (*tfc)->tf;
+			if(g_tree_remove(pqueue, *tfc) == FALSE) {
+				if(tf->buf_index != NULL) {
+
+					if(ltt_tracefile_read(tf) == 0) {
+						
+						(*tfc)->timestamp = ltt_event_time(ltt_tracefile_get_event((*tfc)->tf));
+						g_tree_insert(pqueue, (*tfc), (*tfc));
+						
+					}
+				}
+			} else {
+				g_tree_insert(pqueue, (*tfc), (*tfc));
+
+			}
+
+		
+		}
+		//Update self time span
+		self->time_span.end_time = LTT_TIME_MAX(self->t->live_safe_timestamp, 
+							self->time_span.end_time);
+		//Update self tscontext time span
+		self->ts_context->time_span.end_time = LTT_TIME_MAX(self->time_span.end_time, 
+								self->ts_context->time_span.end_time);
+	}
+	return nb_tracefile;
+	
+}
+
+/****************************************************************************
+ * lttv_process_traceset_update
+ *
+ * Process the changes that occurred in the traceset.
+ *
+ * Returns the number of files presently monitored (open for writing). If 0,
+ * the current traceset has probably received all the data.
+ ***************************************************************************/
+guint lttv_process_traceset_update(LttvTracesetContext *self)
+{
+	guint i;
+	guint nb_trace;
+	guint open_counter = 0;
+
+	nb_trace = lttv_traceset_number(self->ts);
+
+	for(i = 0 ; i < nb_trace ; i++) {
+		open_counter += lttv_process_trace_update(self->traces[i]);
+	}
+	return open_counter;
+}
 
 void lttv_process_traceset_seek_time(LttvTracesetContext *self, LttTime start)
 {
diff --git a/lttv/lttv/tracecontext.h b/lttv/lttv/tracecontext.h
index acedeea..e509811 100644
--- a/lttv/lttv/tracecontext.h
+++ b/lttv/lttv/tracecontext.h
@@ -220,6 +220,8 @@ void lttv_process_traceset_end(LttvTracesetContext *self,
 		LttvHooks *event,
 		LttvHooksByIdChannelArray *event_by_id_channel);
 
+guint lttv_process_traceset_update(LttvTracesetContext *self);
+
 
 void lttv_process_traceset_seek_time(LttvTracesetContext *self, LttTime start);
 
diff --git a/lttv/modules/text/batchAnalysis.c b/lttv/modules/text/batchAnalysis.c
index 7d55f44..d06f9c8 100644
--- a/lttv/modules/text/batchAnalysis.c
+++ b/lttv/modules/text/batchAnalysis.c
@@ -24,6 +24,7 @@
 #endif
 
 #include <glib.h>
+#include <unistd.h>
 #include <lttv/lttv.h>
 #include <lttv/attribute.h>
 #include <lttv/hook.h>
@@ -51,12 +52,20 @@ static LttvHooks
 static char *a_trace;
 
 static gboolean a_stats;
+static gboolean a_live;
+static int a_live_update_period;
+
+#define DEFAULT_LIVE_UPDATE_PERIOD 1
 
 void lttv_trace_option(void *hook_data)
 { 
   LttTrace *trace;
-
-  trace = ltt_trace_open(a_trace);
+  
+  if(a_live) {
+    trace = ltt_trace_open_live(a_trace);
+  } else {
+    trace = ltt_trace_open(a_trace);
+  }
   if(trace == NULL) g_critical("cannot open trace %s", a_trace);
   lttv_traceset_add(traceset, lttv_trace_new(trace));
 }
@@ -134,10 +143,19 @@ static gboolean process_traceset(void *hook_data, void *call_data)
   g_info("BatchAnalysis process traceset");
 
   lttv_process_traceset_seek_time(tc, start);
-  lttv_process_traceset_middle(tc,
-                               end,
-                               G_MAXULONG,
-                               NULL);
+  /* Read as long as we do not reach the end (0) */
+  unsigned int count;
+  unsigned int updated_count;
+  do {
+	  count = lttv_process_traceset_middle(tc,
+							  end,
+							  G_MAXULONG,
+							  NULL);
+	  
+	  updated_count = lttv_process_traceset_update(tc); 
+		
+	  sleep(a_live_update_period);
+  } while(count != 0 || updated_count > 0);
 
 
   //lttv_traceset_context_remove_hooks(tc,
@@ -186,6 +204,20 @@ static void init()
       "", 
       LTTV_OPT_NONE, &a_stats, NULL, NULL);
 
+  a_live = FALSE;
+  lttv_option_add("live", 0,
+      "define if the traceset is receiving live informations",
+      "",
+      LTTV_OPT_NONE, &a_live, NULL, NULL);
+  
+  a_live_update_period = DEFAULT_LIVE_UPDATE_PERIOD;
+  lttv_option_add("live-period", 0,
+		  "period to update a live trace",
+		  "in seconds",
+		  LTTV_OPT_INT,
+		  &a_live_update_period,
+		  NULL, NULL);
+
 
   traceset = lttv_traceset_new();
 
@@ -251,6 +283,8 @@ static void destroy()
 
   lttv_option_remove("trace");
   lttv_option_remove("stats");
+  lttv_option_remove("live");
+  lttv_option_remove("live-period");
 
   lttv_hooks_destroy(before_traceset);
   lttv_hooks_destroy(after_traceset);
-- 
1.7.5.4





More information about the lttng-dev mailing list