[ltt-dev] [LTTNG-TOOLS PATCH 2/3] Register the consuming function

Julien Desfossez julien.desfossez at polymtl.ca
Fri Aug 5 08:45:52 EDT 2011


The init function of the library now takes a function as argument to
allow a consumer using the library to control the function to be called
when data is ready in a buffer.
The kconsumerd_on_read_subbuffer_mmap and
kconsumerd_on_read_subbuffer_splice are now exported to allow a consumer
to use them directly if needed.

Signed-off-by: Julien Desfossez <julien.desfossez at polymtl.ca>
---
 liblttkconsumerd/liblttkconsumerd.c |   99 ++++-------------------------------
 liblttkconsumerd/liblttkconsumerd.h |   26 ++++++++-
 ltt-kconsumerd/ltt-kconsumerd.c     |   82 ++++++++++++++++++++++++++++-
 3 files changed, 116 insertions(+), 91 deletions(-)

diff --git a/liblttkconsumerd/liblttkconsumerd.c b/liblttkconsumerd/liblttkconsumerd.c
index f60888a..9d8cb00 100644
--- a/liblttkconsumerd/liblttkconsumerd.c
+++ b/liblttkconsumerd/liblttkconsumerd.c
@@ -34,6 +34,8 @@
 #include "liblttkconsumerd.h"
 #include "lttngerr.h"
 
