[ltt-dev] [LTTNG-TOOLS PATCH v2 2/3] Register the consuming function and add a library context
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Fri Aug 12 10:14:43 EDT 2011
* Julien Desfossez (julien.desfossez at polymtl.ca) wrote:
> The init function of the library now takes a function as argument to
> allow a consumer using the library to control the function to be called
> when data is ready in a buffer.
> The kconsumerd_on_read_subbuffer_mmap and
> kconsumerd_on_read_subbuffer_splice are now exported to allow a consumer
> to use them directly if needed.
> Also the library has now a context, where all local parameters are
> registered instead of static variables. That way, we can have multiple
> callers using the library within the same process.
> Only the flag indicating that all fds are closed remain global to the
> library and shared among callers.
Acked-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
Thanks!
Mathieu
>
> Signed-off-by: Julien Desfossez <julien.desfossez at polymtl.ca>
> ---
> liblttkconsumerd/lttkconsumerd.c | 281 +++++++++++++++-----------------------
> liblttkconsumerd/lttkconsumerd.h | 61 +++++++-
> ltt-kconsumerd/ltt-kconsumerd.c | 110 +++++++++++++--
> 3 files changed, 263 insertions(+), 189 deletions(-)
>
> diff --git a/liblttkconsumerd/lttkconsumerd.c b/liblttkconsumerd/lttkconsumerd.c
> index 5c22d5e..d4908d1 100644
> --- a/liblttkconsumerd/lttkconsumerd.c
> +++ b/liblttkconsumerd/lttkconsumerd.c
> @@ -62,26 +62,13 @@ struct kconsumerd_global_data {
> unsigned int need_update;
> } kconsumerd_data = {
> .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
> + .fds_count = 0,
> + .need_update = 1,
> };
>
> -/* communication with splice */
> -static int kconsumerd_thread_pipe[2];
> -
> -/* pipe to wake the poll thread when necessary */
> -static int kconsumerd_poll_pipe[2];
> -
> -/* to let the signal handler wake up the fd receiver thread */
> -static int kconsumerd_should_quit[2];
> -
> /* timeout parameter, to control the polling thread grace period */
> static int kconsumerd_poll_timeout = -1;
>
> -/* socket to communicate errors with sessiond */
> -static int kconsumerd_error_socket;
> -
> -/* socket to exchange commands with sessiond */
> -static char *kconsumerd_command_sock_path;
> -
> /*
> * flag to inform the polling thread to quit when all fd hung up.
> * Updated by the kconsumerd_thread_receive_fds when it notices that all
> @@ -95,9 +82,9 @@ static volatile int kconsumerd_quit = 0;
> *
> * Set the error socket
> */
> -void kconsumerd_set_error_socket(int sock)
> +void kconsumerd_set_error_socket(struct kconsumerd_local_data *ctx, int sock)
> {
> - kconsumerd_error_socket = sock;
> + ctx->kconsumerd_error_socket = sock;
> }
>
> /*
> @@ -105,9 +92,10 @@ void kconsumerd_set_error_socket(int sock)
> *
> * Set the command socket path
> */
> -void kconsumerd_set_command_socket_path(char *sock)
> +void kconsumerd_set_command_socket_path(struct kconsumerd_local_data *ctx,
> + char *sock)
> {
> - kconsumerd_command_sock_path = sock;
> + ctx->kconsumerd_command_sock_path = sock;
> }
>
> /*
> @@ -144,7 +132,9 @@ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
> if (kconsumerd_data.fds_count > 0) {
> kconsumerd_data.fds_count--;
> if (lcf != NULL) {
> - close(lcf->out_fd);
> + if (lcf->out_fd != 0) {
> + close(lcf->out_fd);
> + }
> close(lcf->consumerd_fd);
> free(lcf);
> lcf = NULL;
> @@ -161,8 +151,8 @@ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
> */
> static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
> {
> - int ret;
> struct kconsumerd_fd *tmp_fd;
> + int ret = 0;
>
> pthread_mutex_lock(&kconsumerd_data.lock);
> /* Check if already exist */
> @@ -176,22 +166,24 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_f
> tmp_fd->consumerd_fd = consumerd_fd;
> tmp_fd->state = buf->state;
> tmp_fd->max_sb_size = buf->max_sb_size;
> + tmp_fd->out_fd = 0;
> + tmp_fd->out_fd_offset = 0;
> strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
> tmp_fd->path_name[PATH_MAX - 1] = '\0';
>
> /* Opening the tracefile in write mode */
> - ret = open(tmp_fd->path_name,
> - O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
> - if (ret < 0) {
> - ERR("Opening %s", tmp_fd->path_name);
> - perror("open");
> - goto end;
> + if (tmp_fd->path_name != NULL) {
> + ret = open(tmp_fd->path_name,
> + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
> + if (ret < 0) {
> + ERR("Opening %s", tmp_fd->path_name);
> + perror("open");
> + goto end;
> + }
> + tmp_fd->out_fd = ret;
> + DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
> + tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
> }
> - tmp_fd->out_fd = ret;
> - tmp_fd->out_fd_offset = 0;
> -
> - DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
> - tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
>
> cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
> kconsumerd_data.fds_count++;
> @@ -231,16 +223,14 @@ static void kconsumerd_change_fd_state(int sessiond_fd,
> * Returns the number of fds in the structures
> * Called with kconsumerd_data.lock held.
> */
> -static int kconsumerd_update_poll_array(struct pollfd **pollfd,
> - struct kconsumerd_fd **local_kconsumerd_fd)
> +static int kconsumerd_update_poll_array(struct kconsumerd_local_data *ctx,
> + struct pollfd **pollfd, struct kconsumerd_fd **local_kconsumerd_fd)
> {
> struct kconsumerd_fd *iter;
> int i = 0;
>
> DBG("Updating poll fd array");
> -
> cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
> - DBG("Inside for each");
> if (iter->state == ACTIVE_FD) {
> DBG("Active FD %d", iter->consumerd_fd);
> (*pollfd)[i].fd = iter->consumerd_fd;
> @@ -254,7 +244,7 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd,
> * insert the kconsumerd_poll_pipe at the end of the array and don't
> * increment i so nb_fd is the number of real FD
> */
> - (*pollfd)[i].fd = kconsumerd_poll_pipe[0];
> + (*pollfd)[i].fd = ctx->kconsumerd_poll_pipe[0];
> (*pollfd)[i].events = POLLIN;
> return i;
> }
> @@ -266,7 +256,7 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd,
> * mmap the ring buffer, read it and write the data to the tracefile.
> * Returns the number of bytes written
> */
> -static int kconsumerd_on_read_subbuffer_mmap(
> +int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_local_data *ctx,
> struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
> {
> unsigned long mmap_len, mmap_offset, padded_len, padding_len;
> @@ -379,7 +369,7 @@ end:
> * Splice the data from the ring buffer to the tracefile.
> * Returns the number of bytes spliced
> */
> -static int kconsumerd_on_read_subbuffer(
> +int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_local_data *ctx,
> struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
> {
> long ret = 0;
> @@ -391,7 +381,7 @@ static int kconsumerd_on_read_subbuffer(
> while (len > 0) {
> DBG("splice chan to pipe offset %lu (fd : %d)",
> (unsigned long)offset, fd);
> - ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
> + ret = splice(fd, &offset, ctx->kconsumerd_thread_pipe[1], NULL, len,
> SPLICE_F_MOVE | SPLICE_F_MORE);
> DBG("splice chan to pipe ret %ld", ret);
> if (ret < 0) {
> @@ -400,7 +390,7 @@ static int kconsumerd_on_read_subbuffer(
> goto splice_error;
> }
>
> - ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
> + ret = splice(ctx->kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
> SPLICE_F_MOVE | SPLICE_F_MORE);
> DBG("splice pipe to file %ld", ret);
> if (ret < 0) {
> @@ -452,98 +442,17 @@ splice_error:
> /* send the appropriate error description to sessiond */
> switch(ret) {
> case EBADF:
> - kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
> + kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EBADF);
> break;
> case EINVAL:
> - kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
> + kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EINVAL);
> break;
> case ENOMEM:
> - kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
> + kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ENOMEM);
> break;
> case ESPIPE:
> - kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
> - break;
> - }
> -
> -end:
> - return ret;
> -}
> -
> -/*
> - * kconsumerd_read_subbuffer
> - *
> - * Consume data on a file descriptor and write it on a trace file
> - */
> -static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
> -{
> - unsigned long len;
> - int err;
> - long ret = 0;
> - int infd = kconsumerd_fd->consumerd_fd;
> -
> - DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
> - /* Get the next subbuffer */
> - err = kernctl_get_next_subbuf(infd);
> - if (err != 0) {
> - ret = errno;
> - perror("Reserving sub buffer failed (everything is normal, "
> - "it is due to concurrency)");
> - goto end;
> - }
> -
> - switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
> - case LTTNG_EVENT_SPLICE:
> - /* read the whole subbuffer */
> - err = kernctl_get_padded_subbuf_size(infd, &len);
> - if (err != 0) {
> - ret = errno;
> - perror("Getting sub-buffer len failed.");
> - goto end;
> - }
> -
> - /* splice the subbuffer to the tracefile */
> - ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
> - if (ret < 0) {
> - /*
> - * display the error but continue processing to try
> - * to release the subbuffer
> - */
> - ERR("Error splicing to tracefile");
> - }
> - break;
> - case LTTNG_EVENT_MMAP:
> - /* read the used subbuffer size */
> - err = kernctl_get_subbuf_size(infd, &len);
> - if (err != 0) {
> - ret = errno;
> - perror("Getting sub-buffer len failed.");
> - goto end;
> - }
> - /* write the subbuffer to the tracefile */
> - ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
> - if (ret < 0) {
> - /*
> - * display the error but continue processing to try
> - * to release the subbuffer
> - */
> - ERR("Error writing to tracefile");
> - }
> + kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ESPIPE);
> break;
> - default:
> - ERR("Unknown output method");
> - ret = -1;
> - }
> -
> - err = kernctl_put_next_subbuf(infd);
> - if (err != 0) {
> - ret = errno;
> - if (errno == EFAULT) {
> - perror("Error in unreserving sub buffer\n");
> - } else if (errno == EIO) {
> - /* Should never happen with newer LTTng versions */
> - perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
> - }
> - goto end;
> }
>
> end:
> @@ -583,8 +492,8 @@ exit:
> * structures describing each fd (path name).
> * Returns the size of received data
> */
> -static int kconsumerd_consumerd_recv_fd(int sfd,
> - struct pollfd *kconsumerd_sockpoll, int size,
> +static int kconsumerd_consumerd_recv_fd(struct kconsumerd_local_data *ctx,
> + int sfd, struct pollfd *kconsumerd_sockpoll, int size,
> enum kconsumerd_command cmd_type)
> {
> struct iovec iov[1];
> @@ -624,7 +533,7 @@ static int kconsumerd_consumerd_recv_fd(int sfd,
>
> if (ret != (size / nb_fd)) {
> ERR("Received only %d, expected %d", ret, size);
> - kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
> + kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
> goto end;
> }
>
> @@ -632,7 +541,7 @@ static int kconsumerd_consumerd_recv_fd(int sfd,
> if (!cmsg) {
> ERR("Invalid control message header");
> ret = -1;
> - kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
> + kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
> goto end;
> }
>
> @@ -643,7 +552,7 @@ static int kconsumerd_consumerd_recv_fd(int sfd,
> DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]);
> ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
> if (ret < 0) {
> - kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
> + kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
> goto end;
> }
> break;
> @@ -654,13 +563,13 @@ static int kconsumerd_consumerd_recv_fd(int sfd,
> break;
> }
> /* signal the poll thread */
> - tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
> + tmp2 = write(ctx->kconsumerd_poll_pipe[1], "4", 1);
> if (tmp2 < 0) {
> perror("write kconsumerd poll");
> }
> } else {
> ERR("Didn't received any fd");
> - kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
> + kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
> ret = -1;
> goto end;
> }
> @@ -686,12 +595,8 @@ void *kconsumerd_thread_poll_fds(void *data)
> int nb_fd = 0;
> char tmp;
> int tmp2;
> + struct kconsumerd_local_data *ctx = data;
>
> - ret = pipe(kconsumerd_thread_pipe);
> - if (ret < 0) {
> - perror("Error creating pipe");
> - goto end;
> - }
>
> local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));
>
> @@ -730,10 +635,10 @@ void *kconsumerd_thread_poll_fds(void *data)
> pthread_mutex_unlock(&kconsumerd_data.lock);
> goto end;
> }
> - ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
> + ret = kconsumerd_update_poll_array(ctx, &pollfd, local_kconsumerd_fd);
> if (ret < 0) {
> ERR("Error in allocating pollfd or local_outfds");
> - kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
> + kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR);
> pthread_mutex_unlock(&kconsumerd_data.lock);
> goto end;
> }
> @@ -748,7 +653,7 @@ void *kconsumerd_thread_poll_fds(void *data)
> DBG("poll num_rdy : %d", num_rdy);
> if (num_rdy == -1) {
> perror("Poll error");
> - kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
> + kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR);
> goto end;
> } else if (num_rdy == 0) {
> DBG("Polling thread timed out");
> @@ -768,7 +673,7 @@ void *kconsumerd_thread_poll_fds(void *data)
> */
> if (pollfd[nb_fd].revents == POLLIN) {
> DBG("kconsumerd_poll_pipe wake up");
> - tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
> + tmp2 = read(ctx->kconsumerd_poll_pipe[0], &tmp, 1);
> if (tmp2 < 0) {
> perror("read kconsumerd poll");
> }
> @@ -796,7 +701,7 @@ void *kconsumerd_thread_poll_fds(void *data)
> case POLLPRI:
> DBG("Urgent read on fd %d", pollfd[i].fd);
> high_prio = 1;
> - ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
> + ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]);
> /* it's ok to have an unavailable sub-buffer */
> if (ret == EAGAIN) {
> ret = 0;
> @@ -819,7 +724,7 @@ void *kconsumerd_thread_poll_fds(void *data)
> for (i = 0; i < nb_fd; i++) {
> if (pollfd[i].revents == POLLIN) {
> DBG("Normal read on fd %d", pollfd[i].fd);
> - ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
> + ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]);
> /* it's ok to have an unavailable subbuffer */
> if (ret == EAGAIN) {
> ret = 0;
> @@ -842,34 +747,75 @@ end:
> }
>
> /*
> - * kconsumerd_init(void)
> + * kconsumerd_create
> *
> * initialise the necessary environnement :
> - * - inform the polling thread to update the polling array
> + * - create a new context
> * - create the poll_pipe
> * - create the should_quit pipe (for signal handler)
> + * - create the thread pipe (for splice)
> + * Takes a function pointer as argument, this function is called when data is
> + * available on a buffer. This function is responsible to do the
> + * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
> + * buffer configuration and then kernctl_put_next_subbuf at the end.
> + * Returns a pointer to the new context or NULL on error.
> */
> -int kconsumerd_init(void)
> +struct kconsumerd_local_data *kconsumerd_create(
> + int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd))
> {
> int ret;
> + struct kconsumerd_local_data *ctx;
>
> - /* need to update the polling array at init time */
> - kconsumerd_data.need_update = 1;
> + ctx = malloc(sizeof(struct kconsumerd_local_data));
> + if (ctx == NULL) {
> + perror("allocating context");
> + goto end;
> + }
> +
> + ctx->on_buffer_ready = buffer_ready;
>
> - ret = pipe(kconsumerd_poll_pipe);
> + ret = pipe(ctx->kconsumerd_poll_pipe);
> if (ret < 0) {
> perror("Error creating poll pipe");
> + ctx = NULL;
> goto end;
> }
>
> - ret = pipe(kconsumerd_should_quit);
> + ret = pipe(ctx->kconsumerd_should_quit);
> if (ret < 0) {
> perror("Error creating recv pipe");
> + ctx = NULL;
> + goto end;
> + }
> +
> + ret = pipe(ctx->kconsumerd_thread_pipe);
> + if (ret < 0) {
> + perror("Error creating thread pipe");
> + ctx = NULL;
> goto end;
> }
>
> end:
> - return ret;
> + return ctx;
> +}
> +
> +/*
> + * kconsumerd_destroy
> + *
> + * Close all fds associated with the instance and free the context
> + */
> +void kconsumerd_destroy(struct kconsumerd_local_data *ctx)
> +{
> + close(ctx->kconsumerd_error_socket);
> + close(ctx->kconsumerd_thread_pipe[0]);
> + close(ctx->kconsumerd_thread_pipe[1]);
> + close(ctx->kconsumerd_poll_pipe[0]);
> + close(ctx->kconsumerd_poll_pipe[1]);
> + close(ctx->kconsumerd_should_quit[0]);
> + close(ctx->kconsumerd_should_quit[1]);
> + unlink(ctx->kconsumerd_command_sock_path);
> + free(ctx);
> + ctx = NULL;
> }
>
> /*
> @@ -887,11 +833,12 @@ void *kconsumerd_thread_receive_fds(void *data)
> * avoids making blocking sockets
> */
> struct pollfd kconsumerd_sockpoll[2];
> + struct kconsumerd_local_data *ctx = data;
>
>
> - DBG("Creating command socket %s", kconsumerd_command_sock_path);
> - unlink(kconsumerd_command_sock_path);
> - client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
> + DBG("Creating command socket %s", ctx->kconsumerd_command_sock_path);
> + unlink(ctx->kconsumerd_command_sock_path);
> + client_socket = lttcomm_create_unix_sock(ctx->kconsumerd_command_sock_path);
> if (client_socket < 0) {
> ERR("Cannot create command socket");
> goto end;
> @@ -903,7 +850,7 @@ void *kconsumerd_thread_receive_fds(void *data)
> }
>
> DBG("Sending ready command to ltt-sessiond");
> - ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
> + ret = kconsumerd_send_error(ctx, KCONSUMERD_COMMAND_SOCK_READY);
> if (ret < 0) {
> ERR("Error sending ready command to ltt-sessiond");
> goto end;
> @@ -916,7 +863,7 @@ void *kconsumerd_thread_receive_fds(void *data)
> }
>
> /* prepare the FDs to poll : to client socket and the should_quit pipe */
> - kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0];
> + kconsumerd_sockpoll[0].fd = ctx->kconsumerd_should_quit[0];
> kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
> kconsumerd_sockpoll[1].fd = client_socket;
> kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
> @@ -965,7 +912,7 @@ void *kconsumerd_thread_receive_fds(void *data)
> }
>
> /* we received a command to add or update fds */
> - ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll,
> + ret = kconsumerd_consumerd_recv_fd(ctx, sock, kconsumerd_sockpoll,
> tmp.payload_size, tmp.cmd_type);
> if (ret <= 0) {
> ERR("Receiving the FD, exiting");
> @@ -991,7 +938,7 @@ end:
> kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;
>
> /* wake up the polling thread */
> - ret = write(kconsumerd_poll_pipe[1], "4", 1);
> + ret = write(ctx->kconsumerd_poll_pipe[1], "4", 1);
> if (ret < 0) {
> perror("poll pipe write");
> }
> @@ -1001,15 +948,13 @@ end:
> /*
> * kconsumerd_cleanup
> *
> - * Cleanup the daemon's socket on exit
> + * Close all the tracefiles and stream fds, should be called when all
> + * instances are destroyed.
> */
> void kconsumerd_cleanup(void)
> {
> struct kconsumerd_fd *iter, *tmp;
>
> - /* remove the socket file */
> - unlink(kconsumerd_command_sock_path);
> -
> /*
> * close all outfd. Called when there are no more threads
> * running (after joining on the threads), no need to protect
> @@ -1025,11 +970,11 @@ void kconsumerd_cleanup(void)
> *
> * Called from signal handler.
> */
> -void kconsumerd_should_exit(void)
> +void kconsumerd_should_exit(struct kconsumerd_local_data *ctx)
> {
> int ret;
> kconsumerd_quit = 1;
> - ret = write(kconsumerd_should_quit[1], "4", 1);
> + ret = write(ctx->kconsumerd_should_quit[1], "4", 1);
> if (ret < 0) {
> perror("write kconsumerd quit");
> }
> @@ -1040,10 +985,10 @@ void kconsumerd_should_exit(void)
> *
> * send return code to ltt-sessiond
> */
> -int kconsumerd_send_error(enum lttcomm_return_code cmd)
> +int kconsumerd_send_error(struct kconsumerd_local_data *ctx, enum lttcomm_return_code cmd)
> {
> - if (kconsumerd_error_socket > 0) {
> - return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
> + if (ctx->kconsumerd_error_socket > 0) {
> + return lttcomm_send_unix_sock(ctx->kconsumerd_error_socket, &cmd,
> sizeof(enum lttcomm_sessiond_command));
> }
>
> diff --git a/liblttkconsumerd/lttkconsumerd.h b/liblttkconsumerd/lttkconsumerd.h
> index cbdedd2..10e4a55 100644
> --- a/liblttkconsumerd/lttkconsumerd.h
> +++ b/liblttkconsumerd/lttkconsumerd.h
> @@ -57,15 +57,59 @@ struct kconsumerd_fd {
> unsigned long max_sb_size; /* the subbuffer size for this channel */
> };
>
> +struct kconsumerd_local_data {
> + /* function to call when data is available on a buffer */
> + int (*on_buffer_ready)(struct kconsumerd_fd *kconsumerd_fd);
> + /* socket to communicate errors with sessiond */
> + int kconsumerd_error_socket;
> + /* socket to exchange commands with sessiond */
> + char *kconsumerd_command_sock_path;
> + /* communication with splice */
> + int kconsumerd_thread_pipe[2];
> + /* pipe to wake the poll thread when necessary */
> + int kconsumerd_poll_pipe[2];
> + /* to let the signal handler wake up the fd receiver thread */
> + int kconsumerd_should_quit[2];
> +};
> +
> /*
> - * kconsumerd_init(void)
> + * kconsumerd_create
> * initialise the necessary environnement :
> - * - inform the polling thread to update the polling array
> + * - create a new context
> * - create the poll_pipe
> * - create the should_quit pipe (for signal handler)
> - * returns the return code of pipe, 0 on success, -1 on error
> + * - create the thread pipe (for splice)
> + * Takes a function pointer as argument, this function is called when data is
> + * available on a buffer. This function is responsible to do the
> + * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
> + * buffer configuration and then kernctl_put_next_subbuf at the end.
> + * Returns a pointer to the new context or NULL on error.
> + */
> +struct kconsumerd_local_data *kconsumerd_create(
> + int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd));
> +
> +/*
> + * kconsumerd_destroy
> + * Close all fds associated with the instance and free the context
> + */
> +void kconsumerd_destroy(struct kconsumerd_local_data *ctx);
> +
> +/*
> + * kconsumerd_on_read_subbuffer_mmap
> + * mmap the ring buffer, read it and write the data to the tracefile.
> + * Returns the number of bytes written
> + */
> +int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_local_data *ctx,
> + struct kconsumerd_fd *kconsumerd_fd, unsigned long len);
> +
> +/*
> + * kconsumerd_on_read_subbuffer
> + *
> + * Splice the data from the ring buffer to the tracefile.
> + * Returns the number of bytes spliced
> */
> -int kconsumerd_init(void);
> +int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_local_data *ctx,
> + struct kconsumerd_fd *kconsumerd_fd, unsigned long len);
>
> /*
> * kconsumerd_send_error
> @@ -73,7 +117,8 @@ int kconsumerd_init(void);
> * returns the return code of sendmsg : the number of bytes transmitted
> * or -1 on error.
> */
> -int kconsumerd_send_error(enum lttcomm_return_code cmd);
> +int kconsumerd_send_error(struct kconsumerd_local_data *ctx,
> + enum lttcomm_return_code cmd);
>
> /*
> * kconsumerd_poll_socket
> @@ -101,7 +146,7 @@ void *kconsumerd_thread_receive_fds(void *data);
> * kconsumerd_should_exit
> * Called from signal handler to ensure a clean exit
> */
> -void kconsumerd_should_exit(void);
> +void kconsumerd_should_exit(struct kconsumerd_local_data *ctx);
>
> /*
> * kconsumerd_cleanup
> @@ -113,12 +158,12 @@ void kconsumerd_cleanup(void);
> * kconsumerd_set_error_socket
> * Set the error socket for communication with a session daemon
> */
> -void kconsumerd_set_error_socket(int sock);
> +void kconsumerd_set_error_socket(struct kconsumerd_local_data *ctx, int sock);
>
> /*
> * kconsumerd_set_command_socket_path
> * Set the command socket path for communication with a session daemon
> */
> -void kconsumerd_set_command_socket_path(char *sock);
> +void kconsumerd_set_command_socket_path(struct kconsumerd_local_data *ctx, char *sock);
>
> #endif /* _LIBLTTKCONSUMERD_H */
> diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c
> index 1e2841c..64c5ccf 100644
> --- a/ltt-kconsumerd/ltt-kconsumerd.c
> +++ b/ltt-kconsumerd/ltt-kconsumerd.c
> @@ -55,6 +55,9 @@ static const char *progname;
> char command_sock_path[PATH_MAX]; /* Global command socket path */
> char error_sock_path[PATH_MAX]; /* Global error path */
>
> +/* the liblttkconsumerd context */
> +struct kconsumerd_local_data *ctx;
> +
> /*
> * sighandler
> *
> @@ -67,7 +70,7 @@ static void sighandler(int sig)
> return;
> }
>
> - kconsumerd_should_exit();
> + kconsumerd_should_exit(ctx);
> }
>
> /*
> @@ -190,6 +193,86 @@ static void parse_args(int argc, char **argv)
> }
> }
>
> +/*
> + * read_subbuffer
> + *
> + * Consume data on a file descriptor and write it on a trace file
> + */
> +static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
> +{
> + unsigned long len;
> + int err;
> + long ret = 0;
> + int infd = kconsumerd_fd->consumerd_fd;
> +
> + DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
> + /* Get the next subbuffer */
> + err = kernctl_get_next_subbuf(infd);
> + if (err != 0) {
> + ret = errno;
> + perror("Reserving sub buffer failed (everything is normal, "
> + "it is due to concurrency)");
> + goto end;
> + }
> +
> + switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
> + case LTTNG_EVENT_SPLICE:
> + /* read the whole subbuffer */
> + err = kernctl_get_padded_subbuf_size(infd, &len);
> + if (err != 0) {
> + ret = errno;
> + perror("Getting sub-buffer len failed.");
> + goto end;
> + }
> +
> + /* splice the subbuffer to the tracefile */
> + ret = kconsumerd_on_read_subbuffer_splice(ctx, kconsumerd_fd, len);
> + if (ret < 0) {
> + /*
> + * display the error but continue processing to try
> + * to release the subbuffer
> + */
> + ERR("Error splicing to tracefile");
> + }
> + break;
> + case LTTNG_EVENT_MMAP:
> + /* read the used subbuffer size */
> + err = kernctl_get_subbuf_size(infd, &len);
> + if (err != 0) {
> + ret = errno;
> + perror("Getting sub-buffer len failed.");
> + goto end;
> + }
> + /* write the subbuffer to the tracefile */
> + ret = kconsumerd_on_read_subbuffer_mmap(ctx, kconsumerd_fd, len);
> + if (ret < 0) {
> + /*
> + * display the error but continue processing to try
> + * to release the subbuffer
> + */
> + ERR("Error writing to tracefile");
> + }
> + break;
> + default:
> + ERR("Unknown output method");
> + ret = -1;
> + }
> +
> + err = kernctl_put_next_subbuf(infd);
> + if (err != 0) {
> + ret = errno;
> + if (errno == EFAULT) {
> + perror("Error in unreserving sub buffer\n");
> + } else if (errno == EIO) {
> + /* Should never happen with newer LTTng versions */
> + perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
> + }
> + goto end;
> + }
> +
> +end:
> + return ret;
> +}
>
> /*
> * main
> @@ -217,7 +300,13 @@ int main(int argc, char **argv)
> snprintf(command_sock_path, PATH_MAX,
> KCONSUMERD_CMD_SOCK_PATH);
> }
> - kconsumerd_set_command_socket_path(command_sock_path);
> + /* create the pipe to wake to receiving thread when needed */
> + ctx = kconsumerd_create(read_subbuffer);
> + if (ctx == NULL) {
> + goto error;
> + }
> +
> + kconsumerd_set_command_socket_path(ctx, command_sock_path);
> if (strlen(error_sock_path) == 0) {
> snprintf(error_sock_path, PATH_MAX,
> KCONSUMERD_ERR_SOCK_PATH);
> @@ -227,12 +316,6 @@ int main(int argc, char **argv)
> goto error;
> }
>
> - /* create the pipe to wake to receiving thread when needed */
> - ret = kconsumerd_init();
> - if (ret < 0) {
> - goto end;
> - }
> -
> /* Connect to the socket created by ltt-sessiond to report errors */
> DBG("Connecting to error socket %s", error_sock_path);
> ret = lttcomm_connect_unix_sock(error_sock_path);
> @@ -240,11 +323,11 @@ int main(int argc, char **argv)
> if (ret < 0) {
> WARN("Cannot connect to error socket, is ltt-sessiond started ?");
> }
> - kconsumerd_set_error_socket(ret);
> + kconsumerd_set_error_socket(ctx, ret);
>
> /* Create the thread to manage the receive of fd */
> ret = pthread_create(&threads[0], NULL, kconsumerd_thread_receive_fds,
> - (void *) NULL);
> + (void *) ctx);
> if (ret != 0) {
> perror("pthread_create");
> goto error;
> @@ -252,7 +335,7 @@ int main(int argc, char **argv)
>
> /* Create thread to manage the polling/writing of traces */
> ret = pthread_create(&threads[1], NULL, kconsumerd_thread_poll_fds,
> - (void *) NULL);
> + (void *) ctx);
> if (ret != 0) {
> perror("pthread_create");
> goto error;
> @@ -266,14 +349,15 @@ int main(int argc, char **argv)
> }
> }
> ret = EXIT_SUCCESS;
> - kconsumerd_send_error(KCONSUMERD_EXIT_SUCCESS);
> + kconsumerd_send_error(ctx, KCONSUMERD_EXIT_SUCCESS);
> goto end;
>
> error:
> ret = EXIT_FAILURE;
> - kconsumerd_send_error(KCONSUMERD_EXIT_FAILURE);
> + kconsumerd_send_error(ctx, KCONSUMERD_EXIT_FAILURE);
>
> end:
> + kconsumerd_destroy(ctx);
> kconsumerd_cleanup();
>
> return ret;
> --
> 1.7.4.1
>
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list