[lttng-dev] [PATCH lttng-tools 3/3] Fix: relayd: file rotation and live read

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Thu Sep 3 17:17:30 EDT 2015


Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
---
 src/bin/lttng-relayd/Makefile.am       |   3 +-
 src/bin/lttng-relayd/live.c            |  89 +++++++-----------
 src/bin/lttng-relayd/main.c            |  39 ++++----
 src/bin/lttng-relayd/stream.c          |   8 ++
 src/bin/lttng-relayd/stream.h          |  22 +++--
 src/bin/lttng-relayd/tracefile-array.c | 159 +++++++++++++++++++++++++++++++++
 src/bin/lttng-relayd/tracefile-array.h |  63 +++++++++++++
 src/bin/lttng-relayd/viewer-stream.c   |  93 +++++++++----------
 src/bin/lttng-relayd/viewer-stream.h   |  11 ++-
 9 files changed, 349 insertions(+), 138 deletions(-)
 create mode 100644 src/bin/lttng-relayd/tracefile-array.c
 create mode 100644 src/bin/lttng-relayd/tracefile-array.h

diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am
index 428f352..07eb732 100644
--- a/src/bin/lttng-relayd/Makefile.am
+++ b/src/bin/lttng-relayd/Makefile.am
@@ -19,7 +19,8 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \
                        stream.c stream.h \
                        stream-fd.c stream-fd.h \
                        connection.c connection.h \
-                       viewer-session.c viewer-session.h
+                       viewer-session.c viewer-session.h \
+                       tracefile-array.c tracefile-array.h
 
 # link on liblttngctl for check if relayd is already alive.
 lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
