[lttng-dev] [PATCH 02/20] lttng lib: ring buffer

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Mon Nov 28 07:42:09 EST 2011


Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
---
 drivers/staging/lttng/lib/ringbuffer/api.h         |   25 +
 drivers/staging/lttng/lib/ringbuffer/backend.h     |  250 +++
 .../lttng/lib/ringbuffer/backend_internal.h        |  449 +++++
 .../staging/lttng/lib/ringbuffer/backend_types.h   |   80 +
 drivers/staging/lttng/lib/ringbuffer/config.h      |  298 ++++
 drivers/staging/lttng/lib/ringbuffer/frontend.h    |  228 +++
 .../staging/lttng/lib/ringbuffer/frontend_api.h    |  358 ++++
 .../lttng/lib/ringbuffer/frontend_internal.h       |  424 +++++
 .../staging/lttng/lib/ringbuffer/frontend_types.h  |  176 ++
 drivers/staging/lttng/lib/ringbuffer/iterator.h    |   70 +
 drivers/staging/lttng/lib/ringbuffer/nohz.h        |   30 +
 .../lttng/lib/ringbuffer/ring_buffer_backend.c     |  854 ++++++++++
 .../lttng/lib/ringbuffer/ring_buffer_frontend.c    | 1721 ++++++++++++++++++++
 .../lttng/lib/ringbuffer/ring_buffer_iterator.c    |  798 +++++++++
 .../lttng/lib/ringbuffer/ring_buffer_mmap.c        |  115 ++
 .../lttng/lib/ringbuffer/ring_buffer_splice.c      |  202 +++
 .../staging/lttng/lib/ringbuffer/ring_buffer_vfs.c |  387 +++++
 drivers/staging/lttng/lib/ringbuffer/vatomic.h     |   85 +
 drivers/staging/lttng/lib/ringbuffer/vfs.h         |   89 +
 19 files changed, 6639 insertions(+), 0 deletions(-)
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/api.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/backend.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/backend_internal.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/backend_types.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/config.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/frontend.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/frontend_api.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/frontend_internal.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/frontend_types.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/iterator.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/nohz.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/ring_buffer_backend.c
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/ring_buffer_frontend.c
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/ring_buffer_iterator.c
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/ring_buffer_mmap.c
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/ring_buffer_splice.c
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/ring_buffer_vfs.c
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/vatomic.h
 create mode 100644 drivers/staging/lttng/lib/ringbuffer/vfs.h

