[ltt-dev] [LTTV PATCH 2/3] Add live trace read support to lttv base library and to command line modules

Yannick Brosseau yannick.brosseau at gmail.com
Thu Apr 21 16:52:39 EDT 2011


Change process_traceset_middle to handle live traces
Add lttv_process_trace_update to manage trace update
Add parameters to batchAnalysis to indicate a live trace and its update period
- When the live option of the batchAnalysis module is activated, the trace processing loops. On each iteration, it
  reads the available events, updates the trace and then waits for the specified period.

Signed-off-by: Yannick Brosseau <yannick.brosseau at gmail.com>
---
 lttv/lttv/batchtest.c             |    2 +-
 lttv/lttv/tracecontext.c          |   95 +++++++++++++++++++++++++++++++++++-
 lttv/lttv/tracecontext.h          |    2 +
 lttv/modules/text/batchAnalysis.c |   40 ++++++++++++++--
 4 files changed, 130 insertions(+), 9 deletions(-)

diff --git a/lttv/lttv/batchtest.c b/lttv/lttv/batchtest.c
index 08b892b..fd4a218 100644
--- a/lttv/lttv/batchtest.c
+++ b/lttv/lttv/batchtest.c
@@ -100,7 +100,7 @@ static void lttv_trace_option(void __UNUSED__ *hook_data)
 { 
 	LttTrace *trace;
 
-	trace = ltt_trace_open(a_trace);
+	trace = ltt_trace_open(a_trace, FALSE);
 	if(trace == NULL) {
 		g_critical("cannot open trace %s", a_trace);
 	} else {
diff --git a/lttv/lttv/tracecontext.c b/lttv/lttv/tracecontext.c
index 136b2ee..45a810d 100644
--- a/lttv/lttv/tracecontext.c
+++ b/lttv/lttv/tracecontext.c
@@ -27,6 +27,7 @@
 #include <ltt/trace.h>
 #include <lttv/filter.h>
 #include <errno.h>
+#include <ltt/time.h>
 
 gint compare_tracefile(gconstpointer a, gconstpointer b)
 {
@@ -689,6 +690,8 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 
 	unsigned count = 0;
 
+	gboolean is_live = FALSE; /* set this flag if we detect a live trace */
+
 	guint read_ret;
 
 	//enum read_state last_read_state = LAST_NONE;
@@ -706,7 +709,7 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 		/* End of traceset : tfc is NULL */
 		if(unlikely(tfc == NULL))
 		{
-			return count;
+			break;
 		}
 
 		/* Have we reached :
@@ -723,7 +726,7 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 						end_position) == 0)
 				|| ltt_time_compare(end, tfc->timestamp) <= 0))
 		{
-			return count;
+			break;
 		}
 
 		/* Get the tracefile with an event for the smallest time found. If two
@@ -765,7 +768,8 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 	 if(unlikely(last_ret == 2)) {
 			/* This is a case where we want to stay at this position and stop read. */
 			g_tree_insert(pqueue, tfc, tfc);
-			return count - 1;
+			count--;
+			break;
 		}
 #endif //0
 		read_ret = ltt_tracefile_read(tfc->tf);
@@ -774,6 +778,12 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 		if(likely(!read_ret)) {
 			//g_debug("An event is ready");
 			tfc->timestamp = ltt_event_time(e);
+			if(tfc->tf->trace->is_live && ltt_time_compare(tfc->timestamp, tfc->tf->trace->live_safe_timestamp) >= 0)
+			{
+				is_live |= TRUE;
+				break;
+			}
+
 			g_assert(ltt_time_compare(tfc->timestamp, ltt_time_infinite) != 0);
 			g_tree_insert(pqueue, tfc, tfc);
 #ifdef DEBUG
@@ -794,6 +804,12 @@ guint lttv_process_traceset_middle(LttvTracesetContext *self,
 				g_error("Error happened in lttv_process_traceset_middle");
 		}
 	}
+
+	if (unlikely(count == 0 && is_live)) {
+		return -1;
+	} else {
+		return count;
+	}
 }
 
 
@@ -856,6 +872,79 @@ void lttv_process_trace_seek_time(LttvTraceContext *self, LttTime start)
 #endif //DEBUG
 }
 