index eb57421..7cb1d48 100644
--- a/src/bin/lttng-relayd/live.c
+++ b/src/bin/lttng-relayd/live.c
@@ -1130,7 +1130,7 @@ static int try_open_index(struct relay_viewer_stream *vstream,
 	/*
 	 * First time, we open the index file and at least one index is ready.
 	 */
-	if (rstream->total_index_received == 0) {
+	if (rstream->index_received_seqcount == 0) {
 		ret = -ENOENT;
 		goto end;
 	}
@@ -1172,14 +1172,14 @@ static int check_index_status(struct relay_viewer_stream *vstream,
 	int ret;
 
 	if (trace->session->connection_closed
-			&& rstream->total_index_received
-				== vstream->last_sent_index) {
+			&& rstream->index_received_seqcount
+				== vstream->index_sent_seqcount) {
 		/* Last index sent and session connection is closed. */
 		index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
 		goto hup;
 	} else if (rstream->beacon_ts_end != -1ULL &&
-			rstream->total_index_received
-				== vstream->last_sent_index) {
+			rstream->index_received_seqcount
+				== vstream->index_sent_seqcount) {
 		/*
 		 * We've received a synchronization beacon and the last index
 		 * available has been sent, the index for now is inactive.
@@ -1193,21 +1193,24 @@ static int check_index_status(struct relay_viewer_stream *vstream,
 		index->timestamp_end = htobe64(rstream->beacon_ts_end);
 		index->stream_id = htobe64(rstream->ctf_stream_id);
 		goto index_ready;
-	} else if (rstream->total_index_received <= vstream->last_sent_index) {
+	} else if (rstream->index_received_seqcount
+			== vstream->index_sent_seqcount) {
 		/*
-		 * This actually checks the case where recv == last_sent.
-		 * In this case, we have not received a beacon. Therefore, we
-		 * can only ask the client to retry later.
+		 * This checks whether received == sent seqcount. In
+		 * this case, we have not received a beacon. Therefore,
+		 * we can only ask the client to retry later.
 		 */
 		index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
 		goto index_ready;
-	} else if (!viewer_stream_is_tracefile_seq_readable(vstream,
-			vstream->current_tracefile_seq)) {
+	} else if (!tracefile_array_seq_in_file(rstream->tfa,
+			vstream->current_tracefile_id,
+			vstream->index_sent_seqcount)) {
 		/*
-		 * The producer has overwritten our current file. We
-		 * need to rotate.
+		 * The next index we want to send cannot be read either
+		 * because we need to perform a rotation, or due to
+		 * the producer having overwritten its trace file.
 		 */
-		DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
+		DBG("Viewer stream %" PRIu64 " rotation",
 				vstream->stream->stream_handle);
 		ret = viewer_stream_rotate(vstream);
 		if (ret < 0) {
@@ -1217,50 +1220,22 @@ static int check_index_status(struct relay_viewer_stream *vstream,
 			index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
 			goto hup;
 		}
-		assert(viewer_stream_is_tracefile_seq_readable(vstream,
-			vstream->current_tracefile_seq));
-		/* ret == 0 means successful so we continue. */
-		ret = 0;
-	} else {
-		ssize_t read_ret;
-		char tmp[1];
-
 		/*
-		 * Use EOF on current index file to find out when we
-		 * need to rotate.
+		 * If we have been pushed due to overwrite, it
+		 * necessarily means there is data that can be read in
+		 * the stream. If we rotated because we reached the end
+		 * of a tracefile, it means the following tracefile
+		 * needs to contain at least one index, else we would
+		 * have already returned LTTNG_VIEWER_INDEX_RETRY to the
+		 * viewer. The updated index_sent_seqcount needs to
+		 * point to a readable index entry now.
 		 */
-		read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
-		if (read_ret == 1) {
-			off_t seek_ret;
-
-			/* There is still data to read. Rewind position. */
-			seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
-			if (seek_ret < 0) {
-				ret = -1;
-				goto end;
-			}
-			ret = 0;
-		} else if (read_ret == 0) {
-			/* EOF. We need to rotate. */
-			DBG("Viewer stream %" PRIu64 " rotation due to EOF",
-					vstream->stream->stream_handle);
-			ret = viewer_stream_rotate(vstream);
-			if (ret < 0) {
-				goto end;
-			} else if (ret == 1) {
-				/* EOF across entire stream. */
-				index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
-				goto hup;
-			}
-			assert(viewer_stream_is_tracefile_seq_readable(vstream,
-				vstream->current_tracefile_seq));
-			/* ret == 0 means successful so we continue. */
-			ret = 0;
-		} else {
-			/* Error reading index. */
-			ret = -1;
-		}
+		assert(tracefile_array_seq_in_file(rstream->tfa,
+			vstream->current_tracefile_id,
+			vstream->index_sent_seqcount));
 	}
+	/* ret == 0 means successful so we continue. */
+	ret = 0;
 end:
 	return ret;
 
@@ -1409,7 +1384,7 @@ int viewer_get_next_index(struct relay_connection *conn)
 		goto send_reply;
 	} else {
 		viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
-		vstream->last_sent_index++;
+		vstream->index_sent_seqcount++;
 	}
 
 	/*
@@ -1456,7 +1431,7 @@ send_reply:
 
 	if (vstream) {
 		DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
-				vstream->last_sent_index,
+				vstream->index_sent_seqcount,
 				vstream->stream->stream_handle);
 	}
 end:
diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
index adb044f..7b385b4 100644
--- a/src/bin/lttng-relayd/main.c
+++ b/src/bin/lttng-relayd/main.c
@@ -71,6 +71,7 @@
 #include "session.h"
 #include "stream.h"
 #include "connection.h"
+#include "tracefile-array.h"
 
 /* command line options */
 char *opt_output_path;
@@ -1890,7 +1891,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
 		 * Only flag a stream inactive when it has already
 		 * received data and no indexes are in flight.
 		 */
-		if (stream->total_index_received > 0
+		if (stream->index_received_seqcount > 0
 				&& stream->indexes_in_flight == 0) {
 			stream->beacon_ts_end =
 				be64toh(index_info.timestamp_end);
@@ -1918,7 +1919,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
 	}
 	ret = relay_index_try_flush(index);
 	if (ret == 0) {
-		stream->total_index_received++;
+		tracefile_array_commit_seq(stream->tfa);
+		stream->index_received_seqcount++;
 	} else if (ret > 0) {
 		/* no flush. */
 		ret = 0;
@@ -2091,7 +2093,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
 		fd = index_create_file(stream->path_name, stream->channel_name,
 			        -1, -1, stream->tracefile_size,
-				stream->current_tracefile_id);
+				tracefile_array_get_file_index_head(stream->tfa));
 		if (fd < 0) {
 			ret = -1;
 			/* Put self-ref for this index due to error. */
@@ -2120,7 +2122,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
 	ret = relay_index_try_flush(index);
 	if (ret == 0) {
-		stream->total_index_received++;
+		tracefile_array_commit_seq(stream->tfa);
+		stream->index_received_seqcount++;
 	} else if (ret > 0) {
 		/* No flush. */
 		ret = 0;
@@ -2204,35 +2207,23 @@ static int relay_process_data(struct relay_connection *conn)
 	if (stream->tracefile_size > 0 &&
 			(stream->tracefile_size_current + data_size) >
 			stream->tracefile_size) {
-		uint64_t new_id;
+		uint64_t old_id, new_id;
+
+		old_id = tracefile_array_get_file_index_head(stream->tfa);
+		tracefile_array_file_rotate(stream->tfa);
+
+		/* new_id is updated by utils_rotate_stream_file. */
+		new_id = old_id;
 
-		new_id = (stream->current_tracefile_id + 1) %
-			stream->tracefile_count;
-		/*
-		 * Move viewer oldest available data position forward if
-		 * we are overwriting a tracefile.
-		 */
-		if (new_id == stream->oldest_tracefile_id) {
-			stream->oldest_tracefile_id =
-				(stream->oldest_tracefile_id + 1) %
-				stream->tracefile_count;
-		}
 		ret = utils_rotate_stream_file(stream->path_name,
 				stream->channel_name, stream->tracefile_size,
 				stream->tracefile_count, -1,
 			        -1, stream->stream_fd->fd,
-				&stream->current_tracefile_id,
-				&stream->stream_fd->fd);
+				&new_id, &stream->stream_fd->fd);
 		if (ret < 0) {
 			ERR("Rotating stream output file");
 			goto end_stream_unlock;
 		}
-		stream->current_tracefile_seq++;
-		if (stream->current_tracefile_seq
-			- stream->oldest_tracefile_seq >=
-				stream->tracefile_count) {
-			stream->oldest_tracefile_seq++;
-		}
 		/*
 		 * Reset current size because we just performed a stream
 		 * rotation.
diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
index 9fd9dce..870b75a 100644
--- a/src/bin/lttng-relayd/stream.c
+++ b/src/bin/lttng-relayd/stream.c
@@ -137,6 +137,11 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 		ret = -1;
 		goto end;
 	}
+	stream->tfa = tracefile_array_create(stream->tracefile_count);
+	if (!stream->tfa) {
+		ret = -1;
+		goto end;
+	}
 	if (stream->tracefile_size) {
 		DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
 	} else {
@@ -241,6 +246,9 @@ static void stream_destroy(struct relay_stream *stream)
 	if (stream->indexes_ht) {
 		lttng_ht_destroy(stream->indexes_ht);
 	}
+	if (stream->tfa) {
+		tracefile_array_destroy(stream->tfa);
+	}
 	free(stream->path_name);
 	free(stream->channel_name);
 	free(stream);
diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h
index 7e2b133..419111c 100644
--- a/src/bin/lttng-relayd/stream.h
+++ b/src/bin/lttng-relayd/stream.h
@@ -29,6 +29,7 @@
 
 #include "session.h"
 #include "stream-fd.h"
+#include "tracefile-array.h"
 
 /*
  * Represents a stream in the relay
@@ -67,15 +68,22 @@ struct relay_stream {
 	uint64_t tracefile_size;
 	uint64_t tracefile_size_current;
 	uint64_t tracefile_count;
-	uint64_t current_tracefile_id;
 
-	uint64_t current_tracefile_seq;	/* Free-running counter. */
-	uint64_t oldest_tracefile_seq;	/* Free-running counter. */
-
-	/* To inform the viewer up to where it can go back in time. */
-	uint64_t oldest_tracefile_id;
+	/*
+	 * Counts the number of received indexes. The "tag" associated
+	 * with an index is taken before incrementing this seqcount.
+	 * Therefore, the sequence tag associated with the last index
+	 * received is always index_received_seqcount - 1.
+	 */
+	uint64_t index_received_seqcount;
 
-	uint64_t total_index_received;
+	/*
+	 * Tracefile array is an index of the stream trace files,
+	 * indexed by position. It allows keeping track of the oldest
+	 * available indexes when overwriting trace files in tracefile
+	 * rotation. It is left NULL when tracefile rotation is unused.
+	 */
+	struct tracefile_array *tfa;
 
 	bool closed;	/* Stream is closed. */
 
diff --git a/src/bin/lttng-relayd/tracefile-array.c b/src/bin/lttng-relayd/tracefile-array.c
new file mode 100644
index 0000000..7ab1f8e
--- /dev/null
+++ b/src/bin/lttng-relayd/tracefile-array.c
@@ -0,0 +1,159 @@
+/*
+ * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#define _LGPL_SOURCE
+#include <assert.h>
+#include <common/common.h>
+#include <common/utils.h>
+#include <common/defaults.h>
+
+#include "tracefile-array.h"
+
+struct tracefile_array *tracefile_array_create(size_t count)
+{
+	struct tracefile_array *tfa = NULL;
+	int i;
+
+	tfa = zmalloc(sizeof(*tfa));
+	if (!tfa) {
+		goto error;
+	}
+	tfa->tf = zmalloc(sizeof(*tfa->tf) * count);
+	if (!tfa->tf) {
+		goto error;
+	}
+	tfa->count = count;
+	for (i = 0; i < count; i++) {
+		tfa->tf[i].seq_head = -1ULL;
+		tfa->tf[i].seq_tail = -1ULL;
+	}
+	tfa->seq_head = -1ULL;
+	tfa->seq_tail = -1ULL;
+	return tfa;
+
+error:
+	if (tfa) {
+		free(tfa->tf);
+	}
+	free(tfa);
+	return NULL;
+}
+
+void tracefile_array_destroy(struct tracefile_array *tfa)
+{
+	if (!tfa) {
+		return;
+	}
+	free(tfa->tf);
+	free(tfa);
+}
+
+void tracefile_array_file_rotate(struct tracefile_array *tfa)
+{
+	uint64_t *headp, *tailp;
+
+	if (tfa->count <= 1) {
+		return;
+	}
+	/* Rotate to next file.  */
+	tfa->file_head = (tfa->file_head + 1) % tfa->count;
+	if (tfa->file_head == tfa->file_tail) {
+		/* Move tail. */
+		tfa->file_tail = (tfa->file_tail + 1) % tfa->count;
+	}
+	headp = &tfa->tf[tfa->file_head].seq_head;
+	tailp = &tfa->tf[tfa->file_head].seq_tail;
+	/*
+	 * If we overwrite a file with content, we need to push the tail
+	 * to the position following the content we are overwriting.
+	 */
+	if (*headp != -1ULL) {
+		tfa->seq_tail = tfa->tf[tfa->file_tail].seq_tail;
+	}
+	/* Reset this file head/tail (overwrite). */
+	*headp = -1ULL;
+	*tailp = -1ULL;
+}
+
+void tracefile_array_commit_seq(struct tracefile_array *tfa)
+{
+	uint64_t *headp, *tailp;
+
+	/* Increment overall head. */
+	tfa->seq_head++;
+	/* If we are committing our first index overall, set tail to 0. */
+	if (tfa->seq_tail == -1ULL) {
+		tfa->seq_tail = 0;
+	}
+	if (tfa->count <= 1) {
+		return;
+	}
+	headp = &tfa->tf[tfa->file_head].seq_head;
+	tailp = &tfa->tf[tfa->file_head].seq_tail;
+	/* Update head tracefile seq_head. */
+	*headp = tfa->seq_head;
+	/*
+	 * If we are committing our first index in this packet, set tail
+	 * to this index seq count.
+	 */
+	if (*tailp == -1ULL) {
+		*tailp = tfa->seq_head;
+	}
+}
+
+uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa)
+{
+	return tfa->file_head;
+}
+
+uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa)
+{
+	return tfa->seq_head;
+}
+
+uint64_t tracefile_array_get_file_index_tail(struct tracefile_array *tfa)
+{
+	return tfa->file_tail;
+}
+
+uint64_t tracefile_array_get_seq_tail(struct tracefile_array *tfa)
+{
+	return tfa->seq_tail;
+}
+
+bool tracefile_array_seq_in_file(struct tracefile_array *tfa,
+		uint64_t file_index, uint64_t seq)
+{
+	if (tfa->count <= 1) {
+		/*
+		 * With a single file, we are guaranteed to have the
+		 * index in this file.
+		 */
+		return true;
+	}
+	assert(file_index < tfa->count);
+	if (seq == -1ULL) {
+		return false;
+	}
+	if (seq >= tfa->tf[file_index].seq_tail
+			&& seq <= tfa->tf[file_index].seq_head) {
+		return true;
+	} else {
+		return false;
+	}
+}
diff --git a/src/bin/lttng-relayd/tracefile-array.h b/src/bin/lttng-relayd/tracefile-array.h
new file mode 100644
index 0000000..c947078
--- /dev/null
+++ b/src/bin/lttng-relayd/tracefile-array.h
@@ -0,0 +1,63 @@
+#ifndef _TRACEFILE_ARRAY_H
+#define _TRACEFILE_ARRAY_H
+
+/*
+ * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <limits.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdbool.h>
+
+struct tracefile {
+	/* Per-tracefile head/tail seq. */
+	uint64_t seq_head;	/* Newest seqcount. Inclusive. */
+	uint64_t seq_tail;	/* Oldest seqcount. Inclusive. */
+};
+
+/*
+ * Represents an array of trace files in a stream.
+ */
+struct tracefile_array {
+	struct tracefile *tf;
+	size_t count;
+
+	/* Current head/tail files. */
+	uint64_t file_head;
+	uint64_t file_tail;
+
+	/* Overall head/tail seq for the entire array. Inclusive. */
+	uint64_t seq_head;
+	uint64_t seq_tail;
+};
+
+struct tracefile_array *tracefile_array_create(size_t count);
+void tracefile_array_destroy(struct tracefile_array *tfa);
+
+void tracefile_array_file_rotate(struct tracefile_array *tfa);
+void tracefile_array_commit_seq(struct tracefile_array *tfa);
+
+uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa);
+uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa);
+
+uint64_t tracefile_array_get_file_index_tail(struct tracefile_array *tfa);
+uint64_t tracefile_array_get_seq_tail(struct tracefile_array *tfa);
+
+bool tracefile_array_seq_in_file(struct tracefile_array *tfa,
+		uint64_t file_index, uint64_t seq);
+
+#endif /* _STREAM_H */
diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c
index 1d02ee3..31b0257 100644
--- a/src/bin/lttng-relayd/viewer-stream.c
+++ b/src/bin/lttng-relayd/viewer-stream.c
@@ -63,29 +63,45 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
 		goto error;
 	}
 