+static int (*on_buffer_ready)(struct kconsumerd_fd *kconsumerd_fd);
+
 static
 struct kconsumerd_global_data {
 	/*
@@ -265,8 +267,8 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd,
  * mmap the ring buffer, read it and write the data to the tracefile.
  * Returns the number of bytes written
  */
-static int kconsumerd_on_read_subbuffer_mmap(
-		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
+int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_fd *kconsumerd_fd,
+		unsigned long len)
 {
 	unsigned long mmap_len, mmap_offset, padded_len, padding_len;
 	char *mmap_base;
@@ -378,8 +380,8 @@ end:
  * Splice the data from the ring buffer to the tracefile.
  * Returns the number of bytes spliced
  */
-static int kconsumerd_on_read_subbuffer(
-		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
+int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_fd *kconsumerd_fd,
+		unsigned long len)
 {
 	long ret = 0;
 	loff_t offset = 0;
@@ -469,87 +471,6 @@ end:
 }
 
 /*
- * kconsumerd_read_subbuffer
- *
- * Consume data on a file descriptor and write it on a trace file
- */
-static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
-{
-	unsigned long len;
-	int err;
-	long ret = 0;
-	int infd = kconsumerd_fd->consumerd_fd;
-
-	DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
-	/* Get the next subbuffer */
-	err = kernctl_get_next_subbuf(infd);
-	if (err != 0) {
-		ret = errno;
-		perror("Reserving sub buffer failed (everything is normal, "
-				"it is due to concurrency)");
-		goto end;
-	}
-
-	switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
-	case LTTNG_EVENT_SPLICE:
-		/* read the whole subbuffer */
-		err = kernctl_get_padded_subbuf_size(infd, &len);
-		if (err != 0) {
-			ret = errno;
-			perror("Getting sub-buffer len failed.");
-			goto end;
-		}
-
-		/* splice the subbuffer to the tracefile */
-		ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
-		if (ret < 0) {
-			/*
-			 * display the error but continue processing to try
-			 * to release the subbuffer
-			 */
-			ERR("Error splicing to tracefile");
-		}
-		break;
-	case LTTNG_EVENT_MMAP:
-		/* read the used subbuffer size */
-		err = kernctl_get_subbuf_size(infd, &len);
-		if (err != 0) {
-			ret = errno;
-			perror("Getting sub-buffer len failed.");
-			goto end;
-		}
-		/* write the subbuffer to the tracefile */
-		ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
-		if (ret < 0) {
-			/*
-			 * display the error but continue processing to try
-			 * to release the subbuffer
-			 */
-			ERR("Error writing to tracefile");
-		}
-		break;
-	default:
-		ERR("Unknown output method");
-		ret = -1;
-	}
-
-	err = kernctl_put_next_subbuf(infd);
-	if (err != 0) {
-		ret = errno;
-		if (errno == EFAULT) {
-			perror("Error in unreserving sub buffer\n");
-		} else if (errno == EIO) {
-			/* Should never happen with newer LTTng versions */
-			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
-		}
-		goto end;
-	}
-
-end:
-	return ret;
-}
-
-/*
  * kconsumerd_poll_socket
  *
  * Poll on the should_quit pipe and the command socket
@@ -795,7 +716,7 @@ void *kconsumerd_thread_poll_fds(void *data)
 			case POLLPRI:
 				DBG("Urgent read on fd %d", pollfd[i].fd);
 				high_prio = 1;
-				ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
+				ret = on_buffer_ready(local_kconsumerd_fd[i]);
 				/* it's ok to have an unavailable sub-buffer */
 				if (ret == EAGAIN) {
 					ret = 0;
@@ -818,7 +739,7 @@ void *kconsumerd_thread_poll_fds(void *data)
 			for (i = 0; i < nb_fd; i++) {
 				if (pollfd[i].revents == POLLIN) {
 					DBG("Normal read on fd %d", pollfd[i].fd);
-					ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
+					ret = on_buffer_ready(local_kconsumerd_fd[i]);
 					/* it's ok to have an unavailable subbuffer */
 					if (ret == EAGAIN) {
 						ret = 0;
@@ -848,10 +769,12 @@ end:
  * - create the poll_pipe
  * - create the should_quit pipe (for signal handler)
  */
-int kconsumerd_init(void)
+int kconsumerd_init(int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd))
 {
 	int ret;
 
+	on_buffer_ready = buffer_ready;
+
 	/* need to update the polling array at init time */
 	kconsumerd_data.need_update = 1;
 
diff --git a/liblttkconsumerd/liblttkconsumerd.h b/liblttkconsumerd/liblttkconsumerd.h
index 9e0b9ff..f98621f 100644
--- a/liblttkconsumerd/liblttkconsumerd.h
+++ b/liblttkconsumerd/liblttkconsumerd.h
@@ -58,13 +58,35 @@ struct kconsumerd_fd {
 };
 
 /*
- * kconsumerd_init(void)
+ * kconsumerd_init
  * initialise the necessary environnement :
  * - inform the polling thread to update the polling array
  * - create the poll_pipe
  * - create the should_quit pipe (for signal handler)
+ *
+ * Takes a function pointer as argument, this function is called when data is
+ * available on a buffer. This function is responsible to do the
+ * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
+ * buffer configuration and then kernctl_put_next_subbuf at the end.
+ */
+int kconsumerd_init(int (*kbuffer_ready)(struct kconsumerd_fd *kconsumerd_fd));
+
+/*
+ * kconsumerd_on_read_subbuffer_mmap
+ * mmap the ring buffer, read it and write the data to the tracefile.
+ * Returns the number of bytes written
+ */
+int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_fd *kconsumerd_fd,
+		        unsigned long len);
+
+/*
+ * kconsumerd_on_read_subbuffer
+ *
+ * Splice the data from the ring buffer to the tracefile.
+ * Returns the number of bytes spliced
  */
-int kconsumerd_init(void);
+int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_fd *kconsumerd_fd,
+		        unsigned long len);
 
 /*
  * kconsumerd_send_error
diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c
index 4180f89..61ed005 100644
--- a/ltt-kconsumerd/ltt-kconsumerd.c
+++ b/ltt-kconsumerd/ltt-kconsumerd.c
@@ -190,6 +190,86 @@ static void parse_args(int argc, char **argv)
 	}
 }
 
+/*
+ * read_subbuffer
+ *
+ * Consume data on a file descriptor and write it on a trace file
+ */
+static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
+{
+	unsigned long len;
+	int err;
+	long ret = 0;
+	int infd = kconsumerd_fd->consumerd_fd;
+
+	DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
+	/* Get the next subbuffer */
+	err = kernctl_get_next_subbuf(infd);
+	if (err != 0) {
+		ret = errno;
+		perror("Reserving sub buffer failed (everything is normal, "
+				"it is due to concurrency)");
+		goto end;
+	}
+
+	switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
+		case LTTNG_EVENT_SPLICE:
+			/* read the whole subbuffer */
+			err = kernctl_get_padded_subbuf_size(infd, &len);
+			if (err != 0) {
+				ret = errno;
+				perror("Getting sub-buffer len failed.");
+				goto end;
+			}
+
+			/* splice the subbuffer to the tracefile */
+			ret = kconsumerd_on_read_subbuffer_splice(kconsumerd_fd, len);
+			if (ret < 0) {
+				/*
+				 * display the error but continue processing to try
+				 * to release the subbuffer
+				 */
+				ERR("Error splicing to tracefile");
+			}
+			break;
+		case LTTNG_EVENT_MMAP:
+			/* read the used subbuffer size */
+			err = kernctl_get_subbuf_size(infd, &len);
+			if (err != 0) {
+				ret = errno;
+				perror("Getting sub-buffer len failed.");
+				goto end;
+			}
+			/* write the subbuffer to the tracefile */
+			ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
+			if (ret < 0) {
+				/*
+				 * display the error but continue processing to try
+				 * to release the subbuffer
+				 */
+				ERR("Error writing to tracefile");
+			}
+			break;
+		default:
+			ERR("Unknown output method");
+			ret = -1;
+	}
+
+	err = kernctl_put_next_subbuf(infd);
+	if (err != 0) {
+		ret = errno;
+		if (errno == EFAULT) {
+			perror("Error in unreserving sub buffer\n");
+		} else if (errno == EIO) {
+			/* Should never happen with newer LTTng versions */
+			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
+		}
+		goto end;
+	}
+
+end:
+	return ret;
+}
 
 /*
  * main
@@ -228,7 +308,7 @@ int main(int argc, char **argv)
 	}
 
 	/* create the pipe to wake to receiving thread when needed */
-	ret = kconsumerd_init();
+	ret = kconsumerd_init(read_subbuffer);
 	if (ret < 0) {
 		goto end;
 	}
-- 
1.7.4.1





More information about the lttng-dev mailing list