[ltt-dev] LTTng relay model

Mathieu Desnoyers compudj at krystal.dyndns.org
Thu Jun 26 13:19:03 EDT 2008


* Paul McKenney (Paul.McKenney at us.ibm.com) wrote:
> Mathieu Desnoyers <compudj at krystal.dyndns.org> wrote on 06/26/2008 
> 06:23:27 AM:
> > Hi everyone,
> > 
> > I just released LTTng 0.10-pre56 for the 2.6.26-rc8 kernel. I try to
> > keep the changes in this "stable" version to a minimum. The changes
> > about to come to the development tree will include:
> > 
> > * Basic user-space tracing
> > * Fix resulting from formal verification of ltt-tracer.c. A rare buffer
> >   corruption scenario (which never happens with large buffers) has been
> >   identified. If some of you are interested in reviewing the Promela
> >   models I created, I could post them here. I used the "spin"
> >   verification tool, following http://lwn.net/Articles/243851/ (very
> >   interesting, with real-life examples of conversion from C code to
> >   Promela; article by Paul McKenney).
> 
> I would be interested in at least looking at your model.  ;-)
> 
>                                                 Thanx, Paul

Here it is.

The first model is the correct one; the flawed model (the current
implementation) follows. The first model takes about 600 MB of RAM to verify.
I am ordering a machine with 16 GB of RAM to play a bit more with formal
verification this summer. Comments are welcome.
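
For reference, here is a rough C sketch of the reserve/commit fast path that
the Promela processes below abstract. It is only an illustration derived from
the model: the names and fixed sizes are made up for the example, C11-style
atomics stand in for the kernel's atomic primitives, and the sub-buffer
switch, the reader side and all memory-ordering details are left out. It is
not the ltt-tracer.c code.

// Illustrative sketch only -- not ltt-tracer.c. The cmpxchg loop allows
// concurrent writers; read_off is assumed to be updated by a single,
// mutex-protected reader.
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define BUFSIZE     4096
#define NR_SUBBUFS  2
#define SUBBUF_SIZE (BUFSIZE / NR_SUBBUFS)

static _Atomic size_t write_off;                /* writer head, free-running */
static size_t read_off;                         /* reader head */
static _Atomic size_t commit_count[NR_SUBBUFS]; /* bytes committed per sub-buffer */
static char buf[BUFSIZE];

/* Writer fast path: reserve space with a cmpxchg loop on write_off,
 * copy the payload, then add the reserved size to the sub-buffer's
 * commit count. */
static bool trace_event(const char *data, size_t size)
{
        size_t prev_off, new_off;

        do {
                prev_off = atomic_load(&write_off);
                new_off = prev_off + size;
                /* Buffer full: reserving would overwrite unread data. */
                if (new_off - read_off > BUFSIZE)
                        return false;           /* event lost */
        } while (!atomic_compare_exchange_weak(&write_off, &prev_off, new_off));

        for (size_t i = 0; i < size; i++)
                buf[(prev_off + i) % BUFSIZE] = data[i];

        /* When a sub-buffer's commit count reaches a multiple of SUBBUF_SIZE,
         * the sub-buffer is complete and can be delivered to the reader,
         * which also keeps a per-sub-buffer retrieve count so it never
         * consumes the same sub-buffer twice. */
        atomic_fetch_add(&commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE], size);
        return true;
}

The tracer() process in the first model is essentially this function with
size == 1, plus the buffer_use[] flags that let the verifier catch two
processes touching the same slot at the same time.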

file: buffer.spin


// LTTng ltt-tracer.c atomic lockless buffering scheme Promela model v1
// Created for the Spin validator.
// Mathieu Desnoyers <mathieu.desnoyers at polymtl.ca>
// June 2008

// TODO : create test cases that will generate an overflow on the offset and
// counter type. Counter types smaller than a byte should be used.

// Promela only has unsigned char, no signed char.
// Detecting a difference < 0 would require a signed type, but we want
// compactness, so we also check whether the values are higher than half of the
// unsigned char range (and consider them negative). By design, the model does
// not use offsets or counts higher than 127, because we would then have to use
// a larger type (short or int).
#define HALF_UCHAR (255/2)
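// Example of the convention: 2 - 5 evaluates to 253 as an unsigned byte;
// since 253 > HALF_UCHAR, it is interpreted as the negative difference -3.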

// NUMPROCS 4 : causes event loss with some reader timings.
// e.g. 3 events, 1 switch, 1 event (lost, buffer full), read 1 subbuffer
#define NUMPROCS 4

// NUMPROCS 3 : does not cause event loss because buffers are big enough.
//#define NUMPROCS 3
// e.g. 3 events, 1 switch, read 1 subbuffer

#define NUMSWITCH 1
#define BUFSIZE 4
#define NR_SUBBUFS 2
#define SUBBUF_SIZE (BUFSIZE / NR_SUBBUFS)

// Writer counters
byte write_off = 0;
byte commit_count[NR_SUBBUFS];

// Reader counters
byte read_off = 0;
byte retrieve_count[NR_SUBBUFS];

byte events_lost = 0;
byte refcount = 0;

bool deliver = 0;

// buffer slot in-use bit. Detects racy use (more than a single process
// accessing a slot at any given step).
bool buffer_use[BUFSIZE];

// Proceed to a sub-buffer switch if needed.
// Used in a periodic timer interrupt to fill and ship the current sub-buffer
// to the reader so we can guarantee a steady flow. If a sub-buffer is
// completely empty, don't switch.
// Also used as a "finalize" operation to complete the last sub-buffer after
// all writers have finished, so it can be read by the reader.
proctype switcher()
{
  byte prev_off, new_off, tmp_commit;
  byte size;

cmpxchg_loop:
  atomic {
    prev_off = write_off;
    size = SUBBUF_SIZE - (prev_off % SUBBUF_SIZE);
    new_off = prev_off + size;
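    // The switch is not needed if it would overflow the buffer (the reader is
    // too far behind) or if the current sub-buffer is completely empty
    // (size == SUBBUF_SIZE means nothing has been reserved in it).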
    if
    :: (new_off - read_off > BUFSIZE && new_off - read_off < HALF_UCHAR)
        || size == SUBBUF_SIZE ->
      refcount = refcount - 1;
      goto not_needed;
    :: else -> skip
    fi;
  }
  atomic {
    if
    :: prev_off != write_off -> goto cmpxchg_loop
    :: else -> write_off = new_off;
    fi;
  }

  atomic {
    tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
    commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
    if
    :: tmp_commit % SUBBUF_SIZE == 0 -> deliver = 1
    :: else -> skip
    fi;
    refcount = refcount - 1;
  }
not_needed:
  skip;
}

// tracer
// Writes 1 byte of information into the buffer at the current "write_off"
// position, then increments the commit_count of the sub-buffer the
// information has been written to.
proctype tracer()
{
  byte size = 1;
  byte prev_off, new_off, tmp_commit;
  byte i, j;

cmpxchg_loop:
  atomic {
    prev_off = write_off;
    new_off = prev_off + size;
  }
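  // If the reservation would make the writer head overrun data the reader has
  // not consumed yet, the event is dropped and accounted as lost.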
  atomic {
    if
    :: new_off - read_off > BUFSIZE && new_off - read_off < HALF_UCHAR ->
      goto lost
    :: else -> skip
    fi;
  }
  atomic {
    if
    :: prev_off != write_off -> goto cmpxchg_loop
    :: else -> write_off = new_off;
    fi;
    i = 0;
    do
    :: i < size ->
      assert(buffer_use[(prev_off + i) % BUFSIZE] == 0);
      buffer_use[(prev_off + i) % BUFSIZE] = 1;
      i++
    :: i >= size -> break
    od;
  }

  // writing to buffer...

  atomic {
    i = 0;
    do
    :: i < size ->
      buffer_use[(prev_off + i) % BUFSIZE] = 0;
      i++
    :: i >= size -> break
    od;
    tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
    commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
    if
    :: tmp_commit % SUBBUF_SIZE == 0 -> deliver = 1;
    :: else -> skip
    fi;
  }
  atomic {
    goto end;
lost:
    events_lost++;
end:
    refcount = refcount - 1;
  }
}

// reader
// Reads the information sub-buffer by sub-buffer when available.
//
// Reads the information as soon as it is ready, or may be delayed by
// an asynchronous delivery. Being modeled as a process ensures all cases
// (scheduled very quickly or very late, causing event loss) are covered.
// Only one reader per buffer (normally ensured by a mutex). This is modeled
// by using a single reader process.
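// A sub-buffer can be read once the writer head has moved past it and it has
// been fully committed but not yet retrieved, i.e.
// commit_count - retrieve_count == SUBBUF_SIZE for that sub-buffer.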
proctype reader()
{
  byte i, j;
  byte tmp_retrieve;
  byte lwrite_off, lcommit_count;

  do
  :: (write_off / SUBBUF_SIZE) - (read_off / SUBBUF_SIZE) > 0
     && (write_off / SUBBUF_SIZE) - (read_off / SUBBUF_SIZE) < HALF_UCHAR
           && commit_count[(read_off % BUFSIZE) / SUBBUF_SIZE]
             - retrieve_count[(read_off % BUFSIZE) / SUBBUF_SIZE]
               == SUBBUF_SIZE ->
      atomic {
        i = 0;
        do
        :: i < SUBBUF_SIZE ->
          assert(buffer_use[(read_off + i) % BUFSIZE] == 0);
          buffer_use[(read_off + i) % BUFSIZE] = 1;
          i++
        :: i >= SUBBUF_SIZE -> break
        od;
      }
      // reading from buffer...

      // Since there is only one reader per buffer at any given time,
      // we don't care about the retrieve_count vs read_off ordering:
      // the combined use of retrieve_count and read_off is made atomic
      // by a mutex.
      atomic {
        i = 0;
        do
        :: i < SUBBUF_SIZE ->
          buffer_use[(read_off + i) % BUFSIZE] = 0;
          i++
        :: i >= SUBBUF_SIZE -> break
        od;
        tmp_retrieve = retrieve_count[(read_off % BUFSIZE) / SUBBUF_SIZE]
            + SUBBUF_SIZE;
        retrieve_count[(read_off % BUFSIZE) / SUBBUF_SIZE] = tmp_retrieve;
        read_off = read_off + SUBBUF_SIZE;
      }
  :: read_off >= (NUMPROCS - events_lost) -> break;
  od;
}

// Waits for all tracer and switcher processes to finish before finalizing
// the buffer. Only after that will the reader be allowed to read the
// last subbuffer.
proctype cleaner()
{
  atomic {
    do
    :: refcount == 0 ->
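      // Pre-increment refcount: the finalizing switcher decrements it once on
      // every exit path.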
      refcount = refcount + 1;
      run switcher();  // Finalize the last sub-buffer so it can be read.
      break;
    od;
  }
}

init {
  byte i = 0;
  byte j = 0;
  byte sum = 0;
  byte commit_sum = 0;

  atomic {
    i = 0;
    do
    :: i < NR_SUBBUFS ->
      commit_count[i] = 0;
      retrieve_count[i] = 0;
      i++
    :: i >= NR_SUBBUFS -> break
    od;
    i = 0;
    do
    :: i < BUFSIZE ->
      buffer_use[i] = 0;
      i++
    :: i >= BUFSIZE -> break
    od;
    run reader();
    run cleaner();
    i = 0;
    do
    :: i < NUMPROCS ->
      refcount = refcount + 1;
      run tracer();
      i++
    :: i >= NUMPROCS -> break
    od;
    i = 0;
    do
    :: i < NUMSWITCH ->
      refcount = refcount + 1;
      run switcher();
      i++
    :: i >= NUMSWITCH -> break
    od;
  }
  // Assertions.
  atomic {
    // The writer head must always be greater than or equal to the reader head.
    assert(write_off - read_off >= 0 && write_off - read_off < HALF_UCHAR);
    j = 0;
    commit_sum = 0;
    do
    :: j < NR_SUBBUFS ->
      commit_sum = commit_sum + commit_count[j];
      // The commit count of a particular sub-buffer must always be greater
      // than or equal to the retrieve_count of that sub-buffer.
      assert(commit_count[j] - retrieve_count[j] >= 0 &&
        commit_count[j] - retrieve_count[j] < HALF_UCHAR);
      j++
    :: j >= NR_SUBBUFS -> break
    od;
    // The sum of all sub-buffer commit counts must always be less than or
    // equal to the writer head, because space must be reserved before it is
    // written to and then committed.
    assert(write_off - commit_sum >= 0 && write_off - commit_sum < HALF_UCHAR);

    // If we have fewer writers than the available buffer space, we should
    // not lose any events.
    assert(NUMPROCS + NUMSWITCH > BUFSIZE || events_lost == 0);
  }
}


file: buffer.spin.missing_retrieve_count


//#define NUMPROCS 5
#define NUMPROCS 4
#define BUFSIZE 4
#define NR_SUBBUFS 2
#define SUBBUF_SIZE (BUFSIZE / NR_SUBBUFS)
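
// This flawed model matches the current implementation: there is no
// retrieve_count, so the reader only checks that a sub-buffer's commit count
// is a multiple of SUBBUF_SIZE before reading it. That check also holds while
// writers have merely reserved space in the sub-buffer without committing it
// yet (the count can still be 0), so the reader may access a sub-buffer that
// is still being written to. The buffer_use_count[] array and the
// assert(buffer_use_count[j] < 2) checks below are there to catch this race.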

byte write_off = 0;
byte read_off = 0;
byte events_lost = 0;
byte deliver = 0; // Number of subbuffers delivered
byte refcount = 0;

byte commit_count[NR_SUBBUFS];

byte buffer_use_count[BUFSIZE];

proctype switcher()
{
  int prev_off, new_off, tmp_commit;
  int size;

cmpxchg_loop:
  atomic {
    prev_off = write_off;
    size = SUBBUF_SIZE - (prev_off % SUBBUF_SIZE);
    new_off = prev_off + size;
    if
    :: new_off - read_off > BUFSIZE ->
      goto not_needed;
    :: else -> skip
    fi;
  }
  atomic {
    if
    :: prev_off != write_off -> goto cmpxchg_loop
    :: else -> write_off = new_off;
    fi;
  }

  atomic {
    tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
    commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
    if
    :: tmp_commit % SUBBUF_SIZE == 0 -> deliver++
    :: else -> skip
    fi;
  }
not_needed:
  skip;
}

proctype tracer(byte size)
{
  int prev_off, new_off, tmp_commit;
  int i;
  int j;

cmpxchg_loop:
  atomic {
    prev_off = write_off;
    new_off = prev_off + size;
  }
  atomic {
    if
    :: new_off - read_off > BUFSIZE -> goto lost
    :: else -> skip
    fi;
  }
  atomic {
    if
    :: prev_off != write_off -> goto cmpxchg_loop
    :: else -> write_off = new_off;
    fi;
    i = 0;
    do
    :: i < size ->
      buffer_use_count[(prev_off + i) % BUFSIZE]
        = buffer_use_count[(prev_off + i) % BUFSIZE] + 1;
      i++
    :: i >= size -> break
    od;
  }
  j = 0;
  do
  :: j < BUFSIZE ->
    assert(buffer_use_count[j] < 2);
    j++
  :: j >= BUFSIZE -> break
  od;

  // writing to buffer...

  atomic {
    i = 0;
    do
    :: i < size ->
      buffer_use_count[(prev_off + i) % BUFSIZE]
        = buffer_use_count[(prev_off + i) % BUFSIZE] - 1;
      i++
    :: i >= size -> break
    od;
    tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
    commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
    if
    :: tmp_commit % SUBBUF_SIZE == 0 -> deliver++
    :: else -> skip
    fi;
  }
  goto end;
lost:
  events_lost++;
end:
  refcount = refcount - 1;
}

proctype reader()
{
  int i;
  int j;
    //atomic {
    //  do
    //  :: (deliver == (NUMPROCS - events_lost) / SUBBUF_SIZE) -> break;
    //  od;
    //}
  do
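  // Note: unlike the fixed model above, there is no retrieve_count check in
  // this guard.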
  :: (write_off / SUBBUF_SIZE) - (read_off / SUBBUF_SIZE) > 0
     && commit_count[(read_off % BUFSIZE) / SUBBUF_SIZE] % SUBBUF_SIZE == 0 ->
    atomic {
      i = 0;
      do
      :: i < SUBBUF_SIZE ->
        buffer_use_count[(read_off + i) % BUFSIZE]
          = buffer_use_count[(read_off + i) % BUFSIZE] + 1;
        i++
      :: i >= SUBBUF_SIZE -> break
      od;
    }
    j = 0;
    do
    :: j < BUFSIZE ->
      assert(buffer_use_count[j] < 2);
      j++
    :: j >= BUFSIZE -> break
    od;

    // reading from buffer...

    atomic {
      i = 0;
      do
      :: i < SUBBUF_SIZE ->
        buffer_use_count[(read_off + i) % BUFSIZE]
          = buffer_use_count[(read_off + i) % BUFSIZE] - 1;
        i++
      :: i >= SUBBUF_SIZE -> break
      od;
      read_off = read_off + SUBBUF_SIZE;
    }
  :: read_off >= (NUMPROCS - events_lost) -> break;
  od;
}

proctype cleaner()
{
  atomic {
    do
    :: refcount == 0 ->
      run switcher();
      break;
    od;
  }
}

init {
  int i = 0;
  int j = 0;
  int sum = 0;
  int commit_sum = 0;

  atomic {
    i = 0;
    do
    :: i < NR_SUBBUFS ->
      commit_count[i] = 0;
      i++
    :: i >= NR_SUBBUFS -> break
    od;
    i = 0;
    do
    :: i < BUFSIZE ->
      buffer_use_count[i] = 0;
      i++
    :: i >= BUFSIZE -> break
    od;
    run reader();
    run cleaner();
    i = 0;
    do
    :: i < NUMPROCS ->
      refcount = refcount + 1;
      run tracer(1);
      i++
    :: i >= NUMPROCS -> break
    od;
    run switcher();
  }
  atomic {
    assert(write_off - read_off >= 0);
    j = 0;
    commit_sum = 0;
    do
    :: j < NR_SUBBUFS ->
      commit_sum = commit_sum + commit_count[j];
      j++
    :: j >= NR_SUBBUFS -> break
    od;
    assert(commit_sum <= write_off);
    j = 0;
    do
    :: j < BUFSIZE ->
      assert(buffer_use_count[j] < 2);
      j++
    :: j >= BUFSIZE -> break
    od;

    //assert(events_lost == 0);
  }
}


-- 
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68