+	if (!stream_get(stream)) {
+		ERR("Cannot get stream");
+		goto error;
+	}
+	vstream->stream = stream;
+
+	pthread_mutex_lock(&stream->lock);
+
+	if (stream->is_metadata && stream->trace->viewer_metadata_stream) {
+		ERR("Cannot attach viewer metadata stream to trace (busy).");
+		goto error_unlock;
+	}
+
 	switch (seek_t) {
 	case LTTNG_VIEWER_SEEK_BEGINNING:
-		vstream->current_tracefile_id = stream->oldest_tracefile_id;
+		vstream->current_tracefile_id =
+			tracefile_array_get_file_index_tail(stream->tfa);
+		vstream->index_sent_seqcount =
+			tracefile_array_get_seq_tail(stream->tfa);
 		break;
 	case LTTNG_VIEWER_SEEK_LAST:
-		vstream->current_tracefile_id = stream->current_tracefile_id;
+		vstream->current_tracefile_id =
+			tracefile_array_get_file_index_head(stream->tfa);
+		/*
+		 * We seek at the very end of each stream, awaiting for
+		 * a future packet to eventually come in.
+		 */
+		vstream->index_sent_seqcount =
+			tracefile_array_get_seq_head(stream->tfa) + 1;
 		break;
 	default:
-		goto error;
-	}
-	if (!stream_get(stream)) {
-		ERR("Cannot get stream");
-		goto error;
+		goto error_unlock;
 	}
