[ltt-dev] [PATCH 11/13] implement ltt-ascii
Mathieu Desnoyers
compudj at krystal.dyndns.org
Thu Jan 15 15:02:14 EST 2009
* Lai Jiangshan (laijs at cn.fujitsu.com) wrote:
>
> implement text output module for lttng.
>
> Signed-off-by: Lai Jiangshan <laijs at cn.fujitsu.com>
> ---
> b/ltt/Makefile | 1
> b/ltt/ltt-ascii.c | 442 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
> ltt/Kconfig | 8
> 3 files changed, 451 insertions(+)
> diff --git a/ltt/Kconfig b/ltt/Kconfig
> index 36548b5..91f2f05 100644
> --- a/ltt/Kconfig
> +++ b/ltt/Kconfig
> @@ -65,6 +65,14 @@
> config LTT_TEXT_OUTPUT
> def_bool n
>
> +config LTT_ASCII
> + tristate "Linux Trace Toolkit Text Output"
> + depends on LTT_SERIALIZE
> + default y
> + select LTT_TEXT_OUTPUT
> + help
> + Support text output
> +
> config LTT_RELAY_CHECK_RANDOM_ACCESS
> bool "Debug check for random access in ltt relay buffers"
> depends on LTT_RELAY
> diff --git a/ltt/Makefile b/ltt/Makefile
> index acc8cee..5fc4e4c 100644
> --- a/ltt/Makefile
> +++ b/ltt/Makefile
> @@ -8,6 +8,7 @@ obj-$(CONFIG_LTT_TRACER) += ltt-tracer.o
> obj-$(CONFIG_LTT_USERSPACE_EVENT) += ltt-userspace-event.o
> obj-$(CONFIG_LTT_RELAY) += ltt-relay.o
> obj-$(CONFIG_LTT_RELAY_LOCKED) += ltt-relay-locked.o
> +obj-$(CONFIG_LTT_ASCII) += ltt-ascii.o
> obj-$(CONFIG_LTT_RELAY_ALLOC) += ltt-relay-alloc.o
> obj-$(CONFIG_LTT_NETLINK_CONTROL) += ltt-control.o
> obj-$(CONFIG_LTT_TRACE_CONTROL) += ltt-trace-control.o
> diff --git a/ltt/ltt-ascii.c b/ltt/ltt-ascii.c
> index 0000000..28d9fc5
> --- /dev/null
> +++ b/ltt/ltt-ascii.c
> @@ -0,0 +1,442 @@
> +/*
> + * ltt/ltt-ascii.c
> + *
> + * (C) Copyright 2008 Lai Jiangshan (laijs at cn.fujitsu.com)
> + *
> + * Tracing text output.
> + *
> + * Author:
> + * Lai Jiangshan (laijs at cn.fujitsu.com)
> + */
> +
> +#include <linux/ltt-tracer.h>
> +#include <linux/ltt-relay.h>
> +#include <linux/seq_file.h>
> +#include <linux/debugfs.h>
> +#include <linux/module.h>
> +#include <linux/string.h>
> +#include <linux/slab.h>
> +#include <linux/cpu.h>
> +#include <linux/fs.h>
> +
> +struct ltt_relay_cpu_iter {
> + /* cpu buffer information */
> + struct rchan_buf *buf;
> + size_t start_offset;
> + size_t eof_offset;
> +
> + /* current event information */
> + struct ltt_subbuffer_header *header;
> + size_t offset;
> + u64 tsc;
> + u32 data_size;
> + u16 chID; /* channel ID, const */
> + u16 eID;
> +};
> +
> +static u64 calculate_tsc(u64 pre_tsc, u64 read_tsc, unsigned int rflags)
> +{
> + u64 new_tsc = read_tsc;
> +
> + if (rflags != LTT_RFLAG_ID_SIZE_TSC) {
> + BUG_ON(read_tsc >> LTT_TSC_BITS);
> +
> + new_tsc = (pre_tsc & ~LTT_TSC_MASK) + read_tsc;
> + if (read_tsc < (pre_tsc & LTT_TSC_MASK))
> + new_tsc += 1 << LTT_TSC_BITS;
1ULL ? (just to play it safe if LTT_TSC_BITS can ever reach 32)
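Otherwise the shift is done on a 32-bit int constant and overflows
before the 64-bit addition, i.e. something like:

	new_tsc += 1ULL << LTT_TSC_BITS;	/* shift done in 64-bit */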
> + }
> +
> + return new_tsc;
> +}
> +
> +static inline int calculate_offset(size_t offset, u16 chid, u16 eid)
> +{
> + const char *fmt = "";
> +#ifdef CONFIG_LTT_ALIGNMENT
> + fmt = marker_get_fmt_form_id(chid, eid);
> + BUG_ON(!fmt);
> +#endif
> + return offset + ltt_fmt_largest_align(offset, fmt);
> +}
Hrm, basically, this can be rewritten as:
static inline int calculate_offset(size_t offset, u16 chid, u16 eid)
{
const char *fmt;
if (!ltt_get_alignment())
return offset;
fmt = marker_get_fmt_form_id(chid, eid);
BUG_ON(!fmt);
return offset + ltt_fmt_largest_align(offset, fmt);
}
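(This assumes we have a runtime ltt_get_alignment() helper to replace
the compile-time CONFIG_LTT_ALIGNMENT test.)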
> +
> +static void update_new_event(struct ltt_relay_cpu_iter *citer)
> +{
> + u64 read_tsc;
> + unsigned int rflags;
> +
> + citer->offset = ltt_read_event_header(citer->buf, citer->offset,
> + &read_tsc, &citer->data_size, &citer->eID, &rflags);
> +
> + citer->offset = calculate_offset(citer->offset, citer->chID,
> + citer->eID);
> + citer->tsc = calculate_tsc(citer->tsc, read_tsc, rflags);
> +}
> +
> +static void update_event_size(struct ltt_relay_cpu_iter *citer)
> +{
> + char output[1];
> + const char *fmt;
> +
> + if (citer->data_size != INT_MAX)
> + return;
> +
OK, so you don't benefit from the dual calculation of the event size
from both the writer and the reader... it's a neat feature for spotting
bugs when we have it, although it has a small performance impact on the
reader side. I'd recommend still calculating the event payload size as
seen from the reader even when it's available in the data_size field.
Then we can add a WARN_ON if the two sizes differ (the original
data_size written by the writer vs. the one calculated by
ltt_serialize_printf()).
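Roughly something like this (just a sketch, with reader_size as a
hypothetical local):

	size_t reader_size;
	char output[1];
	const char *fmt;

	fmt = marker_get_fmt_form_id(citer->chID, citer->eID);
	BUG_ON(!fmt);
	ltt_serialize_printf(citer->buf, citer->offset, &reader_size,
			output, 0, fmt);
	if (citer->data_size == INT_MAX)
		citer->data_size = reader_size;
	else
		WARN_ON(citer->data_size != reader_size);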
> + fmt = marker_get_fmt_form_id(citer->chID, citer->eID);
> + BUG_ON(!fmt);
> + ltt_serialize_printf(citer->buf, citer->offset, &citer->data_size,
> + output, 0, fmt);
> +}
> +
> +/* Is @value in the region [@left, @right) */
The comment should state "in a circular buffer".
> +static int is_in_region(size_t value, size_t left, size_t right, size_t len)
> +{
> + if (right < left)
> + right += len;
> + if (value < left)
> + value += len;
> + return value < right;
> +}
> +
/* Assumes buffer size (len) is a power of two */
size_t half_len = len >> 1;
size_t len_mask = len - 1;
if (((value - left) & len_mask) > half_len)
return 0;
if (((right - value) & len_mask) > half_len)
return 0;
return 1;
I'm not convinced my implementation generates faster assembly... this
should probably be tried. The good side here is that we only have two
comparisons.
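Note this variant also assumes the region never spans more than half
the buffer, and it treats @right as inclusive (value == right returns
1), unlike the [@left, @right) version above, so the boundary cases
would need checking too.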
> +static void update_cpu_iter(struct ltt_relay_cpu_iter *citer, size_t old_offset)
> +{
> + size_t len = citer->buf->chan->alloc_size;
> + size_t eof_offset = BUFFER_OFFSET(citer->eof_offset, citer->buf->chan);
> +
> + if (is_in_region(eof_offset, old_offset, citer->offset, len)) {
> + citer->header = NULL;
> + } else {
> + update_new_event(citer);
> + update_event_size(citer);
> +
> + if (is_in_region(eof_offset, old_offset,
> + citer->offset + citer->data_size, len))
> + citer->header = NULL;
> + }
> +}
> +
> +static void start_new_subbuffer(struct ltt_relay_cpu_iter *citer, size_t offset)
> +{
> + int index = SUBBUF_INDEX(offset, citer->buf->chan);
> + size_t head_offset = index * citer->buf->chan->subbuf_size;
> +
Faster with:
size_t head_offset = index << citer->buf->chan->subbuf_size_order;
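(Assuming subbuf_size == 1 << subbuf_size_order, as the _order name
suggests.)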
> + citer->header = ltt_relay_offset_address(citer->buf, head_offset);
> + citer->offset = head_offset + ltt_subbuffer_header_size();
> + citer->tsc = citer->header->cycle_count_begin;
> +}
> +
> +static void ltt_relay_advance_cpu_iter(struct ltt_relay_cpu_iter *citer)
> +{
> + size_t old_offset = citer->offset, new_offset = citer->offset;
> + size_t sub_offset = SUBBUF_OFFSET(old_offset, citer->buf->chan);
> +
> + new_offset += citer->data_size;
> +
> + /* check whether we have read all the data in this subbuffer */
> + if (sub_offset + citer->data_size + citer->header->lost_size
> + >= citer->buf->chan->subbuf_size) {
> + new_offset += citer->header->lost_size;
> + start_new_subbuffer(citer, new_offset);
> + } else {
> + citer->offset = new_offset + ltt_align(new_offset,
> + sizeof(struct ltt_event_header));
> + }
> +
> + update_cpu_iter(citer, old_offset);
> +}
> +
> +static void ltt_relay_reset_cpu_iter(struct ltt_relay_cpu_iter *citer)
> +{
> + start_new_subbuffer(citer, citer->start_offset);
> + update_cpu_iter(citer, citer->start_offset);
> +}
> +
> +static int cpu_iter_eof(struct ltt_relay_cpu_iter *citer)
> +{
> + return !citer->header;
> +}
> +
> +struct ltt_relay_iter {
> + struct ltt_relay_cpu_iter iter_cpu[NR_CPUS];
> + struct ltt_channel_struct *ltt_channel;
> + loff_t pos;
> + int cpu;
> +};
> +
> +static int ltt_relay_iter_eof(struct ltt_relay_iter *iter)
> +{
> + return iter->cpu < 0;
> +}
> +
> +static void ltt_relay_advance_iter(struct ltt_relay_iter *iter)
> +{
> + int i;
> + struct ltt_relay_cpu_iter *curr, *min = NULL;
> + iter->cpu = -1;
> +
> + /*
> + * find the event with the minimum tsc.
> + * TODO: use a min-heap for 4096 CPUs
> + */
Yes, that's going to be required. :)
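Just to sketch the idea (hypothetical: keep the non-EOF per-cpu
iterators in an array-based min-heap ordered by tsc, pop the minimum,
advance it, then sift it back down in O(log N) instead of the O(N)
scan below):

	static void citer_sift_down(struct ltt_relay_cpu_iter **heap, int size)
	{
		int pos = 0;

		for (;;) {
			int min = pos;
			int l = 2 * pos + 1, r = 2 * pos + 2;
			struct ltt_relay_cpu_iter *tmp;

			if (l < size && heap[l]->tsc < heap[min]->tsc)
				min = l;
			if (r < size && heap[r]->tsc < heap[min]->tsc)
				min = r;
			if (min == pos)
				break;
			tmp = heap[pos];
			heap[pos] = heap[min];
			heap[min] = tmp;
			pos = min;
		}
	}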
> + for_each_possible_cpu(i) {
> + curr = &iter->iter_cpu[i];
> +
> + if (!curr->buf)
> + continue;
> +
> + if (cpu_iter_eof(curr))
> + continue;
Hrm, how do we deal with the fact that some CPUs or channels can wait
too long before providing data? This is the purpose of the planned
periodical buffer switch. I suspect that here you simply read the
"oldest available data", thus discarding the data from subbuffers not
completed yet, which could lead to timestamps going backward. Or am I
missing something?
Ideally, we should just block and wait for data when any given buffer
has partially complete subbuffers.
Oh, maybe this only works on stopped traces then? This periodical
subbuffer flush trick is only needed if we plan to read from
non-overwrite traces (normal traces) while tracing is active.
> +
> + if (!min || curr->tsc < min->tsc) {
> + min = curr;
> + iter->cpu = i;
> + }
> + }
> +
> + /* update cpu_iter for next ltt_relay_advance_iter() */
> + if (min)
> + ltt_relay_advance_cpu_iter(min);
> +
> + /* increase pos */
> + iter->pos++;
> +}
> +
> +static void ltt_relay_start_iter(struct ltt_relay_iter *iter)
> +{
> + BUG_ON(iter->pos != -1);
> +
> + /* TODO: use a min-heap for 4096 CPUs */
> + ltt_relay_advance_iter(iter);
> +}
> +
> +static void *txt_next(struct seq_file *m, void *v, loff_t *ppos)
> +{
> + struct ltt_relay_iter *iter = m->private;
> +
> + BUG_ON(iter->pos != *ppos || v != iter);
> +
> + ltt_relay_advance_iter(iter);
> + (*ppos)++;
> +
> + return ltt_relay_iter_eof(iter) ? NULL : iter;
> +}
> +
> +static void ltt_relay_reset_iter(struct ltt_relay_iter *iter)
> +{
> + int i;
> +
> + for_each_possible_cpu(i) {
> + if (iter->iter_cpu[i].buf)
> + ltt_relay_reset_cpu_iter(&iter->iter_cpu[i]);
> + }
> +
> + ltt_relay_start_iter(iter);
> +}
> +
> +static void *txt_start(struct seq_file *m, loff_t *ppos)
> +{
> + loff_t pos = *ppos;
> + struct ltt_relay_iter *iter = m->private;
> +
> + if (pos != iter->pos) {
> + /* restart when read backward */
> + if (pos < iter->pos)
> + iter->pos = -1;
> +
> + if (iter->pos < 0)
> + ltt_relay_reset_iter(iter);
> +
> + while (!ltt_relay_iter_eof(iter) && iter->pos < pos)
> + ltt_relay_advance_iter(iter);
> + }
> +
> + return ltt_relay_iter_eof(iter) ? NULL : iter;
> +}
> +
> +static void txt_stop(struct seq_file *m, void *v)
> +{
> + /* do nothing */
> +}
> +
> +static int seq_serialize(struct seq_file *m, struct rchan_buf *buf,
> + size_t buf_offset, const char *fmt, size_t *data_size)
> +{
> + int len;
> +
> + if (m->count < m->size) {
> + len = ltt_serialize_printf(buf, buf_offset, data_size,
> + m->buf + m->count, m->size - m->count, fmt);
> + if (m->count + len < m->size) {
> + m->count += len;
> + return 0;
> + }
> + }
> +
> + m->count = m->size;
> + return -1;
> +}
> +
> +static int txt_show(struct seq_file *m, void *v)
> +{
> + struct ltt_relay_iter *iter = v;
> + struct ltt_relay_cpu_iter *citer = &iter->iter_cpu[iter->cpu];
> + const char *name;
> + const char *fmt;
> + unsigned long long tsc = citer->tsc;
> + size_t *data_size;
> + data_size = citer->data_size == INT_MAX ? &citer->data_size : NULL;
> +
> + name = marker_get_name_form_id(citer->chID, citer->eID);
> + fmt = marker_get_fmt_form_id(citer->chID, citer->eID);
> +
> + if (!name || !fmt)
> + return 0;
> +
> + seq_printf(m, "%16.16s: %2d %20.20llu ", name, iter->cpu, tsc);
> + seq_serialize(m, citer->buf, citer->offset, fmt, data_size);
> + seq_puts(m, "\n");
> +
> + return 0;
> +}
> +
> +static struct seq_operations txt_seq_ops = {
> + .start = txt_start,
> + .next = txt_next,
> + .stop = txt_stop,
> + .show = txt_show,
> +};
> +
> +#define get_value(ltt_chan, value, cpu) \
> +ltt_chan->buf_access_ops->get_##value(ltt_chan->buf, cpu)
> +
> +static int ltt_relay_iter_get_channel(struct ltt_relay_iter *iter,
> + struct ltt_channel_struct *ltt_channel)
> +{
> + int i;
> + struct rchan *chan = ltt_channel->trans_channel_data;
> + u16 chID = ltt_channels_get_index_from_name(ltt_channel->channel_name);
> +
> + /* we don't need lock relay_channels_mutex */
.. because the channel is ensured to stay valid until all references to
the traces are released, and we hold one...? (this should be stated in
the comment)
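Something like (assuming that is indeed the invariant):

	/*
	 * No need to take relay_channels_mutex here: we hold a
	 * reference on the trace, which keeps the channel alive for
	 * the whole iteration.
	 */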
> + for_each_possible_cpu(i) {
> + iter->iter_cpu[i].buf = chan->buf[i];
> + if (iter->iter_cpu[i].buf)
> + ltt_relay_get_chan_buf(chan->buf[i]);
> + }
> +
> + /* stop event record for all chan_buf for this channel */
> + atomic_inc(&ltt_channel->channel_disabled);
> + synchronize_sched();
> +
> + for_each_possible_cpu(i) {
> + if (!iter->iter_cpu[i].buf)
> + continue;
> +
> + iter->iter_cpu[i].start_offset = SUBBUF_TRUNC(
> + get_value(ltt_channel, consumed, i),
> + iter->iter_cpu[i].buf->chan);
> + iter->iter_cpu[i].eof_offset =
> + get_value(ltt_channel, offset, i);
> + iter->iter_cpu[i].chID = chID;
> + }
> +
> + return 0;
> +}
> +
> +static int ltt_relay_iter_put_channel(struct ltt_relay_iter *iter)
> +{
> + int i;
> + for_each_possible_cpu(i) {
> + if (iter->iter_cpu[i].buf)
> + ltt_relay_put_chan_buf(iter->iter_cpu[i].buf);
> + }
> +
> + atomic_dec(&iter->ltt_channel->channel_disabled);
> + return 0;
> +}
> +
> +static int ltt_relay_txt_open(struct inode *inode, struct file *file)
> +{
> + int ret;
> + struct ltt_channel_struct *ltt_channel = inode->i_private;
> + struct ltt_relay_iter *iter = kmalloc(sizeof(*iter), GFP_KERNEL);
> + if (!iter)
> + return -ENOMEM;
> +
> + iter->ltt_channel = ltt_channel;
> + iter->pos = -1;
> + ltt_relay_iter_get_channel(iter, ltt_channel);
> +
> + ret = seq_open(file, &txt_seq_ops);
> + if (ret) {
> + ltt_relay_iter_put_channel(iter);
> + kfree(iter);
> + return ret;
> + }
> + ((struct seq_file *)file->private_data)->private = iter;
> +
> + return 0;
> +};
> +
> +static int ltt_relay_txt_release(struct inode *inode, struct file *file)
> +{
> + struct seq_file *seq = file->private_data;
> + struct ltt_relay_iter *iter = seq->private;
> + ltt_relay_iter_put_channel(iter);
> + kfree(iter);
> + return 0;
> +}
> +
> +static struct file_operations ltt_txt_fops =
> +{
> + .read = seq_read,
> + .open = ltt_relay_txt_open,
> + .release = ltt_relay_txt_release,
> + .owner = THIS_MODULE,
> +};
> +
> +struct dentry *ltt_txt_create(struct ltt_trace_struct *trace,
> + struct ltt_channel_struct *ltt_channel)
> +{
> + struct dentry *entry = debugfs_create_file(ltt_channel->channel_name,
> + S_IRUSR | S_IRGRP, trace->dentry.txt_root, ltt_channel,
> + &ltt_txt_fops);
> +
We should handle the error case here, and make sure ltt_txt_remove
won't forget to put the channel on error.
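Something along these lines (a sketch: on a NULL dentry we skip the
get, so the put in ltt_txt_remove stays balanced):

	entry = debugfs_create_file(ltt_channel->channel_name,
			S_IRUSR | S_IRGRP, trace->dentry.txt_root,
			ltt_channel, &ltt_txt_fops);
	if (!entry) {
		printk(KERN_ERR "ltt-ascii: failed to create %s\n",
				ltt_channel->channel_name);
		return NULL;
	}
	ltt_relay_get_chan(ltt_channel->trans_channel_data);
	return entry;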
Mathieu
> + if (entry)
> + ltt_relay_get_chan(ltt_channel->trans_channel_data);
> +
> + return entry;
> +}
> +EXPORT_SYMBOL_GPL(ltt_txt_create);
> +
> +void ltt_txt_remove(struct ltt_channel_struct *ltt_channel, struct dentry *txt)
> +{
> + if (txt) {
> + debugfs_remove(txt);
> +
> + ltt_relay_put_chan(ltt_channel->trans_channel_data);
> + }
> +}
> +EXPORT_SYMBOL_GPL(ltt_txt_remove);
> +
> +struct dentry *ltt_txt_dir_dentry;
> +EXPORT_SYMBOL_GPL(ltt_txt_dir_dentry);
> +
> +static __init int ltt_ascii_init(void)
> +{
> + ltt_lock_traces();
> + ltt_txt_dir_dentry = debugfs_create_dir("text", get_ltt_root());
> + ltt_unlock_traces();
> +
> + return ltt_txt_dir_dentry ? 0 : -EFAULT;
> +}
> +
> +static __exit void ltt_ascii_exit(void)
> +{
> + debugfs_remove(ltt_txt_dir_dentry);
> +}
> +
> +module_init(ltt_ascii_init);
> +module_exit(ltt_ascii_exit);
> +
> +MODULE_LICENSE("GPL");
> +MODULE_AUTHOR("Lai Jiangshan at FNST");
> +MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Text Output");
--
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68