[ltt-dev] [PATCH] LTT relay formal verif fix

Paul McKenney Paul.McKenney at us.ibm.com
Tue Aug 5 17:00:04 EDT 2008


I am very glad that Promela/spin was able to help you out!  Been quite
useful to me over the years.

                              Thanx, Paul

Mathieu Desnoyers <compudj at krystal.dyndns.org> wrote on 08/05/2008 09:12:45 AM:
> Running an LTT relay buffer model through the Spin verification tool
> showed that, in rare cases where subbuffers are so small that the
> reserved (uncommitted) space can equal the subbuffer size, the reader
> side could think that the subbuffer is fully committed when in fact
> the data is currently being written to it.
>
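> To make the failing scenario concrete, here is a rough user-space
> sketch with made-up numbers (not kernel code) showing how a commit
> count that is a multiple of the subbuffer size can hide a whole
> subbuffer of uncommitted reserved space :
>
> #include <stdio.h>
>
> int main(void)
> {
>    const long subbuf_size = 16;  /* tiny subbuffer, as in the model */
>    /* Two earlier passes over this subbuffer were fully committed... */
>    long commit_count = 2 * subbuf_size;
>    /* ...and the writer has now reserved a whole subbuffer worth of
>     * space in it without committing any of it yet. */
>    long uncommitted = subbuf_size;
>
>    /* Old reader-side test : "fully committed" iff the commit count is
>     * a multiple of subbuf_size, i.e. SUBBUF_OFFSET(commit_count) == 0. */
>    if (commit_count % subbuf_size == 0)
>       printf("looks fully committed, but %ld bytes are pending\n",
>          uncommitted);
>    return 0;
> }
>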
> Fix this by checking the difference between the commit count and the
> write offset to figure out whether all the space reserved in this
> subbuffer has been committed. Given that, over one pass, the write
> offset advances by the whole buffer size while a given subbuffer's
> commit count only increments by the number of bytes found in that
> subbuffer, we have to multiply the commit count by the number of
> subbuffers to match the value of the write offset. Also, the write
> offset has to be brought back to the window corresponding to the
> commit count being checked. This is done by subtracting the subbuffer
> offset and by then aligning on the buffer size (- 1 is used to align
> an already aligned offset on the current value, not the next one).
>
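> The reader-side test added below can be restated as a small
> stand-alone helper; the parameters are passed explicitly here only for
> illustration (the real code reads them from struct ltt_channel_struct
> and uses the BUFFER_ALIGN() macro) :
>
> int subbuf_fully_committed(long write_offset, long commit_count,
>       long consumed_idx, unsigned int subbuf_size_order,
>       unsigned int n_subbufs_order, long alloc_size)
> {
>    /*
>     * Bring the write offset back to the window of the subbuffer being
>     * checked, then align it on the buffer size ; the - 1 makes an
>     * already aligned offset map to itself rather than to the next
>     * boundary (this is BUFFER_ALIGN() expanded by hand).
>     */
>    long aligned_offset = (write_offset - 1
>          - (consumed_idx << subbuf_size_order)
>          + alloc_size) & ~(alloc_size - 1);
>
>    /*
>     * Scale the per-subbuffer commit count by n_subbufs so it becomes
>     * comparable with the buffer-wide write offset.
>     */
>    return aligned_offset - (commit_count << n_subbufs_order) == 0;
> }
>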
> Note that the retrieve count solution, used in the formal model, has
> not been implemented in C code because of atomicity constraints due to
> the fact that writers can push readers in flight recorder mode. Having
> an extra counter to update makes synchronization messy.
>
> This fix also moves the memory barriers found in the ltt_ioctl code to
> make sure the commit count is read before the write offset. Given that
> the write barrier orders :
>
> write offset and buffer write
> smp_wmb()
> commit count write
>
> the read-side should look like :
>
> commit count read
> smp_rmb()
> write offset and buffer read
>
> Previously, the read-side did :
>
> commit count read and write offset read
> smp_rmb()
> buffer read
>
> This could leave the write offset and commit count reads unordered.
>
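> As a rough user-space analogue of the intended pairing (using C11
> atomics in place of the kernel's smp_wmb()/smp_rmb(); names and
> values are hypothetical, for illustration only) :
>
> /* User-space analogue, not the LTTng code. */
> #include <stdatomic.h>
> #include <stdio.h>
>
> static _Atomic long write_offset;
> static _Atomic long commit_count;
> static char buffer[64];
>
> static void writer_commit(long slot_size)
> {
>    buffer[0] = 1;                                  /* buffer write */
>    atomic_store_explicit(&write_offset, slot_size,
>          memory_order_relaxed);                    /* offset write */
>    atomic_thread_fence(memory_order_release);      /* ~ smp_wmb() */
>    atomic_fetch_add_explicit(&commit_count, slot_size,
>          memory_order_relaxed);                    /* commit count write */
> }
>
> static void reader_check(void)
> {
>    long cc = atomic_load_explicit(&commit_count, memory_order_relaxed);
>    atomic_thread_fence(memory_order_acquire);      /* ~ smp_rmb() */
>    long wo = atomic_load_explicit(&write_offset, memory_order_relaxed);
>    printf("commit count %ld, write offset %ld, data %d\n",
>       cc, wo, buffer[0]);                          /* buffer read */
> }
>
> int main(void)
> {
>    writer_commit(8);
>    reader_check();
>    return 0;
> }
>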
> It applies to LTTng 0.13 and earlier.
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at polymtl.ca>
> CC: Paul McKenney <Paul.McKenney at us.ibm.com>
> CC: Robert Wisniewski <bob at watson.ibm.com>
> ---
>  include/linux/ltt-tracer.h |   16 ++++
>  ltt/ltt-relay.c            |  178 ++++++++++++++++++++++++++++----------------
>  2 files changed, 134 insertions(+), 60 deletions(-)
>
> Index: linux-2.6-lttng/include/linux/ltt-tracer.h
> ===================================================================
> --- linux-2.6-lttng.orig/include/linux/ltt-tracer.h   2008-08-05 11:54:13.000000000 -0400
> +++ linux-2.6-lttng/include/linux/ltt-tracer.h   2008-08-05 11:54:48.000000000 -0400
> @@ -176,6 +176,8 @@ struct ltt_channel_struct {
>     int overwrite;
>     struct kref kref;
>     int compact;
> +   unsigned int n_subbufs_order;   /* n_subbufs count order */
> +   unsigned int subbuf_size_order;   /* subbuf_size count order */
>
>     void *trans_channel_data;
>
> @@ -574,7 +576,21 @@ static inline char *ltt_write_event_head
>  /* Buffer offset macros */
>
>  #define BUFFER_OFFSET(offset, chan) ((offset) & (chan->alloc_size-1))
> +/*
> + * BUFFER_ALIGN aligns on the next buffer boundary.
> + * Note that if the offset is already aligned on a buffer boundary,
> + * it is pushed to the next one.
> + */
> +#define BUFFER_ALIGN(offset, chan) \
> +   (((offset) + chan->alloc_size) & (~(chan->alloc_size-1)))
> +#define BUFFER_TRUNC(offset, chan) \
> +   ((offset) & (~(chan->alloc_size-1)))
>  #define SUBBUF_OFFSET(offset, chan) ((offset) & (chan->subbuf_size-1))
> +/*
> + * SUBBUF_ALIGN aligns on the next subbuffer boundary.
> + * Note that if the offset is already aligned on a subbuffer boundary,
> + * it is pushed to the next one.
> + */
>  #define SUBBUF_ALIGN(offset, chan) \
>     (((offset) + chan->subbuf_size) & (~(chan->subbuf_size-1)))
>  #define SUBBUF_TRUNC(offset, chan) \
> Index: linux-2.6-lttng/ltt/ltt-relay.c
> ===================================================================
> --- linux-2.6-lttng.orig/ltt/ltt-relay.c   2008-08-05 11:54:43.000000000 -0400
> +++ linux-2.6-lttng/ltt/ltt-relay.c   2008-08-05 11:56:18.000000000 -0400
> @@ -261,24 +261,37 @@ static int ltt_ioctl(struct inode *inode
>     switch (cmd) {
>     case RELAY_GET_SUBBUF:
>     {
> -      long consumed_old, consumed_idx;
> +      long consumed_old, consumed_idx, commit_count, write_offset;
> +
>        atomic_long_inc(&ltt_buf->active_readers);
>        consumed_old = atomic_long_read(&ltt_buf->consumed);
>        consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
> -      if (SUBBUF_OFFSET(
> -         local_read(&ltt_buf->commit_count[consumed_idx]),
> -            buf->chan) != 0) {
> +      commit_count = local_read(&ltt_buf->commit_count[consumed_idx]);
> +      /*
> +       * make sure we read the commit count before reading the
> +       * buffer data and the write offset.
> +       */
> +      smp_rmb();
> +      write_offset = local_read(&ltt_buf->offset);
> +      /*
> +       * Check that the subbuffer has been fully committed.
> +       */
> +      if (BUFFER_ALIGN(write_offset - 1
> +         - (consumed_idx << ltt_channel->subbuf_size_order),
> +            buf->chan)
> +         - (commit_count << ltt_channel->n_subbufs_order) != 0) {
>           atomic_long_dec(&ltt_buf->active_readers);
>           return -EAGAIN;
>        }
> -      if ((SUBBUF_TRUNC(local_read(&ltt_buf->offset), buf->chan)
> +      /*
> +       * Check that we are not about to read the same subbuffer in
> +       * which the writer head is.
> +       */
> +      if ((SUBBUF_TRUNC(write_offset, buf->chan)
>              - SUBBUF_TRUNC(consumed_old, buf->chan)) == 0) {
>           atomic_long_dec(&ltt_buf->active_readers);
>           return -EAGAIN;
>        }
> -      /* must make sure we read the counter before reading the data
> -       * in the buffer. */
> -      smp_rmb();
>        return put_user((u32)consumed_old, argp);
>        break;
>     }
> @@ -350,24 +363,29 @@ static void ltt_relay_print_subbuffer_er
>        long cons_off, unsigned int i)
>  {
>     struct rchan *rchan = ltt_chan->trans_channel_data;
> -   long cons_idx;
> +   long cons_idx, commit_count, write_offset;
>
> +   cons_idx = SUBBUF_INDEX(cons_off, rchan);
> +   commit_count = local_read(&ltt_chan->buf[i].commit_count[cons_idx]);
> +   /*
> +    * No need to order commit_count and write_offset reads because we
> +    * execute after trace is stopped when there are no readers left.
> +    */
> +   write_offset = local_read(&ltt_chan->buf[i].offset);
>     printk(KERN_WARNING
>        "LTT : unread channel %s offset is %ld "
>        "and cons_off : %ld (cpu %u)\n",
> -      ltt_chan->channel_name,
> -      local_read(&ltt_chan->buf[i].offset), cons_off, i);
> -   /* Check each sub-buffer for non zero commit count */
> -   cons_idx = SUBBUF_INDEX(cons_off, rchan);
> -   if (SUBBUF_OFFSET(local_read(&ltt_chan->buf[i].commit_count[cons_idx]),
> -            rchan))
> +      ltt_chan->channel_name, write_offset, cons_off, i);
> +   /* Check each sub-buffer for non filled commit count */
> +   if (BUFFER_ALIGN(write_offset - 1
> +         - (cons_idx << ltt_chan->subbuf_size_order), rchan)
> +         - (commit_count << ltt_chan->n_subbufs_order))
>        printk(KERN_ALERT
> -         "LTT : %s : subbuffer %lu has non zero "
> -         "commit count.\n",
> -         ltt_chan->channel_name, cons_idx);
> +         "LTT : %s : subbuffer %lu has non filled "
> +         "commit count %lu.\n",
> +         ltt_chan->channel_name, cons_idx, commit_count);
>     printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %zd\n",
> -         ltt_chan->channel_name,
> -         local_read(&ltt_chan->buf[i].commit_count[cons_idx]),
> +         ltt_chan->channel_name, commit_count,
>           rchan->subbuf_size);
>  }
>
> @@ -508,6 +526,8 @@ static int ltt_relay_create_channel(char
>     (*ltt_chan)->buffer_begin = ltt_buffer_begin_callback;
>     (*ltt_chan)->buffer_end = ltt_buffer_end_callback;
>     (*ltt_chan)->overwrite = overwrite;
> +   (*ltt_chan)->n_subbufs_order = get_count_order(n_subbufs);
> +   (*ltt_chan)->subbuf_size_order = get_count_order(subbuf_size);
>     if (strcmp(channel_name, LTT_COMPACT_CHANNEL) == 0)
>        (*ltt_chan)->compact = 1;
>     else
> @@ -642,6 +662,12 @@ static void ltt_relay_remove_channel(str
>     kref_put(&channel->kref, ltt_relay_release_channel);
>  }
>
> +/*
> + * reserve_commit_diff is the difference between the write offset aligned to the
> + * next (or previous) subbuffer and n_subbufs times the commit count of a given
> + * subbuffer. It is used to find out if the space reserved in a given subbuffer
> + * has been fully committed.
> + */
>  struct ltt_reserve_switch_offsets {
>     long begin, end, old;
>     long begin_switch, end_switch_current, end_switch_old;
> @@ -680,16 +706,19 @@ static inline int ltt_relay_try_reserve(
>        }
>     }
>     if (offsets->begin_switch) {
> +      long subbuf_index;
>        if (offsets->end_switch_old)
>           offsets->begin = SUBBUF_ALIGN(offsets->begin,
>                    buf->chan);
>        offsets->begin = offsets->begin + ltt_subbuf_header_len();
>        /* Test new buffer integrity */
> +      subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
>        offsets->reserve_commit_diff =
> -         SUBBUF_OFFSET(buf->chan->subbuf_size
> -         - local_read(&ltt_buf->commit_count[
> -            SUBBUF_INDEX(offsets->begin, buf->chan)]),
> -               buf->chan);
> +         BUFFER_TRUNC(offsets->begin
> +         - (subbuf_index << ltt_channel->subbuf_size_order),
> +            buf->chan)
> +         - (local_read(&ltt_buf->commit_count[subbuf_index])
> +            << ltt_channel->n_subbufs_order);
>        if (offsets->reserve_commit_diff == 0) {
>           /* Next buffer not corrupted. */
>           if (!ltt_channel->overwrite &&
> @@ -775,6 +804,8 @@ static inline int ltt_relay_try_switch(
>        struct ltt_reserve_switch_offsets *offsets,
>        u64 *tsc)
>  {
> +   long subbuf_index;
> +
>     offsets->begin = local_read(&ltt_buf->offset);
>     offsets->old = offsets->begin;
>     offsets->begin_switch = 0;
> @@ -793,10 +824,13 @@ static inline int ltt_relay_try_switch(
>      * Always begin_switch in FORCE_ACTIVE mode.
>      * Test new buffer integrity
>      */
> -   offsets->reserve_commit_diff = SUBBUF_OFFSET(buf->chan->subbuf_size
> -      - local_read(
> -      &ltt_buf->commit_count[SUBBUF_INDEX(offsets->begin,
> -               buf->chan)]), buf->chan);
> +   subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
> +   offsets->reserve_commit_diff =
> +      BUFFER_TRUNC(offsets->begin
> +         - (subbuf_index << ltt_channel->subbuf_size_order),
> +            buf->chan)
> +      - (local_read(&ltt_buf->commit_count[subbuf_index])
> +         << ltt_channel->n_subbufs_order);
>     if (offsets->reserve_commit_diff == 0) {
>        /* Next buffer not corrupted. */
>        if (mode == FORCE_ACTIVE && !ltt_channel->overwrite &&
> @@ -820,7 +854,9 @@ static inline int ltt_relay_try_switch(
>  }
>
>  static inline void ltt_reserve_push_reader(
> -      struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
> +      struct ltt_channel_struct *ltt_channel,
> +      struct ltt_channel_buf_struct *ltt_buf,
> +      struct rchan *rchan,
>        struct rchan_buf *buf,
>        struct ltt_reserve_switch_offsets *offsets)
>  {
> @@ -864,14 +900,24 @@ static inline void ltt_reserve_push_read
>         */
>        if (offsets->reserve_commit_diff) {
>           /*
> -          * We have to alter the sub-buffer commit count : a
> -          * sub-buffer is corrupted. We do not deliver it.
> +          * We have to alter the sub-buffer commit count.
> +          * We do not deliver the previous subbuffer, given
> +          * it was either corrupted or not consumed (overwrite
> +          * mode).
>            */
> -         local_add(
> -            offsets->reserve_commit_diff,
> +         local_add(offsets->reserve_commit_diff
> +               >> ltt_channel->n_subbufs_order,
>              &ltt_buf->commit_count[
> -            SUBBUF_INDEX(offsets->begin, buf->chan)]);
> -         local_inc(&ltt_buf->corrupted_subbuffers);
> +               SUBBUF_INDEX(offsets->begin, rchan)]);
> +         if (offsets->reserve_commit_diff != rchan->alloc_size) {
> +            /*
> +             * The reserve commit diff was not n_subbufs *
> +             * subbuf_size (= alloc_size) : it means the
> +             * subbuffer was partly written to and is
> +             * therefore corrupted.
> +             */
> +            local_inc(&ltt_buf->corrupted_subbuffers);
> +         }
>        }
>     }
>  }
> @@ -903,18 +949,20 @@ static inline void ltt_reserve_switch_ol
>        struct rchan_buf *buf,
>        struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
>  {
> -   ltt_channel->buffer_end(buf, *tsc, offsets->old,
> -      SUBBUF_INDEX((offsets->old-1), buf->chan));
> +   long oldidx = SUBBUF_INDEX(offsets->old - 1, buf->chan);
> +
> +   ltt_channel->buffer_end(buf, *tsc, offsets->old, oldidx);
>     /* Must write buffer end before incrementing commit count */
>     smp_wmb();
>     offsets->commit_count =
>        local_add_return(buf->chan->subbuf_size
>           - (SUBBUF_OFFSET(offsets->old-1, buf->chan)+1),
> -         &ltt_buf->commit_count[SUBBUF_INDEX(
> -               offsets->old-1, buf->chan)]);
> -   if (SUBBUF_OFFSET(offsets->commit_count, buf->chan) == 0)
> -      ltt_deliver(buf, SUBBUF_INDEX((offsets->old-1),
> -               buf->chan), NULL);
> +         &ltt_buf->commit_count[oldidx]);
> +   if (BUFFER_ALIGN(offsets->old - 1
> +      - (oldidx << ltt_channel->subbuf_size_order), rchan)
> +      - (offsets->commit_count
> +         << ltt_channel->n_subbufs_order) == 0)
> +      ltt_deliver(buf, oldidx, NULL);
>  }
>
>  /*
> @@ -930,16 +978,19 @@ static inline void ltt_reserve_switch_ne
>        struct rchan_buf *buf,
>        struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
>  {
> -   ltt_channel->buffer_begin(buf, *tsc, SUBBUF_INDEX(offsets->begin,
> -            buf->chan));
> +   long beginidx = SUBBUF_INDEX(offsets->begin, buf->chan);
> +
> +   ltt_channel->buffer_begin(buf, *tsc, beginidx);
>     /* Must write buffer end before incrementing commit count */
>     smp_wmb();
>     offsets->commit_count = local_add_return(ltt_subbuf_header_len(),
> -         &ltt_buf->commit_count[
> -            SUBBUF_INDEX(offsets->begin, buf->chan)]);
> +         &ltt_buf->commit_count[beginidx]);
>     /* Check if the written buffer has to be delivered */
> -   if (SUBBUF_OFFSET(offsets->commit_count, buf->chan) == 0)
> -      ltt_deliver(buf, SUBBUF_INDEX(offsets->begin, buf->chan), NULL);
> +   if (BUFFER_ALIGN(offsets->begin
> +         - (beginidx << ltt_channel->subbuf_size_order), rchan)
> +         - (offsets->commit_count
> +            << ltt_channel->n_subbufs_order) == 0)
> +      ltt_deliver(buf, beginidx, NULL);
>  }
>
>
> @@ -967,18 +1018,20 @@ static inline void ltt_reserve_end_switc
>        struct rchan_buf *buf,
>        struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
>  {
> -   ltt_channel->buffer_end(buf, *tsc, offsets->end,
> -      SUBBUF_INDEX((offsets->end-1), buf->chan));
> +   long endidx = SUBBUF_INDEX(offsets->end - 1, buf->chan);
> +
> +   ltt_channel->buffer_end(buf, *tsc, offsets->end, endidx);
>     /* Must write buffer begin before incrementing commit count */
>     smp_wmb();
>     offsets->commit_count =
>        local_add_return(buf->chan->subbuf_size
>           - (SUBBUF_OFFSET(offsets->end-1, buf->chan)+1),
> -         &ltt_buf->commit_count[SUBBUF_INDEX(
> -               offsets->end-1, buf->chan)]);
> -   if (SUBBUF_OFFSET(offsets->commit_count, buf->chan) == 0)
> -      ltt_deliver(buf, SUBBUF_INDEX((offsets->end-1),
> -         buf->chan), NULL);
> +         &ltt_buf->commit_count[endidx]);
> +   if (BUFFER_ALIGN(offsets->end - 1
> +         - (endidx << ltt_channel->subbuf_size_order), rchan)
> +         - (offsets->commit_count
> +            << ltt_channel->n_subbufs_order) == 0)
> +      ltt_deliver(buf, endidx, NULL);
>  }
>
>  /**
> @@ -1028,7 +1081,7 @@ static void *ltt_relay_reserve_slot(stru
>     /*
>      * Push the reader if necessary
>      */
> -   ltt_reserve_push_reader(ltt_buf, rchan, buf, &offsets);
> +   ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan, buf, &offsets);
>
>     /*
>      * Switch old subbuffer if needed.
> @@ -1089,7 +1142,8 @@ static void ltt_force_switch(struct rcha
>      * Push the reader if necessary
>      */
>     if (mode == FORCE_ACTIVE)
> -      ltt_reserve_push_reader(ltt_buf, rchan, buf, &offsets);
> +      ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan, buf,
> +         &offsets);
>
>     /*
>      * Switch old subbuffer if needed.
> @@ -1124,14 +1178,18 @@ static void ltt_relay_commit_slot(struct
>     struct ltt_channel_buf_struct *ltt_buf = &ltt_channel->buf[buf->cpu];
>     unsigned int offset_end = reserved - buf->start;
>     long commit_count;
> +   long endidx = SUBBUF_INDEX(offset_end - 1, buf->chan);
> +   struct rchan *rchan = buf->chan;
>
>     /* Must write slot data before incrementing commit count */
>     smp_wmb();
>     commit_count = local_add_return(slot_size,
> -      &ltt_buf->commit_count[SUBBUF_INDEX(offset_end-1, buf->chan)]);
> +      &ltt_buf->commit_count[endidx]);
>     /* Check if all commits have been done */
> -   if (SUBBUF_OFFSET(commit_count, buf->chan) == 0)
> -      ltt_deliver(buf, SUBBUF_INDEX(offset_end-1, buf->chan), NULL);
> +   if (BUFFER_ALIGN(offset_end - 1
> +         - (endidx << ltt_channel->subbuf_size_order), rchan)
> +         - (commit_count << ltt_channel->n_subbufs_order) == 0)
> +      ltt_deliver(buf, endidx, NULL);
>     /*
>      * Update lost_size for each commit. It's needed only for extracting
>      * ltt buffers from vmcore, after crash.
>
> --
> Mathieu Desnoyers
> OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68