[ltt-dev] [PATCH] Add time reduction option to synchronisation

Masoume Jabbarifar masoume.jabbarifar at polymtl.ca
Wed Sep 15 12:36:58 EDT 2010


---
 lttv/lttv/Makefile.am                  |    2 +
 lttv/lttv/sync/Makefile.am             |    4 +-
 lttv/lttv/sync/data_structures.c       |    2 -
 lttv/lttv/sync/event_matching_tcp.c    |   87 +++++-
 lttv/lttv/sync/event_matching_tcp.h    |    1 +
 lttv/lttv/sync/event_processing_text.c |    4 +
 lttv/lttv/sync/factor_reduction.h      |    6 +
 lttv/lttv/sync/factor_reduction_time.c |  514 ++++++++++++++++++++++++++++++++
 lttv/lttv/sync/factor_reduction_time.h |   53 ++++
 lttv/lttv/sync/sync_chain_lttv.c       |    6 +
 lttv/lttv/sync/sync_chain_unittest.c   |    2 +
 11 files changed, 664 insertions(+), 17 deletions(-)
 create mode 100644 lttv/lttv/sync/factor_reduction_time.c
 create mode 100644 lttv/lttv/sync/factor_reduction_time.h

diff --git a/lttv/lttv/Makefile.am b/lttv/lttv/Makefile.am
index 30ead55..750f2d2 100644
--- a/lttv/lttv/Makefile.am
+++ b/lttv/lttv/Makefile.am
@@ -87,6 +87,8 @@ lttv_real_SOURCES = \
 	sync/factor_reduction.h\
 	sync/factor_reduction_accuracy.c\
 	sync/factor_reduction_accuracy.h\
+	sync/factor_reduction_time.c\
+	sync/factor_reduction_time.h\
 	sync/lookup3.h
 
 lttvinclude_HEADERS = \
diff --git a/lttv/lttv/sync/Makefile.am b/lttv/lttv/sync/Makefile.am
index e1d6775..7a5417d 100644
--- a/lttv/lttv/sync/Makefile.am
+++ b/lttv/lttv/sync/Makefile.am
@@ -30,4 +30,6 @@ unittest_SOURCES = \
 	event_analysis_linreg.h\
 	factor_reduction.h\
 	factor_reduction_accuracy.c\
