[ltt-dev] [PATCH] LTT relay formal verif fix
Paul McKenney
Paul.McKenney at us.ibm.com
Tue Aug 5 17:00:04 EDT 2008
I am very glad that Promela/spin was able to help you out! Been quite
useful to me over the years.
Thanx, Paul
Mathieu Desnoyers <compudj at krystal.dyndns.org> wrote on 08/05/2008 09:12:45 AM:
> Running an LTT relay buffer model through the Spin verification tool showed
> that in rare cases, where subbuffers are small enough that the reserved
> (uncommitted) space can equal the subbuffer size, the reader side could
> think that the subbuffer is fully committed when in fact data is still
> being written to it.
>
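> To make the failure mode concrete, here is a small user-space sketch of the
> old test (the numbers and the old_check() helper are illustrative, not the
> kernel code): with a 4096-byte subbuffer, a fully reserved but completely
> uncommitted subbuffer leaves the commit count at a multiple of the subbuffer
> size, which the old "offset into the subbuffer is zero" test cannot tell
> apart from a fully committed subbuffer.
>
>   #include <stdio.h>
>
>   #define SUBBUF_SIZE 4096   /* illustrative power-of-two subbuffer size */
>
>   /* Old test: subbuffer considered complete when the commit count is a
>    * multiple of the subbuffer size. */
>   static int old_check(long commit_count)
>   {
>           return (commit_count & (SUBBUF_SIZE - 1)) == 0;
>   }
>
>   int main(void)
>   {
>           printf("%d\n", old_check(4096)); /* 1: fully committed, correct */
>           printf("%d\n", old_check(0));    /* 1: nothing committed, false positive */
>           return 0;
>   }
>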
> Fix this by checking the difference between the commit count and the write
> offset to figure out if all the reserved space for this subbuffer has been
> committed. Given that the write offset advances over the whole buffer while
> a given subbuffer's commit count only advances by the number of bytes in
> that subbuffer, we have to multiply the commit count by the number of
> subbuffers to match the scale of the write offset. Also, the write offset
> has to be brought back to the window corresponding to the commit count being
> checked. This is done by subtracting the offset of the subbuffer being
> checked and then aligning on the buffer size (the - 1 makes an already
> aligned offset stay on the current boundary instead of being pushed to the
> next one).
>
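> A minimal sketch of the resulting check, in user-space C with hypothetical
> parameter names (the real code uses the BUFFER_ALIGN() macro and the
> per-channel subbuf_size_order/n_subbufs_order fields added by this patch):
>
>   /*
>    * subbuf_size and n_subbufs are assumed to be powers of two;
>    * subbuf_size_order = log2(subbuf_size), n_subbufs_order = log2(n_subbufs),
>    * buf_size = subbuf_size * n_subbufs.
>    */
>   static int subbuf_fully_committed(long write_offset, long commit_count,
>                                     long subbuf_idx,
>                                     unsigned int subbuf_size_order,
>                                     unsigned int n_subbufs_order)
>   {
>           long buf_size = 1L << (subbuf_size_order + n_subbufs_order);
>           /* Bring the write offset back into the window of this subbuffer;
>            * the -1 keeps an offset sitting exactly on a boundary in the
>            * current window instead of pushing it to the next one. */
>           long w = write_offset - 1 - (subbuf_idx << subbuf_size_order);
>           /* Align on the next buffer boundary (what BUFFER_ALIGN does). */
>           long aligned = (w + buf_size) & ~(buf_size - 1);
>           /* Scale the commit count by n_subbufs to compare with the offset. */
>           return aligned - (commit_count << n_subbufs_order) == 0;
>   }
>
>   /*
>    * Example with subbuf_size = 4096 (order 12), n_subbufs = 4 (order 2),
>    * subbuf_idx = 0:
>    *   write_offset = 4096, commit_count = 4096 -> fully committed (diff 0)
>    *   write_offset = 4096, commit_count = 0    -> not committed (diff 16384)
>    */
>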
> Note that the retrieve count solution used in the formal model has not been
> implemented in the C code because of atomicity constraints: writers can push
> readers in flight recorder mode, and having an extra counter to update makes
> the synchronization messy.
>
> This fix also moves the memory barrier found in the ltt_ioctl code to make
> sure the commit count is read before the write offset, given that the write
> barrier orders:
>
> write offset and buffer write
> smp_wmb()
> commit count write
>
> so the read-side should look like:
>
> commit count read
> smp_rmb()
> write offset and buffer read
>
> Previously, the read-side did:
>
> commit count read and write offset read
> smp_rmb()
> buffer read
>
> which left the commit count and write offset reads unordered with respect
> to each other.
>
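> A condensed sketch of the intended pairing, in user-space C for illustration
> only (the macros below are compiler-barrier stand-ins for the kernel's
> smp_wmb()/smp_rmb(), and the variable and function names are placeholders):
>
>   #define wmb_stub()  __asm__ __volatile__("" ::: "memory")
>   #define rmb_stub()  __asm__ __volatile__("" ::: "memory")
>
>   static long write_offset;
>   static long commit_count;
>   static char buffer[4096];
>   static char last_byte;
>
>   /* Writer: write offset and buffer data first, then publish the commit. */
>   static void writer_commit(long slot_start, long slot_size)
>   {
>           /* assumes 0 <= slot_start and slot_start + slot_size <= 4096 */
>           write_offset = slot_start + slot_size;
>           buffer[slot_start] = 1;  /* event payload would be written here */
>           wmb_stub();              /* order the above before the commit count */
>           commit_count += slot_size;
>   }
>
>   /* Reader: read the commit count first, then the write offset and buffer. */
>   static long reader_check(void)
>   {
>           long commit = commit_count;  /* commit count read first */
>           rmb_stub();                  /* pairs with the writer's write barrier */
>           long offset = write_offset;  /* write offset read after the barrier */
>           last_byte = buffer[0];       /* buffer data read after the barrier */
>           return offset - commit;      /* 0: everything seen is committed */
>   }
>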
> It applies to LTTng 0.13 and earlier.
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at polymtl.ca>
> CC: Paul McKenney <Paul.McKenney at us.ibm.com>
> CC: Robert Wisniewski <bob at watson.ibm.com>
> ---
> include/linux/ltt-tracer.h | 16 ++++
> ltt/ltt-relay.c | 178 +++++++++++++++++++++++++++++----------------
> 2 files changed, 134 insertions(+), 60 deletions(-)
>
> Index: linux-2.6-lttng/include/linux/ltt-tracer.h
> ===================================================================
> --- linux-2.6-lttng.orig/include/linux/ltt-tracer.h 2008-08-05 11:54:13.000000000 -0400
> +++ linux-2.6-lttng/include/linux/ltt-tracer.h 2008-08-05 11:54:48.000000000 -0400
> @@ -176,6 +176,8 @@ struct ltt_channel_struct {
> int overwrite;
> struct kref kref;
> int compact;
> + unsigned int n_subbufs_order; /* n_subbufs count order */
> + unsigned int subbuf_size_order; /* subbuf_size count order */
>
> void *trans_channel_data;
>
> @@ -574,7 +576,21 @@ static inline char *ltt_write_event_head
> /* Buffer offset macros */
>
> #define BUFFER_OFFSET(offset, chan) ((offset) & (chan->alloc_size-1))
> +/*
> + * BUFFER_ALIGN aligns on the next buffer boundary.
> + * Note that if the offset is already aligned on a buffer boundary,
> + * it is pushed to the next one.
> + */
> +#define BUFFER_ALIGN(offset, chan) \
> + (((offset) + chan->alloc_size) & (~(chan->alloc_size-1)))
> +#define BUFFER_TRUNC(offset, chan) \
> + ((offset) & (~(chan->alloc_size-1)))
> #define SUBBUF_OFFSET(offset, chan) ((offset) & (chan->subbuf_size-1))
> +/*
> + * SUBBUF_ALIGN aligns on the next subbuffer boundary.
> + * Note that if the offset is already aligned on a subbuffer boundary,
> + * it is pushed to the next one.
> + */
> #define SUBBUF_ALIGN(offset, chan) \
> (((offset) + chan->subbuf_size) & (~(chan->subbuf_size-1)))
> #define SUBBUF_TRUNC(offset, chan) \
> Index: linux-2.6-lttng/ltt/ltt-relay.c
> ===================================================================
> --- linux-2.6-lttng.orig/ltt/ltt-relay.c 2008-08-05 11:54:43.000000000 -0400
> +++ linux-2.6-lttng/ltt/ltt-relay.c 2008-08-05 11:56:18.000000000 -0400
> @@ -261,24 +261,37 @@ static int ltt_ioctl(struct inode *inode
> switch (cmd) {
> case RELAY_GET_SUBBUF:
> {
> - long consumed_old, consumed_idx;
> + long consumed_old, consumed_idx, commit_count, write_offset;
> +
> atomic_long_inc(&ltt_buf->active_readers);
> consumed_old = atomic_long_read(&ltt_buf->consumed);
> consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
> - if (SUBBUF_OFFSET(
> - local_read(&ltt_buf->commit_count[consumed_idx]),
> - buf->chan) != 0) {
> + commit_count = local_read(&ltt_buf->commit_count[consumed_idx]);
> + /*
> + * make sure we read the commit count before reading the
> + * buffer data and the write offset.
> + */
> + smp_rmb();
> + write_offset = local_read(&ltt_buf->offset);
> + /*
> + * Check that the subbuffer has been fully committed.
> + */
> + if (BUFFER_ALIGN(write_offset - 1
> + - (consumed_idx << ltt_channel->subbuf_size_order),
> + buf->chan)
> + - (commit_count << ltt_channel->n_subbufs_order) != 0) {
> atomic_long_dec(&ltt_buf->active_readers);
> return -EAGAIN;
> }
> - if ((SUBBUF_TRUNC(local_read(&ltt_buf->offset), buf->chan)
> + /*
> + * Check that we are not about to read the same subbuffer in
> + * which the writer head is.
> + */
> + if ((SUBBUF_TRUNC(write_offset, buf->chan)
> - SUBBUF_TRUNC(consumed_old, buf->chan)) == 0) {
> atomic_long_dec(&ltt_buf->active_readers);
> return -EAGAIN;
> }
> - /* must make sure we read the counter before reading the data
> - * in the buffer. */
> - smp_rmb();
> return put_user((u32)consumed_old, argp);
> break;
> }
> @@ -350,24 +363,29 @@ static void ltt_relay_print_subbuffer_er
> long cons_off, unsigned int i)
> {
> struct rchan *rchan = ltt_chan->trans_channel_data;
> - long cons_idx;
> + long cons_idx, commit_count, write_offset;
>
> + cons_idx = SUBBUF_INDEX(cons_off, rchan);
> + commit_count = local_read(&ltt_chan->buf[i].commit_count[cons_idx]);
> + /*
> + * No need to order commit_count and write_offset reads because we
> + * execute after trace is stopped when there are no readers left.
> + */
> + write_offset = local_read(&ltt_chan->buf[i].offset);
> printk(KERN_WARNING
> "LTT : unread channel %s offset is %ld "
> "and cons_off : %ld (cpu %u)\n",
> - ltt_chan->channel_name,
> - local_read(&ltt_chan->buf[i].offset), cons_off, i);
> - /* Check each sub-buffer for non zero commit count */
> - cons_idx = SUBBUF_INDEX(cons_off, rchan);
> - if (SUBBUF_OFFSET(local_read(&ltt_chan->buf[i].commit_count[cons_idx]),
> - rchan))
> + ltt_chan->channel_name, write_offset, cons_off, i);
> + /* Check each sub-buffer for non filled commit count */
> + if (BUFFER_ALIGN(write_offset - 1
> + - (cons_idx << ltt_chan->subbuf_size_order), rchan)
> + - (commit_count << ltt_chan->n_subbufs_order))
> printk(KERN_ALERT
> - "LTT : %s : subbuffer %lu has non zero "
> - "commit count.\n",
> - ltt_chan->channel_name, cons_idx);
> + "LTT : %s : subbuffer %lu has non filled "
> + "commit count %lu.\n",
> + ltt_chan->channel_name, cons_idx, commit_count);
> printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %zd\n",
> - ltt_chan->channel_name,
> - local_read(&ltt_chan->buf[i].commit_count[cons_idx]),
> + ltt_chan->channel_name, commit_count,
> rchan->subbuf_size);
> }
>
> @@ -508,6 +526,8 @@ static int ltt_relay_create_channel(char
> (*ltt_chan)->buffer_begin = ltt_buffer_begin_callback;
> (*ltt_chan)->buffer_end = ltt_buffer_end_callback;
> (*ltt_chan)->overwrite = overwrite;
> + (*ltt_chan)->n_subbufs_order = get_count_order(n_subbufs);
> + (*ltt_chan)->subbuf_size_order = get_count_order(subbuf_size);
> if (strcmp(channel_name, LTT_COMPACT_CHANNEL) == 0)
> (*ltt_chan)->compact = 1;
> else
> @@ -642,6 +662,12 @@ static void ltt_relay_remove_channel(str
> kref_put(&channel->kref, ltt_relay_release_channel);
> }
>
> +/*
> + * reserve_commit_diff is the difference between the write offset aligned to the
> + * next (or previous) subbuffer and n_subbufs times the commit count of a given
> + * subbuffer. It is used to find out if the space reserved in a given subbuffer
> + * has been fully committed.
> + */
> struct ltt_reserve_switch_offsets {
> long begin, end, old;
> long begin_switch, end_switch_current, end_switch_old;
> @@ -680,16 +706,19 @@ static inline int ltt_relay_try_reserve(
> }
> }
> if (offsets->begin_switch) {
> + long subbuf_index;
> if (offsets->end_switch_old)
> offsets->begin = SUBBUF_ALIGN(offsets->begin,
> buf->chan);
> offsets->begin = offsets->begin + ltt_subbuf_header_len();
> /* Test new buffer integrity */
> + subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
> offsets->reserve_commit_diff =
> - SUBBUF_OFFSET(buf->chan->subbuf_size
> - - local_read(&ltt_buf->commit_count[
> - SUBBUF_INDEX(offsets->begin, buf->chan)]),
> - buf->chan);
> + BUFFER_TRUNC(offsets->begin
> + - (subbuf_index << ltt_channel->subbuf_size_order),
> + buf->chan)
> + - (local_read(&ltt_buf->commit_count[subbuf_index])
> + << ltt_channel->n_subbufs_order);
> if (offsets->reserve_commit_diff == 0) {
> /* Next buffer not corrupted. */
> if (!ltt_channel->overwrite &&
> @@ -775,6 +804,8 @@ static inline int ltt_relay_try_switch(
> struct ltt_reserve_switch_offsets *offsets,
> u64 *tsc)
> {
> + long subbuf_index;
> +
> offsets->begin = local_read(<t_buf->offset);
> offsets->old = offsets->begin;
> offsets->begin_switch = 0;
> @@ -793,10 +824,13 @@ static inline int ltt_relay_try_switch(
> * Always begin_switch in FORCE_ACTIVE mode.
> * Test new buffer integrity
> */
> - offsets->reserve_commit_diff = SUBBUF_OFFSET(buf->chan->subbuf_size
> - - local_read(
> - &ltt_buf->commit_count[SUBBUF_INDEX(offsets->begin,
> - buf->chan)]), buf->chan);
> + subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
> + offsets->reserve_commit_diff =
> + BUFFER_TRUNC(offsets->begin
> + - (subbuf_index << ltt_channel->subbuf_size_order),
> + buf->chan)
> + - (local_read(&ltt_buf->commit_count[subbuf_index])
> + << ltt_channel->n_subbufs_order);
> if (offsets->reserve_commit_diff == 0) {
> /* Next buffer not corrupted. */
> if (mode == FORCE_ACTIVE && !ltt_channel->overwrite &&
> @@ -820,7 +854,9 @@ static inline int ltt_relay_try_switch(
> }
>
> static inline void ltt_reserve_push_reader(
> - struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
> + struct ltt_channel_struct *ltt_channel,
> + struct ltt_channel_buf_struct *ltt_buf,
> + struct rchan *rchan,
> struct rchan_buf *buf,
> struct ltt_reserve_switch_offsets *offsets)
> {
> @@ -864,14 +900,24 @@ static inline void ltt_reserve_push_read
> */
> if (offsets->reserve_commit_diff) {
> /*
> - * We have to alter the sub-buffer commit count : a
> - * sub-buffer is corrupted. We do not deliver it.
> + * We have to alter the sub-buffer commit count.
> + * We do not deliver the previous subbuffer, given
> + * it was either corrupted or not consumed (overwrite
> + * mode).
> */
> - local_add(
> - offsets->reserve_commit_diff,
> + local_add(offsets->reserve_commit_diff
> + >> ltt_channel->n_subbufs_order,
> &ltt_buf->commit_count[
> - SUBBUF_INDEX(offsets->begin, buf->chan)]);
> - local_inc(&ltt_buf->corrupted_subbuffers);
> + SUBBUF_INDEX(offsets->begin, rchan)]);
> + if (offsets->reserve_commit_diff != rchan->alloc_size) {
> + /*
> + * The reserve commit diff was not n_subbufs *
> + * subbuf_size (= alloc_size) : it means the
> + * subbuffer was partly written to and is
> + * therefore corrupted.
> + */
> + local_inc(&ltt_buf->corrupted_subbuffers);
> + }
> }
> }
> }
> @@ -903,18 +949,20 @@ static inline void ltt_reserve_switch_ol
> struct rchan_buf *buf,
> struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
> {
> - ltt_channel->buffer_end(buf, *tsc, offsets->old,
> - SUBBUF_INDEX((offsets->old-1), buf->chan));
> + long oldidx = SUBBUF_INDEX(offsets->old - 1, buf->chan);
> +
> + ltt_channel->buffer_end(buf, *tsc, offsets->old, oldidx);
> /* Must write buffer end before incrementing commit count */
> smp_wmb();
> offsets->commit_count =
> local_add_return(buf->chan->subbuf_size
> - (SUBBUF_OFFSET(offsets->old-1, buf->chan)+1),
> - &ltt_buf->commit_count[SUBBUF_INDEX(
> - offsets->old-1, buf->chan)]);
> - if (SUBBUF_OFFSET(offsets->commit_count, buf->chan) == 0)
> - ltt_deliver(buf, SUBBUF_INDEX((offsets->old-1),
> - buf->chan), NULL);
> + &ltt_buf->commit_count[oldidx]);
> + if (BUFFER_ALIGN(offsets->old - 1
> + - (oldidx << ltt_channel->subbuf_size_order), rchan)
> + - (offsets->commit_count
> + << ltt_channel->n_subbufs_order) == 0)
> + ltt_deliver(buf, oldidx, NULL);
> }
>
> /*
> @@ -930,16 +978,19 @@ static inline void ltt_reserve_switch_ne
> struct rchan_buf *buf,
> struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
> {
> - ltt_channel->buffer_begin(buf, *tsc, SUBBUF_INDEX(offsets->begin,
> - buf->chan));
> + long beginidx = SUBBUF_INDEX(offsets->begin, buf->chan);
> +
> + ltt_channel->buffer_begin(buf, *tsc, beginidx);
> /* Must write buffer end before incrementing commit count */
> smp_wmb();
> offsets->commit_count = local_add_return(ltt_subbuf_header_len(),
> - &ltt_buf->commit_count[
> - SUBBUF_INDEX(offsets->begin, buf->chan)]);
> + &ltt_buf->commit_count[beginidx]);
> /* Check if the written buffer has to be delivered */
> - if (SUBBUF_OFFSET(offsets->commit_count, buf->chan) == 0)
> - ltt_deliver(buf, SUBBUF_INDEX(offsets->begin, buf->chan), NULL);
> + if (BUFFER_ALIGN(offsets->begin
> + - (beginidx << ltt_channel->subbuf_size_order), rchan)
> + - (offsets->commit_count
> + << ltt_channel->n_subbufs_order) == 0)
> + ltt_deliver(buf, beginidx, NULL);
> }
>
>
> @@ -967,18 +1018,20 @@ static inline void ltt_reserve_end_switc
> struct rchan_buf *buf,
> struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
> {
> - ltt_channel->buffer_end(buf, *tsc, offsets->end,
> - SUBBUF_INDEX((offsets->end-1), buf->chan));
> + long endidx = SUBBUF_INDEX(offsets->end - 1, buf->chan);
> +
> + ltt_channel->buffer_end(buf, *tsc, offsets->end, endidx);
> /* Must write buffer begin before incrementing commit count */
> smp_wmb();
> offsets->commit_count =
> local_add_return(buf->chan->subbuf_size
> - (SUBBUF_OFFSET(offsets->end-1, buf->chan)+1),
> - &ltt_buf->commit_count[SUBBUF_INDEX(
> - offsets->end-1, buf->chan)]);
> - if (SUBBUF_OFFSET(offsets->commit_count, buf->chan) == 0)
> - ltt_deliver(buf, SUBBUF_INDEX((offsets->end-1),
> - buf->chan), NULL);
> + &ltt_buf->commit_count[endidx]);
> + if (BUFFER_ALIGN(offsets->end - 1
> + - (endidx << ltt_channel->subbuf_size_order), rchan)
> + - (offsets->commit_count
> + << ltt_channel->n_subbufs_order) == 0)
> + ltt_deliver(buf, endidx, NULL);
> }
>
> /**
> @@ -1028,7 +1081,7 @@ static void *ltt_relay_reserve_slot(stru
> /*
> * Push the reader if necessary
> */
> - ltt_reserve_push_reader(ltt_buf, rchan, buf, &offsets);
> + ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan, buf, &offsets);
>
> /*
> * Switch old subbuffer if needed.
> @@ -1089,7 +1142,8 @@ static void ltt_force_switch(struct rcha
> * Push the reader if necessary
> */
> if (mode == FORCE_ACTIVE)
> - ltt_reserve_push_reader(ltt_buf, rchan, buf, &offsets);
> + ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan, buf,
> + &offsets);
>
> /*
> * Switch old subbuffer if needed.
> @@ -1124,14 +1178,18 @@ static void ltt_relay_commit_slot(struct
> struct ltt_channel_buf_struct *ltt_buf = &ltt_channel->buf[buf->cpu];
> unsigned int offset_end = reserved - buf->start;
> long commit_count;
> + long endidx = SUBBUF_INDEX(offset_end - 1, buf->chan);
> + struct rchan *rchan = buf->chan;
>
> /* Must write slot data before incrementing commit count */
> smp_wmb();
> commit_count = local_add_return(slot_size,
> - &ltt_buf->commit_count[SUBBUF_INDEX(offset_end-1, buf->chan)]);
> + &ltt_buf->commit_count[endidx]);
> /* Check if all commits have been done */
> - if (SUBBUF_OFFSET(commit_count, buf->chan) == 0)
> - ltt_deliver(buf, SUBBUF_INDEX(offset_end-1, buf->chan), NULL);
> + if (BUFFER_ALIGN(offset_end - 1
> + - (endidx << ltt_channel->subbuf_size_order), rchan)
> + - (commit_count << ltt_channel->n_subbufs_order) == 0)
> + ltt_deliver(buf, endidx, NULL);
> /*
> * Update lost_size for each commit. It's needed only for extracting
> * ltt buffers from vmcore, after crash.
>
> --
> Mathieu Desnoyers
> OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68