[ltt-dev] [RFC patch 05/41] LTTng relay buffer allocation, read, write
Steven Rostedt
rostedt at goodmis.org
Fri Mar 6 14:19:06 EST 2009
On Thu, 5 Mar 2009, Mathieu Desnoyers wrote:
> As I told Martin, I was thinking about taking an axe and moving stuff around in
> relay. Which I just did.
>
> This patch reimplements relay with a linked list of pages. Provides read/write
> wrappers which should be used to read or write from the buffers. It's the core
> of a layered approach to the design requirements expressed by Martin and
> discussed earlier.
>
> It does not provide _any_ sort of locking on buffer data. Locking should be done
> by the caller. Given that we might think of very lightweight locking schemes, it
> makes sense to me that the underlying buffering infrastructure supports event
> records larger than 1 page.
You bring up two points.
1) lockless
2) larger than 1 page of data
Soon the ring buffer will be lockless on the write side. On the read side
we have locking. This could be changed to allow for a new API, with
specific requirements, that does not need reader-side locking. If the ring
buffer is strictly producer/consumer without overwrite, then it would make
sense to have a lockless system on both the reader and writer sides.
I have even expressed interest in implementing this, but right now my
focus has been on getting other aspects working. Ftrace runs in overwrite
mode, so it must have the locking.
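To make the strict producer/consumer case concrete, here is a minimal sketch of
a single-producer/single-consumer ring where neither side takes a lock. This is
not the ftrace ring buffer and not the LTTng code; the names (spsc_ring,
spsc_produce, spsc_consume) are made up for illustration, and it assumes
exactly one writer and one reader per buffer.

#include <linux/kernel.h>	/* min() */
#include <linux/string.h>	/* memcpy() */
#include <linux/errno.h>
#include <linux/compiler.h>	/* ACCESS_ONCE() */
#include <asm/system.h>		/* smp_wmb()/smp_rmb()/smp_mb() */

struct spsc_ring {
	char *data;		/* backing store, size bytes */
	size_t size;		/* power of two */
	size_t head;		/* next write position, writer-owned */
	size_t tail;		/* next read position, reader-owned */
};

/* Writer side: fails instead of overwriting unread data. */
static int spsc_produce(struct spsc_ring *r, const void *src, size_t len)
{
	size_t used = r->head - ACCESS_ONCE(r->tail);
	size_t off, first;

	if (len > r->size - used)
		return -ENOSPC;		/* reader has not caught up */

	off = r->head & (r->size - 1);
	first = min(len, r->size - off);
	memcpy(r->data + off, src, first);
	memcpy(r->data, (const char *)src + first, len - first);

	smp_wmb();			/* publish the data before moving head */
	r->head += len;
	return 0;
}

/* Reader side: no lock either, only the tail is advanced here. */
static int spsc_consume(struct spsc_ring *r, void *dst, size_t len)
{
	size_t avail = ACCESS_ONCE(r->head) - r->tail;
	size_t off, first;

	if (len > avail)
		return -EAGAIN;
	smp_rmb();			/* read head before reading the data */

	off = r->tail & (r->size - 1);
	first = min(len, r->size - off);
	memcpy(dst, r->data + off, first);
	memcpy((char *)dst + first, r->data, len - first);

	smp_mb();			/* finish reading before freeing the space */
	r->tail += len;
	return 0;
}

The point is that without overwrite the writer only ever moves head and the
reader only ever moves tail, so plain memory barriers are enough. Overwrite
mode breaks that single-owner property, which is why ftrace needs the
reader-side locking.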
>
> A cache of 4 pointers is used to keep track of the current write page, the
> current read page, and the pages of two contiguous subbuffer headers for
> header address lookup. The offset of each page within the buffer is saved in
> a structure containing the offset, a linked list node and a page frame
> pointer, which permits cache lookup without extra locking.
I'm also all for optimizations. Right now the focus has been on making
sure the ring buffer can do all that is requested of it. I wanted it to be
useful for all users, not focused on a select few. This actually includes
ftrace. That is, I did not develop the ring buffer with ftrace as its only
user.
>
> The offset and linked list are not placed in the page frame itself to allow
> using the pages directly for disk I/O, network I/O or to mmap it to userspace
> for live processing.
>
> Write and header address lookup tested through LTTng. This patch contains
> self-test code which detects if a client is actually trying to use the
> read/write/get header address API to do random buffer offset access. If such
> behavior is detected, a warning message is issued and the random access is done
> as requested.
Hmm, if a warning message is issued, it seems that the tracer should shut
down. Either support the operation or do not support it. Do not give the
user an "Oh, you really should not do that, but I'll let you do it anyway".
Otherwise you will be pressured to make it a true feature.
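If rejecting the access is the preferred behaviour, something along these
lines could replace the WARN_ON() path in the ltt_relay_find_next_page() /
ltt_relay_find_prev_page() hunks quoted below. This is only a hypothetical
sketch: ltt_relay_check_access() does not exist in the patch, and
ltt_relay_read()/ltt_relay_write() would also have to turn a failed lookup
into an error return instead of carrying on.

#include <linux/kernel.h>
#include <linux/errno.h>

/*
 * Hypothetical helper: refuse a detected random access so the caller
 * can fail the read/write, rather than WARN_ON() and do it anyway.
 * "iterations" is the loop counter from the quoted lookup functions.
 */
static int ltt_relay_check_access(unsigned int iterations, size_t offset)
{
	if (iterations <= 1)
		return 0;	/* sequential access, fine */

	printk(KERN_ERR "ltt_relay: random access at offset %zu rejected "
	       "(%u page iterations)\n", offset, iterations);
	return -EINVAL;		/* lookup returns NULL, caller aborts */
}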
>
> TODO : Currently, no splice file operations are implemented. Should come soon.
> The idea is to splice the buffers directly into files or to the network.
> We have to make sure the page frame fields used are not used by disk I/O or
> network.
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at polymtl.ca>
> CC: Jens Axboe <jens.axboe at oracle.com>
> CC: Martin Bligh <mbligh at google.com>
> CC: Peter Zijlstra <a.p.zijlstra at chello.nl>
> CC: Tom Zanussi <zanussi at comcast.net>
> CC: prasad at linux.vnet.ibm.com
> CC: Linus Torvalds <torvalds at linux-foundation.org>
> CC: Thomas Gleixner <tglx at linutronix.de>
> CC: Steven Rostedt <rostedt at goodmis.org>
> CC: od at suse.com
> CC: "Frank Ch. Eigler" <fche at redhat.com>
> CC: Andrew Morton <akpm at linux-foundation.org>
> CC: hch at lst.de
> CC: David Wilder <dwilder at us.ibm.com>
> ---
> include/linux/ltt-relay.h | 182 +++++++++++
> ltt/ltt-relay-alloc.c | 705 ++++++++++++++++++++++++++++++++++++++++++++++
> 2 files changed, 887 insertions(+)
>
> Index: linux-2.6-lttng/ltt/ltt-relay-alloc.c
> ===================================================================
> --- /dev/null 1970-01-01 00:00:00.000000000 +0000
> +++ linux-2.6-lttng/ltt/ltt-relay-alloc.c 2009-03-05 15:05:56.000000000 -0500
> @@ -0,0 +1,705 @@
> +/*
> + * Public API and common code for kernel->userspace relay file support.
> + *
> + * Copyright (C) 2002-2005 - Tom Zanussi (zanussi at us.ibm.com), IBM Corp
> + * Copyright (C) 1999-2005 - Karim Yaghmour (karim at opersys.com)
> + * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers at polymtl.ca)
> + *
> + * Moved to kernel/relay.c by Paul Mundt, 2006.
> + * November 2006 - CPU hotplug support by Mathieu Desnoyers
> + * (mathieu.desnoyers at polymtl.ca)
> + *
> + * This file is released under the GPL.
> + */
> +#include <linux/errno.h>
> +#include <linux/stddef.h>
> +#include <linux/slab.h>
> +#include <linux/module.h>
> +#include <linux/string.h>
> +#include <linux/ltt-relay.h>
> +#include <linux/vmalloc.h>
> +#include <linux/mm.h>
> +#include <linux/cpu.h>
> +#include <linux/splice.h>
> +#include <linux/bitops.h>
> +
> +/* list of open channels, for cpu hotplug */
> +static DEFINE_MUTEX(relay_channels_mutex);
> +static LIST_HEAD(relay_channels);
> +
> +/**
> + * relay_alloc_buf - allocate a channel buffer
> + * @buf: the buffer struct
> + * @size: total size of the buffer
> + */
> +static int relay_alloc_buf(struct rchan_buf *buf, size_t *size)
> +{
> + unsigned int i, n_pages;
> + struct buf_page *buf_page, *n;
> +
> + *size = PAGE_ALIGN(*size);
> + n_pages = *size >> PAGE_SHIFT;
> +
> + INIT_LIST_HEAD(&buf->pages);
> +
> + for (i = 0; i < n_pages; i++) {
> + buf_page = kmalloc_node(sizeof(*buf_page), GFP_KERNEL,
> + cpu_to_node(buf->cpu));
> + if (unlikely(!buf_page))
> + goto depopulate;
> + buf_page->page = alloc_pages_node(cpu_to_node(buf->cpu),
> + GFP_KERNEL | __GFP_ZERO, 0);
So these buffers allocate single pages?
> + if (unlikely(!buf_page->page)) {
> + kfree(buf_page);
> + goto depopulate;
> + }
> + list_add_tail(&buf_page->list, &buf->pages);
> + buf_page->offset = (size_t)i << PAGE_SHIFT;
> + set_page_private(buf_page->page, (unsigned long)buf_page);
> + if (i == 0) {
> + buf->wpage = buf_page;
> + buf->hpage[0] = buf_page;
> + buf->hpage[1] = buf_page;
> + buf->rpage = buf_page;
> + }
> + }
> + buf->page_count = n_pages;
> + return 0;
> +
> +depopulate:
> + list_for_each_entry_safe(buf_page, n, &buf->pages, list) {
> + list_del_init(&buf_page->list);
> + __free_page(buf_page->page);
> + kfree(buf_page);
> + }
> + return -ENOMEM;
> +}
> +
> +/**
> + * relay_create_buf - allocate and initialize a channel buffer
> + * @chan: the relay channel
> + * @cpu: cpu the buffer belongs to
> + *
> + * Returns channel buffer if successful, %NULL otherwise.
> + */
> +static struct rchan_buf *relay_create_buf(struct rchan *chan, int cpu)
> +{
> + int ret;
> + struct rchan_buf *buf = kzalloc(sizeof(struct rchan_buf), GFP_KERNEL);
> + if (!buf)
> + return NULL;
> +
> + buf->cpu = cpu;
> + ret = relay_alloc_buf(buf, &chan->alloc_size);
> + if (ret)
> + goto free_buf;
> +
> + buf->chan = chan;
> + kref_get(&buf->chan->kref);
> + return buf;
> +
> +free_buf:
> + kfree(buf);
> + return NULL;
> +}
> +
> +/**
> + * relay_destroy_channel - free the channel struct
> + * @kref: target kernel reference that contains the relay channel
> + *
> + * Should only be called from kref_put().
> + */
> +static void relay_destroy_channel(struct kref *kref)
> +{
> + struct rchan *chan = container_of(kref, struct rchan, kref);
> + kfree(chan);
> +}
> +
> +void ltt_relay_get_chan(struct rchan *chan)
> +{
> + kref_get(&chan->kref);
> +}
> +EXPORT_SYMBOL_GPL(ltt_relay_get_chan);
> +
> +void ltt_relay_put_chan(struct rchan *chan)
> +{
> + kref_put(&chan->kref, relay_destroy_channel);
> +}
> +EXPORT_SYMBOL_GPL(ltt_relay_put_chan);
> +
> +/**
> + * relay_destroy_buf - destroy an rchan_buf struct and associated buffer
> + * @buf: the buffer struct
> + */
> +static void relay_destroy_buf(struct rchan_buf *buf)
> +{
> + struct rchan *chan = buf->chan;
> + struct buf_page *buf_page, *n;
> +
> + list_for_each_entry_safe(buf_page, n, &buf->pages, list) {
> + list_del_init(&buf_page->list);
> + __free_page(buf_page->page);
> + kfree(buf_page);
> + }
> + chan->buf[buf->cpu] = NULL;
> + kfree(buf);
> + kref_put(&chan->kref, relay_destroy_channel);
> +}
> +
> +/**
> + * relay_remove_buf - remove a channel buffer
> + * @kref: target kernel reference that contains the relay buffer
> + *
> + * Removes the file from the filesystem, which also frees the
> + * rchan_buf_struct and the channel buffer. Should only be called from
> + * kref_put().
> + */
> +static void relay_remove_buf(struct kref *kref)
> +{
> + struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref);
> + buf->chan->cb->remove_buf_file(buf->dentry);
> + relay_destroy_buf(buf);
> +}
> +
> +void ltt_relay_get_chan_buf(struct rchan_buf *buf)
> +{
> + kref_get(&buf->kref);
> +}
> +EXPORT_SYMBOL_GPL(ltt_relay_get_chan_buf);
> +
> +void ltt_relay_put_chan_buf(struct rchan_buf *buf)
> +{
> + kref_put(&buf->kref, relay_remove_buf);
> +}
> +EXPORT_SYMBOL_GPL(ltt_relay_put_chan_buf);
> +
> +/*
> + * High-level relay kernel API and associated functions.
> + */
> +
> +/*
> + * rchan_callback implementations defining default channel behavior. Used
> + * in place of corresponding NULL values in client callback struct.
> + */
> +
> +/*
> + * create_buf_file_create() default callback. Does nothing.
> + */
> +static struct dentry *create_buf_file_default_callback(const char *filename,
> + struct dentry *parent,
> + int mode,
> + struct rchan_buf *buf)
> +{
> + return NULL;
> +}
> +
> +/*
> + * remove_buf_file() default callback. Does nothing.
> + */
> +static int remove_buf_file_default_callback(struct dentry *dentry)
> +{
> + return -EINVAL;
> +}
> +
> +/* relay channel default callbacks */
> +static struct rchan_callbacks default_channel_callbacks = {
> + .create_buf_file = create_buf_file_default_callback,
> + .remove_buf_file = remove_buf_file_default_callback,
> +};
> +
> +/**
> + * __relay_reset - reset a channel buffer
> + * @buf: the channel buffer
> + * @init: 1 if this is a first-time initialization
> + *
> + * See relay_reset() for description of effect.
> + */
> +static void __relay_reset(struct rchan_buf *buf, unsigned int init)
> +{
> + if (init)
> + kref_init(&buf->kref);
> +}
> +
> +/*
> + * relay_open_buf - create a new relay channel buffer
> + *
> + * used by relay_open() and CPU hotplug.
> + */
> +static struct rchan_buf *relay_open_buf(struct rchan *chan, unsigned int cpu)
> +{
> + struct rchan_buf *buf = NULL;
> + struct dentry *dentry;
> + char *tmpname;
> +
> + tmpname = kzalloc(NAME_MAX + 1, GFP_KERNEL);
> + if (!tmpname)
> + goto end;
> + snprintf(tmpname, NAME_MAX, "%s%d", chan->base_filename, cpu);
> +
> + buf = relay_create_buf(chan, cpu);
> + if (!buf)
> + goto free_name;
> +
> + __relay_reset(buf, 1);
> +
> + /* Create file in fs */
> + dentry = chan->cb->create_buf_file(tmpname, chan->parent, S_IRUSR,
> + buf);
> + if (!dentry)
> + goto free_buf;
> +
> + buf->dentry = dentry;
> +
> + goto free_name;
> +
> +free_buf:
> + relay_destroy_buf(buf);
> + buf = NULL;
> +free_name:
> + kfree(tmpname);
> +end:
> + return buf;
> +}
> +
> +/**
> + * relay_close_buf - close a channel buffer
> + * @buf: channel buffer
> + *
> + * Restores the default callbacks.
> + * The channel buffer and channel buffer data structure are then freed
> + * automatically when the last reference is given up.
> + */
> +static void relay_close_buf(struct rchan_buf *buf)
> +{
> + kref_put(&buf->kref, relay_remove_buf);
> +}
> +
> +static void setup_callbacks(struct rchan *chan,
> + struct rchan_callbacks *cb)
> +{
> + if (!cb) {
> + chan->cb = &default_channel_callbacks;
> + return;
> + }
> +
> + if (!cb->create_buf_file)
> + cb->create_buf_file = create_buf_file_default_callback;
> + if (!cb->remove_buf_file)
> + cb->remove_buf_file = remove_buf_file_default_callback;
> + chan->cb = cb;
> +}
> +
> +/**
> + * relay_hotcpu_callback - CPU hotplug callback
> + * @nb: notifier block
> + * @action: hotplug action to take
> + * @hcpu: CPU number
> + *
> + * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
> + */
> +static int __cpuinit relay_hotcpu_callback(struct notifier_block *nb,
> + unsigned long action,
> + void *hcpu)
> +{
> + unsigned int hotcpu = (unsigned long)hcpu;
> + struct rchan *chan;
> +
> + switch (action) {
> + case CPU_UP_PREPARE:
> + case CPU_UP_PREPARE_FROZEN:
> + mutex_lock(&relay_channels_mutex);
> + list_for_each_entry(chan, &relay_channels, list) {
> + if (chan->buf[hotcpu])
> + continue;
> + chan->buf[hotcpu] = relay_open_buf(chan, hotcpu);
> + if (!chan->buf[hotcpu]) {
> + printk(KERN_ERR
> + "relay_hotcpu_callback: cpu %d buffer "
> + "creation failed\n", hotcpu);
> + mutex_unlock(&relay_channels_mutex);
> + return NOTIFY_BAD;
> + }
> + }
> + mutex_unlock(&relay_channels_mutex);
> + break;
> + case CPU_DEAD:
> + case CPU_DEAD_FROZEN:
> + /* No need to flush the cpu : will be flushed upon
> + * final relay_flush() call. */
> + break;
> + }
> + return NOTIFY_OK;
> +}
> +
> +/**
> + * ltt_relay_open - create a new relay channel
> + * @base_filename: base name of files to create
> + * @parent: dentry of parent directory, %NULL for root directory
> + * @subbuf_size: size of sub-buffers
> + * @n_subbufs: number of sub-buffers
> + * @cb: client callback functions
> + * @private_data: user-defined data
> + *
> + * Returns channel pointer if successful, %NULL otherwise.
> + *
> + * Creates a channel buffer for each cpu using the sizes and
> + * attributes specified. The created channel buffer files
> + * will be named base_filename0...base_filenameN-1. File
> + * permissions will be %S_IRUSR.
> + */
> +struct rchan *ltt_relay_open(const char *base_filename,
> + struct dentry *parent,
> + size_t subbuf_size,
> + size_t n_subbufs,
> + struct rchan_callbacks *cb,
> + void *private_data)
> +{
> + unsigned int i;
> + struct rchan *chan;
> + if (!base_filename)
> + return NULL;
> +
> + if (!(subbuf_size && n_subbufs))
> + return NULL;
> +
> + chan = kzalloc(sizeof(struct rchan), GFP_KERNEL);
> + if (!chan)
> + return NULL;
> +
> + chan->version = LTT_RELAY_CHANNEL_VERSION;
> + chan->n_subbufs = n_subbufs;
> + chan->subbuf_size = subbuf_size;
You declare the sub-buffer size here, but I do not see how it gets allocated.
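Skimming further down the quoted code, the backing store appears to come from
chan->alloc_size, carved into single pages by relay_alloc_buf(), with no
separate per-subbuffer allocation. A quick sanity check of the FIX_SIZE()
arithmetic (4 KiB pages assumed, example values picked arbitrarily):

#include <linux/mm.h>		/* PAGE_SIZE, PAGE_MASK */

/* Same definition as in the quoted ltt-relay.h below. */
#define FIX_SIZE(x) ((((x) - 1) & PAGE_MASK) + PAGE_SIZE)

/*
 * FIX_SIZE(2 * 4096) == 8192   : already page aligned, unchanged
 * FIX_SIZE(3 * 3000) == 12288  : 9000 rounded up to 3 pages
 *
 * so alloc_size = FIX_SIZE(subbuf_size * n_subbufs) is the requested
 * size rounded up to a whole number of pages, and relay_alloc_buf()
 * above allocates exactly that many order-0 pages.
 */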
-- Steve
> + chan->subbuf_size_order = get_count_order(subbuf_size);
> + chan->alloc_size = FIX_SIZE(subbuf_size * n_subbufs);
> + chan->parent = parent;
> + chan->private_data = private_data;
> + strlcpy(chan->base_filename, base_filename, NAME_MAX);
> + setup_callbacks(chan, cb);
> + kref_init(&chan->kref);
> +
> + mutex_lock(&relay_channels_mutex);
> + for_each_online_cpu(i) {
> + chan->buf[i] = relay_open_buf(chan, i);
> + if (!chan->buf[i])
> + goto free_bufs;
> + }
> + list_add(&chan->list, &relay_channels);
> + mutex_unlock(&relay_channels_mutex);
> +
> + return chan;
> +
> +free_bufs:
> + for_each_possible_cpu(i) {
> + if (!chan->buf[i])
> + break;
> + relay_close_buf(chan->buf[i]);
> + }
> +
> + kref_put(&chan->kref, relay_destroy_channel);
> + mutex_unlock(&relay_channels_mutex);
> + return NULL;
> +}
> +EXPORT_SYMBOL_GPL(ltt_relay_open);
> +
> +/**
> + * ltt_relay_close - close the channel
> + * @chan: the channel
> + *
> + * Closes all channel buffers and frees the channel.
> + */
> +void ltt_relay_close(struct rchan *chan)
> +{
> + unsigned int i;
> +
> + if (!chan)
> + return;
> +
> + mutex_lock(&relay_channels_mutex);
> + for_each_possible_cpu(i)
> + if (chan->buf[i])
> + relay_close_buf(chan->buf[i]);
> +
> + list_del(&chan->list);
> + kref_put(&chan->kref, relay_destroy_channel);
> + mutex_unlock(&relay_channels_mutex);
> +}
> +EXPORT_SYMBOL_GPL(ltt_relay_close);
> +
> +/*
> + * Start iteration at the previous element. Skip the real list head.
> + */
> +static struct buf_page *ltt_relay_find_prev_page(struct rchan_buf *buf,
> + struct buf_page *page, size_t offset, ssize_t diff_offset)
> +{
> + struct buf_page *iter;
> + size_t orig_iter_off;
> + unsigned int i = 0;
> +
> + orig_iter_off = page->offset;
> + list_for_each_entry_reverse(iter, &page->list, list) {
> + /*
> + * Skip the real list head.
> + */
> + if (&iter->list == &buf->pages)
> + continue;
> + i++;
> + if (offset >= iter->offset
> + && offset < iter->offset + PAGE_SIZE) {
> +#ifdef CONFIG_LTT_RELAY_CHECK_RANDOM_ACCESS
> + if (i > 1) {
> + printk(KERN_WARNING
> + "Backward random access detected in "
> + "ltt_relay. Iterations %u, "
> + "offset %zu, orig iter->off %zu, "
> + "iter->off %zu diff_offset %zd.\n", i,
> + offset, orig_iter_off, iter->offset,
> + diff_offset);
> + WARN_ON(1);
> + }
> +#endif
> + return iter;
> + }
> + }
> + return NULL;
> +}
> +
> +/*
> + * Start iteration at the next element. Skip the real list head.
> + */
> +static struct buf_page *ltt_relay_find_next_page(struct rchan_buf *buf,
> + struct buf_page *page, size_t offset, ssize_t diff_offset)
> +{
> + struct buf_page *iter;
> + unsigned int i = 0;
> + size_t orig_iter_off;
> +
> + orig_iter_off = page->offset;
> + list_for_each_entry(iter, &page->list, list) {
> + /*
> + * Skip the real list head.
> + */
> + if (&iter->list == &buf->pages)
> + continue;
> + i++;
> + if (offset >= iter->offset
> + && offset < iter->offset + PAGE_SIZE) {
> +#ifdef CONFIG_LTT_RELAY_CHECK_RANDOM_ACCESS
> + if (i > 1) {
> + printk(KERN_WARNING
> + "Forward random access detected in "
> + "ltt_relay. Iterations %u, "
> + "offset %zu, orig iter->off %zu, "
> + "iter->off %zu diff_offset %zd.\n", i,
> + offset, orig_iter_off, iter->offset,
> + diff_offset);
> + WARN_ON(1);
> + }
> +#endif
> + return iter;
> + }
> + }
> + return NULL;
> +}
> +
> +/*
> + * Find the page containing "offset". Cache it if it is after the currently
> + * cached page.
> + */
> +static struct buf_page *ltt_relay_cache_page(struct rchan_buf *buf,
> + struct buf_page **page_cache,
> + struct buf_page *page, size_t offset)
> +{
> + ssize_t diff_offset;
> + ssize_t half_buf_size = buf->chan->alloc_size >> 1;
> +
> + /*
> + * Make sure this is the page we want to write into. The current
> + * page is changed concurrently by other writers. [wrh]page are
> + * used as a cache remembering the last page written
> + * to/read/looked up for header address. No synchronization;
> + * could have to find the previous page if a nested write
> + * occurred. Finding the right page is done by comparing the
> + * dest_offset with the buf_page offsets.
> + * When at the exact opposite of the buffer, bias towards forward search
> + * because it will be cached.
> + */
> +
> + diff_offset = (ssize_t)offset - (ssize_t)page->offset;
> + if (diff_offset <= -(ssize_t)half_buf_size)
> + diff_offset += buf->chan->alloc_size;
> + else if (diff_offset > half_buf_size)
> + diff_offset -= buf->chan->alloc_size;
> +
> + if (unlikely(diff_offset >= (ssize_t)PAGE_SIZE)) {
> + page = ltt_relay_find_next_page(buf, page, offset, diff_offset);
> + WARN_ON(!page);
> + *page_cache = page;
> + } else if (unlikely(diff_offset < 0)) {
> + page = ltt_relay_find_prev_page(buf, page, offset, diff_offset);
> + WARN_ON(!page);
> + }
> + return page;
> +}
> +
> +/**
> + * ltt_relay_write - write data to a ltt_relay buffer.
> + * @buf : buffer
> + * @offset : offset within the buffer
> + * @src : source address
> + * @len : length to write
> + */
> +int ltt_relay_write(struct rchan_buf *buf, size_t offset,
> + const void *src, size_t len)
> +{
> + struct buf_page *page;
> + ssize_t pagecpy, orig_len;
> +
> + orig_len = len;
> + offset &= buf->chan->alloc_size - 1;
> + page = buf->wpage;
> + if (unlikely(!len))
> + return 0;
> + for (;;) {
> + page = ltt_relay_cache_page(buf, &buf->wpage, page, offset);
> + pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
> + memcpy(page_address(page->page)
> + + (offset & ~PAGE_MASK), src, pagecpy);
> + len -= pagecpy;
> + if (likely(!len))
> + break;
> + src += pagecpy;
> + offset += pagecpy;
> + /*
> + * Underlying layer should never ask for writes across
> + * subbuffers.
> + */
> + WARN_ON(offset >= buf->chan->alloc_size);
> + }
> + return orig_len;
> +}
> +EXPORT_SYMBOL_GPL(ltt_relay_write);
> +
> +/**
> + * ltt_relay_read - read data from ltt_relay_buffer.
> + * @buf : buffer
> + * @offset : offset within the buffer
> + * @dest : destination address
> + * @len : length to write
> + */
> +int ltt_relay_read(struct rchan_buf *buf, size_t offset,
> + void *dest, size_t len)
> +{
> + struct buf_page *page;
> + ssize_t pagecpy, orig_len;
> +
> + orig_len = len;
> + offset &= buf->chan->alloc_size - 1;
> + page = buf->rpage;
> + if (unlikely(!len))
> + return 0;
> + for (;;) {
> + page = ltt_relay_cache_page(buf, &buf->rpage, page, offset);
> + pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
> + memcpy(dest, page_address(page->page) + (offset & ~PAGE_MASK),
> + pagecpy);
> + len -= pagecpy;
> + if (likely(!len))
> + break;
> + dest += pagecpy;
> + offset += pagecpy;
> + /*
> + * Underlying layer should never ask for reads across
> + * subbuffers.
> + */
> + WARN_ON(offset >= buf->chan->alloc_size);
> + }
> + return orig_len;
> +}
> +EXPORT_SYMBOL_GPL(ltt_relay_read);
> +
> +/**
> + * ltt_relay_read_get_page - Get a whole page to read from
> + * @buf : buffer
> + * @offset : offset within the buffer
> + */
> +struct buf_page *ltt_relay_read_get_page(struct rchan_buf *buf, size_t offset)
> +{
> + struct buf_page *page;
> +
> + offset &= buf->chan->alloc_size - 1;
> + page = buf->rpage;
> + page = ltt_relay_cache_page(buf, &buf->rpage, page, offset);
> + return page;
> +}
> +EXPORT_SYMBOL_GPL(ltt_relay_read_get_page);
> +
> +/**
> + * ltt_relay_offset_address - get address of a location within the buffer
> + * @buf : buffer
> + * @offset : offset within the buffer.
> + *
> + * Return the address where a given offset is located.
> + * Should be used to get the current subbuffer header pointer. Given we know
> + * it's never on a page boundary, it's safe to write directly to this address,
> + * as long as the write is never bigger than a page size.
> + */
> +void *ltt_relay_offset_address(struct rchan_buf *buf, size_t offset)
> +{
> + struct buf_page *page;
> + unsigned int odd;
> +
> + offset &= buf->chan->alloc_size - 1;
> + odd = !!(offset & buf->chan->subbuf_size);
> + page = buf->hpage[odd];
> + if (offset < page->offset || offset >= page->offset + PAGE_SIZE)
> + buf->hpage[odd] = page = buf->wpage;
> + page = ltt_relay_cache_page(buf, &buf->hpage[odd], page, offset);
> + return page_address(page->page) + (offset & ~PAGE_MASK);
> +}
> +EXPORT_SYMBOL_GPL(ltt_relay_offset_address);
> +
> +/**
> + * relay_file_open - open file op for relay files
> + * @inode: the inode
> + * @filp: the file
> + *
> + * Increments the channel buffer refcount.
> + */
> +static int relay_file_open(struct inode *inode, struct file *filp)
> +{
> + struct rchan_buf *buf = inode->i_private;
> + kref_get(&buf->kref);
> + filp->private_data = buf;
> +
> + return nonseekable_open(inode, filp);
> +}
> +
> +/**
> + * relay_file_release - release file op for relay files
> + * @inode: the inode
> + * @filp: the file
> + *
> + * Decrements the channel refcount, as the filesystem is
> + * no longer using it.
> + */
> +static int relay_file_release(struct inode *inode, struct file *filp)
> +{
> + struct rchan_buf *buf = filp->private_data;
> + kref_put(&buf->kref, relay_remove_buf);
> +
> + return 0;
> +}
> +
> +const struct file_operations ltt_relay_file_operations = {
> + .open = relay_file_open,
> + .release = relay_file_release,
> +};
> +EXPORT_SYMBOL_GPL(ltt_relay_file_operations);
> +
> +static __init int relay_init(void)
> +{
> + hotcpu_notifier(relay_hotcpu_callback, 5);
> + return 0;
> +}
> +
> +module_init(relay_init);
> Index: linux-2.6-lttng/include/linux/ltt-relay.h
> ===================================================================
> --- /dev/null 1970-01-01 00:00:00.000000000 +0000
> +++ linux-2.6-lttng/include/linux/ltt-relay.h 2009-03-05 15:05:56.000000000 -0500
> @@ -0,0 +1,182 @@
> +/*
> + * linux/include/linux/ltt-relay.h
> + *
> + * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi at us.ibm.com), IBM Corp
> + * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim at opersys.com)
> + * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers at polymtl.ca)
> + *
> + * CONFIG_RELAY definitions and declarations
> + */
> +
> +#ifndef _LINUX_LTT_RELAY_H
> +#define _LINUX_LTT_RELAY_H
> +
> +#include <linux/types.h>
> +#include <linux/sched.h>
> +#include <linux/timer.h>
> +#include <linux/wait.h>
> +#include <linux/list.h>
> +#include <linux/fs.h>
> +#include <linux/poll.h>
> +#include <linux/kref.h>
> +#include <linux/mm.h>
> +
> +/* Needs a _much_ better name... */
> +#define FIX_SIZE(x) ((((x) - 1) & PAGE_MASK) + PAGE_SIZE)
> +
> +/*
> + * Tracks changes to rchan/rchan_buf structs
> + */
> +#define LTT_RELAY_CHANNEL_VERSION 8
> +
> +struct rchan_buf;
> +
> +struct buf_page {
> + struct page *page;
> + size_t offset; /* page offset in the buffer */
> + struct list_head list; /* buffer linked list */
> +};
> +
> +/*
> + * Per-cpu relay channel buffer
> + */
> +struct rchan_buf {
> + void *chan_private; /* private data for this buf */
> + struct rchan *chan; /* associated channel */
> + struct dentry *dentry; /* channel file dentry */
> + struct kref kref; /* channel buffer refcount */
> + struct list_head pages; /* list of buffer pages */
> + struct buf_page *wpage; /* current write page (cache) */
> + struct buf_page *hpage[2]; /* current subbuf header page (cache) */
> + struct buf_page *rpage; /* current subbuf read page (cache) */
> + unsigned int page_count; /* number of current buffer pages */
> + unsigned int cpu; /* this buf's cpu */
> +} ____cacheline_aligned;
> +
> +/*
> + * Relay channel data structure
> + */
> +struct rchan {
> + u32 version; /* the version of this struct */
> + size_t subbuf_size; /* sub-buffer size */
> + size_t n_subbufs; /* number of sub-buffers per buffer */
> + size_t alloc_size; /* total buffer size allocated */
> + struct rchan_callbacks *cb; /* client callbacks */
> + struct kref kref; /* channel refcount */
> + void *private_data; /* for user-defined data */
> + struct rchan_buf *buf[NR_CPUS]; /* per-cpu channel buffers */
> + struct list_head list; /* for channel list */
> + struct dentry *parent; /* parent dentry passed to open */
> + int subbuf_size_order; /* order of sub-buffer size */
> + char base_filename[NAME_MAX]; /* saved base filename */
> +};
> +
> +/*
> + * Relay channel client callbacks
> + */
> +struct rchan_callbacks {
> + /*
> + * subbuf_start - called on buffer-switch to a new sub-buffer
> + * @buf: the channel buffer containing the new sub-buffer
> + * @subbuf: the start of the new sub-buffer
> + * @prev_subbuf: the start of the previous sub-buffer
> + * @prev_padding: unused space at the end of previous sub-buffer
> + *
> + * The client should return 1 to continue logging, 0 to stop
> + * logging.
> + *
> + * NOTE: subbuf_start will also be invoked when the buffer is
> + * created, so that the first sub-buffer can be initialized
> + * if necessary. In this case, prev_subbuf will be NULL.
> + *
> + * NOTE: the client can reserve bytes at the beginning of the new
> + * sub-buffer by calling subbuf_start_reserve() in this callback.
> + */
> + int (*subbuf_start) (struct rchan_buf *buf,
> + void *subbuf,
> + void *prev_subbuf,
> + size_t prev_padding);
> +
> + /*
> + * create_buf_file - create file to represent a relay channel buffer
> + * @filename: the name of the file to create
> + * @parent: the parent of the file to create
> + * @mode: the mode of the file to create
> + * @buf: the channel buffer
> + *
> + * Called during relay_open(), once for each per-cpu buffer,
> + * to allow the client to create a file to be used to
> + * represent the corresponding channel buffer. If the file is
> + * created outside of relay, the parent must also exist in
> + * that filesystem.
> + *
> + * The callback should return the dentry of the file created
> + * to represent the relay buffer.
> + *
> + * Setting the is_global outparam to a non-zero value will
> + * cause relay_open() to create a single global buffer rather
> + * than the default set of per-cpu buffers.
> + *
> + * See Documentation/filesystems/relayfs.txt for more info.
> + */
> + struct dentry *(*create_buf_file)(const char *filename,
> + struct dentry *parent,
> + int mode,
> + struct rchan_buf *buf);
> +
> + /*
> + * remove_buf_file - remove file representing a relay channel buffer
> + * @dentry: the dentry of the file to remove
> + *
> + * Called during relay_close(), once for each per-cpu buffer,
> + * to allow the client to remove a file used to represent a
> + * channel buffer.
> + *
> + * The callback should return 0 if successful, negative if not.
> + */
> + int (*remove_buf_file)(struct dentry *dentry);
> +};
> +
> +extern int ltt_relay_write(struct rchan_buf *buf, size_t offset,
> + const void *src, size_t len);
> +
> +extern int ltt_relay_read(struct rchan_buf *buf, size_t offset,
> + void *dest, size_t len);
> +
> +extern struct buf_page *ltt_relay_read_get_page(struct rchan_buf *buf,
> + size_t offset);
> +
> +/*
> + * Return the address where a given offset is located.
> + * Should be used to get the current subbuffer header pointer. Given we know
> + * it's never on a page boundary, it's safe to write directly to this address,
> + * as long as the write is never bigger than a page size.
> + */
> +extern void *ltt_relay_offset_address(struct rchan_buf *buf,
> + size_t offset);
> +
> +/*
> + * CONFIG_LTT_RELAY kernel API, ltt/ltt-relay-alloc.c
> + */
> +
> +struct rchan *ltt_relay_open(const char *base_filename,
> + struct dentry *parent,
> + size_t subbuf_size,
> + size_t n_subbufs,
> + struct rchan_callbacks *cb,
> + void *private_data);
> +extern void ltt_relay_close(struct rchan *chan);
> +
> +void ltt_relay_get_chan(struct rchan *chan);
> +void ltt_relay_put_chan(struct rchan *chan);
> +
> +void ltt_relay_get_chan_buf(struct rchan_buf *buf);
> +void ltt_relay_put_chan_buf(struct rchan_buf *buf);
> +
> +/*
> + * exported ltt_relay file operations, ltt/ltt-relay-alloc.c
> + */
> +extern const struct file_operations ltt_relay_file_operations;
> +
> +#endif /* _LINUX_LTT_RELAY_H */
> +
>
> --
> Mathieu Desnoyers
> OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68
>