[lttng-dev] [RFC PATCH lttng-tools] Fix: Relay daemon ownership and reference counting
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Thu Aug 6 18:06:16 EDT 2015
Ownership and reference counting in the relay daemon are unclear and
buggy in many ways. This causes memory corruption, double frees, leaks,
and segmentation faults, observed under various conditions.
Fix this situation by introducing a clear ownership and reference
counting scheme for this daemon.
See doc/relayd-architecture.txt for details.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
---
src/bin/lttng-relayd/Makefile.am | 4 +-
src/bin/lttng-relayd/cmd-2-1.c | 17 +-
src/bin/lttng-relayd/cmd-2-1.h | 10 +-
src/bin/lttng-relayd/cmd-2-2.c | 22 +-
src/bin/lttng-relayd/cmd-2-2.h | 10 +-
src/bin/lttng-relayd/cmd-2-4.c | 19 +-
src/bin/lttng-relayd/cmd-2-4.h | 10 +-
src/bin/lttng-relayd/cmd-generic.c | 1 +
src/bin/lttng-relayd/cmd-generic.h | 7 +-
src/bin/lttng-relayd/cmd.h | 7 +-
src/bin/lttng-relayd/connection.c | 128 +++--
src/bin/lttng-relayd/connection.h | 63 ++-
src/bin/lttng-relayd/ctf-trace.c | 185 ++++---
src/bin/lttng-relayd/ctf-trace.h | 60 +-
src/bin/lttng-relayd/index.c | 343 ++++++++----
src/bin/lttng-relayd/index.h | 59 +-
src/bin/lttng-relayd/live.c | 833 ++++++++++++----------------
src/bin/lttng-relayd/live.h | 10 +-
src/bin/lttng-relayd/lttng-relayd.h | 23 +-
src/bin/lttng-relayd/main.c | 1011 +++++++++++++---------------------
src/bin/lttng-relayd/session.c | 191 ++++---
src/bin/lttng-relayd/session.h | 115 ++--
src/bin/lttng-relayd/stream.c | 319 ++++++++---
src/bin/lttng-relayd/stream.h | 129 +++--
src/bin/lttng-relayd/utils.h | 7 +-
src/bin/lttng-relayd/viewer-stream.c | 293 +++++-----
src/bin/lttng-relayd/viewer-stream.h | 81 ++-
src/bin/lttng-sessiond/consumer.c | 7 +-
28 files changed, 2026 insertions(+), 1938 deletions(-)
diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am
index 126cdaf..428f352 100644
--- a/src/bin/lttng-relayd/Makefile.am
+++ b/src/bin/lttng-relayd/Makefile.am
@@ -17,7 +17,9 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \
viewer-stream.h viewer-stream.c \
session.c session.h \
stream.c stream.h \
- connection.c connection.h
+ stream-fd.c stream-fd.h \
+ connection.c connection.h \
+ viewer-session.c viewer-session.h
# link on liblttngctl for check if relayd is already alive.
lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
diff --git a/src/bin/lttng-relayd/cmd-2-1.c b/src/bin/lttng-relayd/cmd-2-1.c
index 0cd9b5a..a2c340f 100644
--- a/src/bin/lttng-relayd/cmd-2-1.c
+++ b/src/bin/lttng-relayd/cmd-2-1.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -28,30 +29,30 @@
#include "cmd-2-1.h"
#include "utils.h"
+/*
+ * cmd_recv_stream_2_1 allocates path_name and channel_name.
+ */
int cmd_recv_stream_2_1(struct relay_connection *conn,
- struct relay_stream *stream)
+ char **path_name, char **channel_name)
{
int ret;
struct lttcomm_relayd_add_stream stream_info;
- assert(conn);
- assert(stream);
-
ret = cmd_recv(conn->sock, &stream_info, sizeof(stream_info));
if (ret < 0) {
ERR("Unable to recv stream version 2.1");
goto error;
}
- stream->path_name = create_output_path(stream_info.pathname);
- if (stream->path_name == NULL) {
+ *path_name = create_output_path(stream_info.pathname);
+ if (*path_name == NULL) {
PERROR("Path name allocation");
ret = -ENOMEM;
goto error;
}
- stream->channel_name = strdup(stream_info.channel_name);
- if (stream->channel_name == NULL) {
+ *channel_name = strdup(stream_info.channel_name);
+ if (*channel_name == NULL) {
ret = -errno;
PERROR("Path name allocation");
goto error;
diff --git a/src/bin/lttng-relayd/cmd-2-1.h b/src/bin/lttng-relayd/cmd-2-1.h
index bab8190..46283dc 100644
--- a/src/bin/lttng-relayd/cmd-2-1.h
+++ b/src/bin/lttng-relayd/cmd-2-1.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_2_1_H
+#define RELAYD_CMD_2_1_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,13 +20,9 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_CMD_2_1_H
-#define RELAYD_CMD_2_1_H
-
#include "lttng-relayd.h"
-#include "stream.h"
int cmd_recv_stream_2_1(struct relay_connection *conn,
- struct relay_stream *stream);
+ char **path_name, char **channel_name);
#endif /* RELAYD_CMD_2_1_H */
diff --git a/src/bin/lttng-relayd/cmd-2-2.c b/src/bin/lttng-relayd/cmd-2-2.c
index 7dd99ad..1493b73 100644
--- a/src/bin/lttng-relayd/cmd-2-2.c
+++ b/src/bin/lttng-relayd/cmd-2-2.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -30,37 +31,38 @@
#include "cmd-2-1.h"
#include "utils.h"
+/*
+ * cmd_recv_stream_2_2 allocates path_name and channel_name.
+ */
int cmd_recv_stream_2_2(struct relay_connection *conn,
- struct relay_stream *stream)
+ char **path_name, char **channel_name,
+ uint64_t *tracefile_size, uint64_t *tracefile_count)
{
int ret;
struct lttcomm_relayd_add_stream_2_2 stream_info;
- assert(conn);
- assert(stream);
-
ret = cmd_recv(conn->sock, &stream_info, sizeof(stream_info));
if (ret < 0) {
ERR("Unable to recv stream version 2.2");
goto error;
}
- stream->path_name = create_output_path(stream_info.pathname);
- if (stream->path_name == NULL) {
+ *path_name = create_output_path(stream_info.pathname);
+ if (*path_name == NULL) {
PERROR("Path name allocation");
ret = -ENOMEM;
goto error;
}
- stream->channel_name = strdup(stream_info.channel_name);
- if (stream->channel_name == NULL) {
+ *channel_name = strdup(stream_info.channel_name);
+ if (*channel_name == NULL) {
ret = -errno;
PERROR("Path name allocation");
goto error;
}
- stream->tracefile_size = be64toh(stream_info.tracefile_size);
- stream->tracefile_count = be64toh(stream_info.tracefile_count);
+ *tracefile_size = be64toh(stream_info.tracefile_size);
+ *tracefile_count = be64toh(stream_info.tracefile_count);
ret = 0;
error:
diff --git a/src/bin/lttng-relayd/cmd-2-2.h b/src/bin/lttng-relayd/cmd-2-2.h
index bd1cd14..894a63a 100644
--- a/src/bin/lttng-relayd/cmd-2-2.h
+++ b/src/bin/lttng-relayd/cmd-2-2.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_2_2_H
+#define RELAYD_CMD_2_2_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,12 +20,10 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_CMD_2_2_H
-#define RELAYD_CMD_2_2_H
-
#include "lttng-relayd.h"
int cmd_recv_stream_2_2(struct relay_connection *conn,
- struct relay_stream *stream);
+ char **path_name, char **channel_name,
+ uint64_t *tracefile_size, uint64_t *tracefile_count);
#endif /* RELAYD_CMD_2_2_H */
diff --git a/src/bin/lttng-relayd/cmd-2-4.c b/src/bin/lttng-relayd/cmd-2-4.c
index d8aa737..a3290cb 100644
--- a/src/bin/lttng-relayd/cmd-2-4.c
+++ b/src/bin/lttng-relayd/cmd-2-4.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -30,26 +31,24 @@
#include "lttng-relayd.h"
int cmd_create_session_2_4(struct relay_connection *conn,
- struct relay_session *session)
+ char *session_name, char *hostname,
+ uint32_t *live_timer, bool *snapshot)
{
int ret;
struct lttcomm_relayd_create_session_2_4 session_info;
- assert(conn);
- assert(session);
-
ret = cmd_recv(conn->sock, &session_info, sizeof(session_info));
if (ret < 0) {
ERR("Unable to recv session info version 2.4");
goto error;
}
- strncpy(session->session_name, session_info.session_name,
- sizeof(session->session_name));
- strncpy(session->hostname, session_info.hostname,
- sizeof(session->hostname));
- session->live_timer = be32toh(session_info.live_timer);
- session->snapshot = be32toh(session_info.snapshot);
+ strncpy(session_name, session_info.session_name,
+ sizeof(session_info.session_name));
+ strncpy(hostname, session_info.hostname,
+ sizeof(session_info.hostname));
+ *live_timer = be32toh(session_info.live_timer);
+ *snapshot = be32toh(session_info.snapshot);
ret = 0;
diff --git a/src/bin/lttng-relayd/cmd-2-4.h b/src/bin/lttng-relayd/cmd-2-4.h
index aaf572a..ab73478 100644
--- a/src/bin/lttng-relayd/cmd-2-4.h
+++ b/src/bin/lttng-relayd/cmd-2-4.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_2_4_H
+#define RELAYD_CMD_2_4_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,12 +20,10 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_CMD_2_4_H
-#define RELAYD_CMD_2_4_H
-
#include "lttng-relayd.h"
int cmd_create_session_2_4(struct relay_connection *conn,
- struct relay_session *session);
+ char *session_name, char *hostname,
+ uint32_t *live_timer, bool *snapshot);
#endif /* RELAYD_CMD_2_4_H */
diff --git a/src/bin/lttng-relayd/cmd-generic.c b/src/bin/lttng-relayd/cmd-generic.c
index 417d6d3..276e85b 100644
--- a/src/bin/lttng-relayd/cmd-generic.c
+++ b/src/bin/lttng-relayd/cmd-generic.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
diff --git a/src/bin/lttng-relayd/cmd-generic.h b/src/bin/lttng-relayd/cmd-generic.h
index 640fed7..4551f0a 100644
--- a/src/bin/lttng-relayd/cmd-generic.h
+++ b/src/bin/lttng-relayd/cmd-generic.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_GENERIC_H
+#define RELAYD_CMD_GENERIC_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_CMD_GENERIC_H
-#define RELAYD_CMD_GENERIC_H
-
#include <common/sessiond-comm/sessiond-comm.h>
#include "connection.h"
diff --git a/src/bin/lttng-relayd/cmd.h b/src/bin/lttng-relayd/cmd.h
index c8b37d5..88db09a 100644
--- a/src/bin/lttng-relayd/cmd.h
+++ b/src/bin/lttng-relayd/cmd.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_CMD_H
+#define RELAYD_CMD_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_CMD_H
-#define RELAYD_CMD_H
-
#include "cmd-generic.h"
#include "cmd-2-1.h"
#include "cmd-2-2.h"
diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c
index 76e48a6..b425548 100644
--- a/src/bin/lttng-relayd/connection.c
+++ b/src/bin/lttng-relayd/connection.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -19,91 +20,132 @@
#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <common/common.h>
+#include <urcu/rculist.h>
#include "connection.h"
#include "stream.h"
+#include "viewer-session.h"
-static void rcu_free_connection(struct rcu_head *head)
+bool connection_get(struct relay_connection *conn)
{
- struct relay_connection *conn =
- caa_container_of(head, struct relay_connection, rcu_node);
+ bool has_ref = false;
- lttcomm_destroy_sock(conn->sock);
- connection_free(conn);
+ pthread_mutex_lock(&conn->reflock);
+ if (conn->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&conn->ref);
+ }
+ pthread_mutex_unlock(&conn->reflock);
+
+ return has_ref;
}
-/*
- * Must be called with a read side lock held. The read side lock must be
- * kept until the returned relay_connection is no longer in use.
- */
-struct relay_connection *connection_find_by_sock(struct lttng_ht *ht, int sock)
+struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht,
+ int sock)
{
struct lttng_ht_node_ulong *node;
struct lttng_ht_iter iter;
struct relay_connection *conn = NULL;
- assert(ht);
assert(sock >= 0);
- lttng_ht_lookup(ht, (void *)((unsigned long) sock), &iter);
+ rcu_read_lock();
+ lttng_ht_lookup(relay_connections_ht, (void *)((unsigned long) sock),
+ &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (!node) {
DBG2("Relay connection by sock %d not found", sock);
goto end;
}
conn = caa_container_of(node, struct relay_connection, sock_n);
-
+ if (!connection_get(conn)) {
+ conn = NULL;
+ }
end:
+ rcu_read_unlock();
return conn;
}
-void connection_delete(struct lttng_ht *ht, struct relay_connection *conn)
+struct relay_connection *connection_create(struct lttcomm_sock *sock,
+ enum connection_type type)
{
- int ret;
- struct lttng_ht_iter iter;
-
- assert(ht);
- assert(conn);
+ struct relay_connection *conn;
- iter.iter.node = &conn->sock_n.node;
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
+ conn = zmalloc(sizeof(*conn));
+ if (!conn) {
+ PERROR("zmalloc relay connection");
+ goto end;
+ }
+ pthread_mutex_init(&conn->lock, NULL);
+ pthread_mutex_init(&conn->reflock, NULL);
+ urcu_ref_init(&conn->ref);
+ conn->type = type;
+ conn->sock = sock;
+ lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd);
+end:
+ return conn;
}
-void connection_destroy(struct relay_connection *conn)
+static void rcu_free_connection(struct rcu_head *head)
{
- assert(conn);
+ struct relay_connection *conn =
+ caa_container_of(head, struct relay_connection, rcu_node);
+ lttcomm_destroy_sock(conn->sock);
+ if (conn->viewer_session) {
+ viewer_session_destroy(conn->viewer_session);
+ conn->viewer_session = NULL;
+ }
+ free(conn);
+}
+
+static void destroy_connection(struct relay_connection *conn)
+{
call_rcu(&conn->rcu_node, rcu_free_connection);
}
-struct relay_connection *connection_create(void)
+static void connection_release(struct urcu_ref *ref)
{
- struct relay_connection *conn;
+ struct relay_connection *conn =
+ caa_container_of(ref, struct relay_connection, ref);
- conn = zmalloc(sizeof(*conn));
- if (!conn) {
- PERROR("zmalloc relay connection");
- goto error;
+ if (conn->in_socket_ht) {
+ struct lttng_ht_iter iter;
+ int ret;
+
+ iter.iter.node = &conn->sock_n.node;
+ ret = lttng_ht_del(conn->socket_ht, &iter);
+ assert(!ret);
}
-error:
- return conn;
+ if (conn->session) {
+ if (session_close(conn->session)) {
+ ERR("session_close");
+ }
+ conn->session = NULL;
+ }
+ if (conn->viewer_session) {
+ viewer_session_close(conn->viewer_session);
+ }
+ destroy_connection(conn);
}
-void connection_init(struct relay_connection *conn)
+void connection_put(struct relay_connection *conn)
{
- assert(conn);
- assert(conn->sock);
-
- CDS_INIT_LIST_HEAD(&conn->recv_head);
- lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd);
+ rcu_read_lock();
+ pthread_mutex_lock(&conn->reflock);
+ urcu_ref_put(&conn->ref, connection_release);
+ pthread_mutex_unlock(&conn->reflock);
+ rcu_read_unlock();
}
-void connection_free(struct relay_connection *conn)
+void connection_ht_add(struct lttng_ht *relay_connections_ht,
+ struct relay_connection *conn)
{
- assert(conn);
-
- free(conn->viewer_session);
- free(conn);
+ pthread_mutex_lock(&conn->lock);
+ assert(!conn->in_socket_ht);
+ lttng_ht_add_unique_ulong(relay_connections_ht, &conn->sock_n);
+ conn->in_socket_ht = 1;
+ conn->socket_ht = relay_connections_ht;
+ pthread_mutex_unlock(&conn->lock);
}
diff --git a/src/bin/lttng-relayd/connection.h b/src/bin/lttng-relayd/connection.h
index 70fe4ba..1fff515 100644
--- a/src/bin/lttng-relayd/connection.h
+++ b/src/bin/lttng-relayd/connection.h
@@ -1,6 +1,10 @@
+#ifndef _CONNECTION_H
+#define _CONNECTION_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _CONNECTION_H
-#define _CONNECTION_H
-
#include <limits.h>
#include <inttypes.h>
#include <pthread.h>
@@ -32,6 +33,7 @@
#include "session.h"
enum connection_type {
+ RELAY_CONNECTION_UNKNOWN = 0,
RELAY_DATA = 1,
RELAY_CONTROL = 2,
RELAY_VIEWER_COMMAND = 3,
@@ -44,36 +46,49 @@ enum connection_type {
*/
struct relay_connection {
struct lttcomm_sock *sock;
- struct relay_session *session;
- struct relay_viewer_session *viewer_session;
struct cds_wfcq_node qnode;
- struct lttng_ht_node_ulong sock_n;
- struct rcu_head rcu_node;
+
enum connection_type type;
- /* Protocol version to use for this connection. */
+ /*
+ * session is only ever set for RELAY_CONTROL connection type.
+ */
+ struct relay_session *session;
+ /*
+ * viewer_session is only ever set for RELAY_VIEWER_COMMAND
+ * connection type.
+ */
+ struct relay_viewer_session *viewer_session;
+
+ /*
+ * Protocol version to use for this connection. Only valid for
+ * RELAY_CONTROL connection type.
+ */
uint32_t major;
uint32_t minor;
- uint64_t session_id;
+
+ struct urcu_ref ref;
+ pthread_mutex_t reflock;
+
+ pthread_mutex_t lock;
+
+ bool version_check_done;
/*
- * This contains streams that are received on that connection. It's used to
- * store them until we get the streams sent command where they are removed
- * and flagged ready for the viewer. This is ONLY used by the control
- * thread thus any action on it should happen in that thread.
+ * Node member of connection within global socket hash table.
*/
- struct cds_list_head recv_head;
- unsigned int version_check_done:1;
-
- /* Pointer to the sessions HT that this connection can use. */
- struct lttng_ht *sessions_ht;
+ struct lttng_ht_node_ulong sock_n;
+ bool in_socket_ht;
+ struct lttng_ht *socket_ht; /* HACK: Contained within this hash table. */
+ struct rcu_head rcu_node; /* For call_rcu teardown. */
};
-struct relay_connection *connection_find_by_sock(struct lttng_ht *ht,
+struct relay_connection *connection_create(struct lttcomm_sock *sock,
+ enum connection_type type);
+struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht,
int sock);
-struct relay_connection *connection_create(void);
-void connection_init(struct relay_connection *conn);
-void connection_delete(struct lttng_ht *ht, struct relay_connection *conn);
-void connection_destroy(struct relay_connection *conn);
-void connection_free(struct relay_connection *conn);
+bool connection_get(struct relay_connection *connection);
+void connection_put(struct relay_connection *connection);
+void connection_ht_add(struct lttng_ht *relay_connections_ht,
+ struct relay_connection *conn);
#endif /* _CONNECTION_H */
diff --git a/src/bin/lttng-relayd/ctf-trace.c b/src/bin/lttng-relayd/ctf-trace.c
index 02a8b2b..df96460 100644
--- a/src/bin/lttng-relayd/ctf-trace.c
+++ b/src/bin/lttng-relayd/ctf-trace.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -22,148 +23,174 @@
#include <common/common.h>
#include <common/utils.h>
+#include <urcu/rculist.h>
#include "ctf-trace.h"
#include "lttng-relayd.h"
#include "stream.h"
static uint64_t last_relay_ctf_trace_id;
+static pthread_mutex_t last_relay_ctf_trace_id_lock = PTHREAD_MUTEX_INITIALIZER;
-static void rcu_destroy_ctf_trace(struct rcu_head *head)
+static void rcu_destroy_ctf_trace(struct rcu_head *rcu_head)
{
- struct lttng_ht_node_str *node =
- caa_container_of(head, struct lttng_ht_node_str, head);
- struct ctf_trace *trace=
- caa_container_of(node, struct ctf_trace, node);
+ struct ctf_trace *trace =
+ caa_container_of(rcu_head, struct ctf_trace, rcu_node);
free(trace);
}
-static void rcu_destroy_stream(struct rcu_head *head)
-{
- struct relay_stream *stream =
- caa_container_of(head, struct relay_stream, rcu_node);
-
- stream_destroy(stream);
-}
-
/*
* Destroy a ctf trace and all stream contained in it.
*
* MUST be called with the RCU read side lock.
*/
-void ctf_trace_destroy(struct ctf_trace *obj)
+void ctf_trace_destroy(struct ctf_trace *trace)
{
- struct relay_stream *stream, *tmp_stream;
-
- assert(obj);
/*
- * Getting to this point, every stream referenced to that object have put
- * back their ref since the've been closed by the control side.
+ * Getting to this point, every stream referenced by that trace
+ * has put back its ref since they've been closed by the
+ * control side.
*/
- assert(!obj->refcount);
+ assert(cds_list_empty(&trace->stream_list));
+ session_put(trace->session);
+ trace->session = NULL;
+ call_rcu(&trace->rcu_node, rcu_destroy_ctf_trace);
+}
- cds_list_for_each_entry_safe(stream, tmp_stream, &obj->stream_list,
- trace_list) {
- stream_delete(relay_streams_ht, stream);
- call_rcu(&stream->rcu_node, rcu_destroy_stream);
- }
+void ctf_trace_release(struct urcu_ref *ref)
+{
+ struct ctf_trace *trace =
+ caa_container_of(ref, struct ctf_trace, ref);
+ int ret;
+ struct lttng_ht_iter iter;
- call_rcu(&obj->node.head, rcu_destroy_ctf_trace);
+ iter.iter.node = &trace->node.node;
+ ret = lttng_ht_del(trace->session->ctf_traces_ht, &iter);
+ assert(!ret);
+ ctf_trace_destroy(trace);
}
-void ctf_trace_try_destroy(struct relay_session *session,
- struct ctf_trace *ctf_trace)
+/*
+ * Should be called with RCU read-side lock held.
+ */
+bool ctf_trace_get(struct ctf_trace *trace)
{
- assert(session);
- assert(ctf_trace);
+ bool has_ref = false;
- /*
- * Considering no viewer attach to the session and the trace having no more
- * stream attached, wipe the trace.
- */
- if (uatomic_read(&session->viewer_refcount) == 0 &&
- uatomic_read(&ctf_trace->refcount) == 0) {
- ctf_trace_delete(session->ctf_traces_ht, ctf_trace);
- ctf_trace_destroy(ctf_trace);
+ /* Confirm that the trace refcount has not reached 0. */
+ pthread_mutex_lock(&trace->reflock);
+ if (trace->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&trace->ref);
}
+ pthread_mutex_unlock(&trace->reflock);
+
+ return has_ref;
}
/*
- * Create and return an allocated ctf_trace object. NULL on error.
+ * Create and return an allocated ctf_trace. NULL on error.
+ * There is no "open" and "close" for a ctf_trace, but rather just a
+ * create and refcounting. Whenever all the streams belonging to a trace
+ * put their reference, its refcount drops to 0.
*/
-struct ctf_trace *ctf_trace_create(char *path_name)
+static struct ctf_trace *ctf_trace_create(struct relay_session *session,
+ char *path_name)
{
- struct ctf_trace *obj;
-
- assert(path_name);
+ struct ctf_trace *trace;
- obj = zmalloc(sizeof(*obj));
- if (!obj) {
+ trace = zmalloc(sizeof(*trace));
+ if (!trace) {
PERROR("ctf_trace alloc");
goto error;
}
- CDS_INIT_LIST_HEAD(&obj->stream_list);
+ if (!session_get(session)) {
+ ERR("Cannot get session");
+ free(trace);
+ trace = NULL;
+ goto error;
+ }
+ trace->session = session;
+
+ CDS_INIT_LIST_HEAD(&trace->stream_list);
- obj->id = ++last_relay_ctf_trace_id;
- lttng_ht_node_init_str(&obj->node, path_name);
+ pthread_mutex_lock(&last_relay_ctf_trace_id_lock);
+ trace->id = ++last_relay_ctf_trace_id;
+ pthread_mutex_unlock(&last_relay_ctf_trace_id_lock);
- DBG("Created ctf_trace %" PRIu64 " with path: %s", obj->id, path_name);
+ lttng_ht_node_init_str(&trace->node, path_name);
+ trace->session = session;
+ urcu_ref_init(&trace->ref);
+ pthread_mutex_init(&trace->lock, NULL);
+ pthread_mutex_init(&trace->reflock, NULL);
+ pthread_mutex_init(&trace->stream_list_lock, NULL);
+ lttng_ht_add_str(session->ctf_traces_ht, &trace->node);
+
+ DBG("Created ctf_trace %" PRIu64 " with path: %s", trace->id, path_name);
error:
- return obj;
+ return trace;
}
/*
- * Return a ctf_trace object if found by id in the given hash table else NULL.
- *
- * Must be called with rcu_read_lock() taken.
+ * Return the ctf_trace found by path name in the session's trace hash
+ * table, creating it if not found (NULL on error). Holds a reference on
+ * the ctf_trace, which must be paired with ctf_trace_put().
*/
-struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht,
+struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session,
char *path_name)
{
struct lttng_ht_node_str *node;
struct lttng_ht_iter iter;
struct ctf_trace *trace = NULL;
- assert(ht);
-
- lttng_ht_lookup(ht, (void *) path_name, &iter);
+ rcu_read_lock();
+ lttng_ht_lookup(session->ctf_traces_ht, (void *) path_name, &iter);
node = lttng_ht_iter_get_node_str(&iter);
if (!node) {
DBG("CTF Trace path %s not found", path_name);
goto end;
}
trace = caa_container_of(node, struct ctf_trace, node);
-
+ if (!ctf_trace_get(trace)) {
+ trace = NULL;
+ }
end:
+ rcu_read_unlock();
+ if (!trace) {
+ /* Try to create */
+ trace = ctf_trace_create(session, path_name);
+ }
return trace;
}
-/*
- * Add stream to a given hash table.
- */
-void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace)
+void ctf_trace_put(struct ctf_trace *trace)
{
- assert(ht);
- assert(trace);
-
- lttng_ht_add_str(ht, &trace->node);
+ rcu_read_lock();
+ pthread_mutex_lock(&trace->reflock);
+ urcu_ref_put(&trace->ref, ctf_trace_release);
+ pthread_mutex_unlock(&trace->reflock);
+ rcu_read_unlock();
}
-/*
- * Delete stream from a given hash table.
- */
-void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace)
+int ctf_trace_close(struct ctf_trace *trace)
{
- int ret;
- struct lttng_ht_iter iter;
-
- assert(ht);
- assert(trace);
-
- iter.iter.node = &trace->node.node;
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
+ struct relay_stream *stream;
+
+ rcu_read_lock();
+ cds_list_for_each_entry_rcu(stream, &trace->stream_list,
+ stream_node) {
+ /*
+ * Close the stream.
+ */
+ stream_close(stream);
+ }
+ rcu_read_unlock();
+ /*
+ * Since all references to the trace are held by its streams, we
+ * don't need to do any self-ref put.
+ */
+ return 0;
}
diff --git a/src/bin/lttng-relayd/ctf-trace.h b/src/bin/lttng-relayd/ctf-trace.h
index 489c5f1..edc7053 100644
--- a/src/bin/lttng-relayd/ctf-trace.h
+++ b/src/bin/lttng-relayd/ctf-trace.h
@@ -1,6 +1,10 @@
+#ifndef _CTF_TRACE_H
+#define _CTF_TRACE_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,10 +20,8 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _CTF_TRACE_H
-#define _CTF_TRACE_H
-
#include <inttypes.h>
+#include <urcu/ref.h>
#include <common/hashtable/hashtable.h>
@@ -27,38 +29,44 @@
#include "stream.h"
struct ctf_trace {
- int refcount;
- unsigned int invalid_flag:1;
+ /*
+ * The ctf_trace reflock nests inside the stream reflock.
+ */
+ pthread_mutex_t reflock; /* Protects refcounting */
+ struct urcu_ref ref; /* Every stream has a ref on the trace. */
+ struct relay_session *session; /* Back ref to trace session */
+
+ /*
+ * The ctf_trace lock nests inside the session lock.
+ */
+ pthread_mutex_t lock;
uint64_t id;
uint64_t metadata_received;
uint64_t metadata_sent;
struct relay_stream *metadata_stream;
struct relay_viewer_stream *viewer_metadata_stream;
- /* Node indexed by stream path name in the corresponding session. */
- struct lttng_ht_node_str node;
- /* Relay stream associated with this ctf trace. */
+ /*
+ * Relay streams associated with this ctf trace.
+ * Updates are protected by the stream_list lock.
+ * Traversals are protected by RCU.
+ */
struct cds_list_head stream_list;
-};
-
-static inline void ctf_trace_get_ref(struct ctf_trace *trace)
-{
- uatomic_inc(&trace->refcount);
-}
+ pthread_mutex_t stream_list_lock;
-static inline void ctf_trace_put_ref(struct ctf_trace *trace)
-{
- uatomic_add(&trace->refcount, -1);
-}
+ /*
+ * Node within session trace hash table. Node is indexed by
+ * stream path name.
+ */
+ struct lttng_ht_node_str node;
+ struct rcu_head rcu_node; /* For call_rcu teardown. */
+};
-void ctf_trace_assign(struct relay_stream *stream);
-struct ctf_trace *ctf_trace_create(char *path_name);
-void ctf_trace_destroy(struct ctf_trace *obj);
-void ctf_trace_try_destroy(struct relay_session *session,
- struct ctf_trace *ctf_trace);
-struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht,
+struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session,
char *path_name);
-void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace);
-void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace);
+bool ctf_trace_get(struct ctf_trace *trace);
+void ctf_trace_put(struct ctf_trace *trace);
+
+int ctf_trace_close(struct ctf_trace *trace);
#endif /* _CTF_TRACE_H */
diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c
index b7507a0..99715c2 100644
--- a/src/bin/lttng-relayd/index.c
+++ b/src/bin/lttng-relayd/index.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -24,190 +25,308 @@
#include <common/utils.h>
#include "lttng-relayd.h"
+#include "stream.h"
#include "index.h"
/*
- * Deferred free of a relay index object. MUST only be called by a call RCU.
+ * Allocate a new relay index object. Pass the stream in which it is
+ * contained as parameter. The sequence number will be used as the hash
+ * table key.
+ *
+ * Called with stream mutex held.
+ * Return allocated object or else NULL on error.
*/
-static void deferred_free_relay_index(struct rcu_head *head)
+static struct relay_index *relay_index_create(struct relay_stream *stream,
+ uint64_t net_seq_num)
{
- struct relay_index *index =
- caa_container_of(head, struct relay_index, rcu_node);
+ struct relay_index *index;
- if (index->to_close_fd >= 0) {
- int ret;
+ DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
+ stream->stream_handle, net_seq_num);
- ret = close(index->to_close_fd);
- if (ret < 0) {
- PERROR("Relay index to close fd %d", index->to_close_fd);
- }
+ index = zmalloc(sizeof(*index));
+ if (!index) {
+ PERROR("Relay index zmalloc");
+ goto end;
+ }
+ if (!stream_get(stream)) {
+ ERR("Cannot get stream");
+ free(index);
+ index = NULL;
+ goto end;
}
+ index->stream = stream;
- relay_index_free(index);
+ lttng_ht_node_init_u64(&index->index_n, net_seq_num);
+ pthread_mutex_init(&index->lock, NULL);
+ pthread_mutex_init(&index->reflock, NULL);
+ urcu_ref_init(&index->ref);
+
+end:
+ return index;
}
/*
- * Allocate a new relay index object using the given stream ID and sequence
- * number as the hash table key.
+ * Add unique relay index to the given hash table. In case of a collision, the
+ * already existing object is put in the given _index variable.
*
- * Return allocated object or else NULL on error.
+ * RCU read side lock MUST be acquired.
*/
-struct relay_index *relay_index_create(uint64_t stream_id,
- uint64_t net_seq_num)
+static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
+ struct relay_index *index)
{
- struct relay_index *index;
+ struct cds_lfht_node *node_ptr;
+ struct relay_index *_index;
- DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
- stream_id, net_seq_num);
+ DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
+ stream->stream_handle, index->index_n.key);
- index = zmalloc(sizeof(*index));
- if (index == NULL) {
- PERROR("Relay index zmalloc");
- goto error;
+ node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
+ stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
+ stream->indexes_ht->match_fct, &index->index_n,
+ &index->index_n.node);
+ if (node_ptr != &index->index_n.node) {
+ _index = caa_container_of(node_ptr, struct relay_index,
+ index_n.node);
+ } else {
+ _index = NULL;
}
+ return _index;
+}
+
+/*
+ * Should be called with RCU read-side lock held.
+ */
+static bool relay_index_get(struct relay_index *index)
+{
+ bool has_ref = false;
- index->to_close_fd = -1;
- lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num);
+ DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
+ index->stream->stream_handle, index->index_n.key,
+ (int) index->ref.refcount);
-error:
- return index;
+ /* Confirm that the index refcount has not reached 0. */
+ pthread_mutex_lock(&index->reflock);
+ if (index->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&index->ref);
+ }
+ pthread_mutex_unlock(&index->reflock);
+
+ return has_ref;
}
/*
- * Find a relayd index in the given hash table.
+ * Look up the relay index keyed by net_seq_num within the given stream,
+ * or create and insert it if not present.
*
+ * Called with stream mutex held.
* Return index object or else NULL on error.
*/
-struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num)
+struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
+ uint64_t net_seq_num)
{
- struct lttng_ht_node_two_u64 *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
- struct lttng_ht_two_u64 key;
struct relay_index *index = NULL;
+ bool has_ref = false;
DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
- stream_id, net_seq_num);
-
- key.key1 = stream_id;
- key.key2 = net_seq_num;
+ stream->stream_handle, net_seq_num);
- lttng_ht_lookup(indexes_ht, (void *)(&key), &iter);
- node = lttng_ht_iter_get_node_two_u64(&iter);
- if (node == NULL) {
- goto end;
+ rcu_read_lock();
+ lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
+ if (node) {
+ index = caa_container_of(node, struct relay_index, index_n);
+ } else {
+ struct relay_index *oldindex;
+
+ index = relay_index_create(stream, net_seq_num);
+ if (!index) { /* index is NULL here: log through stream only. */
+ ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
+ stream->stream_handle, net_seq_num);
+ goto end;
+ }
+ oldindex = relay_index_add_unique(stream, index);
+ if (oldindex) {
+ /* Added concurrently, keep old. */
+ relay_index_put(index);
+ index = oldindex;
+ if (!relay_index_get(index)) {
+ index = NULL;
+ }
+ } else {
+ stream->indexes_in_flight++;
+ index->in_hash_table = true;
+ }
}
- index = caa_container_of(node, struct relay_index, index_n);
-
end:
- DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
- (index == NULL) ? "NOT " : "", stream_id, net_seq_num);
+ rcu_read_unlock();
+ DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
+ (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
return index;
}
-/*
- * Add unique relay index to the given hash table. In case of a collision, the
- * already existing object is put in the given _index variable.
- *
- * RCU read side lock MUST be acquired.
- */
-void relay_index_add(struct relay_index *index, struct relay_index **_index)
+int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+ uint64_t data_offset)
{
- struct cds_lfht_node *node_ptr;
+ int ret = 0;
- assert(index);
+ pthread_mutex_lock(&index->lock);
+ if (index->index_fd) {
+ ret = -1;
+ goto end;
+ }
+ stream_fd_get(index_fd);
+ index->index_fd = index_fd;
+ index->index_data.offset = data_offset;
+end:
+ pthread_mutex_unlock(&index->lock);
+ return ret;
+}
- DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
- index->index_n.key.key1, index->index_n.key.key2);
+int relay_index_set_data(struct relay_index *index,
+ const struct ctf_packet_index *data)
+{
+ int ret = 0;
- node_ptr = cds_lfht_add_unique(indexes_ht->ht,
- indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed),
- indexes_ht->match_fct, (void *) &index->index_n.key,
- &index->index_n.node);
- if (node_ptr != &index->index_n.node) {
- *_index = caa_container_of(node_ptr, struct relay_index, index_n.node);
+ pthread_mutex_lock(&index->lock);
+ if (index->has_index_data) {
+ ret = -1;
+ goto end;
}
+ /* Set everything except data_offset. */
+ index->index_data.packet_size = data->packet_size;
+ index->index_data.content_size = data->content_size;
+ index->index_data.timestamp_begin = data->timestamp_begin;
+ index->index_data.timestamp_end = data->timestamp_end;
+ index->index_data.events_discarded = data->events_discarded;
+ index->index_data.stream_id = data->stream_id;
+ index->has_index_data = true;
+end:
+ pthread_mutex_unlock(&index->lock);
+ return ret;
}
-/*
- * Write index on disk to the given fd. Once done error or not, it is removed
- * from the hash table and destroy the object.
- *
- * MUST be called with a RCU read side lock held.
- *
- * Return 0 on success else a negative value.
- */
-int relay_index_write(int fd, struct relay_index *index)
+static void index_destroy(struct relay_index *index)
{
- int ret;
- struct lttng_ht_iter iter;
-
- DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
- " on fd %d", index->index_n.key.key1,
- index->index_n.key.key2, fd);
+ free(index);
+}
- /* Delete index from hash table. */
- iter.iter.node = &index->index_n.node;
- ret = lttng_ht_del(indexes_ht, &iter);
- assert(!ret);
- call_rcu(&index->rcu_node, deferred_free_relay_index);
+static void index_destroy_rcu(struct rcu_head *rcu_head)
+{
+ struct relay_index *index =
+ caa_container_of(rcu_head, struct relay_index, rcu_node);
- return index_write(fd, &index->index_data, sizeof(index->index_data));
+ index_destroy(index);
}
-/*
- * Free the given index.
- */
-void relay_index_free(struct relay_index *index)
+static void index_release(struct urcu_ref *ref)
{
- free(index);
+ struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
+ struct relay_stream *stream = index->stream;
+ int ret;
+ struct lttng_ht_iter iter;
+
+ if (index->index_fd) {
+ stream_fd_put(index->index_fd);
+ index->index_fd = NULL;
+ }
+ if (index->in_hash_table) {
+ /* Delete index from hash table. */
+ iter.iter.node = &index->index_n.node;
+ ret = lttng_ht_del(stream->indexes_ht, &iter);
+ assert(!ret);
+ stream->indexes_in_flight--;
+ }
+
+ stream_put(index->stream);
+ index->stream = NULL;
+
+ call_rcu(&index->rcu_node, index_destroy_rcu);
}
/*
- * Safely free the given index using a call RCU.
+ * Drop one reference on the index. Called with stream mutex held.
*/
-void relay_index_free_safe(struct relay_index *index)
+void relay_index_put(struct relay_index *index)
{
- if (!index) {
- return;
- }
-
- call_rcu(&index->rcu_node, deferred_free_relay_index);
+ DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
+ index->stream->stream_handle, index->index_n.key,
+ (int) index->ref.refcount);
+ /*
+ * Ensure existence of index->reflock for the unlock below.
+ */
+ rcu_read_lock();
+ /*
+ * The index reflock ensures that concurrent test and update of
+ * the index ref is atomic.
+ */
+ pthread_mutex_lock(&index->reflock);
+ assert(index->ref.refcount != 0);
+ urcu_ref_put(&index->ref, index_release);
+ pthread_mutex_unlock(&index->reflock);
+ rcu_read_unlock();
}
/*
- * Delete index from the given hash table.
+ * Try to flush index to disk. Releases self-reference to index once
+ * flush succeeds.
*
- * RCU read side lock MUST be acquired.
+ * Return 0 on successful flush, a negative value on error, or positive
+ * value if no flush was performed.
*/
-void relay_index_delete(struct relay_index *index)
+int relay_index_try_flush(struct relay_index *index)
{
- int ret;
- struct lttng_ht_iter iter;
+ int ret = 1;
+ bool flushed = false;
+ int fd;
- DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64
- " deleted.", index->index_n.key.key1,
- index->index_n.key.key2);
+ pthread_mutex_lock(&index->lock);
+ if (index->flushed) {
+ goto skip;
+ }
+ /* Check if we are ready to flush. */
+ if (!index->has_index_data || !index->index_fd) {
+ goto skip;
+ }
+ fd = index->index_fd->fd;
+ DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
+ " on fd %d", index->stream->stream_handle,
+ index->index_n.key, fd);
+ flushed = true;
+ index->flushed = true;
+ ret = index_write(fd, &index->index_data, sizeof(index->index_data));
+ if (ret == sizeof(index->index_data)) {
+ ret = 0;
+ } else {
+ ret = -1;
+ }
+skip:
+ pthread_mutex_unlock(&index->lock);
- /* Delete index from hash table. */
- iter.iter.node = &index->index_n.node;
- ret = lttng_ht_del(indexes_ht, &iter);
- assert(!ret);
+ if (flushed) {
+ /* Put self-ref from index now that it has been flushed. */
+ relay_index_put(index);
+ }
+ return ret;
}
/*
- * Destroy every relay index with the given stream id as part of the key.
+ * Close every relay index within a given stream, without flushing
+ * them.
*/
-void relay_index_destroy_by_stream_id(uint64_t stream_id)
+void relay_index_close_all(struct relay_stream *stream)
{
struct lttng_ht_iter iter;
struct relay_index *index;
rcu_read_lock();
- cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
- if (index->index_n.key.key1 == stream_id) {
- relay_index_delete(index);
- relay_index_free_safe(index);
- }
+ cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+ index, index_n.node) {
+ /* Put self-ref from index. */
+ relay_index_put(index);
}
rcu_read_unlock();
}
diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h
index e7f9cdb..adf85bb 100644
--- a/src/bin/lttng-relayd/index.h
+++ b/src/bin/lttng-relayd/index.h
@@ -1,6 +1,10 @@
+#ifndef _RELAY_INDEX_H
+#define _RELAY_INDEX_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,42 +20,55 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _RELAY_INDEX_H
-#define _RELAY_INDEX_H
-
#include <inttypes.h>
#include <pthread.h>
#include <common/hashtable/hashtable.h>
#include <common/index/index.h>
+#include "stream-fd.h"
+
+struct relay_stream;
+
struct relay_index {
- /* FD on which to write the index data. */
- int fd;
/*
- * When destroying this object, this fd is checked and if valid, close it
- * so this is basically a lazy close of the previous fd corresponding to
- * the same stream id. This is used for the rotate file feature.
+ * index lock nests inside stream lock.
*/
- int to_close_fd;
+ pthread_mutex_t reflock; /* Protects refcounting. */
+ struct urcu_ref ref; /* Reference from getters. */
+ struct relay_stream *stream; /* Back ref to stream */
+
+ pthread_mutex_t lock;
+ /*
+ * FD on which to write the index data. May differ from
+ * stream->index_fd due to tracefile rotation.
+ */
+ struct stream_fd *index_fd;
/* Index packet data. This is the data that is written on disk. */
struct ctf_packet_index index_data;
- /* key1 = stream_id, key2 = net_seq_num */
- struct lttng_ht_node_two_u64 index_n;
- struct rcu_head rcu_node;
- pthread_mutex_t mutex;
+ bool has_index_data;
+ bool flushed;
+ bool in_hash_table;
+
+ /*
+ * Node within indexes_ht that corresponds to this struct
+ * relay_index. Indexed by net_seq_num.
+ */
+ struct lttng_ht_node_u64 index_n;
+ struct rcu_head rcu_node; /* For call_rcu teardown. */
};
-struct relay_index *relay_index_create(uint64_t stream_id,
+struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
uint64_t net_seq_num);
-struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num);
-void relay_index_add(struct relay_index *index, struct relay_index **_index);
-int relay_index_write(int fd, struct relay_index *index);
-void relay_index_free(struct relay_index *index);
-void relay_index_free_safe(struct relay_index *index);
-void relay_index_delete(struct relay_index *index);
-void relay_index_destroy_by_stream_id(uint64_t stream_id);
+void relay_index_put(struct relay_index *index);
+int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
+ uint64_t data_offset);
+int relay_index_set_data(struct relay_index *index,
+ const struct ctf_packet_index *data);
+int relay_index_try_flush(struct relay_index *index);
+
+void relay_index_close_all(struct relay_stream *stream);
#endif /* _RELAY_INDEX_H */
diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
index 562a7fa..90af673 100644
--- a/src/bin/lttng-relayd/live.c
+++ b/src/bin/lttng-relayd/live.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
@@ -36,6 +37,7 @@
#include <inttypes.h>
#include <urcu/futex.h>
#include <urcu/uatomic.h>
+#include <urcu/rculist.h>
#include <unistd.h>
#include <fcntl.h>
#include <config.h>
@@ -65,6 +67,9 @@
#include "session.h"
#include "ctf-trace.h"
#include "connection.h"
+#include "viewer-session.h"
+
+#define SESSION_BUF_DEFAULT_COUNT 16
static struct lttng_uri *live_uri;
@@ -90,6 +95,8 @@ static pthread_t live_worker_thread;
static struct relay_conn_queue viewer_conn_queue;
static uint64_t last_relay_viewer_session_id;
+static pthread_mutex_t last_relay_viewer_session_id_lock =
+ PTHREAD_MUTEX_INITIALIZER;
/*
* Cleanup the daemon
@@ -114,9 +121,6 @@ ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size)
{
ssize_t ret;
- assert(sock);
- assert(buf);
-
ret = sock->ops->recvmsg(sock, buf, size, 0);
if (ret < 0 || ret != size) {
if (ret == 0) {
@@ -143,9 +147,6 @@ ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
{
ssize_t ret;
- assert(sock);
- assert(buf);
-
ret = sock->ops->sendmsg(sock, buf, size, 0);
if (ret < 0) {
ERR("Relayd failed to send response.");
@@ -171,17 +172,22 @@ int check_new_streams(struct relay_connection *conn)
+ rcu_read_lock(); /* Taken before any "goto end": end always unlocks. */
if (!conn->viewer_session) {
goto end;
}
- cds_list_for_each_entry(session,
- &conn->viewer_session->sessions_head,
- viewer_session_list) {
+ cds_list_for_each_entry_rcu(session,
+ &conn->viewer_session->session_list,
+ viewer_session_node) {
+ if (!session_get(session)) {
+ continue;
+ }
current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
ret = current_val;
+ session_put(session);
if (ret == 1) {
goto end;
}
}
-
end:
+ rcu_read_unlock();
return ret;
}
@@ -200,8 +206,6 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
struct lttng_ht_iter iter;
struct relay_viewer_stream *vstream;
- assert(session);
-
rcu_read_lock();
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
@@ -210,30 +214,38 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
health_code_update();
+ if (!viewer_stream_get(vstream)) {
+ continue;
+ }
+
+ pthread_mutex_lock(&vstream->lock);
/* Ignore if not the same session. */
- if (vstream->session_id != session->id ||
+ if (vstream->stream->trace->session->id != session->id ||
(!ignore_sent_flag && vstream->sent_flag)) {
+ pthread_mutex_unlock(&vstream->lock);
+ viewer_stream_put(vstream);
continue;
}
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- vstream->path_name);
- assert(ctf_trace);
-
- send_stream.id = htobe64(vstream->stream_handle);
+ ctf_trace = vstream->stream->trace;
+ send_stream.id = htobe64(vstream->stream->stream_handle);
send_stream.ctf_trace_id = htobe64(ctf_trace->id);
- send_stream.metadata_flag = htobe32(vstream->metadata_flag);
+ send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
strncpy(send_stream.path_name, vstream->path_name,
sizeof(send_stream.path_name));
strncpy(send_stream.channel_name, vstream->channel_name,
sizeof(send_stream.channel_name));
- DBG("Sending stream %" PRIu64 " to viewer", vstream->stream_handle);
+ DBG("Sending stream %" PRIu64 " to viewer",
+ vstream->stream->stream_handle);
+ vstream->sent_flag = 1;
+ pthread_mutex_unlock(&vstream->lock);
+
ret = send_response(sock, &send_stream, sizeof(send_stream));
+ viewer_stream_put(vstream); /* Drop our ref before any error exit. */
if (ret < 0) {
goto end_unlock;
}
- vstream->sent_flag = 1;
}
ret = 0;
@@ -263,13 +275,10 @@ int make_viewer_streams(struct relay_session *session,
assert(session);
/*
- * This is to make sure we create viewer streams for a full received
- * channel. For instance, if we have 8 streams for a channel that are
- * concurrently being flagged ready, we can end up creating just a subset
- * of the 8 streams (the ones that are flagged). This lock avoids this
- * limbo state.
+ * Hold the session lock to ensure that we see either none or
+ * all initial streams for a session, but no intermediate state.
*/
- pthread_mutex_lock(&session->viewer_ready_lock);
+ pthread_mutex_lock(&session->lock);
/*
* Create viewer streams for relay streams that are ready to be used for a
@@ -282,47 +291,46 @@ int make_viewer_streams(struct relay_session *session,
health_code_update();
- if (ctf_trace->invalid_flag) {
+ if (!ctf_trace_get(ctf_trace)) {
continue;
}
- cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
+ cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) {
struct relay_viewer_stream *vstream;
- if (!stream->viewer_ready) {
- continue;
- }
-
- vstream = viewer_stream_find_by_id(stream->stream_handle);
+ vstream = viewer_stream_get_by_id(stream->stream_handle);
if (!vstream) {
- vstream = viewer_stream_create(stream, seek_t, ctf_trace);
+ vstream = viewer_stream_create(stream, seek_t);
if (!vstream) {
ret = -1;
+ ctf_trace_put(ctf_trace);
goto error_unlock;
}
- /* Acquire reference to ctf_trace. */
- ctf_trace_get_ref(ctf_trace);
if (nb_created) {
/* Update number of created stream counter. */
(*nb_created)++;
}
- } else if (!vstream->sent_flag && nb_unsent) {
- /* Update number of unsent stream counter. */
- (*nb_unsent)++;
+ } else {
+ if (!vstream->sent_flag && nb_unsent) {
+ /* Update number of unsent stream counter. */
+ (*nb_unsent)++;
+ }
+ viewer_stream_put(vstream);
}
/* Update number of total stream counter. */
if (nb_total) {
(*nb_total)++;
}
}
+ ctf_trace_put(ctf_trace);
}
ret = 0;
error_unlock:
rcu_read_unlock();
- pthread_mutex_unlock(&session->viewer_ready_lock);
+ pthread_mutex_unlock(&session->lock);
return ret;
}
@@ -512,15 +520,9 @@ restart:
struct relay_connection *new_conn;
struct lttcomm_sock *newsock;
- new_conn = connection_create();
- if (!new_conn) {
- goto error;
- }
-
newsock = live_control_sock->ops->accept(live_control_sock);
if (!newsock) {
PERROR("accepting control sock");
- connection_free(new_conn);
goto error;
}
DBG("Relay viewer connection accepted socket %d", newsock->fd);
@@ -530,10 +532,13 @@ restart:
if (ret < 0) {
PERROR("setsockopt inet");
lttcomm_destroy_sock(newsock);
- connection_free(new_conn);
goto error;
}
- new_conn->sock = newsock;
+ new_conn = connection_create(newsock, RELAY_CONNECTION_UNKNOWN);
+ if (!new_conn) {
+ lttcomm_destroy_sock(newsock);
+ goto error;
+ }
/* Enqueue request for the dispatcher thread. */
cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail,
@@ -625,7 +630,7 @@ void *thread_dispatcher(void *data)
ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
if (ret < 0) {
PERROR("write conn pipe");
- connection_destroy(conn);
+ connection_put(conn);
goto error;
}
} while (node != NULL);
@@ -664,8 +669,6 @@ int viewer_connect(struct relay_connection *conn)
int ret;
struct lttng_viewer_connect reply, msg;
- assert(conn);
-
conn->version_check_done = 1;
health_code_update();
@@ -716,7 +719,9 @@ int viewer_connect(struct relay_connection *conn)
* Increment outside of htobe64 macro, because can be used more than once
* within the macro, and thus the operation may be undefined.
*/
+ pthread_mutex_lock(&last_relay_viewer_session_id_lock);
last_relay_viewer_session_id++;
reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
+ pthread_mutex_unlock(&last_relay_viewer_session_id_lock);
}
@@ -738,6 +743,9 @@ end:
/*
* Send the viewer the list of current sessions.
+ * We need to create a copy of the hash table content because otherwise
+ * we cannot assume the number of entries stays the same between getting
+ * the number of HT elements and iteration over the HT.
*
* Return 0 on success or else a negative value.
*/
@@ -746,146 +754,79 @@ int viewer_list_sessions(struct relay_connection *conn)
{
int ret;
struct lttng_viewer_list_sessions session_list;
- unsigned long count;
- long approx_before, approx_after;
struct lttng_ht_iter iter;
- struct lttng_viewer_session send_session;
struct relay_session *session;
+ struct lttng_viewer_session *send_session_buf = NULL;
+ uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
+ uint32_t count = 0;
DBG("List sessions received");
- rcu_read_lock();
- cds_lfht_count_nodes(conn->sessions_ht->ht, &approx_before, &count,
- &approx_after);
- session_list.sessions_count = htobe32(count);
-
- health_code_update();
-
- ret = send_response(conn->sock, &session_list, sizeof(session_list));
- if (ret < 0) {
- goto end_unlock;
+ send_session_buf = zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
+ if (!send_session_buf) {
+ return -1;
}
- health_code_update();
-
- cds_lfht_for_each_entry(conn->sessions_ht->ht, &iter.iter, session,
+ rcu_read_lock();
+ cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
session_n.node) {
- health_code_update();
-
- strncpy(send_session.session_name, session->session_name,
- sizeof(send_session.session_name));
- strncpy(send_session.hostname, session->hostname,
- sizeof(send_session.hostname));
- send_session.id = htobe64(session->id);
- send_session.live_timer = htobe32(session->live_timer);
- send_session.clients = htobe32(session->viewer_refcount);
- send_session.streams = htobe32(session->stream_count);
+ struct lttng_viewer_session *send_session;
health_code_update();
- ret = send_response(conn->sock, &send_session, sizeof(send_session));
- if (ret < 0) {
- goto end_unlock;
- }
- }
- health_code_update();
+ if (count >= buf_count) {
+ struct lttng_viewer_session *newbuf;
+ uint32_t new_buf_count = buf_count << 1;
- ret = 0;
-end_unlock:
- rcu_read_unlock();
- return ret;
-}
-
-/*
- * Check if a connection is attached to a session.
- * Return 1 if attached, 0 if not attached, a negative value on error.
- */
-static
-int session_attached(struct relay_connection *conn, uint64_t session_id)
-{
- struct relay_session *session;
- int found = 0;
-
- if (!conn->viewer_session) {
- goto end;
- }
- cds_list_for_each_entry(session,
- &conn->viewer_session->sessions_head,
- viewer_session_list) {
- if (session->id == session_id) {
- found = 1;
- goto end;
+ newbuf = realloc(send_session_buf,
+ new_buf_count * sizeof(*send_session_buf));
+ if (!newbuf) { /* On failure the old buffer is still valid; end_free releases it. */
+ ret = -1;
+ rcu_read_unlock();
+ goto end_free;
+ }
+ send_session_buf = newbuf;
+ buf_count = new_buf_count;
}
- }
-
-end:
- return found;
-}
-
-/*
- * Delete all streams for a specific session ID.
- */
-static void destroy_viewer_streams_by_session(struct relay_session *session)
-{
- struct relay_viewer_stream *stream;
- struct lttng_ht_iter iter;
-
- assert(session);
-
- rcu_read_lock();
- cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
- stream_n.node) {
- struct ctf_trace *ctf_trace;
-
- health_code_update();
- if (stream->session_id != session->id) {
- continue;
+ send_session = &send_session_buf[count];
+ strncpy(send_session->session_name, session->session_name,
+ sizeof(send_session->session_name));
+ strncpy(send_session->hostname, session->hostname,
+ sizeof(send_session->hostname));
+ send_session->id = htobe64(session->id);
+ send_session->live_timer = htobe32(session->live_timer);
+ if (session->viewer_attached) {
+ send_session->clients = htobe32(1);
+ } else {
+ send_session->clients = htobe32(0);
}
+ send_session->streams = htobe32(session->stream_count);
+ count++;
+ }
+ rcu_read_unlock();
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
-
- viewer_stream_delete(stream);
+ session_list.sessions_count = htobe32(count);
- if (stream->metadata_flag) {
- ctf_trace->metadata_sent = 0;
- ctf_trace->viewer_metadata_stream = NULL;
- }
+ health_code_update();
- viewer_stream_destroy(ctf_trace, stream);
+ ret = send_response(conn->sock, &session_list, sizeof(session_list));
+ if (ret < 0) {
+ goto end_free;
}
- rcu_read_unlock();
-}
-
-static void try_destroy_streams(struct relay_session *session)
-{
- struct ctf_trace *ctf_trace;
- struct lttng_ht_iter iter;
- assert(session);
+ health_code_update();
- cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
- node.node) {
- /* Attempt to destroy the ctf trace of that session. */
- ctf_trace_try_destroy(session, ctf_trace);
+ ret = send_response(conn->sock, send_session_buf,
+ count * sizeof(*send_session_buf));
+ if (ret < 0) {
+ goto end_free;
}
-}
+ health_code_update();
-/*
- * Cleanup a session.
- */
-static void cleanup_session(struct relay_connection *conn,
- struct relay_session *session)
-{
- /*
- * Very important that this is done before destroying the session so we
- * can put back every viewer stream reference from the ctf_trace.
- */
- destroy_viewer_streams_by_session(session);
- try_destroy_streams(session);
- cds_list_del(&session->viewer_session_list);
- session_viewer_try_destroy(conn->sessions_ht, session);
+ ret = 0;
+end_free:
+ free(send_session_buf);
+ return ret;
}
/*
@@ -918,15 +859,14 @@ int viewer_get_new_streams(struct relay_connection *conn)
memset(&response, 0, sizeof(response));
- rcu_read_lock();
- session = session_find_by_id(conn->sessions_ht, session_id);
+ session = session_get_by_id(session_id);
if (!session) {
DBG("Relay session %" PRIu64 " not found", session_id);
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
goto send_reply;
}
- if (!session_attached(conn, session_id)) {
+ if (!viewer_session_is_attached(conn->viewer_session, session)) {
send_streams = 0;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
goto send_reply;
@@ -938,7 +878,7 @@ int viewer_get_new_streams(struct relay_connection *conn)
ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, NULL, &nb_unsent,
&nb_created);
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
/* Only send back the newly created streams with the unsent ones. */
nb_streams = nb_created + nb_unsent;
@@ -949,15 +889,13 @@ int viewer_get_new_streams(struct relay_connection *conn)
* it means that the viewer has already received the whole trace
* for this session and should now close it.
*/
- if (nb_streams == 0 && session->close_flag) {
+ if (nb_streams == 0 && session->connection_closed) {
send_streams = 0;
- response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
- /*
- * Remove the session from the attached list of the connection
- * and try to destroy it.
- */
- cds_list_del(&session->viewer_session_list);
- cleanup_session(conn, session);
+ if (viewer_session_detach(conn->viewer_session, session)) {
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
+ } else {
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
+ }
goto send_reply;
}
@@ -965,7 +903,7 @@ send_reply:
health_code_update();
ret = send_response(conn->sock, &response, sizeof(response));
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
health_code_update();
@@ -975,7 +913,7 @@ send_reply:
*/
if (!send_streams || !nb_streams) {
ret = 0;
- goto end_unlock;
+ goto end_put_session;
}
/*
@@ -984,11 +922,13 @@ send_reply:
*/
ret = send_viewer_streams(conn->sock, session, 0);
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
-end_unlock:
- rcu_read_unlock();
+end_put_session:
+ if (session) {
+ session_put(session);
+ }
error:
return ret;
}
@@ -1005,7 +945,7 @@ int viewer_attach_session(struct relay_connection *conn)
enum lttng_viewer_seek seek_type;
struct lttng_viewer_attach_session_request request;
struct lttng_viewer_attach_session_response response;
- struct relay_session *session;
+ struct relay_session *session = NULL;
assert(conn);
@@ -1027,37 +967,33 @@ int viewer_attach_session(struct relay_connection *conn)
goto send_reply;
}
- rcu_read_lock();
- session = session_find_by_id(conn->sessions_ht,
- be64toh(request.session_id));
+ session = session_get_by_id(be64toh(request.session_id));
if (!session) {
DBG("Relay session %" PRIu64 " not found",
be64toh(request.session_id));
response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
goto send_reply;
}
- session_viewer_attach(session);
DBG("Attach session ID %" PRIu64 " received", be64toh(request.session_id));
- if (uatomic_read(&session->viewer_refcount) > 1) {
- DBG("Already a viewer attached");
- response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
- session_viewer_detach(session);
- goto send_reply;
- } else if (session->live_timer == 0) {
+ if (session->live_timer == 0) {
DBG("Not live session");
response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
goto send_reply;
- } else {
- send_streams = 1;
- response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
- cds_list_add(&session->viewer_session_list,
- &conn->viewer_session->sessions_head);
+ }
+
+ send_streams = 1;
+ ret = viewer_session_attach(conn->viewer_session, session);
+ if (ret) {
+ DBG("Already a viewer attached");
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
+ goto send_reply;
}
switch (be32toh(request.seek)) {
case LTTNG_VIEWER_SEEK_BEGINNING:
case LTTNG_VIEWER_SEEK_LAST:
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
seek_type = be32toh(request.seek);
break;
default:
@@ -1069,7 +1005,7 @@ int viewer_attach_session(struct relay_connection *conn)
ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL);
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
response.streams_count = htobe32(nb_streams);
@@ -1077,7 +1013,7 @@ send_reply:
health_code_update();
ret = send_response(conn->sock, &response, sizeof(response));
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
health_code_update();
@@ -1087,17 +1023,19 @@ send_reply:
*/
if (!send_streams || !nb_streams) {
ret = 0;
- goto end_unlock;
+ goto end_put_session;
}
/* Send stream and ignore the sent flag. */
ret = send_viewer_streams(conn->sock, session, 1);
if (ret < 0) {
- goto end_unlock;
+ goto end_put_session;
}
-end_unlock:
- rcu_read_unlock();
+end_put_session:
+ if (session) {
+ session_put(session);
+ }
error:
return ret;
}
@@ -1105,38 +1043,42 @@ error:
/*
* Open the index file if needed for the given vstream.
*
- * If an index file is successfully opened, the index_read_fd of the stream is
- * set with it.
+ * If an index file is successfully opened, the vstream index_fd is set with
+ * it.
*
* Return 0 on success, a negative value on error (-ENOENT if not ready yet).
+ *
+ * Called with both rstream and vstream locks held.
*/
static int try_open_index(struct relay_viewer_stream *vstream,
struct relay_stream *rstream)
{
int ret = 0;
- assert(vstream);
- assert(rstream);
-
- if (vstream->index_read_fd >= 0) {
+ if (vstream->index_fd) {
goto end;
}
/*
- * First time, we open the index file and at least one index is ready. The
- * race between the read and write of the total_index_received is
- * acceptable here since the client will be notified to simply come back
- * and get the next index.
+ * First time, we open the index file and at least one index is ready.
*/
if (rstream->total_index_received <= 0) {
ret = -ENOENT;
goto end;
}
ret = index_open(vstream->path_name, vstream->channel_name,
- vstream->tracefile_count, vstream->tracefile_count_current);
+ vstream->stream->tracefile_count,
+ vstream->current_tracefile_id);
if (ret >= 0) {
- vstream->index_read_fd = ret;
- ret = 0;
+ vstream->index_fd = stream_fd_create(ret);
+ if (!vstream->index_fd) {
+ if (close(ret)) {
+ PERROR("close");
+ }
+ ret = -1;
+ } else {
+ ret = 0;
+ }
goto end;
}
@@ -1146,12 +1088,14 @@ end:
/*
* Check the status of the index for the given stream. This function updates
- * the index structure if needed and can destroy the vstream also for the HUP
- * situation.
+ * the index structure if needed and can put (close) the vstream in the
+ * HUP situation.
*
* Return 0 means that we can proceed with the index. A value of 1 means that
* the index has been updated and is ready to be send to the client. A negative
* value indicates an error that can't be handled.
+ *
+ * Called with both rstream and vstream locks held.
*/
static int check_index_status(struct relay_viewer_stream *vstream,
struct relay_stream *rstream, struct ctf_trace *trace,
@@ -1159,68 +1103,95 @@ static int check_index_status(struct relay_viewer_stream *vstream,
{
int ret;
- assert(vstream);
- assert(rstream);
- assert(index);
- assert(trace);
-
- if (!rstream->close_flag) {
- /* Rotate on abort (overwrite). */
- if (vstream->abort_flag) {
- DBG("Viewer stream %" PRIu64 " rotate because of overwrite",
- vstream->stream_handle);
- ret = viewer_stream_rotate(vstream, rstream);
+ if (trace->session->connection_closed) {
+ if (rstream->total_index_received == vstream->last_sent_index) {
+ /* Last index sent and session connection is closed. */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ goto hup;
+ } else {
+ /*
+ * Session connection is closed, but there are
+ * still indexes to read.
+ */
+ ret = 0;
+ goto end;
+ }
+ } else {
+ if (rstream->beacon_ts_end != -1ULL &&
+ rstream->total_index_received == vstream->last_sent_index) {
+ /*
+ * We've received a synchronization beacon and the last index
+ * available has been sent, the index for now is inactive.
+ */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
+ index->timestamp_end = htobe64(rstream->beacon_ts_end);
+ index->stream_id = htobe64(rstream->ctf_stream_id);
+ goto index_ready;
+ } else if (rstream->total_index_received <= vstream->last_sent_index) {
+ /* This actually checks the case where recv == last_sent. */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ goto index_ready;
+ } else if (!viewer_stream_is_tracefile_id_readable(vstream, vstream->current_tracefile_id)) {
+ /*
+ * The producer has overwritten our current
+ * file. We need to rotate.
+ */
+ DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
+ vstream->stream->stream_handle);
+ ret = viewer_stream_rotate(vstream);
if (ret < 0) {
- goto error;
+ goto end;
} else if (ret == 1) {
- /* EOF */
+ /* EOF across entire stream. */
index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto hup;
}
/* ret == 0 means successful so we continue. */
- }
+ ret = 0;
+ } else {
+ ssize_t read_ret;
+ char tmp[1];
- /* Check if we are in the same trace file at this point. */
- if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
- if (rstream->beacon_ts_end != -1ULL &&
- vstream->last_sent_index == rstream->total_index_received) {
- /*
- * We've received a synchronization beacon and the last index
- * available has been sent, the index for now is inactive.
- */
- index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
- index->timestamp_end = htobe64(rstream->beacon_ts_end);
- index->stream_id = htobe64(rstream->ctf_stream_id);
- goto index_ready;
- } else if (rstream->total_index_received <= vstream->last_sent_index
- && !vstream->close_write_flag) {
- /*
- * Reader and writer are working in the same tracefile, so we care
- * about the number of index received and sent. Otherwise, we read
- * up to EOF.
- */
- index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
- goto index_ready;
+ /*
+ * Use EOF on current index file to find out when we
+ * need to rotate.
+ */
+ read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
+ if (read_ret == 1) {
+ off_t seek_ret;
+
+ /* There is still data to read. Rewind position. */
+ seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
+ if (seek_ret < 0) {
+ ret = -1;
+ goto end;
+ }
+ ret = 0;
+ } else if (read_ret == 0) {
+ /* EOF. We need to rotate. */
+ DBG("Viewer stream %" PRIu64 " rotation due to EOF",
+ vstream->stream->stream_handle);
+ ret = viewer_stream_rotate(vstream);
+ if (ret < 0) {
+ goto end;
+ } else if (ret == 1) {
+ /* EOF across entire stream. */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ goto hup;
+ }
+ /* ret == 0 means successful so we continue. */
+ ret = 0;
+ } else {
+ /* Error reading index. */
+ ret = -1;
}
}
- /* Nothing to do with the index, continue with it. */
- ret = 0;
- } else if (rstream->close_flag && vstream->close_write_flag &&
- vstream->total_index_received == vstream->last_sent_index) {
- /* Last index sent and current tracefile closed in write */
- index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
- goto hup;
- } else {
- vstream->close_write_flag = 1;
- ret = 0;
}
-
-error:
+end:
return ret;
hup:
- viewer_stream_delete(vstream);
- viewer_stream_destroy(trace, vstream);
+ viewer_stream_put(vstream);
index_ready:
return 1;
}
@@ -1238,10 +1209,9 @@ int viewer_get_next_index(struct relay_connection *conn)
struct lttng_viewer_get_next_index request_index;
struct lttng_viewer_index viewer_index;
struct ctf_packet_index packet_index;
- struct relay_viewer_stream *vstream;
- struct relay_stream *rstream;
- struct ctf_trace *ctf_trace;
- struct relay_session *session;
+ struct relay_viewer_stream *vstream = NULL;
+ struct relay_stream *rstream = NULL;
+ struct ctf_trace *ctf_trace = NULL;
assert(conn);
@@ -1255,35 +1225,29 @@ int viewer_get_next_index(struct relay_connection *conn)
}
health_code_update();
- rcu_read_lock();
- vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
+ vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
if (!vstream) {
ret = -1;
- goto end_unlock;
- }
-
- session = session_find_by_id(conn->sessions_ht, vstream->session_id);
- if (!session) {
- ret = -1;
- goto end_unlock;
+ goto end;
}
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, vstream->path_name);
- assert(ctf_trace);
+ /* Use back-references. Protected by refcounts. */
+ rstream = vstream->stream;
+ ctf_trace = rstream->trace;
memset(&viewer_index, 0, sizeof(viewer_index));
+ pthread_mutex_lock(&rstream->lock);
+ pthread_mutex_lock(&vstream->lock);
+
/*
* The viewer should not ask for index on metadata stream.
*/
- if (vstream->metadata_flag) {
+ if (rstream->is_metadata) {
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto send_reply;
}
- rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle);
- assert(rstream);
-
/* Try to open an index if one is needed for that stream. */
ret = try_open_index(vstream, rstream);
if (ret < 0) {
@@ -1300,11 +1264,9 @@ int viewer_get_next_index(struct relay_connection *conn)
goto send_reply;
}
- pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
if (ret < 0) {
- goto end_unlock;
+ goto error_put;
} else if (ret == 1) {
/*
* This means the viewer index data structure has been populated by the
@@ -1322,58 +1284,17 @@ int viewer_get_next_index(struct relay_connection *conn)
ret = check_new_streams(conn);
if (ret < 0) {
- goto end_unlock;
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ goto send_reply;
} else if (ret == 1) {
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
}
- pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
- pthread_mutex_lock(&vstream->overwrite_lock);
- if (vstream->abort_flag) {
- /* The file is being overwritten by the writer, we cannot use it. */
- pthread_mutex_unlock(&vstream->overwrite_lock);
- ret = viewer_stream_rotate(vstream, rstream);
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
- if (ret < 0) {
- goto end_unlock;
- } else if (ret == 1) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
- viewer_stream_delete(vstream);
- viewer_stream_destroy(ctf_trace, vstream);
- } else {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
- }
- goto send_reply;
- }
-
- read_ret = lttng_read(vstream->index_read_fd, &packet_index,
+ read_ret = lttng_read(vstream->index_fd->fd, &packet_index,
sizeof(packet_index));
- pthread_mutex_unlock(&vstream->overwrite_lock);
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
if (read_ret < 0) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
- viewer_stream_delete(vstream);
- viewer_stream_destroy(ctf_trace, vstream);
- goto send_reply;
- } else if (read_ret < sizeof(packet_index)) {
- pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
- if (vstream->close_write_flag) {
- ret = viewer_stream_rotate(vstream, rstream);
- if (ret < 0) {
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
- goto end_unlock;
- } else if (ret == 1) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
- viewer_stream_delete(vstream);
- viewer_stream_destroy(ctf_trace, vstream);
- } else {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
- }
- } else {
- ERR("Relay reading index file %d", vstream->index_read_fd);
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
- }
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+ ERR("Relay reading index file %d", vstream->index_fd->fd);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
} else {
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
@@ -1392,22 +1313,31 @@ int viewer_get_next_index(struct relay_connection *conn)
viewer_index.stream_id = packet_index.stream_id;
send_reply:
+ pthread_mutex_unlock(&vstream->lock);
+ pthread_mutex_unlock(&rstream->lock);
+
viewer_index.flags = htobe32(viewer_index.flags);
health_code_update();
ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
if (ret < 0) {
- goto end_unlock;
+ goto end;
}
health_code_update();
DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
- vstream->last_sent_index, vstream->stream_handle);
-
-end_unlock:
- rcu_read_unlock();
-
+ vstream->last_sent_index,
+ vstream->stream->stream_handle);
end:
+ if (vstream) {
+ viewer_stream_put(vstream);
+ }
+ return ret;
+
+error_put:
+ pthread_mutex_unlock(&vstream->lock);
+ pthread_mutex_unlock(&rstream->lock);
+ viewer_stream_put(vstream);
return ret;
}
@@ -1425,12 +1355,9 @@ int viewer_get_packet(struct relay_connection *conn)
ssize_t read_len;
struct lttng_viewer_get_packet get_packet_info;
struct lttng_viewer_trace_packet reply;
- struct relay_viewer_stream *stream;
- struct relay_session *session;
+ struct relay_viewer_stream *vstream = NULL;
struct ctf_trace *ctf_trace;
- assert(conn);
-
DBG2("Relay get data packet");
health_code_update();
@@ -1444,37 +1371,28 @@ int viewer_get_packet(struct relay_connection *conn)
/* From this point on, the error label can be reached. */
memset(&reply, 0, sizeof(reply));
- rcu_read_lock();
- stream = viewer_stream_find_by_id(be64toh(get_packet_info.stream_id));
- if (!stream) {
- goto error;
- }
-
- session = session_find_by_id(conn->sessions_ht, stream->session_id);
- if (!session) {
- ret = -1;
+ vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
+ if (!vstream) {
goto error;
}
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
+ ctf_trace = vstream->stream->trace;
/*
* First time we read this stream, we need open the tracefile, we should
* only arrive here if an index has already been sent to the viewer, so the
* tracefile must exist, if it does not it is a fatal error.
*/
- if (stream->read_fd < 0) {
+ if (!vstream->stream_fd) {
char fullpath[PATH_MAX];
- if (stream->tracefile_count > 0) {
- ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, stream->path_name,
- stream->channel_name,
- stream->tracefile_count_current);
+ if (vstream->stream->tracefile_count > 0) {
+ ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, vstream->path_name,
+ vstream->channel_name,
+ vstream->current_tracefile_id);
} else {
- ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name,
- stream->channel_name);
+ ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name,
+ vstream->channel_name);
}
if (ret < 0) {
goto error;
@@ -1484,7 +1402,13 @@ int viewer_get_packet(struct relay_connection *conn)
PERROR("Relay opening trace file");
goto error;
}
- stream->read_fd = ret;
+ vstream->stream_fd = stream_fd_create(ret);
+ if (!vstream->stream_fd) {
+ if (close(ret)) {
+ PERROR("close");
+ }
+ goto error;
+ }
}
if (!ctf_trace->metadata_received ||
@@ -1496,7 +1420,7 @@ int viewer_get_packet(struct relay_connection *conn)
ret = check_new_streams(conn);
if (ret < 0) {
- goto end_unlock;
+ goto end_free;
} else if (ret == 1) {
reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
@@ -1510,34 +1434,17 @@ int viewer_get_packet(struct relay_connection *conn)
goto error;
}
- ret = lseek(stream->read_fd, be64toh(get_packet_info.offset), SEEK_SET);
+ ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset), SEEK_SET);
if (ret < 0) {
- /*
- * If the read fd was closed by the streaming side, the
- * abort_flag will be set to 1, otherwise it is an error.
- */
- if (stream->abort_flag == 0) {
- PERROR("lseek");
- goto error;
- }
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
- goto send_reply;
+ PERROR("lseek");
+ goto error;
}
- read_len = lttng_read(stream->read_fd, data, len);
+ read_len = lttng_read(vstream->stream_fd->fd, data, len);
if (read_len < len) {
- /*
- * If the read fd was closed by the streaming side, the
- * abort_flag will be set to 1, otherwise it is an error.
- */
- if (stream->abort_flag == 0) {
- PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
- stream->read_fd,
- be64toh(get_packet_info.offset));
- goto error;
- } else {
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
- goto send_reply;
- }
+ PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
+ vstream->stream_fd->fd,
+ be64toh(get_packet_info.offset));
+ goto error;
}
reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
reply.len = htobe32(len);
@@ -1554,7 +1461,7 @@ send_reply:
ret = send_response(conn->sock, &reply, sizeof(reply));
if (ret < 0) {
- goto end_unlock;
+ goto end_free;
}
health_code_update();
@@ -1562,7 +1469,7 @@ send_reply:
health_code_update();
ret = send_response(conn->sock, data, len);
if (ret < 0) {
- goto end_unlock;
+ goto end_free;
}
health_code_update();
}
@@ -1570,11 +1477,12 @@ send_reply:
DBG("Sent %u bytes for stream %" PRIu64, len,
be64toh(get_packet_info.stream_id));
-end_unlock:
+end_free:
free(data);
- rcu_read_unlock();
-
end:
+ if (vstream) {
+ viewer_stream_put(vstream);
+ }
return ret;
}
@@ -1592,9 +1500,8 @@ int viewer_get_metadata(struct relay_connection *conn)
char *data = NULL;
struct lttng_viewer_get_metadata request;
struct lttng_viewer_metadata_packet reply;
- struct relay_viewer_stream *stream;
+ struct relay_viewer_stream *vstream = NULL;
struct ctf_trace *ctf_trace;
- struct relay_session *session;
assert(conn);
@@ -1610,22 +1517,14 @@ int viewer_get_metadata(struct relay_connection *conn)
memset(&reply, 0, sizeof(reply));
- rcu_read_lock();
- stream = viewer_stream_find_by_id(be64toh(request.stream_id));
- if (!stream || !stream->metadata_flag) {
+ vstream = viewer_stream_get_by_id(be64toh(request.stream_id));
+ if (!vstream || !vstream->stream->is_metadata) {
ERR("Invalid metadata stream");
goto error;
}
- session = session_find_by_id(conn->sessions_ht, stream->session_id);
- if (!session) {
- ret = -1;
- goto error;
- }
+ ctf_trace = vstream->stream->trace;
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received);
len = ctf_trace->metadata_received - ctf_trace->metadata_sent;
@@ -1635,11 +1534,11 @@ int viewer_get_metadata(struct relay_connection *conn)
}
/* first time, we open the metadata file */
- if (stream->read_fd < 0) {
+ if (!vstream->stream_fd) {
char fullpath[PATH_MAX];
- ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name,
- stream->channel_name);
+ ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name,
+ vstream->channel_name);
if (ret < 0) {
goto error;
}
@@ -1648,7 +1547,13 @@ int viewer_get_metadata(struct relay_connection *conn)
PERROR("Relay opening metadata file");
goto error;
}
- stream->read_fd = ret;
+ vstream->stream_fd = stream_fd_create(ret);
+ if (!vstream->stream_fd) {
+ if (close(ret)) {
+ PERROR("close");
+ }
+ goto error;
+ }
}
reply.len = htobe64(len);
@@ -1658,7 +1563,7 @@ int viewer_get_metadata(struct relay_connection *conn)
goto error;
}
- read_len = lttng_read(stream->read_fd, data, len);
+ read_len = lttng_read(vstream->stream_fd->fd, data, len);
if (read_len < len) {
PERROR("Relay reading metadata file");
goto error;
@@ -1674,14 +1579,14 @@ send_reply:
health_code_update();
ret = send_response(conn->sock, &reply, sizeof(reply));
if (ret < 0) {
- goto end_unlock;
+ goto end_free;
}
health_code_update();
if (len > 0) {
ret = send_response(conn->sock, data, len);
if (ret < 0) {
- goto end_unlock;
+ goto end_free;
}
}
@@ -1690,10 +1595,12 @@ send_reply:
DBG("Metadata sent");
-end_unlock:
+end_free:
free(data);
- rcu_read_unlock();
end:
+ if (vstream) {
+ viewer_stream_put(vstream);
+ }
return ret;
}
@@ -1712,13 +1619,12 @@ int viewer_create_session(struct relay_connection *conn)
memset(&resp, 0, sizeof(resp));
resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
- conn->viewer_session = zmalloc(sizeof(*conn->viewer_session));
+ conn->viewer_session = viewer_session_create();
if (!conn->viewer_session) {
ERR("Allocation viewer session");
resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
goto send_reply;
}
- CDS_INIT_LIST_HEAD(&conn->viewer_session->sessions_head);
send_reply:
health_code_update();
@@ -1757,9 +1663,6 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
int ret = 0;
uint32_t msg_value;
- assert(recv_hdr);
- assert(conn);
-
msg_value = be32toh(recv_hdr->cmd);
/*
@@ -1813,8 +1716,6 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
{
int ret;
- assert(events);
-
(void) lttng_poll_del(events, pollfd);
ret = close(pollfd);
@@ -1824,38 +1725,6 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
}
/*
- * Delete and destroy a connection.
- *
- * RCU read side lock MUST be acquired.
- */
-static void destroy_connection(struct lttng_ht *relay_connections_ht,
- struct relay_connection *conn)
-{
- struct relay_session *session, *tmp_session;
-
- assert(relay_connections_ht);
- assert(conn);
-
- connection_delete(relay_connections_ht, conn);
-
- if (!conn->viewer_session) {
- goto end;
- }
-
- rcu_read_lock();
- cds_list_for_each_entry_safe(session, tmp_session,
- &conn->viewer_session->sessions_head,
- viewer_session_list) {
- DBG("Cleaning connection of session ID %" PRIu64, session->id);
- cleanup_session(conn, session);
- }
- rcu_read_unlock();
-
-end:
- connection_destroy(conn);
-}
-
-/*
* This thread does the actual work
*/
static
@@ -1867,8 +1736,6 @@ void *thread_worker(void *data)
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
struct lttng_viewer_cmd recv_hdr;
- struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
- struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
struct relay_connection *destroy_conn;
DBG("[thread] Live viewer relay worker started");
@@ -1956,46 +1823,45 @@ restart:
if (ret < 0) {
goto error;
}
- conn->sessions_ht = sessions_ht;
- connection_init(conn);
lttng_poll_add(&events, conn->sock->fd,
LPOLLIN | LPOLLRDHUP);
- rcu_read_lock();
- lttng_ht_add_unique_ulong(relay_connections_ht,
- &conn->sock_n);
- rcu_read_unlock();
- DBG("Connection socket %d added", conn->sock->fd);
+ connection_ht_add(relay_connections_ht, conn);
+ DBG("Connection socket %d added to poll", conn->sock->fd);
}
} else {
struct relay_connection *conn;
- rcu_read_lock();
- conn = connection_find_by_sock(relay_connections_ht, pollfd);
- /* If not found, there is a synchronization issue. */
- assert(conn);
+ conn = connection_get_by_sock(relay_connections_ht, pollfd);
+ if (!conn) {
+ continue;
+ }
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ /* Put "open" ownership reference. */
+ connection_put(conn);
} else if (revents & LPOLLIN) {
ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
sizeof(recv_hdr), 0);
if (ret <= 0) {
/* Connection closed */
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ /* Put "open" ownership reference. */
+ connection_put(conn);
DBG("Viewer control conn closed with %d", pollfd);
} else {
ret = process_control(&recv_hdr, conn);
if (ret < 0) {
/* Clear the session on error. */
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ /* Put "open" ownership reference. */
+ connection_put(conn);
DBG("Viewer connection closed with %d", pollfd);
}
}
}
- rcu_read_unlock();
+ /* Put "get_by_sock" reference. */
+ connection_put(conn);
}
}
}
@@ -2010,7 +1876,7 @@ error:
destroy_conn,
sock_n.node) {
health_code_update();
- destroy_connection(relay_connections_ht, destroy_conn);
+ connection_put(destroy_conn);
}
rcu_read_unlock();
error_poll_create:
@@ -2078,8 +1944,7 @@ int relayd_live_join(void)
/*
* main
*/
-int relayd_live_create(struct lttng_uri *uri,
- struct relay_local_data *relay_ctx)
+int relayd_live_create(struct lttng_uri *uri)
{
int ret = 0, retval = 0;
void *status;
@@ -2129,7 +1994,7 @@ int relayd_live_create(struct lttng_uri *uri,
/* Setup the worker thread */
ret = pthread_create(&live_worker_thread, NULL,
- thread_worker, relay_ctx);
+ thread_worker, NULL);
if (ret) {
errno = ret;
PERROR("pthread_create viewer worker");
diff --git a/src/bin/lttng-relayd/live.h b/src/bin/lttng-relayd/live.h
index 5db940b..2b8a3a0 100644
--- a/src/bin/lttng-relayd/live.h
+++ b/src/bin/lttng-relayd/live.h
@@ -1,6 +1,10 @@
+#ifndef LTTNG_RELAYD_LIVE_H
+#define LTTNG_RELAYD_LIVE_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,15 +20,11 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef LTTNG_RELAYD_LIVE_H
-#define LTTNG_RELAYD_LIVE_H
-
#include <common/uri.h>
#include "lttng-relayd.h"
-int relayd_live_create(struct lttng_uri *live_uri,
- struct relay_local_data *relay_ctx);
+int relayd_live_create(struct lttng_uri *live_uri);
int relayd_live_stop(void);
int relayd_live_join(void);
diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h
index 245c5fd..889071c 100644
--- a/src/bin/lttng-relayd/lttng-relayd.h
+++ b/src/bin/lttng-relayd/lttng-relayd.h
@@ -1,6 +1,10 @@
+#ifndef LTTNG_RELAYD_H
+#define LTTNG_RELAYD_H
+
/*
* Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
@@ -16,9 +20,6 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef LTTNG_RELAYD_H
-#define LTTNG_RELAYD_H
-
#include <limits.h>
#include <urcu.h>
#include <urcu/wfcqueue.h>
@@ -34,26 +35,22 @@ struct relay_conn_queue {
int32_t futex;
};
-struct relay_local_data {
- struct lttng_ht *sessions_ht;
-};
-
-extern char *opt_output_path;
-
/*
* Contains stream indexed by ID. This is important since many commands lookup
* streams only by ID thus also keeping them in this hash table makes the
- * search O(1) instead of iterating over the ctf_traces_ht of the session.
+ * search O(1).
*/
+extern struct lttng_ht *sessions_ht;
extern struct lttng_ht *relay_streams_ht;
-
extern struct lttng_ht *viewer_streams_ht;
-extern struct lttng_ht *indexes_ht;
+extern char *opt_output_path;
extern const char *tracing_group_name;
-
extern const char * const config_section_name;
+extern uid_t relayd_uid;
+extern gid_t relayd_gid;
+
extern int thread_quit_pipe[2];
void lttng_relay_notify_ready(void);
diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
index 8e1879f..d31f858 100644
--- a/src/bin/lttng-relayd/main.c
+++ b/src/bin/lttng-relayd/main.c
@@ -2,6 +2,7 @@
* Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
* 2013 - Jérémie Galarneau <jeremie.galarneau at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
@@ -56,6 +57,7 @@
#include <common/uri.h>
#include <common/utils.h>
#include <common/config/config.h>
+#include <urcu/rculist.h>
#include "cmd.h"
#include "ctf-trace.h"
@@ -114,6 +116,11 @@ static pthread_t dispatcher_thread;
static pthread_t worker_thread;
static pthread_t health_thread;
+/*
+ * last_relay_stream_id_lock protects last_relay_stream_id increment
+ * atomicity on 32-bit architectures.
+ */
+static pthread_mutex_t last_relay_stream_id_lock = PTHREAD_MUTEX_INITIALIZER;
static uint64_t last_relay_stream_id;
/*
@@ -129,8 +136,8 @@ static char *data_buffer;
static unsigned int data_buffer_size;
/* We need those values for the file/dir creation. */
-static uid_t relayd_uid;
-static gid_t relayd_gid;
+uid_t relayd_uid;
+gid_t relayd_gid;
/* Global relay stream hash table. */
struct lttng_ht *relay_streams_ht;
@@ -138,8 +145,8 @@ struct lttng_ht *relay_streams_ht;
/* Global relay viewer stream hash table. */
struct lttng_ht *viewer_streams_ht;
-/* Global hash table that stores relay index object. */
-struct lttng_ht *indexes_ht;
+/* Global relay sessions hash table. */
+struct lttng_ht *sessions_ht;
/* Relayd health monitoring */
struct health_app *health_relayd;
@@ -163,8 +170,7 @@ static const char *config_ignore_options[] = { "help", "config" };
/*
* usage function on stderr
*/
-static
-void usage(void)
+static void usage(void)
{
fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
fprintf(stderr, " -h, --help Display this usage.\n");
@@ -185,8 +191,7 @@ void usage(void)
*
* Return 0 on success else a negative value.
*/
-static
-int set_option(int opt, const char *arg, const char *optname)
+static int set_option(int opt, const char *arg, const char *optname)
{
int ret;
@@ -308,8 +313,7 @@ end:
* See config_entry_handler_cb comment in common/config/config.h for the
* return value conventions.
*/
-static
-int config_entry_handler(const struct config_entry *entry, void *unused)
+static int config_entry_handler(const struct config_entry *entry, void *unused)
{
int ret = 0, i;
@@ -359,8 +363,7 @@ end:
return ret;
}
-static
-int set_options(int argc, char **argv)
+static int set_options(int argc, char **argv)
{
int c, ret = 0, option_index = 0, retval = 0;
int orig_optopt = optopt, orig_optind = optind;
@@ -486,8 +489,7 @@ exit:
/*
* Cleanup the daemon
*/
-static
-void relayd_cleanup(struct relay_local_data *relay_ctx)
+static void relayd_cleanup(void)
{
DBG("Cleaning up");
@@ -495,9 +497,8 @@ void relayd_cleanup(struct relay_local_data *relay_ctx)
lttng_ht_destroy(viewer_streams_ht);
if (relay_streams_ht)
lttng_ht_destroy(relay_streams_ht);
- if (relay_ctx && relay_ctx->sessions_ht)
- lttng_ht_destroy(relay_ctx->sessions_ht);
- free(relay_ctx);
+ if (sessions_ht)
+ lttng_ht_destroy(sessions_ht);
/* free the dynamically allocated opt_output_path */
free(opt_output_path);
@@ -517,8 +518,7 @@ void relayd_cleanup(struct relay_local_data *relay_ctx)
/*
* Write to writable pipe used to notify a thread.
*/
-static
-int notify_thread_pipe(int wpipe)
+static int notify_thread_pipe(int wpipe)
{
ssize_t ret;
@@ -532,8 +532,7 @@ end:
return ret;
}
-static
-int notify_health_quit_pipe(int *pipe)
+static int notify_health_quit_pipe(int *pipe)
{
ssize_t ret;
@@ -582,8 +581,7 @@ int lttng_relay_stop_threads(void)
* Simply stop all worker threads, leaving main() return gracefully after
* joining all threads and calling cleanup().
*/
-static
-void sighandler(int sig)
+static void sighandler(int sig)
{
switch (sig) {
case SIGPIPE:
@@ -613,8 +611,7 @@ void sighandler(int sig)
* Setup signal handler for :
* SIGINT, SIGTERM, SIGPIPE
*/
-static
-int set_signal_handler(void)
+static int set_signal_handler(void)
{
int ret = 0;
struct sigaction sa;
@@ -668,8 +665,7 @@ void lttng_relay_notify_ready(void)
*
* Return -1 on error or 0 if all pipes are created.
*/
-static
-int init_thread_quit_pipe(void)
+static int init_thread_quit_pipe(void)
{
int ret;
@@ -681,8 +677,7 @@ int init_thread_quit_pipe(void)
/*
* Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
*/
-static
-int create_thread_poll_set(struct lttng_poll_event *events, int size)
+static int create_thread_poll_set(struct lttng_poll_event *events, int size)
{
int ret;
@@ -713,8 +708,7 @@ error:
*
* Return 1 if it was triggered else 0;
*/
-static
-int check_thread_quit_pipe(int fd, uint32_t events)
+static int check_thread_quit_pipe(int fd, uint32_t events)
{
if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
return 1;
@@ -726,8 +720,7 @@ int check_thread_quit_pipe(int fd, uint32_t events)
/*
* Create and init socket from uri.
*/
-static
-struct lttcomm_sock *relay_init_sock(struct lttng_uri *uri)
+static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri)
{
int ret;
struct lttcomm_sock *sock = NULL;
@@ -765,63 +758,9 @@ error:
}
/*
- * Return nonzero if stream needs to be closed.
- */
-static
-int close_stream_check(struct relay_stream *stream)
-{
- if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) {
- /*
- * We are about to close the stream so set the data pending flag to 1
- * which will make the end data pending command skip the stream which
- * is now closed and ready. Note that after proceeding to a file close,
- * the written file is ready for reading.
- */
- stream->data_pending_check_done = 1;
- return 1;
- }
- return 0;
-}
-
-static void try_close_stream(struct relay_session *session,
- struct relay_stream *stream)
-{
- int ret;
- struct ctf_trace *ctf_trace;
-
- assert(session);
- assert(stream);
-
- if (!close_stream_check(stream)) {
- /* Can't close it, not ready for that. */
- goto end;
- }
-
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
-
- pthread_mutex_lock(&session->viewer_ready_lock);
- ctf_trace->invalid_flag = 1;
- pthread_mutex_unlock(&session->viewer_ready_lock);
-
- ret = stream_close(session, stream);
- if (ret || session->snapshot) {
- /* Already close thus the ctf trace is being or has been destroyed. */
- goto end;
- }
-
- ctf_trace_try_destroy(session, ctf_trace);
-
-end:
- return;
-}
-
-/*
* This thread manages the listening for new connections on the network
*/
-static
-void *relay_thread_listener(void *data)
+static void *relay_thread_listener(void *data)
{
int i, ret, pollfd, err = -1;
uint32_t revents, nb_fd;
@@ -834,12 +773,12 @@ void *relay_thread_listener(void *data)
health_code_update();
- control_sock = relay_init_sock(control_uri);
+ control_sock = relay_socket_create(control_uri);
if (!control_sock) {
goto error_sock_control;
}
- data_sock = relay_init_sock(data_uri);
+ data_sock = relay_socket_create(data_uri);
if (!data_sock) {
goto error_sock_relay;
}
@@ -922,27 +861,22 @@ restart:
int val = 1;
struct relay_connection *new_conn;
struct lttcomm_sock *newsock;
-
- new_conn = connection_create();
- if (!new_conn) {
- goto error;
- }
+ enum connection_type type;
if (pollfd == data_sock->fd) {
- new_conn->type = RELAY_DATA;
+ type = RELAY_DATA;
newsock = data_sock->ops->accept(data_sock);
DBG("Relay data connection accepted, socket %d",
newsock->fd);
} else {
assert(pollfd == control_sock->fd);
- new_conn->type = RELAY_CONTROL;
+ type = RELAY_CONTROL;
newsock = control_sock->ops->accept(control_sock);
DBG("Relay control connection accepted, socket %d",
newsock->fd);
}
if (!newsock) {
PERROR("accepting sock");
- connection_free(new_conn);
goto error;
}
@@ -951,10 +885,13 @@ restart:
if (ret < 0) {
PERROR("setsockopt inet");
lttcomm_destroy_sock(newsock);
- connection_free(new_conn);
goto error;
}
- new_conn->sock = newsock;
+ new_conn = connection_create(newsock, type);
+ if (!new_conn) {
+ lttcomm_destroy_sock(newsock);
+ goto error;
+ }
/* Enqueue request for the dispatcher thread. */
cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
@@ -1004,8 +941,7 @@ error_sock_control:
/*
* This thread manages the dispatching of the requests to worker threads
*/
-static
-void *relay_thread_dispatcher(void *data)
+static void *relay_thread_dispatcher(void *data)
{
int err = -1;
ssize_t ret;
@@ -1051,7 +987,7 @@ void *relay_thread_dispatcher(void *data)
ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn));
if (ret < 0) {
PERROR("write connection pipe");
- connection_destroy(new_conn);
+ connection_put(new_conn);
goto error;
}
} while (node != NULL);
@@ -1077,72 +1013,27 @@ error_testpoint:
return NULL;
}
-static void try_close_streams(struct relay_session *session)
-{
- struct ctf_trace *ctf_trace;
- struct lttng_ht_iter iter;
-
- assert(session);
-
- pthread_mutex_lock(&session->viewer_ready_lock);
- rcu_read_lock();
- cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
- node.node) {
- struct relay_stream *stream;
-
- /* Close streams. */
- cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
- stream_close(session, stream);
- }
-
- ctf_trace->invalid_flag = 1;
- ctf_trace_try_destroy(session, ctf_trace);
- }
- rcu_read_unlock();
- pthread_mutex_unlock(&session->viewer_ready_lock);
-}
-
-/*
- * Try to destroy a session within a connection.
- */
-static void destroy_session(struct relay_session *session,
- struct lttng_ht *sessions_ht)
-{
- assert(session);
- assert(sessions_ht);
-
- /* Indicate that this session can be destroyed from now on. */
- session->close_flag = 1;
-
- try_close_streams(session);
-
- /*
- * This will try to delete and destroy the session if no viewer is attached
- * to it meaning the refcount is down to zero.
- */
- session_try_destroy(sessions_ht, session);
-}
-
/*
- * Copy index data from the control port to a given index object.
+ * Set index data from the control port to a given index object.
*/
-static void copy_index_control_data(struct relay_index *index,
+static int set_index_control_data(struct relay_index *index,
struct lttcomm_relayd_index *data)
{
- assert(index);
- assert(data);
+ struct ctf_packet_index index_data;
/*
- * The index on disk is encoded in big endian, so we don't need to convert
- * the data received on the network. The data_offset value is NEVER
- * modified here and is updated by the data thread.
+ * The index on disk is encoded in big endian, so we don't need
+ * to convert the data received on the network. The data_offset
+ * value is NEVER modified here and is updated by the data
+ * thread.
*/
- index->index_data.packet_size = data->packet_size;
- index->index_data.content_size = data->content_size;
- index->index_data.timestamp_begin = data->timestamp_begin;
- index->index_data.timestamp_end = data->timestamp_end;
- index->index_data.events_discarded = data->events_discarded;
- index->index_data.stream_id = data->stream_id;
+ index_data.packet_size = data->packet_size;
+ index_data.content_size = data->content_size;
+ index_data.timestamp_begin = data->timestamp_begin;
+ index_data.timestamp_end = data->timestamp_end;
+ index_data.events_discarded = data->events_discarded;
+ index_data.stream_id = data->stream_id;
+ return relay_index_set_data(index, &index_data);
}
/*
@@ -1150,31 +1041,22 @@ static void copy_index_control_data(struct relay_index *index,
*
* On success, send back the session id or else return a negative value.
*/
-static
-int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret = 0, send_ret;
struct relay_session *session;
struct lttcomm_relayd_status_session reply;
+ char session_name[NAME_MAX];
+ char hostname[HOST_NAME_MAX];
+ uint32_t live_timer = 0;
+ bool snapshot = false;
- assert(recv_hdr);
- assert(conn);
+ memset(session_name, 0, NAME_MAX);
+ memset(hostname, 0, HOST_NAME_MAX);
memset(&reply, 0, sizeof(reply));
- session = session_create();
- if (!session) {
- ret = -1;
- goto error;
- }
- session->minor = conn->minor;
- session->major = conn->major;
- conn->session_id = session->id;
- conn->session = session;
-
- reply.session_id = htobe64(session->id);
-
switch (conn->minor) {
case 1:
case 2:
@@ -1182,13 +1064,25 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
break;
case 4: /* LTTng sessiond 2.4 */
default:
- ret = cmd_create_session_2_4(conn, session);
+ ret = cmd_create_session_2_4(conn, session_name,
+ hostname, &live_timer, &snapshot);
+ }
+ if (ret < 0) {
+ goto send_reply;
}
- lttng_ht_add_unique_u64(conn->sessions_ht, &session->session_n);
+ session = session_create(session_name, hostname, live_timer,
+ snapshot, conn->major, conn->minor);
+ if (!session) {
+ ret = -1;
+ goto send_reply;
+ }
+ conn->session = session;
DBG("Created session %" PRIu64, session->id);
-error:
+ reply.session_id = htobe64(session->id);
+
+send_reply:
if (ret < 0) {
reply.ret_code = htobe32(LTTNG_ERR_FATAL);
} else {
@@ -1208,47 +1102,48 @@ error:
* When we have received all the streams and the metadata for a channel,
* we make them visible to the viewer threads.
*/
-static
-void set_viewer_ready_flag(struct relay_connection *conn)
+static void publish_connection_local_streams(struct relay_connection *conn)
{
- struct relay_stream *stream, *tmp_stream;
+ struct relay_stream *stream;
+ struct relay_session *session = conn->session;
- pthread_mutex_lock(&conn->session->viewer_ready_lock);
- cds_list_for_each_entry_safe(stream, tmp_stream, &conn->recv_head,
- recv_list) {
- stream->viewer_ready = 1;
- cds_list_del(&stream->recv_list);
+ /*
+ * We publish all streams belonging to a session atomically wrt
+ * session lock.
+ */
+ pthread_mutex_lock(&session->lock);
+ rcu_read_lock();
+ cds_list_for_each_entry_rcu(stream, &session->recv_list,
+ recv_node) {
+ stream_publish(stream);
}
- pthread_mutex_unlock(&conn->session->viewer_ready_lock);
- return;
-}
-
-/*
- * Add a recv handle node to the connection recv list with the given stream
- * handle. A new node is allocated thus must be freed when the node is deleted
- * from the list.
- */
-static void queue_stream(struct relay_stream *stream,
- struct relay_connection *conn)
-{
- assert(conn);
- assert(stream);
+ rcu_read_unlock();
- cds_list_add(&stream->recv_list, &conn->recv_head);
+ /*
+ * Inform the viewer that there are new streams in the session.
+ */
+ if (session->viewer_attached) {
+ uatomic_set(&session->new_streams, 1);
+ }
+ pthread_mutex_unlock(&session->lock);
+ return;
}
/*
* relay_add_stream: allocate a new stream for a session
*/
-static
-int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
- int ret, send_ret;
+ int ret;
+ ssize_t send_ret;
struct relay_session *session = conn->session;
struct relay_stream *stream = NULL;
struct lttcomm_relayd_status_stream reply;
struct ctf_trace *trace = NULL;
+ uint64_t stream_handle = -1ULL;
+ char *path_name = NULL, *channel_name = NULL;
+ uint64_t tracefile_size = 0, tracefile_count = 0;
if (!session || conn->version_check_done == 0) {
ERR("Trying to add a stream before version check");
@@ -1256,107 +1151,46 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
goto end_no_session;
}
- stream = zmalloc(sizeof(struct relay_stream));
- if (stream == NULL) {
- PERROR("relay stream zmalloc");
- ret = -1;
- goto end_no_session;
- }
-
- switch (conn->minor) {
- case 1: /* LTTng sessiond 2.1 */
- ret = cmd_recv_stream_2_1(conn, stream);
+ switch (session->minor) {
+ case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */
+ ret = cmd_recv_stream_2_1(conn, &path_name,
+ &channel_name);
break;
- case 2: /* LTTng sessiond 2.2 */
+ case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */
default:
- ret = cmd_recv_stream_2_2(conn, stream);
+ ret = cmd_recv_stream_2_2(conn, &path_name,
+ &channel_name, &tracefile_size, &tracefile_count);
break;
}
if (ret < 0) {
- goto err_free_stream;
- }
-
- stream->stream_handle = ++last_relay_stream_id;
- stream->prev_seq = -1ULL;
- stream->session_id = session->id;
- stream->index_fd = -1;
- stream->read_index_fd = -1;
- stream->ctf_stream_id = -1ULL;
- lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
- pthread_mutex_init(&stream->lock, NULL);
-
- ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
- if (ret < 0) {
- ERR("relay creating output directory");
- goto err_free_stream;
- }
-
- /*
- * No need to use run_as API here because whatever we receives, the relayd
- * uses its own credentials for the stream files.
- */
- ret = utils_create_stream_file(stream->path_name, stream->channel_name,
- stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
- if (ret < 0) {
- ERR("Create output file");
- goto err_free_stream;
- }
- stream->fd = ret;
- if (stream->tracefile_size) {
- DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
- } else {
- DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
+ goto send_reply;
}
- /* Protect access to "trace" */
- rcu_read_lock();
- trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name);
+ trace = ctf_trace_get_by_path_or_create(session, path_name);
if (!trace) {
- trace = ctf_trace_create(stream->path_name);
- if (!trace) {
- ret = -1;
- goto end;
- }
- ctf_trace_add(session->ctf_traces_ht, trace);
+ goto send_reply;
}
- ctf_trace_get_ref(trace);
+ /* This stream here has one reference on the trace. */
- if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
- stream->metadata_flag = 1;
- /* Assign quick reference to the metadata stream in the trace. */
- trace->metadata_stream = stream;
- }
+ pthread_mutex_lock(&last_relay_stream_id_lock);
+ stream_handle = ++last_relay_stream_id;
+ pthread_mutex_unlock(&last_relay_stream_id_lock);
- /*
- * Add the stream in the recv list of the connection. Once the end stream
- * message is received, this list is emptied and streams are set with the
- * viewer ready flag.
- */
- queue_stream(stream, conn);
+ /* We pass ownership of path_name and channel_name. */
+ stream = stream_create(trace, stream_handle, path_name,
+ channel_name, tracefile_size, tracefile_count);
/*
- * Both in the ctf_trace object and the global stream ht since the data
- * side of the relayd does not have the concept of session.
- *
- * rcu_read_lock() is kept to protect the stream which is now part of
- * the relay_streams_ht.
+ * Streams are the owners of their trace. Reference to trace is
+ * kept within stream_create().
*/
- lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
- cds_list_add_tail(&stream->trace_list, &trace->stream_list);
-
- session->stream_count++;
-
- DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
- stream->stream_handle);
+ ctf_trace_put(trace);
-end:
+send_reply:
memset(&reply, 0, sizeof(reply));
- reply.handle = htobe64(stream->stream_handle);
- /* send the session id to the client or a negative return code on error */
- if (ret < 0) {
+ reply.handle = htobe64(stream_handle);
+ if (!stream) {
reply.ret_code = htobe32(LTTNG_ERR_UNK);
- /* stream was not properly added to the ht, so free it */
- stream_destroy(stream);
} else {
reply.ret_code = htobe32(LTTNG_OK);
}
@@ -1365,29 +1199,17 @@ end:
sizeof(struct lttcomm_relayd_status_stream), 0);
if (send_ret < 0) {
ERR("Relay sending stream id");
- ret = send_ret;
+ ret = (int) send_ret;
}
- /*
- * rcu_read_lock() was held to protect either "trace" OR the "stream" at
- * this point.
- */
- rcu_read_unlock();
- trace = NULL;
- stream = NULL;
end_no_session:
return ret;
-
-err_free_stream:
- stream_destroy(stream);
- return ret;
}
/*
* relay_close_stream: close a specific stream
*/
-static
-int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret, send_ret;
@@ -1417,23 +1239,18 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
goto end_no_session;
}
- rcu_read_lock();
- stream = stream_find_by_id(relay_streams_ht,
- be64toh(stream_info.stream_id));
+ stream = stream_get_by_id(be64toh(stream_info.stream_id));
if (!stream) {
ret = -1;
- goto end_unlock;
+ goto end;
}
-
+ pthread_mutex_lock(&stream->lock);
+ stream->closed = true;
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
- stream->close_flag = 1;
- session->stream_count--;
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
- /* Check if we can close it or else the data will do it. */
- try_close_stream(session, stream);
-
-end_unlock:
- rcu_read_unlock();
+end:
memset(&reply, 0, sizeof(reply));
if (ret < 0) {
@@ -1455,8 +1272,7 @@ end_no_session:
/*
* relay_unknown_command: send -1 if received unknown command
*/
-static
-void relay_unknown_command(struct relay_connection *conn)
+static void relay_unknown_command(struct relay_connection *conn)
{
struct lttcomm_relayd_generic_reply reply;
int ret;
@@ -1474,8 +1290,7 @@ void relay_unknown_command(struct relay_connection *conn)
* relay_start: send an acknowledgment to the client to tell if we are
* ready to receive data. We are ready if a session is established.
*/
-static
-int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret = htobe32(LTTNG_OK);
@@ -1529,10 +1344,9 @@ end:
}
/*
- * relay_recv_metadata: receive the metada for the session.
+ * relay_recv_metadata: receive the metadata for the session.
*/
-static
-int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret = htobe32(LTTNG_OK);
@@ -1541,7 +1355,6 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
struct lttcomm_relayd_metadata_payload *metadata_struct;
struct relay_stream *metadata_stream;
uint64_t data_size, payload_size;
- struct ctf_trace *ctf_trace;
if (!session) {
ERR("Metadata sent before version check");
@@ -1586,38 +1399,37 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
}
metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
- rcu_read_lock();
- metadata_stream = stream_find_by_id(relay_streams_ht,
- be64toh(metadata_struct->stream_id));
+ metadata_stream = stream_get_by_id(be64toh(metadata_struct->stream_id));
if (!metadata_stream) {
ret = -1;
- goto end_unlock;
+ goto end;
}
- size_ret = lttng_write(metadata_stream->fd, metadata_struct->payload,
+ pthread_mutex_lock(&metadata_stream->lock);
+
+ size_ret = lttng_write(metadata_stream->stream_fd->fd, metadata_struct->payload,
payload_size);
if (size_ret < payload_size) {
ERR("Relay error writing metadata on file");
ret = -1;
- goto end_unlock;
+ goto end_put;
}
- ret = write_padding_to_file(metadata_stream->fd,
+ ret = write_padding_to_file(metadata_stream->stream_fd->fd,
be32toh(metadata_struct->padding_size));
if (ret < 0) {
- goto end_unlock;
+ goto end_put;
}
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- metadata_stream->path_name);
- assert(ctf_trace);
- ctf_trace->metadata_received +=
+ metadata_stream->trace->metadata_received +=
payload_size + be32toh(metadata_struct->padding_size);
DBG2("Relay metadata written");
-end_unlock:
- rcu_read_unlock();
+end_put:
+ pthread_mutex_unlock(&metadata_stream->lock);
+ stream_put(metadata_stream);
+
end:
return ret;
}
@@ -1625,15 +1437,12 @@ end:
/*
* relay_send_version: send relayd version number
*/
-static
-int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret;
struct lttcomm_relayd_version reply, msg;
- assert(conn);
-
conn->version_check_done = 1;
/* Get version from the other side. */
@@ -1657,7 +1466,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
if (reply.major != be32toh(msg.major)) {
DBG("Incompatible major versions (%u vs %u), deleting session",
reply.major, be32toh(msg.major));
- destroy_session(conn->session, conn->sessions_ht);
+ connection_put(conn);
ret = 0;
goto end;
}
@@ -1688,8 +1497,7 @@ end:
/*
* Check for data pending for a given stream id from the session daemon.
*/
-static
-int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
struct relay_session *session = conn->session;
@@ -1723,13 +1531,14 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
stream_id = be64toh(msg.stream_id);
last_net_seq_num = be64toh(msg.last_net_seq_num);
- rcu_read_lock();
- stream = stream_find_by_id(relay_streams_ht, stream_id);
+ stream = stream_get_by_id(stream_id);
if (stream == NULL) {
ret = -1;
- goto end_unlock;
+ goto end;
}
+ pthread_mutex_lock(&stream->lock);
+
DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
" and last_seq %" PRIu64, stream_id, stream->prev_seq,
last_net_seq_num);
@@ -1743,11 +1552,11 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
ret = 1;
}
- /* Pending check is now done. */
- stream->data_pending_check_done = 1;
+ stream->data_pending_check_done = true;
+ pthread_mutex_unlock(&stream->lock);
-end_unlock:
- rcu_read_unlock();
+ stream_put(stream);
+end:
memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(ret);
@@ -1767,14 +1576,12 @@ end_no_session:
* means that every subsequent commands or data received on the control socket
* has been handled. So, this is why we simply return OK here.
*/
-static
-int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret;
uint64_t stream_id;
struct relay_stream *stream;
- struct lttng_ht_iter iter;
struct lttcomm_relayd_quiescent_control msg;
struct lttcomm_relayd_generic_reply reply;
@@ -1800,19 +1607,16 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
}
stream_id = be64toh(msg.stream_id);
-
- rcu_read_lock();
- cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
- node.node) {
- if (stream->stream_handle == stream_id) {
- stream->data_pending_check_done = 1;
- DBG("Relay quiescent control pending flag set to %" PRIu64,
- stream_id);
- break;
- }
- }
- rcu_read_unlock();
-
+ stream = stream_get_by_id(stream_id);
+ if (!stream) {
+ goto reply;
+ }
+ pthread_mutex_lock(&stream->lock);
+ stream->data_pending_check_done = true;
+ pthread_mutex_unlock(&stream->lock);
+ DBG("Relay quiescent control pending flag set to %" PRIu64, stream_id);
+ stream_put(stream);
+reply:
memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(LTTNG_OK);
ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
@@ -1831,8 +1635,7 @@ end_no_session:
*
* This command returns to the client a LTTNG_OK code.
*/
-static
-int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret;
@@ -1876,11 +1679,17 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
node.node) {
- if (stream->session_id == session_id) {
- stream->data_pending_check_done = 0;
+ if (!stream_get(stream)) {
+ continue;
+ }
+ if (stream->trace->session->id == session_id) {
+ pthread_mutex_lock(&stream->lock);
+ stream->data_pending_check_done = false;
+ pthread_mutex_unlock(&stream->lock);
DBG("Set begin data pending flag to stream %" PRIu64,
stream->stream_handle);
}
+ stream_put(stream);
}
rcu_read_unlock();
@@ -1906,8 +1715,7 @@ end_no_session:
*
* Return to the client if there is data in flight or not with a ret_code.
*/
-static
-int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret;
@@ -1918,9 +1726,6 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
uint64_t session_id;
uint32_t is_data_inflight = 0;
- assert(recv_hdr);
- assert(conn);
-
DBG("End data pending command");
if (!conn->session || conn->version_check_done == 0) {
@@ -1948,13 +1753,26 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
node.node) {
- if (stream->session_id == session_id &&
- !stream->data_pending_check_done && !stream->terminated_flag) {
- is_data_inflight = 1;
- DBG("Data is still in flight for stream %" PRIu64,
- stream->stream_handle);
- break;
+ if (!stream_get(stream)) {
+ continue;
+ }
+ if (stream->trace->session->id != session_id) {
+ stream_put(stream);
+ continue;
}
+ pthread_mutex_lock(&stream->lock);
+ if (!stream->data_pending_check_done) {
+ if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
}
rcu_read_unlock();
@@ -1976,14 +1794,13 @@ end_no_session:
*
* Return 0 on success else a negative value.
*/
-static
-int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
- int ret, send_ret, index_created = 0;
+ int ret, send_ret;
struct relay_session *session = conn->session;
struct lttcomm_relayd_index index_info;
- struct relay_index *index, *wr_index = NULL;
+ struct relay_index *index;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
uint64_t net_seq_num;
@@ -2013,13 +1830,12 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
net_seq_num = be64toh(index_info.net_seq_num);
- rcu_read_lock();
- stream = stream_find_by_id(relay_streams_ht,
- be64toh(index_info.relay_stream_id));
+ stream = stream_get_by_id(be64toh(index_info.relay_stream_id));
if (!stream) {
ret = -1;
- goto end_rcu_unlock;
+ goto end;
}
+ pthread_mutex_lock(&stream->lock);
/* Live beacon handling */
if (index_info.packet_size == 0) {
@@ -2033,56 +1849,40 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
stream->beacon_ts_end = be64toh(index_info.timestamp_end);
}
ret = 0;
- goto end_rcu_unlock;
+ goto end_stream_put;
} else {
stream->beacon_ts_end = -1ULL;
}
- index = relay_index_find(stream->stream_handle, net_seq_num);
- if (!index) {
- /* A successful creation will add the object to the HT. */
- index = relay_index_create(stream->stream_handle, net_seq_num);
- if (!index) {
- goto end_rcu_unlock;
- }
- index_created = 1;
- stream->indexes_in_flight++;
- }
-
- copy_index_control_data(index, &index_info);
if (stream->ctf_stream_id == -1ULL) {
stream->ctf_stream_id = be64toh(index_info.stream_id);
}
-
- if (index_created) {
- /*
- * Try to add the relay index object to the hash table. If an object
- * already exist, destroy back the index created, set the data in this
- * object and write it on disk.
- */
- relay_index_add(index, &wr_index);
- if (wr_index) {
- copy_index_control_data(wr_index, &index_info);
- free(index);
- }
- } else {
- /* The index already exists so write it on disk. */
- wr_index = index;
+ index = relay_index_get_by_id_or_create(stream, net_seq_num);
+ if (!index) {
+ ret = -1;
+ goto end_stream_put;
}
-
- /* Do we have a writable ready index to write on disk. */
- if (wr_index) {
- ret = relay_index_write(wr_index->fd, wr_index);
- if (ret < 0) {
- goto end_rcu_unlock;
- }
+ if (set_index_control_data(index, &index_info)) {
+ relay_index_put(index);
+ ret = -1;
+ goto end_stream_put;
+ }
+ ret = relay_index_try_flush(index);
+ if (ret == 0) {
stream->total_index_received++;
- stream->indexes_in_flight--;
- assert(stream->indexes_in_flight >= 0);
+ } else if (ret > 0) {
+ /* No flush. */
+ ret = 0;
+ } else {
+ relay_index_put(index);
+ ret = -1;
}
-end_rcu_unlock:
- rcu_read_unlock();
+end_stream_put:
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+
+end:
memset(&reply, 0, sizeof(reply));
if (ret < 0) {
@@ -2105,8 +1905,7 @@ end_no_session:
*
* Return 0 on success else a negative value.
*/
-static
-int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret, send_ret;
@@ -2123,17 +1922,10 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
}
/*
- * Flag every pending stream in the connection recv list that they are
- * ready to be used by the viewer.
- */
- set_viewer_ready_flag(conn);
-
- /*
- * Inform the viewer that there are new streams in the session.
+ * Publish every pending stream in the session recv list so
+ * that they are ready to be used by the viewer.
*/
- if (conn->session->viewer_refcount) {
- uatomic_set(&conn->session->new_streams, 1);
- }
+ publish_connection_local_streams(conn);
memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(LTTNG_OK);
@@ -2153,8 +1945,7 @@ end_no_session:
/*
* Process the commands received on the control socket
*/
-static
-int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
int ret = 0;
@@ -2211,20 +2002,17 @@ end:
/*
* Handle index for a data stream.
*
- * RCU read side lock MUST be acquired.
+ * Called with the stream lock held.
*
* Return 0 on success else a negative value.
*/
static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
int rotate_index)
{
- int ret = 0, index_created = 0;
- uint64_t stream_id, data_offset;
- struct relay_index *index, *wr_index = NULL;
-
- assert(stream);
+ int ret = 0;
+ uint64_t data_offset;
+ struct relay_index *index;
- stream_id = stream->stream_handle;
/* Get data offset because we are about to update the index. */
data_offset = htobe64(stream->tracefile_size_current);
@@ -2233,71 +2021,68 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
* exists, the control thread already received the data for it thus we need
* to write it on disk.
*/
- index = relay_index_find(stream_id, net_seq_num);
+ index = relay_index_get_by_id_or_create(stream, net_seq_num);
if (!index) {
- /* A successful creation will add the object to the HT. */
- index = relay_index_create(stream_id, net_seq_num);
- if (!index) {
- ret = -1;
- goto error;
- }
- index_created = 1;
- stream->indexes_in_flight++;
+ ret = -1;
+ goto end;
}
- if (rotate_index || stream->index_fd < 0) {
- index->to_close_fd = stream->index_fd;
- ret = index_create_file(stream->path_name, stream->channel_name,
- relayd_uid, relayd_gid, stream->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- /* This will close the stream's index fd if one. */
- relay_index_free_safe(index);
- goto error;
+ if (rotate_index || !stream->index_fd) {
+ int fd;
+
+ /* Put ref on previous index_fd. */
+ if (stream->index_fd) {
+ stream_fd_put(stream->index_fd);
+ stream->index_fd = NULL;
}
- stream->index_fd = ret;
- }
- index->fd = stream->index_fd;
- index->index_data.offset = data_offset;
- if (index_created) {
- /*
- * Try to add the relay index object to the hash table. If an object
- * already exist, destroy back the index created and set the data.
- */
- relay_index_add(index, &wr_index);
- if (wr_index) {
- /* Copy back data from the created index. */
- wr_index->fd = index->fd;
- wr_index->to_close_fd = index->to_close_fd;
- wr_index->index_data.offset = data_offset;
- free(index);
+ fd = index_create_file(stream->path_name, stream->channel_name,
+ relayd_uid, relayd_gid, stream->tracefile_size,
+ stream->current_tracefile_id);
+ if (fd < 0) {
+ ret = -1;
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ goto end;
}
- } else {
- /* The index already exists so write it on disk. */
- wr_index = index;
+ stream->index_fd = stream_fd_create(fd);
+ if (!stream->index_fd) {
+ ret = -1;
+ if (close(fd)) {
+ PERROR("Error closing FD %d", fd);
+ }
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ /* Will put the local ref. */
+ goto end;
+ }
+ }
+ if (relay_index_set_fd(index, stream->index_fd, data_offset)) {
+ ret = -1;
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ goto end;
}
- /* Do we have a writable ready index to write on disk. */
- if (wr_index) {
- ret = relay_index_write(wr_index->fd, wr_index);
- if (ret < 0) {
- goto error;
- }
+ ret = relay_index_try_flush(index);
+ if (ret == 0) {
stream->total_index_received++;
- stream->indexes_in_flight--;
- assert(stream->indexes_in_flight >= 0);
+ } else if (ret > 0) {
+ /* No flush. */
+ ret = 0;
+ } else {
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ ret = -1;
}
-
-error:
+end:
return ret;
}
/*
* relay_process_data: Process the data received on the data socket
*/
-static
-int relay_process_data(struct relay_connection *conn)
+static int relay_process_data(struct relay_connection *conn)
{
int ret = 0, rotate_index = 0;
ssize_t size_ret;
@@ -2324,17 +2109,12 @@ int relay_process_data(struct relay_connection *conn)
}
stream_id = be64toh(data_hdr.stream_id);
-
- rcu_read_lock();
- stream = stream_find_by_id(relay_streams_ht, stream_id);
+ stream = stream_get_by_id(stream_id);
if (!stream) {
ret = -1;
- goto end_rcu_unlock;
+ goto end;
}
-
- session = session_find_by_id(conn->sessions_ht, stream->session_id);
- assert(session);
-
+ session = stream->trace->session;
data_size = be32toh(data_hdr.data_size);
if (data_buffer_size < data_size) {
char *tmp_data_ptr;
@@ -2344,7 +2124,7 @@ int relay_process_data(struct relay_connection *conn)
ERR("Allocating data buffer");
free(data_buffer);
ret = -1;
- goto end_rcu_unlock;
+ goto end_stream_put;
}
data_buffer = tmp_data_ptr;
data_buffer_size = data_size;
@@ -2362,9 +2142,11 @@ int relay_process_data(struct relay_connection *conn)
DBG("Socket %d did an orderly shutdown", conn->sock->fd);
}
ret = -1;
- goto end_rcu_unlock;
+ goto end_stream_put;
}
+ pthread_mutex_lock(&stream->lock);
+
/* Check if a rotation is needed. */
if (stream->tracefile_size > 0 &&
(stream->tracefile_size_current + data_size) >
@@ -2372,55 +2154,38 @@ int relay_process_data(struct relay_connection *conn)
struct relay_viewer_stream *vstream;
uint64_t new_id;
- new_id = (stream->tracefile_count_current + 1) %
+ new_id = (stream->current_tracefile_id + 1) %
stream->tracefile_count;
/*
- * When we wrap-around back to 0, we start overwriting old
- * trace data.
+ * Move viewer oldest available data position forward if
+ * we are overwriting a tracefile.
*/
- if (!stream->tracefile_overwrite && new_id == 0) {
- stream->tracefile_overwrite = 1;
- }
- pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
- if (stream->tracefile_overwrite) {
+ if (new_id == stream->oldest_tracefile_id) {
stream->oldest_tracefile_id =
(stream->oldest_tracefile_id + 1) %
stream->tracefile_count;
}
+ /*
+ * The rotation must happen whether or not a viewer stream exists;
+ * the viewer stream lock is only held across it when there is one.
+ */
- vstream = viewer_stream_find_by_id(stream->stream_handle);
+ vstream = viewer_stream_get_by_id(stream->stream_handle);
if (vstream) {
- /*
- * The viewer is reading a file about to be
- * overwritten. Close the FDs it is
- * currently using and let it handle the fault.
- */
- if (vstream->tracefile_count_current == new_id) {
- pthread_mutex_lock(&vstream->overwrite_lock);
- vstream->abort_flag = 1;
- pthread_mutex_unlock(&vstream->overwrite_lock);
- DBG("Streaming side setting abort_flag on stream %s_%" PRIu64 "\n",
- stream->channel_name, new_id);
- } else if (vstream->tracefile_count_current ==
- stream->tracefile_count_current) {
- /*
- * The reader and writer were in the
- * same trace file, inform the viewer
- * that no new index will ever be added
- * to this file.
- */
- vstream->close_write_flag = 1;
+ pthread_mutex_lock(&vstream->lock);
+ }
+ ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
+ stream->tracefile_size, stream->tracefile_count,
+ relayd_uid, relayd_gid, stream->stream_fd->fd,
+ &stream->current_tracefile_id, &stream->stream_fd->fd);
+ if (vstream) {
+ pthread_mutex_unlock(&vstream->lock);
+ viewer_stream_put(vstream);
+ }
+ if (ret < 0) {
+ ERR("Rotating stream output file");
+ goto end_stream_unlock;
}
- }
- ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
- stream->tracefile_size, stream->tracefile_count,
- relayd_uid, relayd_gid, stream->fd,
- &(stream->tracefile_count_current), &stream->fd);
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
- if (ret < 0) {
- ERR("Rotating stream output file");
- goto end_rcu_unlock;
- }
- /* Reset current size because we just perform a stream rotation. */
+ /* Reset current size because we just performed a stream rotation. */
stream->tracefile_size_current = 0;
rotate_index = 1;
}
@@ -2432,44 +2191,41 @@ int relay_process_data(struct relay_connection *conn)
if (session->minor >= 4 && !session->snapshot) {
ret = handle_index_data(stream, net_seq_num, rotate_index);
if (ret < 0) {
- goto end_rcu_unlock;
+ goto end_stream_unlock;
}
}
/* Write data to stream output fd. */
- size_ret = lttng_write(stream->fd, data_buffer, data_size);
+ size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
if (size_ret < data_size) {
ERR("Relay error writing data to file");
ret = -1;
- goto end_rcu_unlock;
+ goto end_stream_unlock;
}
DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
ret, stream->stream_handle);
- ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
+ ret = write_padding_to_file(stream->stream_fd->fd, be32toh(data_hdr.padding_size));
if (ret < 0) {
- goto end_rcu_unlock;
+ goto end_stream_unlock;
}
stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size);
stream->prev_seq = net_seq_num;
- try_close_stream(session, stream);
-
-end_rcu_unlock:
- rcu_read_unlock();
+end_stream_unlock:
+ pthread_mutex_unlock(&stream->lock);
+end_stream_put:
+ stream_put(stream);
end:
return ret;
}
-static
-void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
+static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
{
int ret;
- assert(events);
-
(void) lttng_poll_del(events, pollfd);
ret = close(pollfd);
@@ -2478,27 +2234,36 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
}
}
-static void destroy_connection(struct lttng_ht *relay_connections_ht,
- struct relay_connection *conn)
+static void relay_thread_close_connection(struct lttng_poll_event *events,
+ int pollfd, struct relay_connection *conn)
{
- assert(relay_connections_ht);
- assert(conn);
-
- connection_delete(relay_connections_ht, conn);
+ const char *type_str;
- /* For the control socket, we try to destroy the session. */
- if (conn->type == RELAY_CONTROL && conn->session) {
- destroy_session(conn->session, conn->sessions_ht);
+ switch (conn->type) {
+ case RELAY_DATA:
+ type_str = "Data";
+ break;
+ case RELAY_CONTROL:
+ type_str = "Control";
+ break;
+ case RELAY_VIEWER_COMMAND:
+ type_str = "Viewer Command";
+ break;
+ case RELAY_VIEWER_NOTIFICATION:
+ type_str = "Viewer Notification";
+ break;
+ default:
+ type_str = "Unknown";
}
-
- connection_destroy(conn);
+ cleanup_connection_pollfd(events, pollfd);
+ connection_put(conn);
+ DBG("%s connection closed with %d", type_str, pollfd);
}
/*
* This thread does the actual work
*/
-static
-void *relay_thread_worker(void *data)
+static void *relay_thread_worker(void *data)
{
int ret, err = -1, last_seen_data_fd = -1;
uint32_t nb_fd;
@@ -2506,9 +2271,6 @@ void *relay_thread_worker(void *data)
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
struct lttcomm_relayd_hdr recv_hdr;
- struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
- struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
- struct relay_index *index;
struct relay_connection *destroy_conn = NULL;
DBG("[thread] Relay worker started");
@@ -2529,12 +2291,6 @@ void *relay_thread_worker(void *data)
goto relay_connections_ht_error;
}
- /* Tables of received indexes indexed by index handle and net_seq_num. */
- indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64);
- if (!indexes_ht) {
- goto indexes_ht_error;
- }
-
ret = create_thread_poll_set(&events, 2);
if (ret < 0) {
goto error_poll_create;
@@ -2604,27 +2360,20 @@ restart:
if (ret < 0) {
goto error;
}
- conn->sessions_ht = sessions_ht;
- connection_init(conn);
lttng_poll_add(&events, conn->sock->fd,
LPOLLIN | LPOLLRDHUP);
- rcu_read_lock();
- lttng_ht_add_unique_ulong(relay_connections_ht,
- &conn->sock_n);
- rcu_read_unlock();
+ connection_ht_add(relay_connections_ht, conn);
DBG("Connection socket %d added", conn->sock->fd);
}
} else {
struct relay_connection *ctrl_conn;
- rcu_read_lock();
- ctrl_conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd);
/* If not found, there is a synchronization issue. */
assert(ctrl_conn);
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, ctrl_conn);
+ relay_thread_close_connection(&events, pollfd, ctrl_conn);
if (last_seen_data_fd == pollfd) {
last_seen_data_fd = last_notdel_data_fd;
}
@@ -2634,16 +2383,14 @@ restart:
sizeof(recv_hdr), 0);
if (ret <= 0) {
/* Connection closed */
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, ctrl_conn);
- DBG("Control connection closed with %d", pollfd);
+ relay_thread_close_connection(&events, pollfd,
+ ctrl_conn);
} else {
ret = relay_process_control(&recv_hdr, ctrl_conn);
if (ret < 0) {
/* Clear the session on error. */
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, ctrl_conn);
- DBG("Connection closed with %d", pollfd);
+ relay_thread_close_connection(&events, pollfd,
+ ctrl_conn);
}
seen_control = 1;
}
@@ -2658,7 +2405,7 @@ restart:
} else {
ERR("Unknown poll events %u for sock %d", revents, pollfd);
}
- rcu_read_unlock();
+ connection_put(ctrl_conn);
}
}
@@ -2702,26 +2449,22 @@ restart:
continue;
}
- rcu_read_lock();
- data_conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ data_conn = connection_get_by_sock(relay_connections_ht, pollfd);
if (!data_conn) {
/* Skip it. Might be removed before. */
- rcu_read_unlock();
continue;
}
if (revents & LPOLLIN) {
if (data_conn->type != RELAY_DATA) {
- rcu_read_unlock();
- continue;
+ goto put_connection;
}
ret = relay_process_data(data_conn);
/* Connection closed */
if (ret < 0) {
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, data_conn);
- DBG("Data connection closed with %d", pollfd);
+ relay_thread_close_connection(&events, pollfd,
+ data_conn);
/*
* Every goto restart call sets the last seen fd where
* here we don't really care since we gracefully
@@ -2730,11 +2473,12 @@ restart:
} else {
/* Keep last seen port. */
last_seen_data_fd = pollfd;
- rcu_read_unlock();
+ connection_put(data_conn);
goto restart;
}
}
- rcu_read_unlock();
+ put_connection:
+ connection_put(data_conn);
}
last_seen_data_fd = -1;
}
@@ -2744,28 +2488,24 @@ restart:
exit:
error:
- lttng_poll_clean(&events);
-
/* Cleanup reamaining connection object. */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
destroy_conn,
sock_n.node) {
health_code_update();
- destroy_connection(relay_connections_ht, destroy_conn);
+ if (!connection_get(destroy_conn)) {
+ continue;
+ }
+ relay_thread_close_connection(&events, destroy_conn->sock->fd,
+ destroy_conn);
+ connection_put(destroy_conn);
}
rcu_read_unlock();
+
+ lttng_poll_clean(&events);
+
error_poll_create:
- rcu_read_lock();
- cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
- index_n.node) {
- health_code_update();
- relay_index_delete(index);
- relay_index_free_safe(index);
- }
- rcu_read_unlock();
- lttng_ht_destroy(indexes_ht);
-indexes_ht_error:
lttng_ht_destroy(relay_connections_ht);
relay_connections_ht_error:
/* Close relay conn pipes */
@@ -2806,7 +2546,6 @@ int main(int argc, char **argv)
{
int ret = 0, retval = 0;
void *status;
- struct relay_local_data *relay_ctx = NULL;
/* Parse arguments */
progname = argv[0];
@@ -2904,16 +2643,9 @@ int main(int argc, char **argv)
lttcomm_init();
lttcomm_inet_init();
- relay_ctx = zmalloc(sizeof(struct relay_local_data));
- if (!relay_ctx) {
- PERROR("relay_ctx");
- retval = -1;
- goto exit_init_data;
- }
-
/* tables of sessions indexed by session ID */
- relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!relay_ctx->sessions_ht) {
+ sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!sessions_ht) {
retval = -1;
goto exit_init_data;
}
@@ -2960,7 +2692,7 @@ int main(int argc, char **argv)
/* Setup the worker thread */
ret = pthread_create(&worker_thread, NULL,
- relay_thread_worker, (void *) relay_ctx);
+ relay_thread_worker, NULL);
if (ret) {
errno = ret;
PERROR("pthread_create worker");
@@ -2978,7 +2710,7 @@ int main(int argc, char **argv)
goto exit_listener_thread;
}
- ret = relayd_live_create(live_uri, relay_ctx);
+ ret = relayd_live_create(live_uri);
if (ret) {
ERR("Starting live viewer threads");
retval = -1;
@@ -3035,7 +2767,10 @@ exit_init_data:
health_app_destroy(health_relayd);
exit_health_app_create:
exit_options:
- relayd_cleanup(relay_ctx);
+ relayd_cleanup();
+
+ /* Ensure all prior call_rcu are done. */
+ rcu_barrier();
if (!retval) {
exit(EXIT_SUCCESS);
diff --git a/src/bin/lttng-relayd/session.c b/src/bin/lttng-relayd/session.c
index 46d9cc6..8943216 100644
--- a/src/bin/lttng-relayd/session.c
+++ b/src/bin/lttng-relayd/session.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -19,28 +20,25 @@
#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <common/common.h>
+#include <urcu/rculist.h>
+#include "lttng-relayd.h"
#include "ctf-trace.h"
#include "session.h"
#include "stream.h"
/* Global session id used in the session creation. */
static uint64_t last_relay_session_id;
-
-static void rcu_destroy_session(struct rcu_head *head)
-{
- struct relay_session *session =
- caa_container_of(head, struct relay_session, rcu_node);
-
- free(session);
-}
+static pthread_mutex_t last_relay_session_id_lock = PTHREAD_MUTEX_INITIALIZER;
/*
* Create a new session by assigning a new session ID.
*
* Return allocated session or else NULL.
*/
-struct relay_session *session_create(void)
+struct relay_session *session_create(const char *session_name,
+ const char *hostname, uint32_t live_timer,
+ bool snapshot, uint32_t major, uint32_t minor)
{
struct relay_session *session;
@@ -57,30 +55,62 @@ struct relay_session *session_create(void)
goto error;
}
- pthread_mutex_init(&session->viewer_ready_lock, NULL);
+ pthread_mutex_lock(&last_relay_session_id_lock);
session->id = ++last_relay_session_id;
+ pthread_mutex_unlock(&last_relay_session_id_lock);
+
+ session->major = major;
+ session->minor = minor;
lttng_ht_node_init_u64(&session->session_n, session->id);
+ urcu_ref_init(&session->ref);
+ CDS_INIT_LIST_HEAD(&session->recv_list);
+ pthread_mutex_init(&session->lock, NULL);
+ pthread_mutex_init(&session->reflock, NULL);
+ pthread_mutex_init(&session->recv_list_lock, NULL);
+
+ strncpy(session->session_name, session_name, sizeof(session->session_name) - 1);
+ session->session_name[sizeof(session->session_name) - 1] = '\0'; /* strncpy may not terminate. */
+ strncpy(session->hostname, hostname, sizeof(session->hostname) - 1);
+ session->hostname[sizeof(session->hostname) - 1] = '\0'; /* strncpy may not terminate. */
+ session->live_timer = live_timer;
+ session->snapshot = snapshot;
+
+ lttng_ht_add_unique_u64(sessions_ht, &session->session_n);
error:
return session;
}
+/* Should be called with RCU read-side lock held. */
+bool session_get(struct relay_session *session)
+{
+ bool has_ref = false;
+
+ pthread_mutex_lock(&session->reflock);
+ if (session->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&session->ref);
+ }
+ pthread_mutex_unlock(&session->reflock);
+
+ return has_ref;
+}
+
/*
- * Lookup a session within the given hash table and session id. RCU read side
- * lock MUST be acquired before calling this and as long as the caller has a
- * reference to the object.
+ * Lookup a session within the session hash table using the session id
+ * as key. A session reference is taken when a session is returned.
+ * session_put() must be called on that session.
*
* Return session or NULL if not found.
*/
-struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id)
+struct relay_session *session_get_by_id(uint64_t id)
{
struct relay_session *session = NULL;
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
- assert(ht);
-
- lttng_ht_lookup(ht, &id, &iter);
+ rcu_read_lock();
+ lttng_ht_lookup(sessions_ht, &id, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (!node) {
DBG("Session find by ID %" PRIu64 " id NOT found", id);
@@ -88,97 +118,100 @@ struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id)
}
session = caa_container_of(node, struct relay_session, session_n);
DBG("Session find by ID %" PRIu64 " id found", id);
-
+ if (!session_get(session)) {
+ session = NULL;
+ }
end:
+ rcu_read_unlock();
return session;
}
+static void rcu_destroy_session(struct rcu_head *rcu_head)
+{
+ struct relay_session *session =
+ caa_container_of(rcu_head, struct relay_session, rcu_node);
+
+ free(session);
+}
+
/*
* Delete session from the given hash table.
*
* Return lttng ht del error code being 0 on success and 1 on failure.
*/
-int session_delete(struct lttng_ht *ht, struct relay_session *session)
+static int session_delete(struct relay_session *session)
{
struct lttng_ht_iter iter;
- assert(ht);
- assert(session);
-
iter.iter.node = &session->session_n.node;
- return lttng_ht_del(ht, &iter);
+ return lttng_ht_del(sessions_ht, &iter);
}
-/*
- * The caller MUST be from the viewer thread since the viewer refcount is
- * decremented. With this calue down to 0, it will try to destroy the session.
- */
-void session_viewer_try_destroy(struct lttng_ht *ht,
- struct relay_session *session)
-{
- unsigned long ret_ref;
- assert(session);
+static void destroy_session(struct relay_session *session)
+{
+ int ret;
- ret_ref = uatomic_add_return(&session->viewer_refcount, -1);
- if (ret_ref == 0) {
- session_try_destroy(ht, session);
- }
+ ret = session_delete(session);
+ assert(!ret);
+ /*
+ * Since each trace has a reference on the session, it means
+ * that if we are at the point where we tear down the session, no
+ * trace belonging to that session exists at this point.
+ */
+ lttng_ht_destroy(session->ctf_traces_ht);
+ call_rcu(&session->rcu_node, rcu_destroy_session);
}
-/*
- * Should only be called from the main streaming thread since it does not touch
- * the viewer refcount. If this refcount is down to 0, destroy the session only
- * and only if the session deletion succeeds. This is done because the viewer
- * *and* the streaming thread can both concurently try to destroy the session
- * thus the first come first serve.
- */
-void session_try_destroy(struct lttng_ht *ht, struct relay_session *session)
+void session_release(struct urcu_ref *ref)
{
- int ret = 0;
- unsigned long ret_ref;
+ struct relay_session *session =
+ caa_container_of(ref, struct relay_session, ref);
- assert(session);
+ destroy_session(session);
+}
- ret_ref = uatomic_read(&session->viewer_refcount);
- if (ret_ref == 0 && session->close_flag) {
- if (ht) {
- ret = session_delete(ht, session);
- }
- if (!ret) {
- /* Only destroy the session if the deletion was successful. */
- session_destroy(session);
- }
- }
+void session_put(struct relay_session *session)
+{
+ rcu_read_lock();
+ pthread_mutex_lock(&session->reflock);
+ urcu_ref_put(&session->ref, session_release);
+ pthread_mutex_unlock(&session->reflock);
+ rcu_read_unlock();
}
-/*
- * Destroy a session object.
- *
- * This function must *NOT* be called with an RCU read lock held since
- * the session's ctf_traces_ht is destroyed.
- */
-void session_destroy(struct relay_session *session)
+int session_close(struct relay_session *session)
{
- struct ctf_trace *ctf_trace;
+ int ret = 0;
+ struct ctf_trace *trace;
struct lttng_ht_iter iter;
- assert(session);
-
- DBG("Relay destroying session %" PRIu64, session->id);
+ pthread_mutex_lock(&session->lock);
+ if (session->connection_closed) {
+ ret = -1;
+ goto unlock;
+ }
+ session->connection_closed = true;
+unlock:
+ pthread_mutex_unlock(&session->lock);
+ if (ret) {
+ return ret;
+ }
- /*
- * Empty the ctf trace hash table which will destroy the stream contained
- * in that table.
- */
rcu_read_lock();
- cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
- node.node) {
- ctf_trace_delete(session->ctf_traces_ht, ctf_trace);
- ctf_trace_destroy(ctf_trace);
+ cds_lfht_for_each_entry(session->ctf_traces_ht->ht,
+ &iter.iter, trace, node.node) {
+ ret = ctf_trace_close(trace);
+ if (ret) {
+ goto rcu_unlock;
+ }
}
+rcu_unlock:
rcu_read_unlock();
- lttng_ht_destroy(session->ctf_traces_ht);
-
- call_rcu(&session->rcu_node, rcu_destroy_session);
+ if (ret) {
+ return ret;
+ }
+ /* Put self-reference from create. */
+ session_put(session);
+ return 0;
}
diff --git a/src/bin/lttng-relayd/session.h b/src/bin/lttng-relayd/session.h
index cb125be..960d8f9 100644
--- a/src/bin/lttng-relayd/session.h
+++ b/src/bin/lttng-relayd/session.h
@@ -1,6 +1,10 @@
+#ifndef _SESSION_H
+#define _SESSION_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,13 +20,11 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _SESSION_H
-#define _SESSION_H
-
#include <limits.h>
#include <inttypes.h>
#include <pthread.h>
#include <urcu/list.h>
+#include <urcu/ref.h>
#include <common/hashtable/hashtable.h>
@@ -31,35 +33,59 @@
*/
struct relay_session {
/*
- * This session id is used to identify a set of stream to a tracing session
- * but also make sure we have a unique session id associated with a session
- * daemon which can provide multiple data source.
+ * This session id is used to identify the set of streams of a
+ * tracing session, but also to make sure we have a unique session
+ * id associated with a session daemon which can provide
+ * multiple data sources.
*/
uint64_t id;
char session_name[NAME_MAX];
char hostname[HOST_NAME_MAX];
uint32_t live_timer;
- struct lttng_ht_node_u64 session_n;
- struct rcu_head rcu_node;
- uint32_t stream_count;
+
/* Tell if this session is for a snapshot or not. */
- unsigned int snapshot:1;
- /* Tell if the session has been closed on the streaming side. */
- unsigned int close_flag:1;
+ bool snapshot;
+
+ /*
+ * Session has no back reference to its connection because it
+ * has a life-time that can be longer than the consumer connection
+ * life-time: a reference can still be held by the viewer
+ * connection.
+ */
+
+ /* Reference count of ctf-traces and viewers using the session. */
+ struct urcu_ref ref;
+ /* session reflock nests inside ctf_trace reflock. */
+ pthread_mutex_t reflock;
+
+ pthread_mutex_t lock;
+
+ /*
+ * major/minor version used for this session.
+ */
+ uint32_t major;
+ uint32_t minor;
- /* Number of viewer using it. Set to 0, it should be destroyed. */
- int viewer_refcount;
+ bool viewer_attached;
+ /* Tell if the session connection has been closed on the streaming side. */
+ bool connection_closed;
/* Contains ctf_trace object of that session indexed by path name. */
struct lttng_ht *ctf_traces_ht;
/*
- * Indicate version protocol for this session. This is especially useful
- * for the data thread that has no idea which version it operates on since
- * linking control/data sockets is non trivial.
+ * This contains streams that are received on that connection.
+ * It's used to store them until we get the streams sent
+ * command. When this is received, we remove those streams from
+ * the list and publish them.
+ * Updates are protected by the recv_list_lock.
+ * Traversals are protected by RCU.
+ * recv_list_lock also protects stream_count.
*/
- uint64_t minor;
- uint64_t major;
+ struct cds_list_head recv_list; /* RCU list. */
+ uint32_t stream_count;
+ pthread_mutex_t recv_list_lock;
+
/*
* Flag checked and exchanged with uatomic_cmpxchg to tell the
* viewer-side if new streams got added since the last check.
@@ -67,50 +93,25 @@ struct relay_session {
unsigned long new_streams;
/*
- * Used to synchronize the process where we flag every streams readiness
- * for the viewer when the streams_sent message is received and the viewer
- * process of sending those streams.
+ * Node in the global session hash table.
*/
- pthread_mutex_t viewer_ready_lock;
-
+ struct lttng_ht_node_u64 session_n;
/*
* Member of the session list in struct relay_viewer_session.
+ * Updates are protected by the relay_viewer_session
+ * session_list_lock. Traversals are protected by RCU.
*/
- struct cds_list_head viewer_session_list;
+ struct cds_list_head viewer_session_node;
+ struct rcu_head rcu_node; /* For call_rcu teardown. */
};
-struct relay_viewer_session {
- struct cds_list_head sessions_head;
-};
-
-static inline void session_viewer_attach(struct relay_session *session)
-{
- uatomic_inc(&session->viewer_refcount);
-}
-
-static inline void session_viewer_detach(struct relay_session *session)
-{
- uatomic_add(&session->viewer_refcount, -1);
-}
-
-struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id);
-struct relay_session *session_create(void);
-int session_delete(struct lttng_ht *ht, struct relay_session *session);
+struct relay_session *session_create(const char *session_name,
+ const char *hostname, uint32_t live_timer,
+ bool snapshot, uint32_t major, uint32_t minor);
+struct relay_session *session_get_by_id(uint64_t id);
+bool session_get(struct relay_session *session);
+void session_put(struct relay_session *session);
-/*
- * Direct destroy without reading the refcount.
- */
-void session_destroy(struct relay_session *session);
-
-/*
- * Destroy the session if the refcount is down to 0.
- */
-void session_try_destroy(struct lttng_ht *ht, struct relay_session *session);
-
-/*
- * Decrement the viewer refcount and destroy it if down to 0.
- */
-void session_viewer_try_destroy(struct lttng_ht *ht,
- struct relay_session *session);
+int session_close(struct relay_session *session);
#endif /* _SESSION_H */
diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
index 17a5bcd..de2dbf6 100644
--- a/src/bin/lttng-relayd/stream.c
+++ b/src/bin/lttng-relayd/stream.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -19,128 +20,314 @@
#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <common/common.h>
+#include <common/utils.h>
+#include <common/defaults.h>
+#include <urcu/rculist.h>
+#include <sys/stat.h>
+#include "lttng-relayd.h"
#include "index.h"
#include "stream.h"
#include "viewer-stream.h"
+/* Should be called with RCU read-side lock held. */
+bool stream_get(struct relay_stream *stream)
+{
+ bool has_ref = false;
+
+ pthread_mutex_lock(&stream->reflock);
+ if (stream->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&stream->ref);
+ }
+ pthread_mutex_unlock(&stream->reflock);
+
+ return has_ref;
+}
+
/*
- * Get stream from stream id from the given hash table. Return stream if found
- * else NULL.
- *
- * Need to be called with RCU read-side lock held.
+ * Get stream from stream id from the streams hash table. Return stream
+ * if found else NULL. A stream reference is taken when a stream is
+ * returned. stream_put() must be called on that stream.
*/
-struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
- uint64_t stream_id)
+struct relay_stream *stream_get_by_id(uint64_t stream_id)
{
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
struct relay_stream *stream = NULL;
- assert(ht);
-
- lttng_ht_lookup(ht, &stream_id, &iter);
+ rcu_read_lock();
+ lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node == NULL) {
+ if (!node) {
DBG("Relay stream %" PRIu64 " not found", stream_id);
goto end;
}
stream = caa_container_of(node, struct relay_stream, node);
-
+ if (!stream_get(stream)) {
+ stream = NULL;
+ }
end:
+ rcu_read_unlock();
return stream;
}
/*
- * Close a given stream. If an assosiated viewer stream exists it is updated.
- *
- * RCU read side lock MUST be acquired.
- *
- * Return 0 if close was successful or 1 if already closed.
+ * We keep ownership of path_name and channel_name.
*/
-int stream_close(struct relay_session *session, struct relay_stream *stream)
+struct relay_stream *stream_create(struct ctf_trace *trace,
+ uint64_t stream_handle, char *path_name,
+ char *channel_name, uint64_t tracefile_size,
+ uint64_t tracefile_count)
{
- int delret, ret;
- struct relay_viewer_stream *vstream;
- struct ctf_trace *ctf_trace;
+ int ret;
+ struct relay_stream *stream = NULL;
+ struct relay_session *session = trace->session;
- assert(stream);
+ stream = zmalloc(sizeof(struct relay_stream));
+ if (stream == NULL) {
+ PERROR("relay stream zmalloc");
+ ret = -1;
+ goto error_no_alloc;
+ }
- pthread_mutex_lock(&stream->lock);
+ stream->stream_handle = stream_handle;
+ stream->prev_seq = -1ULL;
+ stream->ctf_stream_id = -1ULL;
+ stream->tracefile_size = tracefile_size;
+ stream->tracefile_count = tracefile_count;
+ stream->path_name = path_name;
+ stream->channel_name = channel_name;
+ lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
+ pthread_mutex_init(&stream->lock, NULL);
+ pthread_mutex_init(&stream->reflock, NULL);
+ urcu_ref_init(&stream->ref);
+ ctf_trace_get(trace);
+ stream->trace = trace;
- if (stream->terminated_flag) {
- /* This stream is already closed. Ignore. */
- ret = 1;
- goto end_unlock;
+ stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!stream->indexes_ht) {
+ ERR("Cannot created indexes_ht");
+ ret = -1;
+ goto end;
}
- DBG("Closing stream id %" PRIu64, stream->stream_handle);
+ ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
+ if (ret < 0) {
+ ERR("relay creating output directory");
+ goto end;
+ }
- if (stream->fd >= 0) {
- delret = close(stream->fd);
- if (delret < 0) {
- PERROR("close stream");
+ /*
+ * No need to use run_as API here because whatever we receive,
+ * the relayd uses its own credentials for the stream files.
+ */
+ ret = utils_create_stream_file(stream->path_name, stream->channel_name,
+ stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
+ if (ret < 0) {
+ ERR("Create output file");
+ goto end;
+ }
+ stream->stream_fd = stream_fd_create(ret);
+ if (!stream->stream_fd) {
+ if (close(ret)) {
+ PERROR("Error closing file %d", ret);
}
+ ret = -1;
+ goto end;
+ }
+ if (stream->tracefile_size) {
+ DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
+ } else {
+ DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
}
- if (stream->index_fd >= 0) {
- delret = close(stream->index_fd);
- if (delret < 0) {
- PERROR("close stream index_fd");
- }
+ if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+ stream->is_metadata = 1;
+ /* Assign quick reference to the metadata stream in the trace. */
+ trace->metadata_stream = stream;
}
- vstream = viewer_stream_find_by_id(stream->stream_handle);
- if (vstream) {
+ /*
+ * Add the stream in the recv list of the session. Once the end stream
+ * message is received, all session streams are published.
+ */
+ pthread_mutex_lock(&session->recv_list_lock);
+ cds_list_add_rcu(&stream->recv_node, &session->recv_list);
+ stream->in_recv_list = true; /* Lets publish/release unlink recv_node. */
+ session->stream_count++;
+ pthread_mutex_unlock(&session->recv_list_lock);
+ if (stream->is_metadata) {
/*
- * Set the last good value into the viewer stream. This is done
- * right before the stream gets deleted from the hash table. The
- * lookup failure on the live thread side of a stream indicates
- * that the viewer stream index received value should be used.
+ * Session daemon expects metadata to be published
+ * without issuing any streams sent cmd in snapshot
+ * mode.
*/
- pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
- vstream->total_index_received = stream->total_index_received;
- vstream->tracefile_count_last = stream->tracefile_count_current;
- vstream->close_write_flag = 1;
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
+ stream_publish(stream);
}
- /* Cleanup index of that stream. */
- relay_index_destroy_by_stream_id(stream->stream_handle);
+ DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
+ stream->stream_handle);
+ ret = 0;
- ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
- stream->path_name);
- assert(ctf_trace);
- ctf_trace_put_ref(ctf_trace);
+end:
+ if (ret) {
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ stream_put(stream);
+ /* No ctf_trace_put() here: stream_release() puts stream->trace. */
+ stream = NULL;
+ }
+ return stream;
- stream->close_flag = 1;
- stream->terminated_flag = 1;
- ret = 0;
+error_no_alloc:
+ /*
+ * path_name and channel_name need to be freed explicitly here
+ * because we cannot rely on stream_put().
+ */
+ free(path_name);
+ free(channel_name);
+ return NULL;
+}
+
+void stream_publish(struct relay_stream *stream)
+{
+ struct relay_session *session;
+
+ pthread_mutex_lock(&stream->lock);
+ if (stream->published) {
+ goto unlock;
+ }
+
+ session = stream->trace->session;
+
+ pthread_mutex_lock(&session->recv_list_lock);
+ if (stream->in_recv_list) {
+ cds_list_del_rcu(&stream->recv_node);
+ stream->in_recv_list = false;
+ }
+ pthread_mutex_unlock(&session->recv_list_lock);
-end_unlock:
+ /*
+ * Both in the ctf_trace object and the global stream ht since the data
+ * side of the relayd does not have the concept of session.
+ */
+ lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
+
+ pthread_mutex_lock(&stream->trace->stream_list_lock);
+ cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
+ pthread_mutex_unlock(&stream->trace->stream_list_lock);
+
+ stream->published = true;
+unlock:
pthread_mutex_unlock(&stream->lock);
- return ret;
}
-void stream_delete(struct lttng_ht *ht, struct relay_stream *stream)
+/*
+ * Only called from destroy. No stream lock needed, since there is a
+ * single user at this point.
+ */
+static void stream_unpublish(struct relay_stream *stream)
{
int ret;
struct lttng_ht_iter iter;
- assert(ht);
- assert(stream);
+ if (!stream->published) {
+ return;
+ }
+ pthread_mutex_lock(&stream->trace->stream_list_lock);
+ cds_list_del_rcu(&stream->stream_node);
+ pthread_mutex_unlock(&stream->trace->stream_list_lock);
iter.iter.node = &stream->node.node;
- ret = lttng_ht_del(ht, &iter);
+ ret = lttng_ht_del(relay_streams_ht, &iter);
assert(!ret);
-
- cds_list_del(&stream->trace_list);
+ stream->published = false;
}
-void stream_destroy(struct relay_stream *stream)
+static void stream_destroy(struct relay_stream *stream)
{
- assert(stream);
+ if (stream->indexes_ht) {
+ lttng_ht_destroy(stream->indexes_ht);
+ }
free(stream->path_name);
free(stream->channel_name);
free(stream);
}
+
+static void stream_destroy_rcu(struct rcu_head *rcu_head)
+{
+ struct relay_stream *stream =
+ caa_container_of(rcu_head, struct relay_stream, rcu_node);
+
+ stream_destroy(stream);
+}
+
+static void stream_release(struct urcu_ref *ref)
+{
+ struct relay_stream *stream =
+ caa_container_of(ref, struct relay_stream, ref);
+ struct relay_session *session;
+
+ session = stream->trace->session;
+
+ DBG("Releasing stream id %" PRIu64, stream->stream_handle);
+
+ pthread_mutex_lock(&session->recv_list_lock);
+ session->stream_count--;
+ if (stream->in_recv_list) {
+ cds_list_del_rcu(&stream->recv_node);
+ stream->in_recv_list = false;
+ }
+ pthread_mutex_unlock(&session->recv_list_lock);
+
+ stream_unpublish(stream);
+
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ if (stream->index_fd) {
+ stream_fd_put(stream->index_fd);
+ stream->index_fd = NULL;
+ }
+ if (stream->trace) {
+ ctf_trace_put(stream->trace);
+ stream->trace = NULL;
+ }
+
+ call_rcu(&stream->rcu_node, stream_destroy_rcu);
+}
+
+void stream_put(struct relay_stream *stream)
+{
+ DBG("stream put for stream id %" PRIu64, stream->stream_handle);
+ /*
+ * Ensure existence of stream->reflock for the unlock below (the release path frees through call_rcu).
+ */
+ rcu_read_lock();
+ /*
+ * Stream reflock ensures that concurrent test and update of
+ * stream ref is atomic.
+ */
+ pthread_mutex_lock(&stream->reflock);
+ assert(stream->ref.refcount != 0);
+ /*
+ * Wait until we have processed all the stream packets before
+ * actually putting our last stream reference.
+ */
+ DBG("stream put stream id %" PRIu64 " refcount %d",
+ stream->stream_handle,
+ (int) stream->ref.refcount);
+ urcu_ref_put(&stream->ref, stream_release);
+ pthread_mutex_unlock(&stream->reflock);
+ rcu_read_unlock();
+}
+
+void stream_close(struct relay_stream *stream)
+{
+ relay_index_close_all(stream);
+ stream_put(stream);
+}
diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h
index 4dd2e62..fd84f2e 100644
--- a/src/bin/lttng-relayd/stream.h
+++ b/src/bin/lttng-relayd/stream.h
@@ -1,6 +1,10 @@
+#ifndef _STREAM_H
+#define _STREAM_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _STREAM_H
-#define _STREAM_H
-
#include <limits.h>
#include <inttypes.h>
#include <pthread.h>
@@ -27,100 +28,110 @@
#include <common/hashtable/hashtable.h>
#include "session.h"
+#include "stream-fd.h"
/*
* Represents a stream in the relay
*/
struct relay_stream {
uint64_t stream_handle;
- uint64_t prev_seq; /* previous data sequence number encountered */
- struct lttng_ht_node_u64 node;
+
/*
- * When we receive a stream, it gets stored in a list (on a per connection
- * basis) until we have all the streams of the same channel and the metadata
- * associated with it, then it gets flagged with viewer_ready.
+ * reflock used to synchronize the closing of this stream.
+ * stream reflock nests inside viewer stream reflock.
+ * stream reflock nests inside index reflock.
*/
- struct cds_list_head recv_list;
+ pthread_mutex_t reflock;
+ struct urcu_ref ref;
+ /* Back reference to trace. Protected by refcount on trace object. */
+ struct ctf_trace *trace;
- /* Added to the corresponding ctf_trace. */
- struct cds_list_head trace_list;
- struct rcu_head rcu_node;
- uint64_t session_id;
- int fd;
+ /*
+ * To protect from concurrent read/update. The viewer stream
+ * lock nests inside the stream lock. The stream lock nests
+ * inside the ctf_trace lock.
+ */
+ pthread_mutex_t lock;
+ uint64_t prev_seq; /* previous data sequence number encountered */
+ uint64_t last_net_seq_num; /* seq num to encounter before closing. */
+
+ /* FD on which to write the stream data. */
+ struct stream_fd *stream_fd;
/* FD on which to write the index data. */
- int index_fd;
- /* FD on which to read the index data for the viewer. */
- int read_index_fd;
+ struct stream_fd *index_fd;
char *path_name;
char *channel_name;
+
/* on-disk circular buffer of tracefiles */
uint64_t tracefile_size;
uint64_t tracefile_size_current;
uint64_t tracefile_count;
- uint64_t tracefile_count_current;
+ uint64_t current_tracefile_id;
+
/* To inform the viewer up to where it can go back in time. */
uint64_t oldest_tracefile_id;
- uint64_t total_index_received;
- uint64_t last_net_seq_num;
-
+ struct lttng_ht *indexes_ht;
/*
- * To protect from concurrent read/update. Also used to synchronize the
- * closing of this stream.
+ * Counts number of indexes in indexes_ht. Redundant info.
+ * Protected by stream lock.
*/
- pthread_mutex_t lock;
+ uint64_t total_index_received;
+ bool closed; /* Stream is closed. */
+
+ int indexes_in_flight;
/*
- * If the stream is inactive, this field is updated with the live beacon
- * timestamp end, when it is active, this field == -1ULL.
+ * If the stream is inactive, this field is updated with the
+ * live beacon timestamp end, when it is active, this
+ * field == -1ULL.
*/
uint64_t beacon_ts_end;
/*
- * Number of indexes that are supposed to be complete soon.
- * Avoid sending the inactivity beacon to the client when data is in
- * transit.
- */
- int indexes_in_flight;
- /*
* CTF stream ID, -1ULL when unset.
*/
uint64_t ctf_stream_id;
- /*
- * To protect the update of the close_write_flag and the checks of
- * the tracefile_count_current.
- * It is taken before checking whenever we need to know if the
- * writer and reader are working in the same tracefile.
- */
- pthread_mutex_t viewer_stream_rotation_lock;
- /* Information telling us when to close the stream */
- unsigned int close_flag:1;
+ /* Indicate if the stream was initialized for a data pending command. */
+ bool data_pending_check_done;
+
+ /* Is this stream a metadata stream ? */
+ int32_t is_metadata;
+
/*
- * Indicates if the stream has been effectively closed thus having the
- * information in it invalidated but NOT freed. The stream lock MUST be
- * held to read/update that value.
+ * Member of the stream list in struct ctf_trace.
+ * Updates are protected by the stream_list_lock.
+ * Traversals are protected by RCU.
*/
- unsigned int terminated_flag:1;
- /* Indicate if the stream was initialized for a data pending command. */
- unsigned int data_pending_check_done:1;
- unsigned int metadata_flag:1;
+ struct cds_list_head stream_node;
/*
- * To detect when we start overwriting old data, it is used to
- * update the oldest_tracefile_id.
+ * Temporary list belonging to the connection until all streams
+ * are received for that connection.
+ * Member of the stream recv list in the connection.
+ * Updates are protected by the session's recv_list_lock.
+ * Traversals are protected by RCU.
*/
- unsigned int tracefile_overwrite:1;
+ bool in_recv_list;
+ struct cds_list_head recv_node;
+ bool published;
/*
- * Can this stream be used by a viewer or are we waiting for additional
- * information.
+ * Node of stream within global stream hash table.
*/
- unsigned int viewer_ready:1;
+ struct lttng_ht_node_u64 node;
+ struct rcu_head rcu_node; /* For call_rcu teardown. */
};
-struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
- uint64_t stream_id);
-int stream_close(struct relay_session *session, struct relay_stream *stream);
-void stream_delete(struct lttng_ht *ht, struct relay_stream *stream);
-void stream_destroy(struct relay_stream *stream);
+struct relay_stream *stream_create(struct ctf_trace *trace,
+ uint64_t stream_handle, char *path_name,
+ char *channel_name, uint64_t tracefile_size,
+ uint64_t tracefile_count);
+
+struct relay_stream *stream_get_by_id(uint64_t stream_id);
+bool stream_get(struct relay_stream *stream);
+void stream_put(struct relay_stream *stream);
+void stream_close(struct relay_stream *stream);
+
+void stream_publish(struct relay_stream *stream);
#endif /* _STREAM_H */
diff --git a/src/bin/lttng-relayd/utils.h b/src/bin/lttng-relayd/utils.h
index de1521d..4a56980 100644
--- a/src/bin/lttng-relayd/utils.h
+++ b/src/bin/lttng-relayd/utils.h
@@ -1,6 +1,10 @@
+#ifndef RELAYD_UTILS_H
+#define RELAYD_UTILS_H
+
/*
* Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
@@ -16,9 +20,6 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef RELAYD_UTILS_H
-#define RELAYD_UTILS_H
-
char *create_output_path(char *path_name);
#endif /* RELAYD_UTILS_H */
diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c
index 3748629..60117f9 100644
--- a/src/bin/lttng-relayd/viewer-stream.c
+++ b/src/bin/lttng-relayd/viewer-stream.c
@@ -1,6 +1,7 @@
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -24,39 +25,32 @@
#include "lttng-relayd.h"
#include "viewer-stream.h"
-static void free_stream(struct relay_viewer_stream *stream)
+static void viewer_stream_destroy(struct relay_viewer_stream *vstream)
{
- assert(stream);
-
- free(stream->path_name);
- free(stream->channel_name);
- free(stream);
+ free(vstream->path_name);
+ free(vstream->channel_name);
+ free(vstream);
}
-static void deferred_free_viewer_stream(struct rcu_head *head)
+static void viewer_stream_destroy_rcu(struct rcu_head *head)
{
- struct relay_viewer_stream *stream =
+ struct relay_viewer_stream *vstream =
caa_container_of(head, struct relay_viewer_stream, rcu_node);
- free_stream(stream);
+ viewer_stream_destroy(vstream);
}
struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
- enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace)
+ enum lttng_viewer_seek seek_t)
{
struct relay_viewer_stream *vstream;
- assert(stream);
- assert(ctf_trace);
-
vstream = zmalloc(sizeof(*vstream));
if (!vstream) {
PERROR("relay viewer stream zmalloc");
goto error;
}
- vstream->session_id = stream->session_id;
- vstream->stream_handle = stream->stream_handle;
vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
if (vstream->path_name == NULL) {
PERROR("relay viewer path_name alloc");
@@ -68,216 +62,266 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
PERROR("relay viewer channel_name alloc");
goto error;
}
- vstream->tracefile_count = stream->tracefile_count;
- vstream->metadata_flag = stream->metadata_flag;
- vstream->tracefile_count_last = -1ULL;
switch (seek_t) {
case LTTNG_VIEWER_SEEK_BEGINNING:
- vstream->tracefile_count_current = stream->oldest_tracefile_id;
+ vstream->current_tracefile_id = stream->oldest_tracefile_id;
break;
case LTTNG_VIEWER_SEEK_LAST:
- vstream->tracefile_count_current = stream->tracefile_count_current;
+ vstream->current_tracefile_id = stream->current_tracefile_id;
break;
default:
- assert(0);
goto error;
}
-
- if (vstream->metadata_flag) {
- ctf_trace->viewer_metadata_stream = vstream;
+ if (!stream_get(stream)) {
+ ERR("Cannot get stream");
+ goto error;
}
+ vstream->stream = stream;
- /* Globally visible after the add unique. */
- lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
- lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
-
- vstream->index_read_fd = -1;
- vstream->read_fd = -1;
-
- /*
- * This is to avoid a race between the initialization of this object and
- * the close of the given stream. If the stream is unable to find this
- * viewer stream when closing, this copy will at least take the latest
- * value. We also need that for the seek_last.
- */
- vstream->total_index_received = stream->total_index_received;
-
+ pthread_mutex_lock(&stream->lock);
/*
* If we never received an index for the current stream, delay the opening
* of the index, otherwise open it right now.
*/
- if (vstream->tracefile_count_current == stream->tracefile_count_current
- && vstream->total_index_received == 0) {
- vstream->index_read_fd = -1;
+ if (vstream->current_tracefile_id == stream->current_tracefile_id
+ && stream->total_index_received == 0) {
+ vstream->index_fd = NULL;
} else {
int read_fd;
read_fd = index_open(vstream->path_name, vstream->channel_name,
- vstream->tracefile_count, vstream->tracefile_count_current);
+ stream->tracefile_count,
+ vstream->current_tracefile_id);
if (read_fd < 0) {
+ pthread_mutex_unlock(&stream->lock);
+ goto error;
+ }
+ vstream->index_fd = stream_fd_create(read_fd);
+ if (!vstream->index_fd) {
+ if (close(read_fd)) {
+ PERROR("close");
+ }
+ /* Do not leave with stream->lock held. */
+ pthread_mutex_unlock(&stream->lock);
goto error;
}
- vstream->index_read_fd = read_fd;
}
- if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) {
+ if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) {
off_t lseek_ret;
- lseek_ret = lseek(vstream->index_read_fd,
- vstream->total_index_received * sizeof(struct ctf_packet_index),
- SEEK_CUR);
+ lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END);
if (lseek_ret < 0) {
+ pthread_mutex_unlock(&stream->lock);
goto error;
}
- vstream->last_sent_index = vstream->total_index_received;
+ vstream->last_sent_index = stream->total_index_received;
}
+ pthread_mutex_unlock(&stream->lock);
+
+ /*
+ * Initialize the locks and refcount before the viewer stream becomes
+ * reachable through the trace or the global hash table.
+ */
+ pthread_mutex_init(&vstream->lock, NULL);
+ pthread_mutex_init(&vstream->reflock, NULL);
+ urcu_ref_init(&vstream->ref);
+
+ if (stream->is_metadata) {
+ stream->trace->viewer_metadata_stream = vstream;
+ }
+
+ /* Globally visible after the add unique. */
+ lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
+ lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
return vstream;
error:
if (vstream) {
- free_stream(vstream);
+ /* Release the index fd and stream reference acquired before the failure. */
+ if (vstream->index_fd) {
+ stream_fd_put(vstream->index_fd);
+ }
+ if (vstream->stream) {
+ stream_put(vstream->stream);
+ }
+ viewer_stream_destroy(vstream);
}
return NULL;
}
-void viewer_stream_delete(struct relay_viewer_stream *stream)
+static void viewer_stream_unpublish(struct relay_viewer_stream *vstream)
{
int ret;
struct lttng_ht_iter iter;
- iter.iter.node = &stream->stream_n.node;
+ iter.iter.node = &vstream->stream_n.node;
ret = lttng_ht_del(viewer_streams_ht, &iter);
assert(!ret);
}
-void viewer_stream_destroy(struct ctf_trace *ctf_trace,
- struct relay_viewer_stream *stream)
+static void viewer_stream_release(struct urcu_ref *ref)
{
- int ret;
+ struct relay_viewer_stream *vstream = caa_container_of(ref,
+ struct relay_viewer_stream, ref);
- assert(stream);
-
- if (ctf_trace) {
- ctf_trace_put_ref(ctf_trace);
+ if (vstream->stream->is_metadata) {
+ vstream->stream->trace->viewer_metadata_stream = NULL;
}
- if (stream->read_fd >= 0) {
- ret = close(stream->read_fd);
- if (ret < 0) {
- PERROR("close read_fd");
- }
+ viewer_stream_unpublish(vstream);
+
+ if (vstream->stream_fd) {
+ stream_fd_put(vstream->stream_fd);
+ vstream->stream_fd = NULL;
}
- if (stream->index_read_fd >= 0) {
- ret = close(stream->index_read_fd);
- if (ret < 0) {
- PERROR("close index_read_fd");
- }
+ if (vstream->index_fd) {
+ stream_fd_put(vstream->index_fd);
+ vstream->index_fd = NULL;
+ }
+ if (vstream->stream) {
+ stream_put(vstream->stream);
+ vstream->stream = NULL;
+ }
+ call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu);
+}
+
+/* Should be called with RCU read-side lock held. */
+bool viewer_stream_get(struct relay_viewer_stream *vstream)
+{
+ bool has_ref = false;
+
+ pthread_mutex_lock(&vstream->reflock);
+ if (vstream->ref.refcount != 0) {
+ has_ref = true;
+ urcu_ref_get(&vstream->ref);
}
+ pthread_mutex_unlock(&vstream->reflock);
- call_rcu(&stream->rcu_node, deferred_free_viewer_stream);
+ return has_ref;
}
/*
- * Find viewer stream by id. RCU read side lock MUST be acquired.
+ * Get viewer stream by id.
*
- * Return stream if found else NULL.
+ * Return viewer stream if found else NULL.
*/
-struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id)
+struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id)
{
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
- struct relay_viewer_stream *stream = NULL;
+ struct relay_viewer_stream *vstream = NULL;
+ rcu_read_lock();
lttng_ht_lookup(viewer_streams_ht, &id, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (!node) {
DBG("Relay viewer stream %" PRIu64 " not found", id);
goto end;
}
- stream = caa_container_of(node, struct relay_viewer_stream, stream_n);
-
+ vstream = caa_container_of(node, struct relay_viewer_stream, stream_n);
+ if (!viewer_stream_get(vstream)) {
+ vstream = NULL;
+ }
end:
- return stream;
+ rcu_read_unlock();
+ return vstream;
+}
+
+void viewer_stream_put(struct relay_viewer_stream *vstream)
+{
+ rcu_read_lock();
+ pthread_mutex_lock(&vstream->reflock);
+ urcu_ref_put(&vstream->ref, viewer_stream_release);
+ pthread_mutex_unlock(&vstream->reflock);
+ rcu_read_unlock();
+}
+
+/*
+ * Returns whether tracefile "id" is in the stream's [oldest, current)
+ * tracefile id window (with wrap-around). If not, it has been overwritten.
+ * Must be called with rstream and vstream locks held.
+ */
+bool viewer_stream_is_tracefile_id_readable(struct relay_viewer_stream *vstream, uint64_t id)
+{
+ struct relay_stream *stream = vstream->stream;
+
+ if (stream->oldest_tracefile_id <= stream->current_tracefile_id) {
+ if (id >= stream->oldest_tracefile_id && id < stream->current_tracefile_id) {
+ /* id is a readable file. */
+ return true;
+ } else {
+ /* id is not readable. */
+ return false;
+ }
+ } else {
+ if (id >= stream->oldest_tracefile_id || id < stream->current_tracefile_id) {
+ /* id is a readable file. */
+ return true;
+ } else {
+ /* id is not readable. */
+ return false;
+ }
+ }
+}
/*
* Rotate a stream to the next tracefile.
*
- * Must be called with viewer_stream_rotation_lock held.
+ * Must be called with rstream and vstream locks held.
* Returns 0 on success, 1 on EOF, a negative value on error.
*/
-int viewer_stream_rotate(struct relay_viewer_stream *vstream,
- struct relay_stream *stream)
+int viewer_stream_rotate(struct relay_viewer_stream *vstream)
{
int ret;
- uint64_t tracefile_id;
-
- assert(vstream);
- assert(stream);
-
- if (vstream->tracefile_count == 0) {
- /* Ignore rotation, there is none to do. */
- ret = 0;
- goto end;
- }
-
- tracefile_id = (vstream->tracefile_count_current + 1) %
- vstream->tracefile_count;
+ uint64_t new_id;
+ struct relay_stream *stream = vstream->stream;
/* Detect the last tracefile to open. */
- if (vstream->tracefile_count_last != -1ULL &&
- vstream->tracefile_count_last ==
- vstream->tracefile_count_current) {
+ if (stream->total_index_received == vstream->last_sent_index
+ && stream->trace->session->connection_closed) {
ret = 1;
goto end;
}
- /*
- * The writer and the reader are not working in the same tracefile, we can
- * read up to EOF, we don't care about the total_index_received.
- */
- if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) {
- vstream->close_write_flag = 1;
- } else {
- /*
- * We are opening a file that is still open in write, make sure we
- * limit our reading to the number of indexes received.
- */
- vstream->close_write_flag = 0;
- if (stream->close_flag) {
- vstream->total_index_received = stream->total_index_received;
- }
+ if (stream->tracefile_count == 0) {
+ /* Ignore rotation, there is none to do. */
+ ret = 0;
+ goto end;
}
- vstream->tracefile_count_current = tracefile_id;
- ret = close(vstream->index_read_fd);
- if (ret < 0) {
- PERROR("close index file %d", vstream->index_read_fd);
+ new_id = (vstream->current_tracefile_id + 1) % stream->tracefile_count;
+ if (!viewer_stream_is_tracefile_id_readable(vstream, new_id)) {
+ new_id = stream->oldest_tracefile_id;
}
- vstream->index_read_fd = -1;
+ vstream->current_tracefile_id = new_id;
- ret = close(vstream->read_fd);
- if (ret < 0) {
- PERROR("close tracefile %d", vstream->read_fd);
+ if (vstream->index_fd) {
+ stream_fd_put(vstream->index_fd);
+ vstream->index_fd = NULL;
+ }
+ if (vstream->stream_fd) {
+ stream_fd_put(vstream->stream_fd);
+ vstream->stream_fd = NULL;
}
- vstream->read_fd = -1;
-
- pthread_mutex_lock(&vstream->overwrite_lock);
- vstream->abort_flag = 0;
- pthread_mutex_unlock(&vstream->overwrite_lock);
ret = index_open(vstream->path_name, vstream->channel_name,
- vstream->tracefile_count, vstream->tracefile_count_current);
+ stream->tracefile_count,
+ vstream->current_tracefile_id);
if (ret < 0) {
- goto error;
+ goto end;
+ }
+ vstream->index_fd = stream_fd_create(ret);
+ if (vstream->index_fd) {
+ ret = 0;
+ } else {
+ if (close(ret)) {
+ PERROR("close");
+ }
+ ret = -1;
}
- vstream->index_read_fd = ret;
-
- ret = 0;
-
end:
-error:
return ret;
}
diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h
index 003b119..75527d2 100644
--- a/src/bin/lttng-relayd/viewer-stream.h
+++ b/src/bin/lttng-relayd/viewer-stream.h
@@ -1,6 +1,10 @@
+#ifndef _VIEWER_STREAM_H
+#define _VIEWER_STREAM_H
+
/*
* Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
* David Goulet <dgoulet at efficios.com>
+ * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
@@ -16,9 +20,6 @@
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef _VIEWER_STREAM_H
-#define _VIEWER_STREAM_H
-
#include <limits.h>
#include <inttypes.h>
#include <pthread.h>
@@ -33,57 +34,47 @@
struct relay_stream;
/*
- * Shadow copy of the relay_stream structure for the viewer side. The only
- * fields updated by the writer (streaming side) after allocation are :
- * total_index_received and close_flag. Everything else is updated by the
- * reader (viewer side).
+ * Shadow copy of the relay_stream structure for the viewer side.
*/
struct relay_viewer_stream {
- uint64_t stream_handle;
- uint64_t session_id;
- int read_fd;
- int index_read_fd;
+ struct urcu_ref ref;
+ pthread_mutex_t reflock;
+
+ /*
+ * This lock nests inside the stream lock.
+ */
+ pthread_mutex_t lock;
+
+ /* Back ref to stream */
+ struct relay_stream *stream;
+
+ /* FD from which to read the stream data. */
+ struct stream_fd *stream_fd;
+ /* FD from which to read the index data. */
+ struct stream_fd *index_fd;
+
char *path_name;
char *channel_name;
+
+ uint64_t current_tracefile_id;
uint64_t last_sent_index;
- uint64_t total_index_received;
- uint64_t tracefile_count;
- uint64_t tracefile_count_current;
- /* Stop after reading this tracefile. */
- uint64_t tracefile_count_last;
+
+ /* Indicates if this stream has been sent to a viewer client. */
+ bool sent_flag;
+
struct lttng_ht_node_u64 stream_n;
struct rcu_head rcu_node;
- struct ctf_trace *ctf_trace;
- /*
- * This lock blocks only when the writer is about to start overwriting
- * a file currently read by the reader.
- *
- * This is nested INSIDE the viewer_stream_rotation_lock.
- */
- pthread_mutex_t overwrite_lock;
- /* Information telling us if the stream is a metadata stream. */
- unsigned int metadata_flag:1;
- /*
- * Information telling us that the stream is closed in write, so
- * we don't expect new indexes and we can read up to EOF.
- */
- unsigned int close_write_flag:1;
- /*
- * If the streaming side closes a FD in use in the viewer side,
- * it sets this flag to inform that it is a normal error.
- */
- unsigned int abort_flag:1;
- /* Indicates if this stream has been sent to a viewer client. */
- unsigned int sent_flag:1;
};
struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
- enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace);
-struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id);
-void viewer_stream_destroy(struct ctf_trace *ctf_trace,
- struct relay_viewer_stream *stream);
-void viewer_stream_delete(struct relay_viewer_stream *stream);
-int viewer_stream_rotate(struct relay_viewer_stream *vstream,
- struct relay_stream *stream);
+ enum lttng_viewer_seek seek_t);
+
+struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id);
+bool viewer_stream_get(struct relay_viewer_stream *vstream);
+void viewer_stream_put(struct relay_viewer_stream *vstream);
+int viewer_stream_rotate(struct relay_viewer_stream *vstream);
+bool viewer_stream_is_tracefile_id_readable(struct relay_viewer_stream *vstream,
+ uint64_t id);
+
#endif /* _VIEWER_STREAM_H */
diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c
index 87d5f34..0c5ffff 100644
--- a/src/bin/lttng-sessiond/consumer.c
+++ b/src/bin/lttng-sessiond/consumer.c
@@ -1077,11 +1077,8 @@ error:
}
/*
- * Ask the consumer if the data is ready to read (NOT pending) for the specific
- * session id.
- *
- * This function has a different behavior with the consumer i.e. that it waits
- * for a reply from the consumer if yes or no the data is pending.
+ * Ask the consumer if the data is pending for the specific session id.
+ * Returns 1 if data is pending, 0 otherwise, or < 0 on error.
*/
int consumer_is_data_pending(uint64_t session_id,
struct consumer_output *consumer)
--
2.1.4
More information about the lttng-dev
mailing list