[lttng-dev] [lttng-tools PATCH] Don't send the subbuffer padding for streaming
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Thu Sep 13 15:52:06 EDT 2012
* David Goulet (dgoulet at efficios.com) wrote:
> For network streaming, with the mmap() mechanism only for now, the
> consumer does NOT send the padding over the network. Instead, the size
> of the padding is specified in the data header or metadata payload.
>
> The lttng-relayd now is the one appending the zeros to the trace files.
>
> Again, this functionality is NOT available yet for splice output.
functionality -> feature
>
> Signed-off-by: David Goulet <dgoulet at efficios.com>
> ---
> src/bin/lttng-relayd/main.c | 39 ++++++++++
> src/common/consumer.c | 97 ++++++++++++++----------
> src/common/consumer.h | 6 +-
> src/common/kernel-consumer/kernel-consumer.c | 105 +++++++++++++++-----------
> src/common/sessiond-comm/relayd.h | 2 +
> src/common/ust-consumer/ust-consumer.c | 19 +++--
> src/common/ust-consumer/ust-consumer.h | 6 +-
> 7 files changed, 183 insertions(+), 91 deletions(-)
>
> diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
> index e9b70a3..f60199b 100644
> --- a/src/bin/lttng-relayd/main.c
> +++ b/src/bin/lttng-relayd/main.c
> @@ -1145,6 +1145,31 @@ end:
> }
>
> /*
> + * Append padding to the file pointed by the file descriptor fd.
> + */
> +static int write_padding_to_file(int fd, uint32_t size)
> +{
> + int ret = 0;
> + char zeros[size];
Ouch, I don't like to see this on the stack. In user-space, the stack is
usually limited to 8MB, but a pthread stack can be configured to be smaller
than that, and "size" here is taken from the header received over the
network, so it is not bounded.
A quick solution is to allocate the memory dynamically and free it
at the end of the function.
> +
> + if (size == 0) {
> + goto end;
> + }
> +
> + memset(zeros, 0, size);
> +
> + do {
> + ret = write(fd, zeros, size);
> + } while (ret < 0 && errno == EINTR);
> + if (ret < 0) {
> + PERROR("write padding to file");
> + }
> +
> +end:
> + return ret;
> +}
> +
> +/*
> * relay_recv_metadata: receive the metada for the session.
> */
> static
> @@ -1208,6 +1233,14 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
> ret = -1;
> goto end_unlock;
> }
> +
> +
One of these two blank lines could be removed.
> + ret = write_padding_to_file(metadata_stream->fd,
> + be32toh(metadata_struct->padding_size));
> + if (ret < 0) {
> + goto end_unlock;
> + }
> +
> DBG2("Relay metadata written");
>
> end_unlock:
> @@ -1357,6 +1390,12 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
> ret = -1;
> goto end_unlock;
> }
> +
> + ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
> + if (ret < 0) {
> + goto end_unlock;
> + }
> +
> DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
> ret, stream->stream_handle);
>
> diff --git a/src/common/consumer.c b/src/common/consumer.c
> index f093f0c..16a6c47 100644
> --- a/src/common/consumer.c
> +++ b/src/common/consumer.c
> @@ -542,7 +542,8 @@ error:
> * Return destination file descriptor or negative value on error.
> */
> static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
> - size_t data_size, struct consumer_relayd_sock_pair *relayd)
> + size_t data_size, unsigned long padding,
> + struct consumer_relayd_sock_pair *relayd)
> {
> int outfd = -1, ret;
> struct lttcomm_relayd_data_hdr data_hdr;
> @@ -567,6 +568,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
> /* Set header with stream information */
> data_hdr.stream_id = htobe64(stream->relayd_stream_id);
> data_hdr.data_size = htobe32(data_size);
> + data_hdr.padding_size = htobe32(padding);
> data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
> /* Other fields are zeroed previously */
>
> @@ -1094,22 +1096,23 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
> */
> static int write_relayd_metadata_id(int fd,
> struct lttng_consumer_stream *stream,
> - struct consumer_relayd_sock_pair *relayd)
> + struct consumer_relayd_sock_pair *relayd,
> + unsigned long padding)
> {
> int ret;
> - uint64_t metadata_id;
> + struct lttcomm_relayd_metadata_payload hdr;
>
> - metadata_id = htobe64(stream->relayd_stream_id);
> + hdr.stream_id = htobe64(stream->relayd_stream_id);
> + hdr.padding_size = htobe32(padding);
> do {
> - ret = write(fd, (void *) &metadata_id,
> - sizeof(stream->relayd_stream_id));
> + ret = write(fd, (void *) &hdr, sizeof(hdr));
> } while (ret < 0 && errno == EINTR);
> if (ret < 0) {
> PERROR("write metadata stream id");
> goto end;
> }
> - DBG("Metadata stream id %" PRIu64 " written before data",
> - stream->relayd_stream_id);
> + DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
> + stream->relayd_stream_id, padding);
>
> end:
> return ret;
> @@ -1126,7 +1129,8 @@ end:
> */
> ssize_t lttng_consumer_on_read_subbuffer_mmap(
> struct lttng_consumer_local_data *ctx,
> - struct lttng_consumer_stream *stream, unsigned long len)
> + struct lttng_consumer_stream *stream, unsigned long len,
> + unsigned long padding)
> {
> unsigned long mmap_offset;
> ssize_t ret = 0, written = 0;
> @@ -1178,17 +1182,17 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
> if (stream->metadata_flag) {
> /* Metadata requires the control socket. */
> pthread_mutex_lock(&relayd->ctrl_sock_mutex);
> - netlen += sizeof(stream->relayd_stream_id);
> + netlen += sizeof(struct lttcomm_relayd_metadata_payload);
> }
>
> - ret = write_relayd_stream_header(stream, netlen, relayd);
> + ret = write_relayd_stream_header(stream, netlen, padding, relayd);
> if (ret >= 0) {
> /* Use the returned socket. */
> outfd = ret;
>
> /* Write metadata stream id before payload */
> if (stream->metadata_flag) {
> - ret = write_relayd_metadata_id(outfd, stream, relayd);
> + ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
> if (ret < 0) {
> written = ret;
> goto end;
> @@ -1196,12 +1200,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
> }
> }
> /* Else, use the default set before which is the filesystem. */
> + } else {
> + /* No streaming, we have to set the len with the full padding */
> + len += padding;
> }
>
> while (len > 0) {
> do {
> ret = write(outfd, stream->mmap_base + mmap_offset, len);
> } while (ret < 0 && errno == EINTR);
> + DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
> if (ret < 0) {
> PERROR("Error in file write");
> if (written == 0) {
> @@ -1216,7 +1224,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
> len -= ret;
> mmap_offset += ret;
> }
> - DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
>
> /* This call is useless on a socket so better save a syscall. */
> if (!relayd) {
> @@ -1246,7 +1253,8 @@ end:
> */
> ssize_t lttng_consumer_on_read_subbuffer_splice(
> struct lttng_consumer_local_data *ctx,
> - struct lttng_consumer_stream *stream, unsigned long len)
> + struct lttng_consumer_stream *stream, unsigned long len,
> + unsigned long padding)
> {
> ssize_t ret = 0, written = 0, ret_splice = 0;
> loff_t offset = 0;
> @@ -1292,23 +1300,42 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
> }
>
> /* Write metadata stream id before payload */
> - if (stream->metadata_flag && relayd) {
> - /*
> - * Lock the control socket for the complete duration of the function
> - * since from this point on we will use the socket.
> - */
> - pthread_mutex_lock(&relayd->ctrl_sock_mutex);
> + if (relayd) {
> + int total_len = len;
>
> - ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd);
> - if (ret < 0) {
> - written = ret;
> + if (stream->metadata_flag) {
> + /*
> + * Lock the control socket for the complete duration of the function
> + * since from this point on we will use the socket.
> + */
> + pthread_mutex_lock(&relayd->ctrl_sock_mutex);
> +
> + ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
> + padding);
> + if (ret < 0) {
> + written = ret;
> + goto end;
> + }
> +
> + total_len += sizeof(struct lttcomm_relayd_metadata_payload);
> + }
> +
> + ret = write_relayd_stream_header(stream, total_len, padding, relayd);
> + if (ret >= 0) {
> + /* Use the returned socket. */
> + outfd = ret;
> + } else {
> + ERR("Remote relayd disconnected. Stopping");
> goto end;
> }
> + } else {
> + /* No streaming, we have to set the len with the full padding */
> + len += padding;
> }
>
> while (len > 0) {
> - DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
> - (unsigned long)offset, len, fd);
> + DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
> + (unsigned long)offset, len, fd, splice_pipe[1]);
> ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
> SPLICE_F_MOVE | SPLICE_F_MORE);
> DBG("splice chan to pipe, ret %zd", ret_splice);
> @@ -1324,30 +1351,24 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
> /* Handle stream on the relayd if the output is on the network */
> if (relayd) {
> if (stream->metadata_flag) {
> + size_t metadata_payload_size =
> + sizeof(struct lttcomm_relayd_metadata_payload);
> +
> /* Update counter to fit the spliced data */
> - ret_splice += sizeof(stream->relayd_stream_id);
> - len += sizeof(stream->relayd_stream_id);
> + ret_splice += metadata_payload_size;
> + len += metadata_payload_size;
> /*
> * We do this so the return value can match the len passed as
> * argument to this function.
> */
> - written -= sizeof(stream->relayd_stream_id);
> - }
> -
> - ret = write_relayd_stream_header(stream, ret_splice, relayd);
> - if (ret >= 0) {
> - /* Use the returned socket. */
> - outfd = ret;
> - } else {
> - ERR("Remote relayd disconnected. Stopping");
> - goto end;
> + written -= metadata_payload_size;
> }
> }
>
> /* Splice data out */
> ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
> ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
> - DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice);
> + DBG("Consumer splice pipe to file, ret %zd", ret_splice);
> if (ret_splice < 0) {
> PERROR("Error in file splice");
> if (written == 0) {
> diff --git a/src/common/consumer.h b/src/common/consumer.h
> index e307b18..4da4b70 100644
> --- a/src/common/consumer.h
> +++ b/src/common/consumer.h
> @@ -364,10 +364,12 @@ extern struct lttng_consumer_local_data *lttng_consumer_create(
> extern void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
> extern ssize_t lttng_consumer_on_read_subbuffer_mmap(
> struct lttng_consumer_local_data *ctx,
> - struct lttng_consumer_stream *stream, unsigned long len);
> + struct lttng_consumer_stream *stream, unsigned long len,
> + unsigned long padding);
> extern ssize_t lttng_consumer_on_read_subbuffer_splice(
> struct lttng_consumer_local_data *ctx,
> - struct lttng_consumer_stream *stream, unsigned long len);
> + struct lttng_consumer_stream *stream, unsigned long len,
> + unsigned long padding);
> extern int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
> struct lttng_consumer_stream *stream);
> extern int lttng_consumer_get_produced_snapshot(
> diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c
> index fe93c2e..56f3011 100644
> --- a/src/common/kernel-consumer/kernel-consumer.c
> +++ b/src/common/kernel-consumer/kernel-consumer.c
> @@ -295,7 +295,7 @@ end_nosignal:
> ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
> struct lttng_consumer_local_data *ctx)
> {
> - unsigned long len;
> + unsigned long len, subbuf_size, padding;
> int err;
> ssize_t ret = 0;
> int infd = stream->wait_fd;
> @@ -304,7 +304,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
> /* Get the next subbuffer */
> err = kernctl_get_next_subbuf(infd);
> if (err != 0) {
> - ret = -err;
> + ret = err;
> /*
> * This is a debug message even for single-threaded consumer,
> * because poll() have more relaxed criterions than get subbuf,
> @@ -316,51 +316,68 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
> goto end;
> }
>
> + /* Get the full subbuffer size including padding */
> + err = kernctl_get_padded_subbuf_size(infd, &len);
> + if (err != 0) {
> + errno = -err;
> + perror("Getting sub-buffer len failed.");
> + ret = -err;
ret = err instead? err is already negative here (see "errno = -err" above).
> + goto end;
> + }
> +
> switch (stream->output) {
> - case LTTNG_EVENT_SPLICE:
> - /* read the whole subbuffer */
> - err = kernctl_get_padded_subbuf_size(infd, &len);
> - if (err != 0) {
> - errno = -err;
> - perror("Getting sub-buffer len failed.");
> - ret = -err;
> - goto end;
> - }
> + case LTTNG_EVENT_SPLICE:
>
> - /* splice the subbuffer to the tracefile */
> - ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
> - if (ret != len) {
> - /*
> - * display the error but continue processing to try
> - * to release the subbuffer
> - */
> - ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
> - ret, len);
> - }
> + /*
> + * XXX: The lttng-modules splice "actor" does not handle copying
> + * partial pages hence only using the subbuffer size without the
> + * padding makes the splice fail.
> + */
> + subbuf_size = len;
> + padding = 0;
> +
> + /* splice the subbuffer to the tracefile */
> + ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream,
> + subbuf_size, padding);
> + if (ret != subbuf_size) {
> + /*
> + * display the error but continue processing to try
> + * to release the subbuffer
> + */
> + ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
> + ret, subbuf_size);
> + }
> + break;
> + case LTTNG_EVENT_MMAP:
> + /* Get subbuffer size without padding */
> + err = kernctl_get_subbuf_size(infd, &subbuf_size);
> + if (err != 0) {
> + errno = -err;
> + perror("Getting sub-buffer len failed.");
> + ret = -err;
ret = err? err is already negative here (see "errno = -err" above).
Thanks,
Mathieu
> + goto end;
> + }
>
> - break;
> - case LTTNG_EVENT_MMAP:
> - /* read the used subbuffer size */
> - err = kernctl_get_padded_subbuf_size(infd, &len);
> - if (err != 0) {
> - errno = -err;
> - perror("Getting sub-buffer len failed.");
> - ret = -err;
> - goto end;
> - }
> - /* write the subbuffer to the tracefile */
> - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
> - if (ret != len) {
> - /*
> - * display the error but continue processing to try
> - * to release the subbuffer
> - */
> - ERR("Error writing to tracefile");
> - }
> - break;
> - default:
> - ERR("Unknown output method");
> - ret = -1;
> + /* Make sure the tracer is not gone mad on us! */
> + assert(len >= subbuf_size);
> +
> + padding = len - subbuf_size;
> +
> + /* write the subbuffer to the tracefile */
> + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream,
> + subbuf_size, padding);
> + if (ret != subbuf_size) {
> + /*
> + * display the error but continue processing to try
> + * to release the subbuffer
> + */
> + ERR("Error writing to tracefile (ret: %zd != len: %lu",
> + ret, subbuf_size);
> + }
> + break;
> + default:
> + ERR("Unknown output method");
> + ret = -1;
> }
>
> err = kernctl_put_next_subbuf(infd);
> diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h
> index 24b7c91..5d4fddf 100644
> --- a/src/common/sessiond-comm/relayd.h
> +++ b/src/common/sessiond-comm/relayd.h
> @@ -49,6 +49,7 @@ struct lttcomm_relayd_data_hdr {
> uint64_t stream_id; /* Stream ID known by the relayd */
> uint64_t net_seq_num; /* Network sequence number, per stream. */
> uint32_t data_size; /* data size following this header */
> + uint32_t padding_size; /* Size of 0 padding the data */
> } __attribute__ ((__packed__));
>
> /*
> @@ -94,6 +95,7 @@ struct lttcomm_relayd_version {
> */
> struct lttcomm_relayd_metadata_payload {
> uint64_t stream_id;
> + uint32_t padding_size;
> char payload[];
> } __attribute__ ((__packed__));
>
> diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c
> index 1544ddb..e7d6dd4 100644
> --- a/src/common/ust-consumer/ust-consumer.c
> +++ b/src/common/ust-consumer/ust-consumer.c
> @@ -399,7 +399,7 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
> int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
> struct lttng_consumer_local_data *ctx)
> {
> - unsigned long len;
> + unsigned long len, subbuf_size, padding;
> int err;
> long ret = 0;
> struct lttng_ust_shm_handle *handle;
> @@ -426,7 +426,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
> /* Get the next subbuffer */
> err = ustctl_get_next_subbuf(handle, buf);
> if (err != 0) {
> - ret = -err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
> + ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
> /*
> * This is a debug message even for single-threaded consumer,
> * because poll() have more relaxed criterions than get subbuf,
> @@ -438,12 +438,21 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
> goto end;
> }
> assert(stream->output == LTTNG_EVENT_MMAP);
> - /* read the used subbuffer size */
> + /* Get the full padded subbuffer size */
> err = ustctl_get_padded_subbuf_size(handle, buf, &len);
> assert(err == 0);
> +
> + /* Get subbuffer data size (without padding) */
> + err = ustctl_get_subbuf_size(handle, buf, &subbuf_size);
> + assert(err == 0);
> +
> + /* Make sure we don't get a subbuffer size bigger than the padded */
> + assert(len >= subbuf_size);
> +
> + padding = len - subbuf_size;
> /* write the subbuffer to the tracefile */
> - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
> - if (ret != len) {
> + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding);
> + if (ret != subbuf_size) {
> /*
> * display the error but continue processing to try
> * to release the subbuffer
> diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h
> index 7e055a9..3f76f23 100644
> --- a/src/common/ust-consumer/ust-consumer.h
> +++ b/src/common/ust-consumer/ust-consumer.h
> @@ -67,7 +67,8 @@ extern int lttng_ustctl_get_mmap_read_offset(
> static inline
> ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
> struct lttng_consumer_local_data *ctx,
> - struct lttng_consumer_stream *stream, unsigned long len)
> + struct lttng_consumer_stream *stream, unsigned long len,
> + unsigned long padding)
> {
> return -ENOSYS;
> }
> @@ -75,7 +76,8 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
> static inline
> ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
> struct lttng_consumer_local_data *ctx,
> - struct lttng_consumer_stream *uststream, unsigned long len)
> + struct lttng_consumer_stream *uststream, unsigned long len,
> + unsigned long padding)
> {
> return -ENOSYS;
> }
> --
> 1.7.10.4
>
>
> _______________________________________________
> lttng-dev mailing list
> lttng-dev at lists.lttng.org
> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev mailing list