[lttng-dev] [PATCH lttng-tools 1/5] Fix: LPOLLHUP and LPOLLERR when there is still data in pipe/socket

Jérémie Galarneau jeremie.galarneau at efficios.com
Mon Sep 14 17:56:46 EDT 2015


Merged in master and stable-2.7.

Thanks!
Jérémie

On Wed, Sep 9, 2015 at 11:56 AM, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> The event mask returned by poll/epoll is a bitwise mask made of all the
> events observed. On bidirectional sockets, there are cases where
> combinations of LPOLLHUP/LPOLLERR and LPOLLIN/LPOLLPRI can be raised at
> the same time.
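A minimal sketch of the combined mask described above, using plain POSIX poll(2) constants rather than lttng's LPOLL* wrappers (assumed here to map onto the same conditions). A writer sends a final message and closes its end of a pipe; on Linux, the reader then sees POLLIN and POLLHUP in the same revents mask, with the message still unread. The helper name poll_after_final_write() is hypothetical.

```c
#include <assert.h>
#include <poll.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper: send a final message through a pipe, close the
 * write end, then poll the read end once. Returns the revents mask
 * (or -1 on setup failure) and reads the pending message into buf.
 */
static short poll_after_final_write(char *buf, size_t len, ssize_t *nread)
{
	static const char msg[] = "last words";
	struct pollfd pfd;
	int fds[2];

	assert(buf != NULL && len >= sizeof(msg) && nread != NULL);
	if (pipe(fds) < 0)
		return -1;

	/* Writer side: final message, then hang up. */
	if (write(fds[1], msg, sizeof(msg)) != (ssize_t) sizeof(msg)) {
		close(fds[0]);
		close(fds[1]);
		return -1;
	}
	close(fds[1]);

	/* Reader side: data and hangup are reported in one poll. */
	pfd.fd = fds[0];
	pfd.events = POLLIN;
	pfd.revents = 0;
	if (poll(&pfd, 1, 0) != 1) {
		close(fds[0]);
		return -1;
	}

	/* The message is still readable even though POLLHUP is set. */
	*nread = read(fds[0], buf, len);
	close(fds[0]);
	return pfd.revents;
}
```

A handler that tests the hangup bits first and closes the descriptor would drop "last words" here, which is the data loss the patch addresses.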
>
> Currently, the overall behavior in sessiond, consumerd and relayd is to
> handle LPOLLHUP or LPOLLERR immediately, whether or not there is still
> data to read in the socket. Unfortunately, this behavior may discard the
> last data written to the pipe or socket before the hangup.
>
> Audit all uses of LPOLLHUP and LPOLLERR on sockets from which we expect
> data, to ensure that we handle LPOLLIN or LPOLLPRI first and detect the
> hangup when read or recvmsg returns 0. Keep the LPOLLHUP and LPOLLERR
> handling, but only when LPOLLIN is not raised, just in case some
> unforeseen error happens when sending the reply.
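The ordering rule above can be sketched as follows, again with POSIX poll(2) constants standing in for the LPOLL* wrappers (LPOLLRDHUP is left out because POLLRDHUP is Linux-specific). The enum, handle_data_fd() and drain_until_hangup() are hypothetical names, not lttng-tools functions, and a read error is folded into the hangup case for brevity.

```c
#include <assert.h>
#include <poll.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical outcome of handling one poll event on a data fd. */
enum fd_action {
	FD_DATA_CONSUMED,	/* read() returned data, keep polling */
	FD_HANGUP,		/* peer is gone and nothing is left to read */
	FD_UNEXPECTED,		/* neither data nor error bits were set */
};

/*
 * Handle one revents mask for a fd we expect data from. Data bits are
 * tested first, so a message that arrives together with POLLHUP is
 * consumed; the hangup itself is detected when read() returns 0.
 */
static enum fd_action handle_data_fd(int fd, short revents,
		char *buf, size_t len, size_t *total)
{
	assert(buf != NULL && len > 0 && total != NULL);

	if (revents & (POLLIN | POLLPRI)) {
		ssize_t ret = read(fd, buf, len);

		if (ret > 0) {
			*total += (size_t) ret;
			return FD_DATA_CONSUMED;
		}
		/* 0: orderly hangup after all data. < 0: read error. */
		return FD_HANGUP;
	} else if (revents & (POLLERR | POLLHUP)) {
		/* Error bits without POLLIN: nothing left to read. */
		return FD_HANGUP;
	}
	return FD_UNEXPECTED;
}

/* Poll fd until hangup; returns the number of bytes read, or -1. */
static ssize_t drain_until_hangup(int fd)
{
	char buf[4];	/* deliberately small to force several reads */
	size_t total = 0;

	for (;;) {
		struct pollfd pfd = { .fd = fd, .events = POLLIN | POLLPRI };
		enum fd_action action;

		if (poll(&pfd, 1, -1) < 0)
			return -1;
		action = handle_data_fd(fd, pfd.revents, buf, sizeof(buf), &total);
		if (action == FD_HANGUP)
			return (ssize_t) total;
		if (action == FD_UNEXPECTED)
			return -1;
	}
}
```

Because the data bits are checked first, every byte written before the close is counted. Swapping the two branches would stop at the first poll with nothing read on systems, such as Linux, that report POLLHUP alongside POLLIN.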
>
> There is one case where we can correctly handle LPOLLHUP and LPOLLERR
> directly, without caring about LPOLLIN: sockets on which we are expected
> to write and then read the reply (e.g. command sockets). For those, it
> is OK for a dedicated thread to watch only for LPOLLHUP and LPOLLERR.
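For the command-socket case, a watcher thread only needs the error conditions. A minimal sketch, assuming POSIX poll(2) semantics: POLLERR and POLLHUP are always reported even when not requested, so events can be 0 and a pending reply (POLLIN) stays invisible to the watcher, leaving it for the thread that issued the command. The lttng-tools code instead registers LPOLLERR | LPOLLHUP | LPOLLRDHUP explicitly; watch_for_hangup() is a hypothetical name.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/*
 * Poll fd for error/hangup conditions only. Returns 0 on timeout,
 * the revents mask when an error or hangup is reported, or -1 if
 * poll() itself fails.
 */
static short watch_for_hangup(int fd, int timeout_ms)
{
	/* events = 0: POLLERR and POLLHUP are implicit in poll(2). */
	struct pollfd pfd = { .fd = fd, .events = 0, .revents = 0 };
	int ret;

	assert(fd >= 0);
	ret = poll(&pfd, 1, timeout_ms);
	if (ret < 0)
		return -1;
	return ret == 0 ? 0 : pfd.revents;
}
```

Since the watcher never asks for POLLIN, it cannot steal or race with the reply read by the command thread; it only wakes up once the peer is gone.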
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> ---
>  src/bin/lttng-consumerd/health-consumerd.c |   3 +-
>  src/bin/lttng-relayd/health-relayd.c       |   7 +-
>  src/bin/lttng-relayd/live.c                |  36 +++++----
>  src/bin/lttng-relayd/main.c                |  97 ++++++++++++++---------
>  src/bin/lttng-sessiond/agent-thread.c      |  45 ++++++-----
>  src/bin/lttng-sessiond/ht-cleanup.c        |  43 +++++-----
>  src/bin/lttng-sessiond/main.c              | 102 ++++++++++++++++--------
>  src/bin/lttng-sessiond/ust-thread.c        |  91 +++++++++++-----------
>  src/common/consumer.c                      | 121 +++++++++++++++++------------
>  9 files changed, 319 insertions(+), 226 deletions(-)
>
> diff --git a/src/bin/lttng-consumerd/health-consumerd.c b/src/bin/lttng-consumerd/health-consumerd.c
> index fc9a266..5be1c97 100644
> --- a/src/bin/lttng-consumerd/health-consumerd.c
> +++ b/src/bin/lttng-consumerd/health-consumerd.c
> @@ -275,7 +275,8 @@ restart:
>
>                         /* Event on the registration socket */
>                         if (pollfd == sock) {
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
> +                                               && !(revents & LPOLLIN)) {
>                                         ERR("Health socket poll error");
>                                         goto error;
>                                 }
> diff --git a/src/bin/lttng-relayd/health-relayd.c b/src/bin/lttng-relayd/health-relayd.c
> index d34a376..be0b0b1 100644
> --- a/src/bin/lttng-relayd/health-relayd.c
> +++ b/src/bin/lttng-relayd/health-relayd.c
> @@ -346,9 +346,14 @@ restart:
>
>                         /* Event on the registration socket */
>                         if (pollfd == sock) {
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               if (revents & LPOLLIN) {
> +                                       continue;
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
>                                         ERR("Health socket poll error");
>                                         goto error;
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       goto error;
>                                 }
>                         }
>                 }
> diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c
> index a1fbbbe..8da2d62 100644
> --- a/src/bin/lttng-relayd/live.c
> +++ b/src/bin/lttng-relayd/live.c
> @@ -542,10 +542,7 @@ restart:
>                                 goto exit;
>                         }
>
> -                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> -                               ERR("socket poll error");
> -                               goto error;
> -                       } else if (revents & LPOLLIN) {
> +                       if (revents & LPOLLIN) {
>                                 /*
>                                  * A new connection is requested, therefore a
>                                  * viewer connection is allocated in this
> @@ -588,6 +585,12 @@ restart:
>                                  * exchange in cds_wfcq_enqueue.
>                                  */
>                                 futex_nto1_wake(&viewer_conn_queue.futex);
> +                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               ERR("socket poll error");
> +                               goto error;
> +                       } else {
> +                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                               goto error;
>                         }
>                 }
>         }
> @@ -1908,10 +1911,7 @@ restart:
>
>                         /* Inspect the relay conn pipe for new connection. */
>                         if (pollfd == live_conn_pipe[0]) {
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> -                                       ERR("Relay live pipe error");
> -                                       goto error;
> -                               } else if (revents & LPOLLIN) {
> +                               if (revents & LPOLLIN) {
>                                         struct relay_connection *conn;
>
>                                         ret = lttng_read(live_conn_pipe[0],
> @@ -1923,6 +1923,12 @@ restart:
>                                                         LPOLLIN | LPOLLRDHUP);
>                                         connection_ht_add(viewer_connections_ht, conn);
>                                         DBG("Connection socket %d added to poll", conn->sock->fd);
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                                       ERR("Relay live pipe error");
> +                                       goto error;
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       goto error;
>                                 }
>                         } else {
>                                 /* Connection activity. */
> @@ -1933,11 +1939,7 @@ restart:
>                                         continue;
>                                 }
>
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> -                                       cleanup_connection_pollfd(&events, pollfd);
> -                                       /* Put "create" ownership reference. */
> -                                       connection_put(conn);
> -                               } else if (revents & LPOLLIN) {
> +                               if (revents & LPOLLIN) {
>                                         ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
>                                                         sizeof(recv_hdr), 0);
>                                         if (ret <= 0) {
> @@ -1956,6 +1958,14 @@ restart:
>                                                         DBG("Viewer connection closed with %d", pollfd);
>                                                 }
>                                         }
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                                       cleanup_connection_pollfd(&events, pollfd);
> +                                       /* Put "create" ownership reference. */
> +                                       connection_put(conn);
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       connection_put(conn);
> +                                       goto error;
>                                 }
>                                 /* Put local "get_by_sock" reference. */
>                                 connection_put(conn);
> diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
> index 2ce9bf0..dcaaaa8 100644
> --- a/src/bin/lttng-relayd/main.c
> +++ b/src/bin/lttng-relayd/main.c
> @@ -864,10 +864,7 @@ restart:
>                                 goto exit;
>                         }
>
> -                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> -                               ERR("socket poll error");
> -                               goto error;
> -                       } else if (revents & LPOLLIN) {
> +                       if (revents & LPOLLIN) {
>                                 /*
>                                  * A new connection is requested, therefore a
>                                  * sessiond/consumerd connection is allocated in
> @@ -919,6 +916,12 @@ restart:
>                                  * exchange in cds_wfcq_enqueue.
>                                  */
>                                 futex_nto1_wake(&relay_conn_queue.futex);
> +                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               ERR("socket poll error");
> +                               goto error;
> +                       } else {
> +                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                               goto error;
>                         }
>                 }
>         }
> @@ -2429,10 +2432,7 @@ restart:
>
>                         /* Inspect the relay conn pipe for new connection */
>                         if (pollfd == relay_conn_pipe[0]) {
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> -                                       ERR("Relay connection pipe error");
> -                                       goto error;
> -                               } else if (revents & LPOLLIN) {
> +                               if (revents & LPOLLIN) {
>                                         struct relay_connection *conn;
>
>                                         ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
> @@ -2443,6 +2443,12 @@ restart:
>                                                         LPOLLIN | LPOLLRDHUP);
>                                         connection_ht_add(relay_connections_ht, conn);
>                                         DBG("Connection socket %d added", conn->sock->fd);
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                                       ERR("Relay connection pipe error");
> +                                       goto error;
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       goto error;
>                                 }
>                         } else {
>                                 struct relay_connection *ctrl_conn;
> @@ -2451,29 +2457,8 @@ restart:
>                                 /* If not found, there is a synchronization issue. */
>                                 assert(ctrl_conn);
>
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> -                                       relay_thread_close_connection(&events, pollfd, ctrl_conn);
> -                                       if (last_seen_data_fd == pollfd) {
> -                                               last_seen_data_fd = last_notdel_data_fd;
> -                                       }
> -                               } else if (revents & LPOLLIN) {
> -                                       if (ctrl_conn->type == RELAY_CONTROL) {
> -                                               ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, &recv_hdr,
> -                                                               sizeof(recv_hdr), 0);
> -                                               if (ret <= 0) {
> -                                                       /* Connection closed */
> -                                                       relay_thread_close_connection(&events, pollfd,
> -                                                               ctrl_conn);
> -                                               } else {
> -                                                       ret = relay_process_control(&recv_hdr, ctrl_conn);
> -                                                       if (ret < 0) {
> -                                                               /* Clear the session on error. */
> -                                                               relay_thread_close_connection(&events, pollfd,
> -                                                                       ctrl_conn);
> -                                                       }
> -                                                       seen_control = 1;
> -                                               }
> -                                       } else {
> +                               if (ctrl_conn->type == RELAY_DATA) {
> +                                       if (revents & LPOLLIN) {
>                                                 /*
>                                                  * Flag the last seen data fd not deleted. It will be
>                                                  * used as the last seen fd if any fd gets deleted in
> @@ -2481,9 +2466,39 @@ restart:
>                                                  */
>                                                 last_notdel_data_fd = pollfd;
>                                         }
> +                                       goto put_ctrl_connection;
> +                               }
> +                               assert(ctrl_conn->type == RELAY_CONTROL);
> +
> +                               if (revents & LPOLLIN) {
> +                                       ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock,
> +                                                       &recv_hdr, sizeof(recv_hdr), 0);
> +                                       if (ret <= 0) {
> +                                               /* Connection closed */
> +                                               relay_thread_close_connection(&events, pollfd,
> +                                                               ctrl_conn);
> +                                       } else {
> +                                               ret = relay_process_control(&recv_hdr, ctrl_conn);
> +                                               if (ret < 0) {
> +                                                       /* Clear the session on error. */
> +                                                       relay_thread_close_connection(&events,
> +                                                                       pollfd, ctrl_conn);
> +                                               }
> +                                               seen_control = 1;
> +                                       }
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                                       relay_thread_close_connection(&events,
> +                                                       pollfd, ctrl_conn);
> +                                       if (last_seen_data_fd == pollfd) {
> +                                               last_seen_data_fd = last_notdel_data_fd;
> +                                       }
>                                 } else {
> -                                       ERR("Unknown poll events %u for sock %d", revents, pollfd);
> +                                       ERR("Unexpected poll events %u for control sock %d",
> +                                                       revents, pollfd);
> +                                       connection_put(ctrl_conn);
> +                                       goto error;
>                                 }
> +                       put_ctrl_connection:
>                                 connection_put(ctrl_conn);
>                         }
>                 }
> @@ -2533,17 +2548,17 @@ restart:
>                                 /* Skip it. Might be removed before. */
>                                 continue;
>                         }
> +                       if (data_conn->type == RELAY_CONTROL) {
> +                               goto put_data_connection;
> +                       }
> +                       assert(data_conn->type == RELAY_DATA);
>
>                         if (revents & LPOLLIN) {
> -                               if (data_conn->type != RELAY_DATA) {
> -                                       goto put_connection;
> -                               }
> -
>                                 ret = relay_process_data(data_conn);
>                                 /* Connection closed */
>                                 if (ret < 0) {
>                                         relay_thread_close_connection(&events, pollfd,
> -                                               data_conn);
> +                                                       data_conn);
>                                         /*
>                                          * Every goto restart call sets the last seen fd where
>                                          * here we don't really care since we gracefully
> @@ -2555,8 +2570,14 @@ restart:
>                                         connection_put(data_conn);
>                                         goto restart;
>                                 }
> +                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               relay_thread_close_connection(&events, pollfd,
> +                                               data_conn);
> +                       } else {
> +                               ERR("Unknown poll events %u for data sock %d",
> +                                               revents, pollfd);
>                         }
> -               put_connection:
> +               put_data_connection:
>                         connection_put(data_conn);
>                 }
>                 last_seen_data_fd = -1;
> diff --git a/src/bin/lttng-sessiond/agent-thread.c b/src/bin/lttng-sessiond/agent-thread.c
> index d1bb122..2b6f776 100644
> --- a/src/bin/lttng-sessiond/agent-thread.c
> +++ b/src/bin/lttng-sessiond/agent-thread.c
> @@ -296,35 +296,22 @@ restart:
>                                 goto exit;
>                         }
>
> -                       /*
> -                        * Check first if this is a POLLERR since POLLIN is also included
> -                        * in an error value thus checking first.
> -                        */
> -                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> -                               /* Removing from the poll set */
> -                               ret = lttng_poll_del(&events, pollfd);
> -                               if (ret < 0) {
> -                                       goto error;
> -                               }
> -
> -                               agent_destroy_app_by_sock(pollfd);
> -                       } else if (revents & (LPOLLIN)) {
> +                       if (revents & LPOLLIN) {
>                                 int new_fd;
>                                 struct agent_app *app = NULL;
>
> -                               /* Pollin event of agent app socket should NEVER happen. */
>                                 assert(pollfd == reg_sock->fd);
> -
>                                 new_fd = handle_registration(reg_sock, &app);
>                                 if (new_fd < 0) {
> -                                       WARN("[agent-thread] agent registration failed. Ignoring.");
> -                                       /* Somehow the communication failed. Just continue. */
>                                         continue;
>                                 }
>                                 /* Should not have a NULL app on success. */
>                                 assert(app);
>
> -                               /* Only add poll error event to only detect shutdown. */
> +                               /*
> +                                * Since this is a command socket (write then read),
> +                                * only add poll error event to only detect shutdown.
> +                                */
>                                 ret = lttng_poll_add(&events, new_fd,
>                                                 LPOLLERR | LPOLLHUP | LPOLLRDHUP);
>                                 if (ret < 0) {
> @@ -336,10 +323,26 @@ restart:
>                                 update_agent_app(app);
>
>                                 /* On failure, the poll will detect it and clean it up. */
> -                               (void) agent_send_registration_done(app);
> +                               ret = agent_send_registration_done(app);
> +                               if (ret < 0) {
> +                                       /* Removing from the poll set */
> +                                       ret = lttng_poll_del(&events, new_fd);
> +                                       if (ret < 0) {
> +                                               goto error;
> +                                       }
> +                                       agent_destroy_app_by_sock(new_fd);
> +                                       continue;
> +                               }
> +                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               /* Removing from the poll set */
> +                               ret = lttng_poll_del(&events, pollfd);
> +                               if (ret < 0) {
> +                                       goto error;
> +                               }
> +                               agent_destroy_app_by_sock(pollfd);
>                         } else {
> -                               ERR("Unknown poll events %u for sock %d", revents, pollfd);
> -                               continue;
> +                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                               goto error;
>                         }
>                 }
>         }
> diff --git a/src/bin/lttng-sessiond/ht-cleanup.c b/src/bin/lttng-sessiond/ht-cleanup.c
> index c5f64da..b5f58d0 100644
> --- a/src/bin/lttng-sessiond/ht-cleanup.c
> +++ b/src/bin/lttng-sessiond/ht-cleanup.c
> @@ -103,32 +103,31 @@ restart:
>                                 continue;
>                         }
>
> -                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                       if (revents & LPOLLIN) {
> +                               /* Get socket from dispatch thread. */
> +                               size_ret = lttng_read(ht_cleanup_pipe[0], &ht,
> +                                               sizeof(ht));
> +                               if (size_ret < sizeof(ht)) {
> +                                       PERROR("ht cleanup notify pipe");
> +                                       goto error;
> +                               }
> +                               health_code_update();
> +                               /*
> +                                * The whole point of this thread is to call
> +                                * lttng_ht_destroy from a context that is NOT:
> +                                * 1) a read-side RCU lock,
> +                                * 2) a call_rcu thread.
> +                                */
> +                               lttng_ht_destroy(ht);
> +
> +                               health_code_update();
> +                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
>                                 ERR("ht cleanup pipe error");
>                                 goto error;
> -                       } else if (!(revents & LPOLLIN)) {
> -                               /* No POLLIN and not a catched error, stop the thread. */
> -                               ERR("ht cleanup failed. revent: %u", revents);
> +                       } else {
> +                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
>                                 goto error;
>                         }
> -
> -                       /* Get socket from dispatch thread. */
> -                       size_ret = lttng_read(ht_cleanup_pipe[0], &ht,
> -                                       sizeof(ht));
> -                       if (size_ret < sizeof(ht)) {
> -                               PERROR("ht cleanup notify pipe");
> -                               goto error;
> -                       }
> -                       health_code_update();
> -                       /*
> -                        * The whole point of this thread is to call
> -                        * lttng_ht_destroy from a context that is NOT:
> -                        * 1) a read-side RCU lock,
> -                        * 2) a call_rcu thread.
> -                        */
> -                       lttng_ht_destroy(ht);
> -
> -                       health_code_update();
>                 }
>
>                 for (i = 0; i < nb_fd; i++) {
> diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c
> index 0e7de6e..13bd649 100644
> --- a/src/bin/lttng-sessiond/main.c
> +++ b/src/bin/lttng-sessiond/main.c
> @@ -1165,31 +1165,33 @@ static void *thread_manage_kernel(void *data)
>                         }
>
>                         /* Check for data on kernel pipe */
> -                       if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) {
> -                               (void) lttng_read(kernel_poll_pipe[0],
> -                                       &tmp, 1);
> -                               /*
> -                                * Ret value is useless here, if this pipe gets any actions an
> -                                * update is required anyway.
> -                                */
> -                               update_poll_flag = 1;
> -                               continue;
> -                       } else {
> -                               /*
> -                                * New CPU detected by the kernel. Adding kernel stream to
> -                                * kernel session and updating the kernel consumer
> -                                */
> -                               if (revents & LPOLLIN) {
> +                       if (revents & LPOLLIN) {
> +                               if (pollfd == kernel_poll_pipe[0]) {
> +                                       (void) lttng_read(kernel_poll_pipe[0],
> +                                               &tmp, 1);
> +                                       /*
> +                                        * Ret value is useless here, if this pipe gets any actions an
> +                                        * update is required anyway.
> +                                        */
> +                                       update_poll_flag = 1;
> +                                       continue;
> +                               } else {
> +                                       /*
> +                                        * New CPU detected by the kernel. Adding kernel stream to
> +                                        * kernel session and updating the kernel consumer
> +                                        */
>                                         ret = update_kernel_stream(&kconsumer_data, pollfd);
>                                         if (ret < 0) {
>                                                 continue;
>                                         }
>                                         break;
> -                                       /*
> -                                        * TODO: We might want to handle the LPOLLERR | LPOLLHUP
> -                                        * and unregister kernel stream at this point.
> -                                        */
>                                 }
> +                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               update_poll_flag = 1;
> +                               continue;
> +                       } else {
> +                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                               goto error;
>                         }
>                 }
>         }
> @@ -1319,9 +1321,14 @@ restart:
>
>                 /* Event on the registration socket */
>                 if (pollfd == consumer_data->err_sock) {
> -                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                       if (revents & LPOLLIN) {
> +                               continue;
> +                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
>                                 ERR("consumer err socket poll error");
>                                 goto error;
> +                       } else {
> +                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                               goto error;
>                         }
>                 }
>         }
> @@ -1451,7 +1458,8 @@ restart_poll:
>
>                         if (pollfd == sock) {
>                                 /* Event on the consumerd socket */
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
> +                                               && !(revents & LPOLLIN)) {
>                                         ERR("consumer err socket second poll error");
>                                         goto error;
>                                 }
> @@ -1469,6 +1477,11 @@ restart_poll:
>
>                                 goto exit;
>                         } else if (pollfd == consumer_data->metadata_fd) {
> +                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
> +                                               && !(revents & LPOLLIN)) {
> +                                       ERR("consumer err metadata socket second poll error");
> +                                       goto error;
> +                               }
>                                 /* UST metadata requests */
>                                 ret = ust_consumer_metadata_request(
>                                                 &consumer_data->metadata_sock);
> @@ -1636,10 +1649,7 @@ static void *thread_manage_apps(void *data)
>
>                         /* Inspect the apps cmd pipe */
>                         if (pollfd == apps_cmd_pipe[0]) {
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> -                                       ERR("Apps command pipe error");
> -                                       goto error;
> -                               } else if (revents & LPOLLIN) {
> +                               if (revents & LPOLLIN) {
>                                         int sock;
>
>                                         /* Empty pipe */
> @@ -1652,9 +1662,8 @@ static void *thread_manage_apps(void *data)
>                                         health_code_update();
>
>                                         /*
> -                                        * We only monitor the error events of the socket. This
> -                                        * thread does not handle any incoming data from UST
> -                                        * (POLLIN).
> +                                        * Since this is a command socket (write then read),
> +                                        * we only monitor the error events of the socket.
>                                          */
>                                         ret = lttng_poll_add(&events, sock,
>                                                         LPOLLERR | LPOLLHUP | LPOLLRDHUP);
> @@ -1663,6 +1672,12 @@ static void *thread_manage_apps(void *data)
>                                         }
>
>                                         DBG("Apps with sock %d added to poll set", sock);
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                                       ERR("Apps command pipe error");
> +                                       goto error;
> +                               } else {
> +                                       ERR("Unknown poll events %u for sock %d", revents, pollfd);
> +                                       goto error;
>                                 }
>                         } else {
>                                 /*
> @@ -1678,6 +1693,9 @@ static void *thread_manage_apps(void *data)
>
>                                         /* Socket closed on remote end. */
>                                         ust_app_unregister(pollfd);
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       goto error;
>                                 }
>                         }
>
> @@ -1825,6 +1843,9 @@ static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue)
>                                  */
>                                 wait_node = NULL;
>                                 break;
> +                       } else {
> +                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                               goto error;
>                         }
>                 }
>         }
> @@ -2185,10 +2206,7 @@ static void *thread_registration_apps(void *data)
>
>                         /* Event on the registration socket */
>                         if (pollfd == apps_sock) {
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> -                                       ERR("Register apps socket poll error");
> -                                       goto error;
> -                               } else if (revents & LPOLLIN) {
> +                               if (revents & LPOLLIN) {
>                                         sock = lttcomm_accept_unix_sock(apps_sock);
>                                         if (sock < 0) {
>                                                 goto error;
> @@ -2275,6 +2293,12 @@ static void *thread_registration_apps(void *data)
>                                          * barrier with the exchange in cds_wfcq_enqueue.
>                                          */
>                                         futex_nto1_wake(&ust_cmd_queue.futex);
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                                       ERR("Register apps socket poll error");
> +                                       goto error;
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       goto error;
>                                 }
>                         }
>                 }
> @@ -4177,9 +4201,14 @@ restart:
>
>                         /* Event on the registration socket */
>                         if (pollfd == sock) {
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               if (revents & LPOLLIN) {
> +                                       continue;
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
>                                         ERR("Health socket poll error");
>                                         goto error;
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       goto error;
>                                 }
>                         }
>                 }
> @@ -4354,9 +4383,14 @@ static void *thread_manage_clients(void *data)
>
>                         /* Event on the registration socket */
>                         if (pollfd == client_sock) {
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               if (revents & LPOLLIN) {
> +                                       continue;
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
>                                         ERR("Client socket poll error");
>                                         goto error;
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       goto error;
>                                 }
>                         }
>                 }
> diff --git a/src/bin/lttng-sessiond/ust-thread.c b/src/bin/lttng-sessiond/ust-thread.c
> index b421eb2..9c944c5 100644
> --- a/src/bin/lttng-sessiond/ust-thread.c
> +++ b/src/bin/lttng-sessiond/ust-thread.c
> @@ -57,7 +57,8 @@ void *ust_thread_manage_notify(void *data)
>         }
>
>         /* Add notify pipe to the pollset. */
> -       ret = lttng_poll_add(&events, apps_cmd_notify_pipe[0], LPOLLIN | LPOLLERR);
> +       ret = lttng_poll_add(&events, apps_cmd_notify_pipe[0],
> +                       LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
>         if (ret < 0) {
>                 goto error;
>         }
> @@ -109,45 +110,56 @@ restart:
>                         if (pollfd == apps_cmd_notify_pipe[0]) {
>                                 int sock;
>
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               if (revents & LPOLLIN) {
> +                                       /* Get socket from dispatch thread. */
> +                                       size_ret = lttng_read(apps_cmd_notify_pipe[0],
> +                                                       &sock, sizeof(sock));
> +                                       if (size_ret < sizeof(sock)) {
> +                                               PERROR("read apps notify pipe");
> +                                               goto error;
> +                                       }
> +                                       health_code_update();
> +
> +                                       ret = lttng_poll_add(&events, sock,
> +                                                       LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
> +                                       if (ret < 0) {
> +                                               /*
> +                                                * It's possible we've reached the max poll fd allowed.
> +                                                * Let's close the socket but continue normal execution.
> +                                                */
> +                                               ret = close(sock);
> +                                               if (ret) {
> +                                                       PERROR("close notify socket %d", sock);
> +                                               }
> +                                               lttng_fd_put(LTTNG_FD_APPS, 1);
> +                                               continue;
> +                                       }
> +                                       DBG3("UST thread notify added sock %d to pollset", sock);
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
>                                         ERR("Apps notify command pipe error");
>                                         goto error;
> -                               } else if (!(revents & LPOLLIN)) {
> -                                       /* No POLLIN and not a catched error, stop the thread. */
> -                                       ERR("Notify command pipe failed. revent: %u", revents);
> -                                       goto error;
> -                               }
> -
> -                               /* Get socket from dispatch thread. */
> -                               size_ret = lttng_read(apps_cmd_notify_pipe[0],
> -                                               &sock, sizeof(sock));
> -                               if (size_ret < sizeof(sock)) {
> -                                       PERROR("read apps notify pipe");
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
>                                         goto error;
>                                 }
> -                               health_code_update();
> -
> -                               ret = lttng_poll_add(&events, sock,
> -                                               LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
> -                               if (ret < 0) {
> -                                       /*
> -                                        * It's possible we've reached the max poll fd allowed.
> -                                        * Let's close the socket but continue normal execution.
> -                                        */
> -                                       ret = close(sock);
> -                                       if (ret) {
> -                                               PERROR("close notify socket %d", sock);
> -                                       }
> -                                       lttng_fd_put(LTTNG_FD_APPS, 1);
> -                                       continue;
> -                               }
> -                               DBG3("UST thread notify added sock %d to pollset", sock);
>                         } else {
>                                 /*
>                                  * At this point, we know that a registered application
>                                  * triggered the event.
>                                  */
> -                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
> +                               if (revents & (LPOLLIN | LPOLLPRI)) {
> +                                       ret = ust_app_recv_notify(pollfd);
> +                                       if (ret < 0) {
> +                                               /* Removing from the poll set */
> +                                               ret = lttng_poll_del(&events, pollfd);
> +                                               if (ret < 0) {
> +                                                       goto error;
> +                                               }
> +
> +                                               /* The socket is closed after a grace period here. */
> +                                               ust_app_notify_sock_unregister(pollfd);
> +                                       }
> +                               } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
>                                         /* Removing from the poll set */
>                                         ret = lttng_poll_del(&events, pollfd);
>                                         if (ret < 0) {
> @@ -156,22 +168,9 @@ restart:
>
>                                         /* The socket is closed after a grace period here. */
>                                         ust_app_notify_sock_unregister(pollfd);
> -                               } else if (revents & (LPOLLIN | LPOLLPRI)) {
> -                                       ret = ust_app_recv_notify(pollfd);
> -                                       if (ret < 0) {
> -                                               /*
> -                                                * If the notification failed either the application is
> -                                                * dead or an internal error happened. In both cases,
> -                                                * we can only continue here. If the application is
> -                                                * dead, an unregistration will follow or else the
> -                                                * application will notice that we are not responding
> -                                                * on that socket and will close it.
> -                                                */
> -                                               continue;
> -                                       }
>                                 } else {
> -                                       ERR("Unknown poll events %u for sock %d", revents, pollfd);
> -                                       continue;
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       goto error;
>                                 }
>                                 health_code_update();
>                         }
> diff --git a/src/common/consumer.c b/src/common/consumer.c
> index c34f47c..c8628e8 100644
> --- a/src/common/consumer.c
> +++ b/src/common/consumer.c
> @@ -2240,26 +2240,22 @@ restart:
>                         }
>
>                         if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
> -                               if (revents & (LPOLLERR | LPOLLHUP )) {
> -                                       DBG("Metadata thread pipe hung up");
> -                                       /*
> -                                        * Remove the pipe from the poll set and continue the loop
> -                                        * since their might be data to consume.
> -                                        */
> -                                       lttng_poll_del(&events,
> -                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
> -                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
> -                                       continue;
> -                               } else if (revents & LPOLLIN) {
> +                               if (revents & LPOLLIN) {
>                                         ssize_t pipe_len;
>
>                                         pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
>                                                         &stream, sizeof(stream));
>                                         if (pipe_len < sizeof(stream)) {
> -                                               PERROR("read metadata stream");
> +                                               if (pipe_len < 0) {
> +                                                       PERROR("read metadata stream");
> +                                               }
>                                                 /*
> -                                                * Continue here to handle the rest of the streams.
> +                                                * Remove the pipe from the poll set and continue the loop
> +                                                * since there might be data to consume.
>                                                  */
> +                                               lttng_poll_del(&events,
> +                                                               lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
> +                                               lttng_pipe_read_close(ctx->consumer_metadata_pipe);
>                                                 continue;
>                                         }
>
> @@ -2276,6 +2272,19 @@ restart:
>                                         /* Add metadata stream to the global poll events list */
>                                         lttng_poll_add(&events, stream->wait_fd,
>                                                         LPOLLIN | LPOLLPRI | LPOLLHUP);
> +                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
> +                                       DBG("Metadata thread pipe hung up");
> +                                       /*
> +                                        * Remove the pipe from the poll set and continue the loop
> +                                        * since there might be data to consume.
> +                                        */
> +                                       lttng_poll_del(&events,
> +                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
> +                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
> +                                       continue;
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       goto end;
>                                 }
>
>                                 /* Handle other stream */
> @@ -2294,8 +2303,30 @@ restart:
>                         stream = caa_container_of(node, struct lttng_consumer_stream,
>                                         node);
>
> -                       /* Check for error event */
> -                       if (revents & (LPOLLERR | LPOLLHUP)) {
> +                       if (revents & (LPOLLIN | LPOLLPRI)) {
> +                               /* Get the data out of the metadata file descriptor */
> +                               DBG("Metadata available on fd %d", pollfd);
> +                               assert(stream->wait_fd == pollfd);
> +
> +                               do {
> +                                       health_code_update();
> +
> +                                       len = ctx->on_buffer_ready(stream, ctx);
> +                                       /*
> +                                        * We don't check the return value here since if we get
> +                                        * a negative len, it means an error occurred thus we
> +                                        * simply remove it from the poll set and free the
> +                                        * stream.
> +                                        */
> +                               } while (len > 0);
> +
> +                               /* It's ok to have an unavailable sub-buffer */
> +                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
> +                                       /* Clean up stream from consumer and free it. */
> +                                       lttng_poll_del(&events, stream->wait_fd);
> +                                       consumer_del_metadata_stream(stream, metadata_ht);
> +                               }
> +                       } else if (revents & (LPOLLERR | LPOLLHUP)) {
>                                 DBG("Metadata fd %d is hup|err.", pollfd);
>                                 if (!stream->hangup_flush_done
>                                                 && (consumer_data.type == LTTNG_CONSUMER32_UST
> @@ -2323,31 +2354,11 @@ restart:
>                                  * and securely free the stream.
>                                  */
>                                 consumer_del_metadata_stream(stream, metadata_ht);
> -                       } else if (revents & (LPOLLIN | LPOLLPRI)) {
> -                               /* Get the data out of the metadata file descriptor */
> -                               DBG("Metadata available on fd %d", pollfd);
> -                               assert(stream->wait_fd == pollfd);
> -
> -                               do {
> -                                       health_code_update();
> -
> -                                       len = ctx->on_buffer_ready(stream, ctx);
> -                                       /*
> -                                        * We don't check the return value here since if we get
> -                                        * a negative len, it means an error occured thus we
> -                                        * simply remove it from the poll set and free the
> -                                        * stream.
> -                                        */
> -                               } while (len > 0);
> -
> -                               /* It's ok to have an unavailable sub-buffer */
> -                               if (len < 0 && len != -EAGAIN && len != -ENODATA) {
> -                                       /* Clean up stream from consumer and free it. */
> -                                       lttng_poll_del(&events, stream->wait_fd);
> -                                       consumer_del_metadata_stream(stream, metadata_ht);
> -                               }
> +                       } else {
> +                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                               rcu_read_unlock();
> +                               goto end;
>                         }
> -
>                         /* Release RCU lock for the stream looked up */
>                         rcu_read_unlock();
>                 }
> @@ -2812,21 +2823,16 @@ restart:
>                         }
>
>                         if (pollfd == ctx->consumer_channel_pipe[0]) {
> -                               if (revents & (LPOLLERR | LPOLLHUP)) {
> -                                       DBG("Channel thread pipe hung up");
> -                                       /*
> -                                        * Remove the pipe from the poll set and continue the loop
> -                                        * since their might be data to consume.
> -                                        */
> -                                       lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
> -                                       continue;
> -                               } else if (revents & LPOLLIN) {
> +                               if (revents & LPOLLIN) {
>                                         enum consumer_channel_action action;
>                                         uint64_t key;
>
>                                         ret = read_channel_pipe(ctx, &chan, &key, &action);
>                                         if (ret <= 0) {
> -                                               ERR("Error reading channel pipe");
> +                                               if (ret < 0) {
> +                                                       ERR("Error reading channel pipe");
> +                                               }
> +                                               lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
>                                                 continue;
>                                         }
>
> @@ -2843,7 +2849,7 @@ restart:
>                                                 rcu_read_unlock();
>                                                 /* Add channel to the global poll events list */
>                                                 lttng_poll_add(&events, chan->wait_fd,
> -                                                               LPOLLIN | LPOLLPRI);
> +                                                               LPOLLERR | LPOLLHUP);
>                                                 break;
>                                         case CONSUMER_CHANNEL_DEL:
>                                         {
> @@ -2903,6 +2909,17 @@ restart:
>                                                 ERR("Unknown action");
>                                                 break;
>                                         }
> +                               } else if (revents & (LPOLLERR | LPOLLHUP)) {
> +                                       DBG("Channel thread pipe hung up");
> +                                       /*
> +                                        * Remove the pipe from the poll set and continue the loop
> +                                        * since there might be data to consume.
> +                                        */
> +                                       lttng_poll_del(&events, ctx->consumer_channel_pipe[0]);
> +                                       continue;
> +                               } else {
> +                                       ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                                       goto end;
>                                 }
>
>                                 /* Handle other stream */
> @@ -2941,6 +2958,10 @@ restart:
>                                                 && !uatomic_read(&chan->nb_init_stream_left)) {
>                                         consumer_del_channel(chan);
>                                 }
> +                       } else {
> +                               ERR("Unexpected poll events %u for sock %d", revents, pollfd);
> +                               rcu_read_unlock();
> +                               goto end;
>                         }
>
>                         /* Release RCU lock for the channel looked up */
> --
> 2.1.4
>
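For readers who want to try the reordering in this patch outside the LTTng code base, here is a minimal sketch of the same dispatch order using plain poll(2) on a pipe. It assumes POSIX poll() with POLLIN/POLLHUP/POLLERR standing in for LTTng's LPOLL* flags and lttng_poll_* wrappers. drain_until_hangup() and demo_pipe_hangup() are hypothetical names, not lttng-tools functions. The writer queues three int records and closes its end before the reader polls. This is the case the patch fixes: the buffered records must be consumed before the hangup is acted upon.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

/*
 * Hypothetical helper (not an lttng-tools API): read up to `max` int
 * records from `fd` using the patch's ordering. POLLIN is handled first,
 * a 0-byte read() is treated as the hangup, and POLLHUP/POLLERR are only
 * acted upon when no POLLIN is pending. Any other event is unexpected.
 * Returns the number of records read, or -1 on error.
 */
static int drain_until_hangup(int fd, int *records, int max)
{
	int count = 0;
	struct pollfd pfd = { .fd = fd, .events = POLLIN };

	for (;;) {
		if (poll(&pfd, 1, -1) < 0) {
			if (errno == EINTR)
				continue;
			perror("poll");
			return -1;
		}

		if (pfd.revents & POLLIN) {
			int value;
			ssize_t len = read(fd, &value, sizeof(value));

			if (len == (ssize_t) sizeof(value)) {
				if (count == max)
					return count;	/* Caller's buffer is full. */
				records[count++] = value;
				continue;
			}
			if (len < 0) {
				if (errno == EINTR)
					continue;
				perror("read");
				return -1;
			}
			/* len == 0 (EOF) or a truncated record: writer hung up. */
			return count;
		} else if (pfd.revents & (POLLERR | POLLHUP)) {
			/* Hangup with nothing left to read. */
			return count;
		} else {
			fprintf(stderr, "Unexpected poll events %d for fd %d\n",
					pfd.revents, fd);
			return -1;
		}
	}
}

/* Hypothetical demo: queue three records, hang up, then drain. */
static int demo_pipe_hangup(int *records, int max)
{
	int fds[2];
	int ret;

	if (pipe(fds) < 0) {
		perror("pipe");
		return -1;
	}
	for (int i = 1; i <= 3; i++) {
		int value = i * 10;
		ssize_t len = write(fds[1], &value, sizeof(value));

		assert(len == (ssize_t) sizeof(value));
		(void) len;
	}
	/* Close the write end while data is still buffered in the pipe. */
	close(fds[1]);

	ret = drain_until_hangup(fds[0], records, max);
	close(fds[0]);
	return ret;
}
```

On Linux, the first poll() in this scenario typically reports POLLIN | POLLHUP together, so testing POLLHUP first (the pre-patch order) would return before any record is read. Treating a 0-byte read() as the hangup, as the consumerd changes do with pipe_len, also keeps the check correct on systems that report only POLLIN at end of file.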



-- 
Jérémie Galarneau
EfficiOS Inc.
http://www.efficios.com