[ltt-dev] [PATCH] LTT relay formal verif fix

Mathieu Desnoyers compudj at krystal.dyndns.org
Tue Aug 5 12:12:45 EDT 2008


Running an LTT relay buffer model through the SPIN verification tool showed
that, in rare cases where subbuffers are small enough that the reserved
(uncommitted) space can equal the subbuffer size, the reader side could think
that a subbuffer is fully committed when data is in fact still being written
to it.
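
For illustration (hypothetical numbers, not taken from the model): with 4 kB
subbuffers, once subbuffer 0 has been fully committed on a first pass, its
commit count is 4096, a multiple of the subbuffer size. If the writer then
wraps around and reserves the whole of subbuffer 0 again without committing
anything, the commit count is still a multiple of the subbuffer size, so the
old test of the form SUBBUF_OFFSET(commit_count) == 0 wrongly reports the
subbuffer as fully committed.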

Fix this by checking the difference between the commit count and the write
offset to figure out whether all the space reserved in a given subbuffer has
been committed. Given that the write offset advances across the whole buffer
(all n_subbufs subbuffers) each time a given subbuffer's commit count
increases by the number of bytes in one subbuffer, we have to multiply the
commit count by the number of subbuffers to make it comparable to the write
offset. Also, the write offset has to be brought back to the window
corresponding to the commit count being checked. This is done by subtracting
the subbuffer offset and then aligning on the buffer size (the "- 1" is used
so that an already aligned offset aligns on the current boundary rather than
the next one).
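
As a self-contained illustration of that arithmetic, here is a minimal
user-space sketch (the macro and field names follow the patch, but the sizes
are hypothetical and this is not the kernel code):

#include <stdio.h>

#define SUBBUF_SIZE_ORDER	12	/* 4 kB subbuffers, for illustration */
#define N_SUBBUFS_ORDER		1	/* 2 subbuffers, for illustration */
#define SUBBUF_SIZE		(1L << SUBBUF_SIZE_ORDER)
#define ALLOC_SIZE		(SUBBUF_SIZE << N_SUBBUFS_ORDER)

/* Align on the next buffer boundary, as BUFFER_ALIGN does in the patch. */
#define BUFFER_ALIGN(off)	(((off) + ALLOC_SIZE) & ~(ALLOC_SIZE - 1))

/*
 * Returns 0 when the subbuffer at index idx is fully committed for the
 * window the writer last reserved in, non-zero otherwise. The "- 1"
 * keeps an already aligned write offset on the current boundary.
 */
static long reserve_commit_diff(long write_offset, long commit_count, long idx)
{
	return BUFFER_ALIGN(write_offset - 1 - (idx << SUBBUF_SIZE_ORDER))
		- (commit_count << N_SUBBUFS_ORDER);
}

int main(void)
{
	/*
	 * Pass 1 of subbuffer 0 fully committed (commit count 4096), but
	 * the writer has reserved all of subbuffer 0 again on pass 2
	 * without committing: the old modulo test (4096 % 4096 == 0)
	 * wrongly reports it committed; the diff below is non-zero.
	 */
	printf("uncommitted pass: diff = %ld\n",
		reserve_commit_diff(ALLOC_SIZE + SUBBUF_SIZE, SUBBUF_SIZE, 0));
	/* Pass 2 fully committed as well: the diff is 0. */
	printf("committed pass:   diff = %ld\n",
		reserve_commit_diff(ALLOC_SIZE + SUBBUF_SIZE,
				2 * SUBBUF_SIZE, 0));
	return 0;
}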

Note that the retrieve count solution used in the formal model has not been
implemented in the C code because of atomicity constraints: writers can push
readers in flight recorder mode, so having an extra counter to update would
make the synchronization messy.

This fix also moves the memory barriers found in the ltt_ioctl code to make
sure the commit count is read before the write offset, given that the write
barrier orders:

write offset and buffer write
smp_wmb()
commit count write

the read side should look like:

commit count read
smp_rmb()
write offset and buffer read

Previously, the read side did:

commit count read and write offset read
smp_rmb()
buffer read

which could lead to the write offset and commit count reads being reordered.
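
To make the pairing concrete, here is a sketch expressed with C11 atomic
fences (an assumption for a self-contained example: the kernel code uses
smp_wmb()/smp_rmb() on local_t counters, not C11 atomics):

#include <stdatomic.h>
#include <string.h>

static char buffer[4096];
static atomic_long offset;
static atomic_long commit_count;

static void writer_commit(const char *data, long slot_off, long slot_size)
{
	atomic_store_explicit(&offset, slot_off + slot_size,
			memory_order_relaxed);		/* write offset */
	memcpy(buffer + slot_off, data, slot_size);	/* buffer write */
	atomic_thread_fence(memory_order_release);	/* smp_wmb() */
	atomic_fetch_add_explicit(&commit_count, slot_size,
			memory_order_relaxed);		/* commit count */
}

static long reader_snapshot(long *write_offset)
{
	/* Commit count must be read first... */
	long cc = atomic_load_explicit(&commit_count, memory_order_relaxed);
	atomic_thread_fence(memory_order_acquire);	/* smp_rmb() */
	/* ...then the write offset and the buffer data. */
	*write_offset = atomic_load_explicit(&offset, memory_order_relaxed);
	return cc;
}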

This patch applies to LTTng 0.13 and earlier.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at polymtl.ca>
CC: Paul McKenney <Paul.McKenney at us.ibm.com>
CC: Robert Wisniewski <bob at watson.ibm.com>
---
 include/linux/ltt-tracer.h |   16 ++++
 ltt/ltt-relay.c            |  178 +++++++++++++++++++++++++++++----------------
 2 files changed, 134 insertions(+), 60 deletions(-)

Index: linux-2.6-lttng/include/linux/ltt-tracer.h
===================================================================
--- linux-2.6-lttng.orig/include/linux/ltt-tracer.h	2008-08-05 11:54:13.000000000 -0400
+++ linux-2.6-lttng/include/linux/ltt-tracer.h	2008-08-05 11:54:48.000000000 -0400
@@ -176,6 +176,8 @@ struct ltt_channel_struct {
 	int overwrite;
 	struct kref kref;
 	int compact;
+	unsigned int n_subbufs_order;	/* n_subbufs count order */
+	unsigned int subbuf_size_order;	/* subbuf_size count order */
 
 	void *trans_channel_data;
 
@@ -574,7 +576,21 @@ static inline char *ltt_write_event_head
 /* Buffer offset macros */
 
 #define BUFFER_OFFSET(offset, chan) ((offset) & (chan->alloc_size-1))
+/*
+ * BUFFER_ALIGN aligns on the next buffer boundary.
+ * Note that if the offset is already aligned on a buffer boundary,
+ * it is pushed to the next one.
+ */
+#define BUFFER_ALIGN(offset, chan) \
+	(((offset) + chan->alloc_size) & (~(chan->alloc_size-1)))
+#define BUFFER_TRUNC(offset, chan) \
+	((offset) & (~(chan->alloc_size-1)))
 #define SUBBUF_OFFSET(offset, chan) ((offset) & (chan->subbuf_size-1))
+/*
+ * SUBBUF_ALIGN aligns on the next subbuffer boundary.
+ * Note that if the offset is already aligned on a subbuffer boundary,
+ * it is pushed to the next one.
+ */
 #define SUBBUF_ALIGN(offset, chan) \
 	(((offset) + chan->subbuf_size) & (~(chan->subbuf_size-1)))
 #define SUBBUF_TRUNC(offset, chan) \
Index: linux-2.6-lttng/ltt/ltt-relay.c
===================================================================
--- linux-2.6-lttng.orig/ltt/ltt-relay.c	2008-08-05 11:54:43.000000000 -0400
+++ linux-2.6-lttng/ltt/ltt-relay.c	2008-08-05 11:56:18.000000000 -0400
@@ -261,24 +261,37 @@ static int ltt_ioctl(struct inode *inode
 	switch (cmd) {
 	case RELAY_GET_SUBBUF:
 	{
-		long consumed_old, consumed_idx;
+		long consumed_old, consumed_idx, commit_count, write_offset;
+
 		atomic_long_inc(&ltt_buf->active_readers);
 		consumed_old = atomic_long_read(&ltt_buf->consumed);
 		consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
-		if (SUBBUF_OFFSET(
-			local_read(&ltt_buf->commit_count[consumed_idx]),
-				buf->chan) != 0) {
+		commit_count = local_read(&ltt_buf->commit_count[consumed_idx]);
+		/*
+		 * make sure we read the commit count before reading the
+		 * buffer data and the write offset.
+		 */
+		smp_rmb();
+		write_offset = local_read(&ltt_buf->offset);
+		/*
+		 * Check that the subbuffer has been fully committed.
+		 */
+		if (BUFFER_ALIGN(write_offset - 1
+			- (consumed_idx << ltt_channel->subbuf_size_order),
+				buf->chan)
+			- (commit_count << ltt_channel->n_subbufs_order) != 0) {
 			atomic_long_dec(&ltt_buf->active_readers);
 			return -EAGAIN;
 		}
-		if ((SUBBUF_TRUNC(local_read(&ltt_buf->offset), buf->chan)
+		/*
+		 * Check that we are not about to read the same subbuffer in
+		 * which the writer head is.
+		 */
+		if ((SUBBUF_TRUNC(write_offset, buf->chan)
 				- SUBBUF_TRUNC(consumed_old, buf->chan)) == 0) {
 			atomic_long_dec(&ltt_buf->active_readers);
 			return -EAGAIN;
 		}
-		/* must make sure we read the counter before reading the data
-		 * in the buffer. */
-		smp_rmb();
 		return put_user((u32)consumed_old, argp);
 		break;
 	}
@@ -350,24 +363,29 @@ static void ltt_relay_print_subbuffer_er
 		long cons_off, unsigned int i)
 {
 	struct rchan *rchan = ltt_chan->trans_channel_data;
-	long cons_idx;
+	long cons_idx, commit_count, write_offset;
 
+	cons_idx = SUBBUF_INDEX(cons_off, rchan);
+	commit_count = local_read(&ltt_chan->buf[i].commit_count[cons_idx]);
+	/*
+	 * No need to order commit_count and write_offset reads because we
+	 * execute after trace is stopped when there are no readers left.
+	 */
+	write_offset = local_read(&ltt_chan->buf[i].offset);
 	printk(KERN_WARNING
 		"LTT : unread channel %s offset is %ld "
 		"and cons_off : %ld (cpu %u)\n",
-		ltt_chan->channel_name,
-		local_read(&ltt_chan->buf[i].offset), cons_off, i);
-	/* Check each sub-buffer for non zero commit count */
-	cons_idx = SUBBUF_INDEX(cons_off, rchan);
-	if (SUBBUF_OFFSET(local_read(&ltt_chan->buf[i].commit_count[cons_idx]),
-				rchan))
+		ltt_chan->channel_name, write_offset, cons_off, i);
+	/* Check each sub-buffer for non filled commit count */
+	if (BUFFER_ALIGN(write_offset - 1
+			- (cons_idx << ltt_chan->subbuf_size_order), rchan)
+			- (commit_count << ltt_chan->n_subbufs_order))
 		printk(KERN_ALERT
-			"LTT : %s : subbuffer %lu has non zero "
-			"commit count.\n",
-			ltt_chan->channel_name, cons_idx);
+			"LTT : %s : subbuffer %lu has non filled "
+			"commit count %lu.\n",
+			ltt_chan->channel_name, cons_idx, commit_count);
 	printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %zd\n",
-			ltt_chan->channel_name,
-			local_read(&ltt_chan->buf[i].commit_count[cons_idx]),
+			ltt_chan->channel_name, commit_count,
 			rchan->subbuf_size);
 }
 
@@ -508,6 +526,8 @@ static int ltt_relay_create_channel(char
 	(*ltt_chan)->buffer_begin = ltt_buffer_begin_callback;
 	(*ltt_chan)->buffer_end = ltt_buffer_end_callback;
 	(*ltt_chan)->overwrite = overwrite;
+	(*ltt_chan)->n_subbufs_order = get_count_order(n_subbufs);
+	(*ltt_chan)->subbuf_size_order = get_count_order(subbuf_size);
 	if (strcmp(channel_name, LTT_COMPACT_CHANNEL) == 0)
 		(*ltt_chan)->compact = 1;
 	else
@@ -642,6 +662,12 @@ static void ltt_relay_remove_channel(str
 	kref_put(&channel->kref, ltt_relay_release_channel);
 }
 
+/*
+ * reserve_commit_diff is the difference between the write offset aligned to the
+ * next (or previous) subbuffer and n_subbufs times the commit count of a given
+ * subbuffer. It is used to find out if the space reserved in a given subbuffer
+ * has been fully committed.
+ */
 struct ltt_reserve_switch_offsets {
 	long begin, end, old;
 	long begin_switch, end_switch_current, end_switch_old;
@@ -680,16 +706,19 @@ static inline int ltt_relay_try_reserve(
 		}
 	}
 	if (offsets->begin_switch) {
+		long subbuf_index;
 		if (offsets->end_switch_old)
 			offsets->begin = SUBBUF_ALIGN(offsets->begin,
 						buf->chan);
 		offsets->begin = offsets->begin + ltt_subbuf_header_len();
 		/* Test new buffer integrity */
+		subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
 		offsets->reserve_commit_diff =
-			SUBBUF_OFFSET(buf->chan->subbuf_size
-			- local_read(&ltt_buf->commit_count[
-				SUBBUF_INDEX(offsets->begin, buf->chan)]),
-					buf->chan);
+			BUFFER_TRUNC(offsets->begin
+			- (subbuf_index << ltt_channel->subbuf_size_order),
+				buf->chan)
+			- (local_read(&ltt_buf->commit_count[subbuf_index])
+				<< ltt_channel->n_subbufs_order);
 		if (offsets->reserve_commit_diff == 0) {
 			/* Next buffer not corrupted. */
 			if (!ltt_channel->overwrite &&
@@ -775,6 +804,8 @@ static inline int ltt_relay_try_switch(
 		struct ltt_reserve_switch_offsets *offsets,
 		u64 *tsc)
 {
+	long subbuf_index;
+
 	offsets->begin = local_read(&ltt_buf->offset);
 	offsets->old = offsets->begin;
 	offsets->begin_switch = 0;
@@ -793,10 +824,13 @@ static inline int ltt_relay_try_switch(
 	 * Always begin_switch in FORCE_ACTIVE mode.
 	 * Test new buffer integrity
 	 */
-	offsets->reserve_commit_diff = SUBBUF_OFFSET(buf->chan->subbuf_size
-		- local_read(
-		&ltt_buf->commit_count[SUBBUF_INDEX(offsets->begin,
-					buf->chan)]), buf->chan);
+	subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
+	offsets->reserve_commit_diff =
+		BUFFER_TRUNC(offsets->begin
+			- (subbuf_index << ltt_channel->subbuf_size_order),
+				buf->chan)
+		- (local_read(&ltt_buf->commit_count[subbuf_index])
+			<< ltt_channel->n_subbufs_order);
 	if (offsets->reserve_commit_diff == 0) {
 		/* Next buffer not corrupted. */
 		if (mode == FORCE_ACTIVE && !ltt_channel->overwrite &&
@@ -820,7 +854,9 @@ static inline int ltt_relay_try_switch(
 }
 
 static inline void ltt_reserve_push_reader(
-		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
+		struct ltt_channel_struct *ltt_channel,
+		struct ltt_channel_buf_struct *ltt_buf,
+		struct rchan *rchan,
 		struct rchan_buf *buf,
 		struct ltt_reserve_switch_offsets *offsets)
 {
@@ -864,14 +900,24 @@ static inline void ltt_reserve_push_read
 		 */
 		if (offsets->reserve_commit_diff) {
 			/*
-			 * We have to alter the sub-buffer commit count : a
-			 * sub-buffer is corrupted. We do not deliver it.
+			 * We have to alter the sub-buffer commit count.
+			 * We do not deliver the previous subbuffer, given
+			 * it was either corrupted or not consumed (overwrite
+			 * mode).
 			 */
-			local_add(
-				offsets->reserve_commit_diff,
+			local_add(offsets->reserve_commit_diff
+					>> ltt_channel->n_subbufs_order,
 				&ltt_buf->commit_count[
-				SUBBUF_INDEX(offsets->begin, buf->chan)]);
-			local_inc(&ltt_buf->corrupted_subbuffers);
+					SUBBUF_INDEX(offsets->begin, rchan)]);
+			if (offsets->reserve_commit_diff != rchan->alloc_size) {
+				/*
+				 * The reserve commit diff was not n_subbufs *
+				 * subbuf_size (= alloc_size) : it means the
+				 * subbuffer was partly written to and is
+				 * therefore corrupted.
+				 */
+				local_inc(&ltt_buf->corrupted_subbuffers);
+			}
 		}
 	}
 }
@@ -903,18 +949,20 @@ static inline void ltt_reserve_switch_ol
 		struct rchan_buf *buf,
 		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
 {
-	ltt_channel->buffer_end(buf, *tsc, offsets->old,
-		SUBBUF_INDEX((offsets->old-1), buf->chan));
+	long oldidx = SUBBUF_INDEX(offsets->old - 1, buf->chan);
+
+	ltt_channel->buffer_end(buf, *tsc, offsets->old, oldidx);
 	/* Must write buffer end before incrementing commit count */
 	smp_wmb();
 	offsets->commit_count =
 		local_add_return(buf->chan->subbuf_size
 			- (SUBBUF_OFFSET(offsets->old-1, buf->chan)+1),
-			&ltt_buf->commit_count[SUBBUF_INDEX(
-					offsets->old-1, buf->chan)]);
-	if (SUBBUF_OFFSET(offsets->commit_count, buf->chan) == 0)
-		ltt_deliver(buf, SUBBUF_INDEX((offsets->old-1),
-					buf->chan), NULL);
+			&ltt_buf->commit_count[oldidx]);
+	if (BUFFER_ALIGN(offsets->old - 1
+		- (oldidx << ltt_channel->subbuf_size_order), rchan)
+		- (offsets->commit_count
+			<< ltt_channel->n_subbufs_order) == 0)
+		ltt_deliver(buf, oldidx, NULL);
 }
 
 /*
@@ -930,16 +978,19 @@ static inline void ltt_reserve_switch_ne
 		struct rchan_buf *buf,
 		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
 {
-	ltt_channel->buffer_begin(buf, *tsc, SUBBUF_INDEX(offsets->begin,
-				buf->chan));
+	long beginidx = SUBBUF_INDEX(offsets->begin, buf->chan);
+
+	ltt_channel->buffer_begin(buf, *tsc, beginidx);
 	/* Must write buffer end before incrementing commit count */
 	smp_wmb();
 	offsets->commit_count = local_add_return(ltt_subbuf_header_len(),
-			&ltt_buf->commit_count[
-				SUBBUF_INDEX(offsets->begin, buf->chan)]);
+			&ltt_buf->commit_count[beginidx]);
 	/* Check if the written buffer has to be delivered */
-	if (SUBBUF_OFFSET(offsets->commit_count, buf->chan) == 0)
-		ltt_deliver(buf, SUBBUF_INDEX(offsets->begin, buf->chan), NULL);
+	if (BUFFER_ALIGN(offsets->begin
+			- (beginidx << ltt_channel->subbuf_size_order), rchan)
+			- (offsets->commit_count
+				<< ltt_channel->n_subbufs_order) == 0)
+		ltt_deliver(buf, beginidx, NULL);
 }
 
 
@@ -967,18 +1018,20 @@ static inline void ltt_reserve_end_switc
 		struct rchan_buf *buf,
 		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
 {
-	ltt_channel->buffer_end(buf, *tsc, offsets->end,
-		SUBBUF_INDEX((offsets->end-1), buf->chan));
+	long endidx = SUBBUF_INDEX(offsets->end - 1, buf->chan);
+
+	ltt_channel->buffer_end(buf, *tsc, offsets->end, endidx);
 	/* Must write buffer begin before incrementing commit count */
 	smp_wmb();
 	offsets->commit_count =
 		local_add_return(buf->chan->subbuf_size
 			- (SUBBUF_OFFSET(offsets->end-1, buf->chan)+1),
-			&ltt_buf->commit_count[SUBBUF_INDEX(
-					offsets->end-1, buf->chan)]);
-	if (SUBBUF_OFFSET(offsets->commit_count, buf->chan) == 0)
-		ltt_deliver(buf, SUBBUF_INDEX((offsets->end-1),
-			buf->chan), NULL);
+			&ltt_buf->commit_count[endidx]);
+	if (BUFFER_ALIGN(offsets->end - 1
+			- (endidx << ltt_channel->subbuf_size_order), rchan)
+			- (offsets->commit_count
+				<< ltt_channel->n_subbufs_order) == 0)
+		ltt_deliver(buf, endidx, NULL);
 }
 
 /**
@@ -1028,7 +1081,7 @@ static void *ltt_relay_reserve_slot(stru
 	/*
 	 * Push the reader if necessary
 	 */
-	ltt_reserve_push_reader(ltt_buf, rchan, buf, &offsets);
+	ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan, buf, &offsets);
 
 	/*
 	 * Switch old subbuffer if needed.
@@ -1089,7 +1142,8 @@ static void ltt_force_switch(struct rcha
 	 * Push the reader if necessary
 	 */
 	if (mode == FORCE_ACTIVE)
-		ltt_reserve_push_reader(ltt_buf, rchan, buf, &offsets);
+		ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan, buf,
+			&offsets);
 
 	/*
 	 * Switch old subbuffer if needed.
@@ -1124,14 +1178,18 @@ static void ltt_relay_commit_slot(struct
 	struct ltt_channel_buf_struct *ltt_buf = &ltt_channel->buf[buf->cpu];
 	unsigned int offset_end = reserved - buf->start;
 	long commit_count;
+	long endidx = SUBBUF_INDEX(offset_end - 1, buf->chan);
+	struct rchan *rchan = buf->chan;
 
 	/* Must write slot data before incrementing commit count */
 	smp_wmb();
 	commit_count = local_add_return(slot_size,
-		&ltt_buf->commit_count[SUBBUF_INDEX(offset_end-1, buf->chan)]);
+		&ltt_buf->commit_count[endidx]);
 	/* Check if all commits have been done */
-	if (SUBBUF_OFFSET(commit_count, buf->chan) == 0)
-		ltt_deliver(buf, SUBBUF_INDEX(offset_end-1, buf->chan), NULL);
+	if (BUFFER_ALIGN(offset_end - 1
+			- (endidx << ltt_channel->subbuf_size_order), rchan)
+			- (commit_count << ltt_channel->n_subbufs_order) == 0)
+		ltt_deliver(buf, endidx, NULL);
 	/*
 	 * Update lost_size for each commit. It's needed only for extracting
 	 * ltt buffers from vmcore, after crash.

-- 
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68