+/****************************************************************************
+ * lttv_process_trace_update
+ *
+ * Process the changes that occur in the trace. Uses regular file polling to
+ * monitor the tracefiles.
+ *
+ * Return the number of tracefiles of the trace if it is live, 0 otherwise
+ ***************************************************************************/
+guint lttv_process_trace_update(LttvTraceContext *self)
+{
+	guint i; 
+	guint nb_tracefile = 0;
+
+	gint ret;
+
+	LttvTracefileContext **tfc;
+
+	/* Skip non live traces */
+	if(self->t->is_live) {
+
+		nb_tracefile = self->tracefiles->len;
+		
+		GTree *pqueue = self->ts_context->pqueue;
+		
+		for(i = 0 ; i < nb_tracefile ; i++) {
+			tfc = &g_array_index(self->tracefiles, LttvTracefileContext*, i);
+			
+			g_tree_remove(pqueue, *tfc);
+			
+			(*tfc)->tf->trace->live_safe_timestamp = LTT_TIME_MAX((*tfc)->tf->trace->live_safe_timestamp,
+									ltt_interpolate_time_from_tsc((*tfc)->tf, 
+												(*tfc)->tf->end_timestamp));
+
+			ret = ltt_tracefile_update((*tfc)->tf);
+			if(ret == 1 || ltt_time_compare((*tfc)->timestamp,ltt_time_infinite) != 0)
+			{
+				(*tfc)->timestamp = ltt_event_time(ltt_tracefile_get_event((*tfc)->tf));
+				g_tree_insert(pqueue, (*tfc), (*tfc));
+				
+			}
+		}
+		//Update self time span
+		self->time_span.end_time = LTT_TIME_MAX(self->t->live_safe_timestamp, 
+							self->time_span.end_time);
+		//Update self tscontext time span
+		self->ts_context->time_span.end_time = LTT_TIME_MAX(self->time_span.end_time, 
+								self->ts_context->time_span.end_time);
+	}
+	return nb_tracefile;
+	
+}
+
+/****************************************************************************
+ * lttv_process_traceset_update
+ *
+ * process the changes that occur in the traceset.
+ *
+ * Return the number of files presently monitored (open for writing). If 0, the
+ * current traceset has probably received all the data.
+ ***************************************************************************/
+guint lttv_process_traceset_update(LttvTracesetContext *self)
+{
+	guint i;
+	guint nb_trace;
+	guint open_counter = 0;
+
+	nb_trace = lttv_traceset_number(self->ts);
+
+	for(i = 0 ; i < nb_trace ; i++) {
+		open_counter += lttv_process_trace_update(self->traces[i]);
+	}
+	return open_counter;
+}
 
 void lttv_process_traceset_seek_time(LttvTracesetContext *self, LttTime start)
 {
diff --git a/lttv/lttv/tracecontext.h b/lttv/lttv/tracecontext.h
index acedeea..e509811 100644
--- a/lttv/lttv/tracecontext.h
+++ b/lttv/lttv/tracecontext.h
@@ -220,6 +220,8 @@ void lttv_process_traceset_end(LttvTracesetContext *self,
 		LttvHooks *event,
 		LttvHooksByIdChannelArray *event_by_id_channel);
 
+guint lttv_process_traceset_update(LttvTracesetContext *self);
+
 
 void lttv_process_traceset_seek_time(LttvTracesetContext *self, LttTime start);
 
diff --git a/lttv/modules/text/batchAnalysis.c b/lttv/modules/text/batchAnalysis.c
index 7d55f44..4a79afb 100644
--- a/lttv/modules/text/batchAnalysis.c
+++ b/lttv/modules/text/batchAnalysis.c
@@ -24,6 +24,7 @@
 #endif
 
 #include <glib.h>
+#include <unistd.h>
 #include <lttv/lttv.h>
 #include <lttv/attribute.h>
 #include <lttv/hook.h>
@@ -51,12 +52,16 @@ static LttvHooks
 static char *a_trace;
 
 static gboolean a_stats;
+static gboolean a_live;
+static int a_live_update_period;
+
+#define DEFAULT_LIVE_UPDATE_PERIOD 1
 
 void lttv_trace_option(void *hook_data)
 { 
   LttTrace *trace;
 
-  trace = ltt_trace_open(a_trace);
+  trace = ltt_trace_open(a_trace, a_live);
   if(trace == NULL) g_critical("cannot open trace %s", a_trace);
   lttv_traceset_add(traceset, lttv_trace_new(trace));
 }
@@ -134,10 +139,19 @@ static gboolean process_traceset(void *hook_data, void *call_data)
   g_info("BatchAnalysis process traceset");
 
   lttv_process_traceset_seek_time(tc, start);
-  lttv_process_traceset_middle(tc,
-                               end,
-                               G_MAXULONG,
-                               NULL);
+  /* Read as long as we do not reach the end (0) */
+  unsigned int count;
+  unsigned int updated_count;
+  do {
+	  count = lttv_process_traceset_middle(tc,
+							  end,
+							  G_MAXULONG,
+							  NULL);
+	  
+	  updated_count = lttv_process_traceset_update(tc); 
+		
+	  sleep(a_live_update_period);
+  } while(count != 0 || updated_count > 0);
 
 
   //lttv_traceset_context_remove_hooks(tc,
@@ -186,6 +200,20 @@ static void init()
       "", 
       LTTV_OPT_NONE, &a_stats, NULL, NULL);
 
+  a_live = FALSE;
+  lttv_option_add("live", 'L',
+      "define if the traceset is receiving live informations",
+      "",
+      LTTV_OPT_NONE, &a_live, NULL, NULL);
+  
+  a_live_update_period = DEFAULT_LIVE_UPDATE_PERIOD;
+  lttv_option_add("live-period", 0,
+		  "period to update a live trace",
+		  "in seconds",
+		  LTTV_OPT_INT,
+		  &a_live_update_period,
+		  NULL, NULL);
+
 
   traceset = lttv_traceset_new();
 
@@ -251,6 +279,8 @@ static void destroy()
 
   lttv_option_remove("trace");
   lttv_option_remove("stats");
+  lttv_option_remove("live");
+  lttv_option_remove("live-period");
 
   lttv_hooks_destroy(before_traceset);
   lttv_hooks_destroy(after_traceset);
-- 
1.7.2.3





More information about the lttng-dev mailing list