[ltt-dev] [LTTNG-TOOLS PATCH v2] Callbacks on receive and update FD
Julien Desfossez
julien.desfossez at polymtl.ca
Tue Aug 16 18:55:47 EDT 2011
The user of the lib can now take control over a new FD or the update
operation of an existing FD.
Opening the output tracefile is now the responsiblity of the user
and not the library itself.
Signed-off-by: Julien Desfossez <julien.desfossez at polymtl.ca>
---
include/lttng/lttng-kconsumerd.h | 29 ++++++++--
liblttngkconsumerd/lttngkconsumerd.c | 96 +++++++++++++++++++--------------
ltt-kconsumerd/ltt-kconsumerd.c | 45 +++++++++++++++-
3 files changed, 122 insertions(+), 48 deletions(-)
diff --git a/include/lttng/lttng-kconsumerd.h b/include/lttng/lttng-kconsumerd.h
index 7e195ab..edff0ba 100644
--- a/include/lttng/lttng-kconsumerd.h
+++ b/include/lttng/lttng-kconsumerd.h
@@ -79,6 +79,25 @@ struct lttng_kconsumerd_fd {
struct lttng_kconsumerd_local_data {
/* function to call when data is available on a buffer */
int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd);
+ /*
+ * function to call when we receive a new fd, it receives a newly allocated
+ * kconsumerd_fd, depending on the return code of this function, the new FD
+ * will be handled by the application or the library :
+ * - > 0 (success, FD is kept by application)
+ * - == 0 (success, FD is left to library)
+ * - < 0 (error)
+ */
+ int (*on_recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd);
+ /*
+ * function to call when a FD is getting updated by the session daemon,
+ * this function receives the FD as seen by the session daemon
+ * (sessiond_fd) and the new state, depending on the return code of this function
+ * the update of state for the FD is handled by the application or the library :
+ * - > 0 (success, FD is kept by application)
+ * - == 0 (success, FD is left to library)
+ * - < 0 (error)
+ */
+ int (*on_update_fd)(int sessiond_fd, uint32_t state);
/* socket to communicate errors with sessiond */
int kconsumerd_error_socket;
/* socket to exchange commands with sessiond */
@@ -98,15 +117,15 @@ struct lttng_kconsumerd_local_data {
* - create the should_quit pipe (for signal handler)
* - create the thread pipe (for splice)
*
- * Takes a function pointer as argument, this function is called when data is
- * available on a buffer. This function is responsible to do the
- * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
- * buffer configuration and then kernctl_put_next_subbuf at the end.
+ * Takes the function pointers to the on_buffer_ready, on_recv_fd, and
+ * on_update_fd callbacks.
*
* Returns a pointer to the new context or NULL on error.
*/
extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
- int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd));
+ int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+ int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+ int (*update_fd)(int sessiond_fd, uint32_t state));
/*
* Close all fds associated with the instance and free the context.
diff --git a/liblttngkconsumerd/lttngkconsumerd.c b/liblttngkconsumerd/lttngkconsumerd.c
index 69ef9a0..751cea1 100644
--- a/liblttngkconsumerd/lttngkconsumerd.c
+++ b/liblttngkconsumerd/lttngkconsumerd.c
@@ -125,22 +125,21 @@ static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf)
}
/*
- * Add a fd to the global list protected by a mutex.
+ * Create a struct lttcomm_kconsumerd_msg from the
+ * information received on the receiving socket
*/
-static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
+struct lttng_kconsumerd_fd *kconsumerd_allocate_fd(
+ struct lttcomm_kconsumerd_msg *buf,
int consumerd_fd)
{
struct lttng_kconsumerd_fd *tmp_fd;
- int ret = 0;
- pthread_mutex_lock(&kconsumerd_data.lock);
- /* Check if already exist */
- ret = kconsumerd_find_session_fd(buf->fd);
- if (ret == 1) {
+ tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
+ if (tmp_fd == NULL) {
+ perror("malloc struct lttng_kconsumerd_fd");
goto end;
}
- tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
tmp_fd->sessiond_fd = buf->fd;
tmp_fd->consumerd_fd = consumerd_fd;
tmp_fd->state = buf->state;
@@ -152,42 +151,31 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
tmp_fd->output = buf->output;
strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
tmp_fd->path_name[PATH_MAX - 1] = '\0';
+ DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)",
+ tmp_fd->path_name, tmp_fd->sessiond_fd,
+ tmp_fd->consumerd_fd, tmp_fd->out_fd);
- /* Opening the tracefile in write mode */
- if (tmp_fd->path_name != NULL) {
- ret = open(tmp_fd->path_name,
- O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
- if (ret < 0) {
- ERR("Opening %s", tmp_fd->path_name);
- perror("open");
- goto end;
- }
- tmp_fd->out_fd = ret;
- DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
- tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
- }
+end:
+ return tmp_fd;
+}
- if (tmp_fd->output == LTTNG_EVENT_MMAP) {
- /* get the len of the mmap region */
- ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len);
- if (ret != 0) {
- ret = errno;
- perror("kernctl_get_mmap_len");
- goto end;
- }
+/*
+ * Add a fd to the global list protected by a mutex.
+ */
+static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd)
+{
+ int ret;
- tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len,
- PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0);
- if (tmp_fd->mmap_base == MAP_FAILED) {
- perror("Error mmaping");
- ret = -1;
- goto end;
- }
+ pthread_mutex_lock(&kconsumerd_data.lock);
+ /* Check if already exist */
+ ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd);
+ if (ret == 1) {
+ goto end;
}
-
cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
kconsumerd_data.fds_count++;
kconsumerd_data.need_update = 1;
+
end:
pthread_mutex_unlock(&kconsumerd_data.lock);
return ret;
@@ -263,6 +251,7 @@ static int kconsumerd_consumerd_recv_fd(
int nb_fd;
char recv_fd[CMSG_SPACE(sizeof(int))];
struct lttcomm_kconsumerd_msg lkm;
+ struct lttng_kconsumerd_fd *new_fd;
/* the number of fds we are about to receive */
nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
@@ -313,14 +302,34 @@ static int kconsumerd_consumerd_recv_fd(
DBG("kconsumerd_add_fd %s (%d)", lkm.path_name,
((int *) CMSG_DATA(cmsg))[0]);
- ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
- if (ret < 0) {
+ new_fd = kconsumerd_allocate_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
+ if (new_fd == NULL) {
lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
goto end;
}
+
+ if (ctx->on_recv_fd != NULL) {
+ ret = ctx->on_recv_fd(new_fd);
+ if (ret == 0) {
+ kconsumerd_add_fd(new_fd);
+ } else if (ret < 0) {
+ goto end;
+ }
+ } else {
+ kconsumerd_add_fd(new_fd);
+ }
break;
case UPDATE_STREAM:
- kconsumerd_change_fd_state(lkm.fd, lkm.state);
+ if (ctx->on_update_fd != NULL) {
+ ret = ctx->on_update_fd(lkm.fd, lkm.state);
+ if (ret == 0) {
+ kconsumerd_change_fd_state(lkm.fd, lkm.state);
+ } else if (ret < 0) {
+ goto end;
+ }
+ } else {
+ kconsumerd_change_fd_state(lkm.fd, lkm.state);
+ }
break;
default:
break;
@@ -754,7 +763,9 @@ end:
* Returns a pointer to the new context or NULL on error.
*/
struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
- int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd))
+ int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+ int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
+ int (*update_fd)(int sessiond_fd, uint32_t state))
{
int ret;
struct lttng_kconsumerd_local_data *ctx;
@@ -765,7 +776,10 @@ struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
goto end;
}
+ /* assign the callbacks */
ctx->on_buffer_ready = buffer_ready;
+ ctx->on_recv_fd = recv_fd;
+ ctx->on_update_fd = update_fd;
ret = pipe(ctx->kconsumerd_poll_pipe);
if (ret < 0) {
diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c
index ca93965..84ea95d 100644
--- a/ltt-kconsumerd/ltt-kconsumerd.c
+++ b/ltt-kconsumerd/ltt-kconsumerd.c
@@ -277,6 +277,47 @@ end:
return ret;
}
+static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd)
+{
+ int ret;
+
+ /* Opening the tracefile in write mode */
+ if (kconsumerd_fd->path_name != NULL) {
+ ret = open(kconsumerd_fd->path_name,
+ O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+ if (ret < 0) {
+ ERR("Opening %s", kconsumerd_fd->path_name);
+ perror("open");
+ goto error;
+ }
+ kconsumerd_fd->out_fd = ret;
+ }
+
+ if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) {
+ /* get the len of the mmap region */
+ ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, &kconsumerd_fd->mmap_len);
+ if (ret != 0) {
+ ret = errno;
+ perror("kernctl_get_mmap_len");
+ goto error;
+ }
+
+ kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len,
+ PROT_READ, MAP_PRIVATE, kconsumerd_fd->consumerd_fd, 0);
+ if (kconsumerd_fd->mmap_base == MAP_FAILED) {
+ perror("Error mmaping");
+ ret = -1;
+ goto error;
+ }
+ }
+
+ /* we return 0 to let the library handle the FD internally */
+ return 0;
+
+error:
+ return ret;
+}
+
/*
* main
*/
@@ -303,8 +344,8 @@ int main(int argc, char **argv)
snprintf(command_sock_path, PATH_MAX,
KCONSUMERD_CMD_SOCK_PATH);
}
- /* create the pipe to wake to receiving thread when needed */
- ctx = lttng_kconsumerd_create(read_subbuffer);
+ /* create the consumer instance with and assign the callbacks */
+ ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL);
if (ctx == NULL) {
goto error;
}
--
1.7.4.1
More information about the lttng-dev
mailing list