[ltt-dev] [PATCH] Add time reduction option to synchronisation

Masoume Jabbarifar jabbarifar at gmail.com
Wed Sep 22 19:38:39 EDT 2010


On Mon, Sep 20, 2010 at 8:49 PM, Benjamin Poirier <
benjamin.poirier at polymtl.ca> wrote:

> Some introduction about the synchronization code and the benefits of
> modules:
> The synchronization framework consists of a group of modules that
> interchangeably plug in to a central "synchronization chain" concept. This
> chain performs the different steps needed to synchronize the time between a
> group of traces: read the events from different traces, match them together,
> analyze them to come up with correction factors and finally reduce the
> potentially large number of factors to a minimal set. Each step of the
> synchronization chain is carefully kept independent of the other steps. This
> allows, for example, the accuracy analysis code to work off of UDP events
> from lttng traces or simulated TCP events from text files. Any appropriate
> module can be plugged in at any step because 1) each module is kept
> independent of other modules, 2) the core is kept independent of any
> specific module, 3) each module performs a single, specific task.
>

Thank you for your introduction about your code, it can be helpful to whom
is not familiar with it.
A few months ago, we had a meeting together and we discuss about your code.
There was some redundant codes in Convex-hull and Linear Regression. This
redundant code converted factors after completing synchronization and found
time based on reference time. So you decided to put this code in a new
module that called the Reduction module.

In this module you find, first of all, reference node, factors for each pair
nodes and then best path for accuracy (and use Minimum Spanning Tree to find
minimum drifts and offsets). Finding Minimum Spanning Tree, ignoring useless
links and routing accurate path does not mean event filtering! I do
implement same concept in other way.

let’s take a look at simple example and talk more about my patch.
In following example, A, B, C and D are connected. Each number in each link
shows the number of exchanged packets.

            D ---- 40  ---- A ---- 5  ---- B
                 \               |               /
               5    \      20 |           /   10
                    \    |     /
                         C
In synchronization based on accuracy (your code), you find factors for each
pair of nodes means A-B, A-D, A-C and B-C and also D-C and synchronize them
two by two. Reduction module finds reference node (A) then checks factors
(drifts and offsets) and finds Minimum Spanning Tree based on factors
(Minimum because you need less drift and offset between two clocks therefore
more accuracy)

It means, it ignores one of three links in the triangles so we lost some
time to find factors in for ignored or useless links (A-B and D-C).

I studied your documentations and some other papers and got that if there is
more exchanged packets in a link, there is better accuracy in that link. It
means, “A” with 70 exchanged packets is reference node and B will be
synchronized with A through (B-C-A) path. Also you will not use the
calculated factors between A-B and D-C after this.

So what I did, is counting the number of packets before synchronization.To
buffer packets and count them, I need to change some parts of Matching
module. In your case, Matching module sends packet to Analysis module
directly and drops them. In my case, packets are buffered in Matching module
and sent to Reduction module to be organized.

I explained my idea in that meeting and you said I can add this to Reduction
module. It is meaningful  because user can chose the Accuracy or Time based
algorithm as user can chose Linear Regression or Convex-hull in Analysis
module.

Following figure illustrates modules briefly.


In Reduction time branch, as I discussed beforehand, the number of exchanged
packets is counted and reference node (A) is chosen based on them and
Maximum Spanning Tree is found means following tree.

            D ---- 60  ---- A              B
                               |               /
                          30 |           /   20
                               |     /
                                 C
This tree will be sent to Analysis module to synchronize nodes. Obviously,
the total synchronization time will be less than the total synchronization
time of the main tree since useless links has been removed.

Future Improvement:
Since Analysis module works on “CPU-time”  and needs just trace-number so
there is no need to send whole packet to Analysis. So a simple change is
needed in the output of Matching module and the input of Analysis module.
With this improvement, the buffering will be removed from my algorithm and
“time” branch too.

Masoume


