[ltt-dev] [PATCH] Add time reduction option to synchronisation
Benjamin Poirier
benjamin.poirier at polymtl.ca
Wed Sep 22 21:47:15 EDT 2010
On 22/09/10 07:38 PM, Masoume Jabbarifar wrote:
>
>
> On Mon, Sep 20, 2010 at 8:49 PM, Benjamin Poirier
> <benjamin.poirier at polymtl.ca <mailto:benjamin.poirier at polymtl.ca>> wrote:
Read linux-2.6/Documentation/SubmittingPatches
7) No MIME, no links, no compression, no attachments. Just plain text.
More comments below.
>
> Some introduction about the synchronization code and the benefits of
> modules:
> The synchronization framework consists of a group of modules that
> interchangeably plug in to a central "synchronization chain"
> concept. This chain performs the different steps needed to
> synchronize the time between a group of traces: read the events from
> different traces, match them together, analyze them to come up with
> correction factors and finally reduce the potentially large number
> of factors to a minimal set. Each step of the synchronization chain
> is carefully kept independent of the other steps. This allows, for
> example, the accuracy analysis code to work off of UDP events from
> lttng traces or simulated TCP events from text files. Any
> appropriate module can be plugged in at any step because 1) each
> module is kept independent of other modules, 2) the core is kept
> independent of any specific module, 3) each module performs a
> single, specific task.
>
>
> Thank you for your introduction about your code, it can be helpful to
> whom is not familiar with it.
> A few months ago, we had a meeting together and we discuss about your
> code. There was some redundant codes in Convex-hull and Linear
> Regression. This redundant code converted factors after completing
> synchronization and found time based on reference time. So you decided
> to put this code in a new module that called the Reduction module.
>
> In this module you find, first of all, reference node, factors for each
> pair nodes and then best path for accuracy (and use Minimum Spanning
> Tree to find minimum drifts and offsets). Finding Minimum Spanning Tree,
> ignoring useless links and routing accurate path does not mean event
> filtering! I do implement same concept in other way.
>
> let’s take a look at simple example and talk more about my patch.
> In following example, A, B, C and D are connected. Each number in each
> link shows the number of exchanged packets.
>
> D ---- 40 ---- A ---- 5 ---- B
> \ | /
> 5 \ 20 | / 10
> \ | /
> C
> In synchronization based on accuracy (your code), you find factors for
> each pair of nodes means A-B, A-D, A-C and B-C and also D-C and
> synchronize them two by two. Reduction module finds reference node (A)
> then checks factors (drifts and offsets) and finds Minimum Spanning Tree
> based on factors (Minimum because you need less drift and offset between
> two clocks therefore more accuracy)
>
> It means, it ignores one of three links in the triangles so we lost some
> time to find factors in for ignored or useless links (A-B and D-C).
>
> I studied your documentations and some other papers and got that if
> there is more exchanged packets in a link, there is better accuracy in
> that link. It means, “A” with 70 exchanged packets is reference node and
> B will be synchronized with A through (B-C-A) path. Also you will not
> use the calculated factors between A-B and D-C after this.
>
> So what I did, is counting the number of packets before
> synchronization.To buffer packets and count them, I need to change some
> parts of Matching module. In your case, Matching module sends packet to
> Analysis module directly and drops them. In my case, packets are
> buffered in Matching module and sent to Reduction module to be organized.
>
> I explained my idea in that meeting and you said I can add this to
> Reduction module. It is meaningful because user can chose the Accuracy
> or Time based algorithm as user can chose Linear Regression or
> Convex-hull in Analysis module.
>
> Following figure illustrates modules briefly.
>
>
> In Reduction time branch, as I discussed beforehand, the number of
> exchanged packets is counted and reference node (A) is chosen based on
> them and Maximum Spanning Tree is found means following tree.
>
> D ---- 60 ---- A B
> | /
> 30 | / 20
> | /
> C
> This tree will be sent to Analysis module to synchronize nodes.
> Obviously, the total synchronization time will be less than the total
> synchronization time of the main tree since useless links has been removed.
>
So, is it right to summarize what you're doing this way?:
Instead of picking the edges that have the best accuracy, your
algorithms picks the edges that have the highest number of exchanged
messages.
-Ben
> Future Improvement:
> Since Analysis module works on “CPU-time” and needs just trace-number
> so there is no need to send whole packet to Analysis. So a simple change
> is needed in the output of Matching module and the input of Analysis
> module. With this improvement, the buffering will be removed from my
> algorithm and “time” branch too.
>
> Masoume
>
>
>
> On 15/09/10 12:36 PM, Masoume Jabbarifar wrote:
>
> ---
> lttv/lttv/Makefile.am | 2 +
> lttv/lttv/sync/Makefile.am | 4 +-
> lttv/lttv/sync/data_structures.c | 2 -
> lttv/lttv/sync/event_matching_tcp.c | 87 +++++-
> lttv/lttv/sync/event_matching_tcp.h | 1 +
> lttv/lttv/sync/event_processing_text.c | 4 +
> lttv/lttv/sync/factor_reduction.h | 6 +
> lttv/lttv/sync/factor_reduction_time.c | 514
> ++++++++++++++++++++++++++++++++
> lttv/lttv/sync/factor_reduction_time.h | 53 ++++
> lttv/lttv/sync/sync_chain_lttv.c | 6 +
> lttv/lttv/sync/sync_chain_unittest.c | 2 +
> 11 files changed, 664 insertions(+), 17 deletions(-)
> create mode 100644 lttv/lttv/sync/factor_reduction_time.c
> create mode 100644 lttv/lttv/sync/factor_reduction_time.h
>
> diff --git a/lttv/lttv/Makefile.am b/lttv/lttv/Makefile.am
> index 30ead55..750f2d2 100644
> --- a/lttv/lttv/Makefile.am
> +++ b/lttv/lttv/Makefile.am
> @@ -87,6 +87,8 @@ lttv_real_SOURCES = \
> sync/factor_reduction.h\
> sync/factor_reduction_accuracy.c\
> sync/factor_reduction_accuracy.h\
> + sync/factor_reduction_time.c\
> + sync/factor_reduction_time.h\
> sync/lookup3.h
>
> lttvinclude_HEADERS = \
> diff --git a/lttv/lttv/sync/Makefile.am b/lttv/lttv/sync/Makefile.am
> index e1d6775..7a5417d 100644
> --- a/lttv/lttv/sync/Makefile.am
> +++ b/lttv/lttv/sync/Makefile.am
> @@ -30,4 +30,6 @@ unittest_SOURCES = \
> event_analysis_linreg.h\
> factor_reduction.h\
> factor_reduction_accuracy.c\
> - factor_reduction_accuracy.h
> + factor_reduction_accuracy.h\
> + factor_reduction_time.c\
> + factor_reduction_time.h
> diff --git a/lttv/lttv/sync/data_structures.c
> b/lttv/lttv/sync/data_structures.c
> index acac9d7..c5fe736 100644
> --- a/lttv/lttv/sync/data_structures.c
> +++ b/lttv/lttv/sync/data_structures.c
> @@ -271,8 +271,6 @@ void destroyTCPSegment(Message* const segment)
> {
> TCPEvent* inE, *outE;
>
> - segment->print(segment);
> -
> g_assert(segment->inE != NULL&& segment->outE != NULL);
> g_assert(segment->inE->type == TCP&&
> segment->outE->type == TCP);
> inE= segment->inE->event.tcpEvent;
> diff --git a/lttv/lttv/sync/event_matching_tcp.c
> b/lttv/lttv/sync/event_matching_tcp.c
> index 90d6c43..678a334 100644
> --- a/lttv/lttv/sync/event_matching_tcp.c
> +++ b/lttv/lttv/sync/event_matching_tcp.c
> @@ -56,7 +56,8 @@ static void
> buildReversedConnectionKey(ConnectionKey* const
> static void openGraphDataFiles(SyncState* const syncState);
> static void closeGraphDataFiles(SyncState* const syncState);
> static void writeMessagePoint(FILE* stream, const Message*
> const message);
> -
> +static void gfPacketDestroy(gpointer data, gpointer userData);
> +static void gfExchangeDestroy(gpointer data, gpointer userData);
>
> static MatchingModule matchingModuleTCP = {
> .name= "TCP",
> @@ -101,6 +102,7 @@ void registerMatchingTCP()
> static void initMatchingTCP(SyncState* const syncState)
> {
> MatchingDataTCP* matchingData;
> + int i, j;
>
> matchingData= malloc(sizeof(MatchingDataTCP));
> syncState->matchingData= matchingData;
> @@ -113,8 +115,7 @@ static void initMatchingTCP(SyncState* const
> syncState)
> &gefConnectionKeyEqual,&gdnConnectionKeyDestroy,
> &gdnTCPSegmentListDestroy);
>
> - if (syncState->stats)
> - {
> + if (syncState->reductionModule->preProcessReduction !=
> NULL || syncState->stats) {
> unsigned int i;
>
> matchingData->stats= calloc(1,
> sizeof(MatchingStatsTCP));
> @@ -139,6 +140,20 @@ static void initMatchingTCP(SyncState*
> const syncState)
> {
> matchingData->messagePoints= NULL;
> }
> + if (syncState->reductionModule->preProcessReduction !=
> NULL) {
> + matchingData->packetArray=
> malloc(syncState->traceNb * sizeof(GQueue**));
> + for (i= 0; i< syncState->traceNb; i++) {
> + matchingData->packetArray[i]=
> malloc(syncState->traceNb * sizeof(GQueue*));
> +
> + for (j= 0; j< syncState->traceNb; j++) {
> + matchingData->packetArray[i][j]=
> g_queue_new();
> + }
> + }
> + }
> + else
> + {
> + matchingData->packetArray= NULL;
> + }
> }
>
>
> @@ -155,6 +170,7 @@ static void initMatchingTCP(SyncState* const
> syncState)
> static void destroyMatchingTCP(SyncState* const syncState)
> {
> MatchingDataTCP* matchingData;
> + int i, j;
>
> matchingData= (MatchingDataTCP*) syncState->matchingData;
>
> @@ -165,8 +181,7 @@ static void destroyMatchingTCP(SyncState*
> const syncState)
>
> partialDestroyMatchingTCP(syncState);
>
> - if (syncState->stats)
> - {
> + if (syncState->reductionModule->preProcessReduction !=
> NULL || syncState->stats) {
> unsigned int i;
>
> for (i= 0; i< syncState->traceNb; i++)
> @@ -179,6 +194,17 @@ static void destroyMatchingTCP(SyncState*
> const syncState)
>
> free(syncState->matchingData);
> syncState->matchingData= NULL;
> + if (syncState->reductionModule->preProcessReduction !=
> NULL) {
> + for (i= 0; i< syncState->traceNb; i++) {
> + for (j= 0; j<
> syncState->traceNb; j++)
> + if
> (syncState->analysisModule->analyzeMessage != NULL)
> +
> g_queue_foreach(matchingData->packetArray[i][j],
> gfPacketDestroy, NULL);
> + else
> +
> g_queue_foreach(matchingData->packetArray[i][j],
> gfExchangeDestroy, NULL);
> + free(matchingData->packetArray[i]);
> + }
> + free(matchingData->packetArray);
> + }
> }
>
>
> @@ -335,6 +361,7 @@ static void matchEvents(SyncState* const
> syncState, Event* const event,
> Message* packet;
> MatchingDataTCP* matchingData;
> GQueue* conUnAcked;
> + GQueue* packetMatching;
>
> matchingData= (MatchingDataTCP*) syncState->matchingData;
>
> @@ -354,10 +381,11 @@ static void matchEvents(SyncState* const
> syncState, Event* const event,
> packet->outE->event.tcpEvent->segmentKey=
> packet->inE->event.tcpEvent->segmentKey;
>
> if (syncState->stats)
> - {
> matchingData->stats->totPacket++;
> +
> + if
> (syncState->reductionModule->preProcessReduction != NULL ||
> syncState->stats)
>
> matchingData->stats->totMessageArray[packet->inE->traceNum][packet->outE->traceNum]++;
> - }
> +
>
> // Discard loopback traffic
> if (packet->inE->traceNum == packet->outE->traceNum)
> @@ -371,17 +399,24 @@ static void matchEvents(SyncState* const
> syncState, Event* const event,
>
> writeMessagePoint(matchingData->messagePoints[packet->inE->traceNum][packet->outE->traceNum],
> packet);
> }
> -
> - if (syncState->analysisModule->analyzeMessage !=
> NULL)
> - {
> -
> syncState->analysisModule->analyzeMessage(syncState, packet);
> + if (syncState->analysisModule->analyzeMessage !=
> NULL) {
> + if
> (syncState->reductionModule->preProcessReduction == NULL)
> +
> syncState->analysisModule->analyzeMessage(syncState, packet);
> + else {
> + packetMatching=
> +
> matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
> +
> g_queue_push_tail(packetMatching, packet);
> + }
> +
> }
>
> // We can skip the rest of the algorithm if the
> analysis module is not
> // interested in exchanges
> if (syncState->analysisModule->analyzeExchange
> == NULL)
> {
> - destroyTCPSegment(packet);
> + if
> (syncState->reductionModule->preProcessReduction == NULL)
> + destroyTCPSegment(packet);
> +
> return;
> }
>
> @@ -452,8 +487,14 @@ static void matchEvents(SyncState* const
> syncState, Event* const event,
>
> matchingData->stats->totExchangeSync++;
> }
>
> -
> syncState->analysisModule->analyzeExchange(syncState,
> - exchange);
> + if
> (syncState->reductionModule->preProcessReduction == NULL)
> +
> syncState->analysisModule->analyzeExchange(syncState, exchange);
> + else
> + {
> +
> packetMatching=
> +
> matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
> +
> g_queue_push_tail(packetMatching, exchange);
> + }
> }
>
> exchange->message= NULL;
> @@ -707,3 +748,21 @@ static void
> writeMatchingGraphsPlotsTCPMessages(SyncState* const syncState,
> "title \"Received messages\" with points linetype 4 "
> "linecolor rgb \"#6699cc\" pointtype 11 pointsize 2, \\\n", i, j);
> }
> +static void gfPacketDestroy(gpointer data, gpointer userData)
> +{
> + Message* packet;
> +
> + packet= (Message*) data;
> + destroyTCPSegment(packet);
> +
> +}
> +
> +static void gfExchangeDestroy(gpointer data, gpointer userData)
> +{
> + Exchange* exchange;
> +
> + exchange= (Exchange*) data;
> + exchange->message= NULL;
> + destroyTCPExchange(exchange);
> +
> +}
> diff --git a/lttv/lttv/sync/event_matching_tcp.h
> b/lttv/lttv/sync/event_matching_tcp.h
> index 6f9b072..e7d1d12 100644
> --- a/lttv/lttv/sync/event_matching_tcp.h
> +++ b/lttv/lttv/sync/event_matching_tcp.h
> @@ -57,6 +57,7 @@ typedef struct
> * The elements on the diagonal are not initialized.
> */
> FILE*** messagePoints;
> + GQueue*** packetArray;
> } MatchingDataTCP;
>
> void registerMatchingTCP();
> diff --git a/lttv/lttv/sync/event_processing_text.c
> b/lttv/lttv/sync/event_processing_text.c
> index bcbea9b..894f71c 100644
> --- a/lttv/lttv/sync/event_processing_text.c
> +++ b/lttv/lttv/sync/event_processing_text.c
> @@ -273,6 +273,10 @@ static AllFactors*
> finalizeProcessingText(SyncState* const syncState)
> free(line);
> }
>
> + if (syncState->reductionModule->preProcessReduction !=
> NULL) {
> +
> syncState->reductionModule->preProcessReduction(syncState);
> + }
> +
> factors=
> syncState->matchingModule->finalizeMatching(syncState);
> if (syncState->stats)
> {
> diff --git a/lttv/lttv/sync/factor_reduction.h
> b/lttv/lttv/sync/factor_reduction.h
> index 561df6b..e971ea6 100644
> --- a/lttv/lttv/sync/factor_reduction.h
> +++ b/lttv/lttv/sync/factor_reduction.h
> @@ -39,6 +39,12 @@ typedef struct
> */
> void (*destroyReduction)(struct _SyncState* const
> syncState);
>
> + /* This function is called when time reduction is needed and
> + * removes useless communication links and finds
> Spanning Tree
> + * of nodes in the network and then sends packets to
> synchronization
> + */
> + void (*preProcessReduction)(struct _SyncState* const
> syncState);
> +
> /*
> * Convert trace pair synchronization factors to a
> resulting offset and
> * drift for each trace.
> diff --git a/lttv/lttv/sync/factor_reduction_time.c
> b/lttv/lttv/sync/factor_reduction_time.c
> new file mode 100644
> index 0000000..2bd37bf
> --- /dev/null
> +++ b/lttv/lttv/sync/factor_reduction_time.c
> @@ -0,0 +1,514 @@
> +/* This file is part of the Linux Trace Toolkit viewer
> + * Copyright (C) 2010 Masoume
> Jabbarifar<masoume.jabbarifar at polymtl.ca
> <mailto:masoume.jabbarifar at polymtl.ca>>
> + *
> + * This program is free software: you can redistribute it
> and/or modify it
> + * under the terms of the GNU Lesser General Public License as
> published by
> + * the Free Software Foundation, either version 2.1 of the
> License, or (at
> + * your option) any later version.
> + *
> + * This program is distributed in the hope that it will be
> useful, but WITHOUT
> + * ANY WARRANTY; without even the implied warranty of
> MERCHANTABILITY or
> + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
> General Public
> + * License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General
> Public License
> + * along with this program. If not,
> see<http://www.gnu.org/licenses/>.
> + */
> +#define _ISOC99_SOURCE
> +
> +#ifdef HAVE_CONFIG_H
> +#include<config.h>
> +#endif
> +
> +#include<math.h>
> +#include<errno.h>
> +#include<stdlib.h>
> +#include<string.h>
> +#include<unistd.h>
> +
> +
> +#include "event_analysis.h"
> +#include "event_matching.h"
> +#include "sync_chain.h"
> +
> +#include "factor_reduction_time.h"
> +#include "event_matching_tcp.h"
> +
> +
> +/* Functions common to all reduction modules */
> +static void initReductionTime(SyncState* const syncState);
> +static void destroyReductionTime(SyncState* const syncState);
> +static void preProcessReductionTime(SyncState* const syncState);
> +
> +static GArray* finalizeReductionTime(SyncState* const syncState,
> + AllFactors* allFactors);
> +static void printReductionStatsTime(SyncState* const syncState);
> +
> +/* Functions specific to this module */
> +static void getFactors(SyncState* const syncState,AllFactors*
> const allFactors,
> + const unsigned int traceNum,
> + Factors* const factors);
> +static void maximumSpanningTree(SyncState* const syncState);
> +static void maximumCommunicationPacket(SyncState* const syncState);
> +static void floodingSyncExecute(SyncState* const syncState);
> +void updateDistances(int target,ReductionData*
> reductionData,SyncState* const syncState);
> +void sumCommunicationPackets(SyncState* const syncState);
> +void proposeRefNode(int** connectionArray, int* refNodes, int n);
> +void SyncExecute(int refNode,SyncState* const syncState);
> +int findParent(SyncState* const syncState, int root, int node);
> +
> +
> +static ReductionModule reductionModuleTime= {
> + .name= "time",
> + .initReduction=&initReductionTime,
> + .destroyReduction=&destroyReductionTime,
> + .preProcessReduction=&preProcessReductionTime,
> + .finalizeReduction=&finalizeReductionTime,
> + .printReductionStats=&printReductionStatsTime,
> + .graphFunctions= {},
> +};
> +
> +
> +/*
> + * Reduction module registering function
> + */
> +void registerReductionTime()
> +{
> + g_queue_push_tail(&reductionModules,&reductionModuleTime);
> +}
> +
> +
> +/*
> + * Reduction init function
> + *
> + * This function is called at the beginning of a
> synchronization run for a set
> + * of traces.
> + *
> + * Allocate some reduction specific data structures
> + *
> + * Args:
> + * syncState container for synchronization data.
> + */
> +static void initReductionTime(SyncState* const syncState)
> +{
> + int i;
> +
> + ReductionData* reductionData;
> +
> + reductionData= malloc(sizeof(ReductionData));
> + syncState->reductionData= reductionData;
> +
> +
> + reductionData->routed= malloc(syncState->traceNb *
> sizeof(char*));
> +
> + reductionData->distance= malloc(syncState->traceNb *
> sizeof(int*));
> +
> + reductionData->neighbour= malloc(syncState->traceNb *
> sizeof(int*));
> +
> + reductionData->totMessageArray=
> malloc(syncState->traceNb * sizeof(unsigned int*));
> +
> + for (i= 0; i< syncState->traceNb; i++) {
> + reductionData->totMessageArray[i]=
> + calloc(syncState->traceNb,
> sizeof(unsigned int));
> + }
> +
> + reductionData->totMSTMessageArray=
> malloc(syncState->traceNb * sizeof(unsigned int*));
> +
> + for (i= 0; i< syncState->traceNb; i++) {
> + reductionData->totMSTMessageArray[i]=
> + calloc(syncState->traceNb,
> sizeof(unsigned int));
> + }
> +
> + reductionData->totMSTAnalysisArray=
> malloc(syncState->traceNb * sizeof(unsigned int*));
> +
> + for (i= 0; i< syncState->traceNb; i++) {
> + reductionData->totMSTAnalysisArray[i]=
> + calloc(syncState->traceNb,
> sizeof(unsigned int));
> + }
> +
> + reductionData->maxRoot= malloc(syncState->traceNb *
> sizeof(int*));
> + reductionData->maxRootMST= malloc(syncState->traceNb *
> sizeof(int*));
> +}
> +
> +static void preProcessReductionTime(SyncState* const syncState)
> +{
> +
> + sumCommunicationPackets(syncState);
> + maximumCommunicationPacket(syncState);
> + maximumSpanningTree(syncState);
> + floodingSyncExecute(syncState);
> +}
> +/* Finding best path in the network that has more interaction
> among the
> + * nodes and synchronisation trough this path
> + *
> + * Maximum Spanning Tree function based on Prim's Algorithm
> + *
> + * The algorithm is implemented according to the code here:
> + * http://snippets.dzone.com/posts/show/4743
> + *
> +*/
> +static void maximumSpanningTree(SyncState* const syncState)
> +{
> + int i, j;
> + int total= 0;
> + int treeSize, max;
> + ReductionData* reductionData;
> +
> + reductionData= syncState->reductionData;
> +
> + /* Initialise distance with 0 */
> + for (i= 0; i< syncState->traceNb ; ++i)
> + reductionData->distance[i]= 0;
> +
> + /* Mark all nodes as NOT beeing in the maximum spanning
> tree */
> + for (i= 0; i< syncState->traceNb; ++i)
> + reductionData->routed[i]= 0;
> +
> + /* Add the first node to the tree */
> + if (reductionData->referenceNode< 0) return;
> + g_debug("Adding node %d\n", reductionData->referenceNode);
> + reductionData->routed[reductionData->referenceNode]= 1;
> + updateDistances(reductionData->referenceNode,
> reductionData, syncState);
> +
> + for (treeSize= 1; treeSize< syncState->traceNb;
> ++treeSize) {
> + /* Find the node with the bigest distance to the
> tree */
> + max= -1;
> + for (i= 0; i< syncState->traceNb; ++i)
> + if (!reductionData->routed[i])
> + if ((max == -1) ||
> (reductionData->distance[max]< reductionData->distance[i]))
> + max= i;
> +
> + /* And add it */
> +
> reductionData->totMSTMessageArray[max][reductionData->neighbour[max]]=
> reductionData->distance[max];
> +
> reductionData->totMSTMessageArray[reductionData->neighbour[max]][max]=
> reductionData->distance[max];
> +
> +
> reductionData->totMSTAnalysisArray[max][reductionData->neighbour[max]]=
> reductionData->distance[max];
> +
> reductionData->totMSTAnalysisArray[reductionData->neighbour[max]][max]=
> reductionData->distance[max];
> +
> + g_debug("Adding edge %i-MAX(%i)-D(%i)\n",
> reductionData->neighbour[max], max, reductionData->distance[max]);
> + reductionData->routed[max]= 1;
> + total+= reductionData->distance[max];
> +
> + updateDistances(max, reductionData, syncState);
> + }
> +
> +
> + for ( i=0 ; i< syncState->traceNb ; i++) {
> + for (j= 0 ; j< syncState->traceNb ; j++)
> + g_debug("\t%d ",
> reductionData->totMSTMessageArray[i][j]);
> + g_debug("\n");
> + }
> + g_debug("Total distance: %d\n", total);
> +
> +}
> +
> +
> +void updateDistances(int target, ReductionData* reductionData,
> SyncState* const syncState) {
> +
> + int i;
> + g_debug("Update[%i]= ", target);
> + for (i= 0; i< syncState->traceNb; ++i) {
> + if ((reductionData->totMessageArray[target][i]
> != 0)&& (reductionData->distance[i]<=
> reductionData->totMessageArray[target][i])) {
> + if (reductionData->distance[i] !=
> reductionData->totMessageArray[target][i]) {
> + reductionData->distance[i]=
> reductionData->totMessageArray[target][i];
> + reductionData->neighbour[i]= target;
> + }
> + else if (reductionData->distance[i] ==
> reductionData->totMessageArray[target][i]&& target ==
> reductionData->referenceNode) {
> + reductionData->distance[i]=
> reductionData->totMessageArray[target][i];
> + reductionData->neighbour[i]= target;
> + }
> + }
> + g_debug("%iD(%i)N(%i)", i,
> reductionData->distance[i], reductionData->neighbour[i]);
> + }
> +
> g_debug("\n------------------------------------------------------------\n");
> +}
> +
> +
> +static void maximumCommunicationPacket(SyncState* const syncState)
> +{
> +
> + ReductionData* reductionData;
> +
> + reductionData= syncState->reductionData;
> + proposeRefNode(reductionData->totMessageArray,
> reductionData->maxRoot, syncState->traceNb);
> + reductionData->referenceNode= reductionData->maxRoot[0];
> + g_debug("Best RefNODE is : %i\n",
> reductionData->referenceNode);
> +
> +}
> +
> +/* Symmetrical matrix of packet communications */
> +
> +void sumCommunicationPackets(SyncState* const syncState) {
> + int i, j, sum;
> +
> + MatchingDataTCP* matchingData;
> +
> + matchingData= syncState->matchingData;
> +
> + ReductionData* reductionData;
> +
> + reductionData= syncState->reductionData;
> +
> +
> + for (i= 0; i< syncState->traceNb; ++i)
> + for (j= 0; j< syncState->traceNb; ++j){
> + sum=
> matchingData->stats->totMessageArray[i][j] +
> matchingData->stats->totMessageArray[j][i];
> + reductionData->totMessageArray[i][j]= sum;
> + reductionData->totMessageArray[j][i]= sum;
> + }
> +
> +}
> +
> +/* Finding the node who has more interaction */
> +
> +void proposeRefNode(int** connectionArray, int* refNodes, int n){
> +
> + int* sumPacket;
> + int i, j, index=0;
> + int maxSumPacket=0;
> +
> + sumPacket= malloc(n* sizeof(int));
> +
> + for (i= 0; i< n; ++i) {
> + sumPacket[i]= 0;
> + for (j= 0;j< n; ++j)
> + sumPacket[i]+= connectionArray[i][j];
> + g_debug("sum(%i)=%i\n", i, sumPacket[i]);
> + if (sumPacket[i]> maxSumPacket)
> + maxSumPacket= sumPacket[i];
> + }
> + g_debug("proposed Reference Node: ");
> + for (i= 0 ; i< n ; ++i) {
> + if (sumPacket[i] == maxSumPacket) {
> + refNodes[index++]= i;
> + g_debug("%i \t", i);
> + }
> + }
> + refNodes[index]= -1;
> + g_debug("\n");
> + free(sumPacket);
> +}
> +
> +void floodingSyncExecute(SyncState* const syncState) {
> +
> + ReductionData* reductionData;
> + reductionData= syncState->reductionData;
> + SyncExecute(reductionData->referenceNode, syncState);
> +
> +}
> +
> +
> +/* First root and all of nodes who has connection with it (it's
> childs)
> + * must be synchronised then function will be run for the
> childs recursively
> + */
> +
> +void SyncExecute(int refNode, SyncState* const syncState) {
> +
> + unsigned int i, j;
> + GQueue* packetSync;
> + Message* packet;
> + Exchange* exchange;
> +
> + ReductionData* reductionData;
> +
> + reductionData= syncState->reductionData;
> +
> + MatchingDataTCP* matchingData;
> +
> + matchingData= syncState->matchingData;
> +
> + for (i= 0 ; i< syncState->traceNb ; ++i)
> + if
> (reductionData->totMSTMessageArray[refNode][i] != 0){
> + packetSync=
> matchingData->packetArray[refNode][i];
> +
> + for (j= 0 ; j< packetSync->length ; ++j){
> + if
> (syncState->analysisModule->analyzeMessage != NULL) {
> + packet=
> g_queue_peek_nth(packetSync, j);
> +
> syncState->analysisModule->analyzeMessage(syncState, packet);
> + }
> + else {
> + exchange=
> g_queue_peek_nth(packetSync, j);
> +
> syncState->analysisModule->analyzeExchange(syncState, exchange);
> + }
> + }
> +
> + packetSync=
> matchingData->packetArray[i][refNode];
> +
> + for (j= 0 ;j< packetSync->length ; ++j){
> + if
> (syncState->analysisModule->analyzeMessage != NULL) {
> + packet=
> g_queue_peek_nth(packetSync, j);
> +
> syncState->analysisModule->analyzeMessage(syncState, packet);
> + }
> + else {
> + exchange=
> g_queue_peek_nth(packetSync, j);
> +
> syncState->analysisModule->analyzeExchange(syncState, exchange);
> + }
> + }
> +
> +
> reductionData->totMSTMessageArray[i][refNode]= 0;
> +
> reductionData->totMSTMessageArray[refNode][i]= 0;
> + SyncExecute(i, syncState);
> + }
> + return;
> +}
> +/*
> + * Reduction destroy function
> + *
> + * Free the analysis specific data structures
> + *
> + * Args:
> + * syncState container for synchronization data.
> + */
> +static void destroyReductionTime(SyncState* const syncState)
> +{
> + int i;
> + ReductionData* reductionData;
> + reductionData= (ReductionData*) syncState->reductionData;
> +
> + if (reductionData == NULL) return;
> +
> + for (i= 0; i< syncState->traceNb; i++) {
> + free(reductionData->totMessageArray[i]);
> + free(reductionData->totMSTMessageArray[i]);
> + free(reductionData->totMSTAnalysisArray[i]);
> + }
> + free(reductionData->totMessageArray);
> + free(reductionData->totMSTMessageArray);
> + free(reductionData->totMSTAnalysisArray);
> + free(reductionData->routed);
> + free(reductionData->distance);
> + free(reductionData->neighbour);
> + free(reductionData->maxRoot);
> + free(reductionData->maxRootMST);
> + free(syncState->reductionData);
> + syncState->reductionData= NULL;
> +}
> +
> +
> +/*
> + * Finalize the factor reduction
> + *
> + * Calculate a resulting offset and drift for each trace.
> + *
> + * Args:
> + * syncState container for synchronization data.
> + * allFactors offset and drift between each pair of traces
> + *
> + * Returns:
> + * Factors[traceNb] synchronization factors for each trace
> +
> + */
> +static GArray* finalizeReductionTime(SyncState* const syncState,
> + AllFactors* allFactors)
> +{
> + int i, j;
> + GArray* factors;
> +
> + factors= g_array_sized_new(FALSE, FALSE, sizeof(Factors),
> + syncState->traceNb);
> + g_array_set_size(factors, syncState->traceNb);
> + for (i= 0; i< syncState->traceNb; i++) {
> + getFactors(syncState, allFactors,
> i,&g_array_index(factors,
> + Factors, i));
> + }
> +
> + return factors;
> +}
> +
> +
> +/*
> + * Print statistics related to reduction. Must be called after
> + * finalizeReduction.
> + *
> + * Args:
> + * syncState container for synchronization data.
> + */
> +static void printReductionStatsTime(SyncState* const syncState)
> +{
> +}
> +
> +/*
> + * Cummulate the time correction factors to convert a node's
> time to its
> + * reference's time.
> + * This function recursively calls itself until it reaches the
> reference node.
> + *
> + * Args:
> + * allFactors: offset and drift between each pair of traces
> + * predecessors: matrix of each node's predecessor on the
> shortest
> + * path between two nodes
> + * references: reference node for each node
> + * traceNum: node for which to find the factors
> + * factors: resulting factors
> + */
> +static void getFactors(SyncState* const syncState, AllFactors*
> const allFactors, const unsigned int traceNum,
> + Factors* const factors)
> +{
> + unsigned int reference;
> + PairFactors** const pairFactors= allFactors->pairFactors;
> + int parent, i;
> +
> + ReductionData* reductionData;
> + reductionData= syncState->reductionData;
> +
> + if (traceNum == reductionData->referenceNode)
> + reference= traceNum;
> + else if
> (reductionData->totMSTAnalysisArray[reductionData->referenceNode][traceNum]
> != 0)
> + reference= reductionData->referenceNode;
> + else
> + reference= findParent(syncState, -1, traceNum);
> +
> + if (reference == traceNum) {
> + factors->offset= 0.;
> + factors->drift= 1.;
> + }
> + else {
> + Factors previousVertexFactors;
> +
> + getFactors(syncState, allFactors,
> reference,&previousVertexFactors);
> +
> + /* Convert the time from traceNum to reference;
> + * pairFactors[row][col] converts the time from
> col to row, invert the
> + * factors as necessary */
> +
> + if (pairFactors[reference][traceNum].approx !=
> NULL) {
> + factors->offset=
> previousVertexFactors.drift *
> +
> pairFactors[reference][traceNum].approx->offset +
> + previousVertexFactors.offset;
> + factors->drift=
> previousVertexFactors.drift *
> +
> pairFactors[reference][traceNum].approx->drift;
> + }
> + else if (pairFactors[traceNum][reference].approx
> != NULL) {
> + factors->offset=
> previousVertexFactors.drift * (-1. *
> +
> pairFactors[traceNum][reference].approx->offset /
> +
> pairFactors[traceNum][reference].approx->drift) +
> + previousVertexFactors.offset;
> + factors->drift=
> previousVertexFactors.drift * (1. /
> +
> pairFactors[traceNum][reference].approx->drift);
> + }
> + else {
> + g_assert_not_reached();
> + }
> + }
> +}
> +
> +int findParent(SyncState* const syncState, int root, int node)
> +{
> +
> + int i;
> + int result;
> + ReductionData* reductionData;
> +
> + reductionData= syncState->reductionData;
> + for (i= 0; i< syncState->traceNb; i++)
> + if (reductionData->totMSTAnalysisArray[node][i]
> != 0&& i == reductionData->referenceNode)
> + return 1;
> +
> + for (i= 0; i< syncState->traceNb; i++)
> + if (reductionData->totMSTAnalysisArray[node][i]
> != 0)
> + if (i != root) {
> + result= findParent(syncState,
> node, i);
> + if (result == 1&& root == -1)
> return i;
> + if (result == 1) return 1;
> + }
> + return 0;
> +}
> diff --git a/lttv/lttv/sync/factor_reduction_time.h
> b/lttv/lttv/sync/factor_reduction_time.h
> new file mode 100644
> index 0000000..c46cbd1
> --- /dev/null
> +++ b/lttv/lttv/sync/factor_reduction_time.h
> @@ -0,0 +1,53 @@
> +/* This file is part of the Linux Trace Toolkit viewer
> + * Copyright (C) 2010 Masoume
> Jabbarifar<masoume.jabbarifar at polymtl.ca
> <mailto:masoume.jabbarifar at polymtl.ca>>
> + *
> + * This program is free software: you can redistribute it
> and/or modify it
> + * under the terms of the GNU Lesser General Public License as
> published by
> + * the Free Software Foundation, either version 2.1 of the
> License, or (at
> + * your option) any later version.
> + *
> + * This program is distributed in the hope that it will be
> useful, but WITHOUT
> + * ANY WARRANTY; without even the implied warranty of
> MERCHANTABILITY or
> + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
> General Public
> + * License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General
> Public License
> + * along with this program. If not,
> see<http://www.gnu.org/licenses/>.
> + */
> +
> +#ifndef FACTOR_REDUCTION_TIME_H
> +#define FACTOR_REDUCTION_TIME_H
> +
> +#include<glib.h>
> +
> +#include "data_structures.h"
> +
> +
> +typedef struct {
> +
> + char* routed; /* routed[i] is 1 when we have
> concidered the node i in the maximum
> + spanning tree; otherwise it is 0 */
> +
> + int* distance; /* distance[i] is the distance between
> node i and the maximum
> + spanning tree; this is initially 0; if i is
> + already in the tree, then d[i] is undefined;
> + this is just a temporary variable. It's
> not necessary but speeds
> + up execution considerably (by a factor
> of n) */
> +
> + int* neighbour; /* neighbour[i] holds the index of the
> node i would have to be
> + linked to in order to get a distance of
> d[i] */
> +
> + int referenceNode; /*referenceNode is one of networks
> node whose time will be
> + considered as reference time*/
> +
> + unsigned int** totMessageArray;
> + unsigned int** totMSTMessageArray; /*Maximum spanning
> tree is saved in totMSTMessageArray*/
> + unsigned int** totMSTAnalysisArray;
> + int* maxRoot;
> + int* maxRootMST;
> +
> +} ReductionData;
> +
> +void registerReductionTime();
> +
> +#endif
> diff --git a/lttv/lttv/sync/sync_chain_lttv.c
> b/lttv/lttv/sync/sync_chain_lttv.c
> index 95bef44..c60e8ac 100644
> --- a/lttv/lttv/sync/sync_chain_lttv.c
> +++ b/lttv/lttv/sync/sync_chain_lttv.c
> @@ -45,6 +45,7 @@
> #include "event_analysis_linreg.h"
> #include "event_analysis_eval.h"
> #include "factor_reduction_accuracy.h"
> +#include "factor_reduction_time.h"
> #include "sync_chain.h"
> #include "sync_chain_lttv.h"
>
> @@ -139,6 +140,7 @@ static void init()
> registerAnalysisEval();
>
> registerReductionAccuracy();
> + registerReductionTime();
>
> // Build module names lists for option and help string
> for (i= 0; i< ARRAY_SIZE(loopValues); i++)
> @@ -313,6 +315,10 @@ bool syncTraceset(LttvTracesetContext*
> const traceSetContext)
> lttv_process_traceset_middle(traceSetContext,
> ltt_time_infinite,
> G_MAXULONG, NULL);
> lttv_process_traceset_seek_time(traceSetContext,
> ltt_time_zero);
> +
> + // Find the best refrence node and remove the useless
> sunchronization
> + if (syncState->reductionModule->preProcessReduction != NULL)
> +
> syncState->reductionModule->preProcessReduction(syncState);
>
> // Obtain, reduce, adjust and set correction factors
> allFactors=
> syncState->processingModule->finalizeProcessing(syncState);
> diff --git a/lttv/lttv/sync/sync_chain_unittest.c
> b/lttv/lttv/sync/sync_chain_unittest.c
> index 40302a0..e76fa78 100644
> --- a/lttv/lttv/sync/sync_chain_unittest.c
> +++ b/lttv/lttv/sync/sync_chain_unittest.c
> @@ -42,6 +42,7 @@
> #include "event_analysis_linreg.h"
> #include "event_analysis_eval.h"
> #include "factor_reduction_accuracy.h"
> +#include "factor_reduction_time.h"
> #include "sync_chain.h"
>
>
> @@ -134,6 +135,7 @@ int main(const int argc, char* const argv[])
> registerAnalysisEval();
>
> registerReductionAccuracy();
> + registerReductionTime();
>
> // Initialize data structures
> syncState= malloc(sizeof(SyncState));
>
>
> _______________________________________________
> ltt-dev mailing list
> ltt-dev at lists.casi.polymtl.ca <mailto:ltt-dev at lists.casi.polymtl.ca>
> http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
>
>
More information about the lttng-dev
mailing list