[ltt-dev] [RFC patch 05/41] LTTng relay buffer allocation, read, write

Mathieu Desnoyers mathieu.desnoyers at polymtl.ca
Fri Mar 6 15:01:51 EST 2009


* Steven Rostedt (rostedt at goodmis.org) wrote:
> 
> On Thu, 5 Mar 2009, Mathieu Desnoyers wrote:
> 
> > As I told Martin, I was thinking about taking an axe and moving stuff around in
> > relay. Which I just did.
> > 
> > This patch reimplements relay with a linked list of pages. Provides read/write
> > wrappers which should be used to read or write from the buffers. It's the core
> > of a layered approach to the design requirements expressed by Martin and
> > discussed earlier.
> > 
> > It does not provide _any_ sort of locking on buffer data. Locking should be done
> > by the caller. Given that we might think of very lightweight locking schemes, it
> > makes sense to me that the underlying buffering infrastructure supports event
> > records larger than 1 page.
> 
> You bring up two points.
> 
> 1) lockless
> 
> 2) larger than 1 page of data
> 
> Soon the ring buffer will be lockless on the write side. On the read side 
> we have locking. This could be changed to allow for a new API with 
> specific requirements that does not need reader-side locking. If the ring 
> buffer is strictly producer/consumer without overwrite, then it would make 
> sense to have a lockless system on both the reader and writer.
> 
> I have even expressed interest in implementing this. But right now my 
> focus has been on getting other aspects working. Ftrace runs in overwrite 
> mode so it must have the locking.
> 

Note that this "buffer allocation" layer is not lockless in the sense
that it deals with concurrent writers in a lockless manner. It simply
_does not provide_ any protection for the write side or the read side;
the layer above it does. In this patchset, that layer is called
"ltt-relay-locked" for the irq off/spinlock version of the concurrency
management. I did not post the lockless version here; it comes further
down in my patchset and will probably require a bit more discussion.

I really like having the allocation and "locking management" layers
separated, because we can then easily mix and match them (see the
sketch after the lists below).

For allocation:
- Page-based backend
- Static array-based backend
- Video memory based backend (useful for crash trace extraction, because
  this storage survives a hot reboot)

For locking:
- lockless
- irq off/spinlock
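
To make the mix-and-match concrete, here is a rough sketch of what the
irq off/spinlock flavour looks like on top of this allocator
(hypothetical code: ltt_relay_write() is the API from this patch, but
locked_event_write(), buf_lock and the woff offset handling are made up
for illustration):

#include <linux/spinlock.h>
#include <linux/ltt-relay.h>

static DEFINE_SPINLOCK(buf_lock);	/* one lock per buffer in real code */

/*
 * "ltt-relay-locked"-style write: serialize the offset update and the
 * copy over the unlocked page-list allocator.
 */
static int locked_event_write(struct rchan_buf *buf, size_t *woff,
			      const void *src, size_t len)
{
	unsigned long flags;

	spin_lock_irqsave(&buf_lock, flags);
	ltt_relay_write(buf, *woff, src, len);	/* allocation layer, no locking */
	*woff += len;
	spin_unlock_irqrestore(&buf_lock, flags);
	return len;
}

A lockless flavour would instead reserve its write offset with atomic
operations (cmpxchg) and then call the very same ltt_relay_write(),
which is the point of keeping the allocator free of any locking.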

> 
> > 
> > A cache of 4 pointers is used to keep track of the current write page,
> > the current read page, and two contiguous subbuffer header pages for
> > pointer lookup. The offset of each page within the buffer is saved in a
> > structure containing the offset, linked list and page frame pointer to
> > permit cache lookup without extra locking.
> 
> I'm also all for optimizations. Right now the focus has been on making 
> sure the ring buffer can do all that is requested of it. I wanted it to 
> be useful for all users, not focused on a select few. This actually 
> includes ftrace. That is, I did not develop the ring buffer with ftrace 
> as its only user.
> 

I think we both agree that it's good to make the buffering
infrastructure usable by all.

> > 
> > The offset and linked list are not placed in the page frame itself to allow
> > using the pages directly for disk I/O, network I/O or to mmap it to userspace
> > for live processing.
> > 
> > Write and header address lookup tested through LTTng. This patch contains
> > self-test code which detects if a client is actually trying to use the
> > read/write/get header address API to do random buffer offset access. If such
> > behavior is detected, a warning message is issued and the random access is done
> > as requested.
> 
> Hmm, if a warning message is issued, it seems that the tracer should shut 
> down. Either support the operation or do not support it. Do not give the 
> user an "Oh, you really should not do that, but I'll let you do it 
> anyway". Otherwise you will be pressured to make it a true feature.
> 

It is _possible_ that such a scenario occurs, and it is supported. It's
just very unlikely. With the lockless tracer, if the tracing code is
interrupted for a long time and still holds a reference to a previous
page while the current write offset has moved far ahead, then it's OK to
walk backward across multiple pages. But it's very, very unlikely. And
this debugging option helps pinpoint ill uses of the buffers in the
"likely" path very quickly. So I would never consider it a "failure" per
se. It's one of those useful tracer-debug-only options.

> > 
> > TODO : Currently, no splice file operations are implemented. Should come soon.
> > The idea is to splice the buffers directly into files or to the network.
> > We have to make sure the page frame fields used are not used by disk I/O or
> > network.
> > 
> > Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at polymtl.ca>
> > CC: Jens Axboe <jens.axboe at oracle.com>
> > CC: Martin Bligh <mbligh at google.com>
> > CC: Peter Zijlstra <a.p.zijlstra at chello.nl>
> > CC: Tom Zanussi <zanussi at comcast.net>
> > CC: prasad at linux.vnet.ibm.com
> > CC: Linus Torvalds <torvalds at linux-foundation.org>
> > CC: Thomas Gleixner <tglx at linutronix.de>
> > CC: Steven Rostedt <rostedt at goodmis.org>
> > CC: od at suse.com
> > CC: "Frank Ch. Eigler" <fche at redhat.com>
> > CC: Andrew Morton <akpm at linux-foundation.org>
> > CC: hch at lst.de
> > CC: David Wilder <dwilder at us.ibm.com>
> > ---
> >  include/linux/ltt-relay.h |  182 +++++++++++
> >  ltt/ltt-relay-alloc.c     |  705 ++++++++++++++++++++++++++++++++++++++++++++++
> >  2 files changed, 887 insertions(+)
> > 
> > Index: linux-2.6-lttng/ltt/ltt-relay-alloc.c
> > ===================================================================
> > --- /dev/null	1970-01-01 00:00:00.000000000 +0000
> > +++ linux-2.6-lttng/ltt/ltt-relay-alloc.c	2009-03-05 15:05:56.000000000 -0500
> > @@ -0,0 +1,705 @@
> > +/*
> > + * Public API and common code for kernel->userspace relay file support.
> > + *
> > + * Copyright (C) 2002-2005 - Tom Zanussi (zanussi at us.ibm.com), IBM Corp
> > + * Copyright (C) 1999-2005 - Karim Yaghmour (karim at opersys.com)
> > + * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers at polymtl.ca)
> > + *
> > + * Moved to kernel/relay.c by Paul Mundt, 2006.
> > + * November 2006 - CPU hotplug support by Mathieu Desnoyers
> > + * 	(mathieu.desnoyers at polymtl.ca)
> > + *
> > + * This file is released under the GPL.
> > + */
> > +#include <linux/errno.h>
> > +#include <linux/stddef.h>
> > +#include <linux/slab.h>
> > +#include <linux/module.h>
> > +#include <linux/string.h>
> > +#include <linux/ltt-relay.h>
> > +#include <linux/vmalloc.h>
> > +#include <linux/mm.h>
> > +#include <linux/cpu.h>
> > +#include <linux/splice.h>
> > +#include <linux/bitops.h>
> > +
> > +/* list of open channels, for cpu hotplug */
> > +static DEFINE_MUTEX(relay_channels_mutex);
> > +static LIST_HEAD(relay_channels);
> > +
> > +/**
> > + *	relay_alloc_buf - allocate a channel buffer
> > + *	@buf: the buffer struct
> > + *	@size: total size of the buffer
> > + */
> > +static int relay_alloc_buf(struct rchan_buf *buf, size_t *size)
> > +{
> > +	unsigned int i, n_pages;
> > +	struct buf_page *buf_page, *n;
> > +
> > +	*size = PAGE_ALIGN(*size);
> > +	n_pages = *size >> PAGE_SHIFT;
> > +
> > +	INIT_LIST_HEAD(&buf->pages);
> > +
> > +	for (i = 0; i < n_pages; i++) {
> > +		buf_page = kmalloc_node(sizeof(*buf_page), GFP_KERNEL,
> > +			cpu_to_node(buf->cpu));
> > +		if (unlikely(!buf_page))
> > +			goto depopulate;
> > +		buf_page->page = alloc_pages_node(cpu_to_node(buf->cpu),
> > +			GFP_KERNEL | __GFP_ZERO, 0);
> 
> So these buffers allocate single pages?
> 

Each buffer allocates n_pages pages. Each of these pages is allocated
with alloc_pages_node() and linked into a list.

I use a linked list of pages rather than a page pointer array to make
sure I never depend on vmalloc if the page pointer array grows too
large. vmalloc'd data can trigger page faults, which I don't want. Note
that an alternative would be to call vmalloc_sync_all() after the pages
have been allocated, but I prefer not to use vmalloc at all unless there
is a clear advantage.

Note that the tracer calls vmalloc_sync_all() for each tracer module
registered. Actually, the probe modules should also call
vmalloc_sync_all() before their callbacks get registered. This would
ensure that the module code and data never trigger a page fault.

I actually wonder why we don't add a vmalloc_sync_all() call in
module.c? It is a slow path anyway...
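
For instance, a probe module could do something like this (a sketch;
my_probe_register() is a made-up name for whatever callback
registration the probe performs):

#include <linux/init.h>
#include <linux/module.h>
#include <linux/vmalloc.h>

static int __init probe_init(void)
{
	/*
	 * Populate the vmalloc mappings into all page tables now, so
	 * that the probe code and data never trigger a lazy vmalloc
	 * page fault once the callbacks are live.
	 */
	vmalloc_sync_all();
	return my_probe_register();	/* hypothetical registration call */
}
module_init(probe_init);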

> > +		if (unlikely(!buf_page->page)) {
> > +			kfree(buf_page);
> > +			goto depopulate;
> > +		}
> > +		list_add_tail(&buf_page->list, &buf->pages);
> > +		buf_page->offset = (size_t)i << PAGE_SHIFT;
> > +		set_page_private(buf_page->page, (unsigned long)buf_page);
> > +		if (i == 0) {
> > +			buf->wpage = buf_page;
> > +			buf->hpage[0] = buf_page;
> > +			buf->hpage[1] = buf_page;
> > +			buf->rpage = buf_page;
> > +		}
> > +	}
> > +	buf->page_count = n_pages;
> > +	return 0;
> > +
> > +depopulate:
> > +	list_for_each_entry_safe(buf_page, n, &buf->pages, list) {
> > +		list_del_init(&buf_page->list);
> > +		__free_page(buf_page->page);
> > +		kfree(buf_page);
> > +	}
> > +	return -ENOMEM;
> > +}
> > +
> > +/**
> > + *	relay_create_buf - allocate and initialize a channel buffer
> > + *	@chan: the relay channel
> > + *	@cpu: cpu the buffer belongs to
> > + *
> > + *	Returns channel buffer if successful, %NULL otherwise.
> > + */
> > +static struct rchan_buf *relay_create_buf(struct rchan *chan, int cpu)
> > +{
> > +	int ret;
> > +	struct rchan_buf *buf = kzalloc(sizeof(struct rchan_buf), GFP_KERNEL);
> > +	if (!buf)
> > +		return NULL;
> > +
> > +	buf->cpu = cpu;
> > +	ret = relay_alloc_buf(buf, &chan->alloc_size);
> > +	if (ret)
> > +		goto free_buf;
> > +
> > +	buf->chan = chan;
> > +	kref_get(&buf->chan->kref);
> > +	return buf;
> > +
> > +free_buf:
> > +	kfree(buf);
> > +	return NULL;
> > +}
> > +
> > +/**
> > + *	relay_destroy_channel - free the channel struct
> > + *	@kref: target kernel reference that contains the relay channel
> > + *
> > + *	Should only be called from kref_put().
> > + */
> > +static void relay_destroy_channel(struct kref *kref)
> > +{
> > +	struct rchan *chan = container_of(kref, struct rchan, kref);
> > +	kfree(chan);
> > +}
> > +
> > +void ltt_relay_get_chan(struct rchan *chan)
> > +{
> > +	kref_get(&chan->kref);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_get_chan);
> > +
> > +void ltt_relay_put_chan(struct rchan *chan)
> > +{
> > +	kref_put(&chan->kref, relay_destroy_channel);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_put_chan);
> > +
> > +/**
> > + *	relay_destroy_buf - destroy an rchan_buf struct and associated buffer
> > + *	@buf: the buffer struct
> > + */
> > +static void relay_destroy_buf(struct rchan_buf *buf)
> > +{
> > +	struct rchan *chan = buf->chan;
> > +	struct buf_page *buf_page, *n;
> > +
> > +	list_for_each_entry_safe(buf_page, n, &buf->pages, list) {
> > +		list_del_init(&buf_page->list);
> > +		__free_page(buf_page->page);
> > +		kfree(buf_page);
> > +	}
> > +	chan->buf[buf->cpu] = NULL;
> > +	kfree(buf);
> > +	kref_put(&chan->kref, relay_destroy_channel);
> > +}
> > +
> > +/**
> > + *	relay_remove_buf - remove a channel buffer
> > + *	@kref: target kernel reference that contains the relay buffer
> > + *
> > + *	Removes the file from the filesystem, which also frees the
> > + *	rchan_buf_struct and the channel buffer.  Should only be called from
> > + *	kref_put().
> > + */
> > +static void relay_remove_buf(struct kref *kref)
> > +{
> > +	struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref);
> > +	buf->chan->cb->remove_buf_file(buf->dentry);
> > +	relay_destroy_buf(buf);
> > +}
> > +
> > +void ltt_relay_get_chan_buf(struct rchan_buf *buf)
> > +{
> > +	kref_get(&buf->kref);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_get_chan_buf);
> > +
> > +void ltt_relay_put_chan_buf(struct rchan_buf *buf)
> > +{
> > +	kref_put(&buf->kref, relay_remove_buf);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_put_chan_buf);
> > +
> > +/*
> > + * High-level relay kernel API and associated functions.
> > + */
> > +
> > +/*
> > + * rchan_callback implementations defining default channel behavior.  Used
> > + * in place of corresponding NULL values in client callback struct.
> > + */
> > +
> > +/*
> > + * create_buf_file() default callback.  Does nothing.
> > + */
> > +static struct dentry *create_buf_file_default_callback(const char *filename,
> > +						       struct dentry *parent,
> > +						       int mode,
> > +						       struct rchan_buf *buf)
> > +{
> > +	return NULL;
> > +}
> > +
> > +/*
> > + * remove_buf_file() default callback.  Does nothing.
> > + */
> > +static int remove_buf_file_default_callback(struct dentry *dentry)
> > +{
> > +	return -EINVAL;
> > +}
> > +
> > +/* relay channel default callbacks */
> > +static struct rchan_callbacks default_channel_callbacks = {
> > +	.create_buf_file = create_buf_file_default_callback,
> > +	.remove_buf_file = remove_buf_file_default_callback,
> > +};
> > +
> > +/**
> > + *	__relay_reset - reset a channel buffer
> > + *	@buf: the channel buffer
> > + *	@init: 1 if this is a first-time initialization
> > + *
> > + *	See relay_reset() for description of effect.
> > + */
> > +static void __relay_reset(struct rchan_buf *buf, unsigned int init)
> > +{
> > +	if (init)
> > +		kref_init(&buf->kref);
> > +}
> > +
> > +/*
> > + *	relay_open_buf - create a new relay channel buffer
> > + *
> > + *	used by relay_open() and CPU hotplug.
> > + */
> > +static struct rchan_buf *relay_open_buf(struct rchan *chan, unsigned int cpu)
> > +{
> > +	struct rchan_buf *buf = NULL;
> > +	struct dentry *dentry;
> > +	char *tmpname;
> > +
> > +	tmpname = kzalloc(NAME_MAX + 1, GFP_KERNEL);
> > +	if (!tmpname)
> > +		goto end;
> > +	snprintf(tmpname, NAME_MAX, "%s%d", chan->base_filename, cpu);
> > +
> > +	buf = relay_create_buf(chan, cpu);
> > +	if (!buf)
> > +		goto free_name;
> > +
> > +	__relay_reset(buf, 1);
> > +
> > +	/* Create file in fs */
> > +	dentry = chan->cb->create_buf_file(tmpname, chan->parent, S_IRUSR,
> > +					   buf);
> > +	if (!dentry)
> > +		goto free_buf;
> > +
> > +	buf->dentry = dentry;
> > +
> > +	goto free_name;
> > +
> > +free_buf:
> > +	relay_destroy_buf(buf);
> > +	buf = NULL;
> > +free_name:
> > +	kfree(tmpname);
> > +end:
> > +	return buf;
> > +}
> > +
> > +/**
> > + *	relay_close_buf - close a channel buffer
> > + *	@buf: channel buffer
> > + *
> > + *	Restores the default callbacks.
> > + *	The channel buffer and channel buffer data structure are then freed
> > + *	automatically when the last reference is given up.
> > + */
> > +static void relay_close_buf(struct rchan_buf *buf)
> > +{
> > +	kref_put(&buf->kref, relay_remove_buf);
> > +}
> > +
> > +static void setup_callbacks(struct rchan *chan,
> > +				   struct rchan_callbacks *cb)
> > +{
> > +	if (!cb) {
> > +		chan->cb = &default_channel_callbacks;
> > +		return;
> > +	}
> > +
> > +	if (!cb->create_buf_file)
> > +		cb->create_buf_file = create_buf_file_default_callback;
> > +	if (!cb->remove_buf_file)
> > +		cb->remove_buf_file = remove_buf_file_default_callback;
> > +	chan->cb = cb;
> > +}
> > +
> > +/**
> > + * 	relay_hotcpu_callback - CPU hotplug callback
> > + * 	@nb: notifier block
> > + * 	@action: hotplug action to take
> > + * 	@hcpu: CPU number
> > + *
> > + * 	Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
> > + */
> > +static int __cpuinit relay_hotcpu_callback(struct notifier_block *nb,
> > +				unsigned long action,
> > +				void *hcpu)
> > +{
> > +	unsigned int hotcpu = (unsigned long)hcpu;
> > +	struct rchan *chan;
> > +
> > +	switch (action) {
> > +	case CPU_UP_PREPARE:
> > +	case CPU_UP_PREPARE_FROZEN:
> > +		mutex_lock(&relay_channels_mutex);
> > +		list_for_each_entry(chan, &relay_channels, list) {
> > +			if (chan->buf[hotcpu])
> > +				continue;
> > +			chan->buf[hotcpu] = relay_open_buf(chan, hotcpu);
> > +			if (!chan->buf[hotcpu]) {
> > +				printk(KERN_ERR
> > +					"relay_hotcpu_callback: cpu %d buffer "
> > +					"creation failed\n", hotcpu);
> > +				mutex_unlock(&relay_channels_mutex);
> > +				return NOTIFY_BAD;
> > +			}
> > +		}
> > +		mutex_unlock(&relay_channels_mutex);
> > +		break;
> > +	case CPU_DEAD:
> > +	case CPU_DEAD_FROZEN:
> > +		/* No need to flush the cpu: will be flushed upon
> > +		 * final relay_flush() call. */
> > +		break;
> > +	}
> > +	return NOTIFY_OK;
> > +}
> > +
> > +/**
> > + *	ltt_relay_open - create a new relay channel
> > + *	@base_filename: base name of files to create
> > + *	@parent: dentry of parent directory, %NULL for root directory
> > + *	@subbuf_size: size of sub-buffers
> > + *	@n_subbufs: number of sub-buffers
> > + *	@cb: client callback functions
> > + *	@private_data: user-defined data
> > + *
> > + *	Returns channel pointer if successful, %NULL otherwise.
> > + *
> > + *	Creates a channel buffer for each cpu using the sizes and
> > + *	attributes specified.  The created channel buffer files
> > + *	will be named base_filename0...base_filenameN-1.  File
> > + *	permissions will be %S_IRUSR.
> > + */
> > +struct rchan *ltt_relay_open(const char *base_filename,
> > +			 struct dentry *parent,
> > +			 size_t subbuf_size,
> > +			 size_t n_subbufs,
> > +			 struct rchan_callbacks *cb,
> > +			 void *private_data)
> > +{
> > +	unsigned int i;
> > +	struct rchan *chan;
> > +	if (!base_filename)
> > +		return NULL;
> > +
> > +	if (!(subbuf_size && n_subbufs))
> > +		return NULL;
> > +
> > +	chan = kzalloc(sizeof(struct rchan), GFP_KERNEL);
> > +	if (!chan)
> > +		return NULL;
> > +
> > +	chan->version = LTT_RELAY_CHANNEL_VERSION;
> > +	chan->n_subbufs = n_subbufs;
> > +	chan->subbuf_size = subbuf_size;
> 
> You declare the sub-buffer size here, but I do not see how it gets allocated.
> 

Below:

chan->alloc_size = FIX_SIZE(subbuf_size * n_subbufs);

Then:

relay_open_buf()
  relay_create_buf()
    relay_alloc_buf(buf, &chan->alloc_size); 

And there we iterate over n_pages, which is PAGE_ALIGN(*size) >> PAGE_SHIFT:

> > +static int relay_alloc_buf(struct rchan_buf *buf, size_t *size)
> > +{
> > +   unsigned int i, n_pages;
> > +   struct buf_page *buf_page, *n;
> > +
> > +   *size = PAGE_ALIGN(*size);
> > +   n_pages = *size >> PAGE_SHIFT;
> > +
> > +   INIT_LIST_HEAD(&buf->pages);
> > +
> > +   for (i = 0; i < n_pages; i++) {
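
A quick worked example (made-up numbers, assuming 4 kB pages, i.e.
PAGE_SIZE = 4096 and PAGE_SHIFT = 12): with subbuf_size = 5000 and
n_subbufs = 2,

  chan->alloc_size = FIX_SIZE(5000 * 2)
                   = ((10000 - 1) & PAGE_MASK) + PAGE_SIZE
                   = 8192 + 4096
                   = 12288

  n_pages = PAGE_ALIGN(12288) >> PAGE_SHIFT = 12288 >> 12 = 3

so relay_alloc_buf() links three pages into buf->pages.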

Mathieu


> -- Steve
> 
> > +	chan->subbuf_size_order = get_count_order(subbuf_size);
> > +	chan->alloc_size = FIX_SIZE(subbuf_size * n_subbufs);
> > +	chan->parent = parent;
> > +	chan->private_data = private_data;
> > +	strlcpy(chan->base_filename, base_filename, NAME_MAX);
> > +	setup_callbacks(chan, cb);
> > +	kref_init(&chan->kref);
> > +
> > +	mutex_lock(&relay_channels_mutex);
> > +	for_each_online_cpu(i) {
> > +		chan->buf[i] = relay_open_buf(chan, i);
> > +		if (!chan->buf[i])
> > +			goto free_bufs;
> > +	}
> > +	list_add(&chan->list, &relay_channels);
> > +	mutex_unlock(&relay_channels_mutex);
> > +
> > +	return chan;
> > +
> > +free_bufs:
> > +	for_each_possible_cpu(i) {
> > +		if (!chan->buf[i])
> > +			break;
> > +		relay_close_buf(chan->buf[i]);
> > +	}
> > +
> > +	kref_put(&chan->kref, relay_destroy_channel);
> > +	mutex_unlock(&relay_channels_mutex);
> > +	return NULL;
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_open);
> > +
> > +/**
> > + *	ltt_relay_close - close the channel
> > + *	@chan: the channel
> > + *
> > + *	Closes all channel buffers and frees the channel.
> > + */
> > +void ltt_relay_close(struct rchan *chan)
> > +{
> > +	unsigned int i;
> > +
> > +	if (!chan)
> > +		return;
> > +
> > +	mutex_lock(&relay_channels_mutex);
> > +	for_each_possible_cpu(i)
> > +		if (chan->buf[i])
> > +			relay_close_buf(chan->buf[i]);
> > +
> > +	list_del(&chan->list);
> > +	kref_put(&chan->kref, relay_destroy_channel);
> > +	mutex_unlock(&relay_channels_mutex);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_close);
> > +
> > +/*
> > + * Start iteration at the previous element. Skip the real list head.
> > + */
> > +static struct buf_page *ltt_relay_find_prev_page(struct rchan_buf *buf,
> > +	struct buf_page *page, size_t offset, ssize_t diff_offset)
> > +{
> > +	struct buf_page *iter;
> > +	size_t orig_iter_off;
> > +	unsigned int i = 0;
> > +
> > +	orig_iter_off = page->offset;
> > +	list_for_each_entry_reverse(iter, &page->list, list) {
> > +		/*
> > +		 * Skip the real list head.
> > +		 */
> > +		if (&iter->list == &buf->pages)
> > +			continue;
> > +		i++;
> > +		if (offset >= iter->offset
> > +			&& offset < iter->offset + PAGE_SIZE) {
> > +#ifdef CONFIG_LTT_RELAY_CHECK_RANDOM_ACCESS
> > +			if (i > 1) {
> > +				printk(KERN_WARNING
> > +					"Backward random access detected in "
> > +					"ltt_relay. Iterations %u, "
> > +					"offset %zu, orig iter->off %zu, "
> > +					"iter->off %zu diff_offset %zd.\n", i,
> > +					offset, orig_iter_off, iter->offset,
> > +					diff_offset);
> > +				WARN_ON(1);
> > +			}
> > +#endif
> > +			return iter;
> > +		}
> > +	}
> > +	return NULL;
> > +}
> > +
> > +/*
> > + * Start iteration at the next element. Skip the real list head.
> > + */
> > +static struct buf_page *ltt_relay_find_next_page(struct rchan_buf *buf,
> > +	struct buf_page *page, size_t offset, ssize_t diff_offset)
> > +{
> > +	struct buf_page *iter;
> > +	unsigned int i = 0;
> > +	size_t orig_iter_off;
> > +
> > +	orig_iter_off = page->offset;
> > +	list_for_each_entry(iter, &page->list, list) {
> > +		/*
> > +		 * Skip the real list head.
> > +		 */
> > +		if (&iter->list == &buf->pages)
> > +			continue;
> > +		i++;
> > +		if (offset >= iter->offset
> > +			&& offset < iter->offset + PAGE_SIZE) {
> > +#ifdef CONFIG_LTT_RELAY_CHECK_RANDOM_ACCESS
> > +			if (i > 1) {
> > +				printk(KERN_WARNING
> > +					"Forward random access detected in "
> > +					"ltt_relay. Iterations %u, "
> > +					"offset %zu, orig iter->off %zu, "
> > +					"iter->off %zu diff_offset %zd.\n", i,
> > +					offset, orig_iter_off, iter->offset,
> > +					diff_offset);
> > +				WARN_ON(1);
> > +			}
> > +#endif
> > +			return iter;
> > +		}
> > +	}
> > +	return NULL;
> > +}
> > +
> > +/*
> > + * Find the page containing "offset". Cache it if it is after the currently
> > + * cached page.
> > + */
> > +static struct buf_page *ltt_relay_cache_page(struct rchan_buf *buf,
> > +		struct buf_page **page_cache,
> > +		struct buf_page *page, size_t offset)
> > +{
> > +	ssize_t diff_offset;
> > +	ssize_t half_buf_size = buf->chan->alloc_size >> 1;
> > +
> > +	/*
> > +	 * Make sure this is the page we want to write into. The current
> > +	 * page is changed concurrently by other writers. [wrh]page are
> > +	 * used as caches remembering the last page written
> > +	 * to/read/looked up for header address. No synchronization;
> > +	 * we may have to find the previous page if a nested write
> > +	 * occurred. Finding the right page is done by comparing the
> > +	 * dest_offset with the buf_page offsets.
> > +	 * When at the exact opposite of the buffer, bias towards
> > +	 * forward search because the result will be cached.
> > +	 */
> > +
> > +	diff_offset = (ssize_t)offset - (ssize_t)page->offset;
> > +	if (diff_offset <= -(ssize_t)half_buf_size)
> > +		diff_offset += buf->chan->alloc_size;
> > +	else if (diff_offset > half_buf_size)
> > +		diff_offset -= buf->chan->alloc_size;
> > +
> > +	if (unlikely(diff_offset >= (ssize_t)PAGE_SIZE)) {
> > +		page = ltt_relay_find_next_page(buf, page, offset, diff_offset);
> > +		WARN_ON(!page);
> > +		*page_cache = page;
> > +	} else if (unlikely(diff_offset < 0)) {
> > +		page = ltt_relay_find_prev_page(buf, page, offset, diff_offset);
> > +		WARN_ON(!page);
> > +	}
> > +	return page;
> > +}
> > +
> > +/**
> > + * ltt_relay_write - write data to a ltt_relay buffer.
> > + * @buf : buffer
> > + * @offset : offset within the buffer
> > + * @src : source address
> > + * @len : length to write
> > + */
> > +int ltt_relay_write(struct rchan_buf *buf, size_t offset,
> > +	const void *src, size_t len)
> > +{
> > +	struct buf_page *page;
> > +	ssize_t pagecpy, orig_len;
> > +
> > +	orig_len = len;
> > +	offset &= buf->chan->alloc_size - 1;
> > +	page = buf->wpage;
> > +	if (unlikely(!len))
> > +		return 0;
> > +	for (;;) {
> > +		page = ltt_relay_cache_page(buf, &buf->wpage, page, offset);
> > +		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
> > +		memcpy(page_address(page->page)
> > +			+ (offset & ~PAGE_MASK), src, pagecpy);
> > +		len -= pagecpy;
> > +		if (likely(!len))
> > +			break;
> > +		src += pagecpy;
> > +		offset += pagecpy;
> > +		/*
> > +		 * Underlying layer should never ask for writes across
> > +		 * subbuffers.
> > +		 */
> > +		WARN_ON(offset >= buf->chan->alloc_size);
> > +	}
> > +	return orig_len;
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_write);
> > +
> > +/**
> > + * ltt_relay_read - read data from ltt_relay_buffer.
> > + * @buf : buffer
> > + * @offset : offset within the buffer
> > + * @dest : destination address
> > + * @len : length to write
> > + */
> > +int ltt_relay_read(struct rchan_buf *buf, size_t offset,
> > +	void *dest, size_t len)
> > +{
> > +	struct buf_page *page;
> > +	ssize_t pagecpy, orig_len;
> > +
> > +	orig_len = len;
> > +	offset &= buf->chan->alloc_size - 1;
> > +	page = buf->rpage;
> > +	if (unlikely(!len))
> > +		return 0;
> > +	for (;;) {
> > +		page = ltt_relay_cache_page(buf, &buf->rpage, page, offset);
> > +		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
> > +		memcpy(dest, page_address(page->page) + (offset & ~PAGE_MASK),
> > +			pagecpy);
> > +		len -= pagecpy;
> > +		if (likely(!len))
> > +			break;
> > +		dest += pagecpy;
> > +		offset += pagecpy;
> > +		/*
> > +		 * Underlying layer should never ask for reads across
> > +		 * subbuffers.
> > +		 */
> > +		WARN_ON(offset >= buf->chan->alloc_size);
> > +	}
> > +	return orig_len;
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_read);
> > +
> > +/**
> > + * ltt_relay_read_get_page - Get a whole page to read from
> > + * @buf : buffer
> > + * @offset : offset within the buffer
> > + */
> > +struct buf_page *ltt_relay_read_get_page(struct rchan_buf *buf, size_t offset)
> > +{
> > +	struct buf_page *page;
> > +
> > +	offset &= buf->chan->alloc_size - 1;
> > +	page = buf->rpage;
> > +	page = ltt_relay_cache_page(buf, &buf->rpage, page, offset);
> > +	return page;
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_read_get_page);
> > +
> > +/**
> > + * ltt_relay_offset_address - get address of a location within the buffer
> > + * @buf : buffer
> > + * @offset : offset within the buffer.
> > + *
> > + * Return the address where a given offset is located.
> > + * Should be used to get the current subbuffer header pointer. Given we know
> > + * it's never on a page boundary, it's safe to write directly to this address,
> > + * as long as the write is never bigger than a page size.
> > + */
> > +void *ltt_relay_offset_address(struct rchan_buf *buf, size_t offset)
> > +{
> > +	struct buf_page *page;
> > +	unsigned int odd;
> > +
> > +	offset &= buf->chan->alloc_size - 1;
> > +	odd = !!(offset & buf->chan->subbuf_size);
> > +	page = buf->hpage[odd];
> > +	if (offset < page->offset || offset >= page->offset + PAGE_SIZE)
> > +		buf->hpage[odd] = page = buf->wpage;
> > +	page = ltt_relay_cache_page(buf, &buf->hpage[odd], page, offset);
> > +	return page_address(page->page) + (offset & ~PAGE_MASK);
> > +}
> > +EXPORT_SYMBOL_GPL(ltt_relay_offset_address);
> > +
> > +/**
> > + *	relay_file_open - open file op for relay files
> > + *	@inode: the inode
> > + *	@filp: the file
> > + *
> > + *	Increments the channel buffer refcount.
> > + */
> > +static int relay_file_open(struct inode *inode, struct file *filp)
> > +{
> > +	struct rchan_buf *buf = inode->i_private;
> > +	kref_get(&buf->kref);
> > +	filp->private_data = buf;
> > +
> > +	return nonseekable_open(inode, filp);
> > +}
> > +
> > +/**
> > + *	relay_file_release - release file op for relay files
> > + *	@inode: the inode
> > + *	@filp: the file
> > + *
> > + *	Decrements the channel refcount, as the filesystem is
> > + *	no longer using it.
> > + */
> > +static int relay_file_release(struct inode *inode, struct file *filp)
> > +{
> > +	struct rchan_buf *buf = filp->private_data;
> > +	kref_put(&buf->kref, relay_remove_buf);
> > +
> > +	return 0;
> > +}
> > +
> > +const struct file_operations ltt_relay_file_operations = {
> > +	.open		= relay_file_open,
> > +	.release	= relay_file_release,
> > +};
> > +EXPORT_SYMBOL_GPL(ltt_relay_file_operations);
> > +
> > +static __init int relay_init(void)
> > +{
> > +	hotcpu_notifier(relay_hotcpu_callback, 5);
> > +	return 0;
> > +}
> > +
> > +module_init(relay_init);
> > Index: linux-2.6-lttng/include/linux/ltt-relay.h
> > ===================================================================
> > --- /dev/null	1970-01-01 00:00:00.000000000 +0000
> > +++ linux-2.6-lttng/include/linux/ltt-relay.h	2009-03-05 15:05:56.000000000 -0500
> > @@ -0,0 +1,182 @@
> > +/*
> > + * linux/include/linux/ltt-relay.h
> > + *
> > + * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi at us.ibm.com), IBM Corp
> > + * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim at opersys.com)
> > + * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers at polymtl.ca)
> > + *
> > + * CONFIG_RELAY definitions and declarations
> > + */
> > +
> > +#ifndef _LINUX_LTT_RELAY_H
> > +#define _LINUX_LTT_RELAY_H
> > +
> > +#include <linux/types.h>
> > +#include <linux/sched.h>
> > +#include <linux/timer.h>
> > +#include <linux/wait.h>
> > +#include <linux/list.h>
> > +#include <linux/fs.h>
> > +#include <linux/poll.h>
> > +#include <linux/kref.h>
> > +#include <linux/mm.h>
> > +
> > +/* Needs a _much_ better name... */
> > +#define FIX_SIZE(x) ((((x) - 1) & PAGE_MASK) + PAGE_SIZE)
> > +
> > +/*
> > + * Tracks changes to rchan/rchan_buf structs
> > + */
> > +#define LTT_RELAY_CHANNEL_VERSION		8
> > +
> > +struct rchan_buf;
> > +
> > +struct buf_page {
> > +	struct page *page;
> > +	size_t offset;		/* page offset in the buffer */
> > +	struct list_head list;	/* buffer linked list */
> > +};
> > +
> > +/*
> > + * Per-cpu relay channel buffer
> > + */
> > +struct rchan_buf {
> > +	void *chan_private;		/* private data for this buf */
> > +	struct rchan *chan;		/* associated channel */
> > +	struct dentry *dentry;		/* channel file dentry */
> > +	struct kref kref;		/* channel buffer refcount */
> > +	struct list_head pages;		/* list of buffer pages */
> > +	struct buf_page *wpage;		/* current write page (cache) */
> > +	struct buf_page *hpage[2];	/* current subbuf header page (cache) */
> > +	struct buf_page *rpage;		/* current subbuf read page (cache) */
> > +	unsigned int page_count;	/* number of current buffer pages */
> > +	unsigned int cpu;		/* this buf's cpu */
> > +} ____cacheline_aligned;
> > +
> > +/*
> > + * Relay channel data structure
> > + */
> > +struct rchan {
> > +	u32 version;			/* the version of this struct */
> > +	size_t subbuf_size;		/* sub-buffer size */
> > +	size_t n_subbufs;		/* number of sub-buffers per buffer */
> > +	size_t alloc_size;		/* total buffer size allocated */
> > +	struct rchan_callbacks *cb;	/* client callbacks */
> > +	struct kref kref;		/* channel refcount */
> > +	void *private_data;		/* for user-defined data */
> > +	struct rchan_buf *buf[NR_CPUS]; /* per-cpu channel buffers */
> > +	struct list_head list;		/* for channel list */
> > +	struct dentry *parent;		/* parent dentry passed to open */
> > +	int subbuf_size_order;		/* order of sub-buffer size */
> > +	char base_filename[NAME_MAX];	/* saved base filename */
> > +};
> > +
> > +/*
> > + * Relay channel client callbacks
> > + */
> > +struct rchan_callbacks {
> > +	/*
> > +	 * subbuf_start - called on buffer-switch to a new sub-buffer
> > +	 * @buf: the channel buffer containing the new sub-buffer
> > +	 * @subbuf: the start of the new sub-buffer
> > +	 * @prev_subbuf: the start of the previous sub-buffer
> > +	 * @prev_padding: unused space at the end of previous sub-buffer
> > +	 *
> > +	 * The client should return 1 to continue logging, 0 to stop
> > +	 * logging.
> > +	 *
> > +	 * NOTE: subbuf_start will also be invoked when the buffer is
> > +	 *       created, so that the first sub-buffer can be initialized
> > +	 *       if necessary.  In this case, prev_subbuf will be NULL.
> > +	 *
> > +	 * NOTE: the client can reserve bytes at the beginning of the new
> > +	 *       sub-buffer by calling subbuf_start_reserve() in this callback.
> > +	 */
> > +	int (*subbuf_start) (struct rchan_buf *buf,
> > +			     void *subbuf,
> > +			     void *prev_subbuf,
> > +			     size_t prev_padding);
> > +
> > +	/*
> > +	 * create_buf_file - create file to represent a relay channel buffer
> > +	 * @filename: the name of the file to create
> > +	 * @parent: the parent of the file to create
> > +	 * @mode: the mode of the file to create
> > +	 * @buf: the channel buffer
> > +	 *
> > +	 * Called during relay_open(), once for each per-cpu buffer,
> > +	 * to allow the client to create a file to be used to
> > +	 * represent the corresponding channel buffer.  If the file is
> > +	 * created outside of relay, the parent must also exist in
> > +	 * that filesystem.
> > +	 *
> > +	 * The callback should return the dentry of the file created
> > +	 * to represent the relay buffer.
> > +	 *
> > +	 * See Documentation/filesystems/relayfs.txt for more info.
> > +	 */
> > +	struct dentry *(*create_buf_file)(const char *filename,
> > +					  struct dentry *parent,
> > +					  int mode,
> > +					  struct rchan_buf *buf);
> > +
> > +	/*
> > +	 * remove_buf_file - remove file representing a relay channel buffer
> > +	 * @dentry: the dentry of the file to remove
> > +	 *
> > +	 * Called during relay_close(), once for each per-cpu buffer,
> > +	 * to allow the client to remove a file used to represent a
> > +	 * channel buffer.
> > +	 *
> > +	 * The callback should return 0 if successful, negative if not.
> > +	 */
> > +	int (*remove_buf_file)(struct dentry *dentry);
> > +};
> > +
> > +extern int ltt_relay_write(struct rchan_buf *buf, size_t offset,
> > +	const void *src, size_t len);
> > +
> > +extern int ltt_relay_read(struct rchan_buf *buf, size_t offset,
> > +	void *dest, size_t len);
> > +
> > +extern struct buf_page *ltt_relay_read_get_page(struct rchan_buf *buf,
> > +	size_t offset);
> > +
> > +/*
> > + * Return the address where a given offset is located.
> > + * Should be used to get the current subbuffer header pointer. Given we know
> > + * it's never on a page boundary, it's safe to write directly to this address,
> > + * as long as the write is never bigger than a page size.
> > + */
> > +extern void *ltt_relay_offset_address(struct rchan_buf *buf,
> > +	size_t offset);
> > +
> > +/*
> > + * CONFIG_LTT_RELAY kernel API, ltt/ltt-relay-alloc.c
> > + */
> > +
> > +struct rchan *ltt_relay_open(const char *base_filename,
> > +			 struct dentry *parent,
> > +			 size_t subbuf_size,
> > +			 size_t n_subbufs,
> > +			 struct rchan_callbacks *cb,
> > +			 void *private_data);
> > +extern void ltt_relay_close(struct rchan *chan);
> > +
> > +void ltt_relay_get_chan(struct rchan *chan);
> > +void ltt_relay_put_chan(struct rchan *chan);
> > +
> > +void ltt_relay_get_chan_buf(struct rchan_buf *buf);
> > +void ltt_relay_put_chan_buf(struct rchan_buf *buf);
> > +
> > +/*
> > + * exported ltt_relay file operations, ltt/ltt-relay-alloc.c
> > + */
> > +extern const struct file_operations ltt_relay_file_operations;
> > +
> > +#endif /* _LINUX_LTT_RELAY_H */
> > +
> > 
> > -- 
> > Mathieu Desnoyers
> > OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68
> > 

-- 
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68