-	vstream->stream = stream;
 
-	pthread_mutex_lock(&stream->lock);
 	/*
-	 * If we never received an index for the current stream, delay the opening
-	 * of the index, otherwise open it right now.
+	 * If we never received an index for the current stream, delay
+	 * the opening of the index, otherwise open it right now.
 	 */
-	if (vstream->current_tracefile_id == stream->current_tracefile_id
-			&& stream->total_index_received == 0) {
+	if (stream->index_received_seqcount == 0) {
 		vstream->index_fd = NULL;
 	} else {
 		int read_fd;
@@ -112,14 +128,12 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
 		if (lseek_ret < 0) {
 			goto error_unlock;
 		}
-		vstream->last_sent_index = stream->total_index_received;
 	}
-	pthread_mutex_unlock(&stream->lock);
-
 	if (stream->is_metadata) {
 		rcu_assign_pointer(stream->trace->viewer_metadata_stream,
 				vstream);
 	}
+	pthread_mutex_unlock(&stream->lock);
 
 	/* Globally visible after the add unique. */
 	lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
@@ -227,26 +241,6 @@ void viewer_stream_put(struct relay_viewer_stream *vstream)
 }
 
 /*
- * Returns whether the current tracefile is readable. If not, it has
- * been overwritten.
- * Must be called with rstream lock held.
- */
-bool viewer_stream_is_tracefile_seq_readable(struct relay_viewer_stream *vstream,
-		 uint64_t seq)
-{
-	struct relay_stream *stream = vstream->stream;
-
-	if (seq >= stream->oldest_tracefile_seq
-			&& seq <= stream->current_tracefile_seq) {
-		/* seq is a readable file. */
-		return true;
-	} else {
-		/* seq is not readable. */
-		return false;
-	}
-}
-
-/*
  * Rotate a stream to the next tracefile.
  *
  * Must be called with the rstream lock held.
@@ -256,9 +250,11 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
 {
 	int ret;
 	struct relay_stream *stream = vstream->stream;
+	uint64_t new_id;
 
 	/* Detect the last tracefile to open. */
