[ltt-dev] [LTTNG-TOOLS PATCH v2] Callbacks on receive and update FD

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Tue Aug 16 19:23:39 EDT 2011


* Julien Desfossez (julien.desfossez at polymtl.ca) wrote:
> The user of the lib can now take control over a new FD or the update
> operation of an existing FD.
> Opening the output tracefile is now the responsiblity of the user
> and not the library itself.

Merged, with edit.

Thanks,

Mathieu

> 
> Signed-off-by: Julien Desfossez <julien.desfossez at polymtl.ca>
> ---
>  include/lttng/lttng-kconsumerd.h     |   29 ++++++++--
>  liblttngkconsumerd/lttngkconsumerd.c |   96 +++++++++++++++++++--------------
>  ltt-kconsumerd/ltt-kconsumerd.c      |   45 +++++++++++++++-
>  3 files changed, 122 insertions(+), 48 deletions(-)
> 
> diff --git a/include/lttng/lttng-kconsumerd.h b/include/lttng/lttng-kconsumerd.h
> index 7e195ab..edff0ba 100644
> --- a/include/lttng/lttng-kconsumerd.h
> +++ b/include/lttng/lttng-kconsumerd.h
> @@ -79,6 +79,25 @@ struct lttng_kconsumerd_fd {
>  struct lttng_kconsumerd_local_data {
>  	/* function to call when data is available on a buffer */
>  	int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd);
> +	/*
> +	 * function to call when we receive a new fd, it receives a newly allocated
> +	 * kconsumerd_fd, depending on the return code of this function, the new FD
> +	 * will be handled by the application or the library :
> +	 * - > 0 (success, FD is kept by application)
> +	 * - == 0 (success, FD is left to library)
> +	 * - < 0 (error)
> +	 */
> +	int (*on_recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd);
> +	/*
> +	 * function to call when a FD is getting updated by the session daemon,
> +	 * this function receives the FD as seen by the session daemon
> +	 * (sessiond_fd) and the new state, depending on the return code of this function
> +	 * the update of state for the FD is handled by the application or the library :
> +	 * - > 0 (success, FD is kept by application)
> +	 * - == 0 (success, FD is left to library)
> +	 * - < 0 (error)
> +	 */
> +	int (*on_update_fd)(int sessiond_fd, uint32_t state);
>  	/* socket to communicate errors with sessiond */
>  	int kconsumerd_error_socket;
>  	/* socket to exchange commands with sessiond */
> @@ -98,15 +117,15 @@ struct lttng_kconsumerd_local_data {
>   * - create the should_quit pipe (for signal handler)
>   * - create the thread pipe (for splice)
>   *
> - * Takes a function pointer as argument, this function is called when data is
> - * available on a buffer. This function is responsible to do the
> - * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
> - * buffer configuration and then kernctl_put_next_subbuf at the end.
> + * Takes the function pointers to the on_buffer_ready, on_recv_fd, and
> + * on_update_fd callbacks.
>   *
>   * Returns a pointer to the new context or NULL on error.
>   */
>  extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
> -		int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd));
> +		int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
> +		int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
> +		int (*update_fd)(int sessiond_fd, uint32_t state));
>  
>  /*
>   * Close all fds associated with the instance and free the context.
> diff --git a/liblttngkconsumerd/lttngkconsumerd.c b/liblttngkconsumerd/lttngkconsumerd.c
> index 69ef9a0..751cea1 100644
> --- a/liblttngkconsumerd/lttngkconsumerd.c
> +++ b/liblttngkconsumerd/lttngkconsumerd.c
> @@ -125,22 +125,21 @@ static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf)
>  }
>  
>  /*
> - * Add a fd to the global list protected by a mutex.
> + * Create a struct lttcomm_kconsumerd_msg from the
> + * information received on the receiving socket
>   */
> -static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
> +struct lttng_kconsumerd_fd *kconsumerd_allocate_fd(
> +		struct lttcomm_kconsumerd_msg *buf,
>  		int consumerd_fd)
>  {
>  	struct lttng_kconsumerd_fd *tmp_fd;
> -	int ret = 0;
>  
> -	pthread_mutex_lock(&kconsumerd_data.lock);
> -	/* Check if already exist */
> -	ret = kconsumerd_find_session_fd(buf->fd);
> -	if (ret == 1) {
> +	tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
> +	if (tmp_fd == NULL) {
> +		perror("malloc struct lttng_kconsumerd_fd");
>  		goto end;
>  	}
>  
> -	tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd));
>  	tmp_fd->sessiond_fd = buf->fd;
>  	tmp_fd->consumerd_fd = consumerd_fd;
>  	tmp_fd->state = buf->state;
> @@ -152,42 +151,31 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf,
>  	tmp_fd->output = buf->output;
>  	strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
>  	tmp_fd->path_name[PATH_MAX - 1] = '\0';
> +	DBG("Allocated %s (sessiond_fd %d, consumerd_fd %d, out_fd %d)",
> +			tmp_fd->path_name, tmp_fd->sessiond_fd,
> +			tmp_fd->consumerd_fd, tmp_fd->out_fd);
>  
> -	/* Opening the tracefile in write mode */
> -	if (tmp_fd->path_name != NULL) {
> -		ret = open(tmp_fd->path_name,
> -				O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
> -		if (ret < 0) {
> -			ERR("Opening %s", tmp_fd->path_name);
> -			perror("open");
> -			goto end;
> -		}
> -		tmp_fd->out_fd = ret;
> -		DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
> -				tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
> -	}
> +end:
> +	return tmp_fd;
> +}
>  
> -	if (tmp_fd->output == LTTNG_EVENT_MMAP) {
> -		/* get the len of the mmap region */
> -		ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len);
> -		if (ret != 0) {
> -			ret = errno;
> -			perror("kernctl_get_mmap_len");
> -			goto end;
> -		}
> +/*
> + * Add a fd to the global list protected by a mutex.
> + */
> +static int kconsumerd_add_fd(struct lttng_kconsumerd_fd *tmp_fd)
> +{
> +	int ret;
>  
> -		tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len,
> -				PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0);
> -		if (tmp_fd->mmap_base == MAP_FAILED) {
> -			perror("Error mmaping");
> -			ret = -1;
> -			goto end;
> -		}
> +	pthread_mutex_lock(&kconsumerd_data.lock);
> +	/* Check if already exist */
> +	ret = kconsumerd_find_session_fd(tmp_fd->sessiond_fd);
> +	if (ret == 1) {
> +		goto end;
>  	}
> -
>  	cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
>  	kconsumerd_data.fds_count++;
>  	kconsumerd_data.need_update = 1;
> +
>  end:
>  	pthread_mutex_unlock(&kconsumerd_data.lock);
>  	return ret;
> @@ -263,6 +251,7 @@ static int kconsumerd_consumerd_recv_fd(
>  	int nb_fd;
>  	char recv_fd[CMSG_SPACE(sizeof(int))];
>  	struct lttcomm_kconsumerd_msg lkm;
> +	struct lttng_kconsumerd_fd *new_fd;
>  
>  	/* the number of fds we are about to receive */
>  	nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
> @@ -313,14 +302,34 @@ static int kconsumerd_consumerd_recv_fd(
>  					DBG("kconsumerd_add_fd %s (%d)", lkm.path_name,
>  							((int *) CMSG_DATA(cmsg))[0]);
>  
> -					ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
> -					if (ret < 0) {
> +					new_fd = kconsumerd_allocate_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
> +					if (new_fd == NULL) {
>  						lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
>  						goto end;
>  					}
> +
> +					if (ctx->on_recv_fd != NULL) {
> +						ret = ctx->on_recv_fd(new_fd);
> +						if (ret == 0) {
> +							kconsumerd_add_fd(new_fd);
> +						} else if (ret < 0) {
> +							goto end;
> +						}
> +					} else {
> +						kconsumerd_add_fd(new_fd);
> +					}
>  					break;
>  				case UPDATE_STREAM:
> -					kconsumerd_change_fd_state(lkm.fd, lkm.state);
> +					if (ctx->on_update_fd != NULL) {
> +						ret = ctx->on_update_fd(lkm.fd, lkm.state);
> +						if (ret == 0) {
> +							kconsumerd_change_fd_state(lkm.fd, lkm.state);
> +						} else if (ret < 0) {
> +							goto end;
> +						}
> +					} else {
> +							kconsumerd_change_fd_state(lkm.fd, lkm.state);
> +					}
>  					break;
>  				default:
>  					break;
> @@ -754,7 +763,9 @@ end:
>   * Returns a pointer to the new context or NULL on error.
>   */
>  struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
> -		int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd))
> +		int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd),
> +		int (*recv_fd)(struct lttng_kconsumerd_fd *kconsumerd_fd),
> +		int (*update_fd)(int sessiond_fd, uint32_t state))
>  {
>  	int ret;
>  	struct lttng_kconsumerd_local_data *ctx;
> @@ -765,7 +776,10 @@ struct lttng_kconsumerd_local_data *lttng_kconsumerd_create(
>  		goto end;
>  	}
>  
> +	/* assign the callbacks */
>  	ctx->on_buffer_ready = buffer_ready;
> +	ctx->on_recv_fd = recv_fd;
> +	ctx->on_update_fd = update_fd;
>  
>  	ret = pipe(ctx->kconsumerd_poll_pipe);
>  	if (ret < 0) {
> diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c
> index ca93965..84ea95d 100644
> --- a/ltt-kconsumerd/ltt-kconsumerd.c
> +++ b/ltt-kconsumerd/ltt-kconsumerd.c
> @@ -277,6 +277,47 @@ end:
>  	return ret;
>  }
>  
> +static int on_recv_fd(struct lttng_kconsumerd_fd *kconsumerd_fd)
> +{
> +	int ret;
> +
> +	/* Opening the tracefile in write mode */
> +	if (kconsumerd_fd->path_name != NULL) {
> +		ret = open(kconsumerd_fd->path_name,
> +				O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
> +		if (ret < 0) {
> +			ERR("Opening %s", kconsumerd_fd->path_name);
> +			perror("open");
> +			goto error;
> +		}
> +		kconsumerd_fd->out_fd = ret;
> +	}
> +
> +	if (kconsumerd_fd->output == LTTNG_EVENT_MMAP) {
> +		/* get the len of the mmap region */
> +		ret = kernctl_get_mmap_len(kconsumerd_fd->consumerd_fd, &kconsumerd_fd->mmap_len);
> +		if (ret != 0) {
> +			ret = errno;
> +			perror("kernctl_get_mmap_len");
> +			goto error;
> +		}
> +
> +		kconsumerd_fd->mmap_base = mmap(NULL, kconsumerd_fd->mmap_len,
> +				PROT_READ, MAP_PRIVATE, kconsumerd_fd->consumerd_fd, 0);
> +		if (kconsumerd_fd->mmap_base == MAP_FAILED) {
> +			perror("Error mmaping");
> +			ret = -1;
> +			goto error;
> +		}
> +	}
> +
> +	/* we return 0 to let the library handle the FD internally */
> +	return 0;
> +
> +error:
> +	return ret;
> +}
> +
>  /*
>   * main
>   */
> @@ -303,8 +344,8 @@ int main(int argc, char **argv)
>  		snprintf(command_sock_path, PATH_MAX,
>  				KCONSUMERD_CMD_SOCK_PATH);
>  	}
> -	/* create the pipe to wake to receiving thread when needed */
> -	ctx = lttng_kconsumerd_create(read_subbuffer);
> +	/* create the consumer instance with and assign the callbacks */
> +	ctx = lttng_kconsumerd_create(read_subbuffer, on_recv_fd, NULL);
>  	if (ctx == NULL) {
>  		goto error;
>  	}
> -- 
> 1.7.4.1
> 

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com




More information about the lttng-dev mailing list