[ltt-dev] [LTTNG-TOOLS PATCH 2/3] Register the consuming function
Julien Desfossez
julien.desfossez at polymtl.ca
Fri Aug 5 08:45:52 EDT 2011
The init function of the library now takes a function as argument to
allow a consumer using the library to control the function to be called
when data is ready in a buffer.
The kconsumerd_on_read_subbuffer_mmap and
kconsumerd_on_read_subbuffer_splice are now exported to allow a consumer
to use them directly if needed.
Signed-off-by: Julien Desfossez <julien.desfossez at polymtl.ca>
---
liblttkconsumerd/liblttkconsumerd.c | 99 ++++-------------------------------
liblttkconsumerd/liblttkconsumerd.h | 26 ++++++++-
ltt-kconsumerd/ltt-kconsumerd.c | 82 ++++++++++++++++++++++++++++-
3 files changed, 116 insertions(+), 91 deletions(-)
diff --git a/liblttkconsumerd/liblttkconsumerd.c b/liblttkconsumerd/liblttkconsumerd.c
index f60888a..9d8cb00 100644
--- a/liblttkconsumerd/liblttkconsumerd.c
+++ b/liblttkconsumerd/liblttkconsumerd.c
@@ -34,6 +34,8 @@
#include "liblttkconsumerd.h"
#include "lttngerr.h"
+static int (*on_buffer_ready)(struct kconsumerd_fd *kconsumerd_fd);
+
static
struct kconsumerd_global_data {
/*
@@ -265,8 +267,8 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd,
* mmap the ring buffer, read it and write the data to the tracefile.
* Returns the number of bytes written
*/
-static int kconsumerd_on_read_subbuffer_mmap(
- struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
+int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_fd *kconsumerd_fd,
+ unsigned long len)
{
unsigned long mmap_len, mmap_offset, padded_len, padding_len;
char *mmap_base;
@@ -378,8 +380,8 @@ end:
* Splice the data from the ring buffer to the tracefile.
* Returns the number of bytes spliced
*/
-static int kconsumerd_on_read_subbuffer(
- struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
+int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_fd *kconsumerd_fd,
+ unsigned long len)
{
long ret = 0;
loff_t offset = 0;
@@ -469,87 +471,6 @@ end:
}
/*
- * kconsumerd_read_subbuffer
- *
- * Consume data on a file descriptor and write it on a trace file
- */
-static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
-{
- unsigned long len;
- int err;
- long ret = 0;
- int infd = kconsumerd_fd->consumerd_fd;
-
- DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
- /* Get the next subbuffer */
- err = kernctl_get_next_subbuf(infd);
- if (err != 0) {
- ret = errno;
- perror("Reserving sub buffer failed (everything is normal, "
- "it is due to concurrency)");
- goto end;
- }
-
- switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
- case LTTNG_EVENT_SPLICE:
- /* read the whole subbuffer */
- err = kernctl_get_padded_subbuf_size(infd, &len);
- if (err != 0) {
- ret = errno;
- perror("Getting sub-buffer len failed.");
- goto end;
- }
-
- /* splice the subbuffer to the tracefile */
- ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
- if (ret < 0) {
- /*
- * display the error but continue processing to try
- * to release the subbuffer
- */
- ERR("Error splicing to tracefile");
- }
- break;
- case LTTNG_EVENT_MMAP:
- /* read the used subbuffer size */
- err = kernctl_get_subbuf_size(infd, &len);
- if (err != 0) {
- ret = errno;
- perror("Getting sub-buffer len failed.");
- goto end;
- }
- /* write the subbuffer to the tracefile */
- ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
- if (ret < 0) {
- /*
- * display the error but continue processing to try
- * to release the subbuffer
- */
- ERR("Error writing to tracefile");
- }
- break;
- default:
- ERR("Unknown output method");
- ret = -1;
- }
-
- err = kernctl_put_next_subbuf(infd);
- if (err != 0) {
- ret = errno;
- if (errno == EFAULT) {
- perror("Error in unreserving sub buffer\n");
- } else if (errno == EIO) {
- /* Should never happen with newer LTTng versions */
- perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
- }
- goto end;
- }
-
-end:
- return ret;
-}
-
-/*
* kconsumerd_poll_socket
*
* Poll on the should_quit pipe and the command socket
@@ -795,7 +716,7 @@ void *kconsumerd_thread_poll_fds(void *data)
case POLLPRI:
DBG("Urgent read on fd %d", pollfd[i].fd);
high_prio = 1;
- ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
+ ret = on_buffer_ready(local_kconsumerd_fd[i]);
/* it's ok to have an unavailable sub-buffer */
if (ret == EAGAIN) {
ret = 0;
@@ -818,7 +739,7 @@ void *kconsumerd_thread_poll_fds(void *data)
for (i = 0; i < nb_fd; i++) {
if (pollfd[i].revents == POLLIN) {
DBG("Normal read on fd %d", pollfd[i].fd);
- ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
+ ret = on_buffer_ready(local_kconsumerd_fd[i]);
/* it's ok to have an unavailable subbuffer */
if (ret == EAGAIN) {
ret = 0;
@@ -848,10 +769,12 @@ end:
* - create the poll_pipe
* - create the should_quit pipe (for signal handler)
*/
-int kconsumerd_init(void)
+int kconsumerd_init(int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd))
{
int ret;
+ on_buffer_ready = buffer_ready;
+
/* need to update the polling array at init time */
kconsumerd_data.need_update = 1;
diff --git a/liblttkconsumerd/liblttkconsumerd.h b/liblttkconsumerd/liblttkconsumerd.h
index 9e0b9ff..f98621f 100644
--- a/liblttkconsumerd/liblttkconsumerd.h
+++ b/liblttkconsumerd/liblttkconsumerd.h
@@ -58,13 +58,35 @@ struct kconsumerd_fd {
};
/*
- * kconsumerd_init(void)
+ * kconsumerd_init
* initialise the necessary environnement :
* - inform the polling thread to update the polling array
* - create the poll_pipe
* - create the should_quit pipe (for signal handler)
+ *
+ * Takes a function pointer as argument, this function is called when data is
+ * available on a buffer. This function is responsible to do the
+ * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
+ * buffer configuration and then kernctl_put_next_subbuf at the end.
+ */
+int kconsumerd_init(int (*kbuffer_ready)(struct kconsumerd_fd *kconsumerd_fd));
+
+/*
+ * kconsumerd_on_read_subbuffer_mmap
+ * mmap the ring buffer, read it and write the data to the tracefile.
+ * Returns the number of bytes written
+ */
+int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_fd *kconsumerd_fd,
+ unsigned long len);
+
+/*
+ * kconsumerd_on_read_subbuffer
+ *
+ * Splice the data from the ring buffer to the tracefile.
+ * Returns the number of bytes spliced
*/
-int kconsumerd_init(void);
+int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_fd *kconsumerd_fd,
+ unsigned long len);
/*
* kconsumerd_send_error
diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c
index 4180f89..61ed005 100644
--- a/ltt-kconsumerd/ltt-kconsumerd.c
+++ b/ltt-kconsumerd/ltt-kconsumerd.c
@@ -190,6 +190,86 @@ static void parse_args(int argc, char **argv)
}
}
+/*
+ * read_subbuffer
+ *
+ * Consume data on a file descriptor and write it on a trace file
+ */
+static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
+{
+ unsigned long len;
+ int err;
+ long ret = 0;
+ int infd = kconsumerd_fd->consumerd_fd;
+
+ DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
+ /* Get the next subbuffer */
+ err = kernctl_get_next_subbuf(infd);
+ if (err != 0) {
+ ret = errno;
+ perror("Reserving sub buffer failed (everything is normal, "
+ "it is due to concurrency)");
+ goto end;
+ }
+
+ switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
+ case LTTNG_EVENT_SPLICE:
+ /* read the whole subbuffer */
+ err = kernctl_get_padded_subbuf_size(infd, &len);
+ if (err != 0) {
+ ret = errno;
+ perror("Getting sub-buffer len failed.");
+ goto end;
+ }
+
+ /* splice the subbuffer to the tracefile */
+ ret = kconsumerd_on_read_subbuffer_splice(kconsumerd_fd, len);
+ if (ret < 0) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error splicing to tracefile");
+ }
+ break;
+ case LTTNG_EVENT_MMAP:
+ /* read the used subbuffer size */
+ err = kernctl_get_subbuf_size(infd, &len);
+ if (err != 0) {
+ ret = errno;
+ perror("Getting sub-buffer len failed.");
+ goto end;
+ }
+ /* write the subbuffer to the tracefile */
+ ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
+ if (ret < 0) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error writing to tracefile");
+ }
+ break;
+ default:
+ ERR("Unknown output method");
+ ret = -1;
+ }
+
+ err = kernctl_put_next_subbuf(infd);
+ if (err != 0) {
+ ret = errno;
+ if (errno == EFAULT) {
+ perror("Error in unreserving sub buffer\n");
+ } else if (errno == EIO) {
+ /* Should never happen with newer LTTng versions */
+ perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ goto end;
+ }
+
+end:
+ return ret;
+}
/*
* main
@@ -228,7 +308,7 @@ int main(int argc, char **argv)
}
/* create the pipe to wake to receiving thread when needed */
- ret = kconsumerd_init();
+ ret = kconsumerd_init(read_subbuffer);
if (ret < 0) {
goto end;
}
--
1.7.4.1
More information about the lttng-dev
mailing list