>
>
> On 15/09/10 12:36 PM, Masoume Jabbarifar wrote:
>
>> ---
>>  lttv/lttv/Makefile.am                  |    2 +
>>  lttv/lttv/sync/Makefile.am             |    4 +-
>>  lttv/lttv/sync/data_structures.c       |    2 -
>>  lttv/lttv/sync/event_matching_tcp.c    |   87 +++++-
>>  lttv/lttv/sync/event_matching_tcp.h    |    1 +
>>  lttv/lttv/sync/event_processing_text.c |    4 +
>>  lttv/lttv/sync/factor_reduction.h      |    6 +
>>  lttv/lttv/sync/factor_reduction_time.c |  514
>> ++++++++++++++++++++++++++++++++
>>  lttv/lttv/sync/factor_reduction_time.h |   53 ++++
>>  lttv/lttv/sync/sync_chain_lttv.c       |    6 +
>>  lttv/lttv/sync/sync_chain_unittest.c   |    2 +
>>  11 files changed, 664 insertions(+), 17 deletions(-)
>>  create mode 100644 lttv/lttv/sync/factor_reduction_time.c
>>  create mode 100644 lttv/lttv/sync/factor_reduction_time.h
>>
>> diff --git a/lttv/lttv/Makefile.am b/lttv/lttv/Makefile.am
>> index 30ead55..750f2d2 100644
>> --- a/lttv/lttv/Makefile.am
>> +++ b/lttv/lttv/Makefile.am
>> @@ -87,6 +87,8 @@ lttv_real_SOURCES = \
>>        sync/factor_reduction.h\
>>        sync/factor_reduction_accuracy.c\
>>        sync/factor_reduction_accuracy.h\
>> +       sync/factor_reduction_time.c\
>> +       sync/factor_reduction_time.h\
>>        sync/lookup3.h
>>
>>  lttvinclude_HEADERS = \
>> diff --git a/lttv/lttv/sync/Makefile.am b/lttv/lttv/sync/Makefile.am
>> index e1d6775..7a5417d 100644
>> --- a/lttv/lttv/sync/Makefile.am
>> +++ b/lttv/lttv/sync/Makefile.am
>> @@ -30,4 +30,6 @@ unittest_SOURCES = \
>>        event_analysis_linreg.h\
>>        factor_reduction.h\
>>        factor_reduction_accuracy.c\
>> -       factor_reduction_accuracy.h
>> +       factor_reduction_accuracy.h\
>> +       factor_reduction_time.c\
>> +       factor_reduction_time.h
>> diff --git a/lttv/lttv/sync/data_structures.c
>> b/lttv/lttv/sync/data_structures.c
>> index acac9d7..c5fe736 100644
>> --- a/lttv/lttv/sync/data_structures.c
>> +++ b/lttv/lttv/sync/data_structures.c
>> @@ -271,8 +271,6 @@ void destroyTCPSegment(Message* const segment)
>>  {
>>        TCPEvent* inE, *outE;
>>
>> -       segment->print(segment);
>> -
>>        g_assert(segment->inE != NULL&&  segment->outE != NULL);
>>        g_assert(segment->inE->type == TCP&&  segment->outE->type == TCP);
>>        inE= segment->inE->event.tcpEvent;
>> diff --git a/lttv/lttv/sync/event_matching_tcp.c
>> b/lttv/lttv/sync/event_matching_tcp.c
>> index 90d6c43..678a334 100644
>> --- a/lttv/lttv/sync/event_matching_tcp.c
>> +++ b/lttv/lttv/sync/event_matching_tcp.c
>> @@ -56,7 +56,8 @@ static void buildReversedConnectionKey(ConnectionKey*
>> const
>>  static void openGraphDataFiles(SyncState* const syncState);
>>  static void closeGraphDataFiles(SyncState* const syncState);
>>  static void writeMessagePoint(FILE* stream, const Message* const
>> message);
>> -
>> +static void gfPacketDestroy(gpointer data, gpointer userData);
>> +static void gfExchangeDestroy(gpointer data, gpointer userData);
>>
>>  static MatchingModule matchingModuleTCP = {
>>        .name= "TCP",
>> @@ -101,6 +102,7 @@ void registerMatchingTCP()
>>  static void initMatchingTCP(SyncState* const syncState)
>>  {
>>        MatchingDataTCP* matchingData;
>> +       int i, j;
>>
>>        matchingData= malloc(sizeof(MatchingDataTCP));
>>        syncState->matchingData= matchingData;
>> @@ -113,8 +115,7 @@ static void initMatchingTCP(SyncState* const
>> syncState)
>>                &gefConnectionKeyEqual,&gdnConnectionKeyDestroy,
>>                &gdnTCPSegmentListDestroy);
>>
>> -       if (syncState->stats)
>> -       {
>> +       if (syncState->reductionModule->preProcessReduction != NULL ||
>> syncState->stats) {
>>                unsigned int i;
>>
>>                matchingData->stats= calloc(1, sizeof(MatchingStatsTCP));
>> @@ -139,6 +140,20 @@ static void initMatchingTCP(SyncState* const
>> syncState)
>>        {
>>                matchingData->messagePoints= NULL;
>>        }
>> +       if (syncState->reductionModule->preProcessReduction != NULL) {
>> +               matchingData->packetArray= malloc(syncState->traceNb *
>> sizeof(GQueue**));
>> +               for (i= 0; i<  syncState->traceNb; i++) {
>> +                       matchingData->packetArray[i]=
>> malloc(syncState->traceNb * sizeof(GQueue*));
>> +
>> +                       for (j= 0; j<  syncState->traceNb; j++) {
>> +                               matchingData->packetArray[i][j]=
>> g_queue_new();
>> +                       }
>> +               }
>> +       }
>> +       else
>> +       {
>> +               matchingData->packetArray= NULL;
>> +       }
>>  }
>>
>>
>> @@ -155,6 +170,7 @@ static void initMatchingTCP(SyncState* const
>> syncState)
>>  static void destroyMatchingTCP(SyncState* const syncState)
>>  {
>>        MatchingDataTCP* matchingData;
>> +       int i, j;
>>
>>        matchingData= (MatchingDataTCP*) syncState->matchingData;
>>
>> @@ -165,8 +181,7 @@ static void destroyMatchingTCP(SyncState* const
>> syncState)
>>
>>        partialDestroyMatchingTCP(syncState);
>>
>> -       if (syncState->stats)
>> -       {
>> +       if (syncState->reductionModule->preProcessReduction != NULL ||
>> syncState->stats) {
>>                unsigned int i;
>>
>>                for (i= 0; i<  syncState->traceNb; i++)
>> @@ -179,6 +194,17 @@ static void destroyMatchingTCP(SyncState* const
>> syncState)
>>
>>        free(syncState->matchingData);
>>        syncState->matchingData= NULL;
>> +       if (syncState->reductionModule->preProcessReduction != NULL) {
>> +                       for (i= 0; i<  syncState->traceNb; i++) {
>> +                               for (j= 0; j<  syncState->traceNb; j++)
>> +                                       if
>> (syncState->analysisModule->analyzeMessage != NULL)
>> +
>> g_queue_foreach(matchingData->packetArray[i][j], gfPacketDestroy, NULL);
>> +                                       else
>> +
>> g_queue_foreach(matchingData->packetArray[i][j], gfExchangeDestroy, NULL);
>> +                               free(matchingData->packetArray[i]);
>> +                       }
>> +                       free(matchingData->packetArray);
>> +               }
>>  }
>>
>>
>> @@ -335,6 +361,7 @@ static void matchEvents(SyncState* const syncState,
>> Event* const event,
>>        Message* packet;
>>        MatchingDataTCP* matchingData;
>>        GQueue* conUnAcked;
>> +       GQueue* packetMatching;
>>
>>        matchingData= (MatchingDataTCP*) syncState->matchingData;
>>
>> @@ -354,10 +381,11 @@ static void matchEvents(SyncState* const syncState,
>> Event* const event,
>>                packet->outE->event.tcpEvent->segmentKey=
>> packet->inE->event.tcpEvent->segmentKey;
>>
>>                if (syncState->stats)
>> -               {
>>                        matchingData->stats->totPacket++;
>> +
>> +               if (syncState->reductionModule->preProcessReduction !=
>> NULL || syncState->stats)
>>
>>  matchingData->stats->totMessageArray[packet->inE->traceNum][packet->outE->traceNum]++;
>> -               }
>> +
>>
>>                // Discard loopback traffic
>>                if (packet->inE->traceNum == packet->outE->traceNum)
>> @@ -371,17 +399,24 @@ static void matchEvents(SyncState* const syncState,
>> Event* const event,
>>
>>  writeMessagePoint(matchingData->messagePoints[packet->inE->traceNum][packet->outE->traceNum],
>>                                packet);
>>                }
>> -
>> -               if (syncState->analysisModule->analyzeMessage != NULL)
>> -               {
>> -
>> syncState->analysisModule->analyzeMessage(syncState, packet);
>> +               if (syncState->analysisModule->analyzeMessage != NULL) {
>> +                       if
>> (syncState->reductionModule->preProcessReduction == NULL)
>> +
>> syncState->analysisModule->analyzeMessage(syncState, packet);
>>
>> +                       else {
>> +                               packetMatching=
>> +
>> matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
>> +                               g_queue_push_tail(packetMatching, packet);
>> +                       }
>> +
>>                }
>>
>>                // We can skip the rest of the algorithm if the analysis
>> module is not
>>                // interested in exchanges
>>                if (syncState->analysisModule->analyzeExchange == NULL)
>>                {
>> -                       destroyTCPSegment(packet);
>> +                       if
>> (syncState->reductionModule->preProcessReduction == NULL)
>>
>> +                               destroyTCPSegment(packet);
>> +
>>                        return;
>>                }
>>
>> @@ -452,8 +487,14 @@ static void matchEvents(SyncState* const syncState,
>> Event* const event,
>>
>>  matchingData->stats->totExchangeSync++;
>>                                                }
>>
>> -
>> syncState->analysisModule->analyzeExchange(syncState,
>> -                                                       exchange);
>> +                                               if
>> (syncState->reductionModule->preProcessReduction == NULL)
>> +
>> syncState->analysisModule->analyzeExchange(syncState, exchange);
>> +                                               else
>> +                                               {
>> +                                                       packetMatching=
>> +
>> matchingData->packetArray[packet->inE->traceNum][packet->outE->traceNum];
>> +
>> g_queue_push_tail(packetMatching, exchange);
>> +                                               }
>>                                        }
>>
>>                                        exchange->message= NULL;
>> @@ -707,3 +748,21 @@ static void
>> writeMatchingGraphsPlotsTCPMessages(SyncState* const syncState,
>>                        "title \"Received messages\" with points linetype 4
>> "
>>                        "linecolor rgb \"#6699cc\" pointtype 11 pointsize
>> 2, \\\n", i, j);
>>  }
>> +static void gfPacketDestroy(gpointer data, gpointer userData)
>> +{
>> +       Message* packet;
>> +
>> +       packet= (Message*) data;
>> +       destroyTCPSegment(packet);
>> +
>> +}
>> +
>> +static void gfExchangeDestroy(gpointer data, gpointer userData)
>> +{
>> +       Exchange* exchange;
>> +
>> +       exchange= (Exchange*) data;
>> +       exchange->message= NULL;
>> +       destroyTCPExchange(exchange);
>> +
>> +}
>> diff --git a/lttv/lttv/sync/event_matching_tcp.h
>> b/lttv/lttv/sync/event_matching_tcp.h
>> index 6f9b072..e7d1d12 100644
>> --- a/lttv/lttv/sync/event_matching_tcp.h
>> +++ b/lttv/lttv/sync/event_matching_tcp.h
>> @@ -57,6 +57,7 @@ typedef struct
>>         * The elements on the diagonal are not initialized.
>>         */
>>        FILE*** messagePoints;
>> +       GQueue*** packetArray;
>>  } MatchingDataTCP;
>>
>>  void registerMatchingTCP();
>> diff --git a/lttv/lttv/sync/event_processing_text.c
>> b/lttv/lttv/sync/event_processing_text.c
>> index bcbea9b..894f71c 100644
>> --- a/lttv/lttv/sync/event_processing_text.c
>> +++ b/lttv/lttv/sync/event_processing_text.c
>> @@ -273,6 +273,10 @@ static AllFactors* finalizeProcessingText(SyncState*
>> const syncState)
>>                free(line);
>>        }
>>
>> +       if (syncState->reductionModule->preProcessReduction != NULL) {
>> +
>> syncState->reductionModule->preProcessReduction(syncState);
>> +       }
>> +
>>        factors= syncState->matchingModule->finalizeMatching(syncState);
>>        if (syncState->stats)
>>        {
>> diff --git a/lttv/lttv/sync/factor_reduction.h
>> b/lttv/lttv/sync/factor_reduction.h
>> index 561df6b..e971ea6 100644
>> --- a/lttv/lttv/sync/factor_reduction.h
>> +++ b/lttv/lttv/sync/factor_reduction.h
>> @@ -39,6 +39,12 @@ typedef struct
>>         */
>>        void (*destroyReduction)(struct _SyncState* const syncState);
>>
>> +       /* This function is called when time reduction is needed and
>> +        * removes useless communication links and finds Spanning Tree
>> +        * of nodes in the network and then sends packets to
>> synchronization
>> +        */
>> +       void (*preProcessReduction)(struct _SyncState* const syncState);
>> +
>>        /*
>>         * Convert trace pair synchronization factors to a resulting offset
>> and
>>         * drift for each trace.
>> diff --git a/lttv/lttv/sync/factor_reduction_time.c
>> b/lttv/lttv/sync/factor_reduction_time.c
>> new file mode 100644
>> index 0000000..2bd37bf
>> --- /dev/null
>> +++ b/lttv/lttv/sync/factor_reduction_time.c
>> @@ -0,0 +1,514 @@
>> +/* This file is part of the Linux Trace Toolkit viewer
>> + * Copyright (C) 2010 Masoume Jabbarifar<masoume.jabbarifar at polymtl.ca>
>> + *
>> + * This program is free software: you can redistribute it and/or modify
>> it
>> + * under the terms of the GNU Lesser General Public License as published
>> by
>> + * the Free Software Foundation, either version 2.1 of the License, or
>> (at
>> + * your option) any later version.
>> + *
>> + * This program is distributed in the hope that it will be useful, but
>> WITHOUT
>> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
>> + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
>> + * License for more details.
>> + *
>> + * You should have received a copy of the GNU Lesser General Public
>> License
>> + * along with this program.  If not, see<http://www.gnu.org/licenses/>.
>> + */
>> +#define _ISOC99_SOURCE
>> +
>> +#ifdef HAVE_CONFIG_H
>> +#include<config.h>
>> +#endif
>> +
>> +#include<math.h>
>> +#include<errno.h>
>> +#include<stdlib.h>
>> +#include<string.h>
>> +#include<unistd.h>
>> +
>> +
>> +#include "event_analysis.h"
>> +#include "event_matching.h"
>> +#include "sync_chain.h"
>> +
>> +#include "factor_reduction_time.h"
>> +#include "event_matching_tcp.h"
>> +
>> +
>> +/* Functions common to all reduction modules */
>> +static void initReductionTime(SyncState* const syncState);
>> +static void destroyReductionTime(SyncState* const syncState);
>> +static void preProcessReductionTime(SyncState* const syncState);
>> +
>> +static GArray* finalizeReductionTime(SyncState* const syncState,
>> +       AllFactors* allFactors);
>> +static void printReductionStatsTime(SyncState* const syncState);
>> +
>> +/* Functions specific to this module */
>> +static void getFactors(SyncState* const syncState,AllFactors* const
>> allFactors,
>> +       const unsigned int traceNum,
>> +       Factors* const factors);
>> +static void maximumSpanningTree(SyncState* const syncState);
>> +static void maximumCommunicationPacket(SyncState* const syncState);
>> +static void floodingSyncExecute(SyncState* const syncState);
>> +void updateDistances(int target,ReductionData* reductionData,SyncState*
>> const syncState);
>> +void sumCommunicationPackets(SyncState* const syncState);
>> +void proposeRefNode(int** connectionArray, int* refNodes, int n);
>> +void SyncExecute(int refNode,SyncState* const syncState);
>> +int findParent(SyncState* const syncState, int root, int node);
>> +
>> +
>> +static ReductionModule reductionModuleTime= {
>> +       .name= "time",
>> +       .initReduction=&initReductionTime,
>> +       .destroyReduction=&destroyReductionTime,
>> +       .preProcessReduction=&preProcessReductionTime,
>> +       .finalizeReduction=&finalizeReductionTime,
>> +       .printReductionStats=&printReductionStatsTime,
>> +       .graphFunctions= {},
>> +};
>> +
>> +
>> +/*
>> + * Reduction module registering function
>> + */
>> +void registerReductionTime()
>> +{
>> +       g_queue_push_tail(&reductionModules,&reductionModuleTime);
>> +}
>> +
>> +
>> +/*
>> + * Reduction init function
>> + *
>> + * This function is called at the beginning of a synchronization run for
>> a set
>> + * of traces.
>> + *
>> + * Allocate some reduction specific data structures
>> + *
>> + * Args:
>> + *   syncState     container for synchronization data.
>> + */
>> +static void initReductionTime(SyncState* const syncState)
>> +{
>> +       int i;
>> +
>> +       ReductionData* reductionData;
>> +
>> +       reductionData= malloc(sizeof(ReductionData));
>> +       syncState->reductionData= reductionData;
>> +
>> +
>> +       reductionData->routed= malloc(syncState->traceNb * sizeof(char*));
>> +
>> +       reductionData->distance= malloc(syncState->traceNb *
>> sizeof(int*));
>> +
>> +       reductionData->neighbour= malloc(syncState->traceNb *
>> sizeof(int*));
>> +
>> +       reductionData->totMessageArray= malloc(syncState->traceNb *
>> sizeof(unsigned int*));
>> +
>> +       for (i= 0; i<  syncState->traceNb; i++) {
>> +               reductionData->totMessageArray[i]=
>> +                       calloc(syncState->traceNb, sizeof(unsigned int));
>> +       }
>> +
>> +       reductionData->totMSTMessageArray= malloc(syncState->traceNb *
>> sizeof(unsigned int*));
>> +
>> +       for (i= 0; i<  syncState->traceNb; i++) {
>> +               reductionData->totMSTMessageArray[i]=
>> +                       calloc(syncState->traceNb, sizeof(unsigned int));
>> +       }
>> +
>> +       reductionData->totMSTAnalysisArray= malloc(syncState->traceNb *
>> sizeof(unsigned int*));
>> +
>> +       for (i= 0; i<  syncState->traceNb; i++) {
>> +               reductionData->totMSTAnalysisArray[i]=
>> +                       calloc(syncState->traceNb, sizeof(unsigned int));
>> +       }
>> +
>> +       reductionData->maxRoot= malloc(syncState->traceNb * sizeof(int*));
>> +       reductionData->maxRootMST= malloc(syncState->traceNb *
>> sizeof(int*));
>> +}
>> +
>> +static void preProcessReductionTime(SyncState* const syncState)
>> +{
>> +
>> +       sumCommunicationPackets(syncState);
>> +       maximumCommunicationPacket(syncState);
>> +       maximumSpanningTree(syncState);
>> +       floodingSyncExecute(syncState);
>> +}
>> +/* Finding best path in the network that has more interaction among the
>> + * nodes and synchronisation trough this path
>> + *
>> + * Maximum Spanning Tree function based on Prim's Algorithm
>> + *
>> + * The algorithm is implemented according to the code here:
>> + * http://snippets.dzone.com/posts/show/4743
>> + *
>> +*/
>> +static void maximumSpanningTree(SyncState* const syncState)
>> +{
>> +       int i, j;
>> +       int total= 0;
>> +       int treeSize, max;
>> +       ReductionData* reductionData;
>> +
>> +       reductionData= syncState->reductionData;
>> +
>> +       /* Initialise distance with 0 */
>> +       for (i= 0; i<  syncState->traceNb ; ++i)
>> +               reductionData->distance[i]= 0;
>> +
>> +       /* Mark all nodes as NOT beeing in the maximum spanning tree */
>> +       for (i= 0; i<  syncState->traceNb; ++i)
>> +               reductionData->routed[i]= 0;
>> +
>> +       /* Add the first node to the tree */
>> +       if (reductionData->referenceNode<  0) return;
>> +       g_debug("Adding node %d\n", reductionData->referenceNode);
>> +       reductionData->routed[reductionData->referenceNode]= 1;
>> +       updateDistances(reductionData->referenceNode, reductionData,
>> syncState);
>> +
>> +       for (treeSize= 1; treeSize<  syncState->traceNb; ++treeSize) {
>> +               /* Find the node with the bigest distance to the tree */
>> +               max= -1;
>> +               for (i= 0; i<  syncState->traceNb; ++i)
>> +                       if (!reductionData->routed[i])
>> +                               if ((max == -1) ||
>> (reductionData->distance[max]<  reductionData->distance[i]))
>> +                                       max= i;
>> +
>> +               /* And add it */
>> +
>> reductionData->totMSTMessageArray[max][reductionData->neighbour[max]]=
>> reductionData->distance[max];
>> +
>> reductionData->totMSTMessageArray[reductionData->neighbour[max]][max]=
>> reductionData->distance[max];
>> +
>> +
>> reductionData->totMSTAnalysisArray[max][reductionData->neighbour[max]]=
>> reductionData->distance[max];
>> +
>> reductionData->totMSTAnalysisArray[reductionData->neighbour[max]][max]=
>> reductionData->distance[max];
>> +
>> +               g_debug("Adding edge %i-MAX(%i)-D(%i)\n",
>> reductionData->neighbour[max], max, reductionData->distance[max]);
>> +               reductionData->routed[max]= 1;
>> +               total+= reductionData->distance[max];
>> +
>> +               updateDistances(max, reductionData, syncState);
>> +       }
>> +
>> +
>> +       for ( i=0 ; i<  syncState->traceNb ; i++) {
>> +               for (j= 0 ; j<  syncState->traceNb ; j++)
>> +                       g_debug("\t%d ",
>> reductionData->totMSTMessageArray[i][j]);
>> +       g_debug("\n");
>> +       }
>> +       g_debug("Total distance: %d\n", total);
>> +
>> +}
>> +
>> +
>> +void updateDistances(int target, ReductionData* reductionData, SyncState*
>> const syncState) {
>> +
>> +       int i;
>> +       g_debug("Update[%i]= ", target);
>> +       for (i= 0; i<  syncState->traceNb; ++i) {
>> +               if ((reductionData->totMessageArray[target][i] != 0)&&
>>  (reductionData->distance[i]<= reductionData->totMessageArray[target][i])) {
>> +                       if (reductionData->distance[i] !=
>> reductionData->totMessageArray[target][i]) {
>> +                               reductionData->distance[i]=
>> reductionData->totMessageArray[target][i];
>> +                               reductionData->neighbour[i]= target;
>> +                       }
>> +                       else if (reductionData->distance[i] ==
>> reductionData->totMessageArray[target][i]&&  target ==
>> reductionData->referenceNode) {
>> +                               reductionData->distance[i]=
>> reductionData->totMessageArray[target][i];
>> +                               reductionData->neighbour[i]= target;
>> +                       }
>> +               }
>> +               g_debug("%iD(%i)N(%i)", i, reductionData->distance[i],
>> reductionData->neighbour[i]);
>> +        }
>> +
>> g_debug("\n------------------------------------------------------------\n");
>> +}
>> +
>> +
>> +static void maximumCommunicationPacket(SyncState* const syncState)
>> +{
>> +
>> +       ReductionData* reductionData;
>> +
>> +       reductionData= syncState->reductionData;
>> +       proposeRefNode(reductionData->totMessageArray,
>> reductionData->maxRoot, syncState->traceNb);
>> +       reductionData->referenceNode= reductionData->maxRoot[0];
>> +       g_debug("Best RefNODE is : %i\n", reductionData->referenceNode);
>> +
>> +}
>> +
>> +/* Symmetrical matrix of packet communications */
>> +
>> +void sumCommunicationPackets(SyncState* const syncState) {
>> +       int i, j, sum;
>> +
>> +       MatchingDataTCP* matchingData;
>> +
>> +       matchingData= syncState->matchingData;
>> +
>> +       ReductionData* reductionData;
>> +
>> +       reductionData= syncState->reductionData;
>> +
>> +
>> +       for (i= 0; i<  syncState->traceNb; ++i)
>> +               for (j= 0; j<  syncState->traceNb; ++j){
>> +                       sum= matchingData->stats->totMessageArray[i][j] +
>> matchingData->stats->totMessageArray[j][i];
>> +                       reductionData->totMessageArray[i][j]= sum;
>> +                       reductionData->totMessageArray[j][i]= sum;
>> +               }
>> +
>> +}
>> +
>> +/* Finding the node who has more interaction */
>> +
>> +void proposeRefNode(int** connectionArray, int* refNodes, int n){
>> +
>> +       int* sumPacket;
>> +       int i, j, index=0;
>> +       int maxSumPacket=0;
>> +
>> +       sumPacket= malloc(n* sizeof(int));
>> +
>> +       for (i= 0; i<  n; ++i) {
>> +               sumPacket[i]= 0;
>> +               for (j= 0;j<  n; ++j)
>> +                       sumPacket[i]+= connectionArray[i][j];
>> +               g_debug("sum(%i)=%i\n", i, sumPacket[i]);
>> +               if (sumPacket[i]>  maxSumPacket)
>> +                       maxSumPacket= sumPacket[i];
>> +       }
>> +       g_debug("proposed Reference Node: ");
>> +       for (i= 0 ; i<  n ; ++i) {
>> +               if (sumPacket[i] == maxSumPacket) {
>> +                       refNodes[index++]= i;
>> +                       g_debug("%i \t", i);
>> +               }
>> +       }
>> +       refNodes[index]= -1;
>> +       g_debug("\n");
>> +       free(sumPacket);
>> +}
>> +
>> +void floodingSyncExecute(SyncState* const syncState) {
>> +
>> +       ReductionData* reductionData;
>> +       reductionData= syncState->reductionData;
>> +       SyncExecute(reductionData->referenceNode, syncState);
>> +
>> +}
>> +
>> +
>> +/* First root and all of nodes who has connection with it (it's childs)
>> + * must be synchronised then function will be run for the childs
>> recursively
>> + */
>> +
>> +void SyncExecute(int refNode, SyncState* const syncState) {
>> +
>> +       unsigned int i, j;
>> +       GQueue* packetSync;
>> +       Message* packet;
>> +       Exchange* exchange;
>> +
>> +       ReductionData* reductionData;
>> +
>> +       reductionData= syncState->reductionData;
>> +
>> +       MatchingDataTCP* matchingData;
>> +
>> +       matchingData= syncState->matchingData;
>> +
>> +       for (i= 0 ; i<  syncState->traceNb ; ++i)
>> +               if (reductionData->totMSTMessageArray[refNode][i] != 0){
>> +                       packetSync= matchingData->packetArray[refNode][i];
>> +
>> +                       for (j= 0 ; j<  packetSync->length ; ++j){
>> +                               if
>> (syncState->analysisModule->analyzeMessage != NULL) {
>> +                                       packet=
>> g_queue_peek_nth(packetSync, j);
>> +
>> syncState->analysisModule->analyzeMessage(syncState, packet);
>> +                               }
>> +                               else {
>>
>> +                                       exchange=
>> g_queue_peek_nth(packetSync, j);
>> +
>> syncState->analysisModule->analyzeExchange(syncState, exchange);
>> +                               }
>> +                       }
>> +
>> +                       packetSync= matchingData->packetArray[i][refNode];
>> +
>> +                       for (j= 0 ;j<  packetSync->length ; ++j){
>> +                               if
>> (syncState->analysisModule->analyzeMessage != NULL) {
>> +                                       packet=
>> g_queue_peek_nth(packetSync, j);
>> +
>> syncState->analysisModule->analyzeMessage(syncState, packet);
>> +                               }
>> +                               else {
>> +                                       exchange=
>> g_queue_peek_nth(packetSync, j);
>> +
>> syncState->analysisModule->analyzeExchange(syncState, exchange);
>> +                               }
>> +                       }
>> +
>> +                       reductionData->totMSTMessageArray[i][refNode]= 0;
>> +                       reductionData->totMSTMessageArray[refNode][i]= 0;
>> +                       SyncExecute(i, syncState);
>> +               }
>> +       return;
>> +}
>> +/*
>> + * Reduction destroy function
>> + *
>> + * Free the analysis specific data structures
>> + *
>> + * Args:
>> + *   syncState     container for synchronization data.
>> + */
>> +static void destroyReductionTime(SyncState* const syncState)
>> +{
>> +       int i;
>> +       ReductionData* reductionData;
>> +       reductionData= (ReductionData*) syncState->reductionData;
>> +
>> +       if (reductionData == NULL) return;
>> +
>> +       for (i= 0; i<  syncState->traceNb; i++) {
>> +               free(reductionData->totMessageArray[i]);
>> +               free(reductionData->totMSTMessageArray[i]);
>> +               free(reductionData->totMSTAnalysisArray[i]);
>> +       }
>> +       free(reductionData->totMessageArray);
>> +       free(reductionData->totMSTMessageArray);
>> +       free(reductionData->totMSTAnalysisArray);
>> +       free(reductionData->routed);
>> +       free(reductionData->distance);
>> +       free(reductionData->neighbour);
>> +       free(reductionData->maxRoot);
>> +       free(reductionData->maxRootMST);
>> +       free(syncState->reductionData);
>> +       syncState->reductionData= NULL;
>> +}
>> +
>> +
>> +/*
>> + * Finalize the factor reduction
>> + *
>> + * Calculate a resulting offset and drift for each trace.
>> + *
>> + * Args:
>> + *   syncState     container for synchronization data.
>> + *   allFactors    offset and drift between each pair of traces
>> + *
>> + * Returns:
>> + *   Factors[traceNb] synchronization factors for each trace
>> +
>> + */
>> +static GArray* finalizeReductionTime(SyncState* const syncState,
>> +       AllFactors* allFactors)
>> +{
>> +       int i, j;
>> +       GArray* factors;
>> +
>> +       factors= g_array_sized_new(FALSE, FALSE, sizeof(Factors),
>> +               syncState->traceNb);
>> +       g_array_set_size(factors, syncState->traceNb);
>> +       for (i= 0; i<  syncState->traceNb; i++) {
>> +               getFactors(syncState, allFactors,
>> i,&g_array_index(factors,
>> +                       Factors, i));
>> +       }
>> +
>> +       return factors;
>> +}
>> +
>> +
>> +/*
>> + * Print statistics related to reduction. Must be called after
>> + * finalizeReduction.
>> + *
>> + * Args:
>> + *   syncState     container for synchronization data.
>> + */
>> +static void printReductionStatsTime(SyncState* const syncState)
>> +{
>> +}
>> +
>> +/*
>> + * Cummulate the time correction factors to convert a node's time to its
>> + * reference's time.
>> + * This function recursively calls itself until it reaches the reference
>> node.
>> + *
>> + * Args:
>> + *   allFactors:   offset and drift between each pair of traces
>> + *   predecessors: matrix of each node's predecessor on the shortest
>> + *                 path between two nodes
>> + *   references:   reference node for each node
>> + *   traceNum:     node for which to find the factors
>> + *   factors:      resulting factors
>> + */
>> +static void getFactors(SyncState* const syncState, AllFactors* const
>> allFactors, const unsigned int traceNum,
>> +       Factors* const factors)
>> +{
>> +       unsigned int reference;
>> +       PairFactors** const pairFactors= allFactors->pairFactors;
>> +       int parent, i;
>> +
>> +       ReductionData* reductionData;
>> +       reductionData= syncState->reductionData;
>> +
>> +       if (traceNum == reductionData->referenceNode)
>> +               reference= traceNum;
>> +       else if
>> (reductionData->totMSTAnalysisArray[reductionData->referenceNode][traceNum]
>> != 0)
>> +               reference= reductionData->referenceNode;
>> +       else
>> +               reference= findParent(syncState, -1, traceNum);
>> +
>> +       if (reference == traceNum) {
>> +               factors->offset= 0.;
>> +               factors->drift= 1.;
>> +       }
>> +       else {
>> +               Factors previousVertexFactors;
>> +
>> +               getFactors(syncState, allFactors,
>> reference,&previousVertexFactors);
>> +
>> +               /* Convert the time from traceNum to reference;
>> +                * pairFactors[row][col] converts the time from col to
>> row, invert the
>> +                * factors as necessary */
>> +
>> +               if (pairFactors[reference][traceNum].approx != NULL) {
>> +                       factors->offset= previousVertexFactors.drift *
>> +
>> pairFactors[reference][traceNum].approx->offset +
>> +                               previousVertexFactors.offset;
>> +                       factors->drift= previousVertexFactors.drift *
>> +
>> pairFactors[reference][traceNum].approx->drift;
>> +               }
>> +               else if (pairFactors[traceNum][reference].approx != NULL)
>> {
>> +                       factors->offset= previousVertexFactors.drift *
>> (-1. *
>> +
>> pairFactors[traceNum][reference].approx->offset /
>> +
>> pairFactors[traceNum][reference].approx->drift) +
>> +                               previousVertexFactors.offset;
>> +                       factors->drift= previousVertexFactors.drift * (1.
>> /
>> +
>> pairFactors[traceNum][reference].approx->drift);
>> +               }
>> +               else {
>> +                       g_assert_not_reached();
>> +               }
>> +       }
>> +}
>> +
>> +int findParent(SyncState* const syncState, int root, int node)
>> +{
>> +
>> +       int i;
>> +       int result;
>> +       ReductionData* reductionData;
>> +
>> +       reductionData= syncState->reductionData;
>> +       for (i= 0; i<  syncState->traceNb; i++)
>> +               if (reductionData->totMSTAnalysisArray[node][i] != 0&&  i
>> == reductionData->referenceNode)
>> +                       return 1;
>> +
>> +       for (i= 0; i<  syncState->traceNb; i++)
>> +               if (reductionData->totMSTAnalysisArray[node][i] != 0)
>> +                       if (i != root) {
>> +                               result= findParent(syncState, node, i);
>> +                               if (result == 1&&  root == -1) return i;
>> +                               if (result == 1) return 1;
>> +                       }
>> +       return 0;
>> +}
>> diff --git a/lttv/lttv/sync/factor_reduction_time.h
>> b/lttv/lttv/sync/factor_reduction_time.h
>> new file mode 100644
>> index 0000000..c46cbd1
>> --- /dev/null
>> +++ b/lttv/lttv/sync/factor_reduction_time.h
>> @@ -0,0 +1,53 @@
>> +/* This file is part of the Linux Trace Toolkit viewer
>> + * Copyright (C) 2010 Masoume Jabbarifar<masoume.jabbarifar at polymtl.ca>
>> + *
>> + * This program is free software: you can redistribute it and/or modify
>> it
>> + * under the terms of the GNU Lesser General Public License as published
>> by
>> + * the Free Software Foundation, either version 2.1 of the License, or
>> (at
>> + * your option) any later version.
>> + *
>> + * This program is distributed in the hope that it will be useful, but
>> WITHOUT
>> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
>> + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
>> + * License for more details.
>> + *
>> + * You should have received a copy of the GNU Lesser General Public
>> License
>> + * along with this program.  If not, see<http://www.gnu.org/licenses/>.
>> + */
>> +
>> +#ifndef FACTOR_REDUCTION_TIME_H
>> +#define FACTOR_REDUCTION_TIME_H
>> +
>> +#include<glib.h>
>> +
>> +#include "data_structures.h"
>> +
>> +
>> +typedef struct {
>> +
>> +       char* routed;   /* routed[i] is 1 when we have concidered the node
>> i in the maximum
>> +                       spanning tree; otherwise it is 0 */
>> +
>> +       int* distance;  /* distance[i] is the distance between node i and
>> the maximum
>> +                       spanning tree; this is initially 0; if i is
>> +                       already in the tree, then d[i] is undefined;
>> +                       this is just a temporary variable. It's not
>> necessary but speeds
>> +                       up execution considerably (by a factor of n) */
>> +
>> +       int* neighbour; /* neighbour[i] holds the index of the node i
>> would have to be
>> +                       linked to in order to get a distance of d[i] */
>> +
>> +       int referenceNode; /*referenceNode is one of networks node whose
>> time will be
>> +                          considered as reference time*/
>> +
>> +       unsigned int** totMessageArray;
>> +       unsigned int** totMSTMessageArray;  /*Maximum spanning tree is
>> saved in totMSTMessageArray*/
>> +       unsigned int** totMSTAnalysisArray;
>> +       int* maxRoot;
>> +       int* maxRootMST;
>> +
>> +} ReductionData;
>> +
>> +void registerReductionTime();
>> +
>> +#endif
>> diff --git a/lttv/lttv/sync/sync_chain_lttv.c
>> b/lttv/lttv/sync/sync_chain_lttv.c
>> index 95bef44..c60e8ac 100644
>> --- a/lttv/lttv/sync/sync_chain_lttv.c
>> +++ b/lttv/lttv/sync/sync_chain_lttv.c
>> @@ -45,6 +45,7 @@
>>  #include "event_analysis_linreg.h"
>>  #include "event_analysis_eval.h"
>>  #include "factor_reduction_accuracy.h"
>> +#include "factor_reduction_time.h"
>>  #include "sync_chain.h"
>>  #include "sync_chain_lttv.h"
>>
>> @@ -139,6 +140,7 @@ static void init()
>>        registerAnalysisEval();
>>
>>        registerReductionAccuracy();
>> +       registerReductionTime();
>>
>>        // Build module names lists for option and help string
>>        for (i= 0; i<  ARRAY_SIZE(loopValues); i++)
>> @@ -313,6 +315,10 @@ bool syncTraceset(LttvTracesetContext* const
>> traceSetContext)
>>        lttv_process_traceset_middle(traceSetContext, ltt_time_infinite,
>>                G_MAXULONG, NULL);
>>        lttv_process_traceset_seek_time(traceSetContext, ltt_time_zero);
>> +
>> +       // Find the best refrence node and remove the useless
>> sunchronization
>> +       if (syncState->reductionModule->preProcessReduction != NULL)
>> +
>> syncState->reductionModule->preProcessReduction(syncState);
>>
>>        // Obtain, reduce, adjust and set correction factors
>>        allFactors=
>> syncState->processingModule->finalizeProcessing(syncState);
>> diff --git a/lttv/lttv/sync/sync_chain_unittest.c
>> b/lttv/lttv/sync/sync_chain_unittest.c
>> index 40302a0..e76fa78 100644
>> --- a/lttv/lttv/sync/sync_chain_unittest.c
>> +++ b/lttv/lttv/sync/sync_chain_unittest.c
>> @@ -42,6 +42,7 @@
>>  #include "event_analysis_linreg.h"
>>  #include "event_analysis_eval.h"
>>  #include "factor_reduction_accuracy.h"
>> +#include "factor_reduction_time.h"
>>  #include "sync_chain.h"
>>
>>
>> @@ -134,6 +135,7 @@ int main(const int argc, char* const argv[])
>>        registerAnalysisEval();
>>
>>        registerReductionAccuracy();
>> +       registerReductionTime();
>>
>>        // Initialize data structures
>>        syncState= malloc(sizeof(SyncState));
>>
>
> _______________________________________________
> ltt-dev mailing list
> ltt-dev at lists.casi.polymtl.ca
> http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.casi.polymtl.ca/pipermail/lttng-dev/attachments/20100922/c2a1a06c/attachment-0003.htm>


More information about the lttng-dev mailing list