[lttng-dev] [lttng-tools PATCH] Don't send the subbuffer padding for streaming

David Goulet dgoulet at efficios.com
Thu Sep 13 15:57:17 EDT 2012


-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Mathieu Desnoyers:
> * David Goulet (dgoulet at efficios.com) wrote:
>> For network streaming, with the mmap() mechanism only for now,
>> the consumer does NOT send the padding over the network. Instead,
>> the size of the padding is specified in the data header or
>> metadata payload.
>> 
>> lttng-relayd is now responsible for appending the zero padding to
>> the trace files.
>> 
>> Again, this functionnality is NOT available yet for splice
>> output.
> 
> functionality -> feature
> 
>> 
>> Signed-off-by: David Goulet <dgoulet at efficios.com>
>> ---
>>  src/bin/lttng-relayd/main.c                  |   39 ++++++++++
>>  src/common/consumer.c                        |   97 ++++++++++++++----------
>>  src/common/consumer.h                        |    6 +-
>>  src/common/kernel-consumer/kernel-consumer.c |  105 +++++++++++++++-----------
>>  src/common/sessiond-comm/relayd.h            |    2 +
>>  src/common/ust-consumer/ust-consumer.c       |   19 +++--
>>  src/common/ust-consumer/ust-consumer.h       |    6 +-
>>  7 files changed, 183 insertions(+), 91 deletions(-)
>> 
>> diff --git a/src/bin/lttng-relayd/main.c
>> b/src/bin/lttng-relayd/main.c index e9b70a3..f60199b 100644 ---
>> a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c 
>> @@ -1145,6 +1145,31 @@ end: }
>> 
>> /* + * Append padding to the file pointed by the file descriptor
>> fd. + */ +static int write_padding_to_file(int fd, uint32_t
>> size) +{ +	int ret = 0; +	char zeros[size];
> 
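> Ouch, I don't like seeing this on the stack: size comes straight from
> the padding_size field received over the network, so it can be as
> large as 4GB. In user-space, the stack is usually limited to 8MB, and
> pthread stacks can be configured to be even smaller than that.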
> 
> A quick solution is to allocate memory dynamically for that, and
> free it at the end of the function.

Hmmm right!

> 
>> + +	if (size == 0) { +		goto end; +	} + +	memset(zeros, 0,
>> size); + +	do { +		ret = write(fd, zeros, size); +	} while (ret <
>> 0 && errno == EINTR); +	if (ret < 0) { +		PERROR("write padding
>> to file"); +	} + +end: +	return ret; +} + +/* *
>> relay_recv_metadata: receive the metada for the session. */ 
>> static @@ -1208,6 +1233,14 @@ int relay_recv_metadata(struct
>> lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end_unlock; } + +
> 
> Could remove a newline here.
> 
>> +	ret = write_padding_to_file(metadata_stream->fd, +
>> be32toh(metadata_struct->padding_size)); +	if (ret < 0) { +		goto
>> end_unlock; +	} + DBG2("Relay metadata written");
>> 
>> end_unlock: @@ -1357,6 +1390,12 @@ int relay_process_data(struct
>> relay_command *cmd, struct lttng_ht *streams_ht) ret = -1; goto
>> end_unlock; } + +	ret = write_padding_to_file(stream->fd,
>> be32toh(data_hdr.padding_size)); +	if (ret < 0) { +		goto
>> end_unlock; +	} + DBG2("Relay wrote %d bytes to tracefile for
>> stream id %" PRIu64, ret, stream->stream_handle);
>> 
>> diff --git a/src/common/consumer.c b/src/common/consumer.c index
>> f093f0c..16a6c47 100644 --- a/src/common/consumer.c +++
>> b/src/common/consumer.c @@ -542,7 +542,8 @@ error: * Return
>> destination file descriptor or negative value on error. */ static
>> int write_relayd_stream_header(struct lttng_consumer_stream
>> *stream, -		size_t data_size, struct consumer_relayd_sock_pair
>> *relayd) +		size_t data_size, unsigned long padding, +		struct
>> consumer_relayd_sock_pair *relayd) { int outfd = -1, ret; struct
>> lttcomm_relayd_data_hdr data_hdr; @@ -567,6 +568,7 @@ static int
>> write_relayd_stream_header(struct lttng_consumer_stream *stream, 
>> /* Set header with stream information */ data_hdr.stream_id =
>> htobe64(stream->relayd_stream_id); data_hdr.data_size =
>> htobe32(data_size); +		data_hdr.padding_size = htobe32(padding); 
>> data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); /*
>> Other fields are zeroed previously */
>> 
>> @@ -1094,22 +1096,23 @@ void lttng_consumer_destroy(struct
>> lttng_consumer_local_data *ctx) */ static int
>> write_relayd_metadata_id(int fd, struct lttng_consumer_stream
>> *stream, -		struct consumer_relayd_sock_pair *relayd) +		struct
>> consumer_relayd_sock_pair *relayd, +		unsigned long padding) { 
>> int ret; -	uint64_t metadata_id; +	struct
>> lttcomm_relayd_metadata_payload hdr;
>> 
>> -	metadata_id = htobe64(stream->relayd_stream_id); +
>> hdr.stream_id = htobe64(stream->relayd_stream_id); +
>> hdr.padding_size = htobe32(padding); do { -		ret = write(fd,
>> (void *) &metadata_id, -				sizeof(stream->relayd_stream_id)); +
>> ret = write(fd, (void *) &hdr, sizeof(hdr)); } while (ret < 0 &&
>> errno == EINTR); if (ret < 0) { PERROR("write metadata stream
>> id"); goto end; } -	DBG("Metadata stream id %" PRIu64 " written
>> before data", -			stream->relayd_stream_id); +	DBG("Metadata
>> stream id %" PRIu64 " with padding %lu written before data", +
>> stream->relayd_stream_id, padding);
>> 
>> end: return ret; @@ -1126,7 +1129,8 @@ end: */ ssize_t
>> lttng_consumer_on_read_subbuffer_mmap( struct
>> lttng_consumer_local_data *ctx, -		struct lttng_consumer_stream
>> *stream, unsigned long len) +		struct lttng_consumer_stream
>> *stream, unsigned long len, +		unsigned long padding) { unsigned
>> long mmap_offset; ssize_t ret = 0, written = 0; @@ -1178,17
>> +1182,17 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if
>> (stream->metadata_flag) { /* Metadata requires the control
>> socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); -
>> netlen += sizeof(stream->relayd_stream_id); +			netlen +=
>> sizeof(struct lttcomm_relayd_metadata_payload); }
>> 
>> -		ret = write_relayd_stream_header(stream, netlen, relayd); +
>> ret = write_relayd_stream_header(stream, netlen, padding,
>> relayd); if (ret >= 0) { /* Use the returned socket. */ outfd =
>> ret;
>> 
>> /* Write metadata stream id before payload */ if
>> (stream->metadata_flag) { -				ret =
>> write_relayd_metadata_id(outfd, stream, relayd); +				ret =
>> write_relayd_metadata_id(outfd, stream, relayd, padding); if (ret
>> < 0) { written = ret; goto end; @@ -1196,12 +1200,16 @@ ssize_t
>> lttng_consumer_on_read_subbuffer_mmap( } } /* Else, use the
>> default set before which is the filesystem. */ +	} else { +		/*
>> No streaming, we have to set the len with the full padding */ +
>> len += padding; }
>> 
>> while (len > 0) { do { ret = write(outfd, stream->mmap_base +
>> mmap_offset, len); } while (ret < 0 && errno == EINTR); +
>> DBG("Consumer mmap write() ret %zd (len %lu)", ret, len); if (ret
>> < 0) { PERROR("Error in file write"); if (written == 0) { @@
>> -1216,7 +1224,6 @@ ssize_t
>> lttng_consumer_on_read_subbuffer_mmap( len -= ret; mmap_offset +=
>> ret; } -		DBG("Consumer mmap write() ret %zd (len %lu)", ret,
>> len);
>> 
>> /* This call is useless on a socket so better save a syscall. */ 
>> if (!relayd) { @@ -1246,7 +1253,8 @@ end: */ ssize_t
>> lttng_consumer_on_read_subbuffer_splice( struct
>> lttng_consumer_local_data *ctx, -		struct lttng_consumer_stream
>> *stream, unsigned long len) +		struct lttng_consumer_stream
>> *stream, unsigned long len, +		unsigned long padding) { ssize_t
>> ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; @@
>> -1292,23 +1300,42 @@ ssize_t
>> lttng_consumer_on_read_subbuffer_splice( }
>> 
>> /* Write metadata stream id before payload */ -	if
>> (stream->metadata_flag && relayd) { -		/* -		 * Lock the control
>> socket for the complete duration of the function -		 * since from
>> this point on we will use the socket. -		 */ -
>> pthread_mutex_lock(&relayd->ctrl_sock_mutex); +	if (relayd) { +
>> int total_len = len;
>> 
>> -		ret = write_relayd_metadata_id(splice_pipe[1], stream,
>> relayd); -		if (ret < 0) { -			written = ret; +		if
>> (stream->metadata_flag) { +			/* +			 * Lock the control socket
>> for the complete duration of the function +			 * since from this
>> point on we will use the socket. +			 */ +
>> pthread_mutex_lock(&relayd->ctrl_sock_mutex); + +			ret =
>> write_relayd_metadata_id(splice_pipe[1], stream, relayd, +
>> padding); +			if (ret < 0) { +				written = ret; +				goto end; +
>> } + +			total_len += sizeof(struct
>> lttcomm_relayd_metadata_payload); +		} + +		ret =
>> write_relayd_stream_header(stream, total_len, padding, relayd); +
>> if (ret >= 0) { +			/* Use the returned socket. */ +			outfd =
>> ret; +		} else { +			ERR("Remote relayd disconnected.
>> Stopping"); goto end; } +	} else { +		/* No streaming, we have to
>> set the len with the full padding */ +		len += padding; }
>> 
>> while (len > 0) { -		DBG("splice chan to pipe offset %lu of len
>> %lu (fd : %d)", -				(unsigned long)offset, len, fd); +
>> DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe:
>> %d)", +				(unsigned long)offset, len, fd, splice_pipe[1]); 
>> ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len, 
>> SPLICE_F_MOVE | SPLICE_F_MORE); DBG("splice chan to pipe, ret
>> %zd", ret_splice); @@ -1324,30 +1351,24 @@ ssize_t
>> lttng_consumer_on_read_subbuffer_splice( /* Handle stream on the
>> relayd if the output is on the network */ if (relayd) { if
>> (stream->metadata_flag) { +				size_t metadata_payload_size = +
>> sizeof(struct lttcomm_relayd_metadata_payload); + /* Update
>> counter to fit the spliced data */ -				ret_splice +=
>> sizeof(stream->relayd_stream_id); -				len +=
>> sizeof(stream->relayd_stream_id); +				ret_splice +=
>> metadata_payload_size; +				len += metadata_payload_size; /* * We
>> do this so the return value can match the len passed as *
>> argument to this function. */ -				written -=
>> sizeof(stream->relayd_stream_id); -			} - -			ret =
>> write_relayd_stream_header(stream, ret_splice, relayd); -			if
>> (ret >= 0) { -				/* Use the returned socket. */ -				outfd =
>> ret; -			} else { -				ERR("Remote relayd disconnected.
>> Stopping"); -				goto end; +				written -=
>> metadata_payload_size; } }
>> 
>> /* Splice data out */ ret_splice = splice(splice_pipe[0], NULL,
>> outfd, NULL, ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); -
>> DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice); 
>> +		DBG("Consumer splice pipe to file, ret %zd", ret_splice); if
>> (ret_splice < 0) { PERROR("Error in file splice"); if (written ==
>> 0) { diff --git a/src/common/consumer.h b/src/common/consumer.h 
>> index e307b18..4da4b70 100644 --- a/src/common/consumer.h +++
>> b/src/common/consumer.h @@ -364,10 +364,12 @@ extern struct
>> lttng_consumer_local_data *lttng_consumer_create( extern void
>> lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); 
>> extern ssize_t lttng_consumer_on_read_subbuffer_mmap( struct
>> lttng_consumer_local_data *ctx, -		struct lttng_consumer_stream
>> *stream, unsigned long len); +		struct lttng_consumer_stream
>> *stream, unsigned long len, +		unsigned long padding); extern
>> ssize_t lttng_consumer_on_read_subbuffer_splice( struct
>> lttng_consumer_local_data *ctx, -		struct lttng_consumer_stream
>> *stream, unsigned long len); +		struct lttng_consumer_stream
>> *stream, unsigned long len, +		unsigned long padding); extern int
>> lttng_consumer_take_snapshot(struct lttng_consumer_local_data
>> *ctx, struct lttng_consumer_stream *stream); extern int
>> lttng_consumer_get_produced_snapshot( diff --git
>> a/src/common/kernel-consumer/kernel-consumer.c
>> b/src/common/kernel-consumer/kernel-consumer.c index
>> fe93c2e..56f3011 100644 ---
>> a/src/common/kernel-consumer/kernel-consumer.c +++
>> b/src/common/kernel-consumer/kernel-consumer.c @@ -295,7 +295,7
>> @@ end_nosignal: ssize_t lttng_kconsumer_read_subbuffer(struct
>> lttng_consumer_stream *stream, struct lttng_consumer_local_data
>> *ctx) { -	unsigned long len; +	unsigned long len, subbuf_size,
>> padding; int err; ssize_t ret = 0; int infd = stream->wait_fd; @@
>> -304,7 +304,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct
>> lttng_consumer_stream *stream, /* Get the next subbuffer */ err =
>> kernctl_get_next_subbuf(infd); if (err != 0) { -		ret = -err; +
>> ret = err; /* * This is a debug message even for single-threaded
>> consumer, * because poll() have more relaxed criterions than get
>> subbuf, @@ -316,51 +316,68 @@ ssize_t
>> lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream
>> *stream, goto end; }
>> 
>> +	/* Get the full subbuffer size including padding */ +	err =
>> kernctl_get_padded_subbuf_size(infd, &len); +	if (err != 0) { +
>> errno = -err; +		perror("Getting sub-buffer len failed."); +		ret
>> = -err;
> 
> ret = err instead ?

Yes, and the same goes for the following ones :P

Thanks
David

> 
>> +		goto end; +	} + switch (stream->output) { -		case
>> LTTNG_EVENT_SPLICE: -			/* read the whole subbuffer */ -			err =
>> kernctl_get_padded_subbuf_size(infd, &len); -			if (err != 0) { -
>> errno = -err; -				perror("Getting sub-buffer len failed."); -
>> ret = -err; -				goto end; -			} +	case LTTNG_EVENT_SPLICE:
>> 
>> -			/* splice the subbuffer to the tracefile */ -			ret =
>> lttng_consumer_on_read_subbuffer_splice(ctx, stream, len); -			if
>> (ret != len) { -				/* -				 * display the error but continue
>> processing to try -				 * to release the subbuffer -				 */ -
>> ERR("Error splicing to tracefile (ret: %zd != len: %lu)", -
>> ret, len); -			} +		/* +		 * XXX: The lttng-modules splice
>> "actor" does not handle copying +		 * partial pages hence only
>> using the subbuffer size without the +		 * padding makes the
>> splice fail. +		 */ +		subbuf_size = len; +		padding = 0; + +		/*
>> splice the subbuffer to the tracefile */ +		ret =
>> lttng_consumer_on_read_subbuffer_splice(ctx, stream, +
>> subbuf_size, padding); +		if (ret != subbuf_size) { +			/* +			 *
>> display the error but continue processing to try +			 * to
>> release the subbuffer +			 */ +			ERR("Error splicing to
>> tracefile (ret: %zd != len: %lu)", +					ret, subbuf_size); +		} 
>> +		break; +	case LTTNG_EVENT_MMAP: +		/* Get subbuffer size
>> without padding */ +		err = kernctl_get_subbuf_size(infd,
>> &subbuf_size); +		if (err != 0) { +			errno = -err; +
>> perror("Getting sub-buffer len failed."); +			ret = -err;
> 
> ret = err ?
> 
> Thanks,
> 
> Mathieu
> 
>> +			goto end; +		}
>> 
>> -			break; -		case LTTNG_EVENT_MMAP: -			/* read the used
>> subbuffer size */ -			err = kernctl_get_padded_subbuf_size(infd,
>> &len); -			if (err != 0) { -				errno = -err; -
>> perror("Getting sub-buffer len failed."); -				ret = -err; -
>> goto end; -			} -			/* write the subbuffer to the tracefile */ -
>> ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); -
>> if (ret != len) { -				/* -				 * display the error but continue
>> processing to try -				 * to release the subbuffer -				 */ -
>> ERR("Error writing to tracefile"); -			} -			break; -		default: -
>> ERR("Unknown output method"); -			ret = -1; +		/* Make sure the
>> tracer is not gone mad on us! */ +		assert(len >= subbuf_size); 
>> + +		padding = len - subbuf_size; + +		/* write the subbuffer to
>> the tracefile */ +		ret =
>> lttng_consumer_on_read_subbuffer_mmap(ctx, stream, +
>> subbuf_size, padding); +		if (ret != subbuf_size) { +			/* +			 *
>> display the error but continue processing to try +			 * to
>> release the subbuffer +			 */ +			ERR("Error writing to tracefile
>> (ret: %zd != len: %lu", +					ret, subbuf_size); +		} +		break; +
>> default: +		ERR("Unknown output method"); +		ret = -1; }
>> 
>> err = kernctl_put_next_subbuf(infd); diff --git
>> a/src/common/sessiond-comm/relayd.h
>> b/src/common/sessiond-comm/relayd.h index 24b7c91..5d4fddf
>> 100644 --- a/src/common/sessiond-comm/relayd.h +++
>> b/src/common/sessiond-comm/relayd.h @@ -49,6 +49,7 @@ struct
>> lttcomm_relayd_data_hdr { uint64_t stream_id;     /* Stream ID
>> known by the relayd */ uint64_t net_seq_num;   /* Network
>> sequence number, per stream. */ uint32_t data_size;     /* data
>> size following this header */ +	uint32_t padding_size;  /* Size
>> of 0 padding the data */ } __attribute__ ((__packed__));
>> 
>> /* @@ -94,6 +95,7 @@ struct lttcomm_relayd_version { */ struct
>> lttcomm_relayd_metadata_payload { uint64_t stream_id; +	uint32_t
>> padding_size; char payload[]; } __attribute__ ((__packed__));
>> 
>> diff --git a/src/common/ust-consumer/ust-consumer.c
>> b/src/common/ust-consumer/ust-consumer.c index 1544ddb..e7d6dd4
>> 100644 --- a/src/common/ust-consumer/ust-consumer.c +++
>> b/src/common/ust-consumer/ust-consumer.c @@ -399,7 +399,7 @@ void
>> lttng_ustconsumer_del_stream(struct lttng_consumer_stream
>> *stream) int lttng_ustconsumer_read_subbuffer(struct
>> lttng_consumer_stream *stream, struct lttng_consumer_local_data
>> *ctx) { -	unsigned long len; +	unsigned long len, subbuf_size,
>> padding; int err; long ret = 0; struct lttng_ust_shm_handle
>> *handle; @@ -426,7 +426,7 @@ int
>> lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream
>> *stream, /* Get the next subbuffer */ err =
>> ustctl_get_next_subbuf(handle, buf); if (err != 0) { -		ret =
>> -err;	/* ustctl_get_next_subbuf returns negative, caller expect
>> positive. */ +		ret = err;	/* ustctl_get_next_subbuf returns
>> negative, caller expect positive. */ /* * This is a debug message
>> even for single-threaded consumer, * because poll() have more
>> relaxed criterions than get subbuf, @@ -438,12 +438,21 @@ int
>> lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream
>> *stream, goto end; } assert(stream->output == LTTNG_EVENT_MMAP); 
>> -	/* read the used subbuffer size */ +	/* Get the full padded
>> subbuffer size */ err = ustctl_get_padded_subbuf_size(handle,
>> buf, &len); assert(err == 0); + +	/* Get subbuffer data size
>> (without padding) */ +	err = ustctl_get_subbuf_size(handle, buf,
>> &subbuf_size); +	assert(err == 0); + +	/* Make sure we don't get
>> a subbuffer size bigger than the padded */ +	assert(len >=
>> subbuf_size); + +	padding = len - subbuf_size; /* write the
>> subbuffer to the tracefile */ -	ret =
>> lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); -	if
>> (ret != len) { +	ret = lttng_consumer_on_read_subbuffer_mmap(ctx,
>> stream, subbuf_size, padding); +	if (ret != subbuf_size) { /* *
>> display the error but continue processing to try * to release the
>> subbuffer diff --git a/src/common/ust-consumer/ust-consumer.h
>> b/src/common/ust-consumer/ust-consumer.h index 7e055a9..3f76f23
>> 100644 --- a/src/common/ust-consumer/ust-consumer.h +++
>> b/src/common/ust-consumer/ust-consumer.h @@ -67,7 +67,8 @@ extern
>> int lttng_ustctl_get_mmap_read_offset( static inline ssize_t
>> lttng_ustconsumer_on_read_subbuffer_mmap( struct
>> lttng_consumer_local_data *ctx, -		struct lttng_consumer_stream
>> *stream, unsigned long len) +		struct lttng_consumer_stream
>> *stream, unsigned long len, +		unsigned long padding) { return
>> -ENOSYS; } @@ -75,7 +76,8 @@ ssize_t
>> lttng_ustconsumer_on_read_subbuffer_mmap( static inline ssize_t
>> lttng_ustconsumer_on_read_subbuffer_splice( struct
>> lttng_consumer_local_data *ctx, -		struct lttng_consumer_stream
>> *uststream, unsigned long len) +		struct lttng_consumer_stream
>> *uststream, unsigned long len, +		unsigned long padding) { return
>> -ENOSYS; } -- 1.7.10.4
>> 
>> 
>> _______________________________________________ lttng-dev mailing
>> list lttng-dev at lists.lttng.org 
>> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
> 
-----BEGIN PGP SIGNATURE-----

iQEcBAEBCgAGBQJQUjqaAAoJEELoaioR9I02JP8H+gJDu79hoYKiyysdPNwq9f5d
afC0l5StJttWH3tntC50KTQbl2fg/ghQ65tsxVUbfRe3BG0FngVH99vf06Lg23XL
IEpk+m9EmvmtIYZAcqzO1dzPrEL8Ij5/frZ5fU7RxtNa3xVwr9IEnvBAuKYKYyXj
6ilpXje6rz5Uxcv5M+MNNfgbdZC86rfh4ZzZtDwJywJwR5XJmIV8YzRrBr8FpAkL
Exu1NyrxVuI3GnHSVKBRZJTVJBz1qIa8VfzSWF3FHJ99ZXUju76PMzEeFSUU7lOp
rlYqviwBu9fxrYxfJNwfpms/VyyC15t2pp7V++/LHOFc0NU62QI41CP3OFGzmAs=
=pfLj
-----END PGP SIGNATURE-----



More information about the lttng-dev mailing list