-	if (stream->total_index_received == vstream->last_sent_index
+	if (stream->index_received_seqcount
+			== vstream->index_sent_seqcount
 			&& stream->trace->session->connection_closed) {
 		ret = 1;
 		goto end;
@@ -270,17 +266,22 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
 		goto end;
 	}
 
-	if (!viewer_stream_is_tracefile_seq_readable(vstream,
-			vstream->current_tracefile_seq + 1)) {
-		vstream->current_tracefile_id =
-				stream->oldest_tracefile_id;
-		vstream->current_tracefile_seq =
-				stream->oldest_tracefile_seq;
+	/*
+	 * Try to move to the next file.
+	 */
+	new_id = (vstream->current_tracefile_id + 1)
+			% stream->tracefile_count;
+	if (tracefile_array_seq_in_file(stream->tfa, new_id,
+			vstream->index_sent_seqcount)) {
+		vstream->current_tracefile_id = new_id;
 	} else {
+		/*
+		 * We need to resync because we lag behind tail.
+		 */
 		vstream->current_tracefile_id =
-				(vstream->current_tracefile_id + 1)
-					% stream->tracefile_count;
-		vstream->current_tracefile_seq++;
+			tracefile_array_get_file_index_tail(stream->tfa);
+		vstream->index_sent_seqcount =
+			tracefile_array_get_seq_tail(stream->tfa);
 	}
 
 	if (vstream->index_fd) {
diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h
index cc46db4..5dc135d 100644
--- a/src/bin/lttng-relayd/viewer-stream.h
+++ b/src/bin/lttng-relayd/viewer-stream.h
@@ -59,10 +59,15 @@ struct relay_viewer_stream {
 	char *channel_name;
 
 	uint64_t current_tracefile_id;
-	/* Free-running counter. */
-	uint64_t current_tracefile_seq;
 
-	uint64_t last_sent_index;
+	/*
+	 * Counts the number of sent indexes. The "tag" associated
+	 * with an index to send is the current index_received_seqcount,
+	 * because we increment index_received_seqcount after sending
+	 * each index. This index_received_seqcount counter can also be
+	 * updated when catching up with the producer.
+	 */
+	uint64_t index_sent_seqcount;
 
 	/* Indicates if this stream has been sent to a viewer client. */
 	bool sent_flag;
-- 
2.1.4




More information about the lttng-dev mailing list