[lttng-dev] [PATCH v3 lttng-tools 4/4] Fix: Relay daemon ownership and reference counting
Mathieu Desnoyers
mathieu.desnoyers at efficios.com
Mon Aug 17 18:44:02 EDT 2015
This version of the patch has issues with tracefile rotation.
I'm working on an updated version.
Thanks,
Mathieu
----- On Aug 17, 2015, at 11:12 AM, Mathieu Desnoyers mathieu.desnoyers at efficios.com wrote:
> The ownership and reference counting of the relay daemon is unclear and
> buggy in many ways. It is the cause of memory corruption, double frees,
> leaks, and segmentation faults observed under various conditions.
>
> Fix this situation by introducing a clear ownership and reference
> counting scheme for this daemon.
>
> See doc/relayd-architecture.txt for details.
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> ---
> doc/Makefile.am | 3 +-
> doc/relayd-architecture.txt | 96 +++
> src/bin/lttng-relayd/Makefile.am | 4 +-
> src/bin/lttng-relayd/cmd-2-1.c | 17 +-
> src/bin/lttng-relayd/cmd-2-1.h | 10 +-
> src/bin/lttng-relayd/cmd-2-2.c | 22 +-
> src/bin/lttng-relayd/cmd-2-2.h | 10 +-
> src/bin/lttng-relayd/cmd-2-4.c | 19 +-
> src/bin/lttng-relayd/cmd-2-4.h | 10 +-
> src/bin/lttng-relayd/cmd-generic.c | 1 +
> src/bin/lttng-relayd/cmd-generic.h | 7 +-
> src/bin/lttng-relayd/cmd.h | 7 +-
> src/bin/lttng-relayd/connection.c | 128 ++--
> src/bin/lttng-relayd/connection.h | 63 +-
> src/bin/lttng-relayd/ctf-trace.c | 198 +++---
> src/bin/lttng-relayd/ctf-trace.h | 66 +-
> src/bin/lttng-relayd/index.c | 342 ++++++----
> src/bin/lttng-relayd/index.h | 60 +-
> src/bin/lttng-relayd/live.c | 1049 ++++++++++++++---------------
> src/bin/lttng-relayd/live.h | 10 +-
> src/bin/lttng-relayd/lttng-relayd.h | 23 +-
> src/bin/lttng-relayd/main.c | 1167 ++++++++++++++-------------------
> src/bin/lttng-relayd/session.c | 214 +++---
> src/bin/lttng-relayd/session.h | 116 ++--
> src/bin/lttng-relayd/stream-fd.c | 58 ++
> src/bin/lttng-relayd/stream-fd.h | 32 +
> src/bin/lttng-relayd/stream.c | 343 ++++++++--
> src/bin/lttng-relayd/stream.h | 130 ++--
> src/bin/lttng-relayd/utils.h | 7 +-
> src/bin/lttng-relayd/viewer-session.c | 164 +++++
> src/bin/lttng-relayd/viewer-session.h | 53 ++
> src/bin/lttng-relayd/viewer-stream.c | 325 +++++----
> src/bin/lttng-relayd/viewer-stream.h | 83 ++-
> src/bin/lttng-sessiond/consumer.c | 7 +-
> src/bin/lttng-sessiond/ust-consumer.c | 3 +-
> src/common/index/ctf-index.h | 2 +-
> src/common/index/index.c | 13 +
> src/common/utils.c | 109 ++-
> src/common/utils.h | 2 +
> 39 files changed, 2909 insertions(+), 2064 deletions(-)
> create mode 100644 doc/relayd-architecture.txt
> create mode 100644 src/bin/lttng-relayd/stream-fd.c
> create mode 100644 src/bin/lttng-relayd/stream-fd.h
> create mode 100644 src/bin/lttng-relayd/viewer-session.c
> create mode 100644 src/bin/lttng-relayd/viewer-session.h
>
> diff --git a/doc/Makefile.am b/doc/Makefile.am
> index 4ff71eb..7ae9303 100644
> --- a/doc/Makefile.am
> +++ b/doc/Makefile.am
> @@ -1,7 +1,8 @@
> SUBDIRS = man
> EXTRA_DIST = quickstart.txt streaming-howto.txt python-howto.txt \
> snapshot-howto.txt calibrate.txt kernel-CodingStyle.txt \
> - live-reading-howto.txt live-reading-protocol.txt
> + live-reading-howto.txt live-reading-protocol.txt \
> + relayd-architecture.txt
>
> dist_doc_DATA = quickstart.txt streaming-howto.txt python-howto.txt \
> snapshot-howto.txt calibrate.txt live-reading-howto.txt \
> diff --git a/doc/relayd-architecture.txt b/doc/relayd-architecture.txt
> new file mode 100644
> index 0000000..c45bd56
> --- /dev/null
> +++ b/doc/relayd-architecture.txt
> @@ -0,0 +1,96 @@
> +LTTng Relay Daemon Architecture
> +Mathieu Desnoyers, August 2015
> +
> +This document describes the object model and architecture of the relay
> +daemon, after the refactoring done within the commit "Fix: Relay daemon
> +ownership and reference counting".
> +
> +We have the following object composition hierarchy:
> +
> +relay connection (main.c, for sessiond/consumer)
> + |
> + \-> 0 or 1 session
> + |
> + \-> 0 or many ctf-trace
> + |
> + \-> 0 or many stream
> + | |
> + | \-> 0 or many index
> + |
> + \-------> 0 or 1 viewer stream
> +
> +live connection (live.c, for client)
> + |
> + \-> 1 viewer session
> + |
> + \-> 0 or many session (actually a reference to session as created
> + | by the relay connection)
> + |
> + \-> ..... (ctf-trace, stream, index, viewer stream)
> +
> +There are global tables declared in lttng-relayd.h for sessions
> +(sessions_ht, indexed by session id), streams (relay_streams_ht, indexed
> +by stream handle), and viewer streams (viewer_streams_ht, indexed by
> +stream handle). The purpose of those tables is to allow fast lookup of
> +those objects using the IDs received in the communication protocols.
> +
> +There is also one connection hash table per worker thread. There is one
> +worker thread to receive data (main.c), and one worker thread to
> +interact with viewer clients (live.c). Those tables are indexed by
> +socket file descriptor.
> +
> +An RCU lookup+refcounting scheme has been introduced for all objects
> +(except viewer session which is still an exception at the moment). This
> +scheme allows looking up the objects or doing a traversal on the RCU
> +linked list or hash table in combination with a getter on the object.
> +This getter validates that there is still at least one reference to the
> +object, else the lookup acts just as if the object does not exist. This
> +scheme is protected by a "reflock" mutex in each object. "reflock"
> +mutexes can be nested from the innermost object to the outermost object.
> +IOW, the session reflock can nest within the ctf-trace reflock.
> +
> +There is also a "lock" mutex in each object. Those are used to
> +synchronize between threads (currently the main.c relay thread and
> +live.c client thread) when objects are shared. Locks can be nested from
> +the outermost object to the innermost object. IOW, the ctf-trace lock can
> +nest within the session lock.
> +
> +A "lock" should never nest within a "reflock".
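
Inline note on the two nesting rules above: the sketch below only illustrates the ordering, under the assumption of two hypothetical structures (`order_session`, `order_trace`) that are not part of the patch. The regular "lock" is taken from the outermost object inwards, the "reflock" from the innermost object outwards, and a "lock" is never taken while a "reflock" is held.

```c
#include <pthread.h>

/* Hypothetical objects, for illustration only (not from the patch). */
struct order_session {
	pthread_mutex_t lock;
	pthread_mutex_t reflock;
	int updates;
};

struct order_trace {
	pthread_mutex_t lock;
	pthread_mutex_t reflock;
	struct order_session *session;	/* Container of this trace. */
	int updates;
};

/* "lock" ordering: outermost (session) first, then innermost (trace). */
void order_update(struct order_trace *trace)
{
	pthread_mutex_lock(&trace->session->lock);
	pthread_mutex_lock(&trace->lock);
	trace->updates++;
	trace->session->updates++;
	pthread_mutex_unlock(&trace->lock);
	pthread_mutex_unlock(&trace->session->lock);
}

/*
 * "reflock" ordering: innermost (trace) first, then outermost (session).
 * No "lock" is taken while a reflock is held.
 */
void order_ref_section(struct order_trace *trace)
{
	pthread_mutex_lock(&trace->reflock);
	pthread_mutex_lock(&trace->session->reflock);
	/* Refcount manipulation would happen here. */
	pthread_mutex_unlock(&trace->session->reflock);
	pthread_mutex_unlock(&trace->reflock);
}

/* Runs both sections once; returns 0 when the counters match. */
int order_scenario(void)
{
	struct order_session session = { .updates = 0 };
	struct order_trace trace = { .session = &session, .updates = 0 };
	int ok;

	pthread_mutex_init(&session.lock, NULL);
	pthread_mutex_init(&session.reflock, NULL);
	pthread_mutex_init(&trace.lock, NULL);
	pthread_mutex_init(&trace.reflock, NULL);

	order_update(&trace);
	order_ref_section(&trace);
	ok = (trace.updates == 1 && session.updates == 1);

	pthread_mutex_destroy(&trace.reflock);
	pthread_mutex_destroy(&trace.lock);
	pthread_mutex_destroy(&session.reflock);
	pthread_mutex_destroy(&session.lock);
	return ok ? 0 : -1;
}
```

Keeping each lock class on a single fixed order is what rules out lock-order inversion between the relay thread and the live thread.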
> +
> +RCU linked lists are used to iterate using RCU, and protected by
> +their own mutex for modifications. Iterations should be confirmed using
> +the object "getter" to ensure its refcount is not 0 (except in cases
> +where the caller actually owns the object and therefore can assume its
> +refcount is not 0).
> +
> +RCU hash tables are used to iterate using RCU. Iteration should be
> +confirmed using the object "getter" to ensure its refcount is not 0
> +(except again if we have ownership and can assume the object refcount is
> +not 0).
> +
> +A newly created object has a refcount of 1. Each getter increments the
> +refcount, and needs to be paired with a "put" to decrement it. A final
> +put on "self" (ownership) will allow refcount to reach 0, therefore
> +triggering release, and thus free through call_rcu.
> +
> +In the composition scheme, we find back references from each composite
> +to its container. Therefore, each composite holds a reference (refcount)
> +on its container. This allows following pointers from e.g. viewer stream
> +to stream to ctf-trace to session without performing any validation,
> +due to transitive refcounting of those back-references.
> +
> +In addition to those back references, there are a few key ownership
> +references held. The connection in the relay worker thread (main.c)
> +holds ownership on the session, and on each stream it contains. The
> +connection in the live worker thread (live.c) holds ownership on each
> +viewer stream it creates. The rest is ensured by back references
> +from composite to container objects. When a connection is closed, it
> +puts all the ownership references it is holding. This will then
> +eventually trigger destruction of the session, streams, and viewer
> +streams associated with the connection when all the back references
> +reach 0.
> +
> +RCU read-side locks are now only held during iteration on RCU lists and
> +hash tables, and within the internals of the get (lookup) and put
> +functions. Those functions then use refcounting to ensure existence of
> +the object when returned to their caller.
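
Inline note on the get/put scheme described in the document above: the following is a minimal sketch, not the patch's implementation. It assumes a hypothetical `demo_obj` with a plain counter and a `released` flag in place of `struct urcu_ref` and `call_rcu`, so it has no liburcu dependency. It shows a getter that fails once the refcount has reached 0, a put that releases on the last reference, and a back reference that keeps the container alive until the composite is released.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a relayd object (not from the patch). */
struct demo_obj {
	long refcount;			/* Stands in for struct urcu_ref. */
	pthread_mutex_t reflock;
	struct demo_obj *container;	/* Back reference, holds one ref. */
	bool released;			/* Stands in for the call_rcu free. */
};

/* Getter: acts as if the object does not exist once refcount is 0. */
static bool demo_get(struct demo_obj *obj)
{
	bool has_ref = false;

	pthread_mutex_lock(&obj->reflock);
	if (obj->refcount != 0) {
		obj->refcount++;
		has_ref = true;
	}
	pthread_mutex_unlock(&obj->reflock);
	return has_ref;
}

static void demo_put(struct demo_obj *obj)
{
	pthread_mutex_lock(&obj->reflock);
	assert(obj->refcount > 0);
	if (--obj->refcount == 0) {
		obj->released = true;
		if (obj->container) {
			/* The container reflock nests within ours. */
			demo_put(obj->container);
		}
	}
	pthread_mutex_unlock(&obj->reflock);
}

/* Creation: refcount starts at 1, and a ref is taken on the container. */
static bool demo_init(struct demo_obj *obj, struct demo_obj *container)
{
	if (container && !demo_get(container)) {
		return false;
	}
	obj->refcount = 1;
	pthread_mutex_init(&obj->reflock, NULL);
	obj->container = container;
	obj->released = false;
	return true;
}

/* Returns 0 when the scheme behaves as described, -1 otherwise. */
int demo_refcount_scenario(void)
{
	struct demo_obj session, trace;
	int ret = -1;

	if (!demo_init(&session, NULL) || !demo_init(&trace, &session)) {
		return -1;
	}
	if (!demo_get(&trace)) {	/* A lookup takes a reference. */
		goto end;
	}
	demo_put(&session);		/* Owner drops its session ref. */
	if (session.released) {		/* Back reference keeps it alive. */
		goto end;
	}
	demo_put(&trace);		/* Lookup user is done. */
	demo_put(&trace);		/* Final put on "self". */
	if (!trace.released || !session.released) {
		goto end;
	}
	if (demo_get(&trace)) {		/* Getter must now fail. */
		goto end;
	}
	ret = 0;
end:
	pthread_mutex_destroy(&trace.reflock);
	pthread_mutex_destroy(&session.reflock);
	return ret;
}
```

In the patch itself the release path ends in `call_rcu`, which is what makes it safe for a concurrent RCU reader to still reach the object and call the getter after the refcount has dropped to 0; the sketch sidesteps that by never freeing the objects.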
> diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am
> index 126cdaf..428f352 100644
> --- a/src/bin/lttng-relayd/Makefile.am
> +++ b/src/bin/lttng-relayd/Makefile.am
> @@ -17,7 +17,9 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \
> viewer-stream.h viewer-stream.c \
> session.c session.h \
> stream.c stream.h \
> - connection.c connection.h
> + stream-fd.c stream-fd.h \
> + connection.c connection.h \
> + viewer-session.c viewer-session.h
>
> # link on liblttngctl for check if relayd is already alive.
> lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
> diff --git a/src/bin/lttng-relayd/cmd-2-1.c b/src/bin/lttng-relayd/cmd-2-1.c
> index 0cd9b5a..a2c340f 100644
> --- a/src/bin/lttng-relayd/cmd-2-1.c
> +++ b/src/bin/lttng-relayd/cmd-2-1.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -28,30 +29,30 @@
> #include "cmd-2-1.h"
> #include "utils.h"
>
> +/*
> + * cmd_recv_stream_2_1 allocates path_name and channel_name.
> + */
> int cmd_recv_stream_2_1(struct relay_connection *conn,
> - struct relay_stream *stream)
> + char **path_name, char **channel_name)
> {
> int ret;
> struct lttcomm_relayd_add_stream stream_info;
>
> - assert(conn);
> - assert(stream);
> -
> ret = cmd_recv(conn->sock, &stream_info, sizeof(stream_info));
> if (ret < 0) {
> ERR("Unable to recv stream version 2.1");
> goto error;
> }
>
> - stream->path_name = create_output_path(stream_info.pathname);
> - if (stream->path_name == NULL) {
> + *path_name = create_output_path(stream_info.pathname);
> + if (*path_name == NULL) {
> PERROR("Path name allocation");
> ret = -ENOMEM;
> goto error;
> }
>
> - stream->channel_name = strdup(stream_info.channel_name);
> - if (stream->channel_name == NULL) {
> + *channel_name = strdup(stream_info.channel_name);
> + if (*channel_name == NULL) {
> ret = -errno;
> PERROR("Path name allocation");
> goto error;
> diff --git a/src/bin/lttng-relayd/cmd-2-1.h b/src/bin/lttng-relayd/cmd-2-1.h
> index bab8190..46283dc 100644
> --- a/src/bin/lttng-relayd/cmd-2-1.h
> +++ b/src/bin/lttng-relayd/cmd-2-1.h
> @@ -1,6 +1,10 @@
> +#ifndef RELAYD_CMD_2_1_H
> +#define RELAYD_CMD_2_1_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,13 +20,9 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef RELAYD_CMD_2_1_H
> -#define RELAYD_CMD_2_1_H
> -
> #include "lttng-relayd.h"
> -#include "stream.h"
>
> int cmd_recv_stream_2_1(struct relay_connection *conn,
> - struct relay_stream *stream);
> + char **path_name, char **channel_name);
>
> #endif /* RELAYD_CMD_2_1_H */
> diff --git a/src/bin/lttng-relayd/cmd-2-2.c b/src/bin/lttng-relayd/cmd-2-2.c
> index 7dd99ad..1493b73 100644
> --- a/src/bin/lttng-relayd/cmd-2-2.c
> +++ b/src/bin/lttng-relayd/cmd-2-2.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -30,37 +31,38 @@
> #include "cmd-2-1.h"
> #include "utils.h"
>
> +/*
> + * cmd_recv_stream_2_2 allocates path_name and channel_name.
> + */
> int cmd_recv_stream_2_2(struct relay_connection *conn,
> - struct relay_stream *stream)
> + char **path_name, char **channel_name,
> + uint64_t *tracefile_size, uint64_t *tracefile_count)
> {
> int ret;
> struct lttcomm_relayd_add_stream_2_2 stream_info;
>
> - assert(conn);
> - assert(stream);
> -
> ret = cmd_recv(conn->sock, &stream_info, sizeof(stream_info));
> if (ret < 0) {
> ERR("Unable to recv stream version 2.2");
> goto error;
> }
>
> - stream->path_name = create_output_path(stream_info.pathname);
> - if (stream->path_name == NULL) {
> + *path_name = create_output_path(stream_info.pathname);
> + if (*path_name == NULL) {
> PERROR("Path name allocation");
> ret = -ENOMEM;
> goto error;
> }
>
> - stream->channel_name = strdup(stream_info.channel_name);
> - if (stream->channel_name == NULL) {
> + *channel_name = strdup(stream_info.channel_name);
> + if (*channel_name == NULL) {
> ret = -errno;
> PERROR("Path name allocation");
> goto error;
> }
>
> - stream->tracefile_size = be64toh(stream_info.tracefile_size);
> - stream->tracefile_count = be64toh(stream_info.tracefile_count);
> + *tracefile_size = be64toh(stream_info.tracefile_size);
> + *tracefile_count = be64toh(stream_info.tracefile_count);
> ret = 0;
>
> error:
> diff --git a/src/bin/lttng-relayd/cmd-2-2.h b/src/bin/lttng-relayd/cmd-2-2.h
> index bd1cd14..894a63a 100644
> --- a/src/bin/lttng-relayd/cmd-2-2.h
> +++ b/src/bin/lttng-relayd/cmd-2-2.h
> @@ -1,6 +1,10 @@
> +#ifndef RELAYD_CMD_2_2_H
> +#define RELAYD_CMD_2_2_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,12 +20,10 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef RELAYD_CMD_2_2_H
> -#define RELAYD_CMD_2_2_H
> -
> #include "lttng-relayd.h"
>
> int cmd_recv_stream_2_2(struct relay_connection *conn,
> - struct relay_stream *stream);
> + char **path_name, char **channel_name,
> + uint64_t *tracefile_size, uint64_t *tracefile_count);
>
> #endif /* RELAYD_CMD_2_2_H */
> diff --git a/src/bin/lttng-relayd/cmd-2-4.c b/src/bin/lttng-relayd/cmd-2-4.c
> index d8aa737..a3290cb 100644
> --- a/src/bin/lttng-relayd/cmd-2-4.c
> +++ b/src/bin/lttng-relayd/cmd-2-4.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -30,26 +31,24 @@
> #include "lttng-relayd.h"
>
> int cmd_create_session_2_4(struct relay_connection *conn,
> - struct relay_session *session)
> + char *session_name, char *hostname,
> + uint32_t *live_timer, bool *snapshot)
> {
> int ret;
> struct lttcomm_relayd_create_session_2_4 session_info;
>
> - assert(conn);
> - assert(session);
> -
> ret = cmd_recv(conn->sock, &session_info, sizeof(session_info));
> if (ret < 0) {
> ERR("Unable to recv session info version 2.4");
> goto error;
> }
>
> - strncpy(session->session_name, session_info.session_name,
> - sizeof(session->session_name));
> - strncpy(session->hostname, session_info.hostname,
> - sizeof(session->hostname));
> - session->live_timer = be32toh(session_info.live_timer);
> - session->snapshot = be32toh(session_info.snapshot);
> + strncpy(session_name, session_info.session_name,
> + sizeof(session_info.session_name));
> + strncpy(hostname, session_info.hostname,
> + sizeof(session_info.hostname));
> + *live_timer = be32toh(session_info.live_timer);
> + *snapshot = be32toh(session_info.snapshot);
>
> ret = 0;
>
> diff --git a/src/bin/lttng-relayd/cmd-2-4.h b/src/bin/lttng-relayd/cmd-2-4.h
> index aaf572a..ab73478 100644
> --- a/src/bin/lttng-relayd/cmd-2-4.h
> +++ b/src/bin/lttng-relayd/cmd-2-4.h
> @@ -1,6 +1,10 @@
> +#ifndef RELAYD_CMD_2_4_H
> +#define RELAYD_CMD_2_4_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,12 +20,10 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef RELAYD_CMD_2_4_H
> -#define RELAYD_CMD_2_4_H
> -
> #include "lttng-relayd.h"
>
> int cmd_create_session_2_4(struct relay_connection *conn,
> - struct relay_session *session);
> + char *session_name, char *hostname,
> + uint32_t *live_timer, bool *snapshot);
>
> #endif /* RELAYD_CMD_2_4_H */
> diff --git a/src/bin/lttng-relayd/cmd-generic.c b/src/bin/lttng-relayd/cmd-generic.c
> index 417d6d3..276e85b 100644
> --- a/src/bin/lttng-relayd/cmd-generic.c
> +++ b/src/bin/lttng-relayd/cmd-generic.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> diff --git a/src/bin/lttng-relayd/cmd-generic.h b/src/bin/lttng-relayd/cmd-generic.h
> index 640fed7..4551f0a 100644
> --- a/src/bin/lttng-relayd/cmd-generic.h
> +++ b/src/bin/lttng-relayd/cmd-generic.h
> @@ -1,6 +1,10 @@
> +#ifndef RELAYD_CMD_GENERIC_H
> +#define RELAYD_CMD_GENERIC_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,9 +20,6 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef RELAYD_CMD_GENERIC_H
> -#define RELAYD_CMD_GENERIC_H
> -
> #include <common/sessiond-comm/sessiond-comm.h>
>
> #include "connection.h"
> diff --git a/src/bin/lttng-relayd/cmd.h b/src/bin/lttng-relayd/cmd.h
> index c8b37d5..88db09a 100644
> --- a/src/bin/lttng-relayd/cmd.h
> +++ b/src/bin/lttng-relayd/cmd.h
> @@ -1,6 +1,10 @@
> +#ifndef RELAYD_CMD_H
> +#define RELAYD_CMD_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,9 +20,6 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef RELAYD_CMD_H
> -#define RELAYD_CMD_H
> -
> #include "cmd-generic.h"
> #include "cmd-2-1.h"
> #include "cmd-2-2.h"
> diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c
> index 76e48a6..b425548 100644
> --- a/src/bin/lttng-relayd/connection.c
> +++ b/src/bin/lttng-relayd/connection.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -19,91 +20,132 @@
> #define _GNU_SOURCE
> #define _LGPL_SOURCE
> #include <common/common.h>
> +#include <urcu/rculist.h>
>
> #include "connection.h"
> #include "stream.h"
> +#include "viewer-session.h"
>
> -static void rcu_free_connection(struct rcu_head *head)
> +bool connection_get(struct relay_connection *conn)
> {
> - struct relay_connection *conn =
> - caa_container_of(head, struct relay_connection, rcu_node);
> + bool has_ref = false;
>
> - lttcomm_destroy_sock(conn->sock);
> - connection_free(conn);
> + pthread_mutex_lock(&conn->reflock);
> + if (conn->ref.refcount != 0) {
> + has_ref = true;
> + urcu_ref_get(&conn->ref);
> + }
> + pthread_mutex_unlock(&conn->reflock);
> +
> + return has_ref;
> }
>
> -/*
> - * Must be called with a read side lock held. The read side lock must be
> - * kept until the returned relay_connection is no longer in use.
> - */
> -struct relay_connection *connection_find_by_sock(struct lttng_ht *ht, int sock)
> +struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht,
> + int sock)
> {
> struct lttng_ht_node_ulong *node;
> struct lttng_ht_iter iter;
> struct relay_connection *conn = NULL;
>
> - assert(ht);
> assert(sock >= 0);
>
> - lttng_ht_lookup(ht, (void *)((unsigned long) sock), &iter);
> + rcu_read_lock();
> + lttng_ht_lookup(relay_connections_ht, (void *)((unsigned long) sock),
> + &iter);
> node = lttng_ht_iter_get_node_ulong(&iter);
> if (!node) {
> DBG2("Relay connection by sock %d not found", sock);
> goto end;
> }
> conn = caa_container_of(node, struct relay_connection, sock_n);
> -
> + if (!connection_get(conn)) {
> + conn = NULL;
> + }
> end:
> + rcu_read_unlock();
> return conn;
> }
>
> -void connection_delete(struct lttng_ht *ht, struct relay_connection *conn)
> +struct relay_connection *connection_create(struct lttcomm_sock *sock,
> + enum connection_type type)
> {
> - int ret;
> - struct lttng_ht_iter iter;
> -
> - assert(ht);
> - assert(conn);
> + struct relay_connection *conn;
>
> - iter.iter.node = &conn->sock_n.node;
> - ret = lttng_ht_del(ht, &iter);
> - assert(!ret);
> + conn = zmalloc(sizeof(*conn));
> + if (!conn) {
> + PERROR("zmalloc relay connection");
> + goto end;
> + }
> + pthread_mutex_init(&conn->lock, NULL);
> + pthread_mutex_init(&conn->reflock, NULL);
> + urcu_ref_init(&conn->ref);
> + conn->type = type;
> + conn->sock = sock;
> + lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd);
> +end:
> + return conn;
> }
>
> -void connection_destroy(struct relay_connection *conn)
> +static void rcu_free_connection(struct rcu_head *head)
> {
> - assert(conn);
> + struct relay_connection *conn =
> + caa_container_of(head, struct relay_connection, rcu_node);
>
> + lttcomm_destroy_sock(conn->sock);
> + if (conn->viewer_session) {
> + viewer_session_destroy(conn->viewer_session);
> + conn->viewer_session = NULL;
> + }
> + free(conn);
> +}
> +
> +static void destroy_connection(struct relay_connection *conn)
> +{
> call_rcu(&conn->rcu_node, rcu_free_connection);
> }
>
> -struct relay_connection *connection_create(void)
> +static void connection_release(struct urcu_ref *ref)
> {
> - struct relay_connection *conn;
> + struct relay_connection *conn =
> + caa_container_of(ref, struct relay_connection, ref);
>
> - conn = zmalloc(sizeof(*conn));
> - if (!conn) {
> - PERROR("zmalloc relay connection");
> - goto error;
> + if (conn->in_socket_ht) {
> + struct lttng_ht_iter iter;
> + int ret;
> +
> + iter.iter.node = &conn->sock_n.node;
> + ret = lttng_ht_del(conn->socket_ht, &iter);
> + assert(!ret);
> }
>
> -error:
> - return conn;
> + if (conn->session) {
> + if (session_close(conn->session)) {
> + ERR("session_close");
> + }
> + conn->session = NULL;
> + }
> + if (conn->viewer_session) {
> + viewer_session_close(conn->viewer_session);
> + }
> + destroy_connection(conn);
> }
>
> -void connection_init(struct relay_connection *conn)
> +void connection_put(struct relay_connection *conn)
> {
> - assert(conn);
> - assert(conn->sock);
> -
> - CDS_INIT_LIST_HEAD(&conn->recv_head);
> - lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd);
> + rcu_read_lock();
> + pthread_mutex_lock(&conn->reflock);
> + urcu_ref_put(&conn->ref, connection_release);
> + pthread_mutex_unlock(&conn->reflock);
> + rcu_read_unlock();
> }
>
> -void connection_free(struct relay_connection *conn)
> +void connection_ht_add(struct lttng_ht *relay_connections_ht,
> + struct relay_connection *conn)
> {
> - assert(conn);
> -
> - free(conn->viewer_session);
> - free(conn);
> + pthread_mutex_lock(&conn->lock);
> + assert(!conn->in_socket_ht);
> + lttng_ht_add_unique_ulong(relay_connections_ht, &conn->sock_n);
> + conn->in_socket_ht = 1;
> + conn->socket_ht = relay_connections_ht;
> + pthread_mutex_unlock(&conn->lock);
> }
> diff --git a/src/bin/lttng-relayd/connection.h b/src/bin/lttng-relayd/connection.h
> index 70fe4ba..1fff515 100644
> --- a/src/bin/lttng-relayd/connection.h
> +++ b/src/bin/lttng-relayd/connection.h
> @@ -1,6 +1,10 @@
> +#ifndef _CONNECTION_H
> +#define _CONNECTION_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,9 +20,6 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef _CONNECTION_H
> -#define _CONNECTION_H
> -
> #include <limits.h>
> #include <inttypes.h>
> #include <pthread.h>
> @@ -32,6 +33,7 @@
> #include "session.h"
>
> enum connection_type {
> + RELAY_CONNECTION_UNKNOWN = 0,
> RELAY_DATA = 1,
> RELAY_CONTROL = 2,
> RELAY_VIEWER_COMMAND = 3,
> @@ -44,36 +46,49 @@ enum connection_type {
> */
> struct relay_connection {
> struct lttcomm_sock *sock;
> - struct relay_session *session;
> - struct relay_viewer_session *viewer_session;
> struct cds_wfcq_node qnode;
> - struct lttng_ht_node_ulong sock_n;
> - struct rcu_head rcu_node;
> +
> enum connection_type type;
> - /* Protocol version to use for this connection. */
> + /*
> + * session is only ever set for RELAY_CONTROL connection type.
> + */
> + struct relay_session *session;
> + /*
> + * viewer_session is only ever set for RELAY_VIEWER_COMMAND
> + * connection type.
> + */
> + struct relay_viewer_session *viewer_session;
> +
> + /*
> + * Protocol version to use for this connection. Only valid for
> + * RELAY_CONTROL connection type.
> + */
> uint32_t major;
> uint32_t minor;
> - uint64_t session_id;
> +
> + struct urcu_ref ref;
> + pthread_mutex_t reflock;
> +
> + pthread_mutex_t lock;
> +
> + bool version_check_done;
>
> /*
> - * This contains streams that are received on that connection. It's used to
> - * store them until we get the streams sent command where they are removed
> - * and flagged ready for the viewer. This is ONLY used by the control
> - * thread thus any action on it should happen in that thread.
> + * Node member of connection within global socket hash table.
> */
> - struct cds_list_head recv_head;
> - unsigned int version_check_done:1;
> -
> - /* Pointer to the sessions HT that this connection can use. */
> - struct lttng_ht *sessions_ht;
> + struct lttng_ht_node_ulong sock_n;
> + bool in_socket_ht;
> + struct lttng_ht *socket_ht; /* HACK: Contained within this hash table. */
> + struct rcu_head rcu_node; /* For call_rcu teardown. */
> };
>
> -struct relay_connection *connection_find_by_sock(struct lttng_ht *ht,
> +struct relay_connection *connection_create(struct lttcomm_sock *sock,
> + enum connection_type type);
> +struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht,
> int sock);
> -struct relay_connection *connection_create(void);
> -void connection_init(struct relay_connection *conn);
> -void connection_delete(struct lttng_ht *ht, struct relay_connection *conn);
> -void connection_destroy(struct relay_connection *conn);
> -void connection_free(struct relay_connection *conn);
> +bool connection_get(struct relay_connection *connection);
> +void connection_put(struct relay_connection *connection);
> +void connection_ht_add(struct lttng_ht *relay_connections_ht,
> + struct relay_connection *conn);
>
> #endif /* _CONNECTION_H */
> diff --git a/src/bin/lttng-relayd/ctf-trace.c b/src/bin/lttng-relayd/ctf-trace.c
> index 02a8b2b..20e1543 100644
> --- a/src/bin/lttng-relayd/ctf-trace.c
> +++ b/src/bin/lttng-relayd/ctf-trace.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -22,148 +23,191 @@
>
> #include <common/common.h>
> #include <common/utils.h>
> +#include <urcu/rculist.h>
>
> #include "ctf-trace.h"
> #include "lttng-relayd.h"
> #include "stream.h"
>
> static uint64_t last_relay_ctf_trace_id;
> +static pthread_mutex_t last_relay_ctf_trace_id_lock = PTHREAD_MUTEX_INITIALIZER;
>
> -static void rcu_destroy_ctf_trace(struct rcu_head *head)
> +static void rcu_destroy_ctf_trace(struct rcu_head *rcu_head)
> {
> - struct lttng_ht_node_str *node =
> - caa_container_of(head, struct lttng_ht_node_str, head);
> - struct ctf_trace *trace=
> - caa_container_of(node, struct ctf_trace, node);
> + struct ctf_trace *trace =
> + caa_container_of(rcu_head, struct ctf_trace, rcu_node);
>
> free(trace);
> }
>
> -static void rcu_destroy_stream(struct rcu_head *head)
> -{
> - struct relay_stream *stream =
> - caa_container_of(head, struct relay_stream, rcu_node);
> -
> - stream_destroy(stream);
> -}
> -
> /*
> * Destroy a ctf trace and all stream contained in it.
> *
> * MUST be called with the RCU read side lock.
> */
> -void ctf_trace_destroy(struct ctf_trace *obj)
> +void ctf_trace_destroy(struct ctf_trace *trace)
> {
> - struct relay_stream *stream, *tmp_stream;
> -
> - assert(obj);
> /*
> - * Getting to this point, every stream referenced to that object have put
> - * back their ref since the've been closed by the control side.
> + * Getting to this point, every stream referencing that trace
> + * has put back its ref since it has been closed by the
> + * control side.
> */
> - assert(!obj->refcount);
> + assert(cds_list_empty(&trace->stream_list));
> + session_put(trace->session);
> + trace->session = NULL;
> + call_rcu(&trace->rcu_node, rcu_destroy_ctf_trace);
> +}
>
> - cds_list_for_each_entry_safe(stream, tmp_stream, &obj->stream_list,
> - trace_list) {
> - stream_delete(relay_streams_ht, stream);
> - call_rcu(&stream->rcu_node, rcu_destroy_stream);
> - }
> +void ctf_trace_release(struct urcu_ref *ref)
> +{
> + struct ctf_trace *trace =
> + caa_container_of(ref, struct ctf_trace, ref);
> + int ret;
> + struct lttng_ht_iter iter;
>
> - call_rcu(&obj->node.head, rcu_destroy_ctf_trace);
> + iter.iter.node = &trace->node.node;
> + ret = lttng_ht_del(trace->session->ctf_traces_ht, &iter);
> + assert(!ret);
> + ctf_trace_destroy(trace);
> }
>
> -void ctf_trace_try_destroy(struct relay_session *session,
> - struct ctf_trace *ctf_trace)
> +/*
> + * Should be called with RCU read-side lock held.
> + */
> +bool ctf_trace_get(struct ctf_trace *trace)
> {
> - assert(session);
> - assert(ctf_trace);
> + bool has_ref = false;
>
> - /*
> - * Considering no viewer attach to the session and the trace having no more
> - * stream attached, wipe the trace.
> - */
> - if (uatomic_read(&session->viewer_refcount) == 0 &&
> - uatomic_read(&ctf_trace->refcount) == 0) {
> - ctf_trace_delete(session->ctf_traces_ht, ctf_trace);
> - ctf_trace_destroy(ctf_trace);
> + /* Confirm that the trace refcount has not reached 0. */
> + pthread_mutex_lock(&trace->reflock);
> + if (trace->ref.refcount != 0) {
> + has_ref = true;
> + urcu_ref_get(&trace->ref);
> }
> + pthread_mutex_unlock(&trace->reflock);
> +
> + return has_ref;
> }
>
> /*
> - * Create and return an allocated ctf_trace object. NULL on error.
> + * Create and return an allocated ctf_trace. NULL on error.
> + * There is no "open" and "close" for a ctf_trace, but rather just a
> + * create and refcounting. Whenever all the streams belonging to a trace
> + * put their reference, its refcount drops to 0.
> */
> -struct ctf_trace *ctf_trace_create(char *path_name)
> +static struct ctf_trace *ctf_trace_create(struct relay_session *session,
> + char *path_name)
> {
> - struct ctf_trace *obj;
> + struct ctf_trace *trace;
>
> - assert(path_name);
> -
> - obj = zmalloc(sizeof(*obj));
> - if (!obj) {
> + trace = zmalloc(sizeof(*trace));
> + if (!trace) {
> PERROR("ctf_trace alloc");
> goto error;
> }
>
> - CDS_INIT_LIST_HEAD(&obj->stream_list);
> + if (!session_get(session)) {
> + ERR("Cannot get session");
> + free(trace);
> + trace = NULL;
> + goto error;
> + }
> + trace->session = session;
> +
> + CDS_INIT_LIST_HEAD(&trace->stream_list);
> +
> + pthread_mutex_lock(&last_relay_ctf_trace_id_lock);
> + trace->id = ++last_relay_ctf_trace_id;
> + pthread_mutex_unlock(&last_relay_ctf_trace_id_lock);
>
> - obj->id = ++last_relay_ctf_trace_id;
> - lttng_ht_node_init_str(&obj->node, path_name);
> + lttng_ht_node_init_str(&trace->node, path_name);
> + trace->session = session;
> + urcu_ref_init(&trace->ref);
> + pthread_mutex_init(&trace->lock, NULL);
> + pthread_mutex_init(&trace->reflock, NULL);
> + pthread_mutex_init(&trace->stream_list_lock, NULL);
> + lttng_ht_add_str(session->ctf_traces_ht, &trace->node);
>
> - DBG("Created ctf_trace %" PRIu64 " with path: %s", obj->id, path_name);
> + DBG("Created ctf_trace %" PRIu64 " with path: %s", trace->id, path_name);
>
> error:
> - return obj;
> + return trace;
> }
>
> /*
> - * Return a ctf_trace object if found by id in the given hash table else NULL.
> - *
> - * Must be called with rcu_read_lock() taken.
> + * Return the ctf_trace found by path name in the session's hash table,
> + * or create it if not found. NULL on error. Hold a reference on the
> + * ctf_trace, which must be paired with ctf_trace_put().
> */
> -struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht,
> +struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session,
> char *path_name)
> {
> struct lttng_ht_node_str *node;
> struct lttng_ht_iter iter;
> struct ctf_trace *trace = NULL;
>
> - assert(ht);
> -
> - lttng_ht_lookup(ht, (void *) path_name, &iter);
> + rcu_read_lock();
> + lttng_ht_lookup(session->ctf_traces_ht, (void *) path_name, &iter);
> node = lttng_ht_iter_get_node_str(&iter);
> if (!node) {
> DBG("CTF Trace path %s not found", path_name);
> goto end;
> }
> trace = caa_container_of(node, struct ctf_trace, node);
> -
> + if (!ctf_trace_get(trace)) {
> + trace = NULL;
> + }
> end:
> + rcu_read_unlock();
> + if (!trace) {
> + /* Try to create */
> + trace = ctf_trace_create(session, path_name);
> + }
> return trace;
> }
>
> -/*
> - * Add stream to a given hash table.
> - */
> -void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace)
> +void ctf_trace_put(struct ctf_trace *trace)
> {
> - assert(ht);
> - assert(trace);
> -
> - lttng_ht_add_str(ht, &trace->node);
> + rcu_read_lock();
> + pthread_mutex_lock(&trace->reflock);
> + urcu_ref_put(&trace->ref, ctf_trace_release);
> + pthread_mutex_unlock(&trace->reflock);
> + rcu_read_unlock();
> }
>
> -/*
> - * Delete stream from a given hash table.
> - */
> -void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace)
> +int ctf_trace_close(struct ctf_trace *trace)
> {
> - int ret;
> - struct lttng_ht_iter iter;
> + struct relay_stream *stream;
> +
> + rcu_read_lock();
> + cds_list_for_each_entry_rcu(stream, &trace->stream_list,
> + stream_node) {
> + /*
> + * Close the stream.
> + */
> + stream_close(stream);
> + }
> + rcu_read_unlock();
> + /*
> + * Since all references to the trace are held by its streams, we
> + * don't need to do any self-ref put.
> + */
> + return 0;
> +}
>
> - assert(ht);
> - assert(trace);
> +struct relay_viewer_stream *ctf_trace_get_viewer_metadata_stream(struct ctf_trace *trace)
> +{
> + struct relay_viewer_stream *vstream;
>
> - iter.iter.node = &trace->node.node;
> - ret = lttng_ht_del(ht, &iter);
> - assert(!ret);
> + rcu_read_lock();
> + vstream = rcu_dereference(trace->viewer_metadata_stream);
> + if (!vstream) {
> + goto end;
> + }
> + if (!viewer_stream_get(vstream)) {
> + vstream = NULL;
> + }
> +end:
> + rcu_read_unlock();
> + return vstream;
> }
> diff --git a/src/bin/lttng-relayd/ctf-trace.h b/src/bin/lttng-relayd/ctf-trace.h
> index 489c5f1..d051f80 100644
> --- a/src/bin/lttng-relayd/ctf-trace.h
> +++ b/src/bin/lttng-relayd/ctf-trace.h
> @@ -1,6 +1,10 @@
> +#ifndef _CTF_TRACE_H
> +#define _CTF_TRACE_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,49 +20,53 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef _CTF_TRACE_H
> -#define _CTF_TRACE_H
> -
> #include <inttypes.h>
> +#include <urcu/ref.h>
>
> #include <common/hashtable/hashtable.h>
>
> #include "session.h"
> #include "stream.h"
> +#include "viewer-stream.h"
>
> struct ctf_trace {
> - int refcount;
> - unsigned int invalid_flag:1;
> + /*
> + * The ctf_trace reflock nests inside the stream reflock.
> + */
> + pthread_mutex_t reflock; /* Protects refcounting */
> + struct urcu_ref ref; /* Every stream has a ref on the trace. */
> + struct relay_session *session; /* Back ref to trace session */
> +
> + /*
> + * The ctf_trace lock nests inside the session lock.
> + */
> + pthread_mutex_t lock;
> uint64_t id;
> - uint64_t metadata_received;
> - uint64_t metadata_sent;
> - struct relay_stream *metadata_stream;
> - struct relay_viewer_stream *viewer_metadata_stream;
> - /* Node indexed by stream path name in the corresponding session. */
> - struct lttng_ht_node_str node;
> + struct relay_viewer_stream *viewer_metadata_stream; /* RCU protected */
>
> - /* Relay stream associated with this ctf trace. */
> + /*
> + * Relay streams associated with this ctf trace.
> + * Updates are protected by the stream_list lock.
> + * Traversals are protected by RCU.
> + */
> struct cds_list_head stream_list;
> + pthread_mutex_t stream_list_lock;
> +
> + /*
> + * Node within session trace hash table. Node is indexed by
> + * stream path name.
> + */
> + struct lttng_ht_node_str node;
> + struct rcu_head rcu_node; /* For call_rcu teardown. */
> };
>
> -static inline void ctf_trace_get_ref(struct ctf_trace *trace)
> -{
> - uatomic_inc(&trace->refcount);
> -}
> +struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session,
> + char *path_name);
> +bool ctf_trace_get(struct ctf_trace *trace);
> +void ctf_trace_put(struct ctf_trace *trace);
>
> -static inline void ctf_trace_put_ref(struct ctf_trace *trace)
> -{
> - uatomic_add(&trace->refcount, -1);
> -}
> +int ctf_trace_close(struct ctf_trace *trace);
>
> -void ctf_trace_assign(struct relay_stream *stream);
> -struct ctf_trace *ctf_trace_create(char *path_name);
> -void ctf_trace_destroy(struct ctf_trace *obj);
> -void ctf_trace_try_destroy(struct relay_session *session,
> - struct ctf_trace *ctf_trace);
> -struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht,
> - char *path_name);
> -void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace);
> -void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace);
> +struct relay_viewer_stream *ctf_trace_get_viewer_metadata_stream(struct ctf_trace *trace);
>
> #endif /* _CTF_TRACE_H */
> diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c
> index b7507a0..c092b7f 100644
> --- a/src/bin/lttng-relayd/index.c
> +++ b/src/bin/lttng-relayd/index.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -24,190 +25,307 @@
> #include <common/utils.h>
>
> #include "lttng-relayd.h"
> +#include "stream.h"
> #include "index.h"
>
> /*
> - * Deferred free of a relay index object. MUST only be called by a call RCU.
> + * Allocate a new relay index object. Pass the stream in which it is
> + * contained as parameter. The sequence number will be used as the hash
> + * table key.
> + *
> + * Called with stream mutex held.
> + * Return allocated object or else NULL on error.
> */
> -static void deferred_free_relay_index(struct rcu_head *head)
> +static struct relay_index *relay_index_create(struct relay_stream *stream,
> + uint64_t net_seq_num)
> {
> - struct relay_index *index =
> - caa_container_of(head, struct relay_index, rcu_node);
> + struct relay_index *index;
>
> - if (index->to_close_fd >= 0) {
> - int ret;
> + DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
> + stream->stream_handle, net_seq_num);
>
> - ret = close(index->to_close_fd);
> - if (ret < 0) {
> - PERROR("Relay index to close fd %d", index->to_close_fd);
> - }
> + index = zmalloc(sizeof(*index));
> + if (!index) {
> + PERROR("Relay index zmalloc");
> + goto end;
> + }
> + if (!stream_get(stream)) {
> + ERR("Cannot get stream");
> + free(index);
> + index = NULL;
> + goto end;
> }
> + index->stream = stream;
>
> - relay_index_free(index);
> + lttng_ht_node_init_u64(&index->index_n, net_seq_num);
> + pthread_mutex_init(&index->lock, NULL);
> + pthread_mutex_init(&index->reflock, NULL);
> + urcu_ref_init(&index->ref);
> +
> +end:
> + return index;
> }
>
> /*
> - * Allocate a new relay index object using the given stream ID and sequence
> - * number as the hash table key.
> + * Add unique relay index to the given hash table. In case of a collision, the
> + * already existing object is returned; otherwise NULL is returned.
> *
> - * Return allocated object or else NULL on error.
> + * RCU read side lock MUST be acquired.
> */
> -struct relay_index *relay_index_create(uint64_t stream_id,
> - uint64_t net_seq_num)
> +static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
> + struct relay_index *index)
> {
> - struct relay_index *index;
> + struct cds_lfht_node *node_ptr;
> + struct relay_index *_index;
>
> - DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
> - stream_id, net_seq_num);
> + DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
> + stream->stream_handle, index->index_n.key);
>
> - index = zmalloc(sizeof(*index));
> - if (index == NULL) {
> - PERROR("Relay index zmalloc");
> - goto error;
> + node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
> + stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed),
> + stream->indexes_ht->match_fct, &index->index_n,
> + &index->index_n.node);
> + if (node_ptr != &index->index_n.node) {
> + _index = caa_container_of(node_ptr, struct relay_index,
> + index_n.node);
> + } else {
> + _index = NULL;
> }
> + return _index;
> +}
> +
> +/*
> + * Should be called with RCU read-side lock held.
> + */
> +static bool relay_index_get(struct relay_index *index)
> +{
> + bool has_ref = false;
>
> - index->to_close_fd = -1;
> - lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num);
> + DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
> + index->stream->stream_handle, index->index_n.key,
> + (int) index->ref.refcount);
>
> -error:
> - return index;
> + /* Confirm that the index refcount has not reached 0. */
> + pthread_mutex_lock(&index->reflock);
> + if (index->ref.refcount != 0) {
> + has_ref = true;
> + urcu_ref_get(&index->ref);
> + }
> + pthread_mutex_unlock(&index->reflock);
> +
> + return has_ref;
> }
>
> /*
> - * Find a relayd index in the given hash table.
> + * Get a relayd index within the given stream, or create it if not
> + * present.
> *
> + * Called with stream mutex held.
> * Return index object or else NULL on error.
> */
> -struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num)
> +struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
> + uint64_t net_seq_num)
> {
> - struct lttng_ht_node_two_u64 *node;
> + struct lttng_ht_node_u64 *node;
> struct lttng_ht_iter iter;
> - struct lttng_ht_two_u64 key;
> struct relay_index *index = NULL;
>
> DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
> - stream_id, net_seq_num);
> -
> - key.key1 = stream_id;
> - key.key2 = net_seq_num;
> + stream->stream_handle, net_seq_num);
>
> - lttng_ht_lookup(indexes_ht, (void *)(&key), &iter);
> - node = lttng_ht_iter_get_node_two_u64(&iter);
> - if (node == NULL) {
> - goto end;
> + rcu_read_lock();
> + lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
> + node = lttng_ht_iter_get_node_u64(&iter);
> + if (node) {
> + index = caa_container_of(node, struct relay_index, index_n);
> + } else {
> + struct relay_index *oldindex;
> +
> + index = relay_index_create(stream, net_seq_num);
> + if (!index) {
> + ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
> + stream->stream_handle, net_seq_num);
> + goto end;
> + }
> + oldindex = relay_index_add_unique(stream, index);
> + if (oldindex) {
> + /* Added concurrently, keep old. */
> + relay_index_put(index);
> + index = oldindex;
> + if (!relay_index_get(index)) {
> + index = NULL;
> + }
> + } else {
> + stream->indexes_in_flight++;
> + index->in_hash_table = true;
> + }
> }
> - index = caa_container_of(node, struct relay_index, index_n);
> -
> end:
> - DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
> - (index == NULL) ? "NOT " : "", stream_id, net_seq_num);
> + rcu_read_unlock();
> + DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
> + (index == NULL) ? "NOT " : "", stream->stream_handle, net_seq_num);
> return index;
> }
>
> -/*
> - * Add unique relay index to the given hash table. In case of a collision, the
> - * already existing object is put in the given _index variable.
> - *
> - * RCU read side lock MUST be acquired.
> - */
> -void relay_index_add(struct relay_index *index, struct relay_index **_index)
> +int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
> + uint64_t data_offset)
> {
> - struct cds_lfht_node *node_ptr;
> + int ret = 0;
>
> - assert(index);
> + pthread_mutex_lock(&index->lock);
> + if (index->index_fd) {
> + ret = -1;
> + goto end;
> + }
> + stream_fd_get(index_fd);
> + index->index_fd = index_fd;
> + index->index_data.offset = data_offset;
> +end:
> + pthread_mutex_unlock(&index->lock);
> + return ret;
> +}
>
> - DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
> - index->index_n.key.key1, index->index_n.key.key2);
> +int relay_index_set_data(struct relay_index *index,
> + const struct ctf_packet_index *data)
> +{
> + int ret = 0;
>
> - node_ptr = cds_lfht_add_unique(indexes_ht->ht,
> - indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed),
> - indexes_ht->match_fct, (void *) &index->index_n.key,
> - &index->index_n.node);
> - if (node_ptr != &index->index_n.node) {
> - *_index = caa_container_of(node_ptr, struct relay_index, index_n.node);
> + pthread_mutex_lock(&index->lock);
> + if (index->has_index_data) {
> + ret = -1;
> + goto end;
> }
> + /* Set everything except data_offset. */
> + index->index_data.packet_size = data->packet_size;
> + index->index_data.content_size = data->content_size;
> + index->index_data.timestamp_begin = data->timestamp_begin;
> + index->index_data.timestamp_end = data->timestamp_end;
> + index->index_data.events_discarded = data->events_discarded;
> + index->index_data.stream_id = data->stream_id;
> + index->has_index_data = true;
> +end:
> + pthread_mutex_unlock(&index->lock);
> + return ret;
> }
>
> -/*
> - * Write index on disk to the given fd. Once done error or not, it is removed
> - * from the hash table and destroy the object.
> - *
> - * MUST be called with a RCU read side lock held.
> - *
> - * Return 0 on success else a negative value.
> - */
> -int relay_index_write(int fd, struct relay_index *index)
> +static void index_destroy(struct relay_index *index)
> {
> - int ret;
> - struct lttng_ht_iter iter;
> -
> - DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
> - " on fd %d", index->index_n.key.key1,
> - index->index_n.key.key2, fd);
> + free(index);
> +}
>
> - /* Delete index from hash table. */
> - iter.iter.node = &index->index_n.node;
> - ret = lttng_ht_del(indexes_ht, &iter);
> - assert(!ret);
> - call_rcu(&index->rcu_node, deferred_free_relay_index);
> +static void index_destroy_rcu(struct rcu_head *rcu_head)
> +{
> + struct relay_index *index =
> + caa_container_of(rcu_head, struct relay_index, rcu_node);
>
> - return index_write(fd, &index->index_data, sizeof(index->index_data));
> + index_destroy(index);
> }
>
> -/*
> - * Free the given index.
> - */
> -void relay_index_free(struct relay_index *index)
> +static void index_release(struct urcu_ref *ref)
> {
> - free(index);
> + struct relay_index *index = caa_container_of(ref, struct relay_index, ref);
> + struct relay_stream *stream = index->stream;
> + int ret;
> + struct lttng_ht_iter iter;
> +
> + if (index->index_fd) {
> + stream_fd_put(index->index_fd);
> + index->index_fd = NULL;
> + }
> + if (index->in_hash_table) {
> + /* Delete index from hash table. */
> + iter.iter.node = &index->index_n.node;
> + ret = lttng_ht_del(stream->indexes_ht, &iter);
> + assert(!ret);
> + stream->indexes_in_flight--;
> + }
> +
> + stream_put(index->stream);
> + index->stream = NULL;
> +
> + call_rcu(&index->rcu_node, index_destroy_rcu);
> }
>
> /*
> - * Safely free the given index using a call RCU.
> + * Called with stream mutex held.
> */
> -void relay_index_free_safe(struct relay_index *index)
> +void relay_index_put(struct relay_index *index)
> {
> - if (!index) {
> - return;
> - }
> -
> - call_rcu(&index->rcu_node, deferred_free_relay_index);
> + DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
> + index->stream->stream_handle, index->index_n.key,
> + (int) index->ref.refcount);
> + /*
> + * Ensure existence of index->lock for index unlock.
> + */
> + rcu_read_lock();
> + /*
> + * Index lock ensures that concurrent test and update of stream
> + * ref is atomic.
> + */
> + pthread_mutex_lock(&index->reflock);
> + assert(index->ref.refcount != 0);
> + urcu_ref_put(&index->ref, index_release);
> + pthread_mutex_unlock(&index->reflock);
> + rcu_read_unlock();
> }
>
> /*
> - * Delete index from the given hash table.
> + * Try to flush index to disk. Releases self-reference to index once
> + * flush succeeds.
> *
> - * RCU read side lock MUST be acquired.
> + * Return 0 on successful flush, a negative value on error, or positive
> + * value if no flush was performed.
> */
> -void relay_index_delete(struct relay_index *index)
> +int relay_index_try_flush(struct relay_index *index)
> {
> - int ret;
> - struct lttng_ht_iter iter;
> + int ret = 1;
> + bool flushed = false;
> + int fd;
>
> - DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64
> - " deleted.", index->index_n.key.key1,
> - index->index_n.key.key2);
> + pthread_mutex_lock(&index->lock);
> + if (index->flushed) {
> + goto skip;
> + }
> + /* Check if we are ready to flush. */
> + if (!index->has_index_data || !index->index_fd) {
> + goto skip;
> + }
> + fd = index->index_fd->fd;
> + DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
> + " on fd %d", index->stream->stream_handle,
> + index->index_n.key, fd);
> + flushed = true;
> + index->flushed = true;
> + ret = index_write(fd, &index->index_data, sizeof(index->index_data));
> + if (ret == sizeof(index->index_data)) {
> + ret = 0;
> + } else {
> + ret = -1;
> + }
> +skip:
> + pthread_mutex_unlock(&index->lock);
>
> - /* Delete index from hash table. */
> - iter.iter.node = &index->index_n.node;
> - ret = lttng_ht_del(indexes_ht, &iter);
> - assert(!ret);
> + if (flushed) {
> + /* Put self-ref from index now that it has been flushed. */
> + relay_index_put(index);
> + }
> + return ret;
> }
>
> /*
> - * Destroy every relay index with the given stream id as part of the key.
> + * Close every relay index within a given stream, without flushing
> + * them.
> */
> -void relay_index_destroy_by_stream_id(uint64_t stream_id)
> +void relay_index_close_all(struct relay_stream *stream)
> {
> struct lttng_ht_iter iter;
> struct relay_index *index;
>
> rcu_read_lock();
> - cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
> - if (index->index_n.key.key1 == stream_id) {
> - relay_index_delete(index);
> - relay_index_free_safe(index);
> - }
> + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
> + index, index_n.node) {
> + /* Put self-ref from index. */
> + relay_index_put(index);
> }
> rcu_read_unlock();
> }
> diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h
> index e7f9cdb..e882ed9 100644
> --- a/src/bin/lttng-relayd/index.h
> +++ b/src/bin/lttng-relayd/index.h
> @@ -1,6 +1,10 @@
> +#ifndef _RELAY_INDEX_H
> +#define _RELAY_INDEX_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,42 +20,56 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef _RELAY_INDEX_H
> -#define _RELAY_INDEX_H
> -
> #include <inttypes.h>
> #include <pthread.h>
>
> #include <common/hashtable/hashtable.h>
> #include <common/index/index.h>
>
> +#include "stream-fd.h"
> +
> +struct relay_stream;
> +
> struct relay_index {
> - /* FD on which to write the index data. */
> - int fd;
> /*
> - * When destroying this object, this fd is checked and if valid, close it
> - * so this is basically a lazy close of the previous fd corresponding to
> - * the same stream id. This is used for the rotate file feature.
> + * index lock nests inside stream lock.
> */
> - int to_close_fd;
> + pthread_mutex_t reflock; /* Protects refcounting. */
> + struct urcu_ref ref; /* Reference from getters. */
> + struct relay_stream *stream; /* Back ref to stream */
> +
> + pthread_mutex_t lock;
> + /*
> + * FD on which to write the index data. May differ from
> + * stream->index_fd due to tracefile rotation.
> + */
> + struct stream_fd *index_fd;
>
> /* Index packet data. This is the data that is written on disk. */
> struct ctf_packet_index index_data;
>
> - /* key1 = stream_id, key2 = net_seq_num */
> - struct lttng_ht_node_two_u64 index_n;
> - struct rcu_head rcu_node;
> - pthread_mutex_t mutex;
> + bool has_index_data;
> + bool flushed;
> + bool in_hash_table;
> +
> + /*
> + * Node within indexes_ht that corresponds to this struct
> + * relay_index. Indexed by net_seq_num, which is unique for this
> + * index across the stream.
> + */
> + struct lttng_ht_node_u64 index_n;
> + struct rcu_head rcu_node; /* For call_rcu teardown. */
> };
>
> -struct relay_index *relay_index_create(uint64_t stream_id,
> +struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
> uint64_t net_seq_num);
> -struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num);
> -void relay_index_add(struct relay_index *index, struct relay_index **_index);
> -int relay_index_write(int fd, struct relay_index *index);
> -void relay_index_free(struct relay_index *index);
> -void relay_index_free_safe(struct relay_index *index);
> -void relay_index_delete(struct relay_index *index);
> -void relay_index_destroy_by_stream_id(uint64_t stream_id);
> +void relay_index_put(struct relay_index *index);
> +int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd,
> + uint64_t data_offset);
> +int relay_index_set_data(struct relay_index *index,
> + const struct ctf_packet_index *data);
> +int relay_index_try_flush(struct relay_index *index);
> +
> +void relay_index_close_all(struct relay_stream *stream);
>
> #endif /* _RELAY_INDEX_H */
> diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
> index 562a7fa..64d37fa 100644
> --- a/src/bin/lttng-relayd/live.c
> +++ b/src/bin/lttng-relayd/live.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify
> * it under the terms of the GNU General Public License, version 2 only,
> @@ -36,6 +37,7 @@
> #include <inttypes.h>
> #include <urcu/futex.h>
> #include <urcu/uatomic.h>
> +#include <urcu/rculist.h>
> #include <unistd.h>
> #include <fcntl.h>
> #include <config.h>
> @@ -65,6 +67,9 @@
> #include "session.h"
> #include "ctf-trace.h"
> #include "connection.h"
> +#include "viewer-session.h"
> +
> +#define SESSION_BUF_DEFAULT_COUNT 16
>
> static struct lttng_uri *live_uri;
>
> @@ -90,6 +95,8 @@ static pthread_t live_worker_thread;
> static struct relay_conn_queue viewer_conn_queue;
>
> static uint64_t last_relay_viewer_session_id;
> +static pthread_mutex_t last_relay_viewer_session_id_lock =
> + PTHREAD_MUTEX_INITIALIZER;
>
> /*
> * Cleanup the daemon
> @@ -114,9 +121,6 @@ ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size)
> {
> ssize_t ret;
>
> - assert(sock);
> - assert(buf);
> -
> ret = sock->ops->recvmsg(sock, buf, size, 0);
> if (ret < 0 || ret != size) {
> if (ret == 0) {
> @@ -143,9 +147,6 @@ ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
> {
> ssize_t ret;
>
> - assert(sock);
> - assert(buf);
> -
> ret = sock->ops->sendmsg(sock, buf, size, 0);
> if (ret < 0) {
> ERR("Relayd failed to send response.");
> @@ -171,17 +172,22 @@ int check_new_streams(struct relay_connection *conn)
> if (!conn->viewer_session) {
> goto end;
> }
> - cds_list_for_each_entry(session,
> - &conn->viewer_session->sessions_head,
> - viewer_session_list) {
> + rcu_read_lock();
> + cds_list_for_each_entry_rcu(session,
> + &conn->viewer_session->session_list,
> + viewer_session_node) {
> + if (!session_get(session)) {
> + continue;
> + }
> current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
> ret = current_val;
> + session_put(session);
> if (ret == 1) {
> goto end;
> }
> }
> -
> end:
> + rcu_read_unlock();
> return ret;
> }
>
> @@ -200,8 +206,6 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
> struct lttng_ht_iter iter;
> struct relay_viewer_stream *vstream;
>
> - assert(session);
> -
> rcu_read_lock();
>
> cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
> @@ -210,30 +214,38 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
>
> health_code_update();
>
> + if (!viewer_stream_get(vstream)) {
> + continue;
> + }
> +
> + pthread_mutex_lock(&vstream->lock);
> /* Ignore if not the same session. */
> - if (vstream->session_id != session->id ||
> + if (vstream->stream->trace->session->id != session->id ||
> (!ignore_sent_flag && vstream->sent_flag)) {
> + pthread_mutex_unlock(&vstream->lock);
> + viewer_stream_put(vstream);
> continue;
> }
>
> - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
> - vstream->path_name);
> - assert(ctf_trace);
> -
> - send_stream.id = htobe64(vstream->stream_handle);
> + ctf_trace = vstream->stream->trace;
> + send_stream.id = htobe64(vstream->stream->stream_handle);
> send_stream.ctf_trace_id = htobe64(ctf_trace->id);
> - send_stream.metadata_flag = htobe32(vstream->metadata_flag);
> + send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
> strncpy(send_stream.path_name, vstream->path_name,
> sizeof(send_stream.path_name));
> strncpy(send_stream.channel_name, vstream->channel_name,
> sizeof(send_stream.channel_name));
>
> - DBG("Sending stream %" PRIu64 " to viewer", vstream->stream_handle);
> + DBG("Sending stream %" PRIu64 " to viewer",
> + vstream->stream->stream_handle);
> + vstream->sent_flag = 1;
> + pthread_mutex_unlock(&vstream->lock);
> +
> ret = send_response(sock, &send_stream, sizeof(send_stream));
> if (ret < 0) {
> goto end_unlock;
> }
> - vstream->sent_flag = 1;
> + viewer_stream_put(vstream);
> }
>
> ret = 0;
> @@ -263,17 +275,14 @@ int make_viewer_streams(struct relay_session *session,
> assert(session);
>
> /*
> - * This is to make sure we create viewer streams for a full received
> - * channel. For instance, if we have 8 streams for a channel that are
> - * concurrently being flagged ready, we can end up creating just a subset
> - * of the 8 streams (the ones that are flagged). This lock avoids this
> - * limbo state.
> + * Hold the session lock to ensure that we see either none or
> + * all initial streams for a session, but no intermediate state.
> */
> - pthread_mutex_lock(&session->viewer_ready_lock);
> + pthread_mutex_lock(&session->lock);
>
> /*
> - * Create viewer streams for relay streams that are ready to be used for a
> - * the given session id only.
> + * Create viewer streams for relay streams that are ready to be
> + * used for the given session id only.
> */
> rcu_read_lock();
> cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
> @@ -282,47 +291,59 @@ int make_viewer_streams(struct relay_session *session,
>
> health_code_update();
>
> - if (ctf_trace->invalid_flag) {
> + if (!ctf_trace_get(ctf_trace)) {
> continue;
> }
>
> - cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
> + cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) {
> struct relay_viewer_stream *vstream;
>
> - if (!stream->viewer_ready) {
> + if (!stream_get(stream)) {
> continue;
> }
> -
> - vstream = viewer_stream_find_by_id(stream->stream_handle);
> + /*
> + * stream published is protected by the session
> + * lock.
> + */
> + if (!stream->published) {
> + goto next;
> + }
> + vstream = viewer_stream_get_by_id(stream->stream_handle);
> if (!vstream) {
> - vstream = viewer_stream_create(stream, seek_t, ctf_trace);
> + vstream = viewer_stream_create(stream, seek_t);
> if (!vstream) {
> ret = -1;
> + ctf_trace_put(ctf_trace);
> + stream_put(stream);
> goto error_unlock;
> }
> - /* Acquire reference to ctf_trace. */
> - ctf_trace_get_ref(ctf_trace);
>
> if (nb_created) {
> /* Update number of created stream counter. */
> (*nb_created)++;
> }
> - } else if (!vstream->sent_flag && nb_unsent) {
> - /* Update number of unsent stream counter. */
> - (*nb_unsent)++;
> + } else {
> + if (!vstream->sent_flag && nb_unsent) {
> + /* Update number of unsent stream counter. */
> + (*nb_unsent)++;
> + }
> + viewer_stream_put(vstream);
> }
> /* Update number of total stream counter. */
> if (nb_total) {
> (*nb_total)++;
> }
> + next:
> + stream_put(stream);
> }
> + ctf_trace_put(ctf_trace);
> }
>
> ret = 0;
>
> error_unlock:
> rcu_read_unlock();
> - pthread_mutex_unlock(&session->viewer_ready_lock);
> + pthread_mutex_unlock(&session->lock);
> return ret;
> }
>
> @@ -505,22 +526,17 @@ restart:
> goto error;
> } else if (revents & LPOLLIN) {
> /*
> - * Get allocated in this thread, enqueued to a global queue,
> - * dequeued and freed in the worker thread.
> + * Get allocated in this thread,
> + * enqueued to a global queue, dequeued
> + * and freed in the worker thread.
> */
> int val = 1;
> struct relay_connection *new_conn;
> struct lttcomm_sock *newsock;
>
> - new_conn = connection_create();
> - if (!new_conn) {
> - goto error;
> - }
> -
> newsock = live_control_sock->ops->accept(live_control_sock);
> if (!newsock) {
> PERROR("accepting control sock");
> - connection_free(new_conn);
> goto error;
> }
> DBG("Relay viewer connection accepted socket %d", newsock->fd);
> @@ -530,18 +546,22 @@ restart:
> if (ret < 0) {
> PERROR("setsockopt inet");
> lttcomm_destroy_sock(newsock);
> - connection_free(new_conn);
> goto error;
> }
> - new_conn->sock = newsock;
> + new_conn = connection_create(newsock, RELAY_CONNECTION_UNKNOWN);
> + if (!new_conn) {
> + lttcomm_destroy_sock(newsock);
> + goto error;
> + }
>
> /* Enqueue request for the dispatcher thread. */
> cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail,
> &new_conn->qnode);
>
> /*
> - * Wake the dispatch queue futex. Implicit memory barrier with
> - * the exchange in cds_wfcq_enqueue.
> + * Wake the dispatch queue futex.
> + * Implicit memory barrier with the
> + * exchange in cds_wfcq_enqueue.
> */
> futex_nto1_wake(&viewer_conn_queue.futex);
> }
> @@ -618,14 +638,15 @@ void *thread_dispatcher(void *data)
> conn->sock->fd);
>
> /*
> - * Inform worker thread of the new request. This call is blocking
> - * so we can be assured that the data will be read at some point in
> - * time or wait to the end of the world :)
> + * Inform worker thread of the new request. This
> + * call is blocking so we can be assured that
> + * the data will be read at some point in time
> + * or wait to the end of the world :)
> */
> ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn));
> if (ret < 0) {
> PERROR("write conn pipe");
> - connection_destroy(conn);
> + connection_put(conn);
> goto error;
> }
> } while (node != NULL);
> @@ -664,8 +685,6 @@ int viewer_connect(struct relay_connection *conn)
> int ret;
> struct lttng_viewer_connect reply, msg;
>
> - assert(conn);
> -
> conn->version_check_done = 1;
>
> health_code_update();
> @@ -713,10 +732,13 @@ int viewer_connect(struct relay_connection *conn)
> reply.minor = htobe32(reply.minor);
> if (conn->type == RELAY_VIEWER_COMMAND) {
> /*
> - * Increment outside of htobe64 macro, because can be used more than once
> - * within the macro, and thus the operation may be undefined.
> + * Increment outside of the htobe64 macro, because its
> + * argument can be evaluated more than once within the
> + * macro, and thus the operation may be undefined.
> */
> + pthread_mutex_lock(&last_relay_viewer_session_id_lock);
> last_relay_viewer_session_id++;
> + pthread_mutex_unlock(&last_relay_viewer_session_id_lock);
> reply.viewer_session_id = htobe64(last_relay_viewer_session_id);
> }
>
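
The comment in this hunk about incrementing outside of htobe64 can be illustrated with a small sketch. SWAP16, next_id and eval_count below are hypothetical names made up for the illustration; they are not part of lttng-tools, and whether a given libc implements htobe64 as a multiply-evaluating macro is platform dependent. The sketch only shows that an argument with a side effect runs once per expansion, and that going through a local avoids it.

```c
#include <stdint.h>

/* Hypothetical byte-swap macro that evaluates its argument twice. */
#define SWAP16(x) \
	((uint16_t) ((((x) & 0xffU) << 8) | (((x) >> 8) & 0xffU)))

/* Counts how many times the side-effecting expression was evaluated. */
static unsigned int eval_count;

/* Stand-in for an expression with a side effect (e.g. an increment). */
static uint16_t next_id(void)
{
	eval_count++;
	return 0x1234;
}

/* Hazard: the side effect runs once per expansion of x in the macro. */
static uint16_t swap_inline(void)
{
	return SWAP16(next_id());
}

/* Safe: evaluate once into a local, then convert the local. */
static uint16_t swap_via_local(void)
{
	uint16_t id = next_id();

	return SWAP16(id);
}
```

With a real `i++` argument the double evaluation would be undefined behavior rather than merely a double call, which is why the sketch uses a function call instead.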
> @@ -738,6 +760,9 @@ end:
>
> /*
> * Send the viewer the list of current sessions.
> + * We need to create a copy of the hash table content because otherwise
> + * we cannot assume the number of entries stays the same between getting
> + * the number of HT elements and iteration over the HT.
> *
> * Return 0 on success or else a negative value.
> */
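
The reasoning in the comment above (count and contents of the hash table can change between two traversals, so copy first and send afterwards) boils down to a grow-on-demand snapshot buffer. A minimal sketch follows, assuming a plain array as the source instead of an RCU hash table; struct record and snapshot_ids are hypothetical names used only for this illustration.

```c
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical record type standing in for struct lttng_viewer_session. */
struct record {
	uint64_t id;
};

/*
 * Copy n source ids into a heap buffer that starts at initial_count
 * entries and doubles whenever it is full. Returns the buffer (to be
 * freed by the caller) and stores the number of entries in *out_count,
 * or returns NULL on allocation failure.
 */
static struct record *snapshot_ids(const uint64_t *src, size_t n,
		size_t initial_count, size_t *out_count)
{
	size_t cap = initial_count ? initial_count : 1, count = 0, i;
	struct record *buf = calloc(cap, sizeof(*buf));

	if (!buf) {
		return NULL;
	}
	for (i = 0; i < n; i++) {
		if (count >= cap) {
			size_t new_cap = cap * 2;
			struct record *newbuf;

			newbuf = realloc(buf, new_cap * sizeof(*buf));
			/* Test the pointer returned by realloc, not the size. */
			if (!newbuf) {
				free(buf);
				return NULL;
			}
			buf = newbuf;
			cap = new_cap;
		}
		buf[count++].id = src[i];
	}
	*out_count = count;
	return buf;
}
```

Because the count is only known once the traversal is done, the header carrying the count can be sent first and the buffer second, and both are consistent with each other.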
> @@ -746,156 +771,89 @@ int viewer_list_sessions(struct relay_connection *conn)
> {
> int ret;
> struct lttng_viewer_list_sessions session_list;
> - unsigned long count;
> - long approx_before, approx_after;
> struct lttng_ht_iter iter;
> - struct lttng_viewer_session send_session;
> struct relay_session *session;
> + struct lttng_viewer_session *send_session_buf = NULL;
> + uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
> + uint32_t count = 0;
>
> DBG("List sessions received");
>
> - rcu_read_lock();
> - cds_lfht_count_nodes(conn->sessions_ht->ht, &approx_before, &count,
> - &approx_after);
> - session_list.sessions_count = htobe32(count);
> -
> - health_code_update();
> -
> - ret = send_response(conn->sock, &session_list, sizeof(session_list));
> - if (ret < 0) {
> - goto end_unlock;
> + send_session_buf = zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf));
> + if (!send_session_buf) {
> + return -1;
> }
>
> - health_code_update();
> -
> - cds_lfht_for_each_entry(conn->sessions_ht->ht, &iter.iter, session,
> + rcu_read_lock();
> + cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
> session_n.node) {
> - health_code_update();
> -
> - strncpy(send_session.session_name, session->session_name,
> - sizeof(send_session.session_name));
> - strncpy(send_session.hostname, session->hostname,
> - sizeof(send_session.hostname));
> - send_session.id = htobe64(session->id);
> - send_session.live_timer = htobe32(session->live_timer);
> - send_session.clients = htobe32(session->viewer_refcount);
> - send_session.streams = htobe32(session->stream_count);
> + struct lttng_viewer_session *send_session;
>
> health_code_update();
>
> - ret = send_response(conn->sock, &send_session, sizeof(send_session));
> - if (ret < 0) {
> - goto end_unlock;
> - }
> - }
> - health_code_update();
> + if (count >= buf_count) {
> + struct lttng_viewer_session *newbuf;
> + uint32_t new_buf_count = buf_count << 1;
>
> - ret = 0;
> -end_unlock:
> - rcu_read_unlock();
> - return ret;
> -}
> -
> -/*
> - * Check if a connection is attached to a session.
> - * Return 1 if attached, 0 if not attached, a negative value on error.
> - */
> -static
> -int session_attached(struct relay_connection *conn, uint64_t session_id)
> -{
> - struct relay_session *session;
> - int found = 0;
> -
> - if (!conn->viewer_session) {
> - goto end;
> - }
> - cds_list_for_each_entry(session,
> - &conn->viewer_session->sessions_head,
> - viewer_session_list) {
> - if (session->id == session_id) {
> - found = 1;
> - goto end;
> + newbuf = realloc(send_session_buf,
> + new_buf_count * sizeof(*send_session_buf));
> + if (!newbuf) {
> + ret = -1;
> + rcu_read_unlock();
> + goto end_free;
> + }
> + send_session_buf = newbuf;
> + buf_count = new_buf_count;
> }
> - }
> -
> -end:
> - return found;
> -}
> -
> -/*
> - * Delete all streams for a specific session ID.
> - */
> -static void destroy_viewer_streams_by_session(struct relay_session *session)
> -{
> - struct relay_viewer_stream *stream;
> - struct lttng_ht_iter iter;
> -
> - assert(session);
> -
> - rcu_read_lock();
> - cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
> - stream_n.node) {
> - struct ctf_trace *ctf_trace;
> -
> - health_code_update();
> - if (stream->session_id != session->id) {
> - continue;
> + send_session = &send_session_buf[count];
> + strncpy(send_session->session_name, session->session_name,
> + sizeof(send_session->session_name));
> + strncpy(send_session->hostname, session->hostname,
> + sizeof(send_session->hostname));
> + send_session->id = htobe64(session->id);
> + send_session->live_timer = htobe32(session->live_timer);
> + if (session->viewer_attached) {
> + send_session->clients = htobe32(1);
> + } else {
> + send_session->clients = htobe32(0);
> }
> + send_session->streams = htobe32(session->stream_count);
> + count++;
> + }
> + rcu_read_unlock();
>
> - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
> - stream->path_name);
> - assert(ctf_trace);
> -
> - viewer_stream_delete(stream);
> + session_list.sessions_count = htobe32(count);
>
> - if (stream->metadata_flag) {
> - ctf_trace->metadata_sent = 0;
> - ctf_trace->viewer_metadata_stream = NULL;
> - }
> + health_code_update();
>
> - viewer_stream_destroy(ctf_trace, stream);
> + ret = send_response(conn->sock, &session_list, sizeof(session_list));
> + if (ret < 0) {
> + goto end_free;
> }
> - rcu_read_unlock();
> -}
> -
> -static void try_destroy_streams(struct relay_session *session)
> -{
> - struct ctf_trace *ctf_trace;
> - struct lttng_ht_iter iter;
>
> - assert(session);
> + health_code_update();
>
> - cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
> - node.node) {
> - /* Attempt to destroy the ctf trace of that session. */
> - ctf_trace_try_destroy(session, ctf_trace);
> + ret = send_response(conn->sock, send_session_buf,
> + count * sizeof(*send_session_buf));
> + if (ret < 0) {
> + goto end_free;
> }
> -}
> + health_code_update();
>
> -/*
> - * Cleanup a session.
> - */
> -static void cleanup_session(struct relay_connection *conn,
> - struct relay_session *session)
> -{
> - /*
> - * Very important that this is done before destroying the session so we
> - * can put back every viewer stream reference from the ctf_trace.
> - */
> - destroy_viewer_streams_by_session(session);
> - try_destroy_streams(session);
> - cds_list_del(&session->viewer_session_list);
> - session_viewer_try_destroy(conn->sessions_ht, session);
> + ret = 0;
> +end_free:
> + free(send_session_buf);
> + return ret;
> }
>
> /*
> - * Send the viewer the list of current sessions.
> + * Send the viewer the list of current streams.
> */
> static
> int viewer_get_new_streams(struct relay_connection *conn)
> {
> int ret, send_streams = 0;
> - uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0;
> + uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
> struct lttng_viewer_new_streams_request request;
> struct lttng_viewer_new_streams_response response;
> struct relay_session *session;
> @@ -918,15 +876,14 @@ int viewer_get_new_streams(struct relay_connection *conn)
>
> memset(&response, 0, sizeof(response));
>
> - rcu_read_lock();
> - session = session_find_by_id(conn->sessions_ht, session_id);
> + session = session_get_by_id(session_id);
> if (!session) {
> DBG("Relay session %" PRIu64 " not found", session_id);
> response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
> goto send_reply;
> }
>
> - if (!session_attached(conn, session_id)) {
> + if (!viewer_session_is_attached(conn->viewer_session, session)) {
> send_streams = 0;
> response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
> goto send_reply;
> @@ -935,10 +892,10 @@ int viewer_get_new_streams(struct relay_connection *conn)
> send_streams = 1;
> response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
>
> - ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, NULL, &nb_unsent,
> + ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
> &nb_created);
> if (ret < 0) {
> - goto end_unlock;
> + goto end_put_session;
> }
> /* Only send back the newly created streams with the unsent ones. */
> nb_streams = nb_created + nb_unsent;
> @@ -949,15 +906,9 @@ int viewer_get_new_streams(struct relay_connection *conn)
> * it means that the viewer has already received the whole trace
> * for this session and should now close it.
> */
> - if (nb_streams == 0 && session->close_flag) {
> + if (nb_total == 0 && session->connection_closed) {
> send_streams = 0;
> response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
> - /*
> - * Remove the session from the attached list of the connection
> - * and try to destroy it.
> - */
> - cds_list_del(&session->viewer_session_list);
> - cleanup_session(conn, session);
> goto send_reply;
> }
>
> @@ -965,30 +916,33 @@ send_reply:
> health_code_update();
> ret = send_response(conn->sock, &response, sizeof(response));
> if (ret < 0) {
> - goto end_unlock;
> + goto end_put_session;
> }
> health_code_update();
>
> /*
> - * Unknown or empty session, just return gracefully, the viewer knows what
> - * is happening.
> + * Unknown or empty session, just return gracefully, the viewer
> + * knows what is happening.
> */
> if (!send_streams || !nb_streams) {
> ret = 0;
> - goto end_unlock;
> + goto end_put_session;
> }
>
> /*
> - * Send stream and *DON'T* ignore the sent flag so every viewer streams
> - * that were not sent from that point will be sent to the viewer.
> + * Send streams and *DON'T* ignore the sent flag, so that every
> + * viewer stream not yet sent at that point will be sent to
> + * the viewer.
> */
> ret = send_viewer_streams(conn->sock, session, 0);
> if (ret < 0) {
> - goto end_unlock;
> + goto end_put_session;
> }
>
> -end_unlock:
> - rcu_read_unlock();
> +end_put_session:
> + if (session) {
> + session_put(session);
> + }
> error:
> return ret;
> }
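
The shape this hunk moves to (a lookup that returns a referenced object, and one exit label that drops that reference) can be sketched in isolation. This is a single-threaded illustration under stated assumptions: struct obj, obj_get, obj_put, obj_get_by_id and handle are hypothetical names, and the plain int counter stands in for whatever atomic reference count the real session object uses.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical refcounted object; not the relayd session type. */
struct obj {
	int refcount;	/* Plain int: single-threaded sketch only. */
	bool released;
};

/* Take a reference; fails once the count has dropped to zero. */
static bool obj_get(struct obj *o)
{
	if (o->refcount == 0) {
		return false;
	}
	o->refcount++;
	return true;
}

/* Drop a reference; the last put releases the object. */
static void obj_put(struct obj *o)
{
	if (--o->refcount == 0) {
		o->released = true;	/* Real code would free here. */
	}
}

/* Lookup returns an object with a reference held, or NULL. */
static struct obj *obj_get_by_id(struct obj *table, size_t n, size_t id)
{
	if (id >= n || !obj_get(&table[id])) {
		return NULL;
	}
	return &table[id];
}

/* Handler with a single exit label that drops the lookup reference. */
static int handle(struct obj *table, size_t n, size_t id, bool fail_midway)
{
	int ret;
	struct obj *o = obj_get_by_id(table, n, id);

	if (!o) {
		ret = -1;
		goto end;
	}
	if (fail_midway) {
		ret = -2;
		goto end_put;
	}
	ret = 0;
end_put:
	obj_put(o);
end:
	return ret;
}
```

In the patch the not-found path still has to send a reply and therefore falls through to the same label, which is why end_put_session guards the put with a NULL check instead of using two labels as above.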
> @@ -1005,7 +959,7 @@ int viewer_attach_session(struct relay_connection *conn)
> enum lttng_viewer_seek seek_type;
> struct lttng_viewer_attach_session_request request;
> struct lttng_viewer_attach_session_response response;
> - struct relay_session *session;
> + struct relay_session *session = NULL;
>
> assert(conn);
>
> @@ -1027,37 +981,34 @@ int viewer_attach_session(struct relay_connection *conn)
> goto send_reply;
> }
>
> - rcu_read_lock();
> - session = session_find_by_id(conn->sessions_ht,
> - be64toh(request.session_id));
> + session = session_get_by_id(be64toh(request.session_id));
> if (!session) {
> DBG("Relay session %" PRIu64 " not found",
> be64toh(request.session_id));
> response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
> goto send_reply;
> }
> - session_viewer_attach(session);
> - DBG("Attach session ID %" PRIu64 " received", be64toh(request.session_id));
> + DBG("Attach session ID %" PRIu64 " received",
> + be64toh(request.session_id));
>
> - if (uatomic_read(&session->viewer_refcount) > 1) {
> - DBG("Already a viewer attached");
> - response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
> - session_viewer_detach(session);
> - goto send_reply;
> - } else if (session->live_timer == 0) {
> + if (session->live_timer == 0) {
> DBG("Not live session");
> response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
> goto send_reply;
> - } else {
> - send_streams = 1;
> - response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
> - cds_list_add(&session->viewer_session_list,
> - &conn->viewer_session->sessions_head);
> + }
> +
> + send_streams = 1;
> + ret = viewer_session_attach(conn->viewer_session, session);
> + if (ret) {
> + DBG("Already a viewer attached");
> + response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
> + goto send_reply;
> }
>
> switch (be32toh(request.seek)) {
> case LTTNG_VIEWER_SEEK_BEGINNING:
> case LTTNG_VIEWER_SEEK_LAST:
> + response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
> seek_type = be32toh(request.seek);
> break;
> default:
> @@ -1069,7 +1020,7 @@ int viewer_attach_session(struct relay_connection *conn)
>
> ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL);
> if (ret < 0) {
> - goto end_unlock;
> + goto end_put_session;
> }
> response.streams_count = htobe32(nb_streams);
>
> @@ -1077,27 +1028,29 @@ send_reply:
> health_code_update();
> ret = send_response(conn->sock, &response, sizeof(response));
> if (ret < 0) {
> - goto end_unlock;
> + goto end_put_session;
> }
> health_code_update();
>
> /*
> - * Unknown or empty session, just return gracefully, the viewer knows what
> - * is happening.
> + * Unknown or empty session, just return gracefully, the viewer
> + * knows what is happening.
> */
> if (!send_streams || !nb_streams) {
> ret = 0;
> - goto end_unlock;
> + goto end_put_session;
> }
>
> /* Send stream and ignore the sent flag. */
> ret = send_viewer_streams(conn->sock, session, 1);
> if (ret < 0) {
> - goto end_unlock;
> + goto end_put_session;
> }
>
> -end_unlock:
> - rcu_read_unlock();
> +end_put_session:
> + if (session) {
> + session_put(session);
> + }
> error:
> return ret;
> }
> @@ -1105,38 +1058,42 @@ error:
> /*
> * Open the index file if needed for the given vstream.
> *
> - * If an index file is successfully opened, the index_read_fd of the stream is
> - * set with it.
> + * If an index file is successfully opened, the vstream index_fd is set
> + * with it.
> *
> * Return 0 on success, a negative value on error (-ENOENT if not ready yet).
> + *
> + * Called with both rstream and vstream locks held.
> */
> static int try_open_index(struct relay_viewer_stream *vstream,
> struct relay_stream *rstream)
> {
> int ret = 0;
>
> - assert(vstream);
> - assert(rstream);
> -
> - if (vstream->index_read_fd >= 0) {
> + if (vstream->index_fd) {
> goto end;
> }
>
> /*
> - * First time, we open the index file and at least one index is ready. The
> - * race between the read and write of the total_index_received is
> - * acceptable here since the client will be notified to simply come back
> - * and get the next index.
> + * First time, we open the index file and at least one index is ready.
> */
> if (rstream->total_index_received <= 0) {
> ret = -ENOENT;
> goto end;
> }
> ret = index_open(vstream->path_name, vstream->channel_name,
> - vstream->tracefile_count, vstream->tracefile_count_current);
> + vstream->stream->tracefile_count,
> + vstream->current_tracefile_id);
> if (ret >= 0) {
> - vstream->index_read_fd = ret;
> - ret = 0;
> + vstream->index_fd = stream_fd_create(ret);
> + if (!vstream->index_fd) {
> + if (close(ret)) {
> + PERROR("close");
> + }
> + ret = -1;
> + } else {
> + ret = 0;
> + }
> goto end;
> }
>
> @@ -1145,13 +1102,15 @@ end:
> }
>
> /*
> - * Check the status of the index for the given stream. This function updates
> - * the index structure if needed and can destroy the vstream also for the HUP
> - * situation.
> + * Check the status of the index for the given stream. This function
> + * updates the index structure if needed and can put (close) the vstream
> + * in the HUP situation.
> + *
> + * Return 0 means that we can proceed with the index. A value of 1 means
> + * that the index has been updated and is ready to be sent to the
> + * client. A negative value indicates an error that can't be handled.
> *
> - * Return 0 means that we can proceed with the index. A value of 1 means that
> - * the index has been updated and is ready to be send to the client. A negative
> - * value indicates an error that can't be handled.
> + * Called with both rstream and vstream locks held.
> */
> static int check_index_status(struct relay_viewer_stream *vstream,
> struct relay_stream *rstream, struct ctf_trace *trace,
> @@ -1159,68 +1118,104 @@ static int check_index_status(struct relay_viewer_stream *vstream,
> {
> int ret;
>
> - assert(vstream);
> - assert(rstream);
> - assert(index);
> - assert(trace);
> -
> - if (!rstream->close_flag) {
> - /* Rotate on abort (overwrite). */
> - if (vstream->abort_flag) {
> - DBG("Viewer stream %" PRIu64 " rotate because of overwrite",
> - vstream->stream_handle);
> - ret = viewer_stream_rotate(vstream, rstream);
> + if (trace->session->connection_closed
> + && rstream->total_index_received
> + == vstream->last_sent_index) {
> + /* Last index sent and session connection is closed. */
> + index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> + goto hup;
> + } else if (rstream->beacon_ts_end != -1ULL &&
> + rstream->total_index_received
> + == vstream->last_sent_index) {
> + /*
> + * We've received a synchronization beacon and the last index
> + * available has been sent, the index for now is inactive.
> + */
> + index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
> + index->timestamp_end = htobe64(rstream->beacon_ts_end);
> + index->stream_id = htobe64(rstream->ctf_stream_id);
> + goto index_ready;
> + } else if (rstream->total_index_received <= vstream->last_sent_index) {
> + /* This actually checks the case where recv == last_sent. */
> + index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
> + goto index_ready;
> + } else if (!viewer_stream_is_tracefile_id_readable(vstream,
> + vstream->current_tracefile_id)) {
> + /*
> + * The producer has overwritten our current file. We
> + * need to rotate.
> + */
> + DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
> + vstream->stream->stream_handle);
> + ret = viewer_stream_rotate(vstream);
> + if (ret < 0) {
> + goto end;
> + } else if (ret == 1) {
> + /* EOF across entire stream. */
> + index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> + goto hup;
> + } else if (!viewer_stream_is_tracefile_id_readable(vstream,
> + vstream->current_tracefile_id)) {
> + /*
> + * We rotated into a file being concurrently
> + * updated, need to retry later.
> + */
> + index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
> + goto index_ready;
> + }
> + /* ret == 0 means successful so we continue. */
> + ret = 0;
> + } else {
> + ssize_t read_ret;
> + char tmp[1];
> +
> + /*
> + * Use EOF on current index file to find out when we
> + * need to rotate.
> + */
> + read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
> + if (read_ret == 1) {
> + off_t seek_ret;
> +
> + /* There is still data to read. Rewind position. */
> + seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
> + if (seek_ret < 0) {
> + ret = -1;
> + goto end;
> + }
> + ret = 0;
> + } else if (read_ret == 0) {
> + /* EOF. We need to rotate. */
> + DBG("Viewer stream %" PRIu64 " rotation due to EOF",
> + vstream->stream->stream_handle);
> + ret = viewer_stream_rotate(vstream);
> if (ret < 0) {
> - goto error;
> + goto end;
> } else if (ret == 1) {
> - /* EOF */
> + /* EOF across entire stream. */
> index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> goto hup;
> - }
> - /* ret == 0 means successful so we continue. */
> - }
> -
> - /* Check if we are in the same trace file at this point. */
> - if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
> - if (rstream->beacon_ts_end != -1ULL &&
> - vstream->last_sent_index == rstream->total_index_received) {
> + } else if (!viewer_stream_is_tracefile_id_readable(vstream,
> + vstream->current_tracefile_id)) {
> /*
> - * We've received a synchronization beacon and the last index
> - * available has been sent, the index for now is inactive.
> - */
> - index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
> - index->timestamp_end = htobe64(rstream->beacon_ts_end);
> - index->stream_id = htobe64(rstream->ctf_stream_id);
> - goto index_ready;
> - } else if (rstream->total_index_received <= vstream->last_sent_index
> - && !vstream->close_write_flag) {
> - /*
> - * Reader and writer are working in the same tracefile, so we care
> - * about the number of index received and sent. Otherwise, we read
> - * up to EOF.
> + * We rotated into a file being concurrently
> + * updated, need to retry later.
> */
> index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
> goto index_ready;
> }
> + /* ret == 0 means successful so we continue. */
> + ret = 0;
> + } else {
> + /* Error reading index. */
> + ret = -1;
> }
> - /* Nothing to do with the index, continue with it. */
> - ret = 0;
> - } else if (rstream->close_flag && vstream->close_write_flag &&
> - vstream->total_index_received == vstream->last_sent_index) {
> - /* Last index sent and current tracefile closed in write */
> - index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> - goto hup;
> - } else {
> - vstream->close_write_flag = 1;
> - ret = 0;
> }
> -
> -error:
> +end:
> return ret;
>
> hup:
> - viewer_stream_delete(vstream);
> - viewer_stream_destroy(trace, vstream);
> + viewer_stream_put(vstream);
> index_ready:
> return 1;
> }
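
The "use EOF on current index file to find out when we need to rotate" branch in check_index_status relies on reading one byte and rewinding if the read succeeds. A minimal sketch of that probe follows, assuming plain POSIX read() in place of the lttng_read wrapper; at_eof is a hypothetical helper name, and interrupted reads (EINTR) are simply reported as errors here.

```c
#include <sys/types.h>
#include <unistd.h>

/*
 * Probe a file descriptor for end-of-file without consuming data.
 *
 * Returns 1 if fd is positioned at EOF, 0 if at least one byte remains
 * (the file offset is restored to where it was), -1 on error.
 */
static int at_eof(int fd)
{
	char tmp;
	ssize_t n = read(fd, &tmp, 1);

	if (n == 0) {
		/* Nothing left in this file: the caller should rotate. */
		return 1;
	}
	if (n != 1) {
		return -1;
	}
	/* There is still data to read. Rewind the byte just consumed. */
	if (lseek(fd, -1, SEEK_CUR) < 0) {
		return -1;
	}
	return 0;
}
```

The probe is only meaningful when no other reader shares the file offset, which matches the patch's note that the function is called with both stream locks held.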
> @@ -1238,10 +1233,10 @@ int viewer_get_next_index(struct relay_connection *conn)
> struct lttng_viewer_get_next_index request_index;
> struct lttng_viewer_index viewer_index;
> struct ctf_packet_index packet_index;
> - struct relay_viewer_stream *vstream;
> - struct relay_stream *rstream;
> - struct ctf_trace *ctf_trace;
> - struct relay_session *session;
> + struct relay_viewer_stream *vstream = NULL;
> + struct relay_stream *rstream = NULL;
> + struct ctf_trace *ctf_trace = NULL;
> + struct relay_viewer_stream *metadata_viewer_stream = NULL;
>
> assert(conn);
>
> @@ -1255,42 +1250,41 @@ int viewer_get_next_index(struct relay_connection *conn)
> }
> health_code_update();
>
> - rcu_read_lock();
> - vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
> + vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
> if (!vstream) {
> ret = -1;
> - goto end_unlock;
> + goto end;
> }
>
> - session = session_find_by_id(conn->sessions_ht, vstream->session_id);
> - if (!session) {
> - ret = -1;
> - goto end_unlock;
> - }
> + /* Use back-references. Protected by refcounts. */
> + rstream = vstream->stream;
> + ctf_trace = rstream->trace;
>
> - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, vstream->path_name);
> - assert(ctf_trace);
> + /* metadata_viewer_stream may be NULL. */
> + metadata_viewer_stream =
> + ctf_trace_get_viewer_metadata_stream(ctf_trace);
>
> memset(&viewer_index, 0, sizeof(viewer_index));
>
> + pthread_mutex_lock(&rstream->lock);
> + pthread_mutex_lock(&vstream->lock);
> +
> /*
> * The viewer should not ask for index on metadata stream.
> */
> - if (vstream->metadata_flag) {
> + if (rstream->is_metadata) {
> viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> goto send_reply;
> }
>
> - rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle);
> - assert(rstream);
> -
> /* Try to open an index if one is needed for that stream. */
> ret = try_open_index(vstream, rstream);
> if (ret < 0) {
> if (ret == -ENOENT) {
> /*
> - * The index is created only when the first data packet arrives, it
> - * might not be ready at the beginning of the session
> + * The index is created only when the first data
> + * packet arrives, it might not be ready at the
> + * beginning of the session
> */
> viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
> } else {
> @@ -1300,80 +1294,71 @@ int viewer_get_next_index(struct relay_connection *conn)
> goto send_reply;
> }
>
> - pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
> ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
> - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
> if (ret < 0) {
> - goto end_unlock;
> + goto error_put;
> } else if (ret == 1) {
> /*
> - * This means the viewer index data structure has been populated by the
> - * check call thus we now send back the reply to the client.
> + * This means the viewer index data structure has been
> + * populated by the check call thus we now send back the
> + * reply to the client.
> */
> goto send_reply;
> }
> /* At this point, ret MUST be 0 thus we continue with the get. */
> assert(!ret);
>
> - if (!ctf_trace->metadata_received ||
> - ctf_trace->metadata_received > ctf_trace->metadata_sent) {
> - viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
> + /*
> + * vstream->stream_fd may be NULL if it has been closed by
> + * tracefile rotation, or if we are at the beginning of the
> + * stream. We open the data stream file here to protect against
> + * overwrite caused by tracefile rotation (in association with
> + * unlink performed before overwrite).
> + */
> + if (!vstream->stream_fd) {
> + char fullpath[PATH_MAX];
> +
> + if (vstream->stream->tracefile_count > 0) {
> + ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64,
> + vstream->path_name,
> + vstream->channel_name,
> + vstream->current_tracefile_id);
> + } else {
> + ret = snprintf(fullpath, PATH_MAX, "%s/%s",
> + vstream->path_name,
> + vstream->channel_name);
> + }
> + if (ret < 0) {
> + goto error_put;
> + }
> + ret = open(fullpath, O_RDONLY);
> + if (ret < 0) {
> + PERROR("Relay opening trace file");
> + goto error_put;
> + }
> + vstream->stream_fd = stream_fd_create(ret);
> + if (!vstream->stream_fd) {
> + if (close(ret)) {
> + PERROR("close");
> + }
> + goto error_put;
> + }
> }
>
> ret = check_new_streams(conn);
> if (ret < 0) {
> - goto end_unlock;
> + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
> + goto send_reply;
> } else if (ret == 1) {
> viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
> }
>
> - pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
> - pthread_mutex_lock(&vstream->overwrite_lock);
> - if (vstream->abort_flag) {
> - /* The file is being overwritten by the writer, we cannot use it. */
> - pthread_mutex_unlock(&vstream->overwrite_lock);
> - ret = viewer_stream_rotate(vstream, rstream);
> - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
> - if (ret < 0) {
> - goto end_unlock;
> - } else if (ret == 1) {
> - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> - viewer_stream_delete(vstream);
> - viewer_stream_destroy(ctf_trace, vstream);
> - } else {
> - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
> - }
> - goto send_reply;
> - }
> -
> - read_ret = lttng_read(vstream->index_read_fd, &packet_index,
> + read_ret = lttng_read(vstream->index_fd->fd, &packet_index,
> sizeof(packet_index));
> - pthread_mutex_unlock(&vstream->overwrite_lock);
> - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
> - if (read_ret < 0) {
> - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> - viewer_stream_delete(vstream);
> - viewer_stream_destroy(ctf_trace, vstream);
> - goto send_reply;
> - } else if (read_ret < sizeof(packet_index)) {
> - pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
> - if (vstream->close_write_flag) {
> - ret = viewer_stream_rotate(vstream, rstream);
> - if (ret < 0) {
> - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
> - goto end_unlock;
> - } else if (ret == 1) {
> - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
> - viewer_stream_delete(vstream);
> - viewer_stream_destroy(ctf_trace, vstream);
> - } else {
> - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
> - }
> - } else {
> - ERR("Relay reading index file %d", vstream->index_read_fd);
> - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
> - }
> - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
> + if (read_ret < sizeof(packet_index)) {
> + ERR("Relay reading index file %d returned %zd",
> + vstream->index_fd->fd, read_ret);
> + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
> goto send_reply;
> } else {
> viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
> @@ -1383,6 +1368,9 @@ int viewer_get_next_index(struct relay_connection *conn)
> /*
> * Indexes are stored in big endian, no need to switch before sending.
> */
> + DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64,
> + rstream->stream_handle,
> + be64toh(packet_index.offset));
> viewer_index.offset = packet_index.offset;
> viewer_index.packet_size = packet_index.packet_size;
> viewer_index.content_size = packet_index.content_size;
> @@ -1392,22 +1380,53 @@ int viewer_get_next_index(struct relay_connection *conn)
> viewer_index.stream_id = packet_index.stream_id;
>
> send_reply:
> + pthread_mutex_unlock(&vstream->lock);
> + pthread_mutex_unlock(&rstream->lock);
> +
> + if (metadata_viewer_stream) {
> + pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
> + pthread_mutex_lock(&metadata_viewer_stream->lock);
> + DBG("get next index metadata check: recv %" PRIu64
> + " sent %" PRIu64,
> + metadata_viewer_stream->stream->metadata_received,
> + metadata_viewer_stream->metadata_sent);
> + if (!metadata_viewer_stream->stream->metadata_received ||
> + metadata_viewer_stream->stream->metadata_received >
> + metadata_viewer_stream->metadata_sent) {
> + viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
> + }
> + pthread_mutex_unlock(&metadata_viewer_stream->lock);
> + pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
> + }
> +
> viewer_index.flags = htobe32(viewer_index.flags);
> health_code_update();
>
> ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index));
> if (ret < 0) {
> - goto end_unlock;
> + goto end;
> }
> health_code_update();
>
> DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
> - vstream->last_sent_index, vstream->stream_handle);
> -
> -end_unlock:
> - rcu_read_unlock();
> -
> + vstream->last_sent_index,
> + vstream->stream->stream_handle);
> end:
> + if (metadata_viewer_stream) {
> + viewer_stream_put(metadata_viewer_stream);
> + }
> + if (vstream) {
> + viewer_stream_put(vstream);
> + }
> + return ret;
> +
> +error_put:
> + pthread_mutex_unlock(&vstream->lock);
> + pthread_mutex_unlock(&rstream->lock);
> + if (metadata_viewer_stream) {
> + viewer_stream_put(metadata_viewer_stream);
> + }
> + viewer_stream_put(vstream);
> return ret;
> }
>
> @@ -1425,17 +1444,16 @@ int viewer_get_packet(struct relay_connection *conn)
> ssize_t read_len;
> struct lttng_viewer_get_packet get_packet_info;
> struct lttng_viewer_trace_packet reply;
> - struct relay_viewer_stream *stream;
> - struct relay_session *session;
> + struct relay_viewer_stream *vstream = NULL;
> struct ctf_trace *ctf_trace;
> -
> - assert(conn);
> + struct relay_viewer_stream *metadata_viewer_stream = NULL;
>
> DBG2("Relay get data packet");
>
> health_code_update();
>
> - ret = recv_request(conn->sock, &get_packet_info, sizeof(get_packet_info));
> + ret = recv_request(conn->sock, &get_packet_info,
> + sizeof(get_packet_info));
> if (ret < 0) {
> goto end;
> }
> @@ -1444,59 +1462,53 @@ int viewer_get_packet(struct relay_connection *conn)
> /* From this point on, the error label can be reached. */
> memset(&reply, 0, sizeof(reply));
>
> - rcu_read_lock();
> - stream = viewer_stream_find_by_id(be64toh(get_packet_info.stream_id));
> - if (!stream) {
> + vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
> + if (!vstream) {
> goto error;
> }
>
> - session = session_find_by_id(conn->sessions_ht, stream->session_id);
> - if (!session) {
> - ret = -1;
> - goto error;
> - }
> + ctf_trace = vstream->stream->trace;
>
> - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
> - stream->path_name);
> - assert(ctf_trace);
> + /* metadata_viewer_stream may be NULL. */
> + metadata_viewer_stream =
> + ctf_trace_get_viewer_metadata_stream(ctf_trace);
>
> - /*
> - * First time we read this stream, we need open the tracefile, we should
> - * only arrive here if an index has already been sent to the viewer, so the
> - * tracefile must exist, if it does not it is a fatal error.
> - */
> - if (stream->read_fd < 0) {
> - char fullpath[PATH_MAX];
> + if (metadata_viewer_stream) {
> + bool get_packet_err = false;
>
> - if (stream->tracefile_count > 0) {
> - ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, stream->path_name,
> - stream->channel_name,
> - stream->tracefile_count_current);
> - } else {
> - ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name,
> - stream->channel_name);
> + pthread_mutex_lock(&metadata_viewer_stream->stream->lock);
> + pthread_mutex_lock(&metadata_viewer_stream->lock);
> + DBG("get packet metadata check: recv %" PRIu64 " sent %" PRIu64,
> + metadata_viewer_stream->stream->metadata_received,
> + metadata_viewer_stream->metadata_sent);
> + if (!metadata_viewer_stream->stream->metadata_received ||
> + metadata_viewer_stream->stream->metadata_received >
> + metadata_viewer_stream->metadata_sent) {
> + get_packet_err = true;
> }
> - if (ret < 0) {
> - goto error;
> - }
> - ret = open(fullpath, O_RDONLY);
> - if (ret < 0) {
> - PERROR("Relay opening trace file");
> - goto error;
> + pthread_mutex_unlock(&metadata_viewer_stream->lock);
> + pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
> + viewer_stream_put(metadata_viewer_stream);
> + if (get_packet_err) {
> + reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
> + reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
> + goto send_reply_nolock;
> }
> - stream->read_fd = ret;
> - }
> -
> - if (!ctf_trace->metadata_received ||
> - ctf_trace->metadata_received > ctf_trace->metadata_sent) {
> - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
> - reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
> - goto send_reply;
> }
>
> + pthread_mutex_lock(&vstream->lock);
> + /*
> + * The vstream->stream_fd used here has been opened by
> + * get_next_index. It is opened there because this is what
> + * allows us to grab a reference to the file with stream lock
> + * held, thus protecting us against overwrite caused by
> + * tracefile rotation. Since tracefile rotation unlinks the old
> + * data file, we are ensured that we won't have our data
> + * overwritten under us.
> + */
> ret = check_new_streams(conn);
> if (ret < 0) {
> - goto end_unlock;
> + goto end_free;
> } else if (ret == 1) {
> reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
> reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
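
The comment above relies on a POSIX property: unlinking a path does not destroy the file while a descriptor is still open on it. A minimal standalone sketch of that behaviour follows. The helper read_after_unlink() and the /tmp path template are hypothetical and are not part of lttng-tools.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/*
 * Hypothetical helper (illustration only): write "payload" to a
 * temporary file, open a read-only descriptor on it, unlink the path
 * (as a tracefile rotation would), then read the content back through
 * the descriptor. Returns the number of bytes read, or -1 on error.
 */
static ssize_t read_after_unlink(const char *payload, char *buf,
		size_t buf_len)
{
	char path[] = "/tmp/unlink-demo-XXXXXX";
	size_t len = strlen(payload);
	ssize_t ret = -1;
	int wfd, rfd = -1;

	wfd = mkstemp(path);
	if (wfd < 0) {
		return -1;
	}
	if (write(wfd, payload, len) != (ssize_t) len) {
		goto end;
	}
	rfd = open(path, O_RDONLY);
	if (rfd < 0) {
		goto end;
	}
	/* The name goes away; the data stays while rfd is open. */
	if (unlink(path) < 0) {
		goto end;
	}
	path[0] = '\0';	/* Remember that the path is already gone. */
	ret = read(rfd, buf, buf_len);
end:
	if (path[0] != '\0') {
		(void) unlink(path);
	}
	if (rfd >= 0) {
		(void) close(rfd);
	}
	(void) close(wfd);
	return ret;
}
```

Calling read_after_unlink("packet", buf, sizeof(buf) - 1) on a zeroed buffer should hand back the payload even though the path no longer exists by the time of the read.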
> @@ -1510,34 +1522,19 @@ int viewer_get_packet(struct relay_connection *conn)
> goto error;
> }
>
> - ret = lseek(stream->read_fd, be64toh(get_packet_info.offset), SEEK_SET);
> + ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
> + SEEK_SET);
> if (ret < 0) {
> - /*
> - * If the read fd was closed by the streaming side, the
> - * abort_flag will be set to 1, otherwise it is an error.
> - */
> - if (stream->abort_flag == 0) {
> - PERROR("lseek");
> - goto error;
> - }
> - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
> - goto send_reply;
> + PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
> + be64toh(get_packet_info.offset));
> + goto error;
> }
> - read_len = lttng_read(stream->read_fd, data, len);
> + read_len = lttng_read(vstream->stream_fd->fd, data, len);
> if (read_len < len) {
> - /*
> - * If the read fd was closed by the streaming side, the
> - * abort_flag will be set to 1, otherwise it is an error.
> - */
> - if (stream->abort_flag == 0) {
> - PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
> - stream->read_fd,
> - be64toh(get_packet_info.offset));
> - goto error;
> - } else {
> - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
> - goto send_reply;
> - }
> + PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
> + vstream->stream_fd->fd,
> + be64toh(get_packet_info.offset));
> + goto error;
> }
> reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
> reply.len = htobe32(len);
> @@ -1548,13 +1545,17 @@ error:
> reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
>
> send_reply:
> + if (vstream) {
> + pthread_mutex_unlock(&vstream->lock);
> + }
> +send_reply_nolock:
> reply.flags = htobe32(reply.flags);
>
> health_code_update();
>
> ret = send_response(conn->sock, &reply, sizeof(reply));
> if (ret < 0) {
> - goto end_unlock;
> + goto end_free;
> }
> health_code_update();
>
> @@ -1562,7 +1563,7 @@ send_reply:
> health_code_update();
> ret = send_response(conn->sock, data, len);
> if (ret < 0) {
> - goto end_unlock;
> + goto end_free;
> }
> health_code_update();
> }
> @@ -1570,11 +1571,12 @@ send_reply:
> DBG("Sent %u bytes for stream %" PRIu64, len,
> be64toh(get_packet_info.stream_id));
>
> -end_unlock:
> +end_free:
> free(data);
> - rcu_read_unlock();
> -
> end:
> + if (vstream) {
> + viewer_stream_put(vstream);
> + }
> return ret;
> }
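
The function above follows a get/put discipline: the lookup returns a reference, and every exit path funnels through a label that puts it. A single-threaded sketch of that shape is below. All demo_* names are hypothetical, and a plain int stands in for whatever reference counter the real code uses.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a reference-counted viewer stream. */
struct demo_stream {
	int refcount;	/* Plain int: single-threaded sketch only. */
	int id;
};

/* Look up by id; on success the caller owns one extra reference. */
static struct demo_stream *demo_get_by_id(struct demo_stream *tbl,
		size_t n, int id)
{
	size_t i;

	for (i = 0; i < n; i++) {
		if (tbl[i].id == id) {
			tbl[i].refcount++;
			return &tbl[i];
		}
	}
	return NULL;
}

static void demo_put(struct demo_stream *s)
{
	assert(s->refcount > 0);
	s->refcount--;
}

/*
 * Every path, success or failure, leaves through "end", so the
 * reference taken by the lookup is dropped exactly once.
 */
static int demo_handle_request(struct demo_stream *tbl, size_t n,
		int id, int fail)
{
	int ret = 0;
	struct demo_stream *s;

	s = demo_get_by_id(tbl, n, id);
	if (!s) {
		ret = -1;
		goto end;
	}
	if (fail) {
		/* Simulated mid-request error. */
		ret = -1;
		goto end;
	}
end:
	if (s) {
		demo_put(s);
	}
	return ret;
}
```

Whatever the outcome of demo_handle_request(), the reference counts in the table end up where they started, which is the invariant the end: label in the patch is meant to preserve.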
>
> @@ -1592,9 +1594,7 @@ int viewer_get_metadata(struct relay_connection *conn)
> char *data = NULL;
> struct lttng_viewer_get_metadata request;
> struct lttng_viewer_metadata_packet reply;
> - struct relay_viewer_stream *stream;
> - struct ctf_trace *ctf_trace;
> - struct relay_session *session;
> + struct relay_viewer_stream *vstream = NULL;
>
> assert(conn);
>
> @@ -1610,36 +1610,30 @@ int viewer_get_metadata(struct relay_connection *conn)
>
> memset(&reply, 0, sizeof(reply));
>
> - rcu_read_lock();
> - stream = viewer_stream_find_by_id(be64toh(request.stream_id));
> - if (!stream || !stream->metadata_flag) {
> - ERR("Invalid metadata stream");
> - goto error;
> + vstream = viewer_stream_get_by_id(be64toh(request.stream_id));
> + if (!vstream) {
> + reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
> + goto send_reply;
> }
> -
> - session = session_find_by_id(conn->sessions_ht, stream->session_id);
> - if (!session) {
> - ret = -1;
> + if (!vstream->stream->is_metadata) {
> + ERR("Invalid metadata stream");
> goto error;
> }
>
> - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
> - stream->path_name);
> - assert(ctf_trace);
> - assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received);
> + assert(vstream->metadata_sent <= vstream->stream->metadata_received);
>
> - len = ctf_trace->metadata_received - ctf_trace->metadata_sent;
> + len = vstream->stream->metadata_received - vstream->metadata_sent;
> if (len == 0) {
> reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
> goto send_reply;
> }
>
> /* first time, we open the metadata file */
> - if (stream->read_fd < 0) {
> + if (!vstream->stream_fd) {
> char fullpath[PATH_MAX];
>
> - ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name,
> - stream->channel_name);
> + ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name,
> + vstream->channel_name);
> if (ret < 0) {
> goto error;
> }
> @@ -1648,7 +1642,13 @@ int viewer_get_metadata(struct relay_connection *conn)
> PERROR("Relay opening metadata file");
> goto error;
> }
> - stream->read_fd = ret;
> + vstream->stream_fd = stream_fd_create(ret);
> + if (!vstream->stream_fd) {
> + if (close(ret)) {
> + PERROR("close");
> + }
> + goto error;
> + }
> }
>
> reply.len = htobe64(len);
> @@ -1658,13 +1658,20 @@ int viewer_get_metadata(struct relay_connection *conn)
> goto error;
> }
>
> - read_len = lttng_read(stream->read_fd, data, len);
> + read_len = lttng_read(vstream->stream_fd->fd, data, len);
> if (read_len < len) {
> PERROR("Relay reading metadata file");
> goto error;
> }
> - ctf_trace->metadata_sent += read_len;
> + vstream->metadata_sent += read_len;
> + if (vstream->metadata_sent == vstream->stream->metadata_received
> + && vstream->stream->closed) {
> + /* Release ownership for the viewer metadata stream. */
> + viewer_stream_put(vstream);
> + }
> +
> reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
> +
> goto send_reply;
>
> error:
> @@ -1674,14 +1681,14 @@ send_reply:
> health_code_update();
> ret = send_response(conn->sock, &reply, sizeof(reply));
> if (ret < 0) {
> - goto end_unlock;
> + goto end_free;
> }
> health_code_update();
>
> if (len > 0) {
> ret = send_response(conn->sock, data, len);
> if (ret < 0) {
> - goto end_unlock;
> + goto end_free;
> }
> }
>
> @@ -1690,10 +1697,12 @@ send_reply:
>
> DBG("Metadata sent");
>
> -end_unlock:
> +end_free:
> free(data);
> - rcu_read_unlock();
> end:
> + if (vstream) {
> + viewer_stream_put(vstream);
> + }
> return ret;
> }
>
> @@ -1712,13 +1721,12 @@ int viewer_create_session(struct relay_connection *conn)
>
> memset(&resp, 0, sizeof(resp));
> resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
> - conn->viewer_session = zmalloc(sizeof(*conn->viewer_session));
> + conn->viewer_session = viewer_session_create();
> if (!conn->viewer_session) {
> ERR("Allocation viewer session");
> resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
> goto send_reply;
> }
> - CDS_INIT_LIST_HEAD(&conn->viewer_session->sessions_head);
>
> send_reply:
> health_code_update();
> @@ -1757,9 +1765,6 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
> int ret = 0;
> uint32_t msg_value;
>
> - assert(recv_hdr);
> - assert(conn);
> -
> msg_value = be32toh(recv_hdr->cmd);
>
> /*
> @@ -1798,7 +1803,8 @@ int process_control(struct lttng_viewer_cmd *recv_hdr,
> ret = viewer_create_session(conn);
> break;
> default:
> - ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd));
> + ERR("Received unknown viewer command (%u)",
> + be32toh(recv_hdr->cmd));
> live_relay_unknown_command(conn);
> ret = -1;
> goto end;
> @@ -1813,8 +1819,6 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
> {
> int ret;
>
> - assert(events);
> -
> (void) lttng_poll_del(events, pollfd);
>
> ret = close(pollfd);
> @@ -1824,38 +1828,6 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
> }
>
> /*
> - * Delete and destroy a connection.
> - *
> - * RCU read side lock MUST be acquired.
> - */
> -static void destroy_connection(struct lttng_ht *relay_connections_ht,
> - struct relay_connection *conn)
> -{
> - struct relay_session *session, *tmp_session;
> -
> - assert(relay_connections_ht);
> - assert(conn);
> -
> - connection_delete(relay_connections_ht, conn);
> -
> - if (!conn->viewer_session) {
> - goto end;
> - }
> -
> - rcu_read_lock();
> - cds_list_for_each_entry_safe(session, tmp_session,
> - &conn->viewer_session->sessions_head,
> - viewer_session_list) {
> - DBG("Cleaning connection of session ID %" PRIu64, session->id);
> - cleanup_session(conn, session);
> - }
> - rcu_read_unlock();
> -
> -end:
> - connection_destroy(conn);
> -}
> -
> -/*
> * This thread does the actual work
> */
> static
> @@ -1867,8 +1839,6 @@ void *thread_worker(void *data)
> struct lttng_ht *relay_connections_ht;
> struct lttng_ht_iter iter;
> struct lttng_viewer_cmd recv_hdr;
> - struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
> - struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
> struct relay_connection *destroy_conn;
>
> DBG("[thread] Live viewer relay worker started");
> @@ -1952,50 +1922,50 @@ restart:
> } else if (revents & LPOLLIN) {
> struct relay_connection *conn;
>
> - ret = lttng_read(live_conn_pipe[0], &conn, sizeof(conn));
> + ret = lttng_read(live_conn_pipe[0],
> + &conn, sizeof(conn));
> if (ret < 0) {
> goto error;
> }
> - conn->sessions_ht = sessions_ht;
> - connection_init(conn);
> lttng_poll_add(&events, conn->sock->fd,
> LPOLLIN | LPOLLRDHUP);
> - rcu_read_lock();
> - lttng_ht_add_unique_ulong(relay_connections_ht,
> - &conn->sock_n);
> - rcu_read_unlock();
> - DBG("Connection socket %d added", conn->sock->fd);
> + connection_ht_add(relay_connections_ht, conn);
> + DBG("Connection socket %d added to poll", conn->sock->fd);
> }
> } else {
> struct relay_connection *conn;
>
> - rcu_read_lock();
> - conn = connection_find_by_sock(relay_connections_ht, pollfd);
> - /* If not found, there is a synchronization issue. */
> - assert(conn);
> + conn = connection_get_by_sock(relay_connections_ht, pollfd);
> + if (!conn) {
> + continue;
> + }
>
> if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> cleanup_connection_pollfd(&events, pollfd);
> - destroy_connection(relay_connections_ht, conn);
> + /* Put "open" ownership reference. */
> + connection_put(conn);
> } else if (revents & LPOLLIN) {
> ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
> sizeof(recv_hdr), 0);
> if (ret <= 0) {
> /* Connection closed */
> cleanup_connection_pollfd(&events, pollfd);
> - destroy_connection(relay_connections_ht, conn);
> + /* Put "open" ownership reference. */
> + connection_put(conn);
> DBG("Viewer control conn closed with %d", pollfd);
> } else {
> ret = process_control(&recv_hdr, conn);
> if (ret < 0) {
> /* Clear the session on error. */
> cleanup_connection_pollfd(&events, pollfd);
> - destroy_connection(relay_connections_ht, conn);
> + /* Put "open" ownership reference. */
> + connection_put(conn);
> DBG("Viewer connection closed with %d", pollfd);
> }
> }
> }
> - rcu_read_unlock();
> + /* Put "get_by_sock" reference. */
> + connection_put(conn);
> }
> }
> }
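
The worker loop above distinguishes two references on a connection: the "open" ownership reference held since creation, and the short-lived one taken by the lookup. A single-threaded sketch of how the two puts interact is below. The demo_conn type and its helpers are hypothetical, not the relayd API.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical connection object with two kinds of references. */
struct demo_conn {
	int refcount;
	bool released;	/* Set once the last reference is dropped. */
};

/* Creation leaves one reference: the "open" ownership reference. */
static void demo_conn_init(struct demo_conn *c)
{
	c->refcount = 1;
	c->released = false;
}

/* Lookup-style get: fails once the object has been released. */
static bool demo_conn_get(struct demo_conn *c)
{
	if (c->refcount == 0) {
		return false;
	}
	c->refcount++;
	return true;
}

static void demo_conn_put(struct demo_conn *c)
{
	assert(c->refcount > 0);
	if (--c->refcount == 0) {
		c->released = true;
	}
}

/* One poll iteration for a single connection. */
static void demo_handle_event(struct demo_conn *c, bool hangup)
{
	if (!demo_conn_get(c)) {
		return;	/* Already gone, nothing to do. */
	}
	if (hangup) {
		/* Put "open" ownership reference. */
		demo_conn_put(c);
	}
	/* Put lookup reference. */
	demo_conn_put(c);
}
```

An ordinary event leaves the count at 1. A hangup drops it to 0 through the two puts, after which further lookups fail instead of touching a released object.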
> @@ -2010,7 +1980,7 @@ error:
> destroy_conn,
> sock_n.node) {
> health_code_update();
> - destroy_connection(relay_connections_ht, destroy_conn);
> + connection_put(destroy_conn);
> }
> rcu_read_unlock();
> error_poll_create:
> @@ -2078,8 +2048,7 @@ int relayd_live_join(void)
> /*
> * main
> */
> -int relayd_live_create(struct lttng_uri *uri,
> - struct relay_local_data *relay_ctx)
> +int relayd_live_create(struct lttng_uri *uri)
> {
> int ret = 0, retval = 0;
> void *status;
> @@ -2129,7 +2098,7 @@ int relayd_live_create(struct lttng_uri *uri,
>
> /* Setup the worker thread */
> ret = pthread_create(&live_worker_thread, NULL,
> - thread_worker, relay_ctx);
> + thread_worker, NULL);
> if (ret) {
> errno = ret;
> PERROR("pthread_create viewer worker");
> diff --git a/src/bin/lttng-relayd/live.h b/src/bin/lttng-relayd/live.h
> index 5db940b..2b8a3a0 100644
> --- a/src/bin/lttng-relayd/live.h
> +++ b/src/bin/lttng-relayd/live.h
> @@ -1,6 +1,10 @@
> +#ifndef LTTNG_RELAYD_LIVE_H
> +#define LTTNG_RELAYD_LIVE_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,15 +20,11 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef LTTNG_RELAYD_LIVE_H
> -#define LTTNG_RELAYD_LIVE_H
> -
> #include <common/uri.h>
>
> #include "lttng-relayd.h"
>
> -int relayd_live_create(struct lttng_uri *live_uri,
> - struct relay_local_data *relay_ctx);
> +int relayd_live_create(struct lttng_uri *live_uri);
> int relayd_live_stop(void);
> int relayd_live_join(void);
>
> diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h
> index 245c5fd..889071c 100644
> --- a/src/bin/lttng-relayd/lttng-relayd.h
> +++ b/src/bin/lttng-relayd/lttng-relayd.h
> @@ -1,6 +1,10 @@
> +#ifndef LTTNG_RELAYD_H
> +#define LTTNG_RELAYD_H
> +
> /*
> * Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify
> * it under the terms of the GNU General Public License, version 2 only,
> @@ -16,9 +20,6 @@
> * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef LTTNG_RELAYD_H
> -#define LTTNG_RELAYD_H
> -
> #include <limits.h>
> #include <urcu.h>
> #include <urcu/wfcqueue.h>
> @@ -34,26 +35,22 @@ struct relay_conn_queue {
> int32_t futex;
> };
>
> -struct relay_local_data {
> - struct lttng_ht *sessions_ht;
> -};
> -
> -extern char *opt_output_path;
> -
> /*
> * Contains stream indexed by ID. This is important since many commands lookup
> * streams only by ID thus also keeping them in this hash table makes the
> - * search O(1) instead of iterating over the ctf_traces_ht of the session.
> + * search O(1).
> */
> +extern struct lttng_ht *sessions_ht;
> extern struct lttng_ht *relay_streams_ht;
> -
> extern struct lttng_ht *viewer_streams_ht;
> -extern struct lttng_ht *indexes_ht;
>
> +extern char *opt_output_path;
> extern const char *tracing_group_name;
> -
> extern const char * const config_section_name;
>
> +extern uid_t relayd_uid;
> +extern gid_t relayd_gid;
> +
> extern int thread_quit_pipe[2];
>
> void lttng_relay_notify_ready(void);
> diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
> index 8e1879f..46a4e55 100644
> --- a/src/bin/lttng-relayd/main.c
> +++ b/src/bin/lttng-relayd/main.c
> @@ -2,6 +2,7 @@
> * Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> * 2013 - Jérémie Galarneau <jeremie.galarneau at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify
> * it under the terms of the GNU General Public License, version 2 only,
> @@ -56,6 +57,7 @@
> #include <common/uri.h>
> #include <common/utils.h>
> #include <common/config/config.h>
> +#include <urcu/rculist.h>
>
> #include "cmd.h"
> #include "ctf-trace.h"
> @@ -114,6 +116,11 @@ static pthread_t dispatcher_thread;
> static pthread_t worker_thread;
> static pthread_t health_thread;
>
> +/*
> + * last_relay_stream_id_lock protects last_relay_stream_id increment
> + * atomicity on 32-bit architectures.
> + */
> +static pthread_mutex_t last_relay_stream_id_lock = PTHREAD_MUTEX_INITIALIZER;
> static uint64_t last_relay_stream_id;
>
> /*
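
The lock introduced above exists because a 64-bit increment is not guaranteed to be a single atomic operation on 32-bit targets. A minimal sketch of a mutex-protected ID allocator follows. The demo_* names are hypothetical, and whether the real code hands out the pre- or post-increment value is an assumption here.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

/* Hypothetical counterpart of last_relay_stream_id and its lock. */
static pthread_mutex_t demo_id_lock = PTHREAD_MUTEX_INITIALIZER;
static uint64_t demo_last_id;

/*
 * Return a fresh, unique ID. The mutex makes the 64-bit
 * read-modify-write atomic with respect to other callers, even where
 * the hardware needs two 32-bit operations to update the counter.
 */
static uint64_t demo_next_id(void)
{
	uint64_t id;

	pthread_mutex_lock(&demo_id_lock);
	id = ++demo_last_id;	/* Assumption: post-increment value. */
	pthread_mutex_unlock(&demo_id_lock);
	return id;
}
```

Successive calls return 1, 2, 3 and so on; concurrent callers never observe the same value or a torn one.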
> @@ -129,8 +136,8 @@ static char *data_buffer;
> static unsigned int data_buffer_size;
>
> /* We need those values for the file/dir creation. */
> -static uid_t relayd_uid;
> -static gid_t relayd_gid;
> +uid_t relayd_uid;
> +gid_t relayd_gid;
>
> /* Global relay stream hash table. */
> struct lttng_ht *relay_streams_ht;
> @@ -138,8 +145,8 @@ struct lttng_ht *relay_streams_ht;
> /* Global relay viewer stream hash table. */
> struct lttng_ht *viewer_streams_ht;
>
> -/* Global hash table that stores relay index object. */
> -struct lttng_ht *indexes_ht;
> +/* Global relay sessions hash table. */
> +struct lttng_ht *sessions_ht;
>
> /* Relayd health monitoring */
> struct health_app *health_relayd;
> @@ -163,8 +170,7 @@ static const char *config_ignore_options[] = { "help", "config" };
> /*
> * usage function on stderr
> */
> -static
> -void usage(void)
> +static void usage(void)
> {
> fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
> fprintf(stderr, " -h, --help Display this usage.\n");
> @@ -185,8 +191,7 @@ void usage(void)
> *
> * Return 0 on success else a negative value.
> */
> -static
> -int set_option(int opt, const char *arg, const char *optname)
> +static int set_option(int opt, const char *arg, const char *optname)
> {
> int ret;
>
> @@ -308,8 +313,7 @@ end:
> * See config_entry_handler_cb comment in common/config/config.h for the
> * return value conventions.
> */
> -static
> -int config_entry_handler(const struct config_entry *entry, void *unused)
> +static int config_entry_handler(const struct config_entry *entry, void *unused)
> {
> int ret = 0, i;
>
> @@ -332,9 +336,9 @@ int config_entry_handler(const struct config_entry *entry, void *unused)
> }
>
> /*
> - * If the option takes no argument on the command line, we have to
> - * check if the value is "true". We support non-zero numeric values,
> - * true, on and yes.
> + * If the option takes no argument on the command line,
> + * we have to check if the value is "true". We support
> + * non-zero numeric values, true, on and yes.
> */
> if (!long_options[i].has_arg) {
> ret = config_parse_value(entry->value);
> @@ -359,8 +363,7 @@ end:
> return ret;
> }
>
> -static
> -int set_options(int argc, char **argv)
> +static int set_options(int argc, char **argv)
> {
> int c, ret = 0, option_index = 0, retval = 0;
> int orig_optopt = optopt, orig_optind = optind;
> @@ -483,21 +486,32 @@ exit:
> return retval;
> }
>
> +static void print_global_objects(void)
> +{
> + rcu_register_thread();
> +
> + print_viewer_streams();
> + print_relay_streams();
> + print_sessions();
> +
> + rcu_unregister_thread();
> +}
> +
> /*
> * Cleanup the daemon
> */
> -static
> -void relayd_cleanup(struct relay_local_data *relay_ctx)
> +static void relayd_cleanup(void)
> {
> + print_global_objects();
> +
> DBG("Cleaning up");
>
> if (viewer_streams_ht)
> lttng_ht_destroy(viewer_streams_ht);
> if (relay_streams_ht)
> lttng_ht_destroy(relay_streams_ht);
> - if (relay_ctx && relay_ctx->sessions_ht)
> - lttng_ht_destroy(relay_ctx->sessions_ht);
> - free(relay_ctx);
> + if (sessions_ht)
> + lttng_ht_destroy(sessions_ht);
>
> /* free the dynamically allocated opt_output_path */
> free(opt_output_path);
> @@ -517,8 +531,7 @@ void relayd_cleanup(struct relay_local_data *relay_ctx)
> /*
> * Write to writable pipe used to notify a thread.
> */
> -static
> -int notify_thread_pipe(int wpipe)
> +static int notify_thread_pipe(int wpipe)
> {
> ssize_t ret;
>
> @@ -532,8 +545,7 @@ end:
> return ret;
> }
>
> -static
> -int notify_health_quit_pipe(int *pipe)
> +static int notify_health_quit_pipe(int *pipe)
> {
> ssize_t ret;
>
> @@ -582,8 +594,7 @@ int lttng_relay_stop_threads(void)
> * Simply stop all worker threads, leaving main() return gracefully after
> * joining all threads and calling cleanup().
> */
> -static
> -void sighandler(int sig)
> +static void sighandler(int sig)
> {
> switch (sig) {
> case SIGPIPE:
> @@ -613,8 +624,7 @@ void sighandler(int sig)
> * Setup signal handler for :
> * SIGINT, SIGTERM, SIGPIPE
> */
> -static
> -int set_signal_handler(void)
> +static int set_signal_handler(void)
> {
> int ret = 0;
> struct sigaction sa;
> @@ -668,8 +678,7 @@ void lttng_relay_notify_ready(void)
> *
> * Return -1 on error or 0 if all pipes are created.
> */
> -static
> -int init_thread_quit_pipe(void)
> +static int init_thread_quit_pipe(void)
> {
> int ret;
>
> @@ -681,8 +690,7 @@ int init_thread_quit_pipe(void)
> /*
> * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
> */
> -static
> -int create_thread_poll_set(struct lttng_poll_event *events, int size)
> +static int create_thread_poll_set(struct lttng_poll_event *events, int size)
> {
> int ret;
>
> @@ -713,8 +721,7 @@ error:
> *
> * Return 1 if it was triggered else 0;
> */
> -static
> -int check_thread_quit_pipe(int fd, uint32_t events)
> +static int check_thread_quit_pipe(int fd, uint32_t events)
> {
> if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
> return 1;
> @@ -726,8 +733,7 @@ int check_thread_quit_pipe(int fd, uint32_t events)
> /*
> * Create and init socket from uri.
> */
> -static
> -struct lttcomm_sock *relay_init_sock(struct lttng_uri *uri)
> +static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri)
> {
> int ret;
> struct lttcomm_sock *sock = NULL;
> @@ -765,63 +771,9 @@ error:
> }
>
> /*
> - * Return nonzero if stream needs to be closed.
> - */
> -static
> -int close_stream_check(struct relay_stream *stream)
> -{
> - if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) {
> - /*
> - * We are about to close the stream so set the data pending flag to 1
> - * which will make the end data pending command skip the stream which
> - * is now closed and ready. Note that after proceeding to a file close,
> - * the written file is ready for reading.
> - */
> - stream->data_pending_check_done = 1;
> - return 1;
> - }
> - return 0;
> -}
> -
> -static void try_close_stream(struct relay_session *session,
> - struct relay_stream *stream)
> -{
> - int ret;
> - struct ctf_trace *ctf_trace;
> -
> - assert(session);
> - assert(stream);
> -
> - if (!close_stream_check(stream)) {
> - /* Can't close it, not ready for that. */
> - goto end;
> - }
> -
> - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
> - stream->path_name);
> - assert(ctf_trace);
> -
> - pthread_mutex_lock(&session->viewer_ready_lock);
> - ctf_trace->invalid_flag = 1;
> - pthread_mutex_unlock(&session->viewer_ready_lock);
> -
> - ret = stream_close(session, stream);
> - if (ret || session->snapshot) {
> - /* Already close thus the ctf trace is being or has been destroyed. */
> - goto end;
> - }
> -
> - ctf_trace_try_destroy(session, ctf_trace);
> -
> -end:
> - return;
> -}
> -
> -/*
> * This thread manages the listening for new connections on the network
> */
> -static
> -void *relay_thread_listener(void *data)
> +static void *relay_thread_listener(void *data)
> {
> int i, ret, pollfd, err = -1;
> uint32_t revents, nb_fd;
> @@ -834,18 +786,19 @@ void *relay_thread_listener(void *data)
>
> health_code_update();
>
> - control_sock = relay_init_sock(control_uri);
> + control_sock = relay_socket_create(control_uri);
> if (!control_sock) {
> goto error_sock_control;
> }
>
> - data_sock = relay_init_sock(data_uri);
> + data_sock = relay_socket_create(data_uri);
> if (!data_sock) {
> goto error_sock_relay;
> }
>
> /*
> - * Pass 3 as size here for the thread quit pipe, control and data socket.
> + * Pass 3 as size here for the thread quit pipe, control and
> + * data socket.
> */
> ret = create_thread_poll_set(&events, 3);
> if (ret < 0) {
> @@ -900,7 +853,10 @@ restart:
> pollfd = LTTNG_POLL_GETFD(&events, i);
>
> if (!revents) {
> - /* No activity for this FD (poll implementation). */
> + /*
> + * No activity for this FD (poll
> + * implementation).
> + */
> continue;
> }
>
> @@ -916,33 +872,29 @@ restart:
> goto error;
> } else if (revents & LPOLLIN) {
> /*
> - * Get allocated in this thread, enqueued to a global queue,
> - * dequeued and freed in the worker thread.
> + * Get allocated in this thread,
> + * enqueued to a global queue, dequeued
> + * and freed in the worker thread.
> */
> int val = 1;
> struct relay_connection *new_conn;
> struct lttcomm_sock *newsock;
> -
> - new_conn = connection_create();
> - if (!new_conn) {
> - goto error;
> - }
> + enum connection_type type;
>
> if (pollfd == data_sock->fd) {
> - new_conn->type = RELAY_DATA;
> + type = RELAY_DATA;
> newsock = data_sock->ops->accept(data_sock);
> DBG("Relay data connection accepted, socket %d",
> newsock->fd);
> } else {
> assert(pollfd == control_sock->fd);
> - new_conn->type = RELAY_CONTROL;
> + type = RELAY_CONTROL;
> newsock = control_sock->ops->accept(control_sock);
> DBG("Relay control connection accepted, socket %d",
> newsock->fd);
> }
> if (!newsock) {
> PERROR("accepting sock");
> - connection_free(new_conn);
> goto error;
> }
>
> @@ -951,18 +903,22 @@ restart:
> if (ret < 0) {
> PERROR("setsockopt inet");
> lttcomm_destroy_sock(newsock);
> - connection_free(new_conn);
> goto error;
> }
> - new_conn->sock = newsock;
> + new_conn = connection_create(newsock, type);
> + if (!new_conn) {
> + lttcomm_destroy_sock(newsock);
> + goto error;
> + }
>
> /* Enqueue request for the dispatcher thread. */
> cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
> &new_conn->qnode);
>
> /*
> - * Wake the dispatch queue futex. Implicit memory barrier with
> - * the exchange in cds_wfcq_enqueue.
> + * Wake the dispatch queue futex.
> + * Implicit memory barrier with the
> + * exchange in cds_wfcq_enqueue.
> */
> futex_nto1_wake(&relay_conn_queue.futex);
> }
> @@ -1004,8 +960,7 @@ error_sock_control:
> /*
> * This thread manages the dispatching of the requests to worker threads
> */
> -static
> -void *relay_thread_dispatcher(void *data)
> +static void *relay_thread_dispatcher(void *data)
> {
> int err = -1;
> ssize_t ret;
> @@ -1044,14 +999,15 @@ void *relay_thread_dispatcher(void *data)
> DBG("Dispatching request waiting on sock %d", new_conn->sock->fd);
>
> /*
> - * Inform worker thread of the new request. This call is blocking
> - * so we can be assured that the data will be read at some point in
> - * time or wait to the end of the world :)
> + * Inform worker thread of the new request. This
> + * call is blocking so we can be assured that
> + * the data will be read at some point in time
> + * or wait to the end of the world :)
> */
> ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn));
> if (ret < 0) {
> PERROR("write connection pipe");
> - connection_destroy(new_conn);
> + connection_put(new_conn);
> goto error;
> }
> } while (node != NULL);
> @@ -1077,72 +1033,27 @@ error_testpoint:
> return NULL;
> }
>
> -static void try_close_streams(struct relay_session *session)
> -{
> - struct ctf_trace *ctf_trace;
> - struct lttng_ht_iter iter;
> -
> - assert(session);
> -
> - pthread_mutex_lock(&session->viewer_ready_lock);
> - rcu_read_lock();
> - cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
> - node.node) {
> - struct relay_stream *stream;
> -
> - /* Close streams. */
> - cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
> - stream_close(session, stream);
> - }
> -
> - ctf_trace->invalid_flag = 1;
> - ctf_trace_try_destroy(session, ctf_trace);
> - }
> - rcu_read_unlock();
> - pthread_mutex_unlock(&session->viewer_ready_lock);
> -}
> -
> /*
> - * Try to destroy a session within a connection.
> + * Set index data from the control port to a given index object.
> */
> -static void destroy_session(struct relay_session *session,
> - struct lttng_ht *sessions_ht)
> -{
> - assert(session);
> - assert(sessions_ht);
> -
> - /* Indicate that this session can be destroyed from now on. */
> - session->close_flag = 1;
> -
> - try_close_streams(session);
> -
> - /*
> - * This will try to delete and destroy the session if no viewer is attached
> - * to it meaning the refcount is down to zero.
> - */
> - session_try_destroy(sessions_ht, session);
> -}
> -
> -/*
> - * Copy index data from the control port to a given index object.
> - */
> -static void copy_index_control_data(struct relay_index *index,
> +static int set_index_control_data(struct relay_index *index,
> struct lttcomm_relayd_index *data)
> {
> - assert(index);
> - assert(data);
> + struct ctf_packet_index index_data;
>
> /*
> - * The index on disk is encoded in big endian, so we don't need to convert
> - * the data received on the network. The data_offset value is NEVER
> - * modified here and is updated by the data thread.
> + * The index on disk is encoded in big endian, so we don't need
> + * to convert the data received on the network. The data_offset
> + * value is NEVER modified here and is updated by the data
> + * thread.
> */
> - index->index_data.packet_size = data->packet_size;
> - index->index_data.content_size = data->content_size;
> - index->index_data.timestamp_begin = data->timestamp_begin;
> - index->index_data.timestamp_end = data->timestamp_end;
> - index->index_data.events_discarded = data->events_discarded;
> - index->index_data.stream_id = data->stream_id;
> + index_data.packet_size = data->packet_size;
> + index_data.content_size = data->content_size;
> + index_data.timestamp_begin = data->timestamp_begin;
> + index_data.timestamp_end = data->timestamp_end;
> + index_data.events_discarded = data->events_discarded;
> + index_data.stream_id = data->stream_id;
> + return relay_index_set_data(index, &index_data);
> }
>
> /*
> @@ -1150,31 +1061,22 @@ static void copy_index_control_data(struct relay_index *index,
> *
> * On success, send back the session id or else return a negative value.
> */
> -static
> -int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> int ret = 0, send_ret;
> struct relay_session *session;
> struct lttcomm_relayd_status_session reply;
> + char session_name[NAME_MAX];
> + char hostname[HOST_NAME_MAX];
> + uint32_t live_timer = 0;
> + bool snapshot = false;
>
> - assert(recv_hdr);
> - assert(conn);
> + memset(session_name, 0, NAME_MAX);
> + memset(hostname, 0, HOST_NAME_MAX);
>
> memset(&reply, 0, sizeof(reply));
>
> - session = session_create();
> - if (!session) {
> - ret = -1;
> - goto error;
> - }
> - session->minor = conn->minor;
> - session->major = conn->major;
> - conn->session_id = session->id;
> - conn->session = session;
> -
> - reply.session_id = htobe64(session->id);
> -
> switch (conn->minor) {
> case 1:
> case 2:
> @@ -1182,13 +1084,26 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
> break;
> case 4: /* LTTng sessiond 2.4 */
> default:
> - ret = cmd_create_session_2_4(conn, session);
> + ret = cmd_create_session_2_4(conn, session_name,
> + hostname, &live_timer, &snapshot);
> + }
> + if (ret < 0) {
> + goto send_reply;
> }
>
> - lttng_ht_add_unique_u64(conn->sessions_ht, &session->session_n);
> + session = session_create(session_name, hostname, live_timer,
> + snapshot, conn->major, conn->minor);
> + if (!session) {
> + ret = -1;
> + goto send_reply;
> + }
> + assert(!conn->session);
> + conn->session = session;
> DBG("Created session %" PRIu64, session->id);
>
> -error:
> + reply.session_id = htobe64(session->id);
> +
> +send_reply:
> if (ret < 0) {
> reply.ret_code = htobe32(LTTNG_ERR_FATAL);
> } else {
> @@ -1208,47 +1123,48 @@ error:
> * When we have received all the streams and the metadata for a channel,
> * we make them visible to the viewer threads.
> */
> -static
> -void set_viewer_ready_flag(struct relay_connection *conn)
> +static void publish_connection_local_streams(struct relay_connection *conn)
> {
> - struct relay_stream *stream, *tmp_stream;
> + struct relay_stream *stream;
> + struct relay_session *session = conn->session;
>
> - pthread_mutex_lock(&conn->session->viewer_ready_lock);
> - cds_list_for_each_entry_safe(stream, tmp_stream, &conn->recv_head,
> - recv_list) {
> - stream->viewer_ready = 1;
> - cds_list_del(&stream->recv_list);
> + /*
> + * We publish all streams belonging to a session atomically wrt
> + * session lock.
> + */
> + pthread_mutex_lock(&session->lock);
> + rcu_read_lock();
> + cds_list_for_each_entry_rcu(stream, &session->recv_list,
> + recv_node) {
> + stream_publish(stream);
> }
> - pthread_mutex_unlock(&conn->session->viewer_ready_lock);
> - return;
> -}
> -
> -/*
> - * Add a recv handle node to the connection recv list with the given stream
> - * handle. A new node is allocated thus must be freed when the node is deleted
> - * from the list.
> - */
> -static void queue_stream(struct relay_stream *stream,
> - struct relay_connection *conn)
> -{
> - assert(conn);
> - assert(stream);
> + rcu_read_unlock();
>
> - cds_list_add(&stream->recv_list, &conn->recv_head);
> + /*
> + * Inform the viewer that there are new streams in the session.
> + */
> + if (session->viewer_attached) {
> + uatomic_set(&session->new_streams, 1);
> + }
> + pthread_mutex_unlock(&session->lock);
> + return;
> }
>
> /*
> * relay_add_stream: allocate a new stream for a session
> */
> -static
> -int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> - int ret, send_ret;
> + int ret;
> + ssize_t send_ret;
> struct relay_session *session = conn->session;
> struct relay_stream *stream = NULL;
> struct lttcomm_relayd_status_stream reply;
> struct ctf_trace *trace = NULL;
> + uint64_t stream_handle = -1ULL;
> + char *path_name = NULL, *channel_name = NULL;
> + uint64_t tracefile_size = 0, tracefile_count = 0;
>
> if (!session || conn->version_check_done == 0) {
> ERR("Trying to add a stream before version check");
> @@ -1256,107 +1172,46 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
> goto end_no_session;
> }
>
> - stream = zmalloc(sizeof(struct relay_stream));
> - if (stream == NULL) {
> - PERROR("relay stream zmalloc");
> - ret = -1;
> - goto end_no_session;
> - }
> -
> - switch (conn->minor) {
> - case 1: /* LTTng sessiond 2.1 */
> - ret = cmd_recv_stream_2_1(conn, stream);
> + switch (session->minor) {
> + case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */
> + ret = cmd_recv_stream_2_1(conn, &path_name,
> + &channel_name);
> break;
> - case 2: /* LTTng sessiond 2.2 */
> + case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */
> default:
> - ret = cmd_recv_stream_2_2(conn, stream);
> + ret = cmd_recv_stream_2_2(conn, &path_name,
> + &channel_name, &tracefile_size, &tracefile_count);
> break;
> }
> if (ret < 0) {
> - goto err_free_stream;
> + goto send_reply;
> }
>
> - stream->stream_handle = ++last_relay_stream_id;
> - stream->prev_seq = -1ULL;
> - stream->session_id = session->id;
> - stream->index_fd = -1;
> - stream->read_index_fd = -1;
> - stream->ctf_stream_id = -1ULL;
> - lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
> - pthread_mutex_init(&stream->lock, NULL);
> -
> - ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
> - if (ret < 0) {
> - ERR("relay creating output directory");
> - goto err_free_stream;
> - }
> -
> - /*
> - * No need to use run_as API here because whatever we receives, the relayd
> - * uses its own credentials for the stream files.
> - */
> - ret = utils_create_stream_file(stream->path_name, stream->channel_name,
> - stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
> - if (ret < 0) {
> - ERR("Create output file");
> - goto err_free_stream;
> - }
> - stream->fd = ret;
> - if (stream->tracefile_size) {
> - DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
> - } else {
> - DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
> - }
> -
> - /* Protect access to "trace" */
> - rcu_read_lock();
> - trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name);
> + trace = ctf_trace_get_by_path_or_create(session, path_name);
> if (!trace) {
> - trace = ctf_trace_create(stream->path_name);
> - if (!trace) {
> - ret = -1;
> - goto end;
> - }
> - ctf_trace_add(session->ctf_traces_ht, trace);
> + goto send_reply;
> }
> - ctf_trace_get_ref(trace);
> + /* This stream here has one reference on the trace. */
>
> - if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
> - stream->metadata_flag = 1;
> - /* Assign quick reference to the metadata stream in the trace. */
> - trace->metadata_stream = stream;
> - }
> + pthread_mutex_lock(&last_relay_stream_id_lock);
> + stream_handle = ++last_relay_stream_id;
> + pthread_mutex_unlock(&last_relay_stream_id_lock);
>
> - /*
> - * Add the stream in the recv list of the connection. Once the end stream
> - * message is received, this list is emptied and streams are set with the
> - * viewer ready flag.
> - */
> - queue_stream(stream, conn);
> + /* We pass ownership of path_name and channel_name. */
> + stream = stream_create(trace, stream_handle, path_name,
> + channel_name, tracefile_size, tracefile_count);
>
> /*
> - * Both in the ctf_trace object and the global stream ht since the data
> - * side of the relayd does not have the concept of session.
> - *
> - * rcu_read_lock() is kept to protect the stream which is now part of
> - * the relay_streams_ht.
> + * Streams are the owners of their trace. Reference to trace is
> + * kept within stream_create().
> */
> - lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
> - cds_list_add_tail(&stream->trace_list, &trace->stream_list);
> -
> - session->stream_count++;
> + ctf_trace_put(trace);
>
> - DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
> - stream->stream_handle);
> -
> -end:
> +send_reply:
> memset(&reply, 0, sizeof(reply));
> - reply.handle = htobe64(stream->stream_handle);
> - /* send the session id to the client or a negative return code on error */
> - if (ret < 0) {
> + reply.handle = htobe64(stream_handle);
> + if (!stream) {
> reply.ret_code = htobe32(LTTNG_ERR_UNK);
> - /* stream was not properly added to the ht, so free it */
> - stream_destroy(stream);
> } else {
> reply.ret_code = htobe32(LTTNG_OK);
> }
> @@ -1365,29 +1220,17 @@ end:
> sizeof(struct lttcomm_relayd_status_stream), 0);
> if (send_ret < 0) {
> ERR("Relay sending stream id");
> - ret = send_ret;
> + ret = (int) send_ret;
> }
> - /*
> - * rcu_read_lock() was held to protect either "trace" OR the "stream" at
> - * this point.
> - */
> - rcu_read_unlock();
> - trace = NULL;
> - stream = NULL;
>
> end_no_session:
> return ret;
> -
> -err_free_stream:
> - stream_destroy(stream);
> - return ret;
> }
>
> /*
> * relay_close_stream: close a specific stream
> */
> -static
> -int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> int ret, send_ret;
> @@ -1417,23 +1260,37 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
> goto end_no_session;
> }
>
> - rcu_read_lock();
> - stream = stream_find_by_id(relay_streams_ht,
> - be64toh(stream_info.stream_id));
> + stream = stream_get_by_id(be64toh(stream_info.stream_id));
> if (!stream) {
> ret = -1;
> - goto end_unlock;
> + goto end;
> }
> -
> + pthread_mutex_lock(&stream->lock);
> + stream->closed = true;
> stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
> - stream->close_flag = 1;
> - session->stream_count--;
> + if (stream->is_metadata) {
> + struct relay_viewer_stream *vstream;
> + bool close_metadata = false;
>
> - /* Check if we can close it or else the data will do it. */
> - try_close_stream(session, stream);
> + vstream = viewer_stream_get_by_id(stream->stream_handle);
> + if (vstream) {
> + pthread_mutex_lock(&vstream->lock);
> + if (vstream->metadata_sent == stream->metadata_received) {
> + close_metadata = true;
> + ERR("main putting ownership of vstream %" PRIu64,
> + vstream->stream->stream_handle);
> + }
> + pthread_mutex_unlock(&vstream->lock);
> + viewer_stream_put(vstream);
> + }
> + if (close_metadata) {
> + viewer_stream_put(vstream);
> + }
> + }
> + pthread_mutex_unlock(&stream->lock);
> + stream_put(stream);
>
> -end_unlock:
> - rcu_read_unlock();
> +end:
>
> memset(&reply, 0, sizeof(reply));
> if (ret < 0) {
> @@ -1455,8 +1312,7 @@ end_no_session:
> /*
> * relay_unknown_command: send -1 if received unknown command
> */
> -static
> -void relay_unknown_command(struct relay_connection *conn)
> +static void relay_unknown_command(struct relay_connection *conn)
> {
> struct lttcomm_relayd_generic_reply reply;
> int ret;
> @@ -1474,8 +1330,7 @@ void relay_unknown_command(struct relay_connection *conn)
> * relay_start: send an acknowledgment to the client to tell if we are
> * ready to receive data. We are ready if a session is established.
> */
> -static
> -int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> int ret = htobe32(LTTNG_OK);
> @@ -1529,10 +1384,9 @@ end:
> }
>
> /*
> - * relay_recv_metadata: receive the metada for the session.
> + * relay_recv_metadata: receive the metadata for the session.
> */
> -static
> -int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> int ret = htobe32(LTTNG_OK);
> @@ -1541,7 +1395,6 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
> struct lttcomm_relayd_metadata_payload *metadata_struct;
> struct relay_stream *metadata_stream;
> uint64_t data_size, payload_size;
> - struct ctf_trace *ctf_trace;
>
> if (!session) {
> ERR("Metadata sent before version check");
> @@ -1586,38 +1439,37 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
> }
> metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
>
> - rcu_read_lock();
> - metadata_stream = stream_find_by_id(relay_streams_ht,
> - be64toh(metadata_struct->stream_id));
> + metadata_stream = stream_get_by_id(be64toh(metadata_struct->stream_id));
> if (!metadata_stream) {
> ret = -1;
> - goto end_unlock;
> + goto end;
> }
>
> - size_ret = lttng_write(metadata_stream->fd, metadata_struct->payload,
> + pthread_mutex_lock(&metadata_stream->lock);
> +
> + size_ret = lttng_write(metadata_stream->stream_fd->fd, metadata_struct->payload,
> payload_size);
> if (size_ret < payload_size) {
> ERR("Relay error writing metadata on file");
> ret = -1;
> - goto end_unlock;
> + goto end_put;
> }
>
> - ret = write_padding_to_file(metadata_stream->fd,
> + ret = write_padding_to_file(metadata_stream->stream_fd->fd,
> be32toh(metadata_struct->padding_size));
> if (ret < 0) {
> - goto end_unlock;
> + goto end_put;
> }
>
> - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
> - metadata_stream->path_name);
> - assert(ctf_trace);
> - ctf_trace->metadata_received +=
> + metadata_stream->metadata_received +=
> payload_size + be32toh(metadata_struct->padding_size);
> + DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
> + metadata_stream->metadata_received);
>
> - DBG2("Relay metadata written");
> +end_put:
> + pthread_mutex_unlock(&metadata_stream->lock);
> + stream_put(metadata_stream);
>
> -end_unlock:
> - rcu_read_unlock();
> end:
> return ret;
> }
> @@ -1625,15 +1477,12 @@ end:
> /*
> * relay_send_version: send relayd version number
> */
> -static
> -int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> int ret;
> struct lttcomm_relayd_version reply, msg;
>
> - assert(conn);
> -
> conn->version_check_done = 1;
>
> /* Get version from the other side. */
> @@ -1657,7 +1506,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
> if (reply.major != be32toh(msg.major)) {
> DBG("Incompatible major versions (%u vs %u), deleting session",
> reply.major, be32toh(msg.major));
> - destroy_session(conn->session, conn->sessions_ht);
> + connection_put(conn);
> ret = 0;
> goto end;
> }
> @@ -1688,8 +1537,7 @@ end:
> /*
> * Check for data pending for a given stream id from the session daemon.
> */
> -static
> -int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> struct relay_session *session = conn->session;
> @@ -1723,13 +1571,14 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
> stream_id = be64toh(msg.stream_id);
> last_net_seq_num = be64toh(msg.last_net_seq_num);
>
> - rcu_read_lock();
> - stream = stream_find_by_id(relay_streams_ht, stream_id);
> + stream = stream_get_by_id(stream_id);
> if (stream == NULL) {
> ret = -1;
> - goto end_unlock;
> + goto end;
> }
>
> + pthread_mutex_lock(&stream->lock);
> +
> DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
> " and last_seq %" PRIu64, stream_id, stream->prev_seq,
> last_net_seq_num);
> @@ -1743,11 +1592,11 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
> ret = 1;
> }
>
> - /* Pending check is now done. */
> - stream->data_pending_check_done = 1;
> + stream->data_pending_check_done = true;
> + pthread_mutex_unlock(&stream->lock);
>
> -end_unlock:
> - rcu_read_unlock();
> + stream_put(stream);
> +end:
>
> memset(&reply, 0, sizeof(reply));
> reply.ret_code = htobe32(ret);
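
The handlers in this patch replace RCU-protected lookups with a get/put pair (stream_get_by_id() followed by stream_put()). A minimal sketch of that pattern follows, assuming C11 atomics. The names struct obj, obj_get() and obj_put() are hypothetical and are not the relayd API; the real implementation lives in stream.c, which is not shown in this part of the patch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical refcounted object; not the relayd struct relay_stream. */
struct obj {
	atomic_long refcount;
	bool released;	/* Set by the release path, for illustration only. */
};

static void obj_init(struct obj *o)
{
	atomic_init(&o->refcount, 1);	/* The creator owns one reference. */
	o->released = false;
}

/*
 * Take a reference unless the count already dropped to zero.
 * Returns false when the object is being torn down, so a lookup
 * can skip it instead of resurrecting it.
 */
static bool obj_get(struct obj *o)
{
	long old = atomic_load(&o->refcount);

	while (old != 0) {
		/* On failure, old is reloaded with the current value. */
		if (atomic_compare_exchange_weak(&o->refcount, &old, old + 1)) {
			return true;
		}
	}
	return false;
}

/* Drop a reference; the last put triggers the release path. */
static void obj_put(struct obj *o)
{
	long old = atomic_fetch_sub(&o->refcount, 1);

	assert(old > 0);
	if (old == 1) {
		/* Real code would unpublish the object and free it here. */
		o->released = true;
	}
}
```

A failed obj_get() plays the same role as the "if (!stream_get(stream)) continue;" checks in the hash table iterations of this patch: an entry whose count already reached zero is skipped.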
> @@ -1763,18 +1612,17 @@ end_no_session:
> /*
> * Wait for the control socket to reach a quiescent state.
> *
> - * Note that for now, when receiving this command from the session daemon, this
> - * means that every subsequent commands or data received on the control socket
> - * has been handled. So, this is why we simply return OK here.
> + * Note that for now, when receiving this command from the session
> + * daemon, this means that every subsequent commands or data received on
> + * the control socket has been handled. So, this is why we simply return
> + * OK here.
> */
> -static
> -int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> int ret;
> uint64_t stream_id;
> struct relay_stream *stream;
> - struct lttng_ht_iter iter;
> struct lttcomm_relayd_quiescent_control msg;
> struct lttcomm_relayd_generic_reply reply;
>
> @@ -1800,19 +1648,16 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
> }
>
> stream_id = be64toh(msg.stream_id);
> -
> - rcu_read_lock();
> - cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
> - node.node) {
> - if (stream->stream_handle == stream_id) {
> - stream->data_pending_check_done = 1;
> - DBG("Relay quiescent control pending flag set to %" PRIu64,
> - stream_id);
> - break;
> - }
> - }
> - rcu_read_unlock();
> -
> + stream = stream_get_by_id(stream_id);
> + if (!stream) {
> + goto reply;
> + }
> + pthread_mutex_lock(&stream->lock);
> + stream->data_pending_check_done = true;
> + pthread_mutex_unlock(&stream->lock);
> + DBG("Relay quiescent control pending flag set to %" PRIu64, stream_id);
> + stream_put(stream);
> +reply:
> memset(&reply, 0, sizeof(reply));
> reply.ret_code = htobe32(LTTNG_OK);
> ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
> @@ -1825,14 +1670,14 @@ end_no_session:
> }
>
> /*
> - * Initialize a data pending command. This means that a client is about to ask
> - * for data pending for each stream he/she holds. Simply iterate over all
> - * streams of a session and set the data_pending_check_done flag.
> + * Initialize a data pending command. This means that a client is about
> + * to ask for data pending for each stream he/she holds. Simply iterate
> + * over all streams of a session and set the data_pending_check_done
> + * flag.
> *
> * This command returns to the client a LTTNG_OK code.
> */
> -static
> -int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> int ret;
> @@ -1869,18 +1714,25 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
> session_id = be64toh(msg.session_id);
>
> /*
> - * Iterate over all streams to set the begin data pending flag. For now, the
> - * streams are indexed by stream handle so we have to iterate over all
> - * streams to find the one associated with the right session_id.
> + * Iterate over all streams to set the begin data pending flag.
> + * For now, the streams are indexed by stream handle so we have
> + * to iterate over all streams to find the one associated with
> + * the right session_id.
> */
> rcu_read_lock();
> cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
> node.node) {
> - if (stream->session_id == session_id) {
> - stream->data_pending_check_done = 0;
> + if (!stream_get(stream)) {
> + continue;
> + }
> + if (stream->trace->session->id == session_id) {
> + pthread_mutex_lock(&stream->lock);
> + stream->data_pending_check_done = false;
> + pthread_mutex_unlock(&stream->lock);
> DBG("Set begin data pending flag to stream %" PRIu64,
> stream->stream_handle);
> }
> + stream_put(stream);
> }
> rcu_read_unlock();
>
> @@ -1898,16 +1750,15 @@ end_no_session:
> }
>
> /*
> - * End data pending command. This will check, for a given session id, if each
> - * stream associated with it has its data_pending_check_done flag set. If not,
> - * this means that the client lost track of the stream but the data is still
> - * being streamed on our side. In this case, we inform the client that data is
> - * inflight.
> + * End data pending command. This will check, for a given session id, if
> + * each stream associated with it has its data_pending_check_done flag
> + * set. If not, this means that the client lost track of the stream but
> + * the data is still being streamed on our side. In this case, we inform
> + * the client that data is inflight.
> *
> * Return to the client if there is data in flight or not with a ret_code.
> */
> -static
> -int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> int ret;
> @@ -1918,9 +1769,6 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
> uint64_t session_id;
> uint32_t is_data_inflight = 0;
>
> - assert(recv_hdr);
> - assert(conn);
> -
> DBG("End data pending command");
>
> if (!conn->session || conn->version_check_done == 0) {
> @@ -1944,17 +1792,33 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
>
> session_id = be64toh(msg.session_id);
>
> - /* Iterate over all streams to see if the begin data pending flag is set. */
> + /*
> + * Iterate over all streams to see if the begin data pending
> + * flag is set.
> + */
> rcu_read_lock();
> cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
> node.node) {
> - if (stream->session_id == session_id &&
> - !stream->data_pending_check_done && !stream->terminated_flag) {
> - is_data_inflight = 1;
> - DBG("Data is still in flight for stream %" PRIu64,
> - stream->stream_handle);
> - break;
> + if (!stream_get(stream)) {
> + continue;
> + }
> + if (stream->trace->session->id != session_id) {
> + stream_put(stream);
> + continue;
> + }
> + pthread_mutex_lock(&stream->lock);
> + if (!stream->data_pending_check_done) {
> + if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
> + is_data_inflight = 1;
> + DBG("Data is still in flight for stream %" PRIu64,
> + stream->stream_handle);
> + pthread_mutex_unlock(&stream->lock);
> + stream_put(stream);
> + break;
> + }
> }
> + pthread_mutex_unlock(&stream->lock);
> + stream_put(stream);
> }
> rcu_read_unlock();
>
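
The in-flight check in relay_end_data_pending() compares prev_seq and last_net_seq_num through a signed difference rather than a plain "<". A small sketch of that idiom is below. The helper name seq_reached() is hypothetical. The cast of an out-of-range unsigned value to int64_t is implementation-defined in C, so this assumes the usual two's complement behaviour.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Return true when sequence number a is at or past sequence number b.
 * The unsigned subtraction wraps modulo 2^64, and the signed view of
 * the result gives the direction of the gap even across a wrap-around.
 */
static bool seq_reached(uint64_t a, uint64_t b)
{
	return (int64_t) (a - b) >= 0;
}
```

With this form, a stream whose prev_seq still holds its initial -1ULL value is not considered to have reached last_net_seq_num 0, whereas an unsigned comparison would treat -1ULL as the largest possible sequence number.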
> @@ -1976,14 +1840,13 @@ end_no_session:
> *
> * Return 0 on success else a negative value.
> */
> -static
> -int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> - int ret, send_ret, index_created = 0;
> + int ret, send_ret;
> struct relay_session *session = conn->session;
> struct lttcomm_relayd_index index_info;
> - struct relay_index *index, *wr_index = NULL;
> + struct relay_index *index;
> struct lttcomm_relayd_generic_reply reply;
> struct relay_stream *stream;
> uint64_t net_seq_num;
> @@ -2013,76 +1876,66 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
>
> net_seq_num = be64toh(index_info.net_seq_num);
>
> - rcu_read_lock();
> - stream = stream_find_by_id(relay_streams_ht,
> - be64toh(index_info.relay_stream_id));
> + stream = stream_get_by_id(be64toh(index_info.relay_stream_id));
> if (!stream) {
> + ERR("stream_get_by_id not found");
> ret = -1;
> - goto end_rcu_unlock;
> + goto end;
> }
> + pthread_mutex_lock(&stream->lock);
>
> /* Live beacon handling */
> if (index_info.packet_size == 0) {
> - DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
> + DBG("Received live beacon for stream %" PRIu64,
> + stream->stream_handle);
>
> /*
> - * Only flag a stream inactive when it has already received data
> - * and no indexes are in flight.
> + * Only flag a stream inactive when it has already
> + * received data and no indexes are in flight.
> */
> - if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
> - stream->beacon_ts_end = be64toh(index_info.timestamp_end);
> + if (stream->total_index_received > 0
> + && stream->indexes_in_flight == 0) {
> + stream->beacon_ts_end =
> + be64toh(index_info.timestamp_end);
> }
> ret = 0;
> - goto end_rcu_unlock;
> + goto end_stream_put;
> } else {
> stream->beacon_ts_end = -1ULL;
> }
>
> - index = relay_index_find(stream->stream_handle, net_seq_num);
> - if (!index) {
> - /* A successful creation will add the object to the HT. */
> - index = relay_index_create(stream->stream_handle, net_seq_num);
> - if (!index) {
> - goto end_rcu_unlock;
> - }
> - index_created = 1;
> - stream->indexes_in_flight++;
> - }
> -
> - copy_index_control_data(index, &index_info);
> if (stream->ctf_stream_id == -1ULL) {
> stream->ctf_stream_id = be64toh(index_info.stream_id);
> }
> -
> - if (index_created) {
> - /*
> - * Try to add the relay index object to the hash table. If an object
> - * already exist, destroy back the index created, set the data in this
> - * object and write it on disk.
> - */
> - relay_index_add(index, &wr_index);
> - if (wr_index) {
> - copy_index_control_data(wr_index, &index_info);
> - free(index);
> - }
> - } else {
> - /* The index already exists so write it on disk. */
> - wr_index = index;
> + index = relay_index_get_by_id_or_create(stream, net_seq_num);
> + if (!index) {
> + ret = -1;
> + ERR("relay_index_get_by_id_or_create index NULL");
> + goto end_stream_put;
> }
> -
> - /* Do we have a writable ready index to write on disk. */
> - if (wr_index) {
> - ret = relay_index_write(wr_index->fd, wr_index);
> - if (ret < 0) {
> - goto end_rcu_unlock;
> - }
> + if (set_index_control_data(index, &index_info)) {
> + ERR("set_index_control_data error");
> + relay_index_put(index);
> + ret = -1;
> + goto end_stream_put;
> + }
> + ret = relay_index_try_flush(index);
> + if (ret == 0) {
> stream->total_index_received++;
> - stream->indexes_in_flight--;
> - assert(stream->indexes_in_flight >= 0);
> + } else if (ret > 0) {
> + /* no flush. */
> + ret = 0;
> + } else {
> + ERR("relay_index_try_flush error %d", ret);
> + relay_index_put(index);
> + ret = -1;
> }
>
> -end_rcu_unlock:
> - rcu_read_unlock();
> +end_stream_put:
> + pthread_mutex_unlock(&stream->lock);
> + stream_put(stream);
> +
> +end:
>
> memset(&reply, 0, sizeof(reply));
> if (ret < 0) {
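
Both relay_recv_index() and handle_index_data() call relay_index_try_flush() and treat 0 as "flushed", a positive value as "no flush yet", and a negative value as an error. The sketch below illustrates one way such a two-part record could behave, assuming that the control side and the data side each fill in one half. struct idx and the idx_*() helpers are hypothetical. The actual logic is in index.c, which is not part of this section, and the error path and locking are left out.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical index record; field names are illustrative only. */
struct idx {
	bool has_control;	/* Set once the control side provided its data. */
	bool has_data;		/* Set once the data side provided the offset. */
	uint64_t packet_size;
	uint64_t offset;
	int flushed;		/* Counts flushes, standing in for a disk write. */
};

static void idx_set_control(struct idx *i, uint64_t packet_size)
{
	i->packet_size = packet_size;
	i->has_control = true;
}

static void idx_set_data(struct idx *i, uint64_t offset)
{
	i->offset = offset;
	i->has_data = true;
}

/*
 * Return 0 when the record was complete and got flushed, 1 when one
 * half is still missing. Whichever side completes the record second
 * performs the flush.
 */
static int idx_try_flush(struct idx *i)
{
	if (!i->has_control || !i->has_data) {
		return 1;
	}
	i->flushed++;
	return 0;
}
```

Under this assumption neither thread needs to know which of the two arrives first, which is what lets the patch drop the index_created/wr_index bookkeeping.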
> @@ -2105,8 +1958,7 @@ end_no_session:
> *
> * Return 0 on success else a negative value.
> */
> -static
> -int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> int ret, send_ret;
> @@ -2123,17 +1975,10 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
> }
>
> /*
> - * Flag every pending stream in the connection recv list that they are
> - * ready to be used by the viewer.
> - */
> - set_viewer_ready_flag(conn);
> -
> - /*
> - * Inform the viewer that there are new streams in the session.
> + * Publish every pending stream in the connection recv list so
> + * that the viewer can use them.
> */
> - if (conn->session->viewer_refcount) {
> - uatomic_set(&conn->session->new_streams, 1);
> - }
> + publish_connection_local_streams(conn);
>
> memset(&reply, 0, sizeof(reply));
> reply.ret_code = htobe32(LTTNG_OK);
> @@ -2153,8 +1998,7 @@ end_no_session:
> /*
> * Process the commands received on the control socket
> */
> -static
> -int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
> +static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
> struct relay_connection *conn)
> {
> int ret = 0;
> @@ -2211,93 +2055,91 @@ end:
> /*
> * Handle index for a data stream.
> *
> - * RCU read side lock MUST be acquired.
> + * Called with the stream lock held.
> *
> * Return 0 on success else a negative value.
> */
> static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
> int rotate_index)
> {
> - int ret = 0, index_created = 0;
> - uint64_t stream_id, data_offset;
> - struct relay_index *index, *wr_index = NULL;
> -
> - assert(stream);
> + int ret = 0;
> + uint64_t data_offset;
> + struct relay_index *index;
>
> - stream_id = stream->stream_handle;
> /* Get data offset because we are about to update the index. */
> data_offset = htobe64(stream->tracefile_size_current);
>
> + DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64,
> + stream->stream_handle, stream->tracefile_size_current);
> +
> /*
> - * Lookup for an existing index for that stream id/sequence number. If on
> - * exists, the control thread already received the data for it thus we need
> - * to write it on disk.
> + * Look up an existing index for that stream id/sequence
> + * number. If one exists, the control thread already received the
> + * data for it, thus we need to write it to disk.
> */
> - index = relay_index_find(stream_id, net_seq_num);
> + index = relay_index_get_by_id_or_create(stream, net_seq_num);
> if (!index) {
> - /* A successful creation will add the object to the HT. */
> - index = relay_index_create(stream_id, net_seq_num);
> - if (!index) {
> - ret = -1;
> - goto error;
> - }
> - index_created = 1;
> - stream->indexes_in_flight++;
> + ret = -1;
> + goto end;
> }
>
> - if (rotate_index || stream->index_fd < 0) {
> - index->to_close_fd = stream->index_fd;
> - ret = index_create_file(stream->path_name, stream->channel_name,
> + if (rotate_index || !stream->index_fd) {
> + int fd;
> +
> + /* Put ref on previous index_fd. */
> + if (stream->index_fd) {
> + stream_fd_put(stream->index_fd);
> + stream->index_fd = NULL;
> + }
> +
> + fd = index_create_file(stream->path_name, stream->channel_name,
> relayd_uid, relayd_gid, stream->tracefile_size,
> - stream->tracefile_count_current);
> - if (ret < 0) {
> - /* This will close the stream's index fd if one. */
> - relay_index_free_safe(index);
> - goto error;
> + stream->current_tracefile_id);
> + if (fd < 0) {
> + ret = -1;
> + /* Put self-ref for this index due to error. */
> + relay_index_put(index);
> + goto end;
> + }
> + stream->index_fd = stream_fd_create(fd);
> + if (!stream->index_fd) {
> + ret = -1;
> + if (close(fd)) {
> + PERROR("Error closing FD %d", fd);
> + }
> + /* Put self-ref for this index due to error. */
> + relay_index_put(index);
> + /* Will put the local ref. */
> + goto end;
> }
> - stream->index_fd = ret;
> }
> - index->fd = stream->index_fd;
> - index->index_data.offset = data_offset;
>
> - if (index_created) {
> - /*
> - * Try to add the relay index object to the hash table. If an object
> - * already exist, destroy back the index created and set the data.
> - */
> - relay_index_add(index, &wr_index);
> - if (wr_index) {
> - /* Copy back data from the created index. */
> - wr_index->fd = index->fd;
> - wr_index->to_close_fd = index->to_close_fd;
> - wr_index->index_data.offset = data_offset;
> - free(index);
> - }
> - } else {
> - /* The index already exists so write it on disk. */
> - wr_index = index;
> + if (relay_index_set_fd(index, stream->index_fd, data_offset)) {
> + ret = -1;
> + /* Put self-ref for this index due to error. */
> + relay_index_put(index);
> + goto end;
> }
>
> - /* Do we have a writable ready index to write on disk. */
> - if (wr_index) {
> - ret = relay_index_write(wr_index->fd, wr_index);
> - if (ret < 0) {
> - goto error;
> - }
> + ret = relay_index_try_flush(index);
> + if (ret == 0) {
> stream->total_index_received++;
> - stream->indexes_in_flight--;
> - assert(stream->indexes_in_flight >= 0);
> + } else if (ret > 0) {
> + /* No flush. */
> + ret = 0;
> + } else {
> + /* Put self-ref for this index due to error. */
> + relay_index_put(index);
> + ret = -1;
> }
> -
> -error:
> +end:
> return ret;
> }
>
> /*
> * relay_process_data: Process the data received on the data socket
> */
> -static
> -int relay_process_data(struct relay_connection *conn)
> +static int relay_process_data(struct relay_connection *conn)
> {
> int ret = 0, rotate_index = 0;
> ssize_t size_ret;
> @@ -2308,8 +2150,6 @@ int relay_process_data(struct relay_connection *conn)
> uint32_t data_size;
> struct relay_session *session;
>
> - assert(conn);
> -
> ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
> sizeof(struct lttcomm_relayd_data_hdr), 0);
> if (ret <= 0) {
> @@ -2324,17 +2164,12 @@ int relay_process_data(struct relay_connection *conn)
> }
>
> stream_id = be64toh(data_hdr.stream_id);
> -
> - rcu_read_lock();
> - stream = stream_find_by_id(relay_streams_ht, stream_id);
> + stream = stream_get_by_id(stream_id);
> if (!stream) {
> ret = -1;
> - goto end_rcu_unlock;
> + goto end;
> }
> -
> - session = session_find_by_id(conn->sessions_ht, stream->session_id);
> - assert(session);
> -
> + session = stream->trace->session;
> data_size = be32toh(data_hdr.data_size);
> if (data_buffer_size < data_size) {
> char *tmp_data_ptr;
> @@ -2344,7 +2179,7 @@ int relay_process_data(struct relay_connection *conn)
> ERR("Allocating data buffer");
> free(data_buffer);
> ret = -1;
> - goto end_rcu_unlock;
> + goto end_stream_put;
> }
> data_buffer = tmp_data_ptr;
> data_buffer_size = data_size;
> @@ -2362,9 +2197,11 @@ int relay_process_data(struct relay_connection *conn)
> DBG("Socket %d did an orderly shutdown", conn->sock->fd);
> }
> ret = -1;
> - goto end_rcu_unlock;
> + goto end_stream_put;
> }
>
> + pthread_mutex_lock(&stream->lock);
> +
> /* Check if a rotation is needed. */
> if (stream->tracefile_size > 0 &&
> (stream->tracefile_size_current + data_size) >
> @@ -2372,104 +2209,84 @@ int relay_process_data(struct relay_connection *conn)
> struct relay_viewer_stream *vstream;
> uint64_t new_id;
>
> - new_id = (stream->tracefile_count_current + 1) %
> + new_id = (stream->current_tracefile_id + 1) %
> stream->tracefile_count;
> /*
> - * When we wrap-around back to 0, we start overwriting old
> - * trace data.
> + * Move viewer oldest available data position forward if
> + * we are overwriting a tracefile.
> */
> - if (!stream->tracefile_overwrite && new_id == 0) {
> - stream->tracefile_overwrite = 1;
> - }
> - pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
> - if (stream->tracefile_overwrite) {
> + if (new_id == stream->oldest_tracefile_id) {
> stream->oldest_tracefile_id =
> (stream->oldest_tracefile_id + 1) %
> stream->tracefile_count;
> }
> - vstream = viewer_stream_find_by_id(stream->stream_handle);
> + vstream = viewer_stream_get_by_id(stream->stream_handle);
> if (vstream) {
> - /*
> - * The viewer is reading a file about to be
> - * overwritten. Close the FDs it is
> - * currently using and let it handle the fault.
> - */
> - if (vstream->tracefile_count_current == new_id) {
> - pthread_mutex_lock(&vstream->overwrite_lock);
> - vstream->abort_flag = 1;
> - pthread_mutex_unlock(&vstream->overwrite_lock);
> - DBG("Streaming side setting abort_flag on stream %s_%" PRIu64 "\n",
> - stream->channel_name, new_id);
> - } else if (vstream->tracefile_count_current ==
> - stream->tracefile_count_current) {
> - /*
> - * The reader and writer were in the
> - * same trace file, inform the viewer
> - * that no new index will ever be added
> - * to this file.
> - */
> - vstream->close_write_flag = 1;
> + pthread_mutex_lock(&vstream->lock);
> + ret = utils_rotate_stream_file(stream->path_name,
> + stream->channel_name, stream->tracefile_size,
> + stream->tracefile_count, relayd_uid,
> + relayd_gid, stream->stream_fd->fd,
> + &stream->current_tracefile_id,
> + &stream->stream_fd->fd);
> + pthread_mutex_unlock(&vstream->lock);
> + viewer_stream_put(vstream);
> + if (ret < 0) {
> + ERR("Rotating stream output file");
> + goto end_stream_unlock;
> }
> }
> - ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
> - stream->tracefile_size, stream->tracefile_count,
> - relayd_uid, relayd_gid, stream->fd,
> - &(stream->tracefile_count_current), &stream->fd);
> - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
> - if (ret < 0) {
> - ERR("Rotating stream output file");
> - goto end_rcu_unlock;
> - }
> - /* Reset current size because we just perform a stream rotation. */
> + /*
> + * Reset current size because we just performed a stream
> + * rotation.
> + */
> stream->tracefile_size_current = 0;
> rotate_index = 1;
> }
>
> /*
> - * Index are handled in protocol version 2.4 and above. Also, snapshot and
> - * index are NOT supported.
> + * Indexes are handled in protocol version 2.4 and above. Indexes
> + * are NOT supported for snapshot sessions.
> */
> if (session->minor >= 4 && !session->snapshot) {
> ret = handle_index_data(stream, net_seq_num, rotate_index);
> if (ret < 0) {
> - goto end_rcu_unlock;
> + goto end_stream_unlock;
> }
> }
>
> /* Write data to stream output fd. */
> - size_ret = lttng_write(stream->fd, data_buffer, data_size);
> + size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
> if (size_ret < data_size) {
> ERR("Relay error writing data to file");
> ret = -1;
> - goto end_rcu_unlock;
> + goto end_stream_unlock;
> }
>
> - DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
> - ret, stream->stream_handle);
> + DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
> + size_ret, stream->stream_handle);
>
> - ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
> + ret = write_padding_to_file(stream->stream_fd->fd,
> + be32toh(data_hdr.padding_size));
> if (ret < 0) {
> - goto end_rcu_unlock;
> + goto end_stream_unlock;
> }
> - stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size);
> -
> + stream->tracefile_size_current +=
> + data_size + be32toh(data_hdr.padding_size);
> stream->prev_seq = net_seq_num;
>
> - try_close_stream(session, stream);
> -
> -end_rcu_unlock:
> - rcu_read_unlock();
> +end_stream_unlock:
> + pthread_mutex_unlock(&stream->lock);
> +end_stream_put:
> + stream_put(stream);
> end:
> return ret;
> }
>
> -static
> -void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
> +static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
> {
> int ret;
>
> - assert(events);
> -
> (void) lttng_poll_del(events, pollfd);
>
> ret = close(pollfd);
> @@ -2478,27 +2295,36 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
> }
> }
>
> -static void destroy_connection(struct lttng_ht *relay_connections_ht,
> - struct relay_connection *conn)
> +static void relay_thread_close_connection(struct lttng_poll_event *events,
> + int pollfd, struct relay_connection *conn)
> {
> - assert(relay_connections_ht);
> - assert(conn);
> + const char *type_str;
>
> - connection_delete(relay_connections_ht, conn);
> -
> - /* For the control socket, we try to destroy the session. */
> - if (conn->type == RELAY_CONTROL && conn->session) {
> - destroy_session(conn->session, conn->sessions_ht);
> + switch (conn->type) {
> + case RELAY_DATA:
> + type_str = "Data";
> + break;
> + case RELAY_CONTROL:
> + type_str = "Control";
> + break;
> + case RELAY_VIEWER_COMMAND:
> + type_str = "Viewer Command";
> + break;
> + case RELAY_VIEWER_NOTIFICATION:
> + type_str = "Viewer Notification";
> + break;
> + default:
> + type_str = "Unknown";
> }
> -
> - connection_destroy(conn);
> + cleanup_connection_pollfd(events, pollfd);
> + connection_put(conn);
> + DBG("%s connection closed with %d", type_str, pollfd);
> }
>
> /*
> * This thread does the actual work
> */
> -static
> -void *relay_thread_worker(void *data)
> +static void *relay_thread_worker(void *data)
> {
> int ret, err = -1, last_seen_data_fd = -1;
> uint32_t nb_fd;
> @@ -2506,9 +2332,6 @@ void *relay_thread_worker(void *data)
> struct lttng_ht *relay_connections_ht;
> struct lttng_ht_iter iter;
> struct lttcomm_relayd_hdr recv_hdr;
> - struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
> - struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
> - struct relay_index *index;
> struct relay_connection *destroy_conn = NULL;
>
> DBG("[thread] Relay worker started");
> @@ -2529,12 +2352,6 @@ void *relay_thread_worker(void *data)
> goto relay_connections_ht_error;
> }
>
> - /* Tables of received indexes indexed by index handle and net_seq_num. */
> - indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64);
> - if (!indexes_ht) {
> - goto indexes_ht_error;
> - }
> -
> ret = create_thread_poll_set(&events, 2);
> if (ret < 0) {
> goto error_poll_create;
> @@ -2569,9 +2386,9 @@ restart:
> nb_fd = ret;
>
> /*
> - * Process control. The control connection is prioritised so we
> - * don't starve it with high throughput tracing data on the data
> - * connection.
> + * Process control. The control connection is
> + * prioritised so we don't starve it with high
> + * throughput tracing data on the data connection.
> */
> for (i = 0; i < nb_fd; i++) {
> /* Fetch once the poll data */
> @@ -2581,7 +2398,10 @@ restart:
> health_code_update();
>
> if (!revents) {
> - /* No activity for this FD (poll implementation). */
> + /*
> + * No activity for this FD (poll
> + * implementation).
> + */
> continue;
> }
>
> @@ -2604,27 +2424,20 @@ restart:
> if (ret < 0) {
> goto error;
> }
> - conn->sessions_ht = sessions_ht;
> - connection_init(conn);
> lttng_poll_add(&events, conn->sock->fd,
> LPOLLIN | LPOLLRDHUP);
> - rcu_read_lock();
> - lttng_ht_add_unique_ulong(relay_connections_ht,
> - &conn->sock_n);
> - rcu_read_unlock();
> + connection_ht_add(relay_connections_ht, conn);
> DBG("Connection socket %d added", conn->sock->fd);
> }
> } else {
> struct relay_connection *ctrl_conn;
>
> - rcu_read_lock();
> - ctrl_conn = connection_find_by_sock(relay_connections_ht, pollfd);
> + ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd);
> /* If not found, there is a synchronization issue. */
> assert(ctrl_conn);
>
> if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> - cleanup_connection_pollfd(&events, pollfd);
> - destroy_connection(relay_connections_ht, ctrl_conn);
> + relay_thread_close_connection(&events, pollfd, ctrl_conn);
> if (last_seen_data_fd == pollfd) {
> last_seen_data_fd = last_notdel_data_fd;
> }
> @@ -2634,16 +2447,14 @@ restart:
> sizeof(recv_hdr), 0);
> if (ret <= 0) {
> /* Connection closed */
> - cleanup_connection_pollfd(&events, pollfd);
> - destroy_connection(relay_connections_ht, ctrl_conn);
> - DBG("Control connection closed with %d", pollfd);
> + relay_thread_close_connection(&events, pollfd,
> + ctrl_conn);
> } else {
> ret = relay_process_control(&recv_hdr, ctrl_conn);
> if (ret < 0) {
> /* Clear the session on error. */
> - cleanup_connection_pollfd(&events, pollfd);
> - destroy_connection(relay_connections_ht, ctrl_conn);
> - DBG("Connection closed with %d", pollfd);
> + relay_thread_close_connection(&events, pollfd,
> + ctrl_conn);
> }
> seen_control = 1;
> }
> @@ -2658,7 +2469,7 @@ restart:
> } else {
> ERR("Unknown poll events %u for sock %d", revents, pollfd);
> }
> - rcu_read_unlock();
> + connection_put(ctrl_conn);
> }
> }
>
> @@ -2702,26 +2513,22 @@ restart:
> continue;
> }
>
> - rcu_read_lock();
> - data_conn = connection_find_by_sock(relay_connections_ht, pollfd);
> + data_conn = connection_get_by_sock(relay_connections_ht, pollfd);
> if (!data_conn) {
> /* Skip it. Might be removed before. */
> - rcu_read_unlock();
> continue;
> }
>
> if (revents & LPOLLIN) {
> if (data_conn->type != RELAY_DATA) {
> - rcu_read_unlock();
> - continue;
> + goto put_connection;
> }
>
> ret = relay_process_data(data_conn);
> /* Connection closed */
> if (ret < 0) {
> - cleanup_connection_pollfd(&events, pollfd);
> - destroy_connection(relay_connections_ht, data_conn);
> - DBG("Data connection closed with %d", pollfd);
> + relay_thread_close_connection(&events, pollfd,
> + data_conn);
> /*
> * Every goto restart call sets the last seen fd where
> * here we don't really care since we gracefully
> @@ -2730,11 +2537,12 @@ restart:
> } else {
> /* Keep last seen port. */
> last_seen_data_fd = pollfd;
> - rcu_read_unlock();
> + connection_put(data_conn);
> goto restart;
> }
> }
> - rcu_read_unlock();
> + put_connection:
> + connection_put(data_conn);
> }
> last_seen_data_fd = -1;
> }
> @@ -2744,28 +2552,24 @@ restart:
>
> exit:
> error:
> - lttng_poll_clean(&events);
> -
> /* Cleanup reamaining connection object. */
> rcu_read_lock();
> cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
> destroy_conn,
> sock_n.node) {
> health_code_update();
> - destroy_connection(relay_connections_ht, destroy_conn);
> + /*
> + * No need to grab another ref, because we own
> + * destroy_conn.
> + */
> + relay_thread_close_connection(&events, destroy_conn->sock->fd,
> + destroy_conn);
> }
> rcu_read_unlock();
> +
> + lttng_poll_clean(&events);
> +
> error_poll_create:
> - rcu_read_lock();
> - cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
> - index_n.node) {
> - health_code_update();
> - relay_index_delete(index);
> - relay_index_free_safe(index);
> - }
> - rcu_read_unlock();
> - lttng_ht_destroy(indexes_ht);
> -indexes_ht_error:
> lttng_ht_destroy(relay_connections_ht);
> relay_connections_ht_error:
> /* Close relay conn pipes */
> @@ -2806,7 +2610,6 @@ int main(int argc, char **argv)
> {
> int ret = 0, retval = 0;
> void *status;
> - struct relay_local_data *relay_ctx = NULL;
>
> /* Parse arguments */
> progname = argv[0];
> @@ -2904,16 +2707,9 @@ int main(int argc, char **argv)
> lttcomm_init();
> lttcomm_inet_init();
>
> - relay_ctx = zmalloc(sizeof(struct relay_local_data));
> - if (!relay_ctx) {
> - PERROR("relay_ctx");
> - retval = -1;
> - goto exit_init_data;
> - }
> -
> /* tables of sessions indexed by session ID */
> - relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
> - if (!relay_ctx->sessions_ht) {
> + sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
> + if (!sessions_ht) {
> retval = -1;
> goto exit_init_data;
> }
> @@ -2960,7 +2756,7 @@ int main(int argc, char **argv)
>
> /* Setup the worker thread */
> ret = pthread_create(&worker_thread, NULL,
> - relay_thread_worker, (void *) relay_ctx);
> + relay_thread_worker, NULL);
> if (ret) {
> errno = ret;
> PERROR("pthread_create worker");
> @@ -2978,7 +2774,7 @@ int main(int argc, char **argv)
> goto exit_listener_thread;
> }
>
> - ret = relayd_live_create(live_uri, relay_ctx);
> + ret = relayd_live_create(live_uri);
> if (ret) {
> ERR("Starting live viewer threads");
> retval = -1;
> @@ -3035,7 +2831,10 @@ exit_init_data:
> health_app_destroy(health_relayd);
> exit_health_app_create:
> exit_options:
> - relayd_cleanup(relay_ctx);
> + relayd_cleanup();
> +
> + /* Ensure all prior call_rcu are done. */
> + rcu_barrier();
>
> if (!retval) {
> exit(EXIT_SUCCESS);
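
The tracefile id bookkeeping done at rotation time in relay_process_data() above can be sketched in isolation. This is a minimal sketch, not the relayd code: struct rot_state and rotate_ids() are hypothetical names, the file handling performed by utils_rotate_stream_file() is left out, and the tracefile count is assumed to be greater than zero.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the rotation fields of struct relay_stream. */
struct rot_state {
	uint64_t current_id;	/* tracefile currently written to */
	uint64_t oldest_id;	/* oldest tracefile still available to a viewer */
	uint64_t count;		/* number of tracefiles in the ring, must be > 0 */
};

/* Advance to the next tracefile, wrapping around the ring. */
static void rotate_ids(struct rot_state *s)
{
	uint64_t new_id;

	assert(s->count > 0);
	new_id = (s->current_id + 1) % s->count;
	/*
	 * If the writer is about to reuse the oldest tracefile, the
	 * oldest available position moves forward by one.
	 */
	if (new_id == s->oldest_id) {
		s->oldest_id = (s->oldest_id + 1) % s->count;
	}
	s->current_id = new_id;
}
```

With three tracefiles and both ids starting at 0, the oldest id stays at 0 for the first two rotations and only starts advancing once the writer wraps back onto it.
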
> diff --git a/src/bin/lttng-relayd/session.c b/src/bin/lttng-relayd/session.c
> index 46d9cc6..de00fc3 100644
> --- a/src/bin/lttng-relayd/session.c
> +++ b/src/bin/lttng-relayd/session.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -19,28 +20,25 @@
> #define _GNU_SOURCE
> #define _LGPL_SOURCE
> #include <common/common.h>
> +#include <urcu/rculist.h>
>
> +#include "lttng-relayd.h"
> #include "ctf-trace.h"
> #include "session.h"
> #include "stream.h"
>
> /* Global session id used in the session creation. */
> static uint64_t last_relay_session_id;
> -
> -static void rcu_destroy_session(struct rcu_head *head)
> -{
> - struct relay_session *session =
> - caa_container_of(head, struct relay_session, rcu_node);
> -
> - free(session);
> -}
> +static pthread_mutex_t last_relay_session_id_lock = PTHREAD_MUTEX_INITIALIZER;
>
> /*
> * Create a new session by assigning a new session ID.
> *
> * Return allocated session or else NULL.
> */
> -struct relay_session *session_create(void)
> +struct relay_session *session_create(const char *session_name,
> + const char *hostname, uint32_t live_timer,
> + bool snapshot, uint32_t major, uint32_t minor)
> {
> struct relay_session *session;
>
> @@ -57,30 +55,62 @@ struct relay_session *session_create(void)
> goto error;
> }
>
> - pthread_mutex_init(&session->viewer_ready_lock, NULL);
> + pthread_mutex_lock(&last_relay_session_id_lock);
> session->id = ++last_relay_session_id;
> + pthread_mutex_unlock(&last_relay_session_id_lock);
> +
> + session->major = major;
> + session->minor = minor;
> lttng_ht_node_init_u64(&session->session_n, session->id);
> + urcu_ref_init(&session->ref);
> + CDS_INIT_LIST_HEAD(&session->recv_list);
> + pthread_mutex_init(&session->lock, NULL);
> + pthread_mutex_init(&session->reflock, NULL);
> + pthread_mutex_init(&session->recv_list_lock, NULL);
> +
> + strncpy(session->session_name, session_name,
> + sizeof(session->session_name) - 1);
> + strncpy(session->hostname, hostname,
> + sizeof(session->hostname) - 1);
> + session->live_timer = live_timer;
> + session->snapshot = snapshot;
> +
> + lttng_ht_add_unique_u64(sessions_ht, &session->session_n);
>
> error:
> return session;
> }
>
> +/* Should be called with RCU read-side lock held. */
> +bool session_get(struct relay_session *session)
> +{
> + bool has_ref = false;
> +
> + pthread_mutex_lock(&session->reflock);
> + if (session->ref.refcount != 0) {
> + has_ref = true;
> + urcu_ref_get(&session->ref);
> + }
> + pthread_mutex_unlock(&session->reflock);
> +
> + return has_ref;
> +}
> +
> /*
> - * Lookup a session within the given hash table and session id. RCU read side
> - * lock MUST be acquired before calling this and as long as the caller has a
> - * reference to the object.
> + * Lookup a session within the session hash table using the session id
> + * as key. A session reference is taken when a session is returned.
> + * session_put() must be called on that session.
> *
> * Return session or NULL if not found.
> */
> -struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id)
> +struct relay_session *session_get_by_id(uint64_t id)
> {
> struct relay_session *session = NULL;
> struct lttng_ht_node_u64 *node;
> struct lttng_ht_iter iter;
>
> - assert(ht);
> -
> - lttng_ht_lookup(ht, &id, &iter);
> + rcu_read_lock();
> + lttng_ht_lookup(sessions_ht, &id, &iter);
> node = lttng_ht_iter_get_node_u64(&iter);
> if (!node) {
> DBG("Session find by ID %" PRIu64 " id NOT found", id);
> @@ -88,97 +118,127 @@ struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id)
> }
> session = caa_container_of(node, struct relay_session, session_n);
> DBG("Session find by ID %" PRIu64 " id found", id);
> -
> + if (!session_get(session)) {
> + session = NULL;
> + }
> end:
> + rcu_read_unlock();
> return session;
> }
>
> +static void rcu_destroy_session(struct rcu_head *rcu_head)
> +{
> + struct relay_session *session =
> + caa_container_of(rcu_head, struct relay_session, rcu_node);
> +
> + free(session);
> +}
> +
> /*
> * Delete session from the given hash table.
> *
> * Return lttng ht del error code being 0 on success and 1 on failure.
> */
> -int session_delete(struct lttng_ht *ht, struct relay_session *session)
> +static int session_delete(struct relay_session *session)
> {
> struct lttng_ht_iter iter;
>
> - assert(ht);
> - assert(session);
> -
> iter.iter.node = &session->session_n.node;
> - return lttng_ht_del(ht, &iter);
> + return lttng_ht_del(sessions_ht, &iter);
> }
>
> -/*
> - * The caller MUST be from the viewer thread since the viewer refcount is
> - * decremented. With this calue down to 0, it will try to destroy the session.
> - */
> -void session_viewer_try_destroy(struct lttng_ht *ht,
> - struct relay_session *session)
> +
> +static void destroy_session(struct relay_session *session)
> +{
> + int ret;
> +
> + ret = session_delete(session);
> + assert(!ret);
> + /*
> + * Since each trace holds a reference on the session, reaching
> + * the point where we tear down the session means that no trace
> + * belonging to that session exists anymore.
> + */
> + lttng_ht_destroy(session->ctf_traces_ht);
> + call_rcu(&session->rcu_node, rcu_destroy_session);
> +}
> +
> +void session_release(struct urcu_ref *ref)
> {
> - unsigned long ret_ref;
> + struct relay_session *session =
> + caa_container_of(ref, struct relay_session, ref);
>
> - assert(session);
> + destroy_session(session);
> +}
>
> - ret_ref = uatomic_add_return(&session->viewer_refcount, -1);
> - if (ret_ref == 0) {
> - session_try_destroy(ht, session);
> - }
> +void session_put(struct relay_session *session)
> +{
> + rcu_read_lock();
> + pthread_mutex_lock(&session->reflock);
> + urcu_ref_put(&session->ref, session_release);
> + pthread_mutex_unlock(&session->reflock);
> + rcu_read_unlock();
> }
>
> -/*
> - * Should only be called from the main streaming thread since it does not touch
> - * the viewer refcount. If this refcount is down to 0, destroy the session only
> - * and only if the session deletion succeeds. This is done because the viewer
> - * *and* the streaming thread can both concurently try to destroy the session
> - * thus the first come first serve.
> - */
> -void session_try_destroy(struct lttng_ht *ht, struct relay_session *session)
> +int session_close(struct relay_session *session)
> {
> int ret = 0;
> - unsigned long ret_ref;
> -
> - assert(session);
> + struct ctf_trace *trace;
> + struct lttng_ht_iter iter;
> + struct relay_stream *stream;
> +
> + pthread_mutex_lock(&session->lock);
> + DBG("closing session %" PRIu64 ": is conn already closed %d",
> + session->id, session->connection_closed);
> + if (session->connection_closed) {
> + ret = -1;
> + goto unlock;
> + }
> + session->connection_closed = true;
> +unlock:
> + pthread_mutex_unlock(&session->lock);
> + if (ret) {
> + return ret;
> + }
>
> - ret_ref = uatomic_read(&session->viewer_refcount);
> - if (ret_ref == 0 && session->close_flag) {
> - if (ht) {
> - ret = session_delete(ht, session);
> - }
> - if (!ret) {
> - /* Only destroy the session if the deletion was successful. */
> - session_destroy(session);
> + rcu_read_lock();
> + cds_lfht_for_each_entry(session->ctf_traces_ht->ht,
> + &iter.iter, trace, node.node) {
> + ret = ctf_trace_close(trace);
> + if (ret) {
> + goto rcu_unlock;
> }
> }
> + cds_list_for_each_entry_rcu(stream, &session->recv_list,
> + recv_node) {
> + stream_close(stream);
> + }
> +rcu_unlock:
> + rcu_read_unlock();
> + if (ret) {
> + return ret;
> + }
> + /* Put self-reference from create. */
> + session_put(session);
> + return 0;
> }
>
> -/*
> - * Destroy a session object.
> - *
> - * This function must *NOT* be called with an RCU read lock held since
> - * the session's ctf_traces_ht is destroyed.
> - */
> -void session_destroy(struct relay_session *session)
> +void print_sessions(void)
> {
> - struct ctf_trace *ctf_trace;
> struct lttng_ht_iter iter;
> + struct relay_session *session;
>
> - assert(session);
> -
> - DBG("Relay destroying session %" PRIu64, session->id);
> -
> - /*
> - * Empty the ctf trace hash table which will destroy the stream contained
> - * in that table.
> - */
> rcu_read_lock();
> - cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
> - node.node) {
> - ctf_trace_delete(session->ctf_traces_ht, ctf_trace);
> - ctf_trace_destroy(ctf_trace);
> + cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session,
> + session_n.node) {
> + if (!session_get(session)) {
> + continue;
> + }
> + DBG("session %p refcount %ld session %" PRIu64,
> + session,
> + session->ref.refcount,
> + session->id);
> + session_put(session);
> }
> rcu_read_unlock();
> - lttng_ht_destroy(session->ctf_traces_ht);
> -
> - call_rcu(&session->rcu_node, rcu_destroy_session);
> }
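
The get/put scheme used by session_get() and session_put() above (take a reference only if the count has not already reached zero, with the check and the increment done under the same lock) can be sketched without liburcu. This is a minimal sketch under stated assumptions: struct obj and the obj_*() functions are hypothetical names, a plain counter replaces struct urcu_ref, and obj_free() stands in for the free that the patch defers through call_rcu().

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for struct relay_session. */
struct obj {
	long refcount;			/* protected by reflock */
	pthread_mutex_t reflock;
};

/* Allocate an object holding one self-reference owned by the creator. */
static struct obj *obj_create(void)
{
	struct obj *o = calloc(1, sizeof(*o));

	if (!o) {
		return NULL;
	}
	o->refcount = 1;
	pthread_mutex_init(&o->reflock, NULL);
	return o;
}

/* Take a reference unless the count already dropped to zero. */
static bool obj_get(struct obj *o)
{
	bool has_ref = false;

	pthread_mutex_lock(&o->reflock);
	if (o->refcount != 0) {
		o->refcount++;
		has_ref = true;
	}
	pthread_mutex_unlock(&o->reflock);
	return has_ref;
}

/* Drop a reference. Returns true when this was the last one. */
static bool obj_put(struct obj *o)
{
	bool last;

	pthread_mutex_lock(&o->reflock);
	assert(o->refcount > 0);
	last = (--o->refcount == 0);
	pthread_mutex_unlock(&o->reflock);
	return last;
}

/* Stand-in for the free deferred through call_rcu() in the patch. */
static void obj_free(struct obj *o)
{
	pthread_mutex_destroy(&o->reflock);
	free(o);
}
```

Once the last reference is dropped, a lookup that still finds the object (for instance while it awaits its deferred free) gets false from obj_get() and must treat the object as not found, which is what session_get_by_id() does by returning NULL.
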
> diff --git a/src/bin/lttng-relayd/session.h b/src/bin/lttng-relayd/session.h
> index cb125be..0126e12 100644
> --- a/src/bin/lttng-relayd/session.h
> +++ b/src/bin/lttng-relayd/session.h
> @@ -1,6 +1,10 @@
> +#ifndef _SESSION_H
> +#define _SESSION_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,13 +20,11 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef _SESSION_H
> -#define _SESSION_H
> -
> #include <limits.h>
> #include <inttypes.h>
> #include <pthread.h>
> #include <urcu/list.h>
> +#include <urcu/ref.h>
>
> #include <common/hashtable/hashtable.h>
>
> @@ -31,35 +33,59 @@
> */
> struct relay_session {
> /*
> - * This session id is used to identify a set of stream to a tracing session
> - * but also make sure we have a unique session id associated with a session
> - * daemon which can provide multiple data source.
> + * This session id is used to identify a set of streams for a
> + * tracing session, and also to make sure we have a unique session
> + * id associated with a session daemon which can provide
> + * multiple data sources.
> */
> uint64_t id;
> char session_name[NAME_MAX];
> char hostname[HOST_NAME_MAX];
> uint32_t live_timer;
> - struct lttng_ht_node_u64 session_n;
> - struct rcu_head rcu_node;
> - uint32_t stream_count;
> +
> /* Tell if this session is for a snapshot or not. */
> - unsigned int snapshot:1;
> - /* Tell if the session has been closed on the streaming side. */
> - unsigned int close_flag:1;
> + bool snapshot;
> +
> + /*
> + * Session has no back reference to its connection because it
> + * has a life-time that can be longer than the consumer connection
> + * life-time: a reference can still be held by the viewer
> + * connection.
> + */
> +
> + /* Reference count of ctf-traces and viewers using the session. */
> + struct urcu_ref ref;
> + /* session reflock nests inside ctf_trace reflock. */
> + pthread_mutex_t reflock;
> +
> + pthread_mutex_t lock;
> +
> + /*
> + * major/minor version used for this session.
> + */
> + uint32_t major;
> + uint32_t minor;
>
> - /* Number of viewer using it. Set to 0, it should be destroyed. */
> - int viewer_refcount;
> + bool viewer_attached;
> + /* Tell if the session connection has been closed on the streaming side. */
> + bool connection_closed;
>
> /* Contains ctf_trace object of that session indexed by path name. */
> struct lttng_ht *ctf_traces_ht;
>
> /*
> - * Indicate version protocol for this session. This is especially useful
> - * for the data thread that has no idea which version it operates on since
> - * linking control/data sockets is non trivial.
> + * This contains streams that are received on that connection.
> + * It's used to store them until we get the streams sent
> + * command. When this is received, we remove those streams from
> + * the list and publish them.
> + * Updates are protected by the recv_list_lock.
> + * Traversals are protected by RCU.
> + * recv_list_lock also protects stream_count.
> */
> - uint64_t minor;
> - uint64_t major;
> + struct cds_list_head recv_list; /* RCU list. */
> + uint32_t stream_count;
> + pthread_mutex_t recv_list_lock;
> +
> /*
> * Flag checked and exchanged with uatomic_cmpxchg to tell the
> * viewer-side if new streams got added since the last check.
> @@ -67,50 +93,26 @@ struct relay_session {
> unsigned long new_streams;
>
> /*
> - * Used to synchronize the process where we flag every streams readiness
> - * for the viewer when the streams_sent message is received and the viewer
> - * process of sending those streams.
> + * Node in the global session hash table.
> */
> - pthread_mutex_t viewer_ready_lock;
> -
> + struct lttng_ht_node_u64 session_n;
> /*
> * Member of the session list in struct relay_viewer_session.
> + * Updates are protected by the relay_viewer_session
> + * session_list_lock. Traversals are protected by RCU.
> */
> - struct cds_list_head viewer_session_list;
> + struct cds_list_head viewer_session_node;
> + struct rcu_head rcu_node; /* For call_rcu teardown. */
> };
>
> -struct relay_viewer_session {
> - struct cds_list_head sessions_head;
> -};
> -
> -static inline void session_viewer_attach(struct relay_session *session)
> -{
> - uatomic_inc(&session->viewer_refcount);
> -}
> -
> -static inline void session_viewer_detach(struct relay_session *session)
> -{
> - uatomic_add(&session->viewer_refcount, -1);
> -}
> -
> -struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id);
> -struct relay_session *session_create(void);
> -int session_delete(struct lttng_ht *ht, struct relay_session *session);
> +struct relay_session *session_create(const char *session_name,
> + const char *hostname, uint32_t live_timer,
> + bool snapshot, uint32_t major, uint32_t minor);
> +struct relay_session *session_get_by_id(uint64_t id);
> +bool session_get(struct relay_session *session);
> +void session_put(struct relay_session *session);
>
> -/*
> - * Direct destroy without reading the refcount.
> - */
> -void session_destroy(struct relay_session *session);
> -
> -/*
> - * Destroy the session if the refcount is down to 0.
> - */
> -void session_try_destroy(struct lttng_ht *ht, struct relay_session *session);
> -
> -/*
> - * Decrement the viewer refcount and destroy it if down to 0.
> - */
> -void session_viewer_try_destroy(struct lttng_ht *ht,
> - struct relay_session *session);
> +int session_close(struct relay_session *session);
> +void print_sessions(void);
>
> #endif /* _SESSION_H */
> diff --git a/src/bin/lttng-relayd/stream-fd.c b/src/bin/lttng-relayd/stream-fd.c
> new file mode 100644
> index 0000000..57324d7
> --- /dev/null
> +++ b/src/bin/lttng-relayd/stream-fd.c
> @@ -0,0 +1,58 @@
> +/*
> + * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU General Public License, version 2 only, as
> + * published by the Free Software Foundation.
> + *
> + * This program is distributed in the hope that it will be useful, but WITHOUT
> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
> + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
> + * more details.
> + *
> + * You should have received a copy of the GNU General Public License along with
> + * this program; if not, write to the Free Software Foundation, Inc., 51
> + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> + */
> +
> +#define _GNU_SOURCE
> +#define _LGPL_SOURCE
> +#include <common/common.h>
> +
> +#include "stream-fd.h"
> +
> +struct stream_fd *stream_fd_create(int fd)
> +{
> + struct stream_fd *sf;
> +
> + sf = zmalloc(sizeof(*sf));
> + if (!sf) {
> + goto end;
> + }
> + urcu_ref_init(&sf->ref);
> + sf->fd = fd;
> +end:
> + return sf;
> +}
> +
> +void stream_fd_get(struct stream_fd *sf)
> +{
> + urcu_ref_get(&sf->ref);
> +}
> +
> +static void stream_fd_release(struct urcu_ref *ref)
> +{
> + struct stream_fd *sf = caa_container_of(ref, struct stream_fd, ref);
> + int ret;
> +
> + ret = close(sf->fd);
> + if (ret) {
> + PERROR("Error closing stream FD %d", sf->fd);
> + }
> + free(sf);
> +}
> +
> +void stream_fd_put(struct stream_fd *sf)
> +{
> + urcu_ref_put(&sf->ref, stream_fd_release);
> +}
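
The point of stream_fd above is that the file descriptor is closed only when the last holder drops its reference. A minimal single-threaded sketch of that behaviour follows. The names struct fd_ref, fd_ref_*() and fd_is_open() are hypothetical, and a plain counter replaces struct urcu_ref, so this sketch is not safe for concurrent use the way the patch's version is meant to be.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical, single-threaded stand-in for struct stream_fd. */
struct fd_ref {
	int fd;
	long refcount;	/* the patch uses struct urcu_ref here */
};

/* Wrap fd in an object holding one reference owned by the caller. */
static struct fd_ref *fd_ref_create(int fd)
{
	struct fd_ref *r = calloc(1, sizeof(*r));

	if (!r) {
		return NULL;
	}
	r->fd = fd;
	r->refcount = 1;
	return r;
}

/* Take an additional reference; the caller must already hold one. */
static void fd_ref_get(struct fd_ref *r)
{
	assert(r->refcount > 0);
	r->refcount++;
}

/* Drop a reference; the last put closes the fd and frees the wrapper. */
static void fd_ref_put(struct fd_ref *r)
{
	assert(r->refcount > 0);
	if (--r->refcount == 0) {
		(void) close(r->fd);
		free(r);
	}
}

/* Helper for the demonstration: true while fd is a valid descriptor. */
static bool fd_is_open(int fd)
{
	return fcntl(fd, F_GETFD) != -1;
}
```

A second holder calls fd_ref_get() before keeping the pointer and fd_ref_put() when done; the descriptor stays valid until both holders have called fd_ref_put().
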
> diff --git a/src/bin/lttng-relayd/stream-fd.h b/src/bin/lttng-relayd/stream-fd.h
> new file mode 100644
> index 0000000..64f3b16
> --- /dev/null
> +++ b/src/bin/lttng-relayd/stream-fd.h
> @@ -0,0 +1,32 @@
> +#ifndef _STREAM_FD_H
> +#define _STREAM_FD_H
> +
> +/*
> + * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU General Public License, version 2 only, as
> + * published by the Free Software Foundation.
> + *
> + * This program is distributed in the hope that it will be useful, but WITHOUT
> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
> + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
> + * more details.
> + *
> + * You should have received a copy of the GNU General Public License along with
> + * this program; if not, write to the Free Software Foundation, Inc., 51
> + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> + */
> +
> +#include <urcu/ref.h>
> +
> +struct stream_fd {
> + int fd;
> + struct urcu_ref ref;
> +};
> +
> +struct stream_fd *stream_fd_create(int fd);
> +void stream_fd_get(struct stream_fd *sf);
> +void stream_fd_put(struct stream_fd *sf);
> +
> +#endif /* _STREAM_FD_H */
> diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c
> index 17a5bcd..5525806 100644
> --- a/src/bin/lttng-relayd/stream.c
> +++ b/src/bin/lttng-relayd/stream.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -19,128 +20,332 @@
> #define _GNU_SOURCE
> #define _LGPL_SOURCE
> #include <common/common.h>
> +#include <common/utils.h>
> +#include <common/defaults.h>
> +#include <urcu/rculist.h>
> +#include <sys/stat.h>
>
> +#include "lttng-relayd.h"
> #include "index.h"
> #include "stream.h"
> #include "viewer-stream.h"
>
> +/* Should be called with RCU read-side lock held. */
> +bool stream_get(struct relay_stream *stream)
> +{
> + bool has_ref = false;
> +
> + pthread_mutex_lock(&stream->reflock);
> + if (stream->ref.refcount != 0) {
> + has_ref = true;
> + urcu_ref_get(&stream->ref);
> + }
> + pthread_mutex_unlock(&stream->reflock);
> +
> + return has_ref;
> +}
> +
> /*
> - * Get stream from stream id from the given hash table. Return stream if found
> - * else NULL.
> - *
> - * Need to be called with RCU read-side lock held.
> + * Get stream from stream id from the streams hash table. Return stream
> + * if found else NULL. A stream reference is taken when a stream is
> + * returned. stream_put() must be called on that stream.
> */
> -struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
> - uint64_t stream_id)
> +struct relay_stream *stream_get_by_id(uint64_t stream_id)
> {
> struct lttng_ht_node_u64 *node;
> struct lttng_ht_iter iter;
> struct relay_stream *stream = NULL;
>
> - assert(ht);
> -
> - lttng_ht_lookup(ht, &stream_id, &iter);
> + rcu_read_lock();
> + lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
> node = lttng_ht_iter_get_node_u64(&iter);
> - if (node == NULL) {
> + if (!node) {
> DBG("Relay stream %" PRIu64 " not found", stream_id);
> goto end;
> }
> stream = caa_container_of(node, struct relay_stream, node);
> -
> + if (!stream_get(stream)) {
> + stream = NULL;
> + }
> end:
> + rcu_read_unlock();
> return stream;
> }
>
> /*
> - * Close a given stream. If an assosiated viewer stream exists it is updated.
> - *
> - * RCU read side lock MUST be acquired.
> - *
> - * Return 0 if close was successful or 1 if already closed.
> + * We keep ownership of path_name and channel_name.
> */
> -int stream_close(struct relay_session *session, struct relay_stream *stream)
> +struct relay_stream *stream_create(struct ctf_trace *trace,
> + uint64_t stream_handle, char *path_name,
> + char *channel_name, uint64_t tracefile_size,
> + uint64_t tracefile_count)
> {
> - int delret, ret;
> - struct relay_viewer_stream *vstream;
> - struct ctf_trace *ctf_trace;
> + int ret;
> + struct relay_stream *stream = NULL;
> + struct relay_session *session = trace->session;
>
> - assert(stream);
> + stream = zmalloc(sizeof(struct relay_stream));
> + if (stream == NULL) {
> + PERROR("relay stream zmalloc");
> + ret = -1;
> + goto error_no_alloc;
> + }
>
> - pthread_mutex_lock(&stream->lock);
> + stream->stream_handle = stream_handle;
> + stream->prev_seq = -1ULL;
> + stream->ctf_stream_id = -1ULL;
> + stream->tracefile_size = tracefile_size;
> + stream->tracefile_count = tracefile_count;
> + stream->path_name = path_name;
> + stream->channel_name = channel_name;
> + lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
> + pthread_mutex_init(&stream->lock, NULL);
> + pthread_mutex_init(&stream->reflock, NULL);
> + urcu_ref_init(&stream->ref);
> + ctf_trace_get(trace);
> + stream->trace = trace;
>
> - if (stream->terminated_flag) {
> - /* This stream is already closed. Ignore. */
> - ret = 1;
> - goto end_unlock;
> + stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
> + if (!stream->indexes_ht) {
> + ERR("Cannot create indexes_ht");
> + ret = -1;
> + goto end;
> }
>
> - DBG("Closing stream id %" PRIu64, stream->stream_handle);
> + ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
> + if (ret < 0) {
> + ERR("relay creating output directory");
> + goto end;
> + }
>
> - if (stream->fd >= 0) {
> - delret = close(stream->fd);
> - if (delret < 0) {
> - PERROR("close stream");
> + /*
> + * No need to use run_as API here because whatever we receive,
> + * the relayd uses its own credentials for the stream files.
> + */
> + ret = utils_create_stream_file(stream->path_name, stream->channel_name,
> + stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
> + if (ret < 0) {
> + ERR("Create output file");
> + goto end;
> + }
> + stream->stream_fd = stream_fd_create(ret);
> + if (!stream->stream_fd) {
> + if (close(ret)) {
> + PERROR("Error closing file %d", ret);
> }
> + ret = -1;
> + goto end;
> + }
> + if (stream->tracefile_size) {
> + DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
> + } else {
> + DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
> + }
> +
> + if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
> + stream->is_metadata = 1;
> }
>
> - if (stream->index_fd >= 0) {
> - delret = close(stream->index_fd);
> - if (delret < 0) {
> - PERROR("close stream index_fd");
> + stream->in_recv_list = true;
> +
> + /*
> + * Add the stream in the recv list of the session. Once the end stream
> + * message is received, all session streams are published.
> + */
> + pthread_mutex_lock(&session->recv_list_lock);
> + cds_list_add_rcu(&stream->recv_node, &session->recv_list);
> + session->stream_count++;
> + pthread_mutex_unlock(&session->recv_list_lock);
> +
> + /*
> + * Both in the ctf_trace object and the global stream ht since the data
> + * side of the relayd does not have the concept of session.
> + */
> + lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
> +
> + DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
> + stream->stream_handle);
> + ret = 0;
> +
> +end:
> + if (ret) {
> + if (stream->stream_fd) {
> + stream_fd_put(stream->stream_fd);
> + stream->stream_fd = NULL;
> }
> + stream_put(stream);
> + stream = NULL;
> }
> + return stream;
> +
> +error_no_alloc:
> + /*
> + * path_name and channel_name need to be freed explicitly here
> + * because we cannot rely on stream_put().
> + */
> + free(path_name);
> + free(channel_name);
> + return NULL;
> +}
> +
> +/*
> + * Called with the session lock held.
> + */
> +void stream_publish(struct relay_stream *stream)
> +{
> + struct relay_session *session;
>
> - vstream = viewer_stream_find_by_id(stream->stream_handle);
> - if (vstream) {
> - /*
> - * Set the last good value into the viewer stream. This is done
> - * right before the stream gets deleted from the hash table. The
> - * lookup failure on the live thread side of a stream indicates
> - * that the viewer stream index received value should be used.
> - */
> - pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
> - vstream->total_index_received = stream->total_index_received;
> - vstream->tracefile_count_last = stream->tracefile_count_current;
> - vstream->close_write_flag = 1;
> - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
> + pthread_mutex_lock(&stream->lock);
> + if (stream->published) {
> + goto unlock;
> }
>
> - /* Cleanup index of that stream. */
> - relay_index_destroy_by_stream_id(stream->stream_handle);
> + session = stream->trace->session;
>
> - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
> - stream->path_name);
> - assert(ctf_trace);
> - ctf_trace_put_ref(ctf_trace);
> + pthread_mutex_lock(&session->recv_list_lock);
> + if (stream->in_recv_list) {
> + cds_list_del_rcu(&stream->recv_node);
> + stream->in_recv_list = false;
> + }
> + pthread_mutex_unlock(&session->recv_list_lock);
>
> - stream->close_flag = 1;
> - stream->terminated_flag = 1;
> - ret = 0;
> + pthread_mutex_lock(&stream->trace->stream_list_lock);
> + cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
> + pthread_mutex_unlock(&stream->trace->stream_list_lock);
>
> -end_unlock:
> + stream->published = true;
> +unlock:
> pthread_mutex_unlock(&stream->lock);
> - return ret;
> }
>
> -void stream_delete(struct lttng_ht *ht, struct relay_stream *stream)
> +/*
> + * Only called from destroy. No stream lock needed, since there is a
> + * single user at this point. This is ensured by having the refcount
> + * reaching 0.
> + */
> +static void stream_unpublish(struct relay_stream *stream)
> +{
> + if (!stream->published) {
> + return;
> + }
> + pthread_mutex_lock(&stream->trace->stream_list_lock);
> + cds_list_del_rcu(&stream->stream_node);
> + pthread_mutex_unlock(&stream->trace->stream_list_lock);
> +
> + stream->published = false;
> +}
> +
> +static void stream_destroy(struct relay_stream *stream)
> +{
> + if (stream->indexes_ht) {
> + lttng_ht_destroy(stream->indexes_ht);
> + }
> + free(stream->path_name);
> + free(stream->channel_name);
> + free(stream);
> +}
> +
> +static void stream_destroy_rcu(struct rcu_head *rcu_head)
> {
> + struct relay_stream *stream =
> + caa_container_of(rcu_head, struct relay_stream, rcu_node);
> +
> + stream_destroy(stream);
> +}
> +
> +static void stream_release(struct urcu_ref *ref)
> +{
> + struct relay_stream *stream =
> + caa_container_of(ref, struct relay_stream, ref);
> + struct relay_session *session;
> int ret;
> struct lttng_ht_iter iter;
>
> - assert(ht);
> - assert(stream);
> + session = stream->trace->session;
> +
> + DBG("Releasing stream id %" PRIu64, stream->stream_handle);
> +
> + pthread_mutex_lock(&session->recv_list_lock);
> + session->stream_count--;
> + if (stream->in_recv_list) {
> + cds_list_del_rcu(&stream->recv_node);
> + stream->in_recv_list = false;
> + }
> + pthread_mutex_unlock(&session->recv_list_lock);
>
> iter.iter.node = &stream->node.node;
> - ret = lttng_ht_del(ht, &iter);
> + ret = lttng_ht_del(relay_streams_ht, &iter);
> assert(!ret);
>
> - cds_list_del(&stream->trace_list);
> + stream_unpublish(stream);
> +
> + if (stream->stream_fd) {
> + stream_fd_put(stream->stream_fd);
> + stream->stream_fd = NULL;
> + }
> + if (stream->index_fd) {
> + stream_fd_put(stream->index_fd);
> + stream->index_fd = NULL;
> + }
> + if (stream->trace) {
> + ctf_trace_put(stream->trace);
> + stream->trace = NULL;
> + }
> +
> + call_rcu(&stream->rcu_node, stream_destroy_rcu);
> }
>
> -void stream_destroy(struct relay_stream *stream)
> +void stream_put(struct relay_stream *stream)
> {
> - assert(stream);
> - free(stream->path_name);
> - free(stream->channel_name);
> - free(stream);
> + DBG("stream put for stream id %" PRIu64, stream->stream_handle);
> + /*
> + * Ensure existence of stream->reflock for stream unlock.
> + */
> + rcu_read_lock();
> + /*
> + * Stream reflock ensures that concurrent test and update of
> + * stream ref is atomic.
> + */
> + pthread_mutex_lock(&stream->reflock);
> + assert(stream->ref.refcount != 0);
> + /*
> + * Wait until we have processed all the stream packets before
> + * actually putting our last stream reference.
> + */
> + DBG("stream put stream id %" PRIu64 " refcount %d",
> + stream->stream_handle,
> + (int) stream->ref.refcount);
> + urcu_ref_put(&stream->ref, stream_release);
> + pthread_mutex_unlock(&stream->reflock);
> + rcu_read_unlock();
> +}
> +
> +void stream_close(struct relay_stream *stream)
> +{
> + DBG("closing stream %" PRIu64, stream->stream_handle);
> + relay_index_close_all(stream);
> + stream_put(stream);
> +}
> +
> +void print_relay_streams(void)
> +{
> + struct lttng_ht_iter iter;
> + struct relay_stream *stream;
> +
> + rcu_read_lock();
> + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
> + node.node) {
> + if (!stream_get(stream)) {
> + continue;
> + }
> + DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
> + " session %" PRIu64,
> + stream,
> + stream->ref.refcount,
> + stream->stream_handle,
> + stream->trace->id,
> + stream->trace->session->id);
> + stream_put(stream);
> + }
> + rcu_read_unlock();
> }
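
stream_get() and stream_put() above rely on a "get unless zero" rule: a lookup may only take a reference while the count is still non-zero, and the check and the increment happen under the same reflock, so a concurrent final put cannot slip in between. The following is a minimal sketch of that rule in isolation. The names obj, obj_init, obj_get and obj_put are hypothetical, a plain counter replaces urcu_ref, and release only sets a flag instead of deferring the free through call_rcu as the patch does.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical object; "released" stands in for the real teardown. */
struct obj {
	pthread_mutex_t reflock;	/* makes test-and-update of refcount atomic */
	long refcount;
	bool released;
};

void obj_init(struct obj *o)
{
	pthread_mutex_init(&o->reflock, NULL);
	o->refcount = 1;	/* the creator owns the initial reference */
	o->released = false;
}

/* Returns true and takes a reference only if the object is still live. */
bool obj_get(struct obj *o)
{
	bool has_ref = false;

	pthread_mutex_lock(&o->reflock);
	if (o->refcount != 0) {
		o->refcount++;
		has_ref = true;
	}
	pthread_mutex_unlock(&o->reflock);
	return has_ref;
}

/* Drops a reference. The last put marks the object as released. */
void obj_put(struct obj *o)
{
	pthread_mutex_lock(&o->reflock);
	assert(o->refcount != 0);
	if (--o->refcount == 0) {
		o->released = true;
	}
	pthread_mutex_unlock(&o->reflock);
}
```

In the patch the memory backing the lock must also stay valid while obj_get() runs. That is what the RCU read-side lock and the call_rcu() deferred free provide there. The sketch sidesteps the question by leaving the storage with the caller.
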
> diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h
> index 4dd2e62..92a809f 100644
> --- a/src/bin/lttng-relayd/stream.h
> +++ b/src/bin/lttng-relayd/stream.h
> @@ -1,6 +1,10 @@
> +#ifndef _STREAM_H
> +#define _STREAM_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,9 +20,6 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef _STREAM_H
> -#define _STREAM_H
> -
> #include <limits.h>
> #include <inttypes.h>
> #include <pthread.h>
> @@ -27,100 +28,111 @@
> #include <common/hashtable/hashtable.h>
>
> #include "session.h"
> +#include "stream-fd.h"
>
> /*
> * Represents a stream in the relay
> */
> struct relay_stream {
> uint64_t stream_handle;
> - uint64_t prev_seq; /* previous data sequence number encountered */
> - struct lttng_ht_node_u64 node;
> +
> /*
> - * When we receive a stream, it gets stored in a list (on a per connection
> - * basis) until we have all the streams of the same channel and the metadata
> - * associated with it, then it gets flagged with viewer_ready.
> + * reflock used to synchronize the closing of this stream.
> + * stream reflock nests inside viewer stream reflock.
> + * stream reflock nests inside index reflock.
> */
> - struct cds_list_head recv_list;
> + pthread_mutex_t reflock;
> + struct urcu_ref ref;
> + /* Back reference to trace. Protected by refcount on trace object. */
> + struct ctf_trace *trace;
>
> - /* Added to the corresponding ctf_trace. */
> - struct cds_list_head trace_list;
> - struct rcu_head rcu_node;
> - uint64_t session_id;
> - int fd;
> + /*
> + * To protect from concurrent read/update. The viewer stream
> + * lock nests inside the stream lock. The stream lock nests
> + * inside the ctf_trace lock.
> + */
> + pthread_mutex_t lock;
> + uint64_t prev_seq; /* previous data sequence number encountered */
> + uint64_t last_net_seq_num; /* seq num to encounter before closing. */
> +
> + /* FD on which to write the stream data. */
> + struct stream_fd *stream_fd;
> /* FD on which to write the index data. */
> - int index_fd;
> - /* FD on which to read the index data for the viewer. */
> - int read_index_fd;
> + struct stream_fd *index_fd;
>
> char *path_name;
> char *channel_name;
> +
> /* on-disk circular buffer of tracefiles */
> uint64_t tracefile_size;
> uint64_t tracefile_size_current;
> uint64_t tracefile_count;
> - uint64_t tracefile_count_current;
> + uint64_t current_tracefile_id;
> +
> /* To inform the viewer up to where it can go back in time. */
> uint64_t oldest_tracefile_id;
>
> - uint64_t total_index_received;
> - uint64_t last_net_seq_num;
> -
> + struct lttng_ht *indexes_ht;
> /*
> - * To protect from concurrent read/update. Also used to synchronize the
> - * closing of this stream.
> + * Counts number of indexes in indexes_ht. Redundant info.
> + * Protected by stream lock.
> */
> - pthread_mutex_t lock;
> + uint64_t total_index_received;
>
> + bool closed; /* Stream is closed. */
> +
> + int indexes_in_flight;
> /*
> - * If the stream is inactive, this field is updated with the live beacon
> - * timestamp end, when it is active, this field == -1ULL.
> + * If the stream is inactive, this field is updated with the
> + * live beacon timestamp end, when it is active, this
> + * field == -1ULL.
> */
> uint64_t beacon_ts_end;
> /*
> - * Number of indexes that are supposed to be complete soon.
> - * Avoid sending the inactivity beacon to the client when data is in
> - * transit.
> - */
> - int indexes_in_flight;
> - /*
> * CTF stream ID, -1ULL when unset.
> */
> uint64_t ctf_stream_id;
> - /*
> - * To protect the update of the close_write_flag and the checks of
> - * the tracefile_count_current.
> - * It is taken before checking whenever we need to know if the
> - * writer and reader are working in the same tracefile.
> - */
> - pthread_mutex_t viewer_stream_rotation_lock;
>
> - /* Information telling us when to close the stream */
> - unsigned int close_flag:1;
> + /* Indicate if the stream was initialized for a data pending command. */
> + bool data_pending_check_done;
> +
> + /* Is this stream a metadata stream ? */
> + int32_t is_metadata;
> + uint64_t metadata_received;
> +
> /*
> - * Indicates if the stream has been effectively closed thus having the
> - * information in it invalidated but NOT freed. The stream lock MUST be
> - * held to read/update that value.
> + * Member of the stream list in struct ctf_trace.
> + * Updates are protected by the stream_list_lock.
> + * Traversals are protected by RCU.
> */
> - unsigned int terminated_flag:1;
> - /* Indicate if the stream was initialized for a data pending command. */
> - unsigned int data_pending_check_done:1;
> - unsigned int metadata_flag:1;
> + struct cds_list_head stream_node;
> /*
> - * To detect when we start overwriting old data, it is used to
> - * update the oldest_tracefile_id.
> + * Temporary list belonging to the connection until all streams
> + * are received for that connection.
> + * Member of the stream recv list in the connection.
> + * Updates are protected by the stream_recv_list_lock.
> + * Traversals are protected by RCU.
> */
> - unsigned int tracefile_overwrite:1;
> + bool in_recv_list;
> + struct cds_list_head recv_node;
> + bool published; /* Protected by session lock. */
> /*
> - * Can this stream be used by a viewer or are we waiting for additional
> - * information.
> + * Node of stream within global stream hash table.
> */
> - unsigned int viewer_ready:1;
> + struct lttng_ht_node_u64 node;
> + struct rcu_head rcu_node; /* For call_rcu teardown. */
> };
>
> -struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
> - uint64_t stream_id);
> -int stream_close(struct relay_session *session, struct relay_stream *stream);
> -void stream_delete(struct lttng_ht *ht, struct relay_stream *stream);
> -void stream_destroy(struct relay_stream *stream);
> +struct relay_stream *stream_create(struct ctf_trace *trace,
> + uint64_t stream_handle, char *path_name,
> + char *channel_name, uint64_t tracefile_size,
> + uint64_t tracefile_count);
> +
> +struct relay_stream *stream_get_by_id(uint64_t stream_id);
> +bool stream_get(struct relay_stream *stream);
> +void stream_put(struct relay_stream *stream);
> +void stream_close(struct relay_stream *stream);
> +void stream_publish(struct relay_stream *stream);
> +void print_relay_streams(void);
>
> #endif /* _STREAM_H */
> diff --git a/src/bin/lttng-relayd/utils.h b/src/bin/lttng-relayd/utils.h
> index de1521d..4a56980 100644
> --- a/src/bin/lttng-relayd/utils.h
> +++ b/src/bin/lttng-relayd/utils.h
> @@ -1,6 +1,10 @@
> +#ifndef RELAYD_UTILS_H
> +#define RELAYD_UTILS_H
> +
> /*
> * Copyright (C) 2012 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify
> * it under the terms of the GNU General Public License, version 2 only,
> @@ -16,9 +20,6 @@
> * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef RELAYD_UTILS_H
> -#define RELAYD_UTILS_H
> -
> char *create_output_path(char *path_name);
>
> #endif /* RELAYD_UTILS_H */
> diff --git a/src/bin/lttng-relayd/viewer-session.c b/src/bin/lttng-relayd/viewer-session.c
> new file mode 100644
> index 0000000..ff2e41b
> --- /dev/null
> +++ b/src/bin/lttng-relayd/viewer-session.c
> @@ -0,0 +1,164 @@
> +/*
> + * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> + * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU General Public License, version 2 only, as
> + * published by the Free Software Foundation.
> + *
> + * This program is distributed in the hope that it will be useful, but WITHOUT
> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
> + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
> + * more details.
> + *
> + * You should have received a copy of the GNU General Public License along with
> + * this program; if not, write to the Free Software Foundation, Inc., 51
> + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> + */
> +
> +#define _GNU_SOURCE
> +#define _LGPL_SOURCE
> +#include <common/common.h>
> +#include <urcu/rculist.h>
> +
> +#include "lttng-relayd.h"
> +#include "ctf-trace.h"
> +#include "session.h"
> +#include "viewer-session.h"
> +#include "viewer-stream.h"
> +#include "stream.h"
> +
> +struct relay_viewer_session *viewer_session_create(void)
> +{
> + struct relay_viewer_session *vsession;
> +
> + vsession = zmalloc(sizeof(*vsession));
> + if (!vsession) {
> + goto end;
> + }
> + CDS_INIT_LIST_HEAD(&vsession->session_list);
> +end:
> + return vsession;
> +}
> +
> +int viewer_session_attach(struct relay_viewer_session *vsession,
> + struct relay_session *session)
> +{
> + int ret = 0;
> +
> + if (!session_get(session)) {
> + ret = -1;
> + goto end;
> + }
> + pthread_mutex_lock(&session->lock);
> + if (session->viewer_attached) {
> + ret = -1;
> + } else {
> + session->viewer_attached = true;
> + }
> + pthread_mutex_unlock(&session->lock);
> +
> + if (!ret) {
> + pthread_mutex_lock(&vsession->session_list_lock);
> + cds_list_add_rcu(&session->viewer_session_node,
> + &vsession->session_list);
> + pthread_mutex_unlock(&vsession->session_list_lock);
> + } else {
> + session_put(session);
> + }
> +end:
> + return ret;
> +}
> +
> +static int viewer_session_detach(struct relay_viewer_session *vsession,
> + struct relay_session *session)
> +{
> + int ret = 0;
> +
> + pthread_mutex_lock(&session->lock);
> + if (!session->viewer_attached) {
> + ret = -1;
> + } else {
> + session->viewer_attached = false;
> + }
> + pthread_mutex_unlock(&session->lock);
> +
> + if (!ret) {
> + pthread_mutex_lock(&vsession->session_list_lock);
> + cds_list_del_rcu(&session->viewer_session_node);
> + pthread_mutex_unlock(&vsession->session_list_lock);
> + session_put(session);
> + }
> + return ret;
> +}
> +
> +void viewer_session_destroy(struct relay_viewer_session *vsession)
> +{
> + free(vsession);
> +}
> +
> +void viewer_session_close(struct relay_viewer_session *vsession)
> +{
> + struct relay_session *session;
> +
> + rcu_read_lock();
> + cds_list_for_each_entry_rcu(session,
> + &vsession->session_list, viewer_session_node) {
> + struct lttng_ht_iter iter;
> + struct relay_viewer_stream *vstream;
> +
> + /*
> + * TODO: improvement: create more efficient list of
> + * vstream per session.
> + */
> + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter,
> + vstream, stream_n.node) {
> + if (!viewer_stream_get(vstream)) {
> + continue;
> + }
> + if (vstream->stream->trace->session != session) {
> + viewer_stream_put(vstream);
> + continue;
> + }
> + viewer_stream_put(vstream); /* put local ref */
> + viewer_stream_put(vstream); /* release ownership */
> + }
> +
> + viewer_session_detach(vsession, session);
> + }
> + rcu_read_unlock();
> +}
> +
> +/*
> + * Check if a viewer session is attached to a session.
> + * Return 1 if attached, 0 if not attached.
> + */
> +int viewer_session_is_attached(struct relay_viewer_session *vsession,
> + struct relay_session *session)
> +{
> + struct relay_session *iter;
> + int found = 0;
> +
> + rcu_read_lock();
> + pthread_mutex_lock(&session->lock);
> + if (!vsession) {
> + goto end;
> + }
> + if (!session->viewer_attached) {
> + goto end;
> + }
> + cds_list_for_each_entry_rcu(iter,
> + &vsession->session_list,
> + viewer_session_node) {
> + if (session == iter) {
> + found = 1;
> + goto end;
> + }
> + }
> +end:
> + pthread_mutex_unlock(&session->lock);
> + rcu_read_unlock();
> + return found;
> +}
> +
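
viewer_session_attach() and viewer_session_detach() above enforce that a session has at most one viewer attached: the viewer_attached flag is tested and flipped under the session lock, and a second attach fails. The following is a minimal sketch of just that flag protocol. The names sess, sess_init, sess_attach and sess_detach are hypothetical, and the session reference counting and RCU list handling from the patch are left out.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical reduced session: only the lock and the attach flag. */
struct sess {
	pthread_mutex_t lock;
	bool viewer_attached;
};

void sess_init(struct sess *s)
{
	pthread_mutex_init(&s->lock, NULL);
	s->viewer_attached = false;
}

/* Returns 0 on success, -1 if a viewer is already attached. */
int sess_attach(struct sess *s)
{
	int ret = 0;

	pthread_mutex_lock(&s->lock);
	if (s->viewer_attached) {
		ret = -1;
	} else {
		s->viewer_attached = true;
	}
	pthread_mutex_unlock(&s->lock);
	return ret;
}

/* Returns 0 on success, -1 if no viewer was attached. */
int sess_detach(struct sess *s)
{
	int ret = 0;

	pthread_mutex_lock(&s->lock);
	if (!s->viewer_attached) {
		ret = -1;
	} else {
		s->viewer_attached = false;
	}
	pthread_mutex_unlock(&s->lock);
	return ret;
}
```

Testing and setting the flag inside one critical section is what keeps two concurrent attach attempts from both succeeding.
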
> diff --git a/src/bin/lttng-relayd/viewer-session.h b/src/bin/lttng-relayd/viewer-session.h
> new file mode 100644
> index 0000000..4013b35
> --- /dev/null
> +++ b/src/bin/lttng-relayd/viewer-session.h
> @@ -0,0 +1,53 @@
> +#ifndef _VIEWER_SESSION_H
> +#define _VIEWER_SESSION_H
> +
> +/*
> + * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> + * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU General Public License, version 2 only, as
> + * published by the Free Software Foundation.
> + *
> + * This program is distributed in the hope that it will be useful, but WITHOUT
> + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
> + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
> + * more details.
> + *
> + * You should have received a copy of the GNU General Public License along with
> + * this program; if not, write to the Free Software Foundation, Inc., 51
> + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> + */
> +
> +#include <limits.h>
> +#include <inttypes.h>
> +#include <pthread.h>
> +#include <urcu/list.h>
> +#include <urcu/ref.h>
> +
> +#include <common/hashtable/hashtable.h>
> +
> +#include "session.h"
> +
> +struct relay_viewer_session {
> + /*
> + * Session list. Updates are protected by the session_list_lock.
> + * Traversals are protected by RCU.
> + * This list limits the design to having the sessions in at most
> + * one viewer session.
> + */
> + struct cds_list_head session_list; /* RCU list. */
> + pthread_mutex_t session_list_lock; /* Protects list updates. */
> +};
> +
> +struct relay_viewer_session *viewer_session_create(void);
> +void viewer_session_destroy(struct relay_viewer_session *vsession);
> +void viewer_session_close(struct relay_viewer_session *vsession);
> +
> +int viewer_session_attach(struct relay_viewer_session *vsession,
> + struct relay_session *session);
> +int viewer_session_is_attached(struct relay_viewer_session *vsession,
> + struct relay_session *session);
> +
> +#endif /* _VIEWER_SESSION_H */
> diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c
> index 3748629..6ecb83e 100644
> --- a/src/bin/lttng-relayd/viewer-stream.c
> +++ b/src/bin/lttng-relayd/viewer-stream.c
> @@ -1,6 +1,7 @@
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -24,39 +25,32 @@
> #include "lttng-relayd.h"
> #include "viewer-stream.h"
>
> -static void free_stream(struct relay_viewer_stream *stream)
> +static void viewer_stream_destroy(struct relay_viewer_stream *vstream)
> {
> - assert(stream);
> -
> - free(stream->path_name);
> - free(stream->channel_name);
> - free(stream);
> + free(vstream->path_name);
> + free(vstream->channel_name);
> + free(vstream);
> }
>
> -static void deferred_free_viewer_stream(struct rcu_head *head)
> +static void viewer_stream_destroy_rcu(struct rcu_head *head)
> {
> - struct relay_viewer_stream *stream =
> + struct relay_viewer_stream *vstream =
> caa_container_of(head, struct relay_viewer_stream, rcu_node);
>
> - free_stream(stream);
> + viewer_stream_destroy(vstream);
> }
>
> struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
> - enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace)
> + enum lttng_viewer_seek seek_t)
> {
> struct relay_viewer_stream *vstream;
>
> - assert(stream);
> - assert(ctf_trace);
> -
> vstream = zmalloc(sizeof(*vstream));
> if (!vstream) {
> PERROR("relay viewer stream zmalloc");
> goto error;
> }
>
> - vstream->session_id = stream->session_id;
> - vstream->stream_handle = stream->stream_handle;
> vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
> if (vstream->path_name == NULL) {
> PERROR("relay viewer path_name alloc");
> @@ -68,216 +62,285 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
> PERROR("relay viewer channel_name alloc");
> goto error;
> }
> - vstream->tracefile_count = stream->tracefile_count;
> - vstream->metadata_flag = stream->metadata_flag;
> - vstream->tracefile_count_last = -1ULL;
>
> switch (seek_t) {
> case LTTNG_VIEWER_SEEK_BEGINNING:
> - vstream->tracefile_count_current = stream->oldest_tracefile_id;
> + vstream->current_tracefile_id = stream->oldest_tracefile_id;
> break;
> case LTTNG_VIEWER_SEEK_LAST:
> - vstream->tracefile_count_current = stream->tracefile_count_current;
> + vstream->current_tracefile_id = stream->current_tracefile_id;
> break;
> default:
> - assert(0);
> goto error;
> }
> -
> - if (vstream->metadata_flag) {
> - ctf_trace->viewer_metadata_stream = vstream;
> + if (!stream_get(stream)) {
> + ERR("Cannot get stream");
> + goto error;
> }
> + vstream->stream = stream;
>
> - /* Globally visible after the add unique. */
> - lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
> - lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
> -
> - vstream->index_read_fd = -1;
> - vstream->read_fd = -1;
> -
> - /*
> - * This is to avoid a race between the initialization of this object and
> - * the close of the given stream. If the stream is unable to find this
> - * viewer stream when closing, this copy will at least take the latest
> - * value. We also need that for the seek_last.
> - */
> - vstream->total_index_received = stream->total_index_received;
> -
> + pthread_mutex_lock(&stream->lock);
> /*
> * If we never received an index for the current stream, delay the opening
> * of the index, otherwise open it right now.
> */
> - if (vstream->tracefile_count_current == stream->tracefile_count_current
> - && vstream->total_index_received == 0) {
> - vstream->index_read_fd = -1;
> + if (vstream->current_tracefile_id == stream->current_tracefile_id
> + && stream->total_index_received == 0) {
> + vstream->index_fd = NULL;
> } else {
> int read_fd;
>
> read_fd = index_open(vstream->path_name, vstream->channel_name,
> - vstream->tracefile_count, vstream->tracefile_count_current);
> + stream->tracefile_count,
> + vstream->current_tracefile_id);
> if (read_fd < 0) {
> + pthread_mutex_unlock(&stream->lock);
> + goto error;
> + }
> + vstream->index_fd = stream_fd_create(read_fd);
> + if (!vstream->index_fd) {
> + if (close(read_fd)) {
> + PERROR("close");
> + }
> goto error;
> }
> - vstream->index_read_fd = read_fd;
> }
>
> - if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) {
> + if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) {
> off_t lseek_ret;
>
> - lseek_ret = lseek(vstream->index_read_fd,
> - vstream->total_index_received * sizeof(struct ctf_packet_index),
> - SEEK_CUR);
> + lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END);
> if (lseek_ret < 0) {
> + pthread_mutex_unlock(&stream->lock);
> goto error;
> }
> - vstream->last_sent_index = vstream->total_index_received;
> + vstream->last_sent_index = stream->total_index_received;
> + }
> + pthread_mutex_unlock(&stream->lock);
> +
> + if (stream->is_metadata) {
> + stream->trace->viewer_metadata_stream = vstream;
> }
>
> + /* Globally visible after the add unique. */
> + lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
> + lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
> +
> + pthread_mutex_init(&vstream->lock, NULL);
> + pthread_mutex_init(&vstream->reflock, NULL);
> + urcu_ref_init(&vstream->ref);
> +
> return vstream;
>
> error:
> if (vstream) {
> - free_stream(vstream);
> + viewer_stream_destroy(vstream);
> }
> return NULL;
> }
>
> -void viewer_stream_delete(struct relay_viewer_stream *stream)
> +static void viewer_stream_unpublish(struct relay_viewer_stream *vstream)
> {
> int ret;
> struct lttng_ht_iter iter;
>
> - iter.iter.node = &stream->stream_n.node;
> + iter.iter.node = &vstream->stream_n.node;
> ret = lttng_ht_del(viewer_streams_ht, &iter);
> assert(!ret);
> }
>
> -void viewer_stream_destroy(struct ctf_trace *ctf_trace,
> - struct relay_viewer_stream *stream)
> +static void viewer_stream_release(struct urcu_ref *ref)
> {
> - int ret;
> -
> - assert(stream);
> + struct relay_viewer_stream *vstream = caa_container_of(ref,
> + struct relay_viewer_stream, ref);
>
> - if (ctf_trace) {
> - ctf_trace_put_ref(ctf_trace);
> + if (vstream->stream->is_metadata) {
> + rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL);
> }
>
> - if (stream->read_fd >= 0) {
> - ret = close(stream->read_fd);
> - if (ret < 0) {
> - PERROR("close read_fd");
> - }
> + viewer_stream_unpublish(vstream);
> +
> + if (vstream->stream_fd) {
> + stream_fd_put(vstream->stream_fd);
> + vstream->stream_fd = NULL;
> }
> - if (stream->index_read_fd >= 0) {
> - ret = close(stream->index_read_fd);
> - if (ret < 0) {
> - PERROR("close index_read_fd");
> - }
> + if (vstream->index_fd) {
> + stream_fd_put(vstream->index_fd);
> + vstream->index_fd = NULL;
> + }
> + if (vstream->stream) {
> + stream_put(vstream->stream);
> + vstream->stream = NULL;
> + }
> + call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu);
> +}
> +
> +/* Should be called with RCU read-side lock held. */
> +bool viewer_stream_get(struct relay_viewer_stream *vstream)
> +{
> + bool has_ref = false;
> +
> + pthread_mutex_lock(&vstream->reflock);
> + if (vstream->ref.refcount != 0) {
> + has_ref = true;
> + urcu_ref_get(&vstream->ref);
> }
> + pthread_mutex_unlock(&vstream->reflock);
>
> - call_rcu(&stream->rcu_node, deferred_free_viewer_stream);
> + return has_ref;
> }
>
> /*
> - * Find viewer stream by id. RCU read side lock MUST be acquired.
> + * Get viewer stream by id.
> *
> - * Return stream if found else NULL.
> + * Return viewer stream if found else NULL.
> */
> -struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id)
> +struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id)
> {
> struct lttng_ht_node_u64 *node;
> struct lttng_ht_iter iter;
> - struct relay_viewer_stream *stream = NULL;
> + struct relay_viewer_stream *vstream = NULL;
>
> + rcu_read_lock();
> lttng_ht_lookup(viewer_streams_ht, &id, &iter);
> node = lttng_ht_iter_get_node_u64(&iter);
> if (!node) {
> DBG("Relay viewer stream %" PRIu64 " not found", id);
> goto end;
> }
> - stream = caa_container_of(node, struct relay_viewer_stream, stream_n);
> -
> + vstream = caa_container_of(node, struct relay_viewer_stream, stream_n);
> + if (!viewer_stream_get(vstream)) {
> + vstream = NULL;
> + }
> end:
> - return stream;
> + rcu_read_unlock();
> + return vstream;
> +}
> +
> +void viewer_stream_put(struct relay_viewer_stream *vstream)
> +{
> + rcu_read_lock();
> + pthread_mutex_lock(&vstream->reflock);
> + urcu_ref_put(&vstream->ref, viewer_stream_release);
> + pthread_mutex_unlock(&vstream->reflock);
> + rcu_read_unlock();
> +}
> +
> +/*
> + * Returns whether the tracefile with the given id is readable. If not,
> + * it has been overwritten.
> + * Must be called with rstream and vstream locks held.
> + */
> +bool viewer_stream_is_tracefile_id_readable(struct relay_viewer_stream *vstream,
> + uint64_t id)
> +{
> + struct relay_stream *stream = vstream->stream;
> +
> + if (stream->oldest_tracefile_id <= stream->current_tracefile_id) {
> + if (id >= stream->oldest_tracefile_id
> + && id <= stream->current_tracefile_id) {
> + /* id is a readable file. */
> + return true;
> + } else {
> + /* id is not readable. */
> + return false;
> + }
> + } else {
> + if (id >= stream->oldest_tracefile_id
> + || id <= stream->current_tracefile_id) {
> + /* id is a readable file. */
> + return true;
> + } else {
> + /* id is not readable. */
> + return false;
> + }
> + }
> }
>
> /*
> * Rotate a stream to the next tracefile.
> *
> - * Must be called with viewer_stream_rotation_lock held.
> + * Must be called with rstream and vstream locks held.
> * Returns 0 on success, 1 on EOF, a negative value on error.
> */
> -int viewer_stream_rotate(struct relay_viewer_stream *vstream,
> - struct relay_stream *stream)
> +int viewer_stream_rotate(struct relay_viewer_stream *vstream)
> {
> int ret;
> - uint64_t tracefile_id;
> -
> - assert(vstream);
> - assert(stream);
> -
> - if (vstream->tracefile_count == 0) {
> - /* Ignore rotation, there is none to do. */
> - ret = 0;
> - goto end;
> - }
> -
> - tracefile_id = (vstream->tracefile_count_current + 1) %
> - vstream->tracefile_count;
> + uint64_t new_id;
> + struct relay_stream *stream = vstream->stream;
>
> /* Detect the last tracefile to open. */
> - if (vstream->tracefile_count_last != -1ULL &&
> - vstream->tracefile_count_last ==
> - vstream->tracefile_count_current) {
> + if (stream->total_index_received == vstream->last_sent_index
> + && stream->trace->session->connection_closed) {
> ret = 1;
> goto end;
> }
>
> - /*
> - * The writer and the reader are not working in the same tracefile, we can
> - * read up to EOF, we don't care about the total_index_received.
> - */
> - if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) {
> - vstream->close_write_flag = 1;
> - } else {
> - /*
> - * We are opening a file that is still open in write, make sure we
> - * limit our reading to the number of indexes received.
> - */
> - vstream->close_write_flag = 0;
> - if (stream->close_flag) {
> - vstream->total_index_received = stream->total_index_received;
> - }
> + if (stream->tracefile_count == 0) {
> + /* Ignore rotation, there is none to do. */
> + ret = 0;
> + goto end;
> }
> - vstream->tracefile_count_current = tracefile_id;
>
> - ret = close(vstream->index_read_fd);
> - if (ret < 0) {
> - PERROR("close index file %d", vstream->index_read_fd);
> + new_id = (vstream->current_tracefile_id + 1) % stream->tracefile_count;
> + DBG("viewer_stream_rotate: stream handle %" PRIu64
> + " new_id %" PRIu64 " tfcount %" PRIu64
> + " oldestid %" PRIu64 " isnewidreadable %d",
> + stream->stream_handle, new_id, stream->tracefile_count,
> + stream->oldest_tracefile_id,
> + viewer_stream_is_tracefile_id_readable(vstream, new_id));
> + if (!viewer_stream_is_tracefile_id_readable(vstream, new_id)) {
> + new_id = stream->oldest_tracefile_id;
> }
> - vstream->index_read_fd = -1;
> + vstream->current_tracefile_id = new_id;
>
> - ret = close(vstream->read_fd);
> - if (ret < 0) {
> - PERROR("close tracefile %d", vstream->read_fd);
> + if (vstream->index_fd) {
> + stream_fd_put(vstream->index_fd);
> + vstream->index_fd = NULL;
> + }
> + if (vstream->stream_fd) {
> + stream_fd_put(vstream->stream_fd);
> + vstream->stream_fd = NULL;
> }
> - vstream->read_fd = -1;
> -
> - pthread_mutex_lock(&vstream->overwrite_lock);
> - vstream->abort_flag = 0;
> - pthread_mutex_unlock(&vstream->overwrite_lock);
>
> ret = index_open(vstream->path_name, vstream->channel_name,
> - vstream->tracefile_count, vstream->tracefile_count_current);
> + stream->tracefile_count,
> + vstream->current_tracefile_id);
> if (ret < 0) {
> - goto error;
> + goto end;
> + }
> + vstream->index_fd = stream_fd_create(ret);
> + if (vstream->index_fd) {
> + ret = 0;
> + } else {
> + if (close(ret)) {
> + PERROR("close");
> + }
> + ret = -1;
> }
> - vstream->index_read_fd = ret;
> -
> - ret = 0;
> -
> end:
> -error:
> return ret;
> }
> +
> +void print_viewer_streams(void)
> +{
> + struct lttng_ht_iter iter;
> + struct relay_viewer_stream *vstream;
> +
> + rcu_read_lock();
> + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
> + stream_n.node) {
> + if (!viewer_stream_get(vstream)) {
> + continue;
> + }
> + DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
> + " session %" PRIu64,
> + vstream,
> + vstream->ref.refcount,
> + vstream->stream->stream_handle,
> + vstream->stream->trace->id,
> + vstream->stream->trace->session->id);
> + viewer_stream_put(vstream);
> + }
> + rcu_read_unlock();
> +}
> diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h
> index 003b119..a19ffe0 100644
> --- a/src/bin/lttng-relayd/viewer-stream.h
> +++ b/src/bin/lttng-relayd/viewer-stream.h
> @@ -1,6 +1,10 @@
> +#ifndef _VIEWER_STREAM_H
> +#define _VIEWER_STREAM_H
> +
> /*
> * Copyright (C) 2013 - Julien Desfossez <jdesfossez at efficios.com>
> * David Goulet <dgoulet at efficios.com>
> + * 2015 - Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> *
> * This program is free software; you can redistribute it and/or modify it
> * under the terms of the GNU General Public License, version 2 only, as
> @@ -16,9 +20,6 @@
> * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> */
>
> -#ifndef _VIEWER_STREAM_H
> -#define _VIEWER_STREAM_H
> -
> #include <limits.h>
> #include <inttypes.h>
> #include <pthread.h>
> @@ -33,57 +34,49 @@
> struct relay_stream;
>
> /*
> - * Shadow copy of the relay_stream structure for the viewer side. The only
> - * fields updated by the writer (streaming side) after allocation are :
> - * total_index_received and close_flag. Everything else is updated by the
> - * reader (viewer side).
> + * Shadow copy of the relay_stream structure for the viewer side.
> */
> struct relay_viewer_stream {
> - uint64_t stream_handle;
> - uint64_t session_id;
> - int read_fd;
> - int index_read_fd;
> + struct urcu_ref ref;
> + pthread_mutex_t reflock;
> +
> + /*
> + * This lock nests inside the stream lock.
> + */
> + pthread_mutex_t lock;
> +
> + /* Back ref to stream */
> + struct relay_stream *stream;
> +
> + /* FD from which to read the stream data. */
> + struct stream_fd *stream_fd;
> + /* FD from which to read the index data. */
> + struct stream_fd *index_fd;
> +
> char *path_name;
> char *channel_name;
> +
> + uint64_t current_tracefile_id;
> uint64_t last_sent_index;
> - uint64_t total_index_received;
> - uint64_t tracefile_count;
> - uint64_t tracefile_count_current;
> - /* Stop after reading this tracefile. */
> - uint64_t tracefile_count_last;
> +
> + /* Indicates if this stream has been sent to a viewer client. */
> + bool sent_flag;
> + /* For metadata stream, how much metadata has been sent. */
> + uint64_t metadata_sent;
> +
> struct lttng_ht_node_u64 stream_n;
> struct rcu_head rcu_node;
> - struct ctf_trace *ctf_trace;
> - /*
> - * This lock blocks only when the writer is about to start overwriting
> - * a file currently read by the reader.
> - *
> - * This is nested INSIDE the viewer_stream_rotation_lock.
> - */
> - pthread_mutex_t overwrite_lock;
> - /* Information telling us if the stream is a metadata stream. */
> - unsigned int metadata_flag:1;
> - /*
> - * Information telling us that the stream is closed in write, so
> - * we don't expect new indexes and we can read up to EOF.
> - */
> - unsigned int close_write_flag:1;
> - /*
> - * If the streaming side closes a FD in use in the viewer side,
> - * it sets this flag to inform that it is a normal error.
> - */
> - unsigned int abort_flag:1;
> - /* Indicates if this stream has been sent to a viewer client. */
> - unsigned int sent_flag:1;
> };
>
> struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
> - enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace);
> -struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id);
> -void viewer_stream_destroy(struct ctf_trace *ctf_trace,
> - struct relay_viewer_stream *stream);
> -void viewer_stream_delete(struct relay_viewer_stream *stream);
> -int viewer_stream_rotate(struct relay_viewer_stream *vstream,
> - struct relay_stream *stream);
> + enum lttng_viewer_seek seek_t);
> +
> +struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id);
> +bool viewer_stream_get(struct relay_viewer_stream *vstream);
> +void viewer_stream_put(struct relay_viewer_stream *vstream);
> +int viewer_stream_rotate(struct relay_viewer_stream *vstream);
> +bool viewer_stream_is_tracefile_id_readable(struct relay_viewer_stream *vstream,
> + uint64_t id);
> +void print_viewer_streams(void);
>
> #endif /* _VIEWER_STREAM_H */
> diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c
> index 87d5f34..0c5ffff 100644
> --- a/src/bin/lttng-sessiond/consumer.c
> +++ b/src/bin/lttng-sessiond/consumer.c
> @@ -1077,11 +1077,8 @@ error:
> }
>
> /*
> - * Ask the consumer if the data is ready to read (NOT pending) for the specific
> - * session id.
> - *
> - * This function has a different behavior with the consumer i.e. that it waits
> - * for a reply from the consumer if yes or no the data is pending.
> + * Ask the consumer if the data is pending for the specific session id.
> + * Returns 1 if data is pending, 0 otherwise, or < 0 on error.
> */
> int consumer_is_data_pending(uint64_t session_id,
> struct consumer_output *consumer)
> diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c
> index bacbf92..4eaf341 100644
> --- a/src/bin/lttng-sessiond/ust-consumer.c
> +++ b/src/bin/lttng-sessiond/ust-consumer.c
> @@ -245,14 +245,13 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
> }
>
> pthread_mutex_lock(socket->lock);
> -
> ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry);
> + pthread_mutex_unlock(socket->lock);
> if (ret < 0) {
> goto error;
> }
>
> error:
> - pthread_mutex_unlock(socket->lock);
> return ret;
> }
>
> diff --git a/src/common/index/ctf-index.h b/src/common/index/ctf-index.h
> index 0efa888..1f38d9a 100644
> --- a/src/common/index/ctf-index.h
> +++ b/src/common/index/ctf-index.h
> @@ -44,7 +44,7 @@ struct ctf_packet_index_file_hdr {
> } __attribute__((__packed__));
>
> /*
> - * Packet index generated for each trace packet store in a trace file.
> + * Packet index generated for each trace packet stored in a trace file.
> * All integer fields are stored in big endian.
> */
> struct ctf_packet_index {
> diff --git a/src/common/index/index.c b/src/common/index/index.c
> index 35cff53..3fce35d 100644
> --- a/src/common/index/index.c
> +++ b/src/common/index/index.c
> @@ -59,6 +59,19 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid,
> }
> }
>
> + /*
> + * For tracefile rotation. We need to unlink the old
> + * file if present to synchronize with the tail of the
> + * live viewer which could be working on this same file.
> + * By doing so, any reference to the old index file
> + * stays valid even if we re-create a new file with the
> + * same name afterwards.
> + */
> + ret = utils_unlink_stream_file(fullpath, stream_name, size, count, uid,
> + gid, DEFAULT_INDEX_FILE_SUFFIX);
> + if (ret < 0 && errno != ENOENT) {
> + goto error;
> + }
> ret = utils_create_stream_file(fullpath, stream_name, size, count, uid,
> gid, DEFAULT_INDEX_FILE_SUFFIX);
> if (ret < 0) {
> diff --git a/src/common/utils.c b/src/common/utils.c
> index 766f224..7681367 100644
> --- a/src/common/utils.c
> +++ b/src/common/utils.c
> @@ -594,21 +594,18 @@ error:
> }
>
> /*
> - * Create the stream tracefile on disk.
> - *
> - * Return 0 on success or else a negative value.
> + * path is the output parameter. Its buffer must be at least PATH_MAX bytes long.
> */
> -LTTNG_HIDDEN
> -int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size,
> - uint64_t count, int uid, int gid, char *suffix)
> +static int utils_stream_file_name(char *path,
> + const char *path_name, const char *file_name,
> + uint64_t size, uint64_t count,
> + const char *suffix)
> {
> - int ret, out_fd, flags, mode;
> - char full_path[PATH_MAX], *path_name_suffix = NULL, *path;
> + int ret;
> + char full_path[PATH_MAX];
> + char *path_name_suffix = NULL;
> char *extra = NULL;
>
> - assert(path_name);
> - assert(file_name);
> -
> ret = snprintf(full_path, sizeof(full_path), "%s/%s",
> path_name, file_name);
> if (ret < 0) {
> @@ -639,9 +636,37 @@ int utils_create_stream_file(const char *path_name, char *file_name, uint64_t si
> PERROR("Allocating path name with extra string");
> goto error_free_suffix;
> }
> - path = path_name_suffix;
> + strncpy(path, path_name_suffix, PATH_MAX - 1);
> } else {
> - path = full_path;
> + strncpy(path, full_path, PATH_MAX - 1);
> + }
> + path[PATH_MAX - 1] = '\0';
> + ret = 0;
> +
> + free(path_name_suffix);
> +error_free_suffix:
> + free(extra);
> +error:
> + return ret;
> +}
> +
> +/*
> + * Create the stream tracefile on disk.
> + *
> + * Return 0 on success or else a negative value.
> + */
> +LTTNG_HIDDEN
> +int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size,
> + uint64_t count, int uid, int gid, char *suffix)
> +{
> + int ret, out_fd, flags, mode;
> + char path[PATH_MAX];
> +
> + ret = utils_stream_file_name(path, path_name, file_name,
> + size, count, suffix);
> + if (ret < 0) {
> + goto error;
> }
>
> flags = O_WRONLY | O_CREAT | O_TRUNC;
> @@ -655,19 +680,49 @@ int utils_create_stream_file(const char *path_name, char *file_name, uint64_t si
> }
> if (out_fd < 0) {
> PERROR("open stream path %s", path);
> - goto error_open;
> + goto error;
> }
> ret = out_fd;
>
> -error_open:
> - free(path_name_suffix);
> -error_free_suffix:
> - free(extra);
> error:
> return ret;
> }
>
> /*
> + * Unlink the stream tracefile from disk.
> + *
> + * Return 0 on success or else a negative value.
> + */
> +LTTNG_HIDDEN
> +int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size,
> + uint64_t count, int uid, int gid, char *suffix)
> +{
> + int ret;
> + char path[PATH_MAX];
> +
> + ret = utils_stream_file_name(path, path_name, file_name,
> + size, count, suffix);
> + if (ret < 0) {
> + goto error;
> + }
> + if (uid < 0 || gid < 0) {
> + ret = unlink(path);
> + } else {
> + ret = run_as_unlink(path, uid, gid);
> + if (ret < 0) {
> + errno = -ret;
> + ret = -1;
> + }
> + }
> + DBG("utils_unlink_stream_file %s returns %d", path, ret);
> +error:
> + return ret;
> +}
> +
> +/*
> * Change the output tracefile according to the given size and count The
> * new_count pointer is set during this operation.
> *
> @@ -693,7 +748,25 @@ int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
> }
>
> if (count > 0) {
> + /*
> + * In tracefile rotation, for the relay daemon we need
> + * to unlink the old file if present, because it may
> + * still be open for reading by the live thread, and we
> + * need to ensure that we do not overwrite the content
> + * between get_index and get_packet. Since we have no
> + * way to verify integrity of the data content compared
> + * to the associated index, we need to ensure the reader
> + * has exclusive access to the file content, and that
> + * the open of the data file is performed in get_index.
> + * Unlinking the old file rather than overwriting it
> + * achieves this.
> + */
> *new_count = (*new_count + 1) % count;
> + ret = utils_unlink_stream_file(path_name, file_name,
> + size, *new_count, uid, gid, NULL);
> + if (ret < 0 && errno != ENOENT) {
> + goto error;
> + }
> } else {
> (*new_count)++;
> }
> diff --git a/src/common/utils.h b/src/common/utils.h
> index 05914cc..8bc5a3f 100644
> --- a/src/common/utils.h
> +++ b/src/common/utils.h
> @@ -40,6 +40,8 @@ int utils_create_pid_file(pid_t pid, const char *filepath);
> int utils_mkdir_recursive(const char *path, mode_t mode);
> int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size,
> uint64_t count, int uid, int gid, char *suffix);
> +int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size,
> + uint64_t count, int uid, int gid, char *suffix);
> int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
> uint64_t count, int uid, int gid, int out_fd, uint64_t *new_count,
> int *stream_fd);
> --
> 2.1.4
--
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com