diff --git a/drivers/staging/lttng/lib/ringbuffer/api.h b/drivers/staging/lttng/lib/ringbuffer/api.h
new file mode 100644
index 0000000..f8a1145
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/api.h
@@ -0,0 +1,25 @@
+#ifndef _LINUX_RING_BUFFER_API_H
+#define _LINUX_RING_BUFFER_API_H
+
+/*
+ * linux/ringbuffer/api.h
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers "mathieu.desnoyers at efficios.com"
+ *
+ * Ring Buffer API.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include "../../wrapper/ringbuffer/backend.h"
+#include "../../wrapper/ringbuffer/frontend.h"
+#include "../../wrapper/ringbuffer/vfs.h"
+
+/*
+ * frontend_api.h contains static inline functions that depend on client
+ * static inlines. Hence the inclusion of this "api" header only
+ * within the client.
+ */
+#include "../../wrapper/ringbuffer/frontend_api.h"
+
+#endif /* _LINUX_RING_BUFFER_API_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/backend.h b/drivers/staging/lttng/lib/ringbuffer/backend.h
new file mode 100644
index 0000000..541dc53
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/backend.h
@@ -0,0 +1,250 @@
+#ifndef _LINUX_RING_BUFFER_BACKEND_H
+#define _LINUX_RING_BUFFER_BACKEND_H
+
+/*
+ * linux/ringbuffer/backend.h
+ *
+ * Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring buffer backend (API).
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ *
+ * Credits to Steven Rostedt for proposing to use an extra-subbuffer owned by
+ * the reader in flight recorder mode.
+ */
+
+#include <linux/types.h>
+#include <linux/sched.h>
+#include <linux/timer.h>
+#include <linux/wait.h>
+#include <linux/poll.h>
+#include <linux/list.h>
+#include <linux/fs.h>
+#include <linux/mm.h>
+
+/* Internal helpers */
+#include "../../wrapper/ringbuffer/backend_internal.h"
+#include "../../wrapper/ringbuffer/frontend_internal.h"
+
+/* Ring buffer backend API */
+
+/* Ring buffer backend access (read/write) */
+
+extern size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb,
+				   size_t offset, void *dest, size_t len);
+
+extern int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
+					  size_t offset, void __user *dest,
+					  size_t len);
+
+extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb,
+				     size_t offset, void *dest, size_t len);
+
+extern struct page **
+lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb, size_t offset,
+			      void ***virt);
+
+/*
+ * Return the address where a given offset is located.
+ * Should be used to get the current subbuffer header pointer. Given we know
+ * it's never on a page boundary, it's safe to write directly to this address,
+ * as long as the write is never bigger than a page size.
+ */
+extern void *
+lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
+			       size_t offset);
+extern void *
+lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
+				    size_t offset);
+
+/**
+ * lib_ring_buffer_write - write data to a buffer backend
+ * @config : ring buffer instance configuration
+ * @ctx: ring buffer context. (input arguments only)
+ * @src : source pointer to copy from
+ * @len : length of data to copy
+ *
+ * This function copies "len" bytes of data from a source pointer to a buffer
+ * backend, at the current context offset. This is more or less a buffer
+ * backend-specific memcpy() operation. Calls the slow path
+ * (_lib_ring_buffer_write) if the copy is crossing a page boundary.
+ */
+static inline
+void lib_ring_buffer_write(const struct lib_ring_buffer_config *config,
+			   struct lib_ring_buffer_ctx *ctx,
+			   const void *src, size_t len)
+{
+	struct lib_ring_buffer_backend *bufb = &ctx->buf->backend;
+	struct channel_backend *chanb = &ctx->chan->backend;
+	size_t sbidx, index;
+	size_t offset = ctx->buf_offset;
+	ssize_t pagecpy;
+	struct lib_ring_buffer_backend_pages *rpages;
+	unsigned long sb_bindex, id;
+
+	offset &= chanb->buf_size - 1;
+	sbidx = offset >> chanb->subbuf_size_order;
+	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+	pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK);
+	id = bufb->buf_wsb[sbidx].id;
+	sb_bindex = subbuffer_id_get_index(config, id);
+	rpages = bufb->array[sb_bindex];
+	CHAN_WARN_ON(ctx->chan,
+		     config->mode == RING_BUFFER_OVERWRITE
+		     && subbuffer_id_is_noref(config, id));
+	if (likely(pagecpy == len))
+		lib_ring_buffer_do_copy(config,
+					rpages->p[index].virt
+					    + (offset & ~PAGE_MASK),
+					src, len);
+	else
+		_lib_ring_buffer_write(bufb, offset, src, len, 0);
+	ctx->buf_offset += len;
+}
+
+/**
+ * lib_ring_buffer_memset - write len bytes of c to a buffer backend
+ * @config : ring buffer instance configuration
+ * @ctx : ring buffer context; bytes are written at ctx->buf_offset, which
+ *        is then advanced by @len
+ * @c : the byte to copy
+ * @len : number of bytes to copy
+ *
+ * This function writes "len" bytes of "c" to a buffer backend, at the current
+ * context offset. This is more or less a buffer backend-specific memset()
+ * operation. Calls the slow path (_lib_ring_buffer_memset) if the write is
+ * crossing a page boundary.
+ */
+static inline
+void lib_ring_buffer_memset(const struct lib_ring_buffer_config *config,
+			    struct lib_ring_buffer_ctx *ctx, int c, size_t len)
+{
+
+	struct lib_ring_buffer_backend *bufb = &ctx->buf->backend;
+	struct channel_backend *chanb = &ctx->chan->backend;
+	size_t sbidx, index;
+	size_t offset = ctx->buf_offset;
+	ssize_t pagecpy;
+	struct lib_ring_buffer_backend_pages *rpages;
+	unsigned long sb_bindex, id;
+
+	offset &= chanb->buf_size - 1;
+	sbidx = offset >> chanb->subbuf_size_order;
+	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+	pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK);
+	id = bufb->buf_wsb[sbidx].id;
+	sb_bindex = subbuffer_id_get_index(config, id);
+	rpages = bufb->array[sb_bindex];
+	CHAN_WARN_ON(ctx->chan,
+		     config->mode == RING_BUFFER_OVERWRITE
+		     && subbuffer_id_is_noref(config, id));
+	if (likely(pagecpy == len))
+		lib_ring_buffer_do_memset(rpages->p[index].virt
+					  + (offset & ~PAGE_MASK),
+					  c, len);
+	else
+		_lib_ring_buffer_memset(bufb, offset, c, len, 0);
+	ctx->buf_offset += len;
+}
+
+/**
+ * lib_ring_buffer_copy_from_user - write userspace data to a buffer backend
+ * @config : ring buffer instance configuration
+ * @ctx : ring buffer context; ctx->buf_offset is always advanced by @len
+ * @src : userspace source pointer to copy from
+ * @len : length of data to copy
+ *
+ * Copies "len" bytes from a userspace pointer to the buffer backend at the
+ * current context offset, using the slow path
+ * (_lib_ring_buffer_copy_from_user) when the copy crosses a page boundary.
+ * If access_ok() or the single-page copy fails, uncopied bytes are zeroed.
+ */
+static inline
+void lib_ring_buffer_copy_from_user(const struct lib_ring_buffer_config *config,
+				    struct lib_ring_buffer_ctx *ctx,
+				    const void __user *src, size_t len)
+{
+	struct lib_ring_buffer_backend *bufb = &ctx->buf->backend;
+	struct channel_backend *chanb = &ctx->chan->backend;
+	size_t sbidx, index;
+	size_t offset = ctx->buf_offset;
+	ssize_t pagecpy;
+	struct lib_ring_buffer_backend_pages *rpages;
+	unsigned long sb_bindex, id;
+	unsigned long ret;
+
+	offset &= chanb->buf_size - 1;
+	sbidx = offset >> chanb->subbuf_size_order;
+	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+	pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK);
+	id = bufb->buf_wsb[sbidx].id;
+	sb_bindex = subbuffer_id_get_index(config, id);
+	rpages = bufb->array[sb_bindex];
+	CHAN_WARN_ON(ctx->chan,
+		     config->mode == RING_BUFFER_OVERWRITE
+		     && subbuffer_id_is_noref(config, id));
+
+	/* Advance by the full length now: error paths below modify len. */
+	ctx->buf_offset += len;
+	if (unlikely(!access_ok(VERIFY_READ, src, len)))
+		goto fill_buffer;
+
+	if (likely(pagecpy == len)) {
+		ret = lib_ring_buffer_do_copy_from_user(
+			rpages->p[index].virt + (offset & ~PAGE_MASK),
+			src, len);
+		if (unlikely(ret > 0)) {
+			len -= (pagecpy - ret);
+			offset += (pagecpy - ret);
+			goto fill_buffer;
+		}
+	} else {
+		_lib_ring_buffer_copy_from_user(bufb, offset, src, len, 0);
+	}
+	return;
+
+fill_buffer:
+	/*
+	 * In the error path we call the slow path version to avoid
+	 * the pollution of static inline code.
+	 */
+	_lib_ring_buffer_memset(bufb, offset, 0, len, 0);
+}
+
+/*
+ * This accessor counts the number of unread records in a buffer.
+ * It only provides a consistent value if no reads nor writes are performed
+ * concurrently.
+ */
+static inline
+unsigned long lib_ring_buffer_get_records_unread(
+				const struct lib_ring_buffer_config *config,
+				struct lib_ring_buffer *buf)
+{
+	struct lib_ring_buffer_backend *bufb = &buf->backend;
+	struct lib_ring_buffer_backend_pages *pages;
+	unsigned long records_unread = 0, sb_bindex, id;
+	unsigned int i;
+
+	for (i = 0; i < bufb->chan->backend.num_subbuf; i++) {
+		id = bufb->buf_wsb[i].id;
+		sb_bindex = subbuffer_id_get_index(config, id);
+		pages = bufb->array[sb_bindex];
+		records_unread += v_read(config, &pages->records_unread);
+	}
+	if (config->mode == RING_BUFFER_OVERWRITE) {
+		id = bufb->buf_rsb.id;
+		sb_bindex = subbuffer_id_get_index(config, id);
+		pages = bufb->array[sb_bindex];
+		records_unread += v_read(config, &pages->records_unread);
+	}
+	return records_unread;
+}
+
+ssize_t lib_ring_buffer_file_splice_read(struct file *in, loff_t *ppos,
+					 struct pipe_inode_info *pipe,
+					 size_t len, unsigned int flags);
+loff_t lib_ring_buffer_no_llseek(struct file *file, loff_t offset, int origin);
+
+#endif /* _LINUX_RING_BUFFER_BACKEND_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/backend_internal.h b/drivers/staging/lttng/lib/ringbuffer/backend_internal.h
new file mode 100644
index 0000000..442f357
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/backend_internal.h
@@ -0,0 +1,449 @@
+#ifndef _LINUX_RING_BUFFER_BACKEND_INTERNAL_H
+#define _LINUX_RING_BUFFER_BACKEND_INTERNAL_H
+
+/*
+ * linux/ringbuffer/backend_internal.h
+ *
+ * Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring buffer backend (internal helpers).
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include "../../wrapper/ringbuffer/config.h"
+#include "../../wrapper/ringbuffer/backend_types.h"
+#include "../../wrapper/ringbuffer/frontend_types.h"
+#include <linux/string.h>
+#include <linux/uaccess.h>
+
+/* Ring buffer backend API presented to the frontend */
+
+/* Ring buffer and channel backend create/free */
+
+int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
+				   struct channel_backend *chan, int cpu);
+void channel_backend_unregister_notifiers(struct channel_backend *chanb);
+void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb);
+int channel_backend_init(struct channel_backend *chanb,
+			 const char *name,
+			 const struct lib_ring_buffer_config *config,
+			 void *priv, size_t subbuf_size,
+			 size_t num_subbuf);
+void channel_backend_free(struct channel_backend *chanb);
+
+void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb);
+void channel_backend_reset(struct channel_backend *chanb);
+
+int lib_ring_buffer_backend_init(void);
+void lib_ring_buffer_backend_exit(void);
+
+extern void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb,
+				   size_t offset, const void *src, size_t len,
+				   ssize_t pagecpy);
+extern void _lib_ring_buffer_memset(struct lib_ring_buffer_backend *bufb,
+				    size_t offset, int c, size_t len,
+				    ssize_t pagecpy);
+extern void _lib_ring_buffer_copy_from_user(struct lib_ring_buffer_backend *bufb,
+					    size_t offset, const void *src,
+					    size_t len, ssize_t pagecpy);
+
+/*
+ * Subbuffer ID bits for overwrite mode. Need to fit within a single word to be
+ * exchanged atomically.
+ *
+ * Top half word, except lowest bit, belongs to "offset", which is used to
+ * count the produced buffers.  For overwrite mode, this provides the
+ * consumer with the capacity to read subbuffers in order, handling the
+ * situation where producers would write up to 2^15 buffers (or 2^31 for 64-bit
+ * systems) concurrently with a single execution of get_subbuf (between offset
+ * sampling and subbuffer ID exchange).
+ */
+
+#define HALF_ULONG_BITS		(BITS_PER_LONG >> 1)
+
+#define SB_ID_OFFSET_SHIFT	(HALF_ULONG_BITS + 1)
+#define SB_ID_OFFSET_COUNT	(1UL << SB_ID_OFFSET_SHIFT)
+#define SB_ID_OFFSET_MASK	(~(SB_ID_OFFSET_COUNT - 1))
+/*
+ * Lowest bit of top word half belongs to noref. Used only for overwrite mode.
+ */
+#define SB_ID_NOREF_SHIFT	(SB_ID_OFFSET_SHIFT - 1)
+#define SB_ID_NOREF_COUNT	(1UL << SB_ID_NOREF_SHIFT)
+#define SB_ID_NOREF_MASK	SB_ID_NOREF_COUNT
+/*
+ * In overwrite mode: lowest half of word is used for index.
+ * Limit of 2^16 subbuffers per buffer on 32-bit, 2^32 on 64-bit.
+ * In producer-consumer mode: whole word used for index.
+ */
+#define SB_ID_INDEX_SHIFT	0
+#define SB_ID_INDEX_COUNT	(1UL << SB_ID_INDEX_SHIFT)
+#define SB_ID_INDEX_MASK	(SB_ID_NOREF_COUNT - 1)
+
+/*
+ * Construct the subbuffer id from offset, index and noref. Use only the index
+ * for producer-consumer mode (offset and noref are only used in overwrite
+ * mode).
+ */
+static inline
+unsigned long subbuffer_id(const struct lib_ring_buffer_config *config,
+			   unsigned long offset, unsigned long noref,
+			   unsigned long index)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		return (offset << SB_ID_OFFSET_SHIFT)
+		       | (noref << SB_ID_NOREF_SHIFT)
+		       | index;
+	else
+		return index;
+}
+
+/*
+ * Compare offset with the offset contained within id. Return 1 if the offset
+ * bits are identical, else 0.
+ */
+static inline
+int subbuffer_id_compare_offset(const struct lib_ring_buffer_config *config,
+				unsigned long id, unsigned long offset)
+{
+	return (id & SB_ID_OFFSET_MASK) == (offset << SB_ID_OFFSET_SHIFT);
+}
+
+static inline
+unsigned long subbuffer_id_get_index(const struct lib_ring_buffer_config *config,
+				     unsigned long id)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		return id & SB_ID_INDEX_MASK;
+	else
+		return id;
+}
+
+static inline
+unsigned long subbuffer_id_is_noref(const struct lib_ring_buffer_config *config,
+				    unsigned long id)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		return !!(id & SB_ID_NOREF_MASK);
+	else
+		return 1;
+}
+
+/*
+ * Only used by reader on subbuffer ID it has exclusive access to. No volatile
+ * needed.
+ */
+static inline
+void subbuffer_id_set_noref(const struct lib_ring_buffer_config *config,
+			    unsigned long *id)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		*id |= SB_ID_NOREF_MASK;
+}
+
+static inline
+void subbuffer_id_set_noref_offset(const struct lib_ring_buffer_config *config,
+				   unsigned long *id, unsigned long offset)
+{
+	unsigned long tmp;
+
+	if (config->mode == RING_BUFFER_OVERWRITE) {
+		tmp = *id;
+		tmp &= ~SB_ID_OFFSET_MASK;
+		tmp |= offset << SB_ID_OFFSET_SHIFT;
+		tmp |= SB_ID_NOREF_MASK;
+		/* Volatile store, read concurrently by readers. */
+		ACCESS_ONCE(*id) = tmp;
+	}
+}
+
+/* No volatile access, since already used locally */
+static inline
+void subbuffer_id_clear_noref(const struct lib_ring_buffer_config *config,
+			      unsigned long *id)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		*id &= ~SB_ID_NOREF_MASK;
+}
+
+/*
+ * For overwrite mode, cap the number of subbuffers per buffer to:
+ * 2^16 on 32-bit architectures
+ * 2^32 on 64-bit architectures
+ * This is required to fit in the index part of the ID. Return 0 on success,
+ * -EPERM on failure.
+ */
+static inline
+int subbuffer_id_check_index(const struct lib_ring_buffer_config *config,
+			     unsigned long num_subbuf)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		return (num_subbuf > (1UL << HALF_ULONG_BITS)) ? -EPERM : 0;
+	else
+		return 0;
+}
+
+static inline
+void subbuffer_count_record(const struct lib_ring_buffer_config *config,
+			    struct lib_ring_buffer_backend *bufb,
+			    unsigned long idx)
+{
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+	v_inc(config, &bufb->array[sb_bindex]->records_commit);
+}
+
+/*
+ * Reader has exclusive subbuffer access for record consumption. No need to
+ * perform the decrement atomically.
+ */
+static inline
+void subbuffer_consume_record(const struct lib_ring_buffer_config *config,
+			      struct lib_ring_buffer_backend *bufb)
+{
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
+	CHAN_WARN_ON(bufb->chan,
+		     !v_read(config, &bufb->array[sb_bindex]->records_unread));
+	/* Non-atomic decrement protected by exclusive subbuffer access */
+	_v_dec(config, &bufb->array[sb_bindex]->records_unread);
+	v_inc(config, &bufb->records_read);
+}
+
+static inline
+unsigned long subbuffer_get_records_count(
+				const struct lib_ring_buffer_config *config,
+				struct lib_ring_buffer_backend *bufb,
+				unsigned long idx)
+{
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+	return v_read(config, &bufb->array[sb_bindex]->records_commit);
+}
+
+/*
+ * Must be executed at subbuffer delivery when the writer has _exclusive_
+ * subbuffer access. See ring_buffer_check_deliver() for details.
+ * subbuffer_get_records_count() must be called to get the records count
+ * before this function, because it resets the records_commit count.
+ */
+static inline
+unsigned long subbuffer_count_records_overrun(
+				const struct lib_ring_buffer_config *config,
+				struct lib_ring_buffer_backend *bufb,
+				unsigned long idx)
+{
+	struct lib_ring_buffer_backend_pages *pages;
+	unsigned long overruns, sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+	pages = bufb->array[sb_bindex];
+	overruns = v_read(config, &pages->records_unread);
+	v_set(config, &pages->records_unread,
+	      v_read(config, &pages->records_commit));
+	v_set(config, &pages->records_commit, 0);
+
+	return overruns;
+}
+
+static inline
+void subbuffer_set_data_size(const struct lib_ring_buffer_config *config,
+			     struct lib_ring_buffer_backend *bufb,
+			     unsigned long idx,
+			     unsigned long data_size)
+{
+	struct lib_ring_buffer_backend_pages *pages;
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+	pages = bufb->array[sb_bindex];
+	pages->data_size = data_size;
+}
+
+static inline
+unsigned long subbuffer_get_read_data_size(
+				const struct lib_ring_buffer_config *config,
+				struct lib_ring_buffer_backend *bufb)
+{
+	struct lib_ring_buffer_backend_pages *pages;
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
+	pages = bufb->array[sb_bindex];
+	return pages->data_size;
+}
+
+static inline
+unsigned long subbuffer_get_data_size(
+				const struct lib_ring_buffer_config *config,
+				struct lib_ring_buffer_backend *bufb,
+				unsigned long idx)
+{
+	struct lib_ring_buffer_backend_pages *pages;
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+	pages = bufb->array[sb_bindex];
+	return pages->data_size;
+}
+
+/**
+ * lib_ring_buffer_clear_noref - Clear the noref subbuffer flag, called by
+ *                               writer.
+ */
+static inline
+void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config,
+				 struct lib_ring_buffer_backend *bufb,
+				 unsigned long idx)
+{
+	unsigned long id, new_id;
+
+	if (config->mode != RING_BUFFER_OVERWRITE)
+		return;
+
+	/*
+	 * Performing a volatile access to read the subbuffer id, because we
+	 * want to read a coherent version of the index and the noref flag.
+	 */
+	id = ACCESS_ONCE(bufb->buf_wsb[idx].id);
+	for (;;) {
+		/* This check is called on the fast path for each record. */
+		if (likely(!subbuffer_id_is_noref(config, id))) {
+			/*
+			 * Store after load dependency ordering the writes to
+			 * the subbuffer after load and test of the noref flag
+			 * matches the memory barrier implied by the cmpxchg()
+			 * in update_read_sb_index().
+			 */
+			return;	/* Already writing to this buffer */
+		}
+		new_id = id;
+		subbuffer_id_clear_noref(config, &new_id);
+		new_id = cmpxchg(&bufb->buf_wsb[idx].id, id, new_id);
+		if (likely(new_id == id))
+			break;
+		id = new_id;
+	}
+}
+
+/**
+ * lib_ring_buffer_set_noref_offset - Set the noref subbuffer flag and offset,
+ *                                    called by writer.
+ */
+static inline
+void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *config,
+				      struct lib_ring_buffer_backend *bufb,
+				      unsigned long idx, unsigned long offset)
+{
+	if (config->mode != RING_BUFFER_OVERWRITE)
+		return;
+
+	/*
+	 * Because this function is only called by a single thread
+	 * (the one which updated the cc_sb value), there are no concurrent
+	 * updates to take care of: other writers have not updated cc_sb, so
+	 * they cannot set the noref flag, and concurrent readers cannot modify
+	 * the pointer because the noref flag is not set yet.
+	 * The smp_wmb() in ring_buffer_commit() takes care of ordering writes
+	 * to the subbuffer before this set noref operation.
+	 * subbuffer_id_set_noref_offset() uses a volatile store to deal with
+	 * concurrent readers of the noref flag.
+	 */
+	CHAN_WARN_ON(bufb->chan,
+		     subbuffer_id_is_noref(config, bufb->buf_wsb[idx].id));
+	/*
+	 * Memory barrier that ensures counter stores are ordered before set
+	 * noref and offset.
+	 */
+	smp_mb();
+	subbuffer_id_set_noref_offset(config, &bufb->buf_wsb[idx].id, offset);
+}
+
+/**
+ * update_read_sb_index - Read-side subbuffer index update.
+ */
+static inline
+int update_read_sb_index(const struct lib_ring_buffer_config *config,
+			 struct lib_ring_buffer_backend *bufb,
+			 struct channel_backend *chanb,
+			 unsigned long consumed_idx,
+			 unsigned long consumed_count)
+{
+	unsigned long old_id, new_id;
+
+	if (config->mode == RING_BUFFER_OVERWRITE) {
+		/*
+		 * Exchange the target writer subbuffer with our own unused
+		 * subbuffer. No need to use ACCESS_ONCE() here to read
+		 * old_id, because the value read will be confirmed by the
+		 * following cmpxchg().
+		 */
+		old_id = bufb->buf_wsb[consumed_idx].id;
+		if (unlikely(!subbuffer_id_is_noref(config, old_id)))
+			return -EAGAIN;
+		/*
+		 * Make sure the offset count we are expecting matches the one
+		 * indicated by the writer.
+		 */
+		if (unlikely(!subbuffer_id_compare_offset(config, old_id,
+							  consumed_count)))
+			return -EAGAIN;
+		CHAN_WARN_ON(bufb->chan,
+			     !subbuffer_id_is_noref(config, bufb->buf_rsb.id));
+		subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id,
+					      consumed_count);
+		new_id = cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id,
+				 bufb->buf_rsb.id);
+		if (unlikely(old_id != new_id))
+			return -EAGAIN;
+		bufb->buf_rsb.id = new_id;
+	} else {
+		/* No page exchange, use the writer page directly */
+		bufb->buf_rsb.id = bufb->buf_wsb[consumed_idx].id;
+	}
+	return 0;
+}
+
+/*
+ * Use the architecture-specific memcpy implementation for constant-sized
+ * inputs, but rely on an inline memcpy for length statically unknown.
+ * The function call to memcpy is just way too expensive for a fast path.
+ */
+#define lib_ring_buffer_do_copy(config, dest, src, len)		\
+do {								\
+	size_t __len = (len);					\
+	if (__builtin_constant_p(len))				\
+		memcpy(dest, src, __len);			\
+	else							\
+		inline_memcpy(dest, src, __len);		\
+} while (0)
+
+/*
+ * We use __copy_from_user to copy userspace data since we already
+ * did the access_ok for the whole range.
+ */
+static inline
+unsigned long lib_ring_buffer_do_copy_from_user(void *dest,
+						const void __user *src,
+						unsigned long len)
+{
+	return __copy_from_user(dest, src, len);
+}
+
+/*
+ * write len bytes to dest with c
+ */
+static inline
+void lib_ring_buffer_do_memset(char *dest, int c,
+	unsigned long len)
+{
+	unsigned long i;
+
+	for (i = 0; i < len; i++)
+		dest[i] = c;
+}
+
+#endif /* _LINUX_RING_BUFFER_BACKEND_INTERNAL_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/backend_types.h b/drivers/staging/lttng/lib/ringbuffer/backend_types.h
new file mode 100644
index 0000000..1d301de
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/backend_types.h
@@ -0,0 +1,80 @@
+#ifndef _LINUX_RING_BUFFER_BACKEND_TYPES_H
+#define _LINUX_RING_BUFFER_BACKEND_TYPES_H
+
+/*
+ * linux/ringbuffer/backend_types.h
+ *
+ * Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring buffer backend (types).
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/cpumask.h>
+#include <linux/types.h>
+
+struct lib_ring_buffer_backend_page {
+	void *virt;			/* page virtual address (cached) */
+	struct page *page;		/* pointer to page structure */
+};
+
+struct lib_ring_buffer_backend_pages {
+	unsigned long mmap_offset;	/* offset of the subbuffer in mmap */
+	union v_atomic records_commit;	/* current records committed count */
+	union v_atomic records_unread;	/* records to read */
+	unsigned long data_size;	/* Amount of data to read from subbuf */
+	struct lib_ring_buffer_backend_page p[];
+};
+
+struct lib_ring_buffer_backend_subbuffer {
+	/* Identifier for subbuf backend pages. Exchanged atomically. */
+	unsigned long id;		/* backend subbuffer identifier */
+};
+
+/*
+ * Forward declaration of frontend-specific channel and ring_buffer.
+ */
+struct channel;
+struct lib_ring_buffer;
+
+struct lib_ring_buffer_backend {
+	/* Array of ring_buffer_backend_subbuffer for writer */
+	struct lib_ring_buffer_backend_subbuffer *buf_wsb;
+	/* ring_buffer_backend_subbuffer for reader */
+	struct lib_ring_buffer_backend_subbuffer buf_rsb;
+	/*
+	 * Pointer array of backend pages, for whole buffer.
+	 * Indexed by ring_buffer_backend_subbuffer identifier (id) index.
+	 */
+	struct lib_ring_buffer_backend_pages **array;
+	unsigned int num_pages_per_subbuf;
+
+	struct channel *chan;		/* Associated channel */
+	int cpu;			/* This buffer's cpu. -1 if global. */
+	union v_atomic records_read;	/* Number of records read */
+	unsigned int allocated:1;	/* Bool: is buffer allocated ? */
+};
+
+struct channel_backend {
+	unsigned long buf_size;		/* Size of the buffer */
+	unsigned long subbuf_size;	/* Sub-buffer size */
+	unsigned int subbuf_size_order;	/* Order of sub-buffer size */
+	unsigned int num_subbuf_order;	/*
+					 * Order of number of sub-buffers/buffer
+					 * for writer.
+					 */
+	unsigned int buf_size_order;	/* Order of buffer size */
+	unsigned int extra_reader_sb:1;	/* Bool (0/1): extra reader subbuffer */
+	struct lib_ring_buffer *buf;	/* Channel per-cpu buffers */
+
+	unsigned long num_subbuf;	/* Number of sub-buffers for writer */
+	u64 start_tsc;			/* Channel creation TSC value */
+	void *priv;			/* Client-specific information */
+	struct notifier_block cpu_hp_notifier;	 /* CPU hotplug notifier */
+	const struct lib_ring_buffer_config *config; /* Ring buffer configuration */
+	cpumask_var_t cpumask;		/* Allocated per-cpu buffers cpumask */
+	char name[NAME_MAX];		/* Channel name */
+};
+
+#endif /* _LINUX_RING_BUFFER_BACKEND_TYPES_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/config.h b/drivers/staging/lttng/lib/ringbuffer/config.h
new file mode 100644
index 0000000..fd73d55
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/config.h
@@ -0,0 +1,298 @@
+#ifndef _LINUX_RING_BUFFER_CONFIG_H
+#define _LINUX_RING_BUFFER_CONFIG_H
+
+/*
+ * linux/ringbuffer/config.h
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring buffer configuration header. Note: after declaring the standard inline
+ * functions, clients should also include linux/ringbuffer/api.h.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/types.h>
+#include <linux/percpu.h>
+#include "../align.h"
+
+struct lib_ring_buffer;
+struct channel;
+struct lib_ring_buffer_config;
+struct lib_ring_buffer_ctx;
+
+/*
+ * Ring buffer client callbacks. Only used by slow path, never on fast path.
+ * For the fast path, record_header_size(), ring_buffer_clock_read() should be
+ * provided as inline functions too.  These may simply return 0 if not used by
+ * the client.
+ */
+struct lib_ring_buffer_client_cb {
+	/* Mandatory callbacks */
+
+	/* Static inline versions of both are also required for fast path */
+	u64 (*ring_buffer_clock_read) (struct channel *chan);
+	size_t (*record_header_size) (const struct lib_ring_buffer_config *config,
+				      struct channel *chan, size_t offset,
+				      size_t *pre_header_padding,
+				      struct lib_ring_buffer_ctx *ctx);
+
+	/* Slow path only, at subbuffer switch */
+	size_t (*subbuffer_header_size) (void);
+	void (*buffer_begin) (struct lib_ring_buffer *buf, u64 tsc,
+			      unsigned int subbuf_idx);
+	void (*buffer_end) (struct lib_ring_buffer *buf, u64 tsc,
+			    unsigned int subbuf_idx, unsigned long data_size);
+
+	/* Optional callbacks (can be set to NULL) */
+
+	/* buffer_create: at buffer creation; buffer_finalize: at finalize */
+	int (*buffer_create) (struct lib_ring_buffer *buf, void *priv,
+			      int cpu, const char *name);
+	/*
+	 * Clients should guarantee that no new reader handle can be opened
+	 * after finalize.
+	 */
+	void (*buffer_finalize) (struct lib_ring_buffer *buf, void *priv, int cpu);
+
+	/*
+	 * Extract header length, payload length and timestamp from event
+	 * record. Used by buffer iterators. Timestamp is only used by channel
+	 * iterator.
+	 */
+	void (*record_get) (const struct lib_ring_buffer_config *config,
+			    struct channel *chan, struct lib_ring_buffer *buf,
+			    size_t offset, size_t *header_len,
+			    size_t *payload_len, u64 *timestamp);
+};
+
+/*
+ * Ring buffer instance configuration.
+ *
+ * Declare as "static const" within the client object to ensure the inline fast
+ * paths can be optimized.
+ *
+ * alloc/sync pairs:
+ *
+ * RING_BUFFER_ALLOC_PER_CPU and RING_BUFFER_SYNC_PER_CPU :
+ *   Per-cpu buffers with per-cpu synchronization. Tracing must be performed
+ *   with preemption disabled (lib_ring_buffer_get_cpu() and
+ *   lib_ring_buffer_put_cpu()).
+ *
+ * RING_BUFFER_ALLOC_PER_CPU and RING_BUFFER_SYNC_GLOBAL :
+ *   Per-cpu buffer with global synchronization. Tracing can be performed with
+ *   preemption enabled, statistically stays on the local buffers.
+ *
+ * RING_BUFFER_ALLOC_GLOBAL and RING_BUFFER_SYNC_PER_CPU :
+ *   Should only be used for buffers belonging to a single thread or protected
+ *   by mutual exclusion by the client. Note that periodical sub-buffer switch
+ *   should be disabled in this kind of configuration.
+ *
+ * RING_BUFFER_ALLOC_GLOBAL and RING_BUFFER_SYNC_GLOBAL :
+ *   Global shared buffer with global synchronization.
+ *
+ * wakeup:
+ *
+ * RING_BUFFER_WAKEUP_BY_TIMER uses per-cpu deferrable timers to poll the
+ * buffers and wake up readers if data is ready. Mainly useful for tracers which
+ * don't want to call into the wakeup code on the tracing path. Use in
+ * combination with "read_timer_interval" channel_create() argument.
+ *
+ * RING_BUFFER_WAKEUP_BY_WRITER directly wakes up readers when a subbuffer is
+ * ready to read. Lower latencies before the reader is woken up. Mainly suitable
+ * for drivers.
+ *
+ * RING_BUFFER_WAKEUP_NONE does not perform any wakeup whatsoever: the client
+ * is responsible for wakeups. (Not declared in the "wakeup" enum below.)
+ */
+struct lib_ring_buffer_config {
+	enum {
+		RING_BUFFER_ALLOC_PER_CPU,
+		RING_BUFFER_ALLOC_GLOBAL,
+	} alloc;			/* Buffer allocation: per-cpu or global */
+	enum {
+		RING_BUFFER_SYNC_PER_CPU,	/* Wait-free */
+		RING_BUFFER_SYNC_GLOBAL,	/* Lock-free */
+	} sync;				/* Synchronization: per-cpu or global */
+	enum {
+		RING_BUFFER_OVERWRITE,		/* Overwrite when buffer full */
+		RING_BUFFER_DISCARD,		/* Discard when buffer full */
+	} mode;				/* Behavior when the buffer is full */
+	enum {
+		RING_BUFFER_SPLICE,
+		RING_BUFFER_MMAP,
+		RING_BUFFER_READ,		/* TODO */
+		RING_BUFFER_ITERATOR,
+		RING_BUFFER_NONE,
+	} output;
+	enum {
+		RING_BUFFER_PAGE,
+		RING_BUFFER_VMAP,		/* TODO */
+		RING_BUFFER_STATIC,		/* TODO */
+	} backend;
+	enum {
+		RING_BUFFER_NO_OOPS_CONSISTENCY,
+		RING_BUFFER_OOPS_CONSISTENCY,
+	} oops;
+	enum {
+		RING_BUFFER_IPI_BARRIER,
+		RING_BUFFER_NO_IPI_BARRIER,
+	} ipi;
+	enum {
+		RING_BUFFER_WAKEUP_BY_TIMER,	/* wake up performed by timer */
+		RING_BUFFER_WAKEUP_BY_WRITER,	/*
+						 * writer wakes up reader,
+						 * not lock-free
+						 * (takes spinlock).
+						 */
+	} wakeup;
+	/*
+	 * tsc_bits: timestamp bits saved at each record.
+	 *   0 and 64 disable the timestamp compression scheme.
+	 */
+	unsigned int tsc_bits;
+	struct lib_ring_buffer_client_cb cb;	/* Client callbacks */
+};
+
+/*
+ * ring buffer context
+ *
+ * Context passed to lib_ring_buffer_reserve(), lib_ring_buffer_commit(),
+ * lib_ring_buffer_try_discard_reserve(), lib_ring_buffer_align_ctx() and
+ * lib_ring_buffer_write().
+ */
+struct lib_ring_buffer_ctx {
+	/* reserve() input, stored here by lib_ring_buffer_ctx_init(). */
+	struct channel *chan;		/* channel */
+	void *priv;			/* client private data */
+	size_t data_size;		/* size of payload */
+	int largest_align;		/*
+					 * alignment of the largest element
+					 * in the payload
+					 */
+	int cpu;			/* processor id */
+
+	/* output from lib_ring_buffer_reserve() */
+	struct lib_ring_buffer *buf;	/*
+					 * buffer corresponding to processor id
+					 * for this channel
+					 */
+	size_t slot_size;		/* size of the reserved slot */
+	unsigned long buf_offset;	/* offset following the record header */
+	unsigned long pre_offset;	/*
+					 * Initial offset position _before_
+					 * the record is written. Positioned
+					 * prior to record header alignment
+					 * padding.
+					 */
+	u64 tsc;			/* time-stamp counter value */
+	unsigned int rflags;		/* RING_BUFFER_RFLAG_* reservation flags */
+};
+
+/**
+ * lib_ring_buffer_ctx_init - set up the input fields of a ring buffer context
+ * @ctx: ring buffer context to fill in
+ * @chan: channel
+ * @priv: client private data
+ * @data_size: size of the record data payload
+ * @largest_align: alignment of the largest element within the payload
+ * @cpu: processor id
+ */
+static inline void
+lib_ring_buffer_ctx_init(struct lib_ring_buffer_ctx *ctx,
+			 struct channel *chan, void *priv,
+			 size_t data_size, int largest_align,
+			 int cpu)
+{
+	ctx->rflags = 0;	/* start with no reservation flag set */
+	ctx->cpu = cpu;
+	ctx->largest_align = largest_align;
+	ctx->data_size = data_size;
+	ctx->priv = priv;
+	ctx->chan = chan;
+}
+
+/*
+ * Reservation flags.
+ *
+ * RING_BUFFER_RFLAG_FULL_TSC
+ *
+ * This flag is passed to record_header_size() and to the primitive used to
+ * write the record header. It indicates that the full 64-bit time value is
+ * needed in the record header. If this flag is not set, the record header needs
+ * only to contain "tsc_bits" bits of time value.
+ *
+ * Reservation flags can be added by the client, starting from
+ * "(RING_BUFFER_RFLAG_END << 0)". It can be used to pass information from
+ * record_header_size() to lib_ring_buffer_write_record_header().
+ */
+#define	RING_BUFFER_RFLAG_FULL_TSC		(1U << 0)
+#define RING_BUFFER_RFLAG_END			(1U << 1)
+
+/*
+ * We need to define RING_BUFFER_ALIGN_ATTR so it is known early at
+ * compile-time. We have to duplicate the "config->align" information and the
+ * definition here because config->align is used both in the slow and fast
+ * paths, but RING_BUFFER_ALIGN_ATTR is only available for the client code.
+ */
+#ifdef RING_BUFFER_ALIGN
+
+# define RING_BUFFER_ALIGN_ATTR		/* Default arch alignment */
+
+/*
+ * Padding needed at @align_drift to align a type of @size_of_type bytes
+ * (must be non-zero); delegates to offset_align().
+ */
+static inline unsigned int
+lib_ring_buffer_align(size_t align_drift, size_t size_of_type)
+{
+	return offset_align(align_drift, size_of_type);
+}
+
+#else
+
+# define RING_BUFFER_ALIGN_ATTR __attribute__((packed))
+
+/*
+ * Packed variant (RING_BUFFER_ALIGN not defined): the alignment offset is
+ * always 0, whatever @align_drift and @size_of_type are.
+ */
+static inline unsigned int
+lib_ring_buffer_align(size_t align_drift, size_t size_of_type)
+{
+	return 0;
+}
+
+#endif
+
+/**
+ * lib_ring_buffer_align_ctx - Pad the context offset up to "alignment"
+ * @ctx: ring buffer context.
+ * @alignment: alignment, forwarded to lib_ring_buffer_align().
+ */
+static inline void
+lib_ring_buffer_align_ctx(struct lib_ring_buffer_ctx *ctx, size_t alignment)
+{
+	unsigned int pad = lib_ring_buffer_align(ctx->buf_offset, alignment);
+	ctx->buf_offset += pad;
+}
+
+/*
+ * lib_ring_buffer_check_config() is used at channel creation. Returns 0, or
+ * -EINVAL for a switch timer on a global buffer with per-cpu synchronization.
+ */
+static inline int
+lib_ring_buffer_check_config(const struct lib_ring_buffer_config *config,
+			     unsigned int switch_timer_interval,
+			     unsigned int read_timer_interval)
+{
+	if (!switch_timer_interval)
+		return 0;
+	if (config->alloc != RING_BUFFER_ALLOC_GLOBAL)
+		return 0;
+	return config->sync == RING_BUFFER_SYNC_PER_CPU ? -EINVAL : 0;
+}
+
+#include "../../wrapper/ringbuffer/vatomic.h"
+
+#endif /* _LINUX_RING_BUFFER_CONFIG_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/frontend.h b/drivers/staging/lttng/lib/ringbuffer/frontend.h
new file mode 100644
index 0000000..01af77a
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/frontend.h
@@ -0,0 +1,228 @@
+#ifndef _LINUX_RING_BUFFER_FRONTEND_H
+#define _LINUX_RING_BUFFER_FRONTEND_H
+
+/*
+ * linux/ringbuffer/frontend.h
+ *
+ * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring Buffer Library Synchronization Header (API).
+ *
+ * Author:
+ *	Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * See ring_buffer_frontend.c for more information on wait-free algorithms.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/pipe_fs_i.h>
+#include <linux/rcupdate.h>
+#include <linux/cpumask.h>
+#include <linux/module.h>
+#include <linux/bitops.h>
+#include <linux/splice.h>
+#include <linux/string.h>
+#include <linux/timer.h>
+#include <linux/sched.h>
+#include <linux/cache.h>
+#include <linux/time.h>
+#include <linux/slab.h>
+#include <linux/init.h>
+#include <linux/stat.h>
+#include <linux/cpu.h>
+#include <linux/fs.h>
+
+#include <asm/atomic.h>
+#include <asm/local.h>
+
+/* Internal helpers */
+#include "../../wrapper/ringbuffer/frontend_internal.h"
+
+/* Buffer creation/removal and setup operations */
+
+/*
+ * switch_timer_interval is the time interval (in us) to fill sub-buffers with
+ * padding to let readers get those sub-buffers.  Used for live streaming.
+ *
+ * read_timer_interval is the time interval (in us) to wake up pending readers.
+ *
+ * buf_addr is a pointer to the beginning of the preallocated buffer contiguous
+ * address mapping. It is used only by RING_BUFFER_STATIC configuration. It can
+ * be set to NULL for other backends.
+ */
+
+extern
+struct channel *channel_create(const struct lib_ring_buffer_config *config,
+			       const char *name, void *priv,
+			       void *buf_addr,
+			       size_t subbuf_size, size_t num_subbuf,
+			       unsigned int switch_timer_interval,
+			       unsigned int read_timer_interval);
+
+/*
+ * channel_destroy returns the private data pointer. It finalizes all channel's
+ * buffers, waits for readers to release all references, and destroys the
+ * channel.
+ */
+extern
+void *channel_destroy(struct channel *chan);
+
+
+/* Buffer read operations */
+
+/*
+ * Iterate "cpu" (evaluated several times) over the channel cpumask. A read
+ * barrier is issued at each step to match the write barrier in cpu hotplug,
+ * ordering the cpumask read before read of per-cpu buffer data. The per-cpu
+ * buffer is never removed by cpu hotplug; teardown is at channel destruction.
+ */
+#define for_each_channel_cpu(cpu, chan)					\
+	for ((cpu) = -1;						\
+		({ (cpu) = cpumask_next(cpu, (chan)->backend.cpumask);	\
+		   smp_read_barrier_depends(); (cpu) < nr_cpu_ids; });)
+
+extern struct lib_ring_buffer *channel_get_ring_buffer(
+				const struct lib_ring_buffer_config *config,
+				struct channel *chan, int cpu);
+extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf);
+extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf);
+
+/*
+ * Read sequence: snapshot, many get_subbuf/put_subbuf, move_consumer.
+ */
+extern int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
+				    unsigned long *consumed,
+				    unsigned long *produced);
+extern void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
+					  unsigned long consumed_new);
+
+extern int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
+				      unsigned long consumed);
+extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf);
+
+/*
+ * lib_ring_buffer_get_next_subbuf/lib_ring_buffer_put_next_subbuf are helpers
+ * to read sub-buffers sequentially: snapshot, then get at consumed position.
+ */
+static inline int lib_ring_buffer_get_next_subbuf(struct lib_ring_buffer *buf)
+{
+	int err = lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
+					   &buf->prod_snapshot);
+
+	/* Snapshot failed: hand its error back without getting a subbuffer. */
+	if (err)
+		return err;
+
+	return lib_ring_buffer_get_subbuf(buf, buf->cons_snapshot);
+}
+
+static inline void lib_ring_buffer_put_next_subbuf(struct lib_ring_buffer *buf)
+{
+	lib_ring_buffer_put_subbuf(buf);	/* put first, then move consumer */
+	lib_ring_buffer_move_consumer(buf, subbuf_align(buf->cons_snapshot,
+						    buf->backend.chan));
+}
+
+extern void channel_reset(struct channel *chan);
+extern void lib_ring_buffer_reset(struct lib_ring_buffer *buf);
+
+/* Read the current value of @buf's "offset" counter. */
+static inline unsigned long lib_ring_buffer_get_offset(
+	const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf)
+{
+	return v_read(config, &buf->offset);
+}
+
+/* Read @buf's "consumed" counter; @config is unused here. */
+static inline unsigned long lib_ring_buffer_get_consumed(
+	const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf)
+{
+	return atomic_long_read(&buf->consumed);
+}
+
+/*
+ * Must call lib_ring_buffer_is_finalized before reading counters (memory
+ * ordering enforced with respect to trace teardown): the smp_rmb() below
+ * orders the load of buf->finalized before the caller's counter reads.
+ */
+static inline int
+lib_ring_buffer_is_finalized(const struct lib_ring_buffer_config *config,
+			     struct lib_ring_buffer *buf)
+{
+	int is_final;
+
+	is_final = ACCESS_ONCE(buf->finalized);
+	smp_rmb();	/* Read finalized before counters. */
+	return is_final;
+}
+
+static inline int
+lib_ring_buffer_channel_is_finalized(const struct channel *chan)
+{
+	return chan->finalized;	/* plain read: no barrier issued here */
+}
+
+static inline int
+lib_ring_buffer_channel_is_disabled(const struct channel *chan)
+{
+	return atomic_read(&chan->record_disabled);	/* != 0: disabled */
+}
+
+/* Forward to subbuffer_get_read_data_size() on the backend of @buf. */
+static inline unsigned long lib_ring_buffer_get_read_data_size(
+	const struct lib_ring_buffer_config *config,
+	struct lib_ring_buffer *buf)
+{
+	return subbuffer_get_read_data_size(config, &buf->backend);
+}
+
+/* Read the records_count counter of @buf. */
+static inline unsigned long lib_ring_buffer_get_records_count(
+	const struct lib_ring_buffer_config *config,
+	struct lib_ring_buffer *buf)
+{
+	return v_read(config, &buf->records_count);
+}
+
+/* Read the records_overrun counter of @buf. */
+static inline unsigned long lib_ring_buffer_get_records_overrun(
+	const struct lib_ring_buffer_config *config,
+	struct lib_ring_buffer *buf)
+{
+	return v_read(config, &buf->records_overrun);
+}
+
+/* Read the records_lost_full counter of @buf. */
+static inline unsigned long lib_ring_buffer_get_records_lost_full(
+	const struct lib_ring_buffer_config *config,
+	struct lib_ring_buffer *buf)
+{
+	return v_read(config, &buf->records_lost_full);
+}
+
+/* Read the records_lost_wrap counter of @buf. */
+static inline unsigned long lib_ring_buffer_get_records_lost_wrap(
+	const struct lib_ring_buffer_config *config,
+	struct lib_ring_buffer *buf)
+{
+	return v_read(config, &buf->records_lost_wrap);
+}
+
+/* Read the records_lost_big counter of @buf. */
+static inline unsigned long lib_ring_buffer_get_records_lost_big(
+	const struct lib_ring_buffer_config *config,
+	struct lib_ring_buffer *buf)
+{
+	return v_read(config, &buf->records_lost_big);
+}
+
+/* Read the records_read counter kept in the backend of @buf. */
+static inline unsigned long lib_ring_buffer_get_records_read(
+	const struct lib_ring_buffer_config *config,
+	struct lib_ring_buffer *buf)
+{
+	return v_read(config, &buf->backend.records_read);
+}
+
+#endif /* _LINUX_RING_BUFFER_FRONTEND_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/frontend_api.h b/drivers/staging/lttng/lib/ringbuffer/frontend_api.h
new file mode 100644
index 0000000..391e593
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/frontend_api.h
@@ -0,0 +1,358 @@
+#ifndef _LINUX_RING_BUFFER_FRONTEND_API_H
+#define _LINUX_RING_BUFFER_FRONTEND_API_H
+
+/*
+ * linux/ringbuffer/frontend_api.h
+ *
+ * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring Buffer Library Synchronization Header (buffer write API).
+ *
+ * Author:
+ *	Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * See ring_buffer_frontend.c for more information on wait-free algorithms.
+ * See linux/ringbuffer/frontend.h for channel allocation and read-side API.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include "../../wrapper/ringbuffer/frontend.h"
+#include <linux/errno.h>
+
+/**
+ * lib_ring_buffer_get_cpu - Precedes ring buffer reserve/commit.
+ * @config: ring buffer instance configuration (unused here).
+ *
+ * Disables preemption (acts as a RCU read-side critical section) and keeps a
+ * ring buffer nesting count as supplementary safety net to ensure tracer client
+ * code will never trigger an endless recursion. Returns the processor ID on
+ * success, -EPERM on failure (nesting count above 4).
+ *
+ * barrier() prevents the compiler from moving instructions out of the ring
+ * buffer nesting count section. This is required to ensure that probe
+ * side-effects which can cause recursion (e.g. unforeseen traps, divisions by
+ * 0, ...) are triggered within the incremented nesting count section.
+ */
+static inline
+int lib_ring_buffer_get_cpu(const struct lib_ring_buffer_config *config)
+{
+	int cpu, nesting;
+
+	rcu_read_lock_sched_notrace();
+	cpu = smp_processor_id();
+	nesting = ++per_cpu(lib_ring_buffer_nesting, cpu);
+	barrier();
+
+	if (unlikely(nesting > 4)) {
+		WARN_ON_ONCE(1);
+		per_cpu(lib_ring_buffer_nesting, cpu)--;	/* undo increment */
+		rcu_read_unlock_sched_notrace();
+		return -EPERM;
+	} else
+		return cpu;
+}
+
+/**
+ * lib_ring_buffer_put_cpu - Follows ring buffer reserve/commit.
+ */
+static inline
+void lib_ring_buffer_put_cpu(const struct lib_ring_buffer_config *config)
+{
+	barrier();	/* compiler barrier before the nesting count decrement */
+	__get_cpu_var(lib_ring_buffer_nesting)--;
+	rcu_read_unlock_sched_notrace();	/* pairs with get_cpu() lock */
+}
+
+/*
+ * lib_ring_buffer_try_reserve is called by lib_ring_buffer_reserve(). It is not
+ * part of the API per se. It fills *o_begin and *o_old (current offset), *o_end,
+ * ctx->tsc, ctx->rflags and ctx->slot_size, but does not update buf->offset.
+ * returns 0 if reserve ok, or 1 if the slow path must be taken.
+ */
+static inline
+int lib_ring_buffer_try_reserve(const struct lib_ring_buffer_config *config,
+				struct lib_ring_buffer_ctx *ctx,
+				unsigned long *o_begin, unsigned long *o_end,
+				unsigned long *o_old, size_t *before_hdr_pad)
+{
+	struct channel *chan = ctx->chan;
+	struct lib_ring_buffer *buf = ctx->buf;
+	*o_begin = v_read(config, &buf->offset);
+	*o_old = *o_begin;
+
+	ctx->tsc = lib_ring_buffer_clock_read(chan);
+	if ((int64_t) ctx->tsc == -EIO)
+		return 1;	/* clock read returned -EIO */
+
+	/*
+	 * Prefetch cacheline for read because we have to read the previous
+	 * commit counter to increment it and commit seq value to compare it to
+	 * the commit counter.
+	 */
+	prefetch(&buf->commit_hot[subbuf_index(*o_begin, chan)]);
+
+	if (last_tsc_overflow(config, buf, ctx->tsc))
+		ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
+
+	if (unlikely(subbuf_offset(*o_begin, chan) == 0))
+		return 1;	/* offset sits at the start of a sub-buffer */
+
+	ctx->slot_size = record_header_size(config, chan, *o_begin,
+					    before_hdr_pad, ctx);
+	ctx->slot_size +=
+		lib_ring_buffer_align(*o_begin + ctx->slot_size,
+				      ctx->largest_align) + ctx->data_size;
+	if (unlikely((subbuf_offset(*o_begin, chan) + ctx->slot_size)
+		     > chan->backend.subbuf_size))
+		return 1;	/* slot does not fit in this sub-buffer */
+
+	/*
+	 * Record fits in the current buffer and we are not on a switch
+	 * boundary. It's safe to write.
+	 */
+	*o_end = *o_begin + ctx->slot_size;
+
+	if (unlikely((subbuf_offset(*o_end, chan)) == 0))
+		/*
+		 * The offset_end will fall at the very beginning of the next
+		 * subbuffer.
+		 */
+		return 1;
+
+	return 0;
+}
+
+/**
+ * lib_ring_buffer_reserve - Reserve space in a ring buffer.
+ * @config: ring buffer instance configuration.
+ * @ctx: ring buffer context. (input and output) Must be already initialized.
+ *
+ * Atomic wait-free slot reservation. The reserved space starts at the context
+ * "pre_offset". Its length is "slot_size". The associated time-stamp is "tsc".
+ *
+ * Return :
+ *  0 on success.
+ * -EAGAIN if channel is disabled.
+ * -ENOSPC if event size is too large for packet.
+ * -ENOBUFS if there is currently not enough space in buffer for the event.
+ * -EIO if data cannot be written into the buffer for any other reason.
+ */
+
+static inline
+int lib_ring_buffer_reserve(const struct lib_ring_buffer_config *config,
+			    struct lib_ring_buffer_ctx *ctx)
+{
+	struct channel *chan = ctx->chan;
+	struct lib_ring_buffer *buf;
+	unsigned long o_begin, o_end, o_old;
+	size_t before_hdr_pad = 0;
+
+	if (atomic_read(&chan->record_disabled))
+		return -EAGAIN;
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+		buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
+	else
+		buf = chan->backend.buf;	/* not per-cpu: used directly */
+	if (atomic_read(&buf->record_disabled))
+		return -EAGAIN;
+	ctx->buf = buf;
+
+	/*
+	 * Perform retryable operations.
+	 */
+	if (unlikely(lib_ring_buffer_try_reserve(config, ctx, &o_begin,
+						 &o_end, &o_old, &before_hdr_pad)))
+		goto slow_path;
+
+	if (unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old, o_end)
+		     != o_old))
+		goto slow_path;	/* offset no longer equals o_old */
+
+	/*
+	 * Atomically update last_tsc. This update races against concurrent
+	 * atomic updates, but the race will always cause supplementary full TSC
+	 * record headers, never the opposite (missing a full TSC record header
+	 * when it would be needed).
+	 */
+	save_last_tsc(config, ctx->buf, ctx->tsc);
+
+	/*
+	 * Push the reader if necessary
+	 */
+	lib_ring_buffer_reserve_push_reader(ctx->buf, chan, o_end - 1);
+
+	/*
+	 * Clear noref flag for this subbuffer.
+	 */
+	lib_ring_buffer_clear_noref(config, &ctx->buf->backend,
+				subbuf_index(o_end - 1, chan));
+
+	ctx->pre_offset = o_begin;
+	ctx->buf_offset = o_begin + before_hdr_pad;
+	return 0;
+slow_path:
+	return lib_ring_buffer_reserve_slow(ctx);
+}
+
+/**
+ * lib_ring_buffer_switch - Perform a sub-buffer switch for a per-cpu buffer.
+ * @config: ring buffer instance configuration.
+ * @buf: buffer
+ * @mode: buffer switch mode (SWITCH_ACTIVE or SWITCH_FLUSH)
+ *
+ * Thin wrapper around lib_ring_buffer_switch_slow(). Completely reentrant:
+ * can be called while tracing is active with absolutely no lock held.
+ *
+ * Note, however, that as a v_cmpxchg is used for some atomic operations and
+ * requires to be executed locally for per-CPU buffers, this function must be
+ * called from the CPU which owns the buffer for an ACTIVE flush, with
+ * preemption disabled, for RING_BUFFER_SYNC_PER_CPU configuration.
+ */
+static inline void
+lib_ring_buffer_switch(const struct lib_ring_buffer_config *config,
+		       struct lib_ring_buffer *buf, enum switch_mode mode)
+{
+	lib_ring_buffer_switch_slow(buf, mode);
+}
+
+/* lib_ring_buffer_reserve(), the counterpart of commit, is defined above. */
+
+/**
+ * lib_ring_buffer_commit - Commit a record.
+ * @config: ring buffer instance configuration.
+ * @ctx: ring buffer context. (input arguments only)
+ *
+ * Atomic unordered slot commit. Increments the commit count in the
+ * specified sub-buffer, and delivers it if necessary.
+ */
+static inline
+void lib_ring_buffer_commit(const struct lib_ring_buffer_config *config,
+			    const struct lib_ring_buffer_ctx *ctx)
+{
+	struct channel *chan = ctx->chan;
+	struct lib_ring_buffer *buf = ctx->buf;
+	unsigned long offset_end = ctx->buf_offset;
+	unsigned long endidx = subbuf_index(offset_end - 1, chan);
+	unsigned long commit_count;
+
+	/*
+	 * Must count record before incrementing the commit count.
+	 */
+	subbuffer_count_record(config, &buf->backend, endidx);
+
+	/*
+	 * Order all writes to buffer before the commit count update that will
+	 * determine that the subbuffer is full.
+	 */
+	if (config->ipi == RING_BUFFER_IPI_BARRIER) {
+		/*
+		 * Must write slot data before incrementing commit count.  This
+		 * compiler barrier is upgraded into a smp_mb() by the IPI sent
+		 * by get_subbuf().
+		 */
+		barrier();
+	} else
+		smp_wmb();
+
+	v_add(config, ctx->slot_size, &buf->commit_hot[endidx].cc);
+
+	/*
+	 * commit count read can race with concurrent OOO commit count updates.
+	 * This is only needed for lib_ring_buffer_check_deliver (for
+	 * non-polling delivery only) and for
+	 * lib_ring_buffer_write_commit_counter.  The race can only cause the
+	 * counter to be read with the same value more than once, which could
+	 * cause :
+	 * - Multiple delivery for the same sub-buffer (which is handled
+	 *   gracefully by the reader code) if the value is for a full
+	 *   sub-buffer. It's important that we can never miss a sub-buffer
+	 *   delivery. Re-reading the value after the v_add ensures this.
+	 * - Reading a commit_count with a higher value than what was actually
+	 *   added to it for the lib_ring_buffer_write_commit_counter call
+	 *   (again caused by a concurrent committer). It does not matter,
+	 *   because this function is interested in the fact that the commit
+	 *   count reaches back the reserve offset for a specific sub-buffer,
+	 *   which is completely independent of the order.
+	 */
+	commit_count = v_read(config, &buf->commit_hot[endidx].cc);
+
+	lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
+				      commit_count, endidx);
+	/*
+	 * Update used size at each commit. It's needed only for extracting
+	 * ring_buffer buffers from vmcore, after crash.
+	 */
+	lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
+					     ctx->buf_offset, commit_count,
+					 ctx->slot_size);
+}
+
+/**
+ * lib_ring_buffer_try_discard_reserve - Try discarding a record.
+ * @config: ring buffer instance configuration.
+ * @ctx: ring buffer context. (input arguments only)
+ *
+ * Succeeds only if the buffer offset still sits at the end of this record's
+ * slot; otherwise the record must be committed to the buffer.
+ *
+ * Returns 0 upon success, -EPERM if the record cannot be discarded.
+ */
+static inline int
+lib_ring_buffer_try_discard_reserve(const struct lib_ring_buffer_config *config,
+				    const struct lib_ring_buffer_ctx *ctx)
+{
+	struct lib_ring_buffer *buf = ctx->buf;
+	unsigned long slot_end = ctx->pre_offset + ctx->slot_size;
+
+	/*
+	 * If the cmpxchg below succeeds and discards the record, the next
+	 * record must carry a full TSC, because it cannot rely on the
+	 * last_tsc associated with the discarded record to detect
+	 * overflows. The only way to ensure this is to set the last_tsc to 0
+	 * (assuming no 64-bit TSC overflow), which forces to write a 64-bit
+	 * timestamp in the next record.
+	 *
+	 * Note: if discard fails, we must leave the TSC in the record header.
+	 * It is needed to keep track of TSC overflows for the following
+	 * records.
+	 */
+	save_last_tsc(config, buf, 0ULL);
+
+	if (unlikely(v_cmpxchg(config, &buf->offset, slot_end, ctx->pre_offset)
+		     == slot_end))
+		return 0;
+
+	return -EPERM;
+}
+
+/* Increment the channel's record_disabled count; @config is unused. */
+static inline void channel_record_disable(
+	const struct lib_ring_buffer_config *config, struct channel *chan)
+{
+	atomic_inc(&chan->record_disabled);
+}
+
+/* Decrement the channel's record_disabled count; @config is unused. */
+static inline void channel_record_enable(
+	const struct lib_ring_buffer_config *config, struct channel *chan)
+{
+	atomic_dec(&chan->record_disabled);
+}
+
+/* Increment @buf's record_disabled count; @config is unused. */
+static inline void lib_ring_buffer_record_disable(
+	const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf)
+{
+	atomic_inc(&buf->record_disabled);
+}
+
+/* Decrement @buf's record_disabled count; @config is unused. */
+static inline void lib_ring_buffer_record_enable(
+	const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf)
+{
+	atomic_dec(&buf->record_disabled);
+}
+
+#endif /* _LINUX_RING_BUFFER_FRONTEND_API_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/frontend_internal.h b/drivers/staging/lttng/lib/ringbuffer/frontend_internal.h
new file mode 100644
index 0000000..3bd5721
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/frontend_internal.h
@@ -0,0 +1,424 @@
+#ifndef _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H
+#define _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H
+
+/*
+ * linux/ringbuffer/frontend_internal.h
+ *
+ * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring Buffer Library Synchronization Header (internal helpers).
+ *
+ * Author:
+ *	Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * See ring_buffer_frontend.c for more information on wait-free algorithms.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include "../../wrapper/ringbuffer/config.h"
+#include "../../wrapper/ringbuffer/backend_types.h"
+#include "../../wrapper/ringbuffer/frontend_types.h"
+#include "../../lib/prio_heap/lttng_prio_heap.h"	/* For per-CPU read-side iterator */
+
+/* Buffer offset macros */
+
+/* buf_trunc mask selects only the buffer number. */
+static inline
+unsigned long buf_trunc(unsigned long offset, struct channel *chan)
+{
+	return offset & ~(chan->backend.buf_size - 1);
+
+}
+
+/* Select the buffer number value (counter). */
+static inline
+unsigned long buf_trunc_val(unsigned long offset, struct channel *chan)
+{
+	return buf_trunc(offset, chan) >> chan->backend.buf_size_order;
+}
+
+/* buf_offset mask selects only the offset within the current buffer. */
+static inline
+unsigned long buf_offset(unsigned long offset, struct channel *chan)
+{
+	return offset & (chan->backend.buf_size - 1);
+}
+
+/* subbuf_offset mask selects the offset within the current subbuffer. */
+static inline
+unsigned long subbuf_offset(unsigned long offset, struct channel *chan)
+{
+	return offset & (chan->backend.subbuf_size - 1);
+}
+
+/* subbuf_trunc mask selects the subbuffer number. */
+static inline
+unsigned long subbuf_trunc(unsigned long offset, struct channel *chan)
+{
+	return offset & ~(chan->backend.subbuf_size - 1);
+}
+
+/* subbuf_align aligns the offset to the next subbuffer. */
+static inline
+unsigned long subbuf_align(unsigned long offset, struct channel *chan)
+{
+	return (offset + chan->backend.subbuf_size)
+	       & ~(chan->backend.subbuf_size - 1);
+}
+
+/* subbuf_index returns the index of the current subbuffer within the buffer. */
+static inline
+unsigned long subbuf_index(unsigned long offset, struct channel *chan)
+{
+	return buf_offset(offset, chan) >> chan->backend.subbuf_size_order;
+}
+
+/*
+ * Last TSC comparison functions. Check if the current TSC overflows tsc_bits
+ * bits from the last TSC read. When overflows are detected, the full 64-bit
+ * timestamp counter should be written in the record header. Reads and writes
+ * last_tsc atomically.
+ */
+
+#if (BITS_PER_LONG == 32)
+static inline
+void save_last_tsc(const struct lib_ring_buffer_config *config,
+		   struct lib_ring_buffer *buf, u64 tsc)
+{
+	if (config->tsc_bits == 0 || config->tsc_bits == 64)
+		return;
+
+	/*
+	 * Ensure the compiler performs this update in a single instruction.
+	 */
+	v_set(config, &buf->last_tsc, (unsigned long)(tsc >> config->tsc_bits));
+}
+
+static inline
+int last_tsc_overflow(const struct lib_ring_buffer_config *config,
+		      struct lib_ring_buffer *buf, u64 tsc)
+{
+	unsigned long tsc_shifted;
+
+	if (config->tsc_bits == 0 || config->tsc_bits == 64)
+		return 0;
+
+	tsc_shifted = (unsigned long)(tsc >> config->tsc_bits);
+	if (unlikely(tsc_shifted
+		     - (unsigned long)v_read(config, &buf->last_tsc)))
+		return 1;
+	else
+		return 0;
+}
+#else
+static inline
+void save_last_tsc(const struct lib_ring_buffer_config *config,
+		   struct lib_ring_buffer *buf, u64 tsc)
+{
+	if (config->tsc_bits == 0 || config->tsc_bits == 64)
+		return;
+
+	v_set(config, &buf->last_tsc, (unsigned long)tsc);
+}
+
+static inline
+int last_tsc_overflow(const struct lib_ring_buffer_config *config,
+		      struct lib_ring_buffer *buf, u64 tsc)
+{
+	if (config->tsc_bits == 0 || config->tsc_bits == 64)
+		return 0;
+
+	if (unlikely((tsc - v_read(config, &buf->last_tsc))
+		     >> config->tsc_bits))
+		return 1;
+	else
+		return 0;
+}
+#endif
+
+extern
+int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx);
+
+extern
+void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf,
+				 enum switch_mode mode);
+
+/* Buffer write helpers */
+
+static inline
+void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer *buf,
+					 struct channel *chan,
+					 unsigned long offset)
+{
+	unsigned long consumed_old, consumed_new;
+
+	do {
+		consumed_old = atomic_long_read(&buf->consumed);
+		/*
+		 * If buffer is in overwrite mode, push the reader consumed
+		 * count if the write position has reached it and we are not
+		 * at the first iteration (don't push the reader farther than
+		 * the writer). This operation can be done concurrently by many
+		 * writers in the same buffer, the writer being at the farthest
+		 * write position sub-buffer index in the buffer being the one
+		 * which will win this loop.
+		 */
+		if (unlikely(subbuf_trunc(offset, chan)
+			      - subbuf_trunc(consumed_old, chan)
+			     >= chan->backend.buf_size))
+			consumed_new = subbuf_align(consumed_old, chan);
+		else
+			return;
+	} while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old,
+					      consumed_new) != consumed_old));
+}
+
+static inline
+void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config,
+					  struct lib_ring_buffer *buf,
+				          unsigned long commit_count,
+				          unsigned long idx)
+{
+	if (config->oops == RING_BUFFER_OOPS_CONSISTENCY)
+		v_set(config, &buf->commit_hot[idx].seq, commit_count);
+}
+
+static inline
+int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config,
+				 struct lib_ring_buffer *buf,
+			         struct channel *chan)
+{
+	unsigned long consumed_old, consumed_idx, commit_count, write_offset;
+
+	consumed_old = atomic_long_read(&buf->consumed);
+	consumed_idx = subbuf_index(consumed_old, chan);
+	commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
+	/*
+	 * No memory barrier here, since we are only interested
+	 * in a statistically correct polling result. The next poll will
+	 * get the data if we are racing. The mb() that ensures correct
+	 * memory order is in get_subbuf.
+	 */
+	write_offset = v_read(config, &buf->offset);
+
+	/*
+	 * Check that the subbuffer we are trying to consume has been
+	 * already fully committed.
+	 */
+
+	if (((commit_count - chan->backend.subbuf_size)
+	     & chan->commit_count_mask)
+	    - (buf_trunc(consumed_old, chan)
+	       >> chan->backend.num_subbuf_order)
+	    != 0)
+		return 0;
+
+	/*
+	 * Check that we are not about to read the same subbuffer in
+	 * which the writer head is.
+	 */
+	if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_old, chan)
+	    == 0)
+		return 0;
+
+	return 1;
+
+}
+
+static inline
+int lib_ring_buffer_pending_data(const struct lib_ring_buffer_config *config,
+				 struct lib_ring_buffer *buf,
+				 struct channel *chan)
+{
+	return !!subbuf_offset(v_read(config, &buf->offset), chan);
+}
+
+static inline
+unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config *config,
+					    struct lib_ring_buffer *buf,
+					    unsigned long idx)
+{
+	return subbuffer_get_data_size(config, &buf->backend, idx);
+}
+
+/*
+ * Check if all space reservations in a buffer have been committed. This helps
+ * knowing if an execution context is nested (for per-cpu buffers only).
+ * This is a very specific ftrace use-case, so we keep this as "internal" API.
+ */
+static inline
+int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *config,
+				      struct lib_ring_buffer *buf,
+				      struct channel *chan)
+{
+	unsigned long offset, idx, commit_count;
+
+	CHAN_WARN_ON(chan, config->alloc != RING_BUFFER_ALLOC_PER_CPU);
+	CHAN_WARN_ON(chan, config->sync != RING_BUFFER_SYNC_PER_CPU);
+
+	/*
+	 * Read offset and commit count in a loop so they are both read
+	 * atomically wrt interrupts. We deal with interrupt concurrency by
+	 * restarting both reads if the offset has been pushed. Note that given
+	 * we only have to deal with interrupt concurrency here, an interrupt
+	 * modifying the commit count will also modify "offset", so it is safe
+	 * to only check for offset modifications.
+	 */
+	do {
+		offset = v_read(config, &buf->offset);
+		idx = subbuf_index(offset, chan);
+		commit_count = v_read(config, &buf->commit_hot[idx].cc);
+	} while (offset != v_read(config, &buf->offset));
+
+	return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
+		     - (commit_count & chan->commit_count_mask) == 0);
+}
+
+static inline
+void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
+				   struct lib_ring_buffer *buf,
+			           struct channel *chan,
+			           unsigned long offset,
+				   unsigned long commit_count,
+			           unsigned long idx)
+{
+	unsigned long old_commit_count = commit_count
+					 - chan->backend.subbuf_size;
+	u64 tsc;
+
+	/* Check if all commits have been done */
+	if (unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
+		     - (old_commit_count & chan->commit_count_mask) == 0)) {
+		/*
+		 * If we succeeded at updating cc_sb below, we are the subbuffer
+		 * writer delivering the subbuffer. Deals with concurrent
+		 * updates of the "cc" value without adding an add_return atomic
+		 * operation to the fast path.
+		 *
+		 * We are doing the delivery in two steps:
+		 * - First, we cmpxchg() cc_sb to the new value
+		 *   old_commit_count + 1. This ensures that we are the only
+		 *   subbuffer user successfully filling the subbuffer, but we
+		 *   do _not_ set the cc_sb value to "commit_count" yet.
+		 *   Therefore, other writers that would wrap around the ring
+		 *   buffer and try to start writing to our subbuffer would
+		 *   have to drop records, because it would appear as
+		 *   non-filled.
+		 *   We therefore have exclusive access to the subbuffer control
+		 *   structures.  This mutual exclusion with other writers is
+		 *   crucially important to perform record overruns count in
+		 *   flight recorder mode locklessly.
+		 * - When we are ready to release the subbuffer (either for
+		 *   reading or for overrun by other writers), we simply set the
+		 *   cc_sb value to "commit_count" and perform delivery.
+		 *
+		 * The subbuffer size is at least 2 bytes (minimum size: 1 page).
+		 * This guarantees that old_commit_count + 1 != commit_count.
+		 */
+		if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb,
+					 old_commit_count, old_commit_count + 1)
+			   == old_commit_count)) {
+			/*
+			 * Start of exclusive subbuffer access. We are
+			 * guaranteed to be the last writer in this subbuffer
+			 * and any other writer trying to access this subbuffer
+			 * in this state is required to drop records.
+			 */
+			tsc = config->cb.ring_buffer_clock_read(chan);
+			v_add(config,
+			      subbuffer_get_records_count(config,
+							  &buf->backend, idx),
+			      &buf->records_count);
+			v_add(config,
+			      subbuffer_count_records_overrun(config,
+							      &buf->backend,
+							      idx),
+			      &buf->records_overrun);
+			config->cb.buffer_end(buf, tsc, idx,
+					      lib_ring_buffer_get_data_size(config,
+									buf,
+									idx));
+
+			/*
+			 * Set noref flag and offset for this subbuffer id.
+			 * Contains a memory barrier that ensures counter stores
+			 * are ordered before set noref and offset.
+			 */
+			lib_ring_buffer_set_noref_offset(config, &buf->backend, idx,
+							 buf_trunc_val(offset, chan));
+
+			/*
+			 * Order set_noref and record counter updates before the
+			 * end of subbuffer exclusive access. Orders with
+			 * respect to writers coming into the subbuffer after
+			 * wrap around, and also order wrt concurrent readers.
+			 */
+			smp_mb();
+			/* End of exclusive subbuffer access */
+			v_set(config, &buf->commit_cold[idx].cc_sb,
+			      commit_count);
+			lib_ring_buffer_vmcore_check_deliver(config, buf,
+							 commit_count, idx);
+
+			/*
+			 * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
+			 */
+			if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
+			    && atomic_long_read(&buf->active_readers)
+			    && lib_ring_buffer_poll_deliver(config, buf, chan)) {
+				wake_up_interruptible(&buf->read_wait);
+				wake_up_interruptible(&chan->read_wait);
+			}
+
+		}
+	}
+}
+
+/*
+ * lib_ring_buffer_write_commit_counter
+ *
+ * For flight recording. Must be called after commit.
+ * This function increments the subbuffer's commit_seq counter each time the
+ * commit count reaches back the reserve offset (modulo subbuffer size). It is
+ * useful for crash dump.
+ */
+static inline
+void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *config,
+					  struct lib_ring_buffer *buf,
+				          struct channel *chan,
+				          unsigned long idx,
+				          unsigned long buf_offset,
+				          unsigned long commit_count,
+				          size_t slot_size)
+{
+	unsigned long offset, commit_seq_old;
+
+	if (config->oops != RING_BUFFER_OOPS_CONSISTENCY)
+		return;
+
+	offset = buf_offset + slot_size;
+
+	/*
+	 * subbuf_offset includes commit_count_mask. We can simply
+	 * compare the offsets within the subbuffer without caring about
+	 * buffer full/empty mismatch because offset is never zero here
+	 * (subbuffer header and record headers have non-zero length).
+	 */
+	if (unlikely(subbuf_offset(offset - commit_count, chan)))
+		return;
+
+	commit_seq_old = v_read(config, &buf->commit_hot[idx].seq);
+	while ((long) (commit_seq_old - commit_count) < 0)
+		commit_seq_old = v_cmpxchg(config, &buf->commit_hot[idx].seq,
+					   commit_seq_old, commit_count);
+}
+
+extern int lib_ring_buffer_create(struct lib_ring_buffer *buf,
+				  struct channel_backend *chanb, int cpu);
+extern void lib_ring_buffer_free(struct lib_ring_buffer *buf);
+
+/* Keep track of trap nesting inside ring buffer code */
+DECLARE_PER_CPU(unsigned int, lib_ring_buffer_nesting);
+
+#endif /* _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/frontend_types.h b/drivers/staging/lttng/lib/ringbuffer/frontend_types.h
new file mode 100644
index 0000000..5c7437f
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/frontend_types.h
@@ -0,0 +1,176 @@
+#ifndef _LINUX_RING_BUFFER_FRONTEND_TYPES_H
+#define _LINUX_RING_BUFFER_FRONTEND_TYPES_H
+
+/*
+ * linux/ringbuffer/frontend_types.h
+ *
+ * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring Buffer Library Synchronization Header (types).
+ *
+ * Author:
+ *	Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * See ring_buffer_frontend.c for more information on wait-free algorithms.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/kref.h>
+#include "../../wrapper/ringbuffer/config.h"
+#include "../../wrapper/ringbuffer/backend_types.h"
+#include "../../wrapper/spinlock.h"
+#include "../../lib/prio_heap/lttng_prio_heap.h"	/* For per-CPU read-side iterator */
+
+/*
+ * A switch is done during tracing or as a final flush after tracing (so it
+ * won't write in the new sub-buffer).
+ */
+enum switch_mode { SWITCH_ACTIVE, SWITCH_FLUSH };
+
+/* channel-level read-side iterator */
+struct channel_iter {
+	/* Prio heap of buffers. Lowest timestamps at the top. */
+	struct lttng_ptr_heap heap;	/* Heap of struct lib_ring_buffer ptrs */
+	struct list_head empty_head;	/* Empty buffers linked-list head */
+	int read_open;			/* Opened for reading ? */
+	u64 last_qs;			/* Last quiescent state timestamp */
+	u64 last_timestamp;		/* Last timestamp (for WARN_ON) */
+	int last_cpu;			/* Last timestamp cpu */
+	/*
+	 * read() file operation state.
+	 */
+	unsigned long len_left;
+};
+
+/* channel: collection of per-cpu ring buffers. */
+struct channel {
+	atomic_t record_disabled;
+	unsigned long commit_count_mask;	/*
+						 * Commit count mask, removing
+						 * the MSBs corresponding to
+						 * bits used to represent the
+						 * subbuffer index.
+						 */
+
+	struct channel_backend backend;		/* Associated backend */
+
+	unsigned long switch_timer_interval;	/* Buffer flush (jiffies) */
+	unsigned long read_timer_interval;	/* Reader wakeup (jiffies) */
+	struct notifier_block cpu_hp_notifier;	/* CPU hotplug notifier */
+	struct notifier_block tick_nohz_notifier; /* CPU nohz notifier */
+	struct notifier_block hp_iter_notifier;	/* hotplug iterator notifier */
+	unsigned int cpu_hp_enable:1;		/* Enable CPU hotplug notif. */
+	unsigned int hp_iter_enable:1;		/* Enable hp iter notif. */
+	wait_queue_head_t read_wait;		/* reader wait queue */
+	wait_queue_head_t hp_wait;		/* CPU hotplug wait queue */
+	int finalized;				/* Has channel been finalized */
+	struct channel_iter iter;		/* Channel read-side iterator */
+	struct kref ref;			/* Reference count */
+};
+
+/* Per-subbuffer commit counters used on the hot path */
+struct commit_counters_hot {
+	union v_atomic cc;		/* Commit counter */
+	union v_atomic seq;		/* Consecutive commits */
+};
+
+/* Per-subbuffer commit counters used only on cold paths */
+struct commit_counters_cold {
+	union v_atomic cc_sb;		/* Incremented _once_ at sb switch */
+};
+
+/* Per-buffer read iterator */
+struct lib_ring_buffer_iter {
+	u64 timestamp;			/* Current record timestamp */
+	size_t header_len;		/* Current record header length */
+	size_t payload_len;		/* Current record payload length */
+
+	struct list_head empty_node;	/* Linked list of empty buffers */
+	unsigned long consumed, read_offset, data_size;
+	enum {
+		ITER_GET_SUBBUF = 0,
+		ITER_TEST_RECORD,
+		ITER_NEXT_RECORD,
+		ITER_PUT_SUBBUF,
+	} state;
+	unsigned int allocated:1;
+	unsigned int read_open:1;	/* Opened for reading ? */
+};
+
+/* ring buffer state */
+struct lib_ring_buffer {
+	/* First 32 bytes cache-hot cacheline */
+	union v_atomic offset;		/* Current offset in the buffer */
+	struct commit_counters_hot *commit_hot;
+					/* Commit count per sub-buffer */
+	atomic_long_t consumed;		/*
+					 * Current offset in the buffer
+					 * standard atomic access (shared)
+					 */
+	atomic_t record_disabled;
+	/* End of first 32 bytes cacheline */
+	union v_atomic last_tsc;	/*
+					 * Last timestamp written in the buffer.
+					 */
+
+	struct lib_ring_buffer_backend backend;	/* Associated backend */
+
+	struct commit_counters_cold *commit_cold;
+					/* Commit count per sub-buffer */
+	atomic_long_t active_readers;	/*
+					 * Active readers count
+					 * standard atomic access (shared)
+					 */
+					/* Dropped records */
+	union v_atomic records_lost_full;	/* Buffer full */
+	union v_atomic records_lost_wrap;	/* Nested wrap-around */
+	union v_atomic records_lost_big;	/* Events too big */
+	union v_atomic records_count;	/* Number of records written */
+	union v_atomic records_overrun;	/* Number of overwritten records */
+	wait_queue_head_t read_wait;	/* reader buffer-level wait queue */
+	wait_queue_head_t write_wait;	/* writer buffer-level wait queue (for metadata only) */
+	int finalized;			/* buffer has been finalized */
+	struct timer_list switch_timer;	/* timer for periodical switch */
+	struct timer_list read_timer;	/* timer for read poll */
+	raw_spinlock_t raw_tick_nohz_spinlock;	/* nohz entry lock/trylock */
+	struct lib_ring_buffer_iter iter;	/* read-side iterator */
+	unsigned long get_subbuf_consumed;	/* Read-side consumed */
+	unsigned long prod_snapshot;	/* Producer count snapshot */
+	unsigned long cons_snapshot;	/* Consumer count snapshot */
+	unsigned int get_subbuf:1,	/* Sub-buffer being held by reader */
+		switch_timer_enabled:1,	/* Protected by ring_buffer_nohz_lock */
+		read_timer_enabled:1;	/* Protected by ring_buffer_nohz_lock */
+};
+
+static inline
+void *channel_get_private(struct channel *chan)
+{
+	return chan->backend.priv;
+}
+
+/*
+ * Issue warnings and disable channels upon internal error.
+ * Can receive a pointer to struct channel or to struct channel_backend;
+ * any other pointer type triggers BUG_ON().
+ */
+#define CHAN_WARN_ON(c, cond)						\
+	({								\
+		struct channel *__chan;					\
+		int _____ret = unlikely(cond);				\
+		if (_____ret) {						\
+			if (__same_type(*(c), struct channel_backend))	\
+				__chan = container_of((void *) (c),	\
+							struct channel, \
+							backend);	\
+			else if (__same_type(*(c), struct channel))	\
+				__chan = (void *) (c);			\
+			else						\
+				BUG_ON(1);				\
+			atomic_inc(&__chan->record_disabled);		\
+			WARN_ON(1);					\
+		}							\
+		_____ret;						\
+	})
+
+#endif /* _LINUX_RING_BUFFER_FRONTEND_TYPES_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/iterator.h b/drivers/staging/lttng/lib/ringbuffer/iterator.h
new file mode 100644
index 0000000..f2bd50d
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/iterator.h
@@ -0,0 +1,70 @@
+#ifndef _LINUX_RING_BUFFER_ITERATOR_H
+#define _LINUX_RING_BUFFER_ITERATOR_H
+
+/*
+ * linux/ringbuffer/iterator.h
+ *
+ * (C) Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring buffer and channel iterators.
+ *
+ * Author:
+ *	Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include "../../wrapper/ringbuffer/backend.h"
+#include "../../wrapper/ringbuffer/frontend.h"
+
+/*
+ * lib_ring_buffer_get_next_record advances the buffer read position to the next
+ * record. It returns either the size of the next record, -EAGAIN if there is
+ * currently no data available, or -ENODATA if no data is available and buffer
+ * is finalized.
+ */
+extern ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
+					       struct lib_ring_buffer *buf);
+
+/*
+ * channel_get_next_record advances the buffer read position to the next record.
+ * It returns either the size of the next record, -EAGAIN if there is currently
+ * no data available, or -ENODATA if no data is available and buffer is
+ * finalized.
+ * Returns the current buffer in ret_buf.
+ */
+extern ssize_t channel_get_next_record(struct channel *chan,
+				       struct lib_ring_buffer **ret_buf);
+
+/**
+ * read_current_record - copy the buffer current record into dest.
+ * @buf: ring buffer
+ * @dest: destination where the record should be copied
+ *
+ * dest should be large enough to contain the record. Returns the number of
+ * bytes copied.
+ */
+static inline size_t read_current_record(struct lib_ring_buffer *buf, void *dest)
+{
+	return lib_ring_buffer_read(&buf->backend, buf->iter.read_offset,
+				    dest, buf->iter.payload_len);
+}
+
+extern int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf);
+extern void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf);
+extern int channel_iterator_open(struct channel *chan);
+extern void channel_iterator_release(struct channel *chan);
+
+extern const struct file_operations channel_payload_file_operations;
+extern const struct file_operations lib_ring_buffer_payload_file_operations;
+
+/*
+ * Used internally.
+ */
+int channel_iterator_init(struct channel *chan);
+void channel_iterator_unregister_notifiers(struct channel *chan);
+void channel_iterator_free(struct channel *chan);
+void channel_iterator_reset(struct channel *chan);
+void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf);
+
+#endif /* _LINUX_RING_BUFFER_ITERATOR_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/nohz.h b/drivers/staging/lttng/lib/ringbuffer/nohz.h
new file mode 100644
index 0000000..3c31072
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/nohz.h
@@ -0,0 +1,30 @@
+#ifndef _LINUX_RING_BUFFER_NOHZ_H
+#define _LINUX_RING_BUFFER_NOHZ_H
+
+/*
+ * ringbuffer/nohz.h
+ *
+ * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#ifdef CONFIG_LIB_RING_BUFFER
+void lib_ring_buffer_tick_nohz_flush(void);
+void lib_ring_buffer_tick_nohz_stop(void);
+void lib_ring_buffer_tick_nohz_restart(void);
+#else
+static inline void lib_ring_buffer_tick_nohz_flush(void)
+{
+}
+
+static inline void lib_ring_buffer_tick_nohz_stop(void)
+{
+}
+
+static inline void lib_ring_buffer_tick_nohz_restart(void)
+{
+}
+#endif
+
+#endif /* _LINUX_RING_BUFFER_NOHZ_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/ring_buffer_backend.c b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_backend.c
new file mode 100644
index 0000000..d1b5b8c
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_backend.c
@@ -0,0 +1,854 @@
+/*
+ * ring_buffer_backend.c
+ *
+ * Copyright (C) 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/stddef.h>
+#include <linux/module.h>
+#include <linux/string.h>
+#include <linux/bitops.h>
+#include <linux/delay.h>
+#include <linux/errno.h>
+#include <linux/slab.h>
+#include <linux/cpu.h>
+#include <linux/mm.h>
+
+#include "../../wrapper/vmalloc.h"	/* for wrapper_vmalloc_sync_all() */
+#include "../../wrapper/ringbuffer/config.h"
+#include "../../wrapper/ringbuffer/backend.h"
+#include "../../wrapper/ringbuffer/frontend.h"
+
+/**
+ * lib_ring_buffer_backend_allocate - allocate a channel buffer
+ * @config: ring buffer instance configuration
+ * @bufb: the buffer backend struct
+ * @size: total size of the buffer
+ * @num_subbuf: number of subbuffers
+ * @extra_reader_sb: need extra subbuffer for reader
+ */
+static
+int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config,
+				     struct lib_ring_buffer_backend *bufb,
+				     size_t size, size_t num_subbuf,
+				     int extra_reader_sb)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	unsigned long j, num_pages, num_pages_per_subbuf, page_idx = 0;
+	unsigned long subbuf_size, mmap_offset = 0;
+	unsigned long num_subbuf_alloc;
+	struct page **pages;
+	void **virt;
+	unsigned long i;
+
+	num_pages = size >> PAGE_SHIFT;
+	num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf);
+	subbuf_size = chanb->subbuf_size;
+	num_subbuf_alloc = num_subbuf;
+
+	if (extra_reader_sb) {
+		num_pages += num_pages_per_subbuf; /* Add pages for reader */
+		num_subbuf_alloc++;
+	}
+
+	pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages,
+				   1 << INTERNODE_CACHE_SHIFT),
+			GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+	if (unlikely(!pages))
+		goto pages_error;
+
+	virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages,
+				  1 << INTERNODE_CACHE_SHIFT),
+			GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+	if (unlikely(!virt))
+		goto virt_error;
+
+	bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array)
+					 * num_subbuf_alloc,
+				  1 << INTERNODE_CACHE_SHIFT),
+			GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+	if (unlikely(!bufb->array))
+		goto array_error;
+
+	for (i = 0; i < num_pages; i++) {
+		pages[i] = alloc_pages_node(cpu_to_node(max(bufb->cpu, 0)),
+					    GFP_KERNEL | __GFP_ZERO, 0);
+		if (unlikely(!pages[i]))
+			goto depopulate;
+		virt[i] = page_address(pages[i]);
+	}
+	bufb->num_pages_per_subbuf = num_pages_per_subbuf;
+
+	/* Allocate backend pages array elements */
+	for (i = 0; i < num_subbuf_alloc; i++) {
+		bufb->array[i] =
+			kzalloc_node(ALIGN(
+				sizeof(struct lib_ring_buffer_backend_pages) +
+				sizeof(struct lib_ring_buffer_backend_page)
+				* num_pages_per_subbuf,
+				1 << INTERNODE_CACHE_SHIFT),
+				GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+		if (!bufb->array[i])
+			goto free_array;
+	}
+
+	/* Allocate write-side subbuffer table */
+	bufb->buf_wsb = kzalloc_node(ALIGN(
+				sizeof(struct lib_ring_buffer_backend_subbuffer)
+				* num_subbuf,
+				1 << INTERNODE_CACHE_SHIFT),
+				GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+	if (unlikely(!bufb->buf_wsb))
+		goto free_array;
+
+	for (i = 0; i < num_subbuf; i++)
+		bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
+
+	/* Assign read-side subbuffer table */
+	if (extra_reader_sb)
+		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
+						num_subbuf_alloc - 1);
+	else
+		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);
+
+	/* Assign pages to page index */
+	for (i = 0; i < num_subbuf_alloc; i++) {
+		for (j = 0; j < num_pages_per_subbuf; j++) {
+			CHAN_WARN_ON(chanb, page_idx >= num_pages);
+			bufb->array[i]->p[j].virt = virt[page_idx];
+			bufb->array[i]->p[j].page = pages[page_idx];
+			page_idx++;
+		}
+		if (config->output == RING_BUFFER_MMAP) {
+			bufb->array[i]->mmap_offset = mmap_offset;
+			mmap_offset += subbuf_size;
+		}
+	}
+
+	/*
+	 * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
+	 * will not fault.
+	 */
+	wrapper_vmalloc_sync_all();
+	kfree(virt);
+	kfree(pages);
+	return 0;
+
+free_array:
+	for (i = 0; (i < num_subbuf_alloc && bufb->array[i]); i++)
+		kfree(bufb->array[i]);
+depopulate:
+	/* Free all allocated pages */
+	for (i = 0; (i < num_pages && pages[i]); i++)
+		__free_page(pages[i]);
+	kfree(bufb->array);
+array_error:
+	kfree(virt);
+virt_error:
+	kfree(pages);
+pages_error:
+	return -ENOMEM;
+}
+
+int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
+				   struct channel_backend *chanb, int cpu)
+{
+	const struct lib_ring_buffer_config *config = chanb->config;
+
+	bufb->chan = container_of(chanb, struct channel, backend);
+	bufb->cpu = cpu;
+
+	return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
+						chanb->num_subbuf,
+						chanb->extra_reader_sb);
+}
+
+void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	unsigned long i, j, num_subbuf_alloc;
+
+	num_subbuf_alloc = chanb->num_subbuf;
+	if (chanb->extra_reader_sb)
+		num_subbuf_alloc++;
+
+	kfree(bufb->buf_wsb);
+	for (i = 0; i < num_subbuf_alloc; i++) {
+		for (j = 0; j < bufb->num_pages_per_subbuf; j++)
+			__free_page(bufb->array[i]->p[j].page);
+		kfree(bufb->array[i]);
+	}
+	kfree(bufb->array);
+	bufb->allocated = 0;
+}
+
+void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	const struct lib_ring_buffer_config *config = chanb->config;
+	unsigned long num_subbuf_alloc;
+	unsigned int i;
+
+	num_subbuf_alloc = chanb->num_subbuf;
+	if (chanb->extra_reader_sb)
+		num_subbuf_alloc++;
+
+	for (i = 0; i < chanb->num_subbuf; i++)
+		bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
+	if (chanb->extra_reader_sb)
+		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
+						num_subbuf_alloc - 1);
+	else
+		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);
+
+	for (i = 0; i < num_subbuf_alloc; i++) {
+		/* Don't reset mmap_offset */
+		v_set(config, &bufb->array[i]->records_commit, 0);
+		v_set(config, &bufb->array[i]->records_unread, 0);
+		bufb->array[i]->data_size = 0;
+		/* Don't reset backend page and virt addresses */
+	}
+	/* Don't reset num_pages_per_subbuf, cpu, allocated */
+	v_set(config, &bufb->records_read, 0);
+}
+
+/*
+ * The frontend is responsible for also calling lib_ring_buffer_backend_reset
+ * for each buffer when calling channel_backend_reset.
+ */
+void channel_backend_reset(struct channel_backend *chanb)
+{
+	struct channel *chan = container_of(chanb, struct channel, backend);
+	const struct lib_ring_buffer_config *config = chanb->config;
+
+	/*
+	 * Don't reset buf_size, subbuf_size, subbuf_size_order,
+	 * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
+	 * priv, notifiers, config, cpumask and name.
+	 */
+	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
+}
+
+#ifdef CONFIG_HOTPLUG_CPU
+/**
+ *	lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
+ *	@nb: notifier block (embedded in the channel backend)
+ *	@action: hotplug action to take
+ *	@hcpu: CPU number
+ *
+ *	Creates the per-cpu ring buffer of a CPU that is about to come online.
+ *	Every other action is ignored.
+ *
+ *	Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
+ */
+static
+int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
+					      unsigned long action,
+					      void *hcpu)
+{
+	unsigned int cpu = (unsigned long)hcpu;
+	struct channel_backend *chanb = container_of(nb, struct channel_backend,
+						     cpu_hp_notifier);
+	const struct lib_ring_buffer_config *config = chanb->config;
+	struct lib_ring_buffer *buf;
+	int ret;
+
+	/* This notifier is only registered for per-cpu channels. */
+	CHAN_WARN_ON(chanb, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+	switch (action) {
+	case CPU_UP_PREPARE:
+	case CPU_UP_PREPARE_FROZEN:
+		buf = per_cpu_ptr(chanb->buf, cpu);
+		ret = lib_ring_buffer_create(buf, chanb, cpu);
+		if (ret) {
+			/* cpu is unsigned: print it with %u. */
+			printk(KERN_ERR
+			  "ring_buffer_cpu_hp_callback: cpu %u "
+			  "buffer creation failed\n", cpu);
+			return NOTIFY_BAD;
+		}
+		break;
+	case CPU_DEAD:
+	case CPU_DEAD_FROZEN:
+		/* No need to do a buffer switch here, because it will happen
+		 * when tracing is stopped, or will be done by switch timer CPU
+		 * DEAD callback. */
+		break;
+	default:
+		/* Other hotplug actions need no backend work. */
+		break;
+	}
+	return NOTIFY_OK;
+}
+#endif
+
+/**
+ * channel_backend_init - initialize a channel backend
+ * @chanb: channel backend
+ * @name: channel name
+ * @config: client ring buffer configuration
+ * @priv: client private data
+ * @subbuf_size: size of sub-buffers (>= PAGE_SIZE, power of 2)
+ * @num_subbuf: number of sub-buffers (power of 2)
+ *
+ * Returns 0 if successful, a negative error value otherwise: -EPERM when
+ * @name is NULL or one of the sizes is zero, -EINVAL when @subbuf_size is
+ * smaller than a page, the subbuffer_id_check_index() error when @num_subbuf
+ * is rejected, and -ENOMEM when an allocation or a buffer creation fails.
+ *
+ * Creates either one buffer per cpu (RING_BUFFER_ALLOC_PER_CPU) or a single
+ * global buffer, using the sizes and attributes specified.
+ *
+ * Called with CPU hotplug disabled.
+ */
+int channel_backend_init(struct channel_backend *chanb,
+			 const char *name,
+			 const struct lib_ring_buffer_config *config,
+			 void *priv, size_t subbuf_size, size_t num_subbuf)
+{
+	struct channel *chan = container_of(chanb, struct channel, backend);
+	unsigned int i;
+	int ret;
+
+	if (!name)
+		return -EPERM;
+
+	if (!(subbuf_size && num_subbuf))
+		return -EPERM;
+
+	/* Check that the subbuffer size is at least a page. */
+	if (subbuf_size < PAGE_SIZE)
+		return -EINVAL;
+
+	/*
+	 * Make sure the number of subbuffers and subbuffer size are power of 2.
+	 * These only warn; they do not reject the request.
+	 */
+	CHAN_WARN_ON(chanb, hweight32(subbuf_size) != 1);
+	CHAN_WARN_ON(chanb, hweight32(num_subbuf) != 1);
+
+	ret = subbuffer_id_check_index(config, num_subbuf);
+	if (ret)
+		return ret;
+
+	chanb->priv = priv;
+	chanb->buf_size = num_subbuf * subbuf_size;
+	chanb->subbuf_size = subbuf_size;
+	chanb->buf_size_order = get_count_order(chanb->buf_size);
+	chanb->subbuf_size_order = get_count_order(subbuf_size);
+	chanb->num_subbuf_order = get_count_order(num_subbuf);
+	/* Overwrite mode gets one extra subbuffer (see extra_reader_sb). */
+	chanb->extra_reader_sb =
+			(config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
+	chanb->num_subbuf = num_subbuf;
+	strlcpy(chanb->name, name, NAME_MAX);
+	chanb->config = config;
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+		if (!zalloc_cpumask_var(&chanb->cpumask, GFP_KERNEL))
+			return -ENOMEM;
+	}
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+		/* Allocating the buffer per-cpu structures */
+		chanb->buf = alloc_percpu(struct lib_ring_buffer);
+		if (!chanb->buf)
+			goto free_cpumask;
+
+		/*
+		 * In case of non-hotplug cpu, if the ring-buffer is allocated
+		 * in early initcall, it will not be notified of secondary cpus.
+		 * In that odd case, we need to allocate for all possible cpus.
+		 */
+#ifdef CONFIG_HOTPLUG_CPU
+		/*
+		 * buf->backend.allocated test takes care of concurrent CPU
+		 * hotplug.
+		 * Priority higher than frontend, so we create the ring buffer
+		 * before we start the timer.
+		 */
+		chanb->cpu_hp_notifier.notifier_call =
+				lib_ring_buffer_cpu_hp_callback;
+		chanb->cpu_hp_notifier.priority = 5;
+		register_hotcpu_notifier(&chanb->cpu_hp_notifier);
+
+		get_online_cpus();
+		for_each_online_cpu(i) {
+			ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
+						 chanb, i);
+			if (ret)
+				goto free_bufs;	/* cpu hotplug locked */
+		}
+		put_online_cpus();
+#else
+		for_each_possible_cpu(i) {
+			ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
+						 chanb, i);
+			if (ret)
+				goto free_bufs;	/* cpu hotplug locked */
+		}
+#endif
+	} else {
+		chanb->buf = kzalloc(sizeof(struct lib_ring_buffer), GFP_KERNEL);
+		if (!chanb->buf)
+			goto free_cpumask;
+		ret = lib_ring_buffer_create(chanb->buf, chanb, -1);
+		if (ret)
+			goto free_bufs;
+	}
+	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
+
+	return 0;
+
+free_bufs:
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+#ifdef CONFIG_HOTPLUG_CPU
+		put_online_cpus();
+		/*
+		 * The notifier registered above still points into chanb: drop
+		 * it before the per-cpu buffers are freed, otherwise a later
+		 * hotplug event would call back into freed memory.
+		 * NOTE(review): done after put_online_cpus() so we do not wait
+		 * on the notifier chain while pinning the online cpu set --
+		 * confirm against the hotplug locking rules.
+		 */
+		unregister_hotcpu_notifier(&chanb->cpu_hp_notifier);
+#endif
+		/* Also covers buffers created by a concurrent hotplug event. */
+		for_each_possible_cpu(i) {
+			struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);
+
+			if (!buf->backend.allocated)
+				continue;
+			lib_ring_buffer_free(buf);
+		}
+		free_percpu(chanb->buf);
+	} else
+		kfree(chanb->buf);
+free_cpumask:
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+		free_cpumask_var(chanb->cpumask);
+	return -ENOMEM;
+}
+
+/**
+ * channel_backend_unregister_notifiers - unregister notifiers
+ * @chanb: the channel backend
+ *
+ * Removes the CPU hotplug notifier of a per-cpu channel; global channels
+ * have nothing to unregister here.
+ *
+ * Holds CPU hotplug.
+ */
+void channel_backend_unregister_notifiers(struct channel_backend *chanb)
+{
+	if (chanb->config->alloc != RING_BUFFER_ALLOC_PER_CPU)
+		return;
+
+	unregister_hotcpu_notifier(&chanb->cpu_hp_notifier);
+}
+
+/**
+ * channel_backend_free - destroy the channel backend buffers
+ * @chanb: the channel backend
+ *
+ * Frees every allocated ring buffer, then the per-cpu storage and cpumask
+ * (per-cpu channels) or the single global buffer structure.
+ */
+void channel_backend_free(struct channel_backend *chanb)
+{
+	unsigned int cpu;
+
+	if (chanb->config->alloc != RING_BUFFER_ALLOC_PER_CPU) {
+		struct lib_ring_buffer *global_buf = chanb->buf;
+
+		/* The single global buffer is expected to exist. */
+		CHAN_WARN_ON(chanb, !global_buf->backend.allocated);
+		lib_ring_buffer_free(global_buf);
+		kfree(global_buf);
+		return;
+	}
+
+	/* Per-cpu channel: only some cpus may have an allocated buffer. */
+	for_each_possible_cpu(cpu) {
+		struct lib_ring_buffer *cpu_buf = per_cpu_ptr(chanb->buf, cpu);
+
+		if (cpu_buf->backend.allocated)
+			lib_ring_buffer_free(cpu_buf);
+	}
+	free_cpumask_var(chanb->cpumask);
+	free_percpu(chanb->buf);
+}
+
+/**
+ * _lib_ring_buffer_write - write data to a ring_buffer buffer.
+ * @bufb : buffer backend
+ * @offset : offset within the buffer
+ * @src : source address
+ * @len : length to write, including the @pagecpy bytes already handled
+ * @pagecpy : number of bytes already copied before this call; @src, @offset
+ *            and @len are advanced past them before the first copy
+ *
+ * Copies the data in chunks that never cross a page boundary, looking up the
+ * backing page of the write-side subbuffer for each chunk.
+ */
+void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, size_t offset,
+			    const void *src, size_t len, ssize_t pagecpy)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	const struct lib_ring_buffer_config *config = chanb->config;
+	size_t sbidx, index;
+	struct lib_ring_buffer_backend_pages *rpages;
+	unsigned long sb_bindex, id;
+
+	do {
+		/* Account for the chunk copied previously (or by the caller). */
+		len -= pagecpy;
+		src += pagecpy;
+		offset += pagecpy;
+		/* Subbuffer index, then page index within that subbuffer. */
+		sbidx = offset >> chanb->subbuf_size_order;
+		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+
+		/*
+		 * Underlying layer should never ask for writes across
+		 * subbuffers.
+		 */
+		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
+
+		/* Copy at most up to the end of the current page. */
+		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
+		id = bufb->buf_wsb[sbidx].id;
+		sb_bindex = subbuffer_id_get_index(config, id);
+		rpages = bufb->array[sb_bindex];
+		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+			     && subbuffer_id_is_noref(config, id));
+		lib_ring_buffer_do_copy(config,
+					rpages->p[index].virt
+						+ (offset & ~PAGE_MASK),
+					src, pagecpy);
+	/* Done once the chunk just copied was the whole remainder. */
+	} while (unlikely(len != pagecpy));
+}
+EXPORT_SYMBOL_GPL(_lib_ring_buffer_write);
+
+
+/**
+ * _lib_ring_buffer_memset - write len bytes of c to a ring_buffer buffer.
+ * @bufb : buffer backend
+ * @offset : offset within the buffer
+ * @c : the byte to write
+ * @len : length to write, including the @pagecpy bytes already handled
+ * @pagecpy : number of bytes already set before this call; @offset and @len
+ *            are advanced past them before the first fill
+ *
+ * Fills the range in chunks that never cross a page boundary.
+ */
+void _lib_ring_buffer_memset(struct lib_ring_buffer_backend *bufb,
+			     size_t offset,
+			     int c, size_t len, ssize_t pagecpy)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	const struct lib_ring_buffer_config *config = chanb->config;
+	size_t sbidx, index;
+	struct lib_ring_buffer_backend_pages *rpages;
+	unsigned long sb_bindex, id;
+
+	do {
+		/* Account for the chunk filled previously (or by the caller). */
+		len -= pagecpy;
+		offset += pagecpy;
+		/* Subbuffer index, then page index within that subbuffer. */
+		sbidx = offset >> chanb->subbuf_size_order;
+		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+
+		/*
+		 * Underlying layer should never ask for writes across
+		 * subbuffers.
+		 */
+		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
+
+		/* Fill at most up to the end of the current page. */
+		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
+		id = bufb->buf_wsb[sbidx].id;
+		sb_bindex = subbuffer_id_get_index(config, id);
+		rpages = bufb->array[sb_bindex];
+		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+			     && subbuffer_id_is_noref(config, id));
+		lib_ring_buffer_do_memset(rpages->p[index].virt
+					  + (offset & ~PAGE_MASK),
+					  c, pagecpy);
+	/* Done once the chunk just filled was the whole remainder. */
+	} while (unlikely(len != pagecpy));
+}
+EXPORT_SYMBOL_GPL(_lib_ring_buffer_memset);
+
+
+/**
+ * _lib_ring_buffer_copy_from_user - write user data to a ring_buffer buffer.
+ * @bufb : buffer backend
+ * @offset : offset within the buffer
+ * @src : source address
+ * @len : length to write, including the @pagecpy bytes already handled
+ * @pagecpy : number of bytes already copied before this call; @src, @offset
+ *            and @len are advanced past them before the first copy
+ *
+ * Copies in chunks that never cross a page boundary. If a chunk cannot be
+ * copied completely, everything from the first uncopied byte to the end of
+ * the requested range is filled with zeros and the copy stops.
+ *
+ * This function deals with userspace pointers, it should never be called
+ * directly without having the src pointer checked with access_ok()
+ * previously.
+ */
+void _lib_ring_buffer_copy_from_user(struct lib_ring_buffer_backend *bufb,
+				      size_t offset,
+				      const void __user *src, size_t len,
+				      ssize_t pagecpy)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	const struct lib_ring_buffer_config *config = chanb->config;
+	size_t sbidx, index;
+	struct lib_ring_buffer_backend_pages *rpages;
+	unsigned long sb_bindex, id;
+	int ret;
+
+	do {
+		/* Account for the chunk copied previously (or by the caller). */
+		len -= pagecpy;
+		src += pagecpy;
+		offset += pagecpy;
+		sbidx = offset >> chanb->subbuf_size_order;
+		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+
+		/*
+		 * Underlying layer should never ask for writes across
+		 * subbuffers.
+		 */
+		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
+
+		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
+		id = bufb->buf_wsb[sbidx].id;
+		sb_bindex = subbuffer_id_get_index(config, id);
+		rpages = bufb->array[sb_bindex];
+		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+				&& subbuffer_id_is_noref(config, id));
+		/*
+		 * Keep the raw return value: it is used below as the number of
+		 * bytes left uncopied. Collapsing it to 0/1 made the zero-fill
+		 * start too late and left stale bytes in the record.
+		 * NOTE(review): assumes the helper returns the count of bytes
+		 * NOT copied, like __copy_from_user() -- confirm in backend.h.
+		 */
+		ret = lib_ring_buffer_do_copy_from_user(rpages->p[index].virt
+							+ (offset & ~PAGE_MASK),
+							src, pagecpy);
+		if (ret > 0) {
+			/* Skip what was copied, zero the rest of the range. */
+			offset += (pagecpy - ret);
+			len -= (pagecpy - ret);
+			_lib_ring_buffer_memset(bufb, offset, 0, len, 0);
+			break; /* stop copy */
+		}
+	} while (unlikely(len != pagecpy));
+}
+EXPORT_SYMBOL_GPL(_lib_ring_buffer_copy_from_user);
+
+/**
+ * lib_ring_buffer_read - read data from ring_buffer_buffer.
+ * @bufb : buffer backend
+ * @offset : offset within the buffer (wrapped to the buffer size)
+ * @dest : destination address
+ * @len : length to copy to destination
+ *
+ * Copies from the subbuffer currently referenced by bufb->buf_rsb, one
+ * page-bounded chunk at a time.
+ *
+ * Should be protected by get_subbuf/put_subbuf.
+ * Returns the length copied, which is always @len (0 when @len is 0).
+ */
+size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
+			    void *dest, size_t len)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	const struct lib_ring_buffer_config *config = chanb->config;
+	size_t index;
+	ssize_t pagecpy, orig_len;
+	struct lib_ring_buffer_backend_pages *rpages;
+	unsigned long sb_bindex, id;
+
+	orig_len = len;
+	/* buf_size - 1 is used as a wrap mask on the offset. */
+	offset &= chanb->buf_size - 1;
+	/* Page index within the subbuffer. */
+	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+	if (unlikely(!len))
+		return 0;
+	for (;;) {
+		/* Copy at most up to the end of the current page. */
+		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
+		id = bufb->buf_rsb.id;
+		sb_bindex = subbuffer_id_get_index(config, id);
+		rpages = bufb->array[sb_bindex];
+		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+			     && subbuffer_id_is_noref(config, id));
+		memcpy(dest, rpages->p[index].virt + (offset & ~PAGE_MASK),
+		       pagecpy);
+		len -= pagecpy;
+		if (likely(!len))
+			break;
+		/* More to copy: move on to the next page. */
+		dest += pagecpy;
+		offset += pagecpy;
+		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+		/*
+		 * Underlying layer should never ask for reads across
+		 * subbuffers.
+		 */
+		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
+	}
+	return orig_len;
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_read);
+
+/**
+ * __lib_ring_buffer_copy_to_user - read data from ring_buffer to userspace
+ * @bufb : buffer backend
+ * @offset : offset within the buffer (wrapped to the buffer size)
+ * @dest : destination userspace address
+ * @len : length to copy to destination
+ *
+ * Copies from the subbuffer currently referenced by bufb->buf_rsb, one
+ * page-bounded chunk at a time, and stops at the first chunk that cannot be
+ * copied completely.
+ *
+ * Should be protected by get_subbuf/put_subbuf.
+ * access_ok() must have been performed on dest addresses prior to call this
+ * function.
+ * Returns -EFAULT on error, 0 if ok.
+ */
+int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
+				   size_t offset, void __user *dest, size_t len)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	const struct lib_ring_buffer_config *config = chanb->config;
+	size_t index;
+	ssize_t pagecpy;
+	struct lib_ring_buffer_backend_pages *rpages;
+	unsigned long sb_bindex, id;
+
+	/* buf_size - 1 is used as a wrap mask on the offset. */
+	offset &= chanb->buf_size - 1;
+	/* Page index within the subbuffer. */
+	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+	if (unlikely(!len))
+		return 0;
+	for (;;) {
+		/* Copy at most up to the end of the current page. */
+		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
+		id = bufb->buf_rsb.id;
+		sb_bindex = subbuffer_id_get_index(config, id);
+		rpages = bufb->array[sb_bindex];
+		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+			     && subbuffer_id_is_noref(config, id));
+		if (__copy_to_user(dest,
+			       rpages->p[index].virt + (offset & ~PAGE_MASK),
+			       pagecpy))
+			return -EFAULT;
+		len -= pagecpy;
+		if (likely(!len))
+			break;
+		/* More to copy: move on to the next page. */
+		dest += pagecpy;
+		offset += pagecpy;
+		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+		/*
+		 * Underlying layer should never ask for reads across
+		 * subbuffers.
+		 */
+		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
+	}
+	return 0;
+}
+EXPORT_SYMBOL_GPL(__lib_ring_buffer_copy_to_user);
+
+/**
+ * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer.
+ * @bufb : buffer backend
+ * @offset : offset within the buffer (wrapped to the buffer size)
+ * @dest : destination address; may be NULL to only measure the string
+ * @len : destination's length
+ *
+ * Returns the length of the string found in the buffer, terminator
+ * excluded. This can be larger than what was stored in @dest: at most @len
+ * bytes are copied, and the terminating NUL is only written when room is
+ * left in @dest after the copy.
+ * Should be protected by get_subbuf/put_subbuf.
+ */
+int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
+			      void *dest, size_t len)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	const struct lib_ring_buffer_config *config = chanb->config;
+	size_t index;
+	ssize_t pagecpy, pagelen, strpagelen, orig_offset;
+	char *str;
+	struct lib_ring_buffer_backend_pages *rpages;
+	unsigned long sb_bindex, id;
+
+	/* buf_size - 1 is used as a wrap mask on the offset. */
+	offset &= chanb->buf_size - 1;
+	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+	orig_offset = offset;
+	for (;;) {
+		id = bufb->buf_rsb.id;
+		sb_bindex = subbuffer_id_get_index(config, id);
+		rpages = bufb->array[sb_bindex];
+		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+			     && subbuffer_id_is_noref(config, id));
+		str = (char *)rpages->p[index].virt + (offset & ~PAGE_MASK);
+		/* Bytes left in this page, and string bytes found in them. */
+		pagelen = PAGE_SIZE - (offset & ~PAGE_MASK);
+		strpagelen = strnlen(str, pagelen);
+		if (len) {
+			/* Copy only as much as the destination can still hold. */
+			pagecpy = min_t(size_t, len, strpagelen);
+			if (dest) {
+				memcpy(dest, str, pagecpy);
+				dest += pagecpy;
+			}
+			len -= pagecpy;
+		}
+		/* The offset advances by the full string length, copied or not. */
+		offset += strpagelen;
+		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+		/* A NUL was found before the end of the page: string is over. */
+		if (strpagelen < pagelen)
+			break;
+		/*
+		 * Underlying layer should never ask for reads across
+		 * subbuffers.
+		 */
+		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
+	}
+	/* Terminate the copy only if the destination was not filled up. */
+	if (dest && len)
+		((char *)dest)[0] = 0;
+	return offset - orig_offset;
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_read_cstr);
+
+/**
+ * lib_ring_buffer_read_get_page - Get a whole page to read from
+ * @bufb : buffer backend
+ * @offset : offset within the buffer (wrapped to the buffer size)
+ * @virt : output, set to the address of the page's virtual address slot
+ *
+ * Looks the page up in the subbuffer currently referenced by bufb->buf_rsb.
+ *
+ * Should be protected by get_subbuf/put_subbuf.
+ * Returns the pointer to the page struct pointer.
+ */
+struct page **lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb,
+					    size_t offset, void ***virt)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	const struct lib_ring_buffer_config *config = chanb->config;
+	unsigned long id = bufb->buf_rsb.id;
+	struct lib_ring_buffer_backend_pages *rpages;
+	size_t page_idx;
+
+	/* Wrap the offset, then find the page inside the subbuffer. */
+	offset &= chanb->buf_size - 1;
+	page_idx = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+
+	rpages = bufb->array[subbuffer_id_get_index(config, id)];
+	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+		     && subbuffer_id_is_noref(config, id));
+
+	*virt = &rpages->p[page_idx].virt;
+	return &rpages->p[page_idx].page;
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_read_get_page);
+
+/**
+ * lib_ring_buffer_read_offset_address - get address of a buffer location
+ * @bufb : buffer backend
+ * @offset : offset within the buffer (wrapped to the buffer size).
+ *
+ * Return the address where a given offset is located (for read), resolved
+ * through the subbuffer currently referenced by bufb->buf_rsb.
+ * Should be used to get the current subbuffer header pointer. Given we know
+ * it's never on a page boundary, it's safe to write directly to this address,
+ * as long as the write is never bigger than a page size.
+ */
+void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
+					  size_t offset)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	const struct lib_ring_buffer_config *config = chanb->config;
+	unsigned long id = bufb->buf_rsb.id;
+	struct lib_ring_buffer_backend_pages *rpages;
+	size_t page_idx;
+
+	/* Wrap the offset, then find the page inside the subbuffer. */
+	offset &= chanb->buf_size - 1;
+	page_idx = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+
+	rpages = bufb->array[subbuffer_id_get_index(config, id)];
+	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+		     && subbuffer_id_is_noref(config, id));
+
+	/* Page base plus the offset within that page. */
+	return rpages->p[page_idx].virt + (offset & ~PAGE_MASK);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_read_offset_address);
+
+/**
+ * lib_ring_buffer_offset_address - get address of a location within the buffer
+ * @bufb : buffer backend
+ * @offset : offset within the buffer (wrapped to the buffer size).
+ *
+ * Return the address where a given offset is located, resolved through the
+ * write-side subbuffer table (bufb->buf_wsb).
+ * Should be used to get the current subbuffer header pointer. Given we know
+ * it's always at the beginning of a page, it's safe to write directly to this
+ * address, as long as the write is never bigger than a page size.
+ */
+void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
+				     size_t offset)
+{
+	struct channel_backend *chanb = &bufb->chan->backend;
+	const struct lib_ring_buffer_config *config = chanb->config;
+	struct lib_ring_buffer_backend_pages *rpages;
+	size_t subbuf_idx, page_idx;
+	unsigned long id;
+
+	/* Wrap the offset, then split it into subbuffer and page indexes. */
+	offset &= chanb->buf_size - 1;
+	subbuf_idx = offset >> chanb->subbuf_size_order;
+	page_idx = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+
+	id = bufb->buf_wsb[subbuf_idx].id;
+	rpages = bufb->array[subbuffer_id_get_index(config, id)];
+	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+		     && subbuffer_id_is_noref(config, id));
+
+	/* Page base plus the offset within that page. */
+	return rpages->p[page_idx].virt + (offset & ~PAGE_MASK);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_offset_address);
diff --git a/drivers/staging/lttng/lib/ringbuffer/ring_buffer_frontend.c b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_frontend.c
new file mode 100644
index 0000000..802f5cd
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_frontend.c
@@ -0,0 +1,1721 @@
+/*
+ * ring_buffer_frontend.c
+ *
+ * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
+ * recorder (overwrite) modes. See thesis:
+ *
+ * Desnoyers, Mathieu (2009), "Low-Impact Operating System Tracing", Ph.D.
+ * dissertation, Ecole Polytechnique de Montreal.
+ * http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
+ *
+ * - Algorithm presentation in Chapter 5:
+ *     "Lockless Multi-Core High-Throughput Buffering".
+ * - Algorithm formal verification in Section 8.6:
+ *     "Formal verification of LTTng"
+ *
+ * Author:
+ *	Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Inspired from LTT and RelayFS:
+ *  Karim Yaghmour <karim at opersys.com>
+ *  Tom Zanussi <zanussi at us.ibm.com>
+ *  Bob Wisniewski <bob at watson.ibm.com>
+ * And from K42 :
+ *  Bob Wisniewski <bob at watson.ibm.com>
+ *
+ * Buffer reader semantic :
+ *
+ * - get_subbuf_size
+ * while buffer is not finalized and empty
+ *   - get_subbuf
+ *     - if return value != 0, continue
+ *   - splice one subbuffer worth of data to a pipe
+ *   - splice the data from pipe to disk/network
+ *   - put_subbuf
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/delay.h>
+#include <linux/module.h>
+#include <linux/percpu.h>
+
+#include "../../wrapper/ringbuffer/config.h"
+#include "../../wrapper/ringbuffer/backend.h"
+#include "../../wrapper/ringbuffer/frontend.h"
+#include "../../wrapper/ringbuffer/iterator.h"
+#include "../../wrapper/ringbuffer/nohz.h"
+
+/*
+ * Internal structure representing offsets to use at a sub-buffer switch.
+ */
+struct switch_offsets {
+	unsigned long begin;
+	unsigned long end;
+	unsigned long old;
+	size_t pre_header_padding;
+	size_t size;
+	/* Single-bit flags. */
+	unsigned int switch_new_start:1;
+	unsigned int switch_new_end:1;
+	unsigned int switch_old_start:1;
+	unsigned int switch_old_end:1;
+};
+
+#ifdef CONFIG_NO_HZ
+/*
+ * Event values delivered to the tick_nohz notifier callback.
+ * NOTE(review): exact meaning of each value is presumed from its name --
+ * confirm against the notifier callers.
+ */
+enum tick_nohz_val {
+	TICK_NOHZ_STOP,
+	TICK_NOHZ_FLUSH,
+	TICK_NOHZ_RESTART,
+};
+
+static ATOMIC_NOTIFIER_HEAD(tick_nohz_notifier);
+#endif /* CONFIG_NO_HZ */
+
+static DEFINE_PER_CPU(spinlock_t, ring_buffer_nohz_lock);
+
+DEFINE_PER_CPU(unsigned int, lib_ring_buffer_nesting);
+EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting);
+
+static
+void lib_ring_buffer_print_errors(struct channel *chan,
+				  struct lib_ring_buffer *buf, int cpu);
+
+/*
+ * Report the buffer's errors, then release its commit counter arrays and its
+ * backend.
+ *
+ * Must be called under cpu hotplug protection.
+ */
+void lib_ring_buffer_free(struct lib_ring_buffer *buf)
+{
+	/* Print errors first, while the buffer is still intact. */
+	lib_ring_buffer_print_errors(buf->backend.chan, buf, buf->backend.cpu);
+
+	kfree(buf->commit_hot);
+	kfree(buf->commit_cold);
+	lib_ring_buffer_backend_free(&buf->backend);
+}
+
+/**
+ * lib_ring_buffer_reset - Reset ring buffer to initial values.
+ * @buf: Ring buffer.
+ *
+ * Effectively empty the ring buffer. Should be called when the buffer is not
+ * used for writing. The ring buffer can be opened for reading, but the reader
+ * should not be using the iterator concurrently with reset. The previous
+ * current iterator record is reset.
+ *
+ * Clears the write offset, the per-subbuffer commit counters, the consumed
+ * position, the record-disable count, the last timestamp, the backend state
+ * and the statistics counters, and clears the finalized flag.
+ */
+void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
+{
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned int i;
+
+	/*
+	 * Reset iterator first. It will put the subbuffer if it currently holds
+	 * it.
+	 */
+	lib_ring_buffer_iterator_reset(buf);
+	v_set(config, &buf->offset, 0);
+	/* One set of commit counters per subbuffer. */
+	for (i = 0; i < chan->backend.num_subbuf; i++) {
+		v_set(config, &buf->commit_hot[i].cc, 0);
+		v_set(config, &buf->commit_hot[i].seq, 0);
+		v_set(config, &buf->commit_cold[i].cc_sb, 0);
+	}
+	atomic_long_set(&buf->consumed, 0);
+	atomic_set(&buf->record_disabled, 0);
+	v_set(config, &buf->last_tsc, 0);
+	lib_ring_buffer_backend_reset(&buf->backend);
+	/* Don't reset number of active readers */
+	v_set(config, &buf->records_lost_full, 0);
+	v_set(config, &buf->records_lost_wrap, 0);
+	v_set(config, &buf->records_lost_big, 0);
+	v_set(config, &buf->records_count, 0);
+	v_set(config, &buf->records_overrun, 0);
+	buf->finalized = 0;
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_reset);
+
+/**
+ * channel_reset - Reset channel to initial values.
+ * @chan: Channel.
+ *
+ * Effectively empty the channel. Should be called when the channel is not used
+ * for writing. The channel can be opened for reading, but the reader should not
+ * be using the iterator concurrently with reset. The previous current iterator
+ * record is reset.
+ *
+ * Only channel-level state is handled here; no per-buffer reset is issued
+ * directly from this function.
+ */
+void channel_reset(struct channel *chan)
+{
+	/*
+	 * Reset iterators first. Will put the subbuffer if held for reading.
+	 */
+	channel_iterator_reset(chan);
+	/* Clear the channel-wide record-disable count. */
+	atomic_set(&chan->record_disabled, 0);
+	/* Don't reset commit_count_mask, still valid */
+	channel_backend_reset(&chan->backend);
+	/* Don't reset switch/read timer interval */
+	/* Don't reset notifiers and notifier enable bits */
+	/* Don't reset reader reference count */
+}
+EXPORT_SYMBOL_GPL(channel_reset);
+
+/*
+ * Create one ring buffer of a channel: backend storage, commit counter
+ * arrays, wait queues, the first subbuffer header, then the optional client
+ * buffer_create callback. The buffer is flagged as allocated last.
+ *
+ * @buf: buffer to initialize (zeroed here)
+ * @chanb: backend of the owning channel
+ * @cpu: cpu the buffer belongs to; callers pass -1 for a global buffer
+ *
+ * Returns 0 on success, or when the buffer is already allocated; otherwise
+ * the backend or client callback error, or -ENOMEM.
+ *
+ * Must be called under cpu hotplug protection.
+ */
+int lib_ring_buffer_create(struct lib_ring_buffer *buf,
+			   struct channel_backend *chanb, int cpu)
+{
+	const struct lib_ring_buffer_config *config = chanb->config;
+	struct channel *chan = container_of(chanb, struct channel, backend);
+	void *priv = chanb->priv;
+	size_t subbuf_header_size;
+	u64 tsc;
+	int ret;
+
+	/* Test for cpu hotplug */
+	if (buf->backend.allocated)
+		return 0;
+
+	/*
+	 * Paranoia: per cpu dynamic allocation is not officially documented as
+	 * zeroing the memory, so let's do it here too, just in case.
+	 */
+	memset(buf, 0, sizeof(*buf));
+
+	ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, cpu);
+	if (ret)
+		return ret;
+
+	/*
+	 * One commit_hot entry per subbuffer; the array size is rounded up to
+	 * 1 << INTERNODE_CACHE_SHIFT. max(cpu, 0) maps the global-buffer cpu
+	 * value (-1) to cpu 0 for the node lookup.
+	 */
+	buf->commit_hot =
+		kzalloc_node(ALIGN(sizeof(*buf->commit_hot)
+				   * chan->backend.num_subbuf,
+				   1 << INTERNODE_CACHE_SHIFT),
+			GFP_KERNEL, cpu_to_node(max(cpu, 0)));
+	if (!buf->commit_hot) {
+		ret = -ENOMEM;
+		goto free_chanbuf;
+	}
+
+	/* Same layout rules for the commit_cold array. */
+	buf->commit_cold =
+		kzalloc_node(ALIGN(sizeof(*buf->commit_cold)
+				   * chan->backend.num_subbuf,
+				   1 << INTERNODE_CACHE_SHIFT),
+			GFP_KERNEL, cpu_to_node(max(cpu, 0)));
+	if (!buf->commit_cold) {
+		ret = -ENOMEM;
+		goto free_commit;
+	}
+
+	init_waitqueue_head(&buf->read_wait);
+	init_waitqueue_head(&buf->write_wait);
+	raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
+
+	/*
+	 * Write the subbuffer header for first subbuffer so we know the total
+	 * duration of data gathering.
+	 */
+	subbuf_header_size = config->cb.subbuffer_header_size();
+	/* The write offset starts right after the header. */
+	v_set(config, &buf->offset, subbuf_header_size);
+	subbuffer_id_clear_noref(config, &buf->backend.buf_wsb[0].id);
+	tsc = config->cb.ring_buffer_clock_read(buf->backend.chan);
+	config->cb.buffer_begin(buf, tsc, 0);
+	/* Account the header bytes as committed in subbuffer 0. */
+	v_add(config, subbuf_header_size, &buf->commit_hot[0].cc);
+
+	/* Optional client hook. */
+	if (config->cb.buffer_create) {
+		ret = config->cb.buffer_create(buf, priv, cpu, chanb->name);
+		if (ret)
+			goto free_init;
+	}
+
+	/*
+	 * Ensure the buffer is ready before setting it to allocated and setting
+	 * the cpumask.
+	 * Used for cpu hotplug vs cpumask iteration.
+	 */
+	smp_wmb();
+	buf->backend.allocated = 1;
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+		/* The cpu must not already be marked in the channel cpumask. */
+		CHAN_WARN_ON(chan, cpumask_test_cpu(cpu,
+			     chan->backend.cpumask));
+		cpumask_set_cpu(cpu, chan->backend.cpumask);
+	}
+
+	return 0;
+
+	/* Error handling */
+free_init:
+	kfree(buf->commit_cold);
+free_commit:
+	kfree(buf->commit_hot);
+free_chanbuf:
+	lib_ring_buffer_backend_free(&buf->backend);
+	return ret;
+}
+
+/*
+ * Periodic switch timer: flush the buffer when readers are active, then
+ * re-arm the timer one switch interval from now.
+ */
+static void switch_buffer_timer(unsigned long data)
+{
+	struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long next_expiry;
+
+	/*
+	 * Only flush buffers periodically if readers are active.
+	 */
+	if (atomic_long_read(&buf->active_readers))
+		lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+
+	next_expiry = jiffies + chan->switch_timer_interval;
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+		mod_timer_pinned(&buf->switch_timer, next_expiry);
+	else
+		mod_timer(&buf->switch_timer, next_expiry);
+}
+
+/*
+ * Arm the periodic switch timer of a buffer. Does nothing when the channel
+ * has no switch interval or the timer is already enabled.
+ *
+ * Called with ring_buffer_nohz_lock held for per-cpu buffers.
+ */
+static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
+{
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct timer_list *timer = &buf->switch_timer;
+
+	if (!chan->switch_timer_interval)
+		return;
+	if (buf->switch_timer_enabled)
+		return;
+
+	init_timer(timer);
+	timer->function = switch_buffer_timer;
+	timer->expires = jiffies + chan->switch_timer_interval;
+	timer->data = (unsigned long)buf;
+
+	/* Per-cpu buffers get their timer queued on their own cpu. */
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+		add_timer_on(timer, buf->backend.cpu);
+	else
+		add_timer(timer);
+	buf->switch_timer_enabled = 1;
+}
+
+/*
+ * Stop the periodic switch timer of a buffer, waiting for a running handler
+ * to finish. Does nothing when the channel has no switch interval or the
+ * timer is not enabled.
+ *
+ * Called with ring_buffer_nohz_lock held for per-cpu buffers.
+ */
+static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
+{
+	if (!buf->backend.chan->switch_timer_interval)
+		return;
+	if (!buf->switch_timer_enabled)
+		return;
+
+	del_timer_sync(&buf->switch_timer);
+	buf->switch_timer_enabled = 0;
+}
+
+/*
+ * Polling timer to check the channels for data: wakes the buffer and channel
+ * readers when data can be delivered, then re-arms itself one read interval
+ * from now.
+ */
+static void read_buffer_timer(unsigned long data)
+{
+	struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long next_expiry;
+
+	CHAN_WARN_ON(chan, !buf->backend.allocated);
+
+	/* Only poll for deliverable data when somebody is reading. */
+	if (atomic_long_read(&buf->active_readers)) {
+		if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
+			wake_up_interruptible(&buf->read_wait);
+			wake_up_interruptible(&chan->read_wait);
+		}
+	}
+
+	next_expiry = jiffies + chan->read_timer_interval;
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+		mod_timer_pinned(&buf->read_timer, next_expiry);
+	else
+		mod_timer(&buf->read_timer, next_expiry);
+}
+
+/*
+ * Arm the read polling timer of a buffer. Only applies to channels woken up
+ * by timer that have a read interval, and whose timer is not enabled yet.
+ *
+ * Called with ring_buffer_nohz_lock held for per-cpu buffers.
+ */
+static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
+{
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct timer_list *timer = &buf->read_timer;
+
+	if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER)
+		return;
+	if (!chan->read_timer_interval || buf->read_timer_enabled)
+		return;
+
+	init_timer(timer);
+	timer->function = read_buffer_timer;
+	timer->expires = jiffies + chan->read_timer_interval;
+	timer->data = (unsigned long)buf;
+
+	/* Per-cpu buffers get their timer queued on their own cpu. */
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+		add_timer_on(timer, buf->backend.cpu);
+	else
+		add_timer(timer);
+	buf->read_timer_enabled = 1;
+}
+
+/*
+ * Stop the read polling timer of a buffer, then poll one last time so data
+ * written during the final timer period still wakes up the readers.
+ *
+ * Called with ring_buffer_nohz_lock held for per-cpu buffers.
+ */
+static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
+{
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+
+	if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER)
+		return;
+	if (!chan->read_timer_interval || !buf->read_timer_enabled)
+		return;
+
+	del_timer_sync(&buf->read_timer);
+
+	/*
+	 * do one more check to catch data that has been written in the last
+	 * timer period.
+	 */
+	if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
+		wake_up_interruptible(&buf->read_wait);
+		wake_up_interruptible(&chan->read_wait);
+	}
+	buf->read_timer_enabled = 0;
+}
+
+#ifdef CONFIG_HOTPLUG_CPU
+/**
+ *	lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
+ *	@nb: notifier block (embedded in the channel)
+ *	@action: hotplug action to take
+ *	@hcpu: CPU number
+ *
+ *	Starts the buffer timers when a cpu comes (back) online, stops them
+ *	before it goes down, and flushes its buffer once it is dead. Does
+ *	nothing while chan->cpu_hp_enable is clear.
+ *
+ *	Returns %NOTIFY_OK for handled actions, %NOTIFY_DONE otherwise.
+ */
+static
+int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
+					      unsigned long action,
+					      void *hcpu)
+{
+	unsigned int cpu = (unsigned long)hcpu;
+	struct channel *chan = container_of(nb, struct channel,
+					    cpu_hp_notifier);
+	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+
+	/* Hotplug handling is switched off for this channel. */
+	if (!chan->cpu_hp_enable)
+		return NOTIFY_DONE;
+
+	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+	switch (action) {
+	case CPU_DOWN_FAILED:
+	case CPU_DOWN_FAILED_FROZEN:
+	case CPU_ONLINE:
+	case CPU_ONLINE_FROZEN:
+		/* The cpu is (still) online: wake hp waiters, start timers. */
+		wake_up_interruptible(&chan->hp_wait);
+		lib_ring_buffer_start_switch_timer(buf);
+		lib_ring_buffer_start_read_timer(buf);
+		return NOTIFY_OK;
+
+	case CPU_DOWN_PREPARE:
+	case CPU_DOWN_PREPARE_FROZEN:
+		/* The cpu is about to go down: stop its timers. */
+		lib_ring_buffer_stop_switch_timer(buf);
+		lib_ring_buffer_stop_read_timer(buf);
+		return NOTIFY_OK;
+
+	case CPU_DEAD:
+	case CPU_DEAD_FROZEN:
+		/*
+		 * Performing a buffer switch on a remote CPU. Performed by
+		 * the CPU responsible for doing the hotunplug after the target
+		 * CPU stopped running completely. Ensures that all data
+		 * from that remote CPU is flushed.
+		 */
+		lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+		return NOTIFY_OK;
+
+	default:
+		return NOTIFY_DONE;
+	}
+}
+#endif
+
+#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
+/*
+ * ring_buffer_tick_nohz_callback - per-channel tick_nohz notifier handler.
+ *
+ * Acts on the buffer of the CPU executing the callback, depending on @val:
+ *   TICK_NOHZ_FLUSH:   wake up readers if data is deliverable or pending,
+ *                      then perform a SWITCH_ACTIVE buffer switch if the
+ *                      channel has a switch timer interval.
+ *   TICK_NOHZ_STOP:    stop the switch and read timers.
+ *   TICK_NOHZ_RESTART: start the read and switch timers.
+ * Channels that do not use per-cpu buffers are ignored. Always returns 0.
+ *
+ * For per-cpu buffers, call the reader wakeups before switching the buffer, so
+ * that wake-up-tracing generated events are flushed before going idle (in
+ * tick_nohz). We test if the spinlock is locked to deal with the race where
+ * readers try to sample the ring buffer before we perform the switch. We let
+ * the readers retry in that case. If there is data in the buffer, the wake up
+ * is going to forbid the CPU running the reader thread from going idle.
+ */
+static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb,
+						  unsigned long val,
+						  void *data)
+{
+	struct channel *chan = container_of(nb, struct channel,
+					    tick_nohz_notifier);
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct lib_ring_buffer *buf;
+	int cpu = smp_processor_id();
+
+	if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) {
+		/*
+		 * We don't support keeping the system idle with global buffers
+		 * and streaming active. In order to do so, we would need to
+		 * sample a non-nohz-cpumask racelessly with the nohz updates
+		 * without adding synchronization overhead to nohz. Leave this
+		 * use-case out for now.
+		 */
+		return 0;
+	}
+
+	buf = channel_get_ring_buffer(config, chan, cpu);
+	switch (val) {
+	case TICK_NOHZ_FLUSH:
+		raw_spin_lock(&buf->raw_tick_nohz_spinlock);
+		if (config->wakeup == RING_BUFFER_WAKEUP_BY_TIMER
+		    && chan->read_timer_interval
+		    && atomic_long_read(&buf->active_readers)
+		    && (lib_ring_buffer_poll_deliver(config, buf, chan)
+			|| lib_ring_buffer_pending_data(config, buf, chan))) {
+			wake_up_interruptible(&buf->read_wait);
+			wake_up_interruptible(&chan->read_wait);
+		}
+		if (chan->switch_timer_interval)
+			lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+		raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
+		break;
+	case TICK_NOHZ_STOP:
+		spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
+		lib_ring_buffer_stop_switch_timer(buf);
+		lib_ring_buffer_stop_read_timer(buf);
+		spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
+		break;
+	case TICK_NOHZ_RESTART:
+		spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
+		lib_ring_buffer_start_read_timer(buf);
+		lib_ring_buffer_start_switch_timer(buf);
+		spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
+		break;
+	}
+
+	return 0;
+}
+
+/*
+ * Send TICK_NOHZ_FLUSH to every callback registered on the tick_nohz_notifier
+ * chain.
+ */
+void notrace lib_ring_buffer_tick_nohz_flush(void)
+{
+	const unsigned long event = TICK_NOHZ_FLUSH;
+
+	atomic_notifier_call_chain(&tick_nohz_notifier, event, NULL);
+}
+
+/*
+ * Send TICK_NOHZ_STOP to every callback registered on the tick_nohz_notifier
+ * chain.
+ */
+void notrace lib_ring_buffer_tick_nohz_stop(void)
+{
+	const unsigned long event = TICK_NOHZ_STOP;
+
+	atomic_notifier_call_chain(&tick_nohz_notifier, event, NULL);
+}
+
+/*
+ * Send TICK_NOHZ_RESTART to every callback registered on the
+ * tick_nohz_notifier chain.
+ */
+void notrace lib_ring_buffer_tick_nohz_restart(void)
+{
+	const unsigned long event = TICK_NOHZ_RESTART;
+
+	atomic_notifier_call_chain(&tick_nohz_notifier, event, NULL);
+}
+#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
+
+/*
+ * channel_unregister_notifiers - detach a channel from notifier chains and
+ * stop its timers.
+ * @chan: channel being torn down
+ *
+ * Unregisters the iterator notifiers, then (per-cpu buffers only) the
+ * tick_nohz and CPU hotplug notifiers, stops the switch and read timers of
+ * every buffer, and finally unregisters the backend notifiers.
+ *
+ * The tick_nohz notifier is only registered by channel_create() when both
+ * CONFIG_NO_HZ and CONFIG_LIB_RING_BUFFER are defined, so it is unregistered
+ * under the very same condition.
+ *
+ * Holds CPU hotplug.
+ */
+static void channel_unregister_notifiers(struct channel *chan)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	int cpu;
+
+	channel_iterator_unregister_notifiers(chan);
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
+		/*
+		 * Remove the nohz notifier first, so we are certain we stop
+		 * the timers.
+		 */
+		atomic_notifier_chain_unregister(&tick_nohz_notifier,
+						 &chan->tick_nohz_notifier);
+		/*
+		 * ring_buffer_nohz_lock will not be needed below, because
+		 * we just removed the notifiers, which were the only source of
+		 * concurrency.
+		 */
+#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
+#ifdef CONFIG_HOTPLUG_CPU
+		get_online_cpus();
+		/* From now on the hotplug callback returns NOTIFY_DONE. */
+		chan->cpu_hp_enable = 0;
+		for_each_online_cpu(cpu) {
+			struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+							      cpu);
+			lib_ring_buffer_stop_switch_timer(buf);
+			lib_ring_buffer_stop_read_timer(buf);
+		}
+		put_online_cpus();
+		unregister_cpu_notifier(&chan->cpu_hp_notifier);
+#else
+		for_each_possible_cpu(cpu) {
+			struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+							      cpu);
+			lib_ring_buffer_stop_switch_timer(buf);
+			lib_ring_buffer_stop_read_timer(buf);
+		}
+#endif
+	} else {
+		struct lib_ring_buffer *buf = chan->backend.buf;
+
+		lib_ring_buffer_stop_switch_timer(buf);
+		lib_ring_buffer_stop_read_timer(buf);
+	}
+	channel_backend_unregister_notifiers(&chan->backend);
+}
+
+/*
+ * channel_free - release all memory owned by a channel.
+ * @chan: channel to free; must not be used after this call.
+ *
+ * Frees the iterator state, then the backend, then the channel structure
+ * itself.
+ */
+static void channel_free(struct channel *chan)
+{
+	channel_iterator_free(chan);
+	channel_backend_free(&chan->backend);
+	kfree(chan);
+}
+
+/**
+ * channel_create - Create channel.
+ * @config: ring buffer instance configuration
+ * @name: name of the channel
+ * @priv: ring buffer client private data
+ * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
+ *            address mapping. It is used only by RING_BUFFER_STATIC
+ *            configuration. It can be set to NULL for other backends.
+ *            (Not referenced by this function's body.)
+ * @subbuf_size: subbuffer size
+ * @num_subbuf: number of subbuffers
+ * @switch_timer_interval: Time interval (in us) to fill sub-buffers with
+ *                         padding to let readers get those sub-buffers.
+ *                         Used for live streaming.
+ * @read_timer_interval: Time interval (in us) to wake up pending readers.
+ *
+ * Allocates the channel, initializes its backend and iterator, registers the
+ * tick_nohz and CPU hotplug notifiers (per-cpu buffers only) and starts the
+ * switch and read timers of each buffer.
+ *
+ * Holds cpu hotplug.
+ * Returns NULL on failure.
+ */
+struct channel *channel_create(const struct lib_ring_buffer_config *config,
+		   const char *name, void *priv, void *buf_addr,
+		   size_t subbuf_size,
+		   size_t num_subbuf, unsigned int switch_timer_interval,
+		   unsigned int read_timer_interval)
+{
+	int ret, cpu;
+	struct channel *chan;
+
+	if (lib_ring_buffer_check_config(config, switch_timer_interval,
+					 read_timer_interval))
+		return NULL;
+
+	chan = kzalloc(sizeof(struct channel), GFP_KERNEL);
+	if (!chan)
+		return NULL;
+
+	ret = channel_backend_init(&chan->backend, name, config, priv,
+				   subbuf_size, num_subbuf);
+	if (ret)
+		goto error;
+
+	ret = channel_iterator_init(chan);
+	if (ret)
+		goto error_free_backend;
+
+	chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
+	/* Timer intervals are given in microseconds, stored in jiffies. */
+	chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
+	chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
+	kref_init(&chan->ref);
+	init_waitqueue_head(&chan->read_wait);
+	init_waitqueue_head(&chan->hp_wait);
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
+		/* Only benefit from NO_HZ idle with per-cpu buffers for now. */
+		chan->tick_nohz_notifier.notifier_call =
+			ring_buffer_tick_nohz_callback;
+		chan->tick_nohz_notifier.priority = ~0U;
+		atomic_notifier_chain_register(&tick_nohz_notifier,
+				       &chan->tick_nohz_notifier);
+#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
+
+		/*
+		 * In case of non-hotplug cpu, if the ring-buffer is allocated
+		 * in early initcall, it will not be notified of secondary cpus.
+		 * In that case, we need to start the timers for all possible
+		 * cpus.
+		 */
+#ifdef CONFIG_HOTPLUG_CPU
+		chan->cpu_hp_notifier.notifier_call =
+				lib_ring_buffer_cpu_hp_callback;
+		chan->cpu_hp_notifier.priority = 6;
+		register_cpu_notifier(&chan->cpu_hp_notifier);
+
+		get_online_cpus();
+		for_each_online_cpu(cpu) {
+			struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+							       cpu);
+			spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
+			lib_ring_buffer_start_switch_timer(buf);
+			lib_ring_buffer_start_read_timer(buf);
+			spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
+		}
+		/* The hotplug callback ignores events until this is set. */
+		chan->cpu_hp_enable = 1;
+		put_online_cpus();
+#else
+		for_each_possible_cpu(cpu) {
+			struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+							      cpu);
+			spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
+			lib_ring_buffer_start_switch_timer(buf);
+			lib_ring_buffer_start_read_timer(buf);
+			spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
+		}
+#endif
+	} else {
+		struct lib_ring_buffer *buf = chan->backend.buf;
+
+		lib_ring_buffer_start_switch_timer(buf);
+		lib_ring_buffer_start_read_timer(buf);
+	}
+
+	return chan;
+
+error_free_backend:
+	/*
+	 * Follow the same order as the regular teardown path
+	 * (channel_unregister_notifiers() then channel_free()): drop any
+	 * notifier the backend may have registered before freeing the backend,
+	 * so that no notifier is left pointing into freed memory.
+	 */
+	channel_backend_unregister_notifiers(&chan->backend);
+	channel_backend_free(&chan->backend);
+error:
+	kfree(chan);
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(channel_create);
+
+/*
+ * kref release callback: frees the channel embedding @kref once its last
+ * reference has been dropped.
+ */
+static
+void channel_release(struct kref *kref)
+{
+	channel_free(container_of(kref, struct channel, ref));
+}
+
+/*
+ * Finalize a single ring buffer of @chan: invoke the client buffer_finalize
+ * callback (if any), flush the buffer if it has been allocated, mark it
+ * finalized and wake up its readers. @cpu is handed to the client callback
+ * as-is.
+ */
+static void channel_finalize_buffer(const struct lib_ring_buffer_config *config,
+				    struct channel *chan,
+				    struct lib_ring_buffer *buf, int cpu)
+{
+	if (config->cb.buffer_finalize)
+		config->cb.buffer_finalize(buf, chan->backend.priv, cpu);
+	if (buf->backend.allocated)
+		lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+	/*
+	 * Perform flush before writing to finalized.
+	 */
+	smp_wmb();
+	ACCESS_ONCE(buf->finalized) = 1;
+	wake_up_interruptible(&buf->read_wait);
+}
+
+/**
+ * channel_destroy - Finalize, wait for q.s. and destroy channel.
+ * @chan: channel to destroy
+ *
+ * Holds cpu hotplug.
+ * Unregisters the channel notifiers, finalizes every buffer (client
+ * buffer_finalize callback, final flush, finalized flag, reader wake-up),
+ * marks the channel finalized and then decrements the channel reference
+ * count.  Note that when readers have completed data consumption of
+ * finalized channels, get_subbuf() will return -ENODATA.  They should
+ * release their handle at that point.  Returns the private data pointer.
+ */
+void *channel_destroy(struct channel *chan)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	void *priv;
+	int cpu;
+
+	channel_unregister_notifiers(chan);
+
+	if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) {
+		/* Single global buffer: the callback is given cpu == -1. */
+		channel_finalize_buffer(config, chan, chan->backend.buf, -1);
+	} else {
+		/*
+		 * No need to hold cpu hotplug, because all notifiers have been
+		 * unregistered.
+		 */
+		for_each_channel_cpu(cpu, chan) {
+			channel_finalize_buffer(config, chan,
+						per_cpu_ptr(chan->backend.buf,
+							    cpu),
+						cpu);
+		}
+	}
+
+	ACCESS_ONCE(chan->finalized) = 1;
+	wake_up_interruptible(&chan->hp_wait);
+	wake_up_interruptible(&chan->read_wait);
+
+	/* Sample priv before the reference drop that may free the channel. */
+	priv = chan->backend.priv;
+	kref_put(&chan->ref, channel_release);
+	return priv;
+}
+EXPORT_SYMBOL_GPL(channel_destroy);
+
+/*
+ * channel_get_ring_buffer - look up the ring buffer of a channel.
+ *
+ * Returns the channel's single buffer for RING_BUFFER_ALLOC_GLOBAL
+ * configurations (@cpu is ignored), otherwise the per-cpu buffer of @cpu.
+ */
+struct lib_ring_buffer *channel_get_ring_buffer(
+					const struct lib_ring_buffer_config *config,
+					struct channel *chan, int cpu)
+{
+	if (config->alloc != RING_BUFFER_ALLOC_GLOBAL)
+		return per_cpu_ptr(chan->backend.buf, cpu);
+
+	return chan->backend.buf;
+}
+EXPORT_SYMBOL_GPL(channel_get_ring_buffer);
+
+/*
+ * lib_ring_buffer_open_read - claim the reader slot of a buffer.
+ *
+ * Increments active_readers unless it is already 1. On success, takes a
+ * reference on the owning channel and returns 0; returns -EBUSY when the
+ * counter was already 1.
+ */
+int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
+{
+	struct channel *chan = buf->backend.chan;
+
+	if (atomic_long_add_unless(&buf->active_readers, 1, 1)) {
+		kref_get(&chan->ref);
+		smp_mb__after_atomic_inc();
+		return 0;
+	}
+
+	return -EBUSY;
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
+
+/*
+ * lib_ring_buffer_release_read - release the reader slot of a buffer.
+ *
+ * Warns if active_readers is not exactly 1, decrements it and drops one
+ * reference on the owning channel; channel_release() runs if that was the
+ * last reference.
+ */
+void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
+{
+	struct channel *chan = buf->backend.chan;
+
+	CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
+	smp_mb__before_atomic_dec();
+	atomic_long_dec(&buf->active_readers);
+	kref_put(&chan->ref, channel_release);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_release_read);
+
+/*
+ * Promote compiler barrier to a smp_mb().
+ * Handler passed to smp_call_function_single()/smp_call_function() by
+ * lib_ring_buffer_get_subbuf(); it only executes a full memory barrier on
+ * the CPU running it. @info is unused.
+ * For the specific ring buffer case, this IPI call should be removed if the
+ * architecture does not reorder writes.  This should eventually be provided by
+ * a separate architecture-specific infrastructure.
+ */
+static void remote_mb(void *info)
+{
+	smp_mb();
+}
+
+/**
+ * lib_ring_buffer_snapshot - save subbuffer position snapshot (for read)
+ * @buf: ring buffer
+ * @consumed: filled with the consumed count, i.e. the position where to read
+ * @produced: filled with the produced count, i.e. the position where to stop
+ *            reading
+ *
+ * Returns 0 and fills @consumed/@produced when the writer head is not in the
+ * sub-buffer located at the consumed position. Otherwise returns -ENODATA if
+ * the buffer is finalized, or -EAGAIN if there is currently no data to read.
+ * Busy-loops trying to get data while the tick_nohz spinlock is held.
+ */
+int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
+			     unsigned long *consumed, unsigned long *produced)
+{
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+
+	for (;;) {
+		unsigned long cons_pos, write_pos;
+		int finalized = ACCESS_ONCE(buf->finalized);
+
+		/*
+		 * Read finalized before counters.
+		 */
+		smp_rmb();
+		cons_pos = atomic_long_read(&buf->consumed);
+		/*
+		 * No memory barrier is needed between the consumed count read
+		 * and the write offset read: the consumed count can only
+		 * change concurrently in overwrite mode, and a sequence
+		 * counter identifier derived from the write offset is kept to
+		 * check that we get the sub-buffer we expect (sub-buffers are
+		 * atomically "tagged" upon writes, tags are checked upon
+		 * read).
+		 */
+		write_pos = v_read(config, &buf->offset);
+
+		/*
+		 * Data is available only if the writer head is not in the
+		 * sub-buffer we are about to read.
+		 */
+		if (subbuf_trunc(write_pos, chan)
+		    != subbuf_trunc(cons_pos, chan)) {
+			*consumed = cons_pos;
+			*produced = subbuf_trunc(write_pos, chan);
+			return 0;
+		}
+
+		/*
+		 * No data. The memory barriers
+		 * __wait_event()/wake_up_interruptible() take care of
+		 * "raw_spin_is_locked" memory ordering.
+		 */
+		if (finalized)
+			return -ENODATA;
+		if (!raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
+			return -EAGAIN;
+		/* A tick_nohz flush is in progress: sample again. */
+	}
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot);
+
+/**
+ * lib_ring_buffer_move_consumer - move consumed counter forward
+ * @buf: ring buffer
+ * @consumed_new: new consumed count value
+ *
+ * Should only be called from consumer context.
+ * The consumed count is left untouched when it is already at or past
+ * @consumed_new. Wakes up waiters on the buffer's write_wait queue before
+ * returning.
+ */
+void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
+				   unsigned long consumed_new)
+{
+	struct lib_ring_buffer_backend *bufb = &buf->backend;
+	struct channel *chan = bufb->chan;
+	unsigned long consumed;
+
+	CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
+
+	/*
+	 * Only push the consumed value forward.
+	 * If the consumed cmpxchg fails, this is because we have been pushed by
+	 * the writer in flight recorder mode.
+	 * The loop re-evaluates its condition with the value returned by
+	 * cmpxchg, so it ends once the counter has reached consumed_new.
+	 */
+	consumed = atomic_long_read(&buf->consumed);
+	while ((long) consumed - (long) consumed_new < 0)
+		consumed = atomic_long_cmpxchg(&buf->consumed, consumed,
+					       consumed_new);
+	/* Wake-up the metadata producer */
+	wake_up_interruptible(&buf->write_wait);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer);
+
+/**
+ * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
+ * @buf: ring buffer
+ * @consumed: consumed count indicating the position where to read
+ *
+ * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no
+ * data to read at consumed position, or 0 if the get operation succeeds.
+ * Busy-loop trying to get data if the tick_nohz sequence lock is held.
+ *
+ * The commit count is sampled for the sub-buffer located at @consumed, so the
+ * "fully committed" and "writer head" checks are performed against @consumed
+ * as well. The buffer's current consumed count is only used to reject
+ * positions located before it.
+ */
+int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
+			       unsigned long consumed)
+{
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
+	int ret;
+	int finalized;
+
+retry:
+	finalized = ACCESS_ONCE(buf->finalized);
+	/*
+	 * Read finalized before counters.
+	 */
+	smp_rmb();
+	consumed_cur = atomic_long_read(&buf->consumed);
+	consumed_idx = subbuf_index(consumed, chan);
+	commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
+	/*
+	 * Make sure we read the commit count before reading the buffer
+	 * data and the write offset. Correct consumed offset ordering
+	 * wrt commit count is ensured by the use of cmpxchg to update
+	 * the consumed offset.
+	 * smp_call_function_single can fail if the remote CPU is offline,
+	 * this is OK because then there is no wmb to execute there.
+	 * If our thread is executing on the same CPU as the one the buffer
+	 * belongs to, we don't have to synchronize it at all. If we are
+	 * migrated, the scheduler will take care of the memory barriers.
+	 * Normally, smp_call_function_single() should ensure program order when
+	 * executing the remote function, which implies that it surrounds the
+	 * function execution with :
+	 * smp_mb()
+	 * send IPI
+	 * csd_lock_wait
+	 *                recv IPI
+	 *                smp_mb()
+	 *                exec. function
+	 *                smp_mb()
+	 *                csd unlock
+	 * smp_mb()
+	 *
+	 * However, smp_call_function_single() does not seem to clearly execute
+	 * such barriers. It depends on spinlock semantic to provide the barrier
+	 * before executing the IPI and, when busy-looping, csd_lock_wait only
+	 * executes smp_mb() when it has to wait for the other CPU.
+	 *
+	 * I don't trust this code. Therefore, let's add the smp_mb() sequence
+	 * required ourself, even if duplicated. It has no performance impact
+	 * anyway.
+	 *
+	 * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
+	 * read and write vs write. They do not ensure core synchronization. We
+	 * really have to ensure total order between the 3 barriers running on
+	 * the 2 CPUs.
+	 */
+	if (config->ipi == RING_BUFFER_IPI_BARRIER) {
+		if (config->sync == RING_BUFFER_SYNC_PER_CPU
+		    && config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+			if (raw_smp_processor_id() != buf->backend.cpu) {
+				/* Total order with IPI handler smp_mb() */
+				smp_mb();
+				smp_call_function_single(buf->backend.cpu,
+							 remote_mb, NULL, 1);
+				/* Total order with IPI handler smp_mb() */
+				smp_mb();
+			}
+		} else {
+			/* Total order with IPI handler smp_mb() */
+			smp_mb();
+			smp_call_function(remote_mb, NULL, 1);
+			/* Total order with IPI handler smp_mb() */
+			smp_mb();
+		}
+	} else {
+		/*
+		 * Local rmb to match the remote wmb to read the commit count
+		 * before the buffer data and the write offset.
+		 */
+		smp_rmb();
+	}
+
+	write_offset = v_read(config, &buf->offset);
+
+	/*
+	 * Check that the buffer we are getting is after or at consumed_cur
+	 * position.
+	 */
+	if ((long) subbuf_trunc(consumed, chan)
+	    - (long) subbuf_trunc(consumed_cur, chan) < 0)
+		goto nodata;
+
+	/*
+	 * Check that the subbuffer we are trying to consume has been
+	 * already fully committed. commit_count was read at consumed_idx,
+	 * which derives from "consumed": compare it against the buffer lap
+	 * of "consumed", not of consumed_cur, which may designate another
+	 * sub-buffer.
+	 */
+	if (((commit_count - chan->backend.subbuf_size)
+	     & chan->commit_count_mask)
+	    - (buf_trunc(consumed, chan)
+	       >> chan->backend.num_subbuf_order)
+	    != 0)
+		goto nodata;
+
+	/*
+	 * Check that we are not about to read the same subbuffer in
+	 * which the writer head is. The sub-buffer about to be read is the
+	 * one at "consumed".
+	 */
+	if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed, chan)
+	    == 0)
+		goto nodata;
+
+	/*
+	 * Failure to get the subbuffer causes a busy-loop retry without going
+	 * to a wait queue. These are caused by short-lived race windows where
+	 * the writer is getting access to a subbuffer we were trying to get
+	 * access to. Also checks that the "consumed" buffer count we are
+	 * looking for matches the one contained in the subbuffer id.
+	 */
+	ret = update_read_sb_index(config, &buf->backend, &chan->backend,
+				   consumed_idx, buf_trunc_val(consumed, chan));
+	if (ret)
+		goto retry;
+	subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
+
+	/* Remember what was handed out for lib_ring_buffer_put_subbuf(). */
+	buf->get_subbuf_consumed = consumed;
+	buf->get_subbuf = 1;
+
+	return 0;
+
+nodata:
+	/*
+	 * The memory barriers __wait_event()/wake_up_interruptible() take care
+	 * of "raw_spin_is_locked" memory ordering.
+	 */
+	if (finalized)
+		return -ENODATA;
+	else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
+		goto retry;
+	else
+		return -EAGAIN;
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_get_subbuf);
+
+/**
+ * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
+ * @buf: ring buffer
+ *
+ * Warns and returns early if no sub-buffer is currently held
+ * (buf->get_subbuf is 0). Otherwise adds the reader sub-buffer's
+ * records_unread count to records_read, resets it, flags the reader
+ * sub-buffer id as "noref" and attempts to exchange it with the sub-buffer
+ * located at the consumed position recorded by lib_ring_buffer_get_subbuf().
+ */
+void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
+{
+	struct lib_ring_buffer_backend *bufb = &buf->backend;
+	struct channel *chan = bufb->chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long read_sb_bindex, consumed_idx, consumed;
+
+	CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
+
+	if (!buf->get_subbuf) {
+		/*
+		 * Reader puts a subbuffer it did not get.
+		 */
+		CHAN_WARN_ON(chan, 1);
+		return;
+	}
+	consumed = buf->get_subbuf_consumed;
+	buf->get_subbuf = 0;
+
+	/*
+	 * Clear the records_unread counter. (overruns counter)
+	 * Can still be non-zero if a file reader simply grabbed the data
+	 * without using iterators.
+	 * Can be below zero if an iterator is used on a snapshot more than
+	 * once.
+	 */
+	read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
+	v_add(config, v_read(config,
+			     &bufb->array[read_sb_bindex]->records_unread),
+	      &bufb->records_read);
+	v_set(config, &bufb->array[read_sb_bindex]->records_unread, 0);
+	CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
+		     && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
+	subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
+
+	/*
+	 * Exchange the reader subbuffer with the one we put in its place in the
+	 * writer subbuffer table. Expect the original consumed count. If
+	 * update_read_sb_index fails, this is because the writer updated the
+	 * subbuffer concurrently. We should therefore keep the subbuffer we
+	 * currently have: it has become invalid to try reading this sub-buffer
+	 * consumed count value anyway.
+	 */
+	consumed_idx = subbuf_index(consumed, chan);
+	update_read_sb_index(config, &buf->backend, &chan->backend,
+			     consumed_idx, buf_trunc_val(consumed, chan));
+	/*
+	 * update_read_sb_index return value ignored. Don't exchange sub-buffer
+	 * if the writer concurrently updated it.
+	 */
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_put_subbuf);
+
+/*
+ * Report the commit state of the sub-buffer containing @cons_offset.
+ *
+ * A warning is printed when the sub-buffer's commit count is not a multiple
+ * of the sub-buffer size; the committed byte count is always printed at
+ * debug level.
+ *
+ * cons_offset is an iterator on all subbuffer offsets between the reader
+ * position and the writer position. (inclusive)
+ */
+static
+void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
+					    struct channel *chan,
+					    unsigned long cons_offset,
+					    int cpu)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long idx = subbuf_index(cons_offset, chan);
+	unsigned long committed, reader_visible;
+
+	committed = v_read(config, &buf->commit_hot[idx].cc);
+	reader_visible = v_read(config, &buf->commit_cold[idx].cc_sb);
+
+	if (subbuf_offset(committed, chan) != 0) {
+		printk(KERN_WARNING
+		       "ring buffer %s, cpu %d: "
+		       "commit count in subbuffer %lu,\n"
+		       "expecting multiples of %lu bytes\n"
+		       "  [ %lu bytes committed, %lu bytes reader-visible ]\n",
+		       chan->backend.name, cpu, idx,
+		       chan->backend.subbuf_size,
+		       committed, reader_visible);
+	}
+
+	printk(KERN_DEBUG "ring buffer: %s, cpu %d: %lu bytes committed\n",
+	       chan->backend.name, cpu, committed);
+}
+
+/*
+ * Report non-consumed data left in @buf and the commit state of every
+ * sub-buffer between the consumed position and the writer position.
+ *
+ * @chan may be NULL (see below), in which case nothing is printed.
+ * @priv is not used by this function.
+ */
+static
+void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
+					 struct channel *chan,
+					 void *priv, int cpu)
+{
+	const struct lib_ring_buffer_config *config;
+	unsigned long write_offset, cons_offset;
+
+	/*
+	 * Can be called in the error path of allocation when
+	 * trans_channel_data is not yet set.
+	 * The check must come before any dereference of chan.
+	 */
+	if (!chan)
+		return;
+	config = chan->backend.config;
+	/*
+	 * No need to order commit_count, write_offset and cons_offset reads
+	 * because we execute at teardown when no more writer nor reader
+	 * references are left.
+	 */
+	write_offset = v_read(config, &buf->offset);
+	cons_offset = atomic_long_read(&buf->consumed);
+	if (write_offset != cons_offset)
+		printk(KERN_DEBUG
+		       "ring buffer %s, cpu %d: "
+		       "non-consumed data\n"
+		       "  [ %lu bytes written, %lu bytes read ]\n",
+		       chan->backend.name, cpu, write_offset, cons_offset);
+
+	/* Walk every sub-buffer located before the writer's sub-buffer. */
+	for (cons_offset = atomic_long_read(&buf->consumed);
+	     (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset),
+				  chan)
+		     - cons_offset) > 0;
+	     cons_offset = subbuf_align(cons_offset, chan))
+		lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
+						       cpu);
+}
+
+/*
+ * Print the record statistics of @buf (written and overrun counts), warn if
+ * any record was lost, then report the per-sub-buffer state through
+ * lib_ring_buffer_print_buffer_errors().
+ */
+static
+void lib_ring_buffer_print_errors(struct channel *chan,
+				  struct lib_ring_buffer *buf, int cpu)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+
+	printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, "
+			  "%lu records overrun\n",
+			  chan->backend.name, cpu,
+			  v_read(config, &buf->records_count),
+			  v_read(config, &buf->records_overrun));
+
+	/* Only complain when at least one of the loss counters is non-zero. */
+	if (v_read(config, &buf->records_lost_full)
+	    || v_read(config, &buf->records_lost_wrap)
+	    || v_read(config, &buf->records_lost_big)) {
+		printk(KERN_WARNING
+		       "ring buffer %s, cpu %d: records were lost. Caused by:\n"
+		       "  [ %lu buffer full, %lu nest buffer wrap-around, "
+		       "%lu event too big ]\n",
+		       chan->backend.name, cpu,
+		       v_read(config, &buf->records_lost_full),
+		       v_read(config, &buf->records_lost_wrap),
+		       v_read(config, &buf->records_lost_big));
+	}
+
+	lib_ring_buffer_print_buffer_errors(buf, chan, chan->backend.priv,
+					    cpu);
+}
+
+/*
+ * lib_ring_buffer_switch_old_start: Populate old subbuffer header.
+ *
+ * Invokes the client buffer_begin callback on the sub-buffer containing
+ * offsets->old, then accounts the sub-buffer header size in that sub-buffer's
+ * commit count and checks whether the sub-buffer has to be delivered.
+ *
+ * Only executed when the buffer is finalized, in SWITCH_FLUSH.
+ */
+static
+void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
+				      struct channel *chan,
+				      struct switch_offsets *offsets,
+				      u64 tsc)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long oldidx = subbuf_index(offsets->old, chan);
+	unsigned long commit_count;
+
+	config->cb.buffer_begin(buf, tsc, oldidx);
+
+	/*
+	 * Order all writes to buffer before the commit count update that will
+	 * determine that the subbuffer is full.
+	 */
+	if (config->ipi == RING_BUFFER_IPI_BARRIER) {
+		/*
+		 * Must write slot data before incrementing commit count.  This
+		 * compiler barrier is upgraded into a smp_mb() by the IPI sent
+		 * by get_subbuf().
+		 */
+		barrier();
+	} else
+		smp_wmb();
+	v_add(config, config->cb.subbuffer_header_size(),
+	      &buf->commit_hot[oldidx].cc);
+	commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
+	/* Check if the written buffer has to be delivered */
+	lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
+				      commit_count, oldidx);
+	lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
+					     offsets->old, commit_count,
+					     config->cb.subbuffer_header_size());
+}
+
+/*
+ * lib_ring_buffer_switch_old_end: switch old subbuffer
+ *
+ * Records the data size of the sub-buffer containing offsets->old - 1, adds
+ * the remaining padding to its commit count and checks whether it has to be
+ * delivered. The tsc argument is not used by this function.
+ *
+ * Note : offset_old should never be 0 here. It is ok, because we never perform
+ * buffer switch on an empty subbuffer in SWITCH_ACTIVE mode. The caller
+ * increments the offset_old value when doing a SWITCH_FLUSH on an empty
+ * subbuffer.
+ */
+static
+void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
+				    struct channel *chan,
+				    struct switch_offsets *offsets,
+				    u64 tsc)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
+	unsigned long commit_count, padding_size, data_size;
+
+	data_size = subbuf_offset(offsets->old - 1, chan) + 1;
+	padding_size = chan->backend.subbuf_size - data_size;
+	subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
+
+	/*
+	 * Order all writes to buffer before the commit count update that will
+	 * determine that the subbuffer is full.
+	 */
+	if (config->ipi == RING_BUFFER_IPI_BARRIER) {
+		/*
+		 * Must write slot data before incrementing commit count.  This
+		 * compiler barrier is upgraded into a smp_mb() by the IPI sent
+		 * by get_subbuf().
+		 */
+		barrier();
+	} else
+		smp_wmb();
+	v_add(config, padding_size, &buf->commit_hot[oldidx].cc);
+	commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
+	lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
+				      commit_count, oldidx);
+	lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
+					     offsets->old, commit_count,
+					     padding_size);
+}
+
+/*
+ * lib_ring_buffer_switch_new_start: Populate new subbuffer.
+ *
+ * Invokes the client buffer_begin callback on the sub-buffer containing
+ * offsets->begin, then accounts the sub-buffer header size in that
+ * sub-buffer's commit count and checks whether it has to be delivered.
+ *
+ * This code can be executed unordered : writers may already have written to the
+ * sub-buffer before this code gets executed, caution.  The commit makes sure
+ * that this code is executed before the deliver of this sub-buffer.
+ */
+static
+void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
+				      struct channel *chan,
+				      struct switch_offsets *offsets,
+				      u64 tsc)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long beginidx = subbuf_index(offsets->begin, chan);
+	unsigned long commit_count;
+
+	config->cb.buffer_begin(buf, tsc, beginidx);
+
+	/*
+	 * Order all writes to buffer before the commit count update that will
+	 * determine that the subbuffer is full.
+	 */
+	if (config->ipi == RING_BUFFER_IPI_BARRIER) {
+		/*
+		 * Must write slot data before incrementing commit count.  This
+		 * compiler barrier is upgraded into a smp_mb() by the IPI sent
+		 * by get_subbuf().
+		 */
+		barrier();
+	} else
+		smp_wmb();
+	v_add(config, config->cb.subbuffer_header_size(),
+	      &buf->commit_hot[beginidx].cc);
+	commit_count = v_read(config, &buf->commit_hot[beginidx].cc);
+	/* Check if the written buffer has to be delivered */
+	lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
+				      commit_count, beginidx);
+	lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
+					     offsets->begin, commit_count,
+					     config->cb.subbuffer_header_size());
+}
+
+/*
+ * lib_ring_buffer_switch_new_end: finish switching current subbuffer
+ *
+ * Records the data size of the sub-buffer containing offsets->end - 1, adds
+ * the remaining padding to its commit count and checks whether it has to be
+ * delivered. The tsc argument is not used by this function.
+ *
+ * The only remaining threads could be the ones with pending commits. They will
+ * have to do the deliver themselves.
+ */
+static
+void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
+					    struct channel *chan,
+					    struct switch_offsets *offsets,
+					    u64 tsc)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long endidx = subbuf_index(offsets->end - 1, chan);
+	unsigned long commit_count, padding_size, data_size;
+
+	data_size = subbuf_offset(offsets->end - 1, chan) + 1;
+	padding_size = chan->backend.subbuf_size - data_size;
+	subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
+
+	/*
+	 * Order all writes to buffer before the commit count update that will
+	 * determine that the subbuffer is full.
+	 */
+	if (config->ipi == RING_BUFFER_IPI_BARRIER) {
+		/*
+		 * Must write slot data before incrementing commit count.  This
+		 * compiler barrier is upgraded into a smp_mb() by the IPI sent
+		 * by get_subbuf().
+		 */
+		barrier();
+	} else
+		smp_wmb();
+	v_add(config, padding_size, &buf->commit_hot[endidx].cc);
+	commit_count = v_read(config, &buf->commit_hot[endidx].cc);
+	lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
+				  commit_count, endidx);
+	lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
+					     offsets->end, commit_count,
+					     padding_size);
+}
+
+/*
+ * lib_ring_buffer_try_switch_slow: compute the offsets of a forced switch.
+ *
+ * Samples the current write offset into offsets->begin and offsets->old,
+ * reads the clock into *tsc, and decides whether a switch is needed for the
+ * given mode. On success offsets->begin and offsets->end hold the
+ * subbuf_align()'d offset, and offsets->switch_old_start is set when an
+ * empty sub-buffer needs its start header written (SWITCH_FLUSH only).
+ *
+ * Returns :
+ * 0 if ok
+ * !0 if execution must be aborted.
+ */
+static
+int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
+				    struct lib_ring_buffer *buf,
+				    struct channel *chan,
+				    struct switch_offsets *offsets,
+				    u64 *tsc)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long off;
+
+	offsets->begin = v_read(config, &buf->offset);
+	offsets->old = offsets->begin;
+	offsets->switch_old_start = 0;
+	off = subbuf_offset(offsets->begin, chan);
+
+	*tsc = config->cb.ring_buffer_clock_read(chan);
+
+	/*
+	 * Ensure we flush the header of an empty subbuffer when doing the
+	 * finalize (SWITCH_FLUSH). This ensures that we end up knowing the
+	 * total data gathering duration even if there were no records saved
+	 * after the last buffer switch.
+	 * In SWITCH_ACTIVE mode, switch the buffer when it contains events.
+	 * SWITCH_ACTIVE only flushes the current subbuffer, dealing with end of
+	 * subbuffer header as appropriate.
+	 * The next record that reserves space will be responsible for
+	 * populating the following subbuffer header. We choose not to populate
+	 * the next subbuffer header here because we want to be able to use
+	 * SWITCH_ACTIVE for periodical buffer flush and CPU tick_nohz stop
+	 * buffer flush, which must guarantee that all the buffer content
+	 * (records and header timestamps) are visible to the reader. This is
+	 * required for quiescence guarantees for the fusion merge.
+	 */
+	if (mode == SWITCH_FLUSH || off > 0) {
+		if (unlikely(off == 0)) {
+			/*
+			 * The client does not save any header information.
+			 * Don't switch empty subbuffer on finalize, because it
+			 * is invalid to deliver a completely empty subbuffer.
+			 */
+			if (!config->cb.subbuffer_header_size())
+				return -1;
+			/*
+			 * Need to write the subbuffer start header on finalize.
+			 */
+			offsets->switch_old_start = 1;
+		}
+		offsets->begin = subbuf_align(offsets->begin, chan);
+	} else
+		return -1;	/* we do not have to switch : buffer is empty */
+	/*
+	 * Note: begin was subbuf_align()'d above (presumably to offset 0 of
+	 * the next subbuf); old still holds the write offset sampled on entry.
+	 */
+	offsets->end = offsets->begin;
+	return 0;
+}
+
+/*
+ * Force a sub-buffer switch. This operation is completely reentrant : can be
+ * called while tracing is active with absolutely no lock held.
+ *
+ * @buf: ring buffer to switch.
+ * @mode: switch mode (SWITCH_FLUSH or SWITCH_ACTIVE are the values tested by
+ *        the helper computing the offsets).
+ *
+ * Returns without side effect on the buffer offset when the helper reports
+ * that no switch is needed.
+ *
+ * Note, however, that as a v_cmpxchg is used for some atomic
+ * operations, this function must be called from the CPU which owns the buffer
+ * for a ACTIVE flush.
+ */
+void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode)
+{
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct switch_offsets offsets;
+	unsigned long oldidx;
+	u64 tsc;
+
+	offsets.size = 0;
+
+	/*
+	 * Perform retryable operations.
+	 * The offsets are recomputed until v_cmpxchg() returns offsets.old,
+	 * i.e. until buf->offset was still at the sampled value when replaced
+	 * by offsets.end.
+	 */
+	do {
+		if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
+						    &tsc))
+			return;	/* Switch not needed */
+	} while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
+		 != offsets.old);
+
+	/*
+	 * Atomically update last_tsc. This update races against concurrent
+	 * atomic updates, but the race will always cause supplementary full TSC
+	 * records, never the opposite (missing a full TSC record when it would
+	 * be needed).
+	 */
+	save_last_tsc(config, buf, tsc);
+
+	/*
+	 * Push the reader if necessary
+	 */
+	lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
+
+	oldidx = subbuf_index(offsets.old, chan);
+	lib_ring_buffer_clear_noref(config, &buf->backend, oldidx);
+
+	/*
+	 * May need to populate header start on SWITCH_FLUSH.
+	 * offsets.old is then advanced past the header that was just written.
+	 */
+	if (offsets.switch_old_start) {
+		lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
+		offsets.old += config->cb.subbuffer_header_size();
+	}
+
+	/*
+	 * Switch old subbuffer.
+	 */
+	lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
+
+/*
+ * lib_ring_buffer_try_reserve_slow: compute the offsets of a slot reservation.
+ *
+ * @buf: ring buffer in which space is reserved.
+ * @chan: channel owning @buf.
+ * @offsets: output; old/begin/end/size, pre_header_padding and the
+ *           switch_old_end/switch_new_start/switch_new_end flags are filled.
+ * @ctx: reservation context; ctx->tsc is written, ctx->rflags may gain
+ *       RING_BUFFER_RFLAG_FULL_TSC, ctx->data_size and ctx->largest_align are
+ *       read.
+ *
+ * This function only computes offsets: the caller is responsible for
+ * installing offsets->end into buf->offset.
+ *
+ * Returns :
+ * 0 if ok
+ * -ENOSPC if event size is too large for packet.
+ * -ENOBUFS if there is currently not enough space in buffer for the event.
+ * -EIO if data cannot be written into the buffer for any other reason.
+ */
+static
+int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
+				     struct channel *chan,
+				     struct switch_offsets *offsets,
+				     struct lib_ring_buffer_ctx *ctx)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned long reserve_commit_diff;
+
+	offsets->begin = v_read(config, &buf->offset);
+	offsets->old = offsets->begin;
+	offsets->switch_new_start = 0;
+	offsets->switch_new_end = 0;
+	offsets->switch_old_end = 0;
+	offsets->pre_header_padding = 0;
+
+	/* The clock callback signals failure by returning -EIO as a timestamp. */
+	ctx->tsc = config->cb.ring_buffer_clock_read(chan);
+	if ((int64_t) ctx->tsc == -EIO)
+		return -EIO;
+
+	if (last_tsc_overflow(config, buf, ctx->tsc))
+		ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
+
+	/*
+	 * NOTE(review): this test uses ctx->chan while the rest of the
+	 * function uses chan; the visible caller passes chan == ctx->chan.
+	 */
+	if (unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
+		offsets->switch_new_start = 1;		/* For offsets->begin */
+	} else {
+		/* Slot size: record header, alignment padding, then payload. */
+		offsets->size = config->cb.record_header_size(config, chan,
+						offsets->begin,
+						&offsets->pre_header_padding,
+						ctx);
+		offsets->size +=
+			lib_ring_buffer_align(offsets->begin + offsets->size,
+					      ctx->largest_align)
+			+ ctx->data_size;
+		if (unlikely(subbuf_offset(offsets->begin, chan) +
+			     offsets->size > chan->backend.subbuf_size)) {
+			offsets->switch_old_end = 1;	/* For offsets->old */
+			offsets->switch_new_start = 1;	/* For offsets->begin */
+		}
+	}
+	if (unlikely(offsets->switch_new_start)) {
+		unsigned long sb_index;
+
+		/*
+		 * We are typically not filling the previous buffer completely.
+		 */
+		if (likely(offsets->switch_old_end))
+			offsets->begin = subbuf_align(offsets->begin, chan);
+		/* The record goes right after the new subbuffer's header. */
+		offsets->begin = offsets->begin
+				 + config->cb.subbuffer_header_size();
+		/* Test new buffer integrity */
+		sb_index = subbuf_index(offsets->begin, chan);
+		reserve_commit_diff =
+		  (buf_trunc(offsets->begin, chan)
+		   >> chan->backend.num_subbuf_order)
+		  - ((unsigned long) v_read(config,
+					    &buf->commit_cold[sb_index].cc_sb)
+		     & chan->commit_count_mask);
+		if (likely(reserve_commit_diff == 0)) {
+			/* Next subbuffer not being written to. */
+			if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
+				subbuf_trunc(offsets->begin, chan)
+				 - subbuf_trunc((unsigned long)
+				     atomic_long_read(&buf->consumed), chan)
+				>= chan->backend.buf_size)) {
+				/*
+				 * We do not overwrite non consumed buffers
+				 * and we are full : record is lost.
+				 */
+				v_inc(config, &buf->records_lost_full);
+				return -ENOBUFS;
+			} else {
+				/*
+				 * Next subbuffer not being written to, and we
+				 * are either in overwrite mode or the buffer is
+				 * not full. It's safe to write in this new
+				 * subbuffer.
+				 */
+			}
+		} else {
+			/*
+			 * Next subbuffer reserve offset does not match the
+			 * commit offset. Drop record in producer-consumer and
+			 * overwrite mode. Caused by either a writer OOPS or too
+			 * many nested writes over a reserve/commit pair.
+			 */
+			v_inc(config, &buf->records_lost_wrap);
+			return -EIO;
+		}
+		/* Recompute the slot size at its new position. */
+		offsets->size =
+			config->cb.record_header_size(config, chan,
+						offsets->begin,
+						&offsets->pre_header_padding,
+						ctx);
+		offsets->size +=
+			lib_ring_buffer_align(offsets->begin + offsets->size,
+					      ctx->largest_align)
+			+ ctx->data_size;
+		if (unlikely(subbuf_offset(offsets->begin, chan)
+			     + offsets->size > chan->backend.subbuf_size)) {
+			/*
+			 * Record too big for subbuffers, report error, don't
+			 * complete the sub-buffer switch.
+			 */
+			v_inc(config, &buf->records_lost_big);
+			return -ENOSPC;
+		} else {
+			/*
+			 * We just made a successful buffer switch and the
+			 * record fits in the new subbuffer. Let's write.
+			 */
+		}
+	} else {
+		/*
+		 * Record fits in the current buffer and we are not on a switch
+		 * boundary. It's safe to write.
+		 */
+	}
+	offsets->end = offsets->begin + offsets->size;
+
+	if (unlikely(subbuf_offset(offsets->end, chan) == 0)) {
+		/*
+		 * The offset_end will fall at the very beginning of the next
+		 * subbuffer.
+		 */
+		offsets->switch_new_end = 1;	/* For offsets->begin */
+	}
+	return 0;
+}
+
+/**
+ * lib_ring_buffer_reserve_slow - Atomic slot reservation in a buffer.
+ * @ctx: ring buffer context. ctx->chan and, for per-cpu channels, ctx->cpu are
+ *       read; ctx->buf, ctx->tsc, ctx->slot_size, ctx->pre_offset and
+ *       ctx->buf_offset are written on success.
+ *
+ * Return : -ENOBUFS if not enough space, -ENOSPC if event size too large,
+ * -EIO for other errors, else returns 0.
+ * It will take care of sub-buffer switching.
+ */
+int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
+{
+	struct channel *chan = ctx->chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct lib_ring_buffer *buf;
+	struct switch_offsets offsets;
+	int ret;
+
+	/* Select the buffer: per-cpu channels index it with ctx->cpu. */
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+		buf = per_cpu_ptr(chan->backend.buf, ctx->cpu);
+	else
+		buf = chan->backend.buf;
+	ctx->buf = buf;
+
+	offsets.size = 0;
+
+	/*
+	 * Recompute the offsets until v_cmpxchg() returns offsets.old, i.e.
+	 * until buf->offset was still at the sampled value when replaced by
+	 * offsets.end. Any computation error aborts the reservation.
+	 */
+	do {
+		ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
+						       ctx);
+		if (unlikely(ret))
+			return ret;
+	} while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
+				    offsets.end)
+			  != offsets.old));
+
+	/*
+	 * Atomically update last_tsc. This update races against concurrent
+	 * atomic updates, but the race will always cause supplementary full TSC
+	 * records, never the opposite (missing a full TSC record when it would
+	 * be needed).
+	 */
+	save_last_tsc(config, buf, ctx->tsc);
+
+	/*
+	 * Push the reader if necessary
+	 */
+	lib_ring_buffer_reserve_push_reader(buf, chan, offsets.end - 1);
+
+	/*
+	 * Clear noref flag for this subbuffer.
+	 */
+	lib_ring_buffer_clear_noref(config, &buf->backend,
+				    subbuf_index(offsets.end - 1, chan));
+
+	/*
+	 * Switch old subbuffer if needed.
+	 */
+	if (unlikely(offsets.switch_old_end)) {
+		lib_ring_buffer_clear_noref(config, &buf->backend,
+					    subbuf_index(offsets.old - 1, chan));
+		lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
+	}
+
+	/*
+	 * Populate new subbuffer.
+	 */
+	if (unlikely(offsets.switch_new_start))
+		lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
+
+	if (unlikely(offsets.switch_new_end))
+		lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
+
+	/* Publish the reserved slot to the caller through the context. */
+	ctx->slot_size = offsets.size;
+	ctx->pre_offset = offsets.begin;
+	ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
+	return 0;
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow);
+
+/*
+ * Module initialization: initialize the per-cpu ring_buffer_nohz_lock
+ * spinlock of every possible CPU. Always returns 0.
+ */
+int __init init_lib_ring_buffer_frontend(void)
+{
+	int cpu;
+
+	for_each_possible_cpu(cpu)
+		spin_lock_init(&per_cpu(ring_buffer_nohz_lock, cpu));
+	return 0;
+}
+
+module_init(init_lib_ring_buffer_frontend);
+
+/*
+ * Module exit: nothing to tear down, the body is intentionally empty.
+ */
+void __exit exit_lib_ring_buffer_frontend(void)
+{
+}
+
+module_exit(exit_lib_ring_buffer_frontend);
diff --git a/drivers/staging/lttng/lib/ringbuffer/ring_buffer_iterator.c b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_iterator.c
new file mode 100644
index 0000000..1321b5f
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_iterator.c
@@ -0,0 +1,798 @@
+/*
+ * ring_buffer_iterator.c
+ *
+ * (C) Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring buffer and channel iterators. Get each event of a channel in order. Uses
+ * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic
+ * complexity for the "get next event" operation.
+ *
+ * Author:
+ *	Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include "../../wrapper/ringbuffer/iterator.h"
+#include <linux/jiffies.h>
+#include <linux/delay.h>
+#include <linux/module.h>
+
+/*
+ * Safety factor taking into account internal kernel interrupt latency.
+ * Assuming 250ms worst-case latency.
+ */
+#define MAX_SYSTEM_LATENCY	250
+
+/*
+ * Maximum delta expected between trace clocks. At most 1 jiffy delta.
+ */
+#define MAX_CLOCK_DELTA		(jiffies_to_usecs(1) * 1000)
+
+/**
+ * lib_ring_buffer_get_next_record - Get the next record in a buffer.
+ * @chan: channel
+ * @buf: buffer
+ *
+ * Drives the per-buffer iterator state machine (get subbuffer, test record,
+ * next record, put subbuffer) until a record is found or no subbuffer can be
+ * obtained.
+ *
+ * Returns the payload length of the record read, -EAGAIN if buffer is empty,
+ * -ENODATA if buffer is empty and finalized, -EPERM if the iterator is in an
+ * unknown state. The buffer must already be opened for reading.
+ */
+ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
+					struct lib_ring_buffer *buf)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct lib_ring_buffer_iter *it = &buf->iter;
+	int err;
+
+	for (;;) {
+		switch (it->state) {
+		case ITER_GET_SUBBUF:
+			err = lib_ring_buffer_get_next_subbuf(buf);
+			if (err && !ACCESS_ONCE(buf->finalized)
+			    && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
+				/*
+				 * Global buffers use a "pull" scheme: the
+				 * reader itself flushes the current subbuffer
+				 * to make pending data visible, then retries.
+				 *
+				 * Per-CPU buffers use a "push" scheme instead,
+				 * because the IPI needed to flush every CPU's
+				 * buffer is too costly: the reader waits for
+				 * the writer's periodic deferrable timer to
+				 * flush the buffers (keeping track of a
+				 * quiescent state timestamp).
+				 */
+				lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+				err = lib_ring_buffer_get_next_subbuf(buf);
+			}
+			if (err)
+				return err;
+			it->consumed = buf->cons_snapshot;
+			it->data_size = lib_ring_buffer_get_read_data_size(config, buf);
+			it->read_offset = it->consumed;
+			/* skip header */
+			it->read_offset += config->cb.subbuffer_header_size();
+			it->state = ITER_TEST_RECORD;
+			continue;
+
+		case ITER_TEST_RECORD:
+			if (it->read_offset - it->consumed >= it->data_size) {
+				/* Subbuffer exhausted: hand it back. */
+				it->state = ITER_PUT_SUBBUF;
+				continue;
+			}
+			CHAN_WARN_ON(chan, !config->cb.record_get);
+			config->cb.record_get(config, chan, buf,
+					      it->read_offset,
+					      &it->header_len,
+					      &it->payload_len,
+					      &it->timestamp);
+			it->read_offset += it->header_len;
+			subbuffer_consume_record(config, &buf->backend);
+			it->state = ITER_NEXT_RECORD;
+			return it->payload_len;
+
+		case ITER_NEXT_RECORD:
+			/* Step over the payload returned by the previous call. */
+			it->read_offset += it->payload_len;
+			it->state = ITER_TEST_RECORD;
+			continue;
+
+		case ITER_PUT_SUBBUF:
+			lib_ring_buffer_put_next_subbuf(buf);
+			it->state = ITER_GET_SUBBUF;
+			continue;
+
+		default:
+			CHAN_WARN_ON(chan, 1);	/* Should not happen */
+			return -EPERM;
+		}
+	}
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);
+
+/*
+ * Heap ordering callback: @a ranks above @b when its current iterator
+ * timestamp is strictly lower, so that the lowest timestamp sits at the top
+ * of the heap. Both arguments are struct lib_ring_buffer pointers.
+ */
+static int buf_is_higher(void *a, void *b)
+{
+	const struct lib_ring_buffer *lhs = a, *rhs = b;
+
+	return lhs->iter.timestamp < rhs->iter.timestamp;
+}
+
+/*
+ * Poll every buffer of the channel's empty list for a new record. Buffers
+ * that now provide a record move to the heap, finalized buffers leave the
+ * empty list for good, and buffers that are still empty stay where they are.
+ * @config is not referenced by the body.
+ */
+static
+void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
+					   struct channel *chan)
+{
+	struct lttng_ptr_heap *heap = &chan->iter.heap;
+	struct lib_ring_buffer *cur, *next;
+	ssize_t len;
+
+	list_for_each_entry_safe(cur, next, &chan->iter.empty_head,
+				 iter.empty_node) {
+		len = lib_ring_buffer_get_next_record(chan, cur);
+
+		/* Still empty: keep node in empty list. */
+		if (len == -EAGAIN)
+			continue;
+
+		if (len == -ENODATA) {
+			/*
+			 * Buffer is finalized. Don't keep it in the list of
+			 * empty buffers, because it has no more data to
+			 * provide, ever.
+			 */
+			list_del(&cur->iter.empty_node);
+			continue;
+		}
+
+		if (len == -EBUSY) {
+			/* Should never happen: only one reader is supported. */
+			CHAN_WARN_ON(chan, 1);
+			continue;
+		}
+
+		/*
+		 * len >= 0 means the record contains data: remove the buffer
+		 * from the empty list and insert it into the heap.
+		 */
+		CHAN_WARN_ON(chan, len < 0);
+		list_del(&cur->iter.empty_node);
+		CHAN_WARN_ON(chan, lttng_heap_insert(heap, cur));
+	}
+}
+
+/*
+ * Wait for a quiescent state of the channel's empty buffers.
+ *
+ * Polls the buffers of the empty list; if some are still empty, sleeps for
+ * switch_timer_interval + MAX_SYSTEM_LATENCY milliseconds, polls again, and
+ * records in chan->iter.last_qs the timestamp sampled before the first poll.
+ * May sleep (msleep), so it must be called from a context allowed to sleep.
+ */
+static
+void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
+				 struct channel *chan)
+{
+	u64 timestamp_qs;
+	unsigned long wait_msecs;
+
+	/*
+	 * No need to wait if no empty buffers are present.
+	 */
+	if (list_empty(&chan->iter.empty_head))
+		return;
+
+	/* Sampled before polling: becomes last_qs only if we end up waiting. */
+	timestamp_qs = config->cb.ring_buffer_clock_read(chan);
+	/*
+	 * We need to consider previously empty buffers.
+	 * Do a get next buf record on each of them. Add them to
+	 * the heap if they have data. If at least one of them
+	 * don't have data, we need to wait for
+	 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
+	 * buffers have been switched either by the timer or idle entry) and
+	 * check them again, adding them if they have data.
+	 */
+	lib_ring_buffer_get_empty_buf_records(config, chan);
+
+	/*
+	 * No need to wait if no empty buffers are present.
+	 */
+	if (list_empty(&chan->iter.empty_head))
+		return;
+
+	/*
+	 * We need to wait for the buffer switch timer to run. If the
+	 * CPU is idle, idle entry performed the switch.
+	 * TODO: we could optimize further by skipping the sleep if all
+	 * empty buffers belong to idle or offline cpus.
+	 */
+	wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
+	wait_msecs += MAX_SYSTEM_LATENCY;
+	msleep(wait_msecs);
+	lib_ring_buffer_get_empty_buf_records(config, chan);
+	/*
+	 * Any buffer still in the empty list here cannot possibly
+	 * contain an event with a timestamp prior to "timestamp_qs".
+	 * The new quiescent state timestamp is the one we grabbed
+	 * before waiting for buffer data.  It is therefore safe to
+	 * ignore empty buffers up to last_qs timestamp for fusion
+	 * merge.
+	 */
+	chan->iter.last_qs = timestamp_qs;
+}
+
+/**
+ * channel_get_next_record - Get the next record in a channel.
+ * @chan: channel
+ * @ret_buf: the buffer in which the event is located (output)
+ *
+ * For global channels this simply forwards to the single buffer. For per-cpu
+ * channels, the buffer at the top of the heap (lowest timestamp) is advanced
+ * to its next record, previously empty buffers are given a chance to join the
+ * heap, and the new top of the heap is returned through @ret_buf.
+ *
+ * Returns the size of new current event, -EAGAIN if all buffers are empty,
+ * -ENODATA if all buffers are empty and finalized. The channel must already be
+ * opened for reading.
+ */
+
+ssize_t channel_get_next_record(struct channel *chan,
+				struct lib_ring_buffer **ret_buf)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct lib_ring_buffer *buf;
+	struct lttng_ptr_heap *heap;
+	ssize_t len;
+
+	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
+		*ret_buf = channel_get_ring_buffer(config, chan, 0);
+		return lib_ring_buffer_get_next_record(chan, *ret_buf);
+	}
+
+	heap = &chan->iter.heap;
+
+	/*
+	 * get next record for topmost buffer.
+	 */
+	buf = lttng_heap_maximum(heap);
+	if (buf) {
+		len = lib_ring_buffer_get_next_record(chan, buf);
+		/*
+		 * Deal with -EAGAIN and -ENODATA.
+		 * len >= 0 means record contains data.
+		 */
+		switch (len) {
+		case -EAGAIN:
+			/* Temporarily empty: move from the heap to the empty list. */
+			buf->iter.timestamp = 0;
+			list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+			/* Remove topmost buffer from the heap */
+			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
+			break;
+		case -ENODATA:
+			/*
+			 * Buffer is finalized. Remove buffer from heap and
+			 * don't add to list of empty buffer, because it has no
+			 * more data to provide, ever.
+			 */
+			CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
+			break;
+		case -EBUSY:
+			CHAN_WARN_ON(chan, 1);
+			break;
+		default:
+			/*
+			 * Reinsert buffer into the heap. Note that heap can be
+			 * partially empty, so we need to use
+			 * lttng_heap_replace_max().
+			 */
+			CHAN_WARN_ON(chan, len < 0);
+			CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
+			break;
+		}
+	}
+
+	/*
+	 * The top of the heap can only be trusted if its timestamp is not
+	 * newer than the last quiescent state; otherwise an empty buffer
+	 * could still deliver an older record.
+	 */
+	buf = lttng_heap_maximum(heap);
+	if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
+		/*
+		 * Deal with buffers previously showing no data.
+		 * Add buffers containing data to the heap, update
+		 * last_qs.
+		 */
+		lib_ring_buffer_wait_for_qs(config, chan);
+	}
+
+	*ret_buf = buf = lttng_heap_maximum(heap);
+	if (buf) {
+		/*
+		 * If this warning triggers, you probably need to check your
+		 * system interrupt latency. Typical causes: too many printk()
+		 * output going to a serial console with interrupts off.
+		 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
+		 * Observed on SMP KVM setups with trace_clock().
+		 */
+		if (chan->iter.last_timestamp
+		    > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
+			printk(KERN_WARNING "ring_buffer: timestamps going "
+			       "backward. Last time %llu ns, cpu %d, "
+			       "current time %llu ns, cpu %d, "
+			       "delta %llu ns.\n",
+			       chan->iter.last_timestamp, chan->iter.last_cpu,
+			       buf->iter.timestamp, buf->backend.cpu,
+			       chan->iter.last_timestamp - buf->iter.timestamp);
+			CHAN_WARN_ON(chan, 1);
+		}
+		/* Remember what was delivered for the next ordering check. */
+		chan->iter.last_timestamp = buf->iter.timestamp;
+		chan->iter.last_cpu = buf->backend.cpu;
+		return buf->iter.payload_len;
+	} else {
+		/* Heap is empty */
+		if (list_empty(&chan->iter.empty_head))
+			return -ENODATA;	/* All buffers finalized */
+		else
+			return -EAGAIN;		/* Temporarily empty */
+	}
+}
+EXPORT_SYMBOL_GPL(channel_get_next_record);
+
+/*
+ * One-time iterator setup of a buffer: marks it allocated, opens it for
+ * reading when the channel already has an opened reader, and queues it on the
+ * channel's empty list for per-cpu channels. Does nothing if the buffer
+ * iterator is already allocated.
+ */
+static
+void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
+{
+	struct lib_ring_buffer_iter *it = &buf->iter;
+
+	if (it->allocated)
+		return;
+	it->allocated = 1;
+
+	/* Catch up with a channel reader opened before this buffer came up. */
+	if (chan->iter.read_open && !it->read_open) {
+		CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
+		it->read_open = 1;
+	}
+
+	/* Only per-cpu channels keep a list of buffers without current record. */
+	if (chan->backend.config->alloc != RING_BUFFER_ALLOC_PER_CPU)
+		return;
+	list_add(&it->empty_node, &chan->iter.empty_head);
+}
+
+#ifdef CONFIG_HOTPLUG_CPU
+/*
+ * CPU hotplug notifier of the channel iterator: initializes the iterator of
+ * the buffer belonging to a CPU that comes online (or whose offlining
+ * failed). Ignored while chan->hp_iter_enable is not set.
+ *
+ * Returns NOTIFY_OK when the buffer iterator was initialized, NOTIFY_DONE
+ * otherwise.
+ */
+static
+int __cpuinit channel_iterator_cpu_hotplug(struct notifier_block *nb,
+					   unsigned long action,
+					   void *hcpu)
+{
+	struct channel *chan = container_of(nb, struct channel,
+					    hp_iter_notifier);
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned int cpu = (unsigned long)hcpu;
+	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+
+	if (!chan->hp_iter_enable)
+		return NOTIFY_DONE;
+
+	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+	switch (action) {
+	case CPU_DOWN_FAILED:
+	case CPU_DOWN_FAILED_FROZEN:
+	case CPU_ONLINE:
+	case CPU_ONLINE_FROZEN:
+		break;
+	default:
+		/* Not an event that brings a CPU (back) online. */
+		return NOTIFY_DONE;
+	}
+
+	lib_ring_buffer_iterator_init(chan, buf);
+	return NOTIFY_OK;
+}
+#endif
+
+/*
+ * channel_iterator_init - set up the iterator state of a channel.
+ * @chan: channel
+ *
+ * Per-cpu channels get an empty list, a heap sized for all possible CPUs and
+ * one initialized buffer iterator per CPU; global channels only get their
+ * single buffer iterator initialized.
+ *
+ * Returns 0 on success, or the error returned by lttng_heap_init().
+ */
+int channel_iterator_init(struct channel *chan)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct lib_ring_buffer *buf;
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+		int cpu, ret;
+
+		INIT_LIST_HEAD(&chan->iter.empty_head);
+		ret = lttng_heap_init(&chan->iter.heap,
+				num_possible_cpus(),
+				GFP_KERNEL, buf_is_higher);
+		if (ret)
+			return ret;
+		/*
+		 * In case of non-hotplug cpu, if the ring-buffer is allocated
+		 * in early initcall, it will not be notified of secondary cpus.
+		 * In that case, we need to allocate for all possible cpus.
+		 */
+#ifdef CONFIG_HOTPLUG_CPU
+		chan->hp_iter_notifier.notifier_call =
+			channel_iterator_cpu_hotplug;
+		chan->hp_iter_notifier.priority = 10;
+		register_cpu_notifier(&chan->hp_iter_notifier);
+		/*
+		 * The notifier ignores events until hp_iter_enable is set, so
+		 * the online CPUs are initialized here, with hotplug held off.
+		 */
+		get_online_cpus();
+		for_each_online_cpu(cpu) {
+			buf = per_cpu_ptr(chan->backend.buf, cpu);
+			lib_ring_buffer_iterator_init(chan, buf);
+		}
+		chan->hp_iter_enable = 1;
+		put_online_cpus();
+#else
+		for_each_possible_cpu(cpu) {
+			buf = per_cpu_ptr(chan->backend.buf, cpu);
+			lib_ring_buffer_iterator_init(chan, buf);
+		}
+#endif
+	} else {
+		buf = channel_get_ring_buffer(config, chan, 0);
+		lib_ring_buffer_iterator_init(chan, buf);
+	}
+	return 0;
+}
+
+/*
+ * Disable and unregister the iterator CPU hotplug notifier of a per-cpu
+ * channel. Global channels have nothing to unregister.
+ */
+void channel_iterator_unregister_notifiers(struct channel *chan)
+{
+	if (chan->backend.config->alloc != RING_BUFFER_ALLOC_PER_CPU)
+		return;
+
+	/* Make the notifier a no-op before taking it off the chain. */
+	chan->hp_iter_enable = 0;
+	unregister_cpu_notifier(&chan->hp_iter_notifier);
+}
+
+/*
+ * Release the iterator resources of a channel: only per-cpu channels own a
+ * heap that needs to be freed.
+ */
+void channel_iterator_free(struct channel *chan)
+{
+	if (chan->backend.config->alloc != RING_BUFFER_ALLOC_PER_CPU)
+		return;
+
+	lttng_heap_free(&chan->iter.heap);
+}
+
+/*
+ * lib_ring_buffer_iterator_open - open a buffer for iterator reading.
+ * @buf: ring buffer
+ *
+ * Warns if the channel is not configured for RING_BUFFER_ITERATOR output.
+ * Returns the result of lib_ring_buffer_open_read().
+ */
+int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
+{
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
+	return lib_ring_buffer_open_read(buf);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);
+
+/*
+ * lib_ring_buffer_iterator_release - release a buffer opened for iterator
+ * reading.
+ * @buf: ring buffer
+ *
+ * Note: Iterators must not be mixed with other types of outputs, because an
+ * iterator can leave the buffer in "GET" state, which is not consistent with
+ * other types of output (mmap, splice, raw data read).
+ */
+void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
+{
+	lib_ring_buffer_release_read(buf);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);
+
+/*
+ * channel_iterator_open - open every buffer of a channel for iterator reading.
+ * @chan: channel
+ *
+ * For per-cpu channels, chan->iter.read_open is set so that buffers of CPUs
+ * coming online later are opened too, then each channel buffer is opened.
+ * If one buffer cannot be opened, the buffers opened by this call are released
+ * again and chan->iter.read_open is restored to its previous value, so a
+ * failed open leaves no reader state behind and does not disturb a reader
+ * that already had the channel opened.
+ *
+ * Returns 0 on success, or the error returned by
+ * lib_ring_buffer_iterator_open().
+ */
+int channel_iterator_open(struct channel *chan)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct lib_ring_buffer *buf;
+	int ret = 0, cpu, failed_cpu, prev_read_open;
+
+	CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+		get_online_cpus();
+		/* Allow CPU hotplug to keep track of opened reader */
+		prev_read_open = chan->iter.read_open;
+		chan->iter.read_open = 1;
+		for_each_channel_cpu(cpu, chan) {
+			buf = channel_get_ring_buffer(config, chan, cpu);
+			ret = lib_ring_buffer_iterator_open(buf);
+			if (ret)
+				goto error;
+			buf->iter.read_open = 1;
+		}
+		put_online_cpus();
+	} else {
+		buf = channel_get_ring_buffer(config, chan, 0);
+		ret = lib_ring_buffer_iterator_open(buf);
+	}
+	return ret;
+error:
+	/* Error is expected to happen on CPU 0 only; warn otherwise. */
+	CHAN_WARN_ON(chan, cpu != 0);
+	/*
+	 * Undo what this call did: release the buffers opened before the
+	 * failing one (none when the failure is on the first CPU).
+	 */
+	failed_cpu = cpu;
+	for_each_channel_cpu(cpu, chan) {
+		if (cpu == failed_cpu)
+			break;
+		buf = channel_get_ring_buffer(config, chan, cpu);
+		lib_ring_buffer_iterator_release(buf);
+		buf->iter.read_open = 0;
+	}
+	/* Do not leave the channel marked as opened by this failed call. */
+	chan->iter.read_open = prev_read_open;
+	put_online_cpus();
+	return ret;
+}
+EXPORT_SYMBOL_GPL(channel_iterator_open);
+
+/*
+ * channel_iterator_release - release the buffers of a channel opened for
+ * iterator reading.
+ * @chan: channel
+ *
+ * Per-cpu channels release every buffer still marked read_open and clear the
+ * channel-wide read_open flag; global channels release their single buffer.
+ */
+void channel_iterator_release(struct channel *chan)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct lib_ring_buffer *buf;
+	int cpu;
+
+	if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) {
+		buf = channel_get_ring_buffer(config, chan, 0);
+		lib_ring_buffer_iterator_release(buf);
+		return;
+	}
+
+	get_online_cpus();
+	for_each_channel_cpu(cpu, chan) {
+		buf = channel_get_ring_buffer(config, chan, cpu);
+		if (!buf->iter.read_open)
+			continue;
+		lib_ring_buffer_iterator_release(buf);
+		buf->iter.read_open = 0;
+	}
+	chan->iter.read_open = 0;
+	put_online_cpus();
+}
+EXPORT_SYMBOL_GPL(channel_iterator_release);
+
+/*
+ * lib_ring_buffer_iterator_reset - reset the iterator state of a buffer.
+ * @buf: ring buffer
+ *
+ * Puts back the subbuffer currently held by the iterator (if any), moves the
+ * buffer from the channel heap to the empty list when it was in the heap, and
+ * clears the iterator position fields. The allocated and read_open flags are
+ * preserved.
+ *
+ * NOTE(review): the heap and empty list are only initialized for per-cpu
+ * channels in channel_iterator_init(); confirm this is never reached with a
+ * global channel, or that the zeroed heap/list is safe to use.
+ */
+void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
+{
+	struct channel *chan = buf->backend.chan;
+
+	/* Any state other than GET_SUBBUF means a subbuffer is being held. */
+	if (buf->iter.state != ITER_GET_SUBBUF)
+		lib_ring_buffer_put_next_subbuf(buf);
+	buf->iter.state = ITER_GET_SUBBUF;
+	/* Remove from heap (if present). */
+	if (lttng_heap_cherrypick(&chan->iter.heap, buf))
+		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+	buf->iter.timestamp = 0;
+	buf->iter.header_len = 0;
+	buf->iter.payload_len = 0;
+	buf->iter.consumed = 0;
+	buf->iter.read_offset = 0;
+	buf->iter.data_size = 0;
+	/* Don't reset allocated and read_open */
+}
+
+/*
+ * channel_iterator_reset - reset the iterator state of a whole channel.
+ * @chan: channel
+ *
+ * Moves every buffer of the heap to the empty list, resets each channel
+ * buffer iterator, then clears the channel-level iterator bookkeeping
+ * (last_qs, last_timestamp, last_cpu, len_left). read_open is preserved.
+ */
+void channel_iterator_reset(struct channel *chan)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct lib_ring_buffer *buf;
+	int cpu;
+
+	/* Empty heap, put into empty_head */
+	while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
+		list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+
+	for_each_channel_cpu(cpu, chan) {
+		buf = channel_get_ring_buffer(config, chan, cpu);
+		lib_ring_buffer_iterator_reset(buf);
+	}
+	/* Don't reset read_open */
+	chan->iter.last_qs = 0;
+	chan->iter.last_timestamp = 0;
+	chan->iter.last_cpu = 0;
+	chan->iter.len_left = 0;
+}
+
+/*
+ * Ring buffer payload extraction read() implementation.
+ *
+ * @filp: file being read; only f_flags (O_NONBLOCK) is consulted.
+ * @user_buf: user-space destination buffer.
+ * @count: size of @user_buf in bytes.
+ * @ppos: position within the current record; 0 when no record is partially
+ *        copied, otherwise the buffer offset at which the copy resumes.
+ * @chan: channel being read.
+ * @buf: buffer being read; may be NULL when @fusionmerge is set, in which
+ *       case the buffer is taken from the channel heap.
+ * @fusionmerge: non-zero to read records merged across all channel buffers,
+ *               zero to read records from @buf only.
+ *
+ * Copies as many whole record payloads as fit in @user_buf; a record that
+ * does not fit is copied partially and resumed by the next call using *ppos
+ * and chan->iter.len_left.
+ *
+ * Returns the number of bytes copied (0 at end of data), -EAGAIN when
+ * non-blocking and no data is available, -EFAULT on invalid user memory, or
+ * the error returned by an interrupted wait.
+ *
+ * NOTE(review): read_count is a size_t that is also assigned negative error
+ * codes; they reach the caller through the conversion to the ssize_t return
+ * type.
+ */
+static
+ssize_t channel_ring_buffer_file_read(struct file *filp,
+				      char __user *user_buf,
+				      size_t count,
+				      loff_t *ppos,
+				      struct channel *chan,
+				      struct lib_ring_buffer *buf,
+				      int fusionmerge)
+{
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	size_t read_count = 0, read_offset;
+	ssize_t len;
+
+	might_sleep();
+	if (!access_ok(VERIFY_WRITE, user_buf, count))
+		return -EFAULT;
+
+	/* Finish copy of previous record */
+	if (*ppos != 0) {
+		if (read_count < count) {
+			len = chan->iter.len_left;
+			read_offset = *ppos;
+			/* The partially read record is still at the heap top. */
+			if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
+			    && fusionmerge)
+				buf = lttng_heap_maximum(&chan->iter.heap);
+			CHAN_WARN_ON(chan, !buf);
+			/* Jumps into the loop body, past the record fetch. */
+			goto skip_get_next;
+		}
+	}
+
+	while (read_count < count) {
+		size_t copy_len, space_left;
+
+		if (fusionmerge)
+			len = channel_get_next_record(chan, &buf);
+		else
+			len = lib_ring_buffer_get_next_record(chan, buf);
+len_test:
+		if (len < 0) {
+			/*
+			 * Check if buffer is finalized (end of file).
+			 */
+			if (len == -ENODATA) {
+				/* A 0 read_count will tell about end of file */
+				goto nodata;
+			}
+			if (filp->f_flags & O_NONBLOCK) {
+				if (!read_count)
+					read_count = -EAGAIN;
+				goto nodata;
+			} else {
+				int error;
+
+				/*
+				 * No data available at the moment, return what
+				 * we got.
+				 */
+				if (read_count)
+					goto nodata;
+
+				/*
+				 * Wait for returned len to be >= 0 or -ENODATA.
+				 */
+				if (fusionmerge)
+					error = wait_event_interruptible(
+					  chan->read_wait,
+					  ((len = channel_get_next_record(chan,
+						&buf)), len != -EAGAIN));
+				else
+					error = wait_event_interruptible(
+					  buf->read_wait,
+					  ((len = lib_ring_buffer_get_next_record(
+						  chan, buf)), len != -EAGAIN));
+				CHAN_WARN_ON(chan, len == -EBUSY);
+				if (error) {
+					read_count = error;
+					goto nodata;
+				}
+				CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
+				goto len_test;
+			}
+		}
+		read_offset = buf->iter.read_offset;
+skip_get_next:
+		space_left = count - read_count;
+		if (len <= space_left) {
+			/* Whole (remaining) payload fits: record is done. */
+			copy_len = len;
+			chan->iter.len_left = 0;
+			*ppos = 0;
+		} else {
+			/* Partial copy: remember where to resume. */
+			copy_len = space_left;
+			chan->iter.len_left = len - copy_len;
+			*ppos = read_offset + copy_len;
+		}
+		if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
+					       &user_buf[read_count],
+					       copy_len)) {
+			/*
+			 * Leave the len_left and ppos values at their current
+			 * state, as we currently have a valid event to read.
+			 */
+			return -EFAULT;
+		}
+		read_count += copy_len;
+	};
+	return read_count;
+
+nodata:
+	*ppos = 0;
+	chan->iter.len_left = 0;
+	return read_count;
+}
+
+/**
+ * lib_ring_buffer_file_read - Read buffer record payload.
+ * @filp: file structure pointer.
+ * @user_buf: user buffer to read data into.
+ * @count: number of bytes to read.
+ * @ppos: file read position.
+ *
+ * The buffer is taken from the inode private data and read without fusion
+ * merge.
+ *
+ * Returns a negative value on error, or the number of bytes read on success.
+ * ppos is used to save the position _within the current record_ between calls
+ * to read().
+ */
+static
+ssize_t lib_ring_buffer_file_read(struct file *filp,
+				  char __user *user_buf,
+			          size_t count,
+			          loff_t *ppos)
+{
+	struct inode *inode = filp->f_dentry->d_inode;
+	struct lib_ring_buffer *buf = inode->i_private;
+	struct channel *chan = buf->backend.chan;
+
+	return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
+					     chan, buf, 0);
+}
+
+/**
+ * channel_file_read - Read channel record payload.
+ * @filp: file structure pointer.
+ * @user_buf: user buffer to read data into.
+ * @count: number of bytes to read.
+ * @ppos: file read position.
+ *
+ * Per-cpu channels are read with fusion merge across all their buffers;
+ * global channels are read directly from their single buffer.
+ *
+ * Returns a negative value on error, or the number of bytes read on success.
+ * ppos is used to save the position _within the current record_ between calls
+ * to read().
+ */
+static
+ssize_t channel_file_read(struct file *filp,
+			  char __user *user_buf,
+			  size_t count,
+			  loff_t *ppos)
+{
+	struct channel *chan = filp->f_dentry->d_inode->i_private;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	struct lib_ring_buffer *buf = NULL;
+	int fusionmerge = 1;
+
+	if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) {
+		/* Single global buffer: no merge needed. */
+		buf = channel_get_ring_buffer(config, chan, 0);
+		fusionmerge = 0;
+	}
+	return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
+					     chan, buf, fusionmerge);
+}
+
+/*
+ * open() implementation for a per-buffer payload file: opens the buffer
+ * iterator, then marks the file non-seekable. The iterator is released again
+ * if the second step fails.
+ *
+ * Returns 0 on success, a non-zero error code otherwise.
+ */
+static
+int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
+{
+	struct lib_ring_buffer *buf = inode->i_private;
+	int err;
+
+	err = lib_ring_buffer_iterator_open(buf);
+	if (err)
+		return err;
+
+	file->private_data = buf;
+	err = nonseekable_open(inode, file);
+	if (err)
+		lib_ring_buffer_iterator_release(buf);
+	return err;
+}
+
+/*
+ * release() implementation for a per-buffer payload file: releases the
+ * iterator of the buffer stored in the inode private data. Always returns 0.
+ */
+static
+int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
+{
+	struct lib_ring_buffer *buf = inode->i_private;
+
+	lib_ring_buffer_iterator_release(buf);
+	return 0;
+}
+
+/*
+ * open() implementation for a channel payload file: opens the channel
+ * iterator, then marks the file non-seekable. The iterator is released again
+ * if the second step fails.
+ *
+ * Returns 0 on success, a non-zero error code otherwise.
+ */
+static
+int channel_file_open(struct inode *inode, struct file *file)
+{
+	struct channel *chan = inode->i_private;
+	int err;
+
+	err = channel_iterator_open(chan);
+	if (err)
+		return err;
+
+	file->private_data = chan;
+	err = nonseekable_open(inode, file);
+	if (err)
+		channel_iterator_release(chan);
+	return err;
+}
+
+/*
+ * release() implementation for a channel payload file: releases the iterator
+ * of the channel stored in the inode private data. Always returns 0.
+ */
+static
+int channel_file_release(struct inode *inode, struct file *file)
+{
+	struct channel *chan = inode->i_private;
+
+	channel_iterator_release(chan);
+	return 0;
+}
+
+/*
+ * File operations exposing the record payloads of a whole channel: open,
+ * release and read are wired to the channel_file_* handlers, and llseek to
+ * lib_ring_buffer_no_llseek.
+ */
+const struct file_operations channel_payload_file_operations = {
+	.owner = THIS_MODULE,
+	.open = channel_file_open,
+	.release = channel_file_release,
+	.read = channel_file_read,
+	.llseek = lib_ring_buffer_no_llseek,
+};
+EXPORT_SYMBOL_GPL(channel_payload_file_operations);
+
+/*
+ * File operations exposing the record payloads of a single ring buffer: open,
+ * release and read are wired to the lib_ring_buffer_file_* handlers, and
+ * llseek to lib_ring_buffer_no_llseek.
+ */
+const struct file_operations lib_ring_buffer_payload_file_operations = {
+	.owner = THIS_MODULE,
+	.open = lib_ring_buffer_file_open,
+	.release = lib_ring_buffer_file_release,
+	.read = lib_ring_buffer_file_read,
+	.llseek = lib_ring_buffer_no_llseek,
+};
+EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);
diff --git a/drivers/staging/lttng/lib/ringbuffer/ring_buffer_mmap.c b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_mmap.c
new file mode 100644
index 0000000..68221ee
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_mmap.c
@@ -0,0 +1,115 @@
+/*
+ * ring_buffer_mmap.c
+ *
+ * Copyright (C) 2002-2005 - Tom Zanussi <zanussi at us.ibm.com>, IBM Corp
+ * Copyright (C) 1999-2005 - Karim Yaghmour <karim at opersys.com>
+ * Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Re-using content from kernel/relay.c.
+ *
+ * This file is released under the GPL v2.
+ */
+
+#include <linux/module.h>
+#include <linux/mm.h>
+
+#include "../../wrapper/ringbuffer/backend.h"
+#include "../../wrapper/ringbuffer/frontend.h"
+#include "../../wrapper/ringbuffer/vfs.h"
+
+/*
+ * fault() vm_op implementation for ring buffer file mapping.
+ *
+ * Only the sub-buffer currently owned by the reader may be faulted in; a
+ * fault at any other offset of the mapping gets VM_FAULT_SIGBUS. On success
+ * a reference is taken on the backing page and it is returned in vmf->page.
+ * Returns VM_FAULT_OOM when the vma carries no ring buffer.
+ */
+static int lib_ring_buffer_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
+{
+	struct lib_ring_buffer *buf = vma->vm_private_data;
+	const struct lib_ring_buffer_config *config;
+	struct channel *chan;
+	pgoff_t pgoff = vmf->pgoff;
+	struct page **page;
+	void **virt;
+	unsigned long offset, sb_bindex;
+
+	/* Validate buf before dereferencing it to reach the channel. */
+	if (!buf)
+		return VM_FAULT_OOM;
+	chan = buf->backend.chan;
+	config = chan->backend.config;
+
+	/*
+	 * Verify that faults are only done on the range of pages owned by the
+	 * reader.
+	 */
+	offset = pgoff << PAGE_SHIFT;
+	sb_bindex = subbuffer_id_get_index(config, buf->backend.buf_rsb.id);
+	if (!(offset >= buf->backend.array[sb_bindex]->mmap_offset
+	      && offset < buf->backend.array[sb_bindex]->mmap_offset +
+			  chan->backend.subbuf_size))
+		return VM_FAULT_SIGBUS;
+	/*
+	 * ring_buffer_read_get_page() gets the page in the current reader's
+	 * pages.
+	 */
+	page = lib_ring_buffer_read_get_page(&buf->backend, offset, &virt);
+	if (!*page)
+		return VM_FAULT_SIGBUS;
+	/* The reference taken here is owned by the mapping from now on. */
+	get_page(*page);
+	vmf->page = *page;
+
+	return 0;
+}
+
+/*
+ * vm_ops for ring buffer file mappings.
+ *
+ * Only a fault handler is provided: lib_ring_buffer_mmap_buf() installs
+ * these ops without inserting any page, so pages enter the mapping through
+ * lib_ring_buffer_fault().
+ */
+static const struct vm_operations_struct lib_ring_buffer_mmap_ops = {
+	.fault = lib_ring_buffer_fault,
+};
+
+/**
+ *	lib_ring_buffer_mmap_buf: - mmap channel buffer to process address space
+ *	@buf: ring buffer to map
+ *	@vma: vm_area_struct describing memory to be mapped
+ *
+ *	Returns 0 if ok, negative on error: -EBADF when @buf is NULL, -EINVAL
+ *	when the channel is not configured for mmap output or when the length
+ *	of @vma differs from the mappable length of the buffer (buffer size,
+ *	plus one sub-buffer when the channel has an extra reader sub-buffer).
+ *
+ *	Caller should already have grabbed mmap_sem.
+ */
+static int lib_ring_buffer_mmap_buf(struct lib_ring_buffer *buf,
+				    struct vm_area_struct *vma)
+{
+	unsigned long length = vma->vm_end - vma->vm_start;
+	const struct lib_ring_buffer_config *config;
+	struct channel *chan;
+	unsigned long mmap_buf_len;
+
+	/* Must come first: chan and config are reached through buf. */
+	if (!buf)
+		return -EBADF;
+	chan = buf->backend.chan;
+	config = chan->backend.config;
+
+	if (config->output != RING_BUFFER_MMAP)
+		return -EINVAL;
+
+	mmap_buf_len = chan->backend.buf_size;
+	if (chan->backend.extra_reader_sb)
+		mmap_buf_len += chan->backend.subbuf_size;
+
+	/* Partial mappings are refused: the whole buffer or nothing. */
+	if (length != mmap_buf_len)
+		return -EINVAL;
+
+	vma->vm_ops = &lib_ring_buffer_mmap_ops;
+	vma->vm_flags |= VM_DONTEXPAND;
+	vma->vm_private_data = buf;
+
+	return 0;
+}
+
+/**
+ *	lib_ring_buffer_mmap - mmap file op
+ *	@filp: the file; its private_data is the ring buffer to map
+ *	@vma: the vma describing what to map
+ *
+ *	Thin wrapper handing the file's ring buffer and @vma over to
+ *	lib_ring_buffer_mmap_buf(), whose return value is passed back.
+ */
+int lib_ring_buffer_mmap(struct file *filp, struct vm_area_struct *vma)
+{
+	return lib_ring_buffer_mmap_buf(filp->private_data, vma);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_mmap);
diff --git a/drivers/staging/lttng/lib/ringbuffer/ring_buffer_splice.c b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_splice.c
new file mode 100644
index 0000000..ded18ba
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_splice.c
@@ -0,0 +1,202 @@
+/*
+ * ring_buffer_splice.c
+ *
+ * Copyright (C) 2002-2005 - Tom Zanussi <zanussi at us.ibm.com>, IBM Corp
+ * Copyright (C) 1999-2005 - Karim Yaghmour <karim at opersys.com>
+ * Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Re-using content from kernel/relay.c.
+ *
+ * This file is released under the GPL v2.
+ */
+
+#include <linux/module.h>
+#include <linux/fs.h>
+
+#include "../../wrapper/splice.h"
+#include "../../wrapper/ringbuffer/backend.h"
+#include "../../wrapper/ringbuffer/frontend.h"
+#include "../../wrapper/ringbuffer/vfs.h"
+
+#if 0
+#define printk_dbg(fmt, args...) printk(fmt, args)
+#else
+#define printk_dbg(fmt, args...)
+#endif
+
+/* llseek file operation for ring buffer files: always fails with -ESPIPE. */
+loff_t lib_ring_buffer_no_llseek(struct file *file, loff_t offset, int origin)
+{
+	const loff_t not_seekable = -ESPIPE;
+
+	return not_seekable;
+}
+
+/*
+ * pipe_buf_operations release callback: frees the page carried by the pipe
+ * buffer. Pages reach the pipe after being taken out of the ring buffer, so
+ * splice pipe_to_file can move them.
+ */
+static void lib_ring_buffer_pipe_buf_release(struct pipe_inode_info *pipe,
+					     struct pipe_buffer *pbuf)
+{
+	struct page *pg = pbuf->page;
+
+	__free_page(pg);
+}
+
+/*
+ * Pipe buffer operations for pages spliced out of the ring buffer: generic
+ * helpers everywhere except release, which frees the page.
+ */
+static const struct pipe_buf_operations ring_buffer_pipe_buf_ops = {
+	.can_merge = 0,
+	.get = generic_pipe_buf_get,
+	.confirm = generic_pipe_buf_confirm,
+	.map = generic_pipe_buf_map,
+	.unmap = generic_pipe_buf_unmap,
+	.steal = generic_pipe_buf_steal,
+	.release = lib_ring_buffer_pipe_buf_release,
+};
+
+/*
+ * spd_release callback of the splice pipe descriptor: frees page number @i
+ * of @spd.
+ */
+static void lib_ring_buffer_page_release(struct splice_pipe_desc *spd,
+					 unsigned int i)
+{
+	struct page *pg = spd->pages[i];
+
+	__free_page(pg);
+}
+
+/*
+ *	subbuf_splice_actor - splice up to one subbuf's worth of data
+ *	@in: ring buffer file; its private_data is the struct lib_ring_buffer
+ *	@ppos: byte position, added to the consumed position to find the first
+ *	       page to splice
+ *	@pipe: destination pipe
+ *	@len: maximum number of bytes to splice
+ *	@flags: splice flags, stored in the splice pipe descriptor
+ *
+ *	Each page handed to the pipe is replaced in the buffer backend by a
+ *	newly allocated zeroed page. Returns 0 when no page could be queued,
+ *	otherwise the return value of wrapper_splice_to_pipe().
+ */
+static int subbuf_splice_actor(struct file *in,
+			       loff_t *ppos,
+			       struct pipe_inode_info *pipe,
+			       size_t len,
+			       unsigned int flags)
+{
+	struct lib_ring_buffer *buf = in->private_data;
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	unsigned int poff, subbuf_pages, nr_pages;
+	struct page *pages[PIPE_DEF_BUFFERS];
+	struct partial_page partial[PIPE_DEF_BUFFERS];
+	struct splice_pipe_desc spd = {
+		.pages = pages,
+		.nr_pages = 0,
+		.partial = partial,
+		.flags = flags,
+		.ops = &ring_buffer_pipe_buf_ops,
+		.spd_release = lib_ring_buffer_page_release,
+	};
+	unsigned long consumed_old, roffset;
+	unsigned long bytes_avail;
+
+	/*
+	 * Check that a GET_SUBBUF ioctl has been done before.
+	 */
+	WARN_ON(atomic_long_read(&buf->active_readers) != 1);
+	consumed_old = lib_ring_buffer_get_consumed(config, buf);
+	consumed_old += *ppos;
+
+	/*
+	 * Adjust read len, if longer than what is available.
+	 * Max read size is 1 subbuffer due to get_subbuf/put_subbuf for
+	 * protection.
+	 */
+	bytes_avail = chan->backend.subbuf_size;
+	WARN_ON(bytes_avail > chan->backend.buf_size);
+	len = min_t(size_t, len, bytes_avail);
+	subbuf_pages = bytes_avail >> PAGE_SHIFT;
+	/* Never queue more pages than the on-stack pages[]/partial[] hold. */
+	nr_pages = min_t(unsigned int, subbuf_pages, PIPE_DEF_BUFFERS);
+	/* Split the position into a page-aligned offset and an in-page offset. */
+	roffset = consumed_old & PAGE_MASK;
+	poff = consumed_old & ~PAGE_MASK;
+	printk_dbg(KERN_DEBUG "SPLICE actor len %zu pos %zd write_pos %ld\n",
+		   len, (ssize_t)*ppos, lib_ring_buffer_get_offset(config, buf));
+
+	for (; spd.nr_pages < nr_pages; spd.nr_pages++) {
+		unsigned int this_len;
+		struct page **page, *new_page;
+		void **virt;
+
+		if (!len)
+			break;
+		printk_dbg(KERN_DEBUG "SPLICE actor loop len %zu roffset %ld\n",
+			   len, roffset);
+
+		/*
+		 * We have to replace the page we are moving into the splice
+		 * pipe.
+		 *
+		 * NOTE(review): max(cpu, 0) suggests backend.cpu can be
+		 * negative (presumably for a buffer not bound to a CPU);
+		 * confirm against the backend.
+		 */
+		new_page = alloc_pages_node(cpu_to_node(max(buf->backend.cpu,
+							    0)),
+					    GFP_KERNEL | __GFP_ZERO, 0);
+		/* On allocation failure, splice whatever was queued so far. */
+		if (!new_page)
+			break;
+
+		this_len = PAGE_SIZE - poff;
+		page = lib_ring_buffer_read_get_page(&buf->backend, roffset, &virt);
+		/*
+		 * Hand the current backend page to the pipe and install the
+		 * fresh page (and its kernel address) in its place.
+		 */
+		spd.pages[spd.nr_pages] = *page;
+		*page = new_page;
+		*virt = page_address(new_page);
+		spd.partial[spd.nr_pages].offset = poff;
+		spd.partial[spd.nr_pages].len = this_len;
+
+		/* Only the first page can start at a non-zero in-page offset. */
+		poff = 0;
+		roffset += PAGE_SIZE;
+		len -= this_len;
+	}
+
+	if (!spd.nr_pages)
+		return 0;
+
+	return wrapper_splice_to_pipe(pipe, &spd);
+}
+
+/*
+ * splice_read file operation. Performs at most one subbuf_splice_actor()
+ * pass per call and advances *ppos by the number of bytes spliced.
+ * Returns the byte count spliced, 0 when nothing was spliced (or -EAGAIN in
+ * that case if SPLICE_F_NONBLOCK is set), or a negative error.
+ */
+ssize_t lib_ring_buffer_splice_read(struct file *in, loff_t *ppos,
+				    struct pipe_inode_info *pipe, size_t len,
+				    unsigned int flags)
+{
+	struct lib_ring_buffer *buf = in->private_data;
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	int nr_bytes;
+
+	if (config->output != RING_BUFFER_SPLICE)
+		return -EINVAL;
+
+	/*
+	 * ppos and len must both be page-aligned so pages can be moved
+	 * without copying. The size to request is obtained through the
+	 * RING_BUFFER_GET_PADDED_SUBBUF_SIZE ioctl, which is padded to the
+	 * page size. Unaligned requests are refused because splice must not
+	 * transfer more than the length it was asked for, so padding cannot
+	 * be added silently.
+	 */
+	if (*ppos != PAGE_ALIGN(*ppos) || len != PAGE_ALIGN(len))
+		return -EINVAL;
+
+	printk_dbg(KERN_DEBUG "SPLICE read len %zu pos %zd\n", len,
+		   (ssize_t)*ppos);
+	if (!len)
+		return 0;
+
+	nr_bytes = subbuf_splice_actor(in, ppos, pipe, len, flags);
+	printk_dbg(KERN_DEBUG "SPLICE read loop ret %d\n", nr_bytes);
+	if (nr_bytes < 0)
+		return nr_bytes;
+	if (!nr_bytes)
+		return (flags & SPLICE_F_NONBLOCK) ? -EAGAIN : 0;
+
+	*ppos += nr_bytes;
+	return nr_bytes;
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_splice_read);
diff --git a/drivers/staging/lttng/lib/ringbuffer/ring_buffer_vfs.c b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_vfs.c
new file mode 100644
index 0000000..1708ffd
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/ring_buffer_vfs.c
@@ -0,0 +1,387 @@
+/*
+ * ring_buffer_vfs.c
+ *
+ * Copyright (C) 2009-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Ring Buffer VFS file operations.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/compat.h>
+
+#include "../../wrapper/ringbuffer/backend.h"
+#include "../../wrapper/ringbuffer/frontend.h"
+#include "../../wrapper/ringbuffer/vfs.h"
+#include "../../wrapper/poll.h"
+
+/* Store @val into the user-space unsigned long whose address is @arg. */
+static int put_ulong(unsigned long val, unsigned long arg)
+{
+	unsigned long __user *uptr = (unsigned long __user *)arg;
+
+	return put_user(val, uptr);
+}
+
+#ifdef CONFIG_COMPAT
+/*
+ * Store @val into the compat-sized unsigned long of a compat task; @arg is
+ * converted to a user pointer with compat_ptr().
+ */
+static int compat_put_ulong(compat_ulong_t val, unsigned long arg)
+{
+	compat_ulong_t __user *uptr = (compat_ulong_t __user *)compat_ptr(arg);
+
+	return put_user(val, uptr);
+}
+#endif
+
+/**
+ *	lib_ring_buffer_open - ring buffer open file operation
+ *	@inode: opened inode; its i_private is the ring buffer
+ *	@file: opened file
+ *
+ *	Takes read access on the buffer through lib_ring_buffer_open_read(),
+ *	which makes sure only one open instance of a buffer exists at a given
+ *	moment, then marks the file non-seekable. Read access is dropped again
+ *	if nonseekable_open() fails.
+ */
+int lib_ring_buffer_open(struct inode *inode, struct file *file)
+{
+	struct lib_ring_buffer *rb = inode->i_private;
+	int err;
+
+	err = lib_ring_buffer_open_read(rb);
+	if (err)
+		return err;
+
+	file->private_data = rb;
+	err = nonseekable_open(inode, file);
+	if (err)
+		lib_ring_buffer_release_read(rb);
+	return err;
+}
+
+/**
+ *	lib_ring_buffer_release - ring buffer release file operation
+ *	@inode: opened inode
+ *	@file: opened file; its private_data is the ring buffer
+ *
+ *	Drops the read access on the buffer. Always returns 0.
+ */
+int lib_ring_buffer_release(struct inode *inode, struct file *file)
+{
+	lib_ring_buffer_release_read(file->private_data);
+	return 0;
+}
+
+/**
+ *	lib_ring_buffer_poll - ring buffer poll file operation
+ *	@filp: the file
+ *	@wait: poll table
+ *
+ *	Poll implementation. For a file opened for reading, returns:
+ *	POLLERR when the channel is disabled, POLLHUP when no full sub-buffer
+ *	separates the write offset from the consumed offset and the buffer is
+ *	finalized, 0 when there is nothing to read yet, POLLPRI | POLLRDBAND
+ *	when at least a whole buffer's worth of data is pending, and
+ *	POLLIN | POLLRDNORM otherwise. Returns 0 for a file not opened for
+ *	reading.
+ */
+unsigned int lib_ring_buffer_poll(struct file *filp, poll_table *wait)
+{
+	unsigned int mask = 0;
+	struct lib_ring_buffer *buf = filp->private_data;
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+	int finalized, disabled;
+
+	if (filp->f_mode & FMODE_READ) {
+		poll_wait_set_exclusive(wait);
+		poll_wait(filp, &buf->read_wait, wait);
+
+		/* Sampled once; the retry loop below only re-reads offsets. */
+		finalized = lib_ring_buffer_is_finalized(config, buf);
+		disabled = lib_ring_buffer_channel_is_disabled(chan);
+
+		/*
+		 * lib_ring_buffer_is_finalized() contains a smp_rmb() ordering
+		 * finalized load before offsets loads.
+		 */
+		WARN_ON(atomic_long_read(&buf->active_readers) != 1);
+retry:
+		if (disabled)
+			return POLLERR;
+
+		/* Write and consumed offsets fall in the same sub-buffer. */
+		if (subbuf_trunc(lib_ring_buffer_get_offset(config, buf), chan)
+		  - subbuf_trunc(lib_ring_buffer_get_consumed(config, buf), chan)
+		  == 0) {
+			if (finalized)
+				return POLLHUP;
+			else {
+				/*
+				 * The memory barriers
+				 * __wait_event()/wake_up_interruptible() take
+				 * care of "raw_spin_is_locked" memory ordering.
+				 */
+				/* Re-check offsets while the lock is held. */
+				if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
+					goto retry;
+				else
+					return 0;
+			}
+		} else {
+			/* Pending data spans at least the whole buffer size. */
+			if (subbuf_trunc(lib_ring_buffer_get_offset(config, buf),
+					 chan)
+			  - subbuf_trunc(lib_ring_buffer_get_consumed(config, buf),
+					 chan)
+			  >= chan->backend.buf_size)
+				return POLLPRI | POLLRDBAND;
+			else
+				return POLLIN | POLLRDNORM;
+		}
+	}
+	/* mask is never modified: 0 for files not opened for reading. */
+	return mask;
+}
+
+/**
+ *	lib_ring_buffer_ioctl - control ring buffer reader synchronization
+ *
+ *	@filp: the file
+ *	@cmd: the command
+ *	@arg: command arg
+ *
+ *	This ioctl implements commands necessary for producer/consumer
+ *	and flight recorder reader interaction :
+ *	RING_BUFFER_SNAPSHOT
+ *		Record the consumer and producer positions in the buffer.
+ *	RING_BUFFER_SNAPSHOT_GET_CONSUMED
+ *	RING_BUFFER_SNAPSHOT_GET_PRODUCED
+ *		Copy the recorded consumer / producer position to user-space.
+ *	RING_BUFFER_GET_SUBBUF
+ *		Get the sub-buffer at the position read from user-space.
+ *	RING_BUFFER_PUT_SUBBUF
+ *		Release the sub-buffer obtained with RING_BUFFER_GET_SUBBUF.
+ *	RING_BUFFER_GET_NEXT_SUBBUF
+ *		Get the next sub-buffer that can be read. It never blocks.
+ *	RING_BUFFER_PUT_NEXT_SUBBUF
+ *		Release the currently read sub-buffer.
+ *	RING_BUFFER_GET_SUBBUF_SIZE
+ *		returns the size of the current sub-buffer.
+ *	RING_BUFFER_GET_PADDED_SUBBUF_SIZE
+ *		returns the size of the current sub-buffer, page-aligned.
+ *	RING_BUFFER_GET_MAX_SUBBUF_SIZE
+ *		returns the maximum size for sub-buffers.
+ *	RING_BUFFER_GET_MMAP_LEN
+ *		returns the length to mmap. Only valid for mmap output.
+ *	RING_BUFFER_GET_MMAP_READ_OFFSET
+ *		returns the offset of the subbuffer belonging to the reader.
+ *		Should only be used for mmap clients.
+ *	RING_BUFFER_FLUSH
+ *		Switch the current sub-buffer (SWITCH_ACTIVE).
+ *
+ *	Returns -EIO when the channel is disabled and -ENOIOCTLCMD for an
+ *	unknown command.
+ */
+long lib_ring_buffer_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
+{
+	struct lib_ring_buffer *buf = filp->private_data;
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+
+	if (lib_ring_buffer_channel_is_disabled(chan))
+		return -EIO;
+
+	switch (cmd) {
+	case RING_BUFFER_SNAPSHOT:
+		return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
+					    &buf->prod_snapshot);
+	case RING_BUFFER_SNAPSHOT_GET_CONSUMED:
+		return put_ulong(buf->cons_snapshot, arg);
+	case RING_BUFFER_SNAPSHOT_GET_PRODUCED:
+		return put_ulong(buf->prod_snapshot, arg);
+	case RING_BUFFER_GET_SUBBUF:
+	{
+		unsigned long uconsume;
+		long ret;
+
+		ret = get_user(uconsume, (unsigned long __user *) arg);
+		if (ret)
+			return ret; /* will return -EFAULT */
+		ret = lib_ring_buffer_get_subbuf(buf, uconsume);
+		if (!ret) {
+			/* Set file position to zero at each successful "get" */
+			filp->f_pos = 0;
+		}
+		return ret;
+	}
+	case RING_BUFFER_PUT_SUBBUF:
+		lib_ring_buffer_put_subbuf(buf);
+		return 0;
+
+	case RING_BUFFER_GET_NEXT_SUBBUF:
+	{
+		long ret;
+
+		ret = lib_ring_buffer_get_next_subbuf(buf);
+		if (!ret) {
+			/* Set file position to zero at each successful "get" */
+			filp->f_pos = 0;
+		}
+		return ret;
+	}
+	case RING_BUFFER_PUT_NEXT_SUBBUF:
+		lib_ring_buffer_put_next_subbuf(buf);
+		return 0;
+	case RING_BUFFER_GET_SUBBUF_SIZE:
+		return put_ulong(lib_ring_buffer_get_read_data_size(config, buf),
+				 arg);
+	case RING_BUFFER_GET_PADDED_SUBBUF_SIZE:
+	{
+		unsigned long size;
+
+		size = lib_ring_buffer_get_read_data_size(config, buf);
+		size = PAGE_ALIGN(size);
+		return put_ulong(size, arg);
+	}
+	case RING_BUFFER_GET_MAX_SUBBUF_SIZE:
+		return put_ulong(chan->backend.subbuf_size, arg);
+	case RING_BUFFER_GET_MMAP_LEN:
+	{
+		unsigned long mmap_buf_len;
+
+		if (config->output != RING_BUFFER_MMAP)
+			return -EINVAL;
+		/* Same length computation as lib_ring_buffer_mmap_buf(). */
+		mmap_buf_len = chan->backend.buf_size;
+		if (chan->backend.extra_reader_sb)
+			mmap_buf_len += chan->backend.subbuf_size;
+		if (mmap_buf_len > INT_MAX)
+			return -EFBIG;
+		return put_ulong(mmap_buf_len, arg);
+	}
+	case RING_BUFFER_GET_MMAP_READ_OFFSET:
+	{
+		unsigned long sb_bindex;
+
+		if (config->output != RING_BUFFER_MMAP)
+			return -EINVAL;
+		sb_bindex = subbuffer_id_get_index(config,
+						   buf->backend.buf_rsb.id);
+		return put_ulong(buf->backend.array[sb_bindex]->mmap_offset,
+				 arg);
+	}
+	case RING_BUFFER_FLUSH:
+		lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+		return 0;
+	default:
+		return -ENOIOCTLCMD;
+	}
+}
+
+#ifdef CONFIG_COMPAT
+/**
+ *	lib_ring_buffer_compat_ioctl - ring buffer ioctl for compat tasks
+ *
+ *	@filp: the file
+ *	@cmd: the command
+ *	@arg: command arg (a compat user pointer for commands moving data)
+ *
+ *	Handles the same commands as lib_ring_buffer_ioctl(), but every value
+ *	exchanged with user-space is a compat-sized unsigned long: results are
+ *	written with compat_put_ulong() and values that do not fit in 32 bits
+ *	are refused instead of being truncated.
+ *
+ *	Returns -EIO when the channel is disabled and -ENOIOCTLCMD for an
+ *	unknown command.
+ *
+ *	NOTE(review): the RING_BUFFER_* numbers built with an unsigned long
+ *	argument type encode sizeof(unsigned long); a 32-bit caller presumably
+ *	computes different numbers than the ones matched here - confirm how
+ *	compat callers derive @cmd.
+ */
+long lib_ring_buffer_compat_ioctl(struct file *filp, unsigned int cmd,
+				  unsigned long arg)
+{
+	struct lib_ring_buffer *buf = filp->private_data;
+	struct channel *chan = buf->backend.chan;
+	const struct lib_ring_buffer_config *config = chan->backend.config;
+
+	if (lib_ring_buffer_channel_is_disabled(chan))
+		return -EIO;
+
+	switch (cmd) {
+	case RING_BUFFER_SNAPSHOT:
+		return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
+						&buf->prod_snapshot);
+	case RING_BUFFER_SNAPSHOT_GET_CONSUMED:
+		return compat_put_ulong(buf->cons_snapshot, arg);
+	case RING_BUFFER_SNAPSHOT_GET_PRODUCED:
+		return compat_put_ulong(buf->prod_snapshot, arg);
+	case RING_BUFFER_GET_SUBBUF:
+	{
+		__u32 uconsume;
+		unsigned long consume;
+		long ret;
+
+		ret = get_user(uconsume, (__u32 __user *) compat_ptr(arg));
+		if (ret)
+			return ret; /* will return -EFAULT */
+		/*
+		 * User-space only holds the low 32 bits of the position;
+		 * take the upper bits from the last consumer snapshot.
+		 */
+		consume = buf->cons_snapshot;
+		consume &= ~0xFFFFFFFFL;
+		consume |= uconsume;
+		ret = lib_ring_buffer_get_subbuf(buf, consume);
+		if (!ret) {
+			/* Set file position to zero at each successful "get" */
+			filp->f_pos = 0;
+		}
+		return ret;
+	}
+	case RING_BUFFER_PUT_SUBBUF:
+		lib_ring_buffer_put_subbuf(buf);
+		return 0;
+
+	case RING_BUFFER_GET_NEXT_SUBBUF:
+	{
+		long ret;
+
+		ret = lib_ring_buffer_get_next_subbuf(buf);
+		if (!ret) {
+			/* Set file position to zero at each successful "get" */
+			filp->f_pos = 0;
+		}
+		return ret;
+	}
+	case RING_BUFFER_PUT_NEXT_SUBBUF:
+		lib_ring_buffer_put_next_subbuf(buf);
+		return 0;
+	case RING_BUFFER_GET_SUBBUF_SIZE:
+	{
+		unsigned long data_size;
+
+		data_size = lib_ring_buffer_get_read_data_size(config, buf);
+		if (data_size > UINT_MAX)
+			return -EFBIG;
+		/* compat slot is 32-bit wide: never use put_ulong() here. */
+		return compat_put_ulong(data_size, arg);
+	}
+	case RING_BUFFER_GET_PADDED_SUBBUF_SIZE:
+	{
+		unsigned long size;
+
+		size = lib_ring_buffer_get_read_data_size(config, buf);
+		size = PAGE_ALIGN(size);
+		if (size > UINT_MAX)
+			return -EFBIG;
+		return compat_put_ulong(size, arg);
+	}
+	case RING_BUFFER_GET_MAX_SUBBUF_SIZE:
+		if (chan->backend.subbuf_size > UINT_MAX)
+			return -EFBIG;
+		return compat_put_ulong(chan->backend.subbuf_size, arg);
+	case RING_BUFFER_GET_MMAP_LEN:
+	{
+		unsigned long mmap_buf_len;
+
+		if (config->output != RING_BUFFER_MMAP)
+			return -EINVAL;
+		mmap_buf_len = chan->backend.buf_size;
+		if (chan->backend.extra_reader_sb)
+			mmap_buf_len += chan->backend.subbuf_size;
+		if (mmap_buf_len > UINT_MAX)
+			return -EFBIG;
+		return compat_put_ulong(mmap_buf_len, arg);
+	}
+	case RING_BUFFER_GET_MMAP_READ_OFFSET:
+	{
+		unsigned long sb_bindex, read_offset;
+
+		if (config->output != RING_BUFFER_MMAP)
+			return -EINVAL;
+		sb_bindex = subbuffer_id_get_index(config,
+						   buf->backend.buf_rsb.id);
+		read_offset = buf->backend.array[sb_bindex]->mmap_offset;
+		if (read_offset > UINT_MAX)
+			return -EINVAL;
+		return compat_put_ulong(read_offset, arg);
+	}
+	case RING_BUFFER_FLUSH:
+		lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+		return 0;
+	default:
+		return -ENOIOCTLCMD;
+	}
+}
+#endif
+
+/*
+ * File operations of a ring buffer stream file: data leaves through
+ * splice_read or mmap, reader synchronization goes through the ioctls and
+ * poll, seeking is refused.
+ */
+const struct file_operations lib_ring_buffer_file_operations = {
+	.owner = THIS_MODULE,
+	.llseek = lib_ring_buffer_no_llseek,
+	.poll = lib_ring_buffer_poll,
+	.unlocked_ioctl = lib_ring_buffer_ioctl,
+#ifdef CONFIG_COMPAT
+	.compat_ioctl = lib_ring_buffer_compat_ioctl,
+#endif
+	.mmap = lib_ring_buffer_mmap,
+	.open = lib_ring_buffer_open,
+	.release = lib_ring_buffer_release,
+	.splice_read = lib_ring_buffer_splice_read,
+};
+EXPORT_SYMBOL_GPL(lib_ring_buffer_file_operations);
+
+MODULE_LICENSE("GPL and additional rights");
+MODULE_AUTHOR("Mathieu Desnoyers");
+MODULE_DESCRIPTION("Ring Buffer Library VFS");
diff --git a/drivers/staging/lttng/lib/ringbuffer/vatomic.h b/drivers/staging/lttng/lib/ringbuffer/vatomic.h
new file mode 100644
index 0000000..b944dd6
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/vatomic.h
@@ -0,0 +1,85 @@
+#ifndef _LINUX_RING_BUFFER_VATOMIC_H
+#define _LINUX_RING_BUFFER_VATOMIC_H
+
+/*
+ * linux/ringbuffer/vatomic.h
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <asm/atomic.h>
+#include <asm/local.h>
+
+/*
+ * Same data type (long) accessed differently depending on configuration.
+ * v field is for non-atomic access (protected by mutual exclusion).
+ * In the fast-path, the ring_buffer_config structure is constant, so the
+ * compiler can statically select the appropriate branch.
+ * local_t is used for per-cpu and per-thread buffers.
+ * atomic_long_t is used for globally shared buffers.
+ *
+ * The v_*() accessors pick the l member when config->sync is
+ * RING_BUFFER_SYNC_PER_CPU and the a member otherwise.
+ */
+union v_atomic {
+	local_t l;		/* RING_BUFFER_SYNC_PER_CPU access */
+	atomic_long_t a;	/* any other sync mode */
+	long v;			/* plain access, see _v_dec() */
+};
+
+/* Read the counter with the primitive matching config->sync. */
+static inline
+long v_read(const struct lib_ring_buffer_config *config, union v_atomic *v_a)
+{
+	if (config->sync != RING_BUFFER_SYNC_PER_CPU)
+		return atomic_long_read(&v_a->a);
+	return local_read(&v_a->l);
+}
+
+/* Set the counter to @v with the primitive matching config->sync. */
+static inline
+void v_set(const struct lib_ring_buffer_config *config, union v_atomic *v_a,
+	   long v)
+{
+	if (config->sync != RING_BUFFER_SYNC_PER_CPU) {
+		atomic_long_set(&v_a->a, v);
+		return;
+	}
+	local_set(&v_a->l, v);
+}
+
+/* Add @v to the counter with the primitive matching config->sync. */
+static inline
+void v_add(const struct lib_ring_buffer_config *config, long v, union v_atomic *v_a)
+{
+	if (config->sync != RING_BUFFER_SYNC_PER_CPU) {
+		atomic_long_add(v, &v_a->a);
+		return;
+	}
+	local_add(v, &v_a->l);
+}
+
+/* Increment the counter with the primitive matching config->sync. */
+static inline
+void v_inc(const struct lib_ring_buffer_config *config, union v_atomic *v_a)
+{
+	if (config->sync != RING_BUFFER_SYNC_PER_CPU) {
+		atomic_long_inc(&v_a->a);
+		return;
+	}
+	local_inc(&v_a->l);
+}
+
+/*
+ * Plain (non-atomic) decrement through the v member; @config is not
+ * consulted. Only used by the reader, on the sub-buffer it owns.
+ */
+static inline
+void _v_dec(const struct lib_ring_buffer_config *config, union v_atomic *v_a)
+{
+	v_a->v--;
+}
+
+/*
+ * Compare-and-exchange on the counter with the primitive matching
+ * config->sync; returns what the underlying cmpxchg returns.
+ */
+static inline
+long v_cmpxchg(const struct lib_ring_buffer_config *config, union v_atomic *v_a,
+	       long old, long _new)
+{
+	if (config->sync != RING_BUFFER_SYNC_PER_CPU)
+		return atomic_long_cmpxchg(&v_a->a, old, _new);
+	return local_cmpxchg(&v_a->l, old, _new);
+}
+
+#endif /* _LINUX_RING_BUFFER_VATOMIC_H */
diff --git a/drivers/staging/lttng/lib/ringbuffer/vfs.h b/drivers/staging/lttng/lib/ringbuffer/vfs.h
new file mode 100644
index 0000000..d073e4c
--- /dev/null
+++ b/drivers/staging/lttng/lib/ringbuffer/vfs.h
@@ -0,0 +1,89 @@
+#ifndef _LINUX_RING_BUFFER_VFS_H
+#define _LINUX_RING_BUFFER_VFS_H
+
+/*
+ * linux/ringbuffer/vfs.h
+ *
+ * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Wait-free ring buffer VFS file operations.
+ *
+ * Author:
+ *	Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/fs.h>
+#include <linux/poll.h>
+
+/* VFS API */
+
+extern const struct file_operations lib_ring_buffer_file_operations;
+
+/*
+ * Internal file operations.
+ */
+
+int lib_ring_buffer_open(struct inode *inode, struct file *file);
+int lib_ring_buffer_release(struct inode *inode, struct file *file);
+unsigned int lib_ring_buffer_poll(struct file *filp, poll_table *wait);
+ssize_t lib_ring_buffer_splice_read(struct file *in, loff_t *ppos,
+				    struct pipe_inode_info *pipe, size_t len,
+				    unsigned int flags);
+int lib_ring_buffer_mmap(struct file *filp, struct vm_area_struct *vma);
+
+/* Ring Buffer ioctl() and ioctl numbers */
+long lib_ring_buffer_ioctl(struct file *filp, unsigned int cmd, unsigned long arg);
+#ifdef CONFIG_COMPAT
+long lib_ring_buffer_compat_ioctl(struct file *filp, unsigned int cmd,
+				  unsigned long arg);
+#endif
+
+/*
+ * Use RING_BUFFER_GET_NEXT_SUBBUF / RING_BUFFER_PUT_NEXT_SUBBUF to read and
+ * consume sub-buffers sequentially.
+ *
+ * Reading sub-buffers without consuming them can be performed with:
+ *
+ * RING_BUFFER_SNAPSHOT
+ * RING_BUFFER_SNAPSHOT_GET_CONSUMED
+ * RING_BUFFER_SNAPSHOT_GET_PRODUCED
+ *
+ * to get the offset range to consume, and then by passing each sub-buffer
+ * offset to RING_BUFFER_GET_SUBBUF, read the sub-buffer, and then release it
+ * with RING_BUFFER_PUT_SUBBUF.
+ *
+ * Note that the "snapshot" API can be used to read the sub-buffer in reverse
+ * order, which is useful for flight recorder snapshots.
+ */
+
+/*
+ * All commands use ioctl type 0xF6 with sequence numbers 0x00-0x0C.
+ *
+ * NOTE(review): _IOR()/_IOW() encode the size of the argument type in the
+ * command number, so commands declared with "unsigned long" presumably get
+ * different numbers for 32-bit and 64-bit user-space - confirm before
+ * relying on them from compat tasks.
+ */
+
+/* Get a snapshot of the current ring buffer producer and consumer positions */
+#define RING_BUFFER_SNAPSHOT			_IO(0xF6, 0x00)
+/* Get the consumer position (iteration start) */
+#define RING_BUFFER_SNAPSHOT_GET_CONSUMED	_IOR(0xF6, 0x01, unsigned long)
+/* Get the producer position (iteration end) */
+#define RING_BUFFER_SNAPSHOT_GET_PRODUCED	_IOR(0xF6, 0x02, unsigned long)
+/* Get exclusive read access to the specified sub-buffer position */
+#define RING_BUFFER_GET_SUBBUF			_IOW(0xF6, 0x03, unsigned long)
+/* Release exclusive sub-buffer access */
+#define RING_BUFFER_PUT_SUBBUF			_IO(0xF6, 0x04)
+
+/* Get exclusive read access to the next sub-buffer that can be read. */
+#define RING_BUFFER_GET_NEXT_SUBBUF		_IO(0xF6, 0x05)
+/* Release exclusive sub-buffer access, move consumer forward. */
+#define RING_BUFFER_PUT_NEXT_SUBBUF		_IO(0xF6, 0x06)
+/* returns the size of the current sub-buffer, without padding (for mmap). */
+#define RING_BUFFER_GET_SUBBUF_SIZE		_IOR(0xF6, 0x07, unsigned long)
+/* returns the size of the current sub-buffer, with padding (for splice). */
+#define RING_BUFFER_GET_PADDED_SUBBUF_SIZE	_IOR(0xF6, 0x08, unsigned long)
+/* returns the maximum size for sub-buffers. */
+#define RING_BUFFER_GET_MAX_SUBBUF_SIZE		_IOR(0xF6, 0x09, unsigned long)
+/* returns the length to mmap. */
+#define RING_BUFFER_GET_MMAP_LEN		_IOR(0xF6, 0x0A, unsigned long)
+/* returns the offset of the subbuffer belonging to the mmap reader. */
+#define RING_BUFFER_GET_MMAP_READ_OFFSET	_IOR(0xF6, 0x0B, unsigned long)
+/* flush the current sub-buffer */
+#define RING_BUFFER_FLUSH			_IO(0xF6, 0x0C)
+
+#endif /* _LINUX_RING_BUFFER_VFS_H */
-- 
1.7.2.5




More information about the lttng-dev mailing list