[ltt-dev] [PATCH 11/13] implement ltt-ascii

Mathieu Desnoyers compudj at krystal.dyndns.org
Thu Jan 15 15:02:14 EST 2009


* Lai Jiangshan (laijs at cn.fujitsu.com) wrote:
> 
> implement text output module for lttng.
> 
> Signed-off-by: Lai Jiangshan <laijs at cn.fujitsu.com>
> ---
>  b/ltt/Makefile    |    1
>  b/ltt/ltt-ascii.c |  442 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  ltt/Kconfig       |    8
>  3 files changed, 451 insertions(+)
> diff --git a/ltt/Kconfig b/ltt/Kconfig
> index 36548b5..91f2f05 100644
> --- a/ltt/Kconfig
> +++ b/ltt/Kconfig
> @@ -65,6 +65,14 @@ 
>  config LTT_TEXT_OUTPUT
>  	def_bool n
>  
> +config LTT_ASCII
> +	tristate "Linux Trace Toolkit Text Output"
> +	depends on LTT_SERIALIZE
> +	default y
> +	select LTT_TEXT_OUTPUT
> +	help
> +	  Support text output
> +
>  config LTT_RELAY_CHECK_RANDOM_ACCESS
>  	bool "Debug check for random access in ltt relay buffers"
>  	depends on LTT_RELAY
> diff --git a/ltt/Makefile b/ltt/Makefile
> index acc8cee..5fc4e4c 100644
> --- a/ltt/Makefile
> +++ b/ltt/Makefile
> @@ -8,6 +8,7 @@ obj-$(CONFIG_LTT_TRACER)		+= ltt-tracer.o
>  obj-$(CONFIG_LTT_USERSPACE_EVENT)	+= ltt-userspace-event.o
>  obj-$(CONFIG_LTT_RELAY)			+= ltt-relay.o
>  obj-$(CONFIG_LTT_RELAY_LOCKED)		+= ltt-relay-locked.o
> +obj-$(CONFIG_LTT_ASCII)			+= ltt-ascii.o
>  obj-$(CONFIG_LTT_RELAY_ALLOC)		+= ltt-relay-alloc.o
>  obj-$(CONFIG_LTT_NETLINK_CONTROL)	+= ltt-control.o
>  obj-$(CONFIG_LTT_TRACE_CONTROL)		+= ltt-trace-control.o
> diff --git a/ltt/ltt-ascii.c b/ltt/ltt-ascii.c
> index 0000000..28d9fc5
> --- /dev/null
> +++ b/ltt/ltt-ascii.c
> @@ -0,0 +1,442 @@
> +/*
> + * ltt/ltt-ascii.c
> + *
> + * (C) Copyright	2008	Lai Jiangshan (laijs at cn.fujitsu.com)
> + *
> + * Tracing text output.
> + *
> + * Author:
> + *	Lai Jiangshan (laijs at cn.fujitsu.com)
> + */
> +
> +#include <linux/ltt-tracer.h>
> +#include <linux/ltt-relay.h>
> +#include <linux/seq_file.h>
> +#include <linux/debugfs.h>
> +#include <linux/module.h>
> +#include <linux/string.h>
> +#include <linux/slab.h>
> +#include <linux/cpu.h>
> +#include <linux/fs.h>
> +
> +struct ltt_relay_cpu_iter {
> +	/* cpu buffer information */
> +	struct rchan_buf *buf;
> +	size_t start_offset;
> +	size_t eof_offset;
> +
> +	/* current event information */
> +	struct ltt_subbuffer_header *header;
> +	size_t offset;
> +	u64 tsc;
> +	u32 data_size;
> +	u16 chID; /* channel ID, const */
> +	u16 eID;
> +};
> +
> +static u64 calculate_tsc(u64 pre_tsc, u64 read_tsc, unsigned int rflags)
> +{
> +	u64 new_tsc = read_tsc;
> +
> +	if (rflags != LTT_RFLAG_ID_SIZE_TSC) {
> +		BUG_ON(read_tsc >> LTT_TSC_BITS);
> +
> +		new_tsc = (pre_tsc & ~LTT_TSC_MASK) + read_tsc;
> +		if (read_tsc < (pre_tsc & LTT_TSC_MASK))
> +			new_tsc += 1 << LTT_TSC_BITS;

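1ULL ? (just to play it safe: a plain "1 << LTT_TSC_BITS" is computed
as an int, which would overflow if LTT_TSC_BITS were ever 31 or more,
whereas new_tsc is a u64)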

> +	}
> +
> +	return new_tsc;
> +}
> +
> +static inline int calculate_offset(size_t offset, u16 chid, u16 eid)
> +{
> +	const char *fmt = "";
> +#ifdef config_ltt_alignment
> +	fmt = marker_get_fmt_form_id(chid, eid);
> +	bug_on(!fmt);
> +#endif
> +	return offset + ltt_fmt_largest_align(offset, fmt);
> +}

Hrm, basically, this can be rewritten as :

static inline int calculate_offset(size_t offset, u16 chid, u16 eid)
{
	const char *fmt;

  if (!ltt_get_alignment())
    return offset;
	fmt = marker_get_fmt_form_id(chid, eid);
	bug_on(!fmt);
	return offset + ltt_fmt_largest_align(offset, fmt);
}


> +
> +static void update_new_event(struct ltt_relay_cpu_iter *citer)
> +{
> +	u64 read_tsc;
> +	unsigned int rflags;
> +
> +	citer->offset = ltt_read_event_header(citer->buf, citer->offset,
> +			&read_tsc, &citer->data_size, &citer->eID, &rflags);
> +
> +	citer->offset = calculate_offset(citer->offset, citer->chID,
> +			citer->eID);
> +	citer->tsc = calculate_tsc(citer->tsc, read_tsc, rflags);
> +}
> +
> +static void update_event_size(struct ltt_relay_cpu_iter *citer)
> +{
> +	char output[1];
> +	const char *fmt;
> +
> +	if (citer->data_size != INT_MAX)
> +		return;
> +

OK, so you don't benefit from the dual calculation of the event size
by both the writer and the reader... it's a neat feature for spotting
bugs when we have it, although there is a small performance cost on the
reader side. I'd recommend still calculating the event payload size as
seen from the reader even when it's available in the data_size
information. Afterwards we can add a WARN_ON if the sizes differ
(between the original data_size written by the writer and the one
calculated by ltt_serialize_printf()).

> +	fmt = marker_get_fmt_form_id(citer->chID, citer->eID);
> +	BUG_ON(!fmt);
> +	ltt_serialize_printf(citer->buf, citer->offset, &citer->data_size,
> +			output, 0, fmt);
> +}
> +
> +/* Is @value in the region [@left, @right) */

The comment should state "in a circular buffer".

> +static int is_in_region(size_t value, size_t left, size_t right, size_t len)
> +{
> +	if (right < left)
> +		right += len;
> +	if (value < left)
> +		value += len;
> +	return value < right;
> +}
> +

/* Assumes buffer size (len) is power of two */

size_t half_len = len >> 1;
size_t len_mask = len - 1;

if (((value - left) & len_mask) > half_len)
  return 0;
if (((right - value) & len_mask) > half_len)
  return 0;
return 1;

I'm not convinced my implementation generates faster assembly; this
should probably be tried. The upside here is that we only have two
comparisons.


> +static void update_cpu_iter(struct ltt_relay_cpu_iter *citer, size_t old_offset)
> +{
> +	size_t len = citer->buf->chan->alloc_size;
> +	size_t eof_offset = BUFFER_OFFSET(citer->eof_offset, citer->buf->chan);
> +
> +	if (is_in_region(eof_offset, old_offset, citer->offset, len)) {
> +		citer->header = NULL;
> +	} else {
> +		update_new_event(citer);
> +		update_event_size(citer);
> +
> +		if (is_in_region(eof_offset, old_offset,
> +				citer->offset + citer->data_size, len))
> +			citer->header = NULL;
> +	}
> +}
> +
> +static void start_new_subbuffer(struct ltt_relay_cpu_iter *citer, size_t offset)
> +{
> +	int index = SUBBUF_INDEX(offset, citer->buf->chan);
> +	size_t head_offset = index * citer->buf->chan->subbuf_size;
> +

faster with :

size_t head_offset = index << citer->buf->chan->subbuf_size_order;

> +	citer->header = ltt_relay_offset_address(citer->buf, head_offset);
> +	citer->offset = head_offset + ltt_subbuffer_header_size();
> +	citer->tsc = citer->header->cycle_count_begin;
> +}
> +
> +static void ltt_relay_advance_cpu_iter(struct ltt_relay_cpu_iter *citer)
> +{
> +	size_t old_offset = citer->offset, new_offset = citer->offset;
> +	size_t sub_offset = SUBBUF_OFFSET(old_offset, citer->buf->chan);
> +
> +	new_offset += citer->data_size;
> +
> +	/* find that whether we read all data in this subbuffer */
> +	if (sub_offset + citer->data_size + citer->header->lost_size
> +			>= citer->buf->chan->subbuf_size) {
> +		new_offset += citer->header->lost_size;
> +		start_new_subbuffer(citer, new_offset);
> +	} else {
> +		citer->offset = new_offset + ltt_align(new_offset,
> +				sizeof(struct ltt_event_header));
> +	}
> +
> +	update_cpu_iter(citer, old_offset);
> +}
> +
> +static void ltt_relay_reset_cpu_iter(struct ltt_relay_cpu_iter *citer)
> +{
> +	start_new_subbuffer(citer, citer->start_offset);
> +	update_cpu_iter(citer, citer->start_offset);
> +}
> +
> +static int cpu_iter_eof(struct ltt_relay_cpu_iter *citer)
> +{
> +	return !citer->header;
> +}
> +
> +struct ltt_relay_iter {
> +	struct ltt_relay_cpu_iter iter_cpu[NR_CPUS];
> +	struct ltt_channel_struct *ltt_channel;
> +	loff_t pos;
> +	int cpu;
> +};
> +
> +static int ltt_relay_iter_eof(struct ltt_relay_iter *iter)
> +{
> +	return iter->cpu < 0;
> +}
> +
> +static void ltt_relay_advance_iter(struct ltt_relay_iter *iter)
> +{
> +	int i;
> +	struct ltt_relay_cpu_iter *curr, *min = NULL;
> +	iter->cpu = -1;
> +
> +	/*
> +	 * find the event with the minimum tsc.
> +	 * TODO: use min-heep for 4096CPUS
> +	 */

Yes, that's going to be required. :)

> +	for_each_possible_cpu(i) {
> +		curr = &iter->iter_cpu[i];
> +
> +		if (!curr->buf)
> +			continue;
> +
> +		if (cpu_iter_eof(curr))
> +			continue;

Hrm, how do we deal with the fact that some CPUs or channels wait for
too long before providing data ? This is the purpose of the planned
periodical buffer switch. I suspect that here you simply read the
"oldest available data", thus discarding the data from subbuffers not
completed yet, which could lead to timestamps going backward. Or am I
missing something ?

Ideally, we should just block and wait for data when any given buffer
has partially complete subbuffers.

Oh, maybe this only works on stopped traces then ? This periodical
subbuffer flush trick is only needed if we plan to read from
non-overwrite traces (normal traces) while tracing is active.

> +
> +		if (!min || curr->tsc < min->tsc) {
> +			min = curr;
> +			iter->cpu = i;
> +		}
> +	}
> +
> +	/* update cpu_iter for next ltt_relay_advance_iter() */
> +	if (min)
> +		ltt_relay_advance_cpu_iter(min);
> +
> +	/* increase pos */
> +	iter->pos++;
> +}
> +
> +static void ltt_relay_start_iter(struct ltt_relay_iter *iter)
> +{
> +	BUG_ON(iter->pos != -1);
> +
> +	/* TODO: use min-heep for 4096CPUS */
> +	ltt_relay_advance_iter(iter);
> +}
> +
> +static void *txt_next(struct seq_file *m, void *v, loff_t *ppos)
> +{
> +	struct ltt_relay_iter *iter = m->private;
> +
> +	BUG_ON(iter->pos != *ppos || v != iter);
> +
> +	ltt_relay_advance_iter(iter);
> +	(*ppos)++;
> +
> +	return ltt_relay_iter_eof(iter) ? NULL : iter;
> +}
> +
> +static void ltt_relay_reset_iter(struct ltt_relay_iter *iter)
> +{
> +	int i;
> +
> +	for_each_possible_cpu(i) {
> +		if (iter->iter_cpu[i].buf)
> +			ltt_relay_reset_cpu_iter(&iter->iter_cpu[i]);
> +	}
> +
> +	ltt_relay_start_iter(iter);
> +}
> +
> +static void *txt_start(struct seq_file *m, loff_t *ppos)
> +{
> +	loff_t pos = *ppos;
> +	struct ltt_relay_iter *iter = m->private;
> +
> +
> +	if (pos != iter->pos) {
> +		/* restart when read backward */
> +		if (pos < iter->pos)
> +			iter->pos = -1;
> +
> +		if (iter->pos < 0)
> +			ltt_relay_reset_iter(iter);
> +
> +		while (!ltt_relay_iter_eof(iter) && iter->pos < pos)
> +			ltt_relay_advance_iter(iter);
> +	}
> +
> +	return ltt_relay_iter_eof(iter) ? NULL : iter;
> +}
> +
> +static void txt_stop(struct seq_file *m, void *v)
> +{
> +	/* do nothing */
> +}
> +
> +static int seq_serialize(struct seq_file *m, struct rchan_buf *buf,
> +		size_t buf_offset, const char *fmt, size_t *data_size)
> +{
> +	int len;
> +
> +	if (m->count < m->size) {
> +		len = ltt_serialize_printf(buf, buf_offset, data_size,
> +				m->buf + m->count, m->size - m->count, fmt);
> +		if (m->count + len < m->size) {
> +			m->count += len;
> +			return 0;
> +		}
> +	}
> +
> +	m->count = m->size;
> +	return -1;
> +}
> +
> +static int txt_show(struct seq_file *m, void *v)
> +{
> +	struct ltt_relay_iter *iter = v;
> +	struct ltt_relay_cpu_iter *citer = &iter->iter_cpu[iter->cpu];
> +	const char *name;
> +	const char *fmt;
> +	unsigned long long tsc = citer->tsc;
> +	size_t *data_size;
> +	data_size = citer->data_size == INT_MAX ? &citer->data_size : NULL;
> +
> +	name = marker_get_name_form_id(citer->chID, citer->eID);
> +	fmt = marker_get_fmt_form_id(citer->chID, citer->eID);
> +
> +	if (!name || !fmt)
> +		return 0;
> +
> +	seq_printf(m, "%16.16s: %2d %20.20llu ", name, iter->cpu, tsc);
> +	seq_serialize(m, citer->buf, citer->offset, fmt, data_size);
> +	seq_puts(m, "\n");
> +
> +	return 0;
> +}
> +
> +static struct seq_operations txt_seq_ops = {
> +	.start		= txt_start,
> +	.next		= txt_next,
> +	.stop		= txt_stop,
> +	.show		= txt_show,
> +};
> +
> +#define get_value(ltt_chan, value, cpu)			\
> +ltt_chan->buf_access_ops->get_##value(ltt_chan->buf, cpu)
> +
> +static int ltt_relay_iter_get_channel(struct ltt_relay_iter *iter,
> +		struct ltt_channel_struct *ltt_channel)
> +{
> +	int i;
> +	struct rchan *chan = ltt_channel->trans_channel_data;
> +	u16 chID = ltt_channels_get_index_from_name(ltt_channel->channel_name);
> +
> +	/* we don't need lock relay_channels_mutex */

... because the channel is guaranteed to stay valid until all references
to the traces are released, and we hold one... ? (the comment should say
so)

> +	for_each_possible_cpu(i) {
> +		iter->iter_cpu[i].buf = chan->buf[i];
> +		if (iter->iter_cpu[i].buf)
> +			ltt_relay_get_chan_buf(chan->buf[i]);
> +	}
> +
> +	/* stop event record for all chan_buf for this channel */
> +	atomic_inc(&ltt_channel->channel_disabled);
> +	synchronize_sched();
> +
> +	for_each_possible_cpu(i) {
> +		if (!iter->iter_cpu[i].buf)
> +			continue;
> +
> +		iter->iter_cpu[i].start_offset = SUBBUF_TRUNC(
> +				get_value(ltt_channel, consumed, i),
> +				iter->iter_cpu[i].buf->chan);
> +		iter->iter_cpu[i].eof_offset =
> +				get_value(ltt_channel, offset, i);
> +		iter->iter_cpu[i].chID = chID;
> +	}
> +
> +	return 0;
> +}
> +
> +static int ltt_relay_iter_put_channel(struct ltt_relay_iter *iter)
> +{
> +	int i;
> +	for_each_possible_cpu(i) {
> +		if (iter->iter_cpu[i].buf)
> +			ltt_relay_put_chan_buf(iter->iter_cpu[i].buf);
> +	}
> +
> +	atomic_dec(&iter->ltt_channel->channel_disabled);
> +	return 0;
> +}
> +
> +static int ltt_relay_txt_open(struct inode *inode, struct file *file)
> +{
> +	int ret;
> +	struct ltt_channel_struct *ltt_channel = inode->i_private;
> +	struct ltt_relay_iter *iter = kmalloc(sizeof(*iter), GFP_KERNEL);
> +	if (!iter)
> +		return -ENOMEM;
> +
> +	iter->ltt_channel = ltt_channel;
> +	iter->pos = -1;
> +	ltt_relay_iter_get_channel(iter, ltt_channel);
> +
> +	ret = seq_open(file, &txt_seq_ops);
> +	if (ret) {
> +		ltt_relay_iter_put_channel(iter);
> +		kfree(iter);
> +		return ret;
> +	}
> +	((struct seq_file *)file->private_data)->private = iter;
> +
> +	return 0;
> +};
> +
> +static int ltt_relay_txt_release(struct inode *inode, struct file *file)
> +{
> +	struct seq_file *seq = file->private_data;
> +	struct ltt_relay_iter *iter = seq->private;
> +	ltt_relay_iter_put_channel(iter);
> +	kfree(iter);
> +	return 0;
> +}
> +
> +static struct file_operations ltt_txt_fops =
> +{
> +	.read = seq_read,
> +	.open = ltt_relay_txt_open,
> +	.release = ltt_relay_txt_release,
> +	.owner = THIS_MODULE,
> +};
> +
> +struct dentry *ltt_txt_create(struct ltt_trace_struct *trace,
> +		struct ltt_channel_struct *ltt_channel)
> +{
> +	struct dentry *entry = debugfs_create_file(ltt_channel->channel_name,
> +			S_IRUSR | S_IRGRP, trace->dentry.txt_root, ltt_channel,
> +			&ltt_txt_fops);
> +

We should handle the error case here, and make sure that
ltt_txt_remove() won't forget to put the channel on error.

Mathieu

> +	if (entry)
> +		ltt_relay_get_chan(ltt_channel->trans_channel_data);
> +
> +	return entry;
> +}
> +EXPORT_SYMBOL_GPL(ltt_txt_create);
> +
> +void ltt_txt_remove(struct ltt_channel_struct *ltt_channel, struct dentry *txt)
> +{
> +	if (txt) {
> +		debugfs_remove(txt);
> +
> +		ltt_relay_put_chan(ltt_channel->trans_channel_data);
> +	}
> +}
> +EXPORT_SYMBOL_GPL(ltt_txt_remove);
> +
> +struct dentry *ltt_txt_dir_dentry;
> +EXPORT_SYMBOL_GPL(ltt_txt_dir_dentry);
> +
> +static __init int ltt_ascii_init(void)
> +{
> +	ltt_lock_traces();
> +	ltt_txt_dir_dentry = debugfs_create_dir("text", get_ltt_root());
> +	ltt_unlock_traces();
> +
> +	return ltt_txt_dir_dentry ? 0 : -EFAULT;
> +}
> +
> +static __exit void ltt_ascii_exit(void)
> +{
> +	debugfs_remove(ltt_txt_dir_dentry);
> +}
> +
> +module_init(ltt_ascii_init);
> +module_exit(ltt_ascii_exit);
> +
> +MODULE_LICENSE("GPL");
> +MODULE_AUTHOR("Lai Jiangshan at FNST");
> +MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Text Output");
> 
> 
> 
> 
> _______________________________________________
> ltt-dev mailing list
> ltt-dev at lists.casi.polymtl.ca
> http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
> 

-- 
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68




More information about the lttng-dev mailing list