[lttng-dev] [PATCH lttng-tools 3/3] Fix: relayd: file rotation and live read
Jérémie Galarneau
jeremie.galarneau at efficios.com
Sat Sep 5 12:14:03 EDT 2015
Merged, thanks!
Jérémie
On Thu, Sep 3, 2015 at 5:17 PM, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> ---
> src/bin/lttng-relayd/Makefile.am | 3 +-
> src/bin/lttng-relayd/live.c | 89 +++++++-----------
> src/bin/lttng-relayd/main.c | 39 ++++----
> src/bin/lttng-relayd/stream.c | 8 ++
> src/bin/lttng-relayd/stream.h | 22 +++--
> src/bin/lttng-relayd/tracefile-array.c | 159 +++++++++++++++++++++++++++++++++
> src/bin/lttng-relayd/tracefile-array.h | 63 +++++++++++++
> src/bin/lttng-relayd/viewer-stream.c | 93 +++++++++----------
> src/bin/lttng-relayd/viewer-stream.h | 11 ++-
> 9 files changed, 349 insertions(+), 138 deletions(-)
> create mode 100644 src/bin/lttng-relayd/tracefile-array.c
> create mode 100644 src/bin/lttng-relayd/tracefile-array.h
>
> diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am
> index 428f352..07eb732 100644
> --- a/src/bin/lttng-relayd/Makefile.am
> +++ b/src/bin/lttng-relayd/Makefile.am
> @@ -19,7 +19,8 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \
> stream.c stream.h \
> stream-fd.c stream-fd.h \
> connection.c connection.h \
> - viewer-session.c viewer-session.h
> + viewer-session.c viewer-session.h \
> + tracefile-array.c tracefile-array.h
>
> # link on liblttngctl for check if relayd is already alive.
> lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
> diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
> index eb57421..7cb1d48 100644
> --- a/src/bin/lttng-relayd/live.c
> +++ b/src/bin/lttng-relayd/live.c
> @@ -1130,7 +1130,7 @@ static int try_open_index(struct relay_viewer_stream *vstream,
> /*
> * First time, we open the index file and at least one index is ready.
> */
> - if (rstream->total_index_received == 0) {
> + if (rstream->index_received_seqcount == 0) {
> ret = -ENOENT;
> goto end;
> }
> @@ -1172,14 +1172,14 @@ static int check_index_status(struct relay_viewer_stream *vstream,
> int ret;
>
> if (trace->session->connection_closed
> - && rstream->total_index_received
> - == vstream->last_sent_index) {
> + && rstream->index_received_seqcount
> + == vstream->index_sent_seqcount) {
> /* Last index sent and session connection is closed. */
> index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> goto hup;
> } else if (rstream->beacon_ts_end != -1ULL &&
> - rstream->total_index_received
> - == vstream->last_sent_index) {
> + rstream->index_received_seqcount
> + == vstream->index_sent_seqcount) {
> /*
> * We've received a synchronization beacon and the last index
> * available has been sent, the index for now is inactive.
> @@ -1193,21 +1193,24 @@ static int check_index_status(struct relay_viewer_stream *vstream,
> index->timestamp_end = htobe64(rstream->beacon_ts_end);
> index->stream_id = htobe64(rstream->ctf_stream_id);
> goto index_ready;
> - } else if (rstream->total_index_received <= vstream->last_sent_index) {
> + } else if (rstream->index_received_seqcount
> + == vstream->index_sent_seqcount) {
> /*
> - * This actually checks the case where recv == last_sent.
> - * In this case, we have not received a beacon. Therefore, we
> - * can only ask the client to retry later.
> + * This checks whether received == sent seqcount. In
> + * this case, we have not received a beacon. Therefore,
> + * we can only ask the client to retry later.
> */
> index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
> goto index_ready;
> - } else if (!viewer_stream_is_tracefile_seq_readable(vstream,
> - vstream->current_tracefile_seq)) {
> + } else if (!tracefile_array_seq_in_file(rstream->tfa,
> + vstream->current_tracefile_id,
> + vstream->index_sent_seqcount)) {
> /*
> - * The producer has overwritten our current file. We
> - * need to rotate.
> + * The next index we want to send cannot be read either
> + * because we need to perform a rotation, or due to
> + * the producer having overwritten its trace file.
> */
> - DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
> + DBG("Viewer stream %" PRIu64 " rotation",
> vstream->stream->stream_handle);
> ret = viewer_stream_rotate(vstream);
> if (ret < 0) {
> @@ -1217,50 +1220,22 @@ static int check_index_status(struct relay_viewer_stream *vstream,
> index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> goto hup;
> }
> - assert(viewer_stream_is_tracefile_seq_readable(vstream,
> - vstream->current_tracefile_seq));
> - /* ret == 0 means successful so we continue. */
> - ret = 0;
> - } else {
> - ssize_t read_ret;
> - char tmp[1];
> -
> /*
> - * Use EOF on current index file to find out when we
> - * need to rotate.
> + * If we have been pushed due to overwrite, it
> + * necessarily means there is data that can be read in
> + * the stream. If we rotated because we reached the end
> + * of a tracefile, it means the following tracefile
> + * needs to contain at least one index, else we would
> + * have already returned LTTNG_VIEWER_INDEX_RETRY to the
> + * viewer. The updated index_sent_seqcount needs to
> + * point to a readable index entry now.
> */
> - read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
> - if (read_ret == 1) {
> - off_t seek_ret;
> -
> - /* There is still data to read. Rewind position. */
> - seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
> - if (seek_ret < 0) {
> - ret = -1;
> - goto end;
> - }
> - ret = 0;
> - } else if (read_ret == 0) {
> - /* EOF. We need to rotate. */
> - DBG("Viewer stream %" PRIu64 " rotation due to EOF",
> - vstream->stream->stream_handle);
> - ret = viewer_stream_rotate(vstream);
> - if (ret < 0) {
> - goto end;
> - } else if (ret == 1) {
> - /* EOF across entire stream. */
> - index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> - goto hup;
> - }
> - assert(viewer_stream_is_tracefile_seq_readable(vstream,
> - vstream->current_tracefile_seq));
> - /* ret == 0 means successful so we continue. */
> - ret = 0;
> - } else {
> - /* Error reading index. */
> - ret = -1;
> - }
> + assert(tracefile_array_seq_in_file(rstream->tfa,
> + vstream->current_tracefile_id,
> + vstream->index_sent_seqcount));
> }
> + /* ret == 0 means successful so we continue. */
> + ret = 0;
> end:
> return ret;
>
> @@ -1409,7 +1384,7 @@ int viewer_get_next_index(struct relay_connection *conn)
> goto send_reply;
> } else {
> viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
> - vstream->last_sent_index++;
> + vstream->index_sent_seqcount++;
> }
>
> /*
> @@ -1456,7 +1431,7 @@ send_reply:
>
> if (vstream) {
> DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
> - vstream->last_sent_index,
> + vstream->index_sent_seqcount,
> vstream->stream->stream_handle);
> }
> end:
> diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
> index adb044f..7b385b4 100644
> --- a/src/bin/lttng-relayd/main.c
> +++ b/src/bin/lttng-relayd/main.c
> @@ -71,6 +71,7 @@
> #include "session.h"
> #include "stream.h"
> #include "connection.h"
> +#include "tracefile-array.h"
>
> /* command line options */
> char *opt_output_path;
> @@ -1890,7 +1891,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
> * Only flag a stream inactive when it has already
> * received data and no indexes are in flight.
> */
> - if (stream->total_index_received > 0
> + if (stream->index_received_seqcount > 0
> && stream->indexes_in_flight == 0) {
> stream->beacon_ts_end =
> be64toh(index_info.timestamp_end);
> @@ -1918,7 +1919,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
> }
> ret = relay_index_try_flush(index);
> if (ret == 0) {
> - stream->total_index_received++;
> + tracefile_array_commit_seq(stream->tfa);
> + stream->index_received_seqcount++;
> } else if (ret > 0) {
> /* no flush. */
> ret = 0;
> @@ -2091,7 +2093,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
>
> fd = index_create_file(stream->path_name, stream->channel_name,
> -1, -1, stream->tracefile_size,
> - stream->current_tracefile_id);
> + tracefile_array_get_file_index_head(stream->tfa));
> if (fd < 0) {
> ret = -1;
> /* Put self-ref for this index due to error. */
> @@ -2120,7 +2122,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
>
> ret = relay_index_try_flush(index);
> if (ret == 0) {
> - stream->total_index_received++;
> + tracefile_array_commit_seq(stream->tfa);
> + stream->index_received_seqcount++;
> } else if (ret > 0) {
> /* No flush. */
> ret = 0;
> @@ -2204,35 +2207,23 @@ static int relay_process_data(struct relay_connection *conn)
> if (stream->tracefile_size > 0 &&
> (stream->tracefile_size_current + data_size) >
> stream->tracefile_size) {
> - uint64_t new_id;
> + uint64_t old_id, new_id;
> +
> + old_id = tracefile_array_get_file_index_head(stream->tfa);
> + tracefile_array_file_rotate(stream->tfa);
> +
> + /* new_id is updated by utils_rotate_stream_file. */
> + new_id = old_id;
>
> - new_id = (stream->current_tracefile_id + 1) %
> - stream->tracefile_count;
> - /*
> - * Move viewer oldest available data position forward if
> - * we are overwriting a tracefile.
> - */
> - if (new_id == stream->oldest_tracefile_id) {
> - stream->oldest_tracefile_id =
> - (stream->oldest_tracefile_id + 1) %
> - stream->tracefile_count;
> - }
> ret = utils_rotate_stream_file(stream->path_name,
> stream->channel_name, stream->tracefile_size,
> stream->tracefile_count, -1,
> -1, stream->stream_fd->fd,
> - &stream->current_tracefile_id,
> - &stream->stream_fd->fd);
> + &new_id, &stream->stream_fd->fd);
> if (ret < 0) {
> ERR("Rotating stream output file");
> goto end_stream_unlock;
> }
> - stream->current_tracefile_seq++;
> - if (stream->current_tracefile_seq
> - - stream->oldest_tracefile_seq >=
> - stream->tracefile_count) {
> - stream->oldest_tracefile_seq++;
> - }
> /*
> * Reset current size because we just performed a stream
> * rotation.
> diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
> index 9fd9dce..870b75a 100644
> --- a/src/bin/lttng-relayd/stream.c
> +++ b/src/bin/lttng-relayd/stream.c
> @@ -137,6 +137,11 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
> ret = -1;
> goto end;
> }
> + stream->tfa = tracefile_array_create(stream->tracefile_count);
> + if (!stream->tfa) {
> + ret = -1;
> + goto end;
> + }
> if (stream->tracefile_size) {
> DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
> } else {
> @@ -241,6 +246,9 @@ static void stream_destroy(struct relay_stream *stream)
> if (stream->indexes_ht) {
> lttng_ht_destroy(stream->indexes_ht);
> }
> + if (stream->tfa) {
> + tracefile_array_destroy(stream->tfa);
> + }
> free(stream->path_name);
> free(stream->channel_name);
> free(stream);
> diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h
> index 7e2b133..419111c 100644
> --- a/src/bin/lttng-relayd/stream.h
> +++ b/src/bin/lttng-relayd/stream.h
> @@ -29,6 +29,7 @@
>
> #include "session.h"
> #include "stream-fd.h"
> +#include "tracefile-array.h"
>
> /*
> * Represents a stream in the relay
> @@ -67,15 +68,22 @@ struct relay_stream {
> uint64_t tracefile_size;
> uint64_t tracefile_size_current;
> uint64_t tracefile_count;
> - uint64_t current_tracefile_id;
>
> - uint64_t current_tracefile_seq; /* Free-running counter. */
> - uint64_t oldest_tracefile_seq; /* Free-running counter. */
> -
> - /* To inform the viewer up to where it can go back in time. */
> - uint64_t oldest_tracefile_id;
> + /*
> + * Counts the number of received indexes. The "tag" associated
> + * with an index is taken before incrementing this seqcount.
> + * Therefore, the sequence tag associated with the last index
> + * received is always index_received_seqcount - 1.
> + */
> + uint64_t index_received_seqcount;
>
> - uint64_t total_index_received;
> + /*
> + * Tracefile array is an index of the stream trace files,
> + * indexed by position. It allows keeping track of the oldest
> + * available indexes when overwriting trace files in tracefile
> + * rotation. It is allocated even when tracefile rotation is unused.
> + */
> + struct tracefile_array *tfa;
>
> bool closed; /* Stream is closed. */
>
> diff --git a/src/bin/lttng-relayd/tracefile-array.c b/src/bin/lttng-relayd/tracefile-array.c
> new file mode 100644
> index 0000000..7ab1f8e
> --- /dev/null
> +++ b/src/bin/lttng-relayd/tracefile-array.c
> @@ -0,0 +1,159 @@
> +/*
> + * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU General Public License, version 2 only, as
> + * published by the Free Software Foundation.
> + *
> + * This program is distributed in the hope that it will be useful, but WITHOUT
> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
> + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
> + * more details.
> + *
> + * You should have received a copy of the GNU General Public License along with
> + * this program; if not, write to the Free Software Foundation, Inc., 51
> + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> + */
> +
> +#define _GNU_SOURCE
> +#define _LGPL_SOURCE
> +#include <assert.h>
> +#include <common/common.h>
> +#include <common/utils.h>
> +#include <common/defaults.h>
> +
> +#include "tracefile-array.h"
> +
> +struct tracefile_array *tracefile_array_create(size_t count)
> +{
> + struct tracefile_array *tfa = NULL;
> + int i;
> +
> + tfa = zmalloc(sizeof(*tfa));
> + if (!tfa) {
> + goto error;
> + }
> + tfa->tf = zmalloc(sizeof(*tfa->tf) * count);
> + if (!tfa->tf) {
> + goto error;
> + }
> + tfa->count = count;
> + for (i = 0; i < count; i++) {
> + tfa->tf[i].seq_head = -1ULL;
> + tfa->tf[i].seq_tail = -1ULL;
> + }
> + tfa->seq_head = -1ULL;
> + tfa->seq_tail = -1ULL;
> + return tfa;
> +
> +error:
> + if (tfa) {
> + free(tfa->tf);
> + }
> + free(tfa);
> + return NULL;
> +}
> +
> +void tracefile_array_destroy(struct tracefile_array *tfa)
> +{
> + if (!tfa) {
> + return;
> + }
> + free(tfa->tf);
> + free(tfa);
> +}
> +
> +void tracefile_array_file_rotate(struct tracefile_array *tfa)
> +{
> + uint64_t *headp, *tailp;
> +
> + if (tfa->count <= 1) {
> + return;
> + }
> + /* Rotate to next file. */
> + tfa->file_head = (tfa->file_head + 1) % tfa->count;
> + if (tfa->file_head == tfa->file_tail) {
> + /* Move tail. */
> + tfa->file_tail = (tfa->file_tail + 1) % tfa->count;
> + }
> + headp = &tfa->tf[tfa->file_head].seq_head;
> + tailp = &tfa->tf[tfa->file_head].seq_tail;
> + /*
> + * If we overwrite a file with content, we need to push the tail
> + * to the position following the content we are overwriting.
> + */
> + if (*headp != -1ULL) {
> + tfa->seq_tail = tfa->tf[tfa->file_tail].seq_tail;
> + }
> + /* Reset this file head/tail (overwrite). */
> + *headp = -1ULL;
> + *tailp = -1ULL;
> +}
> +
> +void tracefile_array_commit_seq(struct tracefile_array *tfa)
> +{
> + uint64_t *headp, *tailp;
> +
> + /* Increment overall head. */
> + tfa->seq_head++;
> + /* If we are committing our first index overall, set tail to 0. */
> + if (tfa->seq_tail == -1ULL) {
> + tfa->seq_tail = 0;
> + }
> + if (tfa->count <= 1) {
> + return;
> + }
> + headp = &tfa->tf[tfa->file_head].seq_head;
> + tailp = &tfa->tf[tfa->file_head].seq_tail;
> + /* Update head tracefile seq_head. */
> + *headp = tfa->seq_head;
> + /*
> + * If we are committing our first index in this tracefile, set
> + * tail to this index seq count.
> + */
> + if (*tailp == -1ULL) {
> + *tailp = tfa->seq_head;
> + }
> +}
> +
> +uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa)
> +{
> + return tfa->file_head;
> +}
> +
> +uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa)
> +{
> + return tfa->seq_head;
> +}
> +
> +uint64_t tracefile_array_get_file_index_tail(struct tracefile_array *tfa)
> +{
> + return tfa->file_tail;
> +}
> +
> +uint64_t tracefile_array_get_seq_tail(struct tracefile_array *tfa)
> +{
> + return tfa->seq_tail;
> +}
> +
> +bool tracefile_array_seq_in_file(struct tracefile_array *tfa,
> + uint64_t file_index, uint64_t seq)
> +{
> + if (tfa->count <= 1) {
> + /*
> + * With a single file, we are guaranteed to have the
> + * index in this file.
> + */
> + return true;
> + }
> + assert(file_index < tfa->count);
> + if (seq == -1ULL) {
> + return false;
> + }
> + if (seq >= tfa->tf[file_index].seq_tail
> + && seq <= tfa->tf[file_index].seq_head) {
> + return true;
> + } else {
> + return false;
> + }
> +}
> diff --git a/src/bin/lttng-relayd/tracefile-array.h b/src/bin/lttng-relayd/tracefile-array.h
> new file mode 100644
> index 0000000..c947078
> --- /dev/null
> +++ b/src/bin/lttng-relayd/tracefile-array.h
> @@ -0,0 +1,63 @@
> +#ifndef _TRACEFILE_ARRAY_H
> +#define _TRACEFILE_ARRAY_H
> +
> +/*
> + * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU General Public License, version 2 only, as
> + * published by the Free Software Foundation.
> + *
> + * This program is distributed in the hope that it will be useful, but WITHOUT
> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
> + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
> + * more details.
> + *
> + * You should have received a copy of the GNU General Public License along with
> + * this program; if not, write to the Free Software Foundation, Inc., 51
> + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> + */
> +
> +#include <limits.h>
> +#include <inttypes.h>
> +#include <pthread.h>
> +#include <stdbool.h>
> +
> +struct tracefile {
> + /* Per-tracefile head/tail seq. */
> + uint64_t seq_head; /* Newest seqcount. Inclusive. */
> + uint64_t seq_tail; /* Oldest seqcount. Inclusive. */
> +};
> +
> +/*
> + * Represents an array of trace files in a stream.
> + */
> +struct tracefile_array {
> + struct tracefile *tf;
> + size_t count;
> +
> + /* Current head/tail files. */
> + uint64_t file_head;
> + uint64_t file_tail;
> +
> + /* Overall head/tail seq for the entire array. Inclusive. */
> + uint64_t seq_head;
> + uint64_t seq_tail;
> +};
> +
> +struct tracefile_array *tracefile_array_create(size_t count);
> +void tracefile_array_destroy(struct tracefile_array *tfa);
> +
> +void tracefile_array_file_rotate(struct tracefile_array *tfa);
> +void tracefile_array_commit_seq(struct tracefile_array *tfa);
> +
> +uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa);
> +uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa);
> +
> +uint64_t tracefile_array_get_file_index_tail(struct tracefile_array *tfa);
> +uint64_t tracefile_array_get_seq_tail(struct tracefile_array *tfa);
> +
> +bool tracefile_array_seq_in_file(struct tracefile_array *tfa,
> + uint64_t file_index, uint64_t seq);
> +
> +#endif /* _TRACEFILE_ARRAY_H */
> diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c
> index 1d02ee3..31b0257 100644
> --- a/src/bin/lttng-relayd/viewer-stream.c
> +++ b/src/bin/lttng-relayd/viewer-stream.c
> @@ -63,29 +63,45 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
> goto error;
> }
>
> + if (!stream_get(stream)) {
> + ERR("Cannot get stream");
> + goto error;
> + }
> + vstream->stream = stream;
> +
> + pthread_mutex_lock(&stream->lock);
> +
> + if (stream->is_metadata && stream->trace->viewer_metadata_stream) {
> + ERR("Cannot attach viewer metadata stream to trace (busy).");
> + goto error_unlock;
> + }
> +
> switch (seek_t) {
> case LTTNG_VIEWER_SEEK_BEGINNING:
> - vstream->current_tracefile_id = stream->oldest_tracefile_id;
> + vstream->current_tracefile_id =
> + tracefile_array_get_file_index_tail(stream->tfa);
> + vstream->index_sent_seqcount =
> + tracefile_array_get_seq_tail(stream->tfa);
> break;
> case LTTNG_VIEWER_SEEK_LAST:
> - vstream->current_tracefile_id = stream->current_tracefile_id;
> + vstream->current_tracefile_id =
> + tracefile_array_get_file_index_head(stream->tfa);
> + /*
> + * We seek at the very end of each stream, waiting for
> + * a future packet to eventually come in.
> + */
> + vstream->index_sent_seqcount =
> + tracefile_array_get_seq_head(stream->tfa) + 1;
> break;
> default:
> - goto error;
> - }
> - if (!stream_get(stream)) {
> - ERR("Cannot get stream");
> - goto error;
> + goto error_unlock;
> }
> - vstream->stream = stream;
>
> - pthread_mutex_lock(&stream->lock);
> /*
> - * If we never received an index for the current stream, delay the opening
> - * of the index, otherwise open it right now.
> + * If we never received an index for the current stream, delay
> + * the opening of the index, otherwise open it right now.
> */
> - if (vstream->current_tracefile_id == stream->current_tracefile_id
> - && stream->total_index_received == 0) {
> + if (stream->index_received_seqcount == 0) {
> vstream->index_fd = NULL;
> } else {
> int read_fd;
> @@ -112,14 +128,12 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
> if (lseek_ret < 0) {
> goto error_unlock;
> }
> - vstream->last_sent_index = stream->total_index_received;
> }
> - pthread_mutex_unlock(&stream->lock);
> -
> if (stream->is_metadata) {
> rcu_assign_pointer(stream->trace->viewer_metadata_stream,
> vstream);
> }
> + pthread_mutex_unlock(&stream->lock);
>
> /* Globally visible after the add unique. */
> lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
> @@ -227,26 +241,6 @@ void viewer_stream_put(struct relay_viewer_stream *vstream)
> }
>
> /*
> - * Returns whether the current tracefile is readable. If not, it has
> - * been overwritten.
> - * Must be called with rstream lock held.
> - */
> -bool viewer_stream_is_tracefile_seq_readable(struct relay_viewer_stream *vstream,
> - uint64_t seq)
> -{
> - struct relay_stream *stream = vstream->stream;
> -
> - if (seq >= stream->oldest_tracefile_seq
> - && seq <= stream->current_tracefile_seq) {
> - /* seq is a readable file. */
> - return true;
> - } else {
> - /* seq is not readable. */
> - return false;
> - }
> -}
> -
> -/*
> * Rotate a stream to the next tracefile.
> *
> * Must be called with the rstream lock held.
> @@ -256,9 +250,11 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
> {
> int ret;
> struct relay_stream *stream = vstream->stream;
> + uint64_t new_id;
>
> /* Detect the last tracefile to open. */
> - if (stream->total_index_received == vstream->last_sent_index
> + if (stream->index_received_seqcount
> + == vstream->index_sent_seqcount
> && stream->trace->session->connection_closed) {
> ret = 1;
> goto end;
> @@ -270,17 +266,22 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
> goto end;
> }
>
> - if (!viewer_stream_is_tracefile_seq_readable(vstream,
> - vstream->current_tracefile_seq + 1)) {
> - vstream->current_tracefile_id =
> - stream->oldest_tracefile_id;
> - vstream->current_tracefile_seq =
> - stream->oldest_tracefile_seq;
> + /*
> + * Try to move to the next file.
> + */
> + new_id = (vstream->current_tracefile_id + 1)
> + % stream->tracefile_count;
> + if (tracefile_array_seq_in_file(stream->tfa, new_id,
> + vstream->index_sent_seqcount)) {
> + vstream->current_tracefile_id = new_id;
> } else {
> + /*
> + * We need to resync because we lag behind tail.
> + */
> vstream->current_tracefile_id =
> - (vstream->current_tracefile_id + 1)
> - % stream->tracefile_count;
> - vstream->current_tracefile_seq++;
> + tracefile_array_get_file_index_tail(stream->tfa);
> + vstream->index_sent_seqcount =
> + tracefile_array_get_seq_tail(stream->tfa);
> }
>
> if (vstream->index_fd) {
> diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h
> index cc46db4..5dc135d 100644
> --- a/src/bin/lttng-relayd/viewer-stream.h
> +++ b/src/bin/lttng-relayd/viewer-stream.h
> @@ -59,10 +59,15 @@ struct relay_viewer_stream {
> char *channel_name;
>
> uint64_t current_tracefile_id;
> - /* Free-running counter. */
> - uint64_t current_tracefile_seq;
>
> - uint64_t last_sent_index;
> + /*
> + * Counts the number of sent indexes. The "tag" associated
> + * with an index to send is the current index_sent_seqcount,
> + * because we increment index_sent_seqcount after sending
> + * each index. This index_sent_seqcount counter can also be
> + * updated when catching up with the producer.
> + */
> + uint64_t index_sent_seqcount;
>
> /* Indicates if this stream has been sent to a viewer client. */
> bool sent_flag;
> --
> 2.1.4
>
--
Jérémie Galarneau
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list