[ltt-dev] LTTng relay model
Mathieu Desnoyers
compudj at krystal.dyndns.org
Thu Jun 26 13:19:03 EDT 2008
* Paul McKenney (Paul.McKenney at us.ibm.com) wrote:
> Mathieu Desnoyers <compudj at krystal.dyndns.org> wrote on 06/26/2008
> 06:23:27 AM:
> > Hi everyone,
> >
> > I just released LTTng 0.10-pre56 for the 2.6.26-rc8 kernel. I try to
> > keep the changes in this "stable" version to a minimum. The
> > following changes, about to land in the development tree, will include:
> >
> > * Basic user-space tracing
> > * Fix resulting from formal verification of ltt-tracer.c. A rare buffer
> > corruption scenario (which never happens with large buffers) has been
> > identified. If some of you are interested in reviewing the Promela models
> > I created, I could post them here. I used the "spin" verification
> > tool, following http://lwn.net/Articles/243851/ (a very interesting
> > article by Paul McKenney, with real-life examples of converting C code
> > to Promela).
>
> I would be interested in at least looking at your model. ;-)
>
> Thanx, Paul
Here it is.
The first model is the correct one; the flawed model (the current
implementation) follows. The first model takes about 600 MB of RAM to
verify. I am ordering a machine with 16 GB of RAM to play a bit more with
formal verification this summer. Comments are welcome.
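For readers less familiar with Promela: both models below express the C
cmpxchg() reservation loop of ltt-tracer.c as a pair of atomic blocks, one
that samples the old offset and computes the new one, and one that performs
the compare-and-swap and retries on failure. Here is a minimal stand-alone
sketch of just that idiom (the names "head" and "reserver" are illustrative,
not taken from ltt-tracer.c). The models themselves are verified the usual
way: generate the verifier with "spin -a buffer.spin", compile the generated
pan.c, and run the resulting pan executable.

// Minimal sketch of the cmpxchg modeling idiom used by both models below.
byte head = 0;
proctype reserver(byte size)
{
byte prev_off, new_off;
cmpxchg_loop:
atomic {
prev_off = head; // read the current write position
new_off = prev_off + size; // position after this reservation
}
// Another process may move "head" between the two atomic blocks.
atomic {
if
:: prev_off != head -> goto cmpxchg_loop // cmpxchg failed, retry
:: else -> head = new_off // cmpxchg succeeded
fi;
}
}
init {
atomic {
run reserver(1);
run reserver(1);
}
(_nr_pr == 1) -> // wait until both reservers have terminated
assert(head == 2); // each reservation is reflected exactly once
}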
file: buffer.spin
// LTTng ltt-tracer.c atomic lockless buffering scheme Promela model v1
// Created for the Spin validator.
// Mathieu Desnoyers <mathieu.desnoyers at polymtl.ca>
// June 2008
// TODO: create test cases that will generate an overflow on the offset and
// counter type. Counter types smaller than a byte should be used.
// Promela only has unsigned char, no signed char.
// Because detecting a difference < 0 would require a signed type, but we want
// compactness, we also check for values higher than half of the unsigned
// char range (and consider them negative). The model, by design, does not use
// offsets or counts higher than 127 because we would then have to use a larger
// type (short or int).
#define HALF_UCHAR (255/2)
// NUMPROCS 4: causes event loss with some reader timings.
// e.g. 3 events, 1 switch, 1 event (lost, buffer full), read 1 subbuffer
#define NUMPROCS 4
// NUMPROCS 3: does not cause event loss because buffers are big enough.
//#define NUMPROCS 3
// e.g. 3 events, 1 switch, read 1 subbuffer
#define NUMSWITCH 1
#define BUFSIZE 4
#define NR_SUBBUFS 2
#define SUBBUF_SIZE (BUFSIZE / NR_SUBBUFS)
// Writer counters
byte write_off = 0;
byte commit_count[NR_SUBBUFS];
// Reader counters
byte read_off = 0;
byte retrieve_count[NR_SUBBUFS];
byte events_lost = 0;
byte refcount = 0;
bool deliver = 0;
// buffer slot in-use bit. Detects racy use (more than a single process
// accessing a slot at any given step).
bool buffer_use[BUFSIZE];
// Proceed to a sub-buffer switch if needed.
// Used in a periodic timer interrupt to fill and ship the current subbuffer
// to the reader so we can guarantee a steady flow. If a subbuffer is
// completely empty, don't switch.
// Also used as a "finalize" operation to complete the last subbuffer after
// all writers have finished, so the last subbuffer can be read by the reader.
proctype switcher()
{
byte prev_off, new_off, tmp_commit;
byte size;
cmpxchg_loop:
atomic {
prev_off = write_off;
size = SUBBUF_SIZE - (prev_off % SUBBUF_SIZE);
new_off = prev_off + size;
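// The switch is skipped if padding the subbuffer would push the write head
// more than BUFSIZE ahead of the read head (buffer full), or if the current
// subbuffer is completely empty (size == SUBBUF_SIZE), as noted above.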
if
:: (new_off - read_off > BUFSIZE && new_off - read_off < HALF_UCHAR)
|| size == SUBBUF_SIZE ->
refcount = refcount - 1;
goto not_needed;
:: else -> skip
fi;
}
atomic {
if
:: prev_off != write_off -> goto cmpxchg_loop
:: else -> write_off = new_off;
fi;
}
atomic {
tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
if
:: tmp_commit % SUBBUF_SIZE == 0 -> deliver = 1
:: else -> skip
fi;
refcount = refcount - 1;
}
not_needed:
skip;
}
// tracer
// Writes 1 byte of information in the buffer at the current
// "write_off" position and then increment the commit_count of the sub-buffer
// the information has been written to.
proctype tracer()
{
byte size = 1;
byte prev_off, new_off, tmp_commit;
byte i, j;
cmpxchg_loop:
atomic {
prev_off = write_off;
new_off = prev_off + size;
}
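// Drop the event if reserving it would push the write head more than BUFSIZE
// ahead of the read head (buffer full); the < HALF_UCHAR test excludes
// wrapped, logically negative differences, as explained in the header.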
atomic {
if
:: new_off - read_off > BUFSIZE && new_off - read_off < HALF_UCHAR ->
goto lost
:: else -> skip
fi;
}
atomic {
if
:: prev_off != write_off -> goto cmpxchg_loop
:: else -> write_off = new_off;
fi;
i = 0;
do
:: i < size ->
assert(buffer_use[(prev_off + i) % BUFSIZE] == 0);
buffer_use[(prev_off + i) % BUFSIZE] = 1;
i++
:: i >= size -> break
od;
}
// writing to buffer...
atomic {
i = 0;
do
:: i < size ->
buffer_use[(prev_off + i) % BUFSIZE] = 0;
i++
:: i >= size -> break
od;
tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
if
:: tmp_commit % SUBBUF_SIZE == 0 -> deliver = 1;
:: else -> skip
fi;
}
atomic {
goto end;
lost:
events_lost++;
end:
refcount = refcount - 1;
}
}
// reader
// Read the information sub-buffer per sub-buffer when available.
//
// Reads the information as soon as it is ready, or may be delayed by
// an asynchronous delivery. Being modeled as a process ensures all cases
// (scheduled very quickly or very late, causing event loss) are covered.
// Only one reader per buffer (normally ensured by a mutex). This is modeled
// by using a single reader process.
proctype reader()
{
byte i, j;
byte tmp_retrieve;
byte lwrite_off, lcommit_count;
do
:: (write_off / SUBBUF_SIZE) - (read_off / SUBBUF_SIZE) > 0
&& (write_off / SUBBUF_SIZE) - (read_off / SUBBUF_SIZE) < HALF_UCHAR
&& commit_count[(read_off % BUFSIZE) / SUBBUF_SIZE]
- retrieve_count[(read_off % BUFSIZE) / SUBBUF_SIZE]
== SUBBUF_SIZE ->
atomic {
i = 0;
do
:: i < SUBBUF_SIZE ->
assert(buffer_use[(read_off + i) % BUFSIZE] == 0);
buffer_use[(read_off + i) % BUFSIZE] = 1;
i++
:: i >= SUBBUF_SIZE -> break
od;
}
// reading from buffer...
// Since there is only one reader per buffer at any given time,
// we don't care about retrieve_count vs read_off ordering:
// the combined use of retrieve_count and read_off is made atomic by a
// mutex.
atomic {
i = 0;
do
:: i < SUBBUF_SIZE ->
buffer_use[(read_off + i) % BUFSIZE] = 0;
i++
:: i >= SUBBUF_SIZE -> break
od;
tmp_retrieve = retrieve_count[(read_off % BUFSIZE) / SUBBUF_SIZE]
+ SUBBUF_SIZE;
retrieve_count[(read_off % BUFSIZE) / SUBBUF_SIZE] = tmp_retrieve;
read_off = read_off + SUBBUF_SIZE;
}
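// Stop once read_off has passed the number of events successfully written
// (each of the NUMPROCS tracers writes one byte; events_lost counts drops).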
:: read_off >= (NUMPROCS - events_lost) -> break;
od;
}
// Waits for all tracer and switcher processes to finish before finalizing
// the buffer. Only after that will the reader be allowed to read the
// last subbuffer.
proctype cleaner()
{
atomic {
do
:: refcount == 0 ->
refcount = refcount + 1;
run switcher(); // Finalize the last sub-buffer so it can be read.
break;
od;
}
}
init {
byte i = 0;
byte j = 0;
byte sum = 0;
byte commit_sum = 0;
atomic {
i = 0;
do
:: i < NR_SUBBUFS ->
commit_count[i] = 0;
retrieve_count[i] = 0;
i++
:: i >= NR_SUBBUFS -> break
od;
i = 0;
do
:: i < BUFSIZE ->
buffer_use[i] = 0;
i++
:: i >= BUFSIZE -> break
od;
run reader();
run cleaner();
i = 0;
do
:: i < NUMPROCS ->
refcount = refcount + 1;
run tracer();
i++
:: i >= NUMPROCS -> break
od;
i = 0;
do
:: i < NUMSWITCH ->
refcount = refcount + 1;
run switcher();
i++
:: i >= NUMSWITCH -> break
od;
}
// Assertions.
atomic {
// The writer head must always be greater than or equal to the reader head.
assert(write_off - read_off >= 0 && write_off - read_off < HALF_UCHAR);
j = 0;
commit_sum = 0;
do
:: j < NR_SUBBUFS ->
commit_sum = commit_sum + commit_count[j];
// The commit count of a particular subbuffer must always be greater than
// or equal to the retrieve_count of that subbuffer.
assert(commit_count[j] - retrieve_count[j] >= 0 &&
commit_count[j] - retrieve_count[j] < HALF_UCHAR);
j++
:: j >= NR_SUBBUFS -> break
od;
// The sum of all subbuffer commit counts must always be less than or equal
// to the writer head, because space must be reserved before it is
// written to and then committed.
assert(write_off - commit_sum >= 0 && write_off - commit_sum < HALF_UCHAR);
// If we have fewer writers than the available buffer space, we should
// not lose any events.
assert(NUMPROCS + NUMSWITCH > BUFSIZE || events_lost == 0);
}
}
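The safety properties above are only asserted at the end of init. If one
wanted to check the writer/reader head invariant in every reachable state
instead, the classic Spin idiom is a monitor process containing a single
assertion. A minimal sketch, meant to be appended to the buffer.spin model
above ("head_monitor" is an illustrative name, not part of the posted model):

// Optional monitor (sketch): because this process may be scheduled between
// any two statements of the other processes, the verifier effectively checks
// the assertion against every reachable state, not only at the end of init.
active proctype head_monitor()
{
assert(write_off - read_off >= 0 && write_off - read_off < HALF_UCHAR);
}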
file: buffer.spin.missing_retrieve_count
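// Flawed model referred to above: the buffering scheme as implemented at the
// time of this post. As the file name indicates, it has no per-subbuffer
// retrieve_count, so the reader's readiness test (see reader() below) relies
// on commit_count alone.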
//#define NUMPROCS 5
#define NUMPROCS 4
#define BUFSIZE 4
#define NR_SUBBUFS 2
#define SUBBUF_SIZE (BUFSIZE / NR_SUBBUFS)
byte write_off = 0;
byte read_off = 0;
byte events_lost = 0;
byte deliver = 0; // Number of subbuffers delivered
byte refcount = 0;
byte commit_count[NR_SUBBUFS];
byte buffer_use_count[BUFSIZE];
proctype switcher()
{
int prev_off, new_off, tmp_commit;
int size;
cmpxchg_loop:
atomic {
prev_off = write_off;
size = SUBBUF_SIZE - (prev_off % SUBBUF_SIZE);
new_off = prev_off + size;
if
:: new_off - read_off > BUFSIZE ->
goto not_needed;
:: else -> skip
fi;
}
atomic {
if
:: prev_off != write_off -> goto cmpxchg_loop
:: else -> write_off = new_off;
fi;
}
atomic {
tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
if
:: tmp_commit % SUBBUF_SIZE == 0 -> deliver++
:: else -> skip
fi;
}
not_needed:
skip;
}
proctype tracer(byte size)
{
int prev_off, new_off, tmp_commit;
int i;
int j;
cmpxchg_loop:
atomic {
prev_off = write_off;
new_off = prev_off + size;
}
atomic {
if
:: new_off - read_off > BUFSIZE -> goto lost
:: else -> skip
fi;
}
atomic {
if
:: prev_off != write_off -> goto cmpxchg_loop
:: else -> write_off = new_off;
fi;
i = 0;
do
:: i < size ->
buffer_use_count[(prev_off + i) % BUFSIZE]
= buffer_use_count[(prev_off + i) % BUFSIZE] + 1;
i++
:: i >= size -> break
od;
}
do
:: j < BUFSIZE ->
assert(buffer_use_count[j] < 2);
j++
:: j >= BUFSIZE -> break
od;
// writing to buffer...
atomic {
i = 0;
do
:: i < size ->
buffer_use_count[(prev_off + i) % BUFSIZE]
= buffer_use_count[(prev_off + i) % BUFSIZE] - 1;
i++
:: i >= size -> break
od;
tmp_commit = commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] + size;
commit_count[(prev_off % BUFSIZE) / SUBBUF_SIZE] = tmp_commit;
if
:: tmp_commit % SUBBUF_SIZE == 0 -> deliver++
:: else -> skip
fi;
}
goto end;
lost:
events_lost++;
end:
refcount = refcount - 1;
}
proctype reader()
{
int i;
int j;
//atomic {
// do
// :: (deliver == (NUMPROCS - events_lost) / SUBBUF_SIZE) -> break;
// od;
//}
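// Reader guard of the flawed model: commit_count % SUBBUF_SIZE == 0 is true
// for an empty subbuffer as well as for a full (or wrapped) one, and nothing
// records that the subbuffer has already been retrieved. The corrected model
// above requires commit_count - retrieve_count == SUBBUF_SIZE instead.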
do
:: (write_off / SUBBUF_SIZE) - (read_off / SUBBUF_SIZE) > 0 && commit_count[(read_off % BUFSIZE) / SUBBUF_SIZE] % SUBBUF_SIZE == 0 ->
atomic {
i = 0;
do
:: i < SUBBUF_SIZE ->
buffer_use_count[(read_off + i) % BUFSIZE]
= buffer_use_count[(read_off + i) % BUFSIZE] + 1;
i++
:: i >= SUBBUF_SIZE -> break
od;
}
j = 0;
do
:: j < BUFSIZE ->
assert(buffer_use_count[j] < 2);
j++
:: j >= BUFSIZE -> break
od;
// reading from buffer...
atomic {
i = 0;
do
:: i < SUBBUF_SIZE ->
buffer_use_count[(read_off + i) % BUFSIZE]
= buffer_use_count[(read_off + i) % BUFSIZE] - 1;
i++
:: i >= SUBBUF_SIZE -> break
od;
read_off = read_off + SUBBUF_SIZE;
}
:: read_off >= (NUMPROCS - events_lost) -> break;
od;
}
proctype cleaner()
{
atomic {
do
:: refcount == 0 ->
run switcher();
break;
od;
}
}
init {
int i = 0;
int j = 0;
int sum = 0;
int commit_sum = 0;
atomic {
i = 0;
do
:: i < NR_SUBBUFS ->
commit_count[i] = 0;
i++
:: i >= NR_SUBBUFS -> break
od;
i = 0;
do
:: i < BUFSIZE ->
buffer_use_count[i] = 0;
i++
:: i >= BUFSIZE -> break
od;
run reader();
run cleaner();
i = 0;
do
:: i < NUMPROCS ->
refcount = refcount + 1;
run tracer(1);
i++
:: i >= NUMPROCS -> break
od;
run switcher();
}
atomic {
assert(write_off - read_off >= 0);
j = 0;
commit_sum = 0;
do
:: j < NR_SUBBUFS ->
commit_sum = commit_sum + commit_count[j];
j++
:: j >= NR_SUBBUFS -> break
od;
assert(commit_sum <= write_off);
j = 0;
do
:: j < BUFSIZE ->
assert(buffer_use_count[j] < 2);
j++
:: j >= BUFSIZE -> break
od;
//assert(events_lost == 0);
}
}
--
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68