[ltt-dev] [LTTNG-TOOLS PATCH 2/2] Callbacks on receive and update FD

Julien Desfossez julien.desfossez at polymtl.ca
Tue Aug 16 13:35:33 EDT 2011


The user of the lib can now take control over a new FD or the update
operation of an existing FD.
Opening the output tracefile is now the responsibility of the user
and not the library itself.

Signed-off-by: Julien Desfossez <julien.desfossez at polymtl.ca>
---
 include/lttng/lttng-kconsumerd.h     |   25 +++++++--
 liblttngkconsumerd/lttngkconsumerd.c |  102 ++++++++++++++++++++--------------
 ltt-kconsumerd/ltt-kconsumerd.c      |   45 ++++++++++++++-
 3 files changed, 124 insertions(+), 48 deletions(-)

diff --git a/include/lttng/lttng-kconsumerd.h b/include/lttng/lttng-kconsumerd.h
index 7e195ab..a24a509 100644
--- a/include/lttng/lttng-kconsumerd.h
+++ b/include/lttng/lttng-kconsumerd.h
@@ -79,6 +79,21 @@ struct lttng_kconsumerd_fd {
 struct lttng_kconsumerd_local_data {
 	/* function to call when data is available on a buffer */
 	int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd);
+	/*
+	 * function to call when we receive a new fd, it receives a newly allocated
+	 * kconsumerd_fd, if it returns the FD (as seen by the sessiond daemon :
+	 * sessiond_fd), the FD will be handled by the lib in the local FD list,
+	 * otherwise we assume the external consumer is taking care of it.
+	 */
+	int (*on_recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd);
+	/*
+	 * function to call when a FD is getting updated by the session daemon,
+	 * this function receives the FD as seen by the session daemon
+	 * (sessiond_fd) and the new state, if it returns the fd, it will be
+	 * handled locally by the lib, otherwise we assume the consumer is taking
+	 * care of it.
+	 */
+	int (*on_update_fd)(int sessiond_fd, uint32_t state);
 	/* socket to communicate errors with sessiond */
 	int kconsumerd_error_socket;
 	/* socket to exchange commands with sessiond */
@@ -98,15 +113,15 @@ struct lttng_kconsumerd_local_data {
  * - create the should_quit pipe (for signal handler)
  * - create the thread pipe (for splice)
  *
- * Takes a function pointer as argument, this function is called when data is
- * available on a buffer. This function is responsible to do the
- * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
- * buffer configuration and then kernctl_put_next_subbuf at the end.
+ * Takes the function pointers to the on_buffer_ready, on_recv_fd, and
+ * on_update_fd callbacks.
  *
  * Returns a pointer to the new context or NULL on error.
  */
 extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
-		int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd));
+		int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+		int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+		int (*update_fd)(int sessiond_fd, uint32_t state));
 
 /*
  * Close all fds associated with the instance and free the context.
diff --git a/liblttngkconsumerd/lttngkconsumerd.c b/liblttngkconsumerd/lttngkconsumerd.c
index d36da9b..ed7951b 100644
--- a/liblttngkconsumerd/lttngkconsumerd.c
+++ b/liblttngkconsumerd/lttngkconsumerd.c
@@ -125,22 +125,21 @@ static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf)
 }
 
 /*
- * Add a fd to the global list protected by a mutex.
+ * Allocate a struct lttng_kconsumerd_fd and fill it from the
+ * struct lttcomm_kconsumerd_msg received on the receiving socket
  */
-static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
+struct lttng_kconsumerd_fd *kconsumerd_allocate_fd(
+		struct lttcomm_kconsumerd_msg *buf,
 		int consumerd_fd)
 {
 	struct lttng_kconsumerd_fd *tmp_fd;
-	int ret = 0;
 
-	pthread_mutex_lock(&kconsumerd_data.lock);
-	/* Check if already exist */
-	ret = kconsumerd_find_session_fd(buf->fd);
-	if (ret == 1) {
+	tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
+	if (tmp_fd == NULL) {
+		perror("malloc struct lttng_kconsumerd_fd");
 		goto end;
 	}
 
-	tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
 	tmp_fd->sessiond_fd = buf->fd;
 	tmp_fd->consumerd_fd = consumerd_fd;
 	tmp_fd->state = buf->state;
@@ -152,42 +151,31 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
 	tmp_fd->output = buf->output;
 	strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
 	tmp_fd->path_name[PATH_MAX - 1] = '\0';
+	DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)",
+			tmp_fd->path_name, tmp_fd->sessiond_fd,
+			tmp_fd->consumerd_fd, tmp_fd->out_fd);
 
-	/* Opening the tracefile in write mode */
-	if (tmp_fd->path_name != NULL) {
-		ret = open(tmp_fd->path_name,
-				O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
-		if (ret < 0) {
-			ERR("Opening %s", tmp_fd->path_name);
-			perror("open");
-			goto end;
-		}
-		tmp_fd->out_fd = ret;
-		DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
-				tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
-	}
+end:
+	return tmp_fd;
+}
 
