[ltt-dev] [PATCH] Add time reduction option to synchronisation

Benjamin Poirier benjamin.poirier at polymtl.ca
Wed Sep 22 21:47:15 EDT 2010


On 22/09/10 07:38 PM, Masoume Jabbarifar wrote:
>
>
> On Mon, Sep 20, 2010 at 8:49 PM, Benjamin Poirier
> <benjamin.poirier at polymtl.ca> wrote:

Please read linux-2.6/Documentation/SubmittingPatches, in particular:
7) No MIME, no links, no compression, no attachments.  Just plain text.

More comments below.

>
>     Some introduction about the synchronization code and the benefits of
>     modules:
>     The synchronization framework consists of a group of modules that
>     interchangeably plug in to a central "synchronization chain"
>     concept. This chain performs the different steps needed to
>     synchronize the time between a group of traces: read the events from
>     different traces, match them together, analyze them to come up with
>     correction factors and finally reduce the potentially large number
>     of factors to a minimal set. Each step of the synchronization chain
>     is carefully kept independent of the other steps. This allows, for
>     example, the accuracy analysis code to work off of UDP events from
>     lttng traces or simulated TCP events from text files. Any
>     appropriate module can be plugged in at any step because 1) each
>     module is kept independent of other modules, 2) the core is kept
>     independent of any specific module, 3) each module performs a
>     single, specific task.
>
>
> Thank you for the introduction to your code; it will be helpful to
> anyone who is not familiar with it.
> A few months ago, we had a meeting and discussed your
> code. There was some redundant code in Convex-hull and Linear
> Regression. This redundant code converted the factors after
> synchronization completed and found the time based on the reference time. So you
> decided to put this code in a new module called the Reduction module.
>
> In this module you first find the reference node and the factors for each
> pair of nodes, and then the best path for accuracy (using a Minimum Spanning
> Tree to find the minimum drifts and offsets). Finding a Minimum Spanning Tree,
> ignoring useless links and routing along the accurate path does not mean event
> filtering! I implement the same concept in another way.
>
> Let’s take a look at a simple example and talk more about my patch.
> In the following example, A, B, C and D are connected. The number on each
> link shows the number of exchanged packets.
>
>     D ---- 40 ---- A ---- 5 ---- B
>      \             |             /
>       \ 5          | 20      10 /
>        \           |           /
>         `--------- C ---------'
> In synchronization based on accuracy (your code), you find factors for
> each pair of nodes, that is A-B, A-D, A-C, B-C and D-C, and
> synchronize them two by two. The Reduction module finds the reference node (A),
> then checks the factors (drifts and offsets) and finds the Minimum Spanning Tree
> based on the factors (minimum because you need less drift and offset between
> two clocks, and therefore more accuracy).
>
> This means it ignores one of the three links in each triangle, so we lose some
> time finding factors for the ignored or useless links (A-B and D-C).
>
> I studied your documentation and some other papers and understood that the
> more packets are exchanged on a link, the better the accuracy on
> that link. This means “A”, with 70 exchanged packets, is the reference node and
> B will be synchronized with A through the (B-C-A) path. Also, you will not
> use the calculated factors between A-B and D-C after this.
>
> So what I did is count the number of packets before
> synchronization. To buffer packets and count them, I need to change some
> parts of the Matching module. In your case, the Matching module sends each packet
> to the Analysis module directly and then drops it. In my case, packets are
> buffered in the Matching module and sent to the Reduction module to be organized.
>
> I explained my idea in that meeting and you said I could add this to
> the Reduction module. It makes sense because the user can choose the Accuracy
> or Time based algorithm, just as the user can choose Linear Regression or
> Convex-hull in the Analysis module.
>
> The following figure illustrates the modules briefly.
>
>
> In the Reduction time branch, as I discussed above, the number of
> exchanged packets is counted, the reference node (A) is chosen based on
> it, and the Maximum Spanning Tree is found, which gives the following tree.
>
>     D ---- 60 ---- A              B
>                    |             /
>                    | 30      20 /
>                    |           /
>                    C ---------'
> This tree will be sent to the Analysis module to synchronize the nodes.
> Obviously, the total synchronization time will be less than the total
> synchronization time for the full graph since useless links have been removed.
>

So, is it right to summarize what you're doing this way?
Instead of picking the edges that have the best accuracy, your
algorithm picks the edges that have the highest number of exchanged
messages.
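
To check that reading against the first example graph quoted above, here is a
minimal sketch (not the code from the patch). It picks the reference node as
the one with the largest total of exchanged packets, then grows a maximum
spanning tree from it with Prim's algorithm. The weights are the per-link
counts from the first diagram (D-A 40, A-B 5, A-C 20, B-C 10, D-C 5). The
helper names pickReferenceNode and maxSpanningTree and the numbering A=0, B=1,
C=2, D=3 are assumptions made for illustration only.

```c
#define N 4 /* nodes: A=0, B=1, C=2, D=3 (illustrative numbering) */

/* Symmetric matrix of exchanged packets, taken from the first diagram. */
static const int packets[N][N] = {
	/*         A   B   C   D */
	/* A */ {  0,  5, 20, 40 },
	/* B */ {  5,  0, 10,  0 },
	/* C */ { 20, 10,  0,  5 },
	/* D */ { 40,  0,  5,  0 },
};

/* Hypothetical helper: the node with the largest row sum is the reference. */
static int pickReferenceNode(const int w[N][N])
{
	int best = 0, bestSum = -1;

	for (int i = 0; i < N; i++) {
		int sum = 0;

		for (int j = 0; j < N; j++)
			sum += w[i][j];
		if (sum > bestSum) {
			bestSum = sum;
			best = i;
		}
	}
	return best;
}

/*
 * Hypothetical helper: Prim's algorithm that keeps the heaviest edge at each
 * step. parent[i] receives the tree neighbour of node i (-1 for the root and
 * for unreachable nodes). Returns the total weight of the tree.
 */
static int maxSpanningTree(const int w[N][N], int root, int parent[N])
{
	int inTree[N] = { 0 };
	int best[N]; /* heaviest known edge linking node i to the tree */
	int total = 0;

	for (int i = 0; i < N; i++) {
		best[i] = w[root][i];
		parent[i] = (w[root][i] > 0) ? root : -1;
	}
	inTree[root] = 1;
	parent[root] = -1;

	for (int size = 1; size < N; size++) {
		int next = -1;

		/* Pick the node outside the tree with the heaviest link to it. */
		for (int i = 0; i < N; i++)
			if (!inTree[i] && best[i] > 0 &&
				(next == -1 || best[i] > best[next]))
				next = i;
		if (next == -1)
			break; /* the remaining nodes are not connected */

		inTree[next] = 1;
		total += best[next];

		/* The new node may offer heavier links to the remaining nodes. */
		for (int i = 0; i < N; i++)
			if (!inTree[i] && w[next][i] > best[i]) {
				best[i] = w[next][i];
				parent[i] = next;
			}
	}
	return total;
}
```

With this matrix, pickReferenceNode returns 0 (A, row sum 65) and
maxSpanningTree keeps A-D, A-C and C-B for a total weight of 70, so A-B and
D-C are the links that get dropped, as described in the quoted message.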

-Ben

> Future Improvement:
> Since the Analysis module works on “CPU-time” and needs just the trace number,
> there is no need to send the whole packet to Analysis. So a simple change
> is needed in the output of the Matching module and the input of the Analysis
> module. With this improvement, the buffering will be removed from my
> algorithm and from the “time” branch too.
>
> Masoume
>
>
>
>     On 15/09/10 12:36 PM, Masoume Jabbarifar wrote:
>
>         ---
>           lttv/lttv/Makefile.am                  |    2 +
>           lttv/lttv/sync/Makefile.am             |    4 +-
>           lttv/lttv/sync/data_structures.c       |    2 -
>           lttv/lttv/sync/event_matching_tcp.c    |   87 +++++-
>           lttv/lttv/sync/event_matching_tcp.h    |    1 +
>           lttv/lttv/sync/event_processing_text.c |    4 +
>           lttv/lttv/sync/factor_reduction.h      |    6 +
>           lttv/lttv/sync/factor_reduction_time.c |  514
>         ++++++++++++++++++++++++++++++++
>           lttv/lttv/sync/factor_reduction_time.h |   53 ++++
>           lttv/lttv/sync/sync_chain_lttv.c       |    6 +
>           lttv/lttv/sync/sync_chain_unittest.c   |    2 +
>           11 files changed, 664 insertions(+), 17 deletions(-)
>           create mode 100644 lttv/lttv/sync/factor_reduction_time.c
>           create mode 100644 lttv/lttv/sync/factor_reduction_time.h
>
>         diff --git a/lttv/lttv/Makefile.am b/lttv/lttv/Makefile.am
>         index 30ead55..750f2d2 100644
>         --- a/lttv/lttv/Makefile.am
>         +++ b/lttv/lttv/Makefile.am
>         @@ -87,6 +87,8 @@ lttv_real_SOURCES = \
>                 sync/factor_reduction.h\
>                 sync/factor_reduction_accuracy.c\
>                 sync/factor_reduction_accuracy.h\
>         +       sync/factor_reduction_time.c\
>         +       sync/factor_reduction_time.h\
>                 sync/lookup3.h
>
>           lttvinclude_HEADERS = \
>         diff --git a/lttv/lttv/sync/Makefile.am b/lttv/lttv/sync/Makefile.am
>         index e1d6775..7a5417d 100644
>         --- a/lttv/lttv/sync/Makefile.am
>         +++ b/lttv/lttv/sync/Makefile.am
>         @@ -30,4 +30,6 @@ unittest_SOURCES = \
>                 event_analysis_linreg.h\
>                 factor_reduction.h\
>                 factor_reduction_accuracy.c\
>         -       factor_reduction_accuracy.h
>         +       factor_reduction_accuracy.h\
>         +       factor_reduction_time.c\
>         +       factor_reduction_time.h
>         diff --git a/lttv/lttv/sync/data_structures.c
>         b/lttv/lttv/sync/data_structures.c
>         index acac9d7..c5fe736 100644
>         --- a/lttv/lttv/sync/data_structures.c
>         +++ b/lttv/lttv/sync/data_structures.c
>         @@ -271,8 +271,6 @@ void destroyTCPSegment(Message* const segment)
>           {
>                 TCPEvent* inE, *outE;
>
>         -       segment->print(segment);
>         -
>                 g_assert(segment->inE != NULL&&  segment->outE != NULL);
>                 g_assert(segment->inE->type == TCP&&
>           segment->outE->type == TCP);
>                 inE= segment->inE->event.tcpEvent;
>         diff --git a/lttv/lttv/sync/event_matching_tcp.c
>         b/lttv/lttv/sync/event_matching_tcp.c
>         index 90d6c43..678a334 100644
>         --- a/lttv/lttv/sync/event_matching_tcp.c
>         +++ b/lttv/lttv/sync/event_matching_tcp.c
>         @@ -56,7 +56,8 @@ static void
>         buildReversedConnectionKey(ConnectionKey* const
>           static void openGraphDataFiles(SyncState* const syncState);
>           static void closeGraphDataFiles(SyncState* const syncState);
>           static void writeMessagePoint(FILE* stream, const Message*
>         const message);
>         -
>         +static void gfPacketDestroy(gpointer data, gpointer userData);
>         +static void gfExchangeDestroy(gpointer data, gpointer userData);
>
>           static MatchingModule matchingModuleTCP = {
>                 .name= "TCP",
>         @@ -101,6 +102,7 @@ void registerMatchingTCP()
>           static void initMatchingTCP(SyncState* const syncState)
>           {
>                 MatchingDataTCP* matchingData;
>         +       int i, j;
>
>                 matchingData= malloc(sizeof(MatchingDataTCP));
>                 syncState->matchingData= matchingData;
>         @@ -113,8 +115,7 @@ static void initMatchingTCP(SyncState* const
>         syncState)
>         &gefConnectionKeyEqual,&gdnConnectionKeyDestroy,
>         &gdnTCPSegmentListDestroy);
>
>         -       if (syncState->stats)
>         -       {
>         +       if (syncState->reductionModule->preProcessReduction !=
>         NULL || syncState->stats) {
>                         unsigned int i;
>
>                         matchingData->stats= calloc(1,
>         sizeof(MatchingStatsTCP));
>         @@ -139,6 +140,20 @@ static void initMatchingTCP(SyncState*
>         const syncState)
>                 {
>                         matchingData->messagePoints= NULL;
>                 }
>         +       if (syncState->reductionModule->preProcessReduction !=
>         NULL) {
>         +               matchingData->packetArray=
>         malloc(syncState->traceNb * sizeof(GQueue**));
>         +               for (i= 0; i<  syncState->traceNb; i++) {
>         +                       matchingData->packetArray[i]=
>         malloc(syncState->traceNb * sizeof(GQueue*));
>         +
>         +                       for (j= 0; j<  syncState->traceNb; j++) {
>         +                               matchingData->packetArray[i][j]=
>         g_queue_new();
>         +                       }
>         +               }
>         +       }
>         +       else
>         +       {
>         +               matchingData->packetArray= NULL;
>         +       }
>           }
>
>
>         @@ -155,6 +170,7 @@ static void initMatchingTCP(SyncState* const
>         syncState)
>           static void destroyMatchingTCP(SyncState* const syncState)
>           {
>                 MatchingDataTCP* matchingData;
>         +       int i, j;
>
>                 matchingData= (MatchingDataTCP*) syncState->matchingData;
>
>         @@ -165,8 +181,7 @@ static void destroyMatchingTCP(SyncState*
>         const syncState)
>
>                 partialDestroyMatchingTCP(syncState);
>
>         -       if (syncState->stats)
>         -       {
>         +       if (syncState->reductionModule->preProcessReduction !=
>         NULL || syncState->stats) {
>                         unsigned int i;
>
>                         for (i= 0; i<  syncState->traceNb; i++)
>         @@ -179,6 +194,17 @@ static void destroyMatchingTCP(SyncState*
>         const syncState)
>
>                 free(syncState->matchingData);
>                 syncState->matchingData= NULL;
>         +       if (syncState->reductionModule->preProcessReduction !=
>         NULL) {
>         +                       for (i= 0; i<  syncState->traceNb; i++) {
>         +                               for (j= 0; j<
>           syncState->traceNb; j++)
>         +                                       if
>         (syncState->analysisModule->analyzeMessage != NULL)
>         +
>         g_queue_foreach(matchingData->packetArray[i][j],
>         gfPacketDestroy, NULL);
>         +                                       else
>         +
>         g_queue_foreach(matchingData->packetArray[i][j],
>         gfExchangeDestroy, NULL);
>         +                               free(matchingData->packetArray[i]);
>         +                       }
>         +                       free(matchingData->packetArray);
>         +               }
>           }
>
>
>         @@ -335,6 +361,7 @@ static void matchEvents(SyncState* const
>         syncState, Event* const event,
>                 Message* packet;
>                 MatchingDataTCP* matchingData;
>                 GQueue* conUnAcked;
>         +       GQueue* packetMatching;
>
>                 matchingData= (MatchingDataTCP*) syncState->matchingData;
>
>         @@ -354,10 +381,11 @@ static void matchEvents(SyncState* const
>         syncState, Event* const event,
>                         packet->outE->event.tcpEvent->segmentKey=
>         packet->inE->event.tcpEvent->segmentKey;
>
>                         if (syncState->stats)
>         -               {
>                                 matchingData->stats->totPacket++;
>         +
>         +               if
>         (syncState->reductionModule->preProcessReduction != NULL ||
>         syncState->stats)
>
>           matchingData->stats->totMessageArray[packet->inE->traceNum][packet->outE->traceNum]++;
>         -               }
>         +
>
>                         // Discard loopback traffic
>                         if (packet->inE->traceNum == packet->outE->traceNum)
>         @@ -371,17 +399,24 @@ static void matchEvents(SyncState* const
>         syncState, Event* const event,
>
>           writeMessagePoint(matchingData->messagePoints[packet->inE->traceNum][packet->outE->traceNum],
>                                         packet);
>                         }
>         -
>         -               if (syncState->analysisModule->analyzeMessage !=
>         NULL)
>         -               {
>         -
>         syncState->analysisModule->analyzeMessage(syncState, packet);
>         +               if (syncState->analysisModule->analyzeMessage !=
>         NULL) {
>         +                       if
>         (syncState->reductionModule->preProcessReduction == NULL)
>         +
>         syncState->analysisModule->analyzeMessage(syncState, packet);
>         +                       else {
>         +                               packetMatching=
>         +
>         matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
>         +
>         g_queue_push_tail(packetMatching, packet);
>         +                       }
>         +
>                         }
>
>                         // We can skip the rest of the algorithm if the
>         analysis module is not
>                         // interested in exchanges
>                         if (syncState->analysisModule->analyzeExchange
>         == NULL)
>                         {
>         -                       destroyTCPSegment(packet);
>         +                       if
>         (syncState->reductionModule->preProcessReduction == NULL)
>         +                               destroyTCPSegment(packet);
>         +
>                                 return;
>                         }
>
>         @@ -452,8 +487,14 @@ static void matchEvents(SyncState* const
>         syncState, Event* const event,
>
>           matchingData->stats->totExchangeSync++;
>                                                         }
>
>         -
>         syncState->analysisModule->analyzeExchange(syncState,
>         -                                                       exchange);
>         +                                               if
>         (syncState->reductionModule->preProcessReduction == NULL)
>         +
>         syncState->analysisModule->analyzeExchange(syncState, exchange);
>         +                                               else
>         +                                               {
>         +
>         packetMatching=
>         +
>         matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
>         +
>         g_queue_push_tail(packetMatching, exchange);
>         +                                               }
>                                                 }
>
>                                                 exchange->message= NULL;
>         @@ -707,3 +748,21 @@ static void
>         writeMatchingGraphsPlotsTCPMessages(SyncState* const syncState,
>         "title \"Received messages\" with points linetype 4 "
>         "linecolor rgb \"#6699cc\" pointtype 11 pointsize 2, \\\n", i, j);
>           }
>         +static void gfPacketDestroy(gpointer data, gpointer userData)
>         +{
>         +       Message* packet;
>         +
>         +       packet= (Message*) data;
>         +       destroyTCPSegment(packet);
>         +
>         +}
>         +
>         +static void gfExchangeDestroy(gpointer data, gpointer userData)
>         +{
>         +       Exchange* exchange;
>         +
>         +       exchange= (Exchange*) data;
>         +       exchange->message= NULL;
>         +       destroyTCPExchange(exchange);
>         +
>         +}
>         diff --git a/lttv/lttv/sync/event_matching_tcp.h
>         b/lttv/lttv/sync/event_matching_tcp.h
>         index 6f9b072..e7d1d12 100644
>         --- a/lttv/lttv/sync/event_matching_tcp.h
>         +++ b/lttv/lttv/sync/event_matching_tcp.h
>         @@ -57,6 +57,7 @@ typedef struct
>                  * The elements on the diagonal are not initialized.
>                  */
>                 FILE*** messagePoints;
>         +       GQueue*** packetArray;
>           } MatchingDataTCP;
>
>           void registerMatchingTCP();
>         diff --git a/lttv/lttv/sync/event_processing_text.c
>         b/lttv/lttv/sync/event_processing_text.c
>         index bcbea9b..894f71c 100644
>         --- a/lttv/lttv/sync/event_processing_text.c
>         +++ b/lttv/lttv/sync/event_processing_text.c
>         @@ -273,6 +273,10 @@ static AllFactors*
>         finalizeProcessingText(SyncState* const syncState)
>                         free(line);
>                 }
>
>         +       if (syncState->reductionModule->preProcessReduction !=
>         NULL) {
>         +
>         syncState->reductionModule->preProcessReduction(syncState);
>         +       }
>         +
>                 factors=
>         syncState->matchingModule->finalizeMatching(syncState);
>                 if (syncState->stats)
>                 {
>         diff --git a/lttv/lttv/sync/factor_reduction.h
>         b/lttv/lttv/sync/factor_reduction.h
>         index 561df6b..e971ea6 100644
>         --- a/lttv/lttv/sync/factor_reduction.h
>         +++ b/lttv/lttv/sync/factor_reduction.h
>         @@ -39,6 +39,12 @@ typedef struct
>                  */
>                 void (*destroyReduction)(struct _SyncState* const
>         syncState);
>
>         +       /* This function is called when time reduction is needed and
>         +        * removes useless communication links and finds
>         Spanning Tree
>         +        * of nodes in the network and then sends packets to
>         synchronization
>         +        */
>         +       void (*preProcessReduction)(struct _SyncState* const
>         syncState);
>         +
>                 /*
>                  * Convert trace pair synchronization factors to a
>         resulting offset and
>                  * drift for each trace.
>         diff --git a/lttv/lttv/sync/factor_reduction_time.c
>         b/lttv/lttv/sync/factor_reduction_time.c
>         new file mode 100644
>         index 0000000..2bd37bf
>         --- /dev/null
>         +++ b/lttv/lttv/sync/factor_reduction_time.c
>         @@ -0,0 +1,514 @@
>         +/* This file is part of the Linux Trace Toolkit viewer
>         + * Copyright (C) 2010 Masoume Jabbarifar <masoume.jabbarifar at polymtl.ca>
>         + *
>         + * This program is free software: you can redistribute it
>         and/or modify it
>         + * under the terms of the GNU Lesser General Public License as
>         published by
>         + * the Free Software Foundation, either version 2.1 of the
>         License, or (at
>         + * your option) any later version.
>         + *
>         + * This program is distributed in the hope that it will be
>         useful, but WITHOUT
>         + * ANY WARRANTY; without even the implied warranty of
>         MERCHANTABILITY or
>         + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
>         General Public
>         + * License for more details.
>         + *
>         + * You should have received a copy of the GNU Lesser General
>         Public License
>         + * along with this program.  If not, see <http://www.gnu.org/licenses/>.
>         + */
>         +#define _ISOC99_SOURCE
>         +
>         +#ifdef HAVE_CONFIG_H
>         +#include<config.h>
>         +#endif
>         +
>         +#include<math.h>
>         +#include<errno.h>
>         +#include<stdlib.h>
>         +#include<string.h>
>         +#include<unistd.h>
>         +
>         +
>         +#include "event_analysis.h"
>         +#include "event_matching.h"
>         +#include "sync_chain.h"
>         +
>         +#include "factor_reduction_time.h"
>         +#include "event_matching_tcp.h"
>         +
>         +
>         +/* Functions common to all reduction modules */
>         +static void initReductionTime(SyncState* const syncState);
>         +static void destroyReductionTime(SyncState* const syncState);
>         +static void preProcessReductionTime(SyncState* const syncState);
>         +
>         +static GArray* finalizeReductionTime(SyncState* const syncState,
>         +       AllFactors* allFactors);
>         +static void printReductionStatsTime(SyncState* const syncState);
>         +
>         +/* Functions specific to this module */
>         +static void getFactors(SyncState* const syncState,AllFactors*
>         const allFactors,
>         +       const unsigned int traceNum,
>         +       Factors* const factors);
>         +static void maximumSpanningTree(SyncState* const syncState);
>         +static void maximumCommunicationPacket(SyncState* const syncState);
>         +static void floodingSyncExecute(SyncState* const syncState);
>         +void updateDistances(int target,ReductionData*
>         reductionData,SyncState* const syncState);
>         +void sumCommunicationPackets(SyncState* const syncState);
>         +void proposeRefNode(int** connectionArray, int* refNodes, int n);
>         +void SyncExecute(int refNode,SyncState* const syncState);
>         +int findParent(SyncState* const syncState, int root, int node);
>         +
>         +
>         +static ReductionModule reductionModuleTime= {
>         +       .name= "time",
>         +       .initReduction=&initReductionTime,
>         +       .destroyReduction=&destroyReductionTime,
>         +       .preProcessReduction=&preProcessReductionTime,
>         +       .finalizeReduction=&finalizeReductionTime,
>         +       .printReductionStats=&printReductionStatsTime,
>         +       .graphFunctions= {},
>         +};
>         +
>         +
>         +/*
>         + * Reduction module registering function
>         + */
>         +void registerReductionTime()
>         +{
>         +       g_queue_push_tail(&reductionModules,&reductionModuleTime);
>         +}
>         +
>         +
>         +/*
>         + * Reduction init function
>         + *
>         + * This function is called at the beginning of a
>         synchronization run for a set
>         + * of traces.
>         + *
>         + * Allocate some reduction specific data structures
>         + *
>         + * Args:
>         + *   syncState     container for synchronization data.
>         + */
>         +static void initReductionTime(SyncState* const syncState)
>         +{
>         +       int i;
>         +
>         +       ReductionData* reductionData;
>         +
>         +       reductionData= malloc(sizeof(ReductionData));
>         +       syncState->reductionData= reductionData;
>         +
>         +
>         +       reductionData->routed= malloc(syncState->traceNb *
>         sizeof(char*));
>         +
>         +       reductionData->distance= malloc(syncState->traceNb *
>         sizeof(int*));
>         +
>         +       reductionData->neighbour= malloc(syncState->traceNb *
>         sizeof(int*));
>         +
>         +       reductionData->totMessageArray=
>         malloc(syncState->traceNb * sizeof(unsigned int*));
>         +
>         +       for (i= 0; i<  syncState->traceNb; i++) {
>         +               reductionData->totMessageArray[i]=
>         +                       calloc(syncState->traceNb,
>         sizeof(unsigned int));
>         +       }
>         +
>         +       reductionData->totMSTMessageArray=
>         malloc(syncState->traceNb * sizeof(unsigned int*));
>         +
>         +       for (i= 0; i<  syncState->traceNb; i++) {
>         +               reductionData->totMSTMessageArray[i]=
>         +                       calloc(syncState->traceNb,
>         sizeof(unsigned int));
>         +       }
>         +
>         +       reductionData->totMSTAnalysisArray=
>         malloc(syncState->traceNb * sizeof(unsigned int*));
>         +
>         +       for (i= 0; i<  syncState->traceNb; i++) {
>         +               reductionData->totMSTAnalysisArray[i]=
>         +                       calloc(syncState->traceNb,
>         sizeof(unsigned int));
>         +       }
>         +
>         +       reductionData->maxRoot= malloc(syncState->traceNb *
>         sizeof(int*));
>         +       reductionData->maxRootMST= malloc(syncState->traceNb *
>         sizeof(int*));
>         +}
>         +
>         +static void preProcessReductionTime(SyncState* const syncState)
>         +{
>         +
>         +       sumCommunicationPackets(syncState);
>         +       maximumCommunicationPacket(syncState);
>         +       maximumSpanningTree(syncState);
>         +       floodingSyncExecute(syncState);
>         +}
>         +/* Finding best path in the network that has more interaction
>         among the
>         + * nodes and synchronisation through this path
>         + *
>         + * Maximum Spanning Tree function based on Prim's Algorithm
>         + *
>         + * The algorithm is implemented according to the code here:
>         + * http://snippets.dzone.com/posts/show/4743
>         + *
>         +*/
>         +static void maximumSpanningTree(SyncState* const syncState)
>         +{
>         +       int i, j;
>         +       int total= 0;
>         +       int treeSize, max;
>         +       ReductionData* reductionData;
>         +
>         +       reductionData= syncState->reductionData;
>         +
>         +       /* Initialise distance with 0 */
>         +       for (i= 0; i<  syncState->traceNb ; ++i)
>         +               reductionData->distance[i]= 0;
>         +
>         +       /* Mark all nodes as NOT being in the maximum spanning
>         tree */
>         +       for (i= 0; i<  syncState->traceNb; ++i)
>         +               reductionData->routed[i]= 0;
>         +
>         +       /* Add the first node to the tree */
>         +       if (reductionData->referenceNode<  0) return;
>         +       g_debug("Adding node %d\n", reductionData->referenceNode);
>         +       reductionData->routed[reductionData->referenceNode]= 1;
>         +       updateDistances(reductionData->referenceNode,
>         reductionData, syncState);
>         +
>         +       for (treeSize= 1; treeSize<  syncState->traceNb;
>         ++treeSize) {
>         +               /* Find the node with the biggest distance to the
>         tree */
>         +               max= -1;
>         +               for (i= 0; i<  syncState->traceNb; ++i)
>         +                       if (!reductionData->routed[i])
>         +                               if ((max == -1) ||
>         (reductionData->distance[max]<  reductionData->distance[i]))
>         +                                       max= i;
>         +
>         +               /* And add it */
>         +
>         reductionData->totMSTMessageArray[max][reductionData->neighbour[max]]=
>         reductionData->distance[max];
>         +
>         reductionData->totMSTMessageArray[reductionData->neighbour[max]][max]=
>         reductionData->distance[max];
>         +
>         +
>         reductionData->totMSTAnalysisArray[max][reductionData->neighbour[max]]=
>         reductionData->distance[max];
>         +
>         reductionData->totMSTAnalysisArray[reductionData->neighbour[max]][max]=
>         reductionData->distance[max];
>         +
>         +               g_debug("Adding edge %i-MAX(%i)-D(%i)\n",
>         reductionData->neighbour[max], max, reductionData->distance[max]);
>         +               reductionData->routed[max]= 1;
>         +               total+= reductionData->distance[max];
>         +
>         +               updateDistances(max, reductionData, syncState);
>         +       }
>         +
>         +
>         +       for ( i=0 ; i<  syncState->traceNb ; i++) {
>         +               for (j= 0 ; j<  syncState->traceNb ; j++)
>         +                       g_debug("\t%d ",
>         reductionData->totMSTMessageArray[i][j]);
>         +       g_debug("\n");
>         +       }
>         +       g_debug("Total distance: %d\n", total);
>         +
>         +}
>         +
>         +
>         +void updateDistances(int target, ReductionData* reductionData,
>         SyncState* const syncState) {
>         +
>         +       int i;
>         +       g_debug("Update[%i]= ", target);
>         +       for (i= 0; i<  syncState->traceNb; ++i) {
>         +               if ((reductionData->totMessageArray[target][i]
>         != 0)&&  (reductionData->distance[i]<=
>         reductionData->totMessageArray[target][i])) {
>         +                       if (reductionData->distance[i] !=
>         reductionData->totMessageArray[target][i]) {
>         +                               reductionData->distance[i]=
>         reductionData->totMessageArray[target][i];
>         +                               reductionData->neighbour[i]= target;
>         +                       }
>         +                       else if (reductionData->distance[i] ==
>         reductionData->totMessageArray[target][i]&&  target ==
>         reductionData->referenceNode) {
>         +                               reductionData->distance[i]=
>         reductionData->totMessageArray[target][i];
>         +                               reductionData->neighbour[i]= target;
>         +                       }
>         +               }
>         +               g_debug("%iD(%i)N(%i)", i,
>         reductionData->distance[i], reductionData->neighbour[i]);
>         +        }
>         +
>         g_debug("\n------------------------------------------------------------\n");
>         +}
>         +
>         +
>         +static void maximumCommunicationPacket(SyncState* const syncState)
>         +{
>         +
>         +       ReductionData* reductionData;
>         +
>         +       reductionData= syncState->reductionData;
>         +       proposeRefNode(reductionData->totMessageArray, reductionData->maxRoot, syncState->traceNb);
>         +       reductionData->referenceNode= reductionData->maxRoot[0];
>         +       g_debug("Best RefNODE is : %i\n", reductionData->referenceNode);
>         +
>         +}
>         +
>         +/* Symmetrical matrix of packet communications */
>         +
>         +void sumCommunicationPackets(SyncState* const syncState) {
>         +       int i, j, sum;
>         +
>         +       MatchingDataTCP* matchingData;
>         +
>         +       matchingData= syncState->matchingData;
>         +
>         +       ReductionData* reductionData;
>         +
>         +       reductionData= syncState->reductionData;
>         +
>         +
>         +       for (i= 0; i < syncState->traceNb; ++i)
>         +               for (j= 0; j < syncState->traceNb; ++j){
>         +                       sum= matchingData->stats->totMessageArray[i][j] + matchingData->stats->totMessageArray[j][i];
>         +                       reductionData->totMessageArray[i][j]= sum;
>         +                       reductionData->totMessageArray[j][i]= sum;
>         +               }
>         +
>         +}
>         +
>         +/* Find the node(s) with the most interaction */
>         +
>         +void proposeRefNode(int** connectionArray, int* refNodes, int n){
>         +
>         +       int* sumPacket;
>         +       int i, j, index=0;
>         +       int maxSumPacket=0;
>         +
>         +       sumPacket= malloc(n* sizeof(int));
>         +
>         +       for (i= 0; i < n; ++i) {
>         +               sumPacket[i]= 0;
>         +               for (j= 0; j < n; ++j)
>         +                       sumPacket[i]+= connectionArray[i][j];
>         +               g_debug("sum(%i)=%i\n", i, sumPacket[i]);
>         +               if (sumPacket[i] > maxSumPacket)
>         +                       maxSumPacket= sumPacket[i];
>         +       }
>         +       g_debug("proposed Reference Node: ");
>         +       for (i= 0 ; i < n ; ++i) {
>         +               if (sumPacket[i] == maxSumPacket) {
>         +                       refNodes[index++]= i;
>         +                       g_debug("%i \t", i);
>         +               }
>         +       }
>         +       refNodes[index]= -1;
>         +       g_debug("\n");
>         +       free(sumPacket);
>         +}
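
For readers following along, the selection done by proposeRefNode() above can be sketched on its own. This is a minimal sketch, not the patch's code: pickRefNodes is a hypothetical name, the matrix is taken as unsigned int** to match the totMessageArray field of ReductionData, and it assumes the caller provides room for n + 1 entries in refNodes (one per possible tie plus the -1 terminator).

```c
#include <assert.h>
#include <stdlib.h>

/*
 * Hypothetical stand-alone version of the selection step: write the index
 * of every node whose row sum equals the maximum into refNodes, terminated
 * by -1. refNodes must have room for n + 1 entries.
 * Returns the number of candidates, or -1 if the allocation fails.
 */
static int pickRefNodes(unsigned int** const messages, int* const refNodes,
	const int n)
{
	unsigned int* rowSum;
	unsigned int maxSum= 0;
	int i, j, count= 0;

	assert(n > 0);
	rowSum= calloc(n, sizeof(*rowSum));
	if (rowSum == NULL)
		return -1;

	/* Sum the messages exchanged by each node and track the maximum */
	for (i= 0; i < n; i++) {
		for (j= 0; j < n; j++)
			rowSum[i]+= messages[i][j];
		if (rowSum[i] > maxSum)
			maxSum= rowSum[i];
	}

	/* Keep every node that reaches the maximum (ties are all reported) */
	for (i= 0; i < n; i++)
		if (rowSum[i] == maxSum)
			refNodes[count++]= i;
	refNodes[count]= -1;

	free(rowSum);
	return count;
}
```

Checking the calloc() result and returning the candidate count lets the caller tell "allocation failed" apart from "several nodes tie", which the void version cannot report.
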
>         +
>         +void floodingSyncExecute(SyncState* const syncState) {
>         +
>         +       ReductionData* reductionData;
>         +       reductionData= syncState->reductionData;
>         +       SyncExecute(reductionData->referenceNode, syncState);
>         +
>         +}
>         +
>         +
>         +/* First the root and all of the nodes connected to it (its children)
>         + * must be synchronised; then the function is run recursively for each
>         + * child.
>         + */
>         +
>         +void SyncExecute(int refNode, SyncState* const syncState) {
>         +
>         +       unsigned int i, j;
>         +       GQueue* packetSync;
>         +       Message* packet;
>         +       Exchange* exchange;
>         +
>         +       ReductionData* reductionData;
>         +
>         +       reductionData= syncState->reductionData;
>         +
>         +       MatchingDataTCP* matchingData;
>         +
>         +       matchingData= syncState->matchingData;
>         +
>         +       for (i= 0 ; i < syncState->traceNb ; ++i)
>         +               if (reductionData->totMSTMessageArray[refNode][i] != 0){
>         +                       packetSync= matchingData->packetArray[refNode][i];
>         +
>         +                       for (j= 0 ; j < packetSync->length ; ++j){
>         +                               if (syncState->analysisModule->analyzeMessage != NULL) {
>         +                                       packet= g_queue_peek_nth(packetSync, j);
>         +                                       syncState->analysisModule->analyzeMessage(syncState, packet);
>         +                               }
>         +                               else {
>         +                                       exchange= g_queue_peek_nth(packetSync, j);
>         +                                       syncState->analysisModule->analyzeExchange(syncState, exchange);
>         +                               }
>         +                       }
>         +
>         +                       packetSync= matchingData->packetArray[i][refNode];
>         +
>         +                       for (j= 0 ; j < packetSync->length ; ++j){
>         +                               if (syncState->analysisModule->analyzeMessage != NULL) {
>         +                                       packet= g_queue_peek_nth(packetSync, j);
>         +                                       syncState->analysisModule->analyzeMessage(syncState, packet);
>         +                               }
>         +                               else {
>         +                                       exchange= g_queue_peek_nth(packetSync, j);
>         +                                       syncState->analysisModule->analyzeExchange(syncState, exchange);
>         +                               }
>         +                       }
>         +
>         +                       reductionData->totMSTMessageArray[i][refNode]= 0;
>         +                       reductionData->totMSTMessageArray[refNode][i]= 0;
>         +                       SyncExecute(i, syncState);
>         +               }
>         +       return;
>         +}
>         +/*
>         + * Reduction destroy function
>         + *
>         + * Free the reduction specific data structures
>         + *
>         + * Args:
>         + *   syncState     container for synchronization data.
>         + */
>         +static void destroyReductionTime(SyncState* const syncState)
>         +{
>         +       int i;
>         +       ReductionData* reductionData;
>         +       reductionData= (ReductionData*) syncState->reductionData;
>         +
>         +       if (reductionData == NULL) return;
>         +
>         +       for (i= 0; i < syncState->traceNb; i++) {
>         +               free(reductionData->totMessageArray[i]);
>         +               free(reductionData->totMSTMessageArray[i]);
>         +               free(reductionData->totMSTAnalysisArray[i]);
>         +       }
>         +       free(reductionData->totMessageArray);
>         +       free(reductionData->totMSTMessageArray);
>         +       free(reductionData->totMSTAnalysisArray);
>         +       free(reductionData->routed);
>         +       free(reductionData->distance);
>         +       free(reductionData->neighbour);
>         +       free(reductionData->maxRoot);
>         +       free(reductionData->maxRootMST);
>         +       free(syncState->reductionData);
>         +       syncState->reductionData= NULL;
>         +}
>         +
>         +
>         +/*
>         + * Finalize the factor reduction
>         + *
>         + * Calculate a resulting offset and drift for each trace.
>         + *
>         + * Args:
>         + *   syncState     container for synchronization data.
>         + *   allFactors    offset and drift between each pair of traces
>         + *
>         + * Returns:
>         + *   Factors[traceNb] synchronization factors for each trace
>         +
>         + */
>         +static GArray* finalizeReductionTime(SyncState* const syncState,
>         +       AllFactors* allFactors)
>         +{
>         +       int i, j;
>         +       GArray* factors;
>         +
>         +       factors= g_array_sized_new(FALSE, FALSE, sizeof(Factors),
>         +               syncState->traceNb);
>         +       g_array_set_size(factors, syncState->traceNb);
>         +       for (i= 0; i < syncState->traceNb; i++) {
>         +               getFactors(syncState, allFactors, i, &g_array_index(factors,
>         +                       Factors, i));
>         +       }
>         +
>         +       return factors;
>         +}
>         +
>         +
>         +/*
>         + * Print statistics related to reduction. Must be called after
>         + * finalizeReduction.
>         + *
>         + * Args:
>         + *   syncState     container for synchronization data.
>         + */
>         +static void printReductionStatsTime(SyncState* const syncState)
>         +{
>         +}
>         +
>         +/*
>         + * Accumulate the time correction factors to convert a node's time to its
>         + * reference's time.
>         + * This function recursively calls itself until it reaches the reference node.
>         + *
>         + * Args:
>         + *   syncState:    container for synchronization data
>         + *   allFactors:   offset and drift between each pair of traces
>         + *   traceNum:     node for which to find the factors
>         + *   factors:      resulting factors
>         + */
>         +static void getFactors(SyncState* const syncState, AllFactors* const allFactors, const unsigned int traceNum,
>         +       Factors* const factors)
>         +{
>         +       unsigned int reference;
>         +       PairFactors** const pairFactors= allFactors->pairFactors;
>         +       int parent, i;
>         +
>         +       ReductionData* reductionData;
>         +       reductionData= syncState->reductionData;
>         +
>         +       if (traceNum == reductionData->referenceNode)
>         +               reference= traceNum;
>         +       else if (reductionData->totMSTAnalysisArray[reductionData->referenceNode][traceNum] != 0)
>         +               reference= reductionData->referenceNode;
>         +       else
>         +               reference= findParent(syncState, -1, traceNum);
>         +
>         +       if (reference == traceNum) {
>         +               factors->offset= 0.;
>         +               factors->drift= 1.;
>         +       }
>         +       else {
>         +               Factors previousVertexFactors;
>         +
>         +               getFactors(syncState, allFactors, reference, &previousVertexFactors);
>         +
>         +               /* Convert the time from traceNum to reference;
>         +                * pairFactors[row][col] converts the time from col to row, invert the
>         +                * factors as necessary */
>         +
>         +               if (pairFactors[reference][traceNum].approx != NULL) {
>         +                       factors->offset= previousVertexFactors.drift *
>         +                               pairFactors[reference][traceNum].approx->offset +
>         +                               previousVertexFactors.offset;
>         +                       factors->drift= previousVertexFactors.drift *
>         +                               pairFactors[reference][traceNum].approx->drift;
>         +               }
>         +               else if (pairFactors[traceNum][reference].approx != NULL) {
>         +                       factors->offset= previousVertexFactors.drift * (-1. *
>         +                               pairFactors[traceNum][reference].approx->offset /
>         +                               pairFactors[traceNum][reference].approx->drift) +
>         +                               previousVertexFactors.offset;
>         +                       factors->drift= previousVertexFactors.drift * (1. /
>         +                               pairFactors[traceNum][reference].approx->drift);
>         +               }
>         +               else {
>         +                       g_assert_not_reached();
>         +               }
>         +       }
>         +}
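
The arithmetic in getFactors() above is the composition and inversion of linear clock maps of the form t_out = drift * t_in + offset. A minimal sketch of just that arithmetic, with hypothetical names (LinearMap, composeMaps, invertMap, applyMap) that are not part of the sync code:

```c
#include <assert.h>

/* Hypothetical minimal type: t_out = drift * t_in + offset */
typedef struct {
	double drift, offset;
} LinearMap;

/* Convert a time with the given map */
static double applyMap(const LinearMap map, const double t)
{
	return map.drift * t + map.offset;
}

/*
 * Build the map equivalent to applying "inner" first, then "outer":
 *   outer(inner(t)) = outer.drift * (inner.drift * t + inner.offset)
 *                     + outer.offset
 */
static LinearMap composeMaps(const LinearMap outer, const LinearMap inner)
{
	LinearMap result;

	result.drift= outer.drift * inner.drift;
	result.offset= outer.drift * inner.offset + outer.offset;
	return result;
}

/*
 * Invert a map: from t_out = drift * t_in + offset it follows that
 *   t_in = t_out / drift - offset / drift
 * The drift must be non-zero.
 */
static LinearMap invertMap(const LinearMap map)
{
	LinearMap result;

	assert(map.drift != 0.);
	result.drift= 1. / map.drift;
	result.offset= -map.offset / map.drift;
	return result;
}
```

Under this reading, the first branch of getFactors() corresponds to composeMaps(previousVertexFactors, pair) and the second to composeMaps(previousVertexFactors, invertMap(pair)), where pair stands for the available pairFactors entry.
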
>         +
>         +int findParent(SyncState* const syncState, int root, int node)
>         +{
>         +
>         +       int i;
>         +       int result;
>         +       ReductionData* reductionData;
>         +
>         +       reductionData= syncState->reductionData;
>         +       for (i= 0; i < syncState->traceNb; i++)
>         +               if (reductionData->totMSTAnalysisArray[node][i] != 0 && i == reductionData->referenceNode)
>         +                       return 1;
>         +
>         +       for (i= 0; i < syncState->traceNb; i++)
>         +               if (reductionData->totMSTAnalysisArray[node][i] != 0)
>         +                       if (i != root) {
>         +                               result= findParent(syncState, node, i);
>         +                               if (result == 1 && root == -1) return i;
>         +                               if (result == 1) return 1;
>         +                       }
>         +       return 0;
>         +}
>         diff --git a/lttv/lttv/sync/factor_reduction_time.h b/lttv/lttv/sync/factor_reduction_time.h
>         new file mode 100644
>         index 0000000..c46cbd1
>         --- /dev/null
>         +++ b/lttv/lttv/sync/factor_reduction_time.h
>         @@ -0,0 +1,53 @@
>         +/* This file is part of the Linux Trace Toolkit viewer
>         + * Copyright (C) 2010 Masoume Jabbarifar <masoume.jabbarifar at polymtl.ca>
>         + *
>         + * This program is free software: you can redistribute it and/or modify it
>         + * under the terms of the GNU Lesser General Public License as published by
>         + * the Free Software Foundation, either version 2.1 of the License, or (at
>         + * your option) any later version.
>         + *
>         + * This program is distributed in the hope that it will be useful, but WITHOUT
>         + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
>         + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
>         + * License for more details.
>         + *
>         + * You should have received a copy of the GNU Lesser General Public License
>         + * along with this program.  If not, see <http://www.gnu.org/licenses/>.
>         + */
>         +
>         +#ifndef FACTOR_REDUCTION_TIME_H
>         +#define FACTOR_REDUCTION_TIME_H
>         +
>         +#include <glib.h>
>         +
>         +#include "data_structures.h"
>         +
>         +
>         +typedef struct {
>         +
>         +       char* routed;   /* routed[i] is 1 when we have considered the node i in the maximum
>         +                       spanning tree; otherwise it is 0 */
>         +
>         +       int* distance;  /* distance[i] is the distance between node i and the maximum
>         +                       spanning tree; this is initially 0; if i is
>         +                       already in the tree, then distance[i] is undefined;
>         +                       this is just a temporary variable. It's not necessary but speeds
>         +                       up execution considerably (by a factor of n) */
>         +
>         +       int* neighbour; /* neighbour[i] holds the index of the node that node i would have to be
>         +                       linked to in order to get a distance of distance[i] */
>         +
>         +       int referenceNode; /* referenceNode is the network node whose time will be
>         +                          considered as the reference time */
>         +
>         +       unsigned int** totMessageArray;
>         +       unsigned int** totMSTMessageArray;  /* The maximum spanning tree is saved in totMSTMessageArray */
>         +       unsigned int** totMSTAnalysisArray;
>         +       int* maxRoot;
>         +       int* maxRootMST;
>         +
>         +} ReductionData;
>         +
>         +void registerReductionTime();
>         +
>         +#endif
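
The routed, distance and neighbour fields described above follow the usual bookkeeping of Prim's algorithm. As a point of comparison, here is a sketch of textbook Prim adapted to pick the heaviest edges; it is not necessarily what the patch implements. maxSpanningTree is a hypothetical name, the flat row-major matrix is an assumption made to keep the example short, and a weight of 0 is taken to mean "no messages exchanged".

```c
#include <assert.h>
#include <stdlib.h>

/*
 * Hypothetical helper: build a maximum spanning tree over n nodes with
 * Prim's algorithm, starting at root. weight is a symmetric n x n matrix
 * stored row-major, where 0 means "no edge". On return, parent[i] is the
 * tree neighbour of node i, or -1 for the root and for unreachable nodes.
 * Returns the total weight of the tree, or -1 on allocation failure.
 */
static long maxSpanningTree(const unsigned int* const weight, const int n,
	const int root, int* const parent)
{
	char* routed;
	unsigned int* distance;
	long total= 0;
	int i, step, current;

	assert(n > 0 && root >= 0 && root < n);
	routed= calloc(n, sizeof(*routed));
	distance= calloc(n, sizeof(*distance));
	if (routed == NULL || distance == NULL) {
		free(routed);
		free(distance);
		return -1;
	}
	for (i= 0; i < n; i++)
		parent[i]= -1;

	routed[root]= 1;
	current= root;
	for (step= 1; step < n; step++) {
		int best= -1;

		/* Offer every unrouted node the edge to the node just added */
		for (i= 0; i < n; i++) {
			const unsigned int w= weight[current * n + i];

			if (!routed[i] && w > distance[i]) {
				distance[i]= w;
				parent[i]= current;
			}
		}

		/* Pick the unrouted node with the heaviest link to the tree */
		for (i= 0; i < n; i++)
			if (!routed[i] && distance[i] > 0 &&
				(best == -1 || distance[i] > distance[best]))
				best= i;
		if (best == -1)
			break; /* the remaining nodes are unreachable */

		routed[best]= 1;
		total+= distance[best];
		current= best;
	}

	free(routed);
	free(distance);
	return total;
}
```

Caching distance[i] means each step only scans the edges of the node just added, which is where the "factor of n" speed-up mentioned in the struct comment comes from.
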
>         diff --git a/lttv/lttv/sync/sync_chain_lttv.c b/lttv/lttv/sync/sync_chain_lttv.c
>         index 95bef44..c60e8ac 100644
>         --- a/lttv/lttv/sync/sync_chain_lttv.c
>         +++ b/lttv/lttv/sync/sync_chain_lttv.c
>         @@ -45,6 +45,7 @@
>           #include "event_analysis_linreg.h"
>           #include "event_analysis_eval.h"
>           #include "factor_reduction_accuracy.h"
>         +#include "factor_reduction_time.h"
>           #include "sync_chain.h"
>           #include "sync_chain_lttv.h"
>
>         @@ -139,6 +140,7 @@ static void init()
>                 registerAnalysisEval();
>
>                 registerReductionAccuracy();
>         +       registerReductionTime();
>
>                 // Build module names lists for option and help string
>                 for (i= 0; i < ARRAY_SIZE(loopValues); i++)
>         @@ -313,6 +315,10 @@ bool syncTraceset(LttvTracesetContext* const traceSetContext)
>                 lttv_process_traceset_middle(traceSetContext, ltt_time_infinite,
>                         G_MAXULONG, NULL);
>                 lttv_process_traceset_seek_time(traceSetContext, ltt_time_zero);
>         +
>         +       // Find the best reference node and remove the useless synchronization
>         +       if (syncState->reductionModule->preProcessReduction != NULL)
>         +               syncState->reductionModule->preProcessReduction(syncState);
>
>                 // Obtain, reduce, adjust and set correction factors
>                 allFactors= syncState->processingModule->finalizeProcessing(syncState);
>         diff --git a/lttv/lttv/sync/sync_chain_unittest.c b/lttv/lttv/sync/sync_chain_unittest.c
>         index 40302a0..e76fa78 100644
>         --- a/lttv/lttv/sync/sync_chain_unittest.c
>         +++ b/lttv/lttv/sync/sync_chain_unittest.c
>         @@ -42,6 +42,7 @@
>           #include "event_analysis_linreg.h"
>           #include "event_analysis_eval.h"
>           #include "factor_reduction_accuracy.h"
>         +#include "factor_reduction_time.h"
>           #include "sync_chain.h"
>
>
>         @@ -134,6 +135,7 @@ int main(const int argc, char* const argv[])
>                 registerAnalysisEval();
>
>                 registerReductionAccuracy();
>         +       registerReductionTime();
>
>                 // Initialize data structures
>                 syncState= malloc(sizeof(SyncState));
>
>
>     _______________________________________________
>     ltt-dev mailing list
>     ltt-dev at lists.casi.polymtl.ca
>     http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
>
>



