[ltt-dev] [PATCH] Add time reduction option to synchronisation

Benjamin Poirier benjamin.poirier at polymtl.ca
Mon Sep 20 20:49:12 EDT 2010


First, some background on the synchronization code and the benefits of 
its modular design:
The synchronization framework consists of a group of modules that 
interchangeably plug in to a central "synchronization chain" concept. 
This chain performs the different steps needed to synchronize the time 
between a group of traces: read the events from different traces, match 
them together, analyze them to come up with correction factors and 
finally reduce the potentially large number of factors to a minimal set. 
Each step of the synchronization chain is carefully kept independent of 
the other steps. This allows, for example, the accuracy analysis code to 
work off of UDP events from lttng traces or simulated TCP events from 
text files. Any appropriate module can be plugged in at any step because 
1) each module is kept independent of other modules, 2) the core is kept 
independent of any specific module, 3) each module performs a single, 
specific task.
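
To make points 1) to 3) concrete, here is a minimal sketch of the 
pattern. All names in it (SketchState, SketchAnalysisModule, 
sketchDispatch, ...) are hypothetical and simplified; they are not the 
actual lttv declarations. The only thing taken from the real code is the 
idea of checking an optional function pointer, as in
"if (syncState->analysisModule->analyzeMessage != NULL)".

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for illustration only. */
typedef struct SketchState SketchState;

typedef struct {
	const char* name;
	/* Optional hook: a module leaves it NULL if it has nothing to do. */
	void (*analyzeMessage)(SketchState* state, int message);
} SketchAnalysisModule;

struct SketchState {
	const SketchAnalysisModule* analysisModule;
	int messageCount;
};

/* A module performs a single, specific task: here, counting messages. */
static void countMessage(SketchState* state, int message)
{
	(void) message;
	state->messageCount++;
}

static const SketchAnalysisModule countingModule = {
	.name = "counting",
	.analyzeMessage = &countMessage,
};

/* Another module that is not interested in individual messages. */
static const SketchAnalysisModule silentModule = {
	.name = "silent",
	.analyzeMessage = NULL,
};

/*
 * The core only knows the generic interface. It never names a specific
 * module, so any module can be swapped in without touching this code.
 */
static void sketchDispatch(SketchState* state, int message)
{
	assert(state != NULL && state->analysisModule != NULL);

	if (state->analysisModule->analyzeMessage != NULL)
		state->analysisModule->analyzeMessage(state, message);
}
```

Swapping countingModule for silentModule in the state changes the 
behaviour without any change to sketchDispatch(), which is the property 
the real chain relies on.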

On to your patch:
The patch you are submitting adds code specific to your module inside 
the generic sync chain code. This robs the chain of its genericity, 
which has very real consequences. As soon as you start swapping modules, 
your patch causes crashes:

$ ./runlttv -m eval -H gdb
[...]
Program received signal SIGSEGV, Segmentation fault.
0x0806eb64 in initMatchingTCP (syncState=0x8087b68) at sync/event_matching_tcp.c:118
118        if (syncState->reductionModule->preProcessReduction != NULL || syncState->stats) {

Please rework the patch to respect point 2). If new hooks are needed, 
these hooks should be made generic and available to all relevant 
modules. Needing new hooks would also mean that your module is doing 
something that no other module is currently doing, which brings me to my 
second point.
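
As a rough illustration of what a generic hook could look like, here is 
a sketch. It rests on an assumption the backtrace above does not 
confirm: that the crash comes from the reduction module slot itself 
being NULL when no reduction module is selected. The names (HookState, 
HookModule, runPreProcessHook) are hypothetical and do not exist in 
lttv.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for illustration only. */
typedef struct HookState HookState;

typedef struct {
	const char* name;
	/* Optional hook, NULL for modules that do not need it. */
	void (*preProcess)(HookState* state);
} HookModule;

struct HookState {
	/* Assumption: this slot may be NULL when no module is selected. */
	const HookModule* reductionModule;
	int preProcessCalls;
};

/*
 * Generic helper living in the core. It checks the module slot before
 * the hook, so callers never dereference a missing module, and it does
 * not mention any specific module.
 *
 * Returns true if a hook was present and was called.
 */
static bool runPreProcessHook(HookState* state)
{
	const HookModule* module;

	assert(state != NULL);
	module = state->reductionModule;

	if (module == NULL || module->preProcess == NULL)
		return false;

	module->preProcess(state);
	return true;
}

/* Example module that uses the hook. */
static void notePreProcess(HookState* state)
{
	state->preProcessCalls++;
}

static const HookModule hookingModule = {
	.name = "hooking",
	.preProcess = &notePreProcess,
};
```

With a helper of this kind, the check is written once in the core 
instead of being repeated inside a matching module that should not need 
to know about reduction hooks at all.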

Your code and patch are very sparse in comments; this one seems to be 
the most revealing:
/* This function is called when time reduction is needed and
  * removes useless communication links and finds Spanning Tree
  * of nodes in the network and then sends packets to synchronization
  */

The number of "and"s in that sentence is revealing... That module is 
doing more than a "single, specific task". The job of reduction modules 
is to convert the pair-wise synchronization factors to a potentially 
smaller number of factors between each trace and a reference trace. This 
reduces the number of factors, not the number of "useless communication 
links".
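
To illustrate what "factor reduction" means, here is a small sketch of 
how pair-wise factors can be chained into a single set of factors 
relative to a reference trace. It assumes the linear model
t_ref = drift * t + offset and follows the arithmetic used in the 
getFactors() function of the quoted patch. The type and function names 
(LinearFactors, composeFactors, invertFactors, applyFactors) are 
hypothetical.

```c
#include <assert.h>

/* Hypothetical type: converts a time t to drift * t + offset. */
typedef struct {
	double drift;
	double offset;
} LinearFactors;

/* Apply a conversion to a timestamp. */
static double applyFactors(LinearFactors f, double t)
{
	return f.drift * t + f.offset;
}

/*
 * Chain two conversions: node -> parent, then parent -> reference.
 * The result converts directly from node time to reference time.
 */
static LinearFactors composeFactors(LinearFactors parentToRef,
	LinearFactors nodeToParent)
{
	LinearFactors result;

	result.drift = parentToRef.drift * nodeToParent.drift;
	result.offset = parentToRef.drift * nodeToParent.offset +
		parentToRef.offset;

	return result;
}

/*
 * Invert a conversion, for the case where only the factors in the
 * opposite direction are available for a pair of traces.
 */
static LinearFactors invertFactors(LinearFactors f)
{
	LinearFactors result;

	assert(f.drift != 0.0);
	result.drift = 1. / f.drift;
	result.offset = -1. * f.offset / f.drift;

	return result;
}
```

Applied along a path to the reference trace, composeFactors() leaves one 
drift and one offset per trace, whatever the number of pair-wise factors 
the analysis produced.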

If that module's intention is to perform factor reduction, keep it at 
that. If its intention is to do something else, separate it into a 
different module. If I'm correct, what you want to do is to "filter 
out events". lttv already includes event filtering capabilities; perhaps 
these could be leveraged? Please explain what you want to do if you'd 
like some more advice.

Don't forget point 10 of the kernel's Documentation/SubmittingPatches!
10) Don't get discouraged.  Re-submit.

-Ben

On 15/09/10 12:36 PM, Masoume Jabbarifar wrote:
> ---
>   lttv/lttv/Makefile.am                  |    2 +
>   lttv/lttv/sync/Makefile.am             |    4 +-
>   lttv/lttv/sync/data_structures.c       |    2 -
>   lttv/lttv/sync/event_matching_tcp.c    |   87 +++++-
>   lttv/lttv/sync/event_matching_tcp.h    |    1 +
>   lttv/lttv/sync/event_processing_text.c |    4 +
>   lttv/lttv/sync/factor_reduction.h      |    6 +
>   lttv/lttv/sync/factor_reduction_time.c |  514 ++++++++++++++++++++++++++++++++
>   lttv/lttv/sync/factor_reduction_time.h |   53 ++++
>   lttv/lttv/sync/sync_chain_lttv.c       |    6 +
>   lttv/lttv/sync/sync_chain_unittest.c   |    2 +
>   11 files changed, 664 insertions(+), 17 deletions(-)
>   create mode 100644 lttv/lttv/sync/factor_reduction_time.c
>   create mode 100644 lttv/lttv/sync/factor_reduction_time.h
>
> diff --git a/lttv/lttv/Makefile.am b/lttv/lttv/Makefile.am
> index 30ead55..750f2d2 100644
> --- a/lttv/lttv/Makefile.am
> +++ b/lttv/lttv/Makefile.am
> @@ -87,6 +87,8 @@ lttv_real_SOURCES = \
>   	sync/factor_reduction.h\
>   	sync/factor_reduction_accuracy.c\
>   	sync/factor_reduction_accuracy.h\
> +	sync/factor_reduction_time.c\
> +	sync/factor_reduction_time.h\
>   	sync/lookup3.h
>
>   lttvinclude_HEADERS = \
> diff --git a/lttv/lttv/sync/Makefile.am b/lttv/lttv/sync/Makefile.am
> index e1d6775..7a5417d 100644
> --- a/lttv/lttv/sync/Makefile.am
> +++ b/lttv/lttv/sync/Makefile.am
> @@ -30,4 +30,6 @@ unittest_SOURCES = \
>   	event_analysis_linreg.h\
>   	factor_reduction.h\
>   	factor_reduction_accuracy.c\
> -	factor_reduction_accuracy.h
> +	factor_reduction_accuracy.h\
> +	factor_reduction_time.c\
> +	factor_reduction_time.h
> diff --git a/lttv/lttv/sync/data_structures.c b/lttv/lttv/sync/data_structures.c
> index acac9d7..c5fe736 100644
> --- a/lttv/lttv/sync/data_structures.c
> +++ b/lttv/lttv/sync/data_structures.c
> @@ -271,8 +271,6 @@ void destroyTCPSegment(Message* const segment)
>   {
>   	TCPEvent* inE, *outE;
>
> -	segment->print(segment);
> -
>   	g_assert(segment->inE != NULL&&  segment->outE != NULL);
>   	g_assert(segment->inE->type == TCP&&  segment->outE->type == TCP);
>   	inE= segment->inE->event.tcpEvent;
> diff --git a/lttv/lttv/sync/event_matching_tcp.c b/lttv/lttv/sync/event_matching_tcp.c
> index 90d6c43..678a334 100644
> --- a/lttv/lttv/sync/event_matching_tcp.c
> +++ b/lttv/lttv/sync/event_matching_tcp.c
> @@ -56,7 +56,8 @@ static void buildReversedConnectionKey(ConnectionKey* const
>   static void openGraphDataFiles(SyncState* const syncState);
>   static void closeGraphDataFiles(SyncState* const syncState);
>   static void writeMessagePoint(FILE* stream, const Message* const message);
> -
> +static void gfPacketDestroy(gpointer data, gpointer userData);
> +static void gfExchangeDestroy(gpointer data, gpointer userData);
>
>   static MatchingModule matchingModuleTCP = {
>   	.name= "TCP",
> @@ -101,6 +102,7 @@ void registerMatchingTCP()
>   static void initMatchingTCP(SyncState* const syncState)
>   {
>   	MatchingDataTCP* matchingData;
> +	int i, j;
>
>   	matchingData= malloc(sizeof(MatchingDataTCP));
>   	syncState->matchingData= matchingData;
> @@ -113,8 +115,7 @@ static void initMatchingTCP(SyncState* const syncState)
>   		&gefConnectionKeyEqual,&gdnConnectionKeyDestroy,
>   		&gdnTCPSegmentListDestroy);
>
> -	if (syncState->stats)
> -	{
> +	if (syncState->reductionModule->preProcessReduction != NULL || syncState->stats) {
>   		unsigned int i;
>
>   		matchingData->stats= calloc(1, sizeof(MatchingStatsTCP));
> @@ -139,6 +140,20 @@ static void initMatchingTCP(SyncState* const syncState)
>   	{
>   		matchingData->messagePoints= NULL;
>   	}
> +	if (syncState->reductionModule->preProcessReduction != NULL) {
> +		matchingData->packetArray= malloc(syncState->traceNb * sizeof(GQueue**));
> +		for (i= 0; i<  syncState->traceNb; i++) {
> +			matchingData->packetArray[i]= malloc(syncState->traceNb * sizeof(GQueue*));
> +	
> +			for (j= 0; j<  syncState->traceNb; j++) {
> +				matchingData->packetArray[i][j]= g_queue_new();
> +			}
> +		}
> +	}
> +	else
> +	{
> +		matchingData->packetArray= NULL;
> +	}
>   }
>
>
> @@ -155,6 +170,7 @@ static void initMatchingTCP(SyncState* const syncState)
>   static void destroyMatchingTCP(SyncState* const syncState)
>   {
>   	MatchingDataTCP* matchingData;
> +	int i, j;
>
>   	matchingData= (MatchingDataTCP*) syncState->matchingData;
>
> @@ -165,8 +181,7 @@ static void destroyMatchingTCP(SyncState* const syncState)
>
>   	partialDestroyMatchingTCP(syncState);
>
> -	if (syncState->stats)
> -	{
> +	if (syncState->reductionModule->preProcessReduction != NULL || syncState->stats) {
>   		unsigned int i;
>
>   		for (i= 0; i<  syncState->traceNb; i++)
> @@ -179,6 +194,17 @@ static void destroyMatchingTCP(SyncState* const syncState)
>
>   	free(syncState->matchingData);
>   	syncState->matchingData= NULL;
> +	if (syncState->reductionModule->preProcessReduction != NULL) {
> +			for (i= 0; i<  syncState->traceNb; i++) {
> +				for (j= 0; j<  syncState->traceNb; j++)
> +					if (syncState->analysisModule->analyzeMessage != NULL)
> +						g_queue_foreach(matchingData->packetArray[i][j], gfPacketDestroy, NULL);
> +					else
> +						g_queue_foreach(matchingData->packetArray[i][j], gfExchangeDestroy, NULL);
> +				free(matchingData->packetArray[i]);
> +			}
> +			free(matchingData->packetArray);
> +		}
>   }
>
>
> @@ -335,6 +361,7 @@ static void matchEvents(SyncState* const syncState, Event* const event,
>   	Message* packet;
>   	MatchingDataTCP* matchingData;
>   	GQueue* conUnAcked;
> +	GQueue* packetMatching;
>
>   	matchingData= (MatchingDataTCP*) syncState->matchingData;
>
> @@ -354,10 +381,11 @@ static void matchEvents(SyncState* const syncState, Event* const event,
>   		packet->outE->event.tcpEvent->segmentKey= packet->inE->event.tcpEvent->segmentKey;
>
>   		if (syncState->stats)
> -		{
>   			matchingData->stats->totPacket++;
> +
> +		if (syncState->reductionModule->preProcessReduction != NULL || syncState->stats)
>   			matchingData->stats->totMessageArray[packet->inE->traceNum][packet->outE->traceNum]++;
> -		}
> +		
>
>   		// Discard loopback traffic
>   		if (packet->inE->traceNum == packet->outE->traceNum)
> @@ -371,17 +399,24 @@ static void matchEvents(SyncState* const syncState, Event* const event,
>   			writeMessagePoint(matchingData->messagePoints[packet->inE->traceNum][packet->outE->traceNum],
>   				packet);
>   		}
> -
> -		if (syncState->analysisModule->analyzeMessage != NULL)
> -		{
> -			syncState->analysisModule->analyzeMessage(syncState, packet);
> +		if (syncState->analysisModule->analyzeMessage != NULL) {
> +			if (syncState->reductionModule->preProcessReduction == NULL)
> +			 	syncState->analysisModule->analyzeMessage(syncState, packet);			
> +			else {
> +				packetMatching=
> +					matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
> +				g_queue_push_tail(packetMatching, packet);
> +			}
> +		
>   		}
>
>   		// We can skip the rest of the algorithm if the analysis module is not
>   		// interested in exchanges
>   		if (syncState->analysisModule->analyzeExchange == NULL)
>   		{
> -			destroyTCPSegment(packet);
> +			if (syncState->reductionModule->preProcessReduction == NULL)				
> +				destroyTCPSegment(packet);		
> +
>   			return;
>   		}
>
> @@ -452,8 +487,14 @@ static void matchEvents(SyncState* const syncState, Event* const event,
>   							matchingData->stats->totExchangeSync++;
>   						}
>
> -						syncState->analysisModule->analyzeExchange(syncState,
> -							exchange);
> +						if (syncState->reductionModule->preProcessReduction == NULL)
> +							syncState->analysisModule->analyzeExchange(syncState, exchange);
> +						else
> +						{
> +							packetMatching=
> +								matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
> +							g_queue_push_tail(packetMatching, exchange);
> +						}
>   					}
>
>   					exchange->message= NULL;
> @@ -707,3 +748,21 @@ static void writeMatchingGraphsPlotsTCPMessages(SyncState* const syncState,
>   			"title \"Received messages\" with points linetype 4 "
>   			"linecolor rgb \"#6699cc\" pointtype 11 pointsize 2, \\\n", i, j);
>   }
> +static void gfPacketDestroy(gpointer data, gpointer userData)
> +{
> +	Message* packet;
> +
> +	packet= (Message*) data;
> +	destroyTCPSegment(packet);
> +
> +}
> +
> +static void gfExchangeDestroy(gpointer data, gpointer userData)
> +{
> +	Exchange* exchange;
> +
> +	exchange= (Exchange*) data;
> +	exchange->message= NULL;
> +	destroyTCPExchange(exchange);
> +
> +}
> diff --git a/lttv/lttv/sync/event_matching_tcp.h b/lttv/lttv/sync/event_matching_tcp.h
> index 6f9b072..e7d1d12 100644
> --- a/lttv/lttv/sync/event_matching_tcp.h
> +++ b/lttv/lttv/sync/event_matching_tcp.h
> @@ -57,6 +57,7 @@ typedef struct
>   	 * The elements on the diagonal are not initialized.
>   	 */
>   	FILE*** messagePoints;
> +	GQueue*** packetArray;
>   } MatchingDataTCP;
>
>   void registerMatchingTCP();
> diff --git a/lttv/lttv/sync/event_processing_text.c b/lttv/lttv/sync/event_processing_text.c
> index bcbea9b..894f71c 100644
> --- a/lttv/lttv/sync/event_processing_text.c
> +++ b/lttv/lttv/sync/event_processing_text.c
> @@ -273,6 +273,10 @@ static AllFactors* finalizeProcessingText(SyncState* const syncState)
>   		free(line);
>   	}
>
> +	if (syncState->reductionModule->preProcessReduction != NULL) {
> +		syncState->reductionModule->preProcessReduction(syncState);
> +	}
> +
>   	factors= syncState->matchingModule->finalizeMatching(syncState);
>   	if (syncState->stats)
>   	{
> diff --git a/lttv/lttv/sync/factor_reduction.h b/lttv/lttv/sync/factor_reduction.h
> index 561df6b..e971ea6 100644
> --- a/lttv/lttv/sync/factor_reduction.h
> +++ b/lttv/lttv/sync/factor_reduction.h
> @@ -39,6 +39,12 @@ typedef struct
>   	 */
>   	void (*destroyReduction)(struct _SyncState* const syncState);
>
> +	/* This function is called when time reduction is needed and
> +	 * removes useless communication links and finds Spanning Tree
> +	 * of nodes in the network and then sends packets to synchronization
> +	 */
> +	void (*preProcessReduction)(struct _SyncState* const syncState);
> +
>   	/*
>   	 * Convert trace pair synchronization factors to a resulting offset and
>   	 * drift for each trace.
> diff --git a/lttv/lttv/sync/factor_reduction_time.c b/lttv/lttv/sync/factor_reduction_time.c
> new file mode 100644
> index 0000000..2bd37bf
> --- /dev/null
> +++ b/lttv/lttv/sync/factor_reduction_time.c
> @@ -0,0 +1,514 @@
> +/* This file is part of the Linux Trace Toolkit viewer
> + * Copyright (C) 2010 Masoume Jabbarifar<masoume.jabbarifar at polymtl.ca>
> + *
> + * This program is free software: you can redistribute it and/or modify it
> + * under the terms of the GNU Lesser General Public License as published by
> + * the Free Software Foundation, either version 2.1 of the License, or (at
> + * your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful, but WITHOUT
> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
> + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
> + * License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public License
> + * along with this program.  If not, see<http://www.gnu.org/licenses/>.
> + */
> +#define _ISOC99_SOURCE
> +
> +#ifdef HAVE_CONFIG_H
> +#include<config.h>
> +#endif
> +
> +#include<math.h>
> +#include<errno.h>
> +#include<stdlib.h>
> +#include<string.h>
> +#include<unistd.h>
> +
> +
> +#include "event_analysis.h"
> +#include "event_matching.h"
> +#include "sync_chain.h"
> +
> +#include "factor_reduction_time.h"
> +#include "event_matching_tcp.h"
> +
> +
> +/* Functions common to all reduction modules */
> +static void initReductionTime(SyncState* const syncState);
> +static void destroyReductionTime(SyncState* const syncState);
> +static void preProcessReductionTime(SyncState* const syncState);
> +
> +static GArray* finalizeReductionTime(SyncState* const syncState,
> +	AllFactors* allFactors);
> +static void printReductionStatsTime(SyncState* const syncState);
> +
> +/* Functions specific to this module */
> +static void getFactors(SyncState* const syncState,AllFactors* const allFactors,
> +	const unsigned int traceNum,
> +	Factors* const factors);
> +static void maximumSpanningTree(SyncState* const syncState);
> +static void maximumCommunicationPacket(SyncState* const syncState);
> +static void floodingSyncExecute(SyncState* const syncState);
> +void updateDistances(int target,ReductionData* reductionData,SyncState* const syncState);
> +void sumCommunicationPackets(SyncState* const syncState);
> +void proposeRefNode(int** connectionArray, int* refNodes, int n);
> +void SyncExecute(int refNode,SyncState* const syncState);
> +int findParent(SyncState* const syncState, int root, int node);
> +
> +
> +static ReductionModule reductionModuleTime= {
> +	.name= "time",
> +	.initReduction=&initReductionTime,
> +	.destroyReduction=&destroyReductionTime,
> +	.preProcessReduction=&preProcessReductionTime,
> +	.finalizeReduction=&finalizeReductionTime,
> +	.printReductionStats=&printReductionStatsTime,
> +	.graphFunctions= {},
> +};
> +
> +
> +/*
> + * Reduction module registering function
> + */
> +void registerReductionTime()
> +{
> +	g_queue_push_tail(&reductionModules,&reductionModuleTime);
> +}
> +
> +
> +/*
> + * Reduction init function
> + *
> + * This function is called at the beginning of a synchronization run for a set
> + * of traces.
> + *
> + * Allocate some reduction specific data structures
> + *
> + * Args:
> + *   syncState     container for synchronization data.
> + */
> +static void initReductionTime(SyncState* const syncState)
> +{
> +	int i;
> +	
> +	ReductionData* reductionData;
> +
> +	reductionData= malloc(sizeof(ReductionData));
> +	syncState->reductionData= reductionData;
> +
> +	
> +	reductionData->routed= malloc(syncState->traceNb * sizeof(char*));
> +
> +	reductionData->distance= malloc(syncState->traceNb * sizeof(int*));
> +
> +	reductionData->neighbour= malloc(syncState->traceNb * sizeof(int*));
> +
> +	reductionData->totMessageArray= malloc(syncState->traceNb * sizeof(unsigned int*));
> +
> +	for (i= 0; i<  syncState->traceNb; i++) {
> +		reductionData->totMessageArray[i]=
> +			calloc(syncState->traceNb, sizeof(unsigned int));
> +	}
> +
> +	reductionData->totMSTMessageArray= malloc(syncState->traceNb * sizeof(unsigned int*));
> +
> +	for (i= 0; i<  syncState->traceNb; i++) {
> +		reductionData->totMSTMessageArray[i]=
> +			calloc(syncState->traceNb, sizeof(unsigned int));
> +	}
> +
> +	reductionData->totMSTAnalysisArray= malloc(syncState->traceNb * sizeof(unsigned int*));
> +
> +	for (i= 0; i<  syncState->traceNb; i++) {
> +		reductionData->totMSTAnalysisArray[i]=
> +			calloc(syncState->traceNb, sizeof(unsigned int));
> +	}
> +
> +	reductionData->maxRoot= malloc(syncState->traceNb * sizeof(int*));
> +	reductionData->maxRootMST= malloc(syncState->traceNb * sizeof(int*));
> +}
> +
> +static void preProcessReductionTime(SyncState* const syncState)
> +{
> +
> +	sumCommunicationPackets(syncState);	
> +	maximumCommunicationPacket(syncState);
> +	maximumSpanningTree(syncState);
> +	floodingSyncExecute(syncState);
> +}
> +/* Finding best path in the network that has more interaction among the
> + * nodes and synchronisation trough this path
> + *
> + * Maximum Spanning Tree function based on Prim's Algorithm
> + *
> + * The algorithm is implemented according to the code here:
> + * http://snippets.dzone.com/posts/show/4743
> + *
> +*/
> +static void maximumSpanningTree(SyncState* const syncState)
> +{
> +	int i, j;
> +	int total= 0;
> +	int treeSize, max;
> +	ReductionData* reductionData;
> +
> +	reductionData= syncState->reductionData;
> +
> +	/* Initialise distance with 0 */
> +	for (i= 0; i<  syncState->traceNb ; ++i)
> +		reductionData->distance[i]= 0;
> +
> +	/* Mark all nodes as NOT beeing in the maximum spanning tree */
> +	for (i= 0; i<  syncState->traceNb; ++i)
> +		reductionData->routed[i]= 0;
> +
> +	/* Add the first node to the tree */
> +	if (reductionData->referenceNode<  0) return;	
> +	g_debug("Adding node %d\n", reductionData->referenceNode);
> +	reductionData->routed[reductionData->referenceNode]= 1;
> +	updateDistances(reductionData->referenceNode, reductionData, syncState);
> +	
> +	for (treeSize= 1; treeSize<  syncState->traceNb; ++treeSize) {
> +		/* Find the node with the bigest distance to the tree */
> +		max= -1;
> +		for (i= 0; i<  syncState->traceNb; ++i)
> +			if (!reductionData->routed[i])
> +				if ((max == -1) || (reductionData->distance[max]<  reductionData->distance[i]))
> +					max= i;
> +
> +		/* And add it */
> +		reductionData->totMSTMessageArray[max][reductionData->neighbour[max]]= reductionData->distance[max];
> +		reductionData->totMSTMessageArray[reductionData->neighbour[max]][max]= reductionData->distance[max];
> +
> +		reductionData->totMSTAnalysisArray[max][reductionData->neighbour[max]]= reductionData->distance[max];
> +		reductionData->totMSTAnalysisArray[reductionData->neighbour[max]][max]= reductionData->distance[max];
> +
> +		g_debug("Adding edge %i-MAX(%i)-D(%i)\n", reductionData->neighbour[max], max, reductionData->distance[max]);
> +		reductionData->routed[max]= 1;
> +		total+= reductionData->distance[max];
> +
> +		updateDistances(max, reductionData, syncState);
> +	}
> +	
> +
> +	for ( i=0 ; i<  syncState->traceNb ; i++) {
> +		for (j= 0 ; j<  syncState->traceNb ; j++)
> +			g_debug("\t%d ", reductionData->totMSTMessageArray[i][j]);
> +	g_debug("\n");
> +	}
> +	g_debug("Total distance: %d\n", total);
> +
> +}
> +
> +
> +void updateDistances(int target, ReductionData* reductionData, SyncState* const syncState) {
> +
> +	int i;
> +	g_debug("Update[%i]= ", target);
> +	for (i= 0; i<  syncState->traceNb; ++i) {
> +		if ((reductionData->totMessageArray[target][i] != 0)&&  (reductionData->distance[i]<= reductionData->totMessageArray[target][i])) {
> +			if (reductionData->distance[i] != reductionData->totMessageArray[target][i]) {
> +				reductionData->distance[i]= reductionData->totMessageArray[target][i];
> +				reductionData->neighbour[i]= target;
> +			}
> +			else if (reductionData->distance[i] == reductionData->totMessageArray[target][i]&&  target == reductionData->referenceNode) {
> +				reductionData->distance[i]= reductionData->totMessageArray[target][i];
> +				reductionData->neighbour[i]= target;
> +			}
> +		}
> +		g_debug("%iD(%i)N(%i)", i, reductionData->distance[i], reductionData->neighbour[i]);
> +        }
> +	g_debug("\n------------------------------------------------------------\n");
> +}
> +
> +
> +static void maximumCommunicationPacket(SyncState* const syncState)
> +{
> +	
> +	ReductionData* reductionData;
> +
> +	reductionData= syncState->reductionData;
> +	proposeRefNode(reductionData->totMessageArray, reductionData->maxRoot, syncState->traceNb);
> +	reductionData->referenceNode= reductionData->maxRoot[0];
> +	g_debug("Best RefNODE is : %i\n", reductionData->referenceNode);
> +	
> +}
> +
> +/* Symmetrical matrix of packet communications */
> +
> +void sumCommunicationPackets(SyncState* const syncState) {
> +	int i, j, sum;
> +
> +	MatchingDataTCP* matchingData;
> +
> +	matchingData= syncState->matchingData;
> +
> +	ReductionData* reductionData;
> +
> +	reductionData= syncState->reductionData;
> +
> +
> +	for (i= 0; i<  syncState->traceNb; ++i)
> +		for (j= 0; j<  syncState->traceNb; ++j){
> +			sum= matchingData->stats->totMessageArray[i][j] + matchingData->stats->totMessageArray[j][i];
> +			reductionData->totMessageArray[i][j]= sum;
> +			reductionData->totMessageArray[j][i]= sum;
> +		}
> +
> +}
> +
> +/* Finding the node who has more interaction */
> +
> +void proposeRefNode(int** connectionArray, int* refNodes, int n){
> +
> +	int* sumPacket;
> +	int i, j, index=0;
> +	int maxSumPacket=0;
> +
> +	sumPacket= malloc(n* sizeof(int));
> +
> +	for (i= 0; i<  n; ++i) {
> +		sumPacket[i]= 0;
> +		for (j= 0;j<  n; ++j)
> +			sumPacket[i]+= connectionArray[i][j];
> +		g_debug("sum(%i)=%i\n", i, sumPacket[i]);
> +		if (sumPacket[i]>  maxSumPacket)
> +			maxSumPacket= sumPacket[i];
> +	}
> +	g_debug("proposed Reference Node: ");
> +	for (i= 0 ; i<  n ; ++i) {
> +		if (sumPacket[i] == maxSumPacket) {
> +			refNodes[index++]= i;
> +			g_debug("%i \t", i);
> +		}
> +	}
> +	refNodes[index]= -1;
> +	g_debug("\n");
> +	free(sumPacket);
> +}
> +
> +void floodingSyncExecute(SyncState* const syncState) {
> +
> +	ReductionData* reductionData;
> +	reductionData= syncState->reductionData;
> +	SyncExecute(reductionData->referenceNode, syncState);
> +	
> +}
> +
> +
> +/* First root and all of nodes who has connection with it (it's childs)
> + * must be synchronised then function will be run for the childs recursively
> + */
> +
> +void SyncExecute(int refNode, SyncState* const syncState) {
> +
> +	unsigned int i, j;
> +	GQueue* packetSync;
> +	Message* packet;
> +	Exchange* exchange;
> +	
> +	ReductionData* reductionData;
> +
> +	reductionData= syncState->reductionData;
> +
> +	MatchingDataTCP* matchingData;
> +
> +	matchingData= syncState->matchingData;
> +
> +	for (i= 0 ; i<  syncState->traceNb ; ++i)
> +		if (reductionData->totMSTMessageArray[refNode][i] != 0){
> +			packetSync= matchingData->packetArray[refNode][i];
> +
> +			for (j= 0 ; j<  packetSync->length ; ++j){
> +				if (syncState->analysisModule->analyzeMessage != NULL) {
> +					packet= g_queue_peek_nth(packetSync, j);	
> +					syncState->analysisModule->analyzeMessage(syncState, packet);
> +				}
> +				else {							
> +					exchange= g_queue_peek_nth(packetSync, j);
> +					syncState->analysisModule->analyzeExchange(syncState, exchange);
> +				}
> +			}
> +
> +			packetSync= matchingData->packetArray[i][refNode];
> +
> +			for (j= 0 ;j<  packetSync->length ; ++j){
> +				if (syncState->analysisModule->analyzeMessage != NULL) {
> +					packet= g_queue_peek_nth(packetSync, j);	
> +					syncState->analysisModule->analyzeMessage(syncState, packet);
> +				}
> +				else {
> +					exchange= g_queue_peek_nth(packetSync, j);
> +					syncState->analysisModule->analyzeExchange(syncState, exchange);
> +				}
> +			}
> +
> +			reductionData->totMSTMessageArray[i][refNode]= 0;
> +			reductionData->totMSTMessageArray[refNode][i]= 0;
> +			SyncExecute(i, syncState);
> +		}
> +	return;
> +}
> +/*
> + * Reduction destroy function
> + *
> + * Free the analysis specific data structures
> + *
> + * Args:
> + *   syncState     container for synchronization data.
> + */
> +static void destroyReductionTime(SyncState* const syncState)
> +{
> +	int i;
> +	ReductionData* reductionData;
> +	reductionData= (ReductionData*) syncState->reductionData;
> +
> +	if (reductionData == NULL) return;
> +	
> +	for (i= 0; i<  syncState->traceNb; i++) {
> +		free(reductionData->totMessageArray[i]);
> +		free(reductionData->totMSTMessageArray[i]);
> +		free(reductionData->totMSTAnalysisArray[i]);
> +	}
> +	free(reductionData->totMessageArray);
> +	free(reductionData->totMSTMessageArray);
> +	free(reductionData->totMSTAnalysisArray);
> +	free(reductionData->routed);
> +	free(reductionData->distance);
> +	free(reductionData->neighbour);
> +	free(reductionData->maxRoot);
> +	free(reductionData->maxRootMST);
> +	free(syncState->reductionData);
> +	syncState->reductionData= NULL;
> +}
> +
> +
> +/*
> + * Finalize the factor reduction
> + *
> + * Calculate a resulting offset and drift for each trace.
> + *
> + * Args:
> + *   syncState     container for synchronization data.
> + *   allFactors    offset and drift between each pair of traces
> + *
> + * Returns:
> + *   Factors[traceNb] synchronization factors for each trace
> +
> + */
> +static GArray* finalizeReductionTime(SyncState* const syncState,
> +	AllFactors* allFactors)
> +{
> +	int i, j;
> +	GArray* factors;
> +
> +	factors= g_array_sized_new(FALSE, FALSE, sizeof(Factors),
> +		syncState->traceNb);
> +	g_array_set_size(factors, syncState->traceNb);
> +	for (i= 0; i<  syncState->traceNb; i++) {
> +		getFactors(syncState, allFactors, i,&g_array_index(factors,
> +			Factors, i));
> +	}
> +
> +	return factors;
> +}
> +
> +
> +/*
> + * Print statistics related to reduction. Must be called after
> + * finalizeReduction.
> + *
> + * Args:
> + *   syncState     container for synchronization data.
> + */
> +static void printReductionStatsTime(SyncState* const syncState)
> +{
> +}
> +
> +/*
> + * Cummulate the time correction factors to convert a node's time to its
> + * reference's time.
> + * This function recursively calls itself until it reaches the reference node.
> + *
> + * Args:
> + *   allFactors:   offset and drift between each pair of traces
> + *   predecessors: matrix of each node's predecessor on the shortest
> + *                 path between two nodes
> + *   references:   reference node for each node
> + *   traceNum:     node for which to find the factors
> + *   factors:      resulting factors
> + */
> +static void getFactors(SyncState* const syncState, AllFactors* const allFactors, const unsigned int traceNum,
> +	Factors* const factors)
> +{
> +	unsigned int reference;
> +	PairFactors** const pairFactors= allFactors->pairFactors;
> +	int parent, i;
> +
> +	ReductionData* reductionData;
> +	reductionData= syncState->reductionData;	
> +
> +	if (traceNum == reductionData->referenceNode)
> +		reference= traceNum;
> +	else if (reductionData->totMSTAnalysisArray[reductionData->referenceNode][traceNum] != 0)
> +		reference= reductionData->referenceNode;
> +	else
> +		reference= findParent(syncState, -1, traceNum);
> +
> +	if (reference == traceNum) {
> +		factors->offset= 0.;
> +		factors->drift= 1.;
> +	}
> +	else {
> +		Factors previousVertexFactors;
> +		
> +		getFactors(syncState, allFactors, reference,&previousVertexFactors);
> +
> +		/* Convert the time from traceNum to reference;
> +		 * pairFactors[row][col] converts the time from col to row, invert the
> +		 * factors as necessary */
> +		
> +		if (pairFactors[reference][traceNum].approx != NULL) {
> +			factors->offset= previousVertexFactors.drift *
> +				pairFactors[reference][traceNum].approx->offset +
> +				previousVertexFactors.offset;
> +			factors->drift= previousVertexFactors.drift *
> +				pairFactors[reference][traceNum].approx->drift;
> +		}
> +		else if (pairFactors[traceNum][reference].approx != NULL) {
> +			factors->offset= previousVertexFactors.drift * (-1. *
> +				pairFactors[traceNum][reference].approx->offset /
> +				pairFactors[traceNum][reference].approx->drift) +
> +				previousVertexFactors.offset;
> +			factors->drift= previousVertexFactors.drift * (1. /
> +				pairFactors[traceNum][reference].approx->drift);
> +		}
> +		else {
> +			g_assert_not_reached();
> +		}
> +	}
> +}
> +
> +int findParent(SyncState* const syncState, int root, int node)
> +{
> +
> +	int i;
> +	int result;
> +	ReductionData* reductionData;
> +
> +	reductionData= syncState->reductionData;	
> +	for (i= 0; i<  syncState->traceNb; i++)
> +		if (reductionData->totMSTAnalysisArray[node][i] != 0&&  i == reductionData->referenceNode)
> +			return 1;	
> +
> +	for (i= 0; i<  syncState->traceNb; i++)
> +		if (reductionData->totMSTAnalysisArray[node][i] != 0)
> +			if (i != root) {
> +				result= findParent(syncState, node, i);
> +				if (result == 1&&  root == -1) return i;
> +				if (result == 1) return 1;
> +			}
> +	return 0;
> +}
> diff --git a/lttv/lttv/sync/factor_reduction_time.h b/lttv/lttv/sync/factor_reduction_time.h
> new file mode 100644
> index 0000000..c46cbd1
> --- /dev/null
> +++ b/lttv/lttv/sync/factor_reduction_time.h
> @@ -0,0 +1,53 @@
> +/* This file is part of the Linux Trace Toolkit viewer
> + * Copyright (C) 2010 Masoume Jabbarifar<masoume.jabbarifar at polymtl.ca>
> + *
> + * This program is free software: you can redistribute it and/or modify it
> + * under the terms of the GNU Lesser General Public License as published by
> + * the Free Software Foundation, either version 2.1 of the License, or (at
> + * your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful, but WITHOUT
> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
> + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
> + * License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public License
> + * along with this program.  If not, see<http://www.gnu.org/licenses/>.
> + */
> +
> +#ifndef FACTOR_REDUCTION_TIME_H
> +#define FACTOR_REDUCTION_TIME_H
> +
> +#include<glib.h>
> +
> +#include "data_structures.h"
> +
> +
> +typedef struct {
> +
> +	char* routed;   /* routed[i] is 1 when we have concidered the node i in the maximum
> +			spanning tree; otherwise it is 0 */
> +
> +	int* distance;  /* distance[i] is the distance between node i and the maximum
> +			spanning tree; this is initially 0; if i is
> +			already in the tree, then d[i] is undefined;
> +			this is just a temporary variable. It's not necessary but speeds
> +			up execution considerably (by a factor of n) */
> +
> +	int* neighbour; /* neighbour[i] holds the index of the node i would have to be
> +			linked to in order to get a distance of d[i] */
> +
> +	int referenceNode; /*referenceNode is one of networks node whose time will be
> +			   considered as reference time*/
> +
> +	unsigned int** totMessageArray;
> +	unsigned int** totMSTMessageArray;  /*Maximum spanning tree is saved in totMSTMessageArray*/
> +	unsigned int** totMSTAnalysisArray;
> +	int* maxRoot;
> +	int* maxRootMST;
> +
> +} ReductionData;
> +
> +void registerReductionTime();
> +
> +#endif
> diff --git a/lttv/lttv/sync/sync_chain_lttv.c b/lttv/lttv/sync/sync_chain_lttv.c
> index 95bef44..c60e8ac 100644
> --- a/lttv/lttv/sync/sync_chain_lttv.c
> +++ b/lttv/lttv/sync/sync_chain_lttv.c
> @@ -45,6 +45,7 @@
>   #include "event_analysis_linreg.h"
>   #include "event_analysis_eval.h"
>   #include "factor_reduction_accuracy.h"
> +#include "factor_reduction_time.h"
>   #include "sync_chain.h"
>   #include "sync_chain_lttv.h"
>
> @@ -139,6 +140,7 @@ static void init()
>   	registerAnalysisEval();
>
>   	registerReductionAccuracy();
> +	registerReductionTime();
>
>   	// Build module names lists for option and help string
>   	for (i= 0; i<  ARRAY_SIZE(loopValues); i++)
> @@ -313,6 +315,10 @@ bool syncTraceset(LttvTracesetContext* const traceSetContext)
>   	lttv_process_traceset_middle(traceSetContext, ltt_time_infinite,
>   		G_MAXULONG, NULL);
>   	lttv_process_traceset_seek_time(traceSetContext, ltt_time_zero);
> +	
> +	// Find the best refrence node and remove the useless sunchronization	
> +	if (syncState->reductionModule->preProcessReduction != NULL)
> +		syncState->reductionModule->preProcessReduction(syncState);		
>
>   	// Obtain, reduce, adjust and set correction factors
>   	allFactors= syncState->processingModule->finalizeProcessing(syncState);
> diff --git a/lttv/lttv/sync/sync_chain_unittest.c b/lttv/lttv/sync/sync_chain_unittest.c
> index 40302a0..e76fa78 100644
> --- a/lttv/lttv/sync/sync_chain_unittest.c
> +++ b/lttv/lttv/sync/sync_chain_unittest.c
> @@ -42,6 +42,7 @@
>   #include "event_analysis_linreg.h"
>   #include "event_analysis_eval.h"
>   #include "factor_reduction_accuracy.h"
> +#include "factor_reduction_time.h"
>   #include "sync_chain.h"
>
>
> @@ -134,6 +135,7 @@ int main(const int argc, char* const argv[])
>   	registerAnalysisEval();
>
>   	registerReductionAccuracy();
> +	registerReductionTime();
>
>   	// Initialize data structures
>   	syncState= malloc(sizeof(SyncState));