-	if (tmp_fd->output == LTTNG_EVENT_MMAP) {
-		/* get the len of the mmap region */
-		ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len);
-		if (ret != 0) {
-			ret = errno;
-			perror("kernctl_get_mmap_len");
-			goto end;
-		}
+/*
+ * Add a fd to the global list protected by a mutex.
+ */
+static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd)
+{
+	int ret;
 
-		tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len,
-				PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0);
-		if (tmp_fd->mmap_base == MAP_FAILED) {
-			perror("Error mmaping");
-			ret = -1;
-			goto end;
-		}
+	pthread_mutex_lock(&kconsumerd_data.lock);
+	/* Check if already exist */
+	ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd);
+	if (ret == 1) {
+		goto end;
 	}
-
 	cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
 	kconsumerd_data.fds_count++;
 	kconsumerd_data.need_update = 1;
+
 end:
 	pthread_mutex_unlock(&kconsumerd_data.lock);
 	return ret;
@@ -263,6 +251,7 @@ static int kconsumerd_consumerd_recv_fd(
 	int nb_fd;
 	char recv_fd[CMSG_SPACE(sizeof(int))];
 	struct lttcomm_kconsumerd_msg lkm;
+	struct lttng_kconsumerd_fd *new_fd;
 
 	/* the number of fds we are about to receive */
 	nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
@@ -313,14 +302,40 @@ static int kconsumerd_consumerd_recv_fd(
 					DBG("kconsumerd_add_fd %s (%d)", lkm.path_name,
 							((int *) CMSG_DATA(cmsg))[0]);
 
-					ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
-					if (ret < 0) {
+					new_fd = kconsumerd_allocate_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
+					if (new_fd == NULL) {
 						lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
 						goto end;
 					}
+
+					if (ctx->on_recv_fd != NULL) {
+						ret = ctx->on_recv_fd(new_fd);
+						/*
+						 * if we receive the FD back, we insert it in the local
+						 * FD list, otherwise we assume it is handled by the
+						 * external consumer.
+						 */
+						if (ret == new_fd->sessiond_fd) {
+							kconsumerd_add_fd(new_fd);
+						}
+					} else {
+						kconsumerd_add_fd(new_fd);
+					}
 					break;
 				case UPDATE_STREAM:
-					kconsumerd_change_fd_state(lkm.fd, lkm.state);
+					if (ctx->on_update_fd != NULL) {
+						ret = ctx->on_update_fd(lkm.fd, lkm.state);
+						/*
+						 * if we receive the FD back, we have to handle it locally,
+						 * otherwise we assume the external consumer is taking care
+						 * of it.
+						 */
+						if (ret == lkm.fd) {
+							kconsumerd_change_fd_state(lkm.fd, lkm.state);
+						}
+					} else {
+							kconsumerd_change_fd_state(lkm.fd, lkm.state);
+					}
 					break;
 				default:
 					break;
@@ -756,7 +771,9 @@ end:
  * Returns a pointer to the new context or NULL on error.
  */
 struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
-		int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd))
+		int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+		int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+		int (*update_fd)(int sessiond_fd, uint32_t state))
 {
 	int ret;
 	struct lttng_kconsumerd_local_data *ctx;
@@ -767,7 +784,10 @@ struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
 		goto end;
 	}
 
+	/* assign the callbacks */
 	ctx->on_buffer_ready = buffer_ready;
+	ctx->on_recv_fd = recv_fd;
+	ctx->on_update_fd = update_fd;
 
 	ret = pipe(ctx->kconsumerd_poll_pipe);
 	if (ret < 0) {
diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c
index cd4b00e..f2bb211 100644
--- a/ltt-kconsumerd/ltt-kconsumerd.c
+++ b/ltt-kconsumerd/ltt-kconsumerd.c
@@ -271,6 +271,47 @@ end:
 	return ret;
 }
 
+static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd)
+{
+	int ret;
+
+	/* Opening the tracefile in write mode */
+	if (kconsumerd_fd->path_name != NULL) {
+		ret = open(kconsumerd_fd->path_name,
+				O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+		if (ret < 0) {
+			ERR("Opening %s", kconsumerd_fd->path_name);
+			perror("open");
+			goto error;
+		}
+		kconsumerd_fd->out_fd = ret;
+	}
+
+	if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) {
+		/* get the len of the mmap region */
+		ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, &kconsumerd_fd->mmap_len);
+		if (ret != 0) {
+			ret = errno;
+			perror("kernctl_get_mmap_len");
+			goto error;
+		}
+
+		kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len,
+				PROT_READ, MAP_PRIVATE, kconsumerd_fd->consumerd_fd, 0);
+		if (kconsumerd_fd->mmap_base == MAP_FAILED) {
+			perror("Error mmaping");
+			ret = -1;
+			goto error;
+		}
+	}
+
+	/* we return the FD back to the lib to let it handle the FD internally */
+	return kconsumerd_fd->sessiond_fd;
+
+error:
+	return ret;
+}
+
 /*
  * main
  */
@@ -297,8 +338,8 @@ int main(int argc, char **argv)
 		snprintf(command_sock_path, PATH_MAX,
 				KCONSUMERD_CMD_SOCK_PATH);
 	}
-	/* create the pipe to wake to receiving thread when needed */
-	ctx = lttng_kconsumerd_create(read_subbuffer);
+	/* create the consumer instance and assign the callbacks */
+	ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL);
 	if (ctx == NULL) {
 		goto error;
 	}
-- 
1.7.4.1





More information about the lttng-dev mailing list