[lttng-dev] [RFC PATCH lttng-tools] relayd: optimize receive throughput
Jérémie Galarneau
jeremie.galarneau at efficios.com
Fri Jun 3 06:29:15 UTC 2016
Merged in master, stable-2.8 and stable-2.7.
Thanks!
Jérémie
On Thu, Jun 2, 2016 at 4:27 PM, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> For channels configured with a large sub-buffer size, the relayd copies
> the entire trace sub-buffer (trace packet) into a large buffer, and then
> copies the large buffer to disk. This is inefficient in terms of cache
> locality.
>
> Use a 64 KiB buffer on the stack instead, and move the data in chunks of
> at most that size.
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> ---
> src/bin/lttng-relayd/main.c | 69 ++++++++++++++++++++++-----------------------
> 1 file changed, 33 insertions(+), 36 deletions(-)
>
> diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
> index b5b56aa..a1e94dc 100644
> --- a/src/bin/lttng-relayd/main.c
> +++ b/src/bin/lttng-relayd/main.c
> @@ -81,6 +81,10 @@ static int opt_daemon, opt_background;
> */
> #define NR_LTTNG_RELAY_READY 3
> static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
> +
> +/* Size of receive buffer. */
> +#define RECV_DATA_BUFFER_SIZE 65536
> +
> static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */
> static pid_t child_ppid; /* Internal parent PID use with daemonize. */
>
> @@ -2244,6 +2248,9 @@ static int relay_process_data(struct relay_connection *conn)
> uint32_t data_size;
> struct relay_session *session;
> bool new_stream = false, close_requested = false;
> + size_t chunk_size = RECV_DATA_BUFFER_SIZE;
> + size_t recv_off = 0;
> + char data_buffer[chunk_size];
>
> ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
> sizeof(struct lttcomm_relayd_data_hdr), 0);
> @@ -2267,36 +2274,11 @@ static int relay_process_data(struct relay_connection *conn)
> }
> session = stream->trace->session;
> data_size = be32toh(data_hdr.data_size);
> - if (data_buffer_size < data_size) {
> - char *tmp_data_ptr;
> -
> - tmp_data_ptr = realloc(data_buffer, data_size);
> - if (!tmp_data_ptr) {
> - ERR("Allocating data buffer");
> - free(data_buffer);
> - ret = -1;
> - goto end_stream_put;
> - }
> - data_buffer = tmp_data_ptr;
> - data_buffer_size = data_size;
> - }
> - memset(data_buffer, 0, data_size);
>
> net_seq_num = be64toh(data_hdr.net_seq_num);
>
> DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
> data_size, stream_id, net_seq_num);
> - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
> - if (ret <= 0) {
> - if (ret == 0) {
> - /* Orderly shutdown. Not necessary to print an error. */
> - DBG("Socket %d did an orderly shutdown", conn->sock->fd);
> - } else {
> - ERR("Socket %d error %d", conn->sock->fd, ret);
> - }
> - ret = -1;
> - goto end_stream_put;
> - }
>
> pthread_mutex_lock(&stream->lock);
>
> @@ -2342,16 +2324,33 @@ static int relay_process_data(struct relay_connection *conn)
> }
> }
>
> - /* Write data to stream output fd. */
> - size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
> - if (size_ret < data_size) {
> - ERR("Relay error writing data to file");
> - ret = -1;
> - goto end_stream_unlock;
> - }
> + for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) {
> + size_t recv_size = min(data_size - recv_off, chunk_size);
>
> - DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
> - size_ret, stream->stream_handle);
> + ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0);
> + if (ret <= 0) {
> + if (ret == 0) {
> + /* Orderly shutdown. Not necessary to print an error. */
> + DBG("Socket %d did an orderly shutdown", conn->sock->fd);
> + } else {
> + ERR("Socket %d error %d", conn->sock->fd, ret);
> + }
> + ret = -1;
> + goto end_stream_unlock;
> + }
> +
> + /* Write data to stream output fd. */
> + size_ret = lttng_write(stream->stream_fd->fd, data_buffer,
> + recv_size);
> + if (size_ret < recv_size) {
> + ERR("Relay error writing data to file");
> + ret = -1;
> + goto end_stream_unlock;
> + }
> +
> + DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
> + size_ret, stream->stream_handle);
> + }
>
> ret = write_padding_to_file(stream->stream_fd->fd,
> be32toh(data_hdr.padding_size));
> @@ -2380,7 +2379,6 @@ end_stream_unlock:
> uatomic_set(&session->new_streams, 1);
> pthread_mutex_unlock(&session->lock);
> }
> -end_stream_put:
> stream_put(stream);
> end:
> return ret;
> @@ -2698,7 +2696,6 @@ relay_connections_ht_error:
> DBG("Thread exited with error");
> }
> DBG("Worker thread cleanup complete");
> - free(data_buffer);
> error_testpoint:
> if (err) {
> health_error();
> --
> 2.1.4
>
--
Jérémie Galarneau
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev mailing list