[ltt-dev] [LTTNG-TOOLS PATCH 2/3] Register the consuming function

Mathieu Desnoyers compudj at krystal.dyndns.org
Fri Aug 5 11:34:34 EDT 2011


* Julien Desfossez (julien.desfossez at polymtl.ca) wrote:
> The init function of the library now takes a function as argument to
> allow a consumer using the library to control the function to be called
> when data is ready in a buffer.
> The kconsumerd_on_read_subbuffer_mmap and
> kconsumerd_on_read_subbuffer_splice are now exported to allow a consumer
> to use them directly if needed.
> 
> Signed-off-by: Julien Desfossez <julien.desfossez at polymtl.ca>
> ---
>  liblttkconsumerd/liblttkconsumerd.c |   99 ++++-------------------------------
>  liblttkconsumerd/liblttkconsumerd.h |   26 ++++++++-
>  ltt-kconsumerd/ltt-kconsumerd.c     |   82 ++++++++++++++++++++++++++++-
>  3 files changed, 116 insertions(+), 91 deletions(-)
> 
> diff --git a/liblttkconsumerd/liblttkconsumerd.c b/liblttkconsumerd/liblttkconsumerd.c
> index f60888a..9d8cb00 100644
> --- a/liblttkconsumerd/liblttkconsumerd.c
> +++ b/liblttkconsumerd/liblttkconsumerd.c
> @@ -34,6 +34,8 @@
>  #include "liblttkconsumerd.h"
>  #include "lttngerr.h"
>  
> +static int (*on_buffer_ready)(struct kconsumerd_fd *kconsumerd_fd);

Hrm. Static variables. :-/ The normal way to do this, within a library,
would be to initialize a "context" you plan to work on rather than
initialize some static data global to the whole process that uses the
library. This will come handy if we have multiple callers using the
library within the same process.

So instead of:

static variables

init() (global to lib)

fini() (global to lib)

This would become:

create() (returns an allocated and initialized object)

destroy() (fre the allocated object)

I know you have some global "events" for signal handling. These should
be dispatched to all the objects currently live.

Does it make sense ?

Thanks,

Mathieu

