[ltt-dev] [RFC PATCH] LTTng relay buffer allocation, read, write v2

Mathieu Desnoyers mathieu.desnoyers at polymtl.ca
Wed Oct 1 13:07:56 EDT 2008


As I told Martin, I was thinking about taking an axe and moving stuff around in
relay. Which I just did.

This patch reimplements relay with a linked list of pages and provides
read/write wrappers which should be used to read from or write to the buffers.
It's the core of a layered approach to the design requirements expressed by
Martin and discussed earlier.

It does not provide _any_ sort of locking on buffer data; locking is left to
the caller. Given that we may want very lightweight locking schemes, it makes
sense to me that the underlying buffering infrastructure supports event
records larger than one page.
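
To make the layering concrete, a client write path could look more or less
like the sketch below. It only shows the idea: my_client_write and my_buf_lock
are made-up names, and a real tracer would use a much lighter synchronization
scheme; only ltt_relay_write() comes from this patch.

#include <linux/spinlock.h>
#include <linux/ltt-relay.h>

static DEFINE_SPINLOCK(my_buf_lock);	/* hypothetical caller-side lock */

/* Write one event at an offset previously reserved by the caller. */
static void my_client_write(struct rchan_buf *buf, size_t offset,
			    const void *event, size_t len)
{
	unsigned long flags;

	/* The relay layer does no locking of its own; serialize here. */
	spin_lock_irqsave(&my_buf_lock, flags);
	ltt_relay_write(buf, offset, event, len);
	spin_unlock_irqrestore(&my_buf_lock, flags);
}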

A cache of four pointers keeps track of the current write page, the current
read page, and the header pages of two contiguous subbuffers (for header
address lookup). The offset of each page within the buffer is kept in a small
structure holding the offset, the linked-list node and the page frame pointer,
so cache lookups can be done without extra locking.
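
For example, the subbuffer header pages cached in hpage[] are meant to be
reached through ltt_relay_offset_address(). A client could fill a header
roughly as sketched below; struct my_subbuf_header and my_start_subbuf are
made-up names, only ltt_relay_offset_address() and the rchan/rchan_buf fields
come from this patch.

#include <linux/ltt-relay.h>

struct my_subbuf_header {		/* hypothetical header layout */
	u64	cycle_count_begin;
	u32	lost_size;
};

/* Initialize the header of sub-buffer "sb_index" of this buffer. */
static void my_start_subbuf(struct rchan_buf *buf, size_t sb_index, u64 tsc)
{
	struct my_subbuf_header *header;

	/* Header lookup goes through the hpage[] pointer cache. */
	header = ltt_relay_offset_address(buf,
			sb_index * buf->chan->subbuf_size);
	header->cycle_count_begin = tsc;
	header->lost_size = 0;
}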

The offset and linked list are kept outside the page frame itself so that the
pages can be used directly for disk I/O or network I/O, or mmapped to
userspace for live processing.

Write and header address lookup have been tested through LTTng. This patch
contains self-test code which detects whether a client is using the
read/write/get-header-address API to do random buffer offset access. If such
behavior is detected, a warning is issued and the random access is performed
as requested.

TODO: No splice file operations are implemented yet; they should come soon.
The idea is to splice the buffers directly into files or to the network.
We have to make sure the page frame fields we use are not also used by disk or
network I/O.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at polymtl.ca>
CC: Jens Axboe <jens.axboe at oracle.com>
CC: Martin Bligh <mbligh at google.com>
CC: Peter Zijlstra <a.p.zijlstra at chello.nl>
CC: Tom Zanussi <zanussi at comcast.net>
CC: prasad at linux.vnet.ibm.com
CC: Linus Torvalds <torvalds at linux-foundation.org>
CC: Thomas Gleixner <tglx at linutronix.de>
CC: Steven Rostedt <rostedt at goodmis.org>
CC: od at suse.com
CC: "Frank Ch. Eigler" <fche at redhat.com>
CC: Andrew Morton <akpm at linux-foundation.org>
CC: hch at lst.de
CC: David Wilder <dwilder at us.ibm.com>
---
 include/linux/ltt-relay.h |  351 ++++++++++++++++++++++++++++++++++
 ltt/ltt-relay-alloc.c     |  462 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 813 insertions(+)

Index: linux-2.6-lttng/ltt/ltt-relay-alloc.c
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux-2.6-lttng/ltt/ltt-relay-alloc.c	2008-10-01 12:43:21.000000000 -0400
@@ -0,0 +1,462 @@
+/*
+ * Public API and common code for kernel->userspace relay file support.
+ *
+ * Copyright (C) 2002-2005 - Tom Zanussi (zanussi at us.ibm.com), IBM Corp
+ * Copyright (C) 1999-2005 - Karim Yaghmour (karim at opersys.com)
+ * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers at polymtl.ca)
+ *
+ * Moved to kernel/relay.c by Paul Mundt, 2006.
+ * November 2006 - CPU hotplug support by Mathieu Desnoyers
+ * 	(mathieu.desnoyers at polymtl.ca)
+ *
+ * This file is released under the GPL.
+ */
+#include <linux/errno.h>
+#include <linux/stddef.h>
+#include <linux/slab.h>
+#include <linux/module.h>
+#include <linux/string.h>
+#include <linux/ltt-relay.h>
+#include <linux/vmalloc.h>
+#include <linux/mm.h>
+#include <linux/cpu.h>
+#include <linux/splice.h>
+
+/* list of open channels, for cpu hotplug */
+static DEFINE_MUTEX(relay_channels_mutex);
+static LIST_HEAD(relay_channels);
+
+/**
+ *	relay_alloc_buf - allocate a channel buffer
+ *	@buf: the buffer struct
+ *	@size: pointer to the total buffer size (page-aligned in place)
+ */
+static int relay_alloc_buf(struct rchan_buf *buf, size_t *size)
+{
+	unsigned int i, n_pages;
+	struct buf_page *buf_page, *n;
+
+	*size = PAGE_ALIGN(*size);
+	n_pages = *size >> PAGE_SHIFT;
+
+	INIT_LIST_HEAD(&buf->pages);
+
+	for (i = 0; i < n_pages; i++) {
+		buf_page = kmalloc(sizeof(struct buf_page), GFP_KERNEL);
+		if (unlikely(!buf_page))
+			goto depopulate;
+		buf_page->page = alloc_page(GFP_KERNEL | __GFP_ZERO);
+		if (unlikely(!buf_page->page)) {
+			kfree(buf_page);
+			goto depopulate;
+		}
+		list_add_tail(&buf_page->list, &buf->pages);
+		buf_page->offset = (size_t)i << PAGE_SHIFT;
+		if (i == 0) {
+			buf->wpage = buf_page;
+			buf->hpage[0] = buf_page;
+			buf->hpage[1] = buf_page;
+			buf->rpage = buf_page;
+		}
+	}
+	buf->page_count = n_pages;
+	return 0;
+
+depopulate:
+	list_for_each_entry_safe(buf_page, n, &buf->pages, list) {
+		list_del_init(&buf_page->list);
+		__free_page(buf_page->page);
+		kfree(buf_page);
+	}
+	return -ENOMEM;
+}
+
+/**
+ *	relay_create_buf - allocate and initialize a channel buffer
+ *	@chan: the relay channel
+ *
+ *	Returns channel buffer if successful, %NULL otherwise.
+ */
+static struct rchan_buf *relay_create_buf(struct rchan *chan)
+{
+	int ret;
+	struct rchan_buf *buf = kzalloc(sizeof(struct rchan_buf), GFP_KERNEL);
+	if (!buf)
+		return NULL;
+
+	ret = relay_alloc_buf(buf, &chan->alloc_size);
+	if (ret)
+		goto free_buf;
+
+	buf->chan = chan;
+	kref_get(&buf->chan->kref);
+	return buf;
+
+free_buf:
+	kfree(buf);
+	return NULL;
+}
+
+/**
+ *	relay_destroy_channel - free the channel struct
+ *	@kref: target kernel reference that contains the relay channel
+ *
+ *	Should only be called from kref_put().
+ */
+static void relay_destroy_channel(struct kref *kref)
+{
+	struct rchan *chan = container_of(kref, struct rchan, kref);
+	kfree(chan);
+}
+
+/**
+ *	relay_destroy_buf - destroy an rchan_buf struct and associated buffer
+ *	@buf: the buffer struct
+ */
+static void relay_destroy_buf(struct rchan_buf *buf)
+{
+	struct rchan *chan = buf->chan;
+	struct buf_page *buf_page, *n;
+
+	list_for_each_entry_safe(buf_page, n, &buf->pages, list) {
+		list_del_init(&buf_page->list);
+		__free_page(buf_page->page);
+		kfree(buf_page);
+	}
+	chan->buf[buf->cpu] = NULL;
+	kfree(buf);
+	kref_put(&chan->kref, relay_destroy_channel);
+}
+
+/**
+ *	relay_remove_buf - remove a channel buffer
+ *	@kref: target kernel reference that contains the relay buffer
+ *
+ *	Removes the file from the filesystem, which also frees the
+ *	rchan_buf struct and the channel buffer.  Should only be called from
+ *	kref_put().
+ */
+static void relay_remove_buf(struct kref *kref)
+{
+	struct rchan_buf *buf = container_of(kref, struct rchan_buf, kref);
+	buf->chan->cb->remove_buf_file(buf->dentry);
+	relay_destroy_buf(buf);
+}
+
+/*
+ * High-level relay kernel API and associated functions.
+ */
+
+/*
+ * rchan_callback implementations defining default channel behavior.  Used
+ * in place of corresponding NULL values in client callback struct.
+ */
+
+/*
+ * create_buf_file() default callback.  Does nothing.
+ */
+static struct dentry *create_buf_file_default_callback(const char *filename,
+						       struct dentry *parent,
+						       int mode,
+						       struct rchan_buf *buf)
+{
+	return NULL;
+}
+
+/*
+ * remove_buf_file() default callback.  Does nothing.
+ */
+static int remove_buf_file_default_callback(struct dentry *dentry)
+{
+	return -EINVAL;
+}
+
+/* relay channel default callbacks */
+static struct rchan_callbacks default_channel_callbacks = {
+	.create_buf_file = create_buf_file_default_callback,
+	.remove_buf_file = remove_buf_file_default_callback,
+};
+
+/**
+ *	wakeup_readers - wake up readers waiting on a channel
+ *	@data: contains the channel buffer
+ *
+ *	This is the timer function used to defer reader waking.
+ */
+static void wakeup_readers(unsigned long data)
+{
+	struct rchan_buf *buf = (struct rchan_buf *)data;
+	wake_up_interruptible(&buf->read_wait);
+}
+
+/**
+ *	__relay_reset - reset a channel buffer
+ *	@buf: the channel buffer
+ *	@init: 1 if this is a first-time initialization
+ *
+ *	See relay_reset() for description of effect.
+ */
+static void __relay_reset(struct rchan_buf *buf, unsigned int init)
+{
+	if (init) {
+		init_waitqueue_head(&buf->read_wait);
+		kref_init(&buf->kref);
+		setup_timer(&buf->timer, wakeup_readers, (unsigned long)buf);
+	} else
+		del_timer_sync(&buf->timer);
+
+	buf->finalized = 0;
+}
+
+/*
+ *	relay_open_buf - create a new relay channel buffer
+ *
+ *	used by relay_open() and CPU hotplug.
+ */
+static struct rchan_buf *relay_open_buf(struct rchan *chan, unsigned int cpu)
+{
+	struct rchan_buf *buf = NULL;
+	struct dentry *dentry;
+	char *tmpname;
+
+	tmpname = kzalloc(NAME_MAX + 1, GFP_KERNEL);
+	if (!tmpname)
+		goto end;
+	snprintf(tmpname, NAME_MAX, "%s%d", chan->base_filename, cpu);
+
+	buf = relay_create_buf(chan);
+	if (!buf)
+		goto free_name;
+
+	buf->cpu = cpu;
+	__relay_reset(buf, 1);
+
+	/* Create file in fs */
+	dentry = chan->cb->create_buf_file(tmpname, chan->parent, S_IRUSR,
+					   buf);
+	if (!dentry)
+		goto free_buf;
+
+	buf->dentry = dentry;
+
+	goto free_name;
+
+free_buf:
+	relay_destroy_buf(buf);
+	buf = NULL;
+free_name:
+	kfree(tmpname);
+end:
+	return buf;
+}
+
+/**
+ *	relay_close_buf - close a channel buffer
+ *	@buf: channel buffer
+ *
+ *	Marks the buffer finalized and restores the default callbacks.
+ *	The channel buffer and channel buffer data structure are then freed
+ *	automatically when the last reference is given up.
+ */
+static void relay_close_buf(struct rchan_buf *buf)
+{
+	del_timer_sync(&buf->timer);
+	kref_put(&buf->kref, relay_remove_buf);
+}
+
+static void setup_callbacks(struct rchan *chan,
+				   struct rchan_callbacks *cb)
+{
+	if (!cb) {
+		chan->cb = &default_channel_callbacks;
+		return;
+	}
+
+	if (!cb->create_buf_file)
+		cb->create_buf_file = create_buf_file_default_callback;
+	if (!cb->remove_buf_file)
+		cb->remove_buf_file = remove_buf_file_default_callback;
+	chan->cb = cb;
+}
+
+/**
+ * 	relay_hotcpu_callback - CPU hotplug callback
+ * 	@nb: notifier block
+ * 	@action: hotplug action to take
+ * 	@hcpu: CPU number
+ *
+ * 	Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
+ */
+static int __cpuinit relay_hotcpu_callback(struct notifier_block *nb,
+				unsigned long action,
+				void *hcpu)
+{
+	unsigned int hotcpu = (unsigned long)hcpu;
+	struct rchan *chan;
+
+	switch (action) {
+	case CPU_UP_PREPARE:
+	case CPU_UP_PREPARE_FROZEN:
+		mutex_lock(&relay_channels_mutex);
+		list_for_each_entry(chan, &relay_channels, list) {
+			if (chan->buf[hotcpu])
+				continue;
+			chan->buf[hotcpu] = relay_open_buf(chan, hotcpu);
+			if (!chan->buf[hotcpu]) {
+				printk(KERN_ERR
+					"relay_hotcpu_callback: cpu %d buffer "
+					"creation failed\n", hotcpu);
+				mutex_unlock(&relay_channels_mutex);
+				return NOTIFY_BAD;
+			}
+		}
+		mutex_unlock(&relay_channels_mutex);
+		break;
+	case CPU_DEAD:
+	case CPU_DEAD_FROZEN:
+		/* No need to flush the cpu : will be flushed upon
+		 * final relay_flush() call. */
+		break;
+	}
+	return NOTIFY_OK;
+}
+
+/**
+ *	ltt_relay_open - create a new relay channel
+ *	@base_filename: base name of files to create
+ *	@parent: dentry of parent directory, %NULL for root directory
+ *	@subbuf_size: size of sub-buffers
+ *	@n_subbufs: number of sub-buffers
+ *	@cb: client callback functions
+ *	@private_data: user-defined data
+ *
+ *	Returns channel pointer if successful, %NULL otherwise.
+ *
+ *	Creates a channel buffer for each cpu using the sizes and
+ *	attributes specified.  The created channel buffer files
+ *	will be named base_filename0...base_filenameN-1.  File
+ *	permissions will be %S_IRUSR.
+ */
+struct rchan *ltt_relay_open(const char *base_filename,
+			 struct dentry *parent,
+			 size_t subbuf_size,
+			 size_t n_subbufs,
+			 struct rchan_callbacks *cb,
+			 void *private_data)
+{
+	unsigned int i;
+	struct rchan *chan;
+	if (!base_filename)
+		return NULL;
+
+	if (!(subbuf_size && n_subbufs))
+		return NULL;
+
+	chan = kzalloc(sizeof(struct rchan), GFP_KERNEL);
+	if (!chan)
+		return NULL;
+
+	chan->version = LTT_RELAY_CHANNEL_VERSION;
+	chan->n_subbufs = n_subbufs;
+	chan->subbuf_size = subbuf_size;
+	chan->alloc_size = FIX_SIZE(subbuf_size * n_subbufs);
+	chan->parent = parent;
+	chan->private_data = private_data;
+	strlcpy(chan->base_filename, base_filename, NAME_MAX);
+	setup_callbacks(chan, cb);
+	kref_init(&chan->kref);
+
+	mutex_lock(&relay_channels_mutex);
+	for_each_online_cpu(i) {
+		chan->buf[i] = relay_open_buf(chan, i);
+		if (!chan->buf[i])
+			goto free_bufs;
+	}
+	list_add(&chan->list, &relay_channels);
+	mutex_unlock(&relay_channels_mutex);
+
+	return chan;
+
+free_bufs:
+	for_each_online_cpu(i) {
+		if (!chan->buf[i])
+			break;
+		relay_close_buf(chan->buf[i]);
+	}
+
+	kref_put(&chan->kref, relay_destroy_channel);
+	mutex_unlock(&relay_channels_mutex);
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(ltt_relay_open);
+
+/**
+ *	ltt_relay_close - close the channel
+ *	@chan: the channel
+ *
+ *	Closes all channel buffers and frees the channel.
+ */
+void ltt_relay_close(struct rchan *chan)
+{
+	unsigned int i;
+
+	if (!chan)
+		return;
+
+	mutex_lock(&relay_channels_mutex);
+	for_each_possible_cpu(i)
+		if (chan->buf[i])
+			relay_close_buf(chan->buf[i]);
+
+	list_del(&chan->list);
+	kref_put(&chan->kref, relay_destroy_channel);
+	mutex_unlock(&relay_channels_mutex);
+}
+EXPORT_SYMBOL_GPL(ltt_relay_close);
+
+/**
+ *	relay_file_open - open file op for relay files
+ *	@inode: the inode
+ *	@filp: the file
+ *
+ *	Increments the channel buffer refcount.
+ */
+static int relay_file_open(struct inode *inode, struct file *filp)
+{
+	struct rchan_buf *buf = inode->i_private;
+	kref_get(&buf->kref);
+	filp->private_data = buf;
+
+	return nonseekable_open(inode, filp);
+}
+
+/**
+ *	relay_file_release - release file op for relay files
+ *	@inode: the inode
+ *	@filp: the file
+ *
+ *	Decrements the channel refcount, as the filesystem is
+ *	no longer using it.
+ */
+static int relay_file_release(struct inode *inode, struct file *filp)
+{
+	struct rchan_buf *buf = filp->private_data;
+	kref_put(&buf->kref, relay_remove_buf);
+
+	return 0;
+}
+
+const struct file_operations ltt_relay_file_operations = {
+	.open		= relay_file_open,
+	.llseek		= no_llseek,
+	.release	= relay_file_release,
+};
+EXPORT_SYMBOL_GPL(ltt_relay_file_operations);
+
+static __init int relay_init(void)
+{
+	hotcpu_notifier(relay_hotcpu_callback, 5);
+	return 0;
+}
+
+module_init(relay_init);
Index: linux-2.6-lttng/include/linux/ltt-relay.h
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux-2.6-lttng/include/linux/ltt-relay.h	2008-10-01 12:40:18.000000000 -0400
@@ -0,0 +1,351 @@
+/*
+ * linux/include/linux/ltt-relay.h
+ *
+ * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi at us.ibm.com), IBM Corp
+ * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim at opersys.com)
+ * Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers at polymtl.ca)
+ *
+ * CONFIG_RELAY definitions and declarations
+ */
+
+#ifndef _LINUX_LTT_RELAY_H
+#define _LINUX_LTT_RELAY_H
+
+#include <linux/types.h>
+#include <linux/sched.h>
+#include <linux/timer.h>
+#include <linux/wait.h>
+#include <linux/list.h>
+#include <linux/fs.h>
+#include <linux/poll.h>
+#include <linux/kref.h>
+#include <linux/mm.h>
+
+/* Needs a _much_ better name... */
+#define FIX_SIZE(x) ((((x) - 1) & PAGE_MASK) + PAGE_SIZE)
+
+/*
+ * Tracks changes to rchan/rchan_buf structs
+ */
+#define LTT_RELAY_CHANNEL_VERSION		8
+
+struct buf_page {
+	struct page *page;
+	size_t offset;		/* page offset in the buffer */
+	struct list_head list;	/* buffer linked list */
+};
+
+/*
+ * Per-cpu relay channel buffer
+ */
+struct rchan_buf {
+	struct rchan *chan;		/* associated channel */
+	wait_queue_head_t read_wait;	/* reader wait queue */
+	struct timer_list timer; 	/* reader wake-up timer */
+	struct dentry *dentry;		/* channel file dentry */
+	struct kref kref;		/* channel buffer refcount */
+	struct list_head pages;		/* list of buffer pages */
+	struct buf_page *wpage;		/* current write page (cache) */
+	struct buf_page *hpage[2];	/* current subbuf header page (cache) */
+	struct buf_page *rpage;		/* current subbuf read page (cache) */
+	unsigned int page_count;	/* number of current buffer pages */
+	unsigned int finalized;		/* buffer has been finalized */
+	unsigned int cpu;		/* this buf's cpu */
+} ____cacheline_aligned;
+
+/*
+ * Relay channel data structure
+ */
+struct rchan {
+	u32 version;			/* the version of this struct */
+	size_t subbuf_size;		/* sub-buffer size */
+	size_t n_subbufs;		/* number of sub-buffers per buffer */
+	size_t alloc_size;		/* total buffer size allocated */
+	struct rchan_callbacks *cb;	/* client callbacks */
+	struct kref kref;		/* channel refcount */
+	void *private_data;		/* for user-defined data */
+	struct rchan_buf *buf[NR_CPUS]; /* per-cpu channel buffers */
+	struct list_head list;		/* for channel list */
+	struct dentry *parent;		/* parent dentry passed to open */
+	char base_filename[NAME_MAX];	/* saved base filename */
+};
+
+/*
+ * Relay channel client callbacks
+ */
+struct rchan_callbacks {
+	/*
+	 * subbuf_start - called on buffer-switch to a new sub-buffer
+	 * @buf: the channel buffer containing the new sub-buffer
+	 * @subbuf: the start of the new sub-buffer
+	 * @prev_subbuf: the start of the previous sub-buffer
+	 * @prev_padding: unused space at the end of previous sub-buffer
+	 *
+	 * The client should return 1 to continue logging, 0 to stop
+	 * logging.
+	 *
+	 * NOTE: subbuf_start will also be invoked when the buffer is
+	 *       created, so that the first sub-buffer can be initialized
+	 *       if necessary.  In this case, prev_subbuf will be NULL.
+	 *
+	 * NOTE: the client can reserve bytes at the beginning of the new
+	 *       sub-buffer by calling subbuf_start_reserve() in this callback.
+	 */
+	int (*subbuf_start) (struct rchan_buf *buf,
+			     void *subbuf,
+			     void *prev_subbuf,
+			     size_t prev_padding);
+
+	/*
+	 * create_buf_file - create file to represent a relay channel buffer
+	 * @filename: the name of the file to create
+	 * @parent: the parent of the file to create
+	 * @mode: the mode of the file to create
+	 * @buf: the channel buffer
+	 *
+	 * Called during ltt_relay_open(), once for each per-cpu buffer,
+	 * to allow the client to create a file to be used to
+	 * represent the corresponding channel buffer.  If the file is
+	 * created outside of relay, the parent must also exist in
+	 * that filesystem.
+	 *
+	 * The callback should return the dentry of the file created
+	 * to represent the relay buffer.
+	 *
+	 * Unlike the in-kernel relay API, there is no is_global
+	 * outparam here: ltt_relay_open() always creates one buffer
+	 * per online cpu.
+	 *
+	 * See Documentation/filesystems/relay.txt for more info.
+	 */
+	struct dentry *(*create_buf_file)(const char *filename,
+					  struct dentry *parent,
+					  int mode,
+					  struct rchan_buf *buf);
+
+	/*
+	 * remove_buf_file - remove file representing a relay channel buffer
+	 * @dentry: the dentry of the file to remove
+	 *
+	 * Called during ltt_relay_close(), once for each per-cpu buffer,
+	 * to allow the client to remove a file used to represent a
+	 * channel buffer.
+	 *
+	 * The callback should return 0 if successful, negative if not.
+	 */
+	int (*remove_buf_file)(struct dentry *dentry);
+};
+
+/*
+ * Start iteration at the previous element. Skip the real list head.
+ */
+static inline struct buf_page *ltt_relay_find_prev_page(struct rchan_buf *buf,
+	struct buf_page *page, size_t offset, ssize_t diff_offset)
+{
+	struct buf_page *iter;
+	unsigned int i = 0;
+	size_t orig_iter_off;
+
+	orig_iter_off = page->offset;
+	list_for_each_entry_reverse(iter, &page->list, list) {
+		/*
+		 * Skip the real list head.
+		 */
+		if (&iter->list == &buf->pages)
+			continue;
+		i++;
+		if (offset >= iter->offset
+			&& offset < iter->offset + PAGE_SIZE) {
+			if (i > 1) {
+				printk("Backward random access detected in "
+					"ltt_relay. Iterations %u, "
+					"offset %zu, orig iter->off %zu, "
+					"iter->off %zu diff_offset %zd.\n", i,
+					offset, orig_iter_off, iter->offset,
+					diff_offset);
+				WARN_ON(1);
+			}
+			return iter;
+		}
+	}
+	return NULL;
+}
+
+/*
+ * Start iteration at the next element. Skip the real list head.
+ */
+static inline struct buf_page *ltt_relay_find_next_page(struct rchan_buf *buf,
+	struct buf_page *page, size_t offset, ssize_t diff_offset)
+{
+	struct buf_page *iter;
+	unsigned int i = 0;
+	size_t orig_iter_off;
+
+	orig_iter_off = page->offset;
+	list_for_each_entry(iter, &page->list, list) {
+		/*
+		 * Skip the real list head.
+		 */
+		if (&iter->list == &buf->pages)
+			continue;
+		i++;
+		if (offset >= iter->offset
+			&& offset < iter->offset + PAGE_SIZE) {
+			if (i > 1) {
+				printk("Forward random access detected in "
+					"ltt_relay. Iterations %u, "
+					"offset %zu, orig iter->off %zu, "
+					"iter->off %zu diff_offset %zd.\n", i,
+					offset, orig_iter_off, iter->offset,
+					diff_offset);
+				WARN_ON(1);
+			}
+			return iter;
+		}
+	}
+	return NULL;
+}
+
+/*
+ * Find the page containing "offset". Cache it if it is after the currently
+ * cached page.
+ */
+static inline struct buf_page *ltt_relay_cache_page(struct rchan_buf *buf,
+		struct buf_page **page_cache,
+		struct buf_page *page, size_t offset)
+{
+	ssize_t diff_offset;
+	ssize_t half_buf_size = buf->chan->alloc_size >> 1;
+
+	/*
+	 * Make sure this is the page we want to write into. The current
+	 * page is changed concurrently by other writers. [wrh]page are
+	 * used as a cache remembering the last page written
+	 * to/read/looked up for header address. No synchronization;
+	 * we might have to find the previous page if a nested write
+	 * occurred. Finding the right page is done by comparing the
+	 * requested offset with the buf_page offsets.
+	 * When at the exact opposite of the buffer, bias towards
+	 * forward search because its result will be cached.
+	 */
+
+	diff_offset = (ssize_t)offset - (ssize_t)page->offset;
+	if (diff_offset <= -(ssize_t)half_buf_size)
+		diff_offset += buf->chan->alloc_size;
+	else if (diff_offset > half_buf_size)
+		diff_offset -= buf->chan->alloc_size;
+
+	if (unlikely(diff_offset >= (ssize_t)PAGE_SIZE)) {
+		page = ltt_relay_find_next_page(buf, page, offset, diff_offset);
+		WARN_ON(!page);
+		*page_cache = page;
+	} else if (unlikely(diff_offset < 0)) {
+		page = ltt_relay_find_prev_page(buf, page, offset, diff_offset);
+		WARN_ON(!page);
+	} else {
+		WARN_ON(!(diff_offset >= 0
+			&& diff_offset < (ssize_t)PAGE_SIZE));
+	}
+	return page;
+}
+
+static inline int ltt_relay_write(struct rchan_buf *buf, size_t offset,
+	const void *src, size_t len)
+{
+	struct buf_page *page;
+	ssize_t pagecpy, orig_len;
+
+	orig_len = len;
+	offset &= buf->chan->alloc_size - 1;
+	page = buf->wpage;
+	if (unlikely(!len))
+		return 0;
+	for (;;) {
+		page = ltt_relay_cache_page(buf, &buf->wpage, page, offset);
+		pagecpy = min(len, PAGE_SIZE - (offset & ~PAGE_MASK));
+		memcpy(page_address(page->page)
+			+ (offset & ~PAGE_MASK), src, pagecpy);
+		len -= pagecpy;
+		if (likely(!len))
+			break;
+		src += pagecpy;
+		offset += pagecpy;
+		/*
+		 * Underlying layer should never ask for writes across
+		 * subbuffers.
+		 */
+		WARN_ON(offset >= buf->chan->alloc_size);
+	}
+	return orig_len;
+}
+
+static inline int ltt_relay_read(struct rchan_buf *buf, size_t offset,
+	void *dest, size_t len)
+{
+	struct buf_page *page;
+	ssize_t pagecpy, orig_len;
+
+	orig_len = len;
+	offset &= buf->chan->alloc_size - 1;
+	page = buf->rpage;
+	if (unlikely(!len))
+		return 0;
+	for (;;) {
+		page = ltt_relay_cache_page(buf, &buf->rpage, page, offset);
+		pagecpy = min(len, PAGE_SIZE - (offset & ~PAGE_MASK));
+		memcpy(dest, page_address(page->page) + (offset & ~PAGE_MASK),
+			pagecpy);
+		len -= pagecpy;
+		if (likely(!len))
+			break;
+		dest += pagecpy;
+		offset += pagecpy;
+		/*
+		 * Underlying layer should never ask for reads across
+		 * subbuffers.
+		 */
+		WARN_ON(offset >= buf->chan->alloc_size);
+	}
+	return orig_len;
+}
+
+/*
+ * Return the address where a given offset is located.
+ * Should be used to get the current subbuffer header pointer. Since we know
+ * the header never crosses a page boundary, it is safe to write directly to
+ * this address, as long as the write never spans more than one page.
+ */
+static inline void *ltt_relay_offset_address(struct rchan_buf *buf,
+	size_t offset)
+{
+	struct buf_page *page;
+	unsigned int odd;
+
+	offset &= buf->chan->alloc_size - 1;
+	odd = !!(offset & buf->chan->subbuf_size);
+	page = buf->hpage[odd];
+	if (offset < page->offset || offset >= page->offset + PAGE_SIZE)
+		buf->hpage[odd] = page = buf->wpage;
+	page = ltt_relay_cache_page(buf, &buf->hpage[odd], page, offset);
+	return page_address(page->page) + (offset & ~PAGE_MASK);
+}
+
+/*
+ * CONFIG_LTT_RELAY kernel API, ltt/ltt-relay-alloc.c
+ */
+
+struct rchan *ltt_relay_open(const char *base_filename,
+			 struct dentry *parent,
+			 size_t subbuf_size,
+			 size_t n_subbufs,
+			 struct rchan_callbacks *cb,
+			 void *private_data);
+extern void ltt_relay_close(struct rchan *chan);
+
+/*
+ * exported ltt_relay file operations, ltt/ltt-relay-alloc.c
+ */
+extern const struct file_operations ltt_relay_file_operations;
+
+#endif /* _LINUX_LTT_RELAY_H */
+
-- 
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68



