[ltt-dev] Liblttd patch

Mathieu Desnoyers compudj at krystal.dyndns.org
Tue Feb 16 14:35:51 EST 2010


* Michael Sills Lavoie (michael.sills-lavoie at polymtl.ca) wrote:
> Hey,
> 
> This is a working implementation of the new liblttd and lttd. It is still a
> work in progress because it is not yet possible to create multiple
> instances of the "daemon".
> 
> You can apply the patch with `git apply liblttd.patch` from the ltt-control
> root directory.
> 
> What do you think about it ?


A few things: please get into the habit of writing your comments in English
so everyone on the mailing list can understand them ;)

You should probably review the kernel-doc coding style for the function
comments.

Besides that, things seem to be generally on the right track.

You should probably submit these changes in two patches:

- first, just move the code from lttd to liblttd and adapt everything so
  it works (more or less this patch).
- second, a patch that adds the multiple instances feature.

Thanks,

Mathieu

> 
> Michael

> diff --git a/Makefile.am b/Makefile.am
> index 8f9974d..06500fd 100644
> --- a/Makefile.am
> +++ b/Makefile.am
> @@ -1,2 +1,2 @@
> -SUBDIRS = liblttctl lttctl lttd specs
> +SUBDIRS = liblttctl lttctl liblttd lttd specs
>  
> diff --git a/configure.in b/configure.in
> index d67e7d4..a992cd8 100644
> --- a/configure.in
> +++ b/configure.in
> @@ -76,8 +76,10 @@ DEFAULT_INCLUDES="-I\$(top_srcdir) -I\$(top_builddir)"
>  #AC_SUBST(CPPFLAGS)
>  
>  lttctlincludedir="${includedir}/liblttctl"
> +liblttdincludedir="${includedir}/liblttd"
>  
>  AC_SUBST(lttctlincludedir)
> +AC_SUBST(liblttdincludedir)
>  AC_SUBST(UTIL_LIBS)
>  AC_SUBST(THREAD_LIBS)
>  AC_SUBST(DEFAULT_INCLUDES)
> @@ -85,6 +87,7 @@ AC_SUBST(DEFAULT_INCLUDES)
>  AC_CONFIG_FILES([Makefile
>       liblttctl/Makefile
>       lttctl/Makefile
> +     liblttd/Makefile
>       lttd/Makefile
>       specs/Makefile])
>  AC_OUTPUT
> diff --git a/liblttd/Makefile.am b/liblttd/Makefile.am
> new file mode 100644
> index 0000000..3c1eeda
> --- /dev/null
> +++ b/liblttd/Makefile.am
> @@ -0,0 +1,7 @@
> +
> +
> +lib_LTLIBRARIES = liblttd.la
> +liblttd_la_SOURCES = liblttd.c
> +
> +liblttdinclude_HEADERS = \
> +	liblttd.h
> diff --git a/liblttd/liblttd.c b/liblttd/liblttd.c
> new file mode 100644
> index 0000000..941ca87
> --- /dev/null
> +++ b/liblttd/liblttd.c
> @@ -0,0 +1,754 @@
> +/* lttd
> + *
> + * Linux Trace Toolkit Daemon
> + *
> + * This is a simple daemon that reads a few relay+debugfs channels and save
> + * them in a trace.
> + *
> + * CPU hot-plugging is supported using inotify.
> + *
> + * Copyright 2005 -
> + * 	Mathieu Desnoyers <mathieu.desnoyers at polymtl.ca>
> + */
> +
> +/* Feature-test macros must precede every system header (including the
> + * <pthread.h> pulled in by liblttd.h), otherwise they have no effect. */
> +#define _REENTRANT
> +#define _GNU_SOURCE
> +
> +#ifdef HAVE_CONFIG_H
> +#include <config.h>
> +#endif
> +
> +#include "liblttd.h"
> +
> +#include <features.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <limits.h>
> +#include <pthread.h>
> +#include <sys/types.h>
> +#include <stdlib.h>
> +#include <dirent.h>
> +#include <string.h>
> +#include <fcntl.h>
> +#include <sys/ioctl.h>
> +#include <sys/stat.h>
> +#include <sys/poll.h>
> +#include <sys/mman.h>
> +#include <sys/syscall.h>
> +#include <unistd.h>
> +#include <asm/ioctls.h>
> +
> +#include <linux/version.h>
> +
> +/* Relayfs IOCTL */
> +#include <asm/ioctl.h>
> +#include <asm/types.h>
> +
> +/* Get the next sub buffer that can be read. */
> +#define RELAY_GET_SB		_IOR(0xF5, 0x00,__u32)
> +/* Release the oldest reserved (by "get") sub buffer. */
> +#define RELAY_PUT_SB		_IOW(0xF5, 0x01,__u32)
> +/* returns the number of sub buffers in the per cpu channel. */
> +#define RELAY_GET_N_SB		_IOR(0xF5, 0x02,__u32)
> +/* returns the size of data to consume in the current sub-buffer
> + * (read_subbuffer() passes it to on_read_subbuffer as "len"). */
> +#define RELAY_GET_SB_SIZE	_IOR(0xF5, 0x03, __u32)
> +/* returns the maximum size of a sub-buffer (stored in fd_pair.max_sb_size). */
> +#define RELAY_GET_MAX_SB_SIZE	_IOR(0xF5, 0x04, __u32)
> +
> +
> +#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14)
> +#include <sys/inotify.h>
> +
> +#define HAS_INOTIFY
> +#else
> +/*
> + * No inotify before 2.6.14: these stubs only keep the code compiling.
> + * inotify_init() always fails and HAS_INOTIFY stays undefined, so channels
> + * created after startup (CPU hot-plug) are not picked up.
> + */
> +static inline int inotify_init (void)
> +{
> +	return -1;
> +}
> +
> +static inline int inotify_add_watch (int fd, const char *name, __u32 mask)
> +{
> +	return 0;
> +}
> +
> +static inline int inotify_rm_watch (int fd, __u32 wd)
> +{
> +	return 0;
> +}
> +#undef HAS_INOTIFY
> +#endif
> +
> +/* Callbacks of the running session, set by liblttd_start().
> + * TODO: remove this global; only used during the implementation. */
> +struct liblttd_callbacks *callbacks;
> +
> +/* Growable array of the opened channels. */
> +struct channel_trace_fd {
> +	struct fd_pair *pair;
> +	int num_pairs;
> +};
> +
> +/*
> + * One watched channel folder. path_channel holds the folder path followed
> + * by '/'; base_path_channel points inside it, at the part of the path that
> + * is relative to the root of the trace channels.
> + * TODO: change this so that the trace path is not needed.
> + */
> +struct inotify_watch {
> +	int wd;
> +	char path_channel[PATH_MAX];
> +	char *base_path_channel;
> +};
> +
> +/* Growable array of inotify watches. */
> +struct inotify_watch_array {
> +	struct inotify_watch *elem;
> +	int num;
> +};
> +
> +struct channel_trace_fd fd_pairs = { NULL, 0 };
> +int inotify_fd = -1;
> +struct inotify_watch_array inotify_watch_array = { NULL, 0 };
> +
> +/* protects fd_pairs and inotify_watch_array */
> +pthread_rwlock_t fd_pairs_lock = PTHREAD_RWLOCK_INITIALIZER;
> +
> +/* Session options, set by liblttd_start().
> + * TODO: move them into a structure to support more than one instance. */
> +static char		*channel_name = NULL;
> +static unsigned long	num_threads = 1;
> +static volatile int	quit_program = 0;	/* Set by liblttd_stop() (e.g. from a signal handler) */
> +static int		dump_flight_only = 0;
> +static int		dump_normal_only = 0;
> +static int		verbose_mode = 0;
> +
> +/* printf() that only prints when verbose mode is enabled. */
> +#define printf_verbose(fmt, args...) \
> +  do {                               \
> +    if (verbose_mode)                \
> +      printf(fmt, ##args);           \
> +  } while (0)
> +
> +
> +/**
> + * open_buffer_file - open one channel buffer file and register it
> + * @filename: base name of the file, used to detect "flight-" channels
> + * @path_channel: full path of the file to open
> + * @base_path_channel: path relative to the root of the trace channels,
> + *	passed to the on_open_channel callback
> + * @fd_pairs: array the newly opened channel is appended to
> + *
> + * Files filtered out by the flight/normal options and files that cannot be
> + * opened are skipped, and 0 is returned so that the caller keeps scanning.
> + *
> + * Returns 0 on success (or skip), -1 if the channel array cannot be grown
> + * or if the on_open_channel callback fails.
> + */
> +int open_buffer_file(char *filename, char *path_channel,
> +	char *base_path_channel, struct channel_trace_fd *fd_pairs)
> +{
> +	struct fd_pair *new_pairs;
> +	struct fd_pair *pair;
> +	int ret = 0;
> +
> +	if(strncmp(filename, "flight-", sizeof("flight-")-1) != 0) {
> +		if(dump_flight_only) {
> +			printf_verbose("Skipping normal channel %s\n",
> +				path_channel);
> +			return 0;
> +		}
> +	} else {
> +		if(dump_normal_only) {
> +			printf_verbose("Skipping flight channel %s\n",
> +				path_channel);
> +			return 0;
> +		}
> +	}
> +	printf_verbose("Opening file.\n");
> +
> +	/* Don't overwrite the array pointer with a failed realloc() result. */
> +	new_pairs = realloc(fd_pairs->pair,
> +			(fd_pairs->num_pairs + 1) * sizeof(struct fd_pair));
> +	if(new_pairs == NULL) {
> +		perror("Error allocating channel array");
> +		return -1;
> +	}
> +	fd_pairs->pair = new_pairs;
> +	pair = &fd_pairs->pair[fd_pairs->num_pairs];
> +	/* Give the callback a clean entry (user_data == NULL). */
> +	memset(pair, 0, sizeof(*pair));
> +
> +	/* Open the channel in read mode */
> +	pair->channel = open(path_channel, O_RDONLY | O_NONBLOCK);
> +	if(pair->channel == -1) {
> +		perror(path_channel);
> +		return 0;	/* continue */
> +	}
> +	fd_pairs->num_pairs++;
> +
> +	if(callbacks->on_open_channel)
> +		ret = callbacks->on_open_channel(callbacks, pair,
> +				base_path_channel);
> +	if(ret != 0) {
> +		close(pair->channel);
> +		fd_pairs->num_pairs--;
> +		return -1;
> +	}
> +
> +	return 0;
> +}
> +
> +/**
> + * open_channel_trace_pairs - recursively open every channel file of a folder
> + * @subchannel_name: path of the folder to scan
> + * @base_subchannel_name: points inside @subchannel_name, at the part of the
> + *	path that is relative to the root of the trace channels
> + * @fd_pairs: array the opened channels are appended to
> + * @inotify_fd: inotify instance used to watch the folder for new files
> + * @iwatch_array: array the folder's inotify watch is appended to
> + *
> + * Calls on_new_channels_folder for the folder, then opens each regular file
> + * (see open_buffer_file()) and recurses into each subfolder. Entries whose
> + * name starts with '.' are ignored, and errors in a subfolder do not abort
> + * the scan of this one.
> + *
> + * Returns 0 on success, ENOENT if the folder cannot be opened, ENAMETOOLONG
> + * if its path does not fit in PATH_MAX, -1 on callback or allocation error.
> + */
> +int open_channel_trace_pairs(char *subchannel_name,
> +		char *base_subchannel_name,
> +		struct channel_trace_fd *fd_pairs, int *inotify_fd,
> +		struct inotify_watch_array *iwatch_array)
> +{
> +	DIR *channel_dir;
> +	struct dirent *entry;
> +	struct stat stat_buf;
> +	int ret = 0;
> +	char path_channel[PATH_MAX];
> +	int path_channel_len;
> +	char *path_channel_ptr;
> +	char *base_subchannel_ptr;
> +	int open_ret = 0;
> +
> +	channel_dir = opendir(subchannel_name);
> +	if(channel_dir == NULL) {
> +		perror(subchannel_name);
> +		return ENOENT;	/* nothing to closedir() */
> +	}
> +
> +	printf_verbose("Calling on new channels folder\n");
> +	if(callbacks->on_new_channels_folder)
> +		ret = callbacks->on_new_channels_folder(callbacks,
> +				base_subchannel_name);
> +	if(ret != 0) {
> +		open_ret = -1;
> +		goto end;
> +	}
> +
> +	/* Build "<subchannel_name>/". Unlike strncpy(), snprintf() always
> +	 * NUL-terminates, which the inotify code below relies on. */
> +	path_channel_len = snprintf(path_channel, PATH_MAX, "%s/",
> +			subchannel_name);
> +	if(path_channel_len < 0 || path_channel_len >= PATH_MAX) {
> +		fprintf(stderr, "Path too long: %s\n", subchannel_name);
> +		open_ret = ENAMETOOLONG;
> +		goto end;
> +	}
> +	path_channel_ptr = path_channel + path_channel_len;
> +	base_subchannel_ptr = path_channel +
> +		(base_subchannel_name - subchannel_name);
> +
> +#ifdef HAS_INOTIFY
> +	{
> +		struct inotify_watch *new_elem;
> +		struct inotify_watch *watch;
> +
> +		new_elem = realloc(iwatch_array->elem,
> +			(iwatch_array->num + 1) * sizeof(struct inotify_watch));
> +		if(new_elem == NULL) {
> +			perror("Error allocating inotify watch array");
> +			open_ret = -1;
> +			goto end;
> +		}
> +		iwatch_array->elem = new_elem;
> +		watch = &iwatch_array->elem[iwatch_array->num++];
> +
> +		printf_verbose("Adding inotify for channel %s\n", path_channel);
> +		watch->wd = inotify_add_watch(*inotify_fd, path_channel,
> +				IN_CREATE);
> +		strcpy(watch->path_channel, path_channel);
> +		watch->base_path_channel = watch->path_channel +
> +			(base_subchannel_name - subchannel_name);
> +		printf_verbose("Added inotify for channel %s, wd %d\n",
> +			watch->path_channel, watch->wd);
> +	}
> +#endif
> +
> +	while((entry = readdir(channel_dir)) != NULL) {
> +
> +		if(entry->d_name[0] == '.') continue;
> +
> +		ret = snprintf(path_channel_ptr, PATH_MAX - path_channel_len,
> +				"%s", entry->d_name);
> +		if(ret < 0 || ret >= PATH_MAX - path_channel_len) {
> +			fprintf(stderr, "Skipping entry with too long path: %s\n",
> +				entry->d_name);
> +			continue;
> +		}
> +
> +		ret = stat(path_channel, &stat_buf);
> +		if(ret == -1) {
> +			perror(path_channel);
> +			continue;
> +		}
> +
> +		printf_verbose("Channel file : %s\n", path_channel);
> +
> +		if(S_ISDIR(stat_buf.st_mode)) {
> +
> +			printf_verbose("Entering channel subdirectory...\n");
> +			/* Errors in a subfolder do not abort this scan. */
> +			open_channel_trace_pairs(path_channel, base_subchannel_ptr,
> +				fd_pairs, inotify_fd, iwatch_array);
> +		} else if(S_ISREG(stat_buf.st_mode)) {
> +			open_ret = open_buffer_file(entry->d_name, path_channel,
> +				base_subchannel_ptr, fd_pairs);
> +			if(open_ret)
> +				goto end;
> +		}
> +	}
> +
> +end:
> +	closedir(channel_dir);
> +
> +	return open_ret;
> +}
> +
> +
> +/**
> + * read_subbuffer - consume one sub-buffer of a channel
> + * @pair: channel to read from
> + *
> + * Reserves the next readable sub-buffer (RELAY_GET_SB), passes its data
> + * length to the on_read_subbuffer callback, then releases it
> + * (RELAY_PUT_SB). Once reserved, the sub-buffer is always released, even
> + * if getting its length or the callback fails.
> + *
> + * Returns 0 on success, the errno of the failing ioctl (EAGAIN when no
> + * sub-buffer is available), or the non-zero value returned by the callback.
> + */
> +int read_subbuffer(struct fd_pair *pair)
> +{
> +	unsigned int consumed_old, len;
> +	int err;
> +	long ret;
> +
> +	err = ioctl(pair->channel, RELAY_GET_SB, &consumed_old);
> +	if(err != 0) {
> +		ret = errno;
> +		perror("Reserving sub buffer failed (everything is normal, it is due to concurrency)");
> +		goto get_error;
> +	}
> +	/* The cookie is only meaningful once the ioctl has succeeded. */
> +	printf_verbose("cookie : %u\n", consumed_old);
> +
> +	err = ioctl(pair->channel, RELAY_GET_SB_SIZE, &len);
> +	if(err != 0) {
> +		ret = errno;
> +		perror("Getting sub-buffer len failed.");
> +	} else {
> +		ret = 0;
> +		if(callbacks->on_read_subbuffer)
> +			ret = callbacks->on_read_subbuffer(callbacks, pair,
> +					len);
> +	}
> +
> +	/* Release the reserved sub-buffer whatever happened above. */
> +	err = ioctl(pair->channel, RELAY_PUT_SB, &consumed_old);
> +	if(err != 0) {
> +		ret = errno;
> +		if(errno == EFAULT) {
> +			perror("Error in unreserving sub buffer\n");
> +		} else if(errno == EIO) {
> +			/* Should never happen with newer LTTng versions */
> +			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
> +		}
> +	}
> +
> +get_error:
> +	return ret;
> +}
> +
> +
> +/**
> + * map_channels - query the geometry of channels and init their mutexes
> + * @fd_pairs: channel array
> + * @idx_begin: first index to set up (inclusive)
> + * @idx_end: last index to set up (exclusive)
> + *
> + * Despite its name, nothing is mmap'ed. For each channel in the range, the
> + * number of sub-buffers and the maximum sub-buffer size are read into the
> + * fd_pair, and its mutex is initialized.
> + *
> + * Returns 0 on success (also when @fd_pairs is empty), non-zero on error.
> + */
> +int map_channels(struct channel_trace_fd *fd_pairs,
> +	int idx_begin, int idx_end)
> +{
> +	int i;
> +	int ret=0;
> +
> +	if(fd_pairs->num_pairs <= 0) {
> +		printf("No channel to read\n");
> +		goto end;
> +	}
> +
> +	/* Get the subbuf sizes and number */
> +
> +	for(i=idx_begin;i<idx_end;i++) {
> +		struct fd_pair *pair = &fd_pairs->pair[i];
> +
> +		ret = ioctl(pair->channel, RELAY_GET_N_SB, &pair->n_sb);
> +		if(ret != 0) {
> +			perror("Error in getting the number of sub-buffers");
> +			goto end;
> +		}
> +		ret = ioctl(pair->channel, RELAY_GET_MAX_SB_SIZE,
> +			    &pair->max_sb_size);
> +		if(ret != 0) {
> +			perror("Error in getting the max sub-buffer size");
> +			goto end;
> +		}
> +		/* pthread functions return the error code; errno is not set. */
> +		ret = pthread_mutex_init(&pair->mutex, NULL);	/* Fast mutex */
> +		if(ret != 0) {
> +			fprintf(stderr, "Error in mutex init: %s\n",
> +				strerror(ret));
> +			goto end;
> +		}
> +	}
> +
> +end:
> +	return ret;
> +}
> +
> +/**
> + * unmap_channels - release what map_channels() set up
> + * @fd_pairs: channel array
> + *
> + * Destroys the mutex of every channel. Channels are not closed here.
> + *
> + * Returns 0 on success, non-zero if any mutex could not be destroyed.
> + */
> +int unmap_channels(struct channel_trace_fd *fd_pairs)
> +{
> +	int j;
> +	int ret=0;
> +
> +	for(j=0;j<fd_pairs->num_pairs;j++) {
> +		struct fd_pair *pair = &fd_pairs->pair[j];
> +		int err_ret;
> +
> +		/* pthread functions return the error code; errno is not set. */
> +		err_ret = pthread_mutex_destroy(&pair->mutex);
> +		if(err_ret != 0) {
> +			fprintf(stderr, "Error in mutex destroy: %s\n",
> +				strerror(err_ret));
> +		}
> +		ret |= err_ret;
> +	}
> +
> +	return ret;
> +}
> +
> +#ifdef HAS_INOTIFY
> +/**
> + * read_inotify - handle pending inotify events
> + * @inotify_fd: non-blocking inotify instance
> + * @fd_pairs: channel array new channels are appended to
> + * @iwatch_array: watched folders, used to map a watch descriptor to a path
> + *
> + * Only file creation (IN_CREATE) is supported. Each new file in a watched
> + * folder is opened and set up as a new channel. The caller must hold
> + * fd_pairs_lock for writing, because @fd_pairs may be reallocated.
> + *
> + * Returns 0 on success (or if another thread already consumed the events),
> + * -1 on error.
> + */
> +int read_inotify(int inotify_fd,
> +	struct channel_trace_fd *fd_pairs,
> +	struct inotify_watch_array *iwatch_array)
> +{
> +	/* inotify(7): the read buffer must be aligned for inotify_event. */
> +	char buf[sizeof(struct inotify_event) + PATH_MAX]
> +		__attribute__((aligned(__alignof__(struct inotify_event))));
> +	char path_channel[PATH_MAX];
> +	ssize_t len;
> +	struct inotify_event *ievent;
> +	size_t offset;
> +	int i;
> +	int ret;
> +	int old_num;
> +
> +	offset = 0;
> +	len = read(inotify_fd, buf, sizeof(buf));
> +	if(len < 0) {
> +
> +		if(errno == EAGAIN)
> +			return 0;  /* another thread got the data before us */
> +
> +		printf("Error in read from inotify FD %s.\n", strerror(errno));
> +		return -1;
> +	}
> +	while(offset < (size_t)len) {
> +		ievent = (struct inotify_event *)&(buf[offset]);
> +		for(i=0; i<iwatch_array->num; i++) {
> +			if(iwatch_array->elem[i].wd == ievent->wd &&
> +				ievent->mask == IN_CREATE) {
> +				printf_verbose(
> +					"inotify wd %d event mask : %u for %s%s\n",
> +					ievent->wd, ievent->mask,
> +					iwatch_array->elem[i].path_channel,
> +					ievent->name);
> +				old_num = fd_pairs->num_pairs;
> +				/* path_channel of a watch already ends with '/'. */
> +				ret = snprintf(path_channel, sizeof(path_channel),
> +					"%s%s", iwatch_array->elem[i].path_channel,
> +					ievent->name);
> +				if(ret < 0 || (size_t)ret >= sizeof(path_channel)) {
> +					printf("Path too long for %s\n", ievent->name);
> +					continue;
> +				}
> +				ret = open_buffer_file(ievent->name, path_channel,
> +					path_channel + (iwatch_array->elem[i].base_path_channel -
> +					iwatch_array->elem[i].path_channel), fd_pairs);
> +				if(ret) {
> +					printf("Error opening buffer file\n");
> +					return -1;
> +				}
> +				ret = map_channels(fd_pairs, old_num, fd_pairs->num_pairs);
> +				if(ret) {
> +					printf("Error mapping channel\n");
> +					return -1;
> +				}
> +
> +			}
> +		}
> +		offset += sizeof(*ievent) + ievent->len;
> +	}
> +	return 0;
> +}
> +#endif //HAS_INOTIFY
> +
> +/**
> + * read_channels - thread worker: poll the channels and consume sub-buffers
> + * @thread_num: number of the calling thread (currently unused)
> + * @fd_pairs: paired channels and trace files
> + * @inotify_fd: inotify instance watching the channel folders
> + * @iwatch_array: inotify watches, used to open channels created later
> + *
> + * Note that the high priority polled channels are consumed first. We then poll
> + * again to see if these channels are still in priority. Only when no
> + * high priority channel is left, we start reading low priority channels.
> + *
> + * Note that a channel is considered high priority when the buffer is almost
> + * full.
> + *
> + * Returns 0 when the loop ends normally (liblttd_stop() was called or every
> + * channel has hung up), otherwise an error number from poll(), from an
> + * allocation failure, or from a failed mutex unlock.
> + */
> +int read_channels(unsigned long thread_num, struct channel_trace_fd *fd_pairs,
> +	int inotify_fd, struct inotify_watch_array *iwatch_array)
> +{
> +	struct pollfd *pollfd = NULL;
> +	int num_pollfd;
> +	int i;
> +	int num_rdy, num_hup;
> +	int high_prio;
> +	int ret = 0;
> +	int inotify_fds;
> +
> +#ifdef HAS_INOTIFY
> +	inotify_fds = 1;
> +#else
> +	inotify_fds = 0;
> +#endif
> +
> +	pthread_rwlock_rdlock(&fd_pairs_lock);
> +
> +	/* Start polling the FD. Keep one fd for inotify */
> +	pollfd = malloc((inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd));
> +	if(pollfd == NULL) {
> +		pthread_rwlock_unlock(&fd_pairs_lock);
> +		perror("Error allocating poll array");
> +		return ENOMEM;
> +	}
> +
> +#ifdef HAS_INOTIFY
> +	pollfd[0].fd = inotify_fd;
> +	pollfd[0].events = POLLIN|POLLPRI;
> +#endif
> +
> +	for(i=0;i<fd_pairs->num_pairs;i++) {
> +		pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel;
> +		pollfd[inotify_fds+i].events = POLLIN|POLLPRI;
> +	}
> +	num_pollfd = inotify_fds + fd_pairs->num_pairs;
> +
> +
> +	pthread_rwlock_unlock(&fd_pairs_lock);
> +
> +	while(1) {
> +		high_prio = 0;
> +		num_hup = 0;
> +#ifdef DEBUG
> +		printf("Press a key for next poll...\n");
> +		char buf[1];
> +		read(STDIN_FILENO, &buf, 1);
> +		printf("Next poll (polling %d fd) :\n", num_pollfd);
> +#endif //DEBUG
> +
> +		/* Have we received a signal ? */
> +		if(quit_program) break;
> +
> +		num_rdy = poll(pollfd, num_pollfd, -1);
> +
> +		if(num_rdy == -1) {
> +			/* Interrupted by a signal: loop back and check quit_program. */
> +			if(errno == EINTR)
> +				continue;
> +			ret = errno;
> +			perror("Poll error");
> +			goto free_fd;
> +		}
> +
> +		printf_verbose("Data received\n");
> +		/*
> +		 * NOTE(review): revents is compared for equality below, so a
> +		 * combined mask (e.g. POLLIN|POLLPRI) matches no case and is
> +		 * ignored -- confirm this against the channels' poll().
> +		 */
> +#ifdef HAS_INOTIFY
> +		switch(pollfd[0].revents) {
> +			case POLLERR:
> +				printf_verbose(
> +					"Error returned in polling inotify fd %d.\n",
> +					pollfd[0].fd);
> +				break;
> +			case POLLHUP:
> +				printf_verbose(
> +					"Polling inotify fd %d tells it has hung up.\n",
> +					pollfd[0].fd);
> +				break;
> +			case POLLNVAL:
> +				printf_verbose(
> +					"Polling inotify fd %d tells fd is not open.\n",
> +					pollfd[0].fd);
> +				break;
> +			case POLLPRI:
> +			case POLLIN:
> +				printf_verbose(
> +					"Polling inotify fd %d : data ready.\n",
> +					pollfd[0].fd);
> +
> +				/* New channels may be added: exclusive access. */
> +				pthread_rwlock_wrlock(&fd_pairs_lock);
> +				read_inotify(inotify_fd, fd_pairs, iwatch_array);
> +				pthread_rwlock_unlock(&fd_pairs_lock);
> +
> +			break;
> +		}
> +#endif
> +
> +		for(i=inotify_fds;i<num_pollfd;i++) {
> +			switch(pollfd[i].revents) {
> +				case POLLERR:
> +					printf_verbose(
> +						"Error returned in polling fd %d.\n",
> +						pollfd[i].fd);
> +					num_hup++;
> +					break;
> +				case POLLHUP:
> +					printf_verbose(
> +						"Polling fd %d tells it has hung up.\n",
> +						pollfd[i].fd);
> +					num_hup++;
> +					break;
> +				case POLLNVAL:
> +					printf_verbose(
> +						"Polling fd %d tells fd is not open.\n",
> +						pollfd[i].fd);
> +					num_hup++;
> +					break;
> +				case POLLPRI:
> +					pthread_rwlock_rdlock(&fd_pairs_lock);
> +					if(pthread_mutex_trylock(&fd_pairs->pair[i-inotify_fds].mutex) == 0) {
> +						printf_verbose(
> +							"Urgent read on fd %d\n",
> +							pollfd[i].fd);
> +						/* Take care of high priority channels first. */
> +						high_prio = 1;
> +						/* it's ok to have an unavailable sub-buffer */
> +						ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]);
> +						if(ret == EAGAIN) ret = 0;
> +
> +						ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex);
> +						if(ret)
> +							printf("Error in mutex unlock : %s\n", strerror(ret));
> +					}
> +					pthread_rwlock_unlock(&fd_pairs_lock);
> +					break;
> +			}
> +		}
> +		/* If every buffer FD has hung up, we end the read loop here */
> +		if(num_hup == num_pollfd - inotify_fds) break;
> +
> +		if(!high_prio) {
> +			for(i=inotify_fds;i<num_pollfd;i++) {
> +				switch(pollfd[i].revents) {
> +					case POLLIN:
> +						pthread_rwlock_rdlock(&fd_pairs_lock);
> +						if(pthread_mutex_trylock(&fd_pairs->pair[i-inotify_fds].mutex) == 0) {
> +							/* Take care of low priority channels. */
> +							printf_verbose(
> +								"Normal read on fd %d\n",
> +								pollfd[i].fd);
> +							/* it's ok to have an unavailable subbuffer */
> +							ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]);
> +							if(ret == EAGAIN) ret = 0;
> +
> +							ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex);
> +							if(ret)
> +								printf("Error in mutex unlock : %s\n", strerror(ret));
> +						}
> +						pthread_rwlock_unlock(&fd_pairs_lock);
> +						break;
> +				}
> +			}
> +		}
> +
> +		/* Update pollfd array if an entry was added to fd_pairs */
> +		pthread_rwlock_rdlock(&fd_pairs_lock);
> +		if((inotify_fds + fd_pairs->num_pairs) != num_pollfd) {
> +			struct pollfd *new_pollfd;
> +
> +			new_pollfd = realloc(pollfd,
> +					(inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd));
> +			if(new_pollfd == NULL) {
> +				/* Keep polling the known channels; retry next time. */
> +				perror("Error growing poll array");
> +			} else {
> +				pollfd = new_pollfd;
> +				for(i=num_pollfd-inotify_fds;i<fd_pairs->num_pairs;i++) {
> +					pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel;
> +					pollfd[inotify_fds+i].events = POLLIN|POLLPRI;
> +				}
> +				num_pollfd = fd_pairs->num_pairs + inotify_fds;
> +			}
> +		}
> +		pthread_rwlock_unlock(&fd_pairs_lock);
> +
> +		/* NB: If the fd_pairs structure is updated by another thread from this
> +		 *     point forward, the current thread will wait in the poll without
> +		 *     monitoring the new channel. However, this thread will add the
> +		 *     new channel on next poll (and this should not take too much time
> +		 *     on a loaded system).
> +		 *
> +		 *     This event is quite unlikely and can only occur if a CPU is
> +		 *     hot-plugged while multple lttd threads are running.
> +		 */
> +	}
> +
> +free_fd:
> +	free(pollfd);
> +
> +	return ret;
> +}
> +
> +
> +/**
> + * close_channel_trace_pairs - close every channel and free the arrays
> + * @fd_pairs: channels to close; emptied on return
> + * @inotify_fd: unused (the caller closes the inotify instance)
> + * @iwatch_array: inotify watches to free; emptied on return
> + *
> + * on_close_channel is called for each channel after its fd is closed.
> + * Both arrays are reset to empty, so that a later session starts from
> + * scratch instead of realloc()ing freed memory.
> + */
> +void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs, int inotify_fd,
> +	struct inotify_watch_array *iwatch_array)
> +{
> +	int i;
> +	int ret;
> +
> +	for(i=0;i<fd_pairs->num_pairs;i++) {
> +		ret = close(fd_pairs->pair[i].channel);
> +		if(ret == -1) perror("Close error on channel");
> +		if(callbacks->on_close_channel) {
> +			ret = callbacks->on_close_channel(
> +				callbacks, &fd_pairs->pair[i]);
> +			/* The callback reports failure through its return
> +			 * value, not errno, so perror() would be misleading. */
> +			if(ret != 0)
> +				fprintf(stderr, "Error on close channel callback\n");
> +		}
> +	}
> +	free(fd_pairs->pair);
> +	fd_pairs->pair = NULL;
> +	fd_pairs->num_pairs = 0;
> +	free(iwatch_array->elem);
> +	iwatch_array->elem = NULL;
> +	iwatch_array->num = 0;
> +}
> +
> +/**
> + * thread_main - entry point of each reader thread
> + * @arg: thread number, passed as an integer cast to a pointer
> + *
> + * Calls on_new_thread, runs read_channels() on the shared channel set, then
> + * calls on_close_thread. If on_new_thread returns a negative value, the
> + * thread exits immediately and on_close_thread is not called.
> + *
> + * Returns the error code cast to a pointer ((void *)0 on success).
> + */
> +void * thread_main(void *arg)
> +{
> +	long ret = 0;
> +	unsigned long thread_num = (unsigned long)arg;
> +
> +	if(callbacks->on_new_thread)
> +		ret = callbacks->on_new_thread(callbacks, thread_num);
> +
> +	if (ret < 0) {
> +		return (void*)ret;
> +	}
> +	ret = read_channels(thread_num, &fd_pairs, inotify_fd, &inotify_watch_array);
> +
> +	/* on_close_thread has to be reentrant: every thread calls it. */
> +	if(callbacks->on_close_thread)
> +		callbacks->on_close_thread(callbacks, thread_num);
> +
> +	return (void*)ret;
> +}
> +
> +/**
> + * channels_init - open and set up every channel below channel_name
> + *
> + * Creates the inotify instance (non-blocking, so that a thread losing the
> + * race in read_inotify() gets EAGAIN), opens all channel files, and queries
> + * their geometry. On failure, everything opened so far is closed.
> + *
> + * Returns 0 on success, -ENOENT if no channel could be opened, or the
> + * non-zero error returned by open_channel_trace_pairs() or map_channels().
> + */
> +int channels_init(void)
> +{
> +	int ret = 0;
> +
> +	inotify_fd = inotify_init();
> +	if(inotify_fd >= 0)
> +		fcntl(inotify_fd, F_SETFL, O_NONBLOCK);
> +
> +	ret = open_channel_trace_pairs(channel_name,
> +			channel_name + strlen(channel_name), &fd_pairs,
> +			&inotify_fd, &inotify_watch_array);
> +	if(ret)
> +		goto close_channel;
> +	if (fd_pairs.num_pairs == 0) {
> +		printf("No channel available for reading, exiting\n");
> +		ret = -ENOENT;
> +		goto close_channel;
> +	}
> +	ret = map_channels(&fd_pairs, 0, fd_pairs.num_pairs);
> +	if(ret)
> +		goto close_channel;
> +	return 0;
> +
> +close_channel:
> +	close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array);
> +	if(inotify_fd >= 0)
> +		close(inotify_fd);
> +	inotify_fd = -1;	/* don't leave a stale fd in the global */
> +	return ret;
> +}
> +
> +/**
> + * liblttd_start - run a complete tracing session
> + * @channel_path: root folder of the trace channels
> + * @n_threads: number of reader threads to create
> + * @flight_only: if non-zero, only read flight recorder ("flight-") channels
> + * @normal_only: if non-zero, only read normal channels
> + * @verbose: if non-zero, print progress information
> + * @user_data: callbacks invoked by the library
> + *
> + * Blocks until every reader thread has exited (see liblttd_stop()), then
> + * releases the channels and calls on_trace_end. The session state is
> + * global, so only one session may run at a time.
> + *
> + * Returns 0 on success, -EINVAL if @channel_path or @user_data is NULL,
> + * -ENOMEM if the thread table cannot be allocated, or another non-zero
> + * error.
> + */
> +int liblttd_start(char *channel_path, unsigned long n_threads,
> +		int flight_only, int normal_only, int verbose,
> +		struct liblttd_callbacks *user_data){
> +	int ret = 0;
> +	int alloc_ret = 0;
> +	pthread_t *tids;
> +	unsigned long i;
> +	unsigned long n_created = 0;
> +	void *tret;
> +
> +	/* Both pointers are dereferenced unconditionally later on. */
> +	if(channel_path == NULL || user_data == NULL)
> +		return -EINVAL;
> +
> +	/* TODO: this state is global; put it in a structure to support
> +	 * more than one instance. */
> +	channel_name = channel_path;
> +	num_threads = n_threads;
> +	dump_flight_only = flight_only;
> +	dump_normal_only = normal_only;
> +	verbose_mode = verbose;
> +	callbacks = user_data;
> +
> +	ret = channels_init();
> +	if(ret)
> +		return ret;
> +
> +	tids = malloc(sizeof(pthread_t) * num_threads);
> +	if(tids == NULL && num_threads > 0) {
> +		perror("Error allocating thread ids");
> +		alloc_ret = -ENOMEM;
> +	}
> +
> +	for(i=0; tids != NULL && i<num_threads; i++) {
> +		/* pthread_create() returns the error code; errno is not set. */
> +		ret = pthread_create(&tids[i], NULL, thread_main, (void*)i);
> +		if(ret) {
> +			fprintf(stderr, "Error creating thread: %s\n",
> +				strerror(ret));
> +			break;
> +		}
> +		n_created++;
> +	}
> +
> +	/* Only join threads that were actually created. */
> +	for(i=0; i<n_created; i++) {
> +		ret = pthread_join(tids[i], &tret);
> +		if(ret) {
> +			fprintf(stderr, "Error joining thread: %s\n",
> +				strerror(ret));
> +			break;
> +		}
> +		if((long)tret != 0) {
> +			printf("Error %s occured in thread %lu\n",
> +				strerror((long)tret), i);
> +		}
> +	}
> +
> +	free(tids);
> +	ret = unmap_channels(&fd_pairs);
> +	close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array);
> +	if(inotify_fd >= 0)
> +		close(inotify_fd);
> +	inotify_fd = -1;
> +
> +	if(callbacks->on_trace_end) callbacks->on_trace_end(callbacks);
> +
> +	return alloc_ret ? alloc_ret : ret;
> +}
> +
> +/**
> + * liblttd_stop - ask the running session to stop
> + *
> + * Only sets quit_program. Each reader thread checks it before calling
> + * poll(), so a thread blocked in poll() stops once poll() returns.
> + *
> + * Returns 0.
> + */
> +int liblttd_stop(void) {
> +	quit_program = 1;
> +	return 0;
> +}
> +
> diff --git a/liblttd/liblttd.h b/liblttd/liblttd.h
> new file mode 100644
> index 0000000..bd76eeb
> --- /dev/null
> +++ b/liblttd/liblttd.h
> @@ -0,0 +1,218 @@
> +/* liblttd header file
> + *
> + * Copyright 2010-
> + *		 Oumarou Dicko <oumarou.dicko at polymtl.ca>
> + *		 Michael Sills-Lavoie <michael.sills-lavoie at polymtl.ca>
> + *
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	See the
> + * GNU General Public License for more details.
> + *
> + */
> +
> +#ifndef _LIBLTTD_H
> +#define _LIBLTTD_H
> +
> +#include <pthread.h>
> +
> +/**
> + * struct fd_pair - data associated with one channel file descriptor
> + * @channel: channel file descriptor
> + * @n_sb: number of sub-buffers of the channel
> + * @max_sb_size: maximum size of a sub-buffer of the channel
> + * @mmap: not used anymore
> + * @mutex: for internal library usage
> + * @user_data: free for the library user, e.g. to store the data associated
> + *	with this channel
> + *
> + * The library user may read every field but MUST NOT change any of them
> + * except @user_data. The library may reallocate its channel array when new
> + * channels appear, so do not keep a pointer to a struct fd_pair outside of
> + * a callback.
> + */
> +struct fd_pair {
> +	int channel;
> +	unsigned int n_sb;
> +	unsigned int max_sb_size;
> +	void *mmap;
> +	pthread_mutex_t	mutex;
> +	void *user_data;
> +};
> +
> +/**
> + * struct liblttd_callbacks - callbacks of a tracing session
> + * @on_open_channel: called after a channel file is opened.
> + *	@relative_channel_path is relative to the root folder of the trace
> + *	channels; @pair->user_data may be used to store per-channel data.
> + *	A non-zero return makes the library close the channel and fail.
> + * @on_close_channel: called after a channel file is closed; the channel is
> + *	never read again. @pair->user_data should be released here.
> + * @on_new_channels_folder: called when the library enters a folder while
> + *	scanning the channel tree (the root folder gets an empty
> + *	@relative_folder_path). Can be used to create the output file
> + *	structure. Should return 0 on success, -1 on failure, which aborts
> + *	the scan of that folder.
> + * @on_read_subbuffer: called while a sub-buffer of @pair is reserved;
> + *	@len is the length of the data to read. Must be thread safe: reader
> + *	threads call it concurrently, but never for the same channel.
> + * @on_trace_end: called once, at the very end of the session, after every
> + *	channel has been closed and every thread has exited. No other
> + *	callback is called afterwards. The return value is ignored.
> + * @on_new_thread: called by each reader thread when it starts, with its
> + *	number in @thread_num. Must be thread safe. If it returns a negative
> + *	value, the thread exits without reading.
> + * @on_close_thread: called by each reader thread just before it exits.
> + *	Must be thread safe. The return value is ignored.
> + * @user_data: free for the library user.
> + *
> + * Unneeded callbacks may be set to NULL. Every callback receives, as @data,
> + * the pointer that was passed to liblttd_start(). Unless stated otherwise,
> + * callbacks should return 0 on success and non-zero on failure.
> + */
> +struct liblttd_callbacks {
> +	int(*on_open_channel)(struct liblttd_callbacks *data,
> +		struct fd_pair *pair, char *relative_channel_path);
> +	int(*on_close_channel)(struct liblttd_callbacks *data,
> +		struct fd_pair *pair);
> +	int(*on_new_channels_folder)(struct liblttd_callbacks *data,
> +		char *relative_folder_path);
> +	int(*on_read_subbuffer)(struct liblttd_callbacks *data,
> +		struct fd_pair *pair, unsigned int len);
> +	int(*on_trace_end)(struct liblttd_callbacks *data);
> +	int(*on_new_thread)(struct liblttd_callbacks *data,
> +		unsigned long thread_num);
> +	int(*on_close_thread)(struct liblttd_callbacks *data,
> +		unsigned long thread_num);
> +	void *user_data;
> +};
> +
> +/**
> + * liblttd_start - run a tracing session
> + * @channel_path: root folder of the trace channels
> + * @n_threads: number of reader threads used by the library
> + * @flight_only: if non-zero, only the channels in flight recorder mode are
> + *	recorded
> + * @normal_only: if non-zero, only the channels in normal mode are recorded
> + * @verbose: if non-zero, more information is printed
> + * @user_data: callbacks of the session (see struct liblttd_callbacks)
> + *
> + * Blocks until the session ends (see liblttd_stop()). The session state is
> + * global inside the library: only one session may run at a time.
> + *
> + * Returns 0 on success, non-zero on error.
> + */
> +int liblttd_start(char *channel_path, unsigned long n_threads,
> +	int flight_only, int normal_only, int verbose,
> +	struct liblttd_callbacks *user_data);
> +
> +/**
> + * liblttd_stop - ask the running session to stop
> + *
> + * Reader threads notice the request before their next poll(); then
> + * liblttd_start() cleans up and returns.
> + *
> + * Returns 0.
> + */
> +int liblttd_stop(void);
> +
> +#endif /*_LIBLTTD_H */
> +
> diff --git a/lttd/Makefile.am b/lttd/Makefile.am
> index bb860bc..fc9b219 100644
> --- a/lttd/Makefile.am
> +++ b/lttd/Makefile.am
> @@ -6,3 +6,6 @@ bin_PROGRAMS = lttd
>  
>  lttd_SOURCES = lttd.c
>  
> +lttd_DEPENDENCIES = ../liblttd/liblttd.la
> +lttd_LDADD = $(lttd_DEPENDENCIES)
> +
> diff --git a/lttd/lttd.c b/lttd/lttd.c
> index 773cb23..9143ef7 100644
> --- a/lttd/lttd.c
> +++ b/lttd/lttd.c
> @@ -17,125 +17,36 @@
>  
>  #define _REENTRANT
>  #define _GNU_SOURCE
> -#include <features.h>
> +
>  #include <stdio.h>
> -#include <unistd.h>
> -#include <errno.h>
> -#include <sys/types.h>
> -#include <sys/stat.h>
>  #include <stdlib.h>
> -#include <dirent.h>
>  #include <string.h>
> -#include <fcntl.h>
> -#include <sys/poll.h>
> -#include <sys/mman.h>
>  #include <signal.h>
> -#include <pthread.h>
> -#include <sys/syscall.h>
> -#include <unistd.h>
> -#include <asm/ioctls.h>
> -
> -#include <linux/version.h>
> -
> -/* Relayfs IOCTL */
> -#include <asm/ioctl.h>
> -#include <asm/types.h>
> -
> -/* Get the next sub buffer that can be read. */
> -#define RELAY_GET_SB		_IOR(0xF5, 0x00,__u32)
> -/* Release the oldest reserved (by "get") sub buffer. */
> -#define RELAY_PUT_SB		_IOW(0xF5, 0x01,__u32)
> -/* returns the number of sub buffers in the per cpu channel. */
> -#define RELAY_GET_N_SB		_IOR(0xF5, 0x02,__u32)
> -/* returns the size of the current sub buffer. */
> -#define RELAY_GET_SB_SIZE	_IOR(0xF5, 0x03, __u32)
> -/* returns the size of data to consume in the current sub-buffer. */
> -#define RELAY_GET_MAX_SB_SIZE	_IOR(0xF5, 0x04, __u32)
> -
> -
> -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14)
> -#include <sys/inotify.h>
> -#if 0	/* should now be provided by libc. */
> -/* From the inotify-tools 2.6 package */
> -static inline int inotify_init (void)
> -{
> -	return syscall (__NR_inotify_init);
> -}
> -
> -static inline int inotify_add_watch (int fd, const char *name, __u32 mask)
> -{
> -	return syscall (__NR_inotify_add_watch, fd, name, mask);
> -}
> -
> -static inline int inotify_rm_watch (int fd, __u32 wd)
> -{
> -	return syscall (__NR_inotify_rm_watch, fd, wd);
> -}
> -#endif //0
> -#define HAS_INOTIFY
> -#else
> -static inline int inotify_init (void)
> -{
> -	return -1;
> -}
> +#include <errno.h>
> +#include <fcntl.h>
> +#include <dirent.h>
> +#include <sys/stat.h>
>  
> -static inline int inotify_add_watch (int fd, const char *name, __u32 mask)
> -{
> -	return 0;
> -}
> +#include <liblttd/liblttd.h>
>  
> -static inline int inotify_rm_watch (int fd, __u32 wd)
> -{
> -	return 0;
> -}
> -#undef HAS_INOTIFY
> -#endif
> -
> -struct fd_pair {
> -	int channel;
> +struct lttd_channel_data {
>  	int trace;
> -	unsigned int n_sb;
> -	unsigned int max_sb_size;
> -	void *mmap;
> -	pthread_mutex_t	mutex;
> -};
> -
> -struct channel_trace_fd {
> -	struct fd_pair *pair;
> -	int num_pairs;
> -};
> -
> -struct inotify_watch {
> -	int wd;
> -	char path_channel[PATH_MAX];
> -	char path_trace[PATH_MAX];
> -};
> -
> -struct inotify_watch_array {
> -	struct inotify_watch *elem;
> -	int num;
>  };
>  
> -static __thread int thread_pipe[2];
> -
> -struct channel_trace_fd fd_pairs = { NULL, 0 };
> -int inotify_fd = -1;
> -struct inotify_watch_array inotify_watch_array = { NULL, 0 };
> -
> -/* protects fd_pairs and inotify_watch_array */
> -pthread_rwlock_t fd_pairs_lock = PTHREAD_RWLOCK_INITIALIZER;
> -
> -
> +static char		path_trace[PATH_MAX];
> +static char		*end_path_trace;
> +static int		path_trace_len = 0;
>  static char		*trace_name = NULL;
>  static char		*channel_name = NULL;
>  static int		daemon_mode = 0;
>  static int		append_mode = 0;
>  static unsigned long	num_threads = 1;
> -volatile static int	quit_program = 0;	/* For signal handler */
>  static int		dump_flight_only = 0;
>  static int		dump_normal_only = 0;
>  static int		verbose_mode = 0;
>  
> +static __thread int thread_pipe[2];
> +
>  #define printf_verbose(fmt, args...) \
>    do {                               \
>      if (verbose_mode)                \
> @@ -178,7 +89,7 @@ int parse_arguments(int argc, char **argv)
>  {
>  	int ret = 0;
>  	int argn = 1;
> -	
> +
>  	if(argc == 2) {
>  		if(strcmp(argv[1], "-h") == 0) {
>  			return 1;
> @@ -236,19 +147,19 @@ int parse_arguments(int argc, char **argv)
>  		}
>  		argn++;
>  	}
> -	
> +
>  	if(trace_name == NULL) {
>  		printf("Please specify a trace name.\n");
>  		printf("\n");
>  		ret = -1;
>  	}
> -	
> +
>  	if(channel_name == NULL) {
>  		printf("Please specify a channel name.\n");
>  		printf("\n");
>  		ret = -1;
>  	}
> -	
> +
>  	return ret;
>  }
>  
> @@ -267,223 +178,96 @@ void show_info(void)
>  static void handler(int signo)
>  {
>  	printf("Signal %d received : exiting cleanly\n", signo);
> -	quit_program = 1;
> +	liblttd_stop();
>  }
>  
> -
> -int open_buffer_file(char *filename, char *path_channel, char *path_trace,
> -	struct channel_trace_fd *fd_pairs)
> +int lttd_on_open_channel(struct liblttd_callbacks *data, struct fd_pair *pair, char *relative_channel_path)
>  {
>  	int open_ret = 0;
> -	int ret = 0;
> +	int ret;
>  	struct stat stat_buf;
> +	struct lttd_channel_data *channel_data;
>  
> -	if(strncmp(filename, "flight-", sizeof("flight-")-1) != 0) {
> -		if(dump_flight_only) {
> -			printf_verbose("Skipping normal channel %s\n",
> -				path_channel);
> -			return 0;
> -		}
> -	} else {
> -		if(dump_normal_only) {
> -			printf_verbose("Skipping flight channel %s\n",
> -				path_channel);
> -			return 0;
> -		}
> -	}
> -	printf_verbose("Opening file.\n");
> -	
> -	fd_pairs->pair = realloc(fd_pairs->pair,
> -			++fd_pairs->num_pairs * sizeof(struct fd_pair));
> +	pair->user_data = malloc(sizeof(struct lttd_channel_data));
> +	channel_data = pair->user_data;
> +
> +	strncpy(end_path_trace, relative_channel_path, PATH_MAX - path_trace_len);
> +	printf_verbose("Creating trace file %s\n", path_trace);
>  
> -	/* Open the channel in read mode */
> -	fd_pairs->pair[fd_pairs->num_pairs-1].channel = 
> -		open(path_channel, O_RDONLY | O_NONBLOCK);
> -	if(fd_pairs->pair[fd_pairs->num_pairs-1].channel == -1) {
> -		perror(path_channel);
> -		fd_pairs->num_pairs--;
> -		return 0;	/* continue */
> -	}
> -	/* Open the trace in write mode, only append if append_mode */
>  	ret = stat(path_trace, &stat_buf);
>  	if(ret == 0) {
>  		if(append_mode) {
>  			printf_verbose("Appending to file %s as requested\n",
>  				path_trace);
>  
> -			fd_pairs->pair[fd_pairs->num_pairs-1].trace = 
> -				open(path_trace, O_WRONLY,
> -						S_IRWXU|S_IRWXG|S_IRWXO);
> -			if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) {
> +			channel_data->trace = open(path_trace, O_WRONLY, S_IRWXU|S_IRWXG|S_IRWXO);
> +			if(channel_data->trace == -1) {
>  				perror(path_trace);
>  				open_ret = -1;
> -				close(fd_pairs->pair[fd_pairs->num_pairs-1].channel);
> -				fd_pairs->num_pairs--;
>  				goto end;
>  			}
> -			ret = lseek(fd_pairs->pair[fd_pairs->num_pairs-1].trace,
> -				    0, SEEK_END);
> +			ret = lseek(channel_data->trace, 0, SEEK_END);
>  			if (ret < 0) {
>  				perror(path_trace);
>  				open_ret = -1;
> -				close(fd_pairs->pair[fd_pairs->num_pairs-1].channel);
> -				close(fd_pairs->pair[fd_pairs->num_pairs-1].trace);
> -				fd_pairs->num_pairs--;
> +				close(channel_data->trace);
>  				goto end;
>  			}
>  		} else {
>  			printf("File %s exists, cannot open. Try append mode.\n", path_trace);
>  			open_ret = -1;
> -			close(fd_pairs->pair[fd_pairs->num_pairs-1].channel);
> -			fd_pairs->num_pairs--;
>  			goto end;
>  		}
>  	} else {
>  		if(errno == ENOENT) {
> -			fd_pairs->pair[fd_pairs->num_pairs-1].trace = 
> -				open(path_trace, O_WRONLY|O_CREAT|O_EXCL,
> -						S_IRWXU|S_IRWXG|S_IRWXO);
> -			if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) {
> +			channel_data->trace = open(path_trace, O_WRONLY|O_CREAT|O_EXCL, S_IRWXU|S_IRWXG|S_IRWXO);
> +			if(channel_data->trace == -1) {
>  				perror(path_trace);
>  				open_ret = -1;
> -				close(fd_pairs->pair[fd_pairs->num_pairs-1].channel);
> -				fd_pairs->num_pairs--;
>  				goto end;
>  			}
>  		}
>  	}
> +
>  end:
>  	return open_ret;
> +
>  }
>  
> -int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name,
> -		struct channel_trace_fd *fd_pairs, int *inotify_fd,
> -		struct inotify_watch_array *iwatch_array)
> +int lttd_on_close_channel(struct liblttd_callbacks *data, struct fd_pair *pair)
> +{
> +	int ret;
> +	ret = close(((struct lttd_channel_data *)(pair->user_data))->trace);
> +	free(pair->user_data);
> +	return ret;
> +}
> +
> +int lttd_on_new_channels_folder(struct liblttd_callbacks *data, char *relative_folder_path)
>  {
> -	DIR *channel_dir = opendir(subchannel_name);
> -	struct dirent *entry;
> -	struct stat stat_buf;
>  	int ret;
> -	char path_channel[PATH_MAX];
> -	int path_channel_len;
> -	char *path_channel_ptr;
> -	char path_trace[PATH_MAX];
> -	int path_trace_len;
> -	char *path_trace_ptr;
>  	int open_ret = 0;
>  
> -	if(channel_dir == NULL) {
> -		perror(subchannel_name);
> -		open_ret = ENOENT;
> -		goto end;
> -	}
> +	strncpy(end_path_trace, relative_folder_path, PATH_MAX - path_trace_len);
> +	printf_verbose("Creating trace subdirectory %s\n", path_trace);
>  
> -	printf_verbose("Creating trace subdirectory %s\n", subtrace_name);
> -	ret = mkdir(subtrace_name, S_IRWXU|S_IRWXG|S_IRWXO);
> +	ret = mkdir(path_trace, S_IRWXU|S_IRWXG|S_IRWXO);
>  	if(ret == -1) {
>  		if(errno != EEXIST) {
> -			perror(subtrace_name);
> +			perror(path_trace);
>  			open_ret = -1;
>  			goto end;
>  		}
>  	}
>  
> -	strncpy(path_channel, subchannel_name, PATH_MAX-1);
> -	path_channel_len = strlen(path_channel);
> -	path_channel[path_channel_len] = '/';
> -	path_channel_len++;
> -	path_channel_ptr = path_channel + path_channel_len;
> -
> -	strncpy(path_trace, subtrace_name, PATH_MAX-1);
> -	path_trace_len = strlen(path_trace);
> -	path_trace[path_trace_len] = '/';
> -	path_trace_len++;
> -	path_trace_ptr = path_trace + path_trace_len;
> -	
> -#ifdef HAS_INOTIFY
> -	iwatch_array->elem = realloc(iwatch_array->elem,
> -		++iwatch_array->num * sizeof(struct inotify_watch));
> -	
> -	printf_verbose("Adding inotify for channel %s\n", path_channel);
> -	iwatch_array->elem[iwatch_array->num-1].wd = inotify_add_watch(*inotify_fd, path_channel, IN_CREATE);
> -	strcpy(iwatch_array->elem[iwatch_array->num-1].path_channel, path_channel);
> -	strcpy(iwatch_array->elem[iwatch_array->num-1].path_trace, path_trace);
> -	printf_verbose("Added inotify for channel %s, wd %u\n",
> -		iwatch_array->elem[iwatch_array->num-1].path_channel,
> -		iwatch_array->elem[iwatch_array->num-1].wd);
> -#endif
> -
> -	while((entry = readdir(channel_dir)) != NULL) {
> -
> -		if(entry->d_name[0] == '.') continue;
> -		
> -		strncpy(path_channel_ptr, entry->d_name, PATH_MAX - path_channel_len);
> -		strncpy(path_trace_ptr, entry->d_name, PATH_MAX - path_trace_len);
> -		
> -		ret = stat(path_channel, &stat_buf);
> -		if(ret == -1) {
> -			perror(path_channel);
> -			continue;
> -		}
> -		
> -		printf_verbose("Channel file : %s\n", path_channel);
> -		
> -		if(S_ISDIR(stat_buf.st_mode)) {
> -
> -			printf_verbose("Entering channel subdirectory...\n");
> -			ret = open_channel_trace_pairs(path_channel, path_trace, fd_pairs,
> -				inotify_fd, iwatch_array);
> -			if(ret < 0) continue;
> -		} else if(S_ISREG(stat_buf.st_mode)) {
> -			open_ret = open_buffer_file(entry->d_name, path_channel, path_trace,
> -				fd_pairs);
> -			if(open_ret)
> -				goto end;
> -		}
> -	}
> -	
>  end:
> -	closedir(channel_dir);
> -
>  	return open_ret;
>  }
>  
> -
> -int read_subbuffer(struct fd_pair *pair)
> +int lttd_on_read_subbuffer(struct liblttd_callbacks *data, struct fd_pair *pair, unsigned int len)
>  {
> -	unsigned int consumed_old, len;
> -	int err;
>  	long ret;
> -	off_t offset;
> -
> +	off_t offset = 0;
>  
> -	err = ioctl(pair->channel, RELAY_GET_SB, &consumed_old);
> -	printf_verbose("cookie : %u\n", consumed_old);
> -	if(err != 0) {
> -		ret = errno;
> -		perror("Reserving sub buffer failed (everything is normal, it is due to concurrency)");
> -		goto get_error;
> -	}
> -#if 0
> -	err = TEMP_FAILURE_RETRY(write(pair->trace,
> -				pair->mmap 
> -					+ (consumed_old & ((pair->n_subbufs * pair->subbuf_size)-1)),
> -				pair->subbuf_size));
> -
> -	if(err < 0) {
> -		ret = errno;
> -		perror("Error in writing to file");
> -		goto write_error;
> -	}
> -#endif //0
> -	err = ioctl(pair->channel, RELAY_GET_SB_SIZE, &len);
> -	if(err != 0) {
> -		ret = errno;
> -		perror("Getting sub-buffer len failed.");
> -		goto get_error;
> -	}
> -
> -	offset = 0;
>  	while (len > 0) {
>  		printf_verbose("splice chan to pipe offset %lu\n",
>  			(unsigned long)offset);
> @@ -494,8 +278,9 @@ int read_subbuffer(struct fd_pair *pair)
>  			perror("Error in relay splice");
>  			goto write_error;
>  		}
> -		ret = splice(thread_pipe[0], NULL, pair->trace, NULL,
> -			ret, SPLICE_F_MOVE | SPLICE_F_MORE);
> +		ret = splice(thread_pipe[0], NULL,
> +			((struct lttd_channel_data *)(pair->user_data))->trace,
> +			NULL, ret, SPLICE_F_MOVE | SPLICE_F_MORE);
>  		printf_verbose("splice pipe to file %ld\n", ret);
>  		if (ret < 0) {
>  			perror("Error in file splice");
> @@ -504,466 +289,42 @@ int read_subbuffer(struct fd_pair *pair)
>  		len -= ret;
>  	}
>  
> -#if 0
> -	err = fsync(pair->trace);
> -	if(err < 0) {
> -		ret = errno;
> -		perror("Error in writing to file");
> -		goto write_error;
> -	}
> -#endif //0
>  write_error:
> -	ret = 0;
> -	err = ioctl(pair->channel, RELAY_PUT_SB, &consumed_old);
> -	if(err != 0) {
> -		ret = errno;
> -		if(errno == EFAULT) {
> -			perror("Error in unreserving sub buffer\n");
> -		} else if(errno == EIO) {
> -			/* Should never happen with newer LTTng versions */
> -			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
> -		}
> -		goto get_error;
> -	}
> -
> -get_error:
> -	return ret;
> -}
> -
> -
> -int map_channels(struct channel_trace_fd *fd_pairs,
> -	int idx_begin, int idx_end)
> -{
> -	int i,j;
> -	int ret=0;
> -
> -	if(fd_pairs->num_pairs <= 0) {
> -		printf("No channel to read\n");
> -		goto end;
> -	}
> -	
> -	/* Get the subbuf sizes and number */
> -
> -	for(i=idx_begin;i<idx_end;i++) {
> -		struct fd_pair *pair = &fd_pairs->pair[i];
> -
> -		ret = ioctl(pair->channel, RELAY_GET_N_SB, &pair->n_sb);
> -		if(ret != 0) {
> -			perror("Error in getting the number of sub-buffers");
> -			goto end;
> -		}
> -		ret = ioctl(pair->channel, RELAY_GET_MAX_SB_SIZE, 
> -			    &pair->max_sb_size);
> -		if(ret != 0) {
> -			perror("Error in getting the max sub-buffer size");
> -			goto end;
> -		}
> -		ret = pthread_mutex_init(&pair->mutex, NULL);	/* Fast mutex */
> -		if(ret != 0) {
> -			perror("Error in mutex init");
> -			goto end;
> -		}
> -	}
> -
> -#if 0
> -	/* Mmap each FD */
> -	for(i=idx_begin;i<idx_end;i++) {
> -		struct fd_pair *pair = &fd_pairs->pair[i];
> -
> -		pair->mmap = mmap(0, pair->subbuf_size * pair->n_subbufs, PROT_READ,
> -				MAP_SHARED, pair->channel, 0);
> -		if(pair->mmap == MAP_FAILED) {
> -			perror("Mmap error");
> -			goto munmap;
> -		}
> -	}
> -
> -	goto end; /* success */
> -
> -	/* Error handling */
> -	/* munmap only the successfully mmapped indexes */
> -munmap:
> -		/* Munmap each FD */
> -	for(j=idx_begin;j<i;j++) {
> -		struct fd_pair *pair = &fd_pairs->pair[j];
> -		int err_ret;
> -
> -		err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs);
> -		if(err_ret != 0) {
> -			perror("Error in munmap");
> -		}
> -		ret |= err_ret;
> -	}
> -
> -#endif //0
> -end:
>  	return ret;
>  }
>  
> -int unmap_channels(struct channel_trace_fd *fd_pairs)
> -{
> -	int j;
> -	int ret=0;
> -
> -	/* Munmap each FD */
> -	for(j=0;j<fd_pairs->num_pairs;j++) {
> -		struct fd_pair *pair = &fd_pairs->pair[j];
> -		int err_ret;
> -
> -#if 0
> -		err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs);
> -		if(err_ret != 0) {
> -			perror("Error in munmap");
> -		}
> -		ret |= err_ret;
> -#endif //0
> -		err_ret = pthread_mutex_destroy(&pair->mutex);
> -		if(err_ret != 0) {
> -			perror("Error in mutex destroy");
> -		}
> -		ret |= err_ret;
> -	}
> -
> -	return ret;
> -}
> -
> -#ifdef HAS_INOTIFY
> -/* Inotify event arrived.
> - *
> - * Only support add file for now.
> - */
> -
> -int read_inotify(int inotify_fd,
> -	struct channel_trace_fd *fd_pairs,
> -	struct inotify_watch_array *iwatch_array)
> -{
> -	char buf[sizeof(struct inotify_event) + PATH_MAX];
> -	char path_channel[PATH_MAX];
> -	char path_trace[PATH_MAX];
> -	ssize_t len;
> -	struct inotify_event *ievent;
> -	size_t offset;
> -	unsigned int i;
> -	int ret;
> -	int old_num;
> -	
> -	offset = 0;
> -	len = read(inotify_fd, buf, sizeof(struct inotify_event) + PATH_MAX);
> -	if(len < 0) {
> -
> -		if(errno == EAGAIN)
> -			return 0;  /* another thread got the data before us */
> -
> -		printf("Error in read from inotify FD %s.\n", strerror(len));
> -		return -1;
> -	}
> -	while(offset < len) {
> -		ievent = (struct inotify_event *)&(buf[offset]);
> -		for(i=0; i<iwatch_array->num; i++) {
> -			if(iwatch_array->elem[i].wd == ievent->wd &&
> -				ievent->mask == IN_CREATE) {
> -				printf_verbose(
> -					"inotify wd %u event mask : %u for %s%s\n",
> -					ievent->wd, ievent->mask,
> -					iwatch_array->elem[i].path_channel,
> -					ievent->name);
> -				old_num = fd_pairs->num_pairs;
> -				strcpy(path_channel, iwatch_array->elem[i].path_channel);
> -				strcat(path_channel, ievent->name);
> -				strcpy(path_trace, iwatch_array->elem[i].path_trace);
> -				strcat(path_trace, ievent->name);
> -				if(ret = open_buffer_file(ievent->name, path_channel,
> -					path_trace, fd_pairs)) {
> -					printf("Error opening buffer file\n");
> -					return -1;
> -				}
> -				if(ret = map_channels(fd_pairs, old_num, fd_pairs->num_pairs)) {
> -					printf("Error mapping channel\n");
> -					return -1;
> -				}
> -
> -			}
> -		}
> -		offset += sizeof(*ievent) + ievent->len;
> -	}
> -}
> -#endif //HAS_INOTIFY
> -
> -/* read_channels
> - *
> - * Thread worker.
> - *
> - * Read the debugfs channels and write them in the paired tracefiles.
> - *
> - * @fd_pairs : paired channels and trace files.
> - *
> - * returns 0 on success, -1 on error.
> - *
> - * Note that the high priority polled channels are consumed first. We then poll
> - * again to see if these channels are still in priority. Only when no
> - * high priority channel is left, we start reading low priority channels.
> - *
> - * Note that a channel is considered high priority when the buffer is almost
> - * full.
> - */
> -
> -int read_channels(unsigned long thread_num, struct channel_trace_fd *fd_pairs,
> -	int inotify_fd, struct inotify_watch_array *iwatch_array)
> -{
> -	struct pollfd *pollfd = NULL;
> -	int num_pollfd;
> -	int i,j;
> -	int num_rdy, num_hup;
> -	int high_prio;
> -	int ret = 0;
> -	int inotify_fds;
> -	unsigned int old_num;
> -
> -#ifdef HAS_INOTIFY
> -	inotify_fds = 1;
> -#else
> -	inotify_fds = 0;
> -#endif
> -
> -	pthread_rwlock_rdlock(&fd_pairs_lock);
> -
> -	/* Start polling the FD. Keep one fd for inotify */
> -	pollfd = malloc((inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd));
> -
> -#ifdef HAS_INOTIFY
> -	pollfd[0].fd = inotify_fd;
> -	pollfd[0].events = POLLIN|POLLPRI;
> -#endif
> -
> -	for(i=0;i<fd_pairs->num_pairs;i++) {
> -		pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel;
> -		pollfd[inotify_fds+i].events = POLLIN|POLLPRI;
> -	}
> -	num_pollfd = inotify_fds + fd_pairs->num_pairs;
> -
> -
> -	pthread_rwlock_unlock(&fd_pairs_lock);
> -
> -	while(1) {
> -		high_prio = 0;
> -		num_hup = 0; 
> -#ifdef DEBUG
> -		printf("Press a key for next poll...\n");
> -		char buf[1];
> -		read(STDIN_FILENO, &buf, 1);
> -		printf("Next poll (polling %d fd) :\n", num_pollfd);
> -#endif //DEBUG
> -
> -		/* Have we received a signal ? */
> -		if(quit_program) break;
> -		
> -		num_rdy = poll(pollfd, num_pollfd, -1);
> -
> -		if(num_rdy == -1) {
> -			perror("Poll error");
> -			goto free_fd;
> -		}
> -
> -		printf_verbose("Data received\n");
> -#ifdef HAS_INOTIFY
> -		switch(pollfd[0].revents) {
> -			case POLLERR:
> -				printf_verbose(
> -					"Error returned in polling inotify fd %d.\n",
> -					pollfd[0].fd);
> -				break;
> -			case POLLHUP:
> -				printf_verbose(
> -					"Polling inotify fd %d tells it has hung up.\n",
> -					pollfd[0].fd);
> -				break;
> -			case POLLNVAL:
> -				printf_verbose(
> -					"Polling inotify fd %d tells fd is not open.\n",
> -					pollfd[0].fd);
> -				break;
> -			case POLLPRI:
> -			case POLLIN:
> -				printf_verbose(
> -					"Polling inotify fd %d : data ready.\n",
> -					pollfd[0].fd);
> -
> -				pthread_rwlock_wrlock(&fd_pairs_lock);
> -				read_inotify(inotify_fd, fd_pairs, iwatch_array);
> -				pthread_rwlock_unlock(&fd_pairs_lock);
> -
> -			break;
> -		}
> -#endif
> -
> -		for(i=inotify_fds;i<num_pollfd;i++) {
> -			switch(pollfd[i].revents) {
> -				case POLLERR:
> -					printf_verbose(
> -						"Error returned in polling fd %d.\n",
> -						pollfd[i].fd);
> -					num_hup++;
> -					break;
> -				case POLLHUP:
> -					printf_verbose(
> -						"Polling fd %d tells it has hung up.\n",
> -						pollfd[i].fd);
> -					num_hup++;
> -					break;
> -				case POLLNVAL:
> -					printf_verbose(
> -						"Polling fd %d tells fd is not open.\n",
> -						pollfd[i].fd);
> -					num_hup++;
> -					break;
> -				case POLLPRI:
> -					pthread_rwlock_rdlock(&fd_pairs_lock);
> -					if(pthread_mutex_trylock(&fd_pairs->pair[i-inotify_fds].mutex) == 0) {
> -						printf_verbose(
> -							"Urgent read on fd %d\n",
> -							pollfd[i].fd);
> -						/* Take care of high priority channels first. */
> -						high_prio = 1;
> -						/* it's ok to have an unavailable sub-buffer */
> -						ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]);
> -						if(ret == EAGAIN) ret = 0;
> -
> -						ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex);
> -						if(ret)
> -							printf("Error in mutex unlock : %s\n", strerror(ret));
> -					}
> -					pthread_rwlock_unlock(&fd_pairs_lock);
> -					break;
> -			}
> -		}
> -		/* If every buffer FD has hung up, we end the read loop here */
> -		if(num_hup == num_pollfd - inotify_fds) break;
> -
> -		if(!high_prio) {
> -			for(i=inotify_fds;i<num_pollfd;i++) {
> -				switch(pollfd[i].revents) {
> -					case POLLIN:
> -						pthread_rwlock_rdlock(&fd_pairs_lock);
> -						if(pthread_mutex_trylock(&fd_pairs->pair[i-inotify_fds].mutex) == 0) {
> -							/* Take care of low priority channels. */
> -							printf_verbose(
> -								"Normal read on fd %d\n",
> -								pollfd[i].fd);
> -							/* it's ok to have an unavailable subbuffer */
> -							ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]);
> -							if(ret == EAGAIN) ret = 0;
> -
> -							ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex);
> -							if(ret)
> -								printf("Error in mutex unlock : %s\n", strerror(ret));
> -						}
> -						pthread_rwlock_unlock(&fd_pairs_lock);
> -						break;
> -				}
> -			}
> -		}
> -
> -		/* Update pollfd array if an entry was added to fd_pairs */
> -		pthread_rwlock_rdlock(&fd_pairs_lock);
> -		if((inotify_fds + fd_pairs->num_pairs) != num_pollfd) {
> -			pollfd = realloc(pollfd,
> -					(inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd));
> -			for(i=num_pollfd-inotify_fds;i<fd_pairs->num_pairs;i++) {
> -				pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel;
> -				pollfd[inotify_fds+i].events = POLLIN|POLLPRI;
> -			}
> -			num_pollfd = fd_pairs->num_pairs + inotify_fds;
> -		}
> -		pthread_rwlock_unlock(&fd_pairs_lock);
> -
> -		/* NB: If the fd_pairs structure is updated by another thread from this
> -		 *     point forward, the current thread will wait in the poll without
> -		 *     monitoring the new channel. However, this thread will add the
> -		 *     new channel on next poll (and this should not take too much time
> -		 *     on a loaded system).
> -		 *
> -		 *     This event is quite unlikely and can only occur if a CPU is
> -		 *     hot-plugged while multple lttd threads are running.
> -		 */
> -	}
> -
> -free_fd:
> -	free(pollfd);
> -
> -end:
> -	return ret;
> -}
> -
> -
> -void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs, int inotify_fd,
> -	struct inotify_watch_array *iwatch_array)
> -{
> -	int i;
> +int on_new_thread(struct liblttd_callbacks *data, unsigned long thread_num) {
>  	int ret;
> -
> -	for(i=0;i<fd_pairs->num_pairs;i++) {
> -		ret = close(fd_pairs->pair[i].channel);
> -		if(ret == -1) perror("Close error on channel");
> -		ret = close(fd_pairs->pair[i].trace);
> -		if(ret == -1) perror("Close error on trace");
> -	}
> -	free(fd_pairs->pair);
> -	free(iwatch_array->elem);
> -}
> -
> -/* Thread worker */
> -void * thread_main(void *arg)
> -{
> -	long ret;
> -	unsigned long thread_num = (unsigned long)arg;
> -
>  	ret = pipe(thread_pipe);
>  	if (ret < 0) {
>  		perror("Error creating pipe");
> -		return (void*)ret;
> +		return ret;
>  	}
> -	ret = read_channels(thread_num, &fd_pairs, inotify_fd, &inotify_watch_array);
> -	close(thread_pipe[0]);	/* close read end */
> -	close(thread_pipe[1]);	/* close write end */
> -	return (void*)ret;
> +	return 0;
>  }
>  
> -
> -int channels_init()
> -{
> -	int ret = 0;
> -
> -	inotify_fd = inotify_init();
> -	fcntl(inotify_fd, F_SETFL, O_NONBLOCK);
> -
> -	if(ret = open_channel_trace_pairs(channel_name, trace_name, &fd_pairs,
> -			&inotify_fd, &inotify_watch_array))
> -		goto close_channel;
> -	if (fd_pairs.num_pairs == 0) {
> -		printf("No channel available for reading, exiting\n");
> -		ret = -ENOENT;
> -		goto close_channel;
> -	}
> -	if(ret = map_channels(&fd_pairs, 0, fd_pairs.num_pairs))
> -		goto close_channel;
> +int on_close_thread(struct liblttd_callbacks *data, unsigned long thread_num) {
> +	close(thread_pipe[0]);	/* close read end */
> +	close(thread_pipe[1]);	/* close write end */
>  	return 0;
> -
> -close_channel:
> -	close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array);
> -	if(inotify_fd >= 0)
> -		close(inotify_fd);
> -	return ret;
>  }
>  
> -
>  int main(int argc, char ** argv)
>  {
>  	int ret = 0;
>  	struct sigaction act;
> -	pthread_t *tids;
> -	unsigned long i;
> -	void *tret;
> -	
> +
> +	struct liblttd_callbacks callbacks = {
> +		lttd_on_open_channel,
> +		lttd_on_close_channel,
> +		lttd_on_new_channels_folder,
> +		lttd_on_read_subbuffer,
> +		NULL,
> +		on_new_thread,
> +		on_close_thread,
> +		NULL
> +	};
> +
>  	ret = parse_arguments(argc, argv);
>  
>  	if(ret != 0) show_arguments();
> @@ -983,9 +344,6 @@ int main(int argc, char ** argv)
>  	sigaction(SIGQUIT, &act, NULL);
>  	sigaction(SIGINT, &act, NULL);
>  
> -	if(ret = channels_init())
> -		return ret;
> -
>  	if(daemon_mode) {
>  		ret = daemon(0, 0);
>  
> @@ -995,33 +353,13 @@ int main(int argc, char ** argv)
>  		}
>  	}
>  
> -	tids = malloc(sizeof(pthread_t) * num_threads);
> -	for(i=0; i<num_threads; i++) {
> -
> -		ret = pthread_create(&tids[i], NULL, thread_main, (void*)i);
> -		if(ret) {
> -			perror("Error creating thread");
> -			break;
> -		}
> -	}
> +	strncpy(path_trace, trace_name, PATH_MAX-1);
> +	path_trace_len = strlen(path_trace);
> +	end_path_trace = path_trace + path_trace_len;
>  
> -	for(i=0; i<num_threads; i++) {
> -		ret = pthread_join(tids[i], &tret);
> -		if(ret) {
> -			perror("Error joining thread");
> -			break;
> -		}
> -		if((long)tret != 0) {
> -			printf("Error %s occured in thread %u\n",
> -				strerror((long)tret), i);
> -		}
> -	}
> +	liblttd_start(channel_name, num_threads, dump_flight_only, dump_normal_only,
> +		verbose_mode, &callbacks);
>  
> -	free(tids);
> -	ret = unmap_channels(&fd_pairs);
> -	close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array);
> -	if(inotify_fd >= 0)
> -		close(inotify_fd);
> -			
>  	return ret;
>  }
> +


-- 
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F  BA06 3F25 A8FE 3BAE 9A68




More information about the lttng-dev mailing list