-	factor_reduction_accuracy.h
+	factor_reduction_accuracy.h\
+	factor_reduction_time.c\
+	factor_reduction_time.h
diff --git a/lttv/lttv/sync/data_structures.c b/lttv/lttv/sync/data_structures.c
index acac9d7..c5fe736 100644
--- a/lttv/lttv/sync/data_structures.c
+++ b/lttv/lttv/sync/data_structures.c
@@ -271,8 +271,6 @@ void destroyTCPSegment(Message* const segment)
 {
 	TCPEvent* inE, *outE;
 
-	segment->print(segment);
-
 	g_assert(segment->inE != NULL && segment->outE != NULL);
 	g_assert(segment->inE->type == TCP && segment->outE->type == TCP);
 	inE= segment->inE->event.tcpEvent;
diff --git a/lttv/lttv/sync/event_matching_tcp.c b/lttv/lttv/sync/event_matching_tcp.c
index 90d6c43..678a334 100644
--- a/lttv/lttv/sync/event_matching_tcp.c
+++ b/lttv/lttv/sync/event_matching_tcp.c
@@ -56,7 +56,8 @@ static void buildReversedConnectionKey(ConnectionKey* const
 static void openGraphDataFiles(SyncState* const syncState);
 static void closeGraphDataFiles(SyncState* const syncState);
 static void writeMessagePoint(FILE* stream, const Message* const message);
-
+static void gfPacketDestroy(gpointer data, gpointer userData);
+static void gfExchangeDestroy(gpointer data, gpointer userData);
 
 static MatchingModule matchingModuleTCP = {
 	.name= "TCP",
@@ -101,6 +102,7 @@ void registerMatchingTCP()
 static void initMatchingTCP(SyncState* const syncState)
 {
 	MatchingDataTCP* matchingData;
+	int i, j;
 
 	matchingData= malloc(sizeof(MatchingDataTCP));
 	syncState->matchingData= matchingData;
@@ -113,8 +115,7 @@ static void initMatchingTCP(SyncState* const syncState)
 		&gefConnectionKeyEqual, &gdnConnectionKeyDestroy,
 		&gdnTCPSegmentListDestroy);
 
-	if (syncState->stats)
-	{
+	if (syncState->reductionModule->preProcessReduction != NULL || syncState->stats) {
 		unsigned int i;
 
 		matchingData->stats= calloc(1, sizeof(MatchingStatsTCP));
@@ -139,6 +140,20 @@ static void initMatchingTCP(SyncState* const syncState)
 	{
 		matchingData->messagePoints= NULL;
 	}
+	if (syncState->reductionModule->preProcessReduction != NULL) {
+		matchingData->packetArray= malloc(syncState->traceNb * sizeof(GQueue**));
+		for (i= 0; i < syncState->traceNb; i++) {
+			matchingData->packetArray[i]= malloc(syncState->traceNb * sizeof(GQueue*));
+	
+			for (j= 0; j < syncState->traceNb; j++) {
+				matchingData->packetArray[i][j]= g_queue_new();
+			}
+		}
+	}
+	else
+	{
+		matchingData->packetArray= NULL;
+	}
 }
 
 
@@ -155,6 +170,7 @@ static void initMatchingTCP(SyncState* const syncState)
 static void destroyMatchingTCP(SyncState* const syncState)
 {
 	MatchingDataTCP* matchingData;
+	int i, j;
 
 	matchingData= (MatchingDataTCP*) syncState->matchingData;
 
@@ -165,8 +181,7 @@ static void destroyMatchingTCP(SyncState* const syncState)
 
 	partialDestroyMatchingTCP(syncState);
 
-	if (syncState->stats)
-	{
+	if (syncState->reductionModule->preProcessReduction != NULL || syncState->stats) {
 		unsigned int i;
 
 		for (i= 0; i < syncState->traceNb; i++)
@@ -179,6 +194,17 @@ static void destroyMatchingTCP(SyncState* const syncState)
 
 	free(syncState->matchingData);
 	syncState->matchingData= NULL;
+	if (syncState->reductionModule->preProcessReduction != NULL) {
+			for (i= 0; i < syncState->traceNb; i++) {
+				for (j= 0; j < syncState->traceNb; j++)
+					if (syncState->analysisModule->analyzeMessage != NULL)
+						g_queue_foreach(matchingData->packetArray[i][j], gfPacketDestroy, NULL);
+					else
+						g_queue_foreach(matchingData->packetArray[i][j], gfExchangeDestroy, NULL);
+				free(matchingData->packetArray[i]);
+			}
+			free(matchingData->packetArray);
+		}
 }
 
 
@@ -335,6 +361,7 @@ static void matchEvents(SyncState* const syncState, Event* const event,
 	Message* packet;
 	MatchingDataTCP* matchingData;
 	GQueue* conUnAcked;
+	GQueue* packetMatching;
 
 	matchingData= (MatchingDataTCP*) syncState->matchingData;
 
@@ -354,10 +381,11 @@ static void matchEvents(SyncState* const syncState, Event* const event,
 		packet->outE->event.tcpEvent->segmentKey= packet->inE->event.tcpEvent->segmentKey;
 
 		if (syncState->stats)
-		{
 			matchingData->stats->totPacket++;
+
+		if (syncState->reductionModule->preProcessReduction != NULL || syncState->stats)
 			matchingData->stats->totMessageArray[packet->inE->traceNum][packet->outE->traceNum]++;
-		}
+		
 
 		// Discard loopback traffic
 		if (packet->inE->traceNum == packet->outE->traceNum)
@@ -371,17 +399,24 @@ static void matchEvents(SyncState* const syncState, Event* const event,
 			writeMessagePoint(matchingData->messagePoints[packet->inE->traceNum][packet->outE->traceNum],
 				packet);
 		}
-
-		if (syncState->analysisModule->analyzeMessage != NULL)
-		{
-			syncState->analysisModule->analyzeMessage(syncState, packet);
+		if (syncState->analysisModule->analyzeMessage != NULL) {
+			if (syncState->reductionModule->preProcessReduction == NULL)
+			 	syncState->analysisModule->analyzeMessage(syncState, packet);			
+			else {
+				packetMatching= 
+					matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
+				g_queue_push_tail(packetMatching, packet);
+			}
+		
 		}
 
 		// We can skip the rest of the algorithm if the analysis module is not
 		// interested in exchanges
 		if (syncState->analysisModule->analyzeExchange == NULL)
 		{
-			destroyTCPSegment(packet);
+			if (syncState->reductionModule->preProcessReduction == NULL)				
+				destroyTCPSegment(packet);		
+
 			return;
 		}
 
@@ -452,8 +487,14 @@ static void matchEvents(SyncState* const syncState, Event* const event,
 							matchingData->stats->totExchangeSync++;
 						}
 
-						syncState->analysisModule->analyzeExchange(syncState,
-							exchange);
+						if (syncState->reductionModule->preProcessReduction == NULL)
+							syncState->analysisModule->analyzeExchange(syncState, exchange);
+						else 
+						{
+							packetMatching= 
+								matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
+							g_queue_push_tail(packetMatching, exchange);
+						}
 					}
 
 					exchange->message= NULL;
@@ -707,3 +748,21 @@ static void writeMatchingGraphsPlotsTCPMessages(SyncState* const syncState,
 			"title \"Received messages\" with points linetype 4 "
 			"linecolor rgb \"#6699cc\" pointtype 11 pointsize 2, \\\n", i, j);
 }
+/* GFunc for g_queue_foreach(): destroy one queued Message (userData unused) */
+static void gfPacketDestroy(gpointer data, gpointer userData)
+{
+	Message* const segment= data;
+
+	/* data is an element of a packetArray queue filled by matchEvents() */
+	destroyTCPSegment(segment);
+}
+
+/* GFunc for g_queue_foreach(): destroy one queued Exchange (userData unused) */
+static void gfExchangeDestroy(gpointer data, gpointer userData)
+{
+	Exchange* const queued= data;
+
+	/* Clear the message pointer, then release the exchange itself */
+	queued->message= NULL;
+	destroyTCPExchange(queued);
+}
diff --git a/lttv/lttv/sync/event_matching_tcp.h b/lttv/lttv/sync/event_matching_tcp.h
index 6f9b072..e7d1d12 100644
--- a/lttv/lttv/sync/event_matching_tcp.h
+++ b/lttv/lttv/sync/event_matching_tcp.h
@@ -57,6 +57,7 @@ typedef struct
 	 * The elements on the diagonal are not initialized.
 	 */
 	FILE*** messagePoints;
+	GQueue*** packetArray;
 } MatchingDataTCP;
 
 void registerMatchingTCP();
diff --git a/lttv/lttv/sync/event_processing_text.c b/lttv/lttv/sync/event_processing_text.c
index bcbea9b..894f71c 100644
--- a/lttv/lttv/sync/event_processing_text.c
+++ b/lttv/lttv/sync/event_processing_text.c
@@ -273,6 +273,10 @@ static AllFactors* finalizeProcessingText(SyncState* const syncState)
 		free(line);
 	}
 
+	if (syncState->reductionModule->preProcessReduction != NULL) {
+		syncState->reductionModule->preProcessReduction(syncState);
+	}
+
 	factors= syncState->matchingModule->finalizeMatching(syncState);
 	if (syncState->stats)
 	{
diff --git a/lttv/lttv/sync/factor_reduction.h b/lttv/lttv/sync/factor_reduction.h
index 561df6b..e971ea6 100644
--- a/lttv/lttv/sync/factor_reduction.h
+++ b/lttv/lttv/sync/factor_reduction.h
@@ -39,6 +39,12 @@ typedef struct
 	 */
 	void (*destroyReduction)(struct _SyncState* const syncState);
 
+	/* Called when time reduction is needed: removes useless communication
+	 * links, finds a spanning tree of the nodes in the network, then sends
+	 * the packets along that tree to the synchronization analysis
+	 */
+	void (*preProcessReduction)(struct _SyncState* const syncState);
+
 	/*
 	 * Convert trace pair synchronization factors to a resulting offset and
 	 * drift for each trace.
diff --git a/lttv/lttv/sync/factor_reduction_time.c b/lttv/lttv/sync/factor_reduction_time.c
new file mode 100644
index 0000000..2bd37bf
--- /dev/null
+++ b/lttv/lttv/sync/factor_reduction_time.c
@@ -0,0 +1,514 @@
+/* This file is part of the Linux Trace Toolkit viewer
+ * Copyright (C) 2010 Masoume Jabbarifar <masoume.jabbarifar at polymtl.ca>
+ *
+ * This program is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+#define _ISOC99_SOURCE
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <math.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+
+#include "event_analysis.h"
+#include "event_matching.h"
+#include "sync_chain.h"
+
+#include "factor_reduction_time.h"
+#include "event_matching_tcp.h"
+
+
+/* Functions common to all reduction modules */
+static void initReductionTime(SyncState* const syncState);
+static void destroyReductionTime(SyncState* const syncState);
+static void preProcessReductionTime(SyncState* const syncState);
+
+static GArray* finalizeReductionTime(SyncState* const syncState,
+	AllFactors* allFactors);
+static void printReductionStatsTime(SyncState* const syncState);
+
+/* Functions specific to this module; declared static so that the definitions below get internal linkage */
+static void getFactors(SyncState* const syncState,AllFactors* const allFactors,
+	const unsigned int traceNum,
+	Factors* const factors);
+static void maximumSpanningTree(SyncState* const syncState);
+static void maximumCommunicationPacket(SyncState* const syncState);
+static void floodingSyncExecute(SyncState* const syncState);
+static void updateDistances(int target, ReductionData* reductionData, SyncState* const syncState);
+static void sumCommunicationPackets(SyncState* const syncState);
+static void proposeRefNode(int** connectionArray, int* refNodes, int n);
+static void SyncExecute(int refNode, SyncState* const syncState);
+static int findParent(SyncState* const syncState, int root, int node);
+
+
+static ReductionModule reductionModuleTime= { /* queued by registerReductionTime() */
+	.name= "time",
+	.initReduction= &initReductionTime,
+	.destroyReduction= &destroyReductionTime,
+	.preProcessReduction= &preProcessReductionTime, /* callers check this hook for NULL */
+	.finalizeReduction= &finalizeReductionTime,
+	.printReductionStats= &printReductionStatsTime,
+	.graphFunctions= {}, /* no graph functions are provided */
+};
+
+
+/*
+ * Registering function: queue the "time" module in reductionModules
+ */
+void registerReductionTime()
+{
+	g_queue_push_tail(&reductionModules, &reductionModuleTime);
+}
+
+
+/*
+ * Reduction init function
+ *
+ * This function is called at the beginning of a synchronization run for a set
+ * of traces.
+ *
+ * Allocate the data structures released by destroyReductionTime()
+ *
+ * Args:
+ *   syncState     container for synchronization data.
+ */
+static void initReductionTime(SyncState* const syncState)
+{
+	int i;
+	ReductionData* reductionData;
+
+	reductionData= malloc(sizeof(*reductionData));
+	syncState->reductionData= reductionData;
+
+	/* Per-node vectors, sized from their own element type. They start
+	 * zeroed so that neighbour[] never holds an indeterminate index, even
+	 * for a node that updateDistances() never links to the tree */
+	reductionData->routed=
+		calloc(syncState->traceNb, sizeof(*reductionData->routed));
+	reductionData->distance=
+		calloc(syncState->traceNb, sizeof(*reductionData->distance));
+	reductionData->neighbour=
+		calloc(syncState->traceNb, sizeof(*reductionData->neighbour));
+
+	/* Row tables of the three traceNb x traceNb matrices */
+	reductionData->totMessageArray=
+		malloc(syncState->traceNb * sizeof(*reductionData->totMessageArray));
+	reductionData->totMSTMessageArray=
+		malloc(syncState->traceNb * sizeof(*reductionData->totMSTMessageArray));
+	reductionData->totMSTAnalysisArray=
+		malloc(syncState->traceNb * sizeof(*reductionData->totMSTAnalysisArray));
+
+	/* Zero-filled rows: no packet counted and no tree edge recorded yet */
+	for (i= 0; i < syncState->traceNb; i++) {
+		reductionData->totMessageArray[i]=
+			calloc(syncState->traceNb, sizeof(unsigned int));
+		reductionData->totMSTMessageArray[i]=
+			calloc(syncState->traceNb, sizeof(unsigned int));
+		reductionData->totMSTAnalysisArray[i]=
+			calloc(syncState->traceNb, sizeof(unsigned int));
+	}
+
+	/* proposeRefNode() may store every node plus a -1 terminator */
+	reductionData->maxRoot= malloc((syncState->traceNb + 1) * sizeof(int));
+	reductionData->maxRootMST= malloc((syncState->traceNb + 1) * sizeof(int));
+}
+
+/* Reduction hook: count packets, pick a reference node, build the tree, replay */
+static void preProcessReductionTime(SyncState* const syncState)
+{
+	sumCommunicationPackets(syncState);
+	maximumCommunicationPacket(syncState);
+	maximumSpanningTree(syncState);
+	floodingSyncExecute(syncState);
+}
+/* Finding the best path in the network, the one that has the most interaction
+ * among the nodes; synchronisation is then done through this path.
+ *
+ * Maximum Spanning Tree function based on Prim's Algorithm, grown from
+ * referenceNode with totMessageArray as edge weights. Nodes that exchanged
+ * no packet with the tree are left out of it.
+ *
+ * The algorithm is implemented according to the code here:
+ * http://snippets.dzone.com/posts/show/4743
+ */
+static void maximumSpanningTree(SyncState* const syncState)
+{
+	int i, j;
+	int total= 0;
+	int treeSize, max;
+	ReductionData* reductionData;
+
+	reductionData= syncState->reductionData;
+
+	/* Every distance starts at 0 and no node is in the spanning tree yet */
+	for (i= 0; i < syncState->traceNb; ++i) {
+		reductionData->distance[i]= 0;
+		reductionData->routed[i]= 0;
+	}
+
+	/* Add the first node to the tree */
+	if (reductionData->referenceNode < 0) return;
+	g_debug("Adding node %d\n", reductionData->referenceNode);
+	reductionData->routed[reductionData->referenceNode]= 1;
+	updateDistances(reductionData->referenceNode, reductionData, syncState);
+
+	for (treeSize= 1; treeSize < syncState->traceNb; ++treeSize) {
+		/* Find the node with the biggest distance to the tree */
+		max= -1;
+		for (i= 0; i < syncState->traceNb; ++i)
+			if (!reductionData->routed[i])
+				if ((max == -1) || (reductionData->distance[max] < reductionData->distance[i]))
+					max= i;
+
+		/* updateDistances() only sets neighbour[] together with a non-zero
+		 * distance: at 0 the remaining nodes are not linked to the tree and
+		 * neighbour[max] must not be used as an index */
+		if (max == -1 || reductionData->distance[max] == 0)
+			break;
+
+		/* And add it */
+		reductionData->totMSTMessageArray[max][reductionData->neighbour[max]]= reductionData->distance[max];
+		reductionData->totMSTMessageArray[reductionData->neighbour[max]][max]= reductionData->distance[max];
+		reductionData->totMSTAnalysisArray[max][reductionData->neighbour[max]]= reductionData->distance[max];
+		reductionData->totMSTAnalysisArray[reductionData->neighbour[max]][max]= reductionData->distance[max];
+		g_debug("Adding edge %i-MAX(%i)-D(%i)\n", reductionData->neighbour[max], max, reductionData->distance[max]);
+		reductionData->routed[max]= 1;
+		total+= reductionData->distance[max];
+		updateDistances(max, reductionData, syncState);
+	}
+
+	for (i= 0; i < syncState->traceNb; i++) {
+		for (j= 0; j < syncState->traceNb; j++)
+			g_debug("\t%d ", reductionData->totMSTMessageArray[i][j]);
+		g_debug("\n");
+	}
+	g_debug("Total distance: %d\n", total);
+}
+
+
+/* Called after `target` joined the tree: re-link every node whose edge to
+ * target carries more packets than its current best edge; on a tie the node
+ * is only re-linked when target is the reference node */
+void updateDistances(int target, ReductionData* reductionData, SyncState* const syncState) {
+	int node;
+
+	g_debug("Update[%i]= ", target);
+	for (node= 0; node < syncState->traceNb; ++node) {
+		const unsigned int weight= reductionData->totMessageArray[target][node];
+		if (weight != 0 && (reductionData->distance[node] < weight ||
+				(reductionData->distance[node] == weight &&
+				 target == reductionData->referenceNode))) {
+			reductionData->distance[node]= weight;
+			reductionData->neighbour[node]= target;
+		}
+		g_debug("%iD(%i)N(%i)", node, reductionData->distance[node], reductionData->neighbour[node]);
+	}
+	g_debug("\n------------------------------------------------------------\n");
+}
+
+
+/* Choose the reference node: the first candidate proposed by proposeRefNode() */
+static void maximumCommunicationPacket(SyncState* const syncState)
+{
+	ReductionData* const data= syncState->reductionData;
+
+	/* maxRoot receives every candidate, terminated by -1 */
+	proposeRefNode(data->totMessageArray, data->maxRoot, syncState->traceNb);
+	data->referenceNode= data->maxRoot[0];
+
+	g_debug("Best RefNODE is : %i\n", data->referenceNode);
+}
+
+/* Symmetrical matrix of packet communications: totMessageArray[i][j] and
+ * totMessageArray[j][i] of the reduction data both receive the number of
+ * packets counted by the matching module in either direction between traces
+ * i and j */
+
+void sumCommunicationPackets(SyncState* const syncState) {
+	int row, col, both;
+	MatchingDataTCP* const matching= syncState->matchingData;
+	ReductionData* const reduction= syncState->reductionData;
+
+	for (row= 0; row < syncState->traceNb; ++row) {
+		/* Start at the diagonal: the sum does not depend on the direction,
+		 * so each pair is computed once and stored on both sides */
+		for (col= row; col < syncState->traceNb; ++col) {
+			both= matching->stats->totMessageArray[row][col] +
+				matching->stats->totMessageArray[col][row];
+
+			reduction->totMessageArray[row][col]= both;
+			reduction->totMessageArray[col][row]= both;
+		}
+	}
+}
+
+/* Finding the nodes who have the most interaction: refNodes receives the
+ * index of every node whose row of connectionArray (n x n) has the largest
+ * sum, followed by -1, so it must have room for n + 1 entries */
+
+void proposeRefNode(int** connectionArray, int* refNodes, int n){
+	int* const rowSum= malloc(n* sizeof(int));
+	int best= 0;
+	int found= 0;
+	int row, col;
+
+	for (row= 0; row < n; ++row) {
+		rowSum[row]= 0;
+		for (col= 0; col < n; ++col)
+			rowSum[row]+= connectionArray[row][col];
+		g_debug("sum(%i)=%i\n", row, rowSum[row]);
+		best= (rowSum[row] > best) ? rowSum[row] : best;
+	}
+
+	g_debug("proposed Reference Node: ");
+	for (row= 0; row < n; ++row) {
+		if (rowSum[row] != best)
+			continue;
+		refNodes[found++]= row;
+		g_debug("%i \t", row);
+	}
+	refNodes[found]= -1;
+	g_debug("\n");
+	free(rowSum);
+}
+
+/* Start replaying the queued packets from the reference node */
+void floodingSyncExecute(SyncState* const syncState) {
+	ReductionData* const data= syncState->reductionData;
+
+	/* SyncExecute() then recurses along the spanning tree edges */
+	SyncExecute(data->referenceNode, syncState);
+}
+
+
+/* Hand every element queued for one direction of a trace pair to the
+ * analysis module. As in destroyMatchingTCP(), the elements are treated as
+ * Message* when the module implements analyzeMessage and as Exchange*
+ * otherwise.
+ *
+ * The list links are followed directly: g_queue_peek_nth() restarts from an
+ * end of the queue on every call, which made the replay quadratic.
+ *
+ * Args:
+ *   syncState     container for synchronization data.
+ *   queue         one entry of matchingData->packetArray
+ */
+static void replayQueue(SyncState* const syncState, GQueue* const queue)
+{
+	GList* elem;
+
+	for (elem= g_queue_peek_head_link(queue); elem != NULL; elem= elem->next) {
+		if (syncState->analysisModule->analyzeMessage != NULL)
+			syncState->analysisModule->analyzeMessage(syncState, elem->data);
+		else
+			syncState->analysisModule->analyzeExchange(syncState, elem->data);
+	}
+}
+
+/* First the root and all of the nodes that have a connection with it (its
+ * children) must be synchronised, then the function is run for the children
+ * recursively. Every edge followed is reset to 0 in totMSTMessageArray, so
+ * the spanning tree is consumed by the traversal.
+ *
+ * Args:
+ *   refNode       node whose spanning tree edges are processed
+ *   syncState     container for synchronization data.
+ */
+void SyncExecute(int refNode, SyncState* const syncState) {
+	unsigned int i;
+	ReductionData* const reductionData= syncState->reductionData;
+	MatchingDataTCP* const matchingData= syncState->matchingData;
+
+	for (i= 0; i < syncState->traceNb; ++i) {
+		if (reductionData->totMSTMessageArray[refNode][i] == 0)
+			continue;
+
+		/* Both directions of the edge between refNode and i */
+		replayQueue(syncState, matchingData->packetArray[refNode][i]);
+		replayQueue(syncState, matchingData->packetArray[i][refNode]);
+
+		/* Erase the edge before recursing so that the child does not come
+		 * back to its parent */
+		reductionData->totMSTMessageArray[i][refNode]= 0;
+		reductionData->totMSTMessageArray[refNode][i]= 0;
+		SyncExecute(i, syncState);
+	}
+}
+/*
+ * Reduction destroy function
+ *
+ * Free the reduction specific data structures, if there are any
+ *
+ * Args:
+ *   syncState     container for synchronization data.
+ */
+static void destroyReductionTime(SyncState* const syncState)
+{
+	int row;
+	ReductionData* const data= syncState->reductionData;
+
+	if (data == NULL)
+		return;
+
+	free(data->routed);
+	free(data->distance);
+	free(data->neighbour);
+	free(data->maxRoot);
+	free(data->maxRootMST);
+	for (row= 0; row < syncState->traceNb; row++) {
+		free(data->totMessageArray[row]);
+		free(data->totMSTMessageArray[row]);
+		free(data->totMSTAnalysisArray[row]);
+	}
+	free(data->totMessageArray);
+	free(data->totMSTMessageArray);
+	free(data->totMSTAnalysisArray);
+	free(data);
+	syncState->reductionData= NULL;
+}
+
+
+/*
+ * Finalize the factor reduction
+ *
+ * Calculate a resulting offset and drift for each trace by calling
+ * getFactors() once per trace.
+ *
+ * Args:
+ *   syncState     container for synchronization data.
+ *   allFactors    offset and drift between each pair of traces
+ *
+ * Returns:
+ *   Factors[traceNb] synchronization factors for each trace
+ */
+static GArray* finalizeReductionTime(SyncState* const syncState,
+	AllFactors* allFactors)
+{
+	int trace;
+	GArray* const result= g_array_sized_new(FALSE, FALSE, sizeof(Factors),
+		syncState->traceNb);
+
+	/* Make all traceNb elements addressable before they are filled in */
+	g_array_set_size(result, syncState->traceNb);
+
+	for (trace= 0; trace < syncState->traceNb; trace++) {
+		Factors* const slot= &g_array_index(result, Factors, trace);
+		getFactors(syncState, allFactors, trace, slot);
+	}
+	return result;
+}
+
+
+/*
+ * Print statistics related to reduction. Must be called after
+ * finalizeReduction. This module currently prints nothing.
+ *
+ * Args:
+ *   syncState     container for synchronization data.
+ */
+static void printReductionStatsTime(SyncState* const syncState)
+{
+}
+
+/*
+ * Accumulate the time correction factors to convert a node's time to the
+ * reference node's time.
+ * This function recursively calls itself until it reaches the reference node,
+ * following the edges recorded in totMSTAnalysisArray.
+ *
+ * Args:
+ *   syncState:    container for synchronization data, its reductionData
+ *                 holds the reference node and the spanning tree edges
+ *   allFactors:   offset and drift between each pair of traces
+ *   traceNum:     node for which to find the factors
+ *   factors:      resulting factors
+ */
+static void getFactors(SyncState* const syncState, AllFactors* const allFactors, const unsigned int traceNum,
+	Factors* const factors)
+{
+	unsigned int reference;
+	PairFactors** const pairFactors= allFactors->pairFactors;
+	int parent, i; /* NOTE(review): neither variable is used below */
+
+	ReductionData* reductionData;
+	reductionData= syncState->reductionData;
+
+	if (traceNum == reductionData->referenceNode)
+		reference= traceNum;
+	else if (reductionData->totMSTAnalysisArray[reductionData->referenceNode][traceNum] != 0)
+		reference= reductionData->referenceNode;
+	else
+		reference= findParent(syncState, -1, traceNum); /* NOTE(review): a 0 result also means "not found" */
+
+	if (reference == traceNum) {
+		factors->offset= 0.;
+		factors->drift= 1.;
+	}
+	else {
+		Factors previousVertexFactors;
+
+		getFactors(syncState, allFactors, reference, &previousVertexFactors);
+
+		/* Convert the time from traceNum to reference;
+		 * pairFactors[row][col] converts the time from col to row, invert the
+		 * factors as necessary */
+
+		if (pairFactors[reference][traceNum].approx != NULL) {
+			factors->offset= previousVertexFactors.drift *
+				pairFactors[reference][traceNum].approx->offset +
+				previousVertexFactors.offset;
+			factors->drift= previousVertexFactors.drift *
+				pairFactors[reference][traceNum].approx->drift;
+		}
+		else if (pairFactors[traceNum][reference].approx != NULL) {
+			factors->offset= previousVertexFactors.drift * (-1. *
+				pairFactors[traceNum][reference].approx->offset /
+				pairFactors[traceNum][reference].approx->drift) +
+				previousVertexFactors.offset;
+			factors->drift= previousVertexFactors.drift * (1. /
+				pairFactors[traceNum][reference].approx->drift);
+		}
+		else {
+			g_assert_not_reached();
+		}
+	}
+}
+
+/* Search totMSTAnalysisArray for the reference node: returns 1 when node is
+ * linked to it directly or through a neighbour other than root, else 0. With
+ * root == -1, the indirect case returns that neighbour's index instead of 1 */
+int findParent(SyncState* const syncState, int root, int node)
+{
+	int i;
+	int result;
+	ReductionData* reductionData;
+	reductionData= syncState->reductionData;
+	for (i= 0; i < syncState->traceNb; i++)
+		if (reductionData->totMSTAnalysisArray[node][i] != 0 && i == reductionData->referenceNode)
+			return 1;
+	for (i= 0; i < syncState->traceNb; i++)
+		if (reductionData->totMSTAnalysisArray[node][i] != 0)
+			if (i != root) {
+				result= findParent(syncState, node, i);
+				if (result == 1 && root == -1) return i;
+				if (result == 1) return 1;
+			}
+	return 0;
+}
diff --git a/lttv/lttv/sync/factor_reduction_time.h b/lttv/lttv/sync/factor_reduction_time.h
new file mode 100644
index 0000000..c46cbd1
--- /dev/null
+++ b/lttv/lttv/sync/factor_reduction_time.h
@@ -0,0 +1,53 @@
+/* This file is part of the Linux Trace Toolkit viewer
+ * Copyright (C) 2010 Masoume Jabbarifar <masoume.jabbarifar at polymtl.ca>
+ *
+ * This program is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef FACTOR_REDUCTION_TIME_H
+#define FACTOR_REDUCTION_TIME_H
+
+#include <glib.h>
+
+#include "data_structures.h"
+
+
+typedef struct {
+
+	char* routed;   /* routed[i] is 1 when we have considered the node i in the maximum
+			spanning tree; otherwise it is 0 */
+
+	int* distance;  /* distance[i] is the distance between node i and the maximum
+			spanning tree; this is initially 0; if i is
+			already in the tree, then distance[i] is undefined;
+			this is just a temporary variable. It's not necessary but speeds
+			up execution considerably (by a factor of n) */
+
+	int* neighbour; /* neighbour[i] holds the index of the node i would have to be
+			linked to in order to get a distance of distance[i] */
+
+	int referenceNode; /* referenceNode is the network node whose time will be
+			   considered as reference time */
+
+	unsigned int** totMessageArray;     /* packets matched between each pair of traces, both directions summed */
+	unsigned int** totMSTMessageArray;  /* Maximum spanning tree is saved in totMSTMessageArray */
+	unsigned int** totMSTAnalysisArray; /* second copy of the tree edges, read by getFactors() and findParent() */
+	int* maxRoot;    /* reference node candidates from proposeRefNode(), ended by -1 */
+	int* maxRootMST; /* NOTE(review): allocated and freed but not otherwise used in this patch */
+
+} ReductionData;
+
+void registerReductionTime();
+
+#endif
diff --git a/lttv/lttv/sync/sync_chain_lttv.c b/lttv/lttv/sync/sync_chain_lttv.c
index 95bef44..c60e8ac 100644
--- a/lttv/lttv/sync/sync_chain_lttv.c
+++ b/lttv/lttv/sync/sync_chain_lttv.c
@@ -45,6 +45,7 @@
 #include "event_analysis_linreg.h"
 #include "event_analysis_eval.h"
 #include "factor_reduction_accuracy.h"
+#include "factor_reduction_time.h"
 #include "sync_chain.h"
 #include "sync_chain_lttv.h"
 
@@ -139,6 +140,7 @@ static void init()
 	registerAnalysisEval();
 
 	registerReductionAccuracy();
+	registerReductionTime();
 
 	// Build module names lists for option and help string
 	for (i= 0; i < ARRAY_SIZE(loopValues); i++)
@@ -313,6 +315,10 @@ bool syncTraceset(LttvTracesetContext* const traceSetContext)
 	lttv_process_traceset_middle(traceSetContext, ltt_time_infinite,
 		G_MAXULONG, NULL);
 	lttv_process_traceset_seek_time(traceSetContext, ltt_time_zero);
+	
+	// Find the best reference node and remove the useless synchronization
+	if (syncState->reductionModule->preProcessReduction != NULL) 
+		syncState->reductionModule->preProcessReduction(syncState);		
 
 	// Obtain, reduce, adjust and set correction factors
 	allFactors= syncState->processingModule->finalizeProcessing(syncState);
diff --git a/lttv/lttv/sync/sync_chain_unittest.c b/lttv/lttv/sync/sync_chain_unittest.c
index 40302a0..e76fa78 100644
--- a/lttv/lttv/sync/sync_chain_unittest.c
+++ b/lttv/lttv/sync/sync_chain_unittest.c
@@ -42,6 +42,7 @@
 #include "event_analysis_linreg.h"
 #include "event_analysis_eval.h"
 #include "factor_reduction_accuracy.h"
+#include "factor_reduction_time.h"
 #include "sync_chain.h"
 
 
@@ -134,6 +135,7 @@ int main(const int argc, char* const argv[])
 	registerAnalysisEval();
 
 	registerReductionAccuracy();
+	registerReductionTime();
 
 	// Initialize data structures
 	syncState= malloc(sizeof(SyncState));
-- 
1.6.0.4





More information about the lttng-dev mailing list