[lttng-dev] [PATCH v3 lttng-tools 4/4] Fix: Relay daemon ownership and reference counting
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Mon Aug 17 14:12:10 EDT 2015
The ownership and reference counting of the relay daemon are unclear and
buggy in many ways. They cause memory corruption, double frees, leaks,
and segmentation faults, observed in various conditions.
Fix this situation by introducing a clear ownership and reference
counting scheme for this daemon.
See doc/relayd-architecture.txt for details.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
---
doc/Makefile.am | 3 +-
doc/relayd-architecture.txt | 96 +++
src/bin/lttng-relayd/Makefile.am | 4 +-
src/bin/lttng-relayd/cmd-2-1.c | 17 +-
src/bin/lttng-relayd/cmd-2-1.h | 10 +-
src/bin/lttng-relayd/cmd-2-2.c | 22 +-
src/bin/lttng-relayd/cmd-2-2.h | 10 +-
src/bin/lttng-relayd/cmd-2-4.c | 19 +-
src/bin/lttng-relayd/cmd-2-4.h | 10 +-
src/bin/lttng-relayd/cmd-generic.c | 1 +
src/bin/lttng-relayd/cmd-generic.h | 7 +-
src/bin/lttng-relayd/cmd.h | 7 +-
src/bin/lttng-relayd/connection.c | 128 ++--
src/bin/lttng-relayd/connection.h | 63 +-
src/bin/lttng-relayd/ctf-trace.c | 198 +++---
src/bin/lttng-relayd/ctf-trace.h | 66 +-
src/bin/lttng-relayd/index.c | 342 ++++++----
src/bin/lttng-relayd/index.h | 60 +-
src/bin/lttng-relayd/live.c | 1049 ++++++++++++++---------------
src/bin/lttng-relayd/live.h | 10 +-
src/bin/lttng-relayd/lttng-relayd.h | 23 +-
src/bin/lttng-relayd/main.c | 1167 ++++++++++++++-------------------
src/bin/lttng-relayd/session.c | 214 +++---
src/bin/lttng-relayd/session.h | 116 ++--
src/bin/lttng-relayd/stream-fd.c | 58 ++
src/bin/lttng-relayd/stream-fd.h | 32 +
src/bin/lttng-relayd/stream.c | 343 ++++++++--
src/bin/lttng-relayd/stream.h | 130 ++--
src/bin/lttng-relayd/utils.h | 7 +-
src/bin/lttng-relayd/viewer-session.c | 164 +++++
src/bin/lttng-relayd/viewer-session.h | 53 ++
src/bin/lttng-relayd/viewer-stream.c | 325 +++++----
src/bin/lttng-relayd/viewer-stream.h | 83 ++-
src/bin/lttng-sessiond/consumer.c | 7 +-
src/bin/lttng-sessiond/ust-consumer.c | 3 +-
src/common/index/ctf-index.h | 2 +-
src/common/index/index.c | 13 +
src/common/utils.c | 109 ++-
src/common/utils.h | 2 +
39 files changed, 2909 insertions(+), 2064 deletions(-)
create mode 100644 doc/relayd-architecture.txt
create mode 100644 src/bin/lttng-relayd/stream-fd.c
create mode 100644 src/bin/lttng-relayd/stream-fd.h
create mode 100644 src/bin/lttng-relayd/viewer-session.c
create mode 100644 src/bin/lttng-relayd/viewer-session.h
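
The get/put pattern this patch applies to each object (see
connection_get() and connection_put() in the diff) can be sketched
roughly as follows. This is only an illustration under stated
assumptions: it uses a plain counter and a pthread mutex rather than
urcu_ref, and an explicit demo_obj_free() stands in for the call_rcu
callback so that the memory stays valid after the refcount reaches 0.
All demo_* names are hypothetical and are not part of lttng-tools.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical object, not a structure from lttng-tools. */
struct demo_obj {
	long refcount;			/* stands in for struct urcu_ref */
	pthread_mutex_t reflock;	/* serializes get against the final put */
};

/* The creation reference (refcount == 1) is owned by the caller. */
struct demo_obj *demo_obj_create(void)
{
	struct demo_obj *obj = calloc(1, sizeof(*obj));

	if (!obj) {
		return NULL;
	}
	obj->refcount = 1;
	pthread_mutex_init(&obj->reflock, NULL);
	return obj;
}

/* Returns false if the refcount already reached 0: the lookup fails. */
bool demo_obj_get(struct demo_obj *obj)
{
	bool has_ref = false;

	pthread_mutex_lock(&obj->reflock);
	if (obj->refcount != 0) {
		obj->refcount++;
		has_ref = true;
	}
	pthread_mutex_unlock(&obj->reflock);
	return has_ref;
}

/* Returns true when this put dropped the last reference. */
bool demo_obj_put(struct demo_obj *obj)
{
	bool released;

	pthread_mutex_lock(&obj->reflock);
	assert(obj->refcount > 0);
	obj->refcount--;
	released = (obj->refcount == 0);
	pthread_mutex_unlock(&obj->reflock);
	return released;
}

/* Assumption: stands in for the deferred free done through call_rcu. */
void demo_obj_free(struct demo_obj *obj)
{
	pthread_mutex_destroy(&obj->reflock);
	free(obj);
}
```

Once the last put has run, a later demo_obj_get() on the same pointer
returns false, which is how a lookup can treat a dying object as if it
did not exist. In the sketch the caller decides when to free; in the
patch that delay is provided by the RCU grace period.
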
diff --git a/doc/Makefile.am b/doc/Makefile.am
index 4ff71eb..7ae9303 100644
--- a/doc/Makefile.am
+++ b/doc/Makefile.am
@@ -1,7 +1,8 @@
SUBDIRS = man
EXTRA_DIST = quickstart.txt streaming-howto.txt python-howto.txt \
snapshot-howto.txt calibrate.txt kernel-CodingStyle.txt \
- live-reading-howto.txt live-reading-protocol.txt
+ live-reading-howto.txt live-reading-protocol.txt \
+ relayd-architecture.txt
dist_doc_DATA = quickstart.txt streaming-howto.txt python-howto.txt \
snapshot-howto.txt calibrate.txt live-reading-howto.txt \
diff --git a/doc/relayd-architecture.txt b/doc/relayd-architecture.txt
new file mode 100644
index 0000000..c45bd56
--- /dev/null
+++ b/doc/relayd-architecture.txt
@@ -0,0 +1,96 @@
+LTTng Relay Daemon Architecture
+Mathieu Desnoyers, August 2015
+
+This document describes the object model and architecture of the relay
+daemon, after the refactoring done within the commit "Fix: Relay daemon
+ownership and reference counting".
+
+We have the following object composition hierarchy:
+
+relay connection (main.c, for sessiond/consumer)
+ |
+ \-> 0 or 1 session
+ |
+ \-> 0 or many ctf-trace
+ |
+ \-> 0 or many stream
+ | |
+ | \-> 0 or many index
+ |
+ \-------> 0 or 1 viewer stream
+
+live connection (live.c, for client)
+ |
+ \-> 1 viewer session
+ |
+ \-> 0 or many session (actually a reference to session as created
+ | by the relay connection)
+ |
+ \-> ..... (ctf-trace, stream, index, viewer stream)
+
+There are global tables declared in lttng-relayd.h for sessions
+(sessions_ht, indexed by session id), streams (relay_streams_ht, indexed
+by stream handle), and viewer streams (viewer_streams_ht, indexed by
+stream handle). The purpose of those tables is to allow fast lookup of
+those objects using the IDs received in the communication protocols.
+
+There is also one connection hash table per worker thread. There is one
+worker thread to receive data (main.c), and one worker thread to
+interact with viewer clients (live.c). Those tables are indexed by
+socket file descriptor.
+
+An RCU lookup+refcounting scheme has been introduced for all objects
+(except viewer session, which is still an exception at the moment). This
+scheme allows looking up the objects or doing a traversal on the RCU
+linked list or hash table in combination with a getter on the object.
+This getter validates that there is still at least one reference to the
+object, else the lookup acts just as if the object does not exist. This
+scheme is protected by a "reflock" mutex in each object. "reflock"
+mutexes can be nested from the innermost object to the outermost object.
+IOW, the session reflock can nest within the ctf-trace reflock.
+
+There is also a "lock" mutex in each object. Those are used to
+synchronize between threads (currently the main.c relay thread and
+live.c client thread) when objects are shared. Locks can be nested from
+the outermost object to the innermost object. IOW, the ctf-trace lock can
+nest within the session lock.
+
+A "lock" should never nest within a "reflock".
+
+RCU linked lists are used to iterate using RCU, and protected by
+their own mutex for modifications. Iterations should be confirmed using
+the object "getter" to ensure its refcount is not 0 (except in cases
+where the caller actually owns the objects and therefore can assume its
+refcount is not 0).
+
+RCU hash tables are used to iterate using RCU. Iteration should be
+confirmed using the object "getter" to ensure its refcount is not 0
+(except again if we have ownership and can assume the object refcount is
+not 0).
+
+Object creation has a refcount of 1. Each getter increments the
+refcount, and needs to be paired with a "put" to decrement it. A final
+put on "self" (ownership) will allow refcount to reach 0, therefore
+triggering release, and thus free through call_rcu.
+
+In the composition scheme, we find back references from each composite
+to its container. Therefore, each composite holds a reference (refcount)
+on its container. This allows following pointers from e.g. viewer stream
+to stream to ctf-trace to session without performing any validation,
+due to transitive refcounting of those back-references.
+
+In addition to those back references, there are a few key ownership
+references held. The connection in the relay worker thread (main.c)
+holds ownership on the session, and on each stream it contains. The
+connection in the live worker thread (live.c) holds ownership on each
+viewer stream it creates. The rest is ensured by back references
+from composite to container objects. When a connection is closed, it
+puts all the ownership references it is holding. This will then
+eventually trigger destruction of the session, streams, and viewer
+streams associated with the connection when all the back references
+reach 0.
+
+RCU read-side locks are now only held during iteration on RCU lists and
+hash tables, and within the internals of the get (lookup) and put
+functions. Those functions then use refcounting to ensure existence of
+the object when returned to their caller.
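
The back-reference rule described in relayd-architecture.txt (each
composite holds a reference on its container) could be illustrated
roughly as below. This is a single-threaded sketch under simplifying
assumptions: it omits the reflock, RCU, and hash tables, and frees
immediately instead of going through call_rcu. The demo_* names are
hypothetical stand-ins for a session (container) and a ctf-trace
(composite), not the structures from this patch.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical container, e.g. standing in for a session. */
struct demo_container {
	long refcount;
	int *release_count;	/* incremented on release, for demonstration */
};

/* Hypothetical composite, e.g. standing in for a ctf-trace. */
struct demo_composite {
	long refcount;
	struct demo_container *container;	/* back reference, holds a ref */
};

struct demo_container *demo_container_create(int *release_count)
{
	struct demo_container *c = calloc(1, sizeof(*c));

	if (!c) {
		return NULL;
	}
	c->refcount = 1;	/* ownership reference held by the caller */
	c->release_count = release_count;
	return c;
}

void demo_container_put(struct demo_container *c)
{
	assert(c->refcount > 0);
	if (--c->refcount == 0) {
		(*c->release_count)++;
		free(c);
	}
}

/* The caller must hold a reference on the container while calling this. */
struct demo_composite *demo_composite_create(struct demo_container *c)
{
	struct demo_composite *comp = calloc(1, sizeof(*comp));

	if (!comp) {
		return NULL;
	}
	c->refcount++;		/* the back reference pins the container */
	comp->container = c;
	comp->refcount = 1;
	return comp;
}

void demo_composite_put(struct demo_composite *comp)
{
	assert(comp->refcount > 0);
	if (--comp->refcount == 0) {
		/* Releasing the composite drops its back reference. */
		demo_container_put(comp->container);
		free(comp);
	}
}
```

With this arrangement the owner can drop its reference on the container
while a composite is still alive: the container is released only after
the last composite puts its back reference, so following
comp->container needs no extra validation as long as the caller holds a
reference on the composite.
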
diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am
index 126cdaf..428f352 100644
--- a/src/bin/lttng-relayd/Makefile.am
+++ b/src/bin/lttng-relayd/Makefile.am
@@ -17,7 +17,9 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \
viewer-stream.h viewer-stream.c \
session.c session.h \
stream.c stream.h \
- connection.c connection.h
+ stream-fd.c stream-fd.h \
+ connection.c connection.h \
+ viewer-session.c viewer-session.h
# link on liblttngctl for check if relayd is already alive.
lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
diff --git a/src/bin/lttng-relayd/cmd-2-1.c b/src/bin/lttng-relayd/cmd-2-1.c
index 0cd9b5a..a2c340f 100644
--- a/src/bin/lttng-relayd/cmd-2-1.c
+++ b/src/bin/lttng-relayd/cmd-2-1.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -28,30 +29,30 @@
#include "cmd-2-1.h"
#include "utils.h"
+/*
+ * cmd_recv_stream_2_1 allocates path_name and channel_name.
+ */
int cmd_recv_stream_2_1(struct relay_connection *conn,
- struct relay_stream *stream)
+ char **path_name, char **channel_name)
{
int ret;
struct lttcomm_relayd_add_stream stream_info;
- assert(conn);
- assert(stream);
-
ret = cmd_recv(conn->sock, &stream_info, sizeof(stream_info));
if (ret < 0) {
ERR("Unable to recv stream version 2.1");
goto error;
}
- stream->path_name = create_output_path(stream_info.pathname);
- if (stream->path_name == NULL) {
+ *path_name = create_output_path(stream_info.pathname);
+ if (*path_name == NULL) {
PERROR("Path name allocation");
ret = -ENOMEM;
goto error;
}
- stream->channel_name = strdup(stream_info.channel_name);
- if (stream->channel_name == NULL) {
+ *channel_name = strdup(stream_info.channel_name);
+ if (*channel_name == NULL) {
ret = -errno;
PERROR("Path name allocation");
goto error;
diff --git a/src/bin/lttng-relayd/cmd-2-1.h b/src/bin/lttng-relayd/cmd-2-1.h
index bab8190..46283dc 100644
--- a/src/bin/lttng-relayd/cmd-2-1.h
+++ b/src/bin/lttng-relayd/cmd-2-1.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_2_1_H
+#define RELAYD_CMD_2_1_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,13 +20,9 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_CMD_2_1_H
-#define RELAYD_CMD_2_1_H
-
#include "lttng-relayd.h"
-#include "stream.h"
int cmd_recv_stream_2_1(struct relay_connection *conn,
- struct relay_stream *stream);
+ char **path_name, char **channel_name);
#endif /* RELAYD_CMD_2_1_H */
diff --git a/src/bin/lttng-relayd/cmd-2-2.c b/src/bin/lttng-relayd/cmd-2-2.c
index 7dd99ad..1493b73 100644
--- a/src/bin/lttng-relayd/cmd-2-2.c
+++ b/src/bin/lttng-relayd/cmd-2-2.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -30,37 +31,38 @@
#include "cmd-2-1.h"
#include "utils.h"
+/*
+ * cmd_recv_stream_2_2 allocates path_name and channel_name.
+ */
int cmd_recv_stream_2_2(struct relay_connection *conn,
- struct relay_stream *stream)
+ char **path_name, char **channel_name,
+ uint64_t *tracefile_size, uint64_t *tracefile_count)
{
int ret;
struct lttcomm_relayd_add_stream_2_2 stream_info;
- assert(conn);
- assert(stream);
-
ret = cmd_recv(conn->sock, &stream_info, sizeof(stream_info));
if (ret < 0) {
ERR("Unable to recv stream version 2.2");
goto error;
}
- stream->path_name = create_output_path(stream_info.pathname);
- if (stream->path_name == NULL) {
+ *path_name = create_output_path(stream_info.pathname);
+ if (*path_name == NULL) {
PERROR("Path name allocation");
ret = -ENOMEM;
goto error;
}
- stream->channel_name = strdup(stream_info.channel_name);
- if (stream->channel_name == NULL) {
+ *channel_name = strdup(stream_info.channel_name);
+ if (*channel_name == NULL) {
ret = -errno;
PERROR("Path name allocation");
goto error;
}
- stream->tracefile_size = be64toh(stream_info.tracefile_size);
- stream->tracefile_count = be64toh(stream_info.tracefile_count);
+ *tracefile_size = be64toh(stream_info.tracefile_size);
+ *tracefile_count = be64toh(stream_info.tracefile_count);
ret = 0;
error:
diff --git a/src/bin/lttng-relayd/cmd-2-2.h b/src/bin/lttng-relayd/cmd-2-2.h
index bd1cd14..894a63a 100644
--- a/src/bin/lttng-relayd/cmd-2-2.h
+++ b/src/bin/lttng-relayd/cmd-2-2.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_2_2_H
+#define RELAYD_CMD_2_2_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,12 +20,10 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_CMD_2_2_H
-#define RELAYD_CMD_2_2_H
-
#include "lttng-relayd.h"
int cmd_recv_stream_2_2(struct relay_connection *conn,
- struct relay_stream *stream);
+ char **path_name, char **channel_name,
+ uint64_t *tracefile_size, uint64_t *tracefile_count);
#endif /* RELAYD_CMD_2_2_H */
diff --git a/src/bin/lttng-relayd/cmd-2-4.c b/src/bin/lttng-relayd/cmd-2-4.c
index d8aa737..a3290cb 100644
--- a/src/bin/lttng-relayd/cmd-2-4.c
+++ b/src/bin/lttng-relayd/cmd-2-4.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -30,26 +31,24 @@
#include "lttng-relayd.h"
int cmd_create_session_2_4(struct relay_connection *conn,
- struct relay_session *session)
+ char *session_name, char *hostname,
+ uint32_t *live_timer, bool *snapshot)
{
int ret;
struct lttcomm_relayd_create_session_2_4 session_info;
- assert(conn);
- assert(session);
-
ret = cmd_recv(conn->sock, &session_info, sizeof(session_info));
if (ret < 0) {
ERR("Unable to recv session info version 2.4");
goto error;
}
- strncpy(session->session_name, session_info.session_name,
- sizeof(session->session_name));
- strncpy(session->hostname, session_info.hostname,
- sizeof(session->hostname));
- session->live_timer = be32toh(session_info.live_timer);
- session->snapshot = be32toh(session_info.snapshot);
+ strncpy(session_name, session_info.session_name,
+ sizeof(session_info.session_name));
+ strncpy(hostname, session_info.hostname,
+ sizeof(session_info.hostname));
+ *live_timer = be32toh(session_info.live_timer);
+ *snapshot = be32toh(session_info.snapshot);
ret = 0;
diff --git a/src/bin/lttng-relayd/cmd-2-4.h b/src/bin/lttng-relayd/cmd-2-4.h
index aaf572a..ab73478 100644
--- a/src/bin/lttng-relayd/cmd-2-4.h
+++ b/src/bin/lttng-relayd/cmd-2-4.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_2_4_H
+#define RELAYD_CMD_2_4_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,12 +20,10 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_CMD_2_4_H
-#define RELAYD_CMD_2_4_H
-
#include "lttng-relayd.h"
int cmd_create_session_2_4(struct relay_connection *conn,
- struct relay_session *session);
+ char *session_name, char *hostname,
+ uint32_t *live_timer, bool *snapshot);
#endif /* RELAYD_CMD_2_4_H */
diff --git a/src/bin/lttng-relayd/cmd-generic.c b/src/bin/lttng-relayd/cmd-generic.c
index 417d6d3..276e85b 100644
--- a/src/bin/lttng-relayd/cmd-generic.c
+++ b/src/bin/lttng-relayd/cmd-generic.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
diff --git a/src/bin/lttng-relayd/cmd-generic.h b/src/bin/lttng-relayd/cmd-generic.h
index 640fed7..4551f0a 100644
--- a/src/bin/lttng-relayd/cmd-generic.h
+++ b/src/bin/lttng-relayd/cmd-generic.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_GENERIC_H
+#define RELAYD_CMD_GENERIC_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_CMD_GENERIC_H
-#define RELAYD_CMD_GENERIC_H
-
#include <common/sessiond-comm/sessiond-comm.h>
#include "connection.h"
diff --git a/src/bin/lttng-relayd/cmd.h b/src/bin/lttng-relayd/cmd.h
index c8b37d5..88db09a 100644
--- a/src/bin/lttng-relayd/cmd.h
+++ b/src/bin/lttng-relayd/cmd.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_H
+#define RELAYD_CMD_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_CMD_H
-#define RELAYD_CMD_H
-
#include "cmd-generic.h"
#include "cmd-2-1.h"
#include "cmd-2-2.h"
diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c
index 76e48a6..b425548 100644
--- a/src/bin/lttng-relayd/connection.c
+++ b/src/bin/lttng-relayd/connection.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -19,91 +20,132 @@
#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <common/common.h>
+#include <urcu/rculist.h>
#include "connection.h"
#include "stream.h"
+#include "viewer-session.h"
-static void rcu_free_connection(struct rcu_head *head)
+bool connection_get(struct relay_connection *conn)
{
- struct relay_connection *conn =
- caa_container_of(head, struct relay_connection, rcu_node);
+ bool has_ref = false;
- lttcomm_destroy_sock(conn->sock);
- connection_free(conn);
+ pthread_mutex_lock(&conn->reflock);
+ if (conn->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&conn->ref);
+ }
+ pthread_mutex_unlock(&conn->reflock);
+
+ return has_ref;
}
-/*
- * Must be called with a read side lock held. The read side lock must be
- * kept until the returned relay_connection is no longer in use.
- */
-struct relay_connection *connection_find_by_sock(struct lttng_ht *ht, int sock)
+struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht,
+ int sock)
{
struct lttng_ht_node_ulong *node;
struct lttng_ht_iter iter;
struct relay_connection *conn = NULL;
- assert(ht);
assert(sock >= 0);
- lttng_ht_lookup(ht, (void *)((unsigned long) sock), &iter);
+ rcu_read_lock();
+ lttng_ht_lookup(relay_connections_ht, (void *)((unsigned long) sock),
+ &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (!node) {
DBG2("Relay connection by sock %d not found", sock);
goto end;
}
conn = caa_container_of(node, struct relay_connection, sock_n);
-
+ if (!connection_get(conn)) {
+ conn = NULL;
+ }
end:
+ rcu_read_unlock();
return conn;
}
-void connection_delete(struct lttng_ht *ht, struct relay_connection *conn)
+struct relay_connection *connection_create(struct lttcomm_sock *sock,
+ enum connection_type type)
{
- int ret;
- struct lttng_ht_iter iter;
-
- assert(ht);
- assert(conn);
+ struct relay_connection *conn;
- iter.iter.node = &conn->sock_n.node;
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
+ conn = zmalloc(sizeof(*conn));
+ if (!conn) {
+ PERROR("zmalloc relay connection");
+ goto end;
+ }
+ pthread_mutex_init(&conn->lock, NULL);
+ pthread_mutex_init(&conn->reflock, NULL);
+ urcu_ref_init(&conn->ref);
+ conn->type = type;
+ conn->sock = sock;
+ lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd);
+end:
+ return conn;
}
-void connection_destroy(struct relay_connection *conn)
+static void rcu_free_connection(struct rcu_head *head)
{
- assert(conn);
+ struct relay_connection *conn =
+ caa_container_of(head, struct relay_connection, rcu_node);
+ lttcomm_destroy_sock(conn->sock);
+ if (conn->viewer_session) {
+ viewer_session_destroy(conn->viewer_session);
+ conn->viewer_session = NULL;
+ }
+ free(conn);
+}
+
+static void destroy_connection(struct relay_connection *conn)
+{
call_rcu(&conn->rcu_node, rcu_free_connection);
}
-struct relay_connection *connection_create(void)
+static void connection_release(struct urcu_ref *ref)
{
- struct relay_connection *conn;
+ struct relay_connection *conn =
+ caa_container_of(ref, struct relay_connection, ref);
- conn = zmalloc(sizeof(*conn));
- if (!conn) {
- PERROR("zmalloc relay connection");
- goto error;
+ if (conn->in_socket_ht) {
+ struct lttng_ht_iter iter;
+ int ret;
+
+ iter.iter.node = &conn->sock_n.node;
+ ret = lttng_ht_del(conn->socket_ht, &iter);
+ assert(!ret);
}
-error:
- return conn;
+ if (conn->session) {
+ if (session_close(conn->session)) {
+ ERR("session_close");
+ }
+ conn->session = NULL;
+ }
+ if (conn->viewer_session) {
+ viewer_session_close(conn->viewer_session);
+ }
+ destroy_connection(conn);
}
-void connection_init(struct relay_connection *conn)
+void connection_put(struct relay_connection *conn)
{
- assert(conn);
- assert(conn->sock);
-
- CDS_INIT_LIST_HEAD(&conn->recv_head);
- lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd);
+ rcu_read_lock();
+ pthread_mutex_lock(&conn->reflock);
+ urcu_ref_put(&conn->ref, connection_release);
+ pthread_mutex_unlock(&conn->reflock);
+ rcu_read_unlock();
}
-void connection_free(struct relay_connection *conn)
+void connection_ht_add(struct lttng_ht *relay_connections_ht,
+ struct relay_connection *conn)
{
- assert(conn);
-
- free(conn->viewer_session);
- free(conn);
+ pthread_mutex_lock(&conn->lock);
+ assert(!conn->in_socket_ht);
+ lttng_ht_add_unique_ulong(relay_connections_ht, &conn->sock_n);
+ conn->in_socket_ht = 1;
+ conn->socket_ht = relay_connections_ht;
+ pthread_mutex_unlock(&conn->lock);
}
diff --git a/src/bin/lttng-relayd/connection.h b/src/bin/lttng-relayd/connection.h
index 70fe4ba..1fff515 100644
--- a/src/bin/lttng-relayd/connection.h
+++ b/src/bin/lttng-relayd/connection.h
@@ -1,6 +1,10 @@
+#ifndef _CONNECTION_H
+#define _CONNECTION_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _CONNECTION_H
-#define _CONNECTION_H
-
#include <limits.h>
#include <inttypes.h>
#include <pthread.h>
@@ -32,6 +33,7 @@
#include "session.h"
enum connection_type {
+ RELAY_CONNECTION_UNKNOWN = 0,
RELAY_DATA = 1,
RELAY_CONTROL = 2,
RELAY_VIEWER_COMMAND = 3,
@@ -44,36 +46,49 @@ enum connection_type {
*/
struct relay_connection {
struct lttcomm_sock *sock;
- struct relay_session *session;
- struct relay_viewer_session *viewer_session;
struct cds_wfcq_node qnode;
- struct lttng_ht_node_ulong sock_n;
- struct rcu_head rcu_node;
+
enum connection_type type;
- /* Protocol version to use for this connection. */
+ /*
+ * session is only ever set for RELAY_CONTROL connection type.
+ */
+ struct relay_session *session;
+ /*
+ * viewer_session is only ever set for RELAY_VIEWER_COMMAND
+ * connection type.
+ */
+ struct relay_viewer_session *viewer_session;
+
+ /*
+ * Protocol version to use for this connection. Only valid for
+ * RELAY_CONTROL connection type.
+ */
uint32_t major;
uint32_t minor;
- uint64_t session_id;
+
+ struct urcu_ref ref;
+ pthread_mutex_t reflock;
+
+ pthread_mutex_t lock;
+
+ bool version_check_done;
/*
- * This contains streams that are received on that connection. It's used to
- * store them until we get the streams sent command where they are removed
- * and flagged ready for the viewer. This is ONLY used by the control
- * thread thus any action on it should happen in that thread.
+ * Node member of connection within global socket hash table.
*/
- struct cds_list_head recv_head;
- unsigned int version_check_done:1;
-
- /* Pointer to the sessions HT that this connection can use. */
- struct lttng_ht *sessions_ht;
+ struct lttng_ht_node_ulong sock_n;
+ bool in_socket_ht;
+ struct lttng_ht *socket_ht; /* HACK: Contained within this hash table. */
+ struct rcu_head rcu_node; /* For call_rcu teardown. */
};
-struct relay_connection *connection_find_by_sock(struct lttng_ht *ht,
+struct relay_connection *connection_create(struct lttcomm_sock *sock,
+ enum connection_type type);
+struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht,
int sock);
-struct relay_connection *connection_create(void);
-void connection_init(struct relay_connection *conn);
-void connection_delete(struct lttng_ht *ht, struct relay_connection *conn);
-void connection_destroy(struct relay_connection *conn);
-void connection_free(struct relay_connection *conn);
+bool connection_get(struct relay_connection *connection);
+void connection_put(struct relay_connection *connection);
+void connection_ht_add(struct lttng_ht *relay_connections_ht,
+ struct relay_connection *conn);
#endif /* _CONNECTION_H */
diff --git a/src/bin/lttng-relayd/ctf-trace.c b/src/bin/lttng-relayd/ctf-trace.c
index 02a8b2b..20e1543 100644
--- a/src/bin/lttng-relayd/ctf-trace.c
+++ b/src/bin/lttng-relayd/ctf-trace.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -22,148 +23,191 @@
#include <common/common.h>
#include <common/utils.h>
+#include <urcu/rculist.h>
#include "ctf-trace.h"
#include "lttng-relayd.h"
#include "stream.h"
static uint64_t last_relay_ctf_trace_id;
+static pthread_mutex_t last_relay_ctf_trace_id_lock = PTHREAD_MUTEX_INITIALIZER;
-static void rcu_destroy_ctf_trace(struct rcu_head *head)
+static void rcu_destroy_ctf_trace(struct rcu_head *rcu_head)
{
- struct lttng_ht_node_str *node =
- caa_container_of(head, struct lttng_ht_node_str, head);
- struct ctf_trace *trace=
- caa_container_of(node, struct ctf_trace, node);
+ struct ctf_trace *trace =
+ caa_container_of(rcu_head, struct ctf_trace, rcu_node);
free(trace);
}
-static void rcu_destroy_stream(struct rcu_head *head)
-{
- struct relay_stream *stream =
- caa_container_of(head, struct relay_stream, rcu_node);
-
- stream_destroy(stream);
-}
-
/*
* Destroy a ctf trace and all stream contained in it.
*
* MUST be called with the RCU read side lock.
*/
-void ctf_trace_destroy(struct ctf_trace *obj)
+void ctf_trace_destroy(struct ctf_trace *trace)
{
- struct relay_stream *stream, *tmp_stream;
-
- assert(obj);
/*
- * Getting to this point, every stream referenced to that object have put
- * back their ref since the've been closed by the control side.
+ * Getting to this point, every stream referencing that trace has
+ * put back its ref since they have been closed by the control
+ * side.
*/
- assert(!obj->refcount);
+ assert(cds_list_empty(&trace->stream_list));
+ session_put(trace->session);
+ trace->session = NULL;
+ call_rcu(&trace->rcu_node, rcu_destroy_ctf_trace);
+}
- cds_list_for_each_entry_safe(stream, tmp_stream, &obj->stream_list,
- trace_list) {
- stream_delete(relay_streams_ht, stream);
- call_rcu(&stream->rcu_node, rcu_destroy_stream);
- }
+void ctf_trace_release(struct urcu_ref *ref)
+{
+ struct ctf_trace *trace =
+ caa_container_of(ref, struct ctf_trace, ref);
+ int ret;
+ struct lttng_ht_iter iter;
- call_rcu(&obj->node.head, rcu_destroy_ctf_trace);
+ iter.iter.node = &trace->node.node;
+ ret = lttng_ht_del(trace->session->ctf_traces_ht, &iter);
+ assert(!ret);
+ ctf_trace_destroy(trace);
}
-void ctf_trace_try_destroy(struct relay_session *session,
- struct ctf_trace *ctf_trace)
+/*
+ * Should be called with RCU read-side lock held.
+ */
+bool ctf_trace_get(struct ctf_trace *trace)
{
- assert(session);
- assert(ctf_trace);
+ bool has_ref = false;
- /*
- * Considering no viewer attach to the session and the trace having no more
- * stream attached, wipe the trace.
- */
- if (uatomic_read(&session->viewer_refcount) == 0 &&
- uatomic_read(&ctf_trace->refcount) == 0) {
- ctf_trace_delete(session->ctf_traces_ht, ctf_trace);
- ctf_trace_destroy(ctf_trace);
+ /* Confirm that the trace refcount has not reached 0. */
+ pthread_mutex_lock(&trace->reflock);
+ if (trace->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&trace->ref);
}
+ pthread_mutex_unlock(&trace->reflock);
+
+ return has_ref;
}
/*
- * Create and return an allocated ctf_trace object. NULL on error.
+ * Create and return an allocated ctf_trace. NULL on error.
+ * There is no "open" and "close" for a ctf_trace, but rather just a
+ * create and refcounting. Whenever all the streams belonging to a trace
+ * put their reference, its refcount drops to 0.
*/
-struct ctf_trace *ctf_trace_create(char *path_name)
+static struct ctf_trace *ctf_trace_create(struct relay_session *session,
+ char *path_name)
{
- struct ctf_trace *obj;
+ struct ctf_trace *trace;
- assert(path_name);
-
- obj = zmalloc(sizeof(*obj));
- if (!obj) {
+ trace = zmalloc(sizeof(*trace));
+ if (!trace) {
PERROR("ctf_trace alloc");
goto error;
}
- CDS_INIT_LIST_HEAD(&obj->stream_list);
+ if (!session_get(session)) {
+ ERR("Cannot get session");
+ free(trace);
+ trace = NULL;
+ goto error;
+ }
+ trace->session = session;
+
+ CDS_INIT_LIST_HEAD(&trace->stream_list);
+
+ pthread_mutex_lock(&last_relay_ctf_trace_id_lock);
+ trace->id = ++last_relay_ctf_trace_id;
+ pthread_mutex_unlock(&last_relay_ctf_trace_id_lock);
- obj->id = ++last_relay_ctf_trace_id;
- lttng_ht_node_init_str(&obj->node, path_name);
+ lttng_ht_node_init_str(&trace->node, path_name);
+ trace->session = session;
+ urcu_ref_init(&trace->ref);
+ pthread_mutex_init(&trace->lock, NULL);
+ pthread_mutex_init(&trace->reflock, NULL);
+ pthread_mutex_init(&trace->stream_list_lock, NULL);
+ lttng_ht_add_str(session->ctf_traces_ht, &trace->node);
- DBG("Created ctf_trace %" PRIu64 " with path: %s", obj->id, path_name);
+ DBG("Created ctf_trace %" PRIu64 " with path: %s", trace->id, path_name);
error:
- return obj;
+ return trace;
}
/*
- * Return a ctf_trace object if found by id in the given hash table else NULL.
- *
- * Must be called with rcu_read_lock() taken.
+ * Return the ctf_trace found by path in the session's hash table, or try
+ * to create it if not found. NULL on error. Holds a reference on the
+ * ctf_trace, which must be paired with ctf_trace_put().
*/
-struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht,
+struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session,
char *path_name)
{
struct lttng_ht_node_str *node;
struct lttng_ht_iter iter;
struct ctf_trace *trace = NULL;
- assert(ht);
-
- lttng_ht_lookup(ht, (void *) path_name, &iter);
+ rcu_read_lock();
+ lttng_ht_lookup(session->ctf_traces_ht, (void *) path_name, &iter);
node = lttng_ht_iter_get_node_str(&iter);
if (!node) {
DBG("CTF Trace path %s not found", path_name);
goto end;
}
trace = caa_container_of(node, struct ctf_trace, node);
-
+ if (!ctf_trace_get(trace)) {
+ trace = NULL;
+ }
end:
+ rcu_read_unlock();
+ if (!trace) {
+ /* Try to create */
+ trace = ctf_trace_create(session, path_name);
+ }
return trace;
}
-/*
- * Add stream to a given hash table.
- */
-void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace)
+void ctf_trace_put(struct ctf_trace *trace)
{
- assert(ht);
- assert(trace);
-
- lttng_ht_add_str(ht, &trace->node);
+ rcu_read_lock();
+ pthread_mutex_lock(&trace->reflock);
+ urcu_ref_put(&trace->ref, ctf_trace_release);
+ pthread_mutex_unlock(&trace->reflock);
+ rcu_read_unlock();
}
-/*
- * Delete stream from a given hash table.
- */
-void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace)
+int ctf_trace_close(struct ctf_trace *trace)
{
- int ret;
- struct lttng_ht_iter iter;
+ struct relay_stream *stream;
+
+ rcu_read_lock();
+ cds_list_for_each_entry_rcu(stream, &trace->stream_list,
+ stream_node) {
+ /*
+ * Close the stream.
+ */
+ stream_close(stream);
+ }
+ rcu_read_unlock();
+ /*
+ * Since all references to the trace are held by its streams, we
+ * don't need to do any self-ref put.
+ */
+ return 0;
+}
- assert(ht);
- assert(trace);
+struct relay_viewer_stream *ctf_trace_get_viewer_metadata_stream(struct ctf_trace *trace)
+{
+ struct relay_viewer_stream *vstream;
- iter.iter.node = &trace->node.node;
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
+ rcu_read_lock();
+ vstream = rcu_dereference(trace->viewer_metadata_stream);
+ if (!vstream) {
+ goto end;
+ }
+ if (!viewer_stream_get(vstream)) {
+ vstream = NULL;
+ }
+end:
+ rcu_read_unlock();
+ return vstream;
}
diff --git a/src/bin/lttng-relayd/ctf-trace.h b/src/bin/lttng-relayd/ctf-trace.h
index 489c5f1..d051f80 100644
--- a/src/bin/lttng-relayd/ctf-trace.h
+++ b/src/bin/lttng-relayd/ctf-trace.h
@@ -1,6 +1,10 @@
+#ifndef _CTF_TRACE_H
+#define _CTF_TRACE_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,49 +20,53 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _CTF_TRACE_H
-#define _CTF_TRACE_H
-
#include <inttypes.h>
+#include <urcu/ref.h>
#include <common/hashtable/hashtable.h>
#include "session.h"
#include "stream.h"
+#include "viewer-stream.h"
struct ctf_trace {
- int refcount;
- unsigned int invalid_flag:1;
+ /*
+ * The ctf_trace reflock nests inside the stream reflock.
+ */
+ pthread_mutex_t reflock; /* Protects refcounting */
+ struct urcu_ref ref; /* Every stream has a ref on the trace. */
+ struct relay_session *session; /* Back ref to trace session */
+
+ /*
+ * The ctf_trace lock nests inside the session lock.
+ */
+ pthread_mutex_t lock;
uint64_t id;
- uint64_t metadata_received;
- uint64_t metadata_sent;
- struct relay_stream *metadata_stream;
- struct relay_viewer_stream *viewer_metadata_stream;
- /* Node indexed by stream path name in the corresponding session. */
- struct lttng_ht_node_str node;
+ struct relay_viewer_stream *viewer_metadata_stream; /* RCU protected */
- /* Relay stream associated with this ctf trace. */
+ /*
+ * Relay streams associated with this ctf trace.
+ * Updates are protected by the stream_list lock.
+ * Traversals are protected by RCU.
+ */
struct cds_list_head stream_list;
+ pthread_mutex_t stream_list_lock;
+
+ /*
+ * Node within session trace hash table. Node is indexed by
+ * stream path name.
+ */
+ struct lttng_ht_node_str node;
+ struct rcu_head rcu_node; /* For call_rcu teardown. */
};
-static inline void ctf_trace_get_ref(struct ctf_trace *trace)
-{
- uatomic_inc(&trace->refcount);
-}
+struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session,
+ char *path_name);
+bool ctf_trace_get(struct ctf_trace *trace);
+void ctf_trace_put(struct ctf_trace *trace);
-static inline void ctf_trace_put_ref(struct ctf_trace *trace)
-{
- uatomic_add(&trace->refcount, -1);
-}
+int ctf_trace_close(struct ctf_trace *trace);
-void ctf_trace_assign(struct relay_stream *stream);
-struct ctf_trace *ctf_trace_create(char *path_name);
-void ctf_trace_destroy(struct ctf_trace *obj);
-void ctf_trace_try_destroy(struct relay_session *session,
- struct ctf_trace *ctf_trace);
-struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht,
- char *path_name);
-void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace);
-void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace);
+struct relay_viewer_stream *ctf_trace_get_viewer_metadata_stream(struct ctf_trace *trace);
#endif /* _CTF_TRACE_H */
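
For readers following the new ctf_trace_get()/ctf_trace_put() API declared above, the "take a reference only if the object is still live" pattern can be sketched as follows. This is a minimal illustration, not the code from this patch: struct obj, obj_init(), obj_get() and obj_put() are hypothetical names, a plain counter stands in for struct urcu_ref, and the release step only sets a flag where the real code would unpublish the object and defer the free with call_rcu().

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical object; stands in for ctf_trace or relay_index. */
struct obj {
	pthread_mutex_t reflock;	/* Serializes get against the final put. */
	long refcount;			/* 0 means teardown has started. */
	bool released;			/* Set by the release step (illustration only). */
};

static void obj_init(struct obj *o)
{
	pthread_mutex_init(&o->reflock, NULL);
	o->refcount = 1;		/* The creator owns the initial reference. */
	o->released = false;
}

/* Take a reference only if the refcount has not already reached 0. */
static bool obj_get(struct obj *o)
{
	bool has_ref = false;

	pthread_mutex_lock(&o->reflock);
	if (o->refcount != 0) {
		o->refcount++;
		has_ref = true;
	}
	pthread_mutex_unlock(&o->reflock);
	return has_ref;
}

/* Drop a reference; the last put runs the release step. */
static void obj_put(struct obj *o)
{
	pthread_mutex_lock(&o->reflock);
	assert(o->refcount > 0);
	if (--o->refcount == 0) {
		/* Real code would unpublish the object and call_rcu() here. */
		o->released = true;
	}
	pthread_mutex_unlock(&o->reflock);
}
```

A lookup that finds the object in a hash table calls obj_get() and treats a false return as "not found", which is why a getter in this style can race safely with the final put.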
diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c
index b7507a0..c092b7f 100644
--- a/src/bin/lttng-relayd/index.c
+++ b/src/bin/lttng-relayd/index.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -24,190 +25,307 @@
#include <common/utils.h>
#include "lttng-relayd.h"
+#include "stream.h"
#include "index.h"
/*
- * Deferred free of a relay index object. MUST only be called by a call RCU.
+ * Allocate a new relay index object. Pass the stream in which it is
+ * contained as parameter. The sequence number will be used as the hash
+ * table key.
+ *
+ * Called with stream mutex held.
+ * Return allocated object or else NULL on error.
*/
-static void deferred_free_relay_index(struct rcu_head *head)
+static struct relay_index *relay_index_create(struct relay_stream *stream,
+ uint64_t net_seq_num)
{
- struct relay_index *index =
- caa_container_of(head, struct relay_index, rcu_node);
+ struct relay_index *index;
- if (index->to_close_fd >= 0) {
- int ret;
+ DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
+ stream->stream_handle, net_seq_num);
- ret = close(index->to_close_fd);
- if (ret < 0) {
- PERROR("Relay index to close fd %d", index->to_close_fd);
- }
+ index = zmalloc(sizeof(*index));
+ if (!index) {
+ PERROR("Relay index zmalloc");
+ goto end;
+ }
+ if (!stream_get(stream)) {
+ ERR("Cannot get stream");
+ free(index);
+ index = NULL;
+ goto end;
}
+ index->stream = stream;
- relay_index_free(index);
+ lttng_ht_node_init_u64(&index->index_n, net_seq_num);
+ pthread_mutex_init(&index->lock, NULL);
+ pthread_mutex_init(&index->reflock, NULL);
+ urcu_ref_init(&index->ref);
+
+end:
+ return index;
}
/*
- * Allocate a new relay index object using the given stream ID and sequence
- * number as the hash table key.
+ * Add unique relay index to the stream's hash table. In case of a collision,
+ * the already existing object is returned; otherwise NULL is returned.
*
- * Return allocated object or else NULL on error.
+ * RCU read side lock MUST be acquired.
*/
-struct relay_index *relay_index_create(uint64_t stream_id,
- uint64_t net_seq_num)
+static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
+ struct relay_index *index)
{
- struct relay_index *index;
+ struct cds_lfht_node *node_ptr;
+ struct relay_index *_index;
- DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
- stream_id, net_seq_num);
+ DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
+ stream->stream_handle, index->index_n.key);
- index = zmalloc(sizeof(*index));
- if (index == NULL) {
- PERROR("Relay index zmalloc");
- goto error;
+ node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
+ stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
+ stream->indexes_ht->match_fct, &index->index_n,
+ &index->index_n.node);
+ if (node_ptr != &index->index_n.node) {
+ _index = caa_container_of(node_ptr, struct relay_index,
+ index_n.node);
+ } else {
+ _index = NULL;
}
+ return _index;
+}
+
+/*
+ * Should be called with RCU read-side lock held.
+ */
+static bool relay_index_get(struct relay_index *index)
+{
+ bool has_ref = false;
- index->to_close_fd = -1;
- lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num);
+ DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
+ index->stream->stream_handle, index->index_n.key,
+ (int) index->ref.refcount);
-error:
- return index;
+ /* Confirm that the index refcount has not reached 0. */
+ pthread_mutex_lock(&index->reflock);
+ if (index->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&index->ref);
+ }
+ pthread_mutex_unlock(&index->reflock);
+
+ return has_ref;
}
/*
- * Find a relayd index in the given hash table.
+ * Get a relayd index within the given stream, or create it if not
+ * present.
*
+ * Called with stream mutex held.
* Return index object or else NULL on error.
*/
-struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num)
+struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
+ uint64_t net_seq_num)
{
- struct lttng_ht_node_two_u64 *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
- struct lttng_ht_two_u64 key;
struct relay_index *index = NULL;
DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
- stream_id, net_seq_num);
-
- key.key1 = stream_id;
- key.key2 = net_seq_num;
+ stream->stream_handle, net_seq_num);
- lttng_ht_lookup(indexes_ht, (void *)(&key), &iter);
- node = lttng_ht_iter_get_node_two_u64(&iter);
- if (node == NULL) {
- goto end;
+ rcu_read_lock();
+ lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
+ if (node) {
+ index = caa_container_of(node, struct relay_index, index_n);
+ } else {
+ struct relay_index *oldindex;
+
+ index = relay_index_create(stream, net_seq_num);
+ if (!index) {
+ ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
+ stream->stream_handle, net_seq_num);
+ goto end;
+ }
+ oldindex = relay_index_add_unique(stream, index);
+ if (oldindex) {
+ /* Added concurrently, keep old. */
+ relay_index_put(index);
+ index = oldindex;
+ if (!relay_index_get(index)) {
+ index = NULL;
+ }
+ } else {
+ stream->indexes_in_flight++;
+ index->in_hash_table = true;
+ }
}
- index = caa_container_of(node, struct relay_index, index_n);
-
end:
- DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
- (index == NULL) ? "NOT " : "", stream_id, net_seq_num);
+ rcu_read_unlock();
+ DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
+ (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
return index;
}
-/*
- * Add unique relay index to the given hash table. In case of a collision, the
- * already existing object is put in the given _index variable.
- *
- * RCU read side lock MUST be acquired.
- */
-void relay_index_add(struct relay_index *index, struct relay_index **_index)
+int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+ uint64_t data_offset)
{
- struct cds_lfht_node *node_ptr;
+ int ret = 0;
- assert(index);
+ pthread_mutex_lock(&index->lock);
+ if (index->index_fd) {
+ ret = -1;
+ goto end;
+ }
+ stream_fd_get(index_fd);
+ index->index_fd = index_fd;
+ index->index_data.offset = data_offset;
+end:
+ pthread_mutex_unlock(&index->lock);
+ return ret;
+}
- DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
- index->index_n.key.key1, index->index_n.key.key2);
+int relay_index_set_data(struct relay_index *index,
+ const struct ctf_packet_index *data)
+{
+ int ret = 0;
- node_ptr = cds_lfht_add_unique(indexes_ht->ht,
- indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed),
- indexes_ht->match_fct, (void *) &index->index_n.key,
- &index->index_n.node);
- if (node_ptr != &index->index_n.node) {
- *_index = caa_container_of(node_ptr, struct relay_index, index_n.node);
+ pthread_mutex_lock(&index->lock);
+ if (index->has_index_data) {
+ ret = -1;
+ goto end;
}
+ /* Set everything except data_offset. */
+ index->index_data.packet_size = data->packet_size;
+ index->index_data.content_size = data->content_size;
+ index->index_data.timestamp_begin = data->timestamp_begin;
+ index->index_data.timestamp_end = data->timestamp_end;
+ index->index_data.events_discarded = data->events_discarded;
+ index->index_data.stream_id = data->stream_id;
+ index->has_index_data = true;
+end:
+ pthread_mutex_unlock(&index->lock);
+ return ret;
}
-/*
- * Write index on disk to the given fd. Once done error or not, it is removed
- * from the hash table and destroy the object.
- *
- * MUST be called with a RCU read side lock held.
- *
- * Return 0 on success else a negative value.
- */
-int relay_index_write(int fd, struct relay_index *index)
+static void index_destroy(struct relay_index *index)
{
- int ret;
- struct lttng_ht_iter iter;
-
- DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
- " on fd %d", index->index_n.key.key1,
- index->index_n.key.key2, fd);
+ free(index);
+}
- /* Delete index from hash table. */
- iter.iter.node = &index->index_n.node;
- ret = lttng_ht_del(indexes_ht, &iter);
- assert(!ret);
- call_rcu(&index->rcu_node, deferred_free_relay_index);
+static void index_destroy_rcu(struct rcu_head *rcu_head)
+{
+ struct relay_index *index =
+ caa_container_of(rcu_head, struct relay_index, rcu_node);
- return index_write(fd, &index->index_data, sizeof(index->index_data));
+ index_destroy(index);
}
-/*
- * Free the given index.
- */
-void relay_index_free(struct relay_index *index)
+static void index_release(struct urcu_ref *ref)
{
- free(index);
+ struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
+ struct relay_stream *stream = index->stream;
+ int ret;
+ struct lttng_ht_iter iter;
+
+ if (index->index_fd) {
+ stream_fd_put(index->index_fd);
+ index->index_fd = NULL;
+ }
+ if (index->in_hash_table) {
+ /* Delete index from hash table. */
+ iter.iter.node = &index->index_n.node;
+ ret = lttng_ht_del(stream->indexes_ht, &iter);
+ assert(!ret);
+ stream->indexes_in_flight--;
+ }
+
+ stream_put(index->stream);
+ index->stream = NULL;
+
+ call_rcu(&index->rcu_node, index_destroy_rcu);
}
/*
- * Safely free the given index using a call RCU.
+ * Called with stream mutex held.
*/
-void relay_index_free_safe(struct relay_index *index)
+void relay_index_put(struct relay_index *index)
{
- if (!index) {
- return;
- }
-
- call_rcu(&index->rcu_node, deferred_free_relay_index);
+ DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
+ index->stream->stream_handle, index->index_n.key,
+ (int) index->ref.refcount);
+ /*
+ * Ensure existence of index->lock for index unlock.
+ */
+ rcu_read_lock();
+ /*
+ * Index lock ensures that concurrent test and update of stream
+ * ref is atomic.
+ */
+ pthread_mutex_lock(&index->reflock);
+ assert(index->ref.refcount != 0);
+ urcu_ref_put(&index->ref, index_release);
+ pthread_mutex_unlock(&index->reflock);
+ rcu_read_unlock();
}
/*
- * Delete index from the given hash table.
+ * Try to flush index to disk. Releases self-reference to index once
+ * flush succeeds.
*
- * RCU read side lock MUST be acquired.
+ * Return 0 on successful flush, a negative value on error, or positive
+ * value if no flush was performed.
*/
-void relay_index_delete(struct relay_index *index)
+int relay_index_try_flush(struct relay_index *index)
{
- int ret;
- struct lttng_ht_iter iter;
+ int ret = 1;
+ bool flushed = false;
+ int fd;
- DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64
- " deleted.", index->index_n.key.key1,
- index->index_n.key.key2);
+ pthread_mutex_lock(&index->lock);
+ if (index->flushed) {
+ goto skip;
+ }
+ /* Check if we are ready to flush. */
+ if (!index->has_index_data || !index->index_fd) {
+ goto skip;
+ }
+ fd = index->index_fd->fd;
+ DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
+ " on fd %d", index->stream->stream_handle,
+ index->index_n.key, fd);
+ flushed = true;
+ index->flushed = true;
+ ret = index_write(fd, &index->index_data, sizeof(index->index_data));
+ if (ret == sizeof(index->index_data)) {
+ ret = 0;
+ } else {
+ ret = -1;
+ }
+skip:
+ pthread_mutex_unlock(&index->lock);
- /* Delete index from hash table. */
- iter.iter.node = &index->index_n.node;
- ret = lttng_ht_del(indexes_ht, &iter);
- assert(!ret);
+ if (flushed) {
+ /* Put self-ref from index now that it has been flushed. */
+ relay_index_put(index);
+ }
+ return ret;
}
/*
- * Destroy every relay index with the given stream id as part of the key.
+ * Close every relay index within a given stream, without flushing
+ * them.
*/
-void relay_index_destroy_by_stream_id(uint64_t stream_id)
+void relay_index_close_all(struct relay_stream *stream)
{
struct lttng_ht_iter iter;
struct relay_index *index;
rcu_read_lock();
- cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
- if (index->index_n.key.key1 == stream_id) {
- relay_index_delete(index);
- relay_index_free_safe(index);
- }
+ cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+ index, index_n.node) {
+ /* Put self-ref from index. */
+ relay_index_put(index);
}
rcu_read_unlock();
}
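
The flush protocol implemented by relay_index_set_fd(), relay_index_set_data() and relay_index_try_flush() above can be summarized with a small sketch: the file descriptor and the packet data may arrive in either order, and the index is written exactly once, when both are present. The names below (struct idx, idx_set_fd(), idx_set_data(), idx_try_flush()) are hypothetical, and a counter replaces the actual index_write() call, so this only illustrates the state handling, not the on-disk format or the reference counting.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for struct relay_index. */
struct idx {
	pthread_mutex_t lock;
	bool has_fd;		/* Set once the output fd is known. */
	bool has_data;		/* Set once the packet index data is known. */
	bool flushed;		/* Set once the index has been written. */
	int writes;		/* Counts simulated writes (illustration only). */
};

static void idx_init(struct idx *i)
{
	pthread_mutex_init(&i->lock, NULL);
	i->has_fd = i->has_data = i->flushed = false;
	i->writes = 0;
}

/* Return 0 on success, -1 if the fd was already set. */
static int idx_set_fd(struct idx *i)
{
	int ret = 0;

	pthread_mutex_lock(&i->lock);
	if (i->has_fd) {
		ret = -1;
	} else {
		i->has_fd = true;
	}
	pthread_mutex_unlock(&i->lock);
	return ret;
}

/* Return 0 on success, -1 if the data was already set. */
static int idx_set_data(struct idx *i)
{
	int ret = 0;

	pthread_mutex_lock(&i->lock);
	if (i->has_data) {
		ret = -1;
	} else {
		i->has_data = true;
	}
	pthread_mutex_unlock(&i->lock);
	return ret;
}

/* Return 0 if the index was flushed now, 1 if no flush was performed. */
static int idx_try_flush(struct idx *i)
{
	int ret = 1;

	pthread_mutex_lock(&i->lock);
	if (i->flushed || !i->has_fd || !i->has_data) {
		goto skip;
	}
	i->flushed = true;
	i->writes++;		/* Real code would write the index to disk here. */
	ret = 0;
skip:
	pthread_mutex_unlock(&i->lock);
	return ret;
}
```

Each caller invokes the try-flush step after setting its half of the state, so whichever of the two arrives last triggers the single write.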
diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h
index e7f9cdb..e882ed9 100644
--- a/src/bin/lttng-relayd/index.h
+++ b/src/bin/lttng-relayd/index.h
@@ -1,6 +1,10 @@
+#ifndef _RELAY_INDEX_H
+#define _RELAY_INDEX_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,42 +20,56 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _RELAY_INDEX_H
-#define _RELAY_INDEX_H
-
#include <inttypes.h>
#include <pthread.h>
#include <common/hashtable/hashtable.h>
#include <common/index/index.h>
+#include "stream-fd.h"
+
+struct relay_stream;
+
struct relay_index {
- /* FD on which to write the index data. */
- int fd;
/*
- * When destroying this object, this fd is checked and if valid, close it
- * so this is basically a lazy close of the previous fd corresponding to
- * the same stream id. This is used for the rotate file feature.
+ * index lock nests inside stream lock.
*/
- int to_close_fd;
+ pthread_mutex_t reflock; /* Protects refcounting. */
+ struct urcu_ref ref; /* Reference from getters. */
+ struct relay_stream *stream; /* Back ref to stream */
+
+ pthread_mutex_t lock;
+ /*
+ * FD on which to write the index data. May differ from
+ * stream->index_fd due to tracefile rotation.
+ */
+ struct stream_fd *index_fd;
/* Index packet data. This is the data that is written on disk. */
struct ctf_packet_index index_data;
- /* key1 = stream_id, key2 = net_seq_num */
- struct lttng_ht_node_two_u64 index_n;
- struct rcu_head rcu_node;
- pthread_mutex_t mutex;
+ bool has_index_data;
+ bool flushed;
+ bool in_hash_table;
+
+ /*
+ * Node within indexes_ht that corresponds to this struct
+ * relay_index. Indexed by net_seq_num, which is unique for this
+ * index across the stream.
+ */
+ struct lttng_ht_node_u64 index_n;
+ struct rcu_head rcu_node; /* For call_rcu teardown. */
};
-struct relay_index *relay_index_create(uint64_t stream_id,
+struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
uint64_t net_seq_num);
-struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num);
-void relay_index_add(struct relay_index *index, struct relay_index **_index);
-int relay_index_write(int fd, struct relay_index *index);
-void relay_index_free(struct relay_index *index);
-void relay_index_free_safe(struct relay_index *index);
-void relay_index_delete(struct relay_index *index);
-void relay_index_destroy_by_stream_id(uint64_t stream_id);
+void relay_index_put(struct relay_index *index);
+int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+ uint64_t data_offset);
+int relay_index_set_data(struct relay_index *index,
+ const struct ctf_packet_index *data);
+int relay_index_try_flush(struct relay_index *index);
+
+void relay_index_close_all(struct relay_stream *stream);
#endif /* _RELAY_INDEX_H */
diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
index 562a7fa..64d37fa 100644
--- a/src/bin/lttng-relayd/live.c
+++ b/src/bin/lttng-relayd/live.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
@@ -36,6 +37,7 @@
#include <inttypes.h>
#include <urcu/futex.h>
#include <urcu/uatomic.h>
+#include <urcu/rculist.h>
#include <unistd.h>
#include <fcntl.h>
#include <config.h>
@@ -65,6 +67,9 @@
#include "session.h"
#include "ctf-trace.h"
#include "connection.h"
+#include "viewer-session.h"
+
+#define SESSION_BUF_DEFAULT_COUNT 16
static struct lttng_uri *live_uri;
@@ -90,6 +95,8 @@ static pthread_t live_worker_thread;
static struct relay_conn_queue viewer_conn_queue;
static uint64_t last_relay_viewer_session_id;
+static pthread_mutex_t last_relay_viewer_session_id_lock =
+ PTHREAD_MUTEX_INITIALIZER;
/*
* Cleanup the daemon
@@ -114,9 +121,6 @@ ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size)
{
ssize_t ret;
- assert(sock);
- assert(buf);
-
ret = sock->ops->recvmsg(sock, buf, size, 0);
if (ret < 0 || ret != size) {
if (ret == 0) {
@@ -143,9 +147,6 @@ ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
{
ssize_t ret;
- assert(sock);
- assert(buf);
-
ret = sock->ops->sendmsg(sock, buf, size, 0);
if (ret < 0) {
ERR("Relayd failed to send response.");
@@ -171,17 +172,22 @@ int check_new_streams(struct relay_connection *conn)
if (!conn->viewer_session) {
goto end;
}
- cds_list_for_each_entry(session,
- &conn->viewer_session->sessions_head,
- viewer_session_list) {
+ rcu_read_lock();
+ cds_list_for_each_entry_rcu(session,
+ &conn->viewer_session->session_list,
+ viewer_session_node) {
+ if (!session_get(session)) {
+ continue;
+ }
current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
ret = current_val;
+ session_put(session);
if (ret == 1) {
goto end;
}
}
-
end:
+ rcu_read_unlock();
return ret;
}
@@ -200,8 +206,6 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
struct lttng_ht_iter iter;
struct relay_viewer_stream *vstream;
- assert(session);
-
rcu_read_lock();
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
@@ -210,30 +214,38 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
health_code_update();
+ if (!viewer_stream_get(vstream)) {
+ continue;
+ }
+
+ pthread_mutex_lock(&vstream->lock);
/* Ignore if not the same session. */
- if (vstream->session_id != session->id ||
+ if (vstream->stream->trace->session->id != session->id ||
(!ignore_sent_flag && vstream->sent_flag)) {
+ pthread_mutex_unlock(&vstream->lock);
+ viewer_stream_put(vstream);
continue;
}
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- vstream->path_name);
- assert(ctf_trace);
-
- send_stream.id = htobe64(vstream->stream_handle);
+ ctf_trace = vstream->stream->trace;
+ send_stream.id = htobe64(vstream->stream->stream_handle);
send_stream.ctf_trace_id = htobe64(ctf_trace->id);
- send_stream.metadata_flag = htobe32(vstream->metadata_flag);
+ send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
strncpy(send_stream.path_name, vstream->path_name,
sizeof(send_stream.path_name));
strncpy(send_stream.channel_name, vstream->channel_name,
sizeof(send_stream.channel_name));
- DBG("Sending stream %" PRIu64 " to viewer", vstream->stream_handle);
+ DBG("Sending stream %" PRIu64 " to viewer",
+ vstream->stream->stream_handle);
+ vstream->sent_flag = 1;
+ pthread_mutex_unlock(&vstream->lock);
+
ret = send_response(sock, &send_stream, sizeof(send_stream));
if (ret < 0) {
goto end_unlock;
}
- vstream->sent_flag = 1;
+ viewer_stream_put(vstream);
}
ret = 0;
@@ -263,17 +275,14 @@ int make_viewer_streams(struct relay_session *session,
assert(session);
/*
- * This is to make sure we create viewer streams for a full received
- * channel. For instance, if we have 8 streams for a channel that are
- * concurrently being flagged ready, we can end up creating just a subset
- * of the 8 streams (the ones that are flagged). This lock avoids this
- * limbo state.
+ * Hold the session lock to ensure that we see either none or
+ * all initial streams for a session, but no intermediate state.
*/
- pthread_mutex_lock(&session->viewer_ready_lock);
+ pthread_mutex_lock(&session->lock);
/*
- * Create viewer streams for relay streams that are ready to be used for a
- * the given session id only.
+ * Create viewer streams for relay streams that are ready to be
+ * used for the given session id only.
*/
rcu_read_lock();
cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
@@ -282,47 +291,59 @@ int make_viewer_streams(struct relay_session *session,
health_code_update();
- if (ctf_trace->invalid_flag) {
+ if (!ctf_trace_get(ctf_trace)) {
continue;
}
- cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
+ cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) {
struct relay_viewer_stream *vstream;
- if (!stream->viewer_ready) {
+ if (!stream_get(stream)) {
continue;
}
-
- vstream = viewer_stream_find_by_id(stream->stream_handle);
+ /*
+ * stream published is protected by the session
+ * lock.
+ */
+ if (!stream->published) {
+ goto next;
+ }
+ vstream = viewer_stream_get_by_id(stream->stream_handle);
if (!vstream) {
- vstream = viewer_stream_create(stream, seek_t, ctf_trace);
+ vstream = viewer_stream_create(stream, seek_t);
if (!vstream) {
ret = -1;
+ ctf_trace_put(ctf_trace);
+ stream_put(stream);
goto error_unlock;
}
- /* Acquire reference to ctf_trace. */
- ctf_trace_get_ref(ctf_trace);
if (nb_created) {
/* Update number of created stream counter. */
(*nb_created)++;
}
- } else if (!vstream->sent_flag && nb_unsent) {
- /* Update number of unsent stream counter. */
- (*nb_unsent)++;
+ } else {
+ if (!vstream->sent_flag && nb_unsent) {
+ /* Update number of unsent stream counter. */
+ (*nb_unsent)++;
+ }
+ viewer_stream_put(vstream);
}
/* Update number of total stream counter. */
if (nb_total) {
(*nb_total)++;
}
+ next:
+ stream_put(stream);
}
+ ctf_trace_put(ctf_trace);
}
ret = 0;
error_unlock:
rcu_read_unlock();
- pthread_mutex_unlock(&session->viewer_ready_lock);
+ pthread_mutex_unlock(&session->lock);
return ret;
}
@@ -505,22 +526,17 @@ restart:
goto error;
} else if (revents & LPOLLIN) {
/*
- * Get allocated in this thread, enqueued to a global queue,
- * dequeued and freed in the worker thread.
+ * Get allocated in this thread,
+ * enqueued to a global queue, dequeued
+ * and freed in the worker thread.
*/
int val = 1;
struct relay_connection *new_conn;
struct lttcomm_sock *newsock;
- new_conn = connection_create();
- if (!new_conn) {
- goto error;
- }
-
newsock = live_control_sock->ops->accept(live_control_sock);
if (!newsock) {
PERROR("accepting control sock");
- connection_free(new_conn);
goto error;
}
DBG("Relay viewer connection accepted socket %d", newsock->fd);
@@ -530,18 +546,22 @@ restart:
if (ret < 0) {
PERROR("setsockopt inet");
lttcomm_destroy_sock(newsock);
- connection_free(new_conn);
goto error;
}
- new_conn->sock = newsock;
+ new_conn = connection_create(newsock, RELAY_CONNECTION_UNKNOWN);
+ if (!new_conn) {
+ lttcomm_destroy_sock(newsock);
+ goto error;
+ }
/* Enqueue request for the dispatcher thread. */
cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail,
&new_conn->qnode);
/*
- * Wake the dispatch queue futex. Implicit memory barrier with
- * the exchange in cds_wfcq_enqueue.
+ * Wake the dispatch queue futex.
+ * Implicit memory barrier with the
+ * exchange in cds_wfcq_enqueue.
*/
futex_nto1_wake(&viewer_conn_queue.futex);
}
@@ -618,14 +638,15 @@ void *thread_dispatcher(void *data)
conn->sock->fd);
/*
- * Inform worker thread of the new request. This call is blocking
- * so we can be assured that the data will be read at some point in
- * time or wait to the end of the world :)
+ * Inform worker thread of the new request. This
+ * call is blocking so we can be assured that
+ * the data will be read at some point in time
+ * or wait to the end of the world :)
*/
ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
if (ret < 0) {
PERROR("write conn pipe");
- connection_destroy(conn);
+ connection_put(conn);
goto error;
}
} while (node != NULL);
@@ -664,8 +685,6 @@ int viewer_connect(struct relay_connection *conn)
int ret;
struct lttng_viewer_connect reply, msg;
- assert(conn);
-
conn->version_check_done = 1;
health_code_update();
@@ -713,10 +732,13 @@ int viewer_connect(struct relay_connection *conn)
reply.minor = htobe32(reply.minor);
if (conn->type == RELAY_VIEWER_COMMAND) {
/*
- * Increment outside of htobe64 macro, because can be used more than once
- * within the macro, and thus the operation may be undefined.
+ * Increment outside of the htobe64 macro, because its
+ * argument can be evaluated more than once within the
+ * macro, and thus the operation may be undefined.
*/
+ pthread_mutex_lock(&last_relay_viewer_session_id_lock);
last_relay_viewer_session_id++;
+ pthread_mutex_unlock(&last_relay_viewer_session_id_lock);
reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
}
@@ -738,6 +760,9 @@ end:
/*
* Send the viewer the list of current sessions.
+ * We need to create a copy of the hash table content because otherwise
+ * we cannot assume the number of entries stays the same between getting
+ * the number of HT elements and iteration over the HT.
*
* Return 0 on success or else a negative value.
*/
@@ -746,156 +771,89 @@ int viewer_list_sessions(struct relay_connection *conn)
{
int ret;
struct lttng_viewer_list_sessions session_list;
- unsigned long count;
- long approx_before, approx_after;
struct lttng_ht_iter iter;
- struct lttng_viewer_session send_session;
struct relay_session *session;
+ struct lttng_viewer_session *send_session_buf = NULL;
+ uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
+ uint32_t count = 0;
DBG("List sessions received");
- rcu_read_lock();
- cds_lfht_count_nodes(conn->sessions_ht->ht, &approx_before, &count,
- &approx_after);
- session_list.sessions_count = htobe32(count);
-
- health_code_update();
-
- ret = send_response(conn->sock, &session_list, sizeof(session_list));
- if (ret < 0) {
- goto end_unlock;
+ send_session_buf = zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
+ if (!send_session_buf) {
+ return -1;
}
- health_code_update();
-
- cds_lfht_for_each_entry(conn->sessions_ht->ht, &iter.iter, session,
+ rcu_read_lock();
+ cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
session_n.node) {
- health_code_update();
-
- strncpy(send_session.session_name, session->session_name,
- sizeof(send_session.session_name));
- strncpy(send_session.hostname, session->hostname,
- sizeof(send_session.hostname));
- send_session.id = htobe64(session->id);
- send_session.live_timer = htobe32(session->live_timer);
- send_session.clients = htobe32(session->viewer_refcount);
- send_session.streams = htobe32(session->stream_count);
+ struct lttng_viewer_session *send_session;
health_code_update();
- ret = send_response(conn->sock, &send_session, sizeof(send_session));
- if (ret < 0) {
- goto end_unlock;
- }
- }
- health_code_update();
+ if (count >= buf_count) {
+ struct lttng_viewer_session *newbuf;
+ uint32_t new_buf_count = buf_count << 1;
- ret = 0;
-end_unlock:
- rcu_read_unlock();
- return ret;
-}
-
-/*
- * Check if a connection is attached to a session.
- * Return 1 if attached, 0 if not attached, a negative value on error.
- */
-static
-int session_attached(struct relay_connection *conn, uint64_t session_id)
-{
- struct relay_session *session;
- int found = 0;
-
- if (!conn->viewer_session) {
- goto end;
- }
- cds_list_for_each_entry(session,
- &conn->viewer_session->sessions_head,
- viewer_session_list) {
- if (session->id == session_id) {
- found = 1;
- goto end;
+ newbuf = realloc(send_session_buf,
+ new_buf_count * sizeof(*send_session_buf));
+ if (!newbuf) {
+ ret = -1;
+ rcu_read_unlock();
+ goto end_free;
+ }
+ send_session_buf = newbuf;
+ buf_count = new_buf_count;
}
- }
-
-end:
- return found;
-}
-
-/*
- * Delete all streams for a specific session ID.
- */
-static void destroy_viewer_streams_by_session(struct relay_session *session)
-{
- struct relay_viewer_stream *stream;
- struct lttng_ht_iter iter;
-
- assert(session);
-
- rcu_read_lock();
- cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
- stream_n.node) {
- struct ctf_trace *ctf_trace;
-
- health_code_update();
- if (stream->session_id != session->id) {
- continue;
+ send_session = &send_session_buf[count];
+ strncpy(send_session->session_name, session->session_name,
+ sizeof(send_session->session_name));
+ strncpy(send_session->hostname, session->hostname,
+ sizeof(send_session->hostname));
+ send_session->id = htobe64(session->id);
+ send_session->live_timer = htobe32(session->live_timer);
+ if (session->viewer_attached) {
+ send_session->clients = htobe32(1);
+ } else {
+ send_session->clients = htobe32(0);
}
+ send_session->streams = htobe32(session->stream_count);
+ count++;
+ }
+ rcu_read_unlock();
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
-
- viewer_stream_delete(stream);
+ session_list.sessions_count = htobe32(count);
- if (stream->metadata_flag) {
- ctf_trace->metadata_sent = 0;
- ctf_trace->viewer_metadata_stream = NULL;
- }
+ health_code_update();
- viewer_stream_destroy(ctf_trace, stream);
+ ret = send_response(conn->sock, &session_list, sizeof(session_list));
+ if (ret < 0) {
+ goto end_free;
}
- rcu_read_unlock();
-}
-
-static void try_destroy_streams(struct relay_session *session)
-{
- struct ctf_trace *ctf_trace;
- struct lttng_ht_iter iter;
- assert(session);
+ health_code_update();
- cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
- node.node) {
- /* Attempt to destroy the ctf trace of that session. */
- ctf_trace_try_destroy(session, ctf_trace);
+ ret = send_response(conn->sock, send_session_buf,
+ count * sizeof(*send_session_buf));
+ if (ret < 0) {
+ goto end_free;
}
-}
+ health_code_update();
-/*
- * Cleanup a session.
- */
-static void cleanup_session(struct relay_connection *conn,
- struct relay_session *session)
-{
- /*
- * Very important that this is done before destroying the session so we
- * can put back every viewer stream reference from the ctf_trace.
- */
- destroy_viewer_streams_by_session(session);
- try_destroy_streams(session);
- cds_list_del(&session->viewer_session_list);
- session_viewer_try_destroy(conn->sessions_ht, session);
+ ret = 0;
+end_free:
+ free(send_session_buf);
+ return ret;
}
/*
- * Send the viewer the list of current sessions.
+ * Send the viewer the list of current streams.
*/
static
int viewer_get_new_streams(struct relay_connection *conn)
{
int ret, send_streams = 0;
- uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0;
+ uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
struct lttng_viewer_new_streams_request request;
struct lttng_viewer_new_streams_response response;
struct relay_session *session;
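
Note on the viewer_list_sessions() change earlier in this hunk: the new code
copies every session into a growable buffer while inside the RCU read-side
critical section, and only sends the count and the entries afterwards. The
buffer growth can be sketched as below. This is a minimal illustration, not
the relayd code: struct snap_entry, struct snap_buf and the snap_buf_*()
helpers are hypothetical names, and the RCU locking and network send are
left out.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical fixed-size record standing in for one listed session. */
struct snap_entry {
	uint64_t id;
	char name[32];
};

/* Hypothetical growable snapshot buffer. */
struct snap_buf {
	struct snap_entry *entries;
	uint32_t count;		/* Entries in use. */
	uint32_t capacity;	/* Entries allocated. */
};

/* Return 0 on success, -1 on invalid capacity or allocation failure. */
static int snap_buf_init(struct snap_buf *buf, uint32_t initial_capacity)
{
	if (initial_capacity == 0) {
		return -1;
	}
	buf->entries = calloc(initial_capacity, sizeof(*buf->entries));
	if (!buf->entries) {
		return -1;
	}
	buf->count = 0;
	buf->capacity = initial_capacity;
	return 0;
}

/*
 * Append one entry, doubling the buffer when it is full. On failure the
 * old buffer is left untouched so the caller can still free it.
 */
static int snap_buf_append(struct snap_buf *buf, uint64_t id, const char *name)
{
	struct snap_entry *entry;

	if (buf->count >= buf->capacity) {
		struct snap_entry *newbuf;
		uint32_t new_capacity;

		if (buf->capacity > UINT32_MAX / 2) {
			return -1;	/* Doubling would overflow. */
		}
		new_capacity = buf->capacity << 1;
		newbuf = realloc(buf->entries,
				new_capacity * sizeof(*newbuf));
		if (!newbuf) {
			/* Test the returned pointer, not the new size. */
			return -1;
		}
		buf->entries = newbuf;
		buf->capacity = new_capacity;
	}
	entry = &buf->entries[buf->count++];
	memset(entry, 0, sizeof(*entry));
	entry->id = id;
	/* The memset above guarantees NUL termination. */
	strncpy(entry->name, name, sizeof(entry->name) - 1);
	return 0;
}

static void snap_buf_fini(struct snap_buf *buf)
{
	free(buf->entries);
	buf->entries = NULL;
	buf->count = buf->capacity = 0;
}
```

A caller would fill the buffer during iteration, leave the critical section,
then send buf.count followed by the buf.count entries, so the announced count
always matches what is sent.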
@@ -918,15 +876,14 @@ int viewer_get_new_streams(struct relay_connection *conn)
memset(&response, 0, sizeof(response));
- rcu_read_lock();
- session = session_find_by_id(conn->sessions_ht, session_id);
+ session = session_get_by_id(session_id);
if (!session) {
DBG("Relay session %" PRIu64 " not found", session_id);
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
goto send_reply;
}
- if (!session_attached(conn, session_id)) {
+ if (!viewer_session_is_attached(conn->viewer_session, session)) {
send_streams = 0;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
goto send_reply;
@@ -935,10 +892,10 @@ int viewer_get_new_streams(struct relay_connection *conn)
send_streams = 1;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
- ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, NULL, &nb_unsent,
+ ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
&nb_created);
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
/* Only send back the newly created streams with the unsent ones. */
nb_streams = nb_created + nb_unsent;
@@ -949,15 +906,9 @@ int viewer_get_new_streams(struct relay_connection *conn)
* it means that the viewer has already received the whole trace
* for this session and should now close it.
*/
- if (nb_streams == 0 && session->close_flag) {
+ if (nb_total == 0 && session->connection_closed) {
send_streams = 0;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
- /*
- * Remove the session from the attached list of the connection
- * and try to destroy it.
- */
- cds_list_del(&session->viewer_session_list);
- cleanup_session(conn, session);
goto send_reply;
}
@@ -965,30 +916,33 @@ send_reply:
health_code_update();
ret = send_response(conn->sock, &response, sizeof(response));
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
health_code_update();
/*
- * Unknown or empty session, just return gracefully, the viewer knows what
- * is happening.
+ * Unknown or empty session, just return gracefully, the viewer
+ * knows what is happening.
*/
if (!send_streams || !nb_streams) {
ret = 0;
- goto end_unlock;
+ goto end_put_session;
}
/*
- * Send stream and *DON'T* ignore the sent flag so every viewer streams
- * that were not sent from that point will be sent to the viewer.
+ * Send stream and *DON'T* ignore the sent flag so every viewer
+ * streams that were not sent from that point will be sent to
+ * the viewer.
*/
ret = send_viewer_streams(conn->sock, session, 0);
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
-end_unlock:
- rcu_read_unlock();
+end_put_session:
+ if (session) {
+ session_put(session);
+ }
error:
return ret;
}
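
Note on the hunk above: the RCU read-side lock around the whole function is
replaced by a reference taken with session_get_by_id() and dropped with
session_put() on every exit path. A minimal sketch of such a get/put pairing
follows, assuming a plain C11 atomic counter. struct demo_obj and the
demo_obj_*() helpers are hypothetical and only illustrate the idea; they do
not reproduce the session implementation in this patch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical reference-counted object. */
struct demo_obj {
	atomic_int refcount;
	int *released;	/* Set to 1 when the object is freed. */
};

/* Create an object owned by the caller (refcount starts at 1). */
static struct demo_obj *demo_obj_create(int *released)
{
	struct demo_obj *obj = calloc(1, sizeof(*obj));

	if (!obj) {
		return NULL;
	}
	atomic_init(&obj->refcount, 1);
	obj->released = released;
	return obj;
}

/*
 * Take a reference unless the count already reached zero, in which
 * case the object is being torn down and must not be used.
 */
static bool demo_obj_get(struct demo_obj *obj)
{
	int old = atomic_load(&obj->refcount);

	while (old > 0) {
		if (atomic_compare_exchange_weak(&obj->refcount, &old,
				old + 1)) {
			return true;
		}
		/* old was reloaded by the failed exchange; retry. */
	}
	return false;
}

/* Drop a reference; the last put frees the object. */
static void demo_obj_put(struct demo_obj *obj)
{
	if (atomic_fetch_sub(&obj->refcount, 1) == 1) {
		*obj->released = 1;
		free(obj);
	}
}
```

With this shape, a handler can funnel all of its exits through a single
label that performs the put, which is what end_put_session does above.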
@@ -1005,7 +959,7 @@ int viewer_attach_session(struct relay_connection *conn)
enum lttng_viewer_seek seek_type;
struct lttng_viewer_attach_session_request request;
struct lttng_viewer_attach_session_response response;
- struct relay_session *session;
+ struct relay_session *session = NULL;
assert(conn);
@@ -1027,37 +981,34 @@ int viewer_attach_session(struct relay_connection *conn)
goto send_reply;
}
- rcu_read_lock();
- session = session_find_by_id(conn->sessions_ht,
- be64toh(request.session_id));
+ session = session_get_by_id(be64toh(request.session_id));
if (!session) {
DBG("Relay session %" PRIu64 " not found",
be64toh(request.session_id));
response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
goto send_reply;
}
- session_viewer_attach(session);
- DBG("Attach session ID %" PRIu64 " received", be64toh(request.session_id));
+ DBG("Attach session ID %" PRIu64 " received",
+ be64toh(request.session_id));
- if (uatomic_read(&session->viewer_refcount) > 1) {
- DBG("Already a viewer attached");
- response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
- session_viewer_detach(session);
- goto send_reply;
- } else if (session->live_timer == 0) {
+ if (session->live_timer == 0) {
DBG("Not live session");
response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
goto send_reply;
- } else {
- send_streams = 1;
- response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
- cds_list_add(&session->viewer_session_list,
- &conn->viewer_session->sessions_head);
+ }
+
+ send_streams = 1;
+ ret = viewer_session_attach(conn->viewer_session, session);
+ if (ret) {
+ DBG("Already a viewer attached");
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
+ goto send_reply;
}
switch (be32toh(request.seek)) {
case LTTNG_VIEWER_SEEK_BEGINNING:
case LTTNG_VIEWER_SEEK_LAST:
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
seek_type = be32toh(request.seek);
break;
default:
@@ -1069,7 +1020,7 @@ int viewer_attach_session(struct relay_connection *conn)
ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL);
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
response.streams_count = htobe32(nb_streams);
@@ -1077,27 +1028,29 @@ send_reply:
health_code_update();
ret = send_response(conn->sock, &response, sizeof(response));
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
health_code_update();
/*
- * Unknown or empty session, just return gracefully, the viewer knows what
- * is happening.
+ * Unknown or empty session, just return gracefully, the viewer
+ * knows what is happening.
*/
if (!send_streams || !nb_streams) {
ret = 0;
- goto end_unlock;
+ goto end_put_session;
}
/* Send stream and ignore the sent flag. */
ret = send_viewer_streams(conn->sock, session, 1);
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
-end_unlock:
- rcu_read_unlock();
+end_put_session:
+ if (session) {
+ session_put(session);
+ }
error:
return ret;
}
@@ -1105,38 +1058,42 @@ error:
/*
* Open the index file if needed for the given vstream.
*
- * If an index file is successfully opened, the index_read_fd of the stream is
- * set with it.
+ * If an index file is successfully opened, the vstream index_fd is set
+ * with it.
*
* Return 0 on success, a negative value on error (-ENOENT if not ready yet).
+ *
+ * Called with both rstream and vstream locks held.
*/
static int try_open_index(struct relay_viewer_stream *vstream,
struct relay_stream *rstream)
{
int ret = 0;
- assert(vstream);
- assert(rstream);
-
- if (vstream->index_read_fd >= 0) {
+ if (vstream->index_fd) {
goto end;
}
/*
- * First time, we open the index file and at least one index is ready. The
- * race between the read and write of the total_index_received is
- * acceptable here since the client will be notified to simply come back
- * and get the next index.
+ * First time: open the index file only once at least one index is ready.
*/
if (rstream->total_index_received <= 0) {
ret = -ENOENT;
goto end;
}
ret = index_open(vstream->path_name, vstream->channel_name,
- vstream->tracefile_count, vstream->tracefile_count_current);
+ vstream->stream->tracefile_count,
+ vstream->current_tracefile_id);
if (ret >= 0) {
- vstream->index_read_fd = ret;
- ret = 0;
+ vstream->index_fd = stream_fd_create(ret);
+ if (!vstream->index_fd) {
+ if (close(ret)) {
+ PERROR("close");
+ }
+ ret = -1;
+ } else {
+ ret = 0;
+ }
goto end;
}
@@ -1145,13 +1102,15 @@ end:
}
/*
- * Check the status of the index for the given stream. This function updates
- * the index structure if needed and can destroy the vstream also for the HUP
- * situation.
+ * Check the status of the index for the given stream. This function
+ * updates the index structure if needed and can put (close) the vstream
+ * in the HUP situation.
+ *
+ * Return 0 means that we can proceed with the index. A value of 1 means
+ * that the index has been updated and is ready to be sent to the
+ * client. A negative value indicates an error that can't be handled.
*
- * Return 0 means that we can proceed with the index. A value of 1 means that
- * the index has been updated and is ready to be send to the client. A negative
- * value indicates an error that can't be handled.
+ * Called with both rstream and vstream locks held.
*/
static int check_index_status(struct relay_viewer_stream *vstream,
struct relay_stream *rstream, struct ctf_trace *trace,
@@ -1159,68 +1118,104 @@ static int check_index_status(struct relay_viewer_stream *vstream,
{
int ret;
- assert(vstream);
- assert(rstream);
- assert(index);
- assert(trace);
-
- if (!rstream->close_flag) {
- /* Rotate on abort (overwrite). */
- if (vstream->abort_flag) {
- DBG("Viewer stream %" PRIu64 " rotate because of overwrite",
- vstream->stream_handle);
- ret = viewer_stream_rotate(vstream, rstream);
+ if (trace->session->connection_closed
+ && rstream->total_index_received
+ == vstream->last_sent_index) {
+ /* Last index sent and session connection is closed. */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ goto hup;
+ } else if (rstream->beacon_ts_end != -1ULL &&
+ rstream->total_index_received
+ == vstream->last_sent_index) {
+ /*
+ * We've received a synchronization beacon and the last index
+ * available has been sent, the index for now is inactive.
+ */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
+ index->timestamp_end = htobe64(rstream->beacon_ts_end);
+ index->stream_id = htobe64(rstream->ctf_stream_id);
+ goto index_ready;
+ } else if (rstream->total_index_received <= vstream->last_sent_index) {
+ /* This actually checks the case where recv == last_sent. */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ goto index_ready;
+ } else if (!viewer_stream_is_tracefile_id_readable(vstream,
+ vstream->current_tracefile_id)) {
+ /*
+ * The producer has overwritten our current file. We
+ * need to rotate.
+ */
+ DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
+ vstream->stream->stream_handle);
+ ret = viewer_stream_rotate(vstream);
+ if (ret < 0) {
+ goto end;
+ } else if (ret == 1) {
+ /* EOF across entire stream. */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ goto hup;
+ } else if (!viewer_stream_is_tracefile_id_readable(vstream,
+ vstream->current_tracefile_id)) {
+ /*
+ * We rotated into a file being concurrently
+ * updated, need to retry later.
+ */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ goto index_ready;
+ }
+ /* ret == 0 means successful so we continue. */
+ ret = 0;
+ } else {
+ ssize_t read_ret;
+ char tmp[1];
+
+ /*
+ * Use EOF on current index file to find out when we
+ * need to rotate.
+ */
+ read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
+ if (read_ret == 1) {
+ off_t seek_ret;
+
+ /* There is still data to read. Rewind position. */
+ seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
+ if (seek_ret < 0) {
+ ret = -1;
+ goto end;
+ }
+ ret = 0;
+ } else if (read_ret == 0) {
+ /* EOF. We need to rotate. */
+ DBG("Viewer stream %" PRIu64 " rotation due to EOF",
+ vstream->stream->stream_handle);
+ ret = viewer_stream_rotate(vstream);
if (ret < 0) {
- goto error;
+ goto end;
} else if (ret == 1) {
- /* EOF */
+ /* EOF across entire stream. */
index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto hup;
- }
- /* ret == 0 means successful so we continue. */
- }
-
- /* Check if we are in the same trace file at this point. */
- if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
- if (rstream->beacon_ts_end != -1ULL &&
- vstream->last_sent_index == rstream->total_index_received) {
+ } else if (!viewer_stream_is_tracefile_id_readable(vstream,
+ vstream->current_tracefile_id)) {
/*
- * We've received a synchronization beacon and the last index
- * available has been sent, the index for now is inactive.
- */
- index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
- index->timestamp_end = htobe64(rstream->beacon_ts_end);
- index->stream_id = htobe64(rstream->ctf_stream_id);
- goto index_ready;
- } else if (rstream->total_index_received <= vstream->last_sent_index
- && !vstream->close_write_flag) {
- /*
- * Reader and writer are working in the same tracefile, so we care
- * about the number of index received and sent. Otherwise, we read
- * up to EOF.
+ * We rotated into a file being concurrently
+ * updated, need to retry later.
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
goto index_ready;
}
+ /* ret == 0 means successful so we continue. */
+ ret = 0;
+ } else {
+ /* Error reading index. */
+ ret = -1;
}
- /* Nothing to do with the index, continue with it. */
- ret = 0;
- } else if (rstream->close_flag && vstream->close_write_flag &&
- vstream->total_index_received == vstream->last_sent_index) {
- /* Last index sent and current tracefile closed in write */
- index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
- goto hup;
- } else {
- vstream->close_write_flag = 1;
- ret = 0;
}
-
-error:
+end:
return ret;
hup:
- viewer_stream_delete(vstream);
- viewer_stream_destroy(trace, vstream);
+ viewer_stream_put(vstream);
index_ready:
return 1;
}
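
Note on check_index_status() above: the final branch detects the end of the
current index file by reading one byte and, if a byte was read, seeking back
by one so the following read starts at the same offset. A minimal sketch of
that probe using plain POSIX read() and lseek() is shown below.
probe_more_data() is a hypothetical helper name; the patch itself goes
through lttng_read() on vstream->index_fd->fd.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Return 1 if at least one byte is readable at the current offset
 * (the offset is restored), 0 at end of file, -1 on error.
 */
static int probe_more_data(int fd)
{
	char tmp;
	ssize_t read_ret;

	do {
		read_ret = read(fd, &tmp, 1);
	} while (read_ret < 0 && errno == EINTR);

	if (read_ret == 1) {
		/* There is still data to read. Rewind position. */
		if (lseek(fd, -1, SEEK_CUR) < 0) {
			return -1;
		}
		return 1;
	}
	if (read_ret == 0) {
		/* EOF: the caller would rotate to the next file. */
		return 0;
	}
	return -1;
}
```

The probe only makes sense while the caller holds whatever lock serializes
access to the file descriptor, since it moves the shared file offset for a
moment.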
@@ -1238,10 +1233,10 @@ int viewer_get_next_index(struct relay_connection *conn)
struct lttng_viewer_get_next_index request_index;
struct lttng_viewer_index viewer_index;
struct ctf_packet_index packet_index;
- struct relay_viewer_stream *vstream;
- struct relay_stream *rstream;
- struct ctf_trace *ctf_trace;
- struct relay_session *session;
+ struct relay_viewer_stream *vstream = NULL;
+ struct relay_stream *rstream = NULL;
+ struct ctf_trace *ctf_trace = NULL;
+ struct relay_viewer_stream *metadata_viewer_stream = NULL;
assert(conn);
@@ -1255,42 +1250,41 @@ int viewer_get_next_index(struct relay_connection *conn)
}
health_code_update();
- rcu_read_lock();
- vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
+ vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
if (!vstream) {
ret = -1;
- goto end_unlock;
+ goto end;
}
- session = session_find_by_id(conn->sessions_ht, vstream->session_id);
- if (!session) {
- ret = -1;
- goto end_unlock;
- }
+ /* Use back-references. Protected by refcounts. */
+ rstream = vstream->stream;
+ ctf_trace = rstream->trace;
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, vstream->path_name);
- assert(ctf_trace);
+ /* metadata_viewer_stream may be NULL. */
+ metadata_viewer_stream =
+ ctf_trace_get_viewer_metadata_stream(ctf_trace);
memset(&viewer_index, 0, sizeof(viewer_index));
+ pthread_mutex_lock(&rstream->lock);
+ pthread_mutex_lock(&vstream->lock);
+
/*
* The viewer should not ask for index on metadata stream.
*/
- if (vstream->metadata_flag) {
+ if (rstream->is_metadata) {
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto send_reply;
}
- rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle);
- assert(rstream);
-
/* Try to open an index if one is needed for that stream. */
ret = try_open_index(vstream, rstream);
if (ret < 0) {
if (ret == -ENOENT) {
/*
- * The index is created only when the first data packet arrives, it
- * might not be ready at the beginning of the session
+ * The index is created only when the first data
+ * packet arrives, it might not be ready at the
+ * beginning of the session.
*/
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
} else {
@@ -1300,80 +1294,71 @@ int viewer_get_next_index(struct relay_connection *conn)
goto send_reply;
}
- pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
if (ret < 0) {
- goto end_unlock;
+ goto error_put;
} else if (ret == 1) {
/*
- * This means the viewer index data structure has been populated by the
- * check call thus we now send back the reply to the client.
+ * This means the viewer index data structure has been
+ * populated by the check call thus we now send back the
+ * reply to the client.
*/
goto send_reply;
}
/* At this point, ret MUST be 0 thus we continue with the get. */
assert(!ret);
- if (!ctf_trace->metadata_received ||
- ctf_trace->metadata_received > ctf_trace->metadata_sent) {
- viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
+ /*
+ * vstream->stream_fd may be NULL if it has been closed by
+ * tracefile rotation, or if we are at the beginning of the
+ * stream. We open the data stream file here to protect against
+ * overwrite caused by tracefile rotation (in association with
+ * unlink performed before overwrite).
+ */
+ if (!vstream->stream_fd) {
+ char fullpath[PATH_MAX];
+
+ if (vstream->stream->tracefile_count > 0) {
+ ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64,
+ vstream->path_name,
+ vstream->channel_name,
+ vstream->current_tracefile_id);
+ } else {
+ ret = snprintf(fullpath, PATH_MAX, "%s/%s",
+ vstream->path_name,
+ vstream->channel_name);
+ }
+ if (ret < 0) {
+ goto error_put;
+ }
+ ret = open(fullpath, O_RDONLY);
+ if (ret < 0) {
+ PERROR("Relay opening trace file");
+ goto error_put;
+ }
+ vstream->stream_fd = stream_fd_create(ret);
+ if (!vstream->stream_fd) {
+ if (close(ret)) {
+ PERROR("close");
+ }
+ goto error_put;
+ }
}
ret = check_new_streams(conn);
if (ret < 0) {
- goto end_unlock;
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ goto send_reply;
} else if (ret == 1) {
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
}
- pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
- pthread_mutex_lock(&vstream->overwrite_lock);
- if (vstream->abort_flag) {
- /* The file is being overwritten by the writer, we cannot use it. */
- pthread_mutex_unlock(&vstream->overwrite_lock);
- ret = viewer_stream_rotate(vstream, rstream);
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
- if (ret < 0) {
- goto end_unlock;
- } else if (ret == 1) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
- viewer_stream_delete(vstream);
- viewer_stream_destroy(ctf_trace, vstream);
- } else {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
- }
- goto send_reply;
- }
-
- read_ret = lttng_read(vstream->index_read_fd, &packet_index,
+ read_ret = lttng_read(vstream->index_fd->fd, &packet_index,
sizeof(packet_index));
- pthread_mutex_unlock(&vstream->overwrite_lock);
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
- if (read_ret < 0) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
- viewer_stream_delete(vstream);
- viewer_stream_destroy(ctf_trace, vstream);
- goto send_reply;
- } else if (read_ret < sizeof(packet_index)) {
- pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
- if (vstream->close_write_flag) {
- ret = viewer_stream_rotate(vstream, rstream);
- if (ret < 0) {
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
- goto end_unlock;
- } else if (ret == 1) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
- viewer_stream_delete(vstream);
- viewer_stream_destroy(ctf_trace, vstream);
- } else {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
- }
- } else {
- ERR("Relay reading index file %d", vstream->index_read_fd);
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
- }
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+ if (read_ret < sizeof(packet_index)) {
+ ERR("Relay reading index file %d returned %zd",
+ vstream->index_fd->fd, read_ret);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
} else {
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
@@ -1383,6 +1368,9 @@ int viewer_get_next_index(struct relay_connection *conn)
/*
* Indexes are stored in big endian, no need to switch before sending.
*/
+ DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64,
+ rstream->stream_handle,
+ be64toh(packet_index.offset));
viewer_index.offset = packet_index.offset;
viewer_index.packet_size = packet_index.packet_size;
viewer_index.content_size = packet_index.content_size;
@@ -1392,22 +1380,53 @@ int viewer_get_next_index(struct relay_connection *conn)
viewer_index.stream_id = packet_index.stream_id;
send_reply:
+ pthread_mutex_unlock(&vstream->lock);
+ pthread_mutex_unlock(&rstream->lock);
+
+ if (metadata_viewer_stream) {
+ pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
+ pthread_mutex_lock(&metadata_viewer_stream->lock);
+ DBG("get next index metadata check: recv %" PRIu64
+ " sent %" PRIu64,
+ metadata_viewer_stream->stream->metadata_received,
+ metadata_viewer_stream->metadata_sent);
+ if (!metadata_viewer_stream->stream->metadata_received ||
+ metadata_viewer_stream->stream->metadata_received >
+ metadata_viewer_stream->metadata_sent) {
+ viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
+ }
+ pthread_mutex_unlock(&metadata_viewer_stream->lock);
+ pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
+ }
+
viewer_index.flags = htobe32(viewer_index.flags);
health_code_update();
ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
if (ret < 0) {
- goto end_unlock;
+ goto end;
}
health_code_update();
DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
- vstream->last_sent_index, vstream->stream_handle);
-
-end_unlock:
- rcu_read_unlock();
-
+ vstream->last_sent_index,
+ vstream->stream->stream_handle);
end:
+ if (metadata_viewer_stream) {
+ viewer_stream_put(metadata_viewer_stream);
+ }
+ if (vstream) {
+ viewer_stream_put(vstream);
+ }
+ return ret;
+
+error_put:
+ pthread_mutex_unlock(&vstream->lock);
+ pthread_mutex_unlock(&rstream->lock);
+ if (metadata_viewer_stream) {
+ viewer_stream_put(metadata_viewer_stream);
+ }
+ viewer_stream_put(vstream);
return ret;
}
@@ -1425,17 +1444,16 @@ int viewer_get_packet(struct relay_connection *conn)
ssize_t read_len;
struct lttng_viewer_get_packet get_packet_info;
struct lttng_viewer_trace_packet reply;
- struct relay_viewer_stream *stream;
- struct relay_session *session;
+ struct relay_viewer_stream *vstream = NULL;
struct ctf_trace *ctf_trace;
-
- assert(conn);
+ struct relay_viewer_stream *metadata_viewer_stream = NULL;
DBG2("Relay get data packet");
health_code_update();
- ret = recv_request(conn->sock, &get_packet_info, sizeof(get_packet_info));
+ ret = recv_request(conn->sock, &get_packet_info,
+ sizeof(get_packet_info));
if (ret < 0) {
goto end;
}
@@ -1444,59 +1462,53 @@ int viewer_get_packet(struct relay_connection *conn)
/* From this point on, the error label can be reached. */
memset(&reply, 0, sizeof(reply));
- rcu_read_lock();
- stream = viewer_stream_find_by_id(be64toh(get_packet_info.stream_id));
- if (!stream) {
+ vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
+ if (!vstream) {
goto error;
}
- session = session_find_by_id(conn->sessions_ht, stream->session_id);
- if (!session) {
- ret = -1;
- goto error;
- }
+ ctf_trace = vstream->stream->trace;
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
+ /* metadata_viewer_stream may be NULL. */
+ metadata_viewer_stream =
+ ctf_trace_get_viewer_metadata_stream(ctf_trace);
- /*
- * First time we read this stream, we need open the tracefile, we should
- * only arrive here if an index has already been sent to the viewer, so the
- * tracefile must exist, if it does not it is a fatal error.
- */
- if (stream->read_fd < 0) {
- char fullpath[PATH_MAX];
+ if (metadata_viewer_stream) {
+ bool get_packet_err = false;
- if (stream->tracefile_count > 0) {
- ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, stream->path_name,
- stream->channel_name,
- stream->tracefile_count_current);
- } else {
- ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name,
- stream->channel_name);
+ pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
+ pthread_mutex_lock(&metadata_viewer_stream->lock);
+ DBG("get packet metadata check: recv %" PRIu64 " sent %" PRIu64,
+ metadata_viewer_stream->stream->metadata_received,
+ metadata_viewer_stream->metadata_sent);
+ if (!metadata_viewer_stream->stream->metadata_received ||
+ metadata_viewer_stream->stream->metadata_received >
+ metadata_viewer_stream->metadata_sent) {
+ get_packet_err = true;
}
- if (ret < 0) {
- goto error;
- }
- ret = open(fullpath, O_RDONLY);
- if (ret < 0) {
- PERROR("Relay opening trace file");
- goto error;
+ pthread_mutex_unlock(&metadata_viewer_stream->lock);
+ pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
+ viewer_stream_put(metadata_viewer_stream);
+ if (get_packet_err) {
+ reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+ reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
+ goto send_reply_nolock;
}
- stream->read_fd = ret;
- }
-
- if (!ctf_trace->metadata_received ||
- ctf_trace->metadata_received > ctf_trace->metadata_sent) {
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
- reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
- goto send_reply;
}
+ pthread_mutex_lock(&vstream->lock);
+ /*
+ * The vstream->stream_fd used here has been opened by
+ * get_next_index. It is opened there because this is what
+ * allows us to grab a reference to the file with stream lock
+ * held, thus protecting us against overwrite caused by
+ * tracefile rotation. Since tracefile rotation unlinks the old
+ * data file, we are ensured that we won't have our data
+ * overwritten under us.
+ */
ret = check_new_streams(conn);
if (ret < 0) {
- goto end_unlock;
+ goto end_free;
} else if (ret == 1) {
reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
@@ -1510,34 +1522,19 @@ int viewer_get_packet(struct relay_connection *conn)
goto error;
}
- ret = lseek(stream->read_fd, be64toh(get_packet_info.offset), SEEK_SET);
+ ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
+ SEEK_SET);
if (ret < 0) {
- /*
- * If the read fd was closed by the streaming side, the
- * abort_flag will be set to 1, otherwise it is an error.
- */
- if (stream->abort_flag == 0) {
- PERROR("lseek");
- goto error;
- }
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
- goto send_reply;
+ PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
+ be64toh(get_packet_info.offset));
+ goto error;
}
- read_len = lttng_read(stream->read_fd, data, len);
+ read_len = lttng_read(vstream->stream_fd->fd, data, len);
if (read_len < len) {
- /*
- * If the read fd was closed by the streaming side, the
- * abort_flag will be set to 1, otherwise it is an error.
- */
- if (stream->abort_flag == 0) {
- PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
- stream->read_fd,
- be64toh(get_packet_info.offset));
- goto error;
- } else {
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
- goto send_reply;
- }
+ PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
+ vstream->stream_fd->fd,
+ be64toh(get_packet_info.offset));
+ goto error;
}
reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
reply.len = htobe32(len);
@@ -1548,13 +1545,17 @@ error:
reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
send_reply:
+ if (vstream) {
+ pthread_mutex_unlock(&vstream->lock);
+ }
+send_reply_nolock:
reply.flags = htobe32(reply.flags);
health_code_update();
ret = send_response(conn->sock, &reply, sizeof(reply));
if (ret < 0) {
- goto end_unlock;
+ goto end_free;
}
health_code_update();
@@ -1562,7 +1563,7 @@ send_reply:
health_code_update();
ret = send_response(conn->sock, data, len);
if (ret < 0) {
- goto end_unlock;
+ goto end_free;
}
health_code_update();
}
@@ -1570,11 +1571,12 @@ send_reply:
DBG("Sent %u bytes for stream %" PRIu64, len,
be64toh(get_packet_info.stream_id));
-end_unlock:
+end_free:
free(data);
- rcu_read_unlock();
-
end:
+ if (vstream) {
+ viewer_stream_put(vstream);
+ }
return ret;
}
@@ -1592,9 +1594,7 @@ int viewer_get_metadata(struct relay_connection *conn)
char *data = NULL;
struct lttng_viewer_get_metadata request;
struct lttng_viewer_metadata_packet reply;
- struct relay_viewer_stream *stream;
- struct ctf_trace *ctf_trace;
- struct relay_session *session;
+ struct relay_viewer_stream *vstream = NULL;
assert(conn);
@@ -1610,36 +1610,30 @@ int viewer_get_metadata(struct relay_connection *conn)
memset(&reply, 0, sizeof(reply));
- rcu_read_lock();
- stream = viewer_stream_find_by_id(be64toh(request.stream_id));
- if (!stream || !stream->metadata_flag) {
- ERR("Invalid metadata stream");
- goto error;
+ vstream = viewer_stream_get_by_id(be64toh(request.stream_id));
+ if (!vstream) {
+ reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
+ goto send_reply;
}
-
- session = session_find_by_id(conn->sessions_ht, stream->session_id);
- if (!session) {
- ret = -1;
+ if (!vstream->stream->is_metadata) {
+ ERR("Invalid metadata stream");
goto error;
}
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
- assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received);
+ assert(vstream->metadata_sent <= vstream->stream->metadata_received);
- len = ctf_trace->metadata_received - ctf_trace->metadata_sent;
+ len = vstream->stream->metadata_received - vstream->metadata_sent;
if (len == 0) {
reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
goto send_reply;
}
/* first time, we open the metadata file */
- if (stream->read_fd < 0) {
+ if (!vstream->stream_fd) {
char fullpath[PATH_MAX];
- ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name,
- stream->channel_name);
+ ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name,
+ vstream->channel_name);
if (ret < 0) {
goto error;
}
@@ -1648,7 +1642,13 @@ int viewer_get_metadata(struct relay_connection *conn)
PERROR("Relay opening metadata file");
goto error;
}
- stream->read_fd = ret;
+ vstream->stream_fd = stream_fd_create(ret);
+ if (!vstream->stream_fd) {
+ if (close(ret)) {
+ PERROR("close");
+ }
+ goto error;
+ }
}
reply.len = htobe64(len);
@@ -1658,13 +1658,20 @@ int viewer_get_metadata(struct relay_connection *conn)
goto error;
}
- read_len = lttng_read(stream->read_fd, data, len);
+ read_len = lttng_read(vstream->stream_fd->fd, data, len);
if (read_len < len) {
PERROR("Relay reading metadata file");
goto error;
}
- ctf_trace->metadata_sent += read_len;
+ vstream->metadata_sent += read_len;
+ if (vstream->metadata_sent == vstream->stream->metadata_received
+ && vstream->stream->closed) {
+ /* Release ownership for the viewer metadata stream. */
+ viewer_stream_put(vstream);
+ }
+
reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
+
goto send_reply;
error:
@@ -1674,14 +1681,14 @@ send_reply:
health_code_update();
ret = send_response(conn->sock, &reply, sizeof(reply));
if (ret < 0) {
- goto end_unlock;
+ goto end_free;
}
health_code_update();
if (len > 0) {
ret = send_response(conn->sock, data, len);
if (ret < 0) {
- goto end_unlock;
+ goto end_free;
}
}
@@ -1690,10 +1697,12 @@ send_reply:
DBG("Metadata sent");
-end_unlock:
+end_free:
free(data);
- rcu_read_unlock();
end:
+ if (vstream) {
+ viewer_stream_put(vstream);
+ }
return ret;
}
@@ -1712,13 +1721,12 @@ int viewer_create_session(struct relay_connection *conn)
memset(&resp, 0, sizeof(resp));
resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
- conn->viewer_session = zmalloc(sizeof(*conn->viewer_session));
+ conn->viewer_session = viewer_session_create();
if (!conn->viewer_session) {
ERR("Allocation viewer session");
resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
goto send_reply;
}
- CDS_INIT_LIST_HEAD(&conn->viewer_session->sessions_head);
send_reply:
health_code_update();
@@ -1757,9 +1765,6 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
int ret = 0;
uint32_t msg_value;
- assert(recv_hdr);
- assert(conn);
-
msg_value = be32toh(recv_hdr->cmd);
/*
@@ -1798,7 +1803,8 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
ret = viewer_create_session(conn);
break;
default:
- ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd));
+ ERR("Received unknown viewer command (%u)",
+ be32toh(recv_hdr->cmd));
live_relay_unknown_command(conn);
ret = -1;
goto end;
@@ -1813,8 +1819,6 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
{
int ret;
- assert(events);
-
(void) lttng_poll_del(events, pollfd);
ret = close(pollfd);
@@ -1824,38 +1828,6 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
}
/*
- * Delete and destroy a connection.
- *
- * RCU read side lock MUST be acquired.
- */
-static void destroy_connection(struct lttng_ht *relay_connections_ht,
- struct relay_connection *conn)
-{
- struct relay_session *session, *tmp_session;
-
- assert(relay_connections_ht);
- assert(conn);
-
- connection_delete(relay_connections_ht, conn);
-
- if (!conn->viewer_session) {
- goto end;
- }
-
- rcu_read_lock();
- cds_list_for_each_entry_safe(session, tmp_session,
- &conn->viewer_session->sessions_head,
- viewer_session_list) {
- DBG("Cleaning connection of session ID %" PRIu64, session->id);
- cleanup_session(conn, session);
- }
- rcu_read_unlock();
-
-end:
- connection_destroy(conn);
-}
-
-/*
* This thread does the actual work
*/
static
@@ -1867,8 +1839,6 @@ void *thread_worker(void *data)
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
struct lttng_viewer_cmd recv_hdr;
- struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
- struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
struct relay_connection *destroy_conn;
DBG("[thread] Live viewer relay worker started");
@@ -1952,50 +1922,50 @@ restart:
} else if (revents & LPOLLIN) {
struct relay_connection *conn;
- ret = lttng_read(live_conn_pipe[0], &conn, sizeof(conn));
+ ret = lttng_read(live_conn_pipe[0],
+ &conn, sizeof(conn));
if (ret < 0) {
goto error;
}
- conn->sessions_ht = sessions_ht;
- connection_init(conn);
lttng_poll_add(&events, conn->sock->fd,
LPOLLIN | LPOLLRDHUP);
- rcu_read_lock();
- lttng_ht_add_unique_ulong(relay_connections_ht,
- &conn->sock_n);
- rcu_read_unlock();
- DBG("Connection socket %d added", conn->sock->fd);
+ connection_ht_add(relay_connections_ht, conn);
+ DBG("Connection socket %d added to poll", conn->sock->fd);
}
} else {
struct relay_connection *conn;
- rcu_read_lock();
- conn = connection_find_by_sock(relay_connections_ht, pollfd);
- /* If not found, there is a synchronization issue. */
- assert(conn);
+ conn = connection_get_by_sock(relay_connections_ht, pollfd);
+ if (!conn) {
+ continue;
+ }
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ /* Put "open" ownership reference. */
+ connection_put(conn);
} else if (revents & LPOLLIN) {
ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
sizeof(recv_hdr), 0);
if (ret <= 0) {
/* Connection closed */
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ /* Put "open" ownership reference. */
+ connection_put(conn);
DBG("Viewer control conn closed with %d", pollfd);
} else {
ret = process_control(&recv_hdr, conn);
if (ret < 0) {
/* Clear the session on error. */
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ /* Put "open" ownership reference. */
+ connection_put(conn);
DBG("Viewer connection closed with %d", pollfd);
}
}
}
- rcu_read_unlock();
+ /* Put "get_by_sock" reference. */
+ connection_put(conn);
}
}
}
@@ -2010,7 +1980,7 @@ error:
destroy_conn,
sock_n.node) {
health_code_update();
- destroy_connection(relay_connections_ht, destroy_conn);
+ connection_put(destroy_conn);
}
rcu_read_unlock();
error_poll_create:
@@ -2078,8 +2048,7 @@ int relayd_live_join(void)
/*
* main
*/
-int relayd_live_create(struct lttng_uri *uri,
- struct relay_local_data *relay_ctx)
+int relayd_live_create(struct lttng_uri *uri)
{
int ret = 0, retval = 0;
void *status;
@@ -2129,7 +2098,7 @@ int relayd_live_create(struct lttng_uri *uri,
/* Setup the worker thread */
ret = pthread_create(&live_worker_thread, NULL,
- thread_worker, relay_ctx);
+ thread_worker, NULL);
if (ret) {
errno = ret;
PERROR("pthread_create viewer worker");
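
The thread_worker() hunks above replace destroy_connection() with two
distinct references per connection: an "open" ownership reference that is
put when the socket is closed, and a short-lived reference taken by
connection_get_by_sock() that is put once the event has been handled.
Below is a minimal, single-threaded sketch of that pattern. All toy_*
names are hypothetical and are not part of lttng-tools; the real
connection_get_by_sock()/connection_put() are implemented elsewhere in
this patch and must also cope with concurrent lookups, which this sketch
ignores.

```c
#include <assert.h>
#include <stdlib.h>

#define TOY_MAX_FD 16

/* Hypothetical stand-in for struct relay_connection. */
struct toy_conn {
	int refcount;	/* plain int: this sketch is single-threaded */
	int sock_fd;
};

/* Hypothetical lookup table indexed by socket fd (assumption). */
static struct toy_conn *toy_table[TOY_MAX_FD];
/* Counts released objects, for demonstration only. */
static int toy_released;

/* Create a connection holding its initial "open" ownership reference. */
static struct toy_conn *toy_conn_create(int sock_fd)
{
	struct toy_conn *conn;

	if (sock_fd < 0 || sock_fd >= TOY_MAX_FD || toy_table[sock_fd]) {
		return NULL;
	}
	conn = calloc(1, sizeof(*conn));
	if (!conn) {
		return NULL;
	}
	conn->refcount = 1;
	conn->sock_fd = sock_fd;
	toy_table[sock_fd] = conn;
	return conn;
}

/* Look up a connection and take a reference on it; NULL if not found. */
static struct toy_conn *toy_conn_get_by_sock(int sock_fd)
{
	struct toy_conn *conn;

	if (sock_fd < 0 || sock_fd >= TOY_MAX_FD) {
		return NULL;
	}
	conn = toy_table[sock_fd];
	if (conn) {
		conn->refcount++;
	}
	return conn;
}

/* Drop one reference; the last put unpublishes and frees the object. */
static void toy_conn_put(struct toy_conn *conn)
{
	assert(conn->refcount > 0);
	if (--conn->refcount == 0) {
		toy_table[conn->sock_fd] = NULL;
		free(conn);
		toy_released++;
	}
}
```

With this shape, the error path can put the "open" reference while still
holding the lookup reference, so the object stays valid until the final
put at the end of the loop body, as in the hunk above.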
diff --git a/src/bin/lttng-relayd/live.h b/src/bin/lttng-relayd/live.h
index 5db940b..2b8a3a0 100644
--- a/src/bin/lttng-relayd/live.h
+++ b/src/bin/lttng-relayd/live.h
@@ -1,6 +1,10 @@
+#ifndef LTTNG_RELAYD_LIVE_H
+#define LTTNG_RELAYD_LIVE_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,15 +20,11 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef LTTNG_RELAYD_LIVE_H
-#define LTTNG_RELAYD_LIVE_H
-
#include <common/uri.h>
#include "lttng-relayd.h"
-int relayd_live_create(struct lttng_uri *live_uri,
- struct relay_local_data *relay_ctx);
+int relayd_live_create(struct lttng_uri *live_uri);
int relayd_live_stop(void);
int relayd_live_join(void);
diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h
index 245c5fd..889071c 100644
--- a/src/bin/lttng-relayd/lttng-relayd.h
+++ b/src/bin/lttng-relayd/lttng-relayd.h
@@ -1,6 +1,10 @@
+#ifndef LTTNG_RELAYD_H
+#define LTTNG_RELAYD_H
+
/*
* Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
@@ -16,9 +20,6 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef LTTNG_RELAYD_H
-#define LTTNG_RELAYD_H
-
#include <limits.h>
#include <urcu.h>
#include <urcu/wfcqueue.h>
@@ -34,26 +35,22 @@ struct relay_conn_queue {
int32_t futex;
};
-struct relay_local_data {
- struct lttng_ht *sessions_ht;
-};
-
-extern char *opt_output_path;
-
/*
* Contains stream indexed by ID. This is important since many commands lookup
* streams only by ID thus also keeping them in this hash table makes the
- * search O(1) instead of iterating over the ctf_traces_ht of the session.
+ * search O(1).
*/
+extern struct lttng_ht *sessions_ht;
extern struct lttng_ht *relay_streams_ht;
-
extern struct lttng_ht *viewer_streams_ht;
-extern struct lttng_ht *indexes_ht;
+extern char *opt_output_path;
extern const char *tracing_group_name;
-
extern const char * const config_section_name;
+extern uid_t relayd_uid;
+extern gid_t relayd_gid;
+
extern int thread_quit_pipe[2];
void lttng_relay_notify_ready(void);
diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
index 8e1879f..46a4e55 100644
--- a/src/bin/lttng-relayd/main.c
+++ b/src/bin/lttng-relayd/main.c
@@ -2,6 +2,7 @@
* Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
* 2013 - Jérémie Galarneau <jeremie.galarneau at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
@@ -56,6 +57,7 @@
#include <common/uri.h>
#include <common/utils.h>
#include <common/config/config.h>
+#include <urcu/rculist.h>
#include "cmd.h"
#include "ctf-trace.h"
@@ -114,6 +116,11 @@ static pthread_t dispatcher_thread;
static pthread_t worker_thread;
static pthread_t health_thread;
+/*
+ * last_relay_stream_id_lock protects last_relay_stream_id increment
+ * atomicity on 32-bit architectures.
+ */
+static pthread_mutex_t last_relay_stream_id_lock = PTHREAD_MUTEX_INITIALIZER;
static uint64_t last_relay_stream_id;
/*
@@ -129,8 +136,8 @@ static char *data_buffer;
static unsigned int data_buffer_size;
/* We need those values for the file/dir creation. */
-static uid_t relayd_uid;
-static gid_t relayd_gid;
+uid_t relayd_uid;
+gid_t relayd_gid;
/* Global relay stream hash table. */
struct lttng_ht *relay_streams_ht;
@@ -138,8 +145,8 @@ struct lttng_ht *relay_streams_ht;
/* Global relay viewer stream hash table. */
struct lttng_ht *viewer_streams_ht;
-/* Global hash table that stores relay index object. */
-struct lttng_ht *indexes_ht;
+/* Global relay sessions hash table. */
+struct lttng_ht *sessions_ht;
/* Relayd health monitoring */
struct health_app *health_relayd;
@@ -163,8 +170,7 @@ static const char *config_ignore_options[] = { "help", "config" };
/*
* usage function on stderr
*/
-static
-void usage(void)
+static void usage(void)
{
fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
fprintf(stderr, " -h, --help Display this usage.\n");
@@ -185,8 +191,7 @@ void usage(void)
*
* Return 0 on success else a negative value.
*/
-static
-int set_option(int opt, const char *arg, const char *optname)
+static int set_option(int opt, const char *arg, const char *optname)
{
int ret;
@@ -308,8 +313,7 @@ end:
* See config_entry_handler_cb comment in common/config/config.h for the
* return value conventions.
*/
-static
-int config_entry_handler(const struct config_entry *entry, void *unused)
+static int config_entry_handler(const struct config_entry *entry, void *unused)
{
int ret = 0, i;
@@ -332,9 +336,9 @@ int config_entry_handler(const struct config_entry *entry, void *unused)
}
/*
- * If the option takes no argument on the command line, we have to
- * check if the value is "true". We support non-zero numeric values,
- * true, on and yes.
+ * If the option takes no argument on the command line,
+ * we have to check if the value is "true". We support
+ * non-zero numeric values, true, on and yes.
*/
if (!long_options[i].has_arg) {
ret = config_parse_value(entry->value);
@@ -359,8 +363,7 @@ end:
return ret;
}
-static
-int set_options(int argc, char **argv)
+static int set_options(int argc, char **argv)
{
int c, ret = 0, option_index = 0, retval = 0;
int orig_optopt = optopt, orig_optind = optind;
@@ -483,21 +486,32 @@ exit:
return retval;
}
+static void print_global_objects(void)
+{
+ rcu_register_thread();
+
+ print_viewer_streams();
+ print_relay_streams();
+ print_sessions();
+
+ rcu_unregister_thread();
+}
+
/*
* Cleanup the daemon
*/
-static
-void relayd_cleanup(struct relay_local_data *relay_ctx)
+static void relayd_cleanup(void)
{
+ print_global_objects();
+
DBG("Cleaning up");
if (viewer_streams_ht)
lttng_ht_destroy(viewer_streams_ht);
if (relay_streams_ht)
lttng_ht_destroy(relay_streams_ht);
- if (relay_ctx && relay_ctx->sessions_ht)
- lttng_ht_destroy(relay_ctx->sessions_ht);
- free(relay_ctx);
+ if (sessions_ht)
+ lttng_ht_destroy(sessions_ht);
/* free the dynamically allocated opt_output_path */
free(opt_output_path);
@@ -517,8 +531,7 @@ void relayd_cleanup(struct relay_local_data *relay_ctx)
/*
* Write to writable pipe used to notify a thread.
*/
-static
-int notify_thread_pipe(int wpipe)
+static int notify_thread_pipe(int wpipe)
{
ssize_t ret;
@@ -532,8 +545,7 @@ end:
return ret;
}
-static
-int notify_health_quit_pipe(int *pipe)
+static int notify_health_quit_pipe(int *pipe)
{
ssize_t ret;
@@ -582,8 +594,7 @@ int lttng_relay_stop_threads(void)
* Simply stop all worker threads, leaving main() return gracefully after
* joining all threads and calling cleanup().
*/
-static
-void sighandler(int sig)
+static void sighandler(int sig)
{
switch (sig) {
case SIGPIPE:
@@ -613,8 +624,7 @@ void sighandler(int sig)
* Setup signal handler for :
* SIGINT, SIGTERM, SIGPIPE
*/
-static
-int set_signal_handler(void)
+static int set_signal_handler(void)
{
int ret = 0;
struct sigaction sa;
@@ -668,8 +678,7 @@ void lttng_relay_notify_ready(void)
*
* Return -1 on error or 0 if all pipes are created.
*/
-static
-int init_thread_quit_pipe(void)
+static int init_thread_quit_pipe(void)
{
int ret;
@@ -681,8 +690,7 @@ int init_thread_quit_pipe(void)
/*
* Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
*/
-static
-int create_thread_poll_set(struct lttng_poll_event *events, int size)
+static int create_thread_poll_set(struct lttng_poll_event *events, int size)
{
int ret;
@@ -713,8 +721,7 @@ error:
*
* Return 1 if it was triggered else 0;
*/
-static
-int check_thread_quit_pipe(int fd, uint32_t events)
+static int check_thread_quit_pipe(int fd, uint32_t events)
{
if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
return 1;
@@ -726,8 +733,7 @@ int check_thread_quit_pipe(int fd, uint32_t events)
/*
* Create and init socket from uri.
*/
-static
-struct lttcomm_sock *relay_init_sock(struct lttng_uri *uri)
+static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri)
{
int ret;
struct lttcomm_sock *sock = NULL;
@@ -765,63 +771,9 @@ error:
}
/*
- * Return nonzero if stream needs to be closed.
- */
-static
-int close_stream_check(struct relay_stream *stream)
-{
- if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) {
- /*
- * We are about to close the stream so set the data pending flag to 1
- * which will make the end data pending command skip the stream which
- * is now closed and ready. Note that after proceeding to a file close,
- * the written file is ready for reading.
- */
- stream->data_pending_check_done = 1;
- return 1;
- }
- return 0;
-}
-
-static void try_close_stream(struct relay_session *session,
- struct relay_stream *stream)
-{
- int ret;
- struct ctf_trace *ctf_trace;
-
- assert(session);
- assert(stream);
-
- if (!close_stream_check(stream)) {
- /* Can't close it, not ready for that. */
- goto end;
- }
-
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
-
- pthread_mutex_lock(&session->viewer_ready_lock);
- ctf_trace->invalid_flag = 1;
- pthread_mutex_unlock(&session->viewer_ready_lock);
-
- ret = stream_close(session, stream);
- if (ret || session->snapshot) {
- /* Already close thus the ctf trace is being or has been destroyed. */
- goto end;
- }
-
- ctf_trace_try_destroy(session, ctf_trace);
-
-end:
- return;
-}
-
-/*
* This thread manages the listening for new connections on the network
*/
-static
-void *relay_thread_listener(void *data)
+static void *relay_thread_listener(void *data)
{
int i, ret, pollfd, err = -1;
uint32_t revents, nb_fd;
@@ -834,18 +786,19 @@ void *relay_thread_listener(void *data)
health_code_update();
- control_sock = relay_init_sock(control_uri);
+ control_sock = relay_socket_create(control_uri);
if (!control_sock) {
goto error_sock_control;
}
- data_sock = relay_init_sock(data_uri);
+ data_sock = relay_socket_create(data_uri);
if (!data_sock) {
goto error_sock_relay;
}
/*
- * Pass 3 as size here for the thread quit pipe, control and data socket.
+ * Pass 3 as size here for the thread quit pipe, control and
+ * data socket.
*/
ret = create_thread_poll_set(&events, 3);
if (ret < 0) {
@@ -900,7 +853,10 @@ restart:
pollfd = LTTNG_POLL_GETFD(&events, i);
if (!revents) {
- /* No activity for this FD (poll implementation). */
+ /*
+ * No activity for this FD (poll
+ * implementation).
+ */
continue;
}
@@ -916,33 +872,29 @@ restart:
goto error;
} else if (revents & LPOLLIN) {
/*
- * Get allocated in this thread, enqueued to a global queue,
- * dequeued and freed in the worker thread.
+ * Get allocated in this thread,
+ * enqueued to a global queue, dequeued
+ * and freed in the worker thread.
*/
int val = 1;
struct relay_connection *new_conn;
struct lttcomm_sock *newsock;
-
- new_conn = connection_create();
- if (!new_conn) {
- goto error;
- }
+ enum connection_type type;
if (pollfd == data_sock->fd) {
- new_conn->type = RELAY_DATA;
+ type = RELAY_DATA;
newsock = data_sock->ops->accept(data_sock);
DBG("Relay data connection accepted, socket %d",
newsock->fd);
} else {
assert(pollfd == control_sock->fd);
- new_conn->type = RELAY_CONTROL;
+ type = RELAY_CONTROL;
newsock = control_sock->ops->accept(control_sock);
DBG("Relay control connection accepted, socket %d",
newsock->fd);
}
if (!newsock) {
PERROR("accepting sock");
- connection_free(new_conn);
goto error;
}
@@ -951,18 +903,22 @@ restart:
if (ret < 0) {
PERROR("setsockopt inet");
lttcomm_destroy_sock(newsock);
- connection_free(new_conn);
goto error;
}
- new_conn->sock = newsock;
+ new_conn = connection_create(newsock, type);
+ if (!new_conn) {
+ lttcomm_destroy_sock(newsock);
+ goto error;
+ }
/* Enqueue request for the dispatcher thread. */
cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
&new_conn->qnode);
/*
- * Wake the dispatch queue futex. Implicit memory barrier with
- * the exchange in cds_wfcq_enqueue.
+ * Wake the dispatch queue futex.
+ * Implicit memory barrier with the
+ * exchange in cds_wfcq_enqueue.
*/
futex_nto1_wake(&relay_conn_queue.futex);
}
@@ -1004,8 +960,7 @@ error_sock_control:
/*
* This thread manages the dispatching of the requests to worker threads
*/
-static
-void *relay_thread_dispatcher(void *data)
+static void *relay_thread_dispatcher(void *data)
{
int err = -1;
ssize_t ret;
@@ -1044,14 +999,15 @@ void *relay_thread_dispatcher(void *data)
DBG("Dispatching request waiting on sock %d", new_conn->sock->fd);
/*
- * Inform worker thread of the new request. This call is blocking
- * so we can be assured that the data will be read at some point in
- * time or wait to the end of the world :)
+ * Inform worker thread of the new request. This
+ * call is blocking so we can be assured that
+ * the data will be read at some point in time
+ * or wait to the end of the world :)
*/
ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn));
if (ret < 0) {
PERROR("write connection pipe");
- connection_destroy(new_conn);
+ connection_put(new_conn);
goto error;
}
} while (node != NULL);
@@ -1077,72 +1033,27 @@ error_testpoint:
return NULL;
}
-static void try_close_streams(struct relay_session *session)
-{
- struct ctf_trace *ctf_trace;
- struct lttng_ht_iter iter;
-
- assert(session);
-
- pthread_mutex_lock(&session->viewer_ready_lock);
- rcu_read_lock();
- cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
- node.node) {
- struct relay_stream *stream;
-
- /* Close streams. */
- cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
- stream_close(session, stream);
- }
-
- ctf_trace->invalid_flag = 1;
- ctf_trace_try_destroy(session, ctf_trace);
- }
- rcu_read_unlock();
- pthread_mutex_unlock(&session->viewer_ready_lock);
-}
-
/*
- * Try to destroy a session within a connection.
+ * Set index data from the control port to a given index object.
*/
-static void destroy_session(struct relay_session *session,
- struct lttng_ht *sessions_ht)
-{
- assert(session);
- assert(sessions_ht);
-
- /* Indicate that this session can be destroyed from now on. */
- session->close_flag = 1;
-
- try_close_streams(session);
-
- /*
- * This will try to delete and destroy the session if no viewer is attached
- * to it meaning the refcount is down to zero.
- */
- session_try_destroy(sessions_ht, session);
-}
-
-/*
- * Copy index data from the control port to a given index object.
- */
-static void copy_index_control_data(struct relay_index *index,
+static int set_index_control_data(struct relay_index *index,
struct lttcomm_relayd_index *data)
{
- assert(index);
- assert(data);
+ struct ctf_packet_index index_data;
/*
- * The index on disk is encoded in big endian, so we don't need to convert
- * the data received on the network. The data_offset value is NEVER
- * modified here and is updated by the data thread.
+ * The index on disk is encoded in big endian, so we don't need
+ * to convert the data received on the network. The data_offset
+ * value is NEVER modified here and is updated by the data
+ * thread.
*/
- index->index_data.packet_size = data->packet_size;
- index->index_data.content_size = data->content_size;
- index->index_data.timestamp_begin = data->timestamp_begin;
- index->index_data.timestamp_end = data->timestamp_end;
- index->index_data.events_discarded = data->events_discarded;
- index->index_data.stream_id = data->stream_id;
+ index_data.packet_size = data->packet_size;
+ index_data.content_size = data->content_size;
+ index_data.timestamp_begin = data->timestamp_begin;
+ index_data.timestamp_end = data->timestamp_end;
+ index_data.events_discarded = data->events_discarded;
+ index_data.stream_id = data->stream_id;
+ return relay_index_set_data(index, &index_data);
}
/*
@@ -1150,31 +1061,22 @@ static void copy_index_control_data(struct relay_index *index,
*
* On success, send back the session id or else return a negative value.
*/
-static
-int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret = 0, send_ret;
struct relay_session *session;
struct lttcomm_relayd_status_session reply;
+ char session_name[NAME_MAX];
+ char hostname[HOST_NAME_MAX];
+ uint32_t live_timer = 0;
+ bool snapshot = false;
- assert(recv_hdr);
- assert(conn);
+ memset(session_name, 0, NAME_MAX);
+ memset(hostname, 0, HOST_NAME_MAX);
memset(&reply, 0, sizeof(reply));
- session = session_create();
- if (!session) {
- ret = -1;
- goto error;
- }
- session->minor = conn->minor;
- session->major = conn->major;
- conn->session_id = session->id;
- conn->session = session;
-
- reply.session_id = htobe64(session->id);
-
switch (conn->minor) {
case 1:
case 2:
@@ -1182,13 +1084,26 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
break;
case 4: /* LTTng sessiond 2.4 */
default:
- ret = cmd_create_session_2_4(conn, session);
+ ret = cmd_create_session_2_4(conn, session_name,
+ hostname, &live_timer, &snapshot);
+ }
+ if (ret < 0) {
+ goto send_reply;
}
- lttng_ht_add_unique_u64(conn->sessions_ht, &session->session_n);
+ session = session_create(session_name, hostname, live_timer,
+ snapshot, conn->major, conn->minor);
+ if (!session) {
+ ret = -1;
+ goto send_reply;
+ }
+ assert(!conn->session);
+ conn->session = session;
DBG("Created session %" PRIu64, session->id);
-error:
+ reply.session_id = htobe64(session->id);
+
+send_reply:
if (ret < 0) {
reply.ret_code = htobe32(LTTNG_ERR_FATAL);
} else {
@@ -1208,47 +1123,48 @@ error:
* When we have received all the streams and the metadata for a channel,
* we make them visible to the viewer threads.
*/
-static
-void set_viewer_ready_flag(struct relay_connection *conn)
+static void publish_connection_local_streams(struct relay_connection *conn)
{
- struct relay_stream *stream, *tmp_stream;
+ struct relay_stream *stream;
+ struct relay_session *session = conn->session;
- pthread_mutex_lock(&conn->session->viewer_ready_lock);
- cds_list_for_each_entry_safe(stream, tmp_stream, &conn->recv_head,
- recv_list) {
- stream->viewer_ready = 1;
- cds_list_del(&stream->recv_list);
+ /*
+ * We publish all streams belonging to a session atomically with
+ * respect to the session lock.
+ */
+ pthread_mutex_lock(&session->lock);
+ rcu_read_lock();
+ cds_list_for_each_entry_rcu(stream, &session->recv_list,
+ recv_node) {
+ stream_publish(stream);
}
- pthread_mutex_unlock(&conn->session->viewer_ready_lock);
- return;
-}
-
-/*
- * Add a recv handle node to the connection recv list with the given stream
- * handle. A new node is allocated thus must be freed when the node is deleted
- * from the list.
- */
-static void queue_stream(struct relay_stream *stream,
- struct relay_connection *conn)
-{
- assert(conn);
- assert(stream);
+ rcu_read_unlock();
- cds_list_add(&stream->recv_list, &conn->recv_head);
+ /*
+ * Inform the viewer that there are new streams in the session.
+ */
+ if (session->viewer_attached) {
+ uatomic_set(&session->new_streams, 1);
+ }
+ pthread_mutex_unlock(&session->lock);
+ return;
}
/*
* relay_add_stream: allocate a new stream for a session
*/
-static
-int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
- int ret, send_ret;
+ int ret;
+ ssize_t send_ret;
struct relay_session *session = conn->session;
struct relay_stream *stream = NULL;
struct lttcomm_relayd_status_stream reply;
struct ctf_trace *trace = NULL;
+ uint64_t stream_handle = -1ULL;
+ char *path_name = NULL, *channel_name = NULL;
+ uint64_t tracefile_size = 0, tracefile_count = 0;
if (!session || conn->version_check_done == 0) {
ERR("Trying to add a stream before version check");
@@ -1256,107 +1172,46 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
goto end_no_session;
}
- stream = zmalloc(sizeof(struct relay_stream));
- if (stream == NULL) {
- PERROR("relay stream zmalloc");
- ret = -1;
- goto end_no_session;
- }
-
- switch (conn->minor) {
- case 1: /* LTTng sessiond 2.1 */
- ret = cmd_recv_stream_2_1(conn, stream);
+ switch (session->minor) {
+ case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */
+ ret = cmd_recv_stream_2_1(conn, &path_name,
+ &channel_name);
break;
- case 2: /* LTTng sessiond 2.2 */
+ case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */
default:
- ret = cmd_recv_stream_2_2(conn, stream);
+ ret = cmd_recv_stream_2_2(conn, &path_name,
+ &channel_name, &tracefile_size, &tracefile_count);
break;
}
if (ret < 0) {
- goto err_free_stream;
+ goto send_reply;
}
- stream->stream_handle = ++last_relay_stream_id;
- stream->prev_seq = -1ULL;
- stream->session_id = session->id;
- stream->index_fd = -1;
- stream->read_index_fd = -1;
- stream->ctf_stream_id = -1ULL;
- lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
- pthread_mutex_init(&stream->lock, NULL);
-
- ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
- if (ret < 0) {
- ERR("relay creating output directory");
- goto err_free_stream;
- }
-
- /*
- * No need to use run_as API here because whatever we receives, the relayd
- * uses its own credentials for the stream files.
- */
- ret = utils_create_stream_file(stream->path_name, stream->channel_name,
- stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
- if (ret < 0) {
- ERR("Create output file");
- goto err_free_stream;
- }
- stream->fd = ret;
- if (stream->tracefile_size) {
- DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
- } else {
- DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
- }
-
- /* Protect access to "trace" */
- rcu_read_lock();
- trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name);
+ trace = ctf_trace_get_by_path_or_create(session, path_name);
if (!trace) {
- trace = ctf_trace_create(stream->path_name);
- if (!trace) {
- ret = -1;
- goto end;
- }
- ctf_trace_add(session->ctf_traces_ht, trace);
+ goto send_reply;
}
- ctf_trace_get_ref(trace);
+ /* We hold one reference on the trace until the stream is created. */
- if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
- stream->metadata_flag = 1;
- /* Assign quick reference to the metadata stream in the trace. */
- trace->metadata_stream = stream;
- }
+ pthread_mutex_lock(&last_relay_stream_id_lock);
+ stream_handle = ++last_relay_stream_id;
+ pthread_mutex_unlock(&last_relay_stream_id_lock);
- /*
- * Add the stream in the recv list of the connection. Once the end stream
- * message is received, this list is emptied and streams are set with the
- * viewer ready flag.
- */
- queue_stream(stream, conn);
+ /* We pass ownership of path_name and channel_name. */
+ stream = stream_create(trace, stream_handle, path_name,
+ channel_name, tracefile_size, tracefile_count);
/*
- * Both in the ctf_trace object and the global stream ht since the data
- * side of the relayd does not have the concept of session.
- *
- * rcu_read_lock() is kept to protect the stream which is now part of
- * the relay_streams_ht.
+ * Streams are the owners of their trace. Reference to trace is
+ * kept within stream_create().
*/
- lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
- cds_list_add_tail(&stream->trace_list, &trace->stream_list);
-
- session->stream_count++;
+ ctf_trace_put(trace);
- DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
- stream->stream_handle);
-
-end:
+send_reply:
memset(&reply, 0, sizeof(reply));
- reply.handle = htobe64(stream->stream_handle);
- /* send the session id to the client or a negative return code on error */
- if (ret < 0) {
+ reply.handle = htobe64(stream_handle);
+ if (!stream) {
reply.ret_code = htobe32(LTTNG_ERR_UNK);
- /* stream was not properly added to the ht, so free it */
- stream_destroy(stream);
} else {
reply.ret_code = htobe32(LTTNG_OK);
}
@@ -1365,29 +1220,17 @@ end:
sizeof(struct lttcomm_relayd_status_stream), 0);
if (send_ret < 0) {
ERR("Relay sending stream id");
- ret = send_ret;
+ ret = (int) send_ret;
}
- /*
- * rcu_read_lock() was held to protect either "trace" OR the "stream" at
- * this point.
- */
- rcu_read_unlock();
- trace = NULL;
- stream = NULL;
end_no_session:
return ret;
-
-err_free_stream:
- stream_destroy(stream);
- return ret;
}
/*
* relay_close_stream: close a specific stream
*/
-static
-int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret, send_ret;
@@ -1417,23 +1260,37 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
goto end_no_session;
}
- rcu_read_lock();
- stream = stream_find_by_id(relay_streams_ht,
- be64toh(stream_info.stream_id));
+ stream = stream_get_by_id(be64toh(stream_info.stream_id));
if (!stream) {
ret = -1;
- goto end_unlock;
+ goto end;
}
-
+ pthread_mutex_lock(&stream->lock);
+ stream->closed = true;
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
- stream->close_flag = 1;
- session->stream_count--;
+ if (stream->is_metadata) {
+ struct relay_viewer_stream *vstream;
+ bool close_metadata = false;
- /* Check if we can close it or else the data will do it. */
- try_close_stream(session, stream);
+ vstream = viewer_stream_get_by_id(stream->stream_handle);
+ if (vstream) {
+ pthread_mutex_lock(&vstream->lock);
+ if (vstream->metadata_sent == stream->metadata_received) {
+ close_metadata = true;
+ DBG("main putting ownership of vstream %" PRIu64,
+ vstream->stream->stream_handle);
+ }
+ pthread_mutex_unlock(&vstream->lock);
+ viewer_stream_put(vstream);
+ }
+ if (close_metadata) {
+ viewer_stream_put(vstream);
+ }
+ }
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
-end_unlock:
- rcu_read_unlock();
+end:
memset(&reply, 0, sizeof(reply));
if (ret < 0) {
@@ -1455,8 +1312,7 @@ end_no_session:
/*
* relay_unknown_command: send -1 if received unknown command
*/
-static
-void relay_unknown_command(struct relay_connection *conn)
+static void relay_unknown_command(struct relay_connection *conn)
{
struct lttcomm_relayd_generic_reply reply;
int ret;
@@ -1474,8 +1330,7 @@ void relay_unknown_command(struct relay_connection *conn)
* relay_start: send an acknowledgment to the client to tell if we are
* ready to receive data. We are ready if a session is established.
*/
-static
-int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret = htobe32(LTTNG_OK);
@@ -1529,10 +1384,9 @@ end:
}
/*
- * relay_recv_metadata: receive the metada for the session.
+ * relay_recv_metadata: receive the metadata for the session.
*/
-static
-int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret = htobe32(LTTNG_OK);
@@ -1541,7 +1395,6 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
struct lttcomm_relayd_metadata_payload *metadata_struct;
struct relay_stream *metadata_stream;
uint64_t data_size, payload_size;
- struct ctf_trace *ctf_trace;
if (!session) {
ERR("Metadata sent before version check");
@@ -1586,38 +1439,37 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
}
metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
- rcu_read_lock();
- metadata_stream = stream_find_by_id(relay_streams_ht,
- be64toh(metadata_struct->stream_id));
+ metadata_stream = stream_get_by_id(be64toh(metadata_struct->stream_id));
if (!metadata_stream) {
ret = -1;
- goto end_unlock;
+ goto end;
}
- size_ret = lttng_write(metadata_stream->fd, metadata_struct->payload,
+ pthread_mutex_lock(&metadata_stream->lock);
+
+ size_ret = lttng_write(metadata_stream->stream_fd->fd, metadata_struct->payload,
payload_size);
if (size_ret < payload_size) {
ERR("Relay error writing metadata on file");
ret = -1;
- goto end_unlock;
+ goto end_put;
}
- ret = write_padding_to_file(metadata_stream->fd,
+ ret = write_padding_to_file(metadata_stream->stream_fd->fd,
be32toh(metadata_struct->padding_size));
if (ret < 0) {
- goto end_unlock;
+ goto end_put;
}
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- metadata_stream->path_name);
- assert(ctf_trace);
- ctf_trace->metadata_received +=
+ metadata_stream->metadata_received +=
payload_size + be32toh(metadata_struct->padding_size);
+ DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
+ metadata_stream->metadata_received);
- DBG2("Relay metadata written");
+end_put:
+ pthread_mutex_unlock(&metadata_stream->lock);
+ stream_put(metadata_stream);
-end_unlock:
- rcu_read_unlock();
end:
return ret;
}
@@ -1625,15 +1477,12 @@ end:
/*
* relay_send_version: send relayd version number
*/
-static
-int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret;
struct lttcomm_relayd_version reply, msg;
- assert(conn);
-
conn->version_check_done = 1;
/* Get version from the other side. */
@@ -1657,7 +1506,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
if (reply.major != be32toh(msg.major)) {
DBG("Incompatible major versions (%u vs %u), deleting session",
reply.major, be32toh(msg.major));
- destroy_session(conn->session, conn->sessions_ht);
+ connection_put(conn);
ret = 0;
goto end;
}
@@ -1688,8 +1537,7 @@ end:
/*
* Check for data pending for a given stream id from the session daemon.
*/
-static
-int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
struct relay_session *session = conn->session;
@@ -1723,13 +1571,14 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
stream_id = be64toh(msg.stream_id);
last_net_seq_num = be64toh(msg.last_net_seq_num);
- rcu_read_lock();
- stream = stream_find_by_id(relay_streams_ht, stream_id);
+ stream = stream_get_by_id(stream_id);
if (stream == NULL) {
ret = -1;
- goto end_unlock;
+ goto end;
}
+ pthread_mutex_lock(&stream->lock);
+
DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
" and last_seq %" PRIu64, stream_id, stream->prev_seq,
last_net_seq_num);
@@ -1743,11 +1592,11 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
ret = 1;
}
- /* Pending check is now done. */
- stream->data_pending_check_done = 1;
+ stream->data_pending_check_done = true;
+ pthread_mutex_unlock(&stream->lock);
-end_unlock:
- rcu_read_unlock();
+ stream_put(stream);
+end:
memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(ret);
@@ -1763,18 +1612,17 @@ end_no_session:
/*
* Wait for the control socket to reach a quiescent state.
*
- * Note that for now, when receiving this command from the session daemon, this
- * means that every subsequent commands or data received on the control socket
- * has been handled. So, this is why we simply return OK here.
+ * Note that for now, when receiving this command from the session
+ * daemon, this means that every subsequent commands or data received on
+ * the control socket has been handled. So, this is why we simply return
+ * OK here.
*/
-static
-int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret;
uint64_t stream_id;
struct relay_stream *stream;
- struct lttng_ht_iter iter;
struct lttcomm_relayd_quiescent_control msg;
struct lttcomm_relayd_generic_reply reply;
@@ -1800,19 +1648,16 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
}
stream_id = be64toh(msg.stream_id);
-
- rcu_read_lock();
- cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
- node.node) {
- if (stream->stream_handle == stream_id) {
- stream->data_pending_check_done = 1;
- DBG("Relay quiescent control pending flag set to %" PRIu64,
- stream_id);
- break;
- }
- }
- rcu_read_unlock();
-
+ stream = stream_get_by_id(stream_id);
+ if (!stream) {
+ goto reply;
+ }
+ pthread_mutex_lock(&stream->lock);
+ stream->data_pending_check_done = true;
+ pthread_mutex_unlock(&stream->lock);
+ DBG("Relay quiescent control pending flag set to %" PRIu64, stream_id);
+ stream_put(stream);
+reply:
memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(LTTNG_OK);
ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
@@ -1825,14 +1670,14 @@ end_no_session:
}
/*
- * Initialize a data pending command. This means that a client is about to ask
- * for data pending for each stream he/she holds. Simply iterate over all
- * streams of a session and set the data_pending_check_done flag.
+ * Initialize a data pending command. This means that a client is about
+ * to ask for data pending for each stream he/she holds. Simply iterate
+ * over all streams of a session and set the data_pending_check_done
+ * flag.
*
* This command returns to the client a LTTNG_OK code.
*/
-static
-int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret;
@@ -1869,18 +1714,25 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
session_id = be64toh(msg.session_id);
/*
- * Iterate over all streams to set the begin data pending flag. For now, the
- * streams are indexed by stream handle so we have to iterate over all
- * streams to find the one associated with the right session_id.
+ * Iterate over all streams to set the begin data pending flag.
+ * For now, the streams are indexed by stream handle so we have
+ * to iterate over all streams to find the one associated with
+ * the right session_id.
*/
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
node.node) {
- if (stream->session_id == session_id) {
- stream->data_pending_check_done = 0;
+ if (!stream_get(stream)) {
+ continue;
+ }
+ if (stream->trace->session->id == session_id) {
+ pthread_mutex_lock(&stream->lock);
+ stream->data_pending_check_done = false;
+ pthread_mutex_unlock(&stream->lock);
DBG("Set begin data pending flag to stream %" PRIu64,
stream->stream_handle);
}
+ stream_put(stream);
}
rcu_read_unlock();
@@ -1898,16 +1750,15 @@ end_no_session:
}
/*
- * End data pending command. This will check, for a given session id, if each
- * stream associated with it has its data_pending_check_done flag set. If not,
- * this means that the client lost track of the stream but the data is still
- * being streamed on our side. In this case, we inform the client that data is
- * inflight.
+ * End data pending command. This will check, for a given session id, if
+ * each stream associated with it has its data_pending_check_done flag
+ * set. If not, this means that the client lost track of the stream but
+ * the data is still being streamed on our side. In this case, we inform
+ * the client that data is inflight.
*
* Return to the client if there is data in flight or not with a ret_code.
*/
-static
-int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret;
@@ -1918,9 +1769,6 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
uint64_t session_id;
uint32_t is_data_inflight = 0;
- assert(recv_hdr);
- assert(conn);
-
DBG("End data pending command");
if (!conn->session || conn->version_check_done == 0) {
@@ -1944,17 +1792,33 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
session_id = be64toh(msg.session_id);
- /* Iterate over all streams to see if the begin data pending flag is set. */
+ /*
+ * Iterate over all streams to see if the begin data pending
+ * flag is set.
+ */
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
node.node) {
- if (stream->session_id == session_id &&
- !stream->data_pending_check_done && !stream->terminated_flag) {
- is_data_inflight = 1;
- DBG("Data is still in flight for stream %" PRIu64,
- stream->stream_handle);
- break;
+ if (!stream_get(stream)) {
+ continue;
+ }
+ if (stream->trace->session->id != session_id) {
+ stream_put(stream);
+ continue;
+ }
+ pthread_mutex_lock(&stream->lock);
+ if (!stream->data_pending_check_done) {
+ if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+ break;
+ }
}
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
}
rcu_read_unlock();
@@ -1976,14 +1840,13 @@ end_no_session:
*
* Return 0 on success else a negative value.
*/
-static
-int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
- int ret, send_ret, index_created = 0;
+ int ret, send_ret;
struct relay_session *session = conn->session;
struct lttcomm_relayd_index index_info;
- struct relay_index *index, *wr_index = NULL;
+ struct relay_index *index;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
uint64_t net_seq_num;
@@ -2013,76 +1876,66 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
net_seq_num = be64toh(index_info.net_seq_num);
- rcu_read_lock();
- stream = stream_find_by_id(relay_streams_ht,
- be64toh(index_info.relay_stream_id));
+ stream = stream_get_by_id(be64toh(index_info.relay_stream_id));
if (!stream) {
+ ERR("stream_get_by_id not found");
ret = -1;
- goto end_rcu_unlock;
+ goto end;
}
+ pthread_mutex_lock(&stream->lock);
/* Live beacon handling */
if (index_info.packet_size == 0) {
- DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
+ DBG("Received live beacon for stream %" PRIu64,
+ stream->stream_handle);
/*
- * Only flag a stream inactive when it has already received data
- * and no indexes are in flight.
+ * Only flag a stream inactive when it has already
+ * received data and no indexes are in flight.
*/
- if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
- stream->beacon_ts_end = be64toh(index_info.timestamp_end);
+ if (stream->total_index_received > 0
+ && stream->indexes_in_flight == 0) {
+ stream->beacon_ts_end =
+ be64toh(index_info.timestamp_end);
}
ret = 0;
- goto end_rcu_unlock;
+ goto end_stream_put;
} else {
stream->beacon_ts_end = -1ULL;
}
- index = relay_index_find(stream->stream_handle, net_seq_num);
- if (!index) {
- /* A successful creation will add the object to the HT. */
- index = relay_index_create(stream->stream_handle, net_seq_num);
- if (!index) {
- goto end_rcu_unlock;
- }
- index_created = 1;
- stream->indexes_in_flight++;
- }
-
- copy_index_control_data(index, &index_info);
if (stream->ctf_stream_id == -1ULL) {
stream->ctf_stream_id = be64toh(index_info.stream_id);
}
-
- if (index_created) {
- /*
- * Try to add the relay index object to the hash table. If an object
- * already exist, destroy back the index created, set the data in this
- * object and write it on disk.
- */
- relay_index_add(index, &wr_index);
- if (wr_index) {
- copy_index_control_data(wr_index, &index_info);
- free(index);
- }
- } else {
- /* The index already exists so write it on disk. */
- wr_index = index;
+ index = relay_index_get_by_id_or_create(stream, net_seq_num);
+ if (!index) {
+ ret = -1;
+ ERR("relay_index_get_by_id_or_create index NULL");
+ goto end_stream_put;
}
-
- /* Do we have a writable ready index to write on disk. */
- if (wr_index) {
- ret = relay_index_write(wr_index->fd, wr_index);
- if (ret < 0) {
- goto end_rcu_unlock;
- }
+ if (set_index_control_data(index, &index_info)) {
+ ERR("set_index_control_data error");
+ relay_index_put(index);
+ ret = -1;
+ goto end_stream_put;
+ }
+ ret = relay_index_try_flush(index);
+ if (ret == 0) {
stream->total_index_received++;
- stream->indexes_in_flight--;
- assert(stream->indexes_in_flight >= 0);
+ } else if (ret > 0) {
+ /* No flush. */
+ ret = 0;
+ } else {
+ ERR("relay_index_try_flush error %d", ret);
+ relay_index_put(index);
+ ret = -1;
}
-end_rcu_unlock:
- rcu_read_unlock();
+end_stream_put:
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+
+end:
memset(&reply, 0, sizeof(reply));
if (ret < 0) {
@@ -2105,8 +1958,7 @@ end_no_session:
*
* Return 0 on success else a negative value.
*/
-static
-int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret, send_ret;
@@ -2123,17 +1975,10 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
}
/*
- * Flag every pending stream in the connection recv list that they are
- * ready to be used by the viewer.
- */
- set_viewer_ready_flag(conn);
-
- /*
- * Inform the viewer that there are new streams in the session.
+ * Publish every pending stream in the connection recv list now that
+ * they are ready to be used by the viewer.
*/
- if (conn->session->viewer_refcount) {
- uatomic_set(&conn->session->new_streams, 1);
- }
+ publish_connection_local_streams(conn);
memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(LTTNG_OK);
@@ -2153,8 +1998,7 @@ end_no_session:
/*
* Process the commands received on the control socket
*/
-static
-int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret = 0;
@@ -2211,93 +2055,91 @@ end:
/*
* Handle index for a data stream.
*
- * RCU read side lock MUST be acquired.
+ * Called with the stream lock held.
*
* Return 0 on success else a negative value.
*/
static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
int rotate_index)
{
- int ret = 0, index_created = 0;
- uint64_t stream_id, data_offset;
- struct relay_index *index, *wr_index = NULL;
-
- assert(stream);
+ int ret = 0;
+ uint64_t data_offset;
+ struct relay_index *index;
- stream_id = stream->stream_handle;
/* Get data offset because we are about to update the index. */
data_offset = htobe64(stream->tracefile_size_current);
+ DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64,
+ stream->stream_handle, stream->tracefile_size_current);
+
/*
- * Lookup for an existing index for that stream id/sequence number. If on
- * exists, the control thread already received the data for it thus we need
- * to write it on disk.
+ * Look up an existing index for that stream id/sequence
+ * number. If one exists, the control thread already received the
+ * data for it, thus we need to write it on disk.
*/
- index = relay_index_find(stream_id, net_seq_num);
+ index = relay_index_get_by_id_or_create(stream, net_seq_num);
if (!index) {
- /* A successful creation will add the object to the HT. */
- index = relay_index_create(stream_id, net_seq_num);
- if (!index) {
- ret = -1;
- goto error;
- }
- index_created = 1;
- stream->indexes_in_flight++;
+ ret = -1;
+ goto end;
}
- if (rotate_index || stream->index_fd < 0) {
- index->to_close_fd = stream->index_fd;
- ret = index_create_file(stream->path_name, stream->channel_name,
+ if (rotate_index || !stream->index_fd) {
+ int fd;
+
+ /* Put ref on previous index_fd. */
+ if (stream->index_fd) {
+ stream_fd_put(stream->index_fd);
+ stream->index_fd = NULL;
+ }
+
+ fd = index_create_file(stream->path_name, stream->channel_name,
relayd_uid, relayd_gid, stream->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- /* This will close the stream's index fd if one. */
- relay_index_free_safe(index);
- goto error;
+ stream->current_tracefile_id);
+ if (fd < 0) {
+ ret = -1;
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ goto end;
+ }
+ stream->index_fd = stream_fd_create(fd);
+ if (!stream->index_fd) {
+ ret = -1;
+ if (close(fd)) {
+ PERROR("Error closing FD %d", fd);
+ }
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ /* Will put the local ref. */
+ goto end;
}
- stream->index_fd = ret;
}
- index->fd = stream->index_fd;
- index->index_data.offset = data_offset;
- if (index_created) {
- /*
- * Try to add the relay index object to the hash table. If an object
- * already exist, destroy back the index created and set the data.
- */
- relay_index_add(index, &wr_index);
- if (wr_index) {
- /* Copy back data from the created index. */
- wr_index->fd = index->fd;
- wr_index->to_close_fd = index->to_close_fd;
- wr_index->index_data.offset = data_offset;
- free(index);
- }
- } else {
- /* The index already exists so write it on disk. */
- wr_index = index;
+ if (relay_index_set_fd(index, stream->index_fd, data_offset)) {
+ ret = -1;
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ goto end;
}
- /* Do we have a writable ready index to write on disk. */
- if (wr_index) {
- ret = relay_index_write(wr_index->fd, wr_index);
- if (ret < 0) {
- goto error;
- }
+ ret = relay_index_try_flush(index);
+ if (ret == 0) {
stream->total_index_received++;
- stream->indexes_in_flight--;
- assert(stream->indexes_in_flight >= 0);
+ } else if (ret > 0) {
+ /* No flush. */
+ ret = 0;
+ } else {
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ ret = -1;
}
-
-error:
+end:
return ret;
}
/*
* relay_process_data: Process the data received on the data socket
*/
-static
-int relay_process_data(struct relay_connection *conn)
+static int relay_process_data(struct relay_connection *conn)
{
int ret = 0, rotate_index = 0;
ssize_t size_ret;
@@ -2308,8 +2150,6 @@ int relay_process_data(struct relay_connection *conn)
uint32_t data_size;
struct relay_session *session;
- assert(conn);
-
ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
sizeof(struct lttcomm_relayd_data_hdr), 0);
if (ret <= 0) {
@@ -2324,17 +2164,12 @@ int relay_process_data(struct relay_connection *conn)
}
stream_id = be64toh(data_hdr.stream_id);
-
- rcu_read_lock();
- stream = stream_find_by_id(relay_streams_ht, stream_id);
+ stream = stream_get_by_id(stream_id);
if (!stream) {
ret = -1;
- goto end_rcu_unlock;
+ goto end;
}
-
- session = session_find_by_id(conn->sessions_ht, stream->session_id);
- assert(session);
-
+ session = stream->trace->session;
data_size = be32toh(data_hdr.data_size);
if (data_buffer_size < data_size) {
char *tmp_data_ptr;
@@ -2344,7 +2179,7 @@ int relay_process_data(struct relay_connection *conn)
ERR("Allocating data buffer");
free(data_buffer);
ret = -1;
- goto end_rcu_unlock;
+ goto end_stream_put;
}
data_buffer = tmp_data_ptr;
data_buffer_size = data_size;
@@ -2362,9 +2197,11 @@ int relay_process_data(struct relay_connection *conn)
DBG("Socket %d did an orderly shutdown", conn->sock->fd);
}
ret = -1;
- goto end_rcu_unlock;
+ goto end_stream_put;
}
+ pthread_mutex_lock(&stream->lock);
+
/* Check if a rotation is needed. */
if (stream->tracefile_size > 0 &&
(stream->tracefile_size_current + data_size) >
@@ -2372,104 +2209,84 @@ int relay_process_data(struct relay_connection *conn)
struct relay_viewer_stream *vstream;
uint64_t new_id;
- new_id = (stream->tracefile_count_current + 1) %
+ new_id = (stream->current_tracefile_id + 1) %
stream->tracefile_count;
/*
- * When we wrap-around back to 0, we start overwriting old
- * trace data.
+ * Move viewer oldest available data position forward if
+ * we are overwriting a tracefile.
*/
- if (!stream->tracefile_overwrite && new_id == 0) {
- stream->tracefile_overwrite = 1;
- }
- pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
- if (stream->tracefile_overwrite) {
+ if (new_id == stream->oldest_tracefile_id) {
stream->oldest_tracefile_id =
(stream->oldest_tracefile_id + 1) %
stream->tracefile_count;
}
- vstream = viewer_stream_find_by_id(stream->stream_handle);
+ vstream = viewer_stream_get_by_id(stream->stream_handle);
if (vstream) {
- /*
- * The viewer is reading a file about to be
- * overwritten. Close the FDs it is
- * currently using and let it handle the fault.
- */
- if (vstream->tracefile_count_current == new_id) {
- pthread_mutex_lock(&vstream->overwrite_lock);
- vstream->abort_flag = 1;
- pthread_mutex_unlock(&vstream->overwrite_lock);
- DBG("Streaming side setting abort_flag on stream %s_%" PRIu64 "\n",
- stream->channel_name, new_id);
- } else if (vstream->tracefile_count_current ==
- stream->tracefile_count_current) {
- /*
- * The reader and writer were in the
- * same trace file, inform the viewer
- * that no new index will ever be added
- * to this file.
- */
- vstream->close_write_flag = 1;
+ pthread_mutex_lock(&vstream->lock);
+ ret = utils_rotate_stream_file(stream->path_name,
+ stream->channel_name, stream->tracefile_size,
+ stream->tracefile_count, relayd_uid,
+ relayd_gid, stream->stream_fd->fd,
+ &stream->current_tracefile_id,
+ &stream->stream_fd->fd);
+ pthread_mutex_unlock(&vstream->lock);
+ viewer_stream_put(vstream);
+ if (ret < 0) {
+ ERR("Rotating stream output file");
+ goto end_stream_unlock;
}
}
- ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
- stream->tracefile_size, stream->tracefile_count,
- relayd_uid, relayd_gid, stream->fd,
- &(stream->tracefile_count_current), &stream->fd);
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
- if (ret < 0) {
- ERR("Rotating stream output file");
- goto end_rcu_unlock;
- }
- /* Reset current size because we just perform a stream rotation. */
+ /*
+ * Reset current size because we just performed a stream
+ * rotation.
+ */
stream->tracefile_size_current = 0;
rotate_index = 1;
}
/*
- * Index are handled in protocol version 2.4 and above. Also, snapshot and
- * index are NOT supported.
+ * Indexes are handled in protocol version 2.4 and above. Also,
+ * indexes are NOT supported for snapshot sessions.
*/
if (session->minor >= 4 && !session->snapshot) {
ret = handle_index_data(stream, net_seq_num, rotate_index);
if (ret < 0) {
- goto end_rcu_unlock;
+ goto end_stream_unlock;
}
}
/* Write data to stream output fd. */
- size_ret = lttng_write(stream->fd, data_buffer, data_size);
+ size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
if (size_ret < data_size) {
ERR("Relay error writing data to file");
ret = -1;
- goto end_rcu_unlock;
+ goto end_stream_unlock;
}
- DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
- ret, stream->stream_handle);
+ DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+ size_ret, stream->stream_handle);
- ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
+ ret = write_padding_to_file(stream->stream_fd->fd,
+ be32toh(data_hdr.padding_size));
if (ret < 0) {
- goto end_rcu_unlock;
+ goto end_stream_unlock;
}
- stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size);
-
+ stream->tracefile_size_current +=
+ data_size + be32toh(data_hdr.padding_size);
stream->prev_seq = net_seq_num;
- try_close_stream(session, stream);
-
-end_rcu_unlock:
- rcu_read_unlock();
+end_stream_unlock:
+ pthread_mutex_unlock(&stream->lock);
+end_stream_put:
+ stream_put(stream);
end:
return ret;
}
-static
-void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
+static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
{
int ret;
- assert(events);
-
(void) lttng_poll_del(events, pollfd);
ret = close(pollfd);
@@ -2478,27 +2295,36 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
}
}
-static void destroy_connection(struct lttng_ht *relay_connections_ht,
- struct relay_connection *conn)
+static void relay_thread_close_connection(struct lttng_poll_event *events,
+ int pollfd, struct relay_connection *conn)
{
- assert(relay_connections_ht);
- assert(conn);
+ const char *type_str;
- connection_delete(relay_connections_ht, conn);
-
- /* For the control socket, we try to destroy the session. */
- if (conn->type == RELAY_CONTROL && conn->session) {
- destroy_session(conn->session, conn->sessions_ht);
+ switch (conn->type) {
+ case RELAY_DATA:
+ type_str = "Data";
+ break;
+ case RELAY_CONTROL:
+ type_str = "Control";
+ break;
+ case RELAY_VIEWER_COMMAND:
+ type_str = "Viewer Command";
+ break;
+ case RELAY_VIEWER_NOTIFICATION:
+ type_str = "Viewer Notification";
+ break;
+ default:
+ type_str = "Unknown";
}
-
- connection_destroy(conn);
+ cleanup_connection_pollfd(events, pollfd);
+ connection_put(conn);
+ DBG("%s connection closed with %d", type_str, pollfd);
}
/*
* This thread does the actual work
*/
-static
-void *relay_thread_worker(void *data)
+static void *relay_thread_worker(void *data)
{
int ret, err = -1, last_seen_data_fd = -1;
uint32_t nb_fd;
@@ -2506,9 +2332,6 @@ void *relay_thread_worker(void *data)
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
struct lttcomm_relayd_hdr recv_hdr;
- struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
- struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
- struct relay_index *index;
struct relay_connection *destroy_conn = NULL;
DBG("[thread] Relay worker started");
@@ -2529,12 +2352,6 @@ void *relay_thread_worker(void *data)
goto relay_connections_ht_error;
}
- /* Tables of received indexes indexed by index handle and net_seq_num. */
- indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64);
- if (!indexes_ht) {
- goto indexes_ht_error;
- }
-
ret = create_thread_poll_set(&events, 2);
if (ret < 0) {
goto error_poll_create;
@@ -2569,9 +2386,9 @@ restart:
nb_fd = ret;
/*
- * Process control. The control connection is prioritised so we
- * don't starve it with high throughput tracing data on the data
- * connection.
+ * Process control. The control connection is
+ * prioritised so we don't starve it with high
+ * throughput tracing data on the data connection.
*/
for (i = 0; i < nb_fd; i++) {
/* Fetch once the poll data */
@@ -2581,7 +2398,10 @@ restart:
health_code_update();
if (!revents) {
- /* No activity for this FD (poll implementation). */
+ /*
+ * No activity for this FD (poll
+ * implementation).
+ */
continue;
}
@@ -2604,27 +2424,20 @@ restart:
if (ret < 0) {
goto error;
}
- conn->sessions_ht = sessions_ht;
- connection_init(conn);
lttng_poll_add(&events, conn->sock->fd,
LPOLLIN | LPOLLRDHUP);
- rcu_read_lock();
- lttng_ht_add_unique_ulong(relay_connections_ht,
- &conn->sock_n);
- rcu_read_unlock();
+ connection_ht_add(relay_connections_ht, conn);
DBG("Connection socket %d added", conn->sock->fd);
}
} else {
struct relay_connection *ctrl_conn;
- rcu_read_lock();
- ctrl_conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd);
/* If not found, there is a synchronization issue. */
assert(ctrl_conn);
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, ctrl_conn);
+ relay_thread_close_connection(&events, pollfd, ctrl_conn);
if (last_seen_data_fd == pollfd) {
last_seen_data_fd = last_notdel_data_fd;
}
@@ -2634,16 +2447,14 @@ restart:
sizeof(recv_hdr), 0);
if (ret <= 0) {
/* Connection closed */
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, ctrl_conn);
- DBG("Control connection closed with %d", pollfd);
+ relay_thread_close_connection(&events, pollfd,
+ ctrl_conn);
} else {
ret = relay_process_control(&recv_hdr, ctrl_conn);
if (ret < 0) {
/* Clear the session on error. */
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, ctrl_conn);
- DBG("Connection closed with %d", pollfd);
+ relay_thread_close_connection(&events, pollfd,
+ ctrl_conn);
}
seen_control = 1;
}
@@ -2658,7 +2469,7 @@ restart:
} else {
ERR("Unknown poll events %u for sock %d", revents, pollfd);
}
- rcu_read_unlock();
+ connection_put(ctrl_conn);
}
}
@@ -2702,26 +2513,22 @@ restart:
continue;
}
- rcu_read_lock();
- data_conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ data_conn = connection_get_by_sock(relay_connections_ht, pollfd);
if (!data_conn) {
/* Skip it. Might be removed before. */
- rcu_read_unlock();
continue;
}
if (revents & LPOLLIN) {
if (data_conn->type != RELAY_DATA) {
- rcu_read_unlock();
- continue;
+ goto put_connection;
}
ret = relay_process_data(data_conn);
/* Connection closed */
if (ret < 0) {
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, data_conn);
- DBG("Data connection closed with %d", pollfd);
+ relay_thread_close_connection(&events, pollfd,
+ data_conn);
/*
* Every goto restart call sets the last seen fd where
* here we don't really care since we gracefully
@@ -2730,11 +2537,12 @@ restart:
} else {
/* Keep last seen port. */
last_seen_data_fd = pollfd;
- rcu_read_unlock();
+ connection_put(data_conn);
goto restart;
}
}
- rcu_read_unlock();
+ put_connection:
+ connection_put(data_conn);
}
last_seen_data_fd = -1;
}
@@ -2744,28 +2552,24 @@ restart:
exit:
error:
- lttng_poll_clean(&events);
-
/* Cleanup reamaining connection object. */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
destroy_conn,
sock_n.node) {
health_code_update();
- destroy_connection(relay_connections_ht, destroy_conn);
+ /*
+ * No need to grab another ref, because we own
+ * destroy_conn.
+ */
+ relay_thread_close_connection(&events, destroy_conn->sock->fd,
+ destroy_conn);
}
rcu_read_unlock();
+
+ lttng_poll_clean(&events);
+
error_poll_create:
- rcu_read_lock();
- cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
- index_n.node) {
- health_code_update();
- relay_index_delete(index);
- relay_index_free_safe(index);
- }
- rcu_read_unlock();
- lttng_ht_destroy(indexes_ht);
-indexes_ht_error:
lttng_ht_destroy(relay_connections_ht);
relay_connections_ht_error:
/* Close relay conn pipes */
@@ -2806,7 +2610,6 @@ int main(int argc, char **argv)
{
int ret = 0, retval = 0;
void *status;
- struct relay_local_data *relay_ctx = NULL;
/* Parse arguments */
progname = argv[0];
@@ -2904,16 +2707,9 @@ int main(int argc, char **argv)
lttcomm_init();
lttcomm_inet_init();
- relay_ctx = zmalloc(sizeof(struct relay_local_data));
- if (!relay_ctx) {
- PERROR("relay_ctx");
- retval = -1;
- goto exit_init_data;
- }
-
/* tables of sessions indexed by session ID */
- relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!relay_ctx->sessions_ht) {
+ sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!sessions_ht) {
retval = -1;
goto exit_init_data;
}
@@ -2960,7 +2756,7 @@ int main(int argc, char **argv)
/* Setup the worker thread */
ret = pthread_create(&worker_thread, NULL,
- relay_thread_worker, (void *) relay_ctx);
+ relay_thread_worker, NULL);
if (ret) {
errno = ret;
PERROR("pthread_create worker");
@@ -2978,7 +2774,7 @@ int main(int argc, char **argv)
goto exit_listener_thread;
}
- ret = relayd_live_create(live_uri, relay_ctx);
+ ret = relayd_live_create(live_uri);
if (ret) {
ERR("Starting live viewer threads");
retval = -1;
@@ -3035,7 +2831,10 @@ exit_init_data:
health_app_destroy(health_relayd);
exit_health_app_create:
exit_options:
- relayd_cleanup(relay_ctx);
+ relayd_cleanup();
+
+ /* Ensure all prior call_rcu are done. */
+ rcu_barrier();
if (!retval) {
exit(EXIT_SUCCESS);
diff --git a/src/bin/lttng-relayd/session.c b/src/bin/lttng-relayd/session.c
index 46d9cc6..de00fc3 100644
--- a/src/bin/lttng-relayd/session.c
+++ b/src/bin/lttng-relayd/session.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -19,28 +20,25 @@
#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <common/common.h>
+#include <urcu/rculist.h>
+#include "lttng-relayd.h"
#include "ctf-trace.h"
#include "session.h"
#include "stream.h"
/* Global session id used in the session creation. */
static uint64_t last_relay_session_id;
-
-static void rcu_destroy_session(struct rcu_head *head)
-{
- struct relay_session *session =
- caa_container_of(head, struct relay_session, rcu_node);
-
- free(session);
-}
+static pthread_mutex_t last_relay_session_id_lock = PTHREAD_MUTEX_INITIALIZER;
/*
* Create a new session by assigning a new session ID.
*
* Return allocated session or else NULL.
*/
-struct relay_session *session_create(void)
+struct relay_session *session_create(const char *session_name,
+ const char *hostname, uint32_t live_timer,
+ bool snapshot, uint32_t major, uint32_t minor)
{
struct relay_session *session;
@@ -57,30 +55,62 @@ struct relay_session *session_create(void)
goto error;
}
- pthread_mutex_init(&session->viewer_ready_lock, NULL);
+ pthread_mutex_lock(&last_relay_session_id_lock);
session->id = ++last_relay_session_id;
+ pthread_mutex_unlock(&last_relay_session_id_lock);
+
+ session->major = major;
+ session->minor = minor;
lttng_ht_node_init_u64(&session->session_n, session->id);
+ urcu_ref_init(&session->ref);
+ CDS_INIT_LIST_HEAD(&session->recv_list);
+ pthread_mutex_init(&session->lock, NULL);
+ pthread_mutex_init(&session->reflock, NULL);
+ pthread_mutex_init(&session->recv_list_lock, NULL);
+
+ strncpy(session->session_name, session_name,
+ sizeof(session->session_name));
+ strncpy(session->hostname, hostname,
+ sizeof(session->hostname));
+ session->live_timer = live_timer;
+ session->snapshot = snapshot;
+
+ lttng_ht_add_unique_u64(sessions_ht, &session->session_n);
error:
return session;
}
+/* Should be called with RCU read-side lock held. */
+bool session_get(struct relay_session *session)
+{
+ bool has_ref = false;
+
+ pthread_mutex_lock(&session->reflock);
+ if (session->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&session->ref);
+ }
+ pthread_mutex_unlock(&session->reflock);
+
+ return has_ref;
+}
+
/*
- * Lookup a session within the given hash table and session id. RCU read side
- * lock MUST be acquired before calling this and as long as the caller has a
- * reference to the object.
+ * Lookup a session within the session hash table using the session id
+ * as key. A session reference is taken when a session is returned.
+ * session_put() must be called on that session.
*
* Return session or NULL if not found.
*/
-struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id)
+struct relay_session *session_get_by_id(uint64_t id)
{
struct relay_session *session = NULL;
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
- assert(ht);
-
- lttng_ht_lookup(ht, &id, &iter);
+ rcu_read_lock();
+ lttng_ht_lookup(sessions_ht, &id, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (!node) {
DBG("Session find by ID %" PRIu64 " id NOT found", id);
@@ -88,97 +118,127 @@ struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id)
}
session = caa_container_of(node, struct relay_session, session_n);
DBG("Session find by ID %" PRIu64 " id found", id);
-
+ if (!session_get(session)) {
+ session = NULL;
+ }
end:
+ rcu_read_unlock();
return session;
}
+static void rcu_destroy_session(struct rcu_head *rcu_head)
+{
+ struct relay_session *session =
+ caa_container_of(rcu_head, struct relay_session, rcu_node);
+
+ free(session);
+}
+
/*
* Delete session from the given hash table.
*
* Return lttng ht del error code being 0 on success and 1 on failure.
*/
-int session_delete(struct lttng_ht *ht, struct relay_session *session)
+static int session_delete(struct relay_session *session)
{
struct lttng_ht_iter iter;
- assert(ht);
- assert(session);
-
iter.iter.node = &session->session_n.node;
- return lttng_ht_del(ht, &iter);
+ return lttng_ht_del(sessions_ht, &iter);
}
-/*
- * The caller MUST be from the viewer thread since the viewer refcount is
- * decremented. With this calue down to 0, it will try to destroy the session.
- */
-void session_viewer_try_destroy(struct lttng_ht *ht,
- struct relay_session *session)
+
+static void destroy_session(struct relay_session *session)
+{
+ int ret;
+
+ ret = session_delete(session);
+ assert(!ret);
+ /*
+ * Since each trace has a reference on the session, it means
+ * that if we are at the point where we tear down the session, no
+ * trace belonging to that session exists at this point.
+ */
+ lttng_ht_destroy(session->ctf_traces_ht);
+ call_rcu(&session->rcu_node, rcu_destroy_session);
+}
+
+void session_release(struct urcu_ref *ref)
{
- unsigned long ret_ref;
+ struct relay_session *session =
+ caa_container_of(ref, struct relay_session, ref);
- assert(session);
+ destroy_session(session);
+}
- ret_ref = uatomic_add_return(&session->viewer_refcount, -1);
- if (ret_ref == 0) {
- session_try_destroy(ht, session);
- }
+void session_put(struct relay_session *session)
+{
+ rcu_read_lock();
+ pthread_mutex_lock(&session->reflock);
+ urcu_ref_put(&session->ref, session_release);
+ pthread_mutex_unlock(&session->reflock);
+ rcu_read_unlock();
}
-/*
- * Should only be called from the main streaming thread since it does not touch
- * the viewer refcount. If this refcount is down to 0, destroy the session only
- * and only if the session deletion succeeds. This is done because the viewer
- * *and* the streaming thread can both concurently try to destroy the session
- * thus the first come first serve.
- */
-void session_try_destroy(struct lttng_ht *ht, struct relay_session *session)
+int session_close(struct relay_session *session)
{
int ret = 0;
- unsigned long ret_ref;
-
- assert(session);
+ struct ctf_trace *trace;
+ struct lttng_ht_iter iter;
+ struct relay_stream *stream;
+
+ pthread_mutex_lock(&session->lock);
+ DBG("closing session %" PRIu64 ": is conn already closed %d",
+ session->id, session->connection_closed);
+ if (session->connection_closed) {
+ ret = -1;
+ goto unlock;
+ }
+ session->connection_closed = true;
+unlock:
+ pthread_mutex_unlock(&session->lock);
+ if (ret) {
+ return ret;
+ }
- ret_ref = uatomic_read(&session->viewer_refcount);
- if (ret_ref == 0 && session->close_flag) {
- if (ht) {
- ret = session_delete(ht, session);
- }
- if (!ret) {
- /* Only destroy the session if the deletion was successful. */
- session_destroy(session);
+ rcu_read_lock();
+ cds_lfht_for_each_entry(session->ctf_traces_ht->ht,
+ &iter.iter, trace, node.node) {
+ ret = ctf_trace_close(trace);
+ if (ret) {
+ goto rcu_unlock;
}
}
+ cds_list_for_each_entry_rcu(stream, &session->recv_list,
+ recv_node) {
+ stream_close(stream);
+ }
+rcu_unlock:
+ rcu_read_unlock();
+ if (ret) {
+ return ret;
+ }
+ /* Put self-reference from create. */
+ session_put(session);
+ return 0;
}
-/*
- * Destroy a session object.
- *
- * This function must *NOT* be called with an RCU read lock held since
- * the session's ctf_traces_ht is destroyed.
- */
-void session_destroy(struct relay_session *session)
+void print_sessions(void)
{
- struct ctf_trace *ctf_trace;
struct lttng_ht_iter iter;
+ struct relay_session *session;
- assert(session);
-
- DBG("Relay destroying session %" PRIu64, session->id);
-
- /*
- * Empty the ctf trace hash table which will destroy the stream contained
- * in that table.
- */
rcu_read_lock();
- cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
- node.node) {
- ctf_trace_delete(session->ctf_traces_ht, ctf_trace);
- ctf_trace_destroy(ctf_trace);
+ cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
+ session_n.node) {
+ if (!session_get(session)) {
+ continue;
+ }
+ DBG("session %p refcount %ld session %" PRIu64,
+ session,
+ session->ref.refcount,
+ session->id);
+ session_put(session);
}
rcu_read_unlock();
- lttng_ht_destroy(session->ctf_traces_ht);
-
- call_rcu(&session->rcu_node, rcu_destroy_session);
}
diff --git a/src/bin/lttng-relayd/session.h b/src/bin/lttng-relayd/session.h
index cb125be..0126e12 100644
--- a/src/bin/lttng-relayd/session.h
+++ b/src/bin/lttng-relayd/session.h
@@ -1,6 +1,10 @@
+#ifndef _SESSION_H
+#define _SESSION_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,13 +20,11 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _SESSION_H
-#define _SESSION_H
-
#include <limits.h>
#include <inttypes.h>
#include <pthread.h>
#include <urcu/list.h>
+#include <urcu/ref.h>
#include <common/hashtable/hashtable.h>
@@ -31,35 +33,59 @@
*/
struct relay_session {
/*
- * This session id is used to identify a set of stream to a tracing session
- * but also make sure we have a unique session id associated with a session
- * daemon which can provide multiple data source.
+ * This session id is used to identify a set of streams to a
+ * tracing session but also make sure we have a unique session
+ * id associated with a session daemon which can provide
+ * multiple data sources.
*/
uint64_t id;
char session_name[NAME_MAX];
char hostname[HOST_NAME_MAX];
uint32_t live_timer;
- struct lttng_ht_node_u64 session_n;
- struct rcu_head rcu_node;
- uint32_t stream_count;
+
/* Tell if this session is for a snapshot or not. */
- unsigned int snapshot:1;
- /* Tell if the session has been closed on the streaming side. */
- unsigned int close_flag:1;
+ bool snapshot;
+
+ /*
+ * Session has no back reference to its connection because it
+ * has a life-time that can be longer than the consumer connection
+ * life-time: a reference can still be held by the viewer
+ * connection.
+ */
+
+ /* Reference count of ctf-traces and viewers using the session. */
+ struct urcu_ref ref;
+ /* session reflock nests inside ctf_trace reflock. */
+ pthread_mutex_t reflock;
+
+ pthread_mutex_t lock;
+
+ /*
+ * major/minor version used for this session.
+ */
+ uint32_t major;
+ uint32_t minor;
- /* Number of viewer using it. Set to 0, it should be destroyed. */
- int viewer_refcount;
+ bool viewer_attached;
+ /* Tell if the session connection has been closed on the streaming side. */
+ bool connection_closed;
/* Contains ctf_trace object of that session indexed by path name. */
struct lttng_ht *ctf_traces_ht;
/*
- * Indicate version protocol for this session. This is especially useful
- * for the data thread that has no idea which version it operates on since
- * linking control/data sockets is non trivial.
+ * This contains streams that are received on that connection.
+ * It's used to store them until we get the streams sent
+ * command. When this is received, we remove those streams from
+ * the list and publish them.
+ * Updates are protected by the recv_list_lock.
+ * Traversals are protected by RCU.
+ * recv_list_lock also protects stream_count.
*/
- uint64_t minor;
- uint64_t major;
+ struct cds_list_head recv_list; /* RCU list. */
+ uint32_t stream_count;
+ pthread_mutex_t recv_list_lock;
+
/*
* Flag checked and exchanged with uatomic_cmpxchg to tell the
* viewer-side if new streams got added since the last check.
@@ -67,50 +93,26 @@ struct relay_session {
unsigned long new_streams;
/*
- * Used to synchronize the process where we flag every streams readiness
- * for the viewer when the streams_sent message is received and the viewer
- * process of sending those streams.
+ * Node in the global session hash table.
*/
- pthread_mutex_t viewer_ready_lock;
-
+ struct lttng_ht_node_u64 session_n;
/*
* Member of the session list in struct relay_viewer_session.
+ * Updates are protected by the relay_viewer_session
+ * session_list_lock. Traversals are protected by RCU.
*/
- struct cds_list_head viewer_session_list;
+ struct cds_list_head viewer_session_node;
+ struct rcu_head rcu_node; /* For call_rcu teardown. */
};
-struct relay_viewer_session {
- struct cds_list_head sessions_head;
-};
-
-static inline void session_viewer_attach(struct relay_session *session)
-{
- uatomic_inc(&session->viewer_refcount);
-}
-
-static inline void session_viewer_detach(struct relay_session *session)
-{
- uatomic_add(&session->viewer_refcount, -1);
-}
-
-struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id);
-struct relay_session *session_create(void);
-int session_delete(struct lttng_ht *ht, struct relay_session *session);
+struct relay_session *session_create(const char *session_name,
+ const char *hostname, uint32_t live_timer,
+ bool snapshot, uint32_t major, uint32_t minor);
+struct relay_session *session_get_by_id(uint64_t id);
+bool session_get(struct relay_session *session);
+void session_put(struct relay_session *session);
-/*
- * Direct destroy without reading the refcount.
- */
-void session_destroy(struct relay_session *session);
-
-/*
- * Destroy the session if the refcount is down to 0.
- */
-void session_try_destroy(struct lttng_ht *ht, struct relay_session *session);
-
-/*
- * Decrement the viewer refcount and destroy it if down to 0.
- */
-void session_viewer_try_destroy(struct lttng_ht *ht,
- struct relay_session *session);
+int session_close(struct relay_session *session);
+void print_sessions(void);
#endif /* _SESSION_H */
diff --git a/src/bin/lttng-relayd/stream-fd.c b/src/bin/lttng-relayd/stream-fd.c
new file mode 100644
index 0000000..57324d7
--- /dev/null
+++ b/src/bin/lttng-relayd/stream-fd.c
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#define _LGPL_SOURCE
+#include <common/common.h>
+
+#include "stream-fd.h"
+
+struct stream_fd *stream_fd_create(int fd)
+{
+ struct stream_fd *sf;
+
+ sf = zmalloc(sizeof(*sf));
+ if (!sf) {
+ goto end;
+ }
+ urcu_ref_init(&sf->ref);
+ sf->fd = fd;
+end:
+ return sf;
+}
+
+void stream_fd_get(struct stream_fd *sf)
+{
+ urcu_ref_get(&sf->ref);
+}
+
+static void stream_fd_release(struct urcu_ref *ref)
+{
+ struct stream_fd *sf = caa_container_of(ref, struct stream_fd, ref);
+ int ret;
+
+ ret = close(sf->fd);
+ if (ret) {
+ PERROR("Error closing stream FD %d", sf->fd);
+ }
+ free(sf);
+}
+
+void stream_fd_put(struct stream_fd *sf)
+{
+ urcu_ref_put(&sf->ref, stream_fd_release);
+}
diff --git a/src/bin/lttng-relayd/stream-fd.h b/src/bin/lttng-relayd/stream-fd.h
new file mode 100644
index 0000000..64f3b16
--- /dev/null
+++ b/src/bin/lttng-relayd/stream-fd.h
@@ -0,0 +1,32 @@
+#ifndef _STREAM_FD_H
+#define _STREAM_FD_H
+
+/*
+ * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <urcu/ref.h>
+
+struct stream_fd {
+ int fd;
+ struct urcu_ref ref;
+};
+
+struct stream_fd *stream_fd_create(int fd);
+void stream_fd_get(struct stream_fd *sf);
+void stream_fd_put(struct stream_fd *sf);
+
+#endif /* _STREAM_FD_H */
diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
index 17a5bcd..5525806 100644
--- a/src/bin/lttng-relayd/stream.c
+++ b/src/bin/lttng-relayd/stream.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -19,128 +20,332 @@
#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <common/common.h>
+#include <common/utils.h>
+#include <common/defaults.h>
+#include <urcu/rculist.h>
+#include <sys/stat.h>
+#include "lttng-relayd.h"
#include "index.h"
#include "stream.h"
#include "viewer-stream.h"
+/* Should be called with RCU read-side lock held. */
+bool stream_get(struct relay_stream *stream)
+{
+ bool has_ref = false;
+
+ pthread_mutex_lock(&stream->reflock);
+ if (stream->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&stream->ref);
+ }
+ pthread_mutex_unlock(&stream->reflock);
+
+ return has_ref;
+}
+
/*
- * Get stream from stream id from the given hash table. Return stream if found
- * else NULL.
- *
- * Need to be called with RCU read-side lock held.
+ * Get stream from stream id from the streams hash table. Return stream
+ * if found else NULL. A stream reference is taken when a stream is
+ * returned. stream_put() must be called on that stream.
*/
-struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
- uint64_t stream_id)
+struct relay_stream *stream_get_by_id(uint64_t stream_id)
{
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
struct relay_stream *stream = NULL;
- assert(ht);
-
- lttng_ht_lookup(ht, &stream_id, &iter);
+ rcu_read_lock();
+ lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node == NULL) {
+ if (!node) {
DBG("Relay stream %" PRIu64 " not found", stream_id);
goto end;
}
stream = caa_container_of(node, struct relay_stream, node);
-
+ if (!stream_get(stream)) {
+ stream = NULL;
+ }
end:
+ rcu_read_unlock();
return stream;
}
/*
- * Close a given stream. If an assosiated viewer stream exists it is updated.
- *
- * RCU read side lock MUST be acquired.
- *
- * Return 0 if close was successful or 1 if already closed.
+ * We keep ownership of path_name and channel_name.
*/
-int stream_close(struct relay_session *session, struct relay_stream *stream)
+struct relay_stream *stream_create(struct ctf_trace *trace,
+ uint64_t stream_handle, char *path_name,
+ char *channel_name, uint64_t tracefile_size,
+ uint64_t tracefile_count)
{
- int delret, ret;
- struct relay_viewer_stream *vstream;
- struct ctf_trace *ctf_trace;
+ int ret;
+ struct relay_stream *stream = NULL;
+ struct relay_session *session = trace->session;
- assert(stream);
+ stream = zmalloc(sizeof(struct relay_stream));
+ if (stream == NULL) {
+ PERROR("relay stream zmalloc");
+ ret = -1;
+ goto error_no_alloc;
+ }
- pthread_mutex_lock(&stream->lock);
+ stream->stream_handle = stream_handle;
+ stream->prev_seq = -1ULL;
+ stream->ctf_stream_id = -1ULL;
+ stream->tracefile_size = tracefile_size;
+ stream->tracefile_count = tracefile_count;
+ stream->path_name = path_name;
+ stream->channel_name = channel_name;
+ lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
+ pthread_mutex_init(&stream->lock, NULL);
+ pthread_mutex_init(&stream->reflock, NULL);
+ urcu_ref_init(&stream->ref);
+ ctf_trace_get(trace);
+ stream->trace = trace;
- if (stream->terminated_flag) {
- /* This stream is already closed. Ignore. */
- ret = 1;
- goto end_unlock;
+ stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!stream->indexes_ht) {
+ ERR("Cannot create indexes_ht");
+ ret = -1;
+ goto end;
}
- DBG("Closing stream id %" PRIu64, stream->stream_handle);
+ ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
+ if (ret < 0) {
+ ERR("relay creating output directory");
+ goto end;
+ }
- if (stream->fd >= 0) {
- delret = close(stream->fd);
- if (delret < 0) {
- PERROR("close stream");
+ /*
+ * No need to use run_as API here because whatever we receive,
+ * the relayd uses its own credentials for the stream files.
+ */
+ ret = utils_create_stream_file(stream->path_name, stream->channel_name,
+ stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
+ if (ret < 0) {
+ ERR("Create output file");
+ goto end;
+ }
+ stream->stream_fd = stream_fd_create(ret);
+ if (!stream->stream_fd) {
+ if (close(ret)) {
+ PERROR("Error closing file %d", ret);
}
+ ret = -1;
+ goto end;
+ }
+ if (stream->tracefile_size) {
+ DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
+ } else {
+ DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
+ }
+
+ if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+ stream->is_metadata = 1;
}
- if (stream->index_fd >= 0) {
- delret = close(stream->index_fd);
- if (delret < 0) {
- PERROR("close stream index_fd");
+ stream->in_recv_list = true;
+
+ /*
+ * Add the stream in the recv list of the session. Once the end stream
+ * message is received, all session streams are published.
+ */
+ pthread_mutex_lock(&session->recv_list_lock);
+ cds_list_add_rcu(&stream->recv_node, &session->recv_list);
+ session->stream_count++;
+ pthread_mutex_unlock(&session->recv_list_lock);
+
+ /*
+ * Both in the ctf_trace object and the global stream ht since the data
+ * side of the relayd does not have the concept of session.
+ */
+ lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
+
+ DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
+ stream->stream_handle);
+ ret = 0;
+
+end:
+ if (ret) {
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
}
+ stream_put(stream);
+ stream = NULL;
}
+ return stream;
+
+error_no_alloc:
+ /*
+ * path_name and channel_name need to be freed explicitly here
+ * because we cannot rely on stream_put().
+ */
+ free(path_name);
+ free(channel_name);
+ return NULL;
+}
+
+/*
+ * Called with the session lock held.
+ */
+void stream_publish(struct relay_stream *stream)
+{
+ struct relay_session *session;
- vstream = viewer_stream_find_by_id(stream->stream_handle);
- if (vstream) {
- /*
- * Set the last good value into the viewer stream. This is done
- * right before the stream gets deleted from the hash table. The
- * lookup failure on the live thread side of a stream indicates
- * that the viewer stream index received value should be used.
- */
- pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
- vstream->total_index_received = stream->total_index_received;
- vstream->tracefile_count_last = stream->tracefile_count_current;
- vstream->close_write_flag = 1;
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
+ pthread_mutex_lock(&stream->lock);
+ if (stream->published) {
+ goto unlock;
}
- /* Cleanup index of that stream. */
- relay_index_destroy_by_stream_id(stream->stream_handle);
+ session = stream->trace->session;
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
- ctf_trace_put_ref(ctf_trace);
+ pthread_mutex_lock(&session->recv_list_lock);
+ if (stream->in_recv_list) {
+ cds_list_del_rcu(&stream->recv_node);
+ stream->in_recv_list = false;
+ }
+ pthread_mutex_unlock(&session->recv_list_lock);
- stream->close_flag = 1;
- stream->terminated_flag = 1;
- ret = 0;
+ pthread_mutex_lock(&stream->trace->stream_list_lock);
+ cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
+ pthread_mutex_unlock(&stream->trace->stream_list_lock);
-end_unlock:
+ stream->published = true;
+unlock:
pthread_mutex_unlock(&stream->lock);
- return ret;
}
-void stream_delete(struct lttng_ht *ht, struct relay_stream *stream)
+/*
+ * Only called from destroy. No stream lock needed, since there is a
+ * single user at this point. This is ensured by having the refcount
+ * reaching 0.
+ */
+static void stream_unpublish(struct relay_stream *stream)
+{
+ if (!stream->published) {
+ return;
+ }
+ pthread_mutex_lock(&stream->trace->stream_list_lock);
+ cds_list_del_rcu(&stream->stream_node);
+ pthread_mutex_unlock(&stream->trace->stream_list_lock);
+
+ stream->published = false;
+}
+
+static void stream_destroy(struct relay_stream *stream)
+{
+ if (stream->indexes_ht) {
+ lttng_ht_destroy(stream->indexes_ht);
+ }
+ free(stream->path_name);
+ free(stream->channel_name);
+ free(stream);
+}
+
+static void stream_destroy_rcu(struct rcu_head *rcu_head)
{
+ struct relay_stream *stream =
+ caa_container_of(rcu_head, struct relay_stream, rcu_node);
+
+ stream_destroy(stream);
+}
+
+static void stream_release(struct urcu_ref *ref)
+{
+ struct relay_stream *stream =
+ caa_container_of(ref, struct relay_stream, ref);
+ struct relay_session *session;
int ret;
struct lttng_ht_iter iter;
- assert(ht);
- assert(stream);
+ session = stream->trace->session;
+
+ DBG("Releasing stream id %" PRIu64, stream->stream_handle);
+
+ pthread_mutex_lock(&session->recv_list_lock);
+ session->stream_count--;
+ if (stream->in_recv_list) {
+ cds_list_del_rcu(&stream->recv_node);
+ stream->in_recv_list = false;
+ }
+ pthread_mutex_unlock(&session->recv_list_lock);
iter.iter.node = &stream->node.node;
- ret = lttng_ht_del(ht, &iter);
+ ret = lttng_ht_del(relay_streams_ht, &iter);
assert(!ret);
- cds_list_del(&stream->trace_list);
+ stream_unpublish(stream);
+
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ if (stream->index_fd) {
+ stream_fd_put(stream->index_fd);
+ stream->index_fd = NULL;
+ }
+ if (stream->trace) {
+ ctf_trace_put(stream->trace);
+ stream->trace = NULL;
+ }
+
+ call_rcu(&stream->rcu_node, stream_destroy_rcu);
}
-void stream_destroy(struct relay_stream *stream)
+void stream_put(struct relay_stream *stream)
{
- assert(stream);
- free(stream->path_name);
- free(stream->channel_name);
- free(stream);
+ DBG("stream put for stream id %" PRIu64, stream->stream_handle);
+ /*
+ * Ensure existence of stream->reflock for stream unlock.
+ */
+ rcu_read_lock();
+ /*
+ * Stream reflock ensures that concurrent test and update of
+ * stream ref is atomic.
+ */
+ pthread_mutex_lock(&stream->reflock);
+ assert(stream->ref.refcount != 0);
+ /*
+ * Wait until we have processed all the stream packets before
+ * actually putting our last stream reference.
+ */
+ DBG("stream put stream id %" PRIu64 " refcount %d",
+ stream->stream_handle,
+ (int) stream->ref.refcount);
+ urcu_ref_put(&stream->ref, stream_release);
+ pthread_mutex_unlock(&stream->reflock);
+ rcu_read_unlock();
+}
+
+void stream_close(struct relay_stream *stream)
+{
+ DBG("closing stream %" PRIu64, stream->stream_handle);
+ relay_index_close_all(stream);
+ stream_put(stream);
+}
+
+void print_relay_streams(void)
+{
+ struct lttng_ht_iter iter;
+ struct relay_stream *stream;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+ DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
+ " session %" PRIu64,
+ stream,
+ stream->ref.refcount,
+ stream->stream_handle,
+ stream->trace->id,
+ stream->trace->session->id);
+ stream_put(stream);
+ }
+ rcu_read_unlock();
}
diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h
index 4dd2e62..92a809f 100644
--- a/src/bin/lttng-relayd/stream.h
+++ b/src/bin/lttng-relayd/stream.h
@@ -1,6 +1,10 @@
+#ifndef _STREAM_H
+#define _STREAM_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _STREAM_H
-#define _STREAM_H
-
#include <limits.h>
#include <inttypes.h>
#include <pthread.h>
@@ -27,100 +28,111 @@
#include <common/hashtable/hashtable.h>
#include "session.h"
+#include "stream-fd.h"
/*
* Represents a stream in the relay
*/
struct relay_stream {
uint64_t stream_handle;
- uint64_t prev_seq; /* previous data sequence number encountered */
- struct lttng_ht_node_u64 node;
+
/*
- * When we receive a stream, it gets stored in a list (on a per connection
- * basis) until we have all the streams of the same channel and the metadata
- * associated with it, then it gets flagged with viewer_ready.
+ * reflock used to synchronize the closing of this stream.
+ * stream reflock nests inside viewer stream reflock.
+ * stream reflock nests inside index reflock.
*/
- struct cds_list_head recv_list;
+ pthread_mutex_t reflock;
+ struct urcu_ref ref;
+ /* Back reference to trace. Protected by refcount on trace object. */
+ struct ctf_trace *trace;
- /* Added to the corresponding ctf_trace. */
- struct cds_list_head trace_list;
- struct rcu_head rcu_node;
- uint64_t session_id;
- int fd;
+ /*
+ * To protect from concurrent read/update. The viewer stream
+ * lock nests inside the stream lock. The stream lock nests
+ * inside the ctf_trace lock.
+ */
+ pthread_mutex_t lock;
+ uint64_t prev_seq; /* previous data sequence number encountered */
+ uint64_t last_net_seq_num; /* seq num to encounter before closing. */
+
+ /* FD on which to write the stream data. */
+ struct stream_fd *stream_fd;
/* FD on which to write the index data. */
- int index_fd;
- /* FD on which to read the index data for the viewer. */
- int read_index_fd;
+ struct stream_fd *index_fd;
char *path_name;
char *channel_name;
+
/* on-disk circular buffer of tracefiles */
uint64_t tracefile_size;
uint64_t tracefile_size_current;
uint64_t tracefile_count;
- uint64_t tracefile_count_current;
+ uint64_t current_tracefile_id;
+
/* To inform the viewer up to where it can go back in time. */
uint64_t oldest_tracefile_id;
- uint64_t total_index_received;
- uint64_t last_net_seq_num;
-
+ struct lttng_ht *indexes_ht;
/*
- * To protect from concurrent read/update. Also used to synchronize the
- * closing of this stream.
+ * Counts number of indexes in indexes_ht. Redundant info.
+ * Protected by stream lock.
*/
- pthread_mutex_t lock;
+ uint64_t total_index_received;
+ bool closed; /* Stream is closed. */
+
+ int indexes_in_flight;
/*
- * If the stream is inactive, this field is updated with the live beacon
- * timestamp end, when it is active, this field == -1ULL.
+ * If the stream is inactive, this field is updated with the
+ * live beacon timestamp end. When the stream is active, this
+ * field == -1ULL.
*/
uint64_t beacon_ts_end;
/*
- * Number of indexes that are supposed to be complete soon.
- * Avoid sending the inactivity beacon to the client when data is in
- * transit.
- */
- int indexes_in_flight;
- /*
* CTF stream ID, -1ULL when unset.
*/
uint64_t ctf_stream_id;
- /*
- * To protect the update of the close_write_flag and the checks of
- * the tracefile_count_current.
- * It is taken before checking whenever we need to know if the
- * writer and reader are working in the same tracefile.
- */
- pthread_mutex_t viewer_stream_rotation_lock;
- /* Information telling us when to close the stream */
- unsigned int close_flag:1;
+ /* Indicate if the stream was initialized for a data pending command. */
+ bool data_pending_check_done;
+
+ /* Is this stream a metadata stream? */
+ int32_t is_metadata;
+ uint64_t metadata_received;
+
/*
- * Indicates if the stream has been effectively closed thus having the
- * information in it invalidated but NOT freed. The stream lock MUST be
- * held to read/update that value.
+ * Member of the stream list in struct ctf_trace.
+ * Updates are protected by the stream_list_lock.
+ * Traversals are protected by RCU.
*/
- unsigned int terminated_flag:1;
- /* Indicate if the stream was initialized for a data pending command. */
- unsigned int data_pending_check_done:1;
- unsigned int metadata_flag:1;
+ struct cds_list_head stream_node;
/*
- * To detect when we start overwriting old data, it is used to
- * update the oldest_tracefile_id.
+ * Temporary list belonging to the connection until all streams
+ * are received for that connection.
+ * Member of the stream recv list in the connection.
+ * Updates are protected by the stream_recv_list_lock.
+ * Traversals are protected by RCU.
*/
- unsigned int tracefile_overwrite:1;
+ bool in_recv_list;
+ struct cds_list_head recv_node;
+ bool published; /* Protected by session lock. */
/*
- * Can this stream be used by a viewer or are we waiting for additional
- * information.
+ * Node of stream within global stream hash table.
*/
- unsigned int viewer_ready:1;
+ struct lttng_ht_node_u64 node;
+ struct rcu_head rcu_node; /* For call_rcu teardown. */
};
-struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
- uint64_t stream_id);
-int stream_close(struct relay_session *session, struct relay_stream *stream);
-void stream_delete(struct lttng_ht *ht, struct relay_stream *stream);
-void stream_destroy(struct relay_stream *stream);
+struct relay_stream *stream_create(struct ctf_trace *trace,
+ uint64_t stream_handle, char *path_name,
+ char *channel_name, uint64_t tracefile_size,
+ uint64_t tracefile_count);
+
+struct relay_stream *stream_get_by_id(uint64_t stream_id);
+bool stream_get(struct relay_stream *stream);
+void stream_put(struct relay_stream *stream);
+void stream_close(struct relay_stream *stream);
+void stream_publish(struct relay_stream *stream);
+void print_relay_streams(void);
#endif /* _STREAM_H */
diff --git a/src/bin/lttng-relayd/utils.h b/src/bin/lttng-relayd/utils.h
index de1521d..4a56980 100644
--- a/src/bin/lttng-relayd/utils.h
+++ b/src/bin/lttng-relayd/utils.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_UTILS_H
+#define RELAYD_UTILS_H
+
/*
* Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
@@ -16,9 +20,6 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_UTILS_H
-#define RELAYD_UTILS_H
-
char *create_output_path(char *path_name);
#endif /* RELAYD_UTILS_H */
diff --git a/src/bin/lttng-relayd/viewer-session.c b/src/bin/lttng-relayd/viewer-session.c
new file mode 100644
index 0000000..ff2e41b
--- /dev/null
+++ b/src/bin/lttng-relayd/viewer-session.c
@@ -0,0 +1,164 @@
+/*
+ * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
+ * David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#define _LGPL_SOURCE
+#include <common/common.h>
+#include <urcu/rculist.h>
+
+#include "lttng-relayd.h"
+#include "ctf-trace.h"
+#include "session.h"
+#include "viewer-session.h"
+#include "viewer-stream.h"
+#include "stream.h"
+
+struct relay_viewer_session *viewer_session_create(void)
+{
+ struct relay_viewer_session *vsession;
+
+ vsession = zmalloc(sizeof(*vsession));
+ if (!vsession) {
+ goto end;
+ }
+ CDS_INIT_LIST_HEAD(&vsession->session_list);
+end:
+ return vsession;
+}
+
+int viewer_session_attach(struct relay_viewer_session *vsession,
+ struct relay_session *session)
+{
+ int ret = 0;
+
+ if (!session_get(session)) {
+ ret = -1;
+ goto end;
+ }
+ pthread_mutex_lock(&session->lock);
+ if (session->viewer_attached) {
+ ret = -1;
+ } else {
+ session->viewer_attached = true;
+ }
+ pthread_mutex_unlock(&session->lock);
+
+ if (!ret) {
+ pthread_mutex_lock(&vsession->session_list_lock);
+ cds_list_add_rcu(&session->viewer_session_node,
+ &vsession->session_list);
+ pthread_mutex_unlock(&vsession->session_list_lock);
+ } else {
+ session_put(session);
+ }
+end:
+ return ret;
+}
+
+static int viewer_session_detach(struct relay_viewer_session *vsession,
+ struct relay_session *session)
+{
+ int ret = 0;
+
+ pthread_mutex_lock(&session->lock);
+ if (!session->viewer_attached) {
+ ret = -1;
+ } else {
+ session->viewer_attached = false;
+ }
+ pthread_mutex_unlock(&session->lock);
+
+ if (!ret) {
+ pthread_mutex_lock(&vsession->session_list_lock);
+ cds_list_del_rcu(&session->viewer_session_node);
+ pthread_mutex_unlock(&vsession->session_list_lock);
+ session_put(session);
+ }
+ return ret;
+}
+
+void viewer_session_destroy(struct relay_viewer_session *vsession)
+{
+ free(vsession);
+}
+
+void viewer_session_close(struct relay_viewer_session *vsession)
+{
+ struct relay_session *session;
+
+ rcu_read_lock();
+ cds_list_for_each_entry_rcu(session,
+ &vsession->session_list, viewer_session_node) {
+ struct lttng_ht_iter iter;
+ struct relay_viewer_stream *vstream;
+
+ /*
+ * TODO: improvement: create more efficient list of
+ * vstream per session.
+ */
+ cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter,
+ vstream, stream_n.node) {
+ if (!viewer_stream_get(vstream)) {
+ continue;
+ }
+ if (vstream->stream->trace->session != session) {
+ viewer_stream_put(vstream);
+ continue;
+ }
+ viewer_stream_put(vstream); /* put local ref */
+ viewer_stream_put(vstream); /* release ownership */
+ }
+
+ viewer_session_detach(vsession, session);
+ }
+ rcu_read_unlock();
+}
+
+/*
+ * Check if a viewer session is attached to a session.
+ * Return 1 if attached, 0 if not attached (or if vsession is NULL).
+ */
+int viewer_session_is_attached(struct relay_viewer_session *vsession,
+ struct relay_session *session)
+{
+ struct relay_session *iter;
+ int found = 0;
+
+ rcu_read_lock();
+ pthread_mutex_lock(&session->lock);
+ if (!vsession) {
+ goto end;
+ }
+ if (!session->viewer_attached) {
+ goto end;
+ }
+ cds_list_for_each_entry_rcu(iter,
+ &vsession->session_list,
+ viewer_session_node) {
+ if (session == iter) {
+ found = 1;
+ goto end;
+ }
+ }
+end:
+ pthread_mutex_unlock(&session->lock);
+ rcu_read_unlock();
+ return found;
+}
+
diff --git a/src/bin/lttng-relayd/viewer-session.h b/src/bin/lttng-relayd/viewer-session.h
new file mode 100644
index 0000000..4013b35
--- /dev/null
+++ b/src/bin/lttng-relayd/viewer-session.h
@@ -0,0 +1,53 @@
+#ifndef _VIEWER_SESSION_H
+#define _VIEWER_SESSION_H
+
+/*
+ * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
+ * David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <limits.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <urcu/list.h>
+#include <urcu/ref.h>
+
+#include <common/hashtable/hashtable.h>
+
+#include "session.h"
+
+struct relay_viewer_session {
+ /*
+ * Session list. Updates are protected by the session_list_lock.
+ * Traversals are protected by RCU.
+ * This list limits the design to having the sessions in at most
+ * one viewer session.
+ */
+ struct cds_list_head session_list; /* RCU list. */
+ pthread_mutex_t session_list_lock; /* Protects list updates. */
+};
+
+struct relay_viewer_session *viewer_session_create(void);
+void viewer_session_destroy(struct relay_viewer_session *vsession);
+void viewer_session_close(struct relay_viewer_session *vsession);
+
+int viewer_session_attach(struct relay_viewer_session *vsession,
+ struct relay_session *session);
+int viewer_session_is_attached(struct relay_viewer_session *vsession,
+ struct relay_session *session);
+
+#endif /* _VIEWER_SESSION_H */
diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c
index 3748629..6ecb83e 100644
--- a/src/bin/lttng-relayd/viewer-stream.c
+++ b/src/bin/lttng-relayd/viewer-stream.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -24,39 +25,32 @@
#include "lttng-relayd.h"
#include "viewer-stream.h"
-static void free_stream(struct relay_viewer_stream *stream)
+static void viewer_stream_destroy(struct relay_viewer_stream *vstream)
{
- assert(stream);
-
- free(stream->path_name);
- free(stream->channel_name);
- free(stream);
+ free(vstream->path_name);
+ free(vstream->channel_name);
+ free(vstream);
}
-static void deferred_free_viewer_stream(struct rcu_head *head)
+static void viewer_stream_destroy_rcu(struct rcu_head *head)
{
- struct relay_viewer_stream *stream =
+ struct relay_viewer_stream *vstream =
caa_container_of(head, struct relay_viewer_stream, rcu_node);
- free_stream(stream);
+ viewer_stream_destroy(vstream);
}
struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
- enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace)
+ enum lttng_viewer_seek seek_t)
{
struct relay_viewer_stream *vstream;
- assert(stream);
- assert(ctf_trace);
-
vstream = zmalloc(sizeof(*vstream));
if (!vstream) {
PERROR("relay viewer stream zmalloc");
goto error;
}
- vstream->session_id = stream->session_id;
- vstream->stream_handle = stream->stream_handle;
vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
if (vstream->path_name == NULL) {
PERROR("relay viewer path_name alloc");
@@ -68,216 +62,285 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
PERROR("relay viewer channel_name alloc");
goto error;
}
- vstream->tracefile_count = stream->tracefile_count;
- vstream->metadata_flag = stream->metadata_flag;
- vstream->tracefile_count_last = -1ULL;
switch (seek_t) {
case LTTNG_VIEWER_SEEK_BEGINNING:
- vstream->tracefile_count_current = stream->oldest_tracefile_id;
+ vstream->current_tracefile_id = stream->oldest_tracefile_id;
break;
case LTTNG_VIEWER_SEEK_LAST:
- vstream->tracefile_count_current = stream->tracefile_count_current;
+ vstream->current_tracefile_id = stream->current_tracefile_id;
break;
default:
- assert(0);
goto error;
}
-
- if (vstream->metadata_flag) {
- ctf_trace->viewer_metadata_stream = vstream;
+ if (!stream_get(stream)) {
+ ERR("Cannot get stream");
+ goto error;
}
+ vstream->stream = stream;
- /* Globally visible after the add unique. */
- lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
- lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
-
- vstream->index_read_fd = -1;
- vstream->read_fd = -1;
-
- /*
- * This is to avoid a race between the initialization of this object and
- * the close of the given stream. If the stream is unable to find this
- * viewer stream when closing, this copy will at least take the latest
- * value. We also need that for the seek_last.
- */
- vstream->total_index_received = stream->total_index_received;
-
+ pthread_mutex_lock(&stream->lock);
/*
* If we never received an index for the current stream, delay the opening
* of the index, otherwise open it right now.
*/
- if (vstream->tracefile_count_current == stream->tracefile_count_current
- && vstream->total_index_received == 0) {
- vstream->index_read_fd = -1;
+ if (vstream->current_tracefile_id == stream->current_tracefile_id
+ && stream->total_index_received == 0) {
+ vstream->index_fd = NULL;
} else {
int read_fd;
read_fd = index_open(vstream->path_name, vstream->channel_name,
- vstream->tracefile_count, vstream->tracefile_count_current);
+ stream->tracefile_count,
+ vstream->current_tracefile_id);
if (read_fd < 0) {
+ pthread_mutex_unlock(&stream->lock);
+ goto error;
+ }
+ vstream->index_fd = stream_fd_create(read_fd);
+ if (!vstream->index_fd) {
+ if (close(read_fd)) {
+ PERROR("close");
+ }
+ pthread_mutex_unlock(&stream->lock);
goto error;
}
- vstream->index_read_fd = read_fd;
}
- if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) {
+ if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) {
off_t lseek_ret;
- lseek_ret = lseek(vstream->index_read_fd,
- vstream->total_index_received * sizeof(struct ctf_packet_index),
- SEEK_CUR);
+ lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END);
if (lseek_ret < 0) {
+ pthread_mutex_unlock(&stream->lock);
goto error;
}
- vstream->last_sent_index = vstream->total_index_received;
+ vstream->last_sent_index = stream->total_index_received;
+ }
+ pthread_mutex_unlock(&stream->lock);
+
+ if (stream->is_metadata) {
+ stream->trace->viewer_metadata_stream = vstream;
}
+ pthread_mutex_init(&vstream->lock, NULL);
+ pthread_mutex_init(&vstream->reflock, NULL);
+ urcu_ref_init(&vstream->ref);
+
+ /* Globally visible after the add unique. */
+ lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
+ lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
+
return vstream;
error:
if (vstream) {
- free_stream(vstream);
+ viewer_stream_destroy(vstream);
}
return NULL;
}
-void viewer_stream_delete(struct relay_viewer_stream *stream)
+static void viewer_stream_unpublish(struct relay_viewer_stream *vstream)
{
int ret;
struct lttng_ht_iter iter;
- iter.iter.node = &stream->stream_n.node;
+ iter.iter.node = &vstream->stream_n.node;
ret = lttng_ht_del(viewer_streams_ht, &iter);
assert(!ret);
}
-void viewer_stream_destroy(struct ctf_trace *ctf_trace,
- struct relay_viewer_stream *stream)
+static void viewer_stream_release(struct urcu_ref *ref)
{
- int ret;
-
- assert(stream);
+ struct relay_viewer_stream *vstream = caa_container_of(ref,
+ struct relay_viewer_stream, ref);
- if (ctf_trace) {
- ctf_trace_put_ref(ctf_trace);
+ if (vstream->stream->is_metadata) {
+ rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL);
}
- if (stream->read_fd >= 0) {
- ret = close(stream->read_fd);
- if (ret < 0) {
- PERROR("close read_fd");
- }
+ viewer_stream_unpublish(vstream);
+
+ if (vstream->stream_fd) {
+ stream_fd_put(vstream->stream_fd);
+ vstream->stream_fd = NULL;
}
- if (stream->index_read_fd >= 0) {
- ret = close(stream->index_read_fd);
- if (ret < 0) {
- PERROR("close index_read_fd");
- }
+ if (vstream->index_fd) {
+ stream_fd_put(vstream->index_fd);
+ vstream->index_fd = NULL;
+ }
+ if (vstream->stream) {
+ stream_put(vstream->stream);
+ vstream->stream = NULL;
+ }
+ call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu);
+}
+
+/* Should be called with RCU read-side lock held. */
+bool viewer_stream_get(struct relay_viewer_stream *vstream)
+{
+ bool has_ref = false;
+
+ pthread_mutex_lock(&vstream->reflock);
+ if (vstream->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&vstream->ref);
}
+ pthread_mutex_unlock(&vstream->reflock);
- call_rcu(&stream->rcu_node, deferred_free_viewer_stream);
+ return has_ref;
}
/*
- * Find viewer stream by id. RCU read side lock MUST be acquired.
+ * Get viewer stream by id.
*
- * Return stream if found else NULL.
+ * Return viewer stream if found else NULL.
*/
-struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id)
+struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id)
{
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
- struct relay_viewer_stream *stream = NULL;
+ struct relay_viewer_stream *vstream = NULL;
+ rcu_read_lock();
lttng_ht_lookup(viewer_streams_ht, &id, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (!node) {
DBG("Relay viewer stream %" PRIu64 " not found", id);
goto end;
}
- stream = caa_container_of(node, struct relay_viewer_stream, stream_n);
-
+ vstream = caa_container_of(node, struct relay_viewer_stream, stream_n);
+ if (!viewer_stream_get(vstream)) {
+ vstream = NULL;
+ }
end:
- return stream;
+ rcu_read_unlock();
+ return vstream;
+}
+
+void viewer_stream_put(struct relay_viewer_stream *vstream)
+{
+ rcu_read_lock();
+ pthread_mutex_lock(&vstream->reflock);
+ urcu_ref_put(&vstream->ref, viewer_stream_release);
+ pthread_mutex_unlock(&vstream->reflock);
+ rcu_read_unlock();
+}
+
+/*
+ * Returns whether the tracefile with the given id is readable. If not,
+ * it has been overwritten.
+ * Must be called with rstream and vstream locks held.
+ */
+bool viewer_stream_is_tracefile_id_readable(struct relay_viewer_stream *vstream,
+ uint64_t id)
+{
+ struct relay_stream *stream = vstream->stream;
+
+ if (stream->oldest_tracefile_id <= stream->current_tracefile_id) {
+ if (id >= stream->oldest_tracefile_id
+ && id <= stream->current_tracefile_id) {
+ /* id is a readable file. */
+ return true;
+ } else {
+ /* id is not readable. */
+ return false;
+ }
+ } else {
+ if (id >= stream->oldest_tracefile_id
+ || id <= stream->current_tracefile_id) {
+ /* id is a readable file. */
+ return true;
+ } else {
+ /* id is not readable. */
+ return false;
+ }
+ }
}
/*
* Rotate a stream to the next tracefile.
*
- * Must be called with viewer_stream_rotation_lock held.
+ * Must be called with rstream and vstream locks held.
* Returns 0 on success, 1 on EOF, a negative value on error.
*/
-int viewer_stream_rotate(struct relay_viewer_stream *vstream,
- struct relay_stream *stream)
+int viewer_stream_rotate(struct relay_viewer_stream *vstream)
{
int ret;
- uint64_t tracefile_id;
-
- assert(vstream);
- assert(stream);
-
- if (vstream->tracefile_count == 0) {
- /* Ignore rotation, there is none to do. */
- ret = 0;
- goto end;
- }
-
- tracefile_id = (vstream->tracefile_count_current + 1) %
- vstream->tracefile_count;
+ uint64_t new_id;
+ struct relay_stream *stream = vstream->stream;
/* Detect the last tracefile to open. */
- if (vstream->tracefile_count_last != -1ULL &&
- vstream->tracefile_count_last ==
- vstream->tracefile_count_current) {
+ if (stream->total_index_received == vstream->last_sent_index
+ && stream->trace->session->connection_closed) {
ret = 1;
goto end;
}
- /*
- * The writer and the reader are not working in the same tracefile, we can
- * read up to EOF, we don't care about the total_index_received.
- */
- if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) {
- vstream->close_write_flag = 1;
- } else {
- /*
- * We are opening a file that is still open in write, make sure we
- * limit our reading to the number of indexes received.
- */
- vstream->close_write_flag = 0;
- if (stream->close_flag) {
- vstream->total_index_received = stream->total_index_received;
- }
+ if (stream->tracefile_count == 0) {
+ /* Ignore rotation, there is none to do. */
+ ret = 0;
+ goto end;
}
- vstream->tracefile_count_current = tracefile_id;
- ret = close(vstream->index_read_fd);
- if (ret < 0) {
- PERROR("close index file %d", vstream->index_read_fd);
+ new_id = (vstream->current_tracefile_id + 1) % stream->tracefile_count;
+ DBG("viewer_stream_rotate: stream handle %" PRIu64
+ " new_id %" PRIu64 " tfcount %" PRIu64
+ " oldestid %" PRIu64 " isnewidreadable %d",
+ stream->stream_handle, new_id, stream->tracefile_count,
+ stream->oldest_tracefile_id,
+ viewer_stream_is_tracefile_id_readable(vstream, new_id));
+ if (!viewer_stream_is_tracefile_id_readable(vstream, new_id)) {
+ new_id = stream->oldest_tracefile_id;
}
- vstream->index_read_fd = -1;
+ vstream->current_tracefile_id = new_id;
- ret = close(vstream->read_fd);
- if (ret < 0) {
- PERROR("close tracefile %d", vstream->read_fd);
+ if (vstream->index_fd) {
+ stream_fd_put(vstream->index_fd);
+ vstream->index_fd = NULL;
+ }
+ if (vstream->stream_fd) {
+ stream_fd_put(vstream->stream_fd);
+ vstream->stream_fd = NULL;
}
- vstream->read_fd = -1;
-
- pthread_mutex_lock(&vstream->overwrite_lock);
- vstream->abort_flag = 0;
- pthread_mutex_unlock(&vstream->overwrite_lock);
ret = index_open(vstream->path_name, vstream->channel_name,
- vstream->tracefile_count, vstream->tracefile_count_current);
+ stream->tracefile_count,
+ vstream->current_tracefile_id);
if (ret < 0) {
- goto error;
+ goto end;
+ }
+ vstream->index_fd = stream_fd_create(ret);
+ if (vstream->index_fd) {
+ ret = 0;
+ } else {
+ if (close(ret)) {
+ PERROR("close");
+ }
+ ret = -1;
}
- vstream->index_read_fd = ret;
-
- ret = 0;
-
end:
-error:
return ret;
}
+
+void print_viewer_streams(void)
+{
+ struct lttng_ht_iter iter;
+ struct relay_viewer_stream *vstream;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
+ stream_n.node) {
+ if (!viewer_stream_get(vstream)) {
+ continue;
+ }
+ DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
+ " session %" PRIu64,
+ vstream,
+ vstream->ref.refcount,
+ vstream->stream->stream_handle,
+ vstream->stream->trace->id,
+ vstream->stream->trace->session->id);
+ viewer_stream_put(vstream);
+ }
+ rcu_read_unlock();
+}
diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h
index 003b119..a19ffe0 100644
--- a/src/bin/lttng-relayd/viewer-stream.h
+++ b/src/bin/lttng-relayd/viewer-stream.h
@@ -1,6 +1,10 @@
+#ifndef _VIEWER_STREAM_H
+#define _VIEWER_STREAM_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _VIEWER_STREAM_H
-#define _VIEWER_STREAM_H
-
#include <limits.h>
#include <inttypes.h>
#include <pthread.h>
@@ -33,57 +34,49 @@
struct relay_stream;
/*
- * Shadow copy of the relay_stream structure for the viewer side. The only
- * fields updated by the writer (streaming side) after allocation are :
- * total_index_received and close_flag. Everything else is updated by the
- * reader (viewer side).
+ * Shadow copy of the relay_stream structure for the viewer side.
*/
struct relay_viewer_stream {
- uint64_t stream_handle;
- uint64_t session_id;
- int read_fd;
- int index_read_fd;
+ struct urcu_ref ref;
+ pthread_mutex_t reflock;
+
+ /*
+ * This lock nests inside the stream lock.
+ */
+ pthread_mutex_t lock;
+
+ /* Back ref to stream */
+ struct relay_stream *stream;
+
+ /* FD from which to read the stream data. */
+ struct stream_fd *stream_fd;
+ /* FD from which to read the index data. */
+ struct stream_fd *index_fd;
+
char *path_name;
char *channel_name;
+
+ uint64_t current_tracefile_id;
uint64_t last_sent_index;
- uint64_t total_index_received;
- uint64_t tracefile_count;
- uint64_t tracefile_count_current;
- /* Stop after reading this tracefile. */
- uint64_t tracefile_count_last;
+
+ /* Indicates if this stream has been sent to a viewer client. */
+ bool sent_flag;
+ /* For metadata stream, how much metadata has been sent. */
+ uint64_t metadata_sent;
+
struct lttng_ht_node_u64 stream_n;
struct rcu_head rcu_node;
- struct ctf_trace *ctf_trace;
- /*
- * This lock blocks only when the writer is about to start overwriting
- * a file currently read by the reader.
- *
- * This is nested INSIDE the viewer_stream_rotation_lock.
- */
- pthread_mutex_t overwrite_lock;
- /* Information telling us if the stream is a metadata stream. */
- unsigned int metadata_flag:1;
- /*
- * Information telling us that the stream is closed in write, so
- * we don't expect new indexes and we can read up to EOF.
- */
- unsigned int close_write_flag:1;
- /*
- * If the streaming side closes a FD in use in the viewer side,
- * it sets this flag to inform that it is a normal error.
- */
- unsigned int abort_flag:1;
- /* Indicates if this stream has been sent to a viewer client. */
- unsigned int sent_flag:1;
};
struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
- enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace);
-struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id);
-void viewer_stream_destroy(struct ctf_trace *ctf_trace,
- struct relay_viewer_stream *stream);
-void viewer_stream_delete(struct relay_viewer_stream *stream);
-int viewer_stream_rotate(struct relay_viewer_stream *vstream,
- struct relay_stream *stream);
+ enum lttng_viewer_seek seek_t);
+
+struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id);
+bool viewer_stream_get(struct relay_viewer_stream *vstream);
+void viewer_stream_put(struct relay_viewer_stream *vstream);
+int viewer_stream_rotate(struct relay_viewer_stream *vstream);
+bool viewer_stream_is_tracefile_id_readable(struct relay_viewer_stream *vstream,
+ uint64_t id);
+void print_viewer_streams(void);
#endif /* _VIEWER_STREAM_H */
diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c
index 87d5f34..0c5ffff 100644
--- a/src/bin/lttng-sessiond/consumer.c
+++ b/src/bin/lttng-sessiond/consumer.c
@@ -1077,11 +1077,8 @@ error:
}
/*
- * Ask the consumer if the data is ready to read (NOT pending) for the specific
- * session id.
- *
- * This function has a different behavior with the consumer i.e. that it waits
- * for a reply from the consumer if yes or no the data is pending.
+ * Ask the consumer if the data is pending for the specific session id.
+ * Returns 1 if data is pending, 0 otherwise, or < 0 on error.
*/
int consumer_is_data_pending(uint64_t session_id,
struct consumer_output *consumer)
diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c
index bacbf92..4eaf341 100644
--- a/src/bin/lttng-sessiond/ust-consumer.c
+++ b/src/bin/lttng-sessiond/ust-consumer.c
@@ -245,14 +245,13 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
}
pthread_mutex_lock(socket->lock);
-
ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry);
+ pthread_mutex_unlock(socket->lock);
if (ret < 0) {
goto error;
}
error:
- pthread_mutex_unlock(socket->lock);
return ret;
}
diff --git a/src/common/index/ctf-index.h b/src/common/index/ctf-index.h
index 0efa888..1f38d9a 100644
--- a/src/common/index/ctf-index.h
+++ b/src/common/index/ctf-index.h
@@ -44,7 +44,7 @@ struct ctf_packet_index_file_hdr {
} __attribute__((__packed__));
/*
- * Packet index generated for each trace packet store in a trace file.
+ * Packet index generated for each trace packet stored in a trace file.
* All integer fields are stored in big endian.
*/
struct ctf_packet_index {
diff --git a/src/common/index/index.c b/src/common/index/index.c
index 35cff53..3fce35d 100644
--- a/src/common/index/index.c
+++ b/src/common/index/index.c
@@ -59,6 +59,19 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
}
}
+ /*
+ * For tracefile rotation. We need to unlink the old
+ * file if present to synchronize with the tail of the
+ * live viewer which could be working on this same file.
+ * By doing so, any reference to the old index file
+ * stays valid even if we re-create a new file with the
+ * same name afterwards.
+ */
+ ret = utils_unlink_stream_file(fullpath, stream_name, size, count, uid,
+ gid, DEFAULT_INDEX_FILE_SUFFIX);
+ if (ret < 0 && errno != ENOENT) {
+ goto error;
+ }
ret = utils_create_stream_file(fullpath, stream_name, size, count, uid,
gid, DEFAULT_INDEX_FILE_SUFFIX);
if (ret < 0) {
diff --git a/src/common/utils.c b/src/common/utils.c
index 766f224..7681367 100644
--- a/src/common/utils.c
+++ b/src/common/utils.c
@@ -594,21 +594,18 @@ error:
}
/*
- * Create the stream tracefile on disk.
- *
- * Return 0 on success or else a negative value.
+ * path is the output parameter. It must be at least PATH_MAX bytes long.
*/
-LTTNG_HIDDEN
-int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size,
- uint64_t count, int uid, int gid, char *suffix)
+static int utils_stream_file_name(char *path,
+ const char *path_name, const char *file_name,
+ uint64_t size, uint64_t count,
+ const char *suffix)
{
- int ret, out_fd, flags, mode;
- char full_path[PATH_MAX], *path_name_suffix = NULL, *path;
+ int ret;
+ char full_path[PATH_MAX];
+ char *path_name_suffix = NULL;
char *extra = NULL;
- assert(path_name);
- assert(file_name);
-
ret = snprintf(full_path, sizeof(full_path), "%s/%s",
path_name, file_name);
if (ret < 0) {
@@ -639,9 +636,37 @@ int utils_create_stream_file(const char *path_name, char *file_name, uint64_t si
PERROR("Allocating path name with extra string");
goto error_free_suffix;
}
- path = path_name_suffix;
+ strncpy(path, path_name_suffix, PATH_MAX - 1);
+ path[PATH_MAX - 1] = '\0';
} else {
- path = full_path;
+ strncpy(path, full_path, PATH_MAX - 1);
+ }
+ path[PATH_MAX - 1] = '\0';
+ ret = 0;
+
+ free(path_name_suffix);
+error_free_suffix:
+ free(extra);
+error:
+ return ret;
+}
+
+/*
+ * Create the stream tracefile on disk.
+ *
+ * Return 0 on success or else a negative value.
+ */
+LTTNG_HIDDEN
+int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size,
+ uint64_t count, int uid, int gid, char *suffix)
+{
+ int ret, out_fd, flags, mode;
+ char path[PATH_MAX];
+
+ ret = utils_stream_file_name(path, path_name, file_name,
+ size, count, suffix);
+ if (ret < 0) {
+ goto error;
}
flags = O_WRONLY | O_CREAT | O_TRUNC;
@@ -655,19 +680,49 @@ int utils_create_stream_file(const char *path_name, char *file_name, uint64_t si
}
if (out_fd < 0) {
PERROR("open stream path %s", path);
- goto error_open;
+ goto error;
}
ret = out_fd;
-error_open:
- free(path_name_suffix);
-error_free_suffix:
- free(extra);
error:
return ret;
}
/*
+ * Unlink the stream tracefile from disk.
+ *
+ * Return 0 on success or else a negative value.
+ */
+LTTNG_HIDDEN
+int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size,
+ uint64_t count, int uid, int gid, char *suffix)
+{
+ int ret;
+ char path[PATH_MAX];
+
+ ret = utils_stream_file_name(path, path_name, file_name,
+ size, count, suffix);
+ if (ret < 0) {
+ goto error;
+ }
+ if (uid < 0 || gid < 0) {
+ ret = unlink(path);
+ } else {
+ ret = run_as_unlink(path, uid, gid);
+ if (ret < 0) {
+ errno = -ret;
+ ret = -1;
+ }
+ }
+ if (ret < 0) {
+ goto error;
+ }
+error:
+ DBG("utils_unlink_stream_file %s returns %d", path, ret);
+ return ret;
+}
+
+/*
* Change the output tracefile according to the given size and count The
* new_count pointer is set during this operation.
*
@@ -693,7 +748,25 @@ int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
}
if (count > 0) {
+ /*
+ * In tracefile rotation, for the relay daemon we need
+ * to unlink the old file if present, because it may
+ * still be open for reading by the live thread, and we
+ * need to ensure that we do not overwrite the content
+ * between get_index and get_packet. Since we have no
+ * way to verify integrity of the data content compared
+ * to the associated index, we need to ensure the reader
+ * has exclusive access to the file content, and that
+ * the open of the data file is performed in get_index.
+ * Unlinking the old file rather than overwriting it
+ * achieves this.
+ */
*new_count = (*new_count + 1) % count;
+ ret = utils_unlink_stream_file(path_name, file_name,
+ size, *new_count, uid, gid, NULL);
+ if (ret < 0 && errno != ENOENT) {
+ goto error;
+ }
} else {
(*new_count)++;
}
diff --git a/src/common/utils.h b/src/common/utils.h
index 05914cc..8bc5a3f 100644
--- a/src/common/utils.h
+++ b/src/common/utils.h
@@ -40,6 +40,8 @@ int utils_create_pid_file(pid_t pid, const char *filepath);
int utils_mkdir_recursive(const char *path, mode_t mode);
int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size,
uint64_t count, int uid, int gid, char *suffix);
+int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size,
+ uint64_t count, int uid, int gid, char *suffix);
int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
uint64_t count, int uid, int gid, int out_fd, uint64_t *new_count,
int *stream_fd);
--
2.1.4
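
Illustration (not part of the patch): the comments added to index_create_file() and utils_rotate_stream_file() rely on the POSIX rule that unlinking a file removes only its name, so a descriptor that is already open keeps referring to the old content even after a new file is created under the same name. The sketch below is a minimal stand-alone demonstration of that rule, assuming a POSIX system. The helpers write_file() and read_across_rotation() and the "old packet"/"new packet" strings are hypothetical names chosen for the example; they do not exist in lttng-tools.

```c
#include <assert.h>
#include <fcntl.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: create or truncate `path` and write `text` to it. */
static int write_file(const char *path, const char *text)
{
	size_t len = strlen(text);
	int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0600);

	if (fd < 0) {
		return -1;
	}
	if (write(fd, text, len) != (ssize_t) len) {
		close(fd);
		return -1;
	}
	return close(fd);
}

/*
 * Hypothetical demo of a reader holding a file open across a rotation.
 *
 * The file is created with "old packet" and opened read-only (the
 * "viewer"). The "writer" then re-creates the same name with
 * "new packet", unlinking the old name first when `unlink_first` is
 * nonzero. Finally the viewer reads through its original descriptor.
 *
 * Returns the number of bytes read into `buf`, or -1 on error.
 */
static ssize_t read_across_rotation(const char *path, int unlink_first,
		char *buf, size_t buf_len)
{
	int reader_fd;
	ssize_t n;

	assert(path && buf && buf_len > 0);

	if (write_file(path, "old packet") < 0) {
		return -1;
	}
	reader_fd = open(path, O_RDONLY);
	if (reader_fd < 0) {
		return -1;
	}
	/* With the unlink, the reader keeps the old inode to itself. */
	if (unlink_first && unlink(path) < 0) {
		close(reader_fd);
		return -1;
	}
	/* Without the unlink, O_TRUNC rewrites the inode the reader holds. */
	if (write_file(path, "new packet") < 0) {
		close(reader_fd);
		return -1;
	}
	n = read(reader_fd, buf, buf_len - 1);
	if (n >= 0) {
		buf[n] = '\0';
	}
	close(reader_fd);
	unlink(path);	/* Clean up the name left behind by the writer. */
	return n;
}
```

Calling read_across_rotation() with unlink_first set to 1 leaves the old content in buf, while passing 0 lets the reader observe the overwritten content. The second case is the one the rotation comment describes as overwriting the content between get_index and get_packet.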