> +
>  static
>  struct kconsumerd_global_data {
>  	/*
> @@ -265,8 +267,8 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd,
>   * mmap the ring buffer, read it and write the data to the tracefile.
>   * Returns the number of bytes written
>   */
> -static int kconsumerd_on_read_subbuffer_mmap(
> -		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
> +int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_fd *kconsumerd_fd,
> +		unsigned long len)
>  {
>  	unsigned long mmap_len, mmap_offset, padded_len, padding_len;
>  	char *mmap_base;
> @@ -378,8 +380,8 @@ end:
>   * Splice the data from the ring buffer to the tracefile.
>   * Returns the number of bytes spliced
>   */
> -static int kconsumerd_on_read_subbuffer(
> -		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
> +int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_fd *kconsumerd_fd,
> +		unsigned long len)
>  {
>  	long ret = 0;
>  	loff_t offset = 0;
> @@ -469,87 +471,6 @@ end:
>  }
>  
>  /*
> - * kconsumerd_read_subbuffer
> - *
> - * Consume data on a file descriptor and write it on a trace file
> - */
> -static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
> -{
> -	unsigned long len;
> -	int err;
> -	long ret = 0;
> -	int infd = kconsumerd_fd->consumerd_fd;
> -
> -	DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
> -	/* Get the next subbuffer */
> -	err = kernctl_get_next_subbuf(infd);
> -	if (err != 0) {
> -		ret = errno;
> -		perror("Reserving sub buffer failed (everything is normal, "
> -				"it is due to concurrency)");
> -		goto end;
> -	}
> -
> -	switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
> -	case LTTNG_EVENT_SPLICE:
> -		/* read the whole subbuffer */
> -		err = kernctl_get_padded_subbuf_size(infd, &len);
> -		if (err != 0) {
> -			ret = errno;
> -			perror("Getting sub-buffer len failed.");
> -			goto end;
> -		}
> -
> -		/* splice the subbuffer to the tracefile */
> -		ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
> -		if (ret < 0) {
> -			/*
> -			 * display the error but continue processing to try
> -			 * to release the subbuffer
> -			 */
> -			ERR("Error splicing to tracefile");
> -		}
> -		break;
> -	case LTTNG_EVENT_MMAP:
> -		/* read the used subbuffer size */
> -		err = kernctl_get_subbuf_size(infd, &len);
> -		if (err != 0) {
> -			ret = errno;
> -			perror("Getting sub-buffer len failed.");
> -			goto end;
> -		}
> -		/* write the subbuffer to the tracefile */
> -		ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
> -		if (ret < 0) {
> -			/*
> -			 * display the error but continue processing to try
> -			 * to release the subbuffer
> -			 */
> -			ERR("Error writing to tracefile");
> -		}
> -		break;
> -	default:
> -		ERR("Unknown output method");
> -		ret = -1;
> -	}
> -
> -	err = kernctl_put_next_subbuf(infd);
> -	if (err != 0) {
> -		ret = errno;
> -		if (errno == EFAULT) {
> -			perror("Error in unreserving sub buffer\n");
> -		} else if (errno == EIO) {
> -			/* Should never happen with newer LTTng versions */
> -			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
> -		}
> -		goto end;
> -	}
> -
> -end:
> -	return ret;
> -}
> -
> -/*
>   * kconsumerd_poll_socket
>   *
>   * Poll on the should_quit pipe and the command socket
> @@ -795,7 +716,7 @@ void *kconsumerd_thread_poll_fds(void *data)
>  			case POLLPRI:
>  				DBG("Urgent read on fd %d", pollfd[i].fd);
>  				high_prio = 1;
> -				ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
> +				ret = on_buffer_ready(local_kconsumerd_fd[i]);
>  				/* it's ok to have an unavailable sub-buffer */
>  				if (ret == EAGAIN) {
>  					ret = 0;
> @@ -818,7 +739,7 @@ void *kconsumerd_thread_poll_fds(void *data)
>  			for (i = 0; i < nb_fd; i++) {
>  				if (pollfd[i].revents == POLLIN) {
>  					DBG("Normal read on fd %d", pollfd[i].fd);
> -					ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
> +					ret = on_buffer_ready(local_kconsumerd_fd[i]);
>  					/* it's ok to have an unavailable subbuffer */
>  					if (ret == EAGAIN) {
>  						ret = 0;
> @@ -848,10 +769,12 @@ end:
>   * - create the poll_pipe
>   * - create the should_quit pipe (for signal handler)
>   */
> -int kconsumerd_init(void)
> +int kconsumerd_init(int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd))
>  {
>  	int ret;
>  
> +	on_buffer_ready = buffer_ready;
> +
>  	/* need to update the polling array at init time */
>  	kconsumerd_data.need_update = 1;
>  
> diff --git a/liblttkconsumerd/liblttkconsumerd.h b/liblttkconsumerd/liblttkconsumerd.h
> index 9e0b9ff..f98621f 100644
> --- a/liblttkconsumerd/liblttkconsumerd.h
> +++ b/liblttkconsumerd/liblttkconsumerd.h
> @@ -58,13 +58,35 @@ struct kconsumerd_fd {
>  };
>  
>  /*
> - * kconsumerd_init(void)
> + * kconsumerd_init
>   * initialise the necessary environnement :
>   * - inform the polling thread to update the polling array
>   * - create the poll_pipe
>   * - create the should_quit pipe (for signal handler)
> + *
> + * Takes a function pointer as argument, this function is called when data is
> + * available on a buffer. This function is responsible to do the
> + * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
> + * buffer configuration and then kernctl_put_next_subbuf at the end.
> + */
> +int kconsumerd_init(int (*kbuffer_ready)(struct kconsumerd_fd *kconsumerd_fd));
> +
> +/*
> + * kconsumerd_on_read_subbuffer_mmap
> + * mmap the ring buffer, read it and write the data to the tracefile.
> + * Returns the number of bytes written
> + */
> +int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_fd *kconsumerd_fd,
> +		        unsigned long len);
> +
> +/*
> + * kconsumerd_on_read_subbuffer
> + *
> + * Splice the data from the ring buffer to the tracefile.
> + * Returns the number of bytes spliced
>   */
> -int kconsumerd_init(void);
> +int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_fd *kconsumerd_fd,
> +		        unsigned long len);
>  
>  /*
>   * kconsumerd_send_error
> diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c
> index 4180f89..61ed005 100644
> --- a/ltt-kconsumerd/ltt-kconsumerd.c
> +++ b/ltt-kconsumerd/ltt-kconsumerd.c
> @@ -190,6 +190,86 @@ static void parse_args(int argc, char **argv)
>  	}
>  }
>  
> +/*
> + * read_subbuffer
> + *
> + * Consume data on a file descriptor and write it on a trace file
> + */
> +static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
> +{
> +	unsigned long len;
> +	int err;
> +	long ret = 0;
> +	int infd = kconsumerd_fd->consumerd_fd;
> +
> +	DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
> +	/* Get the next subbuffer */
> +	err = kernctl_get_next_subbuf(infd);
> +	if (err != 0) {
> +		ret = errno;
> +		perror("Reserving sub buffer failed (everything is normal, "
> +				"it is due to concurrency)");
> +		goto end;
> +	}
> +
> +	switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
> +		case LTTNG_EVENT_SPLICE:
> +			/* read the whole subbuffer */
> +			err = kernctl_get_padded_subbuf_size(infd, &len);
> +			if (err != 0) {
> +				ret = errno;
> +				perror("Getting sub-buffer len failed.");
> +				goto end;
> +			}
> +
> +			/* splice the subbuffer to the tracefile */
> +			ret = kconsumerd_on_read_subbuffer_splice(kconsumerd_fd, len);
> +			if (ret < 0) {
> +				/*
> +				 * display the error but continue processing to try
> +				 * to release the subbuffer
> +				 */
> +				ERR("Error splicing to tracefile");
> +			}
> +			break;
> +		case LTTNG_EVENT_MMAP:
> +			/* read the used subbuffer size */
> +			err = kernctl_get_subbuf_size(infd, &len);
> +			if (err != 0) {
> +				ret = errno;
> +				perror("Getting sub-buffer len failed.");
> +				goto end;
> +			}
> +			/* write the subbuffer to the tracefile */
> +			ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
> +			if (ret < 0) {
> +				/*
> +				 * display the error but continue processing to try
> +				 * to release the subbuffer
> +				 */
> +				ERR("Error writing to tracefile");
> +			}
> +			break;
> +		default:
> +			ERR("Unknown output method");
> +			ret = -1;
> +	}
> +
> +	err = kernctl_put_next_subbuf(infd);
> +	if (err != 0) {
> +		ret = errno;
> +		if (errno == EFAULT) {
> +			perror("Error in unreserving sub buffer\n");
> +		} else if (errno == EIO) {
> +			/* Should never happen with newer LTTng versions */
> +			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
> +		}
> +		goto end;
> +	}
> +
> +end:
> +	return ret;
> +}
>  
>  /*
>   * main
> @@ -228,7 +308,7 @@ int main(int argc, char **argv)
>  	}
>  
>  	/* create the pipe to wake to receiving thread when needed */
> -	ret = kconsumerd_init();
> +	ret = kconsumerd_init(read_subbuffer);
>  	if (ret < 0) {
>  		goto end;
>  	}
> -- 
> 1.7.4.1
> 
> 
> _______________________________________________
> ltt-dev mailing list
> ltt-dev at lists.casi.polymtl.ca
> http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
> 

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com




More information about the lttng-dev mailing list