[lttng-dev] [RFC PATCH v2 12/13] Fix: delay termination on consumerd to allow metadata flushing
Jérémie Galarneau
jeremie.galarneau at efficios.com
Thu Dec 14 01:57:49 UTC 2017
On 18 September 2017 at 18:52, Jonathan Rajotte
<jonathan.rajotte-julien at efficios.com> wrote:
> Move consumerd ownership to thread_manage_consumer to scope the lifetime
> of the consumerd to its manager thread.
>
> "thread_manage_consumer" is responsible for signaling and waiting for the
> termination of its consumerd.
>
> All thread_manage_consumer threads now wait on a unique quit pipe
> different from the global thread quit pipe. This allows control over their
> lifetime.
>
> The termination notification is sent during sessiond_cleanup after the
> destroy session command to ensure that no sessions are still active at
> the moment the consumerds are terminated.
>
> Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien at efficios.com>
> ---
> src/bin/lttng-sessiond/main.c | 174 +++++++++++++++++++++++++++---------------
> 1 file changed, 112 insertions(+), 62 deletions(-)
>
> diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c
> index a840e8de..fb58ab4b 100644
> --- a/src/bin/lttng-sessiond/main.c
> +++ b/src/bin/lttng-sessiond/main.c
> @@ -206,6 +206,7 @@ static int kernel_poll_pipe[2] = { -1, -1 };
> static int thread_quit_pipe[2] = { -1, -1 };
> static int thread_health_teardown_trigger_pipe[2] = { -1, -1 };
> static int thread_apps_teardown_trigger_pipe[2] = { -1, -1 };
> +static int thread_consumers_teardown_trigger_pipe[2] = { -1, -1 };
> int thread_apps_notify_teardown_trigger_pipe[2] = { -1, -1 };
>
> /*
> @@ -495,6 +496,11 @@ static int init_thread_apps_notify_teardown_trigger_pipe(void)
> return __init_thread_quit_pipe(thread_apps_notify_teardown_trigger_pipe);
> }
>
> +static int init_thread_consumers_teardown_trigger_pipe(void)
> +{
> + return __init_thread_quit_pipe(thread_consumers_teardown_trigger_pipe);
> +}
> +
> /*
> * Stop first wave threads by closing the thread quit pipe.
> * - kernel thread
> @@ -601,14 +607,15 @@ static int generate_lock_file_path(char *path, size_t len)
> /*
> * Wait on consumer process termination.
> *
> - * Need to be called with the consumer data lock held or from a context
> - * ensuring no concurrent access to data (e.g: cleanup).
> + * Need to be called with the consumer data lock held.
> */
> static void wait_consumer(struct consumer_data *consumer_data)
> {
> pid_t ret;
> int status;
>
> + assert(consumer_data);
> +
> if (consumer_data->pid <= 0) {
> return;
> }
> @@ -626,6 +633,52 @@ static void wait_consumer(struct consumer_data *consumer_data)
> }
>
> /*
> + * Signal to the consumer process to terminate.
> + *
> + * Need to be called with the consumer data lock held.
> + */
> +static void kill_consumer(struct consumer_data *consumer_data)
> +{
> + int ret;
> +
> + assert(consumer_data);
> +
> + /* Consumer pid must be a real one. */
> + if (consumer_data->pid <= 0) {
> + goto end;
> + }
> +
> + ret = kill(consumer_data->pid, SIGTERM);
> + if (ret) {
> + PERROR("Error killing consumer daemon");
> + goto end;
> + }
> +end:
> + return;
> +}
> +
> +static int join_thread_consumer(struct consumer_data *consumer_data)
> +{
> + int ret;
> + void *status;
> +
> + assert(consumer_data);
> +
> + /* Consumer pid must be a real one. */
> + if (consumer_data->pid <= 0) {
> + ret = 0;
> + goto end;
> + }
> +
> + ret = pthread_join(consumer_data->thread, &status);
> + if (ret) {
> + ERR("Joining consumer thread pid %d", consumer_data->pid);
> + }
> +end:
> + return ret;
> +}
> +
> +/*
> * Cleanup the session daemon's data structures.
> */
> static void sessiond_cleanup(void)
> @@ -707,7 +760,6 @@ static void sessiond_cleanup(void)
> (void) rmdir(path);
>
> DBG("Cleaning up all sessions");
> -
> /* Destroy session list mutex */
> if (session_list_ptr != NULL) {
> pthread_mutex_destroy(&session_list_ptr->lock);
> @@ -719,9 +771,35 @@ static void sessiond_cleanup(void)
> }
> }
>
> - wait_consumer(&kconsumer_data);
> - wait_consumer(&ustconsumer64_data);
> - wait_consumer(&ustconsumer32_data);
> + /*
> + * Delay the termination of manage_consumer_thread threads to allow
> + * proper metadata flushing, following the session destroy. Use a
> + * barrier to ensure that all call_rcu are executed at this point.
> + */
> + DBG("Teardown consurmer thread");
> + rcu_barrier();
> + ret = notify_thread_pipe(thread_consumers_teardown_trigger_pipe[1]);
> + if (ret < 0) {
> + ERR("write error on thread consumer quit pipe");
> + }
> +
> + ret = join_thread_consumer(&kconsumer_data);
> + if (ret) {
> + errno = ret;
> + PERROR("join_consumer kernel");
> + }
> +
> + ret = join_thread_consumer(&ustconsumer32_data);
> + if (ret) {
> + errno = ret;
> + PERROR("join_consumer ust32");
> + }
> +
> + ret = join_thread_consumer(&ustconsumer64_data);
> + if (ret) {
> + errno = ret;
> + PERROR("join_consumer ust64");
> + }
>
> DBG("Cleaning up all agent apps");
> agent_app_ht_clean();
> @@ -1289,14 +1367,20 @@ static void *thread_manage_consumer(void *data)
> health_code_update();
>
> /*
> - * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
> + * Pass 3 as size here for the thread consumer quit pipe, consumerd_err_sock and the
> * metadata_sock. Nothing more will be added to this poll set.
> */
> - ret = sessiond_set_thread_pollset(&events, 3);
> + ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
> if (ret < 0) {
> goto error_poll;
> }
>
> + /* Add quit pipe */
> + ret = lttng_poll_add(&events, thread_consumers_teardown_trigger_pipe[0], LPOLLIN | LPOLLERR);
> + if (ret < 0) {
> + goto error;
> + }
> +
> /*
> * The error socket here is already in a listening state which was done
> * just before spawning this thread to avoid a race between the consumer
> @@ -1344,7 +1428,7 @@ restart:
> }
>
> /* Thread quit pipe has been closed. Killing thread. */
> - ret = sessiond_check_thread_quit_pipe(pollfd, revents);
> + ret = (pollfd == thread_consumers_teardown_trigger_pipe[0] && (revents & LPOLLIN)) ? 1 : 0;
I don't mind ternaries, but that's a really long expression.
I think this would be more readable using a good old 'if'.
Also, is LPOLLERR checked somewhere? The pipe is added to the poll set with LPOLLIN | LPOLLERR, but only LPOLLIN is tested here.
> if (ret) {
> err = 0;
> goto exit;
> @@ -1509,7 +1593,7 @@ restart_poll:
> * but continue the current loop to handle potential data from
> * consumer.
> */
> - should_quit = sessiond_check_thread_quit_pipe(pollfd, revents);
> + should_quit = (pollfd == thread_consumers_teardown_trigger_pipe[0] && (revents & LPOLLIN)) ? 1 : 0;
Same comment applies here.
>
> if (pollfd == sock) {
> /* Event on the consumerd socket */
> @@ -1552,11 +1636,6 @@ restart_poll:
>
> exit:
> error:
> - /*
> - * We lock here because we are about to close the sockets and some other
> - * thread might be using them so get exclusive access which will abort all
> - * other consumer command by other threads.
> - */
> pthread_mutex_lock(&consumer_data->lock);
>
> /* Immediately set the consumerd state to stopped */
> @@ -1570,6 +1649,13 @@ error:
> assert(0);
> }
>
> + /*
> + * This thread is responsible for its consumerd. Make sure the
> + * consumerd teardown is complete before proceeding.
> + */
> + kill_consumer(consumer_data);
> + wait_consumer(consumer_data);
> +
> if (consumer_data->err_sock >= 0) {
> ret = close(consumer_data->err_sock);
> if (ret) {
> @@ -1600,13 +1686,15 @@ error:
>
> unlink(consumer_data->err_unix_sock_path);
> unlink(consumer_data->cmd_unix_sock_path);
> - pthread_mutex_unlock(&consumer_data->lock);
>
> /* Cleanup metadata socket mutex. */
> if (consumer_data->metadata_sock.lock) {
> pthread_mutex_destroy(consumer_data->metadata_sock.lock);
> free(consumer_data->metadata_sock.lock);
> }
> +
> + pthread_mutex_unlock(&consumer_data->lock);
> +
> lttng_poll_clean(&events);
>
> if (cmd_socket_wrapper) {
> @@ -2560,27 +2648,6 @@ error:
> }
>
> /*
> - * Join consumer thread
> - */
> -static int join_consumer_thread(struct consumer_data *consumer_data)
> -{
> - void *status;
> -
> - /* Consumer pid must be a real one. */
> - if (consumer_data->pid > 0) {
> - int ret;
> - ret = kill(consumer_data->pid, SIGTERM);
> - if (ret) {
> - PERROR("Error killing consumer daemon");
> - return ret;
> - }
> - return pthread_join(consumer_data->thread, &status);
> - } else {
> - return 0;
> - }
> -}
> -
> -/*
> * Fork and exec a consumer daemon (consumerd).
> *
> * Return pid if successful else -1.
> @@ -4741,27 +4808,6 @@ error_create_poll:
>
> rcu_unregister_thread();
>
> - /*
> - * Since we are creating the consumer threads, we own them, so we need
> - * to join them before our thread exits.
> - */
> - ret = join_consumer_thread(&kconsumer_data);
> - if (ret) {
> - errno = ret;
> - PERROR("join_consumer");
> - }
> -
> - ret = join_consumer_thread(&ustconsumer32_data);
> - if (ret) {
> - errno = ret;
> - PERROR("join_consumer ust32");
> - }
> -
> - ret = join_consumer_thread(&ustconsumer64_data);
> - if (ret) {
> - errno = ret;
> - PERROR("join_consumer ust64");
> - }
> return NULL;
> }
>
> @@ -5785,6 +5831,11 @@ int main(int argc, char **argv)
> goto exit_init_data;
> }
>
> + if (init_thread_consumers_teardown_trigger_pipe()) {
> + retval = -1;
> + goto exit_init_data;
> + }
> +
> /* Check if daemon is UID = 0 */
> is_root = !getuid();
>
> @@ -6406,11 +6457,8 @@ exit_init_data:
> * perform lookups in those structures.
> */
> rcu_barrier();
> - /*
> - * sessiond_cleanup() is called when no other thread is running, except
> - * the ht_cleanup thread, which is needed to destroy the hash tables.
> - */
> rcu_thread_online();
> +
> sessiond_cleanup();
>
> /*
> @@ -6461,6 +6509,8 @@ exit_init_data:
> retval = -1;
> }
>
> + /* Consumers thread teardown pipe cleanup */
> + utils_close_pipe(thread_consumers_teardown_trigger_pipe);
> /* Health thread teardown pipe cleanup */
> utils_close_pipe(thread_health_teardown_trigger_pipe);
> /* Apps thread teardown pipe cleanup */
> --
> 2.11.0
>
--
Jérémie Galarneau
EfficiOS Inc.
http://www.efficios.com
More information about the lttng-dev
mailing list