[ltt-dev] [RFC patch 05/41] LTTng relay buffer allocation, read, write
Mathieu Desnoyers
mathieu.desnoyers at polymtl.ca
Fri Mar 6 15:01:51 EST 2009
* Steven Rostedt (rostedt at goodmis.org) wrote:
>
> On Thu, 5 Mar 2009, Mathieu Desnoyers wrote:
>
> > As I told Martin, I was thinking about taking an axe and moving stuff around in
> > relay. Which I just did.
> >
> > This patch reimplements relay with a linked list of pages. Provides read/write
> > wrappers which should be used to read or write from the buffers. It's the core
> > of a layered approach to the design requirements expressed by Martin and
> > discussed earlier.
> >
> > It does not provide _any_ sort of locking on buffer data. Locking should be done
> > by the caller. Given that we might think of very lightweight locking schemes, it
> > makes sense to me that the underlying buffering infrastructure supports event
> > records larger than 1 page.
>
> You bring up two points.
>
> 1) lockless
>
> 2) larger than 1 page of data
>
> Soon the ring buffer will be lockless on the write side. On the read side
> we have locking. This could be changed to allow for a new API with
> specific requirements that does not need reader side locking. If the ring
> buffer is strictly producer/consumer without overwrite, then it would make
> sense to have a lockless system on both reader and writer.
>
> I have even expressed interest in implementing this. But right now my
> focus has been on getting other aspects working. Ftrace runs in overwrite
> mode so it must have the locking.
>
Note that this "buffer allocation" layer is not lockless in the sense of
dealing with concurrent writers in a lockless manner. It simply _does not
provide_ any protection for either the write side or the read side; the
layer above it does. In this patchset, that layer is called
"ltt-relay-locked" for the irq off/spinlock version of the concurrency
management. I did not post the lockless version here; it comes further
down in my patchset and will probably require a bit more discussion.
I really like having the allocation and "locking management" layers
separated, because we can then easily mix and match them.
For allocation:
- Page-based backend
- Static array-based backend
- Video memory-based backend (useful for crash trace extraction, because
this storage survives a hot reboot)
For locking:
- lockless
- irq off/spinlock
>
> >
> > A cache saving 4 pointers is used to keep track of current page used for the
> > buffer for write, current page read and two contiguous subbuffer header pointer
> > lookup. The offset of each page within the buffer is saved in a structure
> > containing the offset, linked list and page frame pointer to permit cache lookup
> > without extra locking.
>
> I'm also all for optimizations. Right now the focus has been on making
> sure the ring buffer can do all that is requested of it. I wanted it to be
> usable for all users, not focused on a select few. This actually
> includes ftrace. That is, I did not develop the ring buffer with
> ftrace as its only user.
>
I think we both agree that it's good to make the buffering
infrastructure usable by all.
> >
> > The offset and linked list are not placed in the page frame itself to allow
> > using the pages directly for disk I/O, network I/O or to mmap it to userspace
> > for live processing.
> >
> > Write and header address lookup tested through LTTng. This patch contains
> > self-test code which detects if a client is actually trying to use the
> > read/write/get header address API to do random buffer offset access. If such
> > behavior is detected, a warning message is issued and the random access is done
> > as requested.
>
> Hmm, if a warning message is done, it seems that the tracer should shut
> down. Either support the operation or do not support it. Do not give the
> user a "Oh, you really should not do that, but I'll let you anyway".
> Otherwise you will be pressured to make it a true feature.
>
It is _possible_ for such a scenario to occur, and it is supported; it's
just very unlikely. With the lockless tracer, if the tracing code is
interrupted for a long time and still holds a reference to a previous
page while the current write offset has moved far ahead, then it's OK to
do a backward multiple-page walk. But it's very, very unlikely. And
this debugging option helps pinpoint ill uses of the buffers in the
"likely" path very quickly. So I would never consider it a "failure" per
se. It's one of those useful tracer-debug-only options.
> >
> > TODO : Currently, no splice file operations are implemented. Should come soon.
> > The idea is to splice the buffers directly into files or to the network.
> > We have to make sure the page frame fields used are not used by disk I/O or
> > network.
> >
> > Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at polymtl.ca>
> > CC: Jens Axboe <jens.axboe at oracle.com>
> > CC: Martin Bligh <mbligh at google.com>
> > CC: Peter Zijlstra <a.p.zijlstra at chello.nl>
> > CC: Tom Zanussi <zanussi at comcast.net>
> > CC: prasad at linux.vnet.ibm.com
> > CC: Linus Torvalds <torvalds at linux-foundation.org>
> > CC: Thomas Gleixner <tglx at linutronix.de>
> > CC: Steven Rostedt <rostedt at goodmis.org>
> > CC: od at suse.com
> > CC: "Frank Ch. Eigler" <fche at redhat.com>
> > CC: Andrew Morton <akpm at linux-foundation.org>
> > CC: hch at lst.de
> > CC: David Wilder <dwilder at us.ibm.com>
> > ---
> > include/linux/ltt-relay.h | 182 +++++++++++
> > ltt/ltt-relay-alloc.c | 705 ++++++++++++++++++++++++++++++++++++++++++++++
> > 2 files changed, 887 insertions(+)
> >
> > Index: linux-2.6-lttng/ltt/ltt-relay-alloc.c
> > ===================================================================
> > --- /dev/null 1970-01-01 00:00:00.000000000 +0000
> > +++ linux-2.6-lttng/ltt/ltt-relay-alloc.c 2009-03-05 15:05:56.000000000 -0500
> > @@ -0,0 +1,705 @@
> > +/*
> > + * Public API and common code for kernel->userspace relay file support.
> > + *
> > + * Copyright (C) 2002-2005 - Tom Zanussi (zanussi at us.ibm.com), IBM Corp
> > + * Copyright (C) 1999-2005 - Karim Yaghmour (karim at opersys.com)
> > + * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers at polymtl.ca)
> > + *
> > + * Moved to kernel/relay.c by Paul Mundt, 2006.
> > + * November 2006 - CPU hotplug support by Mathieu Desnoyers
> > + * (mathieu.desnoyers at polymtl.ca)
> > + *
> > + * This file is released under the GPL.
> > + */
> > +#include <linux/errno.h>
> > +#include <linux/stddef.h>
> > +#include <linux/slab.h>
> > +#include <linux/module.h>
> > +#include <linux/string.h>
> > +#include <linux/ltt-relay.h>
> > +#include <linux/vmalloc.h>
> > +#include <linux/mm.h>
> > +#include <linux/cpu.h>
> > +#include <linux/splice.h>
> > +#include <linux/bitops.h>
> > +
> > +/* list of open channels, for cpu hotplug */
> > +static DEFINE_MUTEX(relay_channels_mutex);
> > +static LIST_HEAD(relay_channels);
> > +
> > +/**
> > + * relay_alloc_buf - allocate a channel buffer
> > + * @buf: the buffer struct
> > + * @size: total size of the buffer
> > + */
> > +static int relay_alloc_buf(struct rchan_buf *buf, size_t *size)
> > +{
> > + unsigned int i, n_pages;
> > + struct buf_page *buf_page, *n;
> > +
> > + *size = PAGE_ALIGN(*size);
> > + n_pages = *size >> PAGE_SHIFT;
> > +
> > + INIT_LIST_HEAD(&buf->pages);
> > +
> > + for (i = 0; i < n_pages; i++) {
> > + buf_page = kmalloc_node(sizeof(*buf_page), GFP_KERNEL,
> > + cpu_to_node(buf->cpu));
> > + if (unlikely(!buf_page))
> > + goto depopulate;
> > + buf_page->page = alloc_pages_node(cpu_to_node(buf->cpu),
> > + GFP_KERNEL | __GFP_ZERO, 0);
>
> So these buffers allocate single pages?
>
Each buffer allocates n_pages pages. Each of these pages is allocated
with alloc_pages_node() and linked into a list.
I use a linked list of pages rather than a page pointer array to make
sure I never depend on vmalloc if the page pointer array grows too
large. vmalloc'd data can trigger page faults, which I don't want. Note
that an alternative would be to call vmalloc_sync_all() after the pages
have been allocated, but I prefer not to use vmalloc at all unless there
is a clear advantage.
Note that the tracer calls vmalloc_sync_all() for each tracer module
registered. Actually, the probe modules should also call
vmalloc_sync_all() before their callbacks get registered. This would
ensure the module code and data never trigger a page fault.
I actually wonder why we don't add a vmalloc_sync_all() call in
module.c; it is a slow path anyway...
> > + if (unlikely(!buf_page->page)) {
> > + kfree(buf_page);
> > + goto depopulate;
> > + }
> > + list_add_tail(&buf_page->list, &buf->pages);
> > + buf_page->offset = (size_t)i << PAGE_SHIFT;
> > + set_page_private(buf_page->page, (unsigned long)buf_page);
> > + if (i == 0) {
> > + buf->wpage = buf_page;
> > + buf->hpage[0] = buf_page;
> > + buf->hpage[1] = buf_page;
> > + buf->rpage = buf_page;
> > + }
> > + }
> > + buf->page_count = n_pages;
> > + return 0;
> > +
> > +depopulate:
> > + list_for_each_entry_safe(buf_page, n, &buf->pages, list) {
> > + list_del_init(&buf_page->list);
> > + __free_page(buf_page->page);
> > + kfree(buf_page);
> > + }
> > + return -ENOMEM;
> > +}
> > +
> > +/**
> > + * relay_create_buf - allocate and initialize a channel buffer
> > + * @chan: the relay channel
> > + * @cpu: cpu the buffer belongs to
> > + *
> > + * Returns channel buffer if successful, %NULL otherwise.
> > + */
> > +static struct rchan_buf *relay_create_buf(struct rchan *chan, int cpu)
> > +{
> > + int ret;
> > + struct rchan_buf *buf = kzalloc(sizeof(struct rchan_buf), GFP_KERNEL);
> > + if (!buf)
> > + return NULL;
> > +
> > + buf->cpu = cpu;
> > + ret = relay_alloc_buf(buf, &chan->alloc_size);
> > + if (ret)
> > + goto free_buf;
> > +
> > + buf->chan = chan;
> > + kref_get(&buf->chan->kref);
> > + return buf;
> > +
> > +free_buf:
> > + kfree(buf);
> > + return NULL;
> > +}
> > +
> > +/**
> > + * relay_destroy_channel - free the channel struct
> > + * @kref: target kernel reference that contains the relay channel
> > + *
> > + * Should only be called from kref_put().
> > + */
> > +static void relay_destroy_channel(struct kref *kref)
> > +{
> > + struct rchan *chan = container_of(kref, struct rchan, kref);
> > + kfree(chan);
> > +}
> > +
> > +void ltt_relay_get_chan(struct rchan *chan)
> > +{
> > + kref_get(&chan->kref);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_get_chan);
> > +
> > +void ltt_relay_put_chan(struct rchan *chan)
> > +{
> > + kref_put(&chan->kref, relay_destroy_channel);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_put_chan);
> > +
> > +/**
> > + * relay_destroy_buf - destroy an rchan_buf struct and associated buffer
> > + * @buf: the buffer struct
> > + */
> > +static void relay_destroy_buf(struct rchan_buf *buf)
> > +{
> > + struct rchan *chan = buf->chan;
> > + struct buf_page *buf_page, *n;
> > +
> > + list_for_each_entry_safe(buf_page, n, &buf->pages, list) {
> > + list_del_init(&buf_page->list);
> > + __free_page(buf_page->page);
> > + kfree(buf_page);
> > + }
> > + chan->buf[buf->cpu] = NULL;
> > + kfree(buf);
> > + kref_put(&chan->kref, relay_destroy_channel);
> > +}
> > +
> > +/**
> > + * relay_remove_buf - remove a channel buffer
> > + * @kref: target kernel reference that contains the relay buffer
> > + *
> > + * Removes the file from the filesystem, which also frees the
> > + * rchan_buf_struct and the channel buffer. Should only be called from
> > + * kref_put().
> > + */
> > +static void relay_remove_buf(struct kref *kref)
> > +{
> > + struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref);
> > + buf->chan->cb->remove_buf_file(buf->dentry);
> > + relay_destroy_buf(buf);
> > +}
> > +
> > +void ltt_relay_get_chan_buf(struct rchan_buf *buf)
> > +{
> > + kref_get(&buf->kref);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_get_chan_buf);
> > +
> > +void ltt_relay_put_chan_buf(struct rchan_buf *buf)
> > +{
> > + kref_put(&buf->kref, relay_remove_buf);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_put_chan_buf);
> > +
> > +/*
> > + * High-level relay kernel API and associated functions.
> > + */
> > +
> > +/*
> > + * rchan_callback implementations defining default channel behavior. Used
> > + * in place of corresponding NULL values in client callback struct.
> > + */
> > +
> > +/*
> > + * create_buf_file_create() default callback. Does nothing.
> > + */
> > +static struct dentry *create_buf_file_default_callback(const char *filename,
> > + struct dentry *parent,
> > + int mode,
> > + struct rchan_buf *buf)
> > +{
> > + return NULL;
> > +}
> > +
> > +/*
> > + * remove_buf_file() default callback. Does nothing.
> > + */
> > +static int remove_buf_file_default_callback(struct dentry *dentry)
> > +{
> > + return -EINVAL;
> > +}
> > +
> > +/* relay channel default callbacks */
> > +static struct rchan_callbacks default_channel_callbacks = {
> > + .create_buf_file = create_buf_file_default_callback,
> > + .remove_buf_file = remove_buf_file_default_callback,
> > +};
> > +
> > +/**
> > + * __relay_reset - reset a channel buffer
> > + * @buf: the channel buffer
> > + * @init: 1 if this is a first-time initialization
> > + *
> > + * See relay_reset() for description of effect.
> > + */
> > +static void __relay_reset(struct rchan_buf *buf, unsigned int init)
> > +{
> > + if (init)
> > + kref_init(&buf->kref);
> > +}
> > +
> > +/*
> > + * relay_open_buf - create a new relay channel buffer
> > + *
> > + * used by relay_open() and CPU hotplug.
> > + */
> > +static struct rchan_buf *relay_open_buf(struct rchan *chan, unsigned int cpu)
> > +{
> > + struct rchan_buf *buf = NULL;
> > + struct dentry *dentry;
> > + char *tmpname;
> > +
> > + tmpname = kzalloc(NAME_MAX + 1, GFP_KERNEL);
> > + if (!tmpname)
> > + goto end;
> > + snprintf(tmpname, NAME_MAX, "%s%d", chan->base_filename, cpu);
> > +
> > + buf = relay_create_buf(chan, cpu);
> > + if (!buf)
> > + goto free_name;
> > +
> > + __relay_reset(buf, 1);
> > +
> > + /* Create file in fs */
> > + dentry = chan->cb->create_buf_file(tmpname, chan->parent, S_IRUSR,
> > + buf);
> > + if (!dentry)
> > + goto free_buf;
> > +
> > + buf->dentry = dentry;
> > +
> > + goto free_name;
> > +
> > +free_buf:
> > + relay_destroy_buf(buf);
> > + buf = NULL;
> > +free_name:
> > + kfree(tmpname);
> > +end:
> > + return buf;
> > +}
> > +
> > +/**
> > + * relay_close_buf - close a channel buffer
> > + * @buf: channel buffer
> > + *
> > + * Restores the default callbacks.
> > + * The channel buffer and channel buffer data structure are then freed
> > + * automatically when the last reference is given up.
> > + */
> > +static void relay_close_buf(struct rchan_buf *buf)
> > +{
> > + kref_put(&buf->kref, relay_remove_buf);
> > +}
> > +
> > +static void setup_callbacks(struct rchan *chan,
> > + struct rchan_callbacks *cb)
> > +{
> > + if (!cb) {
> > + chan->cb = &default_channel_callbacks;
> > + return;
> > + }
> > +
> > + if (!cb->create_buf_file)
> > + cb->create_buf_file = create_buf_file_default_callback;
> > + if (!cb->remove_buf_file)
> > + cb->remove_buf_file = remove_buf_file_default_callback;
> > + chan->cb = cb;
> > +}
> > +
> > +/**
> > + * relay_hotcpu_callback - CPU hotplug callback
> > + * @nb: notifier block
> > + * @action: hotplug action to take
> > + * @hcpu: CPU number
> > + *
> > + * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
> > + */
> > +static int __cpuinit relay_hotcpu_callback(struct notifier_block *nb,
> > + unsigned long action,
> > + void *hcpu)
> > +{
> > + unsigned int hotcpu = (unsigned long)hcpu;
> > + struct rchan *chan;
> > +
> > + switch (action) {
> > + case CPU_UP_PREPARE:
> > + case CPU_UP_PREPARE_FROZEN:
> > + mutex_lock(&relay_channels_mutex);
> > + list_for_each_entry(chan, &relay_channels, list) {
> > + if (chan->buf[hotcpu])
> > + continue;
> > + chan->buf[hotcpu] = relay_open_buf(chan, hotcpu);
> > + if (!chan->buf[hotcpu]) {
> > + printk(KERN_ERR
> > + "relay_hotcpu_callback: cpu %d buffer "
> > + "creation failed\n", hotcpu);
> > + mutex_unlock(&relay_channels_mutex);
> > + return NOTIFY_BAD;
> > + }
> > + }
> > + mutex_unlock(&relay_channels_mutex);
> > + break;
> > + case CPU_DEAD:
> > + case CPU_DEAD_FROZEN:
> > + /* No need to flush the cpu : will be flushed upon
> > + * final relay_flush() call. */
> > + break;
> > + }
> > + return NOTIFY_OK;
> > +}
> > +
> > +/**
> > + * ltt_relay_open - create a new relay channel
> > + * @base_filename: base name of files to create
> > + * @parent: dentry of parent directory, %NULL for root directory
> > + * @subbuf_size: size of sub-buffers
> > + * @n_subbufs: number of sub-buffers
> > + * @cb: client callback functions
> > + * @private_data: user-defined data
> > + *
> > + * Returns channel pointer if successful, %NULL otherwise.
> > + *
> > + * Creates a channel buffer for each cpu using the sizes and
> > + * attributes specified. The created channel buffer files
> > + * will be named base_filename0...base_filenameN-1. File
> > + * permissions will be %S_IRUSR.
> > + */
> > +struct rchan *ltt_relay_open(const char *base_filename,
> > + struct dentry *parent,
> > + size_t subbuf_size,
> > + size_t n_subbufs,
> > + struct rchan_callbacks *cb,
> > + void *private_data)
> > +{
> > + unsigned int i;
> > + struct rchan *chan;
> > + if (!base_filename)
> > + return NULL;
> > +
> > + if (!(subbuf_size && n_subbufs))
> > + return NULL;
> > +
> > + chan = kzalloc(sizeof(struct rchan), GFP_KERNEL);
> > + if (!chan)
> > + return NULL;
> > +
> > + chan->version = LTT_RELAY_CHANNEL_VERSION;
> > + chan->n_subbufs = n_subbufs;
> > + chan->subbuf_size = subbuf_size;
>
> You declare the sub buf size here, but I do not see how it gets allocated.
>
Below:
chan->alloc_size = FIX_SIZE(subbuf_size * n_subbufs);
Then:
relay_open_buf()
relay_create_buf()
relay_alloc_buf(buf, &chan->alloc_size);
And there we iterate over n_pages, which is PAGE_ALIGN(*size) >> PAGE_SHIFT:
> > +static int relay_alloc_buf(struct rchan_buf *buf, size_t *size)
> > +{
> > + unsigned int i, n_pages;
> > + struct buf_page *buf_page, *n;
> > +
> > + *size = PAGE_ALIGN(*size);
> > + n_pages = *size >> PAGE_SHIFT;
> > +
> > + INIT_LIST_HEAD(&buf->pages);
> > +
> > + for (i = 0; i < n_pages; i++) {
Mathieu
> -- Steve
>
> > + chan->subbuf_size_order = get_count_order(subbuf_size);
> > + chan->alloc_size = FIX_SIZE(subbuf_size * n_subbufs);
> > + chan->parent = parent;
> > + chan->private_data = private_data;
> > + strlcpy(chan->base_filename, base_filename, NAME_MAX);
> > + setup_callbacks(chan, cb);
> > + kref_init(&chan->kref);
> > +
> > + mutex_lock(&relay_channels_mutex);
> > + for_each_online_cpu(i) {
> > + chan->buf[i] = relay_open_buf(chan, i);
> > + if (!chan->buf[i])
> > + goto free_bufs;
> > + }
> > + list_add(&chan->list, &relay_channels);
> > + mutex_unlock(&relay_channels_mutex);
> > +
> > + return chan;
> > +
> > +free_bufs:
> > + for_each_possible_cpu(i) {
> > + if (!chan->buf[i])
> > + break;
> > + relay_close_buf(chan->buf[i]);
> > + }
> > +
> > + kref_put(&chan->kref, relay_destroy_channel);
> > + mutex_unlock(&relay_channels_mutex);
> > + return NULL;
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_open);
> > +
> > +/**
> > + * ltt_relay_close - close the channel
> > + * @chan: the channel
> > + *
> > + * Closes all channel buffers and frees the channel.
> > + */
> > +void ltt_relay_close(struct rchan *chan)
> > +{
> > + unsigned int i;
> > +
> > + if (!chan)
> > + return;
> > +
> > + mutex_lock(&relay_channels_mutex);
> > + for_each_possible_cpu(i)
> > + if (chan->buf[i])
> > + relay_close_buf(chan->buf[i]);
> > +
> > + list_del(&chan->list);
> > + kref_put(&chan->kref, relay_destroy_channel);
> > + mutex_unlock(&relay_channels_mutex);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_close);
> > +
> > +/*
> > + * Start iteration at the previous element. Skip the real list head.
> > + */
> > +static struct buf_page *ltt_relay_find_prev_page(struct rchan_buf *buf,
> > + struct buf_page *page, size_t offset, ssize_t diff_offset)
> > +{
> > + struct buf_page *iter;
> > + size_t orig_iter_off;
> > + unsigned int i = 0;
> > +
> > + orig_iter_off = page->offset;
> > + list_for_each_entry_reverse(iter, &page->list, list) {
> > + /*
> > + * Skip the real list head.
> > + */
> > + if (&iter->list == &buf->pages)
> > + continue;
> > + i++;
> > + if (offset >= iter->offset
> > + && offset < iter->offset + PAGE_SIZE) {
> > +#ifdef CONFIG_LTT_RELAY_CHECK_RANDOM_ACCESS
> > + if (i > 1) {
> > + printk(KERN_WARNING
> > + "Backward random access detected in "
> > + "ltt_relay. Iterations %u, "
> > + "offset %zu, orig iter->off %zu, "
> > + "iter->off %zu diff_offset %zd.\n", i,
> > + offset, orig_iter_off, iter->offset,
> > + diff_offset);
> > + WARN_ON(1);
> > + }
> > +#endif
> > + return iter;
> > + }
> > + }
> > + return NULL;
> > +}
> > +
> > +/*
> > + * Start iteration at the next element. Skip the real list head.
> > + */
> > +static struct buf_page *ltt_relay_find_next_page(struct rchan_buf *buf,
> > + struct buf_page *page, size_t offset, ssize_t diff_offset)
> > +{
> > + struct buf_page *iter;
> > + unsigned int i = 0;
> > + size_t orig_iter_off;
> > +
> > + orig_iter_off = page->offset;
> > + list_for_each_entry(iter, &page->list, list) {
> > + /*
> > + * Skip the real list head.
> > + */
> > + if (&iter->list == &buf->pages)
> > + continue;
> > + i++;
> > + if (offset >= iter->offset
> > + && offset < iter->offset + PAGE_SIZE) {
> > +#ifdef CONFIG_LTT_RELAY_CHECK_RANDOM_ACCESS
> > + if (i > 1) {
> > + printk(KERN_WARNING
> > + "Forward random access detected in "
> > + "ltt_relay. Iterations %u, "
> > + "offset %zu, orig iter->off %zu, "
> > + "iter->off %zu diff_offset %zd.\n", i,
> > + offset, orig_iter_off, iter->offset,
> > + diff_offset);
> > + WARN_ON(1);
> > + }
> > +#endif
> > + return iter;
> > + }
> > + }
> > + return NULL;
> > +}
> > +
> > +/*
> > + * Find the page containing "offset". Cache it if it is after the currently
> > + * cached page.
> > + */
> > +static struct buf_page *ltt_relay_cache_page(struct rchan_buf *buf,
> > + struct buf_page **page_cache,
> > + struct buf_page *page, size_t offset)
> > +{
> > + ssize_t diff_offset;
> > + ssize_t half_buf_size = buf->chan->alloc_size >> 1;
> > +
> > + /*
> > + * Make sure this is the page we want to write into. The current
> > + * page is changed concurrently by other writers. [wrh]page are
> > + * used as a cache remembering the last page written
> > + * to/read/looked up for header address. No synchronization;
> > + * could have to find the previous page if a nested write
> > + * occurred. Finding the right page is done by comparing the
> > + * dest_offset with the buf_page offsets.
> > + * When at the exact opposite of the buffer, bias towards forward search
> > + * because it will be cached.
> > + */
> > +
> > + diff_offset = (ssize_t)offset - (ssize_t)page->offset;
> > + if (diff_offset <= -(ssize_t)half_buf_size)
> > + diff_offset += buf->chan->alloc_size;
> > + else if (diff_offset > half_buf_size)
> > + diff_offset -= buf->chan->alloc_size;
> > +
> > + if (unlikely(diff_offset >= (ssize_t)PAGE_SIZE)) {
> > + page = ltt_relay_find_next_page(buf, page, offset, diff_offset);
> > + WARN_ON(!page);
> > + *page_cache = page;
> > + } else if (unlikely(diff_offset < 0)) {
> > + page = ltt_relay_find_prev_page(buf, page, offset, diff_offset);
> > + WARN_ON(!page);
> > + }
> > + return page;
> > +}
> > +
> > +/**
> > + * ltt_relay_write - write data to a ltt_relay buffer.
> > + * @buf : buffer
> > + * @offset : offset within the buffer
> > + * @src : source address
> > + * @len : length to write
> > + */
> > +int ltt_relay_write(struct rchan_buf *buf, size_t offset,
> > + const void *src, size_t len)
> > +{
> > + struct buf_page *page;
> > + ssize_t pagecpy, orig_len;
> > +
> > + orig_len = len;
> > + offset &= buf->chan->alloc_size - 1;
> > + page = buf->wpage;
> > + if (unlikely(!len))
> > + return 0;
> > + for (;;) {
> > + page = ltt_relay_cache_page(buf, &buf->wpage, page, offset);
> > + pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
> > + memcpy(page_address(page->page)
> > + + (offset & ~PAGE_MASK), src, pagecpy);
> > + len -= pagecpy;
> > + if (likely(!len))
> > + break;
> > + src += pagecpy;
> > + offset += pagecpy;
> > + /*
> > + * Underlying layer should never ask for writes across
> > + * subbuffers.
> > + */
> > + WARN_ON(offset >= buf->chan->alloc_size);
> > + }
> > + return orig_len;
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_write);
> > +
> > +/**
> > + * ltt_relay_read - read data from ltt_relay_buffer.
> > + * @buf : buffer
> > + * @offset : offset within the buffer
> > + * @dest : destination address
> > + * @len : length to write
> > + */
> > +int ltt_relay_read(struct rchan_buf *buf, size_t offset,
> > + void *dest, size_t len)
> > +{
> > + struct buf_page *page;
> > + ssize_t pagecpy, orig_len;
> > +
> > + orig_len = len;
> > + offset &= buf->chan->alloc_size - 1;
> > + page = buf->rpage;
> > + if (unlikely(!len))
> > + return 0;
> > + for (;;) {
> > + page = ltt_relay_cache_page(buf, &buf->rpage, page, offset);
> > + pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
> > + memcpy(dest, page_address(page->page) + (offset & ~PAGE_MASK),
> > + pagecpy);
> > + len -= pagecpy;
> > + if (likely(!len))
> > + break;
> > + dest += pagecpy;
> > + offset += pagecpy;
> > + /*
> > + * Underlying layer should never ask for reads across
> > + * subbuffers.
> > + */
> > + WARN_ON(offset >= buf->chan->alloc_size);
> > + }
> > + return orig_len;
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_read);
> > +
> > +/**
> > + * ltt_relay_read_get_page - Get a whole page to read from
> > + * @buf : buffer
> > + * @offset : offset within the buffer
> > + */
> > +struct buf_page *ltt_relay_read_get_page(struct rchan_buf *buf, size_t offset)
> > +{
> > + struct buf_page *page;
> > +
> > + offset &= buf->chan->alloc_size - 1;
> > + page = buf->rpage;
> > + page = ltt_relay_cache_page(buf, &buf->rpage, page, offset);
> > + return page;
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_read_get_page);
> > +
> > +/**
> > + * ltt_relay_offset_address - get address of a location within the buffer
> > + * @buf : buffer
> > + * @offset : offset within the buffer.
> > + *
> > + * Return the address where a given offset is located.
> > + * Should be used to get the current subbuffer header pointer. Given we know
> > + * it's never on a page boundary, it's safe to write directly to this address,
> > + * as long as the write is never bigger than a page size.
> > + */
> > +void *ltt_relay_offset_address(struct rchan_buf *buf, size_t offset)
> > +{
> > + struct buf_page *page;
> > + unsigned int odd;
> > +
> > + offset &= buf->chan->alloc_size - 1;
> > + odd = !!(offset & buf->chan->subbuf_size);
> > + page = buf->hpage[odd];
> > + if (offset < page->offset || offset >= page->offset + PAGE_SIZE)
> > + buf->hpage[odd] = page = buf->wpage;
> > + page = ltt_relay_cache_page(buf, &buf->hpage[odd], page, offset);
> > + return page_address(page->page) + (offset & ~PAGE_MASK);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_offset_address);
> > +
> > +/**
> > + * relay_file_open - open file op for relay files
> > + * @inode: the inode
> > + * @filp: the file
> > + *
> > + * Increments the channel buffer refcount.
> > + */
> > +static int relay_file_open(struct inode *inode, struct file *filp)
> > +{
> > + struct rchan_buf *buf = inode->i_private;
> > + kref_get(&buf->kref);
> > + filp->private_data = buf;
> > +
> > + return nonseekable_open(inode, filp);
> > +}
> > +
> > +/**
> > + * relay_file_release - release file op for relay files
> > + * @inode: the inode
> > + * @filp: the file
> > + *
> > + * Decrements the channel refcount, as the filesystem is
> > + * no longer using it.
> > + */
> > +static int relay_file_release(struct inode *inode, struct file *filp)
> > +{
> > + struct rchan_buf *buf = filp->private_data;
> > + kref_put(&buf->kref, relay_remove_buf);
> > +
> > + return 0;
> > +}
> > +
> > +const struct file_operations ltt_relay_file_operations = {
> > + .open = relay_file_open,
> > + .release = relay_file_release,
> > +};
> > +EXPORT_SYMBOL_GPL(ltt_relay_file_operations);
> > +
> > +static __init int relay_init(void)
> > +{
> > + hotcpu_notifier(relay_hotcpu_callback, 5);
> > + return 0;
> > +}
> > +
> > +module_init(relay_init);
> > Index: linux-2.6-lttng/include/linux/ltt-relay.h
> > ===================================================================
> > --- /dev/null 1970-01-01 00:00:00.000000000 +0000
> > +++ linux-2.6-lttng/include/linux/ltt-relay.h 2009-03-05 15:05:56.000000000 -0500
> > @@ -0,0 +1,182 @@
> > +/*
> > + * linux/include/linux/ltt-relay.h
> > + *
> > + * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi at us.ibm.com), IBM Corp
> > + * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim at opersys.com)
> > + * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers at polymtl.ca)
> > + *
> > + * CONFIG_RELAY definitions and declarations
> > + */
> > +
> > +#ifndef _LINUX_LTT_RELAY_H
> > +#define _LINUX_LTT_RELAY_H
> > +
> > +#include <linux/types.h>
> > +#include <linux/sched.h>
> > +#include <linux/timer.h>
> > +#include <linux/wait.h>
> > +#include <linux/list.h>
> > +#include <linux/fs.h>
> > +#include <linux/poll.h>
> > +#include <linux/kref.h>
> > +#include <linux/mm.h>
> > +
> > +/* Needs a _much_ better name... */
> > +#define FIX_SIZE(x) ((((x) - 1) & PAGE_MASK) + PAGE_SIZE)
> > +
> > +/*
> > + * Tracks changes to rchan/rchan_buf structs
> > + */
> > +#define LTT_RELAY_CHANNEL_VERSION 8
> > +
> > +struct rchan_buf;
> > +
> > +struct buf_page {
> > + struct page *page;
> > + size_t offset; /* page offset in the buffer */
> > + struct list_head list; /* buffer linked list */
> > +};
> > +
> > +/*
> > + * Per-cpu relay channel buffer
> > + */
> > +struct rchan_buf {
> > + void *chan_private; /* private data for this buf */
> > + struct rchan *chan; /* associated channel */
> > + struct dentry *dentry; /* channel file dentry */
> > + struct kref kref; /* channel buffer refcount */
> > + struct list_head pages; /* list of buffer pages */
> > + struct buf_page *wpage; /* current write page (cache) */
> > + struct buf_page *hpage[2]; /* current subbuf header page (cache) */
> > + struct buf_page *rpage; /* current subbuf read page (cache) */
> > + unsigned int page_count; /* number of current buffer pages */
> > + unsigned int cpu; /* this buf's cpu */
> > +} ____cacheline_aligned;
> > +
> > +/*
> > + * Relay channel data structure
> > + */
> > +struct rchan {
> > + u32 version; /* the version of this struct */
> > + size_t subbuf_size; /* sub-buffer size */
> > + size_t n_subbufs; /* number of sub-buffers per buffer */
> > + size_t alloc_size; /* total buffer size allocated */
> > + struct rchan_callbacks *cb; /* client callbacks */
> > + struct kref kref; /* channel refcount */
> > + void *private_data; /* for user-defined data */
> > + struct rchan_buf *buf[NR_CPUS]; /* per-cpu channel buffers */
> > + struct list_head list; /* for channel list */
> > + struct dentry *parent; /* parent dentry passed to open */
> > + int subbuf_size_order; /* order of sub-buffer size */
> > + char base_filename[NAME_MAX]; /* saved base filename */
> > +};
> > +
> > +/*
> > + * Relay channel client callbacks
> > + */
> > +struct rchan_callbacks {
> > + /*
> > + * subbuf_start - called on buffer-switch to a new sub-buffer
> > + * @buf: the channel buffer containing the new sub-buffer
> > + * @subbuf: the start of the new sub-buffer
> > + * @prev_subbuf: the start of the previous sub-buffer
> > + * @prev_padding: unused space at the end of previous sub-buffer
> > + *
> > + * The client should return 1 to continue logging, 0 to stop
> > + * logging.
> > + *
> > + * NOTE: subbuf_start will also be invoked when the buffer is
> > + * created, so that the first sub-buffer can be initialized
> > + * if necessary. In this case, prev_subbuf will be NULL.
> > + *
> > + * NOTE: the client can reserve bytes at the beginning of the new
> > + * sub-buffer by calling subbuf_start_reserve() in this callback.
> > + */
> > + int (*subbuf_start) (struct rchan_buf *buf,
> > + void *subbuf,
> > + void *prev_subbuf,
> > + size_t prev_padding);
> > +
> > + /*
> > + * create_buf_file - create file to represent a relay channel buffer
> > + * @filename: the name of the file to create
> > + * @parent: the parent of the file to create
> > + * @mode: the mode of the file to create
> > + * @buf: the channel buffer
> > + *
> > + * Called during relay_open(), once for each per-cpu buffer,
> > + * to allow the client to create a file to be used to
> > + * represent the corresponding channel buffer. If the file is
> > + * created outside of relay, the parent must also exist in
> > + * that filesystem.
> > + *
> > + * The callback should return the dentry of the file created
> > + * to represent the relay buffer.
> > + *
> > + * Setting the is_global outparam to a non-zero value will
> > + * cause relay_open() to create a single global buffer rather
> > + * than the default set of per-cpu buffers.
> > + *
> > + * See Documentation/filesystems/relayfs.txt for more info.
> > + */
> > + struct dentry *(*create_buf_file)(const char *filename,
> > + struct dentry *parent,
> > + int mode,
> > + struct rchan_buf *buf);
> > +
> > + /*
> > + * remove_buf_file - remove file representing a relay channel buffer
> > + * @dentry: the dentry of the file to remove
> > + *
> > + * Called during relay_close(), once for each per-cpu buffer,
> > + * to allow the client to remove a file used to represent a
> > + * channel buffer.
> > + *
> > + * The callback should return 0 if successful, negative if not.
> > + */
> > + int (*remove_buf_file)(struct dentry *dentry);
> > +};
> > +
> > +extern int ltt_relay_write(struct rchan_buf *buf, size_t offset,
> > + const void *src, size_t len);
> > +
> > +extern int ltt_relay_read(struct rchan_buf *buf, size_t offset,
> > + void *dest, size_t len);
> > +
> > +extern struct buf_page *ltt_relay_read_get_page(struct rchan_buf *buf,
> > + size_t offset);
> > +
> > +/*
> > + * Return the address where a given offset is located.
> > + * Should be used to get the current subbuffer header pointer. Given we know
> > + * it's never on a page boundary, it's safe to write directly to this address,
> > + * as long as the write is never bigger than a page size.
> > + */
> > +extern void *ltt_relay_offset_address(struct rchan_buf *buf,
> > + size_t offset);
> > +
> > +/*
> > + * CONFIG_LTT_RELAY kernel API, ltt/ltt-relay-alloc.c
> > + */
> > +
> > +struct rchan *ltt_relay_open(const char *base_filename,
> > + struct dentry *parent,
> > + size_t subbuf_size,
> > + size_t n_subbufs,
> > + struct rchan_callbacks *cb,
> > + void *private_data);
> > +extern void ltt_relay_close(struct rchan *chan);
> > +
> > +void ltt_relay_get_chan(struct rchan *chan);
> > +void ltt_relay_put_chan(struct rchan *chan);
> > +
> > +void ltt_relay_get_chan_buf(struct rchan_buf *buf);
> > +void ltt_relay_put_chan_buf(struct rchan_buf *buf);
> > +
> > +/*
> > + * exported ltt_relay file operations, ltt/ltt-relay-alloc.c
> > + */
> > +extern const struct file_operations ltt_relay_file_operations;
> > +
> > +#endif /* _LINUX_LTT_RELAY_H */
> > +
> >
> > --
> > Mathieu Desnoyers
> > OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68
> >
--
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68