[lttng-dev] [PATCH lttng-tools v2] Add wrappers for pipe

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Mon May 13 16:05:38 EDT 2013


* David Goulet (dgoulet at efficios.com) wrote:
> 
> 
> Mathieu Desnoyers:
> > * David Goulet (dgoulet at efficios.com) wrote:
> >> This is to help use pipes in a way where partial read/write and EINTR
> >> are handled in one single call site.
> >>
> >> Two new files are created, pipe.c/.h which are part of libcommon. The
> >> open, close, read_close, write_close, read and write calls are
> >> implemented using a custom lttng_pipe data structure and protected by
> >> operation's mutex. A destroy function is also available to cleanup
> >> memory once done with a pipe.
> >>
> >> Signed-off-by: David Goulet <dgoulet at efficios.com>
> >> ---
> >>  src/common/Makefile.am |    3 +-
> >>  src/common/pipe.c      |  315 ++++++++++++++++++++++++++++++++++++++++++++++++
> >>  src/common/pipe.h      |   76 ++++++++++++
> >>  3 files changed, 393 insertions(+), 1 deletion(-)
> >>  create mode 100644 src/common/pipe.c
> >>  create mode 100644 src/common/pipe.h
> >>
> >> diff --git a/src/common/Makefile.am b/src/common/Makefile.am
> >> index f2ea40a..6ba6c2b 100644
> >> --- a/src/common/Makefile.am
> >> +++ b/src/common/Makefile.am
> >> @@ -13,7 +13,8 @@ noinst_HEADERS = lttng-kernel.h defaults.h macros.h error.h futex.h \
> >>  noinst_LTLIBRARIES = libcommon.la
> >>  
> >>  libcommon_la_SOURCES = error.h error.c utils.c utils.h runas.c runas.h \
> >> -                       common.h futex.c futex.h uri.c uri.h defaults.c
> >> +                       common.h futex.c futex.h uri.c uri.h defaults.c \
> >> +                       pipe.c pipe.h
> >>  libcommon_la_LIBADD = -luuid
> >>  
> >>  # Consumer library
> >> diff --git a/src/common/pipe.c b/src/common/pipe.c
> >> new file mode 100644
> >> index 0000000..9242a09
> >> --- /dev/null
> >> +++ b/src/common/pipe.c
> >> @@ -0,0 +1,315 @@
> >> +/*
> >> + * Copyright (C) 2013 - David Goulet <dgoulet at efficios.com>
> >> + *
> >> + * This program is free software; you can redistribute it and/or modify it
> >> + * under the terms of the GNU General Public License, version 2 only, as
> >> + * published by the Free Software Foundation.
> >> + *
> >> + * This program is distributed in the hope that it will be useful, but WITHOUT
> >> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
> >> + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
> >> + * more details.
> >> + *
> >> + * You should have received a copy of the GNU General Public License along with
> >> + * this program; if not, write to the Free Software Foundation, Inc., 51
> >> + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> >> + */
> >> +
> >> +#define _GNU_SOURCE
> >> +#include <assert.h>
> >> +#include <fcntl.h>
> >> +#include <unistd.h>
> >> +
> >> +#include <common/common.h>
> >> +
> >> +#include "pipe.h"
> >> +
> >> +/*
> >> + * Lock read side of a pipe.
> >> + */
> >> +static void lock_read_side(struct lttng_pipe *pipe)
> >> +{
> >> +	pthread_mutex_lock(&pipe->read_mutex);
> >> +}
> >> +
> >> +/*
> >> + * Unlock read side of a pipe.
> >> + */
> >> +static void unlock_read_side(struct lttng_pipe *pipe)
> >> +{
> >> +	pthread_mutex_unlock(&pipe->read_mutex);
> >> +}
> >> +
> >> +/*
> >> + * Lock write side of a pipe.
> >> + */
> >> +static void lock_write_side(struct lttng_pipe *pipe)
> >> +{
> >> +	pthread_mutex_lock(&pipe->write_mutex);
> >> +}
> >> +
> >> +/*
> >> + * Unlock write side of a pipe.
> >> + */
> >> +static void unlock_write_side(struct lttng_pipe *pipe)
> >> +{
> >> +	pthread_mutex_unlock(&pipe->write_mutex);
> >> +}
> >> +
> >> +/*
> >> + * Open a new lttng pipe and set flags using fcntl().
> >> + *
> >> + * Return a newly allocated lttng pipe on success or else NULL.
> >> + */
> >> +struct lttng_pipe *lttng_pipe_open(int flags)
> >> +{
> >> +	int ret;
> >> +	struct lttng_pipe *p;
> >> +
> >> +	p = zmalloc(sizeof(*p));
> >> +	if (!p) {
> >> +		PERROR("zmalloc pipe open");
> >> +		goto error;
> >> +	}
> >> +
> >> +	ret = pipe(p->fd);
> >> +	if (ret < 0) {
> >> +		PERROR("lttng pipe");
> >> +		goto error;
> >> +	}
> >> +
> >> +	if (flags) {
> >> +		int i;
> >> +
> >> +		for (i = 0; i < 2; i++) {
> >> +			ret = fcntl(p->fd[i], F_SETFD, flags);
> >> +			if (ret < 0) {
> >> +				PERROR("fcntl lttng pipe %d", flags);
> >> +				goto error;
> >> +			}
> >> +		}
> >> +	}
> >> +
> >> +	pthread_mutex_init(&p->read_mutex, NULL);
> >> +	pthread_mutex_init(&p->write_mutex, NULL);
> >> +	p->r_state = LTTNG_PIPE_STATE_OPENED;
> >> +	p->w_state = LTTNG_PIPE_STATE_OPENED;
> >> +	p->flags = flags;
> >> +
> >> +	return p;
> >> +
> >> +error:
> >> +	lttng_pipe_destroy(p);
> >> +	return NULL;
> >> +}
> >> +
> >> +/*
> >> + * Close read side of a lttng pipe.
> >> + *
> >> + * Return 0 on success else a negative value.
> >> + */
> >> +int lttng_pipe_read_close(struct lttng_pipe *pipe)
> >> +{
> >> +	int ret, ret_val = 0;
> >> +
> >> +	assert(pipe);
> >> +
> >> +	/* Handle read side first. */
> >> +	lock_read_side(pipe);
> >> +	if (lttng_pipe_is_read_open(pipe)) {
> >> +		do {
> >> +			ret = close(pipe->fd[0]);
> >> +		} while (ret < 0 && errno == EINTR);
> >> +		if (ret < 0) {
> >> +			PERROR("close lttng read pipe");
> >> +			ret_val = -errno;
> >> +		}
> >> +		pipe->r_state = LTTNG_PIPE_STATE_CLOSED;
> >> +	}
> >> +	unlock_read_side(pipe);
> >> +
> >> +	return ret_val;
> >> +}
> >> +
> >> +/*
> >> + * Close write side of a lttng pipe.
> >> + *
> >> + * Return 0 on success else a negative value.
> >> + */
> >> +int lttng_pipe_write_close(struct lttng_pipe *pipe)
> >> +{
> >> +	int ret, ret_val = 0;
> >> +
> >> +	assert(pipe);
> >> +
> >> +	lock_write_side(pipe);
> >> +	if (lttng_pipe_is_write_open(pipe)) {
> >> +		do {
> >> +			ret = close(pipe->fd[1]);
> >> +		} while (ret < 0 && errno == EINTR);
> >> +		if (ret < 0) {
> >> +			PERROR("close lttng write pipe");
> >> +			ret_val = -errno;
> >> +		}
> >> +		pipe->w_state = LTTNG_PIPE_STATE_CLOSED;
> >> +	}
> >> +	unlock_write_side(pipe);
> >> +
> >> +	return ret_val;
> >> +}
> >> +
> >> +/*
> >> + * Close both read and write side of a lttng pipe.
> >> + *
> >> + * Return 0 on success else a negative value.
> >> + */
> >> +int lttng_pipe_close(struct lttng_pipe *pipe)
> >> +{
> >> +	int ret, ret_val = 0;
> >> +
> >> +	assert(pipe);
> >> +
> >> +	ret = lttng_pipe_read_close(pipe);
> >> +	if (ret < 0) {
> >> +		ret_val = ret;
> >> +	}
> >> +
> >> +	ret = lttng_pipe_write_close(pipe);
> >> +	if (ret < 0) {
> >> +		ret_val = ret;
> >> +	}
> >> +
> >> +	return ret_val;
> >> +}
> >> +
> >> +/*
> >> + * Close and destroy a lttng pipe object. Finally, pipe is freed.
> >> + */
> >> +void lttng_pipe_destroy(struct lttng_pipe *pipe)
> >> +{
> >> +	int ret;
> >> +
> >> +	if (!pipe) {
> >> +		return;
> >> +	}
> >> +
> >> +	/* Wrong code flow. Destroy should *never* be called with mutex locked. */
> >> +	ret = pthread_mutex_trylock(&pipe->read_mutex);
> >> +	assert(!ret);
> >> +	ret = pthread_mutex_trylock(&pipe->write_mutex);
> >> +	assert(!ret);
> > 
> 
> What should be done then ?

try lock
try lock

call versions of function that don't take the lock internally

unlock
unlock

destroy x2
free

split the close pipe functions into one internal (static) that does not
take the lock, and one externally exposed that takes the lock and calls
the internal one.

A leading unscore in front of the internal functions should do the
trick.

Thanks,

Mathieu

> 
> David
> 
> > the issue here is that you trylock, then succeed, and then take the
> > locks again within close, thus deadlocking.
> > 
> > The rest looks good, thanks!
> > 
> > Mathieu
> > 
> >> +
> >> +	/* If pipe is already closed or not opened, this returns gracefully. */
> >> +	(void) lttng_pipe_close(pipe);
> >> +
> >> +	(void) pthread_mutex_destroy(&pipe->read_mutex);
> >> +	(void) pthread_mutex_destroy(&pipe->write_mutex);
> >> +	free(pipe);
> >> +}
> >> +
> >> +/*
> >> + * Read on a lttng pipe and put the data in buf of at least size count.
> >> + *
> >> + * Return 0 on success or else a negative errno message from read(2).
> >> + */
> >> +ssize_t lttng_pipe_read(struct lttng_pipe *pipe, void *buf, size_t count)
> >> +{
> >> +	ssize_t ret, read_len, read_left, index;
> >> +
> >> +	assert(pipe);
> >> +	assert(buf);
> >> +
> >> +	lock_read_side(pipe);
> >> +
> >> +	if (!lttng_pipe_is_read_open(pipe)) {
> >> +		ret = -EBADF;
> >> +		goto error;
> >> +	}
> >> +
> >> +	read_left = count;
> >> +	index = 0;
> >> +	do {
> >> +		read_len = read(pipe->fd[0], buf + index, read_left);
> >> +		if (read_len < 0) {
> >> +			ret = -errno;
> >> +			if (errno == EINTR) {
> >> +				/* Read again. */
> >> +				continue;
> >> +			} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> >> +				/*
> >> +				 * Return the number of bytes read up to this point if any.
> >> +				 */
> >> +				if (index) {
> >> +					ret = index;
> >> +				}
> >> +				goto error;
> >> +			} else {
> >> +				PERROR("lttng pipe read");
> >> +				goto error;
> >> +			}
> >> +		}
> >> +		read_left -= read_len;
> >> +		index += read_len;
> >> +	} while (read_left > 0);
> >> +
> >> +	/* Everything went fine. */
> >> +	ret = index;
> >> +
> >> +error:
> >> +	unlock_read_side(pipe);
> >> +	return ret;
> >> +}
> >> +
> >> +/*
> >> + * Write on a lttng pipe using the data in buf and size of count.
> >> + *
> >> + * Return 0 on success or else a negative errno message from write(2).
> >> + */
> >> +ssize_t lttng_pipe_write(struct lttng_pipe *pipe, const void *buf,
> >> +		size_t count)
> >> +{
> >> +	ssize_t ret, write_len, write_left, index;
> >> +
> >> +	assert(pipe);
> >> +	assert(buf);
> >> +
> >> +	lock_write_side(pipe);
> >> +
> >> +	if (!lttng_pipe_is_write_open(pipe)) {
> >> +		ret = -EBADF;
> >> +		goto error;
> >> +	}
> >> +
> >> +	write_left = count;
> >> +	index = 0;
> >> +	do {
> >> +		write_len = write(pipe->fd[1], buf + index, write_left);
> >> +		if (write_len < 0) {
> >> +			ret = -errno;
> >> +			if (errno == EINTR) {
> >> +				/* Read again. */
> >> +				continue;
> >> +			} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
> >> +				/*
> >> +				 * Return the number of bytes read up to this point if any.
> >> +				 */
> >> +				if (index) {
> >> +					ret = index;
> >> +				}
> >> +				goto error;
> >> +			} else {
> >> +				PERROR("lttng pipe write");
> >> +				goto error;
> >> +			}
> >> +		}
> >> +		write_left -= write_len;
> >> +		index += write_len;
> >> +	} while (write_left > 0);
> >> +
> >> +	/* Everything went fine. */
> >> +	ret = index;
> >> +
> >> +error:
> >> +	unlock_write_side(pipe);
> >> +	return ret;
> >> +}
> >> diff --git a/src/common/pipe.h b/src/common/pipe.h
> >> new file mode 100644
> >> index 0000000..acd05d8
> >> --- /dev/null
> >> +++ b/src/common/pipe.h
> >> @@ -0,0 +1,76 @@
> >> +/*
> >> + * Copyright (C) 2013 - David Goulet <dgoulet at efficios.com>
> >> + *
> >> + * This program is free software; you can redistribute it and/or modify it
> >> + * under the terms of the GNU General Public License, version 2 only, as
> >> + * published by the Free Software Foundation.
> >> + *
> >> + * This program is distributed in the hope that it will be useful, but WITHOUT
> >> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
> >> + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
> >> + * more details.
> >> + *
> >> + * You should have received a copy of the GNU General Public License along with
> >> + * this program; if not, write to the Free Software Foundation, Inc., 51
> >> + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> >> + */
> >> +
> >> +#ifndef LTTNG_PIPE_H
> >> +#define LTTNG_PIPE_H
> >> +
> >> +#include <pthread.h>
> >> +
> >> +enum lttng_pipe_state {
> >> +	LTTNG_PIPE_STATE_OPENED = 1,
> >> +	LTTNG_PIPE_STATE_CLOSED = 2,
> >> +};
> >> +
> >> +struct lttng_pipe {
> >> +	/* Read: 0, Write: 1. */
> >> +	int fd[2];
> >> +	/*
> >> +	 * Flags of the pipe once opened. pipe(2) specifies either O_NONBLOCK or
> >> +	 * O_CLOEXEC can be used. Flags are set using fcntl(2) call.
> >> +	 */
> >> +	int flags;
> >> +
> >> +	/*
> >> +	 * These states are protected by the operation mutex below.
> >> +	 */
> >> +	enum lttng_pipe_state r_state;
> >> +	enum lttng_pipe_state w_state;
> >> +
> >> +	/* Held for each read(2) operation. */
> >> +	pthread_mutex_t read_mutex;
> >> +	/* Held for each write(2) operation. */
> >> +	pthread_mutex_t write_mutex;
> >> +};
> >> +
> >> +/*
> >> + * Return 1 if read side is open else 0.
> >> + */
> >> +static inline int lttng_pipe_is_read_open(struct lttng_pipe *pipe)
> >> +{
> >> +	return pipe->r_state == LTTNG_PIPE_STATE_OPENED ? 1 : 0;
> >> +}
> >> +
> >> +/*
> >> + * Return 1 if write side is open else 0.
> >> + */
> >> +static inline int lttng_pipe_is_write_open(struct lttng_pipe *pipe)
> >> +{
> >> +	return pipe->w_state == LTTNG_PIPE_STATE_OPENED ? 1 : 0;
> >> +}
> >> +
> >> +struct lttng_pipe *lttng_pipe_open(int flags);
> >> +int lttng_pipe_write_close(struct lttng_pipe *pipe);
> >> +int lttng_pipe_read_close(struct lttng_pipe *pipe);
> >> +/* Close both side of pipe. */
> >> +int lttng_pipe_close(struct lttng_pipe *pipe);
> >> +void lttng_pipe_destroy(struct lttng_pipe *pipe);
> >> +
> >> +ssize_t lttng_pipe_read(struct lttng_pipe *pipe, void *buf, size_t count);
> >> +ssize_t lttng_pipe_write(struct lttng_pipe *pipe, const void *buf,
> >> +		size_t count);
> >> +
> >> +#endif /* LTTNG_PIPE_H */
> >> -- 
> >> 1.7.10.4
> >>
> >>
> >> _______________________________________________
> >> lttng-dev mailing list
> >> lttng-dev at lists.lttng.org
> >> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
> > 

-- 
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com



More information about the lttng-dev mailing list