[lttng-dev] [RFC PATCH v2 lttng-tools] Fix: Relay daemon ownership and reference counting

Mathieu Desnoyers mathieu.desnoyers at efficios.com
Thu Aug 6 19:18:18 EDT 2015


The ownership and reference counting of the relay daemon are unclear and
buggy in many ways. They are the cause of memory corruption, double frees,
leaks, and segmentation faults observed in various conditions.

Fix this situation by introducing a clear ownership and reference
counting scheme for this daemon.

See doc/relayd-architecture.txt for details.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
---
 doc/Makefile.am                       |    3 +-
 doc/relayd-architecture.txt           |   96 ++++
 src/bin/lttng-relayd/Makefile.am      |    4 +-
 src/bin/lttng-relayd/cmd-2-1.c        |   17 +-
 src/bin/lttng-relayd/cmd-2-1.h        |   10 +-
 src/bin/lttng-relayd/cmd-2-2.c        |   22 +-
 src/bin/lttng-relayd/cmd-2-2.h        |   10 +-
 src/bin/lttng-relayd/cmd-2-4.c        |   19 +-
 src/bin/lttng-relayd/cmd-2-4.h        |   10 +-
 src/bin/lttng-relayd/cmd-generic.c    |    1 +
 src/bin/lttng-relayd/cmd-generic.h    |    7 +-
 src/bin/lttng-relayd/cmd.h            |    7 +-
 src/bin/lttng-relayd/connection.c     |  128 +++--
 src/bin/lttng-relayd/connection.h     |   63 +-
 src/bin/lttng-relayd/ctf-trace.c      |  185 +++---
 src/bin/lttng-relayd/ctf-trace.h      |   60 +-
 src/bin/lttng-relayd/index.c          |  343 +++++++----
 src/bin/lttng-relayd/index.h          |   59 +-
 src/bin/lttng-relayd/live.c           |  833 ++++++++++++---------------
 src/bin/lttng-relayd/live.h           |   10 +-
 src/bin/lttng-relayd/lttng-relayd.h   |   23 +-
 src/bin/lttng-relayd/main.c           | 1011 ++++++++++++---------------------
 src/bin/lttng-relayd/session.c        |  191 ++++---
 src/bin/lttng-relayd/session.h        |  115 ++--
 src/bin/lttng-relayd/stream-fd.c      |   58 ++
 src/bin/lttng-relayd/stream-fd.h      |   32 ++
 src/bin/lttng-relayd/stream.c         |  319 ++++++++---
 src/bin/lttng-relayd/stream.h         |  129 +++--
 src/bin/lttng-relayd/utils.h          |    7 +-
 src/bin/lttng-relayd/viewer-session.c |  159 ++++++
 src/bin/lttng-relayd/viewer-session.h |   55 ++
 src/bin/lttng-relayd/viewer-stream.c  |  293 +++++-----
 src/bin/lttng-relayd/viewer-stream.h  |   81 ++-
 src/bin/lttng-sessiond/consumer.c     |    7 +-
 34 files changed, 2428 insertions(+), 1939 deletions(-)
 create mode 100644 doc/relayd-architecture.txt
 create mode 100644 src/bin/lttng-relayd/stream-fd.c
 create mode 100644 src/bin/lttng-relayd/stream-fd.h
 create mode 100644 src/bin/lttng-relayd/viewer-session.c
 create mode 100644 src/bin/lttng-relayd/viewer-session.h

diff --git a/doc/Makefile.am b/doc/Makefile.am
index 4ff71eb..7ae9303 100644
--- a/doc/Makefile.am
+++ b/doc/Makefile.am
@@ -1,7 +1,8 @@
 SUBDIRS = man
 EXTRA_DIST = quickstart.txt streaming-howto.txt python-howto.txt \
 	snapshot-howto.txt calibrate.txt kernel-CodingStyle.txt \
-	live-reading-howto.txt live-reading-protocol.txt
+	live-reading-howto.txt live-reading-protocol.txt \
+	relayd-architecture.txt
 
 dist_doc_DATA = quickstart.txt streaming-howto.txt python-howto.txt \
 	snapshot-howto.txt calibrate.txt live-reading-howto.txt \
diff --git a/doc/relayd-architecture.txt b/doc/relayd-architecture.txt
new file mode 100644
index 0000000..c45bd56
--- /dev/null
+++ b/doc/relayd-architecture.txt
@@ -0,0 +1,96 @@
+LTTng Relay Daemon Architecture
+Mathieu Desnoyers, August 2015
+
+This document describes the object model and architecture of the relay
+daemon, after the refactoring done within the commit "Fix: Relay daemon
+ownership and reference counting".
+
+We have the following object composition hierarchy:
+
+relay connection (main.c, for sessiond/consumer)
+  |
+  \-> 0 or 1 session
+       |
+       \-> 0 or many ctf-trace
+                 |
+                 \-> 0 or many stream
+                      |   |
+                      |   \-> 0 or many index
+                      |
+                      \-------> 0 or 1 viewer stream
+
+live connection (live.c, for client)
+  |
+  \-> 1 viewer session
+       |
+       \-> 0 or many session (actually a reference to session as created
+             |                by the relay connection)
+             |
+             \-> ..... (ctf-trace, stream, index, viewer stream)
+
+There are global tables declared in lttng-relayd.h for sessions
+(sessions_ht, indexed by session id), streams (relay_streams_ht, indexed
+by stream handle), and viewer streams (viewer_streams_ht, indexed by
+stream handle). The purpose of these tables is to allow fast lookup of
+those objects using the IDs received in the communication protocols.
+
+There is also one connection hash table per worker thread. There is one
+worker thread to receive data (main.c), and one worker thread to
+interact with viewer clients (live.c). Those tables are indexed by
+socket file descriptor.
+
+An RCU lookup+refcounting scheme has been introduced for all objects
+(except the viewer session, which is still an exception at the moment).
+This scheme allows looking up the objects or doing a traversal on the RCU
+linked list or hash table in combination with a getter on the object.
+This getter validates that there is still at least one reference to the
+object; otherwise the lookup acts just as if the object does not exist. This
+scheme is protected by a "reflock" mutex in each object. "reflock"
+mutexes can be nested from the innermost object to the outermost object.
+IOW, the session reflock can nest within the ctf-trace reflock.
+
+There is also a "lock" mutex in each object. Those are used to
+synchronize between threads (currently the main.c relay thread and
+live.c client thread) when objects are shared. Locks can be nested from
+the outermost object to the innermost object. IOW, the ctf-trace lock can
+nest within the session lock.
+
+A "lock" should never nest within a "reflock".
+
+RCU linked lists are used to iterate using RCU, and protected by
+their own mutex for modifications. Iterations should be confirmed using
+the object "getter" to ensure its refcount is not 0 (except in cases
+where the caller actually owns the objects and therefore can assume its
+refcount is not 0).
+
+RCU hash tables are used to iterate using RCU. Iteration should be
+confirmed using the object "getter" to ensure its refcount is not 0
+(except again if we have ownership and can assume the object refcount is
+not 0).
+
+Object creation has a refcount of 1. Each getter increments the
+refcount, and needs to be paired with a "put" to decrement it. A final
+put on "self" (ownership) allows the refcount to reach 0, which
+triggers release, and thus the free through call_rcu.
+
+In the composition scheme, we find back references from each composite
+to its container. Therefore, each composite holds a reference (refcount)
+on its container. This allows following pointers from e.g. viewer stream
+to stream to ctf-trace to session without performing any validation,
+due to transitive refcounting of those back-references.
+
+In addition to those back references, there are a few key ownership
+references held. The connection in the relay worker thread (main.c)
+holds ownership on the session, and on each stream it contains. The
+connection in the live worker thread (live.c) holds ownership on each
+viewer stream it creates. The rest is ensured by back references
+from composite to container objects. When a connection is closed, it
+puts all the ownership references it is holding. This will then
+eventually trigger destruction of the session, streams, and viewer
+streams associated with the connection when all the back references
+reach 0.
+
+RCU read-side locks are now only held during iteration on RCU lists and
+hash tables, and within the internals of the get (lookup) and put
+functions. Those functions then use refcounting to ensure existence of
+the object when returned to their caller.
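The get/put rules described in doc/relayd-architecture.txt (creation with a refcount of 1, a getter that refuses to take a reference once the count reached 0, and a final "self" put that triggers release) can be sketched as a small standalone C program. This is a hedged approximation, not relayd code: the hypothetical struct obj and obj_create(), obj_get(), obj_put() and obj_release() use a plain int and a pthread mutex in place of liburcu's struct urcu_ref, and the call_rcu() free is replaced by a counter so that the failing getter can be observed without a use-after-free. As in connection_put(), the release callback runs with reflock held.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical object: "refcount" stands in for struct urcu_ref. */
struct obj {
	int refcount;
	pthread_mutex_t reflock;	/* serializes getters against the final put */
};

static int obj_release_count;	/* illustration only: counts releases */

/* Creation returns the object holding its "self" (ownership) reference. */
static struct obj *obj_create(void)
{
	struct obj *o = calloc(1, sizeof(*o));

	if (!o)
		return NULL;
	pthread_mutex_init(&o->reflock, NULL);
	o->refcount = 1;
	return o;
}

/* Getter: take a reference only if the object is still alive. */
static bool obj_get(struct obj *o)
{
	bool has_ref = false;

	pthread_mutex_lock(&o->reflock);
	if (o->refcount != 0) {
		o->refcount++;
		has_ref = true;
	}
	pthread_mutex_unlock(&o->reflock);
	return has_ref;
}

/* Called with reflock held when the last reference is dropped. */
static void obj_release(struct obj *o)
{
	(void) o;
	/*
	 * relayd would unlink the object from its hash tables here and
	 * defer free() through call_rcu(); this sketch only counts.
	 */
	obj_release_count++;
}

static void obj_put(struct obj *o)
{
	pthread_mutex_lock(&o->reflock);
	assert(o->refcount > 0);
	if (--o->refcount == 0)
		obj_release(o);
	pthread_mutex_unlock(&o->reflock);
}
```

A reader that reaches the object through an RCU lookup calls obj_get() and, when it returns false, treats the object as absent; every successful obj_get() is paired with one obj_put().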
diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am
index 126cdaf..428f352 100644
--- a/src/bin/lttng-relayd/Makefile.am
+++ b/src/bin/lttng-relayd/Makefile.am
@@ -17,7 +17,9 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \
                        viewer-stream.h viewer-stream.c \
                        session.c session.h \
                        stream.c stream.h \
-                       connection.c connection.h
+                       stream-fd.c stream-fd.h \
+                       connection.c connection.h \
+                       viewer-session.c viewer-session.h
 
 # link on liblttngctl for check if relayd is already alive.
 lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
diff --git a/src/bin/lttng-relayd/cmd-2-1.c b/src/bin/lttng-relayd/cmd-2-1.c
index 0cd9b5a..a2c340f 100644
--- a/src/bin/lttng-relayd/cmd-2-1.c
+++ b/src/bin/lttng-relayd/cmd-2-1.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -28,30 +29,30 @@
 #include "cmd-2-1.h"
 #include "utils.h"
 
+/*
+ * cmd_recv_stream_2_1 allocates path_name and channel_name.
+ */
 int cmd_recv_stream_2_1(struct relay_connection *conn,
-		struct relay_stream *stream)
+		char **path_name, char **channel_name)
 {
 	int ret;
 	struct lttcomm_relayd_add_stream stream_info;
 
-	assert(conn);
-	assert(stream);
-
 	ret = cmd_recv(conn->sock, &stream_info, sizeof(stream_info));
 	if (ret < 0) {
 		ERR("Unable to recv stream version 2.1");
 		goto error;
 	}
 
-	stream->path_name = create_output_path(stream_info.pathname);
-	if (stream->path_name == NULL) {
+	*path_name = create_output_path(stream_info.pathname);
+	if (*path_name == NULL) {
 		PERROR("Path name allocation");
 		ret = -ENOMEM;
 		goto error;
 	}
 
-	stream->channel_name = strdup(stream_info.channel_name);
-	if (stream->channel_name == NULL) {
+	*channel_name = strdup(stream_info.channel_name);
+	if (*channel_name == NULL) {
 		ret = -errno;
 		PERROR("Path name allocation");
 		goto error;
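With this change, cmd_recv_stream_2_1() (and likewise cmd_recv_stream_2_2()) hands freshly allocated path_name and channel_name strings back to the caller instead of filling a struct relay_stream, so the caller now owns and must free them. A hedged sketch of that out-parameter contract follows; recv_names() is a hypothetical stand-in that copies from in-memory strings instead of reading a socket, and its "free and reset on failure" behaviour is an assumption chosen for illustration, not a description of the patch's error path.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical stand-in for cmd_recv_stream_2_1(): on success, both
 * *path_name and *channel_name are heap-allocated and owned by the
 * caller. On failure, anything already allocated is freed and both
 * outputs are reset to NULL, so the caller never sees a partial result.
 */
static int recv_names(const char *wire_path, const char *wire_channel,
		char **path_name, char **channel_name)
{
	*path_name = NULL;
	*channel_name = NULL;

	*path_name = strdup(wire_path);
	if (!*path_name)
		return -ENOMEM;

	*channel_name = strdup(wire_channel);
	if (!*channel_name) {
		free(*path_name);
		*path_name = NULL;
		return -ENOMEM;
	}
	return 0;
}
```

Resetting both outputs on failure lets a caller treat any non-zero return as "nothing to free"; on success it must free() both strings once the stream that uses them is torn down.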
diff --git a/src/bin/lttng-relayd/cmd-2-1.h b/src/bin/lttng-relayd/cmd-2-1.h
index bab8190..46283dc 100644
--- a/src/bin/lttng-relayd/cmd-2-1.h
+++ b/src/bin/lttng-relayd/cmd-2-1.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_2_1_H
+#define RELAYD_CMD_2_1_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,13 +20,9 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef RELAYD_CMD_2_1_H
-#define RELAYD_CMD_2_1_H
-
 #include "lttng-relayd.h"
-#include "stream.h"
 
 int cmd_recv_stream_2_1(struct relay_connection *conn,
-		struct relay_stream *stream);
+		char **path_name, char **channel_name);
 
 #endif /* RELAYD_CMD_2_1_H */
diff --git a/src/bin/lttng-relayd/cmd-2-2.c b/src/bin/lttng-relayd/cmd-2-2.c
index 7dd99ad..1493b73 100644
--- a/src/bin/lttng-relayd/cmd-2-2.c
+++ b/src/bin/lttng-relayd/cmd-2-2.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -30,37 +31,38 @@
 #include "cmd-2-1.h"
 #include "utils.h"
 
+/*
+ * cmd_recv_stream_2_2 allocates path_name and channel_name.
+ */
 int cmd_recv_stream_2_2(struct relay_connection *conn,
-		struct relay_stream *stream)
+		char **path_name, char **channel_name,
+		uint64_t *tracefile_size, uint64_t *tracefile_count)
 {
 	int ret;
 	struct lttcomm_relayd_add_stream_2_2 stream_info;
 
-	assert(conn);
-	assert(stream);
-
 	ret = cmd_recv(conn->sock, &stream_info, sizeof(stream_info));
 	if (ret < 0) {
 		ERR("Unable to recv stream version 2.2");
 		goto error;
 	}
 
-	stream->path_name = create_output_path(stream_info.pathname);
-	if (stream->path_name == NULL) {
+	*path_name = create_output_path(stream_info.pathname);
+	if (*path_name == NULL) {
 		PERROR("Path name allocation");
 		ret = -ENOMEM;
 		goto error;
 	}
 
-	stream->channel_name = strdup(stream_info.channel_name);
-	if (stream->channel_name == NULL) {
+	*channel_name = strdup(stream_info.channel_name);
+	if (*channel_name == NULL) {
 		ret = -errno;
 		PERROR("Path name allocation");
 		goto error;
 	}
 
-	stream->tracefile_size = be64toh(stream_info.tracefile_size);
-	stream->tracefile_count = be64toh(stream_info.tracefile_count);
+	*tracefile_size = be64toh(stream_info.tracefile_size);
+	*tracefile_count = be64toh(stream_info.tracefile_count);
 	ret = 0;
 
 error:
diff --git a/src/bin/lttng-relayd/cmd-2-2.h b/src/bin/lttng-relayd/cmd-2-2.h
index bd1cd14..894a63a 100644
--- a/src/bin/lttng-relayd/cmd-2-2.h
+++ b/src/bin/lttng-relayd/cmd-2-2.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_2_2_H
+#define RELAYD_CMD_2_2_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,12 +20,10 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef RELAYD_CMD_2_2_H
-#define RELAYD_CMD_2_2_H
-
 #include "lttng-relayd.h"
 
 int cmd_recv_stream_2_2(struct relay_connection *conn,
-		struct relay_stream *stream);
+		char **path_name, char **channel_name,
+		uint64_t *tracefile_size, uint64_t *tracefile_count);
 
 #endif /* RELAYD_CMD_2_2_H */
diff --git a/src/bin/lttng-relayd/cmd-2-4.c b/src/bin/lttng-relayd/cmd-2-4.c
index d8aa737..a3290cb 100644
--- a/src/bin/lttng-relayd/cmd-2-4.c
+++ b/src/bin/lttng-relayd/cmd-2-4.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -30,26 +31,24 @@
 #include "lttng-relayd.h"
 
 int cmd_create_session_2_4(struct relay_connection *conn,
-		struct relay_session *session)
+		char *session_name, char *hostname,
+		uint32_t *live_timer, bool *snapshot)
 {
 	int ret;
 	struct lttcomm_relayd_create_session_2_4 session_info;
 
-	assert(conn);
-	assert(session);
-
 	ret = cmd_recv(conn->sock, &session_info, sizeof(session_info));
 	if (ret < 0) {
 		ERR("Unable to recv session info version 2.4");
 		goto error;
 	}
 
-	strncpy(session->session_name, session_info.session_name,
-			sizeof(session->session_name));
-	strncpy(session->hostname, session_info.hostname,
-			sizeof(session->hostname));
-	session->live_timer = be32toh(session_info.live_timer);
-	session->snapshot = be32toh(session_info.snapshot);
+	strncpy(session_name, session_info.session_name,
+			sizeof(session_info.session_name));
+	strncpy(hostname, session_info.hostname,
+			sizeof(session_info.hostname));
+	*live_timer = be32toh(session_info.live_timer);
+	*snapshot = be32toh(session_info.snapshot);
 
 	ret = 0;
 
diff --git a/src/bin/lttng-relayd/cmd-2-4.h b/src/bin/lttng-relayd/cmd-2-4.h
index aaf572a..ab73478 100644
--- a/src/bin/lttng-relayd/cmd-2-4.h
+++ b/src/bin/lttng-relayd/cmd-2-4.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_2_4_H
+#define RELAYD_CMD_2_4_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,12 +20,10 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef RELAYD_CMD_2_4_H
-#define RELAYD_CMD_2_4_H
-
 #include "lttng-relayd.h"
 
 int cmd_create_session_2_4(struct relay_connection *conn,
-		struct relay_session *session);
+		char *session_name, char *hostname,
+		uint32_t *live_timer, bool *snapshot);
 
 #endif /* RELAYD_CMD_2_4_H */
diff --git a/src/bin/lttng-relayd/cmd-generic.c b/src/bin/lttng-relayd/cmd-generic.c
index 417d6d3..276e85b 100644
--- a/src/bin/lttng-relayd/cmd-generic.c
+++ b/src/bin/lttng-relayd/cmd-generic.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
diff --git a/src/bin/lttng-relayd/cmd-generic.h b/src/bin/lttng-relayd/cmd-generic.h
index 640fed7..4551f0a 100644
--- a/src/bin/lttng-relayd/cmd-generic.h
+++ b/src/bin/lttng-relayd/cmd-generic.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_GENERIC_H
+#define RELAYD_CMD_GENERIC_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef RELAYD_CMD_GENERIC_H
-#define RELAYD_CMD_GENERIC_H
-
 #include <common/sessiond-comm/sessiond-comm.h>
 
 #include "connection.h"
diff --git a/src/bin/lttng-relayd/cmd.h b/src/bin/lttng-relayd/cmd.h
index c8b37d5..88db09a 100644
--- a/src/bin/lttng-relayd/cmd.h
+++ b/src/bin/lttng-relayd/cmd.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_H
+#define RELAYD_CMD_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef RELAYD_CMD_H
-#define RELAYD_CMD_H
-
 #include "cmd-generic.h"
 #include "cmd-2-1.h"
 #include "cmd-2-2.h"
diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c
index 76e48a6..b425548 100644
--- a/src/bin/lttng-relayd/connection.c
+++ b/src/bin/lttng-relayd/connection.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -19,91 +20,132 @@
 #define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <common/common.h>
+#include <urcu/rculist.h>
 
 #include "connection.h"
 #include "stream.h"
+#include "viewer-session.h"
 
-static void rcu_free_connection(struct rcu_head *head)
+bool connection_get(struct relay_connection *conn)
 {
-	struct relay_connection *conn =
-		caa_container_of(head, struct relay_connection, rcu_node);
+	bool has_ref = false;
 
-	lttcomm_destroy_sock(conn->sock);
-	connection_free(conn);
+	pthread_mutex_lock(&conn->reflock);
+	if (conn->ref.refcount != 0) {
+		has_ref = true;
+		urcu_ref_get(&conn->ref);
+	}
+	pthread_mutex_unlock(&conn->reflock);
+
+	return has_ref;
 }
 
-/*
- * Must be called with a read side lock held. The read side lock must be
- * kept until the returned relay_connection is no longer in use.
- */
-struct relay_connection *connection_find_by_sock(struct lttng_ht *ht, int sock)
+struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht,
+		int sock)
 {
 	struct lttng_ht_node_ulong *node;
 	struct lttng_ht_iter iter;
 	struct relay_connection *conn = NULL;
 
-	assert(ht);
 	assert(sock >= 0);
 
-	lttng_ht_lookup(ht, (void *)((unsigned long) sock), &iter);
+	rcu_read_lock();
+	lttng_ht_lookup(relay_connections_ht, (void *)((unsigned long) sock),
+			&iter);
 	node = lttng_ht_iter_get_node_ulong(&iter);
 	if (!node) {
 		DBG2("Relay connection by sock %d not found", sock);
 		goto end;
 	}
 	conn = caa_container_of(node, struct relay_connection, sock_n);
-
+	if (!connection_get(conn)) {
+		conn = NULL;
+	}
 end:
+	rcu_read_unlock();
 	return conn;
 }
 
-void connection_delete(struct lttng_ht *ht, struct relay_connection *conn)
+struct relay_connection *connection_create(struct lttcomm_sock *sock,
+		enum connection_type type)
 {
-	int ret;
-	struct lttng_ht_iter iter;
-
-	assert(ht);
-	assert(conn);
+	struct relay_connection *conn;
 
-	iter.iter.node = &conn->sock_n.node;
-	ret = lttng_ht_del(ht, &iter);
-	assert(!ret);
+	conn = zmalloc(sizeof(*conn));
+	if (!conn) {
+		PERROR("zmalloc relay connection");
+		goto end;
+	}
+	pthread_mutex_init(&conn->lock, NULL);
+	pthread_mutex_init(&conn->reflock, NULL);
+	urcu_ref_init(&conn->ref);
+	conn->type = type;
+	conn->sock = sock;
+	lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd);
+end:
+	return conn;
 }
 
-void connection_destroy(struct relay_connection *conn)
+static void rcu_free_connection(struct rcu_head *head)
 {
-	assert(conn);
+	struct relay_connection *conn =
+		caa_container_of(head, struct relay_connection, rcu_node);
 
+	lttcomm_destroy_sock(conn->sock);
+	if (conn->viewer_session) {
+		viewer_session_destroy(conn->viewer_session);
+		conn->viewer_session = NULL;
+	}
+	free(conn);
+}
+
+static void destroy_connection(struct relay_connection *conn)
+{
 	call_rcu(&conn->rcu_node, rcu_free_connection);
 }
 
-struct relay_connection *connection_create(void)
+static void connection_release(struct urcu_ref *ref)
 {
-	struct relay_connection *conn;
+	struct relay_connection *conn =
+		caa_container_of(ref, struct relay_connection, ref);
 
-	conn = zmalloc(sizeof(*conn));
-	if (!conn) {
-		PERROR("zmalloc relay connection");
-		goto error;
+	if (conn->in_socket_ht) {
+		struct lttng_ht_iter iter;
+		int ret;
+
+		iter.iter.node = &conn->sock_n.node;
+		ret = lttng_ht_del(conn->socket_ht, &iter);
+		assert(!ret);
 	}
 
-error:
-	return conn;
+	if (conn->session) {
+		if (session_close(conn->session)) {
+			ERR("session_close");
+		}
+		conn->session = NULL;
+	}
+	if (conn->viewer_session) {
+		viewer_session_close(conn->viewer_session);
+	}
+	destroy_connection(conn);
 }
 
-void connection_init(struct relay_connection *conn)
+void connection_put(struct relay_connection *conn)
 {
-	assert(conn);
-	assert(conn->sock);
-
-	CDS_INIT_LIST_HEAD(&conn->recv_head);
-	lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd);
+	rcu_read_lock();
+	pthread_mutex_lock(&conn->reflock);
+	urcu_ref_put(&conn->ref, connection_release);
+	pthread_mutex_unlock(&conn->reflock);
+	rcu_read_unlock();
 }
 
-void connection_free(struct relay_connection *conn)
+void connection_ht_add(struct lttng_ht *relay_connections_ht,
+		struct relay_connection *conn)
 {
-	assert(conn);
-
-	free(conn->viewer_session);
-	free(conn);
+	pthread_mutex_lock(&conn->lock);
+	assert(!conn->in_socket_ht);
+	lttng_ht_add_unique_ulong(relay_connections_ht, &conn->sock_n);
+	conn->in_socket_ht = 1;
+	conn->socket_ht = relay_connections_ht;
+	pthread_mutex_unlock(&conn->lock);
 }
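connection_get_by_sock() combines a hash table lookup with connection_get(), so a connection whose refcount already reached 0, and which is only waiting for connection_release() to unlink it, is reported as not found. The sketch below reproduces that behaviour in standalone C under explicit assumptions: a fixed array indexed by fd replaces struct lttng_ht, and a table mutex held across lookup and unlink replaces the RCU read-side critical section and the call_rcu() grace period. All names (struct conn, conn_get_by_fd(), MAX_FD, ...) are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

#define MAX_FD 16	/* hypothetical table size */

struct conn {
	int fd;
	int refcount;
	pthread_mutex_t reflock;
};

static struct conn *conn_table[MAX_FD];
/* Stands in for RCU: no lookup can observe a connection after free(). */
static pthread_mutex_t table_lock = PTHREAD_MUTEX_INITIALIZER;

static struct conn *conn_create_and_add(int fd)
{
	struct conn *c;

	assert(fd >= 0 && fd < MAX_FD);
	c = calloc(1, sizeof(*c));
	if (!c)
		return NULL;
	c->fd = fd;
	c->refcount = 1;	/* ownership reference held by the worker */
	pthread_mutex_init(&c->reflock, NULL);
	pthread_mutex_lock(&table_lock);
	conn_table[fd] = c;
	pthread_mutex_unlock(&table_lock);
	return c;
}

static bool conn_get(struct conn *c)
{
	bool has_ref = false;

	pthread_mutex_lock(&c->reflock);
	if (c->refcount != 0) {
		c->refcount++;
		has_ref = true;
	}
	pthread_mutex_unlock(&c->reflock);
	return has_ref;
}

/* Lookup + getter: a dying connection behaves as if it were absent. */
static struct conn *conn_get_by_fd(int fd)
{
	struct conn *c;

	assert(fd >= 0 && fd < MAX_FD);
	pthread_mutex_lock(&table_lock);
	c = conn_table[fd];
	if (c && !conn_get(c))
		c = NULL;
	pthread_mutex_unlock(&table_lock);
	return c;
}

static void conn_put(struct conn *c)
{
	bool last;

	pthread_mutex_lock(&c->reflock);
	last = (--c->refcount == 0);
	pthread_mutex_unlock(&c->reflock);
	if (!last)
		return;
	/* Release: unlink first, then free once no lookup can hold it. */
	pthread_mutex_lock(&table_lock);
	conn_table[c->fd] = NULL;
	pthread_mutex_unlock(&table_lock);
	pthread_mutex_destroy(&c->reflock);
	free(c);
}
```

Unlike relayd, which deletes from the lock-free RCU hash table inside connection_release() while reflock is held, this sketch drops reflock before taking table_lock, so the lookup order (table_lock, then reflock) can never deadlock against a concurrent release.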
diff --git a/src/bin/lttng-relayd/connection.h b/src/bin/lttng-relayd/connection.h
index 70fe4ba..1fff515 100644
--- a/src/bin/lttng-relayd/connection.h
+++ b/src/bin/lttng-relayd/connection.h
@@ -1,6 +1,10 @@
+#ifndef _CONNECTION_H
+#define _CONNECTION_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef _CONNECTION_H
-#define _CONNECTION_H
-
 #include <limits.h>
 #include <inttypes.h>
 #include <pthread.h>
@@ -32,6 +33,7 @@
 #include "session.h"
 
 enum connection_type {
+	RELAY_CONNECTION_UNKNOWN    = 0,
 	RELAY_DATA                  = 1,
 	RELAY_CONTROL               = 2,
 	RELAY_VIEWER_COMMAND        = 3,
@@ -44,36 +46,49 @@ enum connection_type {
  */
 struct relay_connection {
 	struct lttcomm_sock *sock;
-	struct relay_session *session;
-	struct relay_viewer_session *viewer_session;
 	struct cds_wfcq_node qnode;
-	struct lttng_ht_node_ulong sock_n;
-	struct rcu_head rcu_node;
+
 	enum connection_type type;
-	/* Protocol version to use for this connection. */
+	/*
+	 * session is only ever set for RELAY_CONTROL connection type.
+	 */
+	struct relay_session *session;
+	/*
+	 * viewer_session is only ever set for RELAY_VIEWER_COMMAND
+	 * connection type.
+	 */
+	struct relay_viewer_session *viewer_session;
+
+	/*
+	 * Protocol version to use for this connection. Only valid for
+	 * RELAY_CONTROL connection type.
+	 */
 	uint32_t major;
 	uint32_t minor;
-	uint64_t session_id;
+
+	struct urcu_ref ref;
+	pthread_mutex_t reflock;
+
+	pthread_mutex_t lock;
+
+	bool version_check_done;
 
 	/*
-	 * This contains streams that are received on that connection. It's used to
-	 * store them until we get the streams sent command where they are removed
-	 * and flagged ready for the viewer. This is ONLY used by the control
-	 * thread thus any action on it should happen in that thread.
+	 * Node member of connection within global socket hash table.
 	 */
-	struct cds_list_head recv_head;
-	unsigned int version_check_done:1;
-
-	/* Pointer to the sessions HT that this connection can use. */
-	struct lttng_ht *sessions_ht;
+	struct lttng_ht_node_ulong sock_n;
+	bool in_socket_ht;
+	struct lttng_ht *socket_ht;	/* HACK: Contained within this hash table. */
+	struct rcu_head rcu_node;	/* For call_rcu teardown. */
 };
 
-struct relay_connection *connection_find_by_sock(struct lttng_ht *ht,
+struct relay_connection *connection_create(struct lttcomm_sock *sock,
+		enum connection_type type);
+struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht,
 		int sock);
-struct relay_connection *connection_create(void);
-void connection_init(struct relay_connection *conn);
-void connection_delete(struct lttng_ht *ht, struct relay_connection *conn);
-void connection_destroy(struct relay_connection *conn);
-void connection_free(struct relay_connection *conn);
+bool connection_get(struct relay_connection *connection);
+void connection_put(struct relay_connection *connection);
+void connection_ht_add(struct lttng_ht *relay_connections_ht,
+		struct relay_connection *conn);
 
 #endif /* _CONNECTION_H */
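The back-reference rule (each composite holds a reference on its container) and the ownership rule (the relay connection owns the session) are what ctf_trace_create() and ctf_trace_destroy() implement with session_get() and session_put(). A hedged standalone sketch of that chain: hypothetical struct node objects carry a parent pointer, creating a child fails if the parent is already dying, and releasing a child puts its parent while the child's reflock is still held, which is the innermost-to-outermost reflock nesting allowed by doc/relayd-architecture.txt. The call_rcu() deferral is replaced by a direct free() after the reflock is dropped.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical composite object (session, ctf-trace, stream, ...). */
struct node {
	int refcount;			/* stands in for struct urcu_ref */
	pthread_mutex_t reflock;
	struct node *parent;		/* back reference to the container */
};

static int nodes_freed;		/* illustration only */

static bool node_get(struct node *n)
{
	bool has_ref = false;

	pthread_mutex_lock(&n->reflock);
	if (n->refcount != 0) {
		n->refcount++;
		has_ref = true;
	}
	pthread_mutex_unlock(&n->reflock);
	return has_ref;
}

/* Like ctf_trace_create(): fail if the container is already dying. */
static struct node *node_create(struct node *parent)
{
	struct node *n = calloc(1, sizeof(*n));

	if (!n)
		return NULL;
	if (parent && !node_get(parent)) {
		free(n);
		return NULL;
	}
	n->parent = parent;
	n->refcount = 1;
	pthread_mutex_init(&n->reflock, NULL);
	return n;
}

static void node_put(struct node *n)
{
	bool last;

	pthread_mutex_lock(&n->reflock);
	last = (--n->refcount == 0);
	if (last && n->parent) {
		/* Child reflock held while taking the parent's: inner to outer. */
		node_put(n->parent);
		n->parent = NULL;
	}
	pthread_mutex_unlock(&n->reflock);
	if (last) {
		pthread_mutex_destroy(&n->reflock);
		free(n);	/* relayd defers this through call_rcu() */
		nodes_freed++;
	}
}
```

In this model, closing the connection puts only the session's ownership reference; the session survives until the last trace referring to it is released, so following trace->parent never needs extra validation.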
diff --git a/src/bin/lttng-relayd/ctf-trace.c b/src/bin/lttng-relayd/ctf-trace.c
index 02a8b2b..df96460 100644
--- a/src/bin/lttng-relayd/ctf-trace.c
+++ b/src/bin/lttng-relayd/ctf-trace.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -22,148 +23,174 @@
 
 #include <common/common.h>
 #include <common/utils.h>
+#include <urcu/rculist.h>
 
 #include "ctf-trace.h"
 #include "lttng-relayd.h"
 #include "stream.h"
 
 static uint64_t last_relay_ctf_trace_id;
+static pthread_mutex_t last_relay_ctf_trace_id_lock = PTHREAD_MUTEX_INITIALIZER;
 
-static void rcu_destroy_ctf_trace(struct rcu_head *head)
+static void rcu_destroy_ctf_trace(struct rcu_head *rcu_head)
 {
-	struct lttng_ht_node_str *node =
-		caa_container_of(head, struct lttng_ht_node_str, head);
-	struct ctf_trace *trace=
-		caa_container_of(node, struct ctf_trace, node);
+	struct ctf_trace *trace =
+		caa_container_of(rcu_head, struct ctf_trace, rcu_node);
 
 	free(trace);
 }
 
-static void rcu_destroy_stream(struct rcu_head *head)
-{
-	struct relay_stream *stream =
-		caa_container_of(head, struct relay_stream, rcu_node);
-
-	stream_destroy(stream);
-}
-
 /*
  * Destroy a ctf trace and all stream contained in it.
  *
  * MUST be called with the RCU read side lock.
  */
-void ctf_trace_destroy(struct ctf_trace *obj)
+void ctf_trace_destroy(struct ctf_trace *trace)
 {
-	struct relay_stream *stream, *tmp_stream;
-
-	assert(obj);
 	/*
-	 * Getting to this point, every stream referenced to that object have put
-	 * back their ref since the've been closed by the control side.
+	 * Getting to this point, every stream referenced by that trace
+	 * has put back its reference since it has been closed by the
+	 * control side.
 	 */
-	assert(!obj->refcount);
+	assert(cds_list_empty(&trace->stream_list));
+	session_put(trace->session);
+	trace->session = NULL;
+	call_rcu(&trace->rcu_node, rcu_destroy_ctf_trace);
+}
 
-	cds_list_for_each_entry_safe(stream, tmp_stream, &obj->stream_list,
-			trace_list) {
-		stream_delete(relay_streams_ht, stream);
-		call_rcu(&stream->rcu_node, rcu_destroy_stream);
-	}
+void ctf_trace_release(struct urcu_ref *ref)
+{
+	struct ctf_trace *trace =
+		caa_container_of(ref, struct ctf_trace, ref);
+	int ret;
+	struct lttng_ht_iter iter;
 
-	call_rcu(&obj->node.head, rcu_destroy_ctf_trace);
+	iter.iter.node = &trace->node.node;
+	ret = lttng_ht_del(trace->session->ctf_traces_ht, &iter);
+	assert(!ret);
+	ctf_trace_destroy(trace);
 }
 
-void ctf_trace_try_destroy(struct relay_session *session,
-		struct ctf_trace *ctf_trace)
+/*
+ * Should be called with RCU read-side lock held.
+ */
+bool ctf_trace_get(struct ctf_trace *trace)
 {
-	assert(session);
-	assert(ctf_trace);
+	bool has_ref = false;
 
-	/*
-	 * Considering no viewer attach to the session and the trace having no more
-	 * stream attached, wipe the trace.
-	 */
-	if (uatomic_read(&session->viewer_refcount) == 0 &&
-			uatomic_read(&ctf_trace->refcount) == 0) {
-		ctf_trace_delete(session->ctf_traces_ht, ctf_trace);
-		ctf_trace_destroy(ctf_trace);
+	/* Confirm that the trace refcount has not reached 0. */
+	pthread_mutex_lock(&trace->reflock);
+	if (trace->ref.refcount != 0) {
+		has_ref = true;
+		urcu_ref_get(&trace->ref);
 	}
+	pthread_mutex_unlock(&trace->reflock);
+
+	return has_ref;
 }
 
 /*
- * Create and return an allocated ctf_trace object. NULL on error.
+ * Create and return an allocated ctf_trace. NULL on error.
+ * There is no "open" and "close" for a ctf_trace, but rather just a
+ * create and refcounting. Whenever all the streams belonging to a trace
+ * put their reference, its refcount drops to 0.
  */
-struct ctf_trace *ctf_trace_create(char *path_name)
+static struct ctf_trace *ctf_trace_create(struct relay_session *session,
+		char *path_name)
 {
-	struct ctf_trace *obj;
-
-	assert(path_name);
+	struct ctf_trace *trace;
 
-	obj = zmalloc(sizeof(*obj));
-	if (!obj) {
+	trace = zmalloc(sizeof(*trace));
+	if (!trace) {
 		PERROR("ctf_trace alloc");
 		goto error;
 	}
 
-	CDS_INIT_LIST_HEAD(&obj->stream_list);
+	if (!session_get(session)) {
+		ERR("Cannot get session");
+		free(trace);
+		trace = NULL;
+		goto error;
+	}
+	trace->session = session;
+
+	CDS_INIT_LIST_HEAD(&trace->stream_list);
 
-	obj->id = ++last_relay_ctf_trace_id;
-	lttng_ht_node_init_str(&obj->node, path_name);
+	pthread_mutex_lock(&last_relay_ctf_trace_id_lock);
+	trace->id = ++last_relay_ctf_trace_id;
+	pthread_mutex_unlock(&last_relay_ctf_trace_id_lock);
 
-	DBG("Created ctf_trace %" PRIu64 " with path: %s", obj->id, path_name);
+	lttng_ht_node_init_str(&trace->node, path_name);
+	trace->session = session;
+	urcu_ref_init(&trace->ref);
+	pthread_mutex_init(&trace->lock, NULL);
+	pthread_mutex_init(&trace->reflock, NULL);
+	pthread_mutex_init(&trace->stream_list_lock, NULL);
+	lttng_ht_add_str(session->ctf_traces_ht, &trace->node);
+
+	DBG("Created ctf_trace %" PRIu64 " with path: %s", trace->id, path_name);
 
 error:
-	return obj;
+	return trace;
 }
 
 /*
- * Return a ctf_trace object if found by id in the given hash table else NULL.
- *
- * Must be called with rcu_read_lock() taken.
+ * Return the ctf_trace matching path_name in the session hash table,
+ * creating it if absent; NULL on error. The returned reference must be
+ * released with ctf_trace_put().
  */
-struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht,
+struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session,
 		char *path_name)
 {
 	struct lttng_ht_node_str *node;
 	struct lttng_ht_iter iter;
 	struct ctf_trace *trace = NULL;
 
-	assert(ht);
-
-	lttng_ht_lookup(ht, (void *) path_name, &iter);
+	rcu_read_lock();
+	lttng_ht_lookup(session->ctf_traces_ht, (void *) path_name, &iter);
 	node = lttng_ht_iter_get_node_str(&iter);
 	if (!node) {
 		DBG("CTF Trace path %s not found", path_name);
 		goto end;
 	}
 	trace = caa_container_of(node, struct ctf_trace, node);
-
+	if (!ctf_trace_get(trace)) {
+		trace = NULL;
+	}
 end:
+	rcu_read_unlock();
+	if (!trace) {
+		/* Try to create */
+		trace = ctf_trace_create(session, path_name);
+	}
 	return trace;
 }
 
-/*
- * Add stream to a given hash table.
- */
-void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace)
+void ctf_trace_put(struct ctf_trace *trace)
 {
-	assert(ht);
-	assert(trace);
-
-	lttng_ht_add_str(ht, &trace->node);
+	rcu_read_lock();
+	pthread_mutex_lock(&trace->reflock);
+	urcu_ref_put(&trace->ref, ctf_trace_release);
+	pthread_mutex_unlock(&trace->reflock);
+	rcu_read_unlock();
 }
 
-/*
- * Delete stream from a given hash table.
- */
-void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace)
+int ctf_trace_close(struct ctf_trace *trace)
 {
-	int ret;
-	struct lttng_ht_iter iter;
-
-	assert(ht);
-	assert(trace);
-
-	iter.iter.node = &trace->node.node;
-	ret = lttng_ht_del(ht, &iter);
-	assert(!ret);
+	struct relay_stream *stream;
+
+	rcu_read_lock();
+	cds_list_for_each_entry_rcu(stream, &trace->stream_list,
+			stream_node) {
+		/*
+		 * Close the stream.
+		 */
+		stream_close(stream);
+	}
+	rcu_read_unlock();
+	/*
+	 * Since all references to the trace are held by its streams, we
+	 * don't need to do any self-ref put.
+	 */
+	return 0;
 }
diff --git a/src/bin/lttng-relayd/ctf-trace.h b/src/bin/lttng-relayd/ctf-trace.h
index 489c5f1..edc7053 100644
--- a/src/bin/lttng-relayd/ctf-trace.h
+++ b/src/bin/lttng-relayd/ctf-trace.h
@@ -1,6 +1,10 @@
+#ifndef _CTF_TRACE_H
+#define _CTF_TRACE_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,10 +20,8 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef _CTF_TRACE_H
-#define _CTF_TRACE_H
-
 #include <inttypes.h>
+#include <urcu/ref.h>
 
 #include <common/hashtable/hashtable.h>
 
@@ -27,38 +29,44 @@
 #include "stream.h"
 
 struct ctf_trace {
-	int refcount;
-	unsigned int invalid_flag:1;
+	/*
+	 * The ctf_trace reflock nests inside the stream reflock.
+	 */
+	pthread_mutex_t reflock;	/* Protects refcounting */
+	struct urcu_ref ref;		/* Every stream has a ref on the trace. */
+	struct relay_session *session;	/* Back ref to trace session */
+
+	/*
+	 * The ctf_trace lock nests inside the session lock.
+	 */
+	pthread_mutex_t lock;
 	uint64_t id;
 	uint64_t metadata_received;
 	uint64_t metadata_sent;
 	struct relay_stream *metadata_stream;
 	struct relay_viewer_stream *viewer_metadata_stream;
-	/* Node indexed by stream path name in the corresponding session. */
-	struct lttng_ht_node_str node;
 
-	/* Relay stream associated with this ctf trace. */
+	/*
+	 * Relay streams associated with this ctf trace.
+	 * Updates are protected by the stream_list lock.
+	 * Traversals are protected by RCU.
+	 */
 	struct cds_list_head stream_list;
-};
-
-static inline void ctf_trace_get_ref(struct ctf_trace *trace)
-{
-	uatomic_inc(&trace->refcount);
-}
+	pthread_mutex_t stream_list_lock;
 
-static inline void ctf_trace_put_ref(struct ctf_trace *trace)
-{
-	uatomic_add(&trace->refcount, -1);
-}
+	/*
+	 * Node within session trace hash table. Node is indexed by
+	 * stream path name.
+	 */
+	struct lttng_ht_node_str node;
+	struct rcu_head rcu_node;	/* For call_rcu teardown. */
+};
 
-void ctf_trace_assign(struct relay_stream *stream);
-struct ctf_trace *ctf_trace_create(char *path_name);
-void ctf_trace_destroy(struct ctf_trace *obj);
-void ctf_trace_try_destroy(struct relay_session *session,
-		struct ctf_trace *ctf_trace);
-struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht,
+struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session,
 		char *path_name);
-void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace);
-void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace);
+bool ctf_trace_get(struct ctf_trace *trace);
+void ctf_trace_put(struct ctf_trace *trace);
+
+int ctf_trace_close(struct ctf_trace *trace);
 
 #endif /* _CTF_TRACE_H */
diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c
index b7507a0..99715c2 100644
--- a/src/bin/lttng-relayd/index.c
+++ b/src/bin/lttng-relayd/index.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -24,190 +25,308 @@
 #include <common/utils.h>
 
 #include "lttng-relayd.h"
+#include "stream.h"
 #include "index.h"
 
 /*
- * Deferred free of a relay index object. MUST only be called by a call RCU.
+ * Allocate a new relay index object. Pass the stream in which it is
+ * contained as parameter. The sequence number will be used as the hash
+ * table key.
+ *
+ * Called with stream mutex held.
+ * Return allocated object or else NULL on error.
  */
-static void deferred_free_relay_index(struct rcu_head *head)
+static struct relay_index *relay_index_create(struct relay_stream *stream,
+		uint64_t net_seq_num)
 {
-	struct relay_index *index =
-		caa_container_of(head, struct relay_index, rcu_node);
+	struct relay_index *index;
 
-	if (index->to_close_fd >= 0) {
-		int ret;
+	DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
+			stream->stream_handle, net_seq_num);
 
-		ret = close(index->to_close_fd);
-		if (ret < 0) {
-			PERROR("Relay index to close fd %d", index->to_close_fd);
-		}
+	index = zmalloc(sizeof(*index));
+	if (!index) {
+		PERROR("Relay index zmalloc");
+		goto end;
+	}
+	if (!stream_get(stream)) {
+		ERR("Cannot get stream");
+		free(index);
+		index = NULL;
+		goto end;
 	}
+	index->stream = stream;
 
-	relay_index_free(index);
+	lttng_ht_node_init_u64(&index->index_n, net_seq_num);
+	pthread_mutex_init(&index->lock, NULL);
+	pthread_mutex_init(&index->reflock, NULL);
+	urcu_ref_init(&index->ref);
+
+end:
+	return index;
 }
 
 /*
- * Allocate a new relay index object using the given stream ID and sequence
- * number as the hash table key.
+ * Add unique relay index to the stream's hash table. On collision, return
+ * the already existing index; return NULL if the given index was added.
  *
- * Return allocated object or else NULL on error.
+ * RCU read side lock MUST be acquired.
  */
-struct relay_index *relay_index_create(uint64_t stream_id,
-		uint64_t net_seq_num)
+static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
+		struct relay_index *index)
 {
-	struct relay_index *index;
+	struct cds_lfht_node *node_ptr;
+	struct relay_index *_index;
 
-	DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
-			stream_id, net_seq_num);
+	DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
+			stream->stream_handle, index->index_n.key);
 
-	index = zmalloc(sizeof(*index));
-	if (index == NULL) {
-		PERROR("Relay index zmalloc");
-		goto error;
+	node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
+			stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
+			stream->indexes_ht->match_fct, &index->index_n,
+			&index->index_n.node);
+	if (node_ptr != &index->index_n.node) {
+		_index = caa_container_of(node_ptr, struct relay_index,
+				index_n.node);
+	} else {
+		_index = NULL;
 	}
+	return _index;
+}
+
+/*
+ * Should be called with RCU read-side lock held.
+ */
+static bool relay_index_get(struct relay_index *index)
+{
+	bool has_ref = false;
 
-	index->to_close_fd = -1;
-	lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num);
+	DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
+			index->stream->stream_handle, index->index_n.key,
+			(int) index->ref.refcount);
 
-error:
-	return index;
+	/* Confirm that the index refcount has not reached 0. */
+	pthread_mutex_lock(&index->reflock);
+	if (index->ref.refcount != 0) {
+		has_ref = true;
+		urcu_ref_get(&index->ref);
+	}
+	pthread_mutex_unlock(&index->reflock);
+
+	return has_ref;
 }
 
 /*
- * Find a relayd index in the given hash table.
+ * Get a relay index within the given stream, or create it if not
+ * present.
  *
+ * Called with stream mutex held.
  * Return index object or else NULL on error.
  */
-struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num)
+struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
+		uint64_t net_seq_num)
 {
-	struct lttng_ht_node_two_u64 *node;
+	struct lttng_ht_node_u64 *node;
 	struct lttng_ht_iter iter;
-	struct lttng_ht_two_u64 key;
 	struct relay_index *index = NULL;
+	bool has_ref = false;
 
 	DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
-			stream_id, net_seq_num);
-
-	key.key1 = stream_id;
-	key.key2 = net_seq_num;
+			stream->stream_handle, net_seq_num);
 
-	lttng_ht_lookup(indexes_ht, (void *)(&key), &iter);
-	node = lttng_ht_iter_get_node_two_u64(&iter);
-	if (node == NULL) {
-		goto end;
+	rcu_read_lock();
+	lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
+	node = lttng_ht_iter_get_node_u64(&iter);
+	if (node) {
+		index = caa_container_of(node, struct relay_index, index_n);
+	} else {
+		struct relay_index *oldindex;
+
+		index = relay_index_create(stream, net_seq_num);
+		if (!index) {
+			ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
+				stream->stream_handle, net_seq_num);
+			goto end;
+		}
+		oldindex = relay_index_add_unique(stream, index);
+		if (oldindex) {
+			/* Added concurrently, keep old. */
+			relay_index_put(index);
+			index = oldindex;
+			if (!relay_index_get(index)) {
+				index = NULL;
+			}
+		} else {
+			stream->indexes_in_flight++;
+			index->in_hash_table = true;
+		}
 	}
-	index = caa_container_of(node, struct relay_index, index_n);
-
 end:
-	DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
-			(index == NULL) ? "NOT " : "", stream_id, net_seq_num);
+	rcu_read_unlock();
+	DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
+			(index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
 	return index;
 }
 
-/*
- * Add unique relay index to the given hash table. In case of a collision, the
- * already existing object is put in the given _index variable.
- *
- * RCU read side lock MUST be acquired.
- */
-void relay_index_add(struct relay_index *index, struct relay_index **_index)
+int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+		uint64_t data_offset)
 {
-	struct cds_lfht_node *node_ptr;
+	int ret = 0;
 
-	assert(index);
+	pthread_mutex_lock(&index->lock);
+	if (index->index_fd) {
+		ret = -1;
+		goto end;
+	}
+	stream_fd_get(index_fd);
+	index->index_fd = index_fd;
+	index->index_data.offset = data_offset;
+end:
+	pthread_mutex_unlock(&index->lock);
+	return ret;
+}
 
-	DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
-			index->index_n.key.key1, index->index_n.key.key2);
+int relay_index_set_data(struct relay_index *index,
+		const struct ctf_packet_index *data)
+{
+	int ret = 0;
 
-	node_ptr = cds_lfht_add_unique(indexes_ht->ht,
-			indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed),
-			indexes_ht->match_fct, (void *) &index->index_n.key,
-			&index->index_n.node);
-	if (node_ptr != &index->index_n.node) {
-		*_index = caa_container_of(node_ptr, struct relay_index, index_n.node);
+	pthread_mutex_lock(&index->lock);
+	if (index->has_index_data) {
+		ret = -1;
+		goto end;
 	}
+	/* Set everything except data_offset. */
+	index->index_data.packet_size = data->packet_size;
+	index->index_data.content_size = data->content_size;
+	index->index_data.timestamp_begin = data->timestamp_begin;
+	index->index_data.timestamp_end = data->timestamp_end;
+	index->index_data.events_discarded = data->events_discarded;
+	index->index_data.stream_id = data->stream_id;
+	index->has_index_data = true;
+end:
+	pthread_mutex_unlock(&index->lock);
+	return ret;
 }
 
-/*
- * Write index on disk to the given fd. Once done error or not, it is removed
- * from the hash table and destroy the object.
- *
- * MUST be called with a RCU read side lock held.
- *
- * Return 0 on success else a negative value.
- */
-int relay_index_write(int fd, struct relay_index *index)
+static void index_destroy(struct relay_index *index)
 {
-	int ret;
-	struct lttng_ht_iter iter;
-
-	DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
-			" on fd %d", index->index_n.key.key1,
-			index->index_n.key.key2, fd);
+	free(index);
+}
 
-	/* Delete index from hash table. */
-	iter.iter.node = &index->index_n.node;
-	ret = lttng_ht_del(indexes_ht, &iter);
-	assert(!ret);
-	call_rcu(&index->rcu_node, deferred_free_relay_index);
+static void index_destroy_rcu(struct rcu_head *rcu_head)
+{
+	struct relay_index *index =
+		caa_container_of(rcu_head, struct relay_index, rcu_node);
 
-	return index_write(fd, &index->index_data, sizeof(index->index_data));
+	index_destroy(index);
 }
 
-/*
- * Free the given index.
- */
-void relay_index_free(struct relay_index *index)
+static void index_release(struct urcu_ref *ref)
 {
-	free(index);
+	struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
+	struct relay_stream *stream = index->stream;
+	int ret;
+	struct lttng_ht_iter iter;
+
+	if (index->index_fd) {
+		stream_fd_put(index->index_fd);
+		index->index_fd = NULL;
+	}
+	if (index->in_hash_table) {
+		/* Delete index from hash table. */
+		iter.iter.node = &index->index_n.node;
+		ret = lttng_ht_del(stream->indexes_ht, &iter);
+		assert(!ret);
+		stream->indexes_in_flight--;
+	}
+
+	stream_put(index->stream);
+	index->stream = NULL;
+
+	call_rcu(&index->rcu_node, index_destroy_rcu);
 }
 
 /*
- * Safely free the given index using a call RCU.
+ * Called with stream mutex held.
  */
-void relay_index_free_safe(struct relay_index *index)
+void relay_index_put(struct relay_index *index)
 {
-	if (!index) {
-		return;
-	}
-
-	call_rcu(&index->rcu_node, deferred_free_relay_index);
+	DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
+			index->stream->stream_handle, index->index_n.key,
+			(int) index->ref.refcount);
+	/*
+	 * Ensure existence of index->reflock for the unlock below.
+	 */
+	rcu_read_lock();
+	/*
+	 * Index reflock ensures that concurrent test and update of the
+	 * index ref is atomic.
+	 */
+	pthread_mutex_lock(&index->reflock);
+	assert(index->ref.refcount != 0);
+	urcu_ref_put(&index->ref, index_release);
+	pthread_mutex_unlock(&index->reflock);
+	rcu_read_unlock();
 }
 
 /*
- * Delete index from the given hash table.
+ * Try to flush index to disk. Releases the index self-reference once a
+ * flush has been attempted, whether or not the write succeeded.
  *
- * RCU read side lock MUST be acquired.
+ * Return 0 on successful flush, a negative value on error, or positive
+ * value if no flush was performed.
  */
-void relay_index_delete(struct relay_index *index)
+int relay_index_try_flush(struct relay_index *index)
 {
-	int ret;
-	struct lttng_ht_iter iter;
+	int ret = 1;
+	bool flushed = false;
+	int fd;
 
-	DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64
-			" deleted.", index->index_n.key.key1,
-			index->index_n.key.key2);
+	pthread_mutex_lock(&index->lock);
+	if (index->flushed) {
+		goto skip;
+	}
+	/* Check if we are ready to flush. */
+	if (!index->has_index_data || !index->index_fd) {
+		goto skip;
+	}
+	fd = index->index_fd->fd;
+	DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
+			" on fd %d", index->stream->stream_handle,
+			index->index_n.key, fd);
+	flushed = true;
+	index->flushed = true;
+	ret = index_write(fd, &index->index_data, sizeof(index->index_data));
+	if (ret == sizeof(index->index_data)) {
+		ret = 0;
+	} else {
+		ret = -1;
+	}
+skip:
+	pthread_mutex_unlock(&index->lock);
 
-	/* Delete index from hash table. */
-	iter.iter.node = &index->index_n.node;
-	ret = lttng_ht_del(indexes_ht, &iter);
-	assert(!ret);
+	if (flushed) {
+		/* Put self-ref from index now that it has been flushed. */
+		relay_index_put(index);
+	}
+	return ret;
 }
 
 /*
- * Destroy every relay index with the given stream id as part of the key.
+ * Close every relay index within a given stream, without flushing
+ * them.
  */
-void relay_index_destroy_by_stream_id(uint64_t stream_id)
+void relay_index_close_all(struct relay_stream *stream)
 {
 	struct lttng_ht_iter iter;
 	struct relay_index *index;
 
 	rcu_read_lock();
-	cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
-		if (index->index_n.key.key1 == stream_id) {
-			relay_index_delete(index);
-			relay_index_free_safe(index);
-		}
+	cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+			index, index_n.node) {
+		/* Put self-ref from index. */
+		relay_index_put(index);
 	}
 	rcu_read_unlock();
 }
diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h
index e7f9cdb..adf85bb 100644
--- a/src/bin/lttng-relayd/index.h
+++ b/src/bin/lttng-relayd/index.h
@@ -1,6 +1,10 @@
+#ifndef _RELAY_INDEX_H
+#define _RELAY_INDEX_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,42 +20,55 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef _RELAY_INDEX_H
-#define _RELAY_INDEX_H
-
 #include <inttypes.h>
 #include <pthread.h>
 
 #include <common/hashtable/hashtable.h>
 #include <common/index/index.h>
 
+#include "stream-fd.h"
+
+struct relay_stream;
+
 struct relay_index {
-	/* FD on which to write the index data. */
-	int fd;
 	/*
-	 * When destroying this object, this fd is checked and if valid, close it
-	 * so this is basically a lazy close of the previous fd corresponding to
-	 * the same stream id. This is used for the rotate file feature.
+	 * index lock nests inside stream lock.
 	 */
-	int to_close_fd;
+	pthread_mutex_t reflock;	/* Protects refcounting. */
+	struct urcu_ref ref;		/* Reference from getters. */
+	struct relay_stream *stream;	/* Back ref to stream */
+
+	pthread_mutex_t lock;
+	/*
+	 * FD on which to write the index data. May differ from
+	 * stream->index_fd due to tracefile rotation.
+	 */
+	struct stream_fd *index_fd;
 
 	/* Index packet data. This is the data that is written on disk. */
 	struct ctf_packet_index index_data;
 
-	/* key1 = stream_id, key2 = net_seq_num */
-	struct lttng_ht_node_two_u64 index_n;
-	struct rcu_head rcu_node;
-	pthread_mutex_t mutex;
+	bool has_index_data;
+	bool flushed;
+	bool in_hash_table;
+
+	/*
+	 * Node within indexes_ht that corresponds to this struct
+	 * relay_index. Indexed by net_seq_num.
+	 */
+	struct lttng_ht_node_u64 index_n;
+	struct rcu_head rcu_node;	/* For call_rcu teardown. */
 };
 
-struct relay_index *relay_index_create(uint64_t stream_id,
+struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
 		uint64_t net_seq_num);
-struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num);
-void relay_index_add(struct relay_index *index, struct relay_index **_index);
-int relay_index_write(int fd, struct relay_index *index);
-void relay_index_free(struct relay_index *index);
-void relay_index_free_safe(struct relay_index *index);
-void relay_index_delete(struct relay_index *index);
-void relay_index_destroy_by_stream_id(uint64_t stream_id);
+void relay_index_put(struct relay_index *index);
+int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+		uint64_t data_offset);
+int relay_index_set_data(struct relay_index *index,
+		const struct ctf_packet_index *data);
+int relay_index_try_flush(struct relay_index *index);
+
+void relay_index_close_all(struct relay_stream *stream);
 
 #endif /* _RELAY_INDEX_H */
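relay_index_get_by_id_or_create() resolves a creation race by publishing with cds_lfht_add_unique(). On a collision, it drops the new index and takes a reference on the one already in the table. The following minimal sketch shows that keep-the-old policy. It assumes a hypothetical fixed-size array guarded by a mutex as a stand-in for the lock-free hash table. Because the losing entry was never published, the sketch frees it directly; the patch instead releases it with relay_index_put().

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>

#define NSLOTS 8	/* Hypothetical fixed capacity; no resizing. */

struct idx_entry {
	uint64_t seq;	/* Plays the role of net_seq_num. */
	int refcount;
};

struct idx_table {
	pthread_mutex_t lock;	/* Stand-in for add_unique atomicity. */
	struct idx_entry *slots[NSLOTS];
};

static struct idx_entry *idx_new(uint64_t seq)
{
	struct idx_entry *e = calloc(1, sizeof(*e));

	if (e) {
		e->seq = seq;
		e->refcount = 1;	/* Reference returned to the creator. */
	}
	return e;
}

/*
 * Publish "fresh" unless an entry with the same key exists. On collision,
 * keep the existing entry, take a reference on it, and discard "fresh".
 * Returns the entry the caller now holds a reference on.
 */
static struct idx_entry *idx_add_unique_or_get(struct idx_table *t,
		struct idx_entry *fresh)
{
	struct idx_entry *result = NULL;
	int free_slot = -1;

	assert(fresh != NULL);
	pthread_mutex_lock(&t->lock);
	for (int i = 0; i < NSLOTS; i++) {
		struct idx_entry *cur = t->slots[i];

		if (cur && cur->seq == fresh->seq) {
			result = cur;	/* Lost the race: keep the old entry. */
			break;
		}
		if (!cur && free_slot < 0)
			free_slot = i;
	}
	if (result) {
		result->refcount++;
	} else {
		assert(free_slot >= 0);
		t->slots[free_slot] = fresh;
		result = fresh;
	}
	pthread_mutex_unlock(&t->lock);
	if (result != fresh)
		free(fresh);	/* Never published, so no grace period needed. */
	return result;
}
```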
diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
index 562a7fa..90af673 100644
--- a/src/bin/lttng-relayd/live.c
+++ b/src/bin/lttng-relayd/live.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License, version 2 only,
@@ -36,6 +37,7 @@
 #include <inttypes.h>
 #include <urcu/futex.h>
 #include <urcu/uatomic.h>
+#include <urcu/rculist.h>
 #include <unistd.h>
 #include <fcntl.h>
 #include <config.h>
@@ -65,6 +67,9 @@
 #include "session.h"
 #include "ctf-trace.h"
 #include "connection.h"
+#include "viewer-session.h"
+
+#define SESSION_BUF_DEFAULT_COUNT	16
 
 static struct lttng_uri *live_uri;
 
@@ -90,6 +95,8 @@ static pthread_t live_worker_thread;
 static struct relay_conn_queue viewer_conn_queue;
 
 static uint64_t last_relay_viewer_session_id;
+static pthread_mutex_t last_relay_viewer_session_id_lock =
+		PTHREAD_MUTEX_INITIALIZER;
 
 /*
  * Cleanup the daemon
@@ -114,9 +121,6 @@ ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size)
 {
 	ssize_t ret;
 
-	assert(sock);
-	assert(buf);
-
 	ret = sock->ops->recvmsg(sock, buf, size, 0);
 	if (ret < 0 || ret != size) {
 		if (ret == 0) {
@@ -143,9 +147,6 @@ ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
 {
 	ssize_t ret;
 
-	assert(sock);
-	assert(buf);
-
 	ret = sock->ops->sendmsg(sock, buf, size, 0);
 	if (ret < 0) {
 		ERR("Relayd failed to send response.");
@@ -171,17 +172,22 @@ int check_new_streams(struct relay_connection *conn)
 	if (!conn->viewer_session) {
 		goto end;
 	}
-	cds_list_for_each_entry(session,
-			&conn->viewer_session->sessions_head,
-			viewer_session_list) {
+	rcu_read_lock();
+	cds_list_for_each_entry_rcu(session,
+			&conn->viewer_session->session_list,
+			viewer_session_node) {
+		if (!session_get(session)) {
+			continue;
+		}
 		current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
 		ret = current_val;
+		session_put(session);
 		if (ret == 1) {
 			goto end;
 		}
 	}
-
 end:
+	rcu_read_unlock();
 	return ret;
 }
 
@@ -200,8 +206,6 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
 	struct lttng_ht_iter iter;
 	struct relay_viewer_stream *vstream;
 
-	assert(session);
-
 	rcu_read_lock();
 
 	cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
@@ -210,30 +214,38 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
 
 		health_code_update();
 
+		if (!viewer_stream_get(vstream)) {
+			continue;
+		}
+
+		pthread_mutex_lock(&vstream->lock);
 		/* Ignore if not the same session. */
-		if (vstream->session_id != session->id ||
+		if (vstream->stream->trace->session->id != session->id ||
 				(!ignore_sent_flag && vstream->sent_flag)) {
+			pthread_mutex_unlock(&vstream->lock);
+			viewer_stream_put(vstream);
 			continue;
 		}
 
-		ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
-				vstream->path_name);
-		assert(ctf_trace);
-
-		send_stream.id = htobe64(vstream->stream_handle);
+		ctf_trace = vstream->stream->trace;
+		send_stream.id = htobe64(vstream->stream->stream_handle);
 		send_stream.ctf_trace_id = htobe64(ctf_trace->id);
-		send_stream.metadata_flag = htobe32(vstream->metadata_flag);
+		send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
 		strncpy(send_stream.path_name, vstream->path_name,
 				sizeof(send_stream.path_name));
 		strncpy(send_stream.channel_name, vstream->channel_name,
 				sizeof(send_stream.channel_name));
 
-		DBG("Sending stream %" PRIu64 " to viewer", vstream->stream_handle);
+		DBG("Sending stream %" PRIu64 " to viewer",
+				vstream->stream->stream_handle);
+		vstream->sent_flag = 1;
+		pthread_mutex_unlock(&vstream->lock);
+
 		ret = send_response(sock, &send_stream, sizeof(send_stream));
 		if (ret < 0) {
 			goto end_unlock;
 		}
-		vstream->sent_flag = 1;
+		viewer_stream_put(vstream);
 	}
 
 	ret = 0;
@@ -263,13 +275,10 @@ int make_viewer_streams(struct relay_session *session,
 	assert(session);
 
 	/*
-	 * This is to make sure we create viewer streams for a full received
-	 * channel. For instance, if we have 8 streams for a channel that are
-	 * concurrently being flagged ready, we can end up creating just a subset
-	 * of the 8 streams (the ones that are flagged). This lock avoids this
-	 * limbo state.
+	 * Hold the session lock to ensure that we see either none or
+	 * all initial streams for a session, but no intermediate state.
 	 */
-	pthread_mutex_lock(&session->viewer_ready_lock);
+	pthread_mutex_lock(&session->lock);
 
 	/*
 	 * Create viewer streams for relay streams that are ready to be used for a
@@ -282,47 +291,46 @@ int make_viewer_streams(struct relay_session *session,
 
 		health_code_update();
 
-		if (ctf_trace->invalid_flag) {
+		if (!ctf_trace_get(ctf_trace)) {
 			continue;
 		}
 
-		cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
+		cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) {
 			struct relay_viewer_stream *vstream;
 
-			if (!stream->viewer_ready) {
-				continue;
-			}
-
-			vstream = viewer_stream_find_by_id(stream->stream_handle);
+			vstream = viewer_stream_get_by_id(stream->stream_handle);
 			if (!vstream) {
-				vstream = viewer_stream_create(stream, seek_t, ctf_trace);
+				vstream = viewer_stream_create(stream, seek_t);
 				if (!vstream) {
 					ret = -1;
+					ctf_trace_put(ctf_trace);
 					goto error_unlock;
 				}
-				/* Acquire reference to ctf_trace. */
-				ctf_trace_get_ref(ctf_trace);
 
 				if (nb_created) {
 					/* Update number of created stream counter. */
 					(*nb_created)++;
 				}
-			} else if (!vstream->sent_flag && nb_unsent) {
-				/* Update number of unsent stream counter. */
-				(*nb_unsent)++;
+			} else {
+				if (!vstream->sent_flag && nb_unsent) {
+					/* Update number of unsent stream counter. */
+					(*nb_unsent)++;
+				}
+				viewer_stream_put(vstream);
 			}
 			/* Update number of total stream counter. */
 			if (nb_total) {
 				(*nb_total)++;
 			}
 		}
+		ctf_trace_put(ctf_trace);
 	}
 
 	ret = 0;
 
 error_unlock:
 	rcu_read_unlock();
-	pthread_mutex_unlock(&session->viewer_ready_lock);
+	pthread_mutex_unlock(&session->lock);
 	return ret;
 }
 
@@ -512,15 +520,9 @@ restart:
 				struct relay_connection *new_conn;
 				struct lttcomm_sock *newsock;
 
-				new_conn = connection_create();
-				if (!new_conn) {
-					goto error;
-				}
-
 				newsock = live_control_sock->ops->accept(live_control_sock);
 				if (!newsock) {
 					PERROR("accepting control sock");
-					connection_free(new_conn);
 					goto error;
 				}
 				DBG("Relay viewer connection accepted socket %d", newsock->fd);
@@ -530,10 +532,13 @@ restart:
 				if (ret < 0) {
 					PERROR("setsockopt inet");
 					lttcomm_destroy_sock(newsock);
-					connection_free(new_conn);
 					goto error;
 				}
-				new_conn->sock = newsock;
+				new_conn = connection_create(newsock, RELAY_CONNECTION_UNKNOWN);
+				if (!new_conn) {
+					lttcomm_destroy_sock(newsock);
+					goto error;
+				}
 
 				/* Enqueue request for the dispatcher thread. */
 				cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail,
@@ -625,7 +630,7 @@ void *thread_dispatcher(void *data)
 			ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
 			if (ret < 0) {
 				PERROR("write conn pipe");
-				connection_destroy(conn);
+				connection_put(conn);
 				goto error;
 			}
 		} while (node != NULL);
@@ -664,8 +669,6 @@ int viewer_connect(struct relay_connection *conn)
 	int ret;
 	struct lttng_viewer_connect reply, msg;
 
-	assert(conn);
-
 	conn->version_check_done = 1;
 
 	health_code_update();
@@ -716,7 +719,9 @@ int viewer_connect(struct relay_connection *conn)
 		 * Increment outside of htobe64 macro, because can be used more than once
 		 * within the macro, and thus the operation may be undefined.
 		 */
+		pthread_mutex_lock(&last_relay_viewer_session_id_lock);
 		last_relay_viewer_session_id++;
+		pthread_mutex_unlock(&last_relay_viewer_session_id_lock);
 		reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
 	}
 
@@ -738,6 +743,9 @@ end:
 
 /*
  * Send the viewer the list of current sessions.
+ * We need to create a copy of the hash table content because otherwise
+ * we cannot assume the number of entries stays the same between getting
+ * the number of HT elements and iteration over the HT.
  *
  * Return 0 on success or else a negative value.
  */
@@ -746,146 +754,79 @@ int viewer_list_sessions(struct relay_connection *conn)
 {
 	int ret;
 	struct lttng_viewer_list_sessions session_list;
-	unsigned long count;
-	long approx_before, approx_after;
 	struct lttng_ht_iter iter;
-	struct lttng_viewer_session send_session;
 	struct relay_session *session;
+	struct lttng_viewer_session *send_session_buf = NULL;
+	uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
+	uint32_t count = 0;
 
 	DBG("List sessions received");
 
-	rcu_read_lock();
-	cds_lfht_count_nodes(conn->sessions_ht->ht, &approx_before, &count,
-			&approx_after);
-	session_list.sessions_count = htobe32(count);
-
-	health_code_update();
-
-	ret = send_response(conn->sock, &session_list, sizeof(session_list));
-	if (ret < 0) {
-		goto end_unlock;
+	send_session_buf = zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
+	if (!send_session_buf) {
+		return -1;
 	}
 
-	health_code_update();
-
-	cds_lfht_for_each_entry(conn->sessions_ht->ht, &iter.iter, session,
+	rcu_read_lock();
+	cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
 			session_n.node) {
-		health_code_update();
-
-		strncpy(send_session.session_name, session->session_name,
-				sizeof(send_session.session_name));
-		strncpy(send_session.hostname, session->hostname,
-				sizeof(send_session.hostname));
-		send_session.id = htobe64(session->id);
-		send_session.live_timer = htobe32(session->live_timer);
-		send_session.clients = htobe32(session->viewer_refcount);
-		send_session.streams = htobe32(session->stream_count);
+		struct lttng_viewer_session *send_session;
 
 		health_code_update();
 
-		ret = send_response(conn->sock, &send_session, sizeof(send_session));
-		if (ret < 0) {
-			goto end_unlock;
-		}
-	}
-	health_code_update();
+		if (count >= buf_count) {
+			struct lttng_viewer_session *newbuf;
+			uint32_t new_buf_count = buf_count << 1;
 
-	ret = 0;
-end_unlock:
-	rcu_read_unlock();
-	return ret;
-}
-
-/*
- * Check if a connection is attached to a session.
- * Return 1 if attached, 0 if not attached, a negative value on error.
- */
-static
-int session_attached(struct relay_connection *conn, uint64_t session_id)
-{
-	struct relay_session *session;
-	int found = 0;
-
-	if (!conn->viewer_session) {
-		goto end;
-	}
-	cds_list_for_each_entry(session,
-			&conn->viewer_session->sessions_head,
-			viewer_session_list) {
-		if (session->id == session_id) {
-			found = 1;
-			goto end;
+			newbuf = realloc(send_session_buf,
+				new_buf_count * sizeof(*send_session_buf));
+			if (!newbuf) {
+				ret = -1;
+				rcu_read_unlock();
+				goto end_free;
+			}
+			send_session_buf = newbuf;
+			buf_count = new_buf_count;
 		}
-	}
-
-end:
-	return found;
-}
-
-/*
- * Delete all streams for a specific session ID.
- */
-static void destroy_viewer_streams_by_session(struct relay_session *session)
-{
-	struct relay_viewer_stream *stream;
-	struct lttng_ht_iter iter;
-
-	assert(session);
-
-	rcu_read_lock();
-	cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
-			stream_n.node) {
-		struct ctf_trace *ctf_trace;
-
-		health_code_update();
-		if (stream->session_id != session->id) {
-			continue;
+		send_session = &send_session_buf[count];
+		strncpy(send_session->session_name, session->session_name,
+				sizeof(send_session->session_name));
+		strncpy(send_session->hostname, session->hostname,
+				sizeof(send_session->hostname));
+		send_session->id = htobe64(session->id);
+		send_session->live_timer = htobe32(session->live_timer);
+		if (session->viewer_attached) {
+			send_session->clients = htobe32(1);
+		} else {
+			send_session->clients = htobe32(0);
 		}
+		send_session->streams = htobe32(session->stream_count);
+		count++;
+	}
+	rcu_read_unlock();
 
-		ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
-				stream->path_name);
-		assert(ctf_trace);
-
-		viewer_stream_delete(stream);
+	session_list.sessions_count = htobe32(count);
 
-		if (stream->metadata_flag) {
-			ctf_trace->metadata_sent = 0;
-			ctf_trace->viewer_metadata_stream = NULL;
-		}
+	health_code_update();
 
-		viewer_stream_destroy(ctf_trace, stream);
+	ret = send_response(conn->sock, &session_list, sizeof(session_list));
+	if (ret < 0) {
+		goto end_free;
 	}
-	rcu_read_unlock();
-}
-
-static void try_destroy_streams(struct relay_session *session)
-{
-	struct ctf_trace *ctf_trace;
-	struct lttng_ht_iter iter;
 
-	assert(session);
+	health_code_update();
 
-	cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
-			node.node) {
-		/* Attempt to destroy the ctf trace of that session. */
-		ctf_trace_try_destroy(session, ctf_trace);
+	ret = send_response(conn->sock, send_session_buf,
+			count * sizeof(*send_session_buf));
+	if (ret < 0) {
+		goto end_free;
 	}
-}
+	health_code_update();
 
-/*
- * Cleanup a session.
- */
-static void cleanup_session(struct relay_connection *conn,
-		struct relay_session *session)
-{
-	/*
-	 * Very important that this is done before destroying the session so we
-	 * can put back every viewer stream reference from the ctf_trace.
-	 */
-	destroy_viewer_streams_by_session(session);
-	try_destroy_streams(session);
-	cds_list_del(&session->viewer_session_list);
-	session_viewer_try_destroy(conn->sessions_ht, session);
+	ret = 0;
+end_free:
+	free(send_session_buf);
+	return ret;
 }
 
 /*
@@ -918,15 +859,14 @@ int viewer_get_new_streams(struct relay_connection *conn)
 
 	memset(&response, 0, sizeof(response));
 
-	rcu_read_lock();
-	session = session_find_by_id(conn->sessions_ht, session_id);
+	session = session_get_by_id(session_id);
 	if (!session) {
 		DBG("Relay session %" PRIu64 " not found", session_id);
 		response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
 		goto send_reply;
 	}
 
-	if (!session_attached(conn, session_id)) {
+	if (!viewer_session_is_attached(conn->viewer_session, session)) {
 		send_streams = 0;
 		response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
 		goto send_reply;
@@ -938,7 +878,7 @@ int viewer_get_new_streams(struct relay_connection *conn)
 	ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, NULL, &nb_unsent,
 			&nb_created);
 	if (ret < 0) {
-		goto end_unlock;
+		goto end_put_session;
 	}
 	/* Only send back the newly created streams with the unsent ones. */
 	nb_streams = nb_created + nb_unsent;
@@ -949,15 +889,13 @@ int viewer_get_new_streams(struct relay_connection *conn)
 	 * it means that the viewer has already received the whole trace
 	 * for this session and should now close it.
 	 */
-	if (nb_streams == 0 && session->close_flag) {
+	if (nb_streams == 0 && session->connection_closed) {
 		send_streams = 0;
-		response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
-		/*
-		 * Remove the session from the attached list of the connection
-		 * and try to destroy it.
-		 */
-		cds_list_del(&session->viewer_session_list);
-		cleanup_session(conn, session);
+		if (viewer_session_detach(conn->viewer_session, session)) {
+			response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
+		} else {
+			response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
+		}
 		goto send_reply;
 	}
 
@@ -965,7 +903,7 @@ send_reply:
 	health_code_update();
 	ret = send_response(conn->sock, &response, sizeof(response));
 	if (ret < 0) {
-		goto end_unlock;
+		goto end_put_session;
 	}
 	health_code_update();
 
@@ -975,7 +913,7 @@ send_reply:
 	 */
 	if (!send_streams || !nb_streams) {
 		ret = 0;
-		goto end_unlock;
+		goto end_put_session;
 	}
 
 	/*
@@ -984,11 +922,13 @@ send_reply:
 	 */
 	ret = send_viewer_streams(conn->sock, session, 0);
 	if (ret < 0) {
-		goto end_unlock;
+		goto end_put_session;
 	}
 
-end_unlock:
-	rcu_read_unlock();
+end_put_session:
+	if (session) {
+		session_put(session);
+	}
 error:
 	return ret;
 }
@@ -1005,7 +945,7 @@ int viewer_attach_session(struct relay_connection *conn)
 	enum lttng_viewer_seek seek_type;
 	struct lttng_viewer_attach_session_request request;
 	struct lttng_viewer_attach_session_response response;
-	struct relay_session *session;
+	struct relay_session *session = NULL;
 
 	assert(conn);
 
@@ -1027,37 +967,33 @@ int viewer_attach_session(struct relay_connection *conn)
 		goto send_reply;
 	}
 
-	rcu_read_lock();
-	session = session_find_by_id(conn->sessions_ht,
-			be64toh(request.session_id));
+	session = session_get_by_id(be64toh(request.session_id));
 	if (!session) {
 		DBG("Relay session %" PRIu64 " not found",
 				be64toh(request.session_id));
 		response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
 		goto send_reply;
 	}
-	session_viewer_attach(session);
 	DBG("Attach session ID %" PRIu64 " received", be64toh(request.session_id));
 
-	if (uatomic_read(&session->viewer_refcount) > 1) {
-		DBG("Already a viewer attached");
-		response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
-		session_viewer_detach(session);
-		goto send_reply;
-	} else if (session->live_timer == 0) {
+	if (session->live_timer == 0) {
 		DBG("Not live session");
 		response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
 		goto send_reply;
-	} else {
-		send_streams = 1;
-		response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
-		cds_list_add(&session->viewer_session_list,
-				&conn->viewer_session->sessions_head);
+	}
+
+	send_streams = 1;
+	ret = viewer_session_attach(conn->viewer_session, session);
+	if (ret) {
+		DBG("Already a viewer attached");
+		response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
+		goto send_reply;
 	}
 
 	switch (be32toh(request.seek)) {
 	case LTTNG_VIEWER_SEEK_BEGINNING:
 	case LTTNG_VIEWER_SEEK_LAST:
+		response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
 		seek_type = be32toh(request.seek);
 		break;
 	default:
@@ -1069,7 +1005,7 @@ int viewer_attach_session(struct relay_connection *conn)
 
 	ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL);
 	if (ret < 0) {
-		goto end_unlock;
+		goto end_put_session;
 	}
 	response.streams_count = htobe32(nb_streams);
 
@@ -1077,7 +1013,7 @@ send_reply:
 	health_code_update();
 	ret = send_response(conn->sock, &response, sizeof(response));
 	if (ret < 0) {
-		goto end_unlock;
+		goto end_put_session;
 	}
 	health_code_update();
 
@@ -1087,17 +1023,19 @@ send_reply:
 	 */
 	if (!send_streams || !nb_streams) {
 		ret = 0;
-		goto end_unlock;
+		goto end_put_session;
 	}
 
 	/* Send stream and ignore the sent flag. */
 	ret = send_viewer_streams(conn->sock, session, 1);
 	if (ret < 0) {
-		goto end_unlock;
+		goto end_put_session;
 	}
 
-end_unlock:
-	rcu_read_unlock();
+end_put_session:
+	if (session) {
+		session_put(session);
+	}
 error:
 	return ret;
 }
@@ -1105,38 +1043,42 @@ error:
 /*
  * Open the index file if needed for the given vstream.
  *
- * If an index file is successfully opened, the index_read_fd of the stream is
- * set with it.
+ * If an index file is successfully opened, the vstream index_fd is set
+ * with it.
  *
  * Return 0 on success, a negative value on error (-ENOENT if not ready yet).
+ *
+ * Called with both rstream and vstream locks held.
  */
 static int try_open_index(struct relay_viewer_stream *vstream,
 		struct relay_stream *rstream)
 {
 	int ret = 0;
 
-	assert(vstream);
-	assert(rstream);
-
-	if (vstream->index_read_fd >= 0) {
+	if (vstream->index_fd) {
 		goto end;
 	}
 
 	/*
-	 * First time, we open the index file and at least one index is ready.  The
-	 * race between the read and write of the total_index_received is
-	 * acceptable here since the client will be notified to simply come back
-	 * and get the next index.
+	 * On first use, open the index file once at least one index is ready.
 	 */
 	if (rstream->total_index_received <= 0) {
 		ret = -ENOENT;
 		goto end;
 	}
 	ret = index_open(vstream->path_name, vstream->channel_name,
-			vstream->tracefile_count, vstream->tracefile_count_current);
+			vstream->stream->tracefile_count,
+			vstream->current_tracefile_id);
 	if (ret >= 0) {
-		vstream->index_read_fd = ret;
-		ret = 0;
+		vstream->index_fd = stream_fd_create(ret);
+		if (!vstream->index_fd) {
+			if (close(ret)) {
+				PERROR("close");
+			}
+			ret = -1;
+		} else {
+			ret = 0;
+		}
 		goto end;
 	}
 
@@ -1146,12 +1088,14 @@ end:
 
 /*
  * Check the status of the index for the given stream. This function updates
- * the index structure if needed and can destroy the vstream also for the HUP
- * situation.
+ * the index structure if needed and can put (close) the vstream in the
+ * HUP situation.
  *
  * Return 0 means that we can proceed with the index. A value of 1 means that
  * the index has been updated and is ready to be send to the client. A negative
  * value indicates an error that can't be handled.
+ *
+ * Called with both rstream and vstream locks held.
  */
 static int check_index_status(struct relay_viewer_stream *vstream,
 		struct relay_stream *rstream, struct ctf_trace *trace,
@@ -1159,68 +1103,95 @@ static int check_index_status(struct relay_viewer_stream *vstream,
 {
 	int ret;
 
-	assert(vstream);
-	assert(rstream);
-	assert(index);
-	assert(trace);
-
-	if (!rstream->close_flag) {
-		/* Rotate on abort (overwrite). */
-		if (vstream->abort_flag) {
-			DBG("Viewer stream %" PRIu64 " rotate because of overwrite",
-					vstream->stream_handle);
-			ret = viewer_stream_rotate(vstream, rstream);
+	if (trace->session->connection_closed) {
+		if (rstream->total_index_received == vstream->last_sent_index) {
+			/* Last index sent and session connection is closed. */
+			index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+			goto hup;
+		} else {
+			/*
+			 * Session connection is closed, but there are
+			 * still indexes to read.
+			 */
+			ret = 0;
+			goto end;
+		}
+	} else {
+		if (rstream->beacon_ts_end != -1ULL &&
+				rstream->total_index_received == vstream->last_sent_index) {
+			/*
+			 * We've received a synchronization beacon and the last available
+			 * index has been sent, so the index is inactive for now.
+			 */
+			index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
+			index->timestamp_end = htobe64(rstream->beacon_ts_end);
+			index->stream_id = htobe64(rstream->ctf_stream_id);
+			goto index_ready;
+		} else if (rstream->total_index_received <= vstream->last_sent_index) {
+			/* This actually checks the case where recv == last_sent. */
+			index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+			goto index_ready;
+		} else if (!viewer_stream_is_tracefile_id_readable(vstream, vstream->current_tracefile_id)) {
+			/*
+			 * The producer has overwritten our current
+			 * file. We need to rotate.
+			 */
+			DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
+					vstream->stream->stream_handle);
+			ret = viewer_stream_rotate(vstream);
 			if (ret < 0) {
-				goto error;
+				goto end;
 			} else if (ret == 1) {
-				/* EOF */
+				/* EOF across entire stream. */
 				index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
 				goto hup;
 			}
 			/* ret == 0 means successful so we continue. */
-		}
+			ret = 0;
+		} else {
+			ssize_t read_ret;
+			char tmp[1];
 
-		/* Check if we are in the same trace file at this point. */
-		if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
-			if (rstream->beacon_ts_end != -1ULL &&
-					vstream->last_sent_index == rstream->total_index_received) {
-				/*
-				 * We've received a synchronization beacon and the last index
-				 * available has been sent, the index for now is inactive.
-				 */
-				index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
-				index->timestamp_end = htobe64(rstream->beacon_ts_end);
-				index->stream_id = htobe64(rstream->ctf_stream_id);
-				goto index_ready;
-			} else if (rstream->total_index_received <= vstream->last_sent_index
-					&& !vstream->close_write_flag) {
-				/*
-				 * Reader and writer are working in the same tracefile, so we care
-				 * about the number of index received and sent. Otherwise, we read
-				 * up to EOF.
-				 */
-				index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
-				goto index_ready;
+			/*
+			 * Use EOF on the current index file to find out when we
+			 * need to rotate.
+			 */
+			read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
+			if (read_ret == 1) {
+				off_t seek_ret;
+
+				/* There is still data to read. Rewind position. */
+				seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
+				if (seek_ret < 0) {
+					ret = -1;
+					goto end;
+				}
+				ret = 0;
+			} else if (read_ret == 0) {
+				/* EOF. We need to rotate. */
+				DBG("Viewer stream %" PRIu64 " rotation due to EOF",
+						vstream->stream->stream_handle);
+				ret = viewer_stream_rotate(vstream);
+				if (ret < 0) {
+					goto end;
+				} else if (ret == 1) {
+					/* EOF across entire stream. */
+					index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+					goto hup;
+				}
+				/* ret == 0 means successful so we continue. */
+				ret = 0;
+			} else {
+				/* Error reading index. */
+				ret = -1;
 			}
 		}
-		/* Nothing to do with the index, continue with it. */
-		ret = 0;
-	} else if (rstream->close_flag && vstream->close_write_flag &&
-			vstream->total_index_received == vstream->last_sent_index) {
-		/* Last index sent and current tracefile closed in write */
-		index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
-		goto hup;
-	} else {
-		vstream->close_write_flag = 1;
-		ret = 0;
 	}
-
-error:
+end:
 	return ret;
 
 hup:
-	viewer_stream_delete(vstream);
-	viewer_stream_destroy(trace, vstream);
+	viewer_stream_put(vstream);
 index_ready:
 	return 1;
 }
@@ -1238,10 +1209,9 @@ int viewer_get_next_index(struct relay_connection *conn)
 	struct lttng_viewer_get_next_index request_index;
 	struct lttng_viewer_index viewer_index;
 	struct ctf_packet_index packet_index;
-	struct relay_viewer_stream *vstream;
-	struct relay_stream *rstream;
-	struct ctf_trace *ctf_trace;
-	struct relay_session *session;
+	struct relay_viewer_stream *vstream = NULL;
+	struct relay_stream *rstream = NULL;
+	struct ctf_trace *ctf_trace = NULL;
 
 	assert(conn);
 
@@ -1255,35 +1225,29 @@ int viewer_get_next_index(struct relay_connection *conn)
 	}
 	health_code_update();
 
-	rcu_read_lock();
-	vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
+	vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
 	if (!vstream) {
 		ret = -1;
-		goto end_unlock;
-	}
-
-	session = session_find_by_id(conn->sessions_ht, vstream->session_id);
-	if (!session) {
-		ret = -1;
-		goto end_unlock;
+		goto end;
 	}
 
-	ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, vstream->path_name);
-	assert(ctf_trace);
+	/* Use back-references. Protected by refcounts. */
+	rstream = vstream->stream;
+	ctf_trace = rstream->trace;
 
 	memset(&viewer_index, 0, sizeof(viewer_index));
 
+	pthread_mutex_lock(&rstream->lock);
+	pthread_mutex_lock(&vstream->lock);
+
 	/*
 	 * The viewer should not ask for index on metadata stream.
 	 */
-	if (vstream->metadata_flag) {
+	if (rstream->is_metadata) {
 		viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
 		goto send_reply;
 	}
 
-	rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle);
-	assert(rstream);
-
 	/* Try to open an index if one is needed for that stream. */
 	ret = try_open_index(vstream, rstream);
 	if (ret < 0) {
@@ -1300,11 +1264,9 @@ int viewer_get_next_index(struct relay_connection *conn)
 		goto send_reply;
 	}
 
-	pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
 	ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
-	pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
 	if (ret < 0) {
-		goto end_unlock;
+		goto error_put;
 	} else if (ret == 1) {
 		/*
 		 * This means the viewer index data structure has been populated by the
@@ -1322,58 +1284,17 @@ int viewer_get_next_index(struct relay_connection *conn)
 
 	ret = check_new_streams(conn);
 	if (ret < 0) {
-		goto end_unlock;
+		viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+		goto send_reply;
 	} else if (ret == 1) {
 		viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
 	}
 
-	pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
-	pthread_mutex_lock(&vstream->overwrite_lock);
-	if (vstream->abort_flag) {
-		/* The file is being overwritten by the writer, we cannot use it. */
-		pthread_mutex_unlock(&vstream->overwrite_lock);
-		ret = viewer_stream_rotate(vstream, rstream);
-		pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
-		if (ret < 0) {
-			goto end_unlock;
-		} else if (ret == 1) {
-			viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
-			viewer_stream_delete(vstream);
-			viewer_stream_destroy(ctf_trace, vstream);
-		} else {
-			viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
-		}
-		goto send_reply;
-	}
-
-	read_ret = lttng_read(vstream->index_read_fd, &packet_index,
+	read_ret = lttng_read(vstream->index_fd->fd, &packet_index,
 			sizeof(packet_index));
-	pthread_mutex_unlock(&vstream->overwrite_lock);
-	pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
 	if (read_ret < 0) {
-		viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
-		viewer_stream_delete(vstream);
-		viewer_stream_destroy(ctf_trace, vstream);
-		goto send_reply;
-	} else if (read_ret < sizeof(packet_index)) {
-		pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
-		if (vstream->close_write_flag) {
-			ret = viewer_stream_rotate(vstream, rstream);
-			if (ret < 0) {
-				pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
-				goto end_unlock;
-			} else if (ret == 1) {
-				viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
-				viewer_stream_delete(vstream);
-				viewer_stream_destroy(ctf_trace, vstream);
-			} else {
-				viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
-			}
-		} else {
-			ERR("Relay reading index file %d", vstream->index_read_fd);
-			viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
-		}
-		pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+		ERR("Relay reading index file %d", vstream->index_fd->fd);
+		viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
 		goto send_reply;
 	} else {
 		viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
@@ -1392,22 +1313,31 @@ int viewer_get_next_index(struct relay_connection *conn)
 	viewer_index.stream_id = packet_index.stream_id;
 
 send_reply:
+	pthread_mutex_unlock(&vstream->lock);
+	pthread_mutex_unlock(&rstream->lock);
+
 	viewer_index.flags = htobe32(viewer_index.flags);
 	health_code_update();
 
 	ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
 	if (ret < 0) {
-		goto end_unlock;
+		goto end;
 	}
 	health_code_update();
 
 	DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
-			vstream->last_sent_index, vstream->stream_handle);
-
-end_unlock:
-	rcu_read_unlock();
-
+			vstream->last_sent_index,
+			vstream->stream->stream_handle);
 end:
+	if (vstream) {
+		viewer_stream_put(vstream);
+	}
+	return ret;
+
+error_put:
+	pthread_mutex_unlock(&vstream->lock);
+	pthread_mutex_unlock(&rstream->lock);
+	viewer_stream_put(vstream);
 	return ret;
 }
 
@@ -1425,12 +1355,9 @@ int viewer_get_packet(struct relay_connection *conn)
 	ssize_t read_len;
 	struct lttng_viewer_get_packet get_packet_info;
 	struct lttng_viewer_trace_packet reply;
-	struct relay_viewer_stream *stream;
-	struct relay_session *session;
+	struct relay_viewer_stream *vstream = NULL;
 	struct ctf_trace *ctf_trace;
 
-	assert(conn);
-
 	DBG2("Relay get data packet");
 
 	health_code_update();
@@ -1444,37 +1371,28 @@ int viewer_get_packet(struct relay_connection *conn)
 	/* From this point on, the error label can be reached. */
 	memset(&reply, 0, sizeof(reply));
 
-	rcu_read_lock();
-	stream = viewer_stream_find_by_id(be64toh(get_packet_info.stream_id));
-	if (!stream) {
-		goto error;
-	}
-
-	session = session_find_by_id(conn->sessions_ht, stream->session_id);
-	if (!session) {
-		ret = -1;
+	vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
+	if (!vstream) {
 		goto error;
 	}
 
-	ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
-			stream->path_name);
-	assert(ctf_trace);
+	ctf_trace = vstream->stream->trace;
 
 	/*
 	 * First time we read this stream, we need open the tracefile, we should
 	 * only arrive here if an index has already been sent to the viewer, so the
 	 * tracefile must exist, if it does not it is a fatal error.
 	 */
-	if (stream->read_fd < 0) {
+	if (!vstream->stream_fd) {
 		char fullpath[PATH_MAX];
 
-		if (stream->tracefile_count > 0) {
-			ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, stream->path_name,
-					stream->channel_name,
-					stream->tracefile_count_current);
+		if (vstream->stream->tracefile_count > 0) {
+			ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, vstream->path_name,
+					vstream->channel_name,
+					vstream->current_tracefile_id);
 		} else {
-			ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name,
-					stream->channel_name);
+			ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name,
+					vstream->channel_name);
 		}
 		if (ret < 0) {
 			goto error;
@@ -1484,7 +1402,13 @@ int viewer_get_packet(struct relay_connection *conn)
 			PERROR("Relay opening trace file");
 			goto error;
 		}
-		stream->read_fd = ret;
+		vstream->stream_fd = stream_fd_create(ret);
+		if (!vstream->stream_fd) {
+			if (close(ret)) {
+				PERROR("close");
+			}
+			goto error;
+		}
 	}
 
 	if (!ctf_trace->metadata_received ||
@@ -1496,7 +1420,7 @@ int viewer_get_packet(struct relay_connection *conn)
 
 	ret = check_new_streams(conn);
 	if (ret < 0) {
-		goto end_unlock;
+		goto end_free;
 	} else if (ret == 1) {
 		reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 		reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
@@ -1510,34 +1434,17 @@ int viewer_get_packet(struct relay_connection *conn)
 		goto error;
 	}
 
-	ret = lseek(stream->read_fd, be64toh(get_packet_info.offset), SEEK_SET);
+	ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset), SEEK_SET);
 	if (ret < 0) {
-		/*
-		 * If the read fd was closed by the streaming side, the
-		 * abort_flag will be set to 1, otherwise it is an error.
-		 */
-		if (stream->abort_flag == 0) {
-			PERROR("lseek");
-			goto error;
-		}
-		reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
-		goto send_reply;
+		PERROR("lseek");
+		goto error;
 	}
-	read_len = lttng_read(stream->read_fd, data, len);
+	read_len = lttng_read(vstream->stream_fd->fd, data, len);
 	if (read_len < len) {
-		/*
-		 * If the read fd was closed by the streaming side, the
-		 * abort_flag will be set to 1, otherwise it is an error.
-		 */
-		if (stream->abort_flag == 0) {
-			PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
-					stream->read_fd,
-					be64toh(get_packet_info.offset));
-			goto error;
-		} else {
-			reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
-			goto send_reply;
-		}
+		PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
+				vstream->stream_fd->fd,
+				be64toh(get_packet_info.offset));
+		goto error;
 	}
 	reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
 	reply.len = htobe32(len);
@@ -1554,7 +1461,7 @@ send_reply:
 
 	ret = send_response(conn->sock, &reply, sizeof(reply));
 	if (ret < 0) {
-		goto end_unlock;
+		goto end_free;
 	}
 	health_code_update();
 
@@ -1562,7 +1469,7 @@ send_reply:
 		health_code_update();
 		ret = send_response(conn->sock, data, len);
 		if (ret < 0) {
-			goto end_unlock;
+			goto end_free;
 		}
 		health_code_update();
 	}
@@ -1570,11 +1477,12 @@ send_reply:
 	DBG("Sent %u bytes for stream %" PRIu64, len,
 			be64toh(get_packet_info.stream_id));
 
-end_unlock:
+end_free:
 	free(data);
-	rcu_read_unlock();
-
 end:
+	if (vstream) {
+		viewer_stream_put(vstream);
+	}
 	return ret;
 }
 
@@ -1592,9 +1500,8 @@ int viewer_get_metadata(struct relay_connection *conn)
 	char *data = NULL;
 	struct lttng_viewer_get_metadata request;
 	struct lttng_viewer_metadata_packet reply;
-	struct relay_viewer_stream *stream;
+	struct relay_viewer_stream *vstream = NULL;
 	struct ctf_trace *ctf_trace;
-	struct relay_session *session;
 
 	assert(conn);
 
@@ -1610,22 +1517,14 @@ int viewer_get_metadata(struct relay_connection *conn)
 
 	memset(&reply, 0, sizeof(reply));
 
-	rcu_read_lock();
-	stream = viewer_stream_find_by_id(be64toh(request.stream_id));
-	if (!stream || !stream->metadata_flag) {
+	vstream = viewer_stream_get_by_id(be64toh(request.stream_id));
+	if (!vstream || !vstream->stream->is_metadata) {
 		ERR("Invalid metadata stream");
 		goto error;
 	}
 
-	session = session_find_by_id(conn->sessions_ht, stream->session_id);
-	if (!session) {
-		ret = -1;
-		goto error;
-	}
+	ctf_trace = vstream->stream->trace;
 
-	ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
-			stream->path_name);
-	assert(ctf_trace);
 	assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received);
 
 	len = ctf_trace->metadata_received - ctf_trace->metadata_sent;
@@ -1635,11 +1534,11 @@ int viewer_get_metadata(struct relay_connection *conn)
 	}
 
 	/* first time, we open the metadata file */
-	if (stream->read_fd < 0) {
+	if (!vstream->stream_fd) {
 		char fullpath[PATH_MAX];
 
-		ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name,
-				stream->channel_name);
+		ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name,
+				vstream->channel_name);
 		if (ret < 0) {
 			goto error;
 		}
@@ -1648,7 +1547,13 @@ int viewer_get_metadata(struct relay_connection *conn)
 			PERROR("Relay opening metadata file");
 			goto error;
 		}
-		stream->read_fd = ret;
+		vstream->stream_fd = stream_fd_create(ret);
+		if (!vstream->stream_fd) {
+			if (close(ret)) {
+				PERROR("close");
+			}
+			goto error;
+		}
 	}
 
 	reply.len = htobe64(len);
@@ -1658,7 +1563,7 @@ int viewer_get_metadata(struct relay_connection *conn)
 		goto error;
 	}
 
-	read_len = lttng_read(stream->read_fd, data, len);
+	read_len = lttng_read(vstream->stream_fd->fd, data, len);
 	if (read_len < len) {
 		PERROR("Relay reading metadata file");
 		goto error;
@@ -1674,14 +1579,14 @@ send_reply:
 	health_code_update();
 	ret = send_response(conn->sock, &reply, sizeof(reply));
 	if (ret < 0) {
-		goto end_unlock;
+		goto end_free;
 	}
 	health_code_update();
 
 	if (len > 0) {
 		ret = send_response(conn->sock, data, len);
 		if (ret < 0) {
-			goto end_unlock;
+			goto end_free;
 		}
 	}
 
@@ -1690,10 +1595,12 @@ send_reply:
 
 	DBG("Metadata sent");
 
-end_unlock:
+end_free:
 	free(data);
-	rcu_read_unlock();
 end:
+	if (vstream) {
+		viewer_stream_put(vstream);
+	}
 	return ret;
 }
 
@@ -1712,13 +1619,12 @@ int viewer_create_session(struct relay_connection *conn)
 
 	memset(&resp, 0, sizeof(resp));
 	resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
-	conn->viewer_session = zmalloc(sizeof(*conn->viewer_session));
+	conn->viewer_session = viewer_session_create();
 	if (!conn->viewer_session) {
 		ERR("Allocation viewer session");
 		resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
 		goto send_reply;
 	}
-	CDS_INIT_LIST_HEAD(&conn->viewer_session->sessions_head);
 
 send_reply:
 	health_code_update();
@@ -1757,9 +1663,6 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
 	int ret = 0;
 	uint32_t msg_value;
 
-	assert(recv_hdr);
-	assert(conn);
-
 	msg_value = be32toh(recv_hdr->cmd);
 
 	/*
@@ -1813,8 +1716,6 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 {
 	int ret;
 
-	assert(events);
-
 	(void) lttng_poll_del(events, pollfd);
 
 	ret = close(pollfd);
@@ -1824,38 +1725,6 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 }
 
 /*
- * Delete and destroy a connection.
- *
- * RCU read side lock MUST be acquired.
- */
-static void destroy_connection(struct lttng_ht *relay_connections_ht,
-		struct relay_connection *conn)
-{
-	struct relay_session *session, *tmp_session;
-
-	assert(relay_connections_ht);
-	assert(conn);
-
-	connection_delete(relay_connections_ht, conn);
-
-	if (!conn->viewer_session) {
-		goto end;
-	}
-
-	rcu_read_lock();
-	cds_list_for_each_entry_safe(session, tmp_session,
-			&conn->viewer_session->sessions_head,
-			viewer_session_list) {
-		DBG("Cleaning connection of session ID %" PRIu64, session->id);
-		cleanup_session(conn, session);
-	}
-	rcu_read_unlock();
-
-end:
-	connection_destroy(conn);
-}
-
-/*
  * This thread does the actual work
  */
 static
@@ -1867,8 +1736,6 @@ void *thread_worker(void *data)
 	struct lttng_ht *relay_connections_ht;
 	struct lttng_ht_iter iter;
 	struct lttng_viewer_cmd recv_hdr;
-	struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
-	struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
 	struct relay_connection *destroy_conn;
 
 	DBG("[thread] Live viewer relay worker started");
@@ -1956,46 +1823,45 @@ restart:
 					if (ret < 0) {
 						goto error;
 					}
-					conn->sessions_ht = sessions_ht;
-					connection_init(conn);
 					lttng_poll_add(&events, conn->sock->fd,
 							LPOLLIN | LPOLLRDHUP);
-					rcu_read_lock();
-					lttng_ht_add_unique_ulong(relay_connections_ht,
-							&conn->sock_n);
-					rcu_read_unlock();
-					DBG("Connection socket %d added", conn->sock->fd);
+					connection_ht_add(relay_connections_ht, conn);
+					DBG("Connection socket %d added to poll", conn->sock->fd);
 				}
 			} else {
 				struct relay_connection *conn;
 
-				rcu_read_lock();
-				conn = connection_find_by_sock(relay_connections_ht, pollfd);
-				/* If not found, there is a synchronization issue. */
-				assert(conn);
+				conn = connection_get_by_sock(relay_connections_ht, pollfd);
+				if (!conn) {
+					continue;
+				}
 
 				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
 					cleanup_connection_pollfd(&events, pollfd);
-					destroy_connection(relay_connections_ht, conn);
+					/* Put "open" ownership reference. */
+					connection_put(conn);
 				} else if (revents & LPOLLIN) {
 					ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
 							sizeof(recv_hdr), 0);
 					if (ret <= 0) {
 						/* Connection closed */
 						cleanup_connection_pollfd(&events, pollfd);
-						destroy_connection(relay_connections_ht, conn);
+						/* Put "open" ownership reference. */
+						connection_put(conn);
 						DBG("Viewer control conn closed with %d", pollfd);
 					} else {
 						ret = process_control(&recv_hdr, conn);
 						if (ret < 0) {
 							/* Clear the session on error. */
 							cleanup_connection_pollfd(&events, pollfd);
-							destroy_connection(relay_connections_ht, conn);
+							/* Put "open" ownership reference. */
+							connection_put(conn);
 							DBG("Viewer connection closed with %d", pollfd);
 						}
 					}
 				}
-				rcu_read_unlock();
+				/* Put "get_by_sock" reference. */
+				connection_put(conn);
 			}
 		}
 	}
@@ -2010,7 +1876,7 @@ error:
 			destroy_conn,
 			sock_n.node) {
 		health_code_update();
-		destroy_connection(relay_connections_ht, destroy_conn);
+		connection_put(destroy_conn);
 	}
 	rcu_read_unlock();
 error_poll_create:
@@ -2078,8 +1944,7 @@ int relayd_live_join(void)
 /*
  * main
  */
-int relayd_live_create(struct lttng_uri *uri,
-		struct relay_local_data *relay_ctx)
+int relayd_live_create(struct lttng_uri *uri)
 {
 	int ret = 0, retval = 0;
 	void *status;
@@ -2129,7 +1994,7 @@ int relayd_live_create(struct lttng_uri *uri,
 
 	/* Setup the worker thread */
 	ret = pthread_create(&live_worker_thread, NULL,
-			thread_worker, relay_ctx);
+			thread_worker, NULL);
 	if (ret) {
 		errno = ret;
 		PERROR("pthread_create viewer worker");
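The worker loop above uses two kinds of connection references. The "open" ownership reference is held for as long as the socket is polled. A temporary reference is taken by connection_get_by_sock() for the duration of one event. Each is released with connection_put().

Below is a minimal sketch of that get/put discipline using C11 atomics. It assumes a hypothetical struct conn with conn_get()/conn_put() helpers, which are not the relayd API. The hypothetical conn_get() refuses to take a reference once the count has reached zero. That is what allows a lookup to return NULL (and the loop to `continue`) for a connection that is already being torn down.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for struct relay_connection. */
struct conn {
	atomic_long refcount;
	int fd;
};

/* Incremented on release; only here so the sketch is observable. */
static int conn_release_count;

/* The creator owns the initial "open" reference. */
static struct conn *conn_create(int fd)
{
	struct conn *c = malloc(sizeof(*c));

	if (!c)
		return NULL;
	atomic_init(&c->refcount, 1);
	c->fd = fd;
	return c;
}

/*
 * Take a reference unless the count already reached zero, in which
 * case the object is being torn down and must not be revived.
 */
static bool conn_get(struct conn *c)
{
	long old = atomic_load(&c->refcount);

	do {
		if (old == 0)
			return false;
	} while (!atomic_compare_exchange_weak(&c->refcount, &old, old + 1));
	return true;
}

/* Drop a reference; the last put releases the object. */
static void conn_put(struct conn *c)
{
	long old = atomic_fetch_sub(&c->refcount, 1);

	assert(old > 0);
	if (old == 1) {
		conn_release_count++;
		free(c);
	}
}

/*
 * One hangup event, following the worker loop: take a lookup
 * reference, put the "open" reference, then put the lookup one.
 */
static void conn_handle_hangup(struct conn *c)
{
	if (!conn_get(c))
		return;
	conn_put(c);	/* "open" ownership reference */
	conn_put(c);	/* lookup reference; releases the object */
}
```

In the relay daemon the lookup runs under the RCU read-side lock. The final free is presumably deferred, so a pointer found in the hash table stays valid even when its count is zero. This sketch frees immediately, so conn_get() must only be called on a pointer known to still be allocated.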
diff --git a/src/bin/lttng-relayd/live.h b/src/bin/lttng-relayd/live.h
index 5db940b..2b8a3a0 100644
--- a/src/bin/lttng-relayd/live.h
+++ b/src/bin/lttng-relayd/live.h
@@ -1,6 +1,10 @@
+#ifndef LTTNG_RELAYD_LIVE_H
+#define LTTNG_RELAYD_LIVE_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,15 +20,11 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef LTTNG_RELAYD_LIVE_H
-#define LTTNG_RELAYD_LIVE_H
-
 #include <common/uri.h>
 
 #include "lttng-relayd.h"
 
-int relayd_live_create(struct lttng_uri *live_uri,
-		struct relay_local_data *relay_ctx);
+int relayd_live_create(struct lttng_uri *live_uri);
 int relayd_live_stop(void);
 int relayd_live_join(void);
 
diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h
index 245c5fd..889071c 100644
--- a/src/bin/lttng-relayd/lttng-relayd.h
+++ b/src/bin/lttng-relayd/lttng-relayd.h
@@ -1,6 +1,10 @@
+#ifndef LTTNG_RELAYD_H
+#define LTTNG_RELAYD_H
+
 /*
  * Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License, version 2 only,
@@ -16,9 +20,6 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef LTTNG_RELAYD_H
-#define LTTNG_RELAYD_H
-
 #include <limits.h>
 #include <urcu.h>
 #include <urcu/wfcqueue.h>
@@ -34,26 +35,22 @@ struct relay_conn_queue {
 	int32_t futex;
 };
 
-struct relay_local_data {
-	struct lttng_ht *sessions_ht;
-};
-
-extern char *opt_output_path;
-
 /*
  * Contains stream indexed by ID. This is important since many commands lookup
  * streams only by ID thus also keeping them in this hash table makes the
- * search O(1) instead of iterating over the ctf_traces_ht of the session.
+ * search O(1).
  */
+extern struct lttng_ht *sessions_ht;
 extern struct lttng_ht *relay_streams_ht;
-
 extern struct lttng_ht *viewer_streams_ht;
-extern struct lttng_ht *indexes_ht;
 
+extern char *opt_output_path;
 extern const char *tracing_group_name;
-
 extern const char * const config_section_name;
 
+extern uid_t relayd_uid;
+extern gid_t relayd_gid;
+
 extern int thread_quit_pipe[2];
 
 void lttng_relay_notify_ready(void);
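The main.c changes below wrap the increment of the 64-bit last_relay_stream_id in last_relay_stream_id_lock. Per the patch comment, the goal is atomicity of that increment on 32-bit architectures, where a uint64_t update is typically split into two word-sized operations.

Below is a minimal sketch of the same pattern. It uses a hypothetical next_stream_handle() helper rather than the inline code in relay_add_stream(). The accompanying test calls it from several threads and checks that every handle is handed out exactly once.

```c
#include <pthread.h>
#include <stdint.h>

/*
 * Hypothetical counter mirroring last_relay_stream_id. The mutex makes
 * the read-modify-write of the 64-bit value a single indivisible step.
 */
static pthread_mutex_t last_id_lock = PTHREAD_MUTEX_INITIALIZER;
static uint64_t last_id;

/* Return a new, unique handle; the first call returns 1. */
static uint64_t next_stream_handle(void)
{
	uint64_t handle;

	pthread_mutex_lock(&last_id_lock);
	handle = ++last_id;
	pthread_mutex_unlock(&last_id_lock);
	return handle;
}
```

A C11 atomic_fetch_add() on an _Atomic uint64_t would be an alternative. The mutex version matches what the patch does and makes no assumption about lock-free 64-bit atomics being available on the target.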
diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
index 8e1879f..d31f858 100644
--- a/src/bin/lttng-relayd/main.c
+++ b/src/bin/lttng-relayd/main.c
@@ -2,6 +2,7 @@
  * Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
  *               2013 - Jérémie Galarneau <jeremie.galarneau at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License, version 2 only,
@@ -56,6 +57,7 @@
 #include <common/uri.h>
 #include <common/utils.h>
 #include <common/config/config.h>
+#include <urcu/rculist.h>
 
 #include "cmd.h"
 #include "ctf-trace.h"
@@ -114,6 +116,11 @@ static pthread_t dispatcher_thread;
 static pthread_t worker_thread;
 static pthread_t health_thread;
 
+/*
+ * last_relay_stream_id_lock makes the increment of last_relay_stream_id
+ * atomic on 32-bit architectures.
+ */
+static pthread_mutex_t last_relay_stream_id_lock = PTHREAD_MUTEX_INITIALIZER;
 static uint64_t last_relay_stream_id;
 
 /*
@@ -129,8 +136,8 @@ static char *data_buffer;
 static unsigned int data_buffer_size;
 
 /* We need those values for the file/dir creation. */
-static uid_t relayd_uid;
-static gid_t relayd_gid;
+uid_t relayd_uid;
+gid_t relayd_gid;
 
 /* Global relay stream hash table. */
 struct lttng_ht *relay_streams_ht;
@@ -138,8 +145,8 @@ struct lttng_ht *relay_streams_ht;
 /* Global relay viewer stream hash table. */
 struct lttng_ht *viewer_streams_ht;
 
-/* Global hash table that stores relay index object. */
-struct lttng_ht *indexes_ht;
+/* Global relay sessions hash table. */
+struct lttng_ht *sessions_ht;
 
 /* Relayd health monitoring */
 struct health_app *health_relayd;
@@ -163,8 +170,7 @@ static const char *config_ignore_options[] = { "help", "config" };
 /*
  * usage function on stderr
  */
-static
-void usage(void)
+static void usage(void)
 {
 	fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
 	fprintf(stderr, "  -h, --help                Display this usage.\n");
@@ -185,8 +191,7 @@ void usage(void)
  *
  * Return 0 on success else a negative value.
  */
-static
-int set_option(int opt, const char *arg, const char *optname)
+static int set_option(int opt, const char *arg, const char *optname)
 {
 	int ret;
 
@@ -308,8 +313,7 @@ end:
  * See config_entry_handler_cb comment in common/config/config.h for the
  * return value conventions.
  */
-static
-int config_entry_handler(const struct config_entry *entry, void *unused)
+static int config_entry_handler(const struct config_entry *entry, void *unused)
 {
 	int ret = 0, i;
 
@@ -359,8 +363,7 @@ end:
 	return ret;
 }
 
-static
-int set_options(int argc, char **argv)
+static int set_options(int argc, char **argv)
 {
 	int c, ret = 0, option_index = 0, retval = 0;
 	int orig_optopt = optopt, orig_optind = optind;
@@ -486,8 +489,7 @@ exit:
 /*
  * Cleanup the daemon
  */
-static
-void relayd_cleanup(struct relay_local_data *relay_ctx)
+static void relayd_cleanup(void)
 {
 	DBG("Cleaning up");
 
@@ -495,9 +497,8 @@ void relayd_cleanup(struct relay_local_data *relay_ctx)
 		lttng_ht_destroy(viewer_streams_ht);
 	if (relay_streams_ht)
 		lttng_ht_destroy(relay_streams_ht);
-	if (relay_ctx && relay_ctx->sessions_ht)
-		lttng_ht_destroy(relay_ctx->sessions_ht);
-	free(relay_ctx);
+	if (sessions_ht)
+		lttng_ht_destroy(sessions_ht);
 
 	/* free the dynamically allocated opt_output_path */
 	free(opt_output_path);
@@ -517,8 +518,7 @@ void relayd_cleanup(struct relay_local_data *relay_ctx)
 /*
  * Write to writable pipe used to notify a thread.
  */
-static
-int notify_thread_pipe(int wpipe)
+static int notify_thread_pipe(int wpipe)
 {
 	ssize_t ret;
 
@@ -532,8 +532,7 @@ end:
 	return ret;
 }
 
-static
-int notify_health_quit_pipe(int *pipe)
+static int notify_health_quit_pipe(int *pipe)
 {
 	ssize_t ret;
 
@@ -582,8 +581,7 @@ int lttng_relay_stop_threads(void)
  * Simply stop all worker threads, leaving main() return gracefully after
  * joining all threads and calling cleanup().
  */
-static
-void sighandler(int sig)
+static void sighandler(int sig)
 {
 	switch (sig) {
 	case SIGPIPE:
@@ -613,8 +611,7 @@ void sighandler(int sig)
  * Setup signal handler for :
  *		SIGINT, SIGTERM, SIGPIPE
  */
-static
-int set_signal_handler(void)
+static int set_signal_handler(void)
 {
 	int ret = 0;
 	struct sigaction sa;
@@ -668,8 +665,7 @@ void lttng_relay_notify_ready(void)
  *
  * Return -1 on error or 0 if all pipes are created.
  */
-static
-int init_thread_quit_pipe(void)
+static int init_thread_quit_pipe(void)
 {
 	int ret;
 
@@ -681,8 +677,7 @@ int init_thread_quit_pipe(void)
 /*
  * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
  */
-static
-int create_thread_poll_set(struct lttng_poll_event *events, int size)
+static int create_thread_poll_set(struct lttng_poll_event *events, int size)
 {
 	int ret;
 
@@ -713,8 +708,7 @@ error:
  *
  * Return 1 if it was triggered else 0;
  */
-static
-int check_thread_quit_pipe(int fd, uint32_t events)
+static int check_thread_quit_pipe(int fd, uint32_t events)
 {
 	if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
 		return 1;
@@ -726,8 +720,7 @@ int check_thread_quit_pipe(int fd, uint32_t events)
 /*
  * Create and init socket from uri.
  */
-static
-struct lttcomm_sock *relay_init_sock(struct lttng_uri *uri)
+static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri)
 {
 	int ret;
 	struct lttcomm_sock *sock = NULL;
@@ -765,63 +758,9 @@ error:
 }
 
 /*
- * Return nonzero if stream needs to be closed.
- */
-static
-int close_stream_check(struct relay_stream *stream)
-{
-	if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) {
-		/*
-		 * We are about to close the stream so set the data pending flag to 1
-		 * which will make the end data pending command skip the stream which
-		 * is now closed and ready. Note that after proceeding to a file close,
-		 * the written file is ready for reading.
-		 */
-		stream->data_pending_check_done = 1;
-		return 1;
-	}
-	return 0;
-}
-
-static void try_close_stream(struct relay_session *session,
-		struct relay_stream *stream)
-{
-	int ret;
-	struct ctf_trace *ctf_trace;
-
-	assert(session);
-	assert(stream);
-
-	if (!close_stream_check(stream)) {
-		/* Can't close it, not ready for that. */
-		goto end;
-	}
-
-	ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
-			stream->path_name);
-	assert(ctf_trace);
-
-	pthread_mutex_lock(&session->viewer_ready_lock);
-	ctf_trace->invalid_flag = 1;
-	pthread_mutex_unlock(&session->viewer_ready_lock);
-
-	ret = stream_close(session, stream);
-	if (ret || session->snapshot) {
-		/* Already close thus the ctf trace is being or has been destroyed. */
-		goto end;
-	}
-
-	ctf_trace_try_destroy(session, ctf_trace);
-
-end:
-	return;
-}
-
-/*
  * This thread manages the listening for new connections on the network
  */
-static
-void *relay_thread_listener(void *data)
+static void *relay_thread_listener(void *data)
 {
 	int i, ret, pollfd, err = -1;
 	uint32_t revents, nb_fd;
@@ -834,12 +773,12 @@ void *relay_thread_listener(void *data)
 
 	health_code_update();
 
-	control_sock = relay_init_sock(control_uri);
+	control_sock = relay_socket_create(control_uri);
 	if (!control_sock) {
 		goto error_sock_control;
 	}
 
-	data_sock = relay_init_sock(data_uri);
+	data_sock = relay_socket_create(data_uri);
 	if (!data_sock) {
 		goto error_sock_relay;
 	}
@@ -922,27 +861,22 @@ restart:
 				int val = 1;
 				struct relay_connection *new_conn;
 				struct lttcomm_sock *newsock;
-
-				new_conn = connection_create();
-				if (!new_conn) {
-					goto error;
-				}
+				enum connection_type type;
 
 				if (pollfd == data_sock->fd) {
-					new_conn->type = RELAY_DATA;
+					type = RELAY_DATA;
 					newsock = data_sock->ops->accept(data_sock);
 					DBG("Relay data connection accepted, socket %d",
 							newsock->fd);
 				} else {
 					assert(pollfd == control_sock->fd);
-					new_conn->type = RELAY_CONTROL;
+					type = RELAY_CONTROL;
 					newsock = control_sock->ops->accept(control_sock);
 					DBG("Relay control connection accepted, socket %d",
 							newsock->fd);
 				}
 				if (!newsock) {
 					PERROR("accepting sock");
-					connection_free(new_conn);
 					goto error;
 				}
 
@@ -951,10 +885,13 @@ restart:
 				if (ret < 0) {
 					PERROR("setsockopt inet");
 					lttcomm_destroy_sock(newsock);
-					connection_free(new_conn);
 					goto error;
 				}
-				new_conn->sock = newsock;
+				new_conn = connection_create(newsock, type);
+				if (!new_conn) {
+					lttcomm_destroy_sock(newsock);
+					goto error;
+				}
 
 				/* Enqueue request for the dispatcher thread. */
 				cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
@@ -1004,8 +941,7 @@ error_sock_control:
 /*
  * This thread manages the dispatching of the requests to worker threads
  */
-static
-void *relay_thread_dispatcher(void *data)
+static void *relay_thread_dispatcher(void *data)
 {
 	int err = -1;
 	ssize_t ret;
@@ -1051,7 +987,7 @@ void *relay_thread_dispatcher(void *data)
 			ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn));
 			if (ret < 0) {
 				PERROR("write connection pipe");
-				connection_destroy(new_conn);
+				connection_put(new_conn);
 				goto error;
 			}
 		} while (node != NULL);
@@ -1077,72 +1013,27 @@ error_testpoint:
 	return NULL;
 }
 
-static void try_close_streams(struct relay_session *session)
-{
-	struct ctf_trace *ctf_trace;
-	struct lttng_ht_iter iter;
-
-	assert(session);
-
-	pthread_mutex_lock(&session->viewer_ready_lock);
-	rcu_read_lock();
-	cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
-			node.node) {
-		struct relay_stream *stream;
-
-		/* Close streams. */
-		cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
-			stream_close(session, stream);
-		}
-
-		ctf_trace->invalid_flag = 1;
-		ctf_trace_try_destroy(session, ctf_trace);
-	}
-	rcu_read_unlock();
-	pthread_mutex_unlock(&session->viewer_ready_lock);
-}
-
-/*
- * Try to destroy a session within a connection.
- */
-static void destroy_session(struct relay_session *session,
-		struct lttng_ht *sessions_ht)
-{
-	assert(session);
-	assert(sessions_ht);
-
-	/* Indicate that this session can be destroyed from now on. */
-	session->close_flag = 1;
-
-	try_close_streams(session);
-
-	/*
-	 * This will try to delete and destroy the session if no viewer is attached
-	 * to it meaning the refcount is down to zero.
-	 */
-	session_try_destroy(sessions_ht, session);
-}
-
 /*
- * Copy index data from the control port to a given index object.
+ * Set index data from the control port to a given index object.
  */
-static void copy_index_control_data(struct relay_index *index,
+static int set_index_control_data(struct relay_index *index,
 		struct lttcomm_relayd_index *data)
 {
-	assert(index);
-	assert(data);
+	struct ctf_packet_index index_data;
 
 	/*
-	 * The index on disk is encoded in big endian, so we don't need to convert
-	 * the data received on the network. The data_offset value is NEVER
-	 * modified here and is updated by the data thread.
+	 * The index on disk is encoded in big endian, so we don't need
+	 * to convert the data received on the network. The data_offset
+	 * value is NEVER modified here and is updated by the data
+	 * thread.
 	 */
-	index->index_data.packet_size = data->packet_size;
-	index->index_data.content_size = data->content_size;
-	index->index_data.timestamp_begin = data->timestamp_begin;
-	index->index_data.timestamp_end = data->timestamp_end;
-	index->index_data.events_discarded = data->events_discarded;
-	index->index_data.stream_id = data->stream_id;
+	index_data.packet_size = data->packet_size;
+	index_data.content_size = data->content_size;
+	index_data.timestamp_begin = data->timestamp_begin;
+	index_data.timestamp_end = data->timestamp_end;
+	index_data.events_discarded = data->events_discarded;
+	index_data.stream_id = data->stream_id;
+	return relay_index_set_data(index, &index_data);
 }
 
 /*
@@ -1150,31 +1041,22 @@ static void copy_index_control_data(struct relay_index *index,
  *
  * On success, send back the session id or else return a negative value.
  */
-static
-int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	int ret = 0, send_ret;
 	struct relay_session *session;
 	struct lttcomm_relayd_status_session reply;
+	char session_name[NAME_MAX];
+	char hostname[HOST_NAME_MAX];
+	uint32_t live_timer = 0;
+	bool snapshot = false;
 
-	assert(recv_hdr);
-	assert(conn);
+	memset(session_name, 0, NAME_MAX);
+	memset(hostname, 0, HOST_NAME_MAX);
 
 	memset(&reply, 0, sizeof(reply));
 
-	session = session_create();
-	if (!session) {
-		ret = -1;
-		goto error;
-	}
-	session->minor = conn->minor;
-	session->major = conn->major;
-	conn->session_id = session->id;
-	conn->session = session;
-
-	reply.session_id = htobe64(session->id);
-
 	switch (conn->minor) {
 	case 1:
 	case 2:
@@ -1182,13 +1064,25 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
 		break;
 	case 4: /* LTTng sessiond 2.4 */
 	default:
-		ret = cmd_create_session_2_4(conn, session);
+		ret = cmd_create_session_2_4(conn, session_name,
+			hostname, &live_timer, &snapshot);
+	}
+	if (ret < 0) {
+		goto send_reply;
 	}
 
-	lttng_ht_add_unique_u64(conn->sessions_ht, &session->session_n);
+	session = session_create(session_name, hostname, live_timer,
+			snapshot, conn->major, conn->minor);
+	if (!session) {
+		ret = -1;
+		goto send_reply;
+	}
+	conn->session = session;
 	DBG("Created session %" PRIu64, session->id);
 
-error:
+	reply.session_id = htobe64(session->id);
+
+send_reply:
 	if (ret < 0) {
 		reply.ret_code = htobe32(LTTNG_ERR_FATAL);
 	} else {
@@ -1208,47 +1102,48 @@ error:
  * When we have received all the streams and the metadata for a channel,
  * we make them visible to the viewer threads.
  */
-static
-void set_viewer_ready_flag(struct relay_connection *conn)
+static void publish_connection_local_streams(struct relay_connection *conn)
 {
-	struct relay_stream *stream, *tmp_stream;
+	struct relay_stream *stream;
+	struct relay_session *session = conn->session;
 
-	pthread_mutex_lock(&conn->session->viewer_ready_lock);
-	cds_list_for_each_entry_safe(stream, tmp_stream, &conn->recv_head,
-			recv_list) {
-		stream->viewer_ready = 1;
-		cds_list_del(&stream->recv_list);
+	/*
+	 * We publish all streams belonging to a session atomically with
+	 * respect to the session lock.
+	 */
+	pthread_mutex_lock(&session->lock);
+	rcu_read_lock();
+	cds_list_for_each_entry_rcu(stream, &session->recv_list,
+			recv_node) {
+		stream_publish(stream);
 	}
-	pthread_mutex_unlock(&conn->session->viewer_ready_lock);
-	return;
-}
-
-/*
- * Add a recv handle node to the connection recv list with the given stream
- * handle. A new node is allocated thus must be freed when the node is deleted
- * from the list.
- */
-static void queue_stream(struct relay_stream *stream,
-		struct relay_connection *conn)
-{
-	assert(conn);
-	assert(stream);
+	rcu_read_unlock();
 
-	cds_list_add(&stream->recv_list, &conn->recv_head);
+	/*
+	 * Inform the viewer that there are new streams in the session.
+	 */
+	if (session->viewer_attached) {
+		uatomic_set(&session->new_streams, 1);
+	}
+	pthread_mutex_unlock(&session->lock);
+	return;
 }
 
 /*
  * relay_add_stream: allocate a new stream for a session
  */
-static
-int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
-	int ret, send_ret;
+	int ret;
+	ssize_t send_ret;
 	struct relay_session *session = conn->session;
 	struct relay_stream *stream = NULL;
 	struct lttcomm_relayd_status_stream reply;
 	struct ctf_trace *trace = NULL;
+	uint64_t stream_handle = -1ULL;
+	char *path_name = NULL, *channel_name = NULL;
+	uint64_t tracefile_size = 0, tracefile_count = 0;
 
 	if (!session || conn->version_check_done == 0) {
 		ERR("Trying to add a stream before version check");
@@ -1256,107 +1151,46 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
 		goto end_no_session;
 	}
 
-	stream = zmalloc(sizeof(struct relay_stream));
-	if (stream == NULL) {
-		PERROR("relay stream zmalloc");
-		ret = -1;
-		goto end_no_session;
-	}
-
-	switch (conn->minor) {
-	case 1: /* LTTng sessiond 2.1 */
-		ret = cmd_recv_stream_2_1(conn, stream);
+	switch (session->minor) {
+	case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */
+		ret = cmd_recv_stream_2_1(conn, &path_name,
+			&channel_name);
 		break;
-	case 2: /* LTTng sessiond 2.2 */
+	case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */
 	default:
-		ret = cmd_recv_stream_2_2(conn, stream);
+		ret = cmd_recv_stream_2_2(conn, &path_name,
+			&channel_name, &tracefile_size, &tracefile_count);
 		break;
 	}
 	if (ret < 0) {
-		goto err_free_stream;
-	}
-
-	stream->stream_handle = ++last_relay_stream_id;
-	stream->prev_seq = -1ULL;
-	stream->session_id = session->id;
-	stream->index_fd = -1;
-	stream->read_index_fd = -1;
-	stream->ctf_stream_id = -1ULL;
-	lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
-	pthread_mutex_init(&stream->lock, NULL);
-
-	ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
-	if (ret < 0) {
-		ERR("relay creating output directory");
-		goto err_free_stream;
-	}
-
-	/*
-	 * No need to use run_as API here because whatever we receives, the relayd
-	 * uses its own credentials for the stream files.
-	 */
-	ret = utils_create_stream_file(stream->path_name, stream->channel_name,
-			stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
-	if (ret < 0) {
-		ERR("Create output file");
-		goto err_free_stream;
-	}
-	stream->fd = ret;
-	if (stream->tracefile_size) {
-		DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
-	} else {
-		DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
+		goto send_reply;
 	}
 
-	/* Protect access to "trace" */
-	rcu_read_lock();
-	trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name);
+	trace = ctf_trace_get_by_path_or_create(session, path_name);
 	if (!trace) {
-		trace = ctf_trace_create(stream->path_name);
-		if (!trace) {
-			ret = -1;
-			goto end;
-		}
-		ctf_trace_add(session->ctf_traces_ht, trace);
+		goto send_reply;
 	}
-	ctf_trace_get_ref(trace);
+	/* We now hold one reference on the trace. */
 
-	if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
-		stream->metadata_flag = 1;
-		/* Assign quick reference to the metadata stream in the trace. */
-		trace->metadata_stream = stream;
-	}
+	pthread_mutex_lock(&last_relay_stream_id_lock);
+	stream_handle = ++last_relay_stream_id;
+	pthread_mutex_unlock(&last_relay_stream_id_lock);
 
-	/*
-	 * Add the stream in the recv list of the connection. Once the end stream
-	 * message is received, this list is emptied and streams are set with the
-	 * viewer ready flag.
-	 */
-	queue_stream(stream, conn);
+	/* We pass ownership of path_name and channel_name. */
+	stream = stream_create(trace, stream_handle, path_name,
+		channel_name, tracefile_size, tracefile_count);
 
 	/*
-	 * Both in the ctf_trace object and the global stream ht since the data
-	 * side of the relayd does not have the concept of session.
-	 *
-	 * rcu_read_lock() is kept to protect the stream which is now part of
-	 * the relay_streams_ht.
+	 * Streams are the owners of their trace. stream_create() takes
+	 * its own reference on the trace, so release ours here.
 	 */
-	lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
-	cds_list_add_tail(&stream->trace_list, &trace->stream_list);
-
-	session->stream_count++;
-
-	DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
-			stream->stream_handle);
+	ctf_trace_put(trace);
 
-end:
+send_reply:
 	memset(&reply, 0, sizeof(reply));
-	reply.handle = htobe64(stream->stream_handle);
-	/* send the session id to the client or a negative return code on error */
-	if (ret < 0) {
+	reply.handle = htobe64(stream_handle);
+	if (!stream) {
 		reply.ret_code = htobe32(LTTNG_ERR_UNK);
-		/* stream was not properly added to the ht, so free it */
-		stream_destroy(stream);
 	} else {
 		reply.ret_code = htobe32(LTTNG_OK);
 	}
@@ -1365,29 +1199,17 @@ end:
 			sizeof(struct lttcomm_relayd_status_stream), 0);
 	if (send_ret < 0) {
 		ERR("Relay sending stream id");
-		ret = send_ret;
+		ret = (int) send_ret;
 	}
-	/*
-	 * rcu_read_lock() was held to protect either "trace" OR the "stream" at
-	 * this point.
-	 */
-	rcu_read_unlock();
-	trace = NULL;
-	stream = NULL;
 
 end_no_session:
 	return ret;
-
-err_free_stream:
-	stream_destroy(stream);
-	return ret;
 }
 
 /*
  * relay_close_stream: close a specific stream
  */
-static
-int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	int ret, send_ret;
@@ -1417,23 +1239,18 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
 		goto end_no_session;
 	}
 
-	rcu_read_lock();
-	stream = stream_find_by_id(relay_streams_ht,
-			be64toh(stream_info.stream_id));
+	stream = stream_get_by_id(be64toh(stream_info.stream_id));
 	if (!stream) {
 		ret = -1;
-		goto end_unlock;
+		goto end;
 	}
-
+	pthread_mutex_lock(&stream->lock);
+	stream->closed = true;
 	stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
-	stream->close_flag = 1;
-	session->stream_count--;
+	pthread_mutex_unlock(&stream->lock);
+	stream_put(stream);
 
-	/* Check if we can close it or else the data will do it. */
-	try_close_stream(session, stream);
-
-end_unlock:
-	rcu_read_unlock();
+end:
 
 	memset(&reply, 0, sizeof(reply));
 	if (ret < 0) {
@@ -1455,8 +1272,7 @@ end_no_session:
 /*
  * relay_unknown_command: send -1 if received unknown command
  */
-static
-void relay_unknown_command(struct relay_connection *conn)
+static void relay_unknown_command(struct relay_connection *conn)
 {
 	struct lttcomm_relayd_generic_reply reply;
 	int ret;
@@ -1474,8 +1290,7 @@ void relay_unknown_command(struct relay_connection *conn)
  * relay_start: send an acknowledgment to the client to tell if we are
  * ready to receive data. We are ready if a session is established.
  */
-static
-int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	int ret = htobe32(LTTNG_OK);
@@ -1529,10 +1344,9 @@ end:
 }
 
 /*
- * relay_recv_metadata: receive the metada for the session.
+ * relay_recv_metadata: receive the metadata for the session.
  */
-static
-int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	int ret = htobe32(LTTNG_OK);
@@ -1541,7 +1355,6 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
 	struct lttcomm_relayd_metadata_payload *metadata_struct;
 	struct relay_stream *metadata_stream;
 	uint64_t data_size, payload_size;
-	struct ctf_trace *ctf_trace;
 
 	if (!session) {
 		ERR("Metadata sent before version check");
@@ -1586,38 +1399,37 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
 	}
 	metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
 
-	rcu_read_lock();
-	metadata_stream = stream_find_by_id(relay_streams_ht,
-			be64toh(metadata_struct->stream_id));
+	metadata_stream = stream_get_by_id(be64toh(metadata_struct->stream_id));
 	if (!metadata_stream) {
 		ret = -1;
-		goto end_unlock;
+		goto end;
 	}
 
-	size_ret = lttng_write(metadata_stream->fd, metadata_struct->payload,
+	pthread_mutex_lock(&metadata_stream->lock);
+
+	size_ret = lttng_write(metadata_stream->stream_fd->fd, metadata_struct->payload,
 			payload_size);
 	if (size_ret < payload_size) {
 		ERR("Relay error writing metadata on file");
 		ret = -1;
-		goto end_unlock;
+		goto end_put;
 	}
 
-	ret = write_padding_to_file(metadata_stream->fd,
+	ret = write_padding_to_file(metadata_stream->stream_fd->fd,
 			be32toh(metadata_struct->padding_size));
 	if (ret < 0) {
-		goto end_unlock;
+		goto end_put;
 	}
 
-	ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
-			metadata_stream->path_name);
-	assert(ctf_trace);
-	ctf_trace->metadata_received +=
+	metadata_stream->trace->metadata_received +=
 		payload_size + be32toh(metadata_struct->padding_size);
 
 	DBG2("Relay metadata written");
 
-end_unlock:
-	rcu_read_unlock();
+end_put:
+	pthread_mutex_unlock(&metadata_stream->lock);
+	stream_put(metadata_stream);
+
 end:
 	return ret;
 }
@@ -1625,15 +1437,12 @@ end:
 /*
  * relay_send_version: send relayd version number
  */
-static
-int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	int ret;
 	struct lttcomm_relayd_version reply, msg;
 
-	assert(conn);
-
 	conn->version_check_done = 1;
 
 	/* Get version from the other side. */
@@ -1657,7 +1466,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
 	if (reply.major != be32toh(msg.major)) {
 		DBG("Incompatible major versions (%u vs %u), deleting session",
 				reply.major, be32toh(msg.major));
-		destroy_session(conn->session, conn->sessions_ht);
+		connection_put(conn);
 		ret = 0;
 		goto end;
 	}
@@ -1688,8 +1497,7 @@ end:
 /*
  * Check for data pending for a given stream id from the session daemon.
  */
-static
-int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	struct relay_session *session = conn->session;
@@ -1723,13 +1531,14 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 	stream_id = be64toh(msg.stream_id);
 	last_net_seq_num = be64toh(msg.last_net_seq_num);
 
-	rcu_read_lock();
-	stream = stream_find_by_id(relay_streams_ht, stream_id);
+	stream = stream_get_by_id(stream_id);
 	if (stream == NULL) {
 		ret = -1;
-		goto end_unlock;
+		goto end;
 	}
 
+	pthread_mutex_lock(&stream->lock);
+
 	DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
 			" and last_seq %" PRIu64, stream_id, stream->prev_seq,
 			last_net_seq_num);
@@ -1743,11 +1552,11 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 		ret = 1;
 	}
 
-	/* Pending check is now done. */
-	stream->data_pending_check_done = 1;
+	stream->data_pending_check_done = true;
+	pthread_mutex_unlock(&stream->lock);
 
-end_unlock:
-	rcu_read_unlock();
+	stream_put(stream);
+end:
 
 	memset(&reply, 0, sizeof(reply));
 	reply.ret_code = htobe32(ret);
@@ -1767,14 +1576,12 @@ end_no_session:
  * means that every subsequent commands or data received on the control socket
  * has been handled. So, this is why we simply return OK here.
  */
-static
-int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	int ret;
 	uint64_t stream_id;
 	struct relay_stream *stream;
-	struct lttng_ht_iter iter;
 	struct lttcomm_relayd_quiescent_control msg;
 	struct lttcomm_relayd_generic_reply reply;
 
@@ -1800,19 +1607,16 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
 	}
 
 	stream_id = be64toh(msg.stream_id);
-
-	rcu_read_lock();
-	cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
-			node.node) {
-		if (stream->stream_handle == stream_id) {
-			stream->data_pending_check_done = 1;
-			DBG("Relay quiescent control pending flag set to %" PRIu64,
-					stream_id);
-			break;
-		}
-	}
-	rcu_read_unlock();
-
+	stream = stream_get_by_id(stream_id);
+	if (!stream) {
+		goto reply;
+	}
+	pthread_mutex_lock(&stream->lock);
+	stream->data_pending_check_done = true;
+	pthread_mutex_unlock(&stream->lock);
+	DBG("Relay quiescent control pending flag set to %" PRIu64, stream_id);
+	stream_put(stream);
+reply:
 	memset(&reply, 0, sizeof(reply));
 	reply.ret_code = htobe32(LTTNG_OK);
 	ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
@@ -1831,8 +1635,7 @@ end_no_session:
  *
  * This command returns to the client a LTTNG_OK code.
  */
-static
-int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	int ret;
@@ -1876,11 +1679,17 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 	rcu_read_lock();
 	cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
 			node.node) {
-		if (stream->session_id == session_id) {
-			stream->data_pending_check_done = 0;
+		if (!stream_get(stream)) {
+			continue;
+		}
+		if (stream->trace->session->id == session_id) {
+			pthread_mutex_lock(&stream->lock);
+			stream->data_pending_check_done = false;
+			pthread_mutex_unlock(&stream->lock);
 			DBG("Set begin data pending flag to stream %" PRIu64,
 					stream->stream_handle);
 		}
+		stream_put(stream);
 	}
 	rcu_read_unlock();
 
@@ -1906,8 +1715,7 @@ end_no_session:
  *
  * Return to the client if there is data in flight or not with a ret_code.
  */
-static
-int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	int ret;
@@ -1918,9 +1726,6 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 	uint64_t session_id;
 	uint32_t is_data_inflight = 0;
 
-	assert(recv_hdr);
-	assert(conn);
-
 	DBG("End data pending command");
 
 	if (!conn->session || conn->version_check_done == 0) {
@@ -1948,13 +1753,26 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 	rcu_read_lock();
 	cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
 			node.node) {
-		if (stream->session_id == session_id &&
-				!stream->data_pending_check_done && !stream->terminated_flag) {
-			is_data_inflight = 1;
-			DBG("Data is still in flight for stream %" PRIu64,
-					stream->stream_handle);
-			break;
+		if (!stream_get(stream)) {
+			continue;
+		}
+		if (stream->trace->session->id != session_id) {
+			stream_put(stream);
+			continue;
 		}
+		pthread_mutex_lock(&stream->lock);
+		if (!stream->data_pending_check_done) {
+			if (!stream->closed || (int64_t) (stream->prev_seq - stream->last_net_seq_num) < 0) {
+				is_data_inflight = 1;
+				DBG("Data is still in flight for stream %" PRIu64,
+						stream->stream_handle);
+				pthread_mutex_unlock(&stream->lock);
+				stream_put(stream);
+				break;
+			}
+		}
+		pthread_mutex_unlock(&stream->lock);
+		stream_put(stream);
 	}
 	rcu_read_unlock();
 
@@ -1976,14 +1794,13 @@ end_no_session:
  *
  * Return 0 on success else a negative value.
  */
-static
-int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
-	int ret, send_ret, index_created = 0;
+	int ret, send_ret;
 	struct relay_session *session = conn->session;
 	struct lttcomm_relayd_index index_info;
-	struct relay_index *index, *wr_index = NULL;
+	struct relay_index *index;
 	struct lttcomm_relayd_generic_reply reply;
 	struct relay_stream *stream;
 	uint64_t net_seq_num;
@@ -2013,13 +1830,12 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
 
 	net_seq_num = be64toh(index_info.net_seq_num);
 
-	rcu_read_lock();
-	stream = stream_find_by_id(relay_streams_ht,
-			be64toh(index_info.relay_stream_id));
+	stream = stream_get_by_id(be64toh(index_info.relay_stream_id));
 	if (!stream) {
 		ret = -1;
-		goto end_rcu_unlock;
+		goto end;
 	}
+	pthread_mutex_lock(&stream->lock);
 
 	/* Live beacon handling */
 	if (index_info.packet_size == 0) {
@@ -2033,56 +1849,40 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
 			stream->beacon_ts_end = be64toh(index_info.timestamp_end);
 		}
 		ret = 0;
-		goto end_rcu_unlock;
+		goto end_stream_put;
 	} else {
 		stream->beacon_ts_end = -1ULL;
 	}
 
-	index = relay_index_find(stream->stream_handle, net_seq_num);
-	if (!index) {
-		/* A successful creation will add the object to the HT. */
-		index = relay_index_create(stream->stream_handle, net_seq_num);
-		if (!index) {
-			goto end_rcu_unlock;
-		}
-		index_created = 1;
-		stream->indexes_in_flight++;
-	}
-
-	copy_index_control_data(index, &index_info);
 	if (stream->ctf_stream_id == -1ULL) {
 		stream->ctf_stream_id = be64toh(index_info.stream_id);
 	}
-
-	if (index_created) {
-		/*
-		 * Try to add the relay index object to the hash table. If an object
-		 * already exist, destroy back the index created, set the data in this
-		 * object and write it on disk.
-		 */
-		relay_index_add(index, &wr_index);
-		if (wr_index) {
-			copy_index_control_data(wr_index, &index_info);
-			free(index);
-		}
-	} else {
-		/* The index already exists so write it on disk. */
-		wr_index = index;
+	index = relay_index_get_by_id_or_create(stream, net_seq_num);
+	if (!index) {
+		ret = -1;
+		goto end_stream_put;
 	}
-
-	/* Do we have a writable ready index to write on disk. */
-	if (wr_index) {
-		ret = relay_index_write(wr_index->fd, wr_index);
-		if (ret < 0) {
-			goto end_rcu_unlock;
-		}
+	if (set_index_control_data(index, &index_info)) {
+		relay_index_put(index);
+		ret = -1;
+		goto end_stream_put;
+	}
+	ret = relay_index_try_flush(index);
+	if (ret == 0) {
 		stream->total_index_received++;
-		stream->indexes_in_flight--;
-		assert(stream->indexes_in_flight >= 0);
+	} else if (ret > 0) {
+		/* No flush. */
+		ret = 0;
+	} else {
+		relay_index_put(index);
+		ret = -1;
 	}
 
-end_rcu_unlock:
-	rcu_read_unlock();
+end_stream_put:
+	pthread_mutex_unlock(&stream->lock);
+	stream_put(stream);
+
+end:
 
 	memset(&reply, 0, sizeof(reply));
 	if (ret < 0) {
@@ -2105,8 +1905,7 @@ end_no_session:
  *
  * Return 0 on success else a negative value.
  */
-static
-int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	int ret, send_ret;
@@ -2123,17 +1922,10 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
 	}
 
 	/*
-	 * Flag every pending stream in the connection recv list that they are
-	 * ready to be used by the viewer.
-	 */
-	set_viewer_ready_flag(conn);
-
-	/*
-	 * Inform the viewer that there are new streams in the session.
+	 * Publish every pending stream in the connection recv list so
+	 * that it becomes available to the viewer.
 	 */
-	if (conn->session->viewer_refcount) {
-		uatomic_set(&conn->session->new_streams, 1);
-	}
+	publish_connection_local_streams(conn);
 
 	memset(&reply, 0, sizeof(reply));
 	reply.ret_code = htobe32(LTTNG_OK);
@@ -2153,8 +1945,7 @@ end_no_session:
 /*
  * Process the commands received on the control socket
  */
-static
-int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
 		struct relay_connection *conn)
 {
 	int ret = 0;
@@ -2211,20 +2002,17 @@ end:
 /*
  * Handle index for a data stream.
  *
- * RCU read side lock MUST be acquired.
+ * Called with the stream lock held.
  *
  * Return 0 on success else a negative value.
  */
 static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 		int rotate_index)
 {
-	int ret = 0, index_created = 0;
-	uint64_t stream_id, data_offset;
-	struct relay_index *index, *wr_index = NULL;
-
-	assert(stream);
+	int ret = 0;
+	uint64_t data_offset;
+	struct relay_index *index;
 
-	stream_id = stream->stream_handle;
 	/* Get data offset because we are about to update the index. */
 	data_offset = htobe64(stream->tracefile_size_current);
 
@@ -2233,71 +2021,68 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 	 * exists, the control thread already received the data for it thus we need
 	 * to write it on disk.
 	 */
-	index = relay_index_find(stream_id, net_seq_num);
+	index = relay_index_get_by_id_or_create(stream, net_seq_num);
 	if (!index) {
-		/* A successful creation will add the object to the HT. */
-		index = relay_index_create(stream_id, net_seq_num);
-		if (!index) {
-			ret = -1;
-			goto error;
-		}
-		index_created = 1;
-		stream->indexes_in_flight++;
+		ret = -1;
+		goto end;
 	}
 
-	if (rotate_index || stream->index_fd < 0) {
-		index->to_close_fd = stream->index_fd;
-		ret = index_create_file(stream->path_name, stream->channel_name,
-				relayd_uid, relayd_gid, stream->tracefile_size,
-				stream->tracefile_count_current);
-		if (ret < 0) {
-			/* This will close the stream's index fd if one. */
-			relay_index_free_safe(index);
-			goto error;
+	if (rotate_index || !stream->index_fd) {
+		int fd;
+
+		/* Put ref on previous index_fd. */
+		if (stream->index_fd) {
+			stream_fd_put(stream->index_fd);
+			stream->index_fd = NULL;
 		}
-		stream->index_fd = ret;
-	}
-	index->fd = stream->index_fd;
-	index->index_data.offset = data_offset;
 
-	if (index_created) {
-		/*
-		 * Try to add the relay index object to the hash table. If an object
-		 * already exist, destroy back the index created and set the data.
-		 */
-		relay_index_add(index, &wr_index);
-		if (wr_index) {
-			/* Copy back data from the created index. */
-			wr_index->fd = index->fd;
-			wr_index->to_close_fd = index->to_close_fd;
-			wr_index->index_data.offset = data_offset;
-			free(index);
+		fd = index_create_file(stream->path_name, stream->channel_name,
+				relayd_uid, relayd_gid, stream->tracefile_size,
+				stream->current_tracefile_id);
+		if (fd < 0) {
+			ret = -1;
+			/* Put self-ref for this index due to error. */
+			relay_index_put(index);
+			goto end;
 		}
-	} else {
-		/* The index already exists so write it on disk. */
-		wr_index = index;
+		stream->index_fd = stream_fd_create(fd);
+		if (!stream->index_fd) {
+			ret = -1;
+			if (close(fd)) {
+				PERROR("Error closing FD %d", fd);
+			}
+			/* Put self-ref for this index due to error. */
+			relay_index_put(index);
+			/* Will put the local ref. */
+			goto end;
+		}
+	}
+	if (relay_index_set_fd(index, stream->index_fd, data_offset)) {
+		ret = -1;
+		/* Put self-ref for this index due to error. */
+		relay_index_put(index);
+		goto end;
 	}
 
-	/* Do we have a writable ready index to write on disk. */
-	if (wr_index) {
-		ret = relay_index_write(wr_index->fd, wr_index);
-		if (ret < 0) {
-			goto error;
-		}
+	ret = relay_index_try_flush(index);
+	if (ret == 0) {
 		stream->total_index_received++;
-		stream->indexes_in_flight--;
-		assert(stream->indexes_in_flight >= 0);
+	} else if (ret > 0) {
+		/* No flush. */
+		ret = 0;
+	} else {
+		/* Put self-ref for this index due to error. */
+		relay_index_put(index);
+		ret = -1;
 	}
-
-error:
+end:
 	return ret;
 }
 
 /*
  * relay_process_data: Process the data received on the data socket
  */
-static
-int relay_process_data(struct relay_connection *conn)
+static int relay_process_data(struct relay_connection *conn)
 {
 	int ret = 0, rotate_index = 0;
 	ssize_t size_ret;
@@ -2324,17 +2109,12 @@ int relay_process_data(struct relay_connection *conn)
 	}
 
 	stream_id = be64toh(data_hdr.stream_id);
-
-	rcu_read_lock();
-	stream = stream_find_by_id(relay_streams_ht, stream_id);
+	stream = stream_get_by_id(stream_id);
 	if (!stream) {
 		ret = -1;
-		goto end_rcu_unlock;
+		goto end;
 	}
-
-	session = session_find_by_id(conn->sessions_ht, stream->session_id);
-	assert(session);
-
+	session = stream->trace->session;
 	data_size = be32toh(data_hdr.data_size);
 	if (data_buffer_size < data_size) {
 		char *tmp_data_ptr;
@@ -2344,7 +2124,7 @@ int relay_process_data(struct relay_connection *conn)
 			ERR("Allocating data buffer");
 			free(data_buffer);
 			ret = -1;
-			goto end_rcu_unlock;
+			goto end_stream_put;
 		}
 		data_buffer = tmp_data_ptr;
 		data_buffer_size = data_size;
@@ -2362,9 +2142,11 @@ int relay_process_data(struct relay_connection *conn)
 			DBG("Socket %d did an orderly shutdown", conn->sock->fd);
 		}
 		ret = -1;
-		goto end_rcu_unlock;
+		goto end_stream_put;
 	}
 
+	pthread_mutex_lock(&stream->lock);
+
 	/* Check if a rotation is needed. */
 	if (stream->tracefile_size > 0 &&
 			(stream->tracefile_size_current + data_size) >
@@ -2372,55 +2154,32 @@ int relay_process_data(struct relay_connection *conn)
 		struct relay_viewer_stream *vstream;
 		uint64_t new_id;
 
-		new_id = (stream->tracefile_count_current + 1) %
+		new_id = (stream->current_tracefile_id + 1) %
 			stream->tracefile_count;
 		/*
-		 * When we wrap-around back to 0, we start overwriting old
-		 * trace data.
+		 * Move viewer oldest available data position forward if
+		 * we are overwriting a tracefile.
 		 */
-		if (!stream->tracefile_overwrite && new_id == 0) {
-			stream->tracefile_overwrite = 1;
-		}
-		pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
-		if (stream->tracefile_overwrite) {
+		if (new_id == stream->oldest_tracefile_id) {
 			stream->oldest_tracefile_id =
 				(stream->oldest_tracefile_id + 1) %
 				stream->tracefile_count;
 		}
-		vstream = viewer_stream_find_by_id(stream->stream_handle);
+		vstream = viewer_stream_get_by_id(stream->stream_handle);
 		if (vstream) {
-			/*
-			 * The viewer is reading a file about to be
-			 * overwritten. Close the FDs it is
-			 * currently using and let it handle the fault.
-			 */
-			if (vstream->tracefile_count_current == new_id) {
-				pthread_mutex_lock(&vstream->overwrite_lock);
-				vstream->abort_flag = 1;
-				pthread_mutex_unlock(&vstream->overwrite_lock);
-				DBG("Streaming side setting abort_flag on stream %s_%" PRIu64 "\n",
-						stream->channel_name, new_id);
-			} else if (vstream->tracefile_count_current ==
-					stream->tracefile_count_current) {
-				/*
-				 * The reader and writer were in the
-				 * same trace file, inform the viewer
-				 * that no new index will ever be added
-				 * to this file.
-				 */
-				vstream->close_write_flag = 1;
+			pthread_mutex_lock(&vstream->lock);
+			ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
+					stream->tracefile_size, stream->tracefile_count,
+					relayd_uid, relayd_gid, stream->stream_fd->fd,
+					&stream->current_tracefile_id, &stream->stream_fd->fd);
+			pthread_mutex_unlock(&vstream->lock);
+			viewer_stream_put(vstream);
+			if (ret < 0) {
+				ERR("Rotating stream output file");
+				goto end_stream_unlock;
 			}
 		}
-		ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
-				stream->tracefile_size, stream->tracefile_count,
-				relayd_uid, relayd_gid, stream->fd,
-				&(stream->tracefile_count_current), &stream->fd);
-		pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
-		if (ret < 0) {
-			ERR("Rotating stream output file");
-			goto end_rcu_unlock;
-		}
-		/* Reset current size because we just perform a stream rotation. */
+		/* Reset current size because we just performed a stream rotation. */
 		stream->tracefile_size_current = 0;
 		rotate_index = 1;
 	}
@@ -2432,44 +2191,41 @@ int relay_process_data(struct relay_connection *conn)
 	if (session->minor >= 4 && !session->snapshot) {
 		ret = handle_index_data(stream, net_seq_num, rotate_index);
 		if (ret < 0) {
-			goto end_rcu_unlock;
+			goto end_stream_unlock;
 		}
 	}
 
 	/* Write data to stream output fd. */
-	size_ret = lttng_write(stream->fd, data_buffer, data_size);
+	size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
 	if (size_ret < data_size) {
 		ERR("Relay error writing data to file");
 		ret = -1;
-		goto end_rcu_unlock;
+		goto end_stream_unlock;
 	}
 
 	DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
 			ret, stream->stream_handle);
 
-	ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
+	ret = write_padding_to_file(stream->stream_fd->fd, be32toh(data_hdr.padding_size));
 	if (ret < 0) {
-		goto end_rcu_unlock;
+		goto end_stream_unlock;
 	}
 	stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size);
 
 	stream->prev_seq = net_seq_num;
 
-	try_close_stream(session, stream);
-
-end_rcu_unlock:
-	rcu_read_unlock();
+end_stream_unlock:
+	pthread_mutex_unlock(&stream->lock);
+end_stream_put:
+	stream_put(stream);
 end:
 	return ret;
 }
 
-static
-void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
+static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 {
 	int ret;
 
-	assert(events);
-
 	(void) lttng_poll_del(events, pollfd);
 
 	ret = close(pollfd);
@@ -2478,27 +2234,36 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 	}
 }
 
-static void destroy_connection(struct lttng_ht *relay_connections_ht,
-		struct relay_connection *conn)
+static void relay_thread_close_connection(struct lttng_poll_event *events,
+		int pollfd, struct relay_connection *conn)
 {
-	assert(relay_connections_ht);
-	assert(conn);
-
-	connection_delete(relay_connections_ht, conn);
+	const char *type_str;
 
-	/* For the control socket, we try to destroy the session. */
-	if (conn->type == RELAY_CONTROL && conn->session) {
-		destroy_session(conn->session, conn->sessions_ht);
+	switch (conn->type) {
+	case RELAY_DATA:
+		type_str = "Data";
+		break;
+	case RELAY_CONTROL:
+		type_str = "Control";
+		break;
+	case RELAY_VIEWER_COMMAND:
+		type_str = "Viewer Command";
+		break;
+	case RELAY_VIEWER_NOTIFICATION:
+		type_str = "Viewer Notification";
+		break;
+	default:
+		type_str = "Unknown";
 	}
-
-	connection_destroy(conn);
+	cleanup_connection_pollfd(events, pollfd);
+	connection_put(conn);
+	DBG("%s connection closed with %d", type_str, pollfd);
 }
 
 /*
  * This thread does the actual work
  */
-static
-void *relay_thread_worker(void *data)
+static void *relay_thread_worker(void *data)
 {
 	int ret, err = -1, last_seen_data_fd = -1;
 	uint32_t nb_fd;
@@ -2506,9 +2271,6 @@ void *relay_thread_worker(void *data)
 	struct lttng_ht *relay_connections_ht;
 	struct lttng_ht_iter iter;
 	struct lttcomm_relayd_hdr recv_hdr;
-	struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
-	struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
-	struct relay_index *index;
 	struct relay_connection *destroy_conn = NULL;
 
 	DBG("[thread] Relay worker started");
@@ -2529,12 +2291,6 @@ void *relay_thread_worker(void *data)
 		goto relay_connections_ht_error;
 	}
 
-	/* Tables of received indexes indexed by index handle and net_seq_num. */
-	indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64);
-	if (!indexes_ht) {
-		goto indexes_ht_error;
-	}
-
 	ret = create_thread_poll_set(&events, 2);
 	if (ret < 0) {
 		goto error_poll_create;
@@ -2604,27 +2360,20 @@ restart:
 					if (ret < 0) {
 						goto error;
 					}
-					conn->sessions_ht = sessions_ht;
-					connection_init(conn);
 					lttng_poll_add(&events, conn->sock->fd,
 							LPOLLIN | LPOLLRDHUP);
-					rcu_read_lock();
-					lttng_ht_add_unique_ulong(relay_connections_ht,
-							&conn->sock_n);
-					rcu_read_unlock();
+					connection_ht_add(relay_connections_ht, conn);
 					DBG("Connection socket %d added", conn->sock->fd);
 				}
 			} else {
 				struct relay_connection *ctrl_conn;
 
-				rcu_read_lock();
-				ctrl_conn = connection_find_by_sock(relay_connections_ht, pollfd);
+				ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd);
 				/* If not found, there is a synchronization issue. */
 				assert(ctrl_conn);
 
 				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-					cleanup_connection_pollfd(&events, pollfd);
-					destroy_connection(relay_connections_ht, ctrl_conn);
+					relay_thread_close_connection(&events, pollfd, ctrl_conn);
 					if (last_seen_data_fd == pollfd) {
 						last_seen_data_fd = last_notdel_data_fd;
 					}
@@ -2634,16 +2383,14 @@ restart:
 								sizeof(recv_hdr), 0);
 						if (ret <= 0) {
 							/* Connection closed */
-							cleanup_connection_pollfd(&events, pollfd);
-							destroy_connection(relay_connections_ht, ctrl_conn);
-							DBG("Control connection closed with %d", pollfd);
+							relay_thread_close_connection(&events, pollfd,
+								ctrl_conn);
 						} else {
 							ret = relay_process_control(&recv_hdr, ctrl_conn);
 							if (ret < 0) {
 								/* Clear the session on error. */
-								cleanup_connection_pollfd(&events, pollfd);
-								destroy_connection(relay_connections_ht, ctrl_conn);
-								DBG("Connection closed with %d", pollfd);
+								relay_thread_close_connection(&events, pollfd,
+									ctrl_conn);
 							}
 							seen_control = 1;
 						}
@@ -2658,7 +2405,7 @@ restart:
 				} else {
 					ERR("Unknown poll events %u for sock %d", revents, pollfd);
 				}
-				rcu_read_unlock();
+				connection_put(ctrl_conn);
 			}
 		}
 
@@ -2702,26 +2449,22 @@ restart:
 				continue;
 			}
 
-			rcu_read_lock();
-			data_conn = connection_find_by_sock(relay_connections_ht, pollfd);
+			data_conn = connection_get_by_sock(relay_connections_ht, pollfd);
 			if (!data_conn) {
 				/* Skip it. Might be removed before. */
-				rcu_read_unlock();
 				continue;
 			}
 
 			if (revents & LPOLLIN) {
 				if (data_conn->type != RELAY_DATA) {
-					rcu_read_unlock();
-					continue;
+					goto put_connection;
 				}
 
 				ret = relay_process_data(data_conn);
 				/* Connection closed */
 				if (ret < 0) {
-					cleanup_connection_pollfd(&events, pollfd);
-					destroy_connection(relay_connections_ht, data_conn);
-					DBG("Data connection closed with %d", pollfd);
+					relay_thread_close_connection(&events, pollfd,
+						data_conn);
 					/*
 					 * Every goto restart call sets the last seen fd where
 					 * here we don't really care since we gracefully
@@ -2730,11 +2473,12 @@ restart:
 				} else {
 					/* Keep last seen port. */
 					last_seen_data_fd = pollfd;
-					rcu_read_unlock();
+					connection_put(data_conn);
 					goto restart;
 				}
 			}
-			rcu_read_unlock();
+		put_connection:
+			connection_put(data_conn);
 		}
 		last_seen_data_fd = -1;
 	}
@@ -2744,28 +2488,24 @@ restart:
 
 exit:
 error:
-	lttng_poll_clean(&events);
-
 	/* Cleanup reamaining connection object. */
 	rcu_read_lock();
 	cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
 			destroy_conn,
 			sock_n.node) {
 		health_code_update();
-		destroy_connection(relay_connections_ht, destroy_conn);
+		if (!connection_get(destroy_conn)) {
+			continue;
+		}
+		relay_thread_close_connection(&events, destroy_conn->sock->fd,
+			destroy_conn);
+		connection_put(destroy_conn);
 	}
 	rcu_read_unlock();
+
+	lttng_poll_clean(&events);
+
 error_poll_create:
-	rcu_read_lock();
-	cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
-			index_n.node) {
-		health_code_update();
-		relay_index_delete(index);
-		relay_index_free_safe(index);
-	}
-	rcu_read_unlock();
-	lttng_ht_destroy(indexes_ht);
-indexes_ht_error:
 	lttng_ht_destroy(relay_connections_ht);
 relay_connections_ht_error:
 	/* Close relay conn pipes */
@@ -2806,7 +2546,6 @@ int main(int argc, char **argv)
 {
 	int ret = 0, retval = 0;
 	void *status;
-	struct relay_local_data *relay_ctx = NULL;
 
 	/* Parse arguments */
 	progname = argv[0];
@@ -2904,16 +2643,9 @@ int main(int argc, char **argv)
 	lttcomm_init();
 	lttcomm_inet_init();
 
-	relay_ctx = zmalloc(sizeof(struct relay_local_data));
-	if (!relay_ctx) {
-		PERROR("relay_ctx");
-		retval = -1;
-		goto exit_init_data;
-	}
-
 	/* tables of sessions indexed by session ID */
-	relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
-	if (!relay_ctx->sessions_ht) {
+	sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+	if (!sessions_ht) {
 		retval = -1;
 		goto exit_init_data;
 	}
@@ -2960,7 +2692,7 @@ int main(int argc, char **argv)
 
 	/* Setup the worker thread */
 	ret = pthread_create(&worker_thread, NULL,
-			relay_thread_worker, (void *) relay_ctx);
+			relay_thread_worker, NULL);
 	if (ret) {
 		errno = ret;
 		PERROR("pthread_create worker");
@@ -2978,7 +2710,7 @@ int main(int argc, char **argv)
 		goto exit_listener_thread;
 	}
 
-	ret = relayd_live_create(live_uri, relay_ctx);
+	ret = relayd_live_create(live_uri);
 	if (ret) {
 		ERR("Starting live viewer threads");
 		retval = -1;
@@ -3035,7 +2767,10 @@ exit_init_data:
 	health_app_destroy(health_relayd);
 exit_health_app_create:
 exit_options:
-	relayd_cleanup(relay_ctx);
+	relayd_cleanup();
+
+	/* Ensure all prior call_rcu are done. */
+	rcu_barrier();
 
 	if (!retval) {
 		exit(EXIT_SUCCESS);
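The in-flight test added to relay_end_data_pending() compares sequence numbers through a signed cast of their unsigned difference. This keeps the check correct when net_seq_num wraps around. The following is a minimal standalone sketch of that condition, not code from this patch. seq_reached() and stream_data_inflight() are hypothetical helpers, and their boolean arguments stand in for the stream fields that the patch reads under stream->lock.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Return true if sequence number 'a' is at or after 'b', treating the
 * 64-bit space as circular: the unsigned difference is reinterpreted as
 * signed, so a small distance across the wrap point still compares as
 * "after". Converting an out-of-range uint64_t to int64_t is
 * implementation-defined in ISO C; GCC and Clang wrap modulo 2^64.
 */
static bool seq_reached(uint64_t a, uint64_t b)
{
	return (int64_t) (a - b) >= 0;
}

/*
 * Hypothetical helper mirroring the condition in relay_end_data_pending():
 * data is still in flight unless the pending check was already done, or
 * the stream is closed and has received everything up to last_net_seq_num.
 */
static bool stream_data_inflight(bool check_done, bool closed,
		uint64_t prev_seq, uint64_t last_net_seq_num)
{
	if (check_done) {
		return false;
	}
	return !closed || !seq_reached(prev_seq, last_net_seq_num);
}
```

A closed stream therefore stops reporting in-flight data only after prev_seq has caught up with the last sequence number announced by the session daemon.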
diff --git a/src/bin/lttng-relayd/session.c b/src/bin/lttng-relayd/session.c
index 46d9cc6..8943216 100644
--- a/src/bin/lttng-relayd/session.c
+++ b/src/bin/lttng-relayd/session.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -19,28 +20,25 @@
 #define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <common/common.h>
+#include <urcu/rculist.h>
 
+#include "lttng-relayd.h"
 #include "ctf-trace.h"
 #include "session.h"
 #include "stream.h"
 
 /* Global session id used in the session creation. */
 static uint64_t last_relay_session_id;
-
-static void rcu_destroy_session(struct rcu_head *head)
-{
-	struct relay_session *session =
-		caa_container_of(head, struct relay_session, rcu_node);
-
-	free(session);
-}
+static pthread_mutex_t last_relay_session_id_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /*
  * Create a new session by assigning a new session ID.
  *
  * Return allocated session or else NULL.
  */
-struct relay_session *session_create(void)
+struct relay_session *session_create(const char *session_name,
+		const char *hostname, uint32_t live_timer,
+		bool snapshot, uint32_t major, uint32_t minor)
 {
 	struct relay_session *session;
 
@@ -57,30 +55,62 @@ struct relay_session *session_create(void)
 		goto error;
 	}
 
-	pthread_mutex_init(&session->viewer_ready_lock, NULL);
+	pthread_mutex_lock(&last_relay_session_id_lock);
 	session->id = ++last_relay_session_id;
+	pthread_mutex_unlock(&last_relay_session_id_lock);
+
+	session->major = major;
+	session->minor = minor;
 	lttng_ht_node_init_u64(&session->session_n, session->id);
+	urcu_ref_init(&session->ref);
+	CDS_INIT_LIST_HEAD(&session->recv_list);
+	pthread_mutex_init(&session->lock, NULL);
+	pthread_mutex_init(&session->reflock, NULL);
+	pthread_mutex_init(&session->recv_list_lock, NULL);
+
+	strncpy(session->session_name, session_name,
+			sizeof(session->session_name) - 1);
+	strncpy(session->hostname, hostname,
+			sizeof(session->hostname) - 1);
+	session->live_timer = live_timer;
+	session->snapshot = snapshot;
+
+	lttng_ht_add_unique_u64(sessions_ht, &session->session_n);
 
 error:
 	return session;
 }
 
+/* Should be called with RCU read-side lock held. */
+bool session_get(struct relay_session *session)
+{
+	bool has_ref = false;
+
+	pthread_mutex_lock(&session->reflock);
+	if (session->ref.refcount != 0) {
+		has_ref = true;
+		urcu_ref_get(&session->ref);
+	}
+	pthread_mutex_unlock(&session->reflock);
+
+	return has_ref;
+}
+
 /*
- * Lookup a session within the given hash table and session id. RCU read side
- * lock MUST be acquired before calling this and as long as the caller has a
- * reference to the object.
+ * Look up a session in the session hash table using the session id
+ * as key. A reference is taken on the returned session; the caller
+ * must release it with session_put().
  *
  * Return session or NULL if not found.
  */
-struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id)
+struct relay_session *session_get_by_id(uint64_t id)
 {
 	struct relay_session *session = NULL;
 	struct lttng_ht_node_u64 *node;
 	struct lttng_ht_iter iter;
 
-	assert(ht);
-
-	lttng_ht_lookup(ht, &id, &iter);
+	rcu_read_lock();
+	lttng_ht_lookup(sessions_ht, &id, &iter);
 	node = lttng_ht_iter_get_node_u64(&iter);
 	if (!node) {
 		DBG("Session find by ID %" PRIu64 " id NOT found", id);
@@ -88,97 +118,100 @@ struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id)
 	}
 	session = caa_container_of(node, struct relay_session, session_n);
 	DBG("Session find by ID %" PRIu64 " id found", id);
-
+	if (!session_get(session)) {
+		session = NULL;
+	}
 end:
+	rcu_read_unlock();
 	return session;
 }
 
+static void rcu_destroy_session(struct rcu_head *rcu_head)
+{
+	struct relay_session *session =
+		caa_container_of(rcu_head, struct relay_session, rcu_node);
+
+	free(session);
+}
+
 /*
  * Delete session from the given hash table.
  *
  * Return lttng ht del error code being 0 on success and 1 on failure.
  */
-int session_delete(struct lttng_ht *ht, struct relay_session *session)
+static int session_delete(struct relay_session *session)
 {
 	struct lttng_ht_iter iter;
 
-	assert(ht);
-	assert(session);
-
 	iter.iter.node = &session->session_n.node;
-	return lttng_ht_del(ht, &iter);
+	return lttng_ht_del(sessions_ht, &iter);
 }
 
-/*
- * The caller MUST be from the viewer thread since the viewer refcount is
- * decremented. With this calue down to 0, it will try to destroy the session.
- */
-void session_viewer_try_destroy(struct lttng_ht *ht,
-		struct relay_session *session)
-{
-	unsigned long ret_ref;
 
-	assert(session);
+static void destroy_session(struct relay_session *session)
+{
+	int ret;
 
-	ret_ref = uatomic_add_return(&session->viewer_refcount, -1);
-	if (ret_ref == 0) {
-		session_try_destroy(ht, session);
-	}
+	ret = session_delete(session);
+	assert(!ret);
+	/*
+	 * Since each trace holds a reference on the session, no trace
+	 * belonging to this session can still exist once we reach the
+	 * point where the session is torn down.
+	 */
+	lttng_ht_destroy(session->ctf_traces_ht);
+	call_rcu(&session->rcu_node, rcu_destroy_session);
 }
 
-/*
- * Should only be called from the main streaming thread since it does not touch
- * the viewer refcount. If this refcount is down to 0, destroy the session only
- * and only if the session deletion succeeds. This is done because the viewer
- * *and* the streaming thread can both concurently try to destroy the session
- * thus the first come first serve.
- */
-void session_try_destroy(struct lttng_ht *ht, struct relay_session *session)
+void session_release(struct urcu_ref *ref)
 {
-	int ret = 0;
-	unsigned long ret_ref;
+	struct relay_session *session =
+		caa_container_of(ref, struct relay_session, ref);
 
-	assert(session);
+	destroy_session(session);
+}
 
-	ret_ref = uatomic_read(&session->viewer_refcount);
-	if (ret_ref == 0 && session->close_flag) {
-		if (ht) {
-			ret = session_delete(ht, session);
-		}
-		if (!ret) {
-			/* Only destroy the session if the deletion was successful. */
-			session_destroy(session);
-		}
-	}
+void session_put(struct relay_session *session)
+{
+	rcu_read_lock();
+	pthread_mutex_lock(&session->reflock);
+	urcu_ref_put(&session->ref, session_release);
+	pthread_mutex_unlock(&session->reflock);
+	rcu_read_unlock();
 }
 
-/*
- * Destroy a session object.
- *
- * This function must *NOT* be called with an RCU read lock held since
- * the session's ctf_traces_ht is destroyed.
- */
-void session_destroy(struct relay_session *session)
+int session_close(struct relay_session *session)
 {
-	struct ctf_trace *ctf_trace;
+	int ret = 0;
+	struct ctf_trace *trace;
 	struct lttng_ht_iter iter;
 
-	assert(session);
-
-	DBG("Relay destroying session %" PRIu64, session->id);
+	pthread_mutex_lock(&session->lock);
+	if (session->connection_closed) {
+		ret = -1;
+		goto unlock;
+	}
+	session->connection_closed = true;
+unlock:
+	pthread_mutex_unlock(&session->lock);
+	if (ret) {
+		return ret;
+	}
 
-	/*
-	 * Empty the ctf trace hash table which will destroy the stream contained
-	 * in that table.
-	 */
 	rcu_read_lock();
-	cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
-			node.node) {
-		ctf_trace_delete(session->ctf_traces_ht, ctf_trace);
-		ctf_trace_destroy(ctf_trace);
+	cds_lfht_for_each_entry(session->ctf_traces_ht->ht,
+			&iter.iter, trace, node.node) {
+		ret = ctf_trace_close(trace);
+		if (ret) {
+			goto rcu_unlock;
+		}
 	}
+rcu_unlock:
 	rcu_read_unlock();
-	lttng_ht_destroy(session->ctf_traces_ht);
-
-	call_rcu(&session->rcu_node, rcu_destroy_session);
+	if (ret) {
+		return ret;
+	}
+	/* Put self-reference from create. */
+	session_put(session);
+	return 0;
 }
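The self-reference/close pattern that session_close() relies on is sketched below. It is a minimal, self-contained sketch with several assumptions. Plain pthread mutexes and a bare counter stand in for liburcu's struct urcu_ref. The object is freed directly instead of through call_rcu(). The demo_* names are hypothetical and are not part of this patch.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for struct relay_session (only what the pattern needs). */
struct demo_session {
	pthread_mutex_t lock;		/* protects connection_closed */
	pthread_mutex_t reflock;	/* serializes refcount test-and-update */
	bool connection_closed;
	unsigned long refcount;
};

/* Number of sessions released so far (for illustration only). */
static int demo_sessions_released;

static struct demo_session *demo_session_create(void)
{
	struct demo_session *s = calloc(1, sizeof(*s));

	if (!s) {
		return NULL;
	}
	pthread_mutex_init(&s->lock, NULL);
	pthread_mutex_init(&s->reflock, NULL);
	s->refcount = 1;	/* self-reference, dropped by the first close */
	return s;
}

/* Take a reference unless the session is already being released. */
static bool demo_session_get(struct demo_session *s)
{
	bool ok = false;

	pthread_mutex_lock(&s->reflock);
	if (s->refcount != 0) {
		s->refcount++;
		ok = true;
	}
	pthread_mutex_unlock(&s->reflock);
	return ok;
}

static void demo_session_put(struct demo_session *s)
{
	bool release;

	pthread_mutex_lock(&s->reflock);
	assert(s->refcount != 0);
	release = (--s->refcount == 0);
	pthread_mutex_unlock(&s->reflock);
	if (release) {
		/* The relayd defers the free with call_rcu(); done inline here. */
		pthread_mutex_destroy(&s->lock);
		pthread_mutex_destroy(&s->reflock);
		free(s);
		demo_sessions_released++;
	}
}

/* Close at most once; only the first close drops the self-reference. */
static int demo_session_close(struct demo_session *s)
{
	int ret = 0;

	pthread_mutex_lock(&s->lock);
	if (s->connection_closed) {
		ret = -1;
	} else {
		s->connection_closed = true;
	}
	pthread_mutex_unlock(&s->lock);
	if (ret) {
		return ret;
	}
	demo_session_put(s);
	return 0;
}
```

Because the close flag is tested and set under the session lock, a second close cannot drop the self-reference twice. A viewer holding its own reference keeps the object alive after the streaming side closes. Unlike this sketch, the relayd must never call get on freed memory. It relies on RCU and the call_rcu() teardown for that guarantee.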
diff --git a/src/bin/lttng-relayd/session.h b/src/bin/lttng-relayd/session.h
index cb125be..960d8f9 100644
--- a/src/bin/lttng-relayd/session.h
+++ b/src/bin/lttng-relayd/session.h
@@ -1,6 +1,10 @@
+#ifndef _SESSION_H
+#define _SESSION_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,13 +20,11 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef _SESSION_H
-#define _SESSION_H
-
 #include <limits.h>
 #include <inttypes.h>
 #include <pthread.h>
 #include <urcu/list.h>
+#include <urcu/ref.h>
 
 #include <common/hashtable/hashtable.h>
 
@@ -31,35 +33,59 @@
  */
 struct relay_session {
 	/*
-	 * This session id is used to identify a set of stream to a tracing session
-	 * but also make sure we have a unique session id associated with a session
-	 * daemon which can provide multiple data source.
+	 * This session id is used to identify a set of streams to a
+	 * tracing session, but also to make sure we have a unique
+	 * session id associated with a session daemon, which can
+	 * provide multiple data sources.
 	 */
 	uint64_t id;
 	char session_name[NAME_MAX];
 	char hostname[HOST_NAME_MAX];
 	uint32_t live_timer;
-	struct lttng_ht_node_u64 session_n;
-	struct rcu_head rcu_node;
-	uint32_t stream_count;
+
 	/* Tell if this session is for a snapshot or not. */
-	unsigned int snapshot:1;
-	/* Tell if the session has been closed on the streaming side. */
-	unsigned int close_flag:1;
+	bool snapshot;
+
+	/*
+	 * The session has no back reference to its connection because
+	 * its lifetime can be longer than that of the consumer
+	 * connection: a reference can still be held by the viewer
+	 * connection.
+	 */
+
+	/* Reference count of ctf-traces and viewers using the session. */
+	struct urcu_ref ref;
+	/* session reflock nests inside ctf_trace reflock. */
+	pthread_mutex_t reflock;
+
+	pthread_mutex_t lock;
+
+	/*
+	 * major/minor version used for this session.
+	 */
+	uint32_t major;
+	uint32_t minor;
 
-	/* Number of viewer using it. Set to 0, it should be destroyed. */
-	int viewer_refcount;
+	bool viewer_attached;
+	/* Tell if the session connection has been closed on the streaming side. */
+	bool connection_closed;
 
 	/* Contains ctf_trace object of that session indexed by path name. */
 	struct lttng_ht *ctf_traces_ht;
 
 	/*
-	 * Indicate version protocol for this session. This is especially useful
-	 * for the data thread that has no idea which version it operates on since
-	 * linking control/data sockets is non trivial.
+	 * This contains the streams received on that connection.
+	 * They are stored here until the "streams sent" command is
+	 * received; at that point, they are removed from the list
+	 * and published.
+	 * Updates are protected by the recv_list_lock.
+	 * Traversals are protected by RCU.
+	 * recv_list_lock also protects stream_count.
 	 */
-	uint64_t minor;
-	uint64_t major;
+	struct cds_list_head recv_list;	/* RCU list. */
+	uint32_t stream_count;
+	pthread_mutex_t recv_list_lock;
+
 	/*
 	 * Flag checked and exchanged with uatomic_cmpxchg to tell the
 	 * viewer-side if new streams got added since the last check.
@@ -67,50 +93,25 @@ struct relay_session {
 	unsigned long new_streams;
 
 	/*
-	 * Used to synchronize the process where we flag every streams readiness
-	 * for the viewer when the streams_sent message is received and the viewer
-	 * process of sending those streams.
+	 * Node in the global session hash table.
 	 */
-	pthread_mutex_t viewer_ready_lock;
-
+	struct lttng_ht_node_u64 session_n;
 	/*
 	 * Member of the session list in struct relay_viewer_session.
+	 * Updates are protected by the relay_viewer_session
+	 * session_list_lock. Traversals are protected by RCU.
 	 */
-	struct cds_list_head viewer_session_list;
+	struct cds_list_head viewer_session_node;
+	struct rcu_head rcu_node;	/* For call_rcu teardown. */
 };
 
-struct relay_viewer_session {
-	struct cds_list_head sessions_head;
-};
-
-static inline void session_viewer_attach(struct relay_session *session)
-{
-	uatomic_inc(&session->viewer_refcount);
-}
-
-static inline void session_viewer_detach(struct relay_session *session)
-{
-	uatomic_add(&session->viewer_refcount, -1);
-}
-
-struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id);
-struct relay_session *session_create(void);
-int session_delete(struct lttng_ht *ht, struct relay_session *session);
+struct relay_session *session_create(const char *session_name,
+		const char *hostname, uint32_t live_timer,
+		bool snapshot, uint32_t major, uint32_t minor);
+struct relay_session *session_get_by_id(uint64_t id);
+bool session_get(struct relay_session *session);
+void session_put(struct relay_session *session);
 
-/*
- * Direct destroy without reading the refcount.
- */
-void session_destroy(struct relay_session *session);
-
-/*
- * Destroy the session if the refcount is down to 0.
- */
-void session_try_destroy(struct lttng_ht *ht, struct relay_session *session);
-
-/*
- * Decrement the viewer refcount and destroy it if down to 0.
- */
-void session_viewer_try_destroy(struct lttng_ht *ht,
-		struct relay_session *session);
+int session_close(struct relay_session *session);
 
 #endif /* _SESSION_H */
diff --git a/src/bin/lttng-relayd/stream-fd.c b/src/bin/lttng-relayd/stream-fd.c
new file mode 100644
index 0000000..57324d7
--- /dev/null
+++ b/src/bin/lttng-relayd/stream-fd.c
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#define _LGPL_SOURCE
+#include <common/common.h>
+
+#include "stream-fd.h"
+
+struct stream_fd *stream_fd_create(int fd)
+{
+	struct stream_fd *sf;
+
+	sf = zmalloc(sizeof(*sf));
+	if (!sf) {
+		goto end;
+	}
+	urcu_ref_init(&sf->ref);
+	sf->fd = fd;
+end:
+	return sf;
+}
+
+void stream_fd_get(struct stream_fd *sf)
+{
+	urcu_ref_get(&sf->ref);
+}
+
+static void stream_fd_release(struct urcu_ref *ref)
+{
+	struct stream_fd *sf = caa_container_of(ref, struct stream_fd, ref);
+	int ret;
+
+	ret = close(sf->fd);
+	if (ret) {
+		PERROR("Error closing stream FD %d", sf->fd);
+	}
+	free(sf);
+}
+
+void stream_fd_put(struct stream_fd *sf)
+{
+	urcu_ref_put(&sf->ref, stream_fd_release);
+}
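The stream_fd wrapper closes its descriptor on the last put. The sketch below shows the same idea without liburcu, using C11 atomics in place of struct urcu_ref. The demo_stream_fd_* and demo_fd_is_open() names are hypothetical.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical analogue of struct stream_fd; atomic_long replaces urcu_ref. */
struct demo_stream_fd {
	int fd;
	atomic_long ref;
};

struct demo_stream_fd *demo_stream_fd_create(int fd)
{
	struct demo_stream_fd *sf = malloc(sizeof(*sf));

	if (!sf) {
		return NULL;
	}
	sf->fd = fd;
	atomic_init(&sf->ref, 1);	/* reference owned by the creator */
	return sf;
}

void demo_stream_fd_get(struct demo_stream_fd *sf)
{
	atomic_fetch_add(&sf->ref, 1);
}

/* The last put closes the descriptor and frees the wrapper. */
void demo_stream_fd_put(struct demo_stream_fd *sf)
{
	long old = atomic_fetch_sub(&sf->ref, 1);

	assert(old > 0);
	if (old == 1) {
		if (close(sf->fd)) {
			perror("close");
		}
		free(sf);
	}
}

/* Test helper: non-zero if fd is still an open descriptor. */
int demo_fd_is_open(int fd)
{
	return fcntl(fd, F_GETFD) != -1;
}
```

Sharing one wrapper lets several holders keep the descriptor open without having to agree on which of them closes it.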
diff --git a/src/bin/lttng-relayd/stream-fd.h b/src/bin/lttng-relayd/stream-fd.h
new file mode 100644
index 0000000..64f3b16
--- /dev/null
+++ b/src/bin/lttng-relayd/stream-fd.h
@@ -0,0 +1,32 @@
+#ifndef _STREAM_FD_H
+#define _STREAM_FD_H
+
+/*
+ * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <urcu/ref.h>
+
+struct stream_fd {
+	int fd;
+	struct urcu_ref ref;
+};
+
+struct stream_fd *stream_fd_create(int fd);
+void stream_fd_get(struct stream_fd *sf);
+void stream_fd_put(struct stream_fd *sf);
+
+#endif /* _STREAM_FD_H */
diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
index 17a5bcd..de2dbf6 100644
--- a/src/bin/lttng-relayd/stream.c
+++ b/src/bin/lttng-relayd/stream.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -19,128 +20,314 @@
 #define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <common/common.h>
+#include <common/utils.h>
+#include <common/defaults.h>
+#include <urcu/rculist.h>
+#include <sys/stat.h>
 
+#include "lttng-relayd.h"
 #include "index.h"
 #include "stream.h"
 #include "viewer-stream.h"
 
+/* Should be called with RCU read-side lock held. */
+bool stream_get(struct relay_stream *stream)
+{
+	bool has_ref = false;
+
+	pthread_mutex_lock(&stream->reflock);
+	if (stream->ref.refcount != 0) {
+		has_ref = true;
+		urcu_ref_get(&stream->ref);
+	}
+	pthread_mutex_unlock(&stream->reflock);
+
+	return has_ref;
+}
+
 /*
- * Get stream from stream id from the given hash table. Return stream if found
- * else NULL.
- *
- * Need to be called with RCU read-side lock held.
+ * Get a stream by its stream id from the streams hash table. Return
+ * the stream if found, else NULL. A reference is taken on the
+ * returned stream; the caller must release it with stream_put().
  */
-struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
-		uint64_t stream_id)
+struct relay_stream *stream_get_by_id(uint64_t stream_id)
 {
 	struct lttng_ht_node_u64 *node;
 	struct lttng_ht_iter iter;
 	struct relay_stream *stream = NULL;
 
-	assert(ht);
-
-	lttng_ht_lookup(ht, &stream_id, &iter);
+	rcu_read_lock();
+	lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
 	node = lttng_ht_iter_get_node_u64(&iter);
-	if (node == NULL) {
+	if (!node) {
 		DBG("Relay stream %" PRIu64 " not found", stream_id);
 		goto end;
 	}
 	stream = caa_container_of(node, struct relay_stream, node);
-
+	if (!stream_get(stream)) {
+		stream = NULL;
+	}
 end:
+	rcu_read_unlock();
 	return stream;
 }
 
 /*
- * Close a given stream. If an assosiated viewer stream exists it is updated.
- *
- * RCU read side lock MUST be acquired.
- *
- * Return 0 if close was successful or 1 if already closed.
+ * The new stream takes ownership of path_name and channel_name, which are freed on error.
  */
-int stream_close(struct relay_session *session, struct relay_stream *stream)
+struct relay_stream *stream_create(struct ctf_trace *trace,
+	uint64_t stream_handle, char *path_name,
+	char *channel_name, uint64_t tracefile_size,
+	uint64_t tracefile_count)
 {
-	int delret, ret;
-	struct relay_viewer_stream *vstream;
-	struct ctf_trace *ctf_trace;
+	int ret;
+	struct relay_stream *stream = NULL;
+	struct relay_session *session = trace->session;
 
-	assert(stream);
+	stream = zmalloc(sizeof(struct relay_stream));
+	if (stream == NULL) {
+		PERROR("relay stream zmalloc");
+		ret = -1;
+		goto error_no_alloc;
+	}
 
-	pthread_mutex_lock(&stream->lock);
+	stream->stream_handle = stream_handle;
+	stream->prev_seq = -1ULL;
+	stream->ctf_stream_id = -1ULL;
+	stream->tracefile_size = tracefile_size;
+	stream->tracefile_count = tracefile_count;
+	stream->path_name = path_name;
+	stream->channel_name = channel_name;
+	lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
+	pthread_mutex_init(&stream->lock, NULL);
+	pthread_mutex_init(&stream->reflock, NULL);
+	urcu_ref_init(&stream->ref);
+	ctf_trace_get(trace);
+	stream->trace = trace;
 
-	if (stream->terminated_flag) {
-		/* This stream is already closed. Ignore. */
-		ret = 1;
-		goto end_unlock;
+	stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+	if (!stream->indexes_ht) {
+		ERR("Cannot create indexes_ht");
+		ret = -1;
+		goto end;
 	}
 
-	DBG("Closing stream id %" PRIu64, stream->stream_handle);
+	ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
+	if (ret < 0) {
+		ERR("relay creating output directory");
+		goto end;
+	}
 
-	if (stream->fd >= 0) {
-		delret = close(stream->fd);
-		if (delret < 0) {
-			PERROR("close stream");
+	/*
+	 * No need to use the run_as API here: whatever we receive,
+	 * the relayd uses its own credentials for the stream files.
+	 */
+	ret = utils_create_stream_file(stream->path_name, stream->channel_name,
+			stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
+	if (ret < 0) {
+		ERR("Create output file");
+		goto end;
+	}
+	stream->stream_fd = stream_fd_create(ret);
+	if (!stream->stream_fd) {
+		if (close(ret)) {
+			PERROR("Error closing file %d", ret);
 		}
+		ret = -1;
+		goto end;
+	}
+	if (stream->tracefile_size) {
+		DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
+	} else {
+		DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
 	}
 
-	if (stream->index_fd >= 0) {
-		delret = close(stream->index_fd);
-		if (delret < 0) {
-			PERROR("close stream index_fd");
-		}
+	if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+		stream->is_metadata = 1;
+		/* Assign quick reference to the metadata stream in the trace. */
+		trace->metadata_stream = stream;
 	}
 
-	vstream = viewer_stream_find_by_id(stream->stream_handle);
-	if (vstream) {
+	/*
+	 * Add the stream to the recv list of the session. Once the end stream
+	 * message is received, all session streams are published.
+	 */
+	pthread_mutex_lock(&session->recv_list_lock);
+	cds_list_add_rcu(&stream->recv_node, &session->recv_list);
+	stream->in_recv_list = true;
+	session->stream_count++;
+	pthread_mutex_unlock(&session->recv_list_lock);
+	if (stream->is_metadata) {
 		/*
-		 * Set the last good value into the viewer stream. This is done
-		 * right before the stream gets deleted from the hash table. The
-		 * lookup failure on the live thread side of a stream indicates
-		 * that the viewer stream index received value should be used.
+		 * Session daemon expects metadata to be published
+		 * without issuing any streams sent cmd in snapshot
+		 * mode.
 		 */
-		pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
-		vstream->total_index_received = stream->total_index_received;
-		vstream->tracefile_count_last = stream->tracefile_count_current;
-		vstream->close_write_flag = 1;
-		pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
+		stream_publish(stream);
 	}
 
-	/* Cleanup index of that stream. */
-	relay_index_destroy_by_stream_id(stream->stream_handle);
+	DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
+			stream->stream_handle);
+	ret = 0;
 
-	ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
-			stream->path_name);
-	assert(ctf_trace);
-	ctf_trace_put_ref(ctf_trace);
+end:
+	if (ret) {
+		if (stream->stream_fd) {
+			stream_fd_put(stream->stream_fd);
+			stream->stream_fd = NULL;
+		}
+		/* stream_put() also drops the trace reference. */
+		stream_put(stream);
+		stream = NULL;
+	}
+	return stream;
 
-	stream->close_flag = 1;
-	stream->terminated_flag = 1;
-	ret = 0;
+error_no_alloc:
+	/*
+	 * path_name and channel_name need to be freed explicitly here
+	 * because we cannot rely on stream_put().
+	 */
+	free(path_name);
+	free(channel_name);
+	return NULL;
+}
+
+void stream_publish(struct relay_stream *stream)
+{
+	struct relay_session *session;
+
+	pthread_mutex_lock(&stream->lock);
+	if (stream->published) {
+		goto unlock;
+	}
+
+	session = stream->trace->session;
+
+	pthread_mutex_lock(&session->recv_list_lock);
+	if (stream->in_recv_list) {
+		cds_list_del_rcu(&stream->recv_node);
+		stream->in_recv_list = false;
+	}
+	pthread_mutex_unlock(&session->recv_list_lock);
 
-end_unlock:
+	/*
+	 * Publish in both the ctf_trace object and the global stream ht,
+	 * since the data side of the relayd has no concept of session.
+	 */
+	lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
+
+	pthread_mutex_lock(&stream->trace->stream_list_lock);
+	cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
+	pthread_mutex_unlock(&stream->trace->stream_list_lock);
+
+	stream->published = true;
+unlock:
 	pthread_mutex_unlock(&stream->lock);
-	return ret;
 }
 
-void stream_delete(struct lttng_ht *ht, struct relay_stream *stream)
+/*
+ * Only called from stream_release(). No stream lock needed, since
+ * there is a single user at this point.
+ */
+static void stream_unpublish(struct relay_stream *stream)
 {
 	int ret;
 	struct lttng_ht_iter iter;
 
-	assert(ht);
-	assert(stream);
+	if (!stream->published) {
+		return;
+	}
+	pthread_mutex_lock(&stream->trace->stream_list_lock);
+	cds_list_del_rcu(&stream->stream_node);
+	pthread_mutex_unlock(&stream->trace->stream_list_lock);
 
 	iter.iter.node = &stream->node.node;
-	ret = lttng_ht_del(ht, &iter);
+	ret = lttng_ht_del(relay_streams_ht, &iter);
 	assert(!ret);
-
-	cds_list_del(&stream->trace_list);
+	stream->published = false;
 }
 
-void stream_destroy(struct relay_stream *stream)
+static void stream_destroy(struct relay_stream *stream)
 {
-	assert(stream);
+	if (stream->indexes_ht) {
+		lttng_ht_destroy(stream->indexes_ht);
+	}
 	free(stream->path_name);
 	free(stream->channel_name);
 	free(stream);
 }
+
+static void stream_destroy_rcu(struct rcu_head *rcu_head)
+{
+	struct relay_stream *stream =
+		caa_container_of(rcu_head, struct relay_stream, rcu_node);
+
+	stream_destroy(stream);
+}
+
+static void stream_release(struct urcu_ref *ref)
+{
+	struct relay_stream *stream =
+		caa_container_of(ref, struct relay_stream, ref);
+	struct relay_session *session;
+
+	session = stream->trace->session;
+
+	DBG("Releasing stream id %" PRIu64, stream->stream_handle);
+
+	pthread_mutex_lock(&session->recv_list_lock);
+	session->stream_count--;
+	if (stream->in_recv_list) {
+		cds_list_del_rcu(&stream->recv_node);
+		stream->in_recv_list = false;
+	}
+	pthread_mutex_unlock(&session->recv_list_lock);
+
+	stream_unpublish(stream);
+
+	if (stream->stream_fd) {
+		stream_fd_put(stream->stream_fd);
+		stream->stream_fd = NULL;
+	}
+	if (stream->index_fd) {
+		stream_fd_put(stream->index_fd);
+		stream->index_fd = NULL;
+	}
+	if (stream->trace) {
+		ctf_trace_put(stream->trace);
+		stream->trace = NULL;
+	}
+
+	call_rcu(&stream->rcu_node, stream_destroy_rcu);
+}
+
+void stream_put(struct relay_stream *stream)
+{
+	DBG("stream put for stream id %" PRIu64, stream->stream_handle);
+	/*
+	 * Ensure existence of stream->reflock for stream unlock.
+	 */
+	rcu_read_lock();
+	/*
+	 * Stream reflock ensures that concurrent test and update of
+	 * stream ref is atomic.
+	 */
+	pthread_mutex_lock(&stream->reflock);
+	assert(stream->ref.refcount != 0);
+	/*
+	 * Wait until we have processed all the stream packets before
+	 * actually putting our last stream reference.
+	 */
+	DBG("stream put stream id %" PRIu64 " refcount %d",
+		stream->stream_handle,
+		(int) stream->ref.refcount);
+	urcu_ref_put(&stream->ref, stream_release);
+	pthread_mutex_unlock(&stream->reflock);
+	rcu_read_unlock();
+}
+
+void stream_close(struct relay_stream *stream)
+{
+	relay_index_close_all(stream);
+	stream_put(stream);
+}
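A stream found in relay_streams_ht may already have dropped its last reference and be waiting for its RCU-deferred free. For that reason, stream_get_by_id() only returns a stream if stream_get() can still take a reference. The sketch below shows that lookup-then-get-unless-zero step. It assumes a small array guarded by a mutex in place of the RCU hash table. The lk_* names are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stream: id, reflock and a bare refcount. */
struct lk_stream {
	uint64_t id;
	pthread_mutex_t reflock;
	unsigned long refcount;
};

#define LK_TABLE_SIZE 8

/* Hypothetical table standing in for relay_streams_ht. */
static struct lk_stream *lk_table[LK_TABLE_SIZE];
static pthread_mutex_t lk_table_lock = PTHREAD_MUTEX_INITIALIZER;

/* Take a reference only if the stream is not already being released. */
bool lk_stream_get(struct lk_stream *st)
{
	bool has_ref = false;

	pthread_mutex_lock(&st->reflock);
	if (st->refcount != 0) {
		st->refcount++;
		has_ref = true;
	}
	pthread_mutex_unlock(&st->reflock);
	return has_ref;
}

/* Return a referenced stream, or NULL if absent or already dying. */
struct lk_stream *lk_stream_get_by_id(uint64_t id)
{
	struct lk_stream *found = NULL;

	pthread_mutex_lock(&lk_table_lock);
	for (size_t i = 0; i < LK_TABLE_SIZE; i++) {
		struct lk_stream *st = lk_table[i];

		if (st && st->id == id) {
			if (lk_stream_get(st)) {
				found = st;
			}
			break;
		}
	}
	pthread_mutex_unlock(&lk_table_lock);
	return found;
}
```

The reflock makes the "is it still non-zero?" test and the increment a single step with respect to the final put. Without it, a lookup could resurrect an object whose release has already started.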
diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h
index 4dd2e62..fd84f2e 100644
--- a/src/bin/lttng-relayd/stream.h
+++ b/src/bin/lttng-relayd/stream.h
@@ -1,6 +1,10 @@
+#ifndef _STREAM_H
+#define _STREAM_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef _STREAM_H
-#define _STREAM_H
-
 #include <limits.h>
 #include <inttypes.h>
 #include <pthread.h>
@@ -27,100 +28,110 @@
 #include <common/hashtable/hashtable.h>
 
 #include "session.h"
+#include "stream-fd.h"
 
 /*
  * Represents a stream in the relay
  */
 struct relay_stream {
 	uint64_t stream_handle;
-	uint64_t prev_seq;	/* previous data sequence number encountered */
-	struct lttng_ht_node_u64 node;
+
 	/*
-	 * When we receive a stream, it gets stored in a list (on a per connection
-	 * basis) until we have all the streams of the same channel and the metadata
-	 * associated with it, then it gets flagged with viewer_ready.
+	 * reflock used to synchronize the closing of this stream.
+	 * stream reflock nests inside viewer stream reflock.
+	 * stream reflock nests inside index reflock.
 	 */
-	struct cds_list_head recv_list;
+	pthread_mutex_t reflock;
+	struct urcu_ref ref;
+	/* Back reference to trace. Protected by refcount on trace object. */
+	struct ctf_trace *trace;
 
-	/* Added to the corresponding ctf_trace. */
-	struct cds_list_head trace_list;
-	struct rcu_head rcu_node;
-	uint64_t session_id;
-	int fd;
+	/*
+	 * To protect from concurrent read/update. The viewer stream
+	 * lock nests inside the stream lock. The stream lock nests
+	 * inside the ctf_trace lock.
+	 */
+	pthread_mutex_t lock;
+	uint64_t prev_seq;		/* previous data sequence number encountered */
+	uint64_t last_net_seq_num;	/* seq num to encounter before closing. */
+
+	/* FD on which to write the stream data. */
+	struct stream_fd *stream_fd;
 	/* FD on which to write the index data. */
-	int index_fd;
-	/* FD on which to read the index data for the viewer. */
-	int read_index_fd;
+	struct stream_fd *index_fd;
 
 	char *path_name;
 	char *channel_name;
+
 	/* on-disk circular buffer of tracefiles */
 	uint64_t tracefile_size;
 	uint64_t tracefile_size_current;
 	uint64_t tracefile_count;
-	uint64_t tracefile_count_current;
+	uint64_t current_tracefile_id;
+
 	/* To inform the viewer up to where it can go back in time. */
 	uint64_t oldest_tracefile_id;
 
-	uint64_t total_index_received;
-	uint64_t last_net_seq_num;
-
+	struct lttng_ht *indexes_ht;
 	/*
-	 * To protect from concurrent read/update. Also used to synchronize the
-	 * closing of this stream.
+	 * Counts number of indexes in indexes_ht. Redundant info.
+	 * Protected by stream lock.
 	 */
-	pthread_mutex_t lock;
+	uint64_t total_index_received;
 
+	bool closed;	/* Stream is closed. */
+
+	int indexes_in_flight;
 	/*
-	 * If the stream is inactive, this field is updated with the live beacon
-	 * timestamp end, when it is active, this field == -1ULL.
+	 * If the stream is inactive, this field is updated with the
+	 * live beacon timestamp end; when it is active, this field
+	 * is -1ULL.
 	 */
 	uint64_t beacon_ts_end;
 	/*
-	 * Number of indexes that are supposed to be complete soon.
-	 * Avoid sending the inactivity beacon to the client when data is in
-	 * transit.
-	 */
-	int indexes_in_flight;
-	/*
 	 * CTF stream ID, -1ULL when unset.
 	 */
 	uint64_t ctf_stream_id;
-	/*
-	 * To protect the update of the close_write_flag and the checks of
-	 * the tracefile_count_current.
-	 * It is taken before checking whenever we need to know if the
-	 * writer and reader are working in the same tracefile.
-	 */
-	pthread_mutex_t viewer_stream_rotation_lock;
 
-	/* Information telling us when to close the stream  */
-	unsigned int close_flag:1;
+	/* Indicate if the stream was initialized for a data pending command. */
+	bool data_pending_check_done;
+
+	/* Is this stream a metadata stream? */
+	int32_t is_metadata;
+
 	/*
-	 * Indicates if the stream has been effectively closed thus having the
-	 * information in it invalidated but NOT freed. The stream lock MUST be
-	 * held to read/update that value.
+	 * Member of the stream list in struct ctf_trace.
+	 * Updates are protected by the stream_list_lock.
+	 * Traversals are protected by RCU.
 	 */
-	unsigned int terminated_flag:1;
-	/* Indicate if the stream was initialized for a data pending command. */
-	unsigned int data_pending_check_done:1;
-	unsigned int metadata_flag:1;
+	struct cds_list_head stream_node;
 	/*
-	 * To detect when we start overwriting old data, it is used to
-	 * update the oldest_tracefile_id.
+	 * Temporary list belonging to the session until all streams
+	 * are received for that session.
+	 * Member of the stream recv list in the session.
+	 * Updates are protected by the session recv_list_lock.
+	 * Traversals are protected by RCU.
 	 */
-	unsigned int tracefile_overwrite:1;
+	bool in_recv_list;
+	struct cds_list_head recv_node;
+	bool published;
 	/*
-	 * Can this stream be used by a viewer or are we waiting for additional
-	 * information.
+	 * Node of stream within global stream hash table.
 	 */
-	unsigned int viewer_ready:1;
+	struct lttng_ht_node_u64 node;
+	struct rcu_head rcu_node;	/* For call_rcu teardown. */
 };
 
-struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
-		uint64_t stream_id);
-int stream_close(struct relay_session *session, struct relay_stream *stream);
-void stream_delete(struct lttng_ht *ht, struct relay_stream *stream);
-void stream_destroy(struct relay_stream *stream);
+struct relay_stream *stream_create(struct ctf_trace *trace,
+	uint64_t stream_handle, char *path_name,
+	char *channel_name, uint64_t tracefile_size,
+	uint64_t tracefile_count);
+
+struct relay_stream *stream_get_by_id(uint64_t stream_id);
+bool stream_get(struct relay_stream *stream);
+void stream_put(struct relay_stream *stream);
+void stream_close(struct relay_stream *stream);
+
+void stream_publish(struct relay_stream *stream);
 
 #endif /* _STREAM_H */
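The recv_node, in_recv_list and published fields support a two-phase publication. A new stream is first queued on its session's recv list and only later made visible. The sketch below shows that flow under simplifying assumptions:

- A hand-rolled intrusive list stands in for cds_list_head.
- There are no RCU readers.
- The published list is guarded by recv_list_lock rather than by a separate ctf_trace stream_list_lock.

The pub_* and dl_* names are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Minimal intrusive doubly linked list, standing in for cds_list_head. */
struct dl_node {
	struct dl_node *prev, *next;
};

static void dl_init(struct dl_node *head)
{
	head->prev = head;
	head->next = head;
}

static void dl_add(struct dl_node *node, struct dl_node *head)
{
	node->next = head->next;
	node->prev = head;
	head->next->prev = node;
	head->next = node;
}

static void dl_del(struct dl_node *node)
{
	node->prev->next = node->next;
	node->next->prev = node->prev;
}

static size_t dl_len(const struct dl_node *head)
{
	size_t n = 0;

	for (const struct dl_node *p = head->next; p != head; p = p->next) {
		n++;
	}
	return n;
}

/* Hypothetical, simplified session and stream. */
struct pub_session {
	pthread_mutex_t recv_list_lock;	/* guards both lists in this sketch */
	struct dl_node recv_list;	/* created, not yet visible */
	struct dl_node published_list;	/* visible to readers */
};

struct pub_stream {
	pthread_mutex_t lock;
	struct pub_session *session;
	struct dl_node recv_node;
	struct dl_node pub_node;
	bool in_recv_list;
	bool published;
};

void pub_session_init(struct pub_session *s)
{
	pthread_mutex_init(&s->recv_list_lock, NULL);
	dl_init(&s->recv_list);
	dl_init(&s->published_list);
}

/* Queue a new stream on the session's recv list. */
void pub_stream_init(struct pub_stream *st, struct pub_session *s)
{
	pthread_mutex_init(&st->lock, NULL);
	st->session = s;
	st->published = false;
	pthread_mutex_lock(&s->recv_list_lock);
	dl_add(&st->recv_node, &s->recv_list);
	st->in_recv_list = true;	/* so publish knows to unlink it */
	pthread_mutex_unlock(&s->recv_list_lock);
}

/* Move a stream from the recv list to the published list; idempotent. */
void pub_stream_publish(struct pub_stream *st)
{
	struct pub_session *s = st->session;

	pthread_mutex_lock(&st->lock);	/* stream lock taken first */
	if (!st->published) {
		pthread_mutex_lock(&s->recv_list_lock);
		if (st->in_recv_list) {
			dl_del(&st->recv_node);
			st->in_recv_list = false;
		}
		dl_add(&st->pub_node, &s->published_list);
		pthread_mutex_unlock(&s->recv_list_lock);
		st->published = true;
	}
	pthread_mutex_unlock(&st->lock);
}
```

The published flag makes publication idempotent. A metadata stream published early can therefore be published again safely when the rest of the session's streams are.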
diff --git a/src/bin/lttng-relayd/utils.h b/src/bin/lttng-relayd/utils.h
index de1521d..4a56980 100644
--- a/src/bin/lttng-relayd/utils.h
+++ b/src/bin/lttng-relayd/utils.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_UTILS_H
+#define RELAYD_UTILS_H
+
 /*
  * Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License, version 2 only,
@@ -16,9 +20,6 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef RELAYD_UTILS_H
-#define RELAYD_UTILS_H
-
 char *create_output_path(char *path_name);
 
 #endif /* RELAYD_UTILS_H */
diff --git a/src/bin/lttng-relayd/viewer-session.c b/src/bin/lttng-relayd/viewer-session.c
new file mode 100644
index 0000000..38e7e78
--- /dev/null
+++ b/src/bin/lttng-relayd/viewer-session.c
@@ -0,0 +1,159 @@
+/*
+ * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
+ *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#define _LGPL_SOURCE
+#include <common/common.h>
+#include <urcu/rculist.h>
+
+#include "lttng-relayd.h"
+#include "ctf-trace.h"
+#include "session.h"
+#include "viewer-session.h"
+#include "viewer-stream.h"
+#include "stream.h"
+
+struct relay_viewer_session *viewer_session_create(void)
+{
+	struct relay_viewer_session *vsession;
+
+	vsession = zmalloc(sizeof(*vsession));
+	if (!vsession) {
+		goto end;
+	}
+	CDS_INIT_LIST_HEAD(&vsession->session_list);
+end:
+	return vsession;
+}
+
+int viewer_session_attach(struct relay_viewer_session *vsession,
+		struct relay_session *session)
+{
+	int ret = 0;
+
+	if (!session_get(session)) {
+		ret = -1;
+		goto end;
+	}
+	pthread_mutex_lock(&session->lock);
+	if (session->viewer_attached) {
+		ret = -1;
+	} else {
+		session->viewer_attached = true;
+	}
+	pthread_mutex_unlock(&session->lock);
+
+	if (!ret) {
+		pthread_mutex_lock(&vsession->session_list_lock);
+		cds_list_add_rcu(&session->viewer_session_node,
+				&vsession->session_list);
+		pthread_mutex_unlock(&vsession->session_list_lock);
+	} else {
+		session_put(session);
+	}
+end:
+	return ret;
+}
+
+int viewer_session_detach(struct relay_viewer_session *vsession,
+		struct relay_session *session)
+{
+	int ret = 0;
+
+	pthread_mutex_lock(&session->lock);
+	if (!session->viewer_attached) {
+		ret = -1;
+	} else {
+		session->viewer_attached = false;
+	}
+	pthread_mutex_unlock(&session->lock);
+
+	if (!ret) {
+		pthread_mutex_lock(&vsession->session_list_lock);
+		cds_list_del_rcu(&session->viewer_session_node);
+		pthread_mutex_unlock(&vsession->session_list_lock);
+		session_put(session);
+	}
+	return ret;
+}
+
+void viewer_session_destroy(struct relay_viewer_session *vsession)
+{
+	free(vsession);
+}
+
+void viewer_session_close(struct relay_viewer_session *vsession)
+{
+	struct relay_session *session;
+
+	rcu_read_lock();
+	cds_list_for_each_entry_rcu(session,
+			&vsession->session_list, viewer_session_node) {
+		struct lttng_ht_iter iter;
+		struct relay_viewer_stream *vstream;
+
+		/*
+		 * TODO: improvement: create more efficient list of
+		 * vstream per session.
+		 */
+		cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter,
+				vstream, stream_n.node) {
+			if (vstream->stream->trace->session != session) {
+				continue;
+			}
+			viewer_stream_put(vstream);
+		}
+
+		viewer_session_detach(vsession, session);
+	}
+	rcu_read_unlock();
+}
+
+/*
+ * Check if a connection is attached to a session.
+ * Return 1 if attached, 0 otherwise.
+ */
+int viewer_session_is_attached(struct relay_viewer_session *vsession,
+		struct relay_session *session)
+{
+	struct relay_session *iter;
+	int found = 0;
+
+	rcu_read_lock();
+	pthread_mutex_lock(&session->lock);
+	if (!vsession) {
+		goto end;
+	}
+	if (!session->viewer_attached) {
+		goto end;
+	}
+	cds_list_for_each_entry_rcu(iter,
+			&vsession->session_list,
+			viewer_session_node) {
+		if (session == iter) {
+			found = 1;
+			goto end;
+		}
+	}
+end:
+	pthread_mutex_unlock(&session->lock);
+	rcu_read_unlock();
+	return found;
+}
+
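The attach/detach pair above lets a relay session be attached to at most one viewer at a time: viewer_attached is tested and set under session->lock, and the session reference taken by attach is held until the matching detach drops it. What follows is a minimal standalone model of that protocol. It assumes a plain counter in place of the urcu_ref behind session_get()/session_put(), assumes the get always succeeds, and leaves out the RCU session list; model_session, model_attach() and model_detach() are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/*
 * Hypothetical stand-in for struct relay_session, reduced to the fields the
 * attach/detach protocol touches. refcount is a plain counter modelling
 * session_get()/session_put(), protected by the same lock for simplicity.
 */
struct model_session {
	pthread_mutex_t lock;
	bool viewer_attached;
	int refcount;
};

static void model_session_get(struct model_session *s)
{
	pthread_mutex_lock(&s->lock);
	s->refcount++;
	pthread_mutex_unlock(&s->lock);
}

static void model_session_put(struct model_session *s)
{
	pthread_mutex_lock(&s->lock);
	assert(s->refcount > 0);
	s->refcount--;
	pthread_mutex_unlock(&s->lock);
}

/* Returns 0 on success, -1 if a viewer is already attached. */
int model_attach(struct model_session *s)
{
	int ret = 0;

	/* Reference owned by the attachment, kept until detach. */
	model_session_get(s);
	pthread_mutex_lock(&s->lock);
	if (s->viewer_attached) {
		ret = -1;
	} else {
		s->viewer_attached = true;
	}
	pthread_mutex_unlock(&s->lock);
	if (ret) {
		/* Attach failed: drop the reference taken above. */
		model_session_put(s);
	}
	return ret;
}

/* Returns 0 on success, -1 if no viewer was attached. */
int model_detach(struct model_session *s)
{
	int ret = 0;

	pthread_mutex_lock(&s->lock);
	if (!s->viewer_attached) {
		ret = -1;
	} else {
		s->viewer_attached = false;
	}
	pthread_mutex_unlock(&s->lock);
	if (!ret) {
		/* Release the reference owned by the attachment. */
		model_session_put(s);
	}
	return ret;
}
```

Starting from a count of 1, a first attach succeeds and leaves the count at 2. A second attach fails and leaves the count unchanged, and one detach brings it back to 1.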
diff --git a/src/bin/lttng-relayd/viewer-session.h b/src/bin/lttng-relayd/viewer-session.h
new file mode 100644
index 0000000..dd3e76b
--- /dev/null
+++ b/src/bin/lttng-relayd/viewer-session.h
@@ -0,0 +1,55 @@
+#ifndef _VIEWER_SESSION_H
+#define _VIEWER_SESSION_H
+
+/*
+ * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
+ *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <limits.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <urcu/list.h>
+#include <urcu/ref.h>
+
+#include <common/hashtable/hashtable.h>
+
+#include "session.h"
+
+struct relay_viewer_session {
+	/*
+	 * Session list. Updates are protected by the session_list_lock.
+	 * Traversals are protected by RCU.
+	 * This list limits the design to having the sessions in at most
+	 * one viewer session.
+	 */
+	struct cds_list_head session_list;	/* RCU list. */
+	pthread_mutex_t session_list_lock;	/* Protects list updates. */
+};
+
+struct relay_viewer_session *viewer_session_create(void);
+void viewer_session_destroy(struct relay_viewer_session *vsession);
+void viewer_session_close(struct relay_viewer_session *vsession);
+
+int viewer_session_attach(struct relay_viewer_session *vsession,
+		struct relay_session *session);
+int viewer_session_detach(struct relay_viewer_session *vsession,
+		struct relay_session *session);
+int viewer_session_is_attached(struct relay_viewer_session *vsession,
+		struct relay_session *session);
+
+#endif /* _VIEWER_SESSION_H */
diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c
index 3748629..60117f9 100644
--- a/src/bin/lttng-relayd/viewer-stream.c
+++ b/src/bin/lttng-relayd/viewer-stream.c
@@ -1,6 +1,7 @@
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -24,39 +25,32 @@
 #include "lttng-relayd.h"
 #include "viewer-stream.h"
 
-static void free_stream(struct relay_viewer_stream *stream)
+static void viewer_stream_destroy(struct relay_viewer_stream *vstream)
 {
-	assert(stream);
-
-	free(stream->path_name);
-	free(stream->channel_name);
-	free(stream);
+	free(vstream->path_name);
+	free(vstream->channel_name);
+	free(vstream);
 }
 
-static void deferred_free_viewer_stream(struct rcu_head *head)
+static void viewer_stream_destroy_rcu(struct rcu_head *head)
 {
-	struct relay_viewer_stream *stream =
+	struct relay_viewer_stream *vstream =
 		caa_container_of(head, struct relay_viewer_stream, rcu_node);
 
-	free_stream(stream);
+	viewer_stream_destroy(vstream);
 }
 
 struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
-		enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace)
+		enum lttng_viewer_seek seek_t)
 {
 	struct relay_viewer_stream *vstream;
 
-	assert(stream);
-	assert(ctf_trace);
-
 	vstream = zmalloc(sizeof(*vstream));
 	if (!vstream) {
 		PERROR("relay viewer stream zmalloc");
 		goto error;
 	}
 
-	vstream->session_id = stream->session_id;
-	vstream->stream_handle = stream->stream_handle;
 	vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
 	if (vstream->path_name == NULL) {
 		PERROR("relay viewer path_name alloc");
@@ -68,216 +62,253 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
 		PERROR("relay viewer channel_name alloc");
 		goto error;
 	}
-	vstream->tracefile_count = stream->tracefile_count;
-	vstream->metadata_flag = stream->metadata_flag;
-	vstream->tracefile_count_last = -1ULL;
 
 	switch (seek_t) {
 	case LTTNG_VIEWER_SEEK_BEGINNING:
-		vstream->tracefile_count_current = stream->oldest_tracefile_id;
+		vstream->current_tracefile_id = stream->oldest_tracefile_id;
 		break;
 	case LTTNG_VIEWER_SEEK_LAST:
-		vstream->tracefile_count_current = stream->tracefile_count_current;
+		vstream->current_tracefile_id = stream->current_tracefile_id;
 		break;
 	default:
-		assert(0);
 		goto error;
 	}
-
-	if (vstream->metadata_flag) {
-		ctf_trace->viewer_metadata_stream = vstream;
+	if (!stream_get(stream)) {
+		ERR("Cannot get stream");
+		goto error;
 	}
+	vstream->stream = stream;
 
-	/* Globally visible after the add unique. */
-	lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
-	lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
-
-	vstream->index_read_fd = -1;
-	vstream->read_fd = -1;
-
-	/*
-	 * This is to avoid a race between the initialization of this object and
-	 * the close of the given stream. If the stream is unable to find this
-	 * viewer stream when closing, this copy will at least take the latest
-	 * value. We also need that for the seek_last.
-	 */
-	vstream->total_index_received = stream->total_index_received;
-
+	pthread_mutex_lock(&stream->lock);
 	/*
 	 * If we never received an index for the current stream, delay the opening
 	 * of the index, otherwise open it right now.
 	 */
-	if (vstream->tracefile_count_current == stream->tracefile_count_current
-			&& vstream->total_index_received == 0) {
-		vstream->index_read_fd = -1;
+	if (vstream->current_tracefile_id == stream->current_tracefile_id
+			&& stream->total_index_received == 0) {
+		vstream->index_fd = NULL;
 	} else {
 		int read_fd;
 
 		read_fd = index_open(vstream->path_name, vstream->channel_name,
-				vstream->tracefile_count, vstream->tracefile_count_current);
+				stream->tracefile_count,
+				vstream->current_tracefile_id);
 		if (read_fd < 0) {
+			pthread_mutex_unlock(&stream->lock);
+			goto error;
+		}
+		vstream->index_fd = stream_fd_create(read_fd);
+		if (!vstream->index_fd) {
+			pthread_mutex_unlock(&stream->lock);
+			if (close(read_fd))
+				PERROR("close");
 			goto error;
 		}
-		vstream->index_read_fd = read_fd;
 	}
 
-	if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) {
+	if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) {
 		off_t lseek_ret;
 
-		lseek_ret = lseek(vstream->index_read_fd,
-				vstream->total_index_received * sizeof(struct ctf_packet_index),
-				SEEK_CUR);
+		lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END);
 		if (lseek_ret < 0) {
+			pthread_mutex_unlock(&stream->lock);
 			goto error;
 		}
-		vstream->last_sent_index = vstream->total_index_received;
+		vstream->last_sent_index = stream->total_index_received;
 	}
+	pthread_mutex_unlock(&stream->lock);
+
+	pthread_mutex_init(&vstream->lock, NULL);
+	pthread_mutex_init(&vstream->reflock, NULL);
+	urcu_ref_init(&vstream->ref);
+
+	if (stream->is_metadata) {
+		stream->trace->viewer_metadata_stream = vstream;
+	}
+
+	/* Globally visible after the add unique. */
+	lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
+	lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
 
 	return vstream;
 
 error:
 	if (vstream) {
-		free_stream(vstream);
+		viewer_stream_destroy(vstream);
 	}
 	return NULL;
 }
 
-void viewer_stream_delete(struct relay_viewer_stream *stream)
+static void viewer_stream_unpublish(struct relay_viewer_stream *vstream)
 {
 	int ret;
 	struct lttng_ht_iter iter;
 
-	iter.iter.node = &stream->stream_n.node;
+	iter.iter.node = &vstream->stream_n.node;
 	ret = lttng_ht_del(viewer_streams_ht, &iter);
 	assert(!ret);
 }
 
-void viewer_stream_destroy(struct ctf_trace *ctf_trace,
-		struct relay_viewer_stream *stream)
+static void viewer_stream_release(struct urcu_ref *ref)
 {
-	int ret;
+	struct relay_viewer_stream *vstream = caa_container_of(ref,
+			struct relay_viewer_stream, ref);
 
-	assert(stream);
-
-	if (ctf_trace) {
-		ctf_trace_put_ref(ctf_trace);
+	if (vstream->stream->is_metadata) {
+		vstream->stream->trace->viewer_metadata_stream = NULL;
 	}
 
-	if (stream->read_fd >= 0) {
-		ret = close(stream->read_fd);
-		if (ret < 0) {
-			PERROR("close read_fd");
-		}
+	viewer_stream_unpublish(vstream);
+
+	if (vstream->stream_fd) {
+		stream_fd_put(vstream->stream_fd);
+		vstream->stream_fd = NULL;
 	}
-	if (stream->index_read_fd >= 0) {
-		ret = close(stream->index_read_fd);
-		if (ret < 0) {
-			PERROR("close index_read_fd");
-		}
+	if (vstream->index_fd) {
+		stream_fd_put(vstream->index_fd);
+		vstream->index_fd = NULL;
+	}
+	if (vstream->stream) {
+		stream_put(vstream->stream);
+		vstream->stream = NULL;
+	}
+	call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu);
+}
+
+/* Should be called with RCU read-side lock held. */
+bool viewer_stream_get(struct relay_viewer_stream *vstream)
+{
+	bool has_ref = false;
+
+	pthread_mutex_lock(&vstream->reflock);
+	if (vstream->ref.refcount != 0) {
+		has_ref = true;
+		urcu_ref_get(&vstream->ref);
 	}
+	pthread_mutex_unlock(&vstream->reflock);
 
-	call_rcu(&stream->rcu_node, deferred_free_viewer_stream);
+	return has_ref;
 }
 
 /*
- * Find viewer stream by id. RCU read side lock MUST be acquired.
+ * Get viewer stream by id.
  *
- * Return stream if found else NULL.
+ * Return viewer stream with a reference held if found, else NULL.
  */
-struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id)
+struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id)
 {
 	struct lttng_ht_node_u64 *node;
 	struct lttng_ht_iter iter;
-	struct relay_viewer_stream *stream = NULL;
+	struct relay_viewer_stream *vstream = NULL;
 
+	rcu_read_lock();
 	lttng_ht_lookup(viewer_streams_ht, &id, &iter);
 	node = lttng_ht_iter_get_node_u64(&iter);
 	if (!node) {
 		DBG("Relay viewer stream %" PRIu64 " not found", id);
 		goto end;
 	}
-	stream = caa_container_of(node, struct relay_viewer_stream, stream_n);
-
+	vstream = caa_container_of(node, struct relay_viewer_stream, stream_n);
+	if (!viewer_stream_get(vstream)) {
+		vstream = NULL;
+	}
 end:
-	return stream;
+	rcu_read_unlock();
+	return vstream;
+}
+
+void viewer_stream_put(struct relay_viewer_stream *vstream)
+{
+	rcu_read_lock();
+	pthread_mutex_lock(&vstream->reflock);
+	urcu_ref_put(&vstream->ref, viewer_stream_release);
+	pthread_mutex_unlock(&vstream->reflock);
+	rcu_read_unlock();
+}
+
+/*
+ * Returns whether tracefile @id is readable. If not, it has
+ * been overwritten.
+ * Must be called with rstream and vstream locks held.
+ */
+bool viewer_stream_is_tracefile_id_readable(struct relay_viewer_stream *vstream, uint64_t id)
+{
+	struct relay_stream *stream = vstream->stream;
+
+	if (stream->oldest_tracefile_id <= stream->current_tracefile_id) {
+		if (id >= stream->oldest_tracefile_id && id <= stream->current_tracefile_id) {
+			/* id is a readable file. */
+			return true;
+		} else {
+			/* id is not readable. */
+			return false;
+		}
+	} else {
+		if (id >= stream->oldest_tracefile_id || id <= stream->current_tracefile_id) {
+			/* id is a readable file. */
+			return true;
+		} else {
+			/* id is not readable. */
+			return false;
+		}
+	}
 }
 
 /*
  * Rotate a stream to the next tracefile.
  *
- * Must be called with viewer_stream_rotation_lock held.
+ * Must be called with rstream and vstream locks held.
  * Returns 0 on success, 1 on EOF, a negative value on error.
  */
-int viewer_stream_rotate(struct relay_viewer_stream *vstream,
-		struct relay_stream *stream)
+int viewer_stream_rotate(struct relay_viewer_stream *vstream)
 {
 	int ret;
-	uint64_t tracefile_id;
-
-	assert(vstream);
-	assert(stream);
-
-	if (vstream->tracefile_count == 0) {
-		/* Ignore rotation, there is none to do. */
-		ret = 0;
-		goto end;
-	}
-
-	tracefile_id = (vstream->tracefile_count_current + 1) %
-		vstream->tracefile_count;
+	uint64_t new_id;
+	struct relay_stream *stream = vstream->stream;
 
 	/* Detect the last tracefile to open. */
-	if (vstream->tracefile_count_last != -1ULL &&
-			vstream->tracefile_count_last ==
-			vstream->tracefile_count_current) {
+	if (stream->total_index_received == vstream->last_sent_index
+			&& stream->trace->session->connection_closed) {
 		ret = 1;
 		goto end;
 	}
 
-	/*
-	 * The writer and the reader are not working in the same tracefile, we can
-	 * read up to EOF, we don't care about the total_index_received.
-	 */
-	if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) {
-		vstream->close_write_flag = 1;
-	} else {
-		/*
-		 * We are opening a file that is still open in write, make sure we
-		 * limit our reading to the number of indexes received.
-		 */
-		vstream->close_write_flag = 0;
-		if (stream->close_flag) {
-			vstream->total_index_received = stream->total_index_received;
-		}
+	if (stream->tracefile_count == 0) {
+		/* Ignore rotation, there is none to do. */
+		ret = 0;
+		goto end;
 	}
-	vstream->tracefile_count_current = tracefile_id;
 
-	ret = close(vstream->index_read_fd);
-	if (ret < 0) {
-		PERROR("close index file %d", vstream->index_read_fd);
+	new_id = (vstream->current_tracefile_id + 1) % stream->tracefile_count;
+	if (!viewer_stream_is_tracefile_id_readable(vstream, new_id)) {
+		new_id = stream->oldest_tracefile_id;
 	}
-	vstream->index_read_fd = -1;
+	vstream->current_tracefile_id = new_id;
 
-	ret = close(vstream->read_fd);
-	if (ret < 0) {
-		PERROR("close tracefile %d", vstream->read_fd);
+	if (vstream->index_fd) {
+		stream_fd_put(vstream->index_fd);
+		vstream->index_fd = NULL;
+	}
+	if (vstream->stream_fd) {
+		stream_fd_put(vstream->stream_fd);
+		vstream->stream_fd = NULL;
 	}
-	vstream->read_fd = -1;
-
-	pthread_mutex_lock(&vstream->overwrite_lock);
-	vstream->abort_flag = 0;
-	pthread_mutex_unlock(&vstream->overwrite_lock);
 
 	ret = index_open(vstream->path_name, vstream->channel_name,
-			vstream->tracefile_count, vstream->tracefile_count_current);
+			stream->tracefile_count,
+			vstream->current_tracefile_id);
 	if (ret < 0) {
-		goto error;
+		goto end;
+	}
+	vstream->index_fd = stream_fd_create(ret);
+	if (vstream->index_fd) {
+		ret = 0;
+	} else {
+		if (close(ret)) {
+			PERROR("close");
+		}
+		ret = -1;
 	}
-	vstream->index_read_fd = ret;
-
-	ret = 0;
-
 end:
-error:
 	return ret;
 }
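viewer_stream_get() and viewer_stream_put() above implement a "get unless zero" reference count. A lookup that races with the final put must not resurrect an object whose count has already reached zero, so the zero check and the increment happen together under reflock, and the last decrement runs the release callback under that same lock. The sketch below is a minimal model of the pattern. It assumes a plain counter instead of struct urcu_ref and leaves out the RCU read-side lock and call_rcu() deferred free, which the patch relies on to keep the memory valid after release. model_obj, model_get() and model_put() are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical object: a reference count and the lock serializing it. */
struct model_obj {
	pthread_mutex_t reflock;
	unsigned long refcount;	/* Starts at 1, owned by the creator. */
	bool released;		/* Set by the release callback. */
};

/* Called with reflock held when the last reference is dropped. */
static void model_release(struct model_obj *obj)
{
	/*
	 * In the patch, this is where the object is unpublished from the
	 * hash table and its memory is handed to call_rcu(). Here the
	 * storage belongs to the caller, so unlocking afterwards is safe.
	 */
	obj->released = true;
}

/* Take a reference only if the object is still live. */
bool model_get(struct model_obj *obj)
{
	bool has_ref = false;

	pthread_mutex_lock(&obj->reflock);
	if (obj->refcount != 0) {
		obj->refcount++;
		has_ref = true;
	}
	pthread_mutex_unlock(&obj->reflock);
	return has_ref;
}

/* Drop a reference; the last put runs the release callback. */
void model_put(struct model_obj *obj)
{
	pthread_mutex_lock(&obj->reflock);
	assert(obj->refcount > 0);
	if (--obj->refcount == 0) {
		model_release(obj);
	}
	pthread_mutex_unlock(&obj->reflock);
}
```

Once the count has dropped to zero, every later model_get() returns false. That is why viewer_stream_get_by_id() can return NULL for a stream that is still reachable in the hash table.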
diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h
index 003b119..75527d2 100644
--- a/src/bin/lttng-relayd/viewer-stream.h
+++ b/src/bin/lttng-relayd/viewer-stream.h
@@ -1,6 +1,10 @@
+#ifndef _VIEWER_STREAM_H
+#define _VIEWER_STREAM_H
+
 /*
  * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
  *                      David Goulet <dgoulet at efficios.com>
+ *               2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
-#ifndef _VIEWER_STREAM_H
-#define _VIEWER_STREAM_H
-
 #include <limits.h>
 #include <inttypes.h>
 #include <pthread.h>
@@ -33,57 +34,47 @@
 struct relay_stream;
 
 /*
- * Shadow copy of the relay_stream structure for the viewer side.  The only
- * fields updated by the writer (streaming side) after allocation are :
- * total_index_received and close_flag. Everything else is updated by the
- * reader (viewer side).
+ * Shadow copy of the relay_stream structure for the viewer side.
  */
 struct relay_viewer_stream {
-	uint64_t stream_handle;
-	uint64_t session_id;
-	int read_fd;
-	int index_read_fd;
+	struct urcu_ref ref;
+	pthread_mutex_t reflock;
+
+	/*
+	 * This lock nests inside the stream lock.
+	 */
+	pthread_mutex_t lock;
+
+	/* Back ref to stream */
+	struct relay_stream *stream;
+
+	/* FD from which to read the stream data. */
+	struct stream_fd *stream_fd;
+	/* FD from which to read the index data. */
+	struct stream_fd *index_fd;
+
 	char *path_name;
 	char *channel_name;
+
+	uint64_t current_tracefile_id;
 	uint64_t last_sent_index;
-	uint64_t total_index_received;
-	uint64_t tracefile_count;
-	uint64_t tracefile_count_current;
-	/* Stop after reading this tracefile. */
-	uint64_t tracefile_count_last;
+
+	/* Indicates if this stream has been sent to a viewer client. */
+	bool sent_flag;
+
 	struct lttng_ht_node_u64 stream_n;
 	struct rcu_head rcu_node;
-	struct ctf_trace *ctf_trace;
-	/*
-	 * This lock blocks only when the writer is about to start overwriting
-	 * a file currently read by the reader.
-	 *
-	 * This is nested INSIDE the viewer_stream_rotation_lock.
-	 */
-	pthread_mutex_t overwrite_lock;
-	/* Information telling us if the stream is a metadata stream. */
-	unsigned int metadata_flag:1;
-	/*
-	 * Information telling us that the stream is closed in write, so
-	 * we don't expect new indexes and we can read up to EOF.
-	 */
-	unsigned int close_write_flag:1;
-	/*
-	 * If the streaming side closes a FD in use in the viewer side,
-	 * it sets this flag to inform that it is a normal error.
-	 */
-	unsigned int abort_flag:1;
-	/* Indicates if this stream has been sent to a viewer client. */
-	unsigned int sent_flag:1;
 };
 
 struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
-		enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace);
-struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id);
-void viewer_stream_destroy(struct ctf_trace *ctf_trace,
-		struct relay_viewer_stream *stream);
-void viewer_stream_delete(struct relay_viewer_stream *stream);
-int viewer_stream_rotate(struct relay_viewer_stream *vstream,
-		struct relay_stream *stream);
+		enum lttng_viewer_seek seek_t);
+
+struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id);
+bool viewer_stream_get(struct relay_viewer_stream *vstream);
+void viewer_stream_put(struct relay_viewer_stream *vstream);
+int viewer_stream_rotate(struct relay_viewer_stream *vstream);
+bool viewer_stream_is_tracefile_id_readable(struct relay_viewer_stream *vstream,
+		uint64_t id);
+
 
 #endif /* _VIEWER_STREAM_H */
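When tracefile_count is non-zero, tracefile ids are reused as a ring. The ids still on disk therefore form a circular range from oldest_tracefile_id to current_tracefile_id, and that range may wrap past the end. The sketch below models the readability check and the next-id choice made by viewer_stream_rotate() on plain integers rather than struct relay_stream. It assumes the tracefile currently being written counts as readable (an inclusive upper bound). The function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper: is tracefile `id` still on disk? Readable ids form
 * the circular range [oldest, current] over 0..count-1, wrapping when
 * oldest > current. Assumption: `current` (being written) is readable.
 */
bool model_tracefile_id_readable(uint64_t oldest, uint64_t current,
		uint64_t id)
{
	if (oldest <= current) {
		/* No wrap-around: [oldest, current]. */
		return id >= oldest && id <= current;
	}
	/* Wrapped: [oldest, count - 1] and [0, current]. */
	return id >= oldest || id <= current;
}

/*
 * Hypothetical helper: next tracefile for a reader at `reader_id`. Move to
 * the following slot, or jump to the oldest one if that slot was
 * overwritten. count == 0 means no rotation and must be handled earlier.
 */
uint64_t model_next_tracefile_id(uint64_t oldest, uint64_t current,
		uint64_t count, uint64_t reader_id)
{
	uint64_t next;

	assert(count != 0);
	next = (reader_id + 1) % count;
	if (!model_tracefile_id_readable(oldest, current, next)) {
		next = oldest;
	}
	return next;
}
```

Falling back to the oldest tracefile, rather than failing, lets a slow viewer skip data the writer has already overwritten and resume with the oldest data still available.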
diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c
index 87d5f34..0c5ffff 100644
--- a/src/bin/lttng-sessiond/consumer.c
+++ b/src/bin/lttng-sessiond/consumer.c
@@ -1077,11 +1077,8 @@ error:
 }
 
 /*
- * Ask the consumer if the data is ready to read (NOT pending) for the specific
- * session id.
- *
- * This function has a different behavior with the consumer i.e. that it waits
- * for a reply from the consumer if yes or no the data is pending.
+ * Ask the consumer if the data is pending for the specific session id.
+ * Returns 1 if data is pending, 0 otherwise, or < 0 on error.
  */
 int consumer_is_data_pending(uint64_t session_id,
 		struct consumer_output *consumer)
-- 
